2
0
mirror of https://github.com/esiur/esiur-dart.git synced 2025-05-06 12:02:57 +00:00

Static Calling

This commit is contained in:
Esiur Project 2022-08-07 23:09:25 +03:00
parent 8eca096b94
commit f5d3ea9ee5
9 changed files with 431 additions and 86 deletions

View File

@ -126,6 +126,10 @@ class AsyncReply<T> implements Future<T> {
}
AsyncReply<T> timeout(Duration timeLimit, {FutureOr<T?> onTimeout()?}) {
Future.delayed(timeLimit, () {
if (!_resultReady && _exception == null) onTimeout?.call();
});
return this;
}

View File

@ -1,39 +1,38 @@
enum ExceptionCode
{
HostNotReachable,
AccessDenied,
UserOrTokenNotFound,
ChallengeFailed,
ResourceNotFound,
AttachDenied,
InvalidMethod,
InvokeDenied,
CreateDenied,
AddParentDenied,
AddChildDenied,
ViewAttributeDenied,
UpdateAttributeDenied,
StoreNotFound,
ParentNotFound,
ChildNotFound,
ResourceIsNotStore,
DeleteDenied,
DeleteFailed,
UpdateAttributeFailed,
GetAttributesFailed,
ClearAttributesFailed,
TemplateNotFound,
RenameDenied,
ClassNotFound,
MethodNotFound,
PropertyNotFound,
SetPropertyDenied,
ReadOnlyProperty,
GeneralFailure,
AddToStoreFailed,
NotAttached,
AlreadyListened,
AlreadyUnlistened,
NotListenable
enum ExceptionCode {
HostNotReachable,
AccessDenied,
UserOrTokenNotFound,
ChallengeFailed,
ResourceNotFound,
AttachDenied,
InvalidMethod,
InvokeDenied,
CreateDenied,
AddParentDenied,
AddChildDenied,
ViewAttributeDenied,
UpdateAttributeDenied,
StoreNotFound,
ParentNotFound,
ChildNotFound,
ResourceIsNotStore,
DeleteDenied,
DeleteFailed,
UpdateAttributeFailed,
GetAttributesFailed,
ClearAttributesFailed,
TemplateNotFound,
RenameDenied,
ClassNotFound,
MethodNotFound,
PropertyNotFound,
SetPropertyDenied,
ReadOnlyProperty,
GeneralFailure,
AddToStoreFailed,
NotAttached,
AlreadyListened,
AlreadyUnlistened,
NotListenable,
ParseError
}

View File

@ -22,6 +22,8 @@ SOFTWARE.
*/
import 'dart:async';
import 'package:collection/collection.dart';
import '../../Data/IntType.dart';
@ -141,6 +143,11 @@ class DistributedConnection extends NetworkConnection with IStore {
Map<IResource, List<int>> _subscriptions = new Map<IResource, List<int>>();
DateTime? _lastKeepAliveSent;
DateTime? _lastKeepAliveReceived;
int jitter;
int keepAliveTime = 10;
/// <summary>
/// Local username to authenticate ourselves.
/// </summary>
@ -313,18 +320,31 @@ class DistributedConnection extends NetworkConnection with IStore {
_ready = false;
_readyToEstablish = false;
_requests.values.forEach((x) => x.triggerError(
AsyncException(ErrorType.Management, 0, "Connection closed")));
_resourceRequests.values.forEach((x) => x.triggerError(
AsyncException(ErrorType.Management, 0, "Connection closed")));
_templateRequests.values.forEach((x) => x.triggerError(
AsyncException(ErrorType.Management, 0, "Connection closed")));
_requests.values.forEach((x) { try {
x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex){ }
});
_resourceRequests.values.forEach((x) { try {
x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex){ }
});
_templateRequests.values.forEach((x) { try {
x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed"));
} catch (ex){ }
});
_requests.clear();
_resourceRequests.clear();
_templateRequests.clear();
_resources.values.forEach((x) => x.suspend());
// @TODO: check if we need this with reconnect
// _resources.values.forEach((x) => x.suspend());
//_unsubscribeAll();
}
Future<bool> reconnect() async {
@ -479,6 +499,10 @@ class DistributedConnection extends NetworkConnection with IStore {
return null;
}
Timer? _keepAliveTimer;
int KeepAliveInterval = 30;
void init() {
_queue.then((x) {
if (x?.type == DistributedResourceQueueItemType.Event)
@ -491,6 +515,43 @@ class DistributedConnection extends NetworkConnection with IStore {
var n = new DC(32);
for (var i = 0; i < 32; i++) n[i] = r.nextInt(255);
_localNonce = n;
// _keepAliveTimer =
// Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed);
}
void _keepAliveTimer_Elapsed() {
if (!isConnected) return;
_keepAliveTimer?.cancel();
var now = DateTime.now().toUtc();
int interval = _lastKeepAliveSent == null
? 0
: (now.difference(_lastKeepAliveSent!).inMilliseconds);
_lastKeepAliveSent = now;
sendRequest(IIPPacketAction.KeepAlive)
..addDateTime(now)
..addUint32(interval)
..done().then((x) {
jitter = x?[1];
_keepAliveTimer = Timer(
Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed);
print("Keep Alive Received ${jitter}");
}).error((ex) {
_keepAliveTimer?.cancel();
close();
}).timeout(Duration(microseconds: keepAliveTime), onTimeout: () {
_keepAliveTimer?.cancel();
close();
});
}
int processPacket(
@ -675,6 +736,21 @@ class DistributedConnection extends NetworkConnection with IStore {
//iipRequestClearAttributes(
// packet.callbackId, packet.resourceId, packet.content, false);
break;
case IIPPacketAction.KeepAlive:
iipRequestKeepAlive(packet.callbackId, packet.currentTime, packet.interval);
break;
case IIPPacketAction.ProcedureCall:
iipRequestProcedureCall(packet.callbackId, packet.procedure, (TransmissionType)packet.dataType, msg);
break;
case IIPPacketAction.StaticCall:
iipRequestStaticCall(packet.callbackId, packet.classId, packet.methodIndex, (TransmissionType)packet.dataType, msg);
break;
}
} else if (packet.command == IIPPacketCommand.Reply) {
switch (packet.action) {
@ -739,6 +815,9 @@ class DistributedConnection extends NetworkConnection with IStore {
// Invoke
case IIPPacketAction.InvokeFunction:
case IIPPacketAction.StaticCall:
case IIPPacketAction.ProcedureCall:
iipReplyInvoke(packet.callbackId,
packet.dataType ?? TransmissionType.Null, msg);
break;
@ -770,6 +849,10 @@ class DistributedConnection extends NetworkConnection with IStore {
case IIPPacketAction.ClearAttributes:
iipReply(packet.callbackId);
break;
case IIPPacketAction.KeepAlive:
iipReply(packet.callbackId, packet.currentTime, packet.jitter);
break;
}
} else if (packet.command == IIPPacketCommand.Report) {
switch (packet.report) {
@ -948,7 +1031,8 @@ class DistributedConnection extends NetworkConnection with IStore {
_openReply = null;
emitArgs("ready", []);
//OnReady?.Invoke(this);
// start perodic keep alive timer
_keepAliveTimer = Timer(Duration(seconds: KeepAliveInterval), _keepAliveTimer_Elapsed);
}
} else if (_authPacket.command == IIPAuthPacketCommand.Error) {
@ -1435,18 +1519,30 @@ class DistributedConnection extends NetworkConnection with IStore {
_subscriptions[resource] = <int>[];
}
void _unsubscrive(IResource resource) {
void _unsubscribe(IResource resource) {
resource.instance?.off("resourceEventOccurred", _instance_EventOccurred);
resource.instance?.off("resourceModified", _instance_PropertyModified);
resource.instance?.off("resourceDestroyed", _instance_ResourceDestroyed);
_subscriptions.remove(resource);
}
void _unsubscribeAll(){
_subscriptions.forEach((resource, value) => {
resource.instance?.off("resourceEventOccurred", _instance_EventOccurred);
resource.instance?.off("resourceModified", _instance_PropertyModified);
resource.instance?.off("resourceDestroyed", _instance_ResourceDestroyed);
});
_subscriptions.clear();
}
void iipRequestReattachResource(
int callback, int resourceId, int resourceAge) {
Warehouse.getById(resourceId).then((r) {
if (r != null) {
_unsubscrive(r);
_unsubscribe(r);
_subscribe(r);
// reply ok
@ -1465,7 +1561,7 @@ class DistributedConnection extends NetworkConnection with IStore {
void iipRequestDetachResource(int callback, int resourceId) {
Warehouse.getById(resourceId).then((res) {
if (res != null) {
_unsubscrive(res);
_unsubscribe(res);
// reply ok
sendReply(IIPPacketAction.DetachResource, callback).done();
} else {
@ -1969,7 +2065,106 @@ class DistributedConnection extends NetworkConnection with IStore {
});
}
void IIPRequestResourceAttribute(int callback, int resourceId) {}
void iipRequestProcedureCall(int callback, String procedureCall, TransmissionType transmissionType, DC content)
{
// server not implemented
sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure.index);
// if (server == null)
// {
// sendError(ErrorType.Management, callback, ExceptionCode.GeneralFailure.index);
// return;
// }
// var call = Server.Calls[procedureCall];
// if (call == null)
// {
// sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound.index);
// return;
// }
// var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType);
// parsed.Then(results =>
// {
// var arguments = (Map<byte, object>)results;// (object[])results;
// // un hold the socket to send data immediately
// this.Socket.Unhold();
// // @TODO: Make managers for procedure calls
// //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
// //{
// // SendError(ErrorType.Management, callback,
// // (ushort)ExceptionCode.InvokeDenied);
// // return;
// //}
// InvokeFunction(call.Method, callback, arguments, IIPPacket.IIPPacketAction.ProcedureCall, call.Target);
// }).Error(x =>
// {
// SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
// });
}
void IIPRequestStaticCall(int callback, Guid classId, int index, TransmissionType transmissionType, DC content)
{
var template = Warehouse.getTemplateByClassId(classId);
if (template == null)
{
sendError(ErrorType.Management, callback, ExceptionCode.TemplateNotFound.index);
return;
}
var ft = template.getFunctionTemplateByIndex(index);
if (ft == null)
{
// no function at this index
sendError(ErrorType.Management, callback, ExceptionCode.MethodNotFound.index);
return;
}
// var parsed = Codec.parse(content, 0, this, null, transmissionType);
// parsed.then((results)
// {
// var arguments = (Map<byte, object>)results;
// // un hold the socket to send data immediately
// socket?.unhold();
// var fi = ft.methodInfo;
// if (fi == null)
// {
// // ft found, fi not found, this should never happen
// sendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
// return;
// }
// // @TODO: Make managers for static calls
// //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
// //{
// // SendError(ErrorType.Management, callback,
// // (ushort)ExceptionCode.InvokeDenied);
// // return;
// //}
// InvokeFunction(fi, callback, arguments, IIPPacket.IIPPacketAction.StaticCall, null);
// }).Error(x =>
// {
// SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
// });
}
void iipRequestResourceAttribute(int callback, int resourceId) {}
// @TODO: Check for deadlocks
void iipRequestInvokeFunction(int callback, int resourceId, int index,
@ -2840,4 +3035,79 @@ class DistributedConnection extends NetworkConnection with IStore {
@override
TemplateDescriber get template =>
TemplateDescriber("Esiur.Net.IIP.DistributedConnection");
AsyncReply<dynamic> staticCall(Guid classId, int index, Map<UInt8, dynamic> parameters)
{
var pb = Codec.compose(parameters, this);
var reply = AsyncReply<dynamic>();
var c = _callbackCounter++;
_requests.add(c, reply);
sendParams()..addUint8((0x40 | IIPPacketAction.StaticCall))
..addUint32(c)
..addGuid(classId)
..addUint8(index)
..addDC(pb)
..done();
return reply;
}
// AsyncReply<dynamic> Call(String procedureCall, params object[] parameters)
// {
// var args = new Map<byte, object>();
// for (byte i = 0; i < parameters.Length; i++)
// args.Add(i, parameters[i]);
// return Call(procedureCall, args);
// }
AsyncReply<dynamic> call(String procedureCall, Map<UInt8, dynamic> parameters)
{
var pb = Codec.compose(parameters, this);
var reply = new AsyncReply<dynamic>();
var c = _callbackCounter++;
_requests.add(c, reply);
var callName = DC.stringToBytes(procedureCall);
sendParams()..addUint8(0x40 | IIPPacketAction.ProcedureCall)
..addUint32(c)
..addUint16(callName.length)
..addDC(callName)
..addDC(pb)
..done();
return reply;
}
void iipRequestKeepAlive(int callbackId, DateTime peerTime, int interval)
{
int jitter = 0;
var now = DateTime.now().toUtc();
if (_lastKeepAliveReceived != null)
{
var diff = now.difference(_lastKeepAliveReceived!).inMicroseconds;
//Console.WriteLine("Diff " + diff + " " + interval);
jitter =(diff -interval).abs();
}
sendParams()
..addUint8(0x80 | IIPPacketAction.KeepAlive)
..addUint32(callbackId)
..addDateTime(now)
..addUint32(jitter)
..done();
_lastKeepAliveReceived = now;
}
}

View File

@ -23,6 +23,7 @@ SOFTWARE.
*/
import 'dart:async';
import 'dart:ffi';
import '../../Data/IntType.dart';
@ -45,8 +46,6 @@ import '../Packets/IIPPacketAction.dart';
import '../../Resource/Template/EventTemplate.dart';
class DistributedResource extends IResource {
int? _instanceId;
DistributedConnection? _connection;
@ -255,8 +254,17 @@ class DistributedResource extends IResource {
if (index >= ins.template.functions.length)
throw new Exception("Function index is incorrect");
return _connection?.sendInvoke(_instanceId as int, index, args)
as AsyncReply;
// return _connection?.sendInvoke(_instanceId as int, index, args)
// as AsyncReply;
var ft = ins.template.getFunctionTemplateByIndex(index);
if (ft == null) throw new Exception("Function template not found.");
if (ft.isStatic)
return _connection?.staticCall(ins.template.classId, index, args);
else
return _connection?.sendInvoke(_instanceId as Int, index, args);
}
operator [](String index) {

View File

@ -73,6 +73,11 @@ class IIPPacket {
TransmissionType? dataType;
DateTime currentTime = DateTime(2000);
int interval = 0;
int jitter = 0;
String procedure = "";
int _dataLengthNeeded = 0;
int _originalOffset = 0;
@ -373,6 +378,44 @@ class IIPPacket {
//@TODO: fix this
//content = data.clip(offset, cl);
offset += cl;
} else if (action == IIPPacketAction.KeepAlive) {
if (_notEnough(offset, ends, 12)) return -_dataLengthNeeded;
currentTime = data.getDateTime(offset);
offset += 8;
interval = data.getUint32(offset);
offset += 4;
} else if (action == IIPPacketAction.ProcedureCall) {
if (_notEnough(offset, ends, 2)) return -_dataLengthNeeded;
var cl = data.getUint16(offset);
offset += 2;
if (_notEnough(offset, ends, cl)) return -_dataLengthNeeded;
procedure = data.getString(offset, cl);
offset += cl;
if (_notEnough(offset, ends, 1)) return -_dataLengthNeeded;
var parsed = TransmissionType.parse(data, offset, ends);
if (dataType == null) return -parsed.size;
offset += parsed.size;
} else if (action == IIPPacketAction.StaticCall) {
if (_notEnough(offset, ends, 18)) return -_dataLengthNeeded;
classId = data.getGuid(offset);
offset += 16;
methodIndex = data[offset++];
var parsed = TransmissionType.parse(data, offset, ends);
if (dataType == null) return -parsed.size;
offset += parsed.size;
}
} else if (command == IIPPacketCommand.Reply) {
if (action == IIPPacketAction.AttachResource ||
@ -397,13 +440,6 @@ class IIPPacket {
if (parsed.type == null) return -parsed.size;
//print("Not enough ${parsed.size}");
// } else {
// print(
// "attach parsed ${parsed.size} ${cl} ${data.length} ${ends} ${offset}");
// }
dataType = parsed.type;
offset += parsed.size;
} else if (action == IIPPacketAction.DetachResource) {
@ -411,9 +447,6 @@ class IIPPacket {
} else if (action == IIPPacketAction.CreateResource) {
if (_notEnough(offset, ends, 20)) return -_dataLengthNeeded;
//ClassId = data.GetGuid(offset);
//offset += 16;
resourceId = data.getUint32(offset);
offset += 4;
} else if (action == IIPPacketAction.DetachResource) {
@ -440,10 +473,9 @@ class IIPPacket {
dataType = parsed.type;
offset += parsed.size;
} else if (action == IIPPacketAction.InvokeFunction)
//|| action == IIPPacketAction.GetProperty
//|| action == IIPPacketAction.GetPropertyIfModified)
{
} else if (action == IIPPacketAction.InvokeFunction ||
action == IIPPacketAction.ProcedureCall ||
action == IIPPacketAction.StaticCall) {
if (_notEnough(offset, ends, 1)) return -_dataLengthNeeded;
var parsed = TransmissionType.parse(data, offset, ends);
@ -456,6 +488,13 @@ class IIPPacket {
action == IIPPacketAction.Listen ||
action == IIPPacketAction.Unlisten) {
// nothing to do
} else if (action == IIPPacketAction.KeepAlive) {
if (_notEnough(offset, ends, 12)) return -_dataLengthNeeded;
currentTime = data.getDateTime(offset);
offset += 8;
jitter = data.getUint32(offset);
offset += 4;
}
} else if (command == IIPPacketCommand.Report) {
if (report == IIPPacketReport.ManagementError) {

View File

@ -33,4 +33,9 @@ class IIPPacketAction {
static const int GetAttributes = 0x1B;
static const int UpdateAttributes = 0x1C;
static const int ClearAttributes = 0x1D;
// Static
static const int KeepAlive = 0x20;
static const int ProcedureCall = 0x21;
static const int StaticCall = 0x22;
}

View File

@ -468,16 +468,30 @@ class TemplateGenerator {
var positionalArgs = f.arguments.where((x) => !x.optional);
var optionalArgs = f.arguments.where((x) => x.optional);
rt.write("AsyncReply<$rtTypeName> ${f.name}(");
if (positionalArgs.length > 0)
if (f.isStatic) {
rt.write(
"${positionalArgs.map((a) => getTypeName(template, a.type, templates) + " " + a.name).join(',')}");
"AsyncReply<$rtTypeName> ${f.name}(DistributedConnection connection");
if (optionalArgs.length > 0) {
if (positionalArgs.length > 0) rt.write(",");
rt.write(
"[${optionalArgs.map((a) => getTypeName(template, a.type.toNullable(), templates) + " " + a.name).join(',')}]");
if (positionalArgs.length > 0)
rt.write(
", ${positionalArgs.map((a) => getTypeName(template, a.type, templates) + " " + a.name).join(',')}");
if (optionalArgs.length > 0) {
rt.write(
", [${optionalArgs.map((a) => getTypeName(template, a.type.toNullable(), templates) + " " + a.name).join(',')}]");
}
} else {
rt.write("AsyncReply<$rtTypeName> ${f.name}(");
if (positionalArgs.length > 0)
rt.write(
"${positionalArgs.map((a) => getTypeName(template, a.type, templates) + " " + a.name).join(',')}");
if (optionalArgs.length > 0) {
if (positionalArgs.length > 0) rt.write(",");
rt.write(
"[${optionalArgs.map((a) => getTypeName(template, a.type.toNullable(), templates) + " " + a.name).join(',')}]");
}
}
rt.writeln(") {");
@ -491,11 +505,17 @@ class TemplateGenerator {
});
rt.writeln("var rt = AsyncReply<$rtTypeName>();");
rt.writeln("internal_invoke(${f.index}, args)");
if (f.isStatic) {
rt.writeln(
'connection.staticCall(Guid.parse("${template.classId.toString()}"), ${f.index}, args)');
} else {
rt.writeln("internal_invoke(${f.index}, args)");
}
rt.writeln("..then((x) => rt.trigger(x))");
rt.writeln("..error((x) => rt.triggerError(x))");
rt.writeln("..chunk((x) => rt.triggerChunk(x));");
rt.writeln("return rt; }");
});
template.properties.where((p) => !p.inherited).forEach((p) {

View File

@ -2,7 +2,6 @@ import 'MemberTemplate.dart';
import '../../Data/DC.dart';
import '../../Data/BinaryList.dart';
import 'TypeTemplate.dart';
import 'MemberType.dart';
import 'ArgumentTemplate.dart';
import '../../Data/RepresentationType.dart';
@ -12,6 +11,7 @@ class FunctionTemplate extends MemberTemplate {
List<ArgumentTemplate> arguments;
RepresentationType returnType;
bool isStatic;
DC compose() {
var name = super.compose();
@ -29,15 +29,15 @@ class FunctionTemplate extends MemberTemplate {
bl
..addInt32(exp.length)
..addDC(exp);
bl.insertUint8(0, inherited ? 0x90 : 0x10);
bl.insertUint8(0, (inherited ? 0x90 : 0x10) | (isStatic ? 0x4 : 0));
} else
bl.insertUint8(0, inherited ? 0x80 : 0x0);
bl.insertUint8(0, (inherited ? 0x80 : 0x0) | (isStatic ? 0x4 : 0));
return bl.toDC();
}
FunctionTemplate(TypeTemplate template, int index, String name,
bool inherited, this.arguments, this.returnType,
bool inherited, this.isStatic, this.arguments, this.returnType,
[this.annotation = null])
: super(template, index, name, inherited) {}
}

View File

@ -305,7 +305,6 @@ class TypeTemplate {
_className = data.getString(offset + 1, data[offset]);
offset += data[offset] + 1;
if (hasParent) {
_parentId = data.getGuid(offset);
offset += 16;
@ -316,7 +315,6 @@ class TypeTemplate {
offset += 2;
_annotation = data.getString(offset, len);
offset += len;
}
_version = data.getInt32(offset);
@ -337,6 +335,8 @@ class TypeTemplate {
if (type == 0) // function
{
String? annotation = null;
var isStatic = ((data[offset] & 0x4) == 0x4);
var hasAnnotation = ((data[offset++] & 0x10) == 0x10);
var name = data.getString(offset + 1, data[offset]);
@ -364,7 +364,7 @@ class TypeTemplate {
}
var ft = new FunctionTemplate(this, functionIndex++, name, inherited,
arguments, dt.type, annotation);
isStatic, arguments, dt.type, annotation);
_functions.add(ft);
} else if (type == 1) // property