2
0
mirror of https://github.com/esiur/esiur-dart.git synced 2025-05-06 12:02:57 +00:00

AutoReconnect

This commit is contained in:
Esiur Project 2022-08-11 21:28:41 +03:00
parent c9ead2dabd
commit 083ea92d3b
10 changed files with 407 additions and 271 deletions

View File

@ -12,7 +12,7 @@ class AsyncQueue<T> extends AsyncReply<T?> {
_list.add(reply); _list.add(reply);
//super._resultReady = false; //super._resultReady = false;
super.setResultReady(false); super.ready = false; // setResultReady(false);
reply.then(processQueue); reply.then(processQueue);
} }
@ -35,7 +35,7 @@ class AsyncQueue<T> extends AsyncReply<T?> {
break; break;
//super._resultReady = (_list.length == 0); //super._resultReady = (_list.length == 0);
super.setResultReady(_list.length == 0); super.ready = _list.length == 0; // .setResultReady(_list.length == 0);
} }
AsyncQueue() {} AsyncQueue() {}

View File

@ -23,6 +23,8 @@ SOFTWARE.
*/ */
import 'dart:async'; import 'dart:async';
import 'dart:core'; import 'dart:core';
import 'package:esiur/esiur.dart';
import 'AsyncException.dart'; import 'AsyncException.dart';
import 'ProgressType.dart'; import 'ProgressType.dart';
@ -55,9 +57,9 @@ class AsyncReply<T> implements Future<T> {
return _result; return _result;
} }
void setResultReady(bool val) { // void setResultReady(bool val) {
_resultReady = val; // _resultReady = val;
} // }
AsyncReply<T> next(Function(T) callback) { AsyncReply<T> next(Function(T) callback) {
then(callback); then(callback);
@ -127,7 +129,11 @@ class AsyncReply<T> implements Future<T> {
AsyncReply<T> timeout(Duration timeLimit, {FutureOr<T?> onTimeout()?}) { AsyncReply<T> timeout(Duration timeLimit, {FutureOr<T?> onTimeout()?}) {
Future.delayed(timeLimit, () { Future.delayed(timeLimit, () {
if (!_resultReady && _exception == null) onTimeout?.call(); if (!_resultReady && _exception == null) {
triggerError(AsyncException(ErrorType.Management,
ExceptionCode.Timeout.index, "Execution timeout expired"));
onTimeout?.call();
}
}); });
return this; return this;
@ -152,6 +158,7 @@ class AsyncReply<T> implements Future<T> {
AsyncReply<T> trigger(T result) { AsyncReply<T> trigger(T result) {
if (_resultReady) return this; if (_resultReady) return this;
if (_exception != null) return this;
_result = result; _result = result;
_resultReady = true; _resultReady = true;

View File

@ -34,5 +34,6 @@ enum ExceptionCode {
AlreadyListened, AlreadyListened,
AlreadyUnlistened, AlreadyUnlistened,
NotListenable, NotListenable,
ParseError ParseError,
Timeout
} }

View File

@ -122,8 +122,13 @@ class DistributedConnection extends NetworkConnection with IStore {
bool _ready = false, _readyToEstablish = false; bool _ready = false, _readyToEstablish = false;
KeyList<int, DistributedResource> _resources = KeyList<int, WeakReference<DistributedResource>> _attachedResources =
new KeyList<int, WeakReference<DistributedResource>>();
KeyList<int, DistributedResource> _neededResources =
new KeyList<int, DistributedResource>(); new KeyList<int, DistributedResource>();
KeyList<int, WeakReference<DistributedResource>> _suspendedResources =
new KeyList<int, WeakReference<DistributedResource>>();
KeyList<int, AsyncReply<DistributedResource>> _resourceRequests = KeyList<int, AsyncReply<DistributedResource>> _resourceRequests =
new KeyList<int, AsyncReply<DistributedResource>>(); new KeyList<int, AsyncReply<DistributedResource>>();
@ -285,6 +290,7 @@ class DistributedConnection extends NetworkConnection with IStore {
_session?.localAuthentication.domain = domain; _session?.localAuthentication.domain = domain;
_session?.localAuthentication.username = username; _session?.localAuthentication.username = username;
_localPasswordOrToken = passwordOrToken; _localPasswordOrToken = passwordOrToken;
_invalidCredentials = false;
} }
if (_session == null) if (_session == null)
@ -302,38 +308,59 @@ class DistributedConnection extends NetworkConnection with IStore {
if (_hostname == null) throw Exception("Host not specified."); if (_hostname == null) throw Exception("Host not specified.");
if (socket != null) { _connectSocket(socket);
socket.connect(_hostname as String, _port)
..then((x) {
assign(socket as ISocket);
})
..error((x) {
_openReply?.triggerError(x);
_openReply = null;
});
}
return _openReply as AsyncReply<bool>; return _openReply as AsyncReply<bool>;
} }
_connectSocket(ISocket socket) {
socket.connect(_hostname as String, _port)
..then((x) {
assign(socket);
})
..error((x) {
if (autoReconnect) {
print("Reconnecting socket...");
Future.delayed(Duration(seconds: 5), () => _connectSocket(socket));
} else {
_openReply?.triggerError(x);
_openReply = null;
}
});
}
bool autoReconnect = false;
bool _invalidCredentials = false;
@override @override
void disconnected() { void disconnected() {
// clean up // clean up
_ready = false; _ready = false;
_readyToEstablish = false; _readyToEstablish = false;
_requests.values.forEach((x) { try { print("Disconnected ..");
x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"));
_keepAliveTimer?.cancel();
_keepAliveTimer = null;
_requests.values.forEach((x) {
try {
x.triggerError(
AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex) {} } catch (ex) {}
}); });
_resourceRequests.values.forEach((x) { try { _resourceRequests.values.forEach((x) {
x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")); try {
x.triggerError(
AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex) {} } catch (ex) {}
}); });
_templateRequests.values.forEach((x) { try { _templateRequests.values.forEach((x) {
x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")); try {
x.triggerError(
AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex) {} } catch (ex) {}
}); });
@ -341,38 +368,91 @@ class DistributedConnection extends NetworkConnection with IStore {
_resourceRequests.clear(); _resourceRequests.clear();
_templateRequests.clear(); _templateRequests.clear();
for (var x in _attachedResources.values) {
var r = x.target;
if (server != null){ if (r != null) {
// @TODO: check if we need this with reconnect r.suspend();
_resources.values.forEach((x) => x.suspend()); _suspendedResources[r.distributedResourceInstanceId ?? 0] = x;
_unsubscribeAll(); }
Warehouse.remove(this);
} }
if (server != null) {
_suspendedResources.clear();
_unsubscribeAll();
Warehouse.remove(this);
// @TODO: implement this
// if (ready)
// _server.membership?.Logout(session);
} else if (autoReconnect && !_invalidCredentials) {
Future.delayed(Duration(seconds: 5), reconnect);
} else {
_suspendedResources.clear();
}
_attachedResources.clear();
_ready = false;
} }
Future<bool> reconnect() async { Future<bool> reconnect() async {
if (await connect()) { try {
var bag = AsyncBag(); if (!await connect()) return false;
for (var i = 0; i < _resources.keys.length; i++) { try {
var index = _resources.keys.elementAt(i); var toBeRestored = <DistributedResource>[];
// print("Re $i ${_resources[index].instance.template.className}");
bag.add(fetch(index, null)); _suspendedResources.forEach((key, value) {
var r = value.target;
if (r != null) toBeRestored.add(r);
});
for (var r in toBeRestored) {
var link = DC.stringToBytes(r.distributedResourceLink ?? "");
print("Restoring " + (r.distributedResourceLink ?? ""));
try {
var ar = await (sendRequest(IIPPacketAction.QueryLink)
..addUint16(link.length)
..addDC(link))
.done();
var dataType = ar?[0] as TransmissionType;
var data = ar?[1] as DC;
if (dataType.identifier ==
TransmissionTypeIdentifier.ResourceList) {
// parse them as int
var id = data.getUint32(8);
if (id != r.distributedResourceInstanceId)
r.distributedResourceInstanceId = id;
_neededResources[id] = r;
_suspendedResources.remove(id);
await fetch(id, null);
}
} catch (ex) {
if (ex is AsyncException &&
ex.code == ExceptionCode.ResourceNotFound) {
// skip this resource
} else {
break;
}
}
}
} catch (ex) {
print(ex);
}
} catch (ex) {
return false;
} }
bag.seal();
await bag;
return true; return true;
} }
return false;
}
/// <summary> /// <summary>
/// KeyList to store user variables related to this connection. /// KeyList to store user variables related to this connection.
/// </summary> /// </summary>
@ -526,8 +606,6 @@ class DistributedConnection extends NetworkConnection with IStore {
// Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed); // Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed);
} }
void _keepAliveTimer_Elapsed() { void _keepAliveTimer_Elapsed() {
if (!isConnected) return; if (!isConnected) return;
@ -541,23 +619,25 @@ class DistributedConnection extends NetworkConnection with IStore {
_lastKeepAliveSent = now; _lastKeepAliveSent = now;
sendRequest(IIPPacketAction.KeepAlive) //print("keep alive sent");
(sendRequest(IIPPacketAction.KeepAlive)
..addDateTime(now) ..addDateTime(now)
..addUint32(interval) ..addUint32(interval))
..done().then((x) { .done()
..then((x) {
jitter = x?[1]; jitter = x?[1];
_keepAliveTimer = Timer( _keepAliveTimer = Timer(
Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed); Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed);
print("Keep Alive Received ${jitter}"); //print("Keep Alive Received ${jitter}");
}).error((ex) { })
..error((ex) {
_keepAliveTimer?.cancel(); _keepAliveTimer?.cancel();
close(); close();
}).timeout(Duration(microseconds: keepAliveTime), onTimeout: () { })
_keepAliveTimer?.cancel(); ..timeout(Duration(seconds: keepAliveTime));
close();
});
} }
int processPacket( int processPacket(
@ -743,20 +823,20 @@ class DistributedConnection extends NetworkConnection with IStore {
// packet.callbackId, packet.resourceId, packet.content, false); // packet.callbackId, packet.resourceId, packet.content, false);
break; break;
case IIPPacketAction.KeepAlive: case IIPPacketAction.KeepAlive:
iipRequestKeepAlive(packet.callbackId, packet.currentTime, packet.interval); iipRequestKeepAlive(
packet.callbackId, packet.currentTime, packet.interval);
break; break;
case IIPPacketAction.ProcedureCall: case IIPPacketAction.ProcedureCall:
iipRequestProcedureCall(packet.callbackId, packet.procedure, packet.dataType as TransmissionType, msg); iipRequestProcedureCall(packet.callbackId, packet.procedure,
packet.dataType as TransmissionType, msg);
break; break;
case IIPPacketAction.StaticCall: case IIPPacketAction.StaticCall:
iipRequestStaticCall(packet.callbackId, packet.classId, packet.methodIndex, packet.dataType as TransmissionType, msg); iipRequestStaticCall(packet.callbackId, packet.classId,
packet.methodIndex, packet.dataType as TransmissionType, msg);
break; break;
} }
} else if (packet.command == IIPPacketCommand.Reply) { } else if (packet.command == IIPPacketCommand.Reply) {
switch (packet.action) { switch (packet.action) {
@ -823,7 +903,6 @@ class DistributedConnection extends NetworkConnection with IStore {
case IIPPacketAction.InvokeFunction: case IIPPacketAction.InvokeFunction:
case IIPPacketAction.StaticCall: case IIPPacketAction.StaticCall:
case IIPPacketAction.ProcedureCall: case IIPPacketAction.ProcedureCall:
iipReplyInvoke(packet.callbackId, iipReplyInvoke(packet.callbackId,
packet.dataType ?? TransmissionType.Null, msg); packet.dataType ?? TransmissionType.Null, msg);
break; break;
@ -1038,10 +1117,11 @@ class DistributedConnection extends NetworkConnection with IStore {
emitArgs("ready", []); emitArgs("ready", []);
// start perodic keep alive timer // start perodic keep alive timer
_keepAliveTimer = Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed); _keepAliveTimer = Timer(Duration(seconds: KeepAliveInterval),
_keepAliveTimer_Elapsed);
} }
} else if (_authPacket.command == IIPAuthPacketCommand.Error) { } else if (_authPacket.command == IIPAuthPacketCommand.Error) {
_invalidCredentials = true;
var ex = AsyncException(ErrorType.Management, _authPacket.errorCode, var ex = AsyncException(ErrorType.Management, _authPacket.errorCode,
_authPacket.errorMessage); _authPacket.errorMessage);
_openReply?.triggerError(ex); _openReply?.triggerError(ex);
@ -1096,7 +1176,7 @@ class DistributedConnection extends NetworkConnection with IStore {
/// <returns></returns> /// <returns></returns>
AsyncReply<bool> put(IResource resource) { AsyncReply<bool> put(IResource resource) {
if (Codec.isLocalResource(resource, this)) if (Codec.isLocalResource(resource, this))
_resources.add( _neededResources.add(
(resource as DistributedResource).distributedResourceInstanceId (resource as DistributedResource).distributedResourceInstanceId
as int, as int,
resource); resource);
@ -1190,7 +1270,21 @@ class DistributedConnection extends NetworkConnection with IStore {
return reply; return reply;
} }
AsyncReply<dynamic>? sendDetachRequest(int instanceId) { void detachResource(int instanceId) async {
try {
if (_attachedResources.containsKey(instanceId))
_attachedResources.remove(instanceId);
if (_suspendedResources.containsKey(instanceId))
_suspendedResources.remove(instanceId);
await _sendDetachRequest(instanceId);
} catch (ex) {
// do nothing
}
}
AsyncReply<dynamic>? _sendDetachRequest(int instanceId) {
try { try {
return (sendRequest(IIPPacketAction.DetachResource) return (sendRequest(IIPPacketAction.DetachResource)
..addUint32(instanceId)) ..addUint32(instanceId))
@ -1277,11 +1371,16 @@ class DistributedConnection extends NetworkConnection with IStore {
void iipEventResourceReassigned(int resourceId, int newResourceId) {} void iipEventResourceReassigned(int resourceId, int newResourceId) {}
void iipEventResourceDestroyed(int resourceId) { void iipEventResourceDestroyed(int resourceId) {
if (_resources.contains(resourceId)) { var r = _attachedResources[resourceId]?.target;
var r = _resources[resourceId]; if (r != null) {
_resources.remove(resourceId); r.destroy();
r?.destroy(); return;
} else if (_neededResources.contains(resourceId)) {
// @TODO: handle this mess
_neededResources.remove(resourceId);
} }
_attachedResources.remove(resourceId);
} }
// @TODO: Check for deadlocks // @TODO: Check for deadlocks
@ -1537,13 +1636,11 @@ void _unsubscribeAll(){
resource.instance?.off("resourceEventOccurred", _instance_EventOccurred); resource.instance?.off("resourceEventOccurred", _instance_EventOccurred);
resource.instance?.off("resourceModified", _instance_PropertyModified); resource.instance?.off("resourceModified", _instance_PropertyModified);
resource.instance?.off("resourceDestroyed", _instance_ResourceDestroyed); resource.instance?.off("resourceDestroyed", _instance_ResourceDestroyed);
}); });
_subscriptions.clear(); _subscriptions.clear();
} }
void iipRequestReattachResource( void iipRequestReattachResource(
int callback, int resourceId, int resourceAge) { int callback, int resourceId, int resourceAge) {
Warehouse.getById(resourceId).then((r) { Warehouse.getById(resourceId).then((r) {
@ -2071,11 +2168,11 @@ void _unsubscribeAll(){
}); });
} }
void iipRequestProcedureCall(int callback, String procedureCall,
void iipRequestProcedureCall(int callback, String procedureCall, TransmissionType transmissionType, DC content) TransmissionType transmissionType, DC content) {
{
// server not implemented // server not implemented
sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure.index); sendError(
ErrorType.Management, callback, ExceptionCode.GeneralFailure.index);
// if (server == null) // if (server == null)
// { // {
@ -2116,22 +2213,22 @@ void _unsubscribeAll(){
// }); // });
} }
void iipRequestStaticCall(int callback, Guid classId, int index, TransmissionType transmissionType, DC content) void iipRequestStaticCall(int callback, Guid classId, int index,
{ TransmissionType transmissionType, DC content) {
var template = Warehouse.getTemplateByClassId(classId); var template = Warehouse.getTemplateByClassId(classId);
if (template == null) if (template == null) {
{ sendError(
sendError(ErrorType.Management, callback, ExceptionCode.TemplateNotFound.index); ErrorType.Management, callback, ExceptionCode.TemplateNotFound.index);
return; return;
} }
var ft = template.getFunctionTemplateByIndex(index); var ft = template.getFunctionTemplateByIndex(index);
if (ft == null) if (ft == null) {
{
// no function at this index // no function at this index
sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound.index); sendError(
ErrorType.Management, callback, ExceptionCode.MethodNotFound.index);
return; return;
} }
@ -2169,7 +2266,6 @@ void _unsubscribeAll(){
// }); // });
} }
void iipRequestResourceAttribute(int callback, int resourceId) {} void iipRequestResourceAttribute(int callback, int resourceId) {}
// @TODO: Check for deadlocks // @TODO: Check for deadlocks
@ -2404,14 +2500,7 @@ void _unsubscribeAll(){
sendError(x.type, callback, x.code, x.message); sendError(x.type, callback, x.code, x.message);
}); });
} else { } else {
/* var pi = null;
#if NETSTANDARD1_5
var pi = r.GetType().GetTypeInfo().GetProperty(pt.Name);
#else
var pi = r.GetType().GetProperty(pt.Name);
#endif*/
var pi = null; // pt.Info;
if (pi != null) { if (pi != null) {
if (r.instance?.applicable(_session as Session, if (r.instance?.applicable(_session as Session,
@ -2549,16 +2638,16 @@ void _unsubscribeAll(){
return rt; return rt;
} }
/// <summary> // /// <summary>
/// Retrive a resource by its instance Id. // /// Retrive a resource by its instance Id.
/// </summary> // /// </summary>
/// <param name="iid">Instance Id</param> // /// <param name="iid">Instance Id</param>
/// <returns>Resource</returns> // /// <returns>Resource</returns>
AsyncReply<IResource?> retrieve(int iid) { // AsyncReply<IResource?> retrieve(int iid) {
for (var r in _resources.values) // for (var r in _resources.values)
if (r.instance?.id == iid) return new AsyncReply<IResource>.ready(r); // if (r.instance?.id == iid) return new AsyncReply<IResource>.ready(r);
return new AsyncReply<IResource?>.ready(null); // return new AsyncReply<IResource?>.ready(null);
} // }
AsyncReply<List<TypeTemplate>> getLinkTemplates(String link) { AsyncReply<List<TypeTemplate>> getLinkTemplates(String link) {
var reply = new AsyncReply<List<TypeTemplate>>(); var reply = new AsyncReply<List<TypeTemplate>>();
@ -2603,7 +2692,13 @@ void _unsubscribeAll(){
/// <param name="id">Resource Id</param>Guid classId /// <param name="id">Resource Id</param>Guid classId
/// <returns>DistributedResource</returns> /// <returns>DistributedResource</returns>
AsyncReply<DistributedResource> fetch(int id, List<int>? requestSequence) { AsyncReply<DistributedResource> fetch(int id, List<int>? requestSequence) {
var resource = _resources[id]; var resource = _attachedResources[id]?.target;
if (resource != null)
return AsyncReply<DistributedResource>.ready(resource);
resource = _neededResources[id];
var request = _resourceRequests[id]; var request = _resourceRequests[id];
//print("fetch $id"); //print("fetch $id");
@ -2612,8 +2707,11 @@ void _unsubscribeAll(){
if (resource != null && (requestSequence?.contains(id) ?? false)) if (resource != null && (requestSequence?.contains(id) ?? false))
return AsyncReply<DistributedResource>.ready(resource); return AsyncReply<DistributedResource>.ready(resource);
return request; return request;
} else if (resource != null && !resource.distributedResourceSuspended) } else if (resource != null && !resource.distributedResourceSuspended) {
// @REVIEW: this should never happen
print("DCON: Resource not moved to attached.");
return new AsyncReply<DistributedResource>.ready(resource); return new AsyncReply<DistributedResource>.ready(resource);
}
var reply = new AsyncReply<DistributedResource>(); var reply = new AsyncReply<DistributedResource>();
_resourceRequests.add(id, reply); _resourceRequests.add(id, reply);
@ -2652,8 +2750,10 @@ void _unsubscribeAll(){
dr = new DistributedResource(); dr = new DistributedResource();
dr.internal_init(this, id, rt[1] as int, rt[2] as String); dr.internal_init(this, id, rt[1] as int, rt[2] as String);
} }
} else } else {
dr = resource; dr = resource;
template = resource.instance?.template;
}
TransmissionType transmissionType = rt[3] as TransmissionType; TransmissionType transmissionType = rt[3] as TransmissionType;
DC content = rt[4] as DC; DC content = rt[4] as DC;
@ -2674,8 +2774,12 @@ void _unsubscribeAll(){
ar[i + 2], ar[i] as int, ar[i + 1] as DateTime)); ar[i + 2], ar[i] as int, ar[i + 1] as DateTime));
dr.internal_attach(pvs); dr.internal_attach(pvs);
_resourceRequests.remove(id); _resourceRequests.remove(id);
// move from needed to attached.
_neededResources.remove(id);
_attachedResources[id] = WeakReference<DistributedResource>(dr);
reply.trigger(dr); reply.trigger(dr);
}) })
..error((ex) => reply.triggerError(ex)); ..error((ex) => reply.triggerError(ex));
@ -3042,18 +3146,16 @@ void _unsubscribeAll(){
TemplateDescriber get template => TemplateDescriber get template =>
TemplateDescriber("Esiur.Net.IIP.DistributedConnection"); TemplateDescriber("Esiur.Net.IIP.DistributedConnection");
AsyncReply<dynamic> staticCall(
Guid classId, int index, Map<UInt8, dynamic> parameters) {
AsyncReply<dynamic> staticCall(Guid classId, int index, Map<UInt8, dynamic> parameters)
{
var pb = Codec.compose(parameters, this); var pb = Codec.compose(parameters, this);
var reply = AsyncReply<dynamic>(); var reply = AsyncReply<dynamic>();
var c = _callbackCounter++; var c = _callbackCounter++;
_requests.add(c, reply); _requests.add(c, reply);
sendParams()
sendParams()..addUint8((0x40 | IIPPacketAction.StaticCall)) ..addUint8((0x40 | IIPPacketAction.StaticCall))
..addUint32(c) ..addUint32(c)
..addGuid(classId) ..addGuid(classId)
..addUint8(index) ..addUint8(index)
@ -3063,16 +3165,20 @@ void _unsubscribeAll(){
return reply; return reply;
} }
// AsyncReply<dynamic> Call(String procedureCall, params object[] parameters) AsyncReply<dynamic> call(String procedureCall, [List? parameters = null]) {
// { if (parameters == null) {
// var args = new Map<byte, object>(); return callArgs(procedureCall, Map<UInt8, dynamic>());
// for (byte i = 0; i < parameters.Length; i++) } else {
// args.Add(i, parameters[i]); var map = Map<UInt8, dynamic>();
// return Call(procedureCall, args); parameters.forEachIndexed((index, element) {
// } map[UInt8(index)] = element;
});
return callArgs(procedureCall, map);
}
}
AsyncReply<dynamic> call(String procedureCall, Map<UInt8, dynamic> parameters) AsyncReply<dynamic> callArgs(
{ String procedureCall, Map<UInt8, dynamic> parameters) {
var pb = Codec.compose(parameters, this); var pb = Codec.compose(parameters, this);
var reply = new AsyncReply<dynamic>(); var reply = new AsyncReply<dynamic>();
@ -3081,7 +3187,8 @@ void _unsubscribeAll(){
var callName = DC.stringToBytes(procedureCall); var callName = DC.stringToBytes(procedureCall);
sendParams()..addUint8(0x40 | IIPPacketAction.ProcedureCall) sendParams()
..addUint8(0x40 | IIPPacketAction.ProcedureCall)
..addUint32(c) ..addUint32(c)
..addUint16(callName.length) ..addUint16(callName.length)
..addDC(callName) ..addDC(callName)
@ -3091,15 +3198,12 @@ void _unsubscribeAll(){
return reply; return reply;
} }
void iipRequestKeepAlive(int callbackId, DateTime peerTime, int interval) void iipRequestKeepAlive(int callbackId, DateTime peerTime, int interval) {
{
int jitter = 0; int jitter = 0;
var now = DateTime.now().toUtc(); var now = DateTime.now().toUtc();
if (_lastKeepAliveReceived != null) if (_lastKeepAliveReceived != null) {
{
var diff = now.difference(_lastKeepAliveReceived!).inMicroseconds; var diff = now.difference(_lastKeepAliveReceived!).inMicroseconds;
//Console.WriteLine("Diff " + diff + " " + interval); //Console.WriteLine("Diff " + diff + " " + interval);
@ -3115,5 +3219,4 @@ void _unsubscribeAll(){
_lastKeepAliveReceived = now; _lastKeepAliveReceived = now;
} }
} }

View File

@ -76,6 +76,7 @@ class DistributedResource extends IResource {
/// Instance Id given by the other end. /// Instance Id given by the other end.
/// </summary> /// </summary>
int? get distributedResourceInstanceId => _instanceId; int? get distributedResourceInstanceId => _instanceId;
set distributedResourceInstanceId(value) => _instanceId = value;
//bool get destroyed => _destroyed; //bool get destroyed => _destroyed;
@ -90,7 +91,7 @@ class DistributedResource extends IResource {
void destroy() { void destroy() {
_destroyed = true; _destroyed = true;
_attached = false; _attached = false;
_connection?.sendDetachRequest(_instanceId as int); _connection?.detachResource(_instanceId as int);
emitArgs("destroy", [this]); emitArgs("destroy", [this]);
} }
@ -194,8 +195,8 @@ class DistributedResource extends IResource {
} }
AsyncReply<dynamic> listen(event) { AsyncReply<dynamic> listen(event) {
if (_destroyed) throw new Exception("Trying to access destroyed object"); if (_destroyed) throw new Exception("Trying to access a destroyed object.");
if (_suspended) throw new Exception("Trying to access suspended object"); if (_suspended) throw new Exception("Trying to access a suspended object.");
EventTemplate? et = event is EventTemplate EventTemplate? et = event is EventTemplate
? event ? event
@ -214,8 +215,8 @@ class DistributedResource extends IResource {
} }
AsyncReply<dynamic> unlisten(event) { AsyncReply<dynamic> unlisten(event) {
if (_destroyed) throw new Exception("Trying to access destroyed object"); if (_destroyed) throw new Exception("Trying to access a destroyed object.");
if (_suspended) throw new Exception("Trying to access suspended object"); if (_suspended) throw new Exception("Trying to access a suspended object.");
EventTemplate? et = event is EventTemplate EventTemplate? et = event is EventTemplate
? event ? event
@ -245,7 +246,7 @@ class DistributedResource extends IResource {
} }
AsyncReply<dynamic> internal_invoke(int index, Map<UInt8, dynamic> args) { AsyncReply<dynamic> internal_invoke(int index, Map<UInt8, dynamic> args) {
if (_destroyed) throw new Exception("Trying to access a destroyed object"); if (_destroyed) throw new Exception("Trying to access a destroyed object.");
if (_suspended) throw new Exception("Trying to access a suspended object."); if (_suspended) throw new Exception("Trying to access a suspended object.");
if (instance == null) throw Exception("Object not initialized."); if (instance == null) throw Exception("Object not initialized.");
@ -290,6 +291,10 @@ class DistributedResource extends IResource {
@override //overring noSuchMethod @override //overring noSuchMethod
noSuchMethod(Invocation invocation) { noSuchMethod(Invocation invocation) {
if (_destroyed) throw new Exception("Trying to access a destroyed object.");
if (_suspended) throw new Exception("Trying to access a suspended object.");
var memberName = _getMemberName(invocation.memberName); var memberName = _getMemberName(invocation.memberName);
if (invocation.isMethod) { if (invocation.isMethod) {
@ -365,6 +370,10 @@ class DistributedResource extends IResource {
/// <param name="value">Value</param> /// <param name="value">Value</param>
/// <returns>Indicator when the property is set.</returns> /// <returns>Indicator when the property is set.</returns>
AsyncReply<dynamic> set(int index, dynamic value) { AsyncReply<dynamic> set(int index, dynamic value) {
if (_destroyed) throw new Exception("Trying to access a destroyed object.");
if (_suspended) throw new Exception("Trying to access a suspended object.");
if (index >= _properties.length) if (index >= _properties.length)
throw Exception("Property with index `${index}` not found."); throw Exception("Property with index `${index}` not found.");

View File

@ -100,7 +100,6 @@ class NetworkConnection extends IDestructible with INetworkReceiver<ISocket> {
if (_sock != null) _sock?.close(); if (_sock != null) _sock?.close();
} catch (ex) { } catch (ex) {
//Global.Log("NetworkConenction:Close", LogType.Error, ex.ToString()); //Global.Log("NetworkConenction:Close", LogType.Error, ex.ToString());
} }
} }

View File

@ -153,6 +153,8 @@ class TCPSocket extends ISocket {
} }
void close() { void close() {
if (state == SocketState.Closed) return;
if (state != SocketState.Closed && state != SocketState.Terminated) if (state != SocketState.Closed && state != SocketState.Terminated)
_state = SocketState.Closed; _state = SocketState.Closed;

View File

@ -126,6 +126,8 @@ class WSocket extends ISocket {
} }
void close() { void close() {
if (state == SocketState.Closed) return;
if (state != SocketState.Closed && state != SocketState.Terminated) if (state != SocketState.Closed && state != SocketState.Terminated)
_state = SocketState.Closed; _state = SocketState.Closed;

View File

@ -33,7 +33,7 @@ import '../Data/PropertyValue.dart';
// new // new
abstract class IStore implements IResource { abstract class IStore implements IResource {
AsyncReply<IResource?> get(String path); AsyncReply<IResource?> get(String path);
AsyncReply<IResource?> retrieve(int iid); // AsyncReply<IResource?> retrieve(int iid);
AsyncReply<bool> put(IResource resource); AsyncReply<bool> put(IResource resource);
String? link(IResource resource); String? link(IResource resource);
bool record(IResource resource, String propertyName, dynamic value, int? age, bool record(IResource resource, String propertyName, dynamic value, int? age,

View File

@ -22,6 +22,7 @@ SOFTWARE.
*/ */
import '../Data/IntType.dart'; import '../Data/IntType.dart';
import '../Data/TransmissionType.dart'; import '../Data/TransmissionType.dart';
@ -55,7 +56,8 @@ import '../Net/IIP/DistributedConnection.dart';
// Centeral Resource Issuer // Centeral Resource Issuer
class Warehouse { class Warehouse {
static AutoList<IStore, Instance> _stores = AutoList<IStore, Instance>(); static AutoList<IStore, Instance> _stores = AutoList<IStore, Instance>();
static Map<int, IResource> _resources = new Map<int, IResource>(); static Map<int, WeakReference<IResource>> _resources =
new Map<int, WeakReference<IResource>>();
static int resourceCounter = 0; static int resourceCounter = 0;
static KeyList<TemplateType, KeyList<Guid, TypeTemplate>> _templates = static KeyList<TemplateType, KeyList<Guid, TypeTemplate>> _templates =
@ -100,7 +102,7 @@ class Warehouse {
/// <returns></returns> /// <returns></returns>
static AsyncReply<IResource?> getById(int id) { static AsyncReply<IResource?> getById(int id) {
if (_resources.containsKey(id)) if (_resources.containsKey(id))
return new AsyncReply<IResource?>.ready(_resources[id]); return new AsyncReply<IResource?>.ready(_resources[id]?.target);
else else
return new AsyncReply<IResource?>.ready(null); return new AsyncReply<IResource?>.ready(null);
} }
@ -155,15 +157,19 @@ class Warehouse {
static AsyncReply<bool> close() { static AsyncReply<bool> close() {
var bag = new AsyncBag<bool>(); var bag = new AsyncBag<bool>();
for (var resource in _resources.values) for (var resource in _resources.values) {
if (!(resource is IStore)) var r = resource.target;
bag.add(resource.trigger(ResourceTrigger.Terminate)); if ((r != null) && !(r is IStore))
bag.add(r.trigger(ResourceTrigger.Terminate));
}
for (var s in _stores) bag.add(s.trigger(ResourceTrigger.Terminate)); for (var s in _stores) bag.add(s.trigger(ResourceTrigger.Terminate));
for (var resource in _resources.values) for (var resource in _resources.values) {
if (!(resource is IStore)) var r = resource.target;
bag.add(resource.trigger(ResourceTrigger.SystemTerminated)); if ((r != null) && !(resource is IStore))
bag.add(r.trigger(ResourceTrigger.SystemTerminated));
}
for (var store in _stores) for (var store in _stores)
bag.add(store.trigger(ResourceTrigger.SystemTerminated)); bag.add(store.trigger(ResourceTrigger.SystemTerminated));
@ -451,7 +457,7 @@ class Warehouse {
var initResource = () { var initResource = () {
if (resource.instance == null) return; if (resource.instance == null) return;
_resources[(resource.instance as Instance).id] = resource; _resources[(resource.instance as Instance).id] = WeakReference(resource);
if (_warehouseIsOpen) { if (_warehouseIsOpen) {
resource.trigger(ResourceTrigger.Initialize) resource.trigger(ResourceTrigger.Initialize)
@ -644,8 +650,15 @@ class Warehouse {
_stores.remove(resource); _stores.remove(resource);
// remove all objects associated with the store // remove all objects associated with the store
var toBeRemoved = //var toBeRemoved =
_resources.values.where((x) => x.instance?.store == resource); // _resources.values.where((x) => x.target?.instance?.store == resource);
var toBeRemoved = <IResource>[];
for (var wr in _resources.values) {
var r = wr.target;
if (r != null && r.instance?.store == resource) toBeRemoved.add(r);
}
for (var o in toBeRemoved) remove(o); for (var o in toBeRemoved) remove(o);
// StoreDisconnected?.Invoke(resource as IStore); // StoreDisconnected?.Invoke(resource as IStore);