From 1985363ad767ea32343c30bbca46569320ff5d21 Mon Sep 17 00:00:00 2001 From: Esiur Project Date: Thu, 11 Aug 2022 21:27:58 +0300 Subject: [PATCH] Reconnect --- src/Core/AsyncReply.js | 16 +- src/Core/ExceptionCode.js | 4 +- src/Net/IIP/DistributedConnection.js | 456 +++++++++++++-------------- src/Net/IIP/DistributedResource.js | 29 +- 4 files changed, 250 insertions(+), 255 deletions(-) diff --git a/src/Core/AsyncReply.js b/src/Core/AsyncReply.js index 99c2e41..6557327 100644 --- a/src/Core/AsyncReply.js +++ b/src/Core/AsyncReply.js @@ -27,6 +27,8 @@ "use strict"; import AsyncException from './AsyncException.js'; +import ExceptionCode from './ExceptionCode.js'; +import ErrorType from './ErrorType.js'; export default class AsyncReply extends Promise { @@ -88,16 +90,24 @@ export default class AsyncReply extends Promise timeout(milliseconds, onTimeout){ let self = this; setTimeout(() => { - if (!self.ready && self.exception == null) - onTimeout(); + if (!self.ready && self.exception == null){ + + self.triggerError(ErrorType.Management, ExceptionCode.Timeout, "Execution timeout expired."); + + if (onTimeout instanceof Function) + onTimeout(); + } }, milliseconds); } trigger(result) { if (this.ready) - return; + return this; + if (this.exception != null) + return this; + this.result = result; this.ready = true; diff --git a/src/Core/ExceptionCode.js b/src/Core/ExceptionCode.js index a5468f4..3139102 100644 --- a/src/Core/ExceptionCode.js +++ b/src/Core/ExceptionCode.js @@ -34,5 +34,7 @@ export default //const ExceptionCode = NotAttached: 31, AlreadyListened: 32, AlreadyUnlistened: 33, - NotListenable: 34 + NotListenable: 34, + ParseError: 35, + Timeout: 36 }; \ No newline at end of file diff --git a/src/Net/IIP/DistributedConnection.js b/src/Net/IIP/DistributedConnection.js index 6667748..081c0c7 100644 --- a/src/Net/IIP/DistributedConnection.js +++ b/src/Net/IIP/DistributedConnection.js @@ -119,10 +119,16 @@ export default class DistributedConnection extends IStore { this.packet = new IIPPacket(); this.authPacket = new IIPAuthPacket(); - this.resources = new KeyList();//{}; + //this.resources = new KeyList();//{}; + + this._neededResources = new KeyList(); + this._attachedResources = new KeyList(); + this._suspendedResources = new KeyList(); + + this.templates = new KeyList(); this.requests = new KeyList();// {}; - //this.pathRequests = new KeyList();// {}; + this.templateRequests = new KeyList(); this.templateByNameRequests = new KeyList(); this.resourceRequests = new KeyList();// {}; @@ -141,11 +147,14 @@ export default class DistributedConnection extends IStore { } }); - this.localNonce = this.generateNonce(32); + this._localNonce = this.generateNonce(32); this.jitter = 0; this.keepAliveTime = 10; this.keepAliveInterval = 30; + this._invalidCredentials = false; + + this.autoReconnect = false; } @@ -453,11 +462,11 @@ export default class DistributedConnection extends IStore { if (x) { this.session.remoteAuthentication.username = authPacket.remoteUsername; - this.remoteNonce = authPacket.remoteNonce; + this._remoteNonce = authPacket.remoteNonce; this.session.remoteAuthentication.domain = authPacket.domain; this.sendParams() .addUint8(0xa0) - .addUint8Array(this.localNonce) + .addUint8Array(this._localNonce) .done(); } else @@ -496,11 +505,11 @@ export default class DistributedConnection extends IStore { { this.session.remoteAuthentication.username = x; this.session.remoteAuthentication.tokenIndex = authPacket.remoteTokenIndex; - this.remoteNonce = authPacket.remoteNonce; + this._remoteNonce = authPacket.remoteNonce; this.session.remoteAuthentication.domain = authPacket.domain; this.sendParams() .addUint8(0xa0) - .addUint8Array(this.localNonce) + .addUint8Array(this._localNonce) .done(); } else @@ -599,15 +608,15 @@ export default class DistributedConnection extends IStore { { var hash = SHA256.compute(BL() .addUint8Array(pw) - .addUint8Array(this.remoteNonce) - .addUint8Array(this.localNonce) + .addUint8Array(this._remoteNonce) + .addUint8Array(this._localNonce) .toArray()); if (hash.sequenceEqual(remoteHash)) { // send our hash var localHash = SHA256.compute(BL() - .addUint8Array(this.localNonce) - .addUint8Array(this.remoteNonce) + .addUint8Array(this._localNonce) + .addUint8Array(this._remoteNonce) .addUint8Array(pw) .toArray()); @@ -662,19 +671,22 @@ export default class DistributedConnection extends IStore { { this.ready = true; - this.openReply?.trigger(true); + this._openReply?.trigger(true); + this._openReply = null; this._emit("ready", this); this.server?.membership.login(this.session); }).error( x=> { - this.openReply?.triggerError(x); + this._openReply?.triggerError(x); + this._openReply = null; }); } else { this.ready = true; - this.openReply?.trigger(true); + this._openReply?.trigger(true); + this._openReply = null; this._emit("ready", this); this.server?.membership.login(this.session); } @@ -709,13 +721,13 @@ export default class DistributedConnection extends IStore { || authPacket.remoteMethod == AuthenticationMethod.Token) { - this.remoteNonce = authPacket.remoteNonce; + this._remoteNonce = authPacket.remoteNonce; // send our hash var localHash = SHA256.compute(BL() - .addUint8Array(this.localPasswordOrToken) - .addUint8Array(this.localNonce) - .addUint8Array(this.remoteNonce) + .addUint8Array(this._localPasswordOrToken) + .addUint8Array(this._localNonce) + .addUint8Array(this._remoteNonce) .toArray()); this.sendParams() @@ -731,9 +743,9 @@ export default class DistributedConnection extends IStore { { // check if the server knows my password var remoteHash = SHA256.compute(BL() - .addUint8Array(this.remoteNonce) - .addUint8Array(this.localNonce) - .addUint8Array(this.localPasswordOrToken) + .addUint8Array(this._remoteNonce) + .addUint8Array(this._localNonce) + .addUint8Array(this._localPasswordOrToken) .toArray()); @@ -760,7 +772,6 @@ export default class DistributedConnection extends IStore { { this.session.id = authPacket.sessionId; this.ready = true; - this.openReply?.trigger(true); this._emit("ready", this); // put it in the warehouse @@ -768,173 +779,41 @@ export default class DistributedConnection extends IStore { { Warehouse.put(this.localUsername, this, null, this.server).then(x => { - this.openReply?.trigger(true); + this._openReply?.trigger(true); + this._openReply = null; this._emit("ready", this); - }).error(x=> this.openReply?.triggerError(x)); + }).error( x=> { + this._openReply?.triggerError(x); + this._openReply = null; + }); } else { - this.openReply?.trigger(true); + this._openReply?.trigger(true); + this._openReply = null; this._emit("ready", this); } // start perodic keep alive timer - setTimeout(this._keepAliveTimer_Elapsed, this.keepAliveInterval); + this._keepAliveTimer = setTimeout(this._keepAliveTimerElapsed, this.keepAliveInterval); } } else if (authPacket.command == IIPAuthPacketCommand.Error) { - this.openReply?.triggerError(new AsyncException(ErrorType.Management, authPacket.errorCode, authPacket.errorMessage)); + this._invalidCredentials = true; + this._openReply?.triggerError(new AsyncException(ErrorType.Management, authPacket.errorCode, authPacket.errorMessage)); + this._openReply = null; + this._emit("error", this, authPacket.errorCode, authPacket.errorMessage); this.close(); } } - - - - - - - - - - - - - - - - - - - - // if (this.session.localAuthentication.type == AuthenticationType.Host) { - // if (authPacket.command == IIPAuthPacketCommand.Declare) { - // if (authPacket.remoteMethod == AuthenticationMethod.Credentials - // && authPacket.localMethod == AuthenticationMethod.None) { - - // console.log("Declare"); - // this.session.remoteAuthentication.username = authPacket.remoteUsername; - // this.remoteNonce = authPacket.remoteNonce; - // this.domain = authPacket.domain; - // this.sendParams().addUint8(0xa0).addUint8Array(this.localNonce).done(); - // } - // } - // else if (authPacket.command == IIPAuthPacketCommand.Action) { - // if (authPacket.action == IIPAuthPacketAction.AuthenticateHash) { - // var remoteHash = authPacket.hash; - - // this.server.membership.getPassword(this.session.remoteAuthentication.username, this.domain).then(function (pw) { - // if (pw != null) { - - // //var hash = new DC(sha256.arrayBuffer(BL().addString(pw).addUint8Array(remoteNonce).addUint8Array(this.localNonce).toArray())); - // var hash = SHA256.compute(BL().addString(pw).addUint8Array(remoteNonce).addUint8Array(this.localNonce).toDC()); - - - // if (hash.sequenceEqual(remoteHash)) { - // // send our hash - // //var localHash = new DC(sha256.arrayBuffer((new BinaryList()).addUint8Array(this.localNonce).addUint8Array(remoteNonce).addUint8Array(pw).toArray())); - // var localHash = SHA256.compute(BL().addUint8Array(this.localNonce).addUint8Array(remoteNonce).addUint8Array(pw).toDC()); - // this.sendParams().addUint8(0).addUint8Array(localHash).done(); - - // this.readyToEstablish = true; - // } - // else { - // // incorrect password - // this.sendParams().addUint8(0xc0) - // .addInt32(ExceptionCode.AccessDenied) - // .addUint16(13) - // .addString("Access Denied") - // .done(); - // } - // } - // }); - // } - // else if (authPacket.action == IIPAuthPacketAction.NewConnection) { - // if (readyToEstablish) { - // this.session.id = this.generateNonce(32);// new DC(32); - // //window.crypto.getRandomValues(this.session.id); - - // this.sendParams().addUint8(0x28).addUint8Array(this.session.id).done(); - // this.ready = true; - - // this.openReply.trigger(this); - // this.openReply = null; - // //this._emit("ready", this); - // } - // } - // } - // } - // else if (this.session.localAuthentication.type == AuthenticationType.Client) { - // if (authPacket.command == IIPAuthPacketCommand.Acknowledge) { - // this.remoteNonce = authPacket.remoteNonce; - - // // send our hash - - // var localHash = SHA256.compute(BL().addUint8Array(this.localPasswordOrToken) - // .addUint8Array(this.localNonce) - // .addUint8Array(this.remoteNonce).toDC()); - - // this.sendParams().addUint8(0).addUint8Array(localHash).done(); - // } - // else if (authPacket.command == IIPAuthPacketCommand.Action) { - // if (authPacket.action == IIPAuthPacketAction.AuthenticateHash) { - - // var remoteHash = SHA256.compute(BL().addUint8Array(this.remoteNonce) - // .addUint8Array(this.localNonce) - // .addUint8Array(this.localPasswordOrToken).toDC()); - - - // if (remoteHash.sequenceEqual(authPacket.hash)) { - // // send establish request - // this.sendParams().addUint8(0x20).addUint16(0).done(); - // } - // else { - // this.sendParams().addUint8(0xc0) - // .addUint32(ExceptionCode.ChallengeFailed) - // .addUint16(16) - // .addString("Challenge Failed") - // .done(); - // } - // } - // else if (authPacket.action == IIPAuthPacketAction.ConnectionEstablished) { - // this.session.id = authPacket.sessionId; - // this.ready = true; - // this.openReply.trigger(this); - // this.openReply = null; - - // //this._emit("ready", this); - // } - // } - // else if (authPacket.command == IIPAuthPacketCommand.Error) { - // this.openReply.triggerError(1, authPacket.errorCode, authPacket.errorMessage); - // this.openReply = null; - // //this._emit("error", this, authPacket.errorCode, authPacket.errorMessage); - // this.close(); - // } - // } } } return offset; - //if (offset < ends) - // this.processPacket(msg, offset, ends, data); } - - // dataReceived(data) { - // var msg = data.read(); - // var offset = 0; - // var ends = msg.length; - // var packet = this.packet; - - // //console.log("Data"); - - // while (offset < ends) { - // offset = this.processPacket(msg, offset, ends, data); - // } - // } - _dataReceived(data) { @@ -970,22 +849,64 @@ export default class DistributedConnection extends IStore { async reconnect() { try { - if (await this.connect()) { - try { - var bag = new AsyncBag(); + if (!await this.connect()) + return false; - for (var i = 0; i < this.resources.keys.length; i++) { - var index = this.resources.keys[i]; - bag.add(this.fetch(index, null)); + try { + + let toBeRestored = []; + for(var i = 0 ; i < this.suspendedResources.length; i++) + { + var r = this.suspendedResources.values[i].deref(); + if (r != null) + toBeRestored.push(r); + } + + for(let r of toBeRestored) + { + let link = DC.stringToBytes(r._p.link); + console.log("Restoreing " + r._p.link); + + try + { + var ar = await this.sendRequest(IIPPacket.IIPPacketAction.QueryLink) + .addUint16(link.Length) + .addUint8Array(link) + .done(); + + var dataType = ar[0]; + var data = ar[1]; + + if (dataType.identifier == TransmissionTypeIdentifier.ResourceList) + { + // parse them as int + var id = data.getUint32(8); + if (id != r._p.id) + r._p.id = id; + + this.neededResources.set(id, r); + this.suspendedResources.remove(id); + + await this.fetch(id, null); + + } + } + catch (ex) + { + if (ex.code == ExceptionCode.ResourceNotFound) + { + // skip this resource + } + else + { + break; + } } - - bag.seal(); - await bag; - } - catch (ex) { - console.log(ex); } } + catch (ex) { + console.log(ex); + } } catch (ex) { return false; @@ -1065,10 +986,10 @@ export default class DistributedConnection extends IStore { username = null, tokenIndex = 0, passwordOrToken = null, domain = null, secure = false) { - if (this.openReply != null) + if (this._openReply != null) throw new AsyncException(ErrorType.Exception, 0, "Connection in progress"); - this.openReply = new AsyncReply(); + this._openReply = new AsyncReply(); if (hostname != null) { @@ -1079,7 +1000,8 @@ export default class DistributedConnection extends IStore { this.session.localAuthentication.tokenIndex = tokenIndex; this.session.localAuthentication.domain = domain; this.session.localAuthentication.username = username; - this.localPasswordOrToken = passwordOrToken; + this._localPasswordOrToken = passwordOrToken; + this._invalidCredentials = false; } if (this.session == null) @@ -1097,46 +1019,32 @@ export default class DistributedConnection extends IStore { if (secure != null) this._secure = secure; + this._connectSocket(socket); + + return this._openReply; + + } + + _connectSocket(socket){ let self = this; socket.connect(this._hostname, this._port, this._secure).then(x => - { - self.assign(socket); - }).error((x) => - { - self.openReply?.triggerError(x); - self.openReply = null; - }); - - return this.openReply; - - // //connect(secure, method, hostname, port, username, tokenIndex, passwordOrToken, domain) { - // this.openReply = new AsyncReply(); - - - // if (secure !== undefined) { - - // this.session.localAuthentication.method = method; - // this.session.localAuthentication.tokenIndex = tokenIndex; - - // this.session.localAuthentication.domain = domain; - // this.session.localAuthentication.username = username; - // this.localPasswordOrToken = passwordOrToken; - - // //this.url = `ws${secure ? 's' : ''}://${this.instance.name}`; - // this.url = `ws${secure ? 's' : ''}://${hostname}:${port}`; - - // let socket = new WebSocket(this.url, "iip"); - // socket.binaryType = "arraybuffer"; - // socket.connection = this; - - // this.assign(socket); - - // return this.openReply; - // } + { + self.assign(socket); + }).error((x) => + { + if (self.autoReconnect){ + console.log("Reconnecting socket..."); + setTimeout(() => { + self._connectSocket(socket); + }, 5000); + } else { + self._openReply?.triggerError(x); + self._openReply = null; + } + }); } - _declare() { // declare (Credentials -> No Auth, No Enctypt) var dmn = DC.stringToBytes(this.session.localAuthentication.domain); @@ -1148,7 +1056,7 @@ export default class DistributedConnection extends IStore { .addUint8(0x60) .addUint8(dmn.length) .addUint8Array(dmn) - .addUint8Array(this.localNonce) + .addUint8Array(this._localNonce) .addUint8(un.length) .addUint8Array(un) .done(); @@ -1158,7 +1066,7 @@ export default class DistributedConnection extends IStore { .addUint8(0x70) .addUint8(dmn.length) .addUint8Array(dmn) - .addUint8Array(this.localNonce) + .addUint8Array(this._localNonce) .addUint64(this.session.localAuthentication.tokenIndex) .done(); } @@ -1209,8 +1117,11 @@ export default class DistributedConnection extends IStore { networkClose(socket) { // clean up + this.ready = false; this.readyToEstablish = false; + clearTimeout(this._keepAliveTimer); + try { this.requests.values.forEach((x) => { @@ -1240,17 +1151,33 @@ export default class DistributedConnection extends IStore { this.resourceRequests.clear(); this.templateRequests.clear(); + for (let x of this._attachedResources.values) + { + let r = x.deref(); + if (r != null){ + r.susped(); + this._suspendedResources.set(r._p.id, x); + } + } + if (this.server != null) { - this.resources.values.forEach((x) => x._suspend()); + this._suspendedResources.clear(); + this._unsubscribeAll(); Warehouse.remove(this); if (this.ready) this.server.membership.logout(this.session); } + else if (this.autoReconnect && !this._invalidCredentials){ + setTimeout(this.reconnect, 5000); + } + else { + this._suspendedResources.clear(); + } - + this._attachedResources.clear(); this.ready = false; this._emit("close", this); @@ -1306,12 +1233,11 @@ export default class DistributedConnection extends IStore { } } - reconnect() { - - } put(resource) { - this.resources.add(parseInt(resource.instance.name), resource); + if (codec.isLocalResource(resource, this)) + this.neededResources.add(resource._p.id, resource); + return new AsyncReply(true); } @@ -1327,7 +1253,25 @@ export default class DistributedConnection extends IStore { return this.sendParams(reply).addUint8(0x40 | action).addUint32(this.callbackCounter); } - sendDetachRequest(instanceId) + + detachResource(instanceId){ + try + { + if (this._attachedResources.containsKey(instanceId)) + this._attachedResources.remove(instanceId); + + if (this._suspendedResources.containsKey(instanceId)) + this._suspendedResources.remove(instanceId); + + await this._sendDetachRequest(instanceId); + } + catch + { + + } + } + + _sendDetachRequest(instanceId) { try { @@ -1446,11 +1390,20 @@ export default class DistributedConnection extends IStore { } IIPEventResourceDestroyed(resourceId) { - if (this.resources.item(resourceId)) { - var r = this.resources.item(resourceId); - this.resources.remove(resourceId); - r.destroy(); + + if (this._attachedResources.contains(resourceId)) + { + let r = this._attachedResources.get(resourceId).deref(); + r?.destroy(); + + this._attachedResources.remove(resourceId); } + else if (this._neededResources.contains(resourceId)) + { + // @TODO: handle this mess + this._neededResources.remove(resourceId); + } + } IIPEventPropertyUpdated(resourceId, index, dataType, data) { @@ -2494,17 +2447,17 @@ export default class DistributedConnection extends IStore { */ } - retrieve(iid) { + // retrieve(iid) { - let r = this.resources.item(iid); + // let r = this.resources.item(iid); - return new AsyncReply(r); + // return new AsyncReply(r); - //for (var r in this.resources) - // if (this.resources[r].instance.id == iid) - // return new AsyncReply(r); - //return new AsyncReply(null); - } + // //for (var r in this.resources) + // // if (this.resources[r].instance.id == iid) + // // return new AsyncReply(r); + // //return new AsyncReply(null); + // } getLinkTemplates(link) { @@ -2547,7 +2500,14 @@ export default class DistributedConnection extends IStore { // Get a resource from the other end fetch(id, requestSequence) { - let resource = this.resources.item(id); + + let resource = this._attachedResources.item(id)?.deref(); + + if (resource != null) + return new AsyncReply(resource); + + resource = this._neededResources.item(id); + let request = this.resourceRequests.item(id); if (request != null) { @@ -2557,6 +2517,9 @@ export default class DistributedConnection extends IStore { return request; } else if (resource != null && !resource._p.suspended) { + + // @REVIEW: this should never happen + console.log("DCON", LogType.Error, "Resource not moved to attached."); return new AsyncReply(resource); } @@ -2592,9 +2555,11 @@ export default class DistributedConnection extends IStore { else dr = new DistributedResource(self, id, rt[1], rt[2]); } - else + else { dr = resource; - + template = resource.instance.template; + } + //let dr = resource || new DistributedResource(self, id, rt[1], rt[2]); let transmissionType = rt[3] ; @@ -2611,8 +2576,10 @@ export default class DistributedConnection extends IStore { ar[i + 2], ar[i], ar[i + 1])); dr._attach(pvs); - self.resourceRequests.remove(id); + // move from needed to attached + self._neededResources.remove(id); + self._attachedResources.set(id, new WeakRef(resource)); reply.trigger(dr); }) .error((ex) => reply.triggerError(ex)); @@ -3050,7 +3017,7 @@ export default class DistributedConnection extends IStore { } - _keepAliveTimer_Elapsed() + _keepAliveTimerElapsed() { if (!this.isConnected) return; @@ -3070,9 +3037,8 @@ export default class DistributedConnection extends IStore { .then(x => { self.jitter = x[1]; - setTimeout(_keepAliveTimer_Elapsed, self.keepAliveInterval); + self._keepAliveTimer = setTimeout(_keepAliveTimerElapsed, self.keepAliveInterval); console.log("Keep Alive Received " + jitter); - }).error((ex) => { self.close(); diff --git a/src/Net/IIP/DistributedResource.js b/src/Net/IIP/DistributedResource.js index 8fc22aa..cbe68e0 100644 --- a/src/Net/IIP/DistributedResource.js +++ b/src/Net/IIP/DistributedResource.js @@ -43,12 +43,16 @@ export default class DistributedResource extends IResource { destroy() { - this.destroyed = true; + this._p.destroyed = true; this._p.attached = false; - this._p.connection.sendDetachRequest(this._p.instanceId); + this._p.connection.detachResource(this._p.instanceId); this._emit("destroy", this); } + get destroyed () { + return this._p.destroyed; + } + _suspend() { this._p.suspended = true; @@ -65,6 +69,7 @@ export default class DistributedResource extends IResource super(); this._p = { + destroyed: false, suspended: false, attached: false, connection: connection, @@ -119,6 +124,12 @@ export default class DistributedResource extends IResource { var func = function () { + if (self._p.destroyed) + throw new Error("Trying to access a destroyed object."); + + if (self._p.suspended) + throw new Error("Trying to access a suspended object."); + var argsMap = new (TypedMap.of(UInt8, Object)); if ( arguments.length == 1 @@ -160,6 +171,12 @@ export default class DistributedResource extends IResource var makeSetter = function(index) { return function (value) { + if (self._p.destroyed) + throw new Error("Trying to access a destroyed object."); + + if (self._p.suspended) + throw new Error("Trying to access a suspended object."); + self._set(index, value); }; }; @@ -222,14 +239,14 @@ export default class DistributedResource extends IResource } _invoke(index, args) { - if (this.destroyed) - throw new Error("Trying to access destroyed object"); + if (this._p.destroyed) + throw new Error("Trying to access a destroyed object."); if (this._p.suspended) - throw new Error("Trying to access suspended object"); + throw new Error("Trying to access a suspended object."); if (index >= this.instance.template.functions.length) - throw new Error("Function index is incorrect"); + throw new Error("Function index is incorrect."); let ft = this.instance.template.getFunctionTemplateByIndex(index);