From 46b7744852d6dec9c877a780241a14f1c78f8508 Mon Sep 17 00:00:00 2001 From: ahmed Date: Tue, 19 Aug 2025 15:24:05 +0300 Subject: [PATCH] AsyncReply --- Esiur/Core/AsyncReply.cs | 117 +- Esiur/Core/AsyncReplyGeneric.cs | 2 +- Esiur/Core/ErrorType.cs | 3 +- Esiur/Core/ExceptionCode.cs | 5 +- Esiur/Data/Codec.cs | 92 +- Esiur/Data/DataDeserializer.cs | 105 +- Esiur/Data/DataSerializer.cs | 65 +- Esiur/Net/IIP/DistributedConnection.cs | 479 ++-- .../Net/IIP/DistributedConnectionProtocol.cs | 2120 +++++++---------- Esiur/Net/IIP/DistributedResource.cs | 25 +- Esiur/Net/Packets/IIPPacketNotification.cs | 17 +- Esiur/Net/Packets/IIPPacketReply.cs | 4 +- Esiur/Net/Packets/IIPPacketRequest.cs | 29 +- Esiur/Resource/Instance.cs | 42 +- Esiur/Resource/Warehouse.cs | 89 +- 15 files changed, 1473 insertions(+), 1721 deletions(-) diff --git a/Esiur/Core/AsyncReply.cs b/Esiur/Core/AsyncReply.cs index bdc772a..7b7491c 100644 --- a/Esiur/Core/AsyncReply.cs +++ b/Esiur/Core/AsyncReply.cs @@ -38,18 +38,19 @@ namespace Esiur.Core; [AsyncMethodBuilder(typeof(AsyncReplyBuilder))] public class AsyncReply { - //public bool Debug = false; protected List> callbacks = new List>(); protected object result; - protected List> errorCallbacks = new List>(); + protected List> errorCallbacks = null; - protected List> progressCallbacks = new List>(); + protected List> progressCallbacks = null; - protected List> chunkCallbacks = new List>(); + protected List> chunkCallbacks = null; + + protected List> propagationCallbacks = null; + protected List> warningCallbacks = null; - //List awaiters = new List(); object asyncLock = new object(); @@ -77,15 +78,8 @@ public class AsyncReply if (resultReady) return result; - //if (Debug) - // Console.WriteLine($"AsyncReply: {Id} Wait"); - - //mutex = new AutoResetEvent(false); mutex.WaitOne(); - //if (Debug) - // Console.WriteLine($"AsyncReply: {Id} Wait ended"); - if (exception != null) throw exception; @@ -95,7 +89,7 @@ public class AsyncReply public AsyncReply Timeout(int milliseconds, Action callback = null) { - + Task.Delay(milliseconds).ContinueWith(x => { if (!resultReady && exception == null) @@ -183,34 +177,52 @@ public class AsyncReply public AsyncReply Error(Action callback) { - // lock (callbacksLock) - // { + + if (errorCallbacks == null) + errorCallbacks = new List>(); + errorCallbacks.Add(callback); if (exception != null) callback(exception); return this; - //} } - public AsyncReply Progress(Action callback) + public AsyncReply Progress(Action callback) { - //lock (callbacksLock) - //{ + if (progressCallbacks == null) + progressCallbacks = new List>(); + progressCallbacks.Add(callback); return this; - //} } + public AsyncReply Warning(Action callback) + { + if (warningCallbacks == null) + warningCallbacks = new List>(); + + warningCallbacks.Add(callback); + return this; + } public AsyncReply Chunk(Action callback) { - // lock (callbacksLock) - // { + if (chunkCallbacks == null) + chunkCallbacks = new List>(); + chunkCallbacks.Add(callback); return this; - // } + } + + public AsyncReply Propagation(Action callback) + { + if (propagationCallbacks == null) + propagationCallbacks = new List>(); + + propagationCallbacks.Add(callback); + return this; } public AsyncReply Trigger(object result) @@ -259,32 +271,57 @@ public class AsyncReply else this.exception = new AsyncException(exception); - - // lock (callbacksLock) - // { - foreach (var cb in errorCallbacks) - cb(this.exception); - // } + if (errorCallbacks != null) + { + foreach (var cb in errorCallbacks) + cb(this.exception); + } + else + { + // no error handlers found + throw exception; + } mutex?.Set(); return this; } - public AsyncReply TriggerProgress(ProgressType type, int value, int max) + public AsyncReply TriggerProgress(ProgressType type, uint value, uint max) { //timeout?.Dispose(); - //lock (callbacksLock) - //{ - foreach (var cb in progressCallbacks) - cb(type, value, max); - - //} + if (progressCallbacks != null) + foreach (var cb in progressCallbacks) + cb(type, value, max); return this; } + public AsyncReply TriggerWarning(byte level, string message) + { + //timeout?.Dispose(); + + if (warningCallbacks != null) + foreach (var cb in warningCallbacks) + cb(level, message); + + return this; + } + + + public AsyncReply TriggerPropagation(object value) + { + //timeout?.Dispose(); + + if (propagationCallbacks != null) + foreach (var cb in propagationCallbacks) + cb(value); + + return this; + } + + public AsyncReply TriggerChunk(object value) { @@ -292,12 +329,10 @@ public class AsyncReply //timeout?.Dispose(); - //lock (callbacksLock) - //{ - foreach (var cb in chunkCallbacks) - cb(value); + if (chunkCallbacks != null) + foreach (var cb in chunkCallbacks) + cb(value); - //} return this; } diff --git a/Esiur/Core/AsyncReplyGeneric.cs b/Esiur/Core/AsyncReplyGeneric.cs index 042945d..5be91e1 100644 --- a/Esiur/Core/AsyncReplyGeneric.cs +++ b/Esiur/Core/AsyncReplyGeneric.cs @@ -45,7 +45,7 @@ public class AsyncReply : AsyncReply return this; } - public new AsyncReply Progress(Action callback) + public new AsyncReply Progress(Action callback) { base.Progress(callback); return this; diff --git a/Esiur/Core/ErrorType.cs b/Esiur/Core/ErrorType.cs index 68227f7..87659af 100644 --- a/Esiur/Core/ErrorType.cs +++ b/Esiur/Core/ErrorType.cs @@ -7,5 +7,6 @@ namespace Esiur.Core; public enum ErrorType { Management, - Exception + Exception, + Warning } diff --git a/Esiur/Core/ExceptionCode.cs b/Esiur/Core/ExceptionCode.cs index 973191c..39ad4a7 100644 --- a/Esiur/Core/ExceptionCode.cs +++ b/Esiur/Core/ExceptionCode.cs @@ -40,10 +40,11 @@ public enum ExceptionCode : ushort AddToStoreFailed, NotAttached, AlreadyListened, - AlreadyUnlistened, + AlreadyUnsubscribed, NotSubscribable, ParseError, Timeout, NotSupported, - NotImplemented + NotImplemented, + NotAllowed } diff --git a/Esiur/Data/Codec.cs b/Esiur/Data/Codec.cs index d37cc7d..e658fb0 100644 --- a/Esiur/Data/Codec.cs +++ b/Esiur/Data/Codec.cs @@ -47,47 +47,110 @@ public static class Codec static AsyncParser[][] FixedAsyncParsers = new AsyncParser[][] { new AsyncParser[]{ + DataDeserializer.NullParserAsync, + DataDeserializer.BooleanFalseParserAsync, + DataDeserializer.BooleanTrueParserAsync, + DataDeserializer.NotModifiedParserAsync, + }, + new AsyncParser[]{ + DataDeserializer.UInt8ParserAsync, + DataDeserializer.Int8ParserAsync, + DataDeserializer.Char8ParserAsync, + DataDeserializer.LocalResourceParser8Async, + DataDeserializer.ResourceParser8Async, + }, + new AsyncParser[]{ + DataDeserializer.Int16ParserAsync, + DataDeserializer.UInt16ParserAsync, + DataDeserializer.Char16ParserAsync, + DataDeserializer.LocalResourceParser16Async, + DataDeserializer.ResourceParser16Async, + }, + new AsyncParser[]{ + DataDeserializer.Int32ParserAsync, + DataDeserializer.UInt32ParserAsync, + DataDeserializer.Float32ParserAsync, + DataDeserializer.LocalResourceParser32Async, + DataDeserializer.ResourceParser32Async, + }, + new AsyncParser[]{ + DataDeserializer.Int64ParserAsync, + DataDeserializer.UInt64ParserAsync, + DataDeserializer.Float64ParserAsync, + DataDeserializer.DateTimeParserAsync, + }, + new AsyncParser[] + { + DataDeserializer.Int128ParserAsync, // int 128 + DataDeserializer.UInt128ParserAsync, // uint 128 + DataDeserializer.Float128ParserAsync, + } + }; + + static AsyncParser[] DynamicAsyncParsers = new AsyncParser[] + { + DataDeserializer.RawDataParserAsync, + DataDeserializer.StringParserAsync, + DataDeserializer.ListParserAsync, + DataDeserializer.ResourceListParserAsync, + DataDeserializer.RecordListParserAsync, + }; + + static AsyncParser[] TypedAsyncParsers = new AsyncParser[] + { + DataDeserializer.RecordParserAsync, + DataDeserializer.TypedListParserAsync, + DataDeserializer.TypedMapParserAsync, + DataDeserializer.TupleParserAsync, + DataDeserializer.EnumParserAsync, + DataDeserializer.ConstantParserAsync, + }; + + + static SyncParser[][] FixedParsers = new SyncParser[][] +{ + new SyncParser[]{ DataDeserializer.NullParser, DataDeserializer.BooleanFalseParser, DataDeserializer.BooleanTrueParser, DataDeserializer.NotModifiedParser, }, - new AsyncParser[]{ - DataDeserializer.ByteParser, - DataDeserializer.SByteParser, + new SyncParser[]{ + DataDeserializer.UInt8Parser, + DataDeserializer.Int8Parser, DataDeserializer.Char8Parser, DataDeserializer.LocalResourceParser8, DataDeserializer.ResourceParser8, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.Int16Parser, DataDeserializer.UInt16Parser, DataDeserializer.Char16Parser, DataDeserializer.LocalResourceParser16, DataDeserializer.ResourceParser16, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.Int32Parser, DataDeserializer.UInt32Parser, DataDeserializer.Float32Parser, DataDeserializer.LocalResourceParser32, DataDeserializer.ResourceParser32, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.Int64Parser, DataDeserializer.UInt64Parser, DataDeserializer.Float64Parser, DataDeserializer.DateTimeParser, }, - new AsyncParser[] + new SyncParser[] { DataDeserializer.Int128Parser, // int 128 DataDeserializer.UInt128Parser, // uint 128 DataDeserializer.Float128Parser, } - }; +}; - static AsyncParser[] DynamicAsyncParsers = new AsyncParser[] + static SyncParser[] DynamicParsers = new SyncParser[] { DataDeserializer.RawDataParser, DataDeserializer.StringParser, @@ -96,7 +159,7 @@ public static class Codec DataDeserializer.RecordListParser, }; - static AsyncParser[] TypedAsyncParsers = new AsyncParser[] + static SyncParser[] TypedParsers = new SyncParser[] { DataDeserializer.RecordParser, DataDeserializer.TypedListParser, @@ -106,7 +169,6 @@ public static class Codec DataDeserializer.ConstantParser, }; - /// /// Parse a value /// @@ -246,15 +308,15 @@ public static class Codec [typeof(IResource[])] = DataSerializer.ResourceListComposer, [typeof(IResource?[])] = DataSerializer.ResourceListComposer, [typeof(List)] = DataSerializer.ResourceListComposer, - [typeof(List)] = DataSerializer.ResourceListComposer, - [typeof(VarList)] = DataSerializer.ResourceListComposer, - [typeof(VarList)] = DataSerializer.ResourceListComposer, + [typeof(List)] = DataSerializer.ResourceListComposer, + [typeof(VarList)] = DataSerializer.ResourceListComposer, + [typeof(VarList)] = DataSerializer.ResourceListComposer, [typeof(IRecord[])] = DataSerializer.RecordListComposer, [typeof(IRecord?[])] = DataSerializer.RecordListComposer, [typeof(List)] = DataSerializer.RecordListComposer, [typeof(List)] = DataSerializer.RecordListComposer, [typeof(VarList)] = DataSerializer.RecordListComposer, - [typeof(VarList)] = DataSerializer.RecordListComposer, + [typeof(VarList)] = DataSerializer.RecordListComposer, [typeof(Map)] = DataSerializer.MapComposer, [typeof(Map)] = DataSerializer.MapComposer, [typeof(Map)] = DataSerializer.MapComposer, diff --git a/Esiur/Data/DataDeserializer.cs b/Esiur/Data/DataDeserializer.cs index e4fec0d..e7ef840 100644 --- a/Esiur/Data/DataDeserializer.cs +++ b/Esiur/Data/DataDeserializer.cs @@ -373,7 +373,7 @@ public static class DataDeserializer var initRecord = (TypeTemplate template) => { - ListParser(data, offset, length, connection, requestSequence).Then(r => + ListParserAsync(data, offset, length, connection, requestSequence).Then(r => { var ar = (object[])r; @@ -684,6 +684,31 @@ public static class DataDeserializer return rt.ToArray(); } + + public static (uint, ulong, object[]) LimitedCountListParser(byte[] data, uint offset, ulong length, uint countLimit = uint.MaxValue) + { + var rt = new List(); + + while (length > 0 && rt.Count < countLimit) + { + var (cs, reply) = Codec.ParseSync(data, offset); + + rt.Add(reply); + + if (cs > 0) + { + offset += (uint)cs; + length -= (uint)cs; + } + else + throw new Exception("Error while parsing structured data"); + + } + + return (offset, length, rt.ToArray()); + } + + public static AsyncReply TypedMapParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { // get key type @@ -826,6 +851,21 @@ public static class DataDeserializer var type = typeof(ValueTuple<,,,>).MakeGenericType(types.ToArray()); rt.Trigger(Activator.CreateInstance(type, ar[0], ar[1], ar[2], ar[3])); } + else if (ar.Length == 5) + { + var type = typeof(ValueTuple<,,,,>).MakeGenericType(types.ToArray()); + rt.Trigger(Activator.CreateInstance(type, ar[0], ar[1], ar[2], ar[3], ar[4])); + } + else if (ar.Length == 6) + { + var type = typeof(ValueTuple<,,,,,>).MakeGenericType(types.ToArray()); + rt.Trigger(Activator.CreateInstance(type, ar[0], ar[1], ar[2], ar[3], ar[4], ar[5])); + } + else if (ar.Length == 7) + { + var type = typeof(ValueTuple<,,,,,,>).MakeGenericType(types.ToArray()); + rt.Trigger(Activator.CreateInstance(type, ar[0], ar[1], ar[2], ar[3], ar[4], ar[5], ar[6])); + } }); return rt; @@ -896,9 +936,12 @@ public static class DataDeserializer var type = typeof(ValueTuple<,,,,,,>).MakeGenericType(types.ToArray()); return Activator.CreateInstance(type, results[0], results[1], results[2], results[3], results[4], results[5], results[6]); } + + throw new Exception("Unknown tuple length."); + } - public static AsyncReply TypedListParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + public static AsyncReply TypedListParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var rt = new AsyncBag(); @@ -932,13 +975,47 @@ public static class DataDeserializer return rt; } + public static object TypedListParser(byte[] data, uint offset, uint length) + { - public static AsyncBag PropertyValueArrayParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true) + // get the type + var (hdrCs, rep) = RepresentationType.Parse(data, offset); + + offset += hdrCs; + length -= hdrCs; + + var runtimeType = rep.GetRuntimeType(); + + var list = new List(); + + while (length > 0) + { + var (cs, reply) = Codec.ParseSync(data, offset); + + list.Add(reply); + + if (cs > 0) + { + offset += (uint)cs; + length -= (uint)cs; + } + else + throw new Exception("Error while parsing structured data"); + + } + + var rt = Array.CreateInstance(runtimeType, list.Count); + Array.Copy(list.ToArray(), rt, rt.Length); + + return rt; + } + + public static AsyncBag PropertyValueArrayParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true) { var rt = new AsyncBag(); - ListParser(data, offset, length, connection, requestSequence).Then(x => + ListParserAsync(data, offset, length, connection, requestSequence).Then(x => { var ar = (object[])x; var pvs = new List(); @@ -954,7 +1031,8 @@ public static class DataDeserializer } - public static (uint, AsyncReply) PropertyValueParser(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true) + + public static (uint, AsyncReply) PropertyValueParserAsync(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true) { var reply = new AsyncReply(); @@ -967,15 +1045,22 @@ public static class DataDeserializer var (valueSize, results) = Codec.ParseAsync(data, offset, connection, requestSequence); - results.Then(value => + if (results is AsyncReply) { - reply.Trigger(new PropertyValue(value, age, date)); - }); + (results as AsyncReply).Then(value => + { + reply.Trigger(new PropertyValue(value, age, date)); + }); + } + else + { + reply.Trigger(new PropertyValue(results, age, date)); + } return (16 + valueSize, reply); } - public static AsyncReply> HistoryParser(byte[] data, uint offset, uint length, IResource resource, DistributedConnection connection, uint[] requestSequence) + public static AsyncReply> HistoryParserAsync(byte[] data, uint offset, uint length, IResource resource, DistributedConnection connection, uint[] requestSequence) { //var count = (int)toAge - (int)fromAge; @@ -994,7 +1079,7 @@ public static class DataDeserializer var cs = data.GetUInt32(offset, Endian.Little); offset += 4; - var (len, pv) = PropertyValueParser(data, offset, connection, requestSequence); + var (len, pv) = PropertyValueParserAsync(data, offset, connection, requestSequence); bagOfBags.Add(pv);// ParsePropertyValueArray(data, offset, cs, connection)); offset += len; diff --git a/Esiur/Data/DataSerializer.cs b/Esiur/Data/DataSerializer.cs index 2831db4..55f3479 100644 --- a/Esiur/Data/DataSerializer.cs +++ b/Esiur/Data/DataSerializer.cs @@ -102,7 +102,7 @@ public static class DataSerializer var rt = new byte[16]; fixed (byte* ptr = rt) *((decimal*)ptr) = v; - return (TransmissionTypeIdentifier.Float128, rt); + return (TransmissionTypeIdentifier.Decimal128, rt); } @@ -122,7 +122,7 @@ public static class DataSerializer var intVal = Convert.ChangeType(value, (value as Enum).GetTypeCode()); var ct = template.Constants.FirstOrDefault(x => x.Value.Equals(intVal)); - + if (ct == null) return (TransmissionTypeIdentifier.Null, new byte[0]); @@ -131,7 +131,7 @@ public static class DataSerializer rt.AddRange(template.ClassId.Data); rt.Add(ct.Index); - return (TransmissionTypeIdentifier.Enum, rt.ToArray()); + return (TransmissionTypeIdentifier.TypedEnum, rt.ToArray()); } public static (TransmissionTypeIdentifier, byte[]) UInt8Composer(object value, DistributedConnection connection) @@ -146,7 +146,7 @@ public static class DataSerializer public static unsafe (TransmissionTypeIdentifier, byte[]) Char8Composer(object value, DistributedConnection connection) { - return (TransmissionTypeIdentifier.Char8, new byte[] { (byte)(char)value }); + return (TransmissionTypeIdentifier.Char8, new byte[] { (byte)(char)value }); } public static unsafe (TransmissionTypeIdentifier, byte[]) Char16Composer(object value, DistributedConnection connection) @@ -241,7 +241,7 @@ public static class DataSerializer // .ToArray(); //} - public static (TransmissionTypeIdentifier, byte[]) PropertyValueArrayComposer(object value, DistributedConnection connection) + public static (TransmissionTypeIdentifier, byte[]) PropertyValueArrayComposer(object value, DistributedConnection connection) { if (value == null) return (TransmissionTypeIdentifier.Null, new byte[0]); @@ -274,9 +274,9 @@ public static class DataSerializer var map = (IMap)value; - foreach(var el in map.Serialize()) + foreach (var el in map.Serialize()) rt.AddRange(Codec.Compose(el, connection)); - + return (TransmissionTypeIdentifier.TypedMap, rt.ToArray()); } @@ -315,32 +315,59 @@ public static class DataSerializer public static unsafe (TransmissionTypeIdentifier, byte[]) ResourceComposer(object value, DistributedConnection connection) { var resource = (IResource)value; - var rt = new byte[4]; - if (resource.Instance == null || resource.Instance.IsDestroyed) + if (resource.Instance == null || resource.Instance.IsDestroyed) { return (TransmissionTypeIdentifier.Null, new byte[0]); } if (Codec.IsLocalResource(resource, connection)) { + var rid = (resource as DistributedResource).DistributedResourceInstanceId; - fixed (byte* ptr = rt) - *((uint*)ptr) = (resource as DistributedResource).DistributedResourceInstanceId; + if (rid <= 0xFF) + return (TransmissionTypeIdentifier.LocalResource8, new byte[] { (byte)rid }); + else if (rid <= 0xFFFF) + { + var rt = new byte[2]; + fixed (byte* ptr = rt) + *((ushort*)ptr) = (ushort)rid; - return (TransmissionTypeIdentifier.ResourceLocal, rt); + return (TransmissionTypeIdentifier.LocalResource16, rt); + } + else + { + var rt = new byte[4]; + fixed (byte* ptr = rt) + *((uint*)ptr) = rid; + return (TransmissionTypeIdentifier.LocalResource32, rt); + } } else { - + //rt.Append((value as IResource).Instance.Template.ClassId, (value as IResource).Instance.Id); connection.cache.Add(value as IResource, DateTime.UtcNow); - - fixed (byte* ptr = rt) - *((uint*)ptr) = resource.Instance.Id; + var rid = resource.Instance.Id; - return (TransmissionTypeIdentifier.Resource, rt); + if (rid <= 0xFF) + return (TransmissionTypeIdentifier.RemoteResource8, new byte[] { (byte)rid }); + else if (rid <= 0xFFFF) + { + var rt = new byte[2]; + fixed (byte* ptr = rt) + *((ushort*)ptr) = (ushort)rid; + + return (TransmissionTypeIdentifier.RemoteResource16, rt); + } + else + { + var rt = new byte[4]; + fixed (byte* ptr = rt) + *((uint*)ptr) = rid; + return (TransmissionTypeIdentifier.RemoteResource32, rt); + } } } @@ -400,7 +427,7 @@ public static class DataSerializer var rt = new List(); var fields = value.GetType().GetFields(); - var list = fields.Select(x => x.GetValue(value)).ToArray(); + var list = fields.Select(x => x.GetValue(value)).ToArray(); var types = fields.Select(x => RepresentationType.FromType(x.FieldType).Compose()).ToArray(); rt.Add((byte)list.Length); @@ -415,7 +442,7 @@ public static class DataSerializer else { rt.AddRange(composed); - return (TransmissionTypeIdentifier.Tuple, rt.ToArray()); + return (TransmissionTypeIdentifier.TypedTuple, rt.ToArray()); } } } diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs index a78ed77..93802e4 100644 --- a/Esiur/Net/IIP/DistributedConnection.cs +++ b/Esiur/Net/IIP/DistributedConnection.cs @@ -33,6 +33,7 @@ using Esiur.Resource; using Esiur.Resource.Template; using Esiur.Security.Authority; using Esiur.Security.Membership; +using Microsoft.CodeAnalysis.CSharp.Syntax; using System; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; @@ -49,6 +50,39 @@ namespace Esiur.Net.IIP; public partial class DistributedConnection : NetworkConnection, IStore { + + public delegate void ProtocolGeneralHandler(DistributedConnection connection, TransmissionType dataType, byte[] data); + + public delegate void ProtocolRequestReplyHandler(DistributedConnection connection, uint callbackId, TransmissionType dataType, byte[] data); + + //ProtocolGeneralHandler[] NotificationHandlers = new ProtocolGeneralHandler[] + //{ + // IIPNotificationResourceDestroyed, + // IIPNotificationResourceReassigned, + // IIPNotificationResourceMoved, + // IIPNotificationSystemFailure, + // IIPNotificationPropertyModified + //}; + + //ProtocolRequestReplyHandler[] RequestHandlers = new ProtocolRequestReplyHandler[] + //{ + // IIPRequestAttachResource, + // IIPRequest + + //}; + + //ProtocolRequestReplyHandler[] ReplyHandlers = new ProtocolRequestReplyHandler[] + //{ + + //}; + + //ProtocolGeneralHandler[] ExtensionHandler = new ProtocolGeneralHandler[] + //{ + + //}; + + + // Delegates public delegate void ReadyEvent(DistributedConnection sender); public delegate void ErrorEvent(DistributedConnection sender, byte errorCode, string errorMessage); @@ -112,10 +146,10 @@ public partial class DistributedConnection : NetworkConnection, IStore /// public Session Session => session; - [Export] + [Export] public virtual ConnectionStatus Status { get; private set; } - [Export] + [Export] public virtual uint Jitter { get; private set; } // Attributes @@ -164,6 +198,9 @@ public partial class DistributedConnection : NetworkConnection, IStore return true; } + + + /// /// Send data to the other end as parameters /// @@ -373,16 +410,11 @@ public partial class DistributedConnection : NetworkConnection, IStore lastKeepAliveSent = now; - SendRequest(IIPPacketRequest.KeepAlive) - .AddDateTime(now) - .AddUInt32(interval) - .Done() + SendRequest(IIPPacketRequest.KeepAlive, now, interval) .Then(x => { - - Jitter = (uint)x[1]; + Jitter = (uint)x; keepAliveTimer.Start(); - //Console.WriteLine($"Keep Alive Received {Jitter}"); }).Error(ex => { keepAliveTimer.Stop(); @@ -393,9 +425,6 @@ public partial class DistributedConnection : NetworkConnection, IStore Close(); }); - //Console.WriteLine("Keep Alive sent"); - - } public uint KeepAliveInterval { get; set; } = 30; @@ -417,7 +446,6 @@ public partial class DistributedConnection : NetworkConnection, IStore if (rt <= 0) { - var size = ends - offset; data.HoldFor(msg, offset, size, size + (uint)(-rt)); return ends; @@ -427,311 +455,153 @@ public partial class DistributedConnection : NetworkConnection, IStore offset += (uint)rt; - if (packet.DataType != null) - { - var dt = packet.DataType.Value; + if (packet.DataType == null) + return offset; - var (_, parsed) = Codec.Parse(msg, dt.Offset, this, null, dt); - - parsed.Then(value => - { - if (packet.Method == IIPPacketMethod.Notification) - { - switch (packet.Notification) - { - case IIPPacketNotification.ResourceDestroyed: - IIPNotificationResourceDestroyed(value); - break; - case IIPPacketNotification.ResourceReassigned: - IIPNotificationResourceReassigned(value); - break; - case IIPPacketNotification.ResourceMoved: - IIPNotificationResourceMoved(value); - break; - case IIPPacketNotification.SystemFailure: - IIPNotificationSystemFailure(value); - break; - case IIPPacketNotification.PropertyModified: - IIPNotificationPropertyModified() - } - } - }); - } if (packet.Method == IIPPacketMethod.Notification) { + var dt = packet.DataType.Value; + switch (packet.Notification) { + // Invoke + case IIPPacketNotification.PropertyModified: + IIPNotificationPropertyModified(dt, msg); + break; + case IIPPacketNotification.EventOccurred: + IIPNotificationEventOccurred(dt, msg); + break; + // Manage case IIPPacketNotification.ResourceDestroyed: - - case IIPPacketEvent.ResourceReassigned: - IIPEventResourceReassigned(packet.ResourceId, packet.NewResourceId); + IIPNotificationResourceDestroyed(dt, msg); break; - case IIPPacketEvent.ResourceDestroyed: - IIPEventResourceDestroyed(packet.ResourceId); + case IIPPacketNotification.ResourceReassigned: + IIPNotificationResourceReassigned(dt, msg); break; - case IIPPacketEvent.PropertyUpdated: - IIPEventPropertyUpdated(packet.ResourceId, packet.MethodIndex, (TransmissionType)packet.DataType, msg); + case IIPPacketNotification.ResourceMoved: + IIPNotificationResourceMoved(dt, msg); break; - case IIPPacketEvent.EventOccurred: - IIPEventEventOccurred(packet.ResourceId, packet.MethodIndex, (TransmissionType)packet.DataType, msg); - break; - - case IIPPacketEvent.ChildAdded: - IIPEventChildAdded(packet.ResourceId, packet.ChildId); - break; - case IIPPacketEvent.ChildRemoved: - IIPEventChildRemoved(packet.ResourceId, packet.ChildId); - break; - case IIPPacketEvent.Renamed: - IIPEventRenamed(packet.ResourceId, packet.ResourceLink); - break; - case IIPPacketEvent.AttributesUpdated: - // @TODO: fix this - //IIPEventAttributesUpdated(packet.ResourceId, packet.Content); + case IIPPacketNotification.SystemFailure: + IIPNotificationSystemFailure(dt, msg); break; } } - else if (packet.Command == IIPPacketCommand.Request) + else if (packet.Method == IIPPacketMethod.Request) { - switch (packet.Action) + var dt = packet.DataType.Value; + + switch (packet.Request) { - // Manage - case IIPPacketRequest.AttachResource: - IIPRequestAttachResource(packet.CallbackId, packet.ResourceId); - break; - case IIPPacketRequest.ReattachResource: - IIPRequestReattachResource(packet.CallbackId, packet.ResourceId, packet.ResourceAge); - break; - case IIPPacketRequest.DetachResource: - IIPRequestDetachResource(packet.CallbackId, packet.ResourceId); - break; - case IIPPacketRequest.CreateResource: - //@TODO : fix this - //IIPRequestCreateResource(packet.CallbackId, packet.StoreId, packet.ResourceId, packet.Content); - break; - case IIPPacketRequest.DeleteResource: - IIPRequestDeleteResource(packet.CallbackId, packet.ResourceId); - break; - case IIPPacketRequest.AddChild: - IIPRequestAddChild(packet.CallbackId, packet.ResourceId, packet.ChildId); - break; - case IIPPacketRequest.RemoveChild: - IIPRequestRemoveChild(packet.CallbackId, packet.ResourceId, packet.ChildId); - break; - case IIPPacketRequest.RenameResource: - IIPRequestRenameResource(packet.CallbackId, packet.ResourceId, packet.ResourceName); - break; - - // Inquire - case IIPPacketRequest.TemplateFromClassName: - IIPRequestTemplateFromClassName(packet.CallbackId, packet.ClassName); - break; - case IIPPacketRequest.TemplateFromClassId: - IIPRequestTemplateFromClassId(packet.CallbackId, packet.ClassId); - break; - case IIPPacketRequest.TemplateFromResourceId: - IIPRequestTemplateFromResourceId(packet.CallbackId, packet.ResourceId); - break; - case IIPPacketRequest.QueryLink: - IIPRequestQueryResources(packet.CallbackId, packet.ResourceLink); - break; - - case IIPPacketRequest.ResourceChildren: - IIPRequestResourceChildren(packet.CallbackId, packet.ResourceId); - break; - case IIPPacketRequest.ResourceParents: - IIPRequestResourceParents(packet.CallbackId, packet.ResourceId); - break; - - case IIPPacketRequest.ResourceHistory: - IIPRequestInquireResourceHistory(packet.CallbackId, packet.ResourceId, packet.FromDate, packet.ToDate); - break; - - case IIPPacketRequest.LinkTemplates: - IIPRequestLinkTemplates(packet.CallbackId, packet.ResourceLink); - break; - // Invoke case IIPPacketRequest.InvokeFunction: - IIPRequestInvokeFunction(packet.CallbackId, packet.ResourceId, packet.MethodIndex, (TransmissionType)packet.DataType, msg); + IIPRequestInvokeFunction(packet.CallbackId, dt, msg); break; - - //case IIPPacket.IIPPacketAction.InvokeFunctionNamedArguments: - // IIPRequestInvokeFunctionNamedArguments(packet.CallbackId, packet.ResourceId, packet.MethodIndex, (TransmissionType)packet.DataType, msg); - // break; - - //case IIPPacket.IIPPacketAction.GetProperty: - // IIPRequestGetProperty(packet.CallbackId, packet.ResourceId, packet.MethodIndex); - // break; - //case IIPPacket.IIPPacketAction.GetPropertyIfModified: - // IIPRequestGetPropertyIfModifiedSince(packet.CallbackId, packet.ResourceId, packet.MethodIndex, packet.ResourceAge); - // break; - - case IIPPacketRequest.Listen: - IIPRequestListen(packet.CallbackId, packet.ResourceId, packet.MethodIndex); - break; - - case IIPPacketRequest.Unlisten: - IIPRequestUnlisten(packet.CallbackId, packet.ResourceId, packet.MethodIndex); - break; - case IIPPacketRequest.SetProperty: - IIPRequestSetProperty(packet.CallbackId, packet.ResourceId, packet.MethodIndex, (TransmissionType)packet.DataType, msg); + IIPRequestSetProperty(packet.CallbackId, dt, msg); break; - - // Attribute - case IIPPacketRequest.GetAllAttributes: - // @TODO : fix this - //IIPRequestGetAttributes(packet.CallbackId, packet.ResourceId, packet.Content, true); + case IIPPacketRequest.Subscribe: + IIPRequestSubscribe(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.UpdateAllAttributes: - // @TODO : fix this - //IIPRequestUpdateAttributes(packet.CallbackId, packet.ResourceId, packet.Content, true); + case IIPPacketRequest.Unsubscribe: + IIPRequestUnsubscribe(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.ClearAllAttributes: - // @TODO : fix this - //IIPRequestClearAttributes(packet.CallbackId, packet.ResourceId, packet.Content, true); + // Inquire + case IIPPacketRequest.TemplateFromClassName: + IIPRequestTemplateFromClassName(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.GetAttributes: - // @TODO : fix this - //IIPRequestGetAttributes(packet.CallbackId, packet.ResourceId, packet.Content, false); + case IIPPacketRequest.TemplateFromClassId: + IIPRequestTemplateFromClassId(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.UpdateAttributes: - // @TODO : fix this - //IIPRequestUpdateAttributes(packet.CallbackId, packet.ResourceId, packet.Content, false); + case IIPPacketRequest.TemplateFromResourceId: + IIPRequestTemplateFromResourceId(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.ClearAttributes: - // @TODO : fix this - //IIPRequestClearAttributes(packet.CallbackId, packet.ResourceId, packet.Content, false); + case IIPPacketRequest.Query: + IIPRequestQueryResources(packet.CallbackId, dt, msg); break; - - case IIPPacketRequest.KeepAlive: - IIPRequestKeepAlive(packet.CallbackId, packet.CurrentTime, packet.Interval); + case IIPPacketRequest.LinkTemplates: + IIPRequestLinkTemplates(packet.CallbackId, dt, msg); break; - - case IIPPacketRequest.ProcedureCall: - IIPRequestProcedureCall(packet.CallbackId, packet.Procedure, (TransmissionType)packet.DataType, msg); + case IIPPacketRequest.Token: + IIPRequestToken(packet.CallbackId, dt, msg); break; - - case IIPPacketRequest.StaticCall: - IIPRequestStaticCall(packet.CallbackId, packet.ClassId, packet.MethodIndex, (TransmissionType)packet.DataType, msg); + case IIPPacketRequest.GetResourceIdByLink: + IIPRequestGetResourceIdByLink(packet.CallbackId, dt, msg); break; - - } - } - else if (packet.Command == IIPPacketCommand.Reply) - { - switch (packet.Action) - { // Manage case IIPPacketRequest.AttachResource: - IIPReply(packet.CallbackId, packet.ClassId, packet.ResourceAge, packet.ResourceLink, packet.DataType, msg); + IIPRequestAttachResource(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.ReattachResource: - IIPReply(packet.CallbackId, packet.ResourceAge, packet.DataType, msg); - + IIPRequestReattachResource(packet.CallbackId, dt, msg); break; case IIPPacketRequest.DetachResource: - IIPReply(packet.CallbackId); + IIPRequestDetachResource(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.CreateResource: - IIPReply(packet.CallbackId, packet.ResourceId); + IIPRequestCreateResource(packet.CallbackId, dt, msg); break; - case IIPPacketRequest.DeleteResource: - case IIPPacketRequest.AddChild: - case IIPPacketRequest.RemoveChild: - case IIPPacketRequest.RenameResource: - IIPReply(packet.CallbackId); + IIPRequestDeleteResource(packet.CallbackId, dt, msg); break; - - // Inquire - - case IIPPacketRequest.TemplateFromClassName: - case IIPPacketRequest.TemplateFromClassId: - case IIPPacketRequest.TemplateFromResourceId: - - var content = msg.Clip(packet.DataType.Value.Offset, (uint)packet.DataType.Value.ContentLength); - IIPReply(packet.CallbackId, TypeTemplate.Parse(content)); + case IIPPacketRequest.MoveResource: + IIPRequestMoveResource(packet.CallbackId, dt, msg); break; - - case IIPPacketRequest.QueryLink: - case IIPPacketRequest.ResourceChildren: - case IIPPacketRequest.ResourceParents: - case IIPPacketRequest.ResourceHistory: - case IIPPacketRequest.LinkTemplates: - IIPReply(packet.CallbackId, (TransmissionType)packet.DataType, msg);// packet.Content); - break; - - // Invoke - case IIPPacketRequest.InvokeFunction: - case IIPPacketRequest.StaticCall: - case IIPPacketRequest.ProcedureCall: - IIPReplyInvoke(packet.CallbackId, (TransmissionType)packet.DataType, msg);// packet.Content); - break; - - //case IIPPacket.IIPPacketAction.GetProperty: - // IIPReply(packet.CallbackId, packet.Content); - // break; - - //case IIPPacket.IIPPacketAction.GetPropertyIfModified: - // IIPReply(packet.CallbackId, packet.Content); - // break; - - case IIPPacketRequest.Listen: - case IIPPacketRequest.Unlisten: - case IIPPacketRequest.SetProperty: - IIPReply(packet.CallbackId); - break; - - // Attribute - case IIPPacketRequest.GetAllAttributes: - case IIPPacketRequest.GetAttributes: - IIPReply(packet.CallbackId, (TransmissionType)packet.DataType, msg);// packet.Content); - break; - - case IIPPacketRequest.UpdateAllAttributes: - case IIPPacketRequest.UpdateAttributes: - case IIPPacketRequest.ClearAllAttributes: - case IIPPacketRequest.ClearAttributes: - IIPReply(packet.CallbackId); - break; - + // Static case IIPPacketRequest.KeepAlive: - IIPReply(packet.CallbackId, packet.CurrentTime, packet.Jitter); + IIPRequestKeepAlive(packet.CallbackId, dt, msg); + break; + case IIPPacketRequest.ProcedureCall: + IIPRequestProcedureCall(packet.CallbackId, dt, msg); + break; + case IIPPacketRequest.StaticCall: + IIPRequestStaticCall(packet.CallbackId, dt, msg); break; } - } - else if (packet.Command == IIPPacketCommand.Report) + else if (packet.Method == IIPPacketMethod.Reply) { - switch (packet.Report) - { - case IIPPacketReport.ManagementError: - IIPReportError(packet.CallbackId, ErrorType.Management, packet.ErrorCode, null); - break; - case IIPPacketReport.ExecutionError: - IIPReportError(packet.CallbackId, ErrorType.Exception, packet.ErrorCode, packet.ErrorMessage); - break; - case IIPPacketReport.ProgressReport: - IIPReportProgress(packet.CallbackId, ProgressType.Execution, packet.ProgressValue, packet.ProgressMax); - break; - case IIPPacketReport.ChunkStream: - IIPReportChunk(packet.CallbackId, (TransmissionType)packet.DataType, msg); + var dt = packet.DataType.Value; + switch (packet.Reply) + { + case IIPPacketReply.Completed: + IIPReplyCompleted(packet.CallbackId, dt, msg); break; + case IIPPacketReply.Propagated: + IIPReplyPropagated(packet.CallbackId, dt, msg); + break; + case IIPPacketReply.PermissionError: + IIPReplyError(packet.CallbackId, dt, msg, ErrorType.Management); + break; + case IIPPacketReply.ExecutionError: + IIPReplyError(packet.CallbackId, dt, msg, ErrorType.Exception); + break; + + case IIPPacketReply.Progress: + IIPReplyProgress(packet.CallbackId, dt, msg); + break; + + case IIPPacketReply.Chunk: + IIPReplyChunk(packet.CallbackId, dt, msg); + break; + + case IIPPacketReply.Warning: + IIPReplyWarning(packet.Extension, dt, msg); + break; + } } + else if (packet.Method == IIPPacketMethod.Extension) + { + IIPExtensionAction(packet.Extension, packet.DataType, msg); + } } } - else { - // check if the reqeust through websockets + // check if the request through Websockets if (initialPacket) { @@ -755,7 +625,6 @@ public partial class DistributedConnection : NetworkConnection, IStore HTTPConnection.Upgrade(req, res); - res.Compose(HTTPComposeOption.AllCalculateLength); Send(res.Data); // replace my socket with websockets @@ -784,11 +653,8 @@ public partial class DistributedConnection : NetworkConnection, IStore } } - - var rt = authPacket.Parse(msg, offset, ends); - if (rt <= 0) { data.HoldFor(msg, ends + (uint)(-rt)); @@ -810,9 +676,6 @@ public partial class DistributedConnection : NetworkConnection, IStore } return offset; - - //if (offset < ends) - // processPacket(msg, offset, ends, data, chunkId); } private void ProcessClientAuth(byte[] data) @@ -831,9 +694,9 @@ public partial class DistributedConnection : NetworkConnection, IStore var dataType = authPacket.DataType.Value; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); + var (_, parsed) = Codec.ParseSync(data, dataType.Offset, dataType); - var rt = (Map)parsed.Wait(); + var rt = (Map)parsed; session.RemoteHeaders = rt.Select(x => new KeyValuePair((IIPAuthPacketHeader)x.Key, x.Value)); @@ -924,7 +787,7 @@ public partial class DistributedConnection : NetworkConnection, IStore ready = true; Status = ConnectionStatus.Connected; - + // put it in the warehouse if (this.Instance == null) @@ -957,8 +820,8 @@ public partial class DistributedConnection : NetworkConnection, IStore else if (authPacket.Event == IIPAuthPacketEvent.IAuthPlain) { var dataType = authPacket.DataType.Value; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); - var rt = (Map)parsed.Wait(); + var (_, parsed) = Codec.ParseSync(data, dataType.Offset, dataType); + var rt = (Map)parsed; var headers = rt.Select(x => new KeyValuePair((IIPAuthPacketIAuthHeader)x.Key, x.Value)); var iAuthRequest = new AuthorizationRequest(headers); @@ -983,7 +846,8 @@ public partial class DistributedConnection : NetworkConnection, IStore .Done(); }) .Timeout(iAuthRequest.Timeout * 1000, - () => { + () => + { SendParams() .AddUInt8((byte)IIPAuthPacketEvent.ErrorTerminate) .AddUInt8((byte)ExceptionCode.Timeout) @@ -996,8 +860,8 @@ public partial class DistributedConnection : NetworkConnection, IStore else if (authPacket.Event == IIPAuthPacketEvent.IAuthHashed) { var dataType = authPacket.DataType.Value; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); - var rt = (Map)parsed.Wait(); + var (_, parsed) = Codec.ParseSync(data, dataType.Offset, dataType); + var rt = (Map)parsed; var headers = rt.Select(x => new KeyValuePair((IIPAuthPacketIAuthHeader)x.Key, x.Value)); @@ -1033,14 +897,15 @@ public partial class DistributedConnection : NetworkConnection, IStore .Done(); }) .Timeout(iAuthRequest.Timeout * 1000, - () => { - SendParams() - .AddUInt8((byte)IIPAuthPacketEvent.ErrorTerminate) - .AddUInt8((byte)ExceptionCode.Timeout) - .AddUInt16(7) - .AddString("Timeout") - .Done(); - }); + () => + { + SendParams() + .AddUInt8((byte)IIPAuthPacketEvent.ErrorTerminate) + .AddUInt8((byte)ExceptionCode.Timeout) + .AddUInt16(7) + .AddString("Timeout") + .Done(); + }); } } else if (authPacket.Event == IIPAuthPacketEvent.IAuthEncrypted) @@ -1058,10 +923,9 @@ public partial class DistributedConnection : NetworkConnection, IStore var dataType = authPacket.DataType.Value; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); - - var rt = (Map)parsed.Wait(); + var (_, parsed) = Codec.ParseSync(data, dataType.Offset, dataType); + var rt = (Map)parsed; session.RemoteHeaders = rt.Select(x => new KeyValuePair((IIPAuthPacketHeader)x.Key, x.Value)); @@ -1250,7 +1114,7 @@ public partial class DistributedConnection : NetworkConnection, IStore reply = Server.Membership.GetToken((ulong)session.RemoteHeaders[IIPAuthPacketHeader.TokenIndex], (string)session.RemoteHeaders[IIPAuthPacketHeader.Domain]); } - else + else { throw new NotImplementedException("Authentication method unsupported."); } @@ -1317,9 +1181,7 @@ public partial class DistributedConnection : NetworkConnection, IStore var reference = authPacket.Reference; var dataType = authPacket.DataType.Value; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); - - var value = parsed.Wait(); + var (_, value) = Codec.ParseSync(data, dataType.Offset, dataType); Server.Membership.AuthorizePlain(session, reference, value) .Then(x => ProcessAuthorization(x)); @@ -1386,7 +1248,7 @@ public partial class DistributedConnection : NetworkConnection, IStore session.Id = new byte[32]; r.NextBytes(session.Id); var accountId = session.AuthorizedAccount.ToBytes(); - + SendParams() .AddUInt8((byte)IIPAuthPacketEvent.IndicationEstablished) .AddUInt8((byte)session.Id.Length) @@ -1687,33 +1549,22 @@ public partial class DistributedConnection : NetworkConnection, IStore try { - var ar = await SendRequest(IIPPacketRequest.QueryLink) - .AddUInt16((ushort)link.Length) - .AddUInt8Array(link) - .Done(); + var id = (uint)await SendRequest(IIPPacketRequest.GetResourceIdByLink, link); - var dataType = (TransmissionType)ar[0]; - var data = ar[1] as byte[]; - if (dataType.Identifier == TransmissionTypeIdentifier.ResourceList) - { + // remove from suspended. + suspendedResources.Remove(r.DistributedResourceInstanceId); - // remove from suspended. - suspendedResources.Remove(r.DistributedResourceInstanceId); + // id changed ? + if (id != r.DistributedResourceInstanceId) + r.DistributedResourceInstanceId = id; - // parse them as int - var id = data.GetUInt32(8, Endian.Little); + neededResources[id] = r; - // id changed ? - if (id != r.DistributedResourceInstanceId) - r.DistributedResourceInstanceId = id; + await Fetch(id, null); - neededResources[id] = r; + Global.Log("DistributedConnection", LogType.Debug, "Restored " + id); - await Fetch(id, null); - - Global.Log("DistributedConnection", LogType.Debug, "Restored " + id); - } } catch (AsyncException ex) { diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index 1f6b2cb..39b81f9 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -1,6 +1,6 @@ /* -Copyright (c) 2017 Ahmed Kh. Zamil +Copyright (c) 2017-2025 Ahmed Kh. Zamil Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -22,22 +22,22 @@ SOFTWARE. */ -using Esiur.Data; using Esiur.Core; +using Esiur.Data; +using Esiur.Misc; +using Esiur.Net.Packets; using Esiur.Resource; using Esiur.Resource.Template; using Esiur.Security.Authority; using Esiur.Security.Permissions; using System; using System.Collections.Generic; - +using System.ComponentModel.DataAnnotations; using System.Linq; using System.Reflection; +using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Tasks; -using System.Security.Cryptography.X509Certificates; -using Esiur.Misc; -using Esiur.Net.Packets; namespace Esiur.Net.IIP; @@ -61,7 +61,7 @@ partial class DistributedConnection Dictionary> subscriptions = new Dictionary>(); - // resources might get attched by the client + // resources might get attached by the client internal KeyList cache = new(); object subscriptionsLock = new object(); @@ -76,93 +76,116 @@ partial class DistributedConnection /// Packet action. /// Arguments to send. /// - SendList SendRequest(IIPPacketRequest action) + AsyncReply SendRequest(IIPPacketRequest action, params object[] args) { var reply = new AsyncReply(); var c = callbackCounter++; // avoid thread racing requests.Add(c, reply); - return (SendList)SendParams(reply) - .AddUInt8((byte)(0x40 | (byte)action)) - .AddUInt32(c); - } + if (args.Length == 0) + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0x40 | (byte)action)) + .AddUInt32(c); + Send(bl.ToArray()); + } + if (args.Length == 1) + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0x60 | (byte)action)) + .AddUInt32(c) + .AddUInt8Array(Codec.Compose(args[0], this)); + Send(bl.ToArray()); + } + else + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0x60 | (byte)action)) + .AddUInt32(c) + .AddUInt8Array(Codec.Compose(args, this)); + Send(bl.ToArray()); + } - /* - internal IAsyncReply SendRequest(IIPPacket.IIPPacketAction action, params object[] args) - { - var reply = new AsyncReply(); - callbackCounter++; - var bl = new BinaryList((byte)(0x40 | (byte)action), callbackCounter); - bl.AddRange(args); - Send(bl.ToArray()); - requests.Add(callbackCounter, reply); return reply; } - */ - //uint maxcallerid = 0; - - internal SendList SendReply(IIPPacketRequest action, uint callbackId) + /// + /// Send IIP notification. + /// + /// Packet action. + /// Arguments to send. + /// + AsyncReply SendNotification(IIPPacketNotification action, params object[] args) { - return (SendList)SendParams().AddUInt8((byte)(0x80 | (byte)action)).AddUInt32(callbackId); + var reply = new AsyncReply(); + + if (args.Length == 0) + { + Send(new byte[] { (byte)action }); + } + if (args.Length == 1) + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0x20 | (byte)action)) + .AddUInt8Array(Codec.Compose(args[0], this)); + Send(bl.ToArray()); + } + else + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0x20 | (byte)action)) + .AddUInt8Array(Codec.Compose(args, this)); + Send(bl.ToArray()); + } + + return reply; } - internal SendList SendEvent(IIPPacketEvent evt) + void SendReply(IIPPacketReply action, uint callbackId, params object[] args) { - return (SendList)SendParams().AddUInt8((byte)(evt)); + if (args.Length == 0) + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0x80 | (byte)action)) + .AddUInt32(callbackId); + Send(bl.ToArray()); + } + if (args.Length == 1) + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0xA0 | (byte)action)) + .AddUInt32(callbackId) + .AddUInt8Array(Codec.Compose(args[0], this)); + Send(bl.ToArray()); + } + else + { + var bl = new BinaryList(); + bl.AddUInt8((byte)(0xA0 | (byte)action)) + .AddUInt32(callbackId) + .AddUInt8Array(Codec.Compose(args, this)); + Send(bl.ToArray()); + } } + internal AsyncReply SendSubscribeRequest(uint instanceId, byte index) { - var reply = new AsyncReply(); - var c = callbackCounter++; - requests.Add(c, reply); - - SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacketRequest.Subscribe)) - .AddUInt32(c) - .AddUInt32(instanceId) - .AddUInt8(index) - .Done(); - - return reply; + return SendRequest(IIPPacketRequest.Subscribe, instanceId, index); } internal AsyncReply SendUnsubscribeRequest(uint instanceId, byte index) { - var reply = new AsyncReply(); - var c = callbackCounter++; - requests.Add(c, reply); - - SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacketRequest.Unsubscribe)) - .AddUInt32(c) - .AddUInt32(instanceId) - .AddUInt8(index) - .Done(); - - return reply; + return SendRequest(IIPPacketRequest.Unsubscribe, instanceId, index); } - public AsyncReply StaticCall(UUID classId, byte index, Map parameters) + public AsyncReply StaticCall(UUID classId, byte index, Map parameters) { - var pb = Codec.Compose(parameters, this);// Codec.ComposeVarArray(parameters, this, true); - - var reply = new AsyncReply(); - var c = callbackCounter++; - requests.Add(c, reply); - - - SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacketRequest.StaticCall)) - .AddUInt32(c) - .AddUUID(classId) - .AddUInt8(index) - .AddUInt8Array(pb) - .Done(); - - return reply; + return SendRequest(IIPPacketRequest.StaticCall, classId, index, parameters); } - public AsyncReply Call(string procedureCall, params object[] parameters) + public AsyncReply Call(string procedureCall, params object[] parameters) { var args = new Map(); for (byte i = 0; i < parameters.Length; i++) @@ -170,55 +193,22 @@ partial class DistributedConnection return Call(procedureCall, args); } - public AsyncReply Call(string procedureCall, Map parameters) + public AsyncReply Call(string procedureCall, Map parameters) { - var pb = Codec.Compose(parameters, this); - - var reply = new AsyncReply(); - var c = callbackCounter++; - requests.Add(c, reply); - - var callName = DC.ToBytes(procedureCall); - - SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacketRequest.ProcedureCall)) - .AddUInt32(c) - .AddUInt16((ushort)callName.Length) - .AddUInt8Array(callName) - .AddUInt8Array(pb) - .Done(); - - return reply; + return SendRequest(IIPPacketRequest.ProcedureCall, procedureCall, parameters); } - internal AsyncReply SendInvoke(uint instanceId, byte index, Map parameters) + internal AsyncReply SendInvoke(uint instanceId, byte index, Map parameters) { - var pb = Codec.Compose(parameters, this);// Codec.ComposeVarArray(parameters, this, true); - - var reply = new AsyncReply(); - var c = callbackCounter++; - requests.Add(c, reply); - - SendParams().AddUInt8((byte)(0x40 | (byte)IIPPacketRequest.InvokeFunction)) - .AddUInt32(c) - .AddUInt32(instanceId) - .AddUInt8(index) - .AddUInt8Array(pb) - .Done(); - return reply; + return SendRequest(IIPPacketRequest.InvokeFunction, instanceId, index, parameters); } - internal AsyncReply SendSetProperty(uint instanceId, byte index, object value) + internal AsyncReply SendSetProperty(uint instanceId, byte index, object value) { - var cv = Codec.Compose(value, this); - - return SendRequest(IIPPacketRequest.SetProperty) - .AddUInt32(instanceId) - .AddUInt8(index) - .AddUInt8Array(cv) - .Done(); + return SendRequest(IIPPacketRequest.SetProperty, instanceId, index, value); } - internal AsyncReply SendDetachRequest(uint instanceId) + internal AsyncReply SendDetachRequest(uint instanceId) { try { @@ -237,9 +227,7 @@ partial class DistributedConnection } if (sendDetach) - return SendRequest(IIPPacketRequest.DetachResource) - .AddUInt32(instanceId) - .Done(); + return SendRequest(IIPPacketRequest.DetachResource, instanceId); return null; // no one is waiting for this } @@ -249,100 +237,167 @@ partial class DistributedConnection } } - void SendError(ErrorType type, uint callbackId, ushort errorCode, string errorMessage = "") + void SendError(ErrorType type, uint callbackId, ushort errorCodeOrWarningLevel, string message = "") { - var msg = DC.ToBytes(errorMessage); if (type == ErrorType.Management) - SendParams() - .AddUInt8((byte)(0xC0 | (byte)IIPPacketReport.ManagementError)) - .AddUInt32(callbackId) - .AddUInt16(errorCode) - .Done(); + SendReply(IIPPacketReply.PermissionError, callbackId, errorCodeOrWarningLevel, message); else if (type == ErrorType.Exception) - SendParams() - .AddUInt8((byte)(0xC0 | (byte)IIPPacketReport.ExecutionError)) - .AddUInt32(callbackId) - .AddUInt16(errorCode) - .AddUInt16((ushort)msg.Length) - .AddUInt8Array(msg) - .Done(); + SendReply(IIPPacketReply.ExecutionError, callbackId, errorCodeOrWarningLevel, message); + else if (type == ErrorType.Warning) + SendReply(IIPPacketReply.Warning, callbackId, (byte)errorCodeOrWarningLevel, message); } - internal void SendProgress(uint callbackId, int value, int max) + internal void SendProgress(uint callbackId, uint value, uint max) { - SendParams() - .AddUInt8((byte)(0xC0 | (byte)IIPPacketReport.ProgressReport)) - .AddUInt32(callbackId) - .AddInt32(value) - .AddInt32(max) - .Done(); - //SendParams(, callbackId, value, max); + SendReply(IIPPacketReply.Progress, callbackId, value, max); } internal void SendChunk(uint callbackId, object chunk) { - var c = Codec.Compose(chunk, this); - SendParams() - .AddUInt8((byte)(0xC0 | (byte)IIPPacketReport.ChunkStream)) - .AddUInt32(callbackId) - .AddUInt8Array(c) - .Done(); + SendReply(IIPPacketReply.Chunk, callbackId, chunk); } - void IIPReply(uint callbackId, params object[] results) - { - var req = requests.Take(callbackId); - req?.Trigger(results); - } - - void IIPReplyInvoke(uint callbackId, TransmissionType transmissionType, byte[] content) + void IIPReplyCompleted(uint callbackId, TransmissionType dataType, byte[] data) { var req = requests.Take(callbackId); - var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); - parsed.Then((rt) => + if (req == null) { - req?.Trigger(rt); - }); - } + // @TODO: Send general failure + return; + } - void IIPReportError(uint callbackId, ErrorType errorType, ushort errorCode, string errorMessage) - { - var req = requests.Take(callbackId); - req?.TriggerError(new AsyncException(errorType, errorCode, errorMessage)); - } - - void IIPReportProgress(uint callbackId, ProgressType type, int value, int max) - { - var req = requests[callbackId]; - req?.TriggerProgress(type, value, max); - } - - void IIPReportChunk(uint callbackId, TransmissionType dataType, byte[] data) - { - if (requests.ContainsKey(callbackId)) + var (_, parsed) = Codec.ParseAsync(data, 0, this, null); + if (parsed is AsyncReply reply) { - var req = requests[callbackId]; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); - parsed.Then((x) => + reply.Then(result => { - req.TriggerChunk(x); + req.Trigger(result); }); } + else + { + req.Trigger(parsed); + } } - void IIPNotificationResourceReassigned(object value) + void IIPExtensionAction(byte actionId, TransmissionType? dataType, byte[] data) + { + // nothing is supported now + } + + void IIPReplyPropagated(uint callbackId, TransmissionType dataType, byte[] data) + { + var req = requests[callbackId]; + + if (req == null) + { + // @TODO: Send general failure + return; + } + + var (_, parsed) = Codec.ParseAsync(data, 0, this, null); + if (parsed is AsyncReply reply) + { + reply.Then(result => + { + req.TriggerPropagation(result); + }); + } + else + { + req.TriggerPropagation(parsed); + } + } + + void IIPReplyError(uint callbackId, TransmissionType dataType, byte[] data, ErrorType type) + { + var req = requests.Take(callbackId); + + if (req == null) + { + // @TODO: Send general failure + return; + } + + var args = DataDeserializer.ListParser(data, dataType.Offset, (uint)dataType.ContentLength) + as object[]; + + var errorCode = (ushort)args[0]; + var errorMsg = (string)args[1]; + + req.TriggerError(new AsyncException(type, errorCode, errorMsg)); + } + + void IIPReplyProgress(uint callbackId, TransmissionType dataType, byte[] data) + { + var req = requests[callbackId]; + + if (req == null) + { + // @TODO: Send general failure + return; + } + + var args = DataDeserializer.ListParser(data, dataType.Offset, (uint)dataType.ContentLength) + as object[]; + + var current = (uint)args[0]; + var total = (uint)args[1]; + + req.TriggerProgress(ProgressType.Execution, current, total); + } + + void IIPReplyWarning(uint callbackId, TransmissionType dataType, byte[] data) + { + var req = requests[callbackId]; + + if (req == null) + { + // @TODO: Send general failure + return; + } + + var args = DataDeserializer.ListParser(data, dataType.Offset, (uint)dataType.ContentLength) + as object[]; + + var level = (byte)args[0]; + var message = (string)args[1]; + + req.TriggerWarning(level, message); + } + + + + void IIPReplyChunk(uint callbackId, TransmissionType dataType, byte[] data) + { + var req = requests[callbackId]; + + if (req == null) + return; + + var (_, parsed) = Codec.ParseAsync(data, dataType.Offset, this, null, dataType); + + if (parsed is AsyncReply reply) + reply.Then(result => req.TriggerChunk(result)); + else + req.TriggerChunk(parsed); + } + + void IIPNotificationResourceReassigned(TransmissionType dataType, byte[] data) { // uint resourceId, uint newResourceId } - void IIPNotificationResourceMoved(object value) { } + void IIPNotificationResourceMoved(TransmissionType dataType, byte[] data) { } - void IIPNotificationSystemFailure(object value) { } + void IIPNotificationSystemFailure(TransmissionType dataType, byte[] data) { } - void IIPNotificationResourceDestroyed(object value) + void IIPNotificationResourceDestroyed(TransmissionType dataType, byte[] data) { - var resourceId = (uint)value; + var (size, rt) = Codec.ParseSync(data, dataType.Offset, dataType); + + var resourceId = (uint)rt; if (attachedResources.Contains(resourceId)) { @@ -366,125 +421,87 @@ partial class DistributedConnection // @TODO: handle this mess neededResources.Remove(resourceId); } + } - void IIPNotificationPropertyModified(object value) + void IIPNotificationPropertyModified(TransmissionType dataType, byte[] data) { - // uint resourceId, byte index, TransmissionType dataType, byte[] data + // resourceId, index, value + var (valueOffset, valueSize, args) = + DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2); + var rid = (uint)args[0]; + var index = (byte)args[1]; - Fetch(resourceId, null).Then(r => + Fetch(rid, null).Then(r => { + var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); + if (pt != null) + return; + var item = new AsyncReply(); queue.Add(item); - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);// 0, this); - parsed.Then((arguments) => + var (_, parsed) = Codec.ParseAsync(data, valueOffset, this, null); + + if (parsed is AsyncReply) { - var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); - if (pt != null) + (parsed as AsyncReply).Then((result) => { item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery, - arguments, index)); - } - else - { // ft found, fi not found, this should never happen - queue.Remove(item); - } - }); - + result, index)); + }); + } + else + { + item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, + DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery, + parsed, index)); + } }); } - void IIPEventEventOccurred(uint resourceId, byte index, TransmissionType dataType, byte[] data) + void IIPNotificationEventOccurred(TransmissionType dataType, byte[] data) { + // resourceId, index, value + var (valueOffset, valueSize, args) = + DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 2); + + var resourceId = (uint)args[0]; + var index = (byte)args[1]; + Fetch(resourceId, null).Then(r => { - // push to the queue to gaurantee serialization + var et = r.Instance.Template.GetEventTemplateByIndex(index); + + if (et == null) // this should never happen + return; + + // push to the queue to guarantee serialization var item = new AsyncReply(); queue.Add(item); - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);//, 0, this); - parsed.Then((arguments) => + + var (_, parsed) = Codec.ParseAsync(data, valueOffset, this, null); + + if (parsed is AsyncReply) { - var et = r.Instance.Template.GetEventTemplateByIndex(index); - if (et != null) + (parsed as AsyncReply).Then((result) => { item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, - DistributedResourceQueueItem.DistributedResourceQueueItemType.Event, arguments, index)); - } - else - { // ft found, fi not found, this should never happen - queue.Remove(item); - } - - }); + DistributedResourceQueueItem.DistributedResourceQueueItemType.Event, result, index)); + }); + } + else + { + item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, + DistributedResourceQueueItem.DistributedResourceQueueItemType.Event, parsed, index)); + } }); - /* - if (resources.Contains(resourceId)) - { - // push to the queue to gaurantee serialization - var reply = new AsyncReply(); - var r = resources[resourceId]; - - queue.Add(reply); - - Codec.ParseVarArray(content, this).Then((arguments) => - { - if (!r.IsAttached) - { - // event occurred before the template is received - r.AddAfterAttachement(reply, - new DistributedResourceQueueItem((DistributedResource)r, - DistributedResourceQueueItem.DistributedResourceQueueItemType.Event, arguments, index)); - } - else - { - var et = r.Instance.Template.GetEventTemplate(index); - if (et != null) - { - reply.Trigger(new DistributedResourceQueueItem((DistributedResource)r, - DistributedResourceQueueItem.DistributedResourceQueueItemType.Event, arguments, index)); - } - else - { // ft found, fi not found, this should never happen - queue.Remove(reply); - } - } - }); - } - */ - } - - void IIPEventChildAdded(uint resourceId, uint childId) - { - Fetch(resourceId, null).Then(parent => - { - Fetch(childId, null).Then(child => - { - parent.children.Add(child); - child.parents.Add(parent); - - //parent.Instance.Children.Add(child); - }); - }); - } - - void IIPEventChildRemoved(uint resourceId, uint childId) - { - Fetch(resourceId, null).Then(parent => - { - Fetch(childId, null).Then(child => - { - parent.children.Remove(child); - child.parents.Remove(parent); - - // parent.Instance.Children.Remove(child); - }); - }); } void IIPEventRenamed(uint resourceId, string name) @@ -509,9 +526,13 @@ partial class DistributedConnection }); } - void IIPRequestAttachResource(uint callback, uint resourceId) + void IIPRequestAttachResource(uint callback, TransmissionType dataType, byte[] data) { + var (_, value) = Codec.ParseSync(data, 0, dataType); + + var resourceId = (uint)value; + Warehouse.GetById(resourceId).Then((res) => { if (res != null) @@ -527,179 +548,105 @@ partial class DistributedConnection // unsubscribe Unsubscribe(r); - //r.Instance.ResourceEventOccurred -= Instance_EventOccurred; - //r.Instance.CustomResourceEventOccurred -= Instance_CustomEventOccurred; - //r.Instance.ResourceModified -= Instance_PropertyModified; - //r.Instance.ResourceDestroyed -= Instance_ResourceDestroyed; - - - // r.Instance.Children.OnAdd -= Children_OnAdd; - // r.Instance.Children.OnRemoved -= Children_OnRemoved; - - //r.Instance.Attributes.OnModified -= Attributes_OnModified; - - // Console.WriteLine("Attach {0} {1}", r.Instance.Link, r.Instance.Id); - - // add it to attached resources so GC won't remove it from memory - ///attachedResources.Add(r); - - var link = DC.ToBytes(r.Instance.Link); - - if (r is DistributedResource) + if (r is DistributedResource dr) { // reply ok - SendReply(IIPPacketRequest.AttachResource, callback) - .AddUUID(r.Instance.Template.ClassId) - .AddUInt64(r.Instance.Age) - .AddUInt16((ushort)link.Length) - .AddUInt8Array(link) - //.AddUInt8Array(DataSerializer.PropertyValueArrayComposer((r as DistributedResource)._Serialize(), this, true)) - .AddUInt8Array(Codec.Compose((r as DistributedResource)._Serialize(), this)) - .Done(); + SendReply(IIPPacketReply.Completed, callback, + r.Instance.Template.ClassId, + r.Instance.Age, + r.Instance.Link, + r.Instance.Hops, + dr._Serialize()); } else { // reply ok - SendReply(IIPPacketRequest.AttachResource, callback) - .AddUUID(r.Instance.Template.ClassId) - .AddUInt64(r.Instance.Age) - .AddUInt16((ushort)link.Length) - .AddUInt8Array(link) - .AddUInt8Array(Codec.Compose(r.Instance.Serialize(), this)) - //.AddUInt8Array(DataSerializer.PropertyValueArrayComposer(r.Instance.Serialize(), this, true)) - .Done(); + SendReply(IIPPacketReply.Completed, callback, + r.Instance.Template.ClassId, + r.Instance.Age, + r.Instance.Link, + r.Instance.Hops, + r.Instance.Serialize()); + } - - // subscribe - //r.Instance.ResourceEventOccurred += Instance_EventOccurred; - //r.Instance.CustomResourceEventOccurred += Instance_CustomEventOccurred; - //r.Instance.ResourceModified += Instance_PropertyModified; - //r.Instance.ResourceDestroyed += Instance_ResourceDestroyed; - Subscribe(r); - - //r.Instance.Children.OnAdd += Children_OnAdd; - //r.Instance.Children.OnRemoved += Children_OnRemoved; - - //r.Instance.Attributes.OnModified += Attributes_OnModified; - - } else { // reply failed - //SendParams(0x80, r.Instance.Id, r.Instance.Age, r.Instance.Serialize(false, this)); - Global.Log("DistributedConnection", LogType.Debug, "Not found " + resourceId); - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } - private void Attributes_OnModified(string key, object oldValue, object newValue, KeyList sender) + void IIPRequestReattachResource(uint callback, TransmissionType dataType, byte[] data) { - if (key == "name") - { - var instance = (sender.Owner as Instance); - var name = DC.ToBytes(newValue.ToString()); - SendEvent(IIPPacketEvent.ChildRemoved) - .AddUInt32(instance.Id) - .AddUInt16((ushort)name.Length) - .AddUInt8Array(name) - .Done(); - } - } + // resourceId, index, value + var (valueOffset, valueSize, args) = + DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 2); - private void Children_OnRemoved(Instance sender, IResource value) - { - SendEvent(IIPPacketEvent.ChildRemoved) - .AddUInt32(sender.Id) - .AddUInt32(value.Instance.Id) - .Done(); - } + var resourceId = (uint)args[0]; + var age = (ulong)args[1]; - private void Children_OnAdd(Instance sender, IResource value) - { - //if (sender.Applicable(sender.Resource, this.session, ActionType.)) - SendEvent(IIPPacketEvent.ChildAdded) - .AddUInt32(sender.Id) - .AddUInt32(value.Instance.Id) - .Done(); - } - - - public bool RemoveChild(IResource parent, IResource child) - { - SendEvent(IIPPacketEvent.ChildRemoved) - .AddUInt32((parent as DistributedResource).DistributedResourceInstanceId) - .AddUInt32((child as DistributedResource).DistributedResourceInstanceId) - .Done(); - - return true; - } - - public bool AddChild(IResource parent, IResource child) - { - SendEvent(IIPPacketEvent.ChildAdded) - .AddUInt32((parent as DistributedResource).DistributedResourceInstanceId) - .AddUInt32((child as DistributedResource).DistributedResourceInstanceId) - .Done(); - - return true; - } - - - void IIPRequestReattachResource(uint callback, uint resourceId, ulong resourceAge) - { Warehouse.GetById(resourceId).Then((res) => { if (res != null) { + if (res.Instance.Applicable(session, ActionType.Attach, null) == Ruling.Denied) + { + SendError(ErrorType.Management, callback, 6); + return; + } + var r = res as IResource; + // unsubscribe Unsubscribe(r); - Subscribe(r); - //r.Instance.ResourceEventOccurred -= Instance_EventOccurred; - //r.Instance.CustomResourceEventOccurred -= Instance_CustomEventOccurred; - //r.Instance.ResourceModified -= Instance_PropertyModified; - //r.Instance.ResourceDestroyed -= Instance_ResourceDestroyed; - - //r.Instance.Children.OnAdd -= Children_OnAdd; - //r.Instance.Children.OnRemoved -= Children_OnRemoved; - - //r.Instance.Attributes.OnModified -= Attributes_OnModified; + if (r is DistributedResource dr) + { + // reply ok + SendReply(IIPPacketReply.Completed, callback, + r.Instance.Template.ClassId, + r.Instance.Age, + r.Instance.Link, + r.Instance.Hops, + dr._SerializeAfter(age)); + } + else + { + // reply ok + SendReply(IIPPacketReply.Completed, callback, + r.Instance.Template.ClassId, + r.Instance.Age, + r.Instance.Link, + r.Instance.Hops, + r.Instance.SerializeAfter(age)); + } // subscribe - //r.Instance.ResourceEventOccurred += Instance_EventOccurred; - //r.Instance.CustomResourceEventOccurred += Instance_CustomEventOccurred; - //r.Instance.ResourceModified += Instance_PropertyModified; - //r.Instance.ResourceDestroyed += Instance_ResourceDestroyed; - - //r.Instance.Children.OnAdd += Children_OnAdd; - //r.Instance.Children.OnRemoved += Children_OnRemoved; - - //r.Instance.Attributes.OnModified += Attributes_OnModified; - - // reply ok - SendReply(IIPPacketRequest.ReattachResource, callback) - .AddUInt64(r.Instance.Age) - .AddUInt8Array(Codec.Compose(r.Instance.Serialize(), this)) - .Done(); + Subscribe(r); } else { // reply failed + Global.Log("DistributedConnection", LogType.Debug, "Not found " + resourceId); SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } - void IIPRequestDetachResource(uint callback, uint resourceId) + void IIPRequestDetachResource(uint callback, TransmissionType dataType, byte[] data) { + + var (_, value) = Codec.ParseSync(data, 0, dataType); + + var resourceId = (uint)value; + Warehouse.GetById(resourceId).Then((res) => { if (res != null) @@ -714,7 +661,7 @@ partial class DistributedConnection //attachedResources.Remove(res); // reply ok - SendReply(IIPPacketRequest.DetachResource, callback).Done(); + SendReply(IIPPacketReply.Completed, callback); } else { @@ -724,9 +671,24 @@ partial class DistributedConnection }); } - void IIPRequestCreateResource(uint callback, uint storeId, uint parentId, byte[] content) + void IIPRequestCreateResource(uint callback, TransmissionType dataType, byte[] data) { + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 4); + var storeId = (uint)args[0]; + var parentId = (uint)args[1]; + var className = (string)args[2]; + var instanceName = (string)args[3]; + + var type = Type.GetType(className); + + if (type == null) + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ClassNotFound); + return; + } + Warehouse.GetById(storeId).Then(store => { if (store == null) @@ -748,11 +710,10 @@ partial class DistributedConnection return; } + Warehouse.GetById(parentId).Then(parent => { - // check security - if (parent != null) if (parent.Instance.Applicable(session, ActionType.AddChild, null) != Ruling.Allowed) { @@ -760,105 +721,49 @@ partial class DistributedConnection return; } - uint offset = 0; - var className = content.GetString(offset + 1, content[0]); - offset += 1 + (uint)content[0]; + var (_, parsed) = Codec.ParseAsync(data, offset, this, null); - var nameLength = content.GetUInt16(offset, Endian.Little); - offset += 2; - var name = content.GetString(offset, nameLength); - - var cl = content.GetUInt32(offset, Endian.Little); - offset += 4; - - var type = Type.GetType(className); - - if (type == null) + if (parsed is AsyncReply reply) { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ClassNotFound); - return; - } - - DataDeserializer.ListParser(content, offset, cl, this, null).Then(parameters => - { - offset += cl; - cl = content.GetUInt32(offset, Endian.Little); - - //Codec.ParseStructure(content, offset, cl, this).Then(attributes => - DataDeserializer.TypedMapParser(content, offset, cl, this, null).Then(attributes => + reply.Then((result) => { - offset += cl; - cl = (uint)content.Length - offset; - - //Codec.ParseStructure(content, offset, cl, this).Then(values => - DataDeserializer.TypedMapParser(content, offset, cl, this, null).Then(values => - { - -#if NETSTANDARD - var constructors = Type.GetType(className).GetTypeInfo().GetConstructors(); -#else - var constructors = Type.GetType(className).GetConstructors(); -#endif - - var matching = constructors.Where(x => - { - var ps = x.GetParameters(); - if (ps.Length > 0 && ps.Length == parameters.Length + 1) - if (ps.Last().ParameterType == typeof(DistributedConnection)) - return true; - - return ps.Length == parameters.Length; - } - ).ToArray(); - - var pi = matching[0].GetParameters(); - - // cast arguments - object[] args = null; - - if (pi.Length > 0) - { - int argsCount = pi.Length; - args = new object[pi.Length]; - - if (pi[pi.Length - 1].ParameterType == typeof(DistributedConnection)) - { - args[--argsCount] = this; - } - - if (parameters != null) - { - for (int i = 0; i < argsCount && i < parameters.Length; i++) - { - args[i] = DC.CastConvert(parameters[i], pi[i].ParameterType); - } - } - } - - // create the resource - var resource = Activator.CreateInstance(type, args) as IResource; - - Warehouse.Put(name, resource, store as IStore, parent).Then(ok => - { - SendReply(IIPPacketRequest.CreateResource, callback) - .AddUInt32(resource.Instance.Id) - .Done(); - - }).Error(x => - { - SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.AddToStoreFailed); - }); - - }); + var map = (Map)result; + Warehouse.New(type, instanceName, store as IStore, parent, null, null, map) + .Then(resource => + { + SendReply(IIPPacketReply.Completed, callback, resource.Instance.Id); + }) + .Error(ex => + { + SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.AddToStoreFailed); + }); }); - }); + } + else + { + var map = (Map)parsed; + Warehouse.New(type, instanceName, store as IStore, parent, null, null, map) + .Then(resource => + { + SendReply(IIPPacketReply.Completed, callback, resource.Instance.Id); + }).Error(x => + { + SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.AddToStoreFailed); + }); + } }); }); } - void IIPRequestDeleteResource(uint callback, uint resourceId) + + void IIPRequestDeleteResource(uint callback, TransmissionType dataType, byte[] data) { + + var (_, value) = Codec.ParseSync(data, 0, dataType); + + var resourceId = (uint)value; + Warehouse.GetById(resourceId).Then(r => { if (r == null) @@ -874,127 +779,28 @@ partial class DistributedConnection } if (Warehouse.Remove(r)) - SendReply(IIPPacketRequest.DeleteResource, callback).Done(); - //SendParams((byte)0x84, callback); + SendReply(IIPPacketReply.Completed, callback); + else SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteFailed); }); } - void IIPRequestGetAttributes(uint callback, uint resourceId, byte[] attributes, bool all = false) + void IIPRequestMoveResource(uint callback, TransmissionType dataType, byte[] data) { - Warehouse.GetById(resourceId).Then(r => + + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength); + + var resourceId = (uint)args[0]; + var name = (string)args[1]; + + if (name.Contains("/")) { - if (r == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); + return; + } - // if (!r.Instance.Store.Instance.Applicable(r, session, ActionType.InquireAttributes, null)) - if (r.Instance.Applicable(session, ActionType.InquireAttributes, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ViewAttributeDenied); - return; - } - - string[] attrs = null; - - if (!all) - attrs = attributes.GetStringArray(0, (uint)attributes.Length); - - var st = r.Instance.GetAttributes(attrs); - - if (st != null) - SendReply(all ? IIPPacketRequest.GetAllAttributes : IIPPacketRequest.GetAttributes, callback) - .AddUInt8Array(Codec.Compose(st, this)) - .Done(); - else - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.GetAttributesFailed); - - }); - } - - void IIPRequestAddChild(uint callback, uint parentId, uint childId) - { - Warehouse.GetById(parentId).Then(parent => - { - if (parent == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - Warehouse.GetById(childId).Then(child => - { - if (child == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - if (parent.Instance.Applicable(this.session, ActionType.AddChild, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AddChildDenied); - return; - } - - if (child.Instance.Applicable(this.session, ActionType.AddParent, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AddParentDenied); - return; - } - - parent.Instance.Store.AddChild(parent, child); - - SendReply(IIPPacketRequest.AddChild, callback).Done(); - //child.Instance.Parents - }); - - }); - } - - void IIPRequestRemoveChild(uint callback, uint parentId, uint childId) - { - Warehouse.GetById(parentId).Then(parent => - { - if (parent == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - Warehouse.GetById(childId).Then(child => - { - if (child == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - if (parent.Instance.Applicable(this.session, ActionType.RemoveChild, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AddChildDenied); - return; - } - - if (child.Instance.Applicable(this.session, ActionType.RemoveParent, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AddParentDenied); - return; - } - - parent.Instance.Store.RemoveChild(parent, child);// Children.Remove(child); - - SendReply(IIPPacketRequest.RemoveChild, callback).Done(); - //child.Instance.Parents - }); - - }); - } - - void IIPRequestRenameResource(uint callback, uint resourceId, string name) - { Warehouse.GetById(resourceId).Then(resource => { if (resource == null) @@ -1011,115 +817,25 @@ partial class DistributedConnection resource.Instance.Name = name; - SendReply(IIPPacketRequest.RenameResource, callback).Done(); + SendReply(IIPPacketReply.Completed, callback); }); } - void IIPRequestResourceChildren(uint callback, uint resourceId) + + + + + void IIPRequestToken(uint callback, TransmissionType dataType, byte[] data) { - Warehouse.GetById(resourceId).Then(resource => - { - if (resource == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - resource.Instance.Children().Then(children => - { - SendReply(IIPPacketRequest.ResourceChildren, callback) - .AddUInt8Array(Codec.Compose(children, this))// Codec.ComposeResourceArray(children, this, true)) - .Done(); - - }); - - - }); + // @TODO: To be implemented } - void IIPRequestResourceParents(uint callback, uint resourceId) + void IIPRequestLinkTemplates(uint callback, TransmissionType dataType, byte[] data) { - Warehouse.GetById(resourceId).Then(resource => - { - if (resource == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } + var (_, value) = Codec.ParseSync(data, 0, dataType); - resource.Instance.Parents().Then(parents => - { - SendReply(IIPPacketRequest.ResourceParents, callback) - .AddUInt8Array(Codec.Compose(parents, this)) - //.AddUInt8Array(Codec.ComposeResourceArray(parents, this, true)) - .Done(); + var resourceLink = (string)value; - }); - - }); - } - - void IIPRequestClearAttributes(uint callback, uint resourceId, byte[] attributes, bool all = false) - { - Warehouse.GetById(resourceId).Then(r => - { - if (r == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - if (r.Instance.Store.Instance.Applicable(session, ActionType.UpdateAttributes, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.UpdateAttributeDenied); - return; - } - - string[] attrs = null; - - if (!all) - attrs = attributes.GetStringArray(0, (uint)attributes.Length); - - if (r.Instance.RemoveAttributes(attrs)) - SendReply(all ? IIPPacketRequest.ClearAllAttributes : IIPPacketRequest.ClearAttributes, callback).Done(); - else - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.UpdateAttributeFailed); - - }); - } - - void IIPRequestUpdateAttributes(uint callback, uint resourceId, byte[] attributes, bool clearAttributes = false) - { - Warehouse.GetById(resourceId).Then(r => - { - if (r == null) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); - return; - } - - if (r.Instance.Store.Instance.Applicable(session, ActionType.UpdateAttributes, null) != Ruling.Allowed) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.UpdateAttributeDenied); - return; - } - - - DataDeserializer.TypedMapParser(attributes, 0, (uint)attributes.Length, this, null).Then(attrs => - { - if (r.Instance.SetAttributes((Map)attrs, clearAttributes)) - SendReply(clearAttributes ? IIPPacketRequest.ClearAllAttributes : IIPPacketRequest.ClearAttributes, - callback).Done(); - else - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.UpdateAttributeFailed); - }); - - }); - - } - - void IIPRequestLinkTemplates(uint callback, string resourceLink) - { Action queryCallback = (r) => { if (r == null) @@ -1133,25 +849,18 @@ partial class DistributedConnection else { // get all templates related to this resource - - var msg = new BinaryList(); - var templates = new List(); foreach (var resource in list) templates.AddRange(TypeTemplate.GetDependencies(resource.Instance.Template).Where(x => !templates.Contains(x))); - foreach (var t in templates) - { - msg.AddInt32(t.Content.Length) - .AddUInt8Array(t.Content); - } + var binList = new List(); + + foreach (var t in templates) + binList.Add(t.Content); + + // Send + SendReply(IIPPacketReply.Completed, callback, binList.ToArray()); - // digggg - SendReply(IIPPacketRequest.LinkTemplates, callback) - //.AddInt32(msg.Length) - //.AddUInt8Array(msg.ToArray()) - .AddUInt8Array(TransmissionType.Compose(TransmissionTypeIdentifier.RawData, msg.ToArray())) - .Done(); } } }; @@ -1162,16 +871,18 @@ partial class DistributedConnection Warehouse.Query(resourceLink).Then(queryCallback); } - void IIPRequestTemplateFromClassName(uint callback, string className) + void IIPRequestTemplateFromClassName(uint callback, TransmissionType dataType, byte[] data) { + var (_, value) = Codec.ParseSync(data, 0, dataType); + + var className = (string)value; + var t = Warehouse.GetTemplateByClassName(className); if (t != null) - SendReply(IIPPacketRequest.TemplateFromClassName, callback) - .AddUInt8Array(TransmissionType.Compose(TransmissionTypeIdentifier.RawData, t.Content)) - //.AddInt32(t.Content.Length) - //.AddUInt8Array(t.Content) - .Done(); + { + SendReply(IIPPacketReply.Completed, callback, t.Content); + } else { // reply failed @@ -1179,16 +890,19 @@ partial class DistributedConnection } } - void IIPRequestTemplateFromClassId(uint callback, UUID classId) + void IIPRequestTemplateFromClassId(uint callback, TransmissionType dataType, byte[] data) { + + var (_, value) = Codec.ParseSync(data, 0, dataType); + + var classId = new UUID((byte[])value); + var t = Warehouse.GetTemplateByClassId(classId); if (t != null) - SendReply(IIPPacketRequest.TemplateFromClassId, callback) - .AddUInt8Array(TransmissionType.Compose(TransmissionTypeIdentifier.RawData, t.Content)) - //.AddInt32(t.Content.Length) - //.AddUInt8Array(t.Content) - .Done(); + { + SendReply(IIPPacketReply.Completed, callback, t.Content); + } else { // reply failed @@ -1198,28 +912,33 @@ partial class DistributedConnection - void IIPRequestTemplateFromResourceId(uint callback, uint resourceId) + void IIPRequestTemplateFromResourceId(uint callback, TransmissionType dataType, byte[] data) { + + var (_, value) = Codec.ParseSync(data, 0, dataType); + + var resourceId = (uint)value; + Warehouse.GetById(resourceId).Then((r) => { if (r != null) - SendReply(IIPPacketRequest.TemplateFromResourceId, callback) - .AddInt32(r.Instance.Template.Content.Length) - .AddUInt8Array(r.Instance.Template.Content) - .Done(); + { + SendReply(IIPPacketReply.Completed, callback, r.Instance.Template.Content); + } else { // reply failed - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TemplateNotFound); + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } - - void IIPRequestQueryResources(uint callback, string resourceLink) + void IIPRequestGetResourceIdByLink(uint callback, TransmissionType dataType, byte[] data) { + var (_, parsed) = Codec.ParseSync(data, 0, dataType); + var resourceLink = (string)parsed; Action queryCallback = (r) => { @@ -1232,10 +951,35 @@ partial class DistributedConnection if (list.Length == 0) SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); else - SendReply(IIPPacketRequest.QueryLink, callback) - .AddUInt8Array(Codec.Compose(list, this)) - //.AddUInt8Array(Codec.ComposeResourceArray(list, this, true)) - .Done(); + SendReply(IIPPacketReply.Completed, callback, list.FirstOrDefault()); + } + }; + + if (Server?.EntryPoint != null) + Server.EntryPoint.Query(resourceLink, this).Then(queryCallback); + else + Warehouse.Query(resourceLink).Then(queryCallback); + + } + + void IIPRequestQueryResources(uint callback, TransmissionType dataType, byte[] data) + { + var (_, parsed) = Codec.ParseSync(data, 0, dataType); + + var resourceLink = (string)parsed; + + Action queryCallback = (r) => + { + if (r == null) + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + else + { + var list = r.Where(x => x.Instance.Applicable(session, ActionType.Attach, null) != Ruling.Denied).ToArray(); + + if (list.Length == 0) + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + else + SendReply(IIPPacketReply.Completed, callback, list); } }; @@ -1264,12 +1008,17 @@ partial class DistributedConnection } - void IIPRequestProcedureCall(uint callback, string procedureCall, TransmissionType transmissionType, byte[] content) + void IIPRequestProcedureCall(uint callback, TransmissionType dataType, byte[] data) { + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 1); + + var procedureCall = (string)args[0]; + if (Server == null) { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.GeneralFailure); + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); return; } @@ -1281,35 +1030,56 @@ partial class DistributedConnection return; } - var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); + var (_, parsed) = Codec.ParseAsync(data, offset, this, null); - parsed.Then(results => + if (parsed is AsyncReply reply) { - var arguments = (Map)results;// (object[])results; + reply.Then(results => + { + var arguments = (Map)results; + + // un hold the socket to send data immediately + this.Socket.Unhold(); + + // @TODO: Make managers for procedure calls + //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) + //{ + // SendError(ErrorType.Management, callback, + // (ushort)ExceptionCode.InvokeDenied); + // return; + //} + + InvokeFunction(call.Value.Template, callback, arguments, IIPPacketRequest.ProcedureCall, call.Value.Delegate.Target); + + }).Error(x => + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); + }); + } + else + { + var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for procedure calls - //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) - //{ - // SendError(ErrorType.Management, callback, - // (ushort)ExceptionCode.InvokeDenied); - // return; - //} - InvokeFunction(call.Value.Template, callback, arguments, IIPPacketRequest.ProcedureCall, call.Value.Delegate.Target); - - }).Error(x => - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); - }); + } } - void IIPRequestStaticCall(uint callback, UUID classId, byte index, TransmissionType transmissionType, byte[] content) + void IIPRequestStaticCall(uint callback, TransmissionType dataType, byte[] data) { + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 2); + + var classId = new UUID((byte[])args[0]); + var index = (byte)args[1]; + + var template = Warehouse.GetTemplateByClassId(classId); + if (template == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TemplateNotFound); @@ -1325,43 +1095,63 @@ partial class DistributedConnection return; } - var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); + var fi = ft.MethodInfo; - parsed.Then(results => + if (fi == null) { - var arguments = (Map)results;// (object[])results; + // ft found, fi not found, this should never happen + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); + return; + } + + var (_, parsed) = Codec.ParseAsync(data, offset, this, null); + + if (parsed is AsyncReply reply) + { + reply.Then(results => + { + var arguments = (Map)results; + + // un hold the socket to send data immediately + this.Socket.Unhold(); + + + // @TODO: Make managers for static calls + //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) + //{ + // SendError(ErrorType.Management, callback, + // (ushort)ExceptionCode.InvokeDenied); + // return; + //} + + InvokeFunction(ft, callback, arguments, IIPPacketRequest.StaticCall, null); + + }).Error(x => + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); + }); + } + else + { + var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); - var fi = ft.MethodInfo; - - if (fi == null) - { - // ft found, fi not found, this should never happen - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); - return; - } - // @TODO: Make managers for static calls - //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) - //{ - // SendError(ErrorType.Management, callback, - // (ushort)ExceptionCode.InvokeDenied); - // return; - //} + InvokeFunction(ft, callback, arguments, IIPPacketRequest.StaticCall, null); - - }).Error(x => - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); - }); + } } - void IIPRequestInvokeFunction(uint callback, uint resourceId, byte index, TransmissionType transmissionType, byte[] content) + void IIPRequestInvokeFunction(uint callback, TransmissionType dataType, byte[] data) { - //Console.WriteLine("IIPRequestInvokeFunction " + callback + " " + resourceId + " " + index); + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 2); + + var resourceId = (uint)args[0]; + var index = (byte)args[1]; Warehouse.GetById(resourceId).Then((r) => { @@ -1381,11 +1171,49 @@ partial class DistributedConnection return; } - var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); + var (_, parsed) = Codec.ParseAsync(data, offset, this, null); - parsed.Then(results => + if (parsed is AsyncReply) { - var arguments = (Map)results;// (object[])results; + (parsed as AsyncReply).Then(result => + { + var arguments = (Map)result; + + // un hold the socket to send data immediately + this.Socket.Unhold(); + + if (r is DistributedResource) + { + var rt = (r as DistributedResource)._Invoke(index, arguments); + if (rt != null) + { + rt.Then(res => + { + SendReply(IIPPacketReply.Completed, callback, res); + }); + } + else + { + // function not found on a distributed object + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); + } + } + else + { + if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) + { + SendError(ErrorType.Management, callback, + (ushort)ExceptionCode.InvokeDenied); + return; + } + + InvokeFunction(ft, callback, arguments, IIPPacketRequest.InvokeFunction, r); + } + }); + } + else + { + var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); @@ -1397,9 +1225,7 @@ partial class DistributedConnection { rt.Then(res => { - SendReply(IIPPacketRequest.InvokeFunction, callback) - .AddUInt8Array(Codec.Compose(res, this)) - .Done(); + SendReply(IIPPacketReply.Completed, callback, res); }); } else @@ -1410,17 +1236,6 @@ partial class DistributedConnection } else { - - //var fi = r.GetType().GetMethod(ft.Name); - - //if (fi == null) - //{ - // // ft found, fi not found, this should never happen - // SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); - // return; - //} - - if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) { SendError(ErrorType.Management, callback, @@ -1430,7 +1245,8 @@ partial class DistributedConnection InvokeFunction(ft, callback, arguments, IIPPacketRequest.InvokeFunction, r); } - }); + + } }); } @@ -1518,9 +1334,8 @@ partial class DistributedConnection { foreach (var v in enu) SendChunk(callback, v); - SendReply(actionType, callback) - .AddUInt8((byte)TransmissionTypeIdentifier.Null) - .Done(); + + SendReply(IIPPacketReply.Completed, callback); if (context != null) context.Ended = true; @@ -1548,9 +1363,8 @@ partial class DistributedConnection #else var res = t.GetType().GetProperty("Result").GetValue(t); #endif - SendReply(actionType, callback) - .AddUInt8Array(Codec.Compose(res, this)) - .Done(); + + SendReply(IIPPacketReply.Completed, callback, res); }); } @@ -1561,9 +1375,8 @@ partial class DistributedConnection if (context != null) context.Ended = true; - SendReply(actionType, callback) - .AddUInt8Array(Codec.Compose(res, this)) - .Done(); + SendReply(IIPPacketReply.Completed, callback, res); + }).Error(ex => { var (code, msg) = SummerizeException(ex); @@ -1574,6 +1387,9 @@ partial class DistributedConnection }).Chunk(v => { SendChunk(callback, v); + }).Warning((level, message) => + { + SendError(ErrorType.Warning, callback, level, message); }); } else @@ -1581,246 +1397,258 @@ partial class DistributedConnection if (context != null) context.Ended = true; - SendReply(actionType, callback) - .AddUInt8Array(Codec.Compose(rt, this)) - .Done(); + SendReply(IIPPacketReply.Completed, callback, rt); } } - void IIPRequestListen(uint callback, uint resourceId, byte index) + void IIPRequestSubscribe(uint callback, TransmissionType dataType, byte[] data) { + + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength); + + var resourceId = (uint)args[0]; + var index = (byte)args[1]; + Warehouse.GetById(resourceId).Then((r) => { - if (r != null) - { - var et = r.Instance.Template.GetEventTemplateByIndex(index); - - if (et != null) - { - if (r is DistributedResource) - { - (r as DistributedResource).Listen(et).Then(x => - { - SendReply(IIPPacketRequest.Listen, callback).Done(); - }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); - } - else - { - lock (subscriptionsLock) - { - if (!subscriptions.ContainsKey(r)) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); - return; - } - - if (subscriptions[r].Contains(index)) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyListened); - return; - } - - subscriptions[r].Add(index); - - SendReply(IIPPacketRequest.Listen, callback).Done(); - } - } - } - else - { - // pt not found - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); - } - } - else + if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + return; } - }); - } + var et = r.Instance.Template.GetEventTemplateByIndex(index); - void IIPRequestUnlisten(uint callback, uint resourceId, byte index) - { - Warehouse.GetById(resourceId).Then((r) => - { - if (r != null) + if (et != null) { - var et = r.Instance.Template.GetEventTemplateByIndex(index); + // et not found + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); + return; + } - if (et != null) - { - if (r is DistributedResource) - { - (r as DistributedResource).Unsubscribe(et).Then(x => - { - SendReply(IIPPacketRequest.Unsubscribe, callback).Done(); - }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); - } - else - { - lock (subscriptionsLock) - { - if (!subscriptions.ContainsKey(r)) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); - return; - } - - if (!subscriptions[r].Contains(index)) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyUnlistened); - return; - } - - subscriptions[r].Remove(index); - - SendReply(IIPPacketRequest.Unlisten, callback).Done(); - } - } - } - else - { - // pt not found - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); - } + if (r is DistributedResource) + { + (r as DistributedResource).Subscribe(et).Then(x => + { + SendReply(IIPPacketReply.Completed, callback); + }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); } else { - // resource not found - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + lock (subscriptionsLock) + { + if (!subscriptions.ContainsKey(r)) + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); + return; + } + + if (subscriptions[r].Contains(index)) + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyListened); + return; + } + + subscriptions[r].Add(index); + + SendReply(IIPPacketReply.Completed, callback); + } } }); } - - void IIPRequestInquireResourceHistory(uint callback, uint resourceId, DateTime fromDate, DateTime toDate) + void IIPRequestUnsubscribe(uint callback, TransmissionType dataType, byte[] data) { + + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength); + + var resourceId = (uint)args[0]; + var index = (byte)args[1]; + Warehouse.GetById(resourceId).Then((r) => { - if (r != null) + if (r == null) { - r.Instance.Store.GetRecord(r, fromDate, toDate).Then((results) => + // resource not found + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + return; + } + + var et = r.Instance.Template.GetEventTemplateByIndex(index); + + if (et == null) + { + // pt not found + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); + return; + } + + if (r is DistributedResource) + { + (r as DistributedResource).Unsubscribe(et).Then(x => { - var history = DataSerializer.HistoryComposer(results, this, true); + SendReply(IIPPacketReply.Completed, callback); + }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); + } + else + { + lock (subscriptionsLock) + { + if (!subscriptions.ContainsKey(r)) + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); + return; + } - SendReply(IIPPacketRequest.ResourceHistory, callback) - .AddUInt8Array(history) - .Done(); + if (!subscriptions[r].Contains(index)) + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyUnsubscribed); + return; + } - }); + subscriptions[r].Remove(index); + + SendReply(IIPPacketReply.Completed, callback); + } } }); } - void IIPRequestSetProperty(uint callback, uint resourceId, byte index, TransmissionType transmissionType, byte[] content) + + + + void IIPRequestSetProperty(uint callback, TransmissionType dataType, byte[] data) { + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength, 2); + + var rid = (uint)args[0]; + var index = (byte)args[1]; + // un hold the socket to send data immediately this.Socket.Unhold(); - Warehouse.GetById(resourceId).Then((r) => + Warehouse.GetById(rid).Then((r) => { - if (r != null) + if (r == null) { + // resource not found + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + return; + } - var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); - if (pt != null) + var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); + + if (pt != null) + { + // property not found + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); + return; + } + + + if (r is DistributedResource) + { + var (_, parsed) = Codec.ParseAsync(data, offset, this, null); + if (parsed is AsyncReply) { - var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); - parsed.Then((value) => + (parsed as AsyncReply).Then((value) => { - if (r is DistributedResource) + // propagation + (r as DistributedResource)._Set(index, value).Then((x) => { - // propagation - (r as DistributedResource)._Set(index, value).Then((x) => - { - SendReply(IIPPacketRequest.SetProperty, callback).Done(); + SendReply(IIPPacketReply.Completed, callback); }).Error(x => { SendError(x.Type, callback, (ushort)x.Code, x.Message); }); - } - else - { - - /* - #if NETSTANDARD - var pi = r.GetType().GetTypeInfo().GetProperty(pt.Name); - #else - var pi = r.GetType().GetProperty(pt.Name); - #endif*/ - - var pi = pt.PropertyInfo; - - if (pi != null) - { - - if (r.Instance.Applicable(session, ActionType.SetProperty, pt, this) == Ruling.Denied) - { - SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.SetPropertyDenied); - return; - } - - if (!pi.CanWrite) - { - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ReadOnlyProperty); - return; - } - - - if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() - == typeof(PropertyContext<>)) - { - value = Activator.CreateInstance(pi.PropertyType, this, value); - //value = new DistributedPropertyContext(this, value); - } - else - { - // cast new value type to property type - value = DC.CastConvert(value, pi.PropertyType); - } - - - try - { - - pi.SetValue(r, value); - SendReply(IIPPacketRequest.SetProperty, callback).Done(); - } - catch (Exception ex) - { - SendError(ErrorType.Exception, callback, 0, ex.Message); - } - - } - else - { - // pt found, pi not found, this should never happen - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); - } - } - }); } - else - { - // property not found - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); - } } else { - // resource not found - SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); + var pi = pt.PropertyInfo; + if (pi == null) + { + // pt found, pi not found, this should never happen + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); + return; + } + + if (r.Instance.Applicable(session, ActionType.SetProperty, pt, this) == Ruling.Denied) + { + SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.SetPropertyDenied); + return; + } + + if (!pi.CanWrite) + { + SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ReadOnlyProperty); + return; + } + + var (_, parsed) = Codec.ParseAsync(data, offset, this, null); + + if (parsed is AsyncReply) + { + (parsed as AsyncReply).Then((value) => + { + if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() + == typeof(PropertyContext<>)) + { + value = Activator.CreateInstance(pi.PropertyType, this, value); + } + else + { + // cast new value type to property type + value = DC.CastConvert(value, pi.PropertyType); + } + + try + { + pi.SetValue(r, value); + SendReply(IIPPacketReply.Completed, callback); + } + catch (Exception ex) + { + SendError(ErrorType.Exception, callback, 0, ex.Message); + } + }); + } + else + { + if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() + == typeof(PropertyContext<>)) + { + parsed = Activator.CreateInstance(pi.PropertyType, this, parsed); + //value = new DistributedPropertyContext(this, value); + } + else + { + // cast new value type to property type + parsed = DC.CastConvert(parsed, pi.PropertyType); + } + + try + { + pi.SetValue(r, parsed); + SendReply(IIPPacketReply.Completed, callback); + } + catch (Exception ex) + { + SendError(ErrorType.Exception, callback, 0, ex.Message); + } + } } }); } - /// /// Get the ResourceTemplate for a given class Id. /// @@ -1836,15 +1664,14 @@ partial class DistributedConnection var reply = new AsyncReply(); templateRequests.Add(classId, reply); - SendRequest(IIPPacketRequest.TemplateFromClassId) - .AddUUID(classId) - .Done() - .Then((rt) => + SendRequest(IIPPacketRequest.TemplateFromClassId, classId) + .Then((result) => { + var args = (object[])result; templateRequests.Remove(classId); - templates.Add(((TypeTemplate)rt[0]).ClassId, (TypeTemplate)rt[0]); - Warehouse.PutTemplate(rt[0] as TypeTemplate); - reply.Trigger(rt[0]); + templates.Add(((TypeTemplate)args[0]).ClassId, (TypeTemplate)args[0]); + Warehouse.PutTemplate(args[0] as TypeTemplate); + reply.Trigger(args[0]); }).Error((ex) => { reply.TriggerError(ex); @@ -1866,18 +1693,16 @@ partial class DistributedConnection var reply = new AsyncReply(); templateByNameRequests.Add(className, reply); - var classNameBytes = DC.ToBytes(className); - SendRequest(IIPPacketRequest.TemplateFromClassName) - .AddUInt8((byte)classNameBytes.Length) - .AddUInt8Array(classNameBytes) - .Done() - .Then((rt) => + SendRequest(IIPPacketRequest.TemplateFromClassName, className) + .Then((result) => { + var tt = TypeTemplate.Parse((byte[])result); + templateByNameRequests.Remove(className); - templates.Add(((TypeTemplate)rt[0]).ClassId, (TypeTemplate)rt[0]); - Warehouse.PutTemplate(rt[0] as TypeTemplate); - reply.Trigger(rt[0]); + templates.Add(tt.ClassId, tt); + Warehouse.PutTemplate(tt); + reply.Trigger(tt); }).Error((ex) => { reply.TriggerError(ex); @@ -1912,74 +1737,22 @@ partial class DistributedConnection return rt; - - /* - - if (pathRequests.ContainsKey(path)) - return pathRequests[path]; - - var reply = new AsyncReply(); - pathRequests.Add(path, reply); - - var bl = new BinaryList(path); - bl.Insert(0, (ushort)bl.Length); - - SendRequest(IIPPacket.IIPPacketAction.QueryLink, bl.ToArray()).Then((rt) => - { - pathRequests.Remove(path); - //(Guid)rt[0], - Fetch((uint)rt[1]).Then((r) => - { - reply.Trigger(r); - }); - }).Error((ex) => - { - reply.TriggerError(ex); - }); ; - - - return reply; - */ } - ///// - ///// Retrive a resource by its instance Id. - ///// - ///// Instance Id - ///// Resource - //public AsyncReply Retrieve(uint iid) - //{ - // foreach (var r in resources.Values) - // if (r.Instance.Id == iid) - // return new AsyncReply(r); - // return new AsyncReply(null); - //} - - public AsyncReply GetLinkTemplates(string link) { var reply = new AsyncReply(); - var l = DC.ToBytes(link); - SendRequest(IIPPacketRequest.LinkTemplates) - .AddUInt16((ushort)l.Length) - .AddUInt8Array(l) - .Done() - .Then((rt) => + SendRequest(IIPPacketRequest.LinkTemplates, link) + .Then((result) => { - + var templates = new List(); - // parse templates - var tt = (TransmissionType)rt[0]; - var data = (byte[])rt[1]; - //var offset = 0; - for (var offset = tt.Offset; offset < tt.ContentLength;) + + foreach(var template in (byte[][])result) { - var cs = data.GetUInt32(offset, Endian.Little); - offset += 4; - templates.Add(TypeTemplate.Parse(data, offset, cs)); - offset += cs; + templates.Add(TypeTemplate.Parse(template)); } reply.Trigger(templates.ToArray()); @@ -2041,30 +1814,34 @@ partial class DistributedConnection var reply = new AsyncReply(); resourceRequests.Add(id, new DistributedResourceAttachRequestInfo(reply, newSequence)); - SendRequest(IIPPacketRequest.AttachResource) - .AddUInt32(id) - .Done() - .Then((rt) => + SendRequest(IIPPacketRequest.AttachResource, id) + .Then((result) => { - - if (rt == null) + if (result == null) { reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.ResourceNotFound, "Null response")); return; } + // ClassId, Age, Link, Hops, PropertyValue[] + var args = (object[])result; + var classId = (UUID)args[0]; + var age = (ulong)args[1]; + var link = (string)args[2]; + var hops = (byte)args[3]; + var pv = (PropertyValue[])args[4]; + DistributedResource dr; TypeTemplate template = null; - UUID classId = (UUID)rt[0]; if (resource == null) { template = Warehouse.GetTemplateByClassId(classId, TemplateType.Resource); if (template?.DefinedType != null && template.IsWrapper) - dr = Activator.CreateInstance(template.DefinedType, this, id, (ulong)rt[1], (string)rt[2]) as DistributedResource; + dr = Activator.CreateInstance(template.DefinedType, this, id, (ulong)args[1], (string)args[2]) as DistributedResource; else - dr = new DistributedResource(this, id, (ulong)rt[1], (string)rt[2]); + dr = new DistributedResource(this, id, (ulong)args[1], (string)args[2]); } else { @@ -2072,28 +1849,46 @@ partial class DistributedConnection template = resource.Instance.Template; } - var transmissionType = (TransmissionType)rt[3]; - var content = (byte[])rt[4]; var initResource = (DistributedResource ok) => { - var (_, parsed) = Codec.Parse(content, 0, this, newSequence, transmissionType); - parsed.Then(results => + var (_, parsed) = Codec.ParseAsync(content, 0, this, newSequence, transmissionType); + + if (parsed is AsyncReply parsedReply) { - var ar = results as object[]; + parsedReply.Then(results => + { + var ar = results as object[]; + + var pvs = new List(); + + for (var i = 0; i < ar.Length; i += 3) + pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); + + dr._Attach(pvs.ToArray()); + resourceRequests.Remove(id); + // move from needed to attached. + neededResources.Remove(id); + attachedResources[id] = new WeakReference(dr); + reply.Trigger(dr); + }).Error(ex => reply.TriggerError(ex)); + } + else + { + var ar = parsed as object[]; var pvs = new List(); for (var i = 0; i < ar.Length; i += 3) pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); - dr._Attach(pvs.ToArray());// (PropertyValue[])pvs); + dr._Attach(pvs.ToArray()); resourceRequests.Remove(id); // move from needed to attached. neededResources.Remove(id); attachedResources[id] = new WeakReference(dr); reply.Trigger(dr); - }).Error(ex => reply.TriggerError(ex)); + } }; @@ -2191,135 +1986,9 @@ partial class DistributedConnection return rt; } - public AsyncReply RemoveAttributes(IResource resource, string[] attributes = null) - { - var rt = new AsyncReply(); - if (attributes == null) - SendRequest(IIPPacketRequest.ClearAllAttributes) - .AddUInt32(resource.Instance.Id) - .Done() - .Then(ar => rt.Trigger(true)) - .Error(ex => rt.TriggerError(ex)); - else - { - var attrs = DC.ToBytes(attributes); - SendRequest(IIPPacketRequest.ClearAttributes) - .AddUInt32(resource.Instance.Id) - .AddInt32(attrs.Length) - .AddUInt8Array(attrs) - .Done() - .Then(ar => rt.Trigger(true)) - .Error(ex => rt.TriggerError(ex)); - } - - return rt; - } - - public AsyncReply SetAttributes(IResource resource, Map attributes, bool clearAttributes = false) - { - var rt = new AsyncReply(); - - SendRequest(clearAttributes ? IIPPacketRequest.UpdateAllAttributes : IIPPacketRequest.UpdateAttributes) - .AddUInt32(resource.Instance.Id) - //.AddUInt8Array(Codec.ComposeStructure(attributes, this, true, true, true)) - .AddUInt8Array(Codec.Compose(attributes, this)) - .Done() - .Then(ar => rt.Trigger(true)) - .Error(ex => rt.TriggerError(ex)); - - return rt; - } - - public AsyncReply> GetAttributes(IResource resource, string[] attributes = null) - { - var rt = new AsyncReply>(); - - if (attributes == null) - { - SendRequest(IIPPacketRequest.GetAllAttributes) - .AddUInt32(resource.Instance.Id) - .Done() - .Then(ar => - { - var dataType = (TransmissionType)ar[0]; - var data = (byte[])ar[1]; - //Codec.Parse(d, ) - var (_, parsed) = Codec.Parse(data, 0, this, null, dataType); - parsed.Then(st => - { - - resource.Instance.SetAttributes(st as Map); - - rt.Trigger(st); - }).Error(ex => rt.TriggerError(ex)); - }); - } - else - { - var attrs = DC.ToBytes(attributes); - SendRequest(IIPPacketRequest.GetAttributes) - .AddUInt32(resource.Instance.Id) - .AddInt32(attrs.Length) - .AddUInt8Array(attrs) - .Done() - .Then(ar => - { - var dataType = (TransmissionType)ar[0]; - var data = (byte[])ar[1]; - - var (_, parsed) = Codec.Parse(data, 0, this, null, dataType); - parsed.Then(st => - { - - resource.Instance.SetAttributes((Map)st); - - rt.Trigger(st); - }).Error(ex => rt.TriggerError(ex)); - }); - } - - return rt; - } - - /// - /// Get resource history. - /// - /// IResource. - /// From date. - /// To date. - /// - public AsyncReply> GetRecord(IResource resource, DateTime fromDate, DateTime toDate) - { - if (resource is DistributedResource) - { - var dr = resource as DistributedResource; - - if (dr.DistributedResourceConnection != this) - return new AsyncReply>(null); - - var reply = new AsyncReply>(); - - SendRequest(IIPPacketRequest.ResourceHistory) - .AddUInt32(dr.DistributedResourceInstanceId) - .AddDateTime(fromDate) - .AddDateTime(toDate) - .Done() - .Then(rt => - { - var content = (byte[])rt[0]; - - DataDeserializer.HistoryParser(content, 0, (uint)content.Length, resource, this, null) - .Then((history) => reply.Trigger(history)); - - }).Error((ex) => reply.TriggerError(ex)); - - return reply; - } - else - return new AsyncReply>(null); - } + /// /// Query resources at specific link. /// @@ -2330,23 +1999,10 @@ partial class DistributedConnection var str = DC.ToBytes(path); var reply = new AsyncReply(); - SendRequest(IIPPacketRequest.QueryLink) - .AddUInt16((ushort)str.Length) - .AddUInt8Array(str) - .Done() - .Then(ar => + SendRequest(IIPPacketRequest.Query, path) + .Then(result => { - var dataType = (TransmissionType)ar[0]; - var data = ar[1] as byte[]; - - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); - - parsed.Then(resources => reply.Trigger(resources)) - .Error(ex => reply.TriggerError(ex)); - - //Codec.ParseResourceArray(content, 0, (uint)content.Length, this) - // .Then(resources => reply.Trigger(resources)); - + reply.Trigger((IResource[])result); }).Error(ex => reply.TriggerError(ex)); return reply; @@ -2362,7 +2018,7 @@ partial class DistributedConnection /// Resource attributeds. /// Values for the resource properties. /// New resource instance - public AsyncReply Create(IStore store, IResource parent, string className, object[] parameters, Map attributes, Map values) + public AsyncReply Create(IStore store, IResource parent, string className, Map attributes, Map values) { var reply = new AsyncReply(); var pkt = new BinaryList() @@ -2497,7 +2153,7 @@ partial class DistributedConnection private void Instance_EventOccurred(EventOccurredInfo info) { - if (info.EventTemplate.Listenable) + if (info.EventTemplate.Subscribable) { lock (subscriptionsLock) { @@ -2514,7 +2170,7 @@ partial class DistributedConnection return; // compose the packet - SendEvent(IIPPacketEvent.EventOccurred) + SendEvent(IIPPacketNotification.EventOccurred) .AddUInt32(info.Resource.Instance.Id) .AddUInt8((byte)info.EventTemplate.Index) .AddUInt8Array(Codec.Compose(info.Value, this)) @@ -2523,9 +2179,15 @@ partial class DistributedConnection - void IIPRequestKeepAlive(uint callbackId, DateTime peerTime, uint interval) + void IIPRequestKeepAlive(uint callback, TransmissionType dataType, byte[] data) { + var (offset, length, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset, + dataType.ContentLength); + + var peerTime = (DateTime)args[0]; + var interval = (uint)args[0]; + uint jitter = 0; var now = DateTime.UtcNow; diff --git a/Esiur/Net/IIP/DistributedResource.cs b/Esiur/Net/IIP/DistributedResource.cs index d345397..6f0deb7 100644 --- a/Esiur/Net/IIP/DistributedResource.cs +++ b/Esiur/Net/IIP/DistributedResource.cs @@ -68,7 +68,7 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan string link; //ulong age; - //ulong[] ages; + protected object[] properties; internal List parents = new List(); internal List children = new List(); @@ -174,16 +174,31 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan /// internal PropertyValue[] _Serialize() { - var props = new PropertyValue[properties.Length]; - for (byte i = 0; i < properties.Length; i++) - props[i] = new PropertyValue(properties[i], Instance.GetAge(i), Instance.GetModificationDate(i)); + props[i] = new PropertyValue(properties[i], + Instance.GetAge(i), + Instance.GetModificationDate(i)); return props; } + internal Map _SerializeAfter(ulong age = 0) + { + var rt = new Map(); + + for (byte i = 0; i < properties.Length; i++) + if (Instance.GetAge(i) > age) + rt.Add(i, new PropertyValue(properties[i], + Instance.GetAge(i), + Instance.GetModificationDate(i))); + + + return rt; + } + + internal bool _Attach(PropertyValue[] properties) { if (attached) @@ -568,7 +583,7 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan protected virtual void EmitPropertyChanged(string name) { - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(name)); + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(name)); } ~DistributedResource() diff --git a/Esiur/Net/Packets/IIPPacketNotification.cs b/Esiur/Net/Packets/IIPPacketNotification.cs index 0085c4d..3e34cdf 100644 --- a/Esiur/Net/Packets/IIPPacketNotification.cs +++ b/Esiur/Net/Packets/IIPPacketNotification.cs @@ -6,13 +6,14 @@ namespace Esiur.Net.Packets { public enum IIPPacketNotification : byte { - // Event Manage - ResourceDestroyed = 0, - ResourceReassigned, - ResourceMoved, - SystemFailure, - // Event Invoke - PropertyModified = 0x8, - EventOccurred, + // Notification Invoke + PropertyModified = 0x0, + EventOccurred = 0x1, + + // Notification Manage + ResourceDestroyed = 0x8, + ResourceReassigned = 0x9, + ResourceMoved = 0xA, + SystemFailure = 0xB, } } diff --git a/Esiur/Net/Packets/IIPPacketReply.cs b/Esiur/Net/Packets/IIPPacketReply.cs index 76e85e8..0827d29 100644 --- a/Esiur/Net/Packets/IIPPacketReply.cs +++ b/Esiur/Net/Packets/IIPPacketReply.cs @@ -11,8 +11,8 @@ namespace Esiur.Net.Packets Propagated = 0x1, // Error - Permission = 0x81, - Execution = 0x82, + PermissionError = 0x81, + ExecutionError = 0x82, // Partial Progress = 0x10, diff --git a/Esiur/Net/Packets/IIPPacketRequest.cs b/Esiur/Net/Packets/IIPPacketRequest.cs index c0b13a3..c706aa3 100644 --- a/Esiur/Net/Packets/IIPPacketRequest.cs +++ b/Esiur/Net/Packets/IIPPacketRequest.cs @@ -6,29 +6,30 @@ namespace Esiur.Net.Packets { public enum IIPPacketRequest : byte { - // Request Manage - AttachResource = 0x0, - ReattachResource = 0x1, - DetachResource = 0x2, - CreateResource = 0x3, - DeleteResource = 0x4, - MoveResource = 0x5, + // Request Invoke + InvokeFunction = 0x0, + SetProperty = 0x1, + Subscribe = 0x2, + Unsubscribe = 0x3, // Request Inquire TemplateFromClassName = 0x8, TemplateFromClassId = 0x9, TemplateFromResourceId = 0xA, - QueryLink = 0xB, + Query = 0xB, LinkTemplates = 0xC, Token = 0xD, + GetResourceIdByLink = 0xE, - // Request Invoke - InvokeFunction = 0x10, - Subscribe = 0x11, - Unsubscribe = 0x12, - SetProperty = 0x13, + // Request Manage + AttachResource = 0x10, + ReattachResource = 0x11, + DetachResource = 0x12, + CreateResource = 0x13, + DeleteResource = 0x14, + MoveResource = 0x15, - // Static calling + // Request Static KeepAlive = 0x18, ProcedureCall = 0x19, StaticCall = 0x1A diff --git a/Esiur/Resource/Instance.cs b/Esiur/Resource/Instance.cs index 333fb4d..dd00dd1 100644 --- a/Esiur/Resource/Instance.cs +++ b/Esiur/Resource/Instance.cs @@ -48,6 +48,7 @@ public class Instance List ages = new(); List modificationDates = new(); private ulong instanceAge; + private byte hops; private DateTime instanceModificationDate; uint id; @@ -389,6 +390,16 @@ public class Instance internal set { instanceAge = value; } } + /// + /// Number of nodes to reach the original resource. + /// + public ulong Hops + { + get { return hops; } + internal set { hops = value; } + } + + /// /// Last modification date. /// @@ -443,7 +454,7 @@ public class Instance /// public PropertyValue[] Serialize() { - List props = new List(); + var props = new List(); foreach (var pt in template.Properties) { @@ -459,6 +470,33 @@ public class Instance return props.ToArray(); } + /// + /// Export all properties with ResourceProperty attributed as bytes array after a specific age. + /// + /// + public Map SerializeAfter(ulong age = 0) + { + var props = new Map(); + + foreach (var pt in template.Properties) + { + IResource res; + if (resource.TryGetTarget(out res)) + { + if (res.Instance.GetAge(pt.Index) > age) + { + var rt = pt.PropertyInfo.GetValue(res, null); + props.Add(pt.Index, + new PropertyValue(rt, + ages[pt.Index], + modificationDates[pt.Index])); + } + } + } + + return props; + } + /* public bool Deserialize(byte[] data, uint offset, uint length) { @@ -588,7 +626,7 @@ public class Instance } } - + // internal void EmitResourceEvent(string name, string[] users, DistributedConnection[] connections, object[] args) diff --git a/Esiur/Resource/Warehouse.cs b/Esiur/Resource/Warehouse.cs index bac21df..56d9e9c 100644 --- a/Esiur/Resource/Warehouse.cs +++ b/Esiur/Resource/Warehouse.cs @@ -652,75 +652,51 @@ public static class Warehouse type = ResourceProxy.GetProxy(type); - - /* - if (arguments != null) - { - var constructors = type.GetConstructors(System.Reflection.BindingFlags.Public); - - foreach(var constructor in constructors) - { - var pi = constructor.GetParameters(); - if (pi.Length == constructor.le) - } - - // cast arguments - ParameterInfo[] pi = fi.GetParameters(); - - object[] args = new object[pi.Length]; - - for (var i = 0; i < pi.Length; i++) - { - if (pi[i].ParameterType == typeof(DistributedConnection)) - { - args[i] = this; - } - else if (namedArgs.ContainsKey(pi[i].Name)) - { - args[i] = DC.CastConvert(namedArgs[pi[i].Name], pi[i].ParameterType); - } - } - - constructors[0]. - } - */ var res = Activator.CreateInstance(type) as IResource; if (properties != null) { - var ps = Map.FromObject(properties); - - foreach (var p in ps) + if (properties is Map map) { + var template = GetTemplateByType(type); + foreach(var kvp in map) + template.GetPropertyTemplateByIndex(kvp.Key).PropertyInfo.SetValue(res, kvp.Value); + } + else + { + var ps = Map.FromObject(properties); - var pi = type.GetProperty(p.Key, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance); - if (pi != null) + foreach (var p in ps) { - if (pi.CanWrite) + var pi = type.GetProperty(p.Key, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance); + if (pi != null) { - try + if (pi.CanWrite) { - pi.SetValue(res, p.Value); - } - catch (Exception ex) - { - Global.Log(ex); + try + { + pi.SetValue(res, p.Value); + } + catch (Exception ex) + { + Global.Log(ex); + } } } - } - else - { - var fi = type.GetField(p.Key, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance); - if (fi != null) + else { - try + var fi = type.GetField(p.Key, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance); + if (fi != null) { - fi.SetValue(res, p.Value); - } - catch (Exception ex) - { - Global.Log(ex); + try + { + fi.SetValue(res, p.Value); + } + catch (Exception ex) + { + Global.Log(ex); + } } } } @@ -729,9 +705,6 @@ public static class Warehouse if (store != null || parent != null || res is IStore) { - //if (!await Put(name, res, store, parent, null, 0, manager, attributes)) - // return null; - await Put(name, res, store, parent, null, 0, manager, attributes); }