2
0
mirror of https://github.com/esiur/esiur-dart.git synced 2025-05-06 12:02:57 +00:00

Fix AsyncReply

This commit is contained in:
Ahmed Zamil 2022-04-06 02:29:27 +03:00
parent 8479fe59c9
commit aab6c2bebd
4 changed files with 114 additions and 89 deletions

View File

@ -63,36 +63,41 @@ class AsyncReply<T> implements Future<T> {
} }
AsyncReply<R> then<R>(FutureOr<R> onValue(T value), {Function? onError}) { AsyncReply<R> then<R>(FutureOr<R> onValue(T value), {Function? onError}) {
if (onError != null) {}
_callbacks.add(onValue); _callbacks.add(onValue);
if (onError != null) { if (onError != null) {
_errorCallbacks.add(onError); _errorCallbacks.add(onError);
//print("On ERROR $onError ${this.hashCode}");
} }
if (_resultReady) onValue(result as T); if (_resultReady) onValue(result as T);
//if (R == Null) //if (R == Null)
// return null;
//else
//if (R == T)
return AsyncReply<R>(); return AsyncReply<R>();
//else if (R == T) return (AsyncReply<R>)this;
} }
AsyncReply<T> whenComplete(FutureOr action()) { @override
Future<T> whenComplete(FutureOr action()) {
return this; return this;
//_callbacks.add(action);
} }
// AsyncReply<T> whenComplete(FutureOr action()) {
// return this;
// //_callbacks.add(action);
// }
Stream<T> asStream() { Stream<T> asStream() {
return Stream.empty(); return Stream.empty();
//return null; //return null;
} }
// Future<T> catchError(Function onError, {bool test(Object error)?}); @override
AsyncReply<T> catchError(Function onError,
AsyncReply<T> catchError(Function onError, {bool test(Object error)?}) { {bool Function(Object error)? test}) {
///return this.error(onError);
_errorCallbacks.add(onError); _errorCallbacks.add(onError);
if (_exception != null) { if (_exception != null) {
@ -152,9 +157,6 @@ class AsyncReply<T> implements Future<T> {
else else
_exception = AsyncException.toAsyncException(exception); _exception = AsyncException.toAsyncException(exception);
///lock (callbacksLock)
//{
if (this._errorCallbacks.length == 0) if (this._errorCallbacks.length == 0)
throw _exception as AsyncException; throw _exception as AsyncException;
else else
@ -167,6 +169,10 @@ class AsyncReply<T> implements Future<T> {
x(); x();
} else if (x is Function(Object, StackTrace)) { } else if (x is Function(Object, StackTrace)) {
x(_exception as Object, StackTrace.current); x(_exception as Object, StackTrace.current);
} else if (x is Function(AsyncException)) {
x(_exception!);
} else {
throw Exception("Unknown error handler $x");
} }
//x(_exception as AsyncException); //x(_exception as AsyncException);
}); });

View File

