diff --git a/Esiur/Core/AsyncBag.cs b/Esiur/Core/AsyncBag.cs index ab596e6..28a9d1f 100644 --- a/Esiur/Core/AsyncBag.cs +++ b/Esiur/Core/AsyncBag.cs @@ -38,8 +38,8 @@ interface IAsyncBag public class AsyncBag : AsyncReply, IAsyncBag { - protected List replies = new List(); - List results = new(); + protected List replies = new List(); + //List results = new(); int count = 0; bool sealedBag = false; @@ -75,62 +75,54 @@ public class AsyncBag : AsyncReply, IAsyncBag sealedBag = true; - if (results.Count == 0) + var results = ArrayType == null ? new T[replies.Count] + : Array.CreateInstance(ArrayType, replies.Count); + + if (replies.Count == 0) { - if (ArrayType != null) + Trigger(results); + return; + } + + for (var i = 0; i < replies.Count; i++) + { + var k = replies[i]; + var index = i; + + if (k is AsyncReply) { - var ar = Array.CreateInstance(ArrayType, 0); - Trigger(ar); + (k as AsyncReply).Then((r) => + { + results.SetValue(r, i); + count++; + if (count == replies.Count) + Trigger(results); + }); } else { - Trigger(new object[0]); - } - } - - for (var i = 0; i < results.Count; i++) - //foreach(var reply in results.Keys) - { - var k = replies[i];// results.Keys.ElementAt(i); - var index = i; - - k.Then((r) => - { - results[index] = (T)r; + results.SetValue(replies[i], i); count++; - if (count == results.Count) - { - if (ArrayType != null) - { - try - { - // @TODO: Safe casting check - var ar = Array.CreateInstance(ArrayType, count); - for (var i = 0; i < count; i++) - ar.SetValue(results[i], i); - Trigger(ar); - } - catch - { - Trigger(results.ToArray()); - } - } - else - Trigger(results.ToArray()); - } - }); + if (count == replies.Count) + Trigger(results); + } } } - public void Add(AsyncReply reply) + + public void Add(object valueOrReply) { if (!sealedBag) { - results.Add(default(T)); - replies.Add(reply); + //if (valueOrReply is AsyncReply) + //{ + // results.Add(default(T)); + replies.Add(valueOrReply); + //} } } + public void AddBag(AsyncBag bag) { foreach (var r in bag.replies) diff --git a/Esiur/Data/Codec.cs b/Esiur/Data/Codec.cs index 213d5e6..19b1c50 100644 --- a/Esiur/Data/Codec.cs +++ b/Esiur/Data/Codec.cs @@ -38,43 +38,47 @@ namespace Esiur.Data; public static class Codec { - - delegate AsyncReply AsyncParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence); + + //delegate AsyncReply AsyncParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence); delegate object SyncParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence); - static AsyncParser[][] FixedParsers = new AsyncParser[][] + static SyncParser[][] FixedParsers = new SyncParser[][] { - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.NullParser, DataDeserializer.BooleanFalseParser, DataDeserializer.BooleanTrueParser, DataDeserializer.NotModifiedParser, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.ByteParser, DataDeserializer.SByteParser, DataDeserializer.Char8Parser, + DataDeserializer.LocalResourceParser8, + DataDeserializer.ResourceParser8, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.Int16Parser, DataDeserializer.UInt16Parser, DataDeserializer.Char16Parser, + DataDeserializer.LocalResourceParser16, + DataDeserializer.ResourceParser16, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.Int32Parser, DataDeserializer.UInt32Parser, DataDeserializer.Float32Parser, - DataDeserializer.ResourceParser, - DataDeserializer.LocalResourceParser, + DataDeserializer.LocalResourceParser32, + DataDeserializer.ResourceParser32, }, - new AsyncParser[]{ + new SyncParser[]{ DataDeserializer.Int64Parser, DataDeserializer.UInt64Parser, DataDeserializer.Float64Parser, DataDeserializer.DateTimeParser, }, - new AsyncParser[] + new SyncParser[] { DataDeserializer.Int128Parser, // int 128 DataDeserializer.UInt128Parser, // uint 128 @@ -82,7 +86,7 @@ public static class Codec } }; - static AsyncParser[] DynamicParsers = new AsyncParser[] + static SyncParser[] DynamicParsers = new SyncParser[] { DataDeserializer.RawDataParser, DataDeserializer.StringParser, @@ -91,7 +95,7 @@ public static class Codec DataDeserializer.RecordListParser, }; - static AsyncParser[] TypedParsers = new AsyncParser[] + static SyncParser[] TypedParsers = new SyncParser[] { DataDeserializer.RecordParser, DataDeserializer.TypedListParser, @@ -111,7 +115,7 @@ public static class Codec /// DistributedConnection is required in case a structure in the array holds items at the other end. /// DataType, in case the data is not prepended with DataType /// Value - public static (uint, AsyncReply) ParseAsync(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence, TransmissionType? dataType = null) + public static (uint, object) ParseAsync(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence, TransmissionType? dataType = null) { uint len = 0; @@ -144,10 +148,6 @@ public static class Codec } } - public static uint ParseSync(byte[] data, uint offset, TransmissionType? dataType = null) - { - - } /// /// Check if a resource is local to a given connection. @@ -206,7 +206,7 @@ public static class Codec [typeof(List)] = DataSerializer.RawDataComposerFromList, //[typeof(List)] = DataSerializer.RawDataComposerFromList, [typeof(string)] = DataSerializer.StringComposer, - + // Special [typeof(object[])] = DataSerializer.ListComposer,// DataSerializer.ListComposerFromArray, [typeof(List)] = DataSerializer.ListComposer,// DataSerializer.ListComposerFromList, @@ -348,7 +348,7 @@ public static class Codec return TransmissionType.Compose(hdr, data); //} - } + } else if (type.IsEnum) { var (hdr, data) = DataSerializer.EnumComposer(valueOrSource, connection); diff --git a/Esiur/Data/DataDeserializer.cs b/Esiur/Data/DataDeserializer.cs index 320dcb3..c644d7a 100644 --- a/Esiur/Data/DataDeserializer.cs +++ b/Esiur/Data/DataDeserializer.cs @@ -72,7 +72,7 @@ public static class DataDeserializer return *(int*)ptr; } - public static unsafe uint UInt32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + public static unsafe object UInt32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return *(uint*)ptr; @@ -129,17 +129,56 @@ public static class DataDeserializer } - - public static unsafe object ResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + public static unsafe object ResourceParser8(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { - fixed (byte* ptr = &data[offset]) - return connection.Fetch(*(uint*)ptr, requestSequence); + if (connection == null) + return new ResourceId(false, data[offset]); + else + return connection.Fetch(data[offset], requestSequence); } - public static unsafe object LocalResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + public static unsafe object LocalResourceParser8(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + { + if (connection == null) + return new ResourceId(true, data[offset]); + else + return Warehouse.GetById(data[offset]); + } + + public static unsafe object ResourceParser16(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) - return Warehouse.GetById(*(uint*)ptr); + if (connection == null) + return new ResourceId(false, *(ushort*)ptr); + else + return connection.Fetch(*(ushort*)ptr, requestSequence); + } + + public static unsafe object LocalResourceParser16(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + { + fixed (byte* ptr = &data[offset]) + if (connection == null) + return new ResourceId(true, *(ushort*)ptr); + else + return Warehouse.GetById(*(ushort*)ptr); + } + + public static unsafe object ResourceParser32(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + { + fixed (byte* ptr = &data[offset]) + if (connection == null) + return new ResourceId(false, *(uint*)ptr); + else + return connection.Fetch(*(uint*)ptr, requestSequence); + } + + public static unsafe object LocalResourceParser32(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + { + fixed (byte* ptr = &data[offset]) + if (connection == null) + return new ResourceId(true, *(uint*)ptr); + else + return Warehouse.GetById(*(uint*)ptr); } diff --git a/Esiur/Data/ResourceId.cs b/Esiur/Data/ResourceId.cs new file mode 100644 index 0000000..5b7cdd4 --- /dev/null +++ b/Esiur/Data/ResourceId.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Data +{ + public struct ResourceId + { + public bool Local; + public uint Id; + + public ResourceId(bool local, uint id) + { + this.Id = id; + this.Local = local; + } + } +} diff --git a/Esiur/Data/TransmissionType.cs b/Esiur/Data/TransmissionType.cs index 1ff3584..92efc66 100644 --- a/Esiur/Data/TransmissionType.cs +++ b/Esiur/Data/TransmissionType.cs @@ -14,21 +14,31 @@ public enum TransmissionTypeIdentifier : byte UInt8 = 0x8, Int8 = 0x9, Char8 = 0xA, - Int16 = 0x10, - UInt16 = 0x11, + LocalResource8 = 0xB, + RemoteResource8 = 0xC, + LocalProcedure8 = 0xD, + RemoteProcedure8 = 0xE, + UInt16 = 0x10, + Int16 = 0x11, Char16 = 0x12, - Int32 = 0x18, - UInt32 = 0x19, + LocalResource16 = 0x13, + RemoteResource16 = 0x14, + LocalProcedure16 = 0x15, + RemoteProcedure16 = 0x16, + UInt32 = 0x18, + Int32 = 0x19, Float32 = 0x1A, - Resource = 0x1B, - ResourceLocal = 0x1C, - Int64 = 0x20, - UInt64 = 0x21, + LocalResource32 = 0x1B, + RemoteResource32 = 0x1C, + LocalProcedure32 = 0x1D, + RemoteProcedure32 = 0x1E, + UInt64 = 0x20, + Int64 = 0x21, Float64 = 0x22, DateTime = 0x23, - Int128 = 0x28, - UInt128 = 0x29, - Float128 = 0x2A, + UInt128 = 0x28, + Int128 = 0x29, + Decimal128 = 0x2A, RawData = 0x40, String = 0x41, @@ -37,17 +47,15 @@ public enum TransmissionTypeIdentifier : byte RecordList = 0x44, Map = 0x45, MapList = 0x46, - //Tuple = 0x47, Record = 0x80, TypedList = 0x81, TypedMap = 0x82, - Tuple = 0x83, - Enum = 0x84, - Constant = 0x85 - //TypedResourceList = 0x81, - //TypedRecordList = 0x82, + TypedTuple = 0x83, + TypedEnum = 0x84, + TypedConstant = 0x85, + ResourceLink = 0xC0 } public enum TransmissionTypeClass diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index c5c98d8..77340dd 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -2152,7 +2152,7 @@ partial class DistributedConnection var dataType = (TransmissionType)ar[0]; var data = (byte[])ar[1]; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); + var (_, parsed) = Codec.ParseAsync(data, dataType.Offset, this, null, dataType); parsed.Then(resources => rt.Trigger(resources)) .Error(ex => rt.TriggerError(ex));