2
0
mirror of https://github.com/esiur/esiur-js.git synced 2025-05-06 04:22:58 +00:00

Static Calling

This commit is contained in:
Esiur Project 2022-08-07 23:08:33 +03:00
parent 551f0f4684
commit bf861395c3
10 changed files with 429 additions and 77 deletions

View File

@ -614,8 +614,10 @@ var _default = //const ExceptionCode =
NotAttached: 31,
AlreadyListened: 32,
AlreadyUnlistened: 33,
NotListenable: 34
NotListenable: 34,
ParseError: 35
};
exports["default"] = _default;
},{}],8:[function(require,module,exports){

1
node.js Normal file
View File

@ -0,0 +1 @@
await import("./src/esiur.js");

View File

@ -85,6 +85,14 @@ export default class AsyncReply extends Promise
this.chunk(callback);
}
timeout(milliseconds, onTimeout){
let self = this;
setTimeout(() => {
if (!self.ready && self.exception == null)
onTimeout();
}, milliseconds);
}
trigger(result)
{
if (this.ready)

View File

@ -142,6 +142,10 @@ export default class DistributedConnection extends IStore {
});
this.localNonce = this.generateNonce(32);
this.jitter = 0;
this.keepAliveTime = 10;
this.keepAliveInterval = 30;
}
@ -300,6 +304,18 @@ export default class DistributedConnection extends IStore {
// this.IIPRequestClearAttributes(packet.callbackId, packet.resourceId, packet.content, false);
break;
case IIPPacketAction.KeepAlive:
this.IIPRequestKeepAlive(packet.callbackId, packet.currentTime, packet.interval);
break;
case IIPPacketAction.ProcedureCall:
this.IIPRequestProcedureCall(packet.callbackId, packet.procedure, packet.dataType, msg);
break;
case IIPPacketAction.StaticCall:
this.IIPRequestStaticCall(packet.callbackId, packet.classId, packet.methodIndex, packet.dataType, msg);
break;
}
}
else if (packet.command == IIPPacketCommand.Reply) {
@ -346,6 +362,9 @@ export default class DistributedConnection extends IStore {
break;
case IIPPacketAction.InvokeFunction:
case IIPPacketAction.StaticCall:
case IIPPacketAction.ProcedureCall:
this.IIPReplyInvoke(packet.callbackId, packet.dataType, msg);
break;
@ -375,6 +394,10 @@ export default class DistributedConnection extends IStore {
this.IIPReply(packet.callbackId);
break;
case IIPPacketAction.KeepAlive:
this.IIPReply(packet.callbackId, packet.currentTime, packet.jitter);
break;
}
}
@ -754,6 +777,9 @@ export default class DistributedConnection extends IStore {
this.openReply?.trigger(true);
this._emit("ready", this);
}
// start perodic keep alive timer
setTimeout(this._keepAliveTimer_Elapsed, this.keepAliveInterval);
}
}
else if (authPacket.command == IIPAuthPacketCommand.Error)
@ -1182,13 +1208,28 @@ export default class DistributedConnection extends IStore {
networkClose(socket)
{
// clean up
this.readyToEstablish = false;
try
{
this.requests.values.forEach((x) => x.triggerError(new AsyncException(ErrorType.Management, 0, "Connection closed")));
this.resourceRequests.values.forEach((x) => x.triggerError(new AsyncException(ErrorType.Management, 0, "Connection closed")));
this.templateRequests.values.forEach((x) => x.triggerError(new AsyncException(ErrorType.Management, 0, "Connection closed")));
this.requests.values.forEach((x) => {
try {
x.triggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex) { }
});
this.resourceRequests.values.forEach((x) => {
try {
x.triggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex) { }
});
this.templateRequests.values.forEach((x) => {
try {
x.triggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex) { }
});
}
catch(ex)
{
@ -1199,14 +1240,16 @@ export default class DistributedConnection extends IStore {
this.resourceRequests.clear();
this.templateRequests.clear();
this.resources.values.forEach((x) => x._suspend());
if (this.server != null) {
this._unsubscribeAll();
this.resources.values.forEach((x) => x._suspend());
this._unsubscribeAll();
Warehouse.remove(this);
Warehouse.remove(this);
if (this.ready)
this.server.membership.logout(this.session);
}
if (this.ready)
this.server?.membership.logout(this.session);
this.ready = false;
@ -1830,6 +1873,99 @@ export default class DistributedConnection extends IStore {
});
}
IIPRequestProcedureCall(callback, procedureCall, transmissionType, content)
{
if (this.server == null)
{
this.sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure);
return;
}
var call = this.server.calls.get(procedureCall);
if (call == null)
{
this.sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound);
return;
}
let parsed = Codec.parse(content, 0, this, null, transmissionType);
parsed.Then(results =>
{
// un hold the socket to send data immediately
this.socket.unhold();
// @TODO: Make managers for procedure calls
//if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
//{
// SendError(ErrorType.Management, callback,
// (ushort)ExceptionCode.InvokeDenied);
// return;
//}
this.invokeFunction(call.method, callback, results, IIPPacketAction.ProcedureCall, call.target);
}).error(x =>
{
this.sendError(ErrorType.Management, callback, ExceptionCode.ParseError);
});
}
IIPRequestStaticCall(callback, classId, index, transmissionType, content)
{
let template = Warehouse.getTemplateByClassId(classId);
if (template == null)
{
this.sendError(ErrorType.Management, callback, ExceptionCode.TemplateNotFound);
return;
}
let ft = template.getFunctionTemplateByIndex(index);
if (ft == null)
{
// no function at this index
this.sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound);
return;
}
let parsed = Codec.parse(content, 0, this, null, transmissionType);
parsed.Then(results =>
{
// un hold the socket to send data immediately
this.socket.unhold();
var fi = ft.MethodInfo;
if (fi == null)
{
// ft found, fi not found, this should never happen
this.sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound);
return;
}
// @TODO: Make managers for static calls
//if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
//{
// SendError(ErrorType.Management, callback,
// (ushort)ExceptionCode.InvokeDenied);
// return;
//}
this.invokeFunction(fi, callback, results, IIPPacketAction.StaticCall, null);
}).error(x =>
{
this.sendError(ErrorType.Management, callback, ExceptionCode.ParseError);
});
}
IIPRequestInvokeFunction(callback, resourceId, index, dataType, data) {
let self = this;
@ -1880,72 +2016,73 @@ export default class DistributedConnection extends IStore {
self.sendError(ErrorType.Management, callback, ExceptionCode.InvokeDenied);
return;
}
let indexedArgs = [];
for(let i = 0; i < ft.args.length; i++)
indexedArgs.push(args.get(i));
indexedArgs.push(self);
let rt;
try
{
rt = fi.apply(r, indexedArgs);
}
catch(ex)
{
self.sendError(ErrorType.Exception, callback, 0, ex.toString());
return;
}
// Is iterator ?
if (rt != null && rt[Symbol.iterator] instanceof Function) {
for (let v of rt)
self.sendChunk(callback, v);
self.sendReply(IIPPacketAction.InvokeFunction, callback)
.addUint8(DataType.Void)
.done();
}
else if (rt instanceof AsyncReply) {
rt.then(function (res) {
self.sendReply(IIPPacketAction.InvokeFunction, callback)
.addUint8Array(Codec.compose(res, self))
.done();
}).error(ex => {
self.sendError(ErrorType.Exception, callback, ex.code, ex.message);
}).progress((pt, pv, pm) =>
{
self.sendProgress(callback, pv, pm);
}).chunk(v =>
{
self.sendChunk(callback, v);
});
}
else if (rt instanceof Promise)
{
rt.then(function (res) {
self.sendReply(IIPPacketAction.InvokeFunction, callback)
.addUint8Array(Codec.compose(res, self))
.done();
}).catch(ex => {
self.sendError(ErrorType.Exception, callback, 0, ex.toString());
});
}
else {
self.sendReply(IIPPacketAction.InvokeFunction, callback)
.addUint8Array(Codec.compose(rt, self))
.done();
}
}
});
});
}
invokeFunction(fi, callback, parameters, actionType, target = null)
{
let indexedArgs = [];
for(let i = 0; i < ft.args.length; i++)
indexedArgs.push(parameters.get(i));
indexedArgs.push(self);
let rt;
try
{
rt = fi.apply(r, indexedArgs);
}
catch(ex)
{
self.sendError(ErrorType.Exception, callback, 0, ex.toString());
return;
}
// Is iterator ?
if (rt != null && rt[Symbol.iterator] instanceof Function) {
for (let v of rt)
self.sendChunk(callback, v);
self.sendReply(actionType, callback)
.addUint8(DataType.Void)
.done();
}
else if (rt instanceof AsyncReply) {
rt.then(function (res) {
self.sendReply(actionType, callback)
.addUint8Array(Codec.compose(res, self))
.done();
}).error(ex => {
self.sendError(ErrorType.Exception, callback, ex.code, ex.message);
}).progress((pt, pv, pm) =>
{
self.sendProgress(callback, pv, pm);
}).chunk(v =>
{
self.sendChunk(callback, v);
});
}
else if (rt instanceof Promise)
{
rt.then(function (res) {
self.sendReply(actionType, callback)
.addUint8Array(Codec.compose(res, self))
.done();
}).catch(ex => {
self.sendError(ErrorType.Exception, callback, 0, ex.toString());
});
}
else {
self.sendReply(actionType, callback)
.addUint8Array(Codec.compose(rt, self))
.done();
}
}
// IIPRequestGetProperty(callback, resourceId, index) {
// var self = this;
@ -2912,4 +3049,117 @@ export default class DistributedConnection extends IStore {
return rt;
}
_keepAliveTimer_Elapsed()
{
if (!this.isConnected)
return;
let self = this;
let now = new Date();
let interval = this._lastKeepAliveSent == null ? 0 :
(now - this._lastKeepAliveSent);
this._lastKeepAliveSent = now;
this.sendRequest(IIPPacketAction.KeepAlive)
.addDateTime(now)
.addUint32(interval)
.done()
.then(x =>
{
self.jitter = x[1];
setTimeout(_keepAliveTimer_Elapsed, self.keepAliveInterval);
console.log("Keep Alive Received " + jitter);
}).error((ex) =>
{
self.close();
}).timeout(self.keepAliveTime * 1000, () =>
{
self.close();
});
console.log("Keep alive sent ");
}
staticCall(classId, index, parameters)
{
var pb = Codec.compose(parameters, this);
var reply = new AsyncReply();
var c = this.callbackCounter++;
this.requests.add(c, reply);
this.sendParams()
.addUint8(0x40 | IIPPacket.IIPPacketAction.StaticCall)
.addUint32(c)
.addGuid(classId)
.addUint8(index)
.addUint8Array(pb)
.done();
return reply;
}
call(procedureCall)
{
var args = Map.from(UInt8, Object);
for (var i = 0; i < arguments.Length - 2; i++)
args.add(i, arguments[i+1]);
return this.callArgs(procedureCall, args);
}
callArgs(procedureCall, parameters)
{
var pb = Codec.Compose(parameters, this);
var reply = new AsyncReply();
var c = this.callbackCounter++;
this.requests.add(c, reply);
var callName = DC.stringToBytes(procedureCall);
sendParams()
.addUint8(0x40 | IIPPacketAction.ProcedureCall)
.addUint32(c)
.addUint16(callName.length)
.addUint8Array(callName)
.addUint8Array(pb)
.done();
return reply;
}
IIPRequestKeepAlive(callbackId, peerTime, interval)
{
let jitter = 0;
let now = new Date();
if (this._lastKeepAliveReceived != null)
{
var diff = now - this._lastKeepAliveReceived;
//Console.WriteLine("Diff " + diff + " " + interval);
jitter = Math.Abs(diff - interval);
}
this.sendParams()
.addUint8(0x80 | IIPPacketAction.KeepAlive)
.addUint32(callbackId)
.addDateTime(now)
.addUint32(jitter)
.done();
this._lastKeepAliveReceived = now;
}
}

