2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-01-27 01:20:39 +00:00
This commit is contained in:
2025-12-23 18:27:45 +03:00
parent d730fe1b8d
commit 414b34e5aa
16 changed files with 222 additions and 38 deletions

View File

@@ -30,9 +30,29 @@ using System.Threading.Tasks;
namespace Esiur.Core;
public struct AsyncQueueItem<T>
{
public AsyncReply<T> Reply;
public int Sequence;
public DateTime Arrival;
public DateTime Delivered;
public DateTime Ready;
public int BatchSize;
public int FlushId;
public int NotificationsCountWaitingInTheQueueAtEnqueueing;
public bool HasResource;
}
public class AsyncQueue<T> : AsyncReply<T>
{
List<AsyncReply<T>> list = new List<AsyncReply<T>>();
int currentId = 0;
int currentFlushId;
public List<AsyncQueueItem<T>> Processed = new();
List<AsyncQueueItem<T>> list = new List<AsyncQueueItem<T>>();
//Action<T> callback;
object queueLock = new object();
@@ -46,32 +66,76 @@ public class AsyncQueue<T> : AsyncReply<T>
public void Add(AsyncReply<T> reply)
{
lock (queueLock)
list.Add(reply);
{
currentId++;
list.Add(new AsyncQueueItem<T>()
{
Sequence = currentId,
NotificationsCountWaitingInTheQueueAtEnqueueing = list.Count,
Reply = reply,
Arrival = DateTime.Now,
HasResource = !reply.Ready
});
}
resultReady = false;
if (reply.Ready)
processQueue(default(T));
else
reply.Then(processQueue);
}
public void Remove(AsyncReply<T> reply)
{
lock (queueLock)
list.Remove(reply);
{
var item = list.FirstOrDefault(i => i.Reply == reply);
list.Remove(item);
}
processQueue(default(T));
}
void processQueue(T o)
{
lock (queueLock)
for (var i = 0; i < list.Count; i++)
if (list[i].Ready)
{
Trigger(list[i].Result);
var batchSize = 0;
for (var i = 0; i < list.Count; i++)
{
if (list[i].Reply.Ready)
{
batchSize++;
}
else
{
break;
}
}
var flushId = currentFlushId++;
for (var i = 0; i < list.Count; i++)
if (list[i].Reply.Ready)
{
Trigger(list[i].Reply.Result);
resultReady = false;
var p = list[i];
p.Delivered = DateTime.Now;
p.Ready = p.Reply.ReadyTime;
p.BatchSize = batchSize;
p.FlushId = flushId;
//p.HasResource = p.Reply. (p.Ready - p.Arrival).TotalMilliseconds > 5;
Processed.Add(p);
list.RemoveAt(i);
i--;
}
else
break;
}
resultReady = (list.Count == 0);
}

View File

@@ -39,6 +39,8 @@ namespace Esiur.Core;
public class AsyncReply
{
public DateTime ReadyTime;
protected List<Action<object>> callbacks = new List<Action<object>>();
protected object result;
@@ -238,6 +240,8 @@ public class AsyncReply
{
lock (asyncLock)
{
ReadyTime = DateTime.Now;
//timeout?.Dispose();
if (exception != null)
@@ -362,6 +366,9 @@ public class AsyncReply
public AsyncReply(object result)
{
// this.Debug = true;
ReadyTime = DateTime.Now;
resultReady = true;
this.result = result;

View File

@@ -95,6 +95,7 @@ public static class Codec
DataDeserializer.ListParserAsync,
DataDeserializer.ResourceListParserAsync,
DataDeserializer.RecordListParserAsync,
DataDeserializer.ResourceLinkParserAsync,
};
static AsyncParser[] TypedAsyncParsers = new AsyncParser[]
@@ -163,6 +164,8 @@ public static class Codec
DataDeserializer.ListParser,
DataDeserializer.ResourceListParser,
DataDeserializer.RecordListParser,
DataDeserializer.ResourceLinkParser,
// @TODO: Map and MapList parsers to be added
};
static SyncParser[] TypedParsers = new SyncParser[]
@@ -343,6 +346,7 @@ public static class Codec
[typeof(List<byte>)] = DataSerializer.RawDataComposerFromList,
//[typeof(List<byte?>)] = DataSerializer.RawDataComposerFromList,
[typeof(string)] = DataSerializer.StringComposer,
[typeof(ResourceLink)] = DataSerializer.ResourceLinkComposer,
[typeof(UUID)] = DataSerializer.UUIDComposer,
// Special
[typeof(object[])] = DataSerializer.ListComposer,

View File

@@ -264,6 +264,25 @@ public static class DataDeserializer
}
public static object ResourceLinkParserAsync(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence)
{
var link = tdu.Data.GetString(tdu.Offset, (uint)tdu.ContentLength);
if (connection == null)
{
return new ResourceLink(link);
}
else
{
return connection.Instance.Warehouse.Get<IResource>(link);
}
}
public static object ResourceLinkParser(ParsedTDU tdu, Warehouse warehouse)
{
var link = tdu.Data.GetString(tdu.Offset, (uint)tdu.ContentLength);
return new ResourceLink(link);
}
public static unsafe object ResourceParser8Async(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence)
{
if (connection == null)

View File

@@ -388,6 +388,14 @@ public static class DataSerializer
return new TDU(TDUIdentifier.String, b, (uint)b.Length);
}
public static TDU ResourceLinkComposer(object value, Warehouse warehouse, DistributedConnection connection)
{
var b = Encoding.UTF8.GetBytes((ResourceLink)value);
return new TDU(TDUIdentifier.ResourceLink, b, (uint)b.Length);
}
public static TDU EnumComposer(object value, Warehouse warehouse, DistributedConnection connection)
{
if (value == null)

View File

@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Data
{
public class ResourceLink
{
readonly string value;
public ResourceLink(string value)
{
this.value = value;
}
public static implicit operator string(ResourceLink d)
{
return d.value;
}
public static implicit operator ResourceLink(string d)
{
return new ResourceLink(d);
}
public override string ToString() => value;
}
}

View File

@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Data
{
public class ResourceLink<T>
{
readonly string value;
public ResourceLink(string value)
{
this.value = value;
}
public static implicit operator string(ResourceLink<T> d)
{
return d.value;
}
public static implicit operator ResourceLink<T>(string d)
{
return new ResourceLink<T>(d);
}
public override string ToString() => value;
}
}

View File

@@ -46,9 +46,9 @@ namespace Esiur.Data
List = 0x42,
ResourceList = 0x43,
RecordList = 0x44,
Map = 0x45,
MapList = 0x46,
ResourceLink = 0x47,
ResourceLink = 0x45,
Map = 0x46,
MapList = 0x47,
Record = 0x80,
TypedList = 0x81,

View File

@@ -295,10 +295,14 @@ namespace Esiur.Data
return new TRU(TRUIdentifier.Resource, nullable);
}
else if (type == typeof(IRecord) || type == typeof(Record))
{
return new TRU(TRUIdentifier.Record, nullable);
}
else if (type == typeof(Map<object, object>)
|| type == typeof(Dictionary<object, object>))
{
return new TRU(TRUIdentifier.Map, nullable);
}
else if (Codec.ImplementsInterface(type, typeof(IResource)))
{
tru = new TRU(
@@ -306,10 +310,6 @@ namespace Esiur.Data
nullable,
TypeTemplate.GetTypeUUID(type)
);
//_cache.Add(type, tru);
//return tru;
}
else if (Codec.ImplementsInterface(type, typeof(IRecord)))
{
@@ -318,14 +318,11 @@ namespace Esiur.Data
nullable,
TypeTemplate.GetTypeUUID(type)
);
//_cache.Add(type, tru);
//return tru;
}
else if (type.IsGenericType)
{
var genericType = type.GetGenericTypeDefinition();
if (genericType == typeof(List<>)
|| genericType == typeof(VarList<>)
|| genericType == typeof(IList<>))
@@ -370,6 +367,12 @@ namespace Esiur.Data
}
}
else if (genericType == typeof(ResourceLink<>))
{
var args = type.GetGenericArguments();
return FromType(args[0]);
}
else if (genericType == typeof(ValueTuple<,>))
{
var args = type.GetGenericArguments();
@@ -518,6 +521,7 @@ namespace Esiur.Data
_ when type == typeof(decimal) => new TRU(TRUIdentifier.Decimal, nullable),
_ when type == typeof(string) => new TRU(TRUIdentifier.String, nullable),
_ when type == typeof(DateTime) => new TRU(TRUIdentifier.DateTime, nullable),
_ when type == typeof(ResourceLink) => new TRU(TRUIdentifier.Resource, nullable),
_ => null
};

