From 872da0a2bfc13b16b773386dec5171b04bf12f8c Mon Sep 17 00:00:00 2001 From: Ahmed Zamil Date: Sat, 2 Apr 2022 22:36:52 +0300 Subject: [PATCH] Nullable --- lib/esiur.dart | 4 +- lib/src/Data/Codec.dart | 26 ++- lib/src/Data/DataDeserializer.dart | 229 ++++++++++---------- lib/src/Data/TransmissionType.dart | 12 +- lib/src/Net/IIP/DistributedConnection.dart | 223 +++++++++++-------- lib/src/Net/IIP/DistributedResource.dart | 4 +- lib/src/Net/Packets/IIPPacket.dart | 7 + lib/src/Resource/Template/TypeTemplate.dart | 2 +- lib/src/Resource/Warehouse.dart | 1 + 9 files changed, 281 insertions(+), 227 deletions(-) diff --git a/lib/esiur.dart b/lib/esiur.dart index 92ec964..89db845 100644 --- a/lib/esiur.dart +++ b/lib/esiur.dart @@ -1,4 +1,3 @@ - // Resource export 'src/Resource/Warehouse.dart'; export 'src/Resource/Instance.dart'; @@ -34,10 +33,11 @@ export 'src/Core/IEventHandler.dart'; export 'src/Core/Tuple.dart'; // ----------------------------------------------------------------- // Data +export 'src/Data/RepresentationType.dart'; +export 'src/Data/TransmissionType.dart'; export 'src/Data/AutoList.dart'; export 'src/Data/BinaryList.dart'; export 'src/Data/Codec.dart'; -export 'src/Data/RepresentationType.dart'; export 'src/Data/DC.dart'; export 'src/Data/Guid.dart'; export 'src/Data/KeyList.dart'; diff --git a/lib/src/Data/Codec.dart b/lib/src/Data/Codec.dart index 8406cb4..c3bb75f 100644 --- a/lib/src/Data/Codec.dart +++ b/lib/src/Data/Codec.dart @@ -65,8 +65,8 @@ import 'TransmissionType.dart'; // Type get valueType => VT; // } -typedef Parser = AsyncReply Function( - DC data, int offset, int length, DistributedConnection? connection); +typedef Parser = AsyncReply Function(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence); typedef Composer = DataSerializerComposeResults Function( dynamic value, DistributedConnection? connection); @@ -150,16 +150,20 @@ class Codec { /// DistributedConnection is required in case a structure in the array holds items at the other end. /// DataType, in case the data is not prepended with DataType /// Value - static CodecParseResults parse( - DC data, int offset, DistributedConnection? connection, + static CodecParseResults parse(DC data, int offset, + DistributedConnection? connection, List? requestSequence, [TransmissionType? dataType = null]) { int len = 0; + //print("Parse ${offset} ${dataType} ${requestSequence}"); + if (dataType == null) { var parsedDataTyped = TransmissionType.parse(data, offset, data.length); len = parsedDataTyped.size; dataType = parsedDataTyped.type; offset = dataType?.offset ?? 0; + + //print("Parse TT ${len} ${parsedDataTyped.type} ${dataType?.offset}"); } else len = dataType.contentLength; @@ -168,18 +172,22 @@ class Codec { return CodecParseResults( len, fixedParsers[dataType.exponent][dataType.index]( - data, dataType.offset, dataType.contentLength, connection)); + data, + dataType.offset, + dataType.contentLength, + connection, + requestSequence)); } else if (dataType.classType == TransmissionTypeClass.Dynamic) { return CodecParseResults( len, - dynamicParsers[dataType.index]( - data, dataType.offset, dataType.contentLength, connection)); + dynamicParsers[dataType.index](data, dataType.offset, + dataType.contentLength, connection, requestSequence)); } else //if (tt.Class == TransmissionTypeClass.Typed) { return CodecParseResults( len, - typedParsers[dataType.index]( - data, dataType.offset, dataType.contentLength, connection)); + typedParsers[dataType.index](data, dataType.offset, + dataType.contentLength, connection, requestSequence)); } } diff --git a/lib/src/Data/DataDeserializer.dart b/lib/src/Data/DataDeserializer.dart index 2bd132e..6ae1045 100644 --- a/lib/src/Data/DataDeserializer.dart +++ b/lib/src/Data/DataDeserializer.dart @@ -1,5 +1,5 @@ import 'dart:core'; - + import 'IEnum.dart'; import '../Core/Tuple.dart'; @@ -23,136 +23,136 @@ class PropertyValueParserResults { } class DataDeserializer { - static AsyncReply nullParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply nullParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(null); } - static AsyncReply booleanTrueParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply booleanTrueParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(true); } - static AsyncReply booleanFalseParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply booleanFalseParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(false); } - static AsyncReply notModifiedParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply notModifiedParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(NotModified()); } - static AsyncReply byteParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply byteParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(data[offset]); } - static AsyncReply sByteParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply sByteParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready( data[offset] > 127 ? data[offset] - 256 : data[offset]); } - static AsyncReply char16Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply char16Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getChar(offset)); } - static AsyncReply char8Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply char8Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(String.fromCharCode(data[offset])); } - static AsyncReply int16Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply int16Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getInt16(offset)); } - static AsyncReply uInt16Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply uInt16Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getUint16(offset)); } - static AsyncReply int32Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply int32Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getInt32(offset)); } - static AsyncReply uInt32Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply uInt32Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getUint32(offset)); } - static AsyncReply float32Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply float32Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getFloat32(offset)); } - static AsyncReply float64Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply float64Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getFloat64(offset)); } - static AsyncReply float128Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply float128Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { // @TODO return AsyncReply.ready(data.getFloat64(offset)); } - static AsyncReply int128Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply int128Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { // @TODO return AsyncReply.ready(data.getInt64(offset)); } - static AsyncReply uInt128Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply uInt128Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getUint64(offset)); } - static AsyncReply int64Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply int64Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getInt64(offset)); } - static AsyncReply uInt64Parser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply uInt64Parser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getUint64(offset)); } - static AsyncReply dateTimeParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply dateTimeParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return AsyncReply.ready(data.getDateTime(offset)); } - static AsyncReply resourceParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply resourceParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { if (connection != null) { var id = data.getUint32(offset); - return connection.fetch(id); + return connection.fetch(id, requestSequence); } throw Exception("Can't parse resource with no connection"); } - static AsyncReply localResourceParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply localResourceParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var id = data.getUint32(offset); return Warehouse.getById(id); } - static AsyncReply rawDataParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply rawDataParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(data.clip(offset, length)); } - static AsyncReply stringParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply stringParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { return new AsyncReply.ready(data.getString(offset, length)); } - static AsyncReply recordParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply recordParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var reply = new AsyncReply(); var classId = data.getGuid(offset); @@ -161,61 +161,54 @@ class DataDeserializer { var template = Warehouse.getTemplateByClassId(classId, TemplateType.Record); + var initRecord = (TypeTemplate template) => + listParser(data, offset, length, connection, requestSequence).then((r) { + var ar = r as List; + IRecord record; + + if (template.definedType != null) { + record = Warehouse.createInstance(template.definedType!) as IRecord; + } else { + record = Record(); + } + + var kv = Map(); + + for (var i = 0; i < template.properties.length; i++) + kv[template.properties[i].name] = ar[i]; + + record.deserialize(kv); + + reply.trigger(record); + }); + if (template != null) { - listParser(data, offset, length, connection).then((r) { - var ar = r as List; - IRecord record; - - if (template.definedType != null) { - record = Warehouse.createInstance(template.definedType!) as IRecord; - } else { - record = Record(); - } - - var kv = Map(); - - for (var i = 0; i < template.properties.length; i++) - kv[template.properties[i].name] = ar[i]; - - record.deserialize(kv); - - reply.trigger(record); - }); + initRecord(template); } else { if (connection == null) throw Exception("Can't parse record with no connection"); connection.getTemplate(classId).then((tmp) { if (tmp == null) - reply.triggerError(Exception("Couldn't fetch record template.")); - - listParser(data, offset, length, connection).then((r) { - var ar = r as List; - - var record = new Record(); - - var kv = Map(); - - for (var i = 0; i < tmp!.properties.length; i++) - kv[tmp.properties[i].name] = ar[i]; - - record.deserialize(kv); - - reply.trigger(record); - }); + reply.triggerError(AsyncException( + ErrorType.Management, + ExceptionCode.TemplateNotFound.index, + "Template not found for record.")); + else + initRecord(tmp); }).error((x) => reply.triggerError(x)); } return reply; } - static AsyncReply constantParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply constantParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { throw Exception("NotImplementedException"); } - static AsyncReply enumParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply enumParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var classId = data.getGuid(offset); offset += 16; var index = data[offset++]; @@ -258,12 +251,12 @@ class DataDeserializer { } } - static AsyncReply recordListParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply recordListParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var rt = new AsyncBag(); while (length > 0) { - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); rt.add(parsed.reply); @@ -278,12 +271,12 @@ class DataDeserializer { return rt; } - static AsyncReply resourceListParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply resourceListParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var rt = new AsyncBag(); while (length > 0) { - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); rt.add(parsed.reply); @@ -298,12 +291,12 @@ class DataDeserializer { return rt; } - static AsyncBag listParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncBag listParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var rt = new AsyncBag(); while (length > 0) { - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); rt.add(parsed.reply); @@ -318,8 +311,8 @@ class DataDeserializer { return rt; } - static AsyncReply typedMapParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply typedMapParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { // get key type var keyRep = RepresentationType.parse(data, offset); offset += keyRep.size; @@ -329,15 +322,13 @@ class DataDeserializer { offset += valueRep.size; length -= valueRep.size; - - var map = Map(); var rt = new AsyncReply(); var results = new AsyncBag(); while (length > 0) { - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); results.add(parsed.reply); @@ -359,8 +350,8 @@ class DataDeserializer { return rt; } - static AsyncReply tupleParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply tupleParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var results = new AsyncBag(); var rt = new AsyncReply(); @@ -377,7 +368,7 @@ class DataDeserializer { } while (length > 0) { - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); results.add(parsed.reply); @@ -397,8 +388,8 @@ class DataDeserializer { return rt; } - static AsyncReply typedListParser( - DC data, int offset, int length, DistributedConnection? connection) { + static AsyncReply typedListParser(DC data, int offset, int length, + DistributedConnection? connection, List? requestSequence) { var rt = new AsyncBag(); // get the type @@ -412,7 +403,7 @@ class DataDeserializer { rt.arrayType = runtimeType; while (length > 0) { - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); rt.add(parsed.reply); @@ -431,11 +422,12 @@ class DataDeserializer { DC data, int offset, int length, - DistributedConnection? connection) //, bool ageIncluded = true) + DistributedConnection? connection, + List? requestSequence) //, bool ageIncluded = true) { var rt = new AsyncBag(); - listParser(data, offset, length, connection).then((x) { + listParser(data, offset, length, connection, requestSequence).then((x) { var pvs = []; for (var i = 0; i < x.length; i += 3) @@ -447,8 +439,11 @@ class DataDeserializer { return rt; } - static PropertyValueParserResults propertyValueParser(DC data, int offset, - DistributedConnection? connection) //, bool ageIncluded = true) + static PropertyValueParserResults propertyValueParser( + DC data, + int offset, + DistributedConnection? connection, + List? requestSequence) //, bool ageIncluded = true) { var reply = new AsyncReply(); @@ -458,7 +453,7 @@ class DataDeserializer { DateTime date = data.getDateTime(offset); offset += 8; - var parsed = Codec.parse(data, offset, connection); + var parsed = Codec.parse(data, offset, connection, requestSequence); parsed.reply.then((value) { reply.trigger(new PropertyValue(value, age, date)); @@ -469,7 +464,7 @@ class DataDeserializer { static AsyncReply>> historyParser(DC data, int offset, int length, IResource resource, - DistributedConnection? connection) { + DistributedConnection? connection, List? requestSequence) { throw Exception("Not implemented"); // @TODO // var list = new KeyList>(); diff --git a/lib/src/Data/TransmissionType.dart b/lib/src/Data/TransmissionType.dart index a2f8d62..50f1ade 100644 --- a/lib/src/Data/TransmissionType.dart +++ b/lib/src/Data/TransmissionType.dart @@ -69,6 +69,11 @@ class TransmissionType { this.offset, this.contentLength, [this.exponent = 0]); + @override + String toString() { + return "Id: ${identifier}, Index: ${index}, Class: ${classType}, Offset: ${offset}, ContentLength: ${contentLength}, Exp: $exponent"; + } + static DC compose(int identifier, DC data) { if (data.length == 0) return DC.fromList([identifier]); @@ -168,7 +173,7 @@ class TransmissionType { int cl = (1 << (exp - 1)); if (ends - offset < cl) - return TransmissionTypeParseResults(ends - offset - cl, null); + return TransmissionTypeParseResults(cl - (ends - offset), null); return TransmissionTypeParseResults( 1 + cl, new TransmissionType(h, cls, h & 0x7, offset, cl, exp)); @@ -176,12 +181,15 @@ class TransmissionType { int cll = (h >> 3) & 0x7; if (ends - offset < cll) - return TransmissionTypeParseResults(ends - offset - cll, null); + return TransmissionTypeParseResults(cll - (ends - offset), null); int cl = 0; for (var i = 0; i < cll; i++) cl = cl << 8 | data[offset++]; + if (ends - offset < cl) + return TransmissionTypeParseResults(cl - (ends - offset), null); + return TransmissionTypeParseResults( 1 + cl + cll, TransmissionType((h & 0xC7), cls, h & 0x7, offset, cl)); } diff --git a/lib/src/Net/IIP/DistributedConnection.dart b/lib/src/Net/IIP/DistributedConnection.dart index ac90376..cd542b4 100644 --- a/lib/src/Net/IIP/DistributedConnection.dart +++ b/lib/src/Net/IIP/DistributedConnection.dart @@ -123,6 +123,7 @@ class DistributedConnection extends NetworkConnection with IStore { KeyList> _resourceRequests = new KeyList>(); + KeyList> _templateRequests = new KeyList>(); //KeyList> _pathRequests = new KeyList>(); @@ -325,7 +326,7 @@ class DistributedConnection extends NetworkConnection with IStore { for (var i = 0; i < _resources.keys.length; i++) { var index = _resources.keys.elementAt(i); // print("Re $i ${_resources[index].instance.template.className}"); - bag.add(fetch(index)); + bag.add(fetch(index, null)); } bag.seal(); @@ -461,7 +462,9 @@ class DistributedConnection extends NetworkConnection with IStore { String? link(IResource resource) { if (resource is DistributedResource) { if (resource.instance?.store == this) - return (this.instance?.name ?? "") + "/" + resource.distributedResourceInstanceId.toString(); + return (this.instance?.name ?? "") + + "/" + + resource.distributedResourceInstanceId.toString(); } return null; @@ -486,11 +489,11 @@ class DistributedConnection extends NetworkConnection with IStore { var packet = new IIPPacket(); if (_ready) { - print("Inc " + msg.length.toString()); + //print("Inc " + msg.length.toString()); var rt = packet.parse(msg, offset, ends); - print("Packet " + packet.toString()); + //print("Packet " + packet.toString()); if (rt <= 0) { // print("hold"); @@ -960,8 +963,7 @@ class DistributedConnection extends NetworkConnection with IStore { @override void dataReceived(NetworkBuffer data) { - print("dataReceived"); - // Console.WriteLine("DR " + hostType + " " + data.Available + " " + RemoteEndPoint.ToString()); + // print("dataReceived"); var msg = data.read(); int offset = 0; @@ -996,7 +998,9 @@ class DistributedConnection extends NetworkConnection with IStore { AsyncReply put(IResource resource) { if (Codec.isLocalResource(resource, this)) _resources.add( - (resource as DistributedResource).distributedResourceInstanceId as int, resource); + (resource as DistributedResource).distributedResourceInstanceId + as int, + resource); // else .. put it in the server.... return AsyncReply.ready(true); } @@ -1140,10 +1144,11 @@ class DistributedConnection extends NetworkConnection with IStore { req?.trigger(results); } + // @TODO: check for deadlocks void iipReplyInvoke(int callbackId, TransmissionType dataType, DC data) { var req = _requests.take(callbackId); - Codec.parse(data, 0, this, dataType).reply.then((rt) { + Codec.parse(data, 0, this, null, dataType).reply.then((rt) { req?.trigger(rt); }); } @@ -1160,10 +1165,11 @@ class DistributedConnection extends NetworkConnection with IStore { req?.triggerProgress(type, value, max); } + // @TODO: Check for deadlocks void iipReportChunk(int callbackId, TransmissionType dataType, DC data) { if (_requests.containsKey(callbackId)) { var req = _requests[callbackId]; - Codec.parse(data, 0, this, dataType).reply.then((x) { + Codec.parse(data, 0, this, null, dataType).reply.then((x) { req?.triggerChunk(x); }); } @@ -1179,13 +1185,14 @@ class DistributedConnection extends NetworkConnection with IStore { } } + // @TODO: Check for deadlocks void iipEventPropertyUpdated( int resourceId, int index, TransmissionType dataType, DC data) { - fetch(resourceId).then((r) { + fetch(resourceId, null).then((r) { var item = new AsyncReply(); _queue.add(item); - Codec.parse(data, 0, this, dataType).reply.then((arguments) { + Codec.parse(data, 0, this, null, dataType).reply.then((arguments) { var pt = r.instance?.template.getPropertyTemplateByIndex(index); if (pt != null) { item.trigger(DistributedResourceQueueItem( @@ -1234,14 +1241,15 @@ class DistributedConnection extends NetworkConnection with IStore { */ } + // @TODO: Check for deadlocks void iipEventEventOccurred( int resourceId, int index, TransmissionType dataType, DC data) { - fetch(resourceId).then((r) { + fetch(resourceId, null).then((r) { // push to the queue to gaurantee serialization var item = new AsyncReply(); _queue.add(item); - Codec.parse(data, 0, this, dataType).reply.then((arguments) { + Codec.parse(data, 0, this, null, dataType).reply.then((arguments) { var et = r.instance?.template.getEventTemplateByIndex(index); if (et != null) { item.trigger(new DistributedResourceQueueItem( @@ -1289,26 +1297,29 @@ class DistributedConnection extends NetworkConnection with IStore { */ } + // @TODO: check for deadlocks void iipEventChildAdded(int resourceId, int childId) { - fetch(resourceId).then((parent) { + fetch(resourceId, null).then((parent) { if (parent != null) - fetch(childId).then((child) { + fetch(childId, null).then((child) { if (child != null) parent.instance?.children.add(child); }); }); } +// @TODO: check for deadlocks void iipEventChildRemoved(int resourceId, int childId) { - fetch(resourceId).then((parent) { + fetch(resourceId, null).then((parent) { if (parent != null) - fetch(childId).then((child) { + fetch(childId, null).then((child) { if (child != null) parent.instance?.children.remove(child); }); }); } +// @TODO: check for deadlocks void iipEventRenamed(int resourceId, String name) { - fetch(resourceId) + fetch(resourceId, null) ..then((resource) { if (resource != null) { resource.instance?.attributes["name"] = name; @@ -1316,8 +1327,9 @@ class DistributedConnection extends NetworkConnection with IStore { }); } +// @TODO: check for deadlocks void iipEventAttributesUpdated(int resourceId, DC attributes) { - fetch(resourceId) + fetch(resourceId, null) ..then((resource) { if (resource != null) { var attrs = attributes.getStringArray(0, attributes.length); @@ -1511,16 +1523,17 @@ class DistributedConnection extends NetworkConnection with IStore { return; } - DataDeserializer.listParser(content, offset, cl, this) +// @TODO: check for deadlocks + DataDeserializer.listParser(content, offset, cl, this, null) .then((parameters) { offset += cl; cl = content.getUint32(offset); - DataDeserializer.typedMapParser(content, offset, cl, this) + DataDeserializer.typedMapParser(content, offset, cl, this, null) .then((attributes) { offset += cl; cl = content.length - offset; - DataDeserializer.typedMapParser(content, offset, cl, this) + DataDeserializer.typedMapParser(content, offset, cl, this, null) .then((values) { var constructors = []; //Type.GetType(className).GetTypeInfo().GetConstructors(); @@ -1813,7 +1826,8 @@ class DistributedConnection extends NetworkConnection with IStore { return; } - DataDeserializer.typedListParser(attributes, 0, attributes.length, this) + DataDeserializer.typedListParser( + attributes, 0, attributes.length, this, null) .then((attrs) { if (r.instance?.setAttributes( attrs as Map, clearAttributes) == @@ -1947,11 +1961,12 @@ class DistributedConnection extends NetworkConnection with IStore { void IIPRequestResourceAttribute(int callback, int resourceId) {} +// @TODO: Check for deadlocks void iipRequestInvokeFunction(int callback, int resourceId, int index, TransmissionType dataType, DC data) { Warehouse.getById(resourceId).then((r) { if (r != null) { - Codec.parse(data, 0, this, dataType).reply.then((arguments) { + Codec.parse(data, 0, this, null, dataType).reply.then((arguments) { var ft = r.instance?.template.getFunctionTemplateByIndex(index); if (ft != null) { if (r is DistributedResource) { @@ -2160,13 +2175,14 @@ class DistributedConnection extends NetworkConnection with IStore { // }); // } +// @TODO: Check for deadlocks void iipRequestSetProperty(int callback, int resourceId, int index, TransmissionType dataType, DC data) { Warehouse.getById(resourceId).then((r) { if (r != null) { var pt = r.instance?.template.getPropertyTemplateByIndex(index); if (pt != null) { - Codec.parse(data, 0, this, dataType).reply.then((value) { + Codec.parse(data, 0, this, null, dataType).reply.then((value) { if (r is DistributedResource) { // propagation (r as DistributedResource).set(index, value).then((x) { @@ -2238,6 +2254,8 @@ class DistributedConnection extends NetworkConnection with IStore { /// Class GUID. /// TypeTemplate. AsyncReply getTemplate(Guid classId) { + //Warehouse.getTemplateByClassId(classId) + if (_templates.containsKey(classId)) return AsyncReply.ready(_templates[classId]); else if (_templateRequests.containsKey(classId)) @@ -2338,99 +2356,110 @@ class DistributedConnection extends NetworkConnection with IStore { /// Class GUID /// Resource IdGuid classId /// DistributedResource - AsyncReply fetch(int id) { + AsyncReply fetch(int id, List? requestSequence) { var resource = _resources[id]; var request = _resourceRequests[id]; + //print("fetch $id"); + if (request != null) { - // dig for dead locks - if (resource != null) // dead lock + if (resource != null && (requestSequence?.contains(id) ?? false)) return AsyncReply.ready(resource); - else - return request; + return request; } else if (resource != null && !resource.distributedResourceSuspended) return new AsyncReply.ready(resource); var reply = new AsyncReply(); _resourceRequests.add(id, reply); + //print("AttachResource sent ${id}"); + + var newSequence = + requestSequence != null ? List.from(requestSequence) : []; + + newSequence.add(id); + (sendRequest(IIPPacketAction.AttachResource)..addUint32(id)).done() ..then((rt) { - if (rt != null) { - // @TODO: Generator code - DistributedResource dr; + //print("AttachResource rec ${id}"); - if (resource == null) { - var template = Warehouse.getTemplateByClassId( - rt[0] as Guid, TemplateType.Wrapper); - if (template?.definedType != null) { - dr = Warehouse.createInstance(template?.definedType as Type); - dr.internal_init(this, id, rt[1] as int, rt[2] as String); - } else { - dr = new DistributedResource(); - dr.internal_init(this, id, rt[1] as int, rt[2] as String); - } - } else - dr = resource; + // Resource not found (null) + if (rt == null) { + //print("Null response"); + reply.triggerError(AsyncException(ErrorType.Management, + ExceptionCode.ResourceNotFound.index, "Null response")); + return; + } - //var dr = resource ?? new DistributedResource(this, id, rt[1], rt[2]); + DistributedResource dr; + TypeTemplate? template; - TransmissionType transmissionType = rt[3] as TransmissionType; - DC content = rt[4] as DC; + Guid classId = rt[0] as Guid; + if (resource == null) { + template = + Warehouse.getTemplateByClassId(classId, TemplateType.Wrapper); + if (template?.definedType != null) { + dr = Warehouse.createInstance(template?.definedType as Type); + dr.internal_init(this, id, rt[1] as int, rt[2] as String); + } else { + dr = new DistributedResource(); + dr.internal_init(this, id, rt[1] as int, rt[2] as String); + } + } else + dr = resource; + + TransmissionType transmissionType = rt[3] as TransmissionType; + DC content = rt[4] as DC; + + var initResource = (ok) { + print("parse req ${id}"); + + Codec.parse(content, 0, this, newSequence, transmissionType) + .reply + .then((results) { + //print("parsed ${id}"); + + var pvs = []; + var ar = results as List; + + for (var i = 0; i < ar.length; i += 3) + pvs.add(new PropertyValue( + ar[i + 2], ar[i] as int, ar[i + 1] as DateTime)); + + dr.internal_attach(pvs); + + _resourceRequests.remove(id); + reply.trigger(dr); + }) + ..error((ex) => reply.triggerError(ex)); + }; + + if (template == null) { + //print("tmp == null"); getTemplate(rt[0] as Guid) ..then((tmp) { - //print("New template "); - // ClassId, ResourceAge, ResourceLink, Content if (resource == null) { Warehouse.put(id.toString(), dr, this, null, tmp) - ..then((ok) { - Codec.parse(content, 0, this, transmissionType) - .reply - .then((results) { - var pvs = []; - var ar = results as List; - - for (var i = 0; i < ar.length; i += 3) - pvs.add(new PropertyValue( - ar[i + 2], ar[i] as int, ar[i + 1] as DateTime)); - - dr.internal_attach(pvs); - - _resourceRequests.remove(id); - reply.trigger(dr); - }) - ..error((ex) => reply.triggerError(ex)); - }) + ..then(initResource) ..error((ex) => reply.triggerError(ex)); } else { - Codec.parse(content, 0, this, transmissionType) - .reply - .then((results) { - //print("attached"); - if (results != null) { - var pvs = []; - - var ar = results as List; - for (var i = 0; i < ar.length; i += 3) - pvs.add(new PropertyValue( - ar[i + 2], ar[i] as int, ar[i + 1] as DateTime)); - - dr.internal_attach(pvs); - } - - _resourceRequests.remove(id); - reply.trigger(dr); - }) - ..error((ex) => reply.triggerError(ex)); + initResource(resource); } }) ..error((ex) { reply.triggerError(ex); }); } else { - reply.triggerError(Exception("Null response")); + //print("tmp != null"); + if (resource == null) { + Warehouse.put(id.toString(), dr, this, null, template) + ..then(initResource) + ..error((ex) => reply.triggerError(ex)); + } else { + initResource(resource); + } } }) ..error((ex) { @@ -2440,6 +2469,7 @@ class DistributedConnection extends NetworkConnection with IStore { return reply; } +// @TODO: Check for deadlocks AsyncReply> getChildren(IResource resource) { var rt = new AsyncReply>(); @@ -2450,7 +2480,7 @@ class DistributedConnection extends NetworkConnection with IStore { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; - Codec.parse(data, 0, this, dataType).reply.then((resources) { + Codec.parse(data, 0, this, null, dataType).reply.then((resources) { rt.trigger(resources as List); }) ..error((ex) => rt.triggerError(ex)); @@ -2462,6 +2492,7 @@ class DistributedConnection extends NetworkConnection with IStore { return rt; } +// @TODO: Check for deadlocks AsyncReply> getParents(IResource resource) { var rt = new AsyncReply>(); @@ -2471,7 +2502,7 @@ class DistributedConnection extends NetworkConnection with IStore { if (ar != null) { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; - Codec.parse(data, 0, this, dataType).reply.then((resources) { + Codec.parse(data, 0, this, null, dataType).reply.then((resources) { rt.trigger(resources as List); }) ..error((ex) => rt.triggerError(ex)); @@ -2524,6 +2555,7 @@ class DistributedConnection extends NetworkConnection with IStore { return rt; } +// @TODO: Check for deadlocks AsyncReply> getAttributes(IResource resource, [List? attributes = null]) { var rt = new AsyncReply>(); @@ -2537,7 +2569,7 @@ class DistributedConnection extends NetworkConnection with IStore { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; - Codec.parse(data, 0, this, dataType).reply.then((st) { + Codec.parse(data, 0, this, null, dataType).reply.then((st) { resource.instance?.setAttributes(st as Map); rt.trigger(st as Map); }) @@ -2560,7 +2592,7 @@ class DistributedConnection extends NetworkConnection with IStore { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; - Codec.parse(data, 0, this, dataType).reply + Codec.parse(data, 0, this, null, dataType).reply ..then((st) { resource.instance?.setAttributes(st as Map); @@ -2606,7 +2638,7 @@ class DistributedConnection extends NetworkConnection with IStore { var content = rt[0] as DC; DataDeserializer.historyParser( - content, 0, content.length, resource, this) + content, 0, content.length, resource, this, null) .then((history) => reply.trigger(history)); } else { reply.triggerError(Exception("Null response")); @@ -2624,6 +2656,7 @@ class DistributedConnection extends NetworkConnection with IStore { /// /// Link path. /// +// @TODO: Check for deadlocks AsyncReply> query(String path) { var str = DC.stringToBytes(path); var reply = new AsyncReply>(); @@ -2636,7 +2669,7 @@ class DistributedConnection extends NetworkConnection with IStore { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; - Codec.parse(data, 0, this, dataType).reply.then((resources) => + Codec.parse(data, 0, this, null, dataType).reply.then((resources) => reply.trigger((resources as List).cast())) ..error((ex) => reply.triggerError(ex)); } else { @@ -2680,7 +2713,7 @@ class DistributedConnection extends NetworkConnection with IStore { if (args != null) { var rid = args[0]; - fetch(rid as int).then((r) { + fetch(rid as int, null).then((r) { reply.trigger(r); }); } else { diff --git a/lib/src/Net/IIP/DistributedResource.dart b/lib/src/Net/IIP/DistributedResource.dart index 554240f..3708dd3 100644 --- a/lib/src/Net/IIP/DistributedResource.dart +++ b/lib/src/Net/IIP/DistributedResource.dart @@ -45,6 +45,8 @@ import '../Packets/IIPPacketAction.dart'; import '../../Resource/Template/EventTemplate.dart'; + + class DistributedResource extends IResource { int? _instanceId; DistributedConnection? _connection; @@ -105,7 +107,7 @@ class DistributedResource extends IResource { /// /// Resource is attached when all its properties are received. /// - bool get attached => _attached; + bool get distributedResourceAttached => _attached; // public DistributedResourceStack Stack //{ diff --git a/lib/src/Net/Packets/IIPPacket.dart b/lib/src/Net/Packets/IIPPacket.dart index fc78a03..eb01a8c 100644 --- a/lib/src/Net/Packets/IIPPacket.dart +++ b/lib/src/Net/Packets/IIPPacket.dart @@ -397,6 +397,13 @@ class IIPPacket { if (parsed.type == null) return -parsed.size; + //print("Not enough ${parsed.size}"); + + // } else { + // print( + // "attach parsed ${parsed.size} ${cl} ${data.length} ${ends} ${offset}"); + // } + dataType = parsed.type; offset += parsed.size; } else if (action == IIPPacketAction.DetachResource) { diff --git a/lib/src/Resource/Template/TypeTemplate.dart b/lib/src/Resource/Template/TypeTemplate.dart index 58a875c..d6183a2 100644 --- a/lib/src/Resource/Template/TypeTemplate.dart +++ b/lib/src/Resource/Template/TypeTemplate.dart @@ -627,7 +627,7 @@ class TypeTemplate { offset += dt.size; - var parsed = Codec.parse(data, offset, null); + var parsed = Codec.parse(data, offset, null, null); offset += parsed.size; diff --git a/lib/src/Resource/Warehouse.dart b/lib/src/Resource/Warehouse.dart index db84a92..14adad3 100644 --- a/lib/src/Resource/Warehouse.dart +++ b/lib/src/Resource/Warehouse.dart @@ -68,6 +68,7 @@ class Warehouse { rt.add(TemplateType.Resource, new KeyList()); rt.add(TemplateType.Record, new KeyList()); rt.add(TemplateType.Wrapper, new KeyList()); + rt.add(TemplateType.Enum, new KeyList()); return rt; }