From 083ea92d3ba32311c4a48dffd442b125c133f11d Mon Sep 17 00:00:00 2001 From: Esiur Project Date: Thu, 11 Aug 2022 21:28:41 +0300 Subject: [PATCH] AutoReconnect --- lib/src/Core/AsyncQueue.dart | 4 +- lib/src/Core/AsyncReply.dart | 15 +- lib/src/Core/ExceptionCode.dart | 3 +- lib/src/Net/IIP/DistributedConnection.dart | 593 ++++++++++++--------- lib/src/Net/IIP/DistributedResource.dart | 21 +- lib/src/Net/NetworkConnection.dart | 1 - lib/src/Net/Sockets/TCPSocket.dart | 2 + lib/src/Net/Sockets/WSocket.dart | 2 + lib/src/Resource/IStore.dart | 2 +- lib/src/Resource/Warehouse.dart | 35 +- 10 files changed, 407 insertions(+), 271 deletions(-) diff --git a/lib/src/Core/AsyncQueue.dart b/lib/src/Core/AsyncQueue.dart index 6ab4c18..bf0f061 100644 --- a/lib/src/Core/AsyncQueue.dart +++ b/lib/src/Core/AsyncQueue.dart @@ -12,7 +12,7 @@ class AsyncQueue extends AsyncReply { _list.add(reply); //super._resultReady = false; - super.setResultReady(false); + super.ready = false; // setResultReady(false); reply.then(processQueue); } @@ -35,7 +35,7 @@ class AsyncQueue extends AsyncReply { break; //super._resultReady = (_list.length == 0); - super.setResultReady(_list.length == 0); + super.ready = _list.length == 0; // .setResultReady(_list.length == 0); } AsyncQueue() {} diff --git a/lib/src/Core/AsyncReply.dart b/lib/src/Core/AsyncReply.dart index 315b2a9..9267556 100644 --- a/lib/src/Core/AsyncReply.dart +++ b/lib/src/Core/AsyncReply.dart @@ -23,6 +23,8 @@ SOFTWARE. */ import 'dart:async'; import 'dart:core'; +import 'package:esiur/esiur.dart'; + import 'AsyncException.dart'; import 'ProgressType.dart'; @@ -55,9 +57,9 @@ class AsyncReply implements Future { return _result; } - void setResultReady(bool val) { - _resultReady = val; - } + // void setResultReady(bool val) { + // _resultReady = val; + // } AsyncReply next(Function(T) callback) { then(callback); @@ -127,7 +129,11 @@ class AsyncReply implements Future { AsyncReply timeout(Duration timeLimit, {FutureOr onTimeout()?}) { Future.delayed(timeLimit, () { - if (!_resultReady && _exception == null) onTimeout?.call(); + if (!_resultReady && _exception == null) { + triggerError(AsyncException(ErrorType.Management, + ExceptionCode.Timeout.index, "Execution timeout expired")); + onTimeout?.call(); + } }); return this; @@ -152,6 +158,7 @@ class AsyncReply implements Future { AsyncReply trigger(T result) { if (_resultReady) return this; + if (_exception != null) return this; _result = result; _resultReady = true; diff --git a/lib/src/Core/ExceptionCode.dart b/lib/src/Core/ExceptionCode.dart index 85fd342..d59dea7 100644 --- a/lib/src/Core/ExceptionCode.dart +++ b/lib/src/Core/ExceptionCode.dart @@ -34,5 +34,6 @@ enum ExceptionCode { AlreadyListened, AlreadyUnlistened, NotListenable, - ParseError + ParseError, + Timeout } diff --git a/lib/src/Net/IIP/DistributedConnection.dart b/lib/src/Net/IIP/DistributedConnection.dart index 27f5f93..c20526a 100644 --- a/lib/src/Net/IIP/DistributedConnection.dart +++ b/lib/src/Net/IIP/DistributedConnection.dart @@ -122,8 +122,13 @@ class DistributedConnection extends NetworkConnection with IStore { bool _ready = false, _readyToEstablish = false; - KeyList _resources = + KeyList> _attachedResources = + new KeyList>(); + + KeyList _neededResources = new KeyList(); + KeyList> _suspendedResources = + new KeyList>(); KeyList> _resourceRequests = new KeyList>(); @@ -285,6 +290,7 @@ class DistributedConnection extends NetworkConnection with IStore { _session?.localAuthentication.domain = domain; _session?.localAuthentication.username = username; _localPasswordOrToken = passwordOrToken; + _invalidCredentials = false; } if (_session == null) @@ -302,75 +308,149 @@ class DistributedConnection extends NetworkConnection with IStore { if (_hostname == null) throw Exception("Host not specified."); - if (socket != null) { - socket.connect(_hostname as String, _port) - ..then((x) { - assign(socket as ISocket); - }) - ..error((x) { - _openReply?.triggerError(x); - _openReply = null; - }); - } + _connectSocket(socket); return _openReply as AsyncReply; } + _connectSocket(ISocket socket) { + socket.connect(_hostname as String, _port) + ..then((x) { + assign(socket); + }) + ..error((x) { + if (autoReconnect) { + print("Reconnecting socket..."); + Future.delayed(Duration(seconds: 5), () => _connectSocket(socket)); + } else { + _openReply?.triggerError(x); + _openReply = null; + } + }); + } + + bool autoReconnect = false; + bool _invalidCredentials = false; + @override void disconnected() { // clean up _ready = false; _readyToEstablish = false; - _requests.values.forEach((x) { try { - x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")); - } catch (ex){ } - }); + print("Disconnected .."); - _resourceRequests.values.forEach((x) { try { - x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")); - } catch (ex){ } - }); + _keepAliveTimer?.cancel(); + _keepAliveTimer = null; - _templateRequests.values.forEach((x) { try { - x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")); - } catch (ex){ } - }); + _requests.values.forEach((x) { + try { + x.triggerError( + AsyncException(ErrorType.Management, 0, "Connection closed")); + } catch (ex) {} + }); + + _resourceRequests.values.forEach((x) { + try { + x.triggerError( + AsyncException(ErrorType.Management, 0, "Connection closed")); + } catch (ex) {} + }); + + _templateRequests.values.forEach((x) { + try { + x.triggerError( + AsyncException(ErrorType.Management, 0, "Connection closed")); + } catch (ex) {} + }); _requests.clear(); _resourceRequests.clear(); _templateRequests.clear(); - - - if (server != null){ - // @TODO: check if we need this with reconnect - _resources.values.forEach((x) => x.suspend()); - _unsubscribeAll(); - Warehouse.remove(this); + for (var x in _attachedResources.values) { + var r = x.target; + if (r != null) { + r.suspend(); + _suspendedResources[r.distributedResourceInstanceId ?? 0] = x; + } } + if (server != null) { + _suspendedResources.clear(); + _unsubscribeAll(); + Warehouse.remove(this); + + // @TODO: implement this + // if (ready) + // _server.membership?.Logout(session); + + } else if (autoReconnect && !_invalidCredentials) { + Future.delayed(Duration(seconds: 5), reconnect); + } else { + _suspendedResources.clear(); + } + + _attachedResources.clear(); + _ready = false; } Future reconnect() async { - if (await connect()) { - var bag = AsyncBag(); + try { + if (!await connect()) return false; - for (var i = 0; i < _resources.keys.length; i++) { - var index = _resources.keys.elementAt(i); - // print("Re $i ${_resources[index].instance.template.className}"); - bag.add(fetch(index, null)); + try { + var toBeRestored = []; + + _suspendedResources.forEach((key, value) { + var r = value.target; + if (r != null) toBeRestored.add(r); + }); + + for (var r in toBeRestored) { + var link = DC.stringToBytes(r.distributedResourceLink ?? ""); + + print("Restoring " + (r.distributedResourceLink ?? "")); + + try { + var ar = await (sendRequest(IIPPacketAction.QueryLink) + ..addUint16(link.length) + ..addDC(link)) + .done(); + + var dataType = ar?[0] as TransmissionType; + var data = ar?[1] as DC; + + if (dataType.identifier == + TransmissionTypeIdentifier.ResourceList) { + // parse them as int + var id = data.getUint32(8); + if (id != r.distributedResourceInstanceId) + r.distributedResourceInstanceId = id; + + _neededResources[id] = r; + _suspendedResources.remove(id); + + await fetch(id, null); + } + } catch (ex) { + if (ex is AsyncException && + ex.code == ExceptionCode.ResourceNotFound) { + // skip this resource + } else { + break; + } + } + } + } catch (ex) { + print(ex); } - - bag.seal(); - - await bag; - - return true; + } catch (ex) { + return false; } - return false; + return true; } /// @@ -526,8 +606,6 @@ class DistributedConnection extends NetworkConnection with IStore { // Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed); } - - void _keepAliveTimer_Elapsed() { if (!isConnected) return; @@ -541,23 +619,25 @@ class DistributedConnection extends NetworkConnection with IStore { _lastKeepAliveSent = now; - sendRequest(IIPPacketAction.KeepAlive) - ..addDateTime(now) - ..addUint32(interval) - ..done().then((x) { + //print("keep alive sent"); + + (sendRequest(IIPPacketAction.KeepAlive) + ..addDateTime(now) + ..addUint32(interval)) + .done() + ..then((x) { jitter = x?[1]; _keepAliveTimer = Timer( Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed); - print("Keep Alive Received ${jitter}"); - }).error((ex) { + //print("Keep Alive Received ${jitter}"); + }) + ..error((ex) { _keepAliveTimer?.cancel(); close(); - }).timeout(Duration(microseconds: keepAliveTime), onTimeout: () { - _keepAliveTimer?.cancel(); - close(); - }); + }) + ..timeout(Duration(seconds: keepAliveTime)); } int processPacket( @@ -743,20 +823,20 @@ class DistributedConnection extends NetworkConnection with IStore { // packet.callbackId, packet.resourceId, packet.content, false); break; - - case IIPPacketAction.KeepAlive: - iipRequestKeepAlive(packet.callbackId, packet.currentTime, packet.interval); - break; + case IIPPacketAction.KeepAlive: + iipRequestKeepAlive( + packet.callbackId, packet.currentTime, packet.interval); + break; case IIPPacketAction.ProcedureCall: - iipRequestProcedureCall(packet.callbackId, packet.procedure, packet.dataType as TransmissionType, msg); - break; + iipRequestProcedureCall(packet.callbackId, packet.procedure, + packet.dataType as TransmissionType, msg); + break; case IIPPacketAction.StaticCall: - iipRequestStaticCall(packet.callbackId, packet.classId, packet.methodIndex, packet.dataType as TransmissionType, msg); - break; - - + iipRequestStaticCall(packet.callbackId, packet.classId, + packet.methodIndex, packet.dataType as TransmissionType, msg); + break; } } else if (packet.command == IIPPacketCommand.Reply) { switch (packet.action) { @@ -823,7 +903,6 @@ class DistributedConnection extends NetworkConnection with IStore { case IIPPacketAction.InvokeFunction: case IIPPacketAction.StaticCall: case IIPPacketAction.ProcedureCall: - iipReplyInvoke(packet.callbackId, packet.dataType ?? TransmissionType.Null, msg); break; @@ -1038,10 +1117,11 @@ class DistributedConnection extends NetworkConnection with IStore { emitArgs("ready", []); // start perodic keep alive timer - _keepAliveTimer = Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed); - + _keepAliveTimer = Timer(Duration(seconds: KeepAliveInterval), + _keepAliveTimer_Elapsed); } } else if (_authPacket.command == IIPAuthPacketCommand.Error) { + _invalidCredentials = true; var ex = AsyncException(ErrorType.Management, _authPacket.errorCode, _authPacket.errorMessage); _openReply?.triggerError(ex); @@ -1096,7 +1176,7 @@ class DistributedConnection extends NetworkConnection with IStore { /// AsyncReply put(IResource resource) { if (Codec.isLocalResource(resource, this)) - _resources.add( + _neededResources.add( (resource as DistributedResource).distributedResourceInstanceId as int, resource); @@ -1190,7 +1270,21 @@ class DistributedConnection extends NetworkConnection with IStore { return reply; } - AsyncReply? sendDetachRequest(int instanceId) { + void detachResource(int instanceId) async { + try { + if (_attachedResources.containsKey(instanceId)) + _attachedResources.remove(instanceId); + + if (_suspendedResources.containsKey(instanceId)) + _suspendedResources.remove(instanceId); + + await _sendDetachRequest(instanceId); + } catch (ex) { + // do nothing + } + } + + AsyncReply? _sendDetachRequest(int instanceId) { try { return (sendRequest(IIPPacketAction.DetachResource) ..addUint32(instanceId)) @@ -1277,11 +1371,16 @@ class DistributedConnection extends NetworkConnection with IStore { void iipEventResourceReassigned(int resourceId, int newResourceId) {} void iipEventResourceDestroyed(int resourceId) { - if (_resources.contains(resourceId)) { - var r = _resources[resourceId]; - _resources.remove(resourceId); - r?.destroy(); + var r = _attachedResources[resourceId]?.target; + if (r != null) { + r.destroy(); + return; + } else if (_neededResources.contains(resourceId)) { + // @TODO: handle this mess + _neededResources.remove(resourceId); } + + _attachedResources.remove(resourceId); } // @TODO: Check for deadlocks @@ -1532,17 +1631,15 @@ class DistributedConnection extends NetworkConnection with IStore { _subscriptions.remove(resource); } -void _unsubscribeAll(){ - _subscriptions.forEach((resource, value) { - resource.instance?.off("resourceEventOccurred", _instance_EventOccurred); - resource.instance?.off("resourceModified", _instance_PropertyModified); - resource.instance?.off("resourceDestroyed", _instance_ResourceDestroyed); - - }); - - _subscriptions.clear(); -} + void _unsubscribeAll() { + _subscriptions.forEach((resource, value) { + resource.instance?.off("resourceEventOccurred", _instance_EventOccurred); + resource.instance?.off("resourceModified", _instance_PropertyModified); + resource.instance?.off("resourceDestroyed", _instance_ResourceDestroyed); + }); + _subscriptions.clear(); + } void iipRequestReattachResource( int callback, int resourceId, int resourceAge) { @@ -2071,104 +2168,103 @@ void _unsubscribeAll(){ }); } + void iipRequestProcedureCall(int callback, String procedureCall, + TransmissionType transmissionType, DC content) { + // server not implemented + sendError( + ErrorType.Management, callback, ExceptionCode.GeneralFailure.index); - void iipRequestProcedureCall(int callback, String procedureCall, TransmissionType transmissionType, DC content) - { - // server not implemented - sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure.index); + // if (server == null) + // { + // sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure.index); + // return; + // } - // if (server == null) - // { - // sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure.index); - // return; - // } + // var call = Server.Calls[procedureCall]; - // var call = Server.Calls[procedureCall]; + // if (call == null) + // { + // sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound.index); + // return; + // } - // if (call == null) - // { - // sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound.index); - // return; - // } + // var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); - // var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); + // parsed.Then(results => + // { + // var arguments = (Map)results;// (object[])results; - // parsed.Then(results => - // { - // var arguments = (Map)results;// (object[])results; + // // un hold the socket to send data immediately + // this.Socket.Unhold(); - // // un hold the socket to send data immediately - // this.Socket.Unhold(); + // // @TODO: Make managers for procedure calls + // //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) + // //{ + // // SendError(ErrorType.Management, callback, + // // (ushort)ExceptionCode.InvokeDenied); + // // return; + // //} - // // @TODO: Make managers for procedure calls - // //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) - // //{ - // // SendError(ErrorType.Management, callback, - // // (ushort)ExceptionCode.InvokeDenied); - // // return; - // //} + // InvokeFunction(call.Method, callback, arguments, IIPPacket.IIPPacketAction.ProcedureCall, call.Target); - // InvokeFunction(call.Method, callback, arguments, IIPPacket.IIPPacketAction.ProcedureCall, call.Target); + // }).Error(x => + // { + // SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); + // }); + } - // }).Error(x => - // { - // SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); - // }); + void iipRequestStaticCall(int callback, Guid classId, int index, + TransmissionType transmissionType, DC content) { + var template = Warehouse.getTemplateByClassId(classId); + + if (template == null) { + sendError( + ErrorType.Management, callback, ExceptionCode.TemplateNotFound.index); + return; } - void iipRequestStaticCall(int callback, Guid classId, int index, TransmissionType transmissionType, DC content) - { - var template = Warehouse.getTemplateByClassId(classId); + var ft = template.getFunctionTemplateByIndex(index); - if (template == null) - { - sendError(ErrorType.Management, callback, ExceptionCode.TemplateNotFound.index); - return; - } - - var ft = template.getFunctionTemplateByIndex(index); - - if (ft == null) - { - // no function at this index - sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound.index); - return; - } - - // var parsed = Codec.parse(content, 0, this, null, transmissionType); - - // parsed.then((results) - // { - // var arguments = (Map)results; - - // // un hold the socket to send data immediately - // socket?.unhold(); - - // var fi = ft.methodInfo; - - // if (fi == null) - // { - // // ft found, fi not found, this should never happen - // sendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); - // return; - // } - - // // @TODO: Make managers for static calls - // //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) - // //{ - // // SendError(ErrorType.Management, callback, - // // (ushort)ExceptionCode.InvokeDenied); - // // return; - // //} - - // InvokeFunction(fi, callback, arguments, IIPPacket.IIPPacketAction.StaticCall, null); - - // }).Error(x => - // { - // SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); - // }); + if (ft == null) { + // no function at this index + sendError( + ErrorType.Management, callback, ExceptionCode.MethodNotFound.index); + return; } + // var parsed = Codec.parse(content, 0, this, null, transmissionType); + + // parsed.then((results) + // { + // var arguments = (Map)results; + + // // un hold the socket to send data immediately + // socket?.unhold(); + + // var fi = ft.methodInfo; + + // if (fi == null) + // { + // // ft found, fi not found, this should never happen + // sendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); + // return; + // } + + // // @TODO: Make managers for static calls + // //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) + // //{ + // // SendError(ErrorType.Management, callback, + // // (ushort)ExceptionCode.InvokeDenied); + // // return; + // //} + + // InvokeFunction(fi, callback, arguments, IIPPacket.IIPPacketAction.StaticCall, null); + + // }).Error(x => + // { + // SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); + // }); + } void iipRequestResourceAttribute(int callback, int resourceId) {} @@ -2404,14 +2500,7 @@ void _unsubscribeAll(){ sendError(x.type, callback, x.code, x.message); }); } else { - /* -#if NETSTANDARD1_5 - var pi = r.GetType().GetTypeInfo().GetProperty(pt.Name); -#else - var pi = r.GetType().GetProperty(pt.Name); -#endif*/ - - var pi = null; // pt.Info; + var pi = null; if (pi != null) { if (r.instance?.applicable(_session as Session, @@ -2549,16 +2638,16 @@ void _unsubscribeAll(){ return rt; } - /// - /// Retrive a resource by its instance Id. - /// - /// Instance Id - /// Resource - AsyncReply retrieve(int iid) { - for (var r in _resources.values) - if (r.instance?.id == iid) return new AsyncReply.ready(r); - return new AsyncReply.ready(null); - } + // /// + // /// Retrive a resource by its instance Id. + // /// + // /// Instance Id + // /// Resource + // AsyncReply retrieve(int iid) { + // for (var r in _resources.values) + // if (r.instance?.id == iid) return new AsyncReply.ready(r); + // return new AsyncReply.ready(null); + // } AsyncReply> getLinkTemplates(String link) { var reply = new AsyncReply>(); @@ -2603,7 +2692,13 @@ void _unsubscribeAll(){ /// Resource IdGuid classId /// DistributedResource AsyncReply fetch(int id, List? requestSequence) { - var resource = _resources[id]; + var resource = _attachedResources[id]?.target; + + if (resource != null) + return AsyncReply.ready(resource); + + resource = _neededResources[id]; + var request = _resourceRequests[id]; //print("fetch $id"); @@ -2612,8 +2707,11 @@ void _unsubscribeAll(){ if (resource != null && (requestSequence?.contains(id) ?? false)) return AsyncReply.ready(resource); return request; - } else if (resource != null && !resource.distributedResourceSuspended) + } else if (resource != null && !resource.distributedResourceSuspended) { + // @REVIEW: this should never happen + print("DCON: Resource not moved to attached."); return new AsyncReply.ready(resource); + } var reply = new AsyncReply(); _resourceRequests.add(id, reply); @@ -2652,8 +2750,10 @@ void _unsubscribeAll(){ dr = new DistributedResource(); dr.internal_init(this, id, rt[1] as int, rt[2] as String); } - } else + } else { dr = resource; + template = resource.instance?.template; + } TransmissionType transmissionType = rt[3] as TransmissionType; DC content = rt[4] as DC; @@ -2674,8 +2774,12 @@ void _unsubscribeAll(){ ar[i + 2], ar[i] as int, ar[i + 1] as DateTime)); dr.internal_attach(pvs); - _resourceRequests.remove(id); + + // move from needed to attached. + _neededResources.remove(id); + _attachedResources[id] = WeakReference(dr); + reply.trigger(dr); }) ..error((ex) => reply.triggerError(ex)); @@ -3042,78 +3146,77 @@ void _unsubscribeAll(){ TemplateDescriber get template => TemplateDescriber("Esiur.Net.IIP.DistributedConnection"); + AsyncReply staticCall( + Guid classId, int index, Map parameters) { + var pb = Codec.compose(parameters, this); + var reply = AsyncReply(); + var c = _callbackCounter++; + _requests.add(c, reply); - AsyncReply staticCall(Guid classId, int index, Map parameters) - { - var pb = Codec.compose(parameters, this); - - var reply = AsyncReply(); - var c = _callbackCounter++; - _requests.add(c, reply); - - - sendParams()..addUint8((0x40 | IIPPacketAction.StaticCall)) + sendParams() + ..addUint8((0x40 | IIPPacketAction.StaticCall)) ..addUint32(c) ..addGuid(classId) ..addUint8(index) ..addDC(pb) ..done(); - return reply; + return reply; } - // AsyncReply Call(String procedureCall, params object[] parameters) - // { - // var args = new Map(); - // for (byte i = 0; i < parameters.Length; i++) - // args.Add(i, parameters[i]); - // return Call(procedureCall, args); - // } + AsyncReply call(String procedureCall, [List? parameters = null]) { + if (parameters == null) { + return callArgs(procedureCall, Map()); + } else { + var map = Map(); + parameters.forEachIndexed((index, element) { + map[UInt8(index)] = element; + }); + return callArgs(procedureCall, map); + } + } - AsyncReply call(String procedureCall, Map parameters) - { - var pb = Codec.compose(parameters, this); + AsyncReply callArgs( + String procedureCall, Map parameters) { + var pb = Codec.compose(parameters, this); - var reply = new AsyncReply(); - var c = _callbackCounter++; - _requests.add(c, reply); + var reply = new AsyncReply(); + var c = _callbackCounter++; + _requests.add(c, reply); - var callName = DC.stringToBytes(procedureCall); + var callName = DC.stringToBytes(procedureCall); - sendParams()..addUint8(0x40 | IIPPacketAction.ProcedureCall) - ..addUint32(c) - ..addUint16(callName.length) - ..addDC(callName) - ..addDC(pb) - ..done(); + sendParams() + ..addUint8(0x40 | IIPPacketAction.ProcedureCall) + ..addUint32(c) + ..addUint16(callName.length) + ..addDC(callName) + ..addDC(pb) + ..done(); - return reply; + return reply; + } + + void iipRequestKeepAlive(int callbackId, DateTime peerTime, int interval) { + int jitter = 0; + + var now = DateTime.now().toUtc(); + + if (_lastKeepAliveReceived != null) { + var diff = now.difference(_lastKeepAliveReceived!).inMicroseconds; + //Console.WriteLine("Diff " + diff + " " + interval); + + jitter = (diff - interval).abs(); } - void iipRequestKeepAlive(int callbackId, DateTime peerTime, int interval) - { - - int jitter = 0; - - var now = DateTime.now().toUtc(); - - if (_lastKeepAliveReceived != null) - { - var diff = now.difference(_lastKeepAliveReceived!).inMicroseconds; - //Console.WriteLine("Diff " + diff + " " + interval); - - jitter =(diff -interval).abs(); - } - - sendParams() - ..addUint8(0x80 | IIPPacketAction.KeepAlive) - ..addUint32(callbackId) - ..addDateTime(now) - ..addUint32(jitter) - ..done(); - - _lastKeepAliveReceived = now; - } + sendParams() + ..addUint8(0x80 | IIPPacketAction.KeepAlive) + ..addUint32(callbackId) + ..addDateTime(now) + ..addUint32(jitter) + ..done(); + _lastKeepAliveReceived = now; + } } diff --git a/lib/src/Net/IIP/DistributedResource.dart b/lib/src/Net/IIP/DistributedResource.dart index 347ae18..1dc9b4f 100644 --- a/lib/src/Net/IIP/DistributedResource.dart +++ b/lib/src/Net/IIP/DistributedResource.dart @@ -76,6 +76,7 @@ class DistributedResource extends IResource { /// Instance Id given by the other end. /// int? get distributedResourceInstanceId => _instanceId; + set distributedResourceInstanceId(value) => _instanceId = value; //bool get destroyed => _destroyed; @@ -90,7 +91,7 @@ class DistributedResource extends IResource { void destroy() { _destroyed = true; _attached = false; - _connection?.sendDetachRequest(_instanceId as int); + _connection?.detachResource(_instanceId as int); emitArgs("destroy", [this]); } @@ -194,8 +195,8 @@ class DistributedResource extends IResource { } AsyncReply listen(event) { - if (_destroyed) throw new Exception("Trying to access destroyed object"); - if (_suspended) throw new Exception("Trying to access suspended object"); + if (_destroyed) throw new Exception("Trying to access a destroyed object."); + if (_suspended) throw new Exception("Trying to access a suspended object."); EventTemplate? et = event is EventTemplate ? event @@ -214,8 +215,8 @@ class DistributedResource extends IResource { } AsyncReply unlisten(event) { - if (_destroyed) throw new Exception("Trying to access destroyed object"); - if (_suspended) throw new Exception("Trying to access suspended object"); + if (_destroyed) throw new Exception("Trying to access a destroyed object."); + if (_suspended) throw new Exception("Trying to access a suspended object."); EventTemplate? et = event is EventTemplate ? event @@ -245,7 +246,7 @@ class DistributedResource extends IResource { } AsyncReply internal_invoke(int index, Map args) { - if (_destroyed) throw new Exception("Trying to access a destroyed object"); + if (_destroyed) throw new Exception("Trying to access a destroyed object."); if (_suspended) throw new Exception("Trying to access a suspended object."); if (instance == null) throw Exception("Object not initialized."); @@ -290,6 +291,10 @@ class DistributedResource extends IResource { @override //overring noSuchMethod noSuchMethod(Invocation invocation) { + if (_destroyed) throw new Exception("Trying to access a destroyed object."); + + if (_suspended) throw new Exception("Trying to access a suspended object."); + var memberName = _getMemberName(invocation.memberName); if (invocation.isMethod) { @@ -365,6 +370,10 @@ class DistributedResource extends IResource { /// Value /// Indicator when the property is set. AsyncReply set(int index, dynamic value) { + if (_destroyed) throw new Exception("Trying to access a destroyed object."); + + if (_suspended) throw new Exception("Trying to access a suspended object."); + if (index >= _properties.length) throw Exception("Property with index `${index}` not found."); diff --git a/lib/src/Net/NetworkConnection.dart b/lib/src/Net/NetworkConnection.dart index 9b0a30b..cf9f721 100644 --- a/lib/src/Net/NetworkConnection.dart +++ b/lib/src/Net/NetworkConnection.dart @@ -100,7 +100,6 @@ class NetworkConnection extends IDestructible with INetworkReceiver { if (_sock != null) _sock?.close(); } catch (ex) { //Global.Log("NetworkConenction:Close", LogType.Error, ex.ToString()); - } } diff --git a/lib/src/Net/Sockets/TCPSocket.dart b/lib/src/Net/Sockets/TCPSocket.dart index b1b05c9..36f0f0d 100644 --- a/lib/src/Net/Sockets/TCPSocket.dart +++ b/lib/src/Net/Sockets/TCPSocket.dart @@ -153,6 +153,8 @@ class TCPSocket extends ISocket { } void close() { + if (state == SocketState.Closed) return; + if (state != SocketState.Closed && state != SocketState.Terminated) _state = SocketState.Closed; diff --git a/lib/src/Net/Sockets/WSocket.dart b/lib/src/Net/Sockets/WSocket.dart index c3c0fa1..8905f90 100644 --- a/lib/src/Net/Sockets/WSocket.dart +++ b/lib/src/Net/Sockets/WSocket.dart @@ -126,6 +126,8 @@ class WSocket extends ISocket { } void close() { + if (state == SocketState.Closed) return; + if (state != SocketState.Closed && state != SocketState.Terminated) _state = SocketState.Closed; diff --git a/lib/src/Resource/IStore.dart b/lib/src/Resource/IStore.dart index ef085dd..0690cb1 100644 --- a/lib/src/Resource/IStore.dart +++ b/lib/src/Resource/IStore.dart @@ -33,7 +33,7 @@ import '../Data/PropertyValue.dart'; // new abstract class IStore implements IResource { AsyncReply get(String path); - AsyncReply retrieve(int iid); + // AsyncReply retrieve(int iid); AsyncReply put(IResource resource); String? link(IResource resource); bool record(IResource resource, String propertyName, dynamic value, int? age, diff --git a/lib/src/Resource/Warehouse.dart b/lib/src/Resource/Warehouse.dart index e9e748b..2e2362a 100644 --- a/lib/src/Resource/Warehouse.dart +++ b/lib/src/Resource/Warehouse.dart @@ -22,6 +22,7 @@ SOFTWARE. */ + import '../Data/IntType.dart'; import '../Data/TransmissionType.dart'; @@ -55,7 +56,8 @@ import '../Net/IIP/DistributedConnection.dart'; // Centeral Resource Issuer class Warehouse { static AutoList _stores = AutoList(); - static Map _resources = new Map(); + static Map> _resources = + new Map>(); static int resourceCounter = 0; static KeyList> _templates = @@ -100,7 +102,7 @@ class Warehouse { /// static AsyncReply getById(int id) { if (_resources.containsKey(id)) - return new AsyncReply.ready(_resources[id]); + return new AsyncReply.ready(_resources[id]?.target); else return new AsyncReply.ready(null); } @@ -155,15 +157,19 @@ class Warehouse { static AsyncReply close() { var bag = new AsyncBag(); - for (var resource in _resources.values) - if (!(resource is IStore)) - bag.add(resource.trigger(ResourceTrigger.Terminate)); + for (var resource in _resources.values) { + var r = resource.target; + if ((r != null) && !(r is IStore)) + bag.add(r.trigger(ResourceTrigger.Terminate)); + } for (var s in _stores) bag.add(s.trigger(ResourceTrigger.Terminate)); - for (var resource in _resources.values) - if (!(resource is IStore)) - bag.add(resource.trigger(ResourceTrigger.SystemTerminated)); + for (var resource in _resources.values) { + var r = resource.target; + if ((r != null) && !(resource is IStore)) + bag.add(r.trigger(ResourceTrigger.SystemTerminated)); + } for (var store in _stores) bag.add(store.trigger(ResourceTrigger.SystemTerminated)); @@ -451,7 +457,7 @@ class Warehouse { var initResource = () { if (resource.instance == null) return; - _resources[(resource.instance as Instance).id] = resource; + _resources[(resource.instance as Instance).id] = WeakReference(resource); if (_warehouseIsOpen) { resource.trigger(ResourceTrigger.Initialize) @@ -644,8 +650,15 @@ class Warehouse { _stores.remove(resource); // remove all objects associated with the store - var toBeRemoved = - _resources.values.where((x) => x.instance?.store == resource); + //var toBeRemoved = + // _resources.values.where((x) => x.target?.instance?.store == resource); + + var toBeRemoved = []; + for (var wr in _resources.values) { + var r = wr.target; + if (r != null && r.instance?.store == resource) toBeRemoved.add(r); + } + for (var o in toBeRemoved) remove(o); // StoreDisconnected?.Invoke(resource as IStore);