From dde86c9af794ccf5579c16cbe90c04411e46924f Mon Sep 17 00:00:00 2001 From: ahmed Date: Tue, 26 Aug 2025 06:12:59 +0300 Subject: [PATCH] Working --- Esiur/Core/AsyncBag.cs | 9 +++-- Esiur/Data/Codec.cs | 24 ++++++------ Esiur/Data/DataDeserializer.cs | 17 ++++++++- Esiur/Data/DataSerializer.cs | 8 +++- Esiur/Data/TransmissionType.cs | 1 + Esiur/Net/IIP/DistributedConnection.cs | 4 +- .../Net/IIP/DistributedConnectionProtocol.cs | 38 +++++++------------ Esiur/Net/Packets/IIPPacket.cs | 4 ++ 8 files changed, 60 insertions(+), 45 deletions(-) diff --git a/Esiur/Core/AsyncBag.cs b/Esiur/Core/AsyncBag.cs index c8def4d..565789b 100644 --- a/Esiur/Core/AsyncBag.cs +++ b/Esiur/Core/AsyncBag.cs @@ -86,14 +86,15 @@ public class AsyncBag : AsyncReply, IAsyncBag for (var i = 0; i < replies.Count; i++) { + var k = replies[i]; var index = i; - if (k is AsyncReply) + if (k is AsyncReply reply) { - (k as AsyncReply).Then((r) => + reply.Then((r) => { - results.SetValue(r, i); + results.SetValue(r, index); count++; if (count == replies.Count) Trigger(results); @@ -101,7 +102,7 @@ public class AsyncBag : AsyncReply, IAsyncBag } else { - results.SetValue(replies[i], i); + results.SetValue(replies[i], index); count++; if (count == replies.Count) Trigger(results); diff --git a/Esiur/Data/Codec.cs b/Esiur/Data/Codec.cs index b493cc6..76dda7b 100644 --- a/Esiur/Data/Codec.cs +++ b/Esiur/Data/Codec.cs @@ -60,30 +60,31 @@ public static class Codec DataDeserializer.ResourceParser8Async, }, new AsyncParser[]{ - DataDeserializer.Int16ParserAsync, DataDeserializer.UInt16ParserAsync, + DataDeserializer.Int16ParserAsync, DataDeserializer.Char16ParserAsync, DataDeserializer.LocalResourceParser16Async, DataDeserializer.ResourceParser16Async, }, - new AsyncParser[]{ - DataDeserializer.Int32ParserAsync, + new AsyncParser[]{ DataDeserializer.UInt32ParserAsync, + DataDeserializer.Int32ParserAsync, DataDeserializer.Float32ParserAsync, DataDeserializer.LocalResourceParser32Async, DataDeserializer.ResourceParser32Async, }, new AsyncParser[]{ - DataDeserializer.Int64ParserAsync, DataDeserializer.UInt64ParserAsync, + DataDeserializer.Int64ParserAsync, DataDeserializer.Float64ParserAsync, DataDeserializer.DateTimeParserAsync, }, new AsyncParser[] - { - DataDeserializer.Int128ParserAsync, // int 128 + { DataDeserializer.UInt128ParserAsync, // uint 128 - DataDeserializer.Float128ParserAsync, + DataDeserializer.Int128ParserAsync, // int 128 + DataDeserializer.Decimal128ParserAsync, + DataDeserializer.UUIDParserAsync } }; @@ -146,7 +147,8 @@ public static class Codec { DataDeserializer.UInt128Parser, // uint 128 DataDeserializer.Int128Parser, // int 128 - DataDeserializer.Float128Parser, + DataDeserializer.Decimal128Parser, + DataDeserializer.UUIDParser } }; @@ -197,11 +199,11 @@ public static class Codec var tt = dataType.Value; - Console.WriteLine("Parsing " + tt.Class + " " + tt.Identifier); + //Console.WriteLine("Parsing " + tt.Class + " " + tt.Identifier); if (tt.Class == TransmissionTypeClass.Fixed) { - return (len, FixedAsyncParsers[tt.Exponent][tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection, requestSequence)); + return (len, FixedAsyncParsers[tt.Exponent][tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection, requestSequence)); } else if (tt.Class == TransmissionTypeClass.Dynamic) { @@ -302,7 +304,7 @@ public static class Codec [typeof(List)] = DataSerializer.RawDataComposerFromList, //[typeof(List)] = DataSerializer.RawDataComposerFromList, [typeof(string)] = DataSerializer.StringComposer, - + [typeof(UUID)] = DataSerializer.UUIDComposer, // Special [typeof(object[])] = DataSerializer.ListComposer, [typeof(List)] = DataSerializer.ListComposer, diff --git a/Esiur/Data/DataDeserializer.cs b/Esiur/Data/DataDeserializer.cs index 74271d7..71dfa4d 100644 --- a/Esiur/Data/DataDeserializer.cs +++ b/Esiur/Data/DataDeserializer.cs @@ -169,18 +169,29 @@ public static class DataDeserializer } - public static unsafe object Float128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + public static unsafe object Decimal128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return *(decimal*)ptr; } - public static unsafe object Float128Parser(byte[] data, uint offset, uint length, Warehouse warehouse) + public static unsafe object Decimal128Parser(byte[] data, uint offset, uint length, Warehouse warehouse) { fixed (byte* ptr = &data[offset]) return *(decimal*)ptr; } + public static unsafe object UUIDParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) + { + return new UUID(data, offset); + } + + public static unsafe object UUIDParser(byte[] data, uint offset, uint length, Warehouse warehouse) + { + return new UUID(data, offset); + } + + public static unsafe object Int128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { @@ -962,6 +973,8 @@ public static class DataDeserializer { var (cs, reply) = Codec.ParseAsync(data, offset, connection, requestSequence); + + rt.Add(reply); if (cs > 0) diff --git a/Esiur/Data/DataSerializer.cs b/Esiur/Data/DataSerializer.cs index a6f6f0d..75333ce 100644 --- a/Esiur/Data/DataSerializer.cs +++ b/Esiur/Data/DataSerializer.cs @@ -114,8 +114,6 @@ public static class DataSerializer public static (TransmissionTypeIdentifier, byte[]) EnumComposer(object value, DistributedConnection connection) { - Console.WriteLine(value.GetType().Name); - if (value == null) return (TransmissionTypeIdentifier.Null, new byte[0]); @@ -391,6 +389,12 @@ public static class DataSerializer return (TransmissionTypeIdentifier.Map, rt.ToArray()); } + public static unsafe (TransmissionTypeIdentifier, byte[]) UUIDComposer(object value, DistributedConnection connection) + { + return (TransmissionTypeIdentifier.UUID, ((UUID)value).Data); + + } + public static unsafe (TransmissionTypeIdentifier, byte[]) RecordComposer(object value, DistributedConnection connection) { var rt = new List();// BinaryList(); diff --git a/Esiur/Data/TransmissionType.cs b/Esiur/Data/TransmissionType.cs index 92efc66..fa2068d 100644 --- a/Esiur/Data/TransmissionType.cs +++ b/Esiur/Data/TransmissionType.cs @@ -39,6 +39,7 @@ public enum TransmissionTypeIdentifier : byte UInt128 = 0x28, Int128 = 0x29, Decimal128 = 0x2A, + UUID = 0x2B, RawData = 0x40, String = 0x41, diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs index aebfe02..99032f5 100644 --- a/Esiur/Net/IIP/DistributedConnection.cs +++ b/Esiur/Net/IIP/DistributedConnection.cs @@ -385,7 +385,7 @@ public partial class DistributedConnection : NetworkConnection, IStore SendRequest(IIPPacketRequest.KeepAlive, now, interval) .Then(x => { - Jitter = (uint)x; + Jitter = (uint)((object[])x)[1]; keepAliveTimer.Start(); }).Error(ex => { @@ -430,7 +430,7 @@ public partial class DistributedConnection : NetworkConnection, IStore if (packet.DataType == null) return offset; - Console.WriteLine("Incoming: " + packet); + //Console.WriteLine("Incoming: " + packet); if (packet.Method == IIPPacketMethod.Notification) { var dt = packet.DataType.Value; diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index 15334b1..b33f9b3 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -78,7 +78,7 @@ partial class DistributedConnection /// AsyncReply SendRequest(IIPPacketRequest action, params object[] args) { - var reply = new AsyncReply(); + var reply = new AsyncReply(); var c = callbackCounter++; // avoid thread racing requests.Add(c, reply); @@ -547,7 +547,7 @@ partial class DistributedConnection r.Instance.Age, r.Instance.Link, r.Instance.Hops, - dr._Serialize(), this); + dr._Serialize()); } else { @@ -557,7 +557,7 @@ partial class DistributedConnection r.Instance.Age, r.Instance.Link, r.Instance.Hops, - r.Instance.Serialize(), this); + r.Instance.Serialize()); } // subscribe @@ -852,7 +852,7 @@ partial class DistributedConnection var (_, value) = Codec.ParseSync(data, 0, Instance.Warehouse, dataType); - var classId = new UUID((byte[])value); + var classId = (UUID)value; var t = Instance.Warehouse.GetTemplateByClassId(classId); @@ -1637,11 +1637,11 @@ partial class DistributedConnection SendRequest(IIPPacketRequest.TemplateFromClassId, classId) .Then((result) => { - var args = (object[])result; + var tt = TypeTemplate.Parse((byte[])result); templateRequests.Remove(classId); - templates.Add(((TypeTemplate)args[0]).ClassId, (TypeTemplate)args[0]); - Instance.Warehouse.PutTemplate(args[0] as TypeTemplate); - reply.Trigger(args[0]); + templates.Add(tt.ClassId, tt); + Instance.Warehouse.PutTemplate(tt); + reply.Trigger(tt); }).Error((ex) => { reply.TriggerError(ex); @@ -1766,30 +1766,22 @@ partial class DistributedConnection { if (resource != null && (requestSequence?.Contains(id) ?? false)) { - Console.WriteLine("Fetching DL " + id); - // dead lock avoidance for loop reference. return new AsyncReply(resource); } else if (resource != null && requestInfo.RequestSequence.Contains(id)) { - Console.WriteLine("Fetching DL2 " + id); - // dead lock avoidance for dependent reference. return new AsyncReply(resource); } else { - Console.WriteLine("Fetching DL3 " + id); - return requestInfo.Reply; } } else if (resource != null && !resource.DistributedResourceSuspended) { // @REVIEW: this should never happen - Console.WriteLine("Fetching DLWWW " + id); - Global.Log("DCON", LogType.Error, "Resource not moved to attached."); return new AsyncReply(resource); @@ -1799,7 +1791,6 @@ partial class DistributedConnection var reply = new AsyncReply(); resourceRequests.Add(id, new DistributedResourceAttachRequestInfo(reply, newSequence)); - Console.WriteLine("Fetching " + id); SendRequest(IIPPacketRequest.AttachResource, id) .Then((result) => @@ -1819,7 +1810,6 @@ partial class DistributedConnection var hops = (byte)args[3]; var pvData = (byte[])args[4]; - Console.WriteLine("Fetching CL " + id); DistributedResource dr; TypeTemplate template = null; @@ -1846,14 +1836,14 @@ partial class DistributedConnection parsedReply.Then(results => { - var ar = results as object[]; + var pvs = results as PropertyValue[]; - var pvs = new List(); + //var pvs = new List(); - for (var i = 0; i < ar.Length; i += 3) - pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); + //for (var i = 0; i < ar.Length; i += 3) + // pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); - dr._Attach(pvs.ToArray()); + dr._Attach(pvs); resourceRequests.Remove(id); // move from needed to attached. neededResources.Remove(id); @@ -2085,7 +2075,7 @@ partial class DistributedConnection jitter = (uint)Math.Abs((int)diff - (int)interval); } - SendRequest(IIPPacketRequest.KeepAlive, now, jitter); + SendReply(IIPPacketReply.Completed, callback, now, jitter); lastKeepAliveReceived = now; } diff --git a/Esiur/Net/Packets/IIPPacket.cs b/Esiur/Net/Packets/IIPPacket.cs index 90df85c..07ee450 100644 --- a/Esiur/Net/Packets/IIPPacket.cs +++ b/Esiur/Net/Packets/IIPPacket.cs @@ -131,6 +131,10 @@ class IIPPacket : Packet offset += (uint)size; } + else + { + DataType = null; + } return offset - originalOffset; }