mirror of
https://github.com/esiur/esiur-dotnet.git
synced 2025-06-27 05:23:13 +00:00
Static Calling
This commit is contained in:
@ -40,6 +40,7 @@ using System.Linq;
|
||||
using System.Diagnostics;
|
||||
using static Esiur.Net.Packets.IIPPacket;
|
||||
using Esiur.Net.HTTP;
|
||||
using System.Timers;
|
||||
|
||||
namespace Esiur.Net.IIP;
|
||||
public partial class DistributedConnection : NetworkConnection, IStore
|
||||
@ -47,6 +48,9 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
public delegate void ReadyEvent(DistributedConnection sender);
|
||||
public delegate void ErrorEvent(DistributedConnection sender, byte errorCode, string errorMessage);
|
||||
|
||||
|
||||
Timer keepAliveTimer;
|
||||
|
||||
/// <summary>
|
||||
/// Ready event is raised when the connection is fully established.
|
||||
/// </summary>
|
||||
@ -330,13 +334,64 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
else
|
||||
x.Resource._UpdatePropertyByIndex(x.Index, x.Value);
|
||||
});
|
||||
//q.timeout?.Dispose();
|
||||
|
||||
|
||||
var r = new Random();
|
||||
localNonce = new byte[32];
|
||||
r.NextBytes(localNonce);
|
||||
|
||||
keepAliveTimer = new Timer(KeepAliveInterval * 1000);
|
||||
keepAliveTimer.Elapsed += KeepAliveTimer_Elapsed;
|
||||
}
|
||||
|
||||
[Public] public virtual uint Jitter { get; set; }
|
||||
|
||||
public uint KeepAliveTime { get; set; } = 10;
|
||||
|
||||
DateTime? lastKeepAliveSent;
|
||||
|
||||
private void KeepAliveTimer_Elapsed(object sender, ElapsedEventArgs e)
|
||||
{
|
||||
if (!IsConnected)
|
||||
return;
|
||||
|
||||
|
||||
keepAliveTimer.Stop();
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
|
||||
uint interval = lastKeepAliveSent == null ? 0 :
|
||||
(uint)(now - (DateTime)lastKeepAliveSent).TotalMilliseconds;
|
||||
|
||||
lastKeepAliveSent = now;
|
||||
|
||||
SendRequest(IIPPacketAction.KeepAlive)
|
||||
.AddDateTime(now)
|
||||
.AddUInt32(interval)
|
||||
.Done()
|
||||
.Then(x =>
|
||||
{
|
||||
|
||||
Jitter = (uint)x[1];
|
||||
keepAliveTimer.Start();
|
||||
//Console.WriteLine($"Keep Alive Received {Jitter}");
|
||||
}).Error(ex =>
|
||||
{
|
||||
keepAliveTimer.Stop();
|
||||
Close();
|
||||
}).Timeout((int)(KeepAliveTime * 1000), () =>
|
||||
{
|
||||
keepAliveTimer.Stop();
|
||||
Close();
|
||||
});
|
||||
|
||||
//Console.WriteLine("Keep Alive sent");
|
||||
|
||||
|
||||
}
|
||||
|
||||
public uint KeepAliveInterval { get; set; } = 30;
|
||||
|
||||
public override void Destroy()
|
||||
{
|
||||
UnsubscribeAll();
|
||||
@ -535,6 +590,19 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
// @TODO : fix this
|
||||
//IIPRequestClearAttributes(packet.CallbackId, packet.ResourceId, packet.Content, false);
|
||||
break;
|
||||
|
||||
case IIPPacketAction.KeepAlive:
|
||||
IIPRequestKeepAlive(packet.CallbackId, packet.CurrentTime, packet.Interval);
|
||||
break;
|
||||
|
||||
case IIPPacketAction.ProcedureCall:
|
||||
IIPRequestProcedureCall(packet.CallbackId, packet.Procedure, (TransmissionType)packet.DataType, msg);
|
||||
break;
|
||||
|
||||
case IIPPacketAction.StaticCall:
|
||||
IIPRequestStaticCall(packet.CallbackId, packet.ClassId, packet.MethodIndex, (TransmissionType)packet.DataType, msg);
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
else if (packet.Command == IIPPacket.IIPPacketCommand.Reply)
|
||||
@ -584,7 +652,9 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
break;
|
||||
|
||||
// Invoke
|
||||
case IIPPacket.IIPPacketAction.InvokeFunction:
|
||||
case IIPPacketAction.InvokeFunction:
|
||||
case IIPPacketAction.StaticCall:
|
||||
case IIPPacketAction.ProcedureCall:
|
||||
IIPReplyInvoke(packet.CallbackId, (TransmissionType)packet.DataType, msg);// packet.Content);
|
||||
break;
|
||||
|
||||
@ -615,6 +685,9 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
IIPReply(packet.CallbackId);
|
||||
break;
|
||||
|
||||
case IIPPacketAction.KeepAlive:
|
||||
IIPReply(packet.CallbackId, packet.CurrentTime, packet.Jitter);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
@ -781,28 +854,28 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
{
|
||||
Server.Membership.TokenExists(authPacket.RemoteTokenIndex, authPacket.Domain).Then(x =>
|
||||
{
|
||||
if (x != null)
|
||||
{
|
||||
session.RemoteAuthentication.Username = x;
|
||||
session.RemoteAuthentication.TokenIndex = authPacket.RemoteTokenIndex;
|
||||
remoteNonce = authPacket.RemoteNonce;
|
||||
session.RemoteAuthentication.Domain = authPacket.Domain;
|
||||
SendParams()
|
||||
.AddUInt8(0xa0)
|
||||
.AddUInt8Array(localNonce)
|
||||
.Done();
|
||||
}
|
||||
else
|
||||
{
|
||||
//Console.WriteLine("User not found");
|
||||
SendParams()
|
||||
.AddUInt8(0xc0)
|
||||
.AddUInt8((byte)ExceptionCode.UserOrTokenNotFound)
|
||||
.AddUInt16(15)
|
||||
.AddString("Token not found")
|
||||
.Done();
|
||||
}
|
||||
});
|
||||
if (x != null)
|
||||
{
|
||||
session.RemoteAuthentication.Username = x;
|
||||
session.RemoteAuthentication.TokenIndex = authPacket.RemoteTokenIndex;
|
||||
remoteNonce = authPacket.RemoteNonce;
|
||||
session.RemoteAuthentication.Domain = authPacket.Domain;
|
||||
SendParams()
|
||||
.AddUInt8(0xa0)
|
||||
.AddUInt8Array(localNonce)
|
||||
.Done();
|
||||
}
|
||||
else
|
||||
{
|
||||
//Console.WriteLine("User not found");
|
||||
SendParams()
|
||||
.AddUInt8(0xc0)
|
||||
.AddUInt8((byte)ExceptionCode.UserOrTokenNotFound)
|
||||
.AddUInt16(15)
|
||||
.AddString("Token not found")
|
||||
.Done();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
@ -937,7 +1010,6 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
{
|
||||
Warehouse.Put(this.RemoteUsername, this, null, Server).Then(x =>
|
||||
{
|
||||
|
||||
ready = true;
|
||||
openReply?.Trigger(true);
|
||||
OnReady?.Invoke(this);
|
||||
@ -1061,6 +1133,9 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
openReply?.Trigger(true);
|
||||
OnReady?.Invoke(this);
|
||||
}
|
||||
|
||||
// start perodic keep alive timer
|
||||
keepAliveTimer.Start();
|
||||
}
|
||||
}
|
||||
else if (authPacket.Command == IIPAuthPacket.IIPAuthPacketCommand.Error)
|
||||
@ -1326,28 +1401,61 @@ public partial class DistributedConnection : NetworkConnection, IStore
|
||||
// clean up
|
||||
readyToEstablish = false;
|
||||
|
||||
keepAliveTimer.Stop();
|
||||
|
||||
|
||||
foreach (var x in requests.Values)
|
||||
x.TriggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
|
||||
{
|
||||
try
|
||||
{
|
||||
x.TriggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Global.Log(ex);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var x in resourceRequests.Values)
|
||||
x.TriggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
|
||||
{
|
||||
try
|
||||
{
|
||||
x.TriggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Global.Log(ex);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var x in templateRequests.Values)
|
||||
x.TriggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
|
||||
{
|
||||
try
|
||||
{
|
||||
x.TriggerError(new AsyncException(ErrorType.Management, 0, "Connection closed"));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Global.Log(ex);
|
||||
}
|
||||
}
|
||||
|
||||
requests.Clear();
|
||||
resourceRequests.Clear();
|
||||
templateRequests.Clear();
|
||||
|
||||
foreach (var x in resources.Values)
|
||||
x.Suspend();
|
||||
|
||||
UnsubscribeAll();
|
||||
|
||||
Warehouse.Remove(this);
|
||||
if (Server != null) {
|
||||
foreach (var x in resources.Values)
|
||||
x.Suspend();
|
||||
UnsubscribeAll();
|
||||
Warehouse.Remove(this);
|
||||
|
||||
if (ready)
|
||||
Server.Membership?.Logout(session);
|
||||
};
|
||||
|
||||
if (ready)
|
||||
Server?.Membership?.Logout(session);
|
||||
|
||||
ready = false;
|
||||
}
|
||||
|
@ -67,6 +67,9 @@ partial class DistributedConnection
|
||||
|
||||
AsyncQueue<DistributedResourceQueueItem> queue = new AsyncQueue<DistributedResourceQueueItem>();
|
||||
|
||||
|
||||
DateTime? lastKeepAliveReceived;
|
||||
|
||||
/// <summary>
|
||||
/// Send IIP request.
|
||||
/// </summary>
|
||||
@ -99,18 +102,6 @@ partial class DistributedConnection
|
||||
|
||||
internal SendList SendReply(IIPPacket.IIPPacketAction action, uint callbackId)
|
||||
{
|
||||
/*
|
||||
if (callbackId > maxcallerid)
|
||||
{
|
||||
maxcallerid = callbackId;
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.Beep();
|
||||
|
||||
}
|
||||
*/
|
||||
|
||||
return (SendList)SendParams().AddUInt8((byte)(0x80 | (byte)action)).AddUInt32(callbackId);
|
||||
}
|
||||
|
||||
@ -150,6 +141,53 @@ partial class DistributedConnection
|
||||
}
|
||||
|
||||
|
||||
public AsyncReply<object> StaticCall(Guid classId, byte index, Map<byte, object> parameters)
|
||||
{
|
||||
var pb = Codec.Compose(parameters, this);// Codec.ComposeVarArray(parameters, this, true);
|
||||
|
||||
var reply = new AsyncReply<object>();
|
||||
var c = callbackCounter++;
|
||||
requests.Add(c, reply);
|
||||
|
||||
|
||||
SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacket.IIPPacketAction.StaticCall))
|
||||
.AddUInt32(c)
|
||||
.AddGuid(classId)
|
||||
.AddUInt8(index)
|
||||
.AddUInt8Array(pb)
|
||||
.Done();
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
public AsyncReply<object> Call(string procedureCall, params object[] parameters)
|
||||
{
|
||||
var args = new Map<byte, object>();
|
||||
for (byte i = 0; i < parameters.Length; i++)
|
||||
args.Add(i, parameters[i]);
|
||||
return Call(procedureCall, args);
|
||||
}
|
||||
|
||||
public AsyncReply<object> Call(string procedureCall, Map<byte, object> parameters)
|
||||
{
|
||||
var pb = Codec.Compose(parameters, this);
|
||||
|
||||
var reply = new AsyncReply<object>();
|
||||
var c = callbackCounter++;
|
||||
requests.Add(c, reply);
|
||||
|
||||
var callName = DC.ToBytes(procedureCall);
|
||||
|
||||
SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacket.IIPPacketAction.ProcedureCall))
|
||||
.AddUInt32(c)
|
||||
.AddUInt16((ushort)callName.Length)
|
||||
.AddUInt8Array(callName)
|
||||
.AddUInt8Array(pb)
|
||||
.Done();
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
internal AsyncReply<object> SendInvoke(uint instanceId, byte index, Map<byte, object> parameters)
|
||||
{
|
||||
var pb = Codec.Compose(parameters, this);// Codec.ComposeVarArray(parameters, this, true);
|
||||
@ -158,17 +196,12 @@ partial class DistributedConnection
|
||||
var c = callbackCounter++;
|
||||
requests.Add(c, reply);
|
||||
|
||||
SendParams().AddUInt8((byte)(0x40 | (byte)Packets.IIPPacket.IIPPacketAction.InvokeFunction))
|
||||
SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacket.IIPPacketAction.InvokeFunction))
|
||||
.AddUInt32(c)
|
||||
.AddUInt32(instanceId)
|
||||
.AddUInt8(index)
|
||||
.AddUInt8Array(pb)
|
||||
.Done();
|
||||
|
||||
//var bl = new BinaryList((byte)(0x40 | (byte)Packets.IIPPacket.IIPPacketAction.InvokeFunctionArrayArguments),
|
||||
// callbackCounter, instanceId, index, pb);
|
||||
//Send(bl.ToArray());
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
@ -1212,6 +1245,102 @@ partial class DistributedConnection
|
||||
return new Tuple<ushort, string>((ushort)code, $"{source}: {msg}\n{trace}");
|
||||
}
|
||||
|
||||
|
||||
void IIPRequestProcedureCall(uint callback, string procedureCall, TransmissionType transmissionType, byte[] content)
|
||||
{
|
||||
|
||||
if (Server == null)
|
||||
{
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.GeneralFailure);
|
||||
return;
|
||||
}
|
||||
|
||||
var call = Server.Calls[procedureCall];
|
||||
|
||||
if (call == null)
|
||||
{
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
|
||||
return;
|
||||
}
|
||||
|
||||
var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType);
|
||||
|
||||
parsed.Then(results =>
|
||||
{
|
||||
var arguments = (Map<byte, object>)results;// (object[])results;
|
||||
|
||||
// un hold the socket to send data immediately
|
||||
this.Socket.Unhold();
|
||||
|
||||
// @TODO: Make managers for procedure calls
|
||||
//if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
|
||||
//{
|
||||
// SendError(ErrorType.Management, callback,
|
||||
// (ushort)ExceptionCode.InvokeDenied);
|
||||
// return;
|
||||
//}
|
||||
|
||||
InvokeFunction(call.Method, callback, arguments, IIPPacket.IIPPacketAction.ProcedureCall, call.Target);
|
||||
|
||||
}).Error(x =>
|
||||
{
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
|
||||
});
|
||||
}
|
||||
|
||||
void IIPRequestStaticCall(uint callback, Guid classId, byte index, TransmissionType transmissionType, byte[] content)
|
||||
{
|
||||
var template = Warehouse.GetTemplateByClassId(classId);
|
||||
|
||||
if (template == null)
|
||||
{
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TemplateNotFound);
|
||||
return;
|
||||
}
|
||||
|
||||
var ft = template.GetFunctionTemplateByIndex(index);
|
||||
|
||||
if (ft == null)
|
||||
{
|
||||
// no function at this index
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
|
||||
return;
|
||||
}
|
||||
|
||||
var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType);
|
||||
|
||||
parsed.Then(results =>
|
||||
{
|
||||
var arguments = (Map<byte, object>)results;// (object[])results;
|
||||
|
||||
// un hold the socket to send data immediately
|
||||
this.Socket.Unhold();
|
||||
|
||||
var fi = ft.MethodInfo;
|
||||
|
||||
if (fi == null)
|
||||
{
|
||||
// ft found, fi not found, this should never happen
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
|
||||
return;
|
||||
}
|
||||
|
||||
// @TODO: Make managers for static calls
|
||||
//if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
|
||||
//{
|
||||
// SendError(ErrorType.Management, callback,
|
||||
// (ushort)ExceptionCode.InvokeDenied);
|
||||
// return;
|
||||
//}
|
||||
|
||||
InvokeFunction(fi, callback, arguments, IIPPacket.IIPPacketAction.StaticCall, null);
|
||||
|
||||
}).Error(x =>
|
||||
{
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
|
||||
});
|
||||
}
|
||||
|
||||
void IIPRequestInvokeFunction(uint callback, uint resourceId, byte index, TransmissionType transmissionType, byte[] content)
|
||||
{
|
||||
//Console.WriteLine("IIPRequestInvokeFunction " + callback + " " + resourceId + " " + index);
|
||||
@ -1263,128 +1392,135 @@ partial class DistributedConnection
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
var fi = r.GetType().GetMethod(ft.Name);
|
||||
|
||||
if (fi != null)
|
||||
{
|
||||
if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
|
||||
{
|
||||
SendError(ErrorType.Management, callback,
|
||||
(ushort)ExceptionCode.InvokeDenied);
|
||||
return;
|
||||
}
|
||||
|
||||
// cast arguments
|
||||
ParameterInfo[] pis = fi.GetParameters();
|
||||
|
||||
object[] args = new object[pis.Length];
|
||||
|
||||
if (pis.Length > 0)
|
||||
{
|
||||
if (pis.Last().ParameterType == typeof(DistributedConnection))
|
||||
{
|
||||
for (byte i = 0; i < pis.Length - 1; i++)
|
||||
args[i] = arguments.ContainsKey(i) ?
|
||||
DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing;
|
||||
args[args.Length - 1] = this;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (byte i = 0; i < pis.Length; i++)
|
||||
args[i] = arguments.ContainsKey(i) ?
|
||||
DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing;
|
||||
}
|
||||
}
|
||||
|
||||
object rt;
|
||||
|
||||
try
|
||||
{
|
||||
rt = fi.Invoke(r, args);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var (code, msg) = SummerizeException(ex);
|
||||
SendError(ErrorType.Exception, callback, code, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (rt is System.Collections.IEnumerable && !(rt is Array || rt is Map<string, object> || rt is string))
|
||||
{
|
||||
var enu = rt as System.Collections.IEnumerable;
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var v in enu)
|
||||
SendChunk(callback, v);
|
||||
SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback)
|
||||
.AddUInt8((byte)TransmissionTypeIdentifier.Null)
|
||||
.Done();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var (code, msg) = SummerizeException(ex);
|
||||
SendError(ErrorType.Exception, callback, code, msg);
|
||||
}
|
||||
|
||||
}
|
||||
else if (rt is Task)
|
||||
{
|
||||
(rt as Task).ContinueWith(t =>
|
||||
{
|
||||
#if NETSTANDARD
|
||||
var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t);
|
||||
#else
|
||||
var res = t.GetType().GetProperty("Result").GetValue(t);
|
||||
#endif
|
||||
SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback)
|
||||
.AddUInt8Array(Codec.Compose(res, this))
|
||||
.Done();
|
||||
});
|
||||
|
||||
//await t;
|
||||
//SendParams((byte)0x90, callback, Codec.Compose(res, this));
|
||||
}
|
||||
else if (rt is AsyncReply)// Codec.ImplementsInterface(rt.GetType(), typeof(IAsyncReply<>)))// rt.GetType().GetTypeInfo().IsGenericType
|
||||
//&& rt.GetType().GetGenericTypeDefinition() == typeof(IAsyncReply<>))
|
||||
{
|
||||
(rt as AsyncReply).Then(res =>
|
||||
{
|
||||
SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback)
|
||||
.AddUInt8Array(Codec.Compose(res, this))
|
||||
.Done();
|
||||
}).Error(ex =>
|
||||
{
|
||||
var (code, msg) = SummerizeException(ex);
|
||||
SendError(ErrorType.Exception, callback, code, msg);
|
||||
}).Progress((pt, pv, pm) =>
|
||||
{
|
||||
SendProgress(callback, pv, pm);
|
||||
}).Chunk(v =>
|
||||
{
|
||||
SendChunk(callback, v);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback)
|
||||
.AddUInt8Array(Codec.Compose(rt, this))
|
||||
.Done();
|
||||
}
|
||||
}
|
||||
else
|
||||
if (fi == null)
|
||||
{
|
||||
// ft found, fi not found, this should never happen
|
||||
SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
|
||||
{
|
||||
SendError(ErrorType.Management, callback,
|
||||
(ushort)ExceptionCode.InvokeDenied);
|
||||
return;
|
||||
}
|
||||
|
||||
InvokeFunction(fi, callback, arguments, IIPPacket.IIPPacketAction.InvokeFunction, r);
|
||||
}
|
||||
});
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
void InvokeFunction(MethodInfo fi, uint callback, Map<byte, object> arguments, IIPPacket.IIPPacketAction actionType, object target = null)
|
||||
{
|
||||
|
||||
// cast arguments
|
||||
ParameterInfo[] pis = fi.GetParameters();
|
||||
|
||||
object[] args = new object[pis.Length];
|
||||
|
||||
if (pis.Length > 0)
|
||||
{
|
||||
if (pis.Last().ParameterType == typeof(DistributedConnection))
|
||||
{
|
||||
for (byte i = 0; i < pis.Length - 1; i++)
|
||||
args[i] = arguments.ContainsKey(i) ?
|
||||
DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing;
|
||||
args[args.Length - 1] = this;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (byte i = 0; i < pis.Length; i++)
|
||||
args[i] = arguments.ContainsKey(i) ?
|
||||
DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing;
|
||||
}
|
||||
}
|
||||
|
||||
object rt;
|
||||
|
||||
try
|
||||
{
|
||||
rt = fi.Invoke(target, args);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var (code, msg) = SummerizeException(ex);
|
||||
msg = "Arguments: " + string.Join(", ", args.Select(x => x?.ToString() ?? "[Null]").ToArray()) + "\r\n" + msg;
|
||||
|
||||
SendError(ErrorType.Exception, callback, code, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (rt is System.Collections.IEnumerable && !(rt is Array || rt is Map<string, object> || rt is string))
|
||||
{
|
||||
var enu = rt as System.Collections.IEnumerable;
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var v in enu)
|
||||
SendChunk(callback, v);
|
||||
SendReply(actionType, callback)
|
||||
.AddUInt8((byte)TransmissionTypeIdentifier.Null)
|
||||
.Done();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var (code, msg) = SummerizeException(ex);
|
||||
SendError(ErrorType.Exception, callback, code, msg);
|
||||
}
|
||||
|
||||
}
|
||||
else if (rt is Task)
|
||||
{
|
||||
(rt as Task).ContinueWith(t =>
|
||||
{
|
||||
#if NETSTANDARD
|
||||
var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t);
|
||||
#else
|
||||
var res = t.GetType().GetProperty("Result").GetValue(t);
|
||||
#endif
|
||||
SendReply(actionType, callback)
|
||||
.AddUInt8Array(Codec.Compose(res, this))
|
||||
.Done();
|
||||
});
|
||||
|
||||
//await t;
|
||||
//SendParams((byte)0x90, callback, Codec.Compose(res, this));
|
||||
}
|
||||
else if (rt is AsyncReply)// Codec.ImplementsInterface(rt.GetType(), typeof(IAsyncReply<>)))// rt.GetType().GetTypeInfo().IsGenericType
|
||||
//&& rt.GetType().GetGenericTypeDefinition() == typeof(IAsyncReply<>))
|
||||
{
|
||||
(rt as AsyncReply).Then(res =>
|
||||
{
|
||||
SendReply(actionType, callback)
|
||||
.AddUInt8Array(Codec.Compose(res, this))
|
||||
.Done();
|
||||
}).Error(ex =>
|
||||
{
|
||||
var (code, msg) = SummerizeException(ex);
|
||||
SendError(ErrorType.Exception, callback, code, msg);
|
||||
}).Progress((pt, pv, pm) =>
|
||||
{
|
||||
SendProgress(callback, pv, pm);
|
||||
}).Chunk(v =>
|
||||
{
|
||||
SendChunk(callback, v);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
SendReply(actionType, callback)
|
||||
.AddUInt8Array(Codec.Compose(rt, this))
|
||||
.Done();
|
||||
}
|
||||
}
|
||||
|
||||
void IIPRequestListen(uint callback, uint resourceId, byte index)
|
||||
{
|
||||
@ -1651,11 +1787,11 @@ partial class DistributedConnection
|
||||
{
|
||||
|
||||
/*
|
||||
#if NETSTANDARD
|
||||
#if NETSTANDARD
|
||||
var pi = r.GetType().GetTypeInfo().GetProperty(pt.Name);
|
||||
#else
|
||||
#else
|
||||
var pi = r.GetType().GetProperty(pt.Name);
|
||||
#endif*/
|
||||
#endif*/
|
||||
|
||||
var pi = pt.PropertyInfo;
|
||||
|
||||
@ -2531,4 +2667,31 @@ partial class DistributedConnection
|
||||
.AddUInt8Array(Codec.Compose(info.Value, this))
|
||||
.Done();
|
||||
}
|
||||
|
||||
|
||||
|
||||
void IIPRequestKeepAlive(uint callbackId, DateTime peerTime, uint interval)
|
||||
{
|
||||
|
||||
uint jitter = 0;
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
|
||||
if (lastKeepAliveReceived != null)
|
||||
{
|
||||
var diff = (uint)(now - (DateTime)lastKeepAliveReceived).TotalMilliseconds;
|
||||
//Console.WriteLine("Diff " + diff + " " + interval);
|
||||
|
||||
jitter = (uint)Math.Abs((int)diff - (int)interval);
|
||||
}
|
||||
|
||||
SendParams()
|
||||
.AddUInt8((byte)(0x80 | (byte)IIPPacket.IIPPacketAction.KeepAlive))
|
||||
.AddUInt32(callbackId)
|
||||
.AddDateTime(now)
|
||||
.AddUInt32(jitter)
|
||||
.Done();
|
||||
|
||||
lastKeepAliveReceived = now;
|
||||
}
|
||||
}
|
||||
|
@ -222,9 +222,8 @@ public class DistributedResource : DynamicObject, IResource
|
||||
|
||||
public AsyncReply<object> _Invoke(byte index, Map<byte, object> args)
|
||||
{
|
||||
|
||||
if (destroyed)
|
||||
throw new Exception("Trying to access destroyed object");
|
||||
throw new Exception("Trying to access a destroyed object");
|
||||
|
||||
if (suspended)
|
||||
throw new Exception("Trying to access suspended object");
|
||||
@ -232,11 +231,17 @@ public class DistributedResource : DynamicObject, IResource
|
||||
if (index >= Instance.Template.Functions.Length)
|
||||
throw new Exception("Function index is incorrect");
|
||||
|
||||
var ft = Instance.Template.GetFunctionTemplateByIndex(index);
|
||||
|
||||
return connection.SendInvoke(instanceId, index, args);
|
||||
if (ft == null)
|
||||
throw new Exception("Function template not found.");
|
||||
|
||||
if (ft.IsStatic)
|
||||
return connection.StaticCall(Instance.Template.ClassId, index, args);
|
||||
else
|
||||
return connection.SendInvoke(instanceId, index, args);
|
||||
}
|
||||
|
||||
|
||||
public AsyncReply Listen(EventTemplate et)
|
||||
{
|
||||
if (et == null)
|
||||
|
@ -176,5 +176,11 @@ public class DistributedServer : NetworkServer<DistributedConnection>, IResource
|
||||
|
||||
}
|
||||
|
||||
public KeyList<string, Delegate> Calls { get; } = new KeyList<string, Delegate>();
|
||||
|
||||
public void MapCall(string call, Delegate handler)
|
||||
{
|
||||
Calls.Add(call, handler);
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user