@ -260,6 +260,7 @@ class DistributedConnection extends NetworkConnection with IStore {
throw AsyncException(ErrorType.Exception, 0, "Connection in progress"); throw AsyncException(ErrorType.Exception, 0, "Connection in progress");
_openReply = new AsyncReply<bool>(); _openReply = new AsyncReply<bool>();
//print("_openReply hash ${_openReply.hashCode}");
if (hostname != null) { if (hostname != null) {
_session = _session =
@ -288,9 +289,11 @@ class DistributedConnection extends NetworkConnection with IStore {
if (_hostname == null) throw Exception("Host not specified."); if (_hostname == null) throw Exception("Host not specified.");
if (socket != null) { if (socket != null) {
socket.connect(_hostname as String, _port).then<dynamic>((x) { socket.connect(_hostname as String, _port)
..then((x) {
assign(socket as ISocket); assign(socket as ISocket);
}).error((x) { })
..error((x) {
_openReply?.triggerError(x); _openReply?.triggerError(x);
_openReply = null; _openReply = null;
}); });
@ -1567,11 +1570,12 @@ class DistributedConnection extends NetworkConnection with IStore {
Warehouse.put<IResource>( Warehouse.put<IResource>(
name, resource as IResource, store, parent) name, resource as IResource, store, parent)
.then<dynamic>((ok) { ..then((ok) {
sendReply(IIPPacketAction.CreateResource, callback) sendReply(IIPPacketAction.CreateResource, callback)
..addUint32((resource.instance as Instance).id) ..addUint32((resource.instance as Instance).id)
..done(); ..done();
}).error((ex) { })
..error((ex) {
// send some error // send some error
sendError(ErrorType.Management, callback, sendError(ErrorType.Management, callback,
ExceptionCode.AddToStoreFailed.index); ExceptionCode.AddToStoreFailed.index);
@ -2185,9 +2189,11 @@ class DistributedConnection extends NetworkConnection with IStore {
Codec.parse(data, 0, this, null, dataType).reply.then((value) { Codec.parse(data, 0, this, null, dataType).reply.then((value) {
if (r is DistributedResource) { if (r is DistributedResource) {
// propagation // propagation
(r as DistributedResource).set(index, value).then<dynamic>((x) { (r as DistributedResource).set(index, value)
..then((x) {
sendReply(IIPPacketAction.SetProperty, callback).done(); sendReply(IIPPacketAction.SetProperty, callback).done();
}).error((x) { })
..error((x) {
sendError(x.type, callback, x.code, x.message); sendError(x.type, callback, x.code, x.message);
}); });
} else { } else {
@ -2473,9 +2479,10 @@ class DistributedConnection extends NetworkConnection with IStore {
AsyncReply<List<IResource?>> getChildren(IResource resource) { AsyncReply<List<IResource?>> getChildren(IResource resource) {
var rt = new AsyncReply<List<IResource?>>(); var rt = new AsyncReply<List<IResource?>>();
sendRequest(IIPPacketAction.ResourceChildren) (sendRequest(IIPPacketAction.ResourceChildren)
..addUint32(resource.instance?.id as int) ..addUint32(resource.instance?.id as int))
..done().then<dynamic>((ar) { .done()
..then((ar) {
if (ar != null) { if (ar != null) {
TransmissionType dataType = ar[0] as TransmissionType; TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC; DC data = ar[1] as DC;
@ -2487,7 +2494,7 @@ class DistributedConnection extends NetworkConnection with IStore {
} else { } else {
rt.triggerError(Exception("Null response")); rt.triggerError(Exception("Null response"));
} }
}); }).error((ex) => rt.triggerError(ex));
return rt; return rt;
} }
@ -2496,9 +2503,10 @@ class DistributedConnection extends NetworkConnection with IStore {
AsyncReply<List<IResource?>> getParents(IResource resource) { AsyncReply<List<IResource?>> getParents(IResource resource) {
var rt = new AsyncReply<List<IResource?>>(); var rt = new AsyncReply<List<IResource?>>();
sendRequest(IIPPacketAction.ResourceParents) (sendRequest(IIPPacketAction.ResourceParents)
..addUint32((resource.instance as Instance).id) ..addUint32((resource.instance as Instance).id))
..done().then<dynamic>((ar) { .done()
..then((ar) {
if (ar != null) { if (ar != null) {
TransmissionType dataType = ar[0] as TransmissionType; TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC; DC data = ar[1] as DC;
@ -2509,7 +2517,8 @@ class DistributedConnection extends NetworkConnection with IStore {
} else { } else {
rt.triggerError(Exception("Null response")); rt.triggerError(Exception("Null response"));
} }
}); })
..error((ex) => rt.triggerError(ex));
return rt; return rt;
} }
@ -2531,7 +2540,7 @@ class DistributedConnection extends NetworkConnection with IStore {
..addInt32(attrs.length) ..addInt32(attrs.length)
..addDC(attrs)) ..addDC(attrs))
.done() .done()
..then<dynamic>((ar) => rt.trigger(true)) ..then((ar) => rt.trigger(true))
..error((ex) => rt.triggerError(ex)); ..error((ex) => rt.triggerError(ex));
} }
@ -2543,14 +2552,14 @@ class DistributedConnection extends NetworkConnection with IStore {
[bool clearAttributes = false]) { [bool clearAttributes = false]) {
var rt = new AsyncReply<bool>(); var rt = new AsyncReply<bool>();
sendRequest(clearAttributes (sendRequest(clearAttributes
? IIPPacketAction.UpdateAllAttributes ? IIPPacketAction.UpdateAllAttributes
: IIPPacketAction.UpdateAttributes) : IIPPacketAction.UpdateAttributes)
..addUint32(resource.instance?.id as int) ..addUint32(resource.instance?.id as int)
..addDC(Codec.compose(attributes, this)) ..addDC(Codec.compose(attributes, this)))
..done() .done()
.then<dynamic>((ar) => rt.trigger(true)) ..then((ar) => rt.trigger(true))
.error((ex) => rt.triggerError(ex)); ..error((ex) => rt.triggerError(ex));
return rt; return rt;
} }
@ -2629,11 +2638,12 @@ class DistributedConnection extends NetworkConnection with IStore {
var reply = var reply =
new AsyncReply<KeyList<PropertyTemplate, List<PropertyValue>>>(); new AsyncReply<KeyList<PropertyTemplate, List<PropertyValue>>>();
sendRequest(IIPPacketAction.ResourceHistory) (sendRequest(IIPPacketAction.ResourceHistory)
..addUint32(dr.distributedResourceInstanceId as int) ..addUint32(dr.distributedResourceInstanceId as int)
..addDateTime(fromDate) ..addDateTime(fromDate)
..addDateTime(toDate) ..addDateTime(toDate))
..done().then<dynamic>((rt) { .done()
..then((rt) {
if (rt != null) { if (rt != null) {
var content = rt[0] as DC; var content = rt[0] as DC;
@ -2643,7 +2653,8 @@ class DistributedConnection extends NetworkConnection with IStore {
} else { } else {
reply.triggerError(Exception("Null response")); reply.triggerError(Exception("Null response"));
} }
}).error((ex) => reply.triggerError(ex)); })
..error((ex) => reply.triggerError(ex));
return reply; return reply;
} else } else
@ -2661,10 +2672,11 @@ class DistributedConnection extends NetworkConnection with IStore {
var str = DC.stringToBytes(path); var str = DC.stringToBytes(path);
var reply = new AsyncReply<List<IResource?>>(); var reply = new AsyncReply<List<IResource?>>();
sendRequest(IIPPacketAction.QueryLink) (sendRequest(IIPPacketAction.QueryLink)
..addUint16(str.length) ..addUint16(str.length)
..addDC(str) ..addDC(str))
..done().then<dynamic>((ar) { .done()
..then((ar) {
if (ar != null) { if (ar != null) {
TransmissionType dataType = ar[0] as TransmissionType; TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC; DC data = ar[1] as DC;
@ -2675,7 +2687,8 @@ class DistributedConnection extends NetworkConnection with IStore {
} else { } else {
reply.triggerError(Exception("Null response")); reply.triggerError(Exception("Null response"));
} }
}).error((ex) => reply.triggerError(ex)); })
..error((ex) => reply.triggerError(ex));
return reply; return reply;
} }

View File

@ -442,9 +442,9 @@ class TemplateGenerator {
rt.writeln("var rt = AsyncReply<$rtTypeName>();"); rt.writeln("var rt = AsyncReply<$rtTypeName>();");
rt.writeln("internal_invoke(${f.index}, args)"); rt.writeln("internal_invoke(${f.index}, args)");
rt.writeln(".then<dynamic>((x) => rt.trigger(x))"); rt.writeln("..then((x) => rt.trigger(x))");
rt.writeln(".error((x) => rt.triggerError(x))"); rt.writeln("..error((x) => rt.triggerError(x))");
rt.writeln(".chunk((x) => rt.triggerChunk(x));"); rt.writeln("..chunk((x) => rt.triggerChunk(x));");
rt.writeln("return rt; }"); rt.writeln("return rt; }");
}); });

View File

@ -454,17 +454,21 @@ class Warehouse {
_resources[(resource.instance as Instance).id] = resource; _resources[(resource.instance as Instance).id] = resource;
if (_warehouseIsOpen) { if (_warehouseIsOpen) {
resource.trigger(ResourceTrigger.Initialize).then<dynamic>((value) { resource.trigger(ResourceTrigger.Initialize)
..then((value) {
if (resource is IStore) if (resource is IStore)
resource.trigger(ResourceTrigger.Open).then<dynamic>((value) { resource.trigger(ResourceTrigger.Open)
..then((value) {
rt.trigger(resource); rt.trigger(resource);
}).error((ex) { })
..error((ex) {
Warehouse.remove(resource); Warehouse.remove(resource);
rt.triggerError(ex); rt.triggerError(ex);
}); });
else else
rt.trigger(resource); rt.trigger(resource);
}).error((ex) { })
..error((ex) {
Warehouse.remove(resource); Warehouse.remove(resource);
rt.triggerError(ex); rt.triggerError(ex);
}); });
@ -475,12 +479,14 @@ class Warehouse {
_stores.add(resource); _stores.add(resource);
initResource(); initResource();
} else { } else {
store?.put(resource).then<dynamic>((value) { store?.put(resource)
?..then((value) {
if (value) if (value)
initResource(); initResource();
else else
rt.trigger(null); rt.trigger(null);
}).error((ex) { })
..error((ex) {
Warehouse.remove(resource); Warehouse.remove(resource);
rt.triggerError(ex); rt.triggerError(ex);
}); });