View File

@ -231,7 +231,18 @@ export default class DistributedResource extends IResource
if (index >= this.instance.template.functions.length)
throw new Error("Function index is incorrect");
return this._p.connection.sendInvoke(this._p.instanceId, index, args);
let ft = this.instance.template.getFunctionTemplateByIndex(index);
if (ft == null)
throw new Exception("Function template not found.");
if (ft.isStatic)
return this._p.connection.staticCall(this.instance.template.classId, index, args);
else
return this._p.connection.sendInvoke(this._p.instanceId, index, args);
//return this._p.connection.sendInvoke(this._p.instanceId, index, args);
}
_get(index)

View File

@ -29,6 +29,7 @@
import IResource from '../../Resource/IResource.js';
import AsyncReply from '../../Core/AsyncReply.js';
import DistributedConnection from './DistributedConnection.js';
import KeyList from '../../Data/KeyList.js';
export default class DistributedServer extends IResource
{
@ -56,6 +57,7 @@ export default class DistributedServer extends IResource
{
super();
this.connections = [];
this.calls = new KeyList();
}
//@TODO: con.off("close", ...)
@ -72,4 +74,10 @@ export default class DistributedServer extends IResource
if (i > -1)
this.connections.splice(i, 1);
}
mapCall(call, handler)
{
this.calls.add(call, handler);
}
}

