From 09b010612ab6a0381cc850d85580c042501f5818 Mon Sep 17 00:00:00 2001 From: Ahmed Zamil Date: Wed, 25 Mar 2020 04:59:27 +0300 Subject: [PATCH] Resume connection --- lib/src/Net/IIP/DistributedConnection.dart | 180 ++++++++++++++++----- lib/src/Net/IIP/DistributedResource.dart | 54 +++++-- lib/src/Net/NetworkConnection.dart | 12 +- lib/src/Resource/Warehouse.dart | 10 -- test/main.dart | 31 +++- 5 files changed, 210 insertions(+), 77 deletions(-) diff --git a/lib/src/Net/IIP/DistributedConnection.dart b/lib/src/Net/IIP/DistributedConnection.dart index 8ad87c9..17ec66b 100644 --- a/lib/src/Net/IIP/DistributedConnection.dart +++ b/lib/src/Net/IIP/DistributedConnection.dart @@ -21,6 +21,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ + +import '../../Core/AsyncBag.dart'; + import '../Sockets/TCPSocket.dart'; import 'DistributedPropertyContext.dart'; import '../../Data/PropertyValue.dart'; @@ -97,15 +100,17 @@ class DistributedConnection extends NetworkConnection with IStore DC _localPassword; DC _localNonce, _remoteNonce; - bool _ready = false, _readyToEstablish = false; + String _hostname; + int _port; - DateTime _loginDate; + bool _ready = false, _readyToEstablish = false; KeyList _resources = new KeyList(); + KeyList> _resourceRequests = new KeyList>(); KeyList> _templateRequests = new KeyList>(); - KeyList> _pathRequests = new KeyList>(); + //KeyList> _pathRequests = new KeyList>(); Map _templates = new Map(); KeyList> _requests = new KeyList>(); int _callbackCounter = 0; @@ -177,7 +182,7 @@ class DistributedConnection extends NetworkConnection with IStore { - var host = instance.name.split(":");// ("://").skip(1).join("://").split("/")[0]; + var host = instance.name.split(":"); // assign domain from hostname if not provided var address = host[0]; @@ -186,29 +191,107 @@ class DistributedConnection extends NetworkConnection with IStore var domain = instance.attributes.containsKey("domain") ? instance.attributes["domain"] : address; - _session = new Session(new ClientAuthentication() - , new HostAuthentication()); - - _session.localAuthentication.domain = domain; - _session.localAuthentication.username = username; - _localPassword = DC.stringToBytes(instance.attributes["password"].toString()); - - _openReply = new AsyncReply(); - var sock = new TCPSocket(); + var password = DC.stringToBytes(instance.attributes["password"].toString()); - sock.connect(address, port).then((x){ - assign(sock); - //rt.trigger(true); - }).error((x)=>_openReply.triggerError(x)); - return _openReply; + return connect(domain: domain, hostname: address, port: port, password: password, username: username); + } } return new AsyncReply.ready(true); } + AsyncReply connect({ISocket socket, String hostname, int port, String username, DC password, String domain}) + { + if (_openReply != null) + throw AsyncException(ErrorType.Exception, 0, "Connection in progress"); + + _openReply = new AsyncReply(); + + if (hostname != null) + { + _session = new Session(new ClientAuthentication() + , new HostAuthentication()); + + _session.localAuthentication.domain = domain; + _session.localAuthentication.username = username; + _localPassword = password; + } + + if (_session == null) + throw AsyncException(ErrorType.Exception, 0, "Session not initialized"); + + if (socket == null) + socket = new TCPSocket(); + + _port = port ?? _port; + _hostname = hostname ?? _hostname; + + socket.connect(_hostname, _port).then((x){ + assign(socket); + }).error((x){ + _openReply.triggerError(x); + _openReply = null; + }); + + return _openReply; + } + + + @override + void connectionClosed() + { + // clean up + _ready = false; + _readyToEstablish = false; + + _requests.values.forEach((x)=>x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"))); + _resourceRequests.values.forEach((x)=>x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"))); + _templateRequests.values.forEach((x)=>x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"))); + + _requests.clear(); + _resourceRequests.clear(); + _templateRequests.clear(); + + _resources.values.forEach((x)=>x.suspend()); + } + + Future reconnect() async + { + try + { + if (await connect()) + { + try + { + var bag = AsyncBag(); + + for(var i = 0; i < _resources.keys.length; i++) + { + var index = _resources.keys.elementAt(i); + // print("Re $i ${_resources[index].instance.template.className}"); + bag.add(fetch(index)); + } + + bag.seal(); + await bag; + } + catch(ex) + { + print(ex.toString()); + } + } + } + catch(ex) + { + return false; + } + + return true; + } + /// /// KeyList to store user variables related to this connection. /// @@ -227,10 +310,10 @@ class DistributedConnection extends NetworkConnection with IStore { super.assign(socket); - session.remoteAuthentication.source.attributes.add(SourceAttributeType.IPv4, socket.remoteEndPoint.address); - session.remoteAuthentication.source.attributes.add(SourceAttributeType.Port, socket.remoteEndPoint.port); - session.localAuthentication.source.attributes.add(SourceAttributeType.IPv4, socket.localEndPoint.address); - session.localAuthentication.source.attributes.add(SourceAttributeType.Port, socket.localEndPoint.port); + session.remoteAuthentication.source.attributes[SourceAttributeType.IPv4] = socket.remoteEndPoint.address; + session.remoteAuthentication.source.attributes[SourceAttributeType.Port] = socket.remoteEndPoint.port; + session.localAuthentication.source.attributes[SourceAttributeType.IPv4] = socket.localEndPoint.address; + session.localAuthentication.source.attributes[SourceAttributeType.Port] = socket.localEndPoint.port; if (session.localAuthentication.type == AuthenticationType.Client) { @@ -680,6 +763,7 @@ class DistributedConnection extends NetworkConnection with IStore _ready = true; _openReply.trigger(true); + _openReply = null; emitArgs("ready", []); //OnReady?.Invoke(this); // server.membership.login(session); @@ -747,6 +831,7 @@ class DistributedConnection extends NetworkConnection with IStore _ready = true; _openReply.trigger(true); + _openReply = null; emitArgs("ready", []); //OnReady?.Invoke(this); @@ -757,6 +842,7 @@ class DistributedConnection extends NetworkConnection with IStore { var ex = AsyncException(ErrorType.Management, _authPacket.errorCode, _authPacket.errorMessage); _openReply.triggerError(ex); + _openReply = null; emitArgs("error", [ex]); //OnError?.Invoke(this, authPacket.ErrorCode, authPacket.ErrorMessage); close(); @@ -815,6 +901,10 @@ class DistributedConnection extends NetworkConnection with IStore return true; } + + + + bool record(IResource resource, String propertyName, value, int age, DateTime dateTime) { // nothing to do @@ -874,6 +964,18 @@ class DistributedConnection extends NetworkConnection with IStore return reply; } + AsyncReply sendDetachRequest(int instanceId) + { + try + { + return sendRequest(IIPPacketAction.DetachResource).addUint32(instanceId).done(); + } + catch(ex) + { + return null; + } + } + AsyncReply sendInvokeByNamedArguments(int instanceId, int index, Structure parameters) { var pb = Codec.composeStructure(parameters, this, true, true, true); @@ -2148,48 +2250,44 @@ class DistributedConnection extends NetworkConnection with IStore /// DistributedResource AsyncReply fetch(int id) { - if (_resourceRequests.containsKey(id) && _resources.containsKey(id)) - { - //Console.WriteLine("DEAD LOCK " + id); + var resource = _resources[id]; + var request = _resourceRequests[id]; - return new AsyncReply.ready(_resources[id]); + if (request != null) + { // dig for dead locks - //return resourceRequests[id]; + if (resource != null) // dead lock + return new AsyncReply.ready(_resources[id]); + else + return request; } - else if (_resourceRequests.containsKey(id)) - return _resourceRequests[id]; - else if (_resources.containsKey(id)) - return new AsyncReply.ready(_resources[id]); + else if (resource != null && !resource.suspended) + return new AsyncReply.ready(resource); var reply = new AsyncReply(); _resourceRequests.add(id, reply); - - //print("fetch ${id}"); - + sendRequest(IIPPacketAction.AttachResource) .addUint32(id) .done() .then((rt) { - - //print("fetched ${id}"); - - var dr = new DistributedResource(this, id, rt[1], rt[2]); - + var dr = resource ?? new DistributedResource(this, id, rt[1], rt[2]); getTemplate(rt[0] as Guid).then((tmp) { //print("New template "); // ClassId, ResourceAge, ResourceLink, Content - Warehouse.put(dr, id.toString(), this, null, tmp); + if (resource == null) + Warehouse.put(dr, id.toString(), this, null, tmp); var d = rt[3] as DC; Codec.parsePropertyValueArray(d, 0, d.length, this).then((ar) { //print("attached"); - dr.attached(ar); + dr.attach(ar); _resourceRequests.remove(id); reply.trigger(dr); }); diff --git a/lib/src/Net/IIP/DistributedResource.dart b/lib/src/Net/IIP/DistributedResource.dart index e9120a7..b270057 100644 --- a/lib/src/Net/IIP/DistributedResource.dart +++ b/lib/src/Net/IIP/DistributedResource.dart @@ -22,6 +22,8 @@ SOFTWARE. */ +import 'package:esyur/esyur.dart'; + import '../../Resource/IResource.dart'; import '../../Core/AsyncReply.dart'; import '../../Data/PropertyValue.dart'; @@ -36,11 +38,8 @@ class DistributedResource extends IResource int _instanceId; DistributedConnection _connection; - - bool _isAttached = false; - bool _isReady = false; - - + bool _attached = false; + //bool _isReady = false; String _link; List _properties; @@ -63,24 +62,37 @@ class DistributedResource extends IResource /// int get id => _instanceId; + //bool get destroyed => _destroyed; + + bool get suspended => _suspended; + bool _suspended = true; + /// /// IDestructible interface. /// void destroy() { _destroyed = true; + _attached = false; + _connection.sendDetachRequest(_instanceId); emitArgs("destroy", [this]); } - + + void suspend() + { + _suspended = true; + _attached = false; + } + /// /// Resource is ready when all its properties are attached. /// - bool get isReady => _isReady; + // bool get isReady => _isReady; /// /// Resource is attached when all its properties are received. /// - bool get isAttached => _isAttached; + bool get attached => _attached; // public DistributedResourceStack Stack @@ -102,10 +114,10 @@ class DistributedResource extends IResource this._instanceId = instanceId; } - void _ready() - { - _isReady = true; - } + //void _ready() + //{ + // _isReady = true; + // } /// /// Export all properties with ResourceProperty attributed as bytes array. @@ -123,12 +135,14 @@ class DistributedResource extends IResource return props; } - bool attached(List properties) + bool attach(List properties) { - if (_isAttached) + if (_attached) return false; else { + _suspended = false; + _properties = new List(properties.length);// object[properties.Length]; //_events = new DistributedResourceEvent[Instance.Template.Events.Length]; @@ -146,7 +160,7 @@ class DistributedResource extends IResource //afterAttachmentTriggers.Clear(); - _isAttached = true; + _attached = true; } return true; @@ -166,6 +180,9 @@ class DistributedResource extends IResource if (_destroyed) throw new Exception("Trying to access destroyed object"); + if (_suspended) + throw new Exception("Trying to access suspended object"); + if (index >= instance.template.functions.length) throw new Exception("Function index is incorrect"); @@ -173,11 +190,16 @@ class DistributedResource extends IResource return connection.sendInvokeByNamedArguments(_instanceId, index, namedArgs); } + + AsyncReply invokeByArrayArguments(int index, List args) { if (_destroyed) throw new Exception("Trying to access destroyed object"); + if (_suspended) + throw new Exception("Trying to access suspended object"); + if (index >= instance.template.functions.length) throw new Exception("Function index is incorrect"); @@ -217,7 +239,7 @@ class DistributedResource extends IResource { var ft = instance.template.getFunctionTemplateByName(memberName); - if (_isAttached && ft!=null) + if (_attached && ft!=null) { if (invocation.namedArguments.length > 0) { diff --git a/lib/src/Net/NetworkConnection.dart b/lib/src/Net/NetworkConnection.dart index 224b72c..7b93ac3 100644 --- a/lib/src/Net/NetworkConnection.dart +++ b/lib/src/Net/NetworkConnection.dart @@ -48,11 +48,17 @@ class NetworkConnection extends IDestructible bool _processing = false; + // to be overridden + void connectionClosed() + { + + } + void destroy() { // if (connected) close(); - emitArgs("close", [this]); + //emitArgs("close", [this]); //OnDestroy?.Invoke(this); } @@ -73,7 +79,6 @@ class NetworkConnection extends IDestructible socket.on("receive", socket_OnReceive); socket.on("close", socket_OnClose); socket.on("connect", socket_OnConnect); - } @@ -84,6 +89,7 @@ class NetworkConnection extends IDestructible void socket_OnClose() { + connectionClosed(); emitArgs("close", [this]); } @@ -153,10 +159,8 @@ class NetworkConnection extends IDestructible void close() { - try { - if (_sock != null) _sock.close(); } diff --git a/lib/src/Resource/Warehouse.dart b/lib/src/Resource/Warehouse.dart index 3ba2f0c..cea61af 100644 --- a/lib/src/Resource/Warehouse.dart +++ b/lib/src/Resource/Warehouse.dart @@ -246,19 +246,10 @@ class Warehouse var rt = new AsyncReply(); // Should we create a new store ? - if (_urlRegex.hasMatch(path)) - //if (path.contains("://")) { var url = _urlRegex.allMatches(path).first; - //var url = path.split(_urlRegex); - - //var url = path.split(new string[] { "://" }, 2, StringSplitOptions.None); - //var hostname = url[1].Split(new char[] { '/' }, 2)[0]; - //var pathname = string.Join("/", url[1].Split(new char[] { '/' }).Skip(1)); - - if (protocols.containsKey(url[1])) { var handler = protocols[url[1]]; @@ -266,7 +257,6 @@ class Warehouse var store = handler(); put(store, url[2], null, parent, null, 0, manager, attributes); - store.trigger(ResourceTrigger.Open).then((x) { diff --git a/test/main.dart b/test/main.dart index 2c03a26..ac0dfc2 100644 --- a/test/main.dart +++ b/test/main.dart @@ -2,20 +2,39 @@ import "package:test/test.dart"; import 'package:esyur/esyur.dart'; import 'dart:io'; -main() +main() async { - test("Connect to server", () async { + //test("Connect to server", () async { // connect to the server - // var x = await Warehouse.get("iip://localhost:5000/sys/su", {"username": "admin", "password": "1234" - // , "domain": "example.com"}); + var x = await Warehouse.get("iip://localhost:5000/sys/su", {"username": "admin", "password": "1234" + , "domain": "example.com"}); + + var now = DateTime.now(); // desc(x); - // List trackers = await x.getMyTrackers(); + List trackers = await x.getMyTrackers(); + + print("Time ${DateTime.now().difference(now).inSeconds}"); + + print(x.suspended); + + DistributedConnection con = x.connection; + con.close(); + print(x.suspended); + + now = DateTime.now(); + + await con.reconnect(); + + print("Time ${DateTime.now().difference(now).inSeconds}"); + print(x.suspended); + var u = await x.getMyTrackers(); + print(trackers[0].suspended); // for(var i = 0; i < trackers.length; i++) // print(trackers[i].name); @@ -53,7 +72,7 @@ main() //print("Done"); - }); + //}); }