From 414b34e5aa3d0a8ab77548bb5383555c63395528 Mon Sep 17 00:00:00 2001 From: ahmed Date: Tue, 23 Dec 2025 18:27:45 +0300 Subject: [PATCH] stats --- Esiur/Core/AsyncQueue.cs | 76 +++++++++++++++++-- Esiur/Core/AsyncReply.cs | 7 ++ Esiur/Data/Codec.cs | 4 + Esiur/Data/DataDeserializer.cs | 19 +++++ Esiur/Data/DataSerializer.cs | 8 ++ Esiur/Data/ResourceLink.cs | 27 +++++++ Esiur/Data/ResourceLinkGeneric.cs | 27 +++++++ Esiur/Data/TDUIdentifier.cs | 6 +- Esiur/Data/TRU.cs | 26 ++++--- Esiur/Net/IIP/DistributedConnection.cs | 8 ++ .../Net/IIP/DistributedConnectionProtocol.cs | 26 +++++-- Esiur/Net/NetworkConnection.cs | 2 +- Esiur/Proxy/ResourceGenerator.cs | 6 +- Esiur/Proxy/TemplateGenerator.cs | 8 +- Esiur/Resource/Template/TypeTemplate.cs | 9 ++- Esiur/Resource/Warehouse.cs | 1 - 16 files changed, 222 insertions(+), 38 deletions(-) create mode 100644 Esiur/Data/ResourceLink.cs create mode 100644 Esiur/Data/ResourceLinkGeneric.cs diff --git a/Esiur/Core/AsyncQueue.cs b/Esiur/Core/AsyncQueue.cs index c16dd0f..470f421 100644 --- a/Esiur/Core/AsyncQueue.cs +++ b/Esiur/Core/AsyncQueue.cs @@ -30,9 +30,29 @@ using System.Threading.Tasks; namespace Esiur.Core; +public struct AsyncQueueItem +{ + public AsyncReply Reply; + public int Sequence; + public DateTime Arrival; + public DateTime Delivered; + public DateTime Ready; + public int BatchSize; + public int FlushId; + public int NotificationsCountWaitingInTheQueueAtEnqueueing; + public bool HasResource; +} + + public class AsyncQueue : AsyncReply { - List> list = new List>(); + + int currentId = 0; + int currentFlushId; + + public List> Processed = new(); + + List> list = new List>(); //Action callback; object queueLock = new object(); @@ -46,32 +66,76 @@ public class AsyncQueue : AsyncReply public void Add(AsyncReply reply) { lock (queueLock) - list.Add(reply); + { + currentId++; + list.Add(new AsyncQueueItem() + { + Sequence = currentId, + NotificationsCountWaitingInTheQueueAtEnqueueing = list.Count, + Reply = reply, + Arrival = DateTime.Now, + HasResource = !reply.Ready + }); + } resultReady = false; - reply.Then(processQueue); + if (reply.Ready) + processQueue(default(T)); + else + reply.Then(processQueue); } public void Remove(AsyncReply reply) { lock (queueLock) - list.Remove(reply); + { + var item = list.FirstOrDefault(i => i.Reply == reply); + list.Remove(item); + } + processQueue(default(T)); } void processQueue(T o) { lock (queueLock) + { + var batchSize = 0; for (var i = 0; i < list.Count; i++) - if (list[i].Ready) + { + if (list[i].Reply.Ready) { - Trigger(list[i].Result); + batchSize++; + } + else + { + break; + } + } + + var flushId = currentFlushId++; + + for (var i = 0; i < list.Count; i++) + if (list[i].Reply.Ready) + { + Trigger(list[i].Reply.Result); resultReady = false; + + var p = list[i]; + p.Delivered = DateTime.Now; + p.Ready = p.Reply.ReadyTime; + p.BatchSize = batchSize; + p.FlushId = flushId; + //p.HasResource = p.Reply. (p.Ready - p.Arrival).TotalMilliseconds > 5; + Processed.Add(p); + list.RemoveAt(i); + i--; } else break; + } resultReady = (list.Count == 0); } diff --git a/Esiur/Core/AsyncReply.cs b/Esiur/Core/AsyncReply.cs index 61639c7..d2eb82f 100644 --- a/Esiur/Core/AsyncReply.cs +++ b/Esiur/Core/AsyncReply.cs @@ -39,6 +39,8 @@ namespace Esiur.Core; public class AsyncReply { + public DateTime ReadyTime; + protected List> callbacks = new List>(); protected object result; @@ -238,6 +240,8 @@ public class AsyncReply { lock (asyncLock) { + ReadyTime = DateTime.Now; + //timeout?.Dispose(); if (exception != null) @@ -362,6 +366,9 @@ public class AsyncReply public AsyncReply(object result) { // this.Debug = true; + + ReadyTime = DateTime.Now; + resultReady = true; this.result = result; diff --git a/Esiur/Data/Codec.cs b/Esiur/Data/Codec.cs index ff7ac81..11ab61a 100644 --- a/Esiur/Data/Codec.cs +++ b/Esiur/Data/Codec.cs @@ -95,6 +95,7 @@ public static class Codec DataDeserializer.ListParserAsync, DataDeserializer.ResourceListParserAsync, DataDeserializer.RecordListParserAsync, + DataDeserializer.ResourceLinkParserAsync, }; static AsyncParser[] TypedAsyncParsers = new AsyncParser[] @@ -163,6 +164,8 @@ public static class Codec DataDeserializer.ListParser, DataDeserializer.ResourceListParser, DataDeserializer.RecordListParser, + DataDeserializer.ResourceLinkParser, + // @TODO: Map and MapList parsers to be added }; static SyncParser[] TypedParsers = new SyncParser[] @@ -343,6 +346,7 @@ public static class Codec [typeof(List)] = DataSerializer.RawDataComposerFromList, //[typeof(List)] = DataSerializer.RawDataComposerFromList, [typeof(string)] = DataSerializer.StringComposer, + [typeof(ResourceLink)] = DataSerializer.ResourceLinkComposer, [typeof(UUID)] = DataSerializer.UUIDComposer, // Special [typeof(object[])] = DataSerializer.ListComposer, diff --git a/Esiur/Data/DataDeserializer.cs b/Esiur/Data/DataDeserializer.cs index 1e1ee57..5db0625 100644 --- a/Esiur/Data/DataDeserializer.cs +++ b/Esiur/Data/DataDeserializer.cs @@ -264,6 +264,25 @@ public static class DataDeserializer } + public static object ResourceLinkParserAsync(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence) + { + var link = tdu.Data.GetString(tdu.Offset, (uint)tdu.ContentLength); + if (connection == null) + { + return new ResourceLink(link); + } + else + { + return connection.Instance.Warehouse.Get(link); + } + } + + public static object ResourceLinkParser(ParsedTDU tdu, Warehouse warehouse) + { + var link = tdu.Data.GetString(tdu.Offset, (uint)tdu.ContentLength); + return new ResourceLink(link); + } + public static unsafe object ResourceParser8Async(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence) { if (connection == null) diff --git a/Esiur/Data/DataSerializer.cs b/Esiur/Data/DataSerializer.cs index 6cd8230..c7f1d12 100644 --- a/Esiur/Data/DataSerializer.cs +++ b/Esiur/Data/DataSerializer.cs @@ -388,6 +388,14 @@ public static class DataSerializer return new TDU(TDUIdentifier.String, b, (uint)b.Length); } + public static TDU ResourceLinkComposer(object value, Warehouse warehouse, DistributedConnection connection) + { + var b = Encoding.UTF8.GetBytes((ResourceLink)value); + + return new TDU(TDUIdentifier.ResourceLink, b, (uint)b.Length); + } + + public static TDU EnumComposer(object value, Warehouse warehouse, DistributedConnection connection) { if (value == null) diff --git a/Esiur/Data/ResourceLink.cs b/Esiur/Data/ResourceLink.cs new file mode 100644 index 0000000..b0ed58f --- /dev/null +++ b/Esiur/Data/ResourceLink.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Data +{ + public class ResourceLink + { + readonly string value; + + public ResourceLink(string value) + { + this.value = value; + } + public static implicit operator string(ResourceLink d) + { + return d.value; + } + public static implicit operator ResourceLink(string d) + { + return new ResourceLink(d); + } + + public override string ToString() => value; + + } +} diff --git a/Esiur/Data/ResourceLinkGeneric.cs b/Esiur/Data/ResourceLinkGeneric.cs new file mode 100644 index 0000000..146a79a --- /dev/null +++ b/Esiur/Data/ResourceLinkGeneric.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Data +{ + public class ResourceLink + { + readonly string value; + + public ResourceLink(string value) + { + this.value = value; + } + public static implicit operator string(ResourceLink d) + { + return d.value; + } + public static implicit operator ResourceLink(string d) + { + return new ResourceLink(d); + } + + public override string ToString() => value; + + } +} diff --git a/Esiur/Data/TDUIdentifier.cs b/Esiur/Data/TDUIdentifier.cs index b170978..4ea38d9 100644 --- a/Esiur/Data/TDUIdentifier.cs +++ b/Esiur/Data/TDUIdentifier.cs @@ -46,9 +46,9 @@ namespace Esiur.Data List = 0x42, ResourceList = 0x43, RecordList = 0x44, - Map = 0x45, - MapList = 0x46, - ResourceLink = 0x47, + ResourceLink = 0x45, + Map = 0x46, + MapList = 0x47, Record = 0x80, TypedList = 0x81, diff --git a/Esiur/Data/TRU.cs b/Esiur/Data/TRU.cs index 8240a6f..ec4adb8 100644 --- a/Esiur/Data/TRU.cs +++ b/Esiur/Data/TRU.cs @@ -295,10 +295,14 @@ namespace Esiur.Data return new TRU(TRUIdentifier.Resource, nullable); } else if (type == typeof(IRecord) || type == typeof(Record)) + { return new TRU(TRUIdentifier.Record, nullable); + } else if (type == typeof(Map) - || type == typeof(Dictionary)) + || type == typeof(Dictionary)) + { return new TRU(TRUIdentifier.Map, nullable); + } else if (Codec.ImplementsInterface(type, typeof(IResource))) { tru = new TRU( @@ -306,10 +310,6 @@ namespace Esiur.Data nullable, TypeTemplate.GetTypeUUID(type) ); - - //_cache.Add(type, tru); - - //return tru; } else if (Codec.ImplementsInterface(type, typeof(IRecord))) { @@ -318,14 +318,11 @@ namespace Esiur.Data nullable, TypeTemplate.GetTypeUUID(type) ); - - //_cache.Add(type, tru); - - //return tru; } else if (type.IsGenericType) { var genericType = type.GetGenericTypeDefinition(); + if (genericType == typeof(List<>) || genericType == typeof(VarList<>) || genericType == typeof(IList<>)) @@ -365,11 +362,17 @@ namespace Esiur.Data if (subType2 == null) return null; - tru = new TRU(TRUIdentifier.TypedMap, nullable, null, - new TRU[] { subType1, subType2 }); + tru = new TRU(TRUIdentifier.TypedMap, nullable, null, + new TRU[] { subType1, subType2 }); } } + else if (genericType == typeof(ResourceLink<>)) + { + var args = type.GetGenericArguments(); + + return FromType(args[0]); + } else if (genericType == typeof(ValueTuple<,>)) { var args = type.GetGenericArguments(); @@ -518,6 +521,7 @@ namespace Esiur.Data _ when type == typeof(decimal) => new TRU(TRUIdentifier.Decimal, nullable), _ when type == typeof(string) => new TRU(TRUIdentifier.String, nullable), _ when type == typeof(DateTime) => new TRU(TRUIdentifier.DateTime, nullable), + _ when type == typeof(ResourceLink) => new TRU(TRUIdentifier.Resource, nullable), _ => null }; diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs index 67d4611..77560c6 100644 --- a/Esiur/Net/IIP/DistributedConnection.cs +++ b/Esiur/Net/IIP/DistributedConnection.cs @@ -348,6 +348,13 @@ public partial class DistributedConnection : NetworkConnection, IStore } + public List> GetFinishedQueue() + { + var l = queue.Processed.ToArray().ToList(); + queue.Processed.Clear(); + return l; + } + void init() { //var q = queue; @@ -438,6 +445,7 @@ public partial class DistributedConnection : NetworkConnection, IStore if (packet.Method == IIPPacketMethod.Notification) { + var dt = packet.DataType.Value; switch (packet.Notification) diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index 2391269..f48f67f 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -31,6 +31,7 @@ using Esiur.Resource.Template; using Esiur.Security.Authority; using Esiur.Security.Permissions; using System; +using System.Collections; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; using System.Linq; @@ -67,7 +68,7 @@ partial class DistributedConnection object subscriptionsLock = new object(); - AsyncQueue queue = new AsyncQueue(); + AsyncQueue queue = new (); @@ -118,6 +119,7 @@ partial class DistributedConnection /// AsyncReply SendNotification(IIPPacketNotification action, params object[] args) { + var reply = new AsyncReply(); if (args.Length == 0) @@ -144,6 +146,9 @@ partial class DistributedConnection void SendReply(IIPPacketReply action, uint callbackId, params object[] args) { + if (Instance == null) + return; + if (args.Length == 0) { var bl = new BinaryList(); @@ -311,7 +316,7 @@ partial class DistributedConnection return; } - var (_, parsed) = Codec.ParseAsync(data, 0, this, null); + var (_, parsed) = Codec.ParseAsync(dataType, this, null); if (parsed is AsyncReply reply) { reply.Then(result => @@ -445,22 +450,23 @@ partial class DistributedConnection var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(dataType.Data, dataType.Offset, dataType.ContentLength, Instance.Warehouse, 2); - var rid = (uint)args[0]; + var rid =Convert.ToUInt32(args[0]); var index = (byte)args[1]; Fetch(rid, null).Then(r => { var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); - if (pt != null) + if (pt == null) return; - var item = new AsyncReply(); - queue.Add(item); var (_, parsed) = Codec.ParseAsync(dataType.Data, valueOffset, this, null); if (parsed is AsyncReply) { + var item = new AsyncReply(); + queue.Add(item); + (parsed as AsyncReply).Then((result) => { item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, @@ -470,9 +476,13 @@ partial class DistributedConnection } else { - item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, + queue.Add(new AsyncReply(new DistributedResourceQueueItem((DistributedResource)r, DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery, - parsed, index)); + parsed, index))); + + //item.Trigger(new DistributedResourceQueueItem((DistributedResource)r, + // DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery, + // parsed, index)); } }); } diff --git a/Esiur/Net/NetworkConnection.cs b/Esiur/Net/NetworkConnection.cs index d1f6cdd..b617c09 100644 --- a/Esiur/Net/NetworkConnection.cs +++ b/Esiur/Net/NetworkConnection.cs @@ -209,7 +209,7 @@ public abstract class NetworkConnection : IDestructible, INetworkReceiver) - || genericType == typeof(PropertyContext<>)) + || genericType == typeof(PropertyContext<>) + || genericType == typeof(AsyncReply<>) + || genericType == typeof(ResourceLink<>)) { return GetDistributedTypes(genericTypeArgs[0]); } diff --git a/Esiur/Resource/Warehouse.cs b/Esiur/Resource/Warehouse.cs index b3b008a..40d1c73 100644 --- a/Esiur/Resource/Warehouse.cs +++ b/Esiur/Resource/Warehouse.cs @@ -320,7 +320,6 @@ public class Warehouse return (T)await store.Get(url[3]); else return (T)store; - } catch (Exception ex) {