From e2e4ac90bc2fe11f8d775c68c2318a240788687f Mon Sep 17 00:00:00 2001 From: ahmed Date: Sat, 8 Nov 2025 12:36:22 +0300 Subject: [PATCH] test complete --- Esiur/Core/AsyncBag.cs | 10 +- Esiur/Core/AsyncReply.cs | 15 +- Esiur/Core/AsyncReplyGeneric.cs | 7 +- Esiur/Data/DataConverter.cs | 2 +- Esiur/Data/DataDeserializer.cs | 232 +++++++++++------- Esiur/Data/DataSerializer.cs | 46 ++-- Esiur/Data/TDU.cs | 3 +- Esiur/Net/IIP/DistributedConnection.cs | 7 +- .../Net/IIP/DistributedConnectionProtocol.cs | 19 +- Test/MyService.cs | 6 +- Test/Program.cs | 12 +- 11 files changed, 238 insertions(+), 121 deletions(-) diff --git a/Esiur/Core/AsyncBag.cs b/Esiur/Core/AsyncBag.cs index a6bf245..0bacc16 100644 --- a/Esiur/Core/AsyncBag.cs +++ b/Esiur/Core/AsyncBag.cs @@ -50,6 +50,14 @@ public class AsyncBag : AsyncReply, IAsyncBag public AsyncBag Then(Action callback) { + //if (!sealedBag && !resultReady) + // throw new Exception("Not sealed"); + + Timeout(6000, () => + { + Console.WriteLine("Timeout " + count + this.Result); + }); + base.Then(new Action(o => callback((T[])o))); return this; } @@ -99,7 +107,7 @@ public class AsyncBag : AsyncReply, IAsyncBag count++; if (count == replies.Count) Trigger(results); - }); + }).Error(e => TriggerError(e)); } else { diff --git a/Esiur/Core/AsyncReply.cs b/Esiur/Core/AsyncReply.cs index 7b7491c..be5fd27 100644 --- a/Esiur/Core/AsyncReply.cs +++ b/Esiur/Core/AsyncReply.cs @@ -86,10 +86,12 @@ public class AsyncReply return result; } - + //int timeoutMilliseconds = 0; public AsyncReply Timeout(int milliseconds, Action callback = null) { + //timeoutMilliseconds = milliseconds; + Task.Delay(milliseconds).ContinueWith(x => { if (!resultReady && exception == null) @@ -131,8 +133,15 @@ public class AsyncReply } - public AsyncReply Then(Action callback) + protected string codePath, codeMethod; + protected int codeLine; + + public AsyncReply Then(Action callback, [CallerMemberName] string methodName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0) { + if (codeLine == 0) + { + codeLine = lineNumber; codeMethod = methodName; codePath = filePath; + } //lock (callbacksLock) //{ lock (asyncLock) @@ -165,8 +174,6 @@ public class AsyncReply //if (Debug) // Console.WriteLine($"AsyncReply: {Id} Then pending"); - - callbacks.Add(callback); return this; diff --git a/Esiur/Core/AsyncReplyGeneric.cs b/Esiur/Core/AsyncReplyGeneric.cs index 5be91e1..031e6e5 100644 --- a/Esiur/Core/AsyncReplyGeneric.cs +++ b/Esiur/Core/AsyncReplyGeneric.cs @@ -39,8 +39,13 @@ namespace Esiur.Core; public class AsyncReply : AsyncReply { - public AsyncReply Then(Action callback) + public AsyncReply Then(Action callback, [CallerMemberName] string methodName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0) { + if (base.codeLine == 0) + { + base.codeLine = lineNumber; base.codeMethod = methodName; base.codePath = filePath; + } + base.Then((x) => callback((T)x)); return this; } diff --git a/Esiur/Data/DataConverter.cs b/Esiur/Data/DataConverter.cs index c9c78c1..527703c 100644 --- a/Esiur/Data/DataConverter.cs +++ b/Esiur/Data/DataConverter.cs @@ -846,7 +846,7 @@ public static class DC // Data Converter public static byte[] Clip(this byte[] data, uint offset, uint length) { if (data.Length < offset + length) - return null; + throw new ArgumentException("Length exceeds array boundary."); // if (length == data.Length && offset == 0) // return data.ToArray(); diff --git a/Esiur/Data/DataDeserializer.cs b/Esiur/Data/DataDeserializer.cs index 75999c9..739b4f1 100644 --- a/Esiur/Data/DataDeserializer.cs +++ b/Esiur/Data/DataDeserializer.cs @@ -464,6 +464,10 @@ public static class DataDeserializer rt.Trigger(record); } + }).Error(e => + { + Console.WriteLine(e); + rt.TriggerError(e); }); }; @@ -817,22 +821,71 @@ public static class DataDeserializer public static AsyncBag ListParserAsync(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence) { + //var rt = new AsyncBag(); + + + //var offset = tdu.Offset; + //var length = tdu.ContentLength; + + //while (length > 0) + //{ + // var (cs, reply) = Codec.ParseAsync(tdu.Data, offset, connection, requestSequence); + + // rt.Add(reply); + + // if (cs > 0) + // { + // offset += (uint)cs; + // length -= (uint)cs; + // } + // else + // throw new Exception("Error while parsing structured data"); + + //} + + //rt.Seal(); + //return rt; + + + + + var rt = new AsyncBag(); + //var list = new List(); + + ParsedTDU current; + ParsedTDU? previous = null; var offset = tdu.Offset; var length = tdu.ContentLength; + var ends = offset + (uint)length; while (length > 0) { - var (cs, reply) = Codec.ParseAsync(tdu.Data, offset, connection, requestSequence); + current = ParsedTDU.Parse(tdu.Data, offset, ends); + + if (current.Class == TDUClass.Invalid) + throw new Exception("Unknown type."); + + + if (current.Identifier == TDUIdentifier.TypeContinuation) + { + current.Class = previous.Value.Class; + current.Identifier = previous.Value.Identifier; + current.Metadata = previous.Value.Metadata; + } + + + var (cs, reply) = Codec.ParseAsync(current, connection, requestSequence); rt.Add(reply); if (cs > 0) { - offset += (uint)cs; - length -= (uint)cs; + offset += (uint)current.TotalLength; + length -= (uint)current.TotalLength; + previous = current; } else throw new Exception("Error while parsing structured data"); @@ -840,7 +893,9 @@ public static class DataDeserializer } rt.Seal(); + return rt; + } public static object ListParser(ParsedTDU tdu, Warehouse warehouse) @@ -876,8 +931,8 @@ public static class DataDeserializer if (cs > 0) { - offset += (uint)cs; - length -= (uint)cs; + offset += (uint)current.TotalLength; + length -= (uint)current.TotalLength; previous = current; } else @@ -887,35 +942,13 @@ public static class DataDeserializer return list.ToArray(); - - - //var rt = new List(); - - //var offset = tdu.Offset; - //var length = tdu.ContentLength; - - //while (length > 0) - //{ - // var (cs, reply) = Codec.ParseSync(tdu.Data, offset, warehouse); - - // rt.Add(reply); - - // if (cs > 0) - // { - // offset += (uint)cs; - // length -= (uint)cs; - // } - // else - // throw new Exception("Error while parsing structured data"); - - //} - - //return rt.ToArray(); } public static (uint, ulong, object[]) LimitedCountListParser(byte[] data, uint offset, ulong length, Warehouse warehouse, uint countLimit = uint.MaxValue) { + + // @TODO: add TypeContinuation var rt = new List(); while (length > 0 && rt.Count < countLimit) @@ -953,7 +986,6 @@ public static class DataDeserializer valuesTru.GetRuntimeType(connection.Instance.Warehouse))); - var keysTdu = ParsedTDU.Parse(tdu.Data, tdu.Offset, (uint)(tdu.Offset + tdu.ContentLength)); @@ -971,8 +1003,9 @@ public static class DataDeserializer { for (var i = 0; i < ((Array)keys).Length; i++) map.Add(((Array)keys).GetValue(i), ((Array)values).GetValue(i)); - }); - }); + rt.Trigger(map); + }).Error(e => rt.TriggerError(e)); + }).Error(e => rt.TriggerError(e)); return rt; @@ -1132,73 +1165,70 @@ public static class DataDeserializer map.Add(keys.GetValue(i), values.GetValue(i)); return map; - //var results = new List(); - - //var offset = tdu.Offset; - //var length = tdu.ContentLength; - - //while (length > 0) - //{ - // var (cs, reply) = Codec.ParseSync(tdu.Data, offset, warehouse); - - - // results.Add(reply); - - // if (cs > 0) - // { - // offset += (uint)cs; - // length -= (uint)cs; - // } - // else - // throw new Exception("Error while parsing structured data"); - - //} - - //for (var i = 0; i < results.Count; i += 2) - // map.Add(results[i], results[i + 1]); - - //return map; + } public static AsyncReply TupleParserAsync(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence) { + var rt = new AsyncReply(); - var results = new AsyncBag(); - var tupleSize = tdu.Metadata[0]; + // var tupleSize = tdu.Metadata[0]; - var types = new List(); + var trus = new List(); - uint mtOffset = 1; - for (var i = 0; i < tupleSize; i++) + uint mtOffset = 0; + + while (mtOffset < tdu.Metadata.Length) { - var (cs, rep) = TRU.Parse(tdu.Metadata, mtOffset); - types.Add(rep.GetRuntimeType(connection.Instance.Warehouse)); + var (cs, tru) = TRU.Parse(tdu.Metadata, mtOffset); + trus.Add(tru); mtOffset += cs; } - var length = tdu.ContentLength; + var results = new AsyncBag(); + var types = trus.Select(x => x.GetRuntimeType(connection.Instance.Warehouse)).ToArray(); + + ParsedTDU current; + ParsedTDU? previous = null; + var offset = tdu.Offset; + var length = tdu.ContentLength; + var ends = offset + (uint)length; - while (length > 0) + for (var i = 0; i < trus.Count; i++) { - var (cs, reply) = Codec.ParseAsync(tdu.Data, offset, connection, requestSequence); + current = ParsedTDU.Parse(tdu.Data, offset, ends); + + if (current.Class == TDUClass.Invalid) + throw new Exception("Unknown type."); + + if (current.Identifier == TDUIdentifier.TypeOfTarget) + { + var (idf, mt) = trus[i].GetMetadata(); + current.Class = TDUClass.Typed; + current.Identifier = idf; + current.Metadata = mt; + current.Index = (int)idf & 0x7; + } + + var (_, reply) = Codec.ParseAsync(current, connection, requestSequence); results.Add(reply); - if (cs > 0) + if (current.TotalLength > 0) { - offset += (uint)cs; - length -= (uint)cs; + offset += (uint)current.TotalLength; + length -= (uint)current.TotalLength; } else throw new Exception("Error while parsing structured data"); } - results.Seal(); + results.Seal(); results.Then(ar => { @@ -1239,32 +1269,53 @@ public static class DataDeserializer public static object TupleParser(ParsedTDU tdu, Warehouse warehouse) { - var results = new List(); var tupleSize = tdu.Metadata[0]; - var types = new List(); + var trus = new List(); uint mtOffset = 1; for (var i = 0; i < tupleSize; i++) { - var (cs, rep) = TRU.Parse(tdu.Metadata, mtOffset); - types.Add(rep.GetRuntimeType(warehouse)); + var (cs, tru) = TRU.Parse(tdu.Metadata, mtOffset); + trus.Add(tru); mtOffset += cs; } - var length = tdu.ContentLength; - var offset = tdu.Offset; + var results = new List(); + var types = trus.Select(x => x.GetRuntimeType(warehouse)).ToArray(); - while (length > 0) + ParsedTDU current; + ParsedTDU? previous = null; + + var offset = tdu.Offset; + var length = tdu.ContentLength; + var ends = offset + (uint)length; + + + for (var i = 0; i < tupleSize; i++) { - var (cs, reply) = Codec.ParseSync(tdu.Data, offset, warehouse); + current = ParsedTDU.Parse(tdu.Data, offset, ends); + + if (current.Class == TDUClass.Invalid) + throw new Exception("Unknown type."); + + if (current.Identifier == TDUIdentifier.TypeOfTarget) + { + var (idf, mt) = trus[i].GetMetadata(); + current.Class = TDUClass.Typed; + current.Identifier = idf; + current.Metadata = mt; + current.Index = (int)idf & 0x7; + } + + var (_, reply) = Codec.ParseSync(current, warehouse); results.Add(reply); - if (cs > 0) + if (current.TotalLength > 0) { - offset += (uint)cs; - length -= (uint)cs; + offset += (uint)current.TotalLength; + length -= (uint)current.TotalLength; } else throw new Exception("Error while parsing structured data"); @@ -1274,32 +1325,32 @@ public static class DataDeserializer if (results.Count == 2) { - var type = typeof(ValueTuple<,>).MakeGenericType(types.ToArray()); + var type = typeof(ValueTuple<,>).MakeGenericType(types); return Activator.CreateInstance(type, results[0], results[1]); } else if (results.Count == 3) { - var type = typeof(ValueTuple<,,>).MakeGenericType(types.ToArray()); + var type = typeof(ValueTuple<,,>).MakeGenericType(types); return Activator.CreateInstance(type, results[0], results[1], results[2]); } else if (results.Count == 4) { - var type = typeof(ValueTuple<,,,>).MakeGenericType(types.ToArray()); + var type = typeof(ValueTuple<,,,>).MakeGenericType(types); return Activator.CreateInstance(type, results[0], results[1], results[2], results[3]); } else if (results.Count == 5) { - var type = typeof(ValueTuple<,,,,>).MakeGenericType(types.ToArray()); + var type = typeof(ValueTuple<,,,,>).MakeGenericType(types); return Activator.CreateInstance(type, results[0], results[1], results[2], results[3], results[4]); } else if (results.Count == 6) { - var type = typeof(ValueTuple<,,,,,>).MakeGenericType(types.ToArray()); + var type = typeof(ValueTuple<,,,,,>).MakeGenericType(types); return Activator.CreateInstance(type, results[0], results[1], results[2], results[3], results[4], results[5]); } else if (results.Count == 7) { - var type = typeof(ValueTuple<,,,,,,>).MakeGenericType(types.ToArray()); + var type = typeof(ValueTuple<,,,,,,>).MakeGenericType(types); return Activator.CreateInstance(type, results[0], results[1], results[2], results[3], results[4], results[5], results[6]); } @@ -1476,10 +1527,13 @@ public static class DataDeserializer { var rt = new AsyncBag(); + Console.WriteLine("PropertyValueArrayParserAsync " + length); ListParserAsync(new ParsedTDU() { Data = data, Offset = offset, ContentLength = length } , connection, requestSequence).Then(x => { + Console.WriteLine("PropertyValueArrayParserAsync:Done " + length); + var ar = (object[])x; var pvs = new List(); diff --git a/Esiur/Data/DataSerializer.cs b/Esiur/Data/DataSerializer.cs index c2b1ccb..165c470 100644 --- a/Esiur/Data/DataSerializer.cs +++ b/Esiur/Data/DataSerializer.cs @@ -609,7 +609,7 @@ public static class DataSerializer var tru = TRU.FromType(type); byte[] composed = TypedArrayComposer(value, tru, warehouse, connection); - + if (composed == null) return new TDU(TDUIdentifier.Null, new byte[0], 0); @@ -681,7 +681,7 @@ public static class DataSerializer return new TDU(TDUIdentifier.TypedMap, all, (uint)all.Length, metadata); - + //return new TDU(TDUIdentifier.TypedMap, rt.ToArray(), (uint)rt.Count, // ); @@ -884,7 +884,7 @@ public static class DataSerializer public static unsafe TDU RecordComposer(object value, Warehouse warehouse, DistributedConnection connection) { - var rt = new List();// BinaryList(); + var rt = new List(); var record = (IRecord)value; var template = warehouse.GetTemplateByType(record.GetType()); @@ -900,12 +900,12 @@ public static class DataSerializer var tdu = Codec.ComposeInternal(propValue, warehouse, connection); - if (pt.ValueType.IsTyped() && // pt.ValueType.Identifier == TRUIdentifier.TypedRecord && + if (pt.ValueType.IsTyped() && pt.ValueType.Match(tru)) { // strip metadata var len = (uint)tdu.Composed.Length - tdu.ContentOffset; - tdu = new TDU(TDUIdentifier.TypeOfTarget, + tdu = new TDU(TDUIdentifier.TypeOfTarget, tdu.Composed.Clip(tdu.ContentOffset, len), len); } @@ -940,22 +940,40 @@ public static class DataSerializer var fields = value.GetType().GetFields(); var list = fields.Select(x => x.GetValue(value)).ToArray(); - var types = fields.Select(x => TRU.FromType(x.FieldType).Compose()).ToArray(); + var trus = fields.Select(x => TRU.FromType(x.FieldType)).ToArray(); var metadata = new List(); - foreach (var t in types) - metadata.AddRange(t); - var composed = DynamicArrayComposer(list, warehouse, connection); + foreach (var t in trus) + metadata.AddRange(t.Compose()); - if (composed == null) - return new TDU(TDUIdentifier.Null, new byte[0], 0); - else + var rt = new List(); + + for (var i = 0; i < fields.Length; i++) { - return new TDU(TDUIdentifier.TypedTuple, composed, - (uint)composed.Length, metadata.ToArray()); + var tupleValue = list[i]; + var targetTru = trus[i]; + + var tdu = Codec.ComposeInternal(tupleValue, warehouse, connection); + + var valueTru = TRU.FromType(tupleValue?.GetType()); + + if (targetTru.IsTyped() && + targetTru.Match(valueTru)) + { + // strip metadata + var len = (uint)tdu.Composed.Length - tdu.ContentOffset; + tdu = new TDU(TDUIdentifier.TypeOfTarget, + tdu.Composed.Clip(tdu.ContentOffset, len), len); + } + + rt.AddRange(tdu.Composed); } + + return new TDU(TDUIdentifier.TypedTuple, rt.ToArray(), + (uint)rt.Count, metadata.ToArray()); + } } diff --git a/Esiur/Data/TDU.cs b/Esiur/Data/TDU.cs index 0ffcdf5..a4a8059 100644 --- a/Esiur/Data/TDU.cs +++ b/Esiur/Data/TDU.cs @@ -184,9 +184,10 @@ public struct TDU var len = 1 + (ulong)metaLen + length; - if (length == 0) + if (length == 0 && (metadata == null || metadata.Length == 0)) { Composed = new byte[1] { (byte)Identifier }; + throw new Exception("Need check"); } else if (metadata.Length > 0xFF) { diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs index c4af607..759d68b 100644 --- a/Esiur/Net/IIP/DistributedConnection.cs +++ b/Esiur/Net/IIP/DistributedConnection.cs @@ -357,6 +357,11 @@ public partial class DistributedConnection : NetworkConnection, IStore x.Resource._EmitEventByIndex(x.Index, x.Value); else x.Resource._UpdatePropertyByIndex(x.Index, x.Value); + }).Error(e => + { + // do nothing + Console.WriteLine("Queue is empty"); + //throw e; }); // set local nonce @@ -429,7 +434,7 @@ public partial class DistributedConnection : NetworkConnection, IStore if (packet.DataType == null) return offset; - Console.WriteLine("Incoming: " + packet); + Console.WriteLine("Incoming: " + packet + " " + packet.CallbackId); if (packet.Method == IIPPacketMethod.Notification) { diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index 1fee7cc..933d512 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -266,6 +266,8 @@ partial class DistributedConnection { var req = requests.Take(callbackId); + Console.WriteLine("Completed " + callbackId); + if (req == null) { // @TODO: Send general failure @@ -278,6 +280,11 @@ partial class DistributedConnection reply.Then(result => { req.Trigger(result); + }) + .Error(e => + { + Console.WriteLine(callbackId + ": failed"); + req.TriggerError(e); }); } else @@ -1640,11 +1647,14 @@ partial class DistributedConnection SendRequest(IIPPacketRequest.TemplateFromClassId, classId) .Then((result) => { + Console.WriteLine("Parsing template..."); var tt = TypeTemplate.Parse((byte[])result); templateRequests.Remove(classId); templates.Add(tt.ClassId, tt); Instance.Warehouse.PutTemplate(tt); reply.Trigger(tt); + Console.WriteLine("Done parsing template..."); + }).Error((ex) => { reply.TriggerError(ex); @@ -1695,12 +1705,15 @@ partial class DistributedConnection var rt = new AsyncReply(); - SendRequest(IIPPacketRequest.GetResourceIdByLink, path) - .Then(result => + var req = SendRequest(IIPPacketRequest.GetResourceIdByLink, path); + + + req.Then(result => { rt.Trigger(result); }).Error(ex => rt.TriggerError(ex)); + //Query(path).Then(ar => //{ @@ -1823,7 +1836,7 @@ partial class DistributedConnection if (template?.DefinedType != null && template.IsWrapper) dr = Activator.CreateInstance(template.DefinedType, this, id, (ulong)args[1], (string)args[2]) as DistributedResource; else - dr = new DistributedResource(this, id, Convert.ToUInt64( args[1]), (string)args[2]); + dr = new DistributedResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); } else { diff --git a/Test/MyService.cs b/Test/MyService.cs index 3cfd41f..49fe3b9 100644 --- a/Test/MyService.cs +++ b/Test/MyService.cs @@ -119,7 +119,7 @@ public partial class MyService int MyPasscode = 2025; public PropertyContext Passcode { - get => new ((sender) => sender.Session.AuthorizedAccount == "alice" ? MyPasscode : 0); + get => new((sender) => sender.Session.AuthorizedAccount == "alice" ? MyPasscode : 0); set { if (value.Connection.Session.AuthorizedAccount != "alice") @@ -141,7 +141,7 @@ public partial class MyService [Export] public IRecord[] RecordsArray => new IRecord[] { new MyRecord() { Id = 22, Name = "Test", Score = 22.1 } }; [Export] public List RecordsList => new() { new MyRecord() { Id = 22, Name = "Test", Score = 22.1 } }; - //[Export] public IMyRecord myrecord { get; set; } + [Export] public IMyRecord myrecord { get; set; } [Export] public MyResource[]? myResources; @@ -160,7 +160,7 @@ public partial class MyService public void InvokeEvents(string msg, InvocationContext context) { //if (context.Connection.Session.AuthorizedAccount != "Alice") - // throw new Exception("Only Alice is allowed."); + // throw new Exception("Only Alice is allowed."); StringEvent?.Invoke(msg); ArrayEvent?.Invoke(new object[] { DateTime.UtcNow, "Event", msg }); diff --git a/Test/Program.cs b/Test/Program.cs index a3f9574..396f62a 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -184,6 +184,10 @@ namespace Test await wh.Open(); + + //var sc = service.GetGenericRecord(); + //var d = Codec.Compose(sc, Warehouse.Default, null); + // Start testing TestClient(service); } @@ -216,9 +220,13 @@ namespace Test }); - dynamic remote = await con.Get("sys/service"); + var gr = await remote.GetGenericRecord(); + Console.WriteLine(gr); + //return; + Console.WriteLine("OK"); + perodicTimer = new Timer(new TimerCallback(perodicTimerElapsed), remote, 0, 1000); var pcall = await con.Call("Hello", "whats up ?", DateTime.UtcNow); @@ -231,8 +239,6 @@ namespace Test TestObjectProps(local, remote); - var gr = await remote.GetGenericRecord(); - Console.WriteLine(gr); var opt = await remote.Optional(new { a1 = 22, a2 = 33, a4 = "What?" }); Console.WriteLine(opt);