2
0
mirror of https://github.com/esiur/esiur-js.git synced 2025-05-06 12:32:58 +00:00

Reconnect

This commit is contained in:
Esiur Project 2022-08-11 21:27:58 +03:00
parent 78d74d540b
commit 1985363ad7
4 changed files with 250 additions and 255 deletions

View File

@ -27,6 +27,8 @@
"use strict";
import AsyncException from './AsyncException.js';
import ExceptionCode from './ExceptionCode.js';
import ErrorType from './ErrorType.js';
export default class AsyncReply extends Promise
{
@ -88,15 +90,23 @@ export default class AsyncReply extends Promise
timeout(milliseconds, onTimeout){
let self = this;
setTimeout(() => {
if (!self.ready && self.exception == null)
onTimeout();
if (!self.ready && self.exception == null){
self.triggerError(ErrorType.Management, ExceptionCode.Timeout, "Execution timeout expired.");
if (onTimeout instanceof Function)
onTimeout();
}
}, milliseconds);
}
trigger(result)
{
if (this.ready)
return;
return this;
if (this.exception != null)
return this;
this.result = result;
this.ready = true;

View File

@ -34,5 +34,7 @@ export default //const ExceptionCode =
NotAttached: 31,
AlreadyListened: 32,
AlreadyUnlistened: 33,
NotListenable: 34
NotListenable: 34,
ParseError: 35,
Timeout: 36
};

View File

@ -119,10 +119,16 @@ export default class DistributedConnection extends IStore {
this.packet = new IIPPacket();
this.authPacket = new IIPAuthPacket();
this.resources = new KeyList();//{};
//this.resources = new KeyList();//{};
this._neededResources = new KeyList();
this._attachedResources = new KeyList();
this._suspendedResources = new KeyList();
this.templates = new KeyList();
this.requests = new KeyList();// {};
//this.pathRequests = new KeyList();// {};
this.templateRequests = new KeyList();
this.templateByNameRequests = new KeyList();
this.resourceRequests = new KeyList();// {};
@ -141,11 +147,14 @@ export default class DistributedConnection extends IStore {
}
});
this.localNonce = this.generateNonce(32);
this._localNonce = this.generateNonce(32);
this.jitter = 0;
this.keepAliveTime = 10;
this.keepAliveInterval = 30;
this._invalidCredentials = false;
this.autoReconnect = false;
}
@ -453,11 +462,11 @@ export default class DistributedConnection extends IStore {
if (x)
{
this.session.remoteAuthentication.username = authPacket.remoteUsername;
this.remoteNonce = authPacket.remoteNonce;
this._remoteNonce = authPacket.remoteNonce;
this.session.remoteAuthentication.domain = authPacket.domain;
this.sendParams()
.addUint8(0xa0)
.addUint8Array(this.localNonce)
.addUint8Array(this._localNonce)
.done();
}
else
@ -496,11 +505,11 @@ export default class DistributedConnection extends IStore {
{
this.session.remoteAuthentication.username = x;
this.session.remoteAuthentication.tokenIndex = authPacket.remoteTokenIndex;
this.remoteNonce = authPacket.remoteNonce;
this._remoteNonce = authPacket.remoteNonce;
this.session.remoteAuthentication.domain = authPacket.domain;
this.sendParams()
.addUint8(0xa0)
.addUint8Array(this.localNonce)
.addUint8Array(this._localNonce)
.done();
}
else
@ -599,15 +608,15 @@ export default class DistributedConnection extends IStore {
{
var hash = SHA256.compute(BL()
.addUint8Array(pw)
.addUint8Array(this.remoteNonce)
.addUint8Array(this.localNonce)
.addUint8Array(this._remoteNonce)
.addUint8Array(this._localNonce)
.toArray());
if (hash.sequenceEqual(remoteHash))
{
// send our hash
var localHash = SHA256.compute(BL()
.addUint8Array(this.localNonce)
.addUint8Array(this.remoteNonce)
.addUint8Array(this._localNonce)
.addUint8Array(this._remoteNonce)
.addUint8Array(pw)
.toArray());
@ -662,19 +671,22 @@ export default class DistributedConnection extends IStore {
{
this.ready = true;
this.openReply?.trigger(true);
this._openReply?.trigger(true);
this._openReply = null;
this._emit("ready", this);
this.server?.membership.login(this.session);
}).error( x=>
{
this.openReply?.triggerError(x);
this._openReply?.triggerError(x);
this._openReply = null;
});
}
else
{
this.ready = true;
this.openReply?.trigger(true);
this._openReply?.trigger(true);
this._openReply = null;
this._emit("ready", this);
this.server?.membership.login(this.session);
}
@ -709,13 +721,13 @@ export default class DistributedConnection extends IStore {
|| authPacket.remoteMethod == AuthenticationMethod.Token)
{
this.remoteNonce = authPacket.remoteNonce;
this._remoteNonce = authPacket.remoteNonce;
// send our hash
var localHash = SHA256.compute(BL()
.addUint8Array(this.localPasswordOrToken)
.addUint8Array(this.localNonce)
.addUint8Array(this.remoteNonce)
.addUint8Array(this._localPasswordOrToken)
.addUint8Array(this._localNonce)
.addUint8Array(this._remoteNonce)
.toArray());
this.sendParams()
@ -731,9 +743,9 @@ export default class DistributedConnection extends IStore {
{
// check if the server knows my password
var remoteHash = SHA256.compute(BL()
.addUint8Array(this.remoteNonce)
.addUint8Array(this.localNonce)
.addUint8Array(this.localPasswordOrToken)
.addUint8Array(this._remoteNonce)
.addUint8Array(this._localNonce)
.addUint8Array(this._localPasswordOrToken)
.toArray());
@ -760,7 +772,6 @@ export default class DistributedConnection extends IStore {
{
this.session.id = authPacket.sessionId;
this.ready = true;
this.openReply?.trigger(true);
this._emit("ready", this);
// put it in the warehouse
@ -768,174 +779,42 @@ export default class DistributedConnection extends IStore {
{
Warehouse.put(this.localUsername, this, null, this.server).then(x =>
{
this.openReply?.trigger(true);
this._openReply?.trigger(true);
this._openReply = null;
this._emit("ready", this);
}).error(x=> this.openReply?.triggerError(x));
}).error( x=> {
this._openReply?.triggerError(x);
this._openReply = null;
});
}
else
{
this.openReply?.trigger(true);
this._openReply?.trigger(true);
this._openReply = null;
this._emit("ready", this);
}
// start perodic keep alive timer
setTimeout(this._keepAliveTimer_Elapsed, this.keepAliveInterval);
this._keepAliveTimer = setTimeout(this._keepAliveTimerElapsed, this.keepAliveInterval);
}
}
else if (authPacket.command == IIPAuthPacketCommand.Error)
{
this.openReply?.triggerError(new AsyncException(ErrorType.Management, authPacket.errorCode, authPacket.errorMessage));
this._invalidCredentials = true;
this._openReply?.triggerError(new AsyncException(ErrorType.Management, authPacket.errorCode, authPacket.errorMessage));
this._openReply = null;
this._emit("error", this, authPacket.errorCode, authPacket.errorMessage);
this.close();
}
}
// if (this.session.localAuthentication.type == AuthenticationType.Host) {
// if (authPacket.command == IIPAuthPacketCommand.Declare) {
// if (authPacket.remoteMethod == AuthenticationMethod.Credentials
// && authPacket.localMethod == AuthenticationMethod.None) {
// console.log("Declare");
// this.session.remoteAuthentication.username = authPacket.remoteUsername;
// this.remoteNonce = authPacket.remoteNonce;
// this.domain = authPacket.domain;
// this.sendParams().addUint8(0xa0).addUint8Array(this.localNonce).done();
// }
// }
// else if (authPacket.command == IIPAuthPacketCommand.Action) {
// if (authPacket.action == IIPAuthPacketAction.AuthenticateHash) {
// var remoteHash = authPacket.hash;
// this.server.membership.getPassword(this.session.remoteAuthentication.username, this.domain).then(function (pw) {
// if (pw != null) {
// //var hash = new DC(sha256.arrayBuffer(BL().addString(pw).addUint8Array(remoteNonce).addUint8Array(this.localNonce).toArray()));
// var hash = SHA256.compute(BL().addString(pw).addUint8Array(remoteNonce).addUint8Array(this.localNonce).toDC());
// if (hash.sequenceEqual(remoteHash)) {
// // send our hash
// //var localHash = new DC(sha256.arrayBuffer((new BinaryList()).addUint8Array(this.localNonce).addUint8Array(remoteNonce).addUint8Array(pw).toArray()));
// var localHash = SHA256.compute(BL().addUint8Array(this.localNonce).addUint8Array(remoteNonce).addUint8Array(pw).toDC());
// this.sendParams().addUint8(0).addUint8Array(localHash).done();
// this.readyToEstablish = true;
// }
// else {
// // incorrect password
// this.sendParams().addUint8(0xc0)
// .addInt32(ExceptionCode.AccessDenied)
// .addUint16(13)
// .addString("Access Denied")
// .done();
// }
// }
// });
// }
// else if (authPacket.action == IIPAuthPacketAction.NewConnection) {
// if (readyToEstablish) {
// this.session.id = this.generateNonce(32);// new DC(32);
// //window.crypto.getRandomValues(this.session.id);
// this.sendParams().addUint8(0x28).addUint8Array(this.session.id).done();
// this.ready = true;
// this.openReply.trigger(this);
// this.openReply = null;
// //this._emit("ready", this);
// }
// }
// }
// }
// else if (this.session.localAuthentication.type == AuthenticationType.Client) {
// if (authPacket.command == IIPAuthPacketCommand.Acknowledge) {
// this.remoteNonce = authPacket.remoteNonce;
// // send our hash
// var localHash = SHA256.compute(BL().addUint8Array(this.localPasswordOrToken)
// .addUint8Array(this.localNonce)
// .addUint8Array(this.remoteNonce).toDC());
// this.sendParams().addUint8(0).addUint8Array(localHash).done();
// }
// else if (authPacket.command == IIPAuthPacketCommand.Action) {
// if (authPacket.action == IIPAuthPacketAction.AuthenticateHash) {
// var remoteHash = SHA256.compute(BL().addUint8Array(this.remoteNonce)
// .addUint8Array(this.localNonce)
// .addUint8Array(this.localPasswordOrToken).toDC());
// if (remoteHash.sequenceEqual(authPacket.hash)) {
// // send establish request
// this.sendParams().addUint8(0x20).addUint16(0).done();
// }
// else {
// this.sendParams().addUint8(0xc0)
// .addUint32(ExceptionCode.ChallengeFailed)
// .addUint16(16)
// .addString("Challenge Failed")
// .done();
// }
// }
// else if (authPacket.action == IIPAuthPacketAction.ConnectionEstablished) {
// this.session.id = authPacket.sessionId;
// this.ready = true;
// this.openReply.trigger(this);
// this.openReply = null;
// //this._emit("ready", this);
// }
// }
// else if (authPacket.command == IIPAuthPacketCommand.Error) {
// this.openReply.triggerError(1, authPacket.errorCode, authPacket.errorMessage);
// this.openReply = null;
// //this._emit("error", this, authPacket.errorCode, authPacket.errorMessage);
// this.close();
// }
// }
}
}
return offset;
//if (offset < ends)
// this.processPacket(msg, offset, ends, data);
}
// dataReceived(data) {
// var msg = data.read();
// var offset = 0;
// var ends = msg.length;
// var packet = this.packet;
// //console.log("Data");
// while (offset < ends) {
// offset = this.processPacket(msg, offset, ends, data);
// }
// }
_dataReceived(data)
{
var msg = data.read();
@ -970,22 +849,64 @@ export default class DistributedConnection extends IStore {
async reconnect() {
try {
if (await this.connect()) {
try {
var bag = new AsyncBag();
if (!await this.connect())
return false;
for (var i = 0; i < this.resources.keys.length; i++) {
var index = this.resources.keys[i];
bag.add(this.fetch(index, null));
try {
let toBeRestored = [];
for(var i = 0 ; i < this.suspendedResources.length; i++)
{
var r = this.suspendedResources.values[i].deref();
if (r != null)
toBeRestored.push(r);
}
for(let r of toBeRestored)
{
let link = DC.stringToBytes(r._p.link);
console.log("Restoreing " + r._p.link);
try
{
var ar = await this.sendRequest(IIPPacket.IIPPacketAction.QueryLink)
.addUint16(link.Length)
.addUint8Array(link)
.done();
var dataType = ar[0];
var data = ar[1];
if (dataType.identifier == TransmissionTypeIdentifier.ResourceList)
{
// parse them as int
var id = data.getUint32(8);
if (id != r._p.id)
r._p.id = id;
this.neededResources.set(id, r);
this.suspendedResources.remove(id);
await this.fetch(id, null);
}
}
catch (ex)
{
if (ex.code == ExceptionCode.ResourceNotFound)
{
// skip this resource
}
else
{
break;
}
}
bag.seal();
await bag;
}
catch (ex) {
console.log(ex);
}
}
catch (ex) {
console.log(ex);
}
}
catch (ex) {
return false;
@ -1065,10 +986,10 @@ export default class DistributedConnection extends IStore {
username = null, tokenIndex = 0, passwordOrToken = null, domain = null, secure = false)
{
if (this.openReply != null)
if (this._openReply != null)
throw new AsyncException(ErrorType.Exception, 0, "Connection in progress");
this.openReply = new AsyncReply();
this._openReply = new AsyncReply();
if (hostname != null)
{
@ -1079,7 +1000,8 @@ export default class DistributedConnection extends IStore {
this.session.localAuthentication.tokenIndex = tokenIndex;
this.session.localAuthentication.domain = domain;
this.session.localAuthentication.username = username;
this.localPasswordOrToken = passwordOrToken;
this._localPasswordOrToken = passwordOrToken;
this._invalidCredentials = false;
}
if (this.session == null)
@ -1097,46 +1019,32 @@ export default class DistributedConnection extends IStore {
if (secure != null)
this._secure = secure;
this._connectSocket(socket);
return this._openReply;
}
_connectSocket(socket){
let self = this;
socket.connect(this._hostname, this._port, this._secure).then(x =>
{
self.assign(socket);
}).error((x) =>
{
self.openReply?.triggerError(x);
self.openReply = null;
});
return this.openReply;
// //connect(secure, method, hostname, port, username, tokenIndex, passwordOrToken, domain) {
// this.openReply = new AsyncReply();
// if (secure !== undefined) {
// this.session.localAuthentication.method = method;
// this.session.localAuthentication.tokenIndex = tokenIndex;
// this.session.localAuthentication.domain = domain;
// this.session.localAuthentication.username = username;
// this.localPasswordOrToken = passwordOrToken;
// //this.url = `ws${secure ? 's' : ''}://${this.instance.name}`;
// this.url = `ws${secure ? 's' : ''}://${hostname}:${port}`;
// let socket = new WebSocket(this.url, "iip");
// socket.binaryType = "arraybuffer";
// socket.connection = this;
// this.assign(socket);
// return this.openReply;
// }
{
self.assign(socket);
}).error((x) =>
{
if (self.autoReconnect){
console.log("Reconnecting socket...");
setTimeout(() => {
self._connectSocket(socket);
}, 5000);
} else {
self._openReply?.triggerError(x);
self._openReply = null;
}
});
}
_declare() {
// declare (Credentials -> No Auth, No Enctypt)
var dmn = DC.stringToBytes(this.session.localAuthentication.domain);
@ -1148,7 +1056,7 @@ export default class DistributedConnection extends IStore {
.addUint8(0x60)
.addUint8(dmn.length)
.addUint8Array(dmn)
.addUint8Array(this.localNonce)
.addUint8Array(this._localNonce)
.addUint8(un.length)
.addUint8Array(un)
.done();
@ -1158,7 +1066,7 @@ export default class DistributedConnection extends IStore {
.addUint8(0x70)
.addUint8(dmn.length)
.addUint8Array(dmn)
.addUint8Array(this.localNonce)
.addUint8Array(this._localNonce)
.addUint64(this.session.localAuthentication.tokenIndex)
.done();
}
@ -1209,8 +1117,11 @@ export default class DistributedConnection extends IStore {
networkClose(socket)
{
// clean up
this.ready = false;
this.readyToEstablish = false;
clearTimeout(this._keepAliveTimer);
try
{
this.requests.values.forEach((x) => {
@ -1240,17 +1151,33 @@ export default class DistributedConnection extends IStore {
this.resourceRequests.clear();
this.templateRequests.clear();
for (let x of this._attachedResources.values)
{
let r = x.deref();
if (r != null){
r.susped();
this._suspendedResources.set(r._p.id, x);
}
}
if (this.server != null) {
this.resources.values.forEach((x) => x._suspend());
this._suspendedResources.clear();
this._unsubscribeAll();
Warehouse.remove(this);
if (this.ready)
this.server.membership.logout(this.session);
}
else if (this.autoReconnect && !this._invalidCredentials){
setTimeout(this.reconnect, 5000);
}
else {
this._suspendedResources.clear();
}
this._attachedResources.clear();
this.ready = false;
this._emit("close", this);
@ -1306,12 +1233,11 @@ export default class DistributedConnection extends IStore {
}
}
reconnect() {
}
put(resource) {
this.resources.add(parseInt(resource.instance.name), resource);
if (codec.isLocalResource(resource, this))
this.neededResources.add(resource._p.id, resource);
return new AsyncReply(true);
}
@ -1327,7 +1253,25 @@ export default class DistributedConnection extends IStore {
return this.sendParams(reply).addUint8(0x40 | action).addUint32(this.callbackCounter);
}
sendDetachRequest(instanceId)
detachResource(instanceId){
try
{
if (this._attachedResources.containsKey(instanceId))
this._attachedResources.remove(instanceId);
if (this._suspendedResources.containsKey(instanceId))
this._suspendedResources.remove(instanceId);
await this._sendDetachRequest(instanceId);
}
catch
{
}
}
_sendDetachRequest(instanceId)
{
try
{
@ -1446,11 +1390,20 @@ export default class DistributedConnection extends IStore {
}
IIPEventResourceDestroyed(resourceId) {
if (this.resources.item(resourceId)) {
var r = this.resources.item(resourceId);
this.resources.remove(resourceId);
r.destroy();
if (this._attachedResources.contains(resourceId))
{
let r = this._attachedResources.get(resourceId).deref();
r?.destroy();
this._attachedResources.remove(resourceId);
}
else if (this._neededResources.contains(resourceId))
{
// @TODO: handle this mess
this._neededResources.remove(resourceId);
}
}
IIPEventPropertyUpdated(resourceId, index, dataType, data) {
@ -2494,17 +2447,17 @@ export default class DistributedConnection extends IStore {
*/
}
retrieve(iid) {
// retrieve(iid) {
let r = this.resources.item(iid);
// let r = this.resources.item(iid);
return new AsyncReply(r);
// return new AsyncReply(r);
//for (var r in this.resources)
// if (this.resources[r].instance.id == iid)
// return new AsyncReply(r);
//return new AsyncReply(null);
}
// //for (var r in this.resources)
// // if (this.resources[r].instance.id == iid)
// // return new AsyncReply(r);
// //return new AsyncReply(null);
// }
getLinkTemplates(link)
{
@ -2547,7 +2500,14 @@ export default class DistributedConnection extends IStore {
// Get a resource from the other end
fetch(id, requestSequence) {
let resource = this.resources.item(id);
let resource = this._attachedResources.item(id)?.deref();
if (resource != null)
return new AsyncReply<DistributedResource>(resource);
resource = this._neededResources.item(id);
let request = this.resourceRequests.item(id);
if (request != null) {
@ -2557,6 +2517,9 @@ export default class DistributedConnection extends IStore {
return request;
}
else if (resource != null && !resource._p.suspended) {
// @REVIEW: this should never happen
console.log("DCON", LogType.Error, "Resource not moved to attached.");
return new AsyncReply(resource);
}
@ -2592,8 +2555,10 @@ export default class DistributedConnection extends IStore {
else
dr = new DistributedResource(self, id, rt[1], rt[2]);
}
else
else {
dr = resource;
template = resource.instance.template;
}
//let dr = resource || new DistributedResource(self, id, rt[1], rt[2]);
@ -2611,8 +2576,10 @@ export default class DistributedConnection extends IStore {
ar[i + 2], ar[i], ar[i + 1]));
dr._attach(pvs);
self.resourceRequests.remove(id);
// move from needed to attached
self._neededResources.remove(id);
self._attachedResources.set(id, new WeakRef(resource));
reply.trigger(dr);
})
.error((ex) => reply.triggerError(ex));
@ -3050,7 +3017,7 @@ export default class DistributedConnection extends IStore {
}
_keepAliveTimer_Elapsed()
_keepAliveTimerElapsed()
{
if (!this.isConnected)
return;
@ -3070,9 +3037,8 @@ export default class DistributedConnection extends IStore {
.then(x =>
{
self.jitter = x[1];
setTimeout(_keepAliveTimer_Elapsed, self.keepAliveInterval);
self._keepAliveTimer = setTimeout(_keepAliveTimerElapsed, self.keepAliveInterval);
console.log("Keep Alive Received " + jitter);
}).error((ex) =>
{
self.close();

View File

@ -43,12 +43,16 @@ export default class DistributedResource extends IResource
{
destroy()
{
this.destroyed = true;
this._p.destroyed = true;
this._p.attached = false;
this._p.connection.sendDetachRequest(this._p.instanceId);
this._p.connection.detachResource(this._p.instanceId);
this._emit("destroy", this);
}
get destroyed () {
return this._p.destroyed;
}
_suspend()
{
this._p.suspended = true;
@ -65,6 +69,7 @@ export default class DistributedResource extends IResource
super();
this._p = {
destroyed: false,
suspended: false,
attached: false,
connection: connection,
@ -119,6 +124,12 @@ export default class DistributedResource extends IResource
{
var func = function () {
if (self._p.destroyed)
throw new Error("Trying to access a destroyed object.");
if (self._p.suspended)
throw new Error("Trying to access a suspended object.");
var argsMap = new (TypedMap.of(UInt8, Object));
if ( arguments.length == 1
@ -160,6 +171,12 @@ export default class DistributedResource extends IResource
var makeSetter = function(index)
{
return function (value) {
if (self._p.destroyed)
throw new Error("Trying to access a destroyed object.");
if (self._p.suspended)
throw new Error("Trying to access a suspended object.");
self._set(index, value);
};
};
@ -222,14 +239,14 @@ export default class DistributedResource extends IResource
}
_invoke(index, args) {
if (this.destroyed)
throw new Error("Trying to access destroyed object");
if (this._p.destroyed)
throw new Error("Trying to access a destroyed object.");
if (this._p.suspended)
throw new Error("Trying to access suspended object");
throw new Error("Trying to access a suspended object.");
if (index >= this.instance.template.functions.length)
throw new Error("Function index is incorrect");
throw new Error("Function index is incorrect.");
let ft = this.instance.template.getFunctionTemplateByIndex(index);