From aab6c2bebdeb960e64e34e2bbef74894fcf93b4b Mon Sep 17 00:00:00 2001 From: Ahmed Zamil Date: Wed, 6 Apr 2022 02:29:27 +0300 Subject: [PATCH] Fix AsyncReply --- lib/src/Core/AsyncReply.dart | 36 ++++--- lib/src/Net/IIP/DistributedConnection.dart | 111 ++++++++++++--------- lib/src/Proxy/TemplateGenerator.dart | 6 +- lib/src/Resource/Warehouse.dart | 50 ++++++---- 4 files changed, 114 insertions(+), 89 deletions(-) diff --git a/lib/src/Core/AsyncReply.dart b/lib/src/Core/AsyncReply.dart index cb3bc8c..5e66483 100644 --- a/lib/src/Core/AsyncReply.dart +++ b/lib/src/Core/AsyncReply.dart @@ -63,36 +63,41 @@ class AsyncReply implements Future { } AsyncReply then(FutureOr onValue(T value), {Function? onError}) { + if (onError != null) {} + _callbacks.add(onValue); if (onError != null) { _errorCallbacks.add(onError); + //print("On ERROR $onError ${this.hashCode}"); + } if (_resultReady) onValue(result as T); -// if (R == Null) - // return null; - //else - //if (R == T) - return AsyncReply(); + //if (R == Null) + return AsyncReply(); + //else if (R == T) return (AsyncReply)this; } - AsyncReply whenComplete(FutureOr action()) { + @override + Future whenComplete(FutureOr action()) { return this; - //_callbacks.add(action); } + // AsyncReply whenComplete(FutureOr action()) { + // return this; + // //_callbacks.add(action); + // } + Stream asStream() { return Stream.empty(); //return null; } -// Future catchError(Function onError, {bool test(Object error)?}); - - AsyncReply catchError(Function onError, {bool test(Object error)?}) { - ///return this.error(onError); - + @override + AsyncReply catchError(Function onError, + {bool Function(Object error)? test}) { _errorCallbacks.add(onError); if (_exception != null) { @@ -152,9 +157,6 @@ class AsyncReply implements Future { else _exception = AsyncException.toAsyncException(exception); - ///lock (callbacksLock) - //{ - if (this._errorCallbacks.length == 0) throw _exception as AsyncException; else @@ -167,6 +169,10 @@ class AsyncReply implements Future { x(); } else if (x is Function(Object, StackTrace)) { x(_exception as Object, StackTrace.current); + } else if (x is Function(AsyncException)) { + x(_exception!); + } else { + throw Exception("Unknown error handler $x"); } //x(_exception as AsyncException); }); diff --git a/lib/src/Net/IIP/DistributedConnection.dart b/lib/src/Net/IIP/DistributedConnection.dart index ef1f234..bffaf0d 100644 --- a/lib/src/Net/IIP/DistributedConnection.dart +++ b/lib/src/Net/IIP/DistributedConnection.dart @@ -260,6 +260,7 @@ class DistributedConnection extends NetworkConnection with IStore { throw AsyncException(ErrorType.Exception, 0, "Connection in progress"); _openReply = new AsyncReply(); + //print("_openReply hash ${_openReply.hashCode}"); if (hostname != null) { _session = @@ -288,12 +289,14 @@ class DistributedConnection extends NetworkConnection with IStore { if (_hostname == null) throw Exception("Host not specified."); if (socket != null) { - socket.connect(_hostname as String, _port).then((x) { - assign(socket as ISocket); - }).error((x) { - _openReply?.triggerError(x); - _openReply = null; - }); + socket.connect(_hostname as String, _port) + ..then((x) { + assign(socket as ISocket); + }) + ..error((x) { + _openReply?.triggerError(x); + _openReply = null; + }); } return _openReply as AsyncReply; @@ -1566,16 +1569,17 @@ class DistributedConnection extends NetworkConnection with IStore { null; //Activator.CreateInstance(type, args) as IResource; Warehouse.put( - name, resource as IResource, store, parent) - .then((ok) { - sendReply(IIPPacketAction.CreateResource, callback) - ..addUint32((resource.instance as Instance).id) - ..done(); - }).error((ex) { - // send some error - sendError(ErrorType.Management, callback, - ExceptionCode.AddToStoreFailed.index); - }); + name, resource as IResource, store, parent) + ..then((ok) { + sendReply(IIPPacketAction.CreateResource, callback) + ..addUint32((resource.instance as Instance).id) + ..done(); + }) + ..error((ex) { + // send some error + sendError(ErrorType.Management, callback, + ExceptionCode.AddToStoreFailed.index); + }); }); }); }); @@ -2185,11 +2189,13 @@ class DistributedConnection extends NetworkConnection with IStore { Codec.parse(data, 0, this, null, dataType).reply.then((value) { if (r is DistributedResource) { // propagation - (r as DistributedResource).set(index, value).then((x) { - sendReply(IIPPacketAction.SetProperty, callback).done(); - }).error((x) { - sendError(x.type, callback, x.code, x.message); - }); + (r as DistributedResource).set(index, value) + ..then((x) { + sendReply(IIPPacketAction.SetProperty, callback).done(); + }) + ..error((x) { + sendError(x.type, callback, x.code, x.message); + }); } else { /* #if NETSTANDARD1_5 @@ -2473,9 +2479,10 @@ class DistributedConnection extends NetworkConnection with IStore { AsyncReply> getChildren(IResource resource) { var rt = new AsyncReply>(); - sendRequest(IIPPacketAction.ResourceChildren) - ..addUint32(resource.instance?.id as int) - ..done().then((ar) { + (sendRequest(IIPPacketAction.ResourceChildren) + ..addUint32(resource.instance?.id as int)) + .done() + ..then((ar) { if (ar != null) { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; @@ -2487,7 +2494,7 @@ class DistributedConnection extends NetworkConnection with IStore { } else { rt.triggerError(Exception("Null response")); } - }); + }).error((ex) => rt.triggerError(ex)); return rt; } @@ -2496,9 +2503,10 @@ class DistributedConnection extends NetworkConnection with IStore { AsyncReply> getParents(IResource resource) { var rt = new AsyncReply>(); - sendRequest(IIPPacketAction.ResourceParents) - ..addUint32((resource.instance as Instance).id) - ..done().then((ar) { + (sendRequest(IIPPacketAction.ResourceParents) + ..addUint32((resource.instance as Instance).id)) + .done() + ..then((ar) { if (ar != null) { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; @@ -2509,7 +2517,8 @@ class DistributedConnection extends NetworkConnection with IStore { } else { rt.triggerError(Exception("Null response")); } - }); + }) + ..error((ex) => rt.triggerError(ex)); return rt; } @@ -2531,7 +2540,7 @@ class DistributedConnection extends NetworkConnection with IStore { ..addInt32(attrs.length) ..addDC(attrs)) .done() - ..then((ar) => rt.trigger(true)) + ..then((ar) => rt.trigger(true)) ..error((ex) => rt.triggerError(ex)); } @@ -2543,14 +2552,14 @@ class DistributedConnection extends NetworkConnection with IStore { [bool clearAttributes = false]) { var rt = new AsyncReply(); - sendRequest(clearAttributes - ? IIPPacketAction.UpdateAllAttributes - : IIPPacketAction.UpdateAttributes) - ..addUint32(resource.instance?.id as int) - ..addDC(Codec.compose(attributes, this)) - ..done() - .then((ar) => rt.trigger(true)) - .error((ex) => rt.triggerError(ex)); + (sendRequest(clearAttributes + ? IIPPacketAction.UpdateAllAttributes + : IIPPacketAction.UpdateAttributes) + ..addUint32(resource.instance?.id as int) + ..addDC(Codec.compose(attributes, this))) + .done() + ..then((ar) => rt.trigger(true)) + ..error((ex) => rt.triggerError(ex)); return rt; } @@ -2629,11 +2638,12 @@ class DistributedConnection extends NetworkConnection with IStore { var reply = new AsyncReply>>(); - sendRequest(IIPPacketAction.ResourceHistory) - ..addUint32(dr.distributedResourceInstanceId as int) - ..addDateTime(fromDate) - ..addDateTime(toDate) - ..done().then((rt) { + (sendRequest(IIPPacketAction.ResourceHistory) + ..addUint32(dr.distributedResourceInstanceId as int) + ..addDateTime(fromDate) + ..addDateTime(toDate)) + .done() + ..then((rt) { if (rt != null) { var content = rt[0] as DC; @@ -2643,7 +2653,8 @@ class DistributedConnection extends NetworkConnection with IStore { } else { reply.triggerError(Exception("Null response")); } - }).error((ex) => reply.triggerError(ex)); + }) + ..error((ex) => reply.triggerError(ex)); return reply; } else @@ -2661,10 +2672,11 @@ class DistributedConnection extends NetworkConnection with IStore { var str = DC.stringToBytes(path); var reply = new AsyncReply>(); - sendRequest(IIPPacketAction.QueryLink) - ..addUint16(str.length) - ..addDC(str) - ..done().then((ar) { + (sendRequest(IIPPacketAction.QueryLink) + ..addUint16(str.length) + ..addDC(str)) + .done() + ..then((ar) { if (ar != null) { TransmissionType dataType = ar[0] as TransmissionType; DC data = ar[1] as DC; @@ -2675,7 +2687,8 @@ class DistributedConnection extends NetworkConnection with IStore { } else { reply.triggerError(Exception("Null response")); } - }).error((ex) => reply.triggerError(ex)); + }) + ..error((ex) => reply.triggerError(ex)); return reply; } diff --git a/lib/src/Proxy/TemplateGenerator.dart b/lib/src/Proxy/TemplateGenerator.dart index d6a778b..3b1128d 100644 --- a/lib/src/Proxy/TemplateGenerator.dart +++ b/lib/src/Proxy/TemplateGenerator.dart @@ -442,9 +442,9 @@ class TemplateGenerator { rt.writeln("var rt = AsyncReply<$rtTypeName>();"); rt.writeln("internal_invoke(${f.index}, args)"); - rt.writeln(".then((x) => rt.trigger(x))"); - rt.writeln(".error((x) => rt.triggerError(x))"); - rt.writeln(".chunk((x) => rt.triggerChunk(x));"); + rt.writeln("..then((x) => rt.trigger(x))"); + rt.writeln("..error((x) => rt.triggerError(x))"); + rt.writeln("..chunk((x) => rt.triggerChunk(x));"); rt.writeln("return rt; }"); }); diff --git a/lib/src/Resource/Warehouse.dart b/lib/src/Resource/Warehouse.dart index 14adad3..e9e748b 100644 --- a/lib/src/Resource/Warehouse.dart +++ b/lib/src/Resource/Warehouse.dart @@ -454,20 +454,24 @@ class Warehouse { _resources[(resource.instance as Instance).id] = resource; if (_warehouseIsOpen) { - resource.trigger(ResourceTrigger.Initialize).then((value) { - if (resource is IStore) - resource.trigger(ResourceTrigger.Open).then((value) { + resource.trigger(ResourceTrigger.Initialize) + ..then((value) { + if (resource is IStore) + resource.trigger(ResourceTrigger.Open) + ..then((value) { + rt.trigger(resource); + }) + ..error((ex) { + Warehouse.remove(resource); + rt.triggerError(ex); + }); + else rt.trigger(resource); - }).error((ex) { - Warehouse.remove(resource); - rt.triggerError(ex); - }); - else - rt.trigger(resource); - }).error((ex) { - Warehouse.remove(resource); - rt.triggerError(ex); - }); + }) + ..error((ex) { + Warehouse.remove(resource); + rt.triggerError(ex); + }); } }; @@ -475,15 +479,17 @@ class Warehouse { _stores.add(resource); initResource(); } else { - store?.put(resource).then((value) { - if (value) - initResource(); - else - rt.trigger(null); - }).error((ex) { - Warehouse.remove(resource); - rt.triggerError(ex); - }); + store?.put(resource) + ?..then((value) { + if (value) + initResource(); + else + rt.trigger(null); + }) + ..error((ex) { + Warehouse.remove(resource); + rt.triggerError(ex); + }); } // return new name