View File

@@ -348,6 +348,13 @@ public partial class DistributedConnection : NetworkConnection, IStore
}
public List<AsyncQueueItem<DistributedResourceQueueItem>> GetFinishedQueue()
{
var l = queue.Processed.ToArray().ToList();
queue.Processed.Clear();
return l;
}
void init()
{
//var q = queue;
@@ -438,6 +445,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
if (packet.Method == IIPPacketMethod.Notification)
{
var dt = packet.DataType.Value;
switch (packet.Notification)

View File

@@ -31,6 +31,7 @@ using Esiur.Resource.Template;
using Esiur.Security.Authority;
using Esiur.Security.Permissions;
using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
@@ -67,7 +68,7 @@ partial class DistributedConnection
object subscriptionsLock = new object();
AsyncQueue<DistributedResourceQueueItem> queue = new AsyncQueue<DistributedResourceQueueItem>();
AsyncQueue<DistributedResourceQueueItem> queue = new ();
@@ -118,6 +119,7 @@ partial class DistributedConnection
/// <returns></returns>
AsyncReply SendNotification(IIPPacketNotification action, params object[] args)
{
var reply = new AsyncReply();
if (args.Length == 0)
@@ -144,6 +146,9 @@ partial class DistributedConnection
void SendReply(IIPPacketReply action, uint callbackId, params object[] args)
{
if (Instance == null)
return;
if (args.Length == 0)
{
var bl = new BinaryList();
@@ -311,7 +316,7 @@ partial class DistributedConnection
return;
}
var (_, parsed) = Codec.ParseAsync(data, 0, this, null);
var (_, parsed) = Codec.ParseAsync(dataType, this, null);
if (parsed is AsyncReply reply)
{
reply.Then(result =>
@@ -445,22 +450,23 @@ partial class DistributedConnection
var (valueOffset, valueSize, args) =
DataDeserializer.LimitedCountListParser(dataType.Data, dataType.Offset, dataType.ContentLength, Instance.Warehouse, 2);
var rid = (uint)args[0];
var rid =Convert.ToUInt32(args[0]);
var index = (byte)args[1];
Fetch(rid, null).Then(r =>
{
var pt = r.Instance.Template.GetPropertyTemplateByIndex(index);
if (pt != null)
if (pt == null)
return;
var item = new AsyncReply<DistributedResourceQueueItem>();
queue.Add(item);
var (_, parsed) = Codec.ParseAsync(dataType.Data, valueOffset, this, null);
if (parsed is AsyncReply)
{
var item = new AsyncReply<DistributedResourceQueueItem>();
queue.Add(item);
(parsed as AsyncReply).Then((result) =>
{
item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
@@ -470,9 +476,13 @@ partial class DistributedConnection
}
else
{
item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
queue.Add(new AsyncReply<DistributedResourceQueueItem>(new DistributedResourceQueueItem((DistributedResource)r,
DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery,
parsed, index));
parsed, index)));
//item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
// DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery,
// parsed, index));
}
});
}

View File

@@ -209,7 +209,7 @@ public abstract class NetworkConnection : IDestructible, INetworkReceiver<ISocke
{
get
{
return sock.State == SocketState.Established;
return sock == null ? false : sock.State == SocketState.Established;
}
}

View File

@@ -229,12 +229,12 @@ $@" public partial class {ci.Name} : IResource {{
if (tmp.Type == TemplateType.Resource)
{
var source = TemplateGenerator.GenerateClass(tmp, templates, false);
spc.AddSource(tmp.ClassName + ".Generated.cs", source);
spc.AddSource(tmp.ClassName + ".g.cs", source);
}
else if (tmp.Type == TemplateType.Record)
{
var source = TemplateGenerator.GenerateRecord(tmp, templates);
spc.AddSource(tmp.ClassName + ".Generated.cs", source);
spc.AddSource(tmp.ClassName + ".g.cs", source);
}
}
@@ -246,7 +246,7 @@ $@" public partial class {ci.Name} : IResource {{
"\r\n } \r\n}";
spc.AddSource("Esiur.Generated.cs", typesFile);
spc.AddSource("Esiur.g.cs", typesFile);
}
private static void Report(SourceProductionContext ctx, string title, string message, DiagnosticSeverity severity)

View File

@@ -217,17 +217,17 @@ public static class TemplateGenerator
if (tmp.Type == TemplateType.Resource)
{
var source = GenerateClass(tmp, templates, asyncSetters);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".Generated.cs", source);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".g.cs", source);
}
else if (tmp.Type == TemplateType.Record)
{
var source = GenerateRecord(tmp, templates);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".Generated.cs", source);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".g.cs", source);
}
else if (tmp.Type == TemplateType.Enum)
{
var source = GenerateEnum(tmp, templates);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".Generated.cs", source);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".g.cs", source);
}
}
@@ -248,7 +248,7 @@ public static class TemplateGenerator
"\r\n } \r\n}";
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + "Esiur.Generated.cs", typesFile);
File.WriteAllText(dstDir.FullName + Path.DirectorySeparatorChar + "Esiur.g.cs", typesFile);
return dstDir.FullName;

View File

@@ -38,6 +38,11 @@ public class TypeTemplate
TemplateType templateType;
public override string ToString()
{
return className;
}
// protected TemplateType
//bool isReady;
@@ -191,7 +196,9 @@ public class TypeTemplate
var genericTypeArgs = type.GetGenericArguments();
if (genericType == typeof(List<>)
|| genericType == typeof(PropertyContext<>))
|| genericType == typeof(PropertyContext<>)
|| genericType == typeof(AsyncReply<>)
|| genericType == typeof(ResourceLink<>))
{
return GetDistributedTypes(genericTypeArgs[0]);
}

View File

@@ -320,7 +320,6 @@ public class Warehouse
return (T)await store.Get(url[3]);
else
return (T)store;
}
catch (Exception ex)
{