View File

@ -55,6 +55,10 @@ export default class IIPPacket
this.originalOffset = 0;
this.resourceName = "";
this.dataType = null;
this.jitter = 0;
this.interval = 0;
this.procedure = "";
this.currentTime = null;
}
notEnough(offset, ends, needed)
@ -62,7 +66,6 @@ export default class IIPPacket
if (offset + needed > ends)
{
this.dataLengthNeeded = needed - (ends - offset);
// this.dataLengthNeeded = needed - (ends - this.originalOffset);
return true;
}
else
@ -425,7 +428,56 @@ export default class IIPPacket
//this.content = data.clip(offset, cl);
offset += cl;
}
else if (this.action == IIPPacketAction.KeepAlive)
{
if (this.notEnough(offset, ends, 12))
return -this.dataLengthNeeded;
this.currentTime = data.getDateTime(offset);
offset += 8;
this.interval = data.getUint32(offset);
offset += 4;
}
else if (this.action == IIPPacketAction.ProcedureCall)
{
if (this.notEnough(offset, ends, 2))
return -this.dataLengthNeeded;
let cl = data.getUint16(offset);
offset += 2;
if (this.notEnough(offset, ends, cl))
return -this.dataLengthNeeded;
this.procedure = data.getString(offset, cl);
offset += cl;
if (this.notEnough(offset, ends, 1))
return -this.dataLengthNeeded;
let parsed = TransmissionType.parse(data, offset, ends);
if (parsed.type == null) return -parsed.size;
offset += parsed.size;
} else if (this.action == IIPPacketAction.StaticCall)
{
if (this.notEnough(offset, ends, 18))
return -this.dataLengthNeeded;
this.classId = data.getGuid(offset);
offset += 16;
this.methodIndex = data[offset++];
let parsed = TransmissionType.Pparse(data, offset, ends);
if (parsed.type == null) return -parsed.size;
offset += parsed.size;
}
}
else if (this.command == IIPPacketCommand.Reply)
{
@ -499,7 +551,9 @@ export default class IIPPacket
offset += parsed.size;
}
else if (this.action == IIPPacketAction.InvokeFunction)
else if (this.action == IIPPacketAction.InvokeFunction
|| this.action == IIPPacketAction.ProcedureCall
|| this.action == IIPPacketAction.StaticCall )
{
if (this.notEnough(offset, ends, 1))
@ -519,6 +573,16 @@ export default class IIPPacket
{
// nothing to do
}
else if (this.action == IIPPacketAction.KeepAlive)
{
if (this.notEnough(offset, ends, 12))
return -this.dataLengthNeeded;
this.currentTime = data.getDateTime(offset);
offset += 8;
this.jitter = data.GetUint32(offset);
offset += 4;
}
}
else if (this.command == IIPPacketCommand.Report)
{

View File

@ -33,5 +33,11 @@ export default // const IIPPacketAction =
ClearAllAttributes: 26,
GetAttributes: 27,
UpdateAttributes: 28,
ClearAttributes: 29
ClearAttributes: 29,
// Static
KeepAlive: 0x20,
ProcedureCall: 0x21,
StaticCall: 0x22
};

View File

@ -46,18 +46,19 @@ export default class FunctionTemplate extends MemberTemplate {
bl
.addInt32(exp.length)
.addDC(exp);
bl.insertUint8(0, this.inherited ? 0x90 : 0x10);
bl.insertUint8(0, (this.inherited ? 0x90 : 0x10) | (this.isStatic ? 0x4 : 0));
} else
bl.insertUint8(0, this.inherited ? 0x80 : 0x0);
bl.insertUint8(0, (this.inherited ? 0x80 : 0x0) | (this.isStatic ? 0x4 : 0));
return bl.toDC();
}
constructor(template, index, name, inherited, args, returnType, annotation = null){
constructor(template, index, name, inherited, isStatic, args, returnType, annotation = null){
super(template, index, name, inherited);
this.args = args;
this.returnType = returnType;
this.annotation = annotation;
this.isStatic = isStatic;
}
}

View File

@ -452,6 +452,7 @@ export default class TypeTemplate {
if (type == 0) // function
{
let annotation = null;
let isStatic = ((data[offset] & 0x4) == 0x4);
let hasAnnotation = ((data.getUint8(offset++) & 0x10) == 0x10);
let len = data.getUint8(offset++);
@ -484,7 +485,7 @@ export default class TypeTemplate {
offset += cs;
}
let ft = new FunctionTemplate(od, functionIndex++, name, inherited,
let ft = new FunctionTemplate(od, functionIndex++, name, inherited, isStatic,
args, dt.type, annotation);
od.functions.push(ft);