From c7d095ea9671a8a1a5d82431a842411204af363f Mon Sep 17 00:00:00 2001 From: ahmed Date: Sun, 5 Apr 2026 12:35:27 +0300 Subject: [PATCH] Tests --- Esiur.sln | 27 +- Libraries/Esiur/Data/Tru.cs | 457 +++++++++--------- Libraries/Esiur/Net/Packets/EpPacketReply.cs | 10 +- .../Esiur/Protocol/EpConnectionProtocol.cs | 337 +++++++------ Libraries/Esiur/Proxy/ResourceGenerator.cs | 35 +- Libraries/Esiur/Proxy/ResourceProxy.cs | 73 +-- Libraries/Esiur/Resource/Instance.cs | 2 +- Libraries/Esiur/Resource/Warehouse.cs | 31 +- ...oj => Esiur.Tests.ConcurrentAttach.csproj} | 2 +- .../ConcurrentAttach/{Client => }/Program.cs | 27 +- .../{Client => }/SensorResource.cs | 0 ...Esiur.Tests.ConcurrentAttach.Server.csproj | 14 - .../ConcurrentAttach/Server/Program.cs | 182 ------- .../ConcurrentAttach/Server/SensorResource.cs | 15 - .../Distribution/NodeFanout/Client/Program.cs | 13 +- .../ResourceCount/Client/Program.cs | 36 +- .../ResourceCount/Server/Program.cs | 17 +- 17 files changed, 546 insertions(+), 732 deletions(-) rename Tests/Distribution/ConcurrentAttach/{Client/Esiur.Tests.ConcurrentAttach.Client.csproj => Esiur.Tests.ConcurrentAttach.csproj} (72%) rename Tests/Distribution/ConcurrentAttach/{Client => }/Program.cs (91%) rename Tests/Distribution/ConcurrentAttach/{Client => }/SensorResource.cs (100%) delete mode 100644 Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj delete mode 100644 Tests/Distribution/ConcurrentAttach/Server/Program.cs delete mode 100644 Tests/Distribution/ConcurrentAttach/Server/SensorResource.cs diff --git a/Esiur.sln b/Esiur.sln index ae08b33..7baf7ea 100644 --- a/Esiur.sln +++ b/Esiur.sln @@ -58,22 +58,16 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ConcurrentAttach", "Concurr EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ResourceCount", "ResourceCount", "{058F6BE3-A684-45F9-B61A-25839C64F503}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{17796DCC-760D-4AD7-BCA9-EFE4801B9044}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{FFF7D07F-BA9F-4129-B7AD-99861E11F05E}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{413A4292-C2B3-4096-94CF-D6F607C20939}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach.Client", "Tests\Distribution\ConcurrentAttach\Client\Esiur.Tests.ConcurrentAttach.Client.csproj", "{CD889154-4EA5-61D3-9FF4-E15F7B3D3573}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach.Server", "Tests\Distribution\ConcurrentAttach\Server\Esiur.Tests.ConcurrentAttach.Server.csproj", "{9A468603-1310-7434-3A2B-4528DA8221C6}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ResourceCount.Client", "Tests\Distribution\ResourceCount\Client\Esiur.Tests.ResourceCount.Client.csproj", "{69A075E7-D924-59C6-0BF2-17A09201DDF3}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ResourceCount.Server", "Tests\Distribution\ResourceCount\Server\Esiur.Tests.ResourceCount.Server.csproj", "{D1DF309F-40DE-9C0E-A78B-2648544B77D2}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach", "Tests\Distribution\ConcurrentAttach\Esiur.Tests.ConcurrentAttach.csproj", "{7D88CAF1-1887-A011-BA72-F38C87C1A7D9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -132,14 +126,6 @@ Global {D8340DC7-5D27-2A71-74CC-634493847FF0}.Debug|Any CPU.Build.0 = Debug|Any CPU {D8340DC7-5D27-2A71-74CC-634493847FF0}.Release|Any CPU.ActiveCfg = Release|Any CPU {D8340DC7-5D27-2A71-74CC-634493847FF0}.Release|Any CPU.Build.0 = Release|Any CPU - {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Debug|Any CPU.Build.0 = Debug|Any CPU - {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Release|Any CPU.ActiveCfg = Release|Any CPU - {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Release|Any CPU.Build.0 = Release|Any CPU - {9A468603-1310-7434-3A2B-4528DA8221C6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {9A468603-1310-7434-3A2B-4528DA8221C6}.Debug|Any CPU.Build.0 = Debug|Any CPU - {9A468603-1310-7434-3A2B-4528DA8221C6}.Release|Any CPU.ActiveCfg = Release|Any CPU - {9A468603-1310-7434-3A2B-4528DA8221C6}.Release|Any CPU.Build.0 = Release|Any CPU {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Debug|Any CPU.Build.0 = Debug|Any CPU {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -148,6 +134,10 @@ Global {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Debug|Any CPU.Build.0 = Debug|Any CPU {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Release|Any CPU.ActiveCfg = Release|Any CPU {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Release|Any CPU.Build.0 = Release|Any CPU + {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -175,14 +165,11 @@ Global {D8340DC7-5D27-2A71-74CC-634493847FF0} = {8AC1C925-068F-4E78-ADE4-4DA3CF996662} {336B5CE1-95DA-4FDD-A876-0919E3C446CA} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} {058F6BE3-A684-45F9-B61A-25839C64F503} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} - {17796DCC-760D-4AD7-BCA9-EFE4801B9044} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA} - {FFF7D07F-BA9F-4129-B7AD-99861E11F05E} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA} {DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974} = {058F6BE3-A684-45F9-B61A-25839C64F503} {413A4292-C2B3-4096-94CF-D6F607C20939} = {058F6BE3-A684-45F9-B61A-25839C64F503} - {CD889154-4EA5-61D3-9FF4-E15F7B3D3573} = {FFF7D07F-BA9F-4129-B7AD-99861E11F05E} - {9A468603-1310-7434-3A2B-4528DA8221C6} = {17796DCC-760D-4AD7-BCA9-EFE4801B9044} {69A075E7-D924-59C6-0BF2-17A09201DDF3} = {413A4292-C2B3-4096-94CF-D6F607C20939} {D1DF309F-40DE-9C0E-A78B-2648544B77D2} = {DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974} + {7D88CAF1-1887-A011-BA72-F38C87C1A7D9} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} diff --git a/Libraries/Esiur/Data/Tru.cs b/Libraries/Esiur/Data/Tru.cs index bef089c..f6a1547 100644 --- a/Libraries/Esiur/Data/Tru.cs +++ b/Libraries/Esiur/Data/Tru.cs @@ -267,263 +267,266 @@ namespace Esiur.Data // case TRUIdentifier. } //} - private static Dictionary _cache = new Dictionary(); + private static Dictionary cache = new Dictionary(); + private static object cacheLook = new object(); public static Tru? FromType(Type type) { if (type == null) return new Tru(TruIdentifier.Void, true); - if (_cache.ContainsKey(type)) - return _cache[type]; - - var nullable = false; - - var nullType = System.Nullable.GetUnderlyingType(type); - - if (nullType != null) + lock (cacheLook) { - type = nullType; - nullable = true; - } + if (cache.ContainsKey(type)) + return cache[type]; - Tru? tru = null; + var nullable = false; - if (type == typeof(IResource)) - { - return new Tru(TruIdentifier.Resource, nullable); - } - else if (type == typeof(IRecord) || type == typeof(Record)) - { - return new Tru(TruIdentifier.Record, nullable); - } - else if (type == typeof(Map) - || type == typeof(Dictionary)) - { - return new Tru(TruIdentifier.Map, nullable); - } - else if (Codec.ImplementsInterface(type, typeof(IResource))) - { - tru = new Tru( - TruIdentifier.TypedResource, - nullable, - TypeDef.GetTypeUUID(type) - ); - } - else if (Codec.ImplementsInterface(type, typeof(IRecord))) - { - tru = new Tru( - TruIdentifier.TypedRecord, - nullable, - TypeDef.GetTypeUUID(type) - ); - } - else if (type.IsGenericType) - { - var genericType = type.GetGenericTypeDefinition(); + var nullType = System.Nullable.GetUnderlyingType(type); - if (genericType == typeof(List<>) - || genericType == typeof(VarList<>) - || genericType == typeof(IList<>)) + if (nullType != null) { - var args = type.GetGenericArguments(); - if (args[0] == typeof(object)) + type = nullType; + nullable = true; + } + + Tru? tru = null; + + if (type == typeof(IResource)) + { + return new Tru(TruIdentifier.Resource, nullable); + } + else if (type == typeof(IRecord) || type == typeof(Record)) + { + return new Tru(TruIdentifier.Record, nullable); + } + else if (type == typeof(Map) + || type == typeof(Dictionary)) + { + return new Tru(TruIdentifier.Map, nullable); + } + else if (Codec.ImplementsInterface(type, typeof(IResource))) + { + tru = new Tru( + TruIdentifier.TypedResource, + nullable, + TypeDef.GetTypeUUID(type) + ); + } + else if (Codec.ImplementsInterface(type, typeof(IRecord))) + { + tru = new Tru( + TruIdentifier.TypedRecord, + nullable, + TypeDef.GetTypeUUID(type) + ); + } + else if (type.IsGenericType) + { + var genericType = type.GetGenericTypeDefinition(); + + if (genericType == typeof(List<>) + || genericType == typeof(VarList<>) + || genericType == typeof(IList<>)) { - tru = new Tru(TruIdentifier.List, nullable); + var args = type.GetGenericArguments(); + if (args[0] == typeof(object)) + { + tru = new Tru(TruIdentifier.List, nullable); + } + else + { + var subType = FromType(args[0]); + if (subType == null) // unrecongnized type + return null; + + + tru = new Tru(TruIdentifier.TypedList, nullable, null, + new Tru[] { subType }); + + } + } + else if (genericType == typeof(Map<,>) + || genericType == typeof(Dictionary<,>)) + { + var args = type.GetGenericArguments(); + if (args[0] == typeof(object) && args[1] == typeof(object)) + { + tru = new Tru(TruIdentifier.Map, nullable); + } + else + { + var subType1 = FromType(args[0]); + if (subType1 == null) + return null; + + var subType2 = FromType(args[1]); + if (subType2 == null) + return null; + + tru = new Tru(TruIdentifier.TypedMap, nullable, null, + new Tru[] { subType1, subType2 }); + + } + } + else if (genericType == typeof(ResourceLink<>)) + { + var args = type.GetGenericArguments(); + + return FromType(args[0]); + } + else if (genericType == typeof(ValueTuple<,>)) + { + var args = type.GetGenericArguments(); + var subTypes = new Tru[args.Length]; + for (var i = 0; i < args.Length; i++) + { + var t = FromType(args[i]); + if (t == null) + return null; + subTypes[i] = t; + } + + + tru = new Tru(TruIdentifier.Tuple2, nullable, null, subTypes); + + } + else if (genericType == typeof(ValueTuple<,,>)) + { + var args = type.GetGenericArguments(); + var subTypes = new Tru[args.Length]; + for (var i = 0; i < args.Length; i++) + { + var t = FromType(args[i]); + if (t == null) + return null; + subTypes[i] = t; + } + + tru = new Tru(TruIdentifier.Tuple3, nullable, null, subTypes); + + } + else if (genericType == typeof(ValueTuple<,,,>)) + { + + var args = type.GetGenericArguments(); + var subTypes = new Tru[args.Length]; + for (var i = 0; i < args.Length; i++) + { + var t = FromType(args[i]); + if (t == null) + return null; + subTypes[i] = t; + } + + tru = new Tru(TruIdentifier.Tuple4, nullable, null, subTypes); + } + else if (genericType == typeof(ValueTuple<,,,,>)) + { + var args = type.GetGenericArguments(); + var subTypes = new Tru[args.Length]; + for (var i = 0; i < args.Length; i++) + { + var t = FromType(args[i]); + if (t == null) + return null; + subTypes[i] = t; + } + + tru = new Tru(TruIdentifier.Tuple5, nullable, null, subTypes); + } + else if (genericType == typeof(ValueTuple<,,,,,>)) + { + var args = type.GetGenericArguments(); + var subTypes = new Tru[args.Length]; + for (var i = 0; i < args.Length; i++) + { + var t = FromType(args[i]); + if (t == null) + return null; + subTypes[i] = t; + } + + tru = new Tru(TruIdentifier.Tuple6, nullable, null, subTypes); + } + else if (genericType == typeof(ValueTuple<,,,,,,>)) + { + var args = type.GetGenericArguments(); + var subTypes = new Tru[args.Length]; + for (var i = 0; i < args.Length; i++) + { + var t = FromType(args[i]); + if (t == null) + return null; + subTypes[i] = t; + } + + tru = new Tru(TruIdentifier.Tuple7, nullable, null, subTypes); } + else + return null; + } + else if (type.IsArray) + { + var elementType = type.GetElementType(); + if (elementType == typeof(object)) + tru = new Tru(TruIdentifier.List, nullable); else { - var subType = FromType(args[0]); - if (subType == null) // unrecongnized type - return null; + var subType = FromType(elementType); + if (subType == null) + return null; tru = new Tru(TruIdentifier.TypedList, nullable, null, new Tru[] { subType }); } } - else if (genericType == typeof(Map<,>) - || genericType == typeof(Dictionary<,>)) + else if (type.IsEnum) { - var args = type.GetGenericArguments(); - if (args[0] == typeof(object) && args[1] == typeof(object)) - { - tru = new Tru(TruIdentifier.Map, nullable); - } - else - { - var subType1 = FromType(args[0]); - if (subType1 == null) - return null; - - var subType2 = FromType(args[1]); - if (subType2 == null) - return null; - - tru = new Tru(TruIdentifier.TypedMap, nullable, null, - new Tru[] { subType1, subType2 }); - - } + tru = new Tru(TruIdentifier.Enum, nullable, TypeDef.GetTypeUUID(type)); } - else if (genericType == typeof(ResourceLink<>)) + else if (type.IsInterface) { - var args = type.GetGenericArguments(); - - return FromType(args[0]); + return null; // other interfaces are not supported } - else if (genericType == typeof(ValueTuple<,>)) + + //else if (typeof(Structure).IsAssignableFrom(t) || t == typeof(ExpandoObject) => TRUIdentifier.Structure) + //{ + + //} + + if (tru != null) { - var args = type.GetGenericArguments(); - var subTypes = new Tru[args.Length]; - for (var i = 0; i < args.Length; i++) - { - var t = FromType(args[i]); - if (t == null) - return null; - subTypes[i] = t; - } - - - tru = new Tru(TruIdentifier.Tuple2, nullable, null, subTypes); - + cache.Add(type, tru); + return tru; } - else if (genericType == typeof(ValueTuple<,,>)) + + // last check + return type switch { - var args = type.GetGenericArguments(); - var subTypes = new Tru[args.Length]; - for (var i = 0; i < args.Length; i++) - { - var t = FromType(args[i]); - if (t == null) - return null; - subTypes[i] = t; - } - - tru = new Tru(TruIdentifier.Tuple3, nullable, null, subTypes); - - } - else if (genericType == typeof(ValueTuple<,,,>)) - { - - var args = type.GetGenericArguments(); - var subTypes = new Tru[args.Length]; - for (var i = 0; i < args.Length; i++) - { - var t = FromType(args[i]); - if (t == null) - return null; - subTypes[i] = t; - } - - tru = new Tru(TruIdentifier.Tuple4, nullable, null, subTypes); - } - else if (genericType == typeof(ValueTuple<,,,,>)) - { - var args = type.GetGenericArguments(); - var subTypes = new Tru[args.Length]; - for (var i = 0; i < args.Length; i++) - { - var t = FromType(args[i]); - if (t == null) - return null; - subTypes[i] = t; - } - - tru = new Tru(TruIdentifier.Tuple5, nullable, null, subTypes); - } - else if (genericType == typeof(ValueTuple<,,,,,>)) - { - var args = type.GetGenericArguments(); - var subTypes = new Tru[args.Length]; - for (var i = 0; i < args.Length; i++) - { - var t = FromType(args[i]); - if (t == null) - return null; - subTypes[i] = t; - } - - tru = new Tru(TruIdentifier.Tuple6, nullable, null, subTypes); - } - else if (genericType == typeof(ValueTuple<,,,,,,>)) - { - var args = type.GetGenericArguments(); - var subTypes = new Tru[args.Length]; - for (var i = 0; i < args.Length; i++) - { - var t = FromType(args[i]); - if (t == null) - return null; - subTypes[i] = t; - } - - tru = new Tru(TruIdentifier.Tuple7, nullable, null, subTypes); - } - else - return null; + _ when type == typeof(void) => new Tru(TruIdentifier.Void, nullable), + _ when type == typeof(object) => new Tru(TruIdentifier.Dynamic, nullable), + _ when type == typeof(bool) => new Tru(TruIdentifier.Bool, nullable), + _ when type == typeof(char) => new Tru(TruIdentifier.Char, nullable), + _ when type == typeof(byte) => new Tru(TruIdentifier.UInt8, nullable), + _ when type == typeof(sbyte) => new Tru(TruIdentifier.Int8, nullable), + _ when type == typeof(short) => new Tru(TruIdentifier.Int16, nullable), + _ when type == typeof(ushort) => new Tru(TruIdentifier.UInt16, nullable), + _ when type == typeof(int) => new Tru(TruIdentifier.Int32, nullable), + _ when type == typeof(uint) => new Tru(TruIdentifier.UInt32, nullable), + _ when type == typeof(long) => new Tru(TruIdentifier.Int64, nullable), + _ when type == typeof(ulong) => new Tru(TruIdentifier.UInt64, nullable), + _ when type == typeof(float) => new Tru(TruIdentifier.Float32, nullable), + _ when type == typeof(double) => new Tru(TruIdentifier.Float64, nullable), + _ when type == typeof(decimal) => new Tru(TruIdentifier.Decimal, nullable), + _ when type == typeof(string) => new Tru(TruIdentifier.String, nullable), + _ when type == typeof(DateTime) => new Tru(TruIdentifier.DateTime, nullable), + _ when type == typeof(ResourceLink) => new Tru(TruIdentifier.Resource, nullable), + _ => null + }; } - else if (type.IsArray) - { - var elementType = type.GetElementType(); - if (elementType == typeof(object)) - tru = new Tru(TruIdentifier.List, nullable); - else - { - var subType = FromType(elementType); - - if (subType == null) - return null; - - tru = new Tru(TruIdentifier.TypedList, nullable, null, - new Tru[] { subType }); - - } - } - else if (type.IsEnum) - { - tru = new Tru(TruIdentifier.Enum, nullable, TypeDef.GetTypeUUID(type)); - } - else if (type.IsInterface) - { - return null; // other interfaces are not supported - } - - //else if (typeof(Structure).IsAssignableFrom(t) || t == typeof(ExpandoObject) => TRUIdentifier.Structure) - //{ - - //} - - if (tru != null) - { - _cache.Add(type, tru); - return tru; - } - - // last check - return type switch - { - _ when type == typeof(void) => new Tru(TruIdentifier.Void, nullable), - _ when type == typeof(object) => new Tru(TruIdentifier.Dynamic, nullable), - _ when type == typeof(bool) => new Tru(TruIdentifier.Bool, nullable), - _ when type == typeof(char) => new Tru(TruIdentifier.Char, nullable), - _ when type == typeof(byte) => new Tru(TruIdentifier.UInt8, nullable), - _ when type == typeof(sbyte) => new Tru(TruIdentifier.Int8, nullable), - _ when type == typeof(short) => new Tru(TruIdentifier.Int16, nullable), - _ when type == typeof(ushort) => new Tru(TruIdentifier.UInt16, nullable), - _ when type == typeof(int) => new Tru(TruIdentifier.Int32, nullable), - _ when type == typeof(uint) => new Tru(TruIdentifier.UInt32, nullable), - _ when type == typeof(long) => new Tru(TruIdentifier.Int64, nullable), - _ when type == typeof(ulong) => new Tru(TruIdentifier.UInt64, nullable), - _ when type == typeof(float) => new Tru(TruIdentifier.Float32, nullable), - _ when type == typeof(double) => new Tru(TruIdentifier.Float64, nullable), - _ when type == typeof(decimal) => new Tru(TruIdentifier.Decimal, nullable), - _ when type == typeof(string) => new Tru(TruIdentifier.String, nullable), - _ when type == typeof(DateTime) => new Tru(TruIdentifier.DateTime, nullable), - _ when type == typeof(ResourceLink) => new Tru(TruIdentifier.Resource, nullable), - _ => null - }; - } public Tru(TruIdentifier identifier, bool nullable, Uuid? uuid = null, Tru[]? subTypes = null) diff --git a/Libraries/Esiur/Net/Packets/EpPacketReply.cs b/Libraries/Esiur/Net/Packets/EpPacketReply.cs index a3a7d2e..b7195de 100644 --- a/Libraries/Esiur/Net/Packets/EpPacketReply.cs +++ b/Libraries/Esiur/Net/Packets/EpPacketReply.cs @@ -12,12 +12,12 @@ namespace Esiur.Net.Packets Stream = 0x2, // Error - PermissionError = 0x81, - ExecutionError = 0x82, + PermissionError = 0x4, + ExecutionError = 0x5, // Partial - Progress = 0x10, - Chunk = 0x11, - Warning = 0x12 + Progress = 0x8, + Chunk = 0x9, + Warning = 0xA } } diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index e6ed159..4d9098d 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -57,9 +57,11 @@ partial class EpConnection Dictionary typeDefs = new Dictionary(); + object typeDefsLock = new object(); + KeyList requests = new KeyList(); - volatile uint callbackCounter = 0; + volatile int callbackCounter = 0; Dictionary> subscriptions = new Dictionary>(); @@ -73,15 +75,18 @@ partial class EpConnection /// - /// Send IIP request. + /// Send EP request. /// /// Packet action. /// Arguments to send. /// + /// + AsyncReply SendRequest(EpPacketRequest action, params object[] args) { var reply = new AsyncReply(); - var c = callbackCounter++; // avoid thread racing + var c = (uint)Interlocked.Increment(ref callbackCounter); + //callbackCounter++; // avoid thread racing requests.Add(c, reply); if (args.Length == 0) @@ -112,7 +117,7 @@ partial class EpConnection } /// - /// Send IIP notification. + /// Send EP notification. /// /// Packet action. /// Arguments to send. @@ -343,7 +348,7 @@ partial class EpConnection var args = DataDeserializer.ListParser(dataType, Instance.Warehouse) as object[]; - var errorCode = (ushort)args[0]; + var errorCode =Convert.ToUInt16( args[0]); var errorMsg = (string)args[1]; req.TriggerError(new AsyncException(type, errorCode, errorMsg)); @@ -1721,60 +1726,66 @@ partial class EpConnection /// TypeSchema. public AsyncReply GetTypeDefById(Uuid typeId) { - if (typeDefs.ContainsKey(typeId)) - return new AsyncReply(typeDefs[typeId]); - else if (typeDefsByIdRequests.ContainsKey(typeId)) - return typeDefsByIdRequests[typeId]; + lock (typeDefsLock) + { + if (typeDefs.ContainsKey(typeId)) + return new AsyncReply(typeDefs[typeId]); + else if (typeDefsByIdRequests.ContainsKey(typeId)) + return typeDefsByIdRequests[typeId]; - var reply = new AsyncReply(); - typeDefsByIdRequests.Add(typeId, reply); + var reply = new AsyncReply(); + typeDefsByIdRequests.Add(typeId, reply); - SendRequest(EpPacketRequest.TypeDefById, typeId) - .Then((result) => - { - var tt = TypeDef.Parse((byte[])result); - typeDefsByIdRequests.Remove(typeId); - typeDefs.Add(tt.Id, tt); - Instance.Warehouse.RegisterTypeDef(tt); - reply.Trigger(tt); + SendRequest(EpPacketRequest.TypeDefById, typeId) + .Then((result) => + { + var tt = TypeDef.Parse((byte[])result); + typeDefsByIdRequests.Remove(typeId); + typeDefs.Add(tt.Id, tt); + Instance.Warehouse.RegisterTypeDef(tt); + reply.Trigger(tt); - }).Error((ex) => - { - reply.TriggerError(ex); - }); + }).Error((ex) => + { + reply.TriggerError(ex); + }); - return reply; + return reply; + } } public AsyncReply GetTypeDefByName(string typeName) { - var typeDef = typeDefs.Values.FirstOrDefault(x => x.Name == typeName); - if (typeDef != null) - return new AsyncReply(typeDef); + lock (typeDefsLock) + { + var typeDef = typeDefs.Values.FirstOrDefault(x => x.Name == typeName); + if (typeDef != null) + return new AsyncReply(typeDef); - if (typeDefsByNameRequests.ContainsKey(typeName)) - return typeDefsByNameRequests[typeName]; + if (typeDefsByNameRequests.ContainsKey(typeName)) + return typeDefsByNameRequests[typeName]; - var reply = new AsyncReply(); - typeDefsByNameRequests.Add(typeName, reply); + var reply = new AsyncReply(); + typeDefsByNameRequests.Add(typeName, reply); - SendRequest(EpPacketRequest.TypeDefByName, typeName) - .Then((result) => - { - var tt = TypeDef.Parse((byte[])result); + SendRequest(EpPacketRequest.TypeDefByName, typeName) + .Then((result) => + { + var tt = TypeDef.Parse((byte[])result); - typeDefsByNameRequests.Remove(typeName); - typeDefs.Add(tt.Id, tt); - Instance.Warehouse.RegisterTypeDef(tt); - reply.Trigger(tt); - }).Error((ex) => - { - reply.TriggerError(ex); - }); + typeDefsByNameRequests.Remove(typeName); + typeDefs.Add(tt.Id, tt); + Instance.Warehouse.RegisterTypeDef(tt); + reply.Trigger(tt); + }).Error((ex) => + { + reply.TriggerError(ex); + }); - return reply; + return reply; + } } // IStore interface @@ -1846,156 +1857,162 @@ partial class EpConnection /// /// Resource Id /// DistributedResource + /// + object fetchLock = new object(); public AsyncReply Fetch(uint id, uint[] requestSequence) { + //lock (fetchLock) + //{ + EpResource resource = null; - EpResource resource = null; + attachedResources[id]?.TryGetTarget(out resource); - attachedResources[id]?.TryGetTarget(out resource); - - if (resource != null) - return new AsyncReply(resource); - - resource = neededResources[id]; - - var requestInfo = resourceRequests[id]; - - if (requestInfo != null) - { - if (resource != null && (requestSequence?.Contains(id) ?? false)) - { - // dead lock avoidance for loop reference. + if (resource != null) return new AsyncReply(resource); - } - else if (resource != null && requestInfo.RequestSequence.Contains(id)) + + resource = neededResources[id]; + + var requestInfo = resourceRequests[id]; + + if (requestInfo != null) { - // dead lock avoidance for dependent reference. + if (resource != null && (requestSequence?.Contains(id) ?? false)) + { + // dead lock avoidance for loop reference. + return new AsyncReply(resource); + } + else if (resource != null && requestInfo.RequestSequence.Contains(id)) + { + // dead lock avoidance for dependent reference. + return new AsyncReply(resource); + } + else + { + return requestInfo.Reply; + } + } + else if (resource != null && !resource.DistributedResourceSuspended) + { + // @REVIEW: this should never happen + Global.Log("DCON", LogType.Error, "Resource not moved to attached."); return new AsyncReply(resource); + } - else - { - return requestInfo.Reply; - } - } - else if (resource != null && !resource.DistributedResourceSuspended) - { - // @REVIEW: this should never happen - Global.Log("DCON", LogType.Error, "Resource not moved to attached."); - return new AsyncReply(resource); - } + var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id }; - var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id }; + var reply = new AsyncReply(); + resourceRequests.Add(id, new EpResourceAttachRequestInfo(reply, newSequence)); - var reply = new AsyncReply(); - resourceRequests.Add(id, new EpResourceAttachRequestInfo(reply, newSequence)); - - SendRequest(EpPacketRequest.AttachResource, id) - .Then((result) => - { - if (result == null) + SendRequest(EpPacketRequest.AttachResource, id) + .Then((result) => { - reply.TriggerError(new AsyncException(ErrorType.Management, - (ushort)ExceptionCode.ResourceNotFound, "Null response")); - return; - } + if (result == null) + { + reply.TriggerError(new AsyncException(ErrorType.Management, + (ushort)ExceptionCode.ResourceNotFound, "Null response")); + return; + } - // TypeId, Age, Link, Hops, PropertyValue[] - var args = (object[])result; - var typeId = (Uuid)args[0]; - var age = Convert.ToUInt64(args[1]); - var link = (string)args[2]; - var hops = (byte)args[3]; - var pvData = (byte[])args[4]; + // TypeId, Age, Link, Hops, PropertyValue[] + var args = (object[])result; + var typeId = (Uuid)args[0]; + var age = Convert.ToUInt64(args[1]); + var link = (string)args[2]; + var hops = (byte)args[3]; + var pvData = (byte[])args[4]; - EpResource dr; - TypeDef typeDef = null; + EpResource dr; + TypeDef typeDef = null; - if (resource == null) - { - typeDef = Instance.Warehouse.GetTypeDefById(typeId, TypeDefKind.Resource); - if (typeDef?.DefinedType != null && typeDef.IsWrapper) - dr = Activator.CreateInstance(typeDef.DefinedType, this, id, Convert.ToUInt64(args[1]), (string)args[2]) as EpResource; + if (resource == null) + { + typeDef = Instance.Warehouse.GetTypeDefById(typeId, TypeDefKind.Resource); + if (typeDef?.DefinedType != null && typeDef.IsWrapper) + dr = Activator.CreateInstance(typeDef.DefinedType, this, id, Convert.ToUInt64(args[1]), (string)args[2]) as EpResource; + else + dr = new EpResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); + } else - dr = new EpResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); - } - else - { - dr = resource; - typeDef = resource.Instance.Definition; - } - - - var initResource = (EpResource ok) => - { - var parsedReply = DataDeserializer.PropertyValueArrayParserAsync(pvData, 0, (uint)pvData.Length, this, newSequence);// Codec.proper (content, 0, this, newSequence, transmissionType); - - - parsedReply.Then(results => { - var pvs = results as PropertyValue[]; - - //var pvs = new List(); - - //for (var i = 0; i < ar.Length; i += 3) - // pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); - - dr._Attach(pvs); - resourceRequests.Remove(id); - // move from needed to attached. - neededResources.Remove(id); - attachedResources[id] = new WeakReference(dr); - reply.Trigger(dr); - }).Error(ex => reply.TriggerError(ex)); + dr = resource; + typeDef = resource.Instance.Definition; + } - }; - - if (typeDef == null) - { - GetTypeDefById(typeId).Then((tmp) => + var initResource = (EpResource ok) => + { + var parsedReply = DataDeserializer.PropertyValueArrayParserAsync(pvData, 0, (uint)pvData.Length, this, newSequence);// Codec.proper (content, 0, this, newSequence, transmissionType); + + + parsedReply.Then(results => + { + var pvs = results as PropertyValue[]; + + //var pvs = new List(); + + //for (var i = 0; i < ar.Length; i += 3) + // pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); + + dr._Attach(pvs); + resourceRequests.Remove(id); + // move from needed to attached. + neededResources.Remove(id); + attachedResources[id] = new WeakReference(dr); + reply.Trigger(dr); + }).Error(ex => reply.TriggerError(ex)); + + + }; + + if (typeDef == null) + { + GetTypeDefById(typeId).Then((tmp) => + { + // typeId, ResourceAge, ResourceLink, Content + if (resource == null) + { + dr.ResourceDefinition = tmp; + + Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), dr) + .Then(initResource) + .Error(ex => reply.TriggerError(ex)); + } + else + { + initResource(resource); + } + }).Error((ex) => + { + reply.TriggerError(ex); + }); + + } + else { - // typeId, ResourceAge, ResourceLink, Content if (resource == null) { - dr.ResourceDefinition = tmp; + dr.ResourceDefinition = typeDef; Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), dr) - .Then(initResource) - .Error(ex => reply.TriggerError(ex)); + .Then(initResource).Error((ex) => reply.TriggerError(ex)); } else { initResource(resource); } - }).Error((ex) => - { - reply.TriggerError(ex); - }); - } - else + } + + }).Error((ex) => { - if (resource == null) - { - Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), dr) - .Then(initResource).Error((ex) => reply.TriggerError(ex)); - } - else - { - initResource(resource); - } - - } - - }).Error((ex) => - { - reply.TriggerError(ex); - }); + reply.TriggerError(ex); + }); - return reply; + return reply; + //} } diff --git a/Libraries/Esiur/Proxy/ResourceGenerator.cs b/Libraries/Esiur/Proxy/ResourceGenerator.cs index 6875d46..b790c49 100644 --- a/Libraries/Esiur/Proxy/ResourceGenerator.cs +++ b/Libraries/Esiur/Proxy/ResourceGenerator.cs @@ -117,30 +117,6 @@ namespace Esiur.Proxy } } -// var code = @$"using Esiur.Resource; -//using Esiur.Core; - -//#nullable enable - -//namespace {ci.ClassSymbol.ContainingNamespace.ToDisplayString()} {{ -//"; - -// if (IsInterfaceImplemented(ci, classes)) -// code += $"public partial class {ci.Name} {{\r\n"; -// else -// { -// code += -//$@" public partial class {ci.Name} : IResource {{ -// public virtual Instance? Instance {{ get; set; }} -// public virtual event DestroyedEvent? OnDestroy; - -// public virtual void Destroy() {{ OnDestroy?.Invoke(this); }} -//"; - -// if (!ci.HasTrigger) -// code += "\tpublic virtual AsyncReply Trigger(ResourceTrigger trigger) => new AsyncReply(true);\r\n\r\n"; -// } - foreach (var f in ci.Fields) { var givenName = f.GetAttributes().FirstOrDefault(x => x.AttributeClass?.Name == "ExportAttribute")?.ConstructorArguments.FirstOrDefault().Value as string; @@ -156,12 +132,17 @@ namespace Esiur.Proxy if (f.Type.Name.StartsWith("ResourceEventHandler") || f.Type.Name.StartsWith("CustomResourceEventHandler")) { - code.AppendLine($"public event {f.Type} {pn};"); + code.AppendLine($"\tpublic event {f.Type} {pn};"); } else { - code.AppendLine($"\t{attrs}\r\n\t public {f.Type} {pn} {{ \r\n\t\t get => {fn}; \r\n\t\t set {{ \r\n\t\t this.{fn} = value; \r\n\t\t Instance?.Modified(); \r\n\t\t}}\r\n\t}}\r\n"); - + code.AppendLine($"\tpublic {f.Type} {pn} {{"); + code.AppendLine($"\t\t get => {fn};"); + code.AppendLine($"\t\t set {{"); + code.AppendLine($"\t\t this.{fn} = value;"); + code.AppendLine($"\t\t Instance?.Modified();"); + code.AppendLine($"\t\t}}"); + code.AppendLine($"\t}}"); } } diff --git a/Libraries/Esiur/Proxy/ResourceProxy.cs b/Libraries/Esiur/Proxy/ResourceProxy.cs index 5f48330..d5fdfe7 100644 --- a/Libraries/Esiur/Proxy/ResourceProxy.cs +++ b/Libraries/Esiur/Proxy/ResourceProxy.cs @@ -11,6 +11,7 @@ namespace Esiur.Proxy; public static class ResourceProxy { static Dictionary cache = new Dictionary(); + static object cacheLock = new object(); #if NETSTANDARD static MethodInfo modifyMethod = typeof(Instance).GetTypeInfo().GetMethod("Modified"); @@ -48,33 +49,34 @@ public static class ResourceProxy public static Type GetProxy(Type type) { - - if (cache.ContainsKey(type)) - return cache[type]; - - // check if the type was made with code generation - if (type.GetCustomAttribute(false) != null) + lock (cacheLock) { - cache.Add(type, type); - return type; - } + if (cache.ContainsKey(type)) + return cache[type]; - if (!Codec.ImplementsInterface(type, typeof(IResource))) - { - cache.Add(type, type); - return type; - } + // check if the type was made with code generation + if (type.GetCustomAttribute(false) != null) + { + cache.Add(type, type); + return type; + } + + if (!Codec.ImplementsInterface(type, typeof(IResource))) + { + cache.Add(type, type); + return type; + } #if NETSTANDARD - var typeInfo = type.GetTypeInfo(); + var typeInfo = type.GetTypeInfo(); - if (typeInfo.IsSealed || typeInfo.IsAbstract) - throw new Exception("Sealed/Abastract classes can't be proxied."); + if (typeInfo.IsSealed || typeInfo.IsAbstract) + throw new Exception("Sealed/Abastract classes can't be proxied."); - var props = from p in typeInfo.GetProperties(BindingFlags.Instance | BindingFlags.Public) - where p.CanWrite && p.SetMethod.IsVirtual && !p.SetMethod.IsFinal && - p.GetCustomAttribute(false) != null - select p; + var props = from p in typeInfo.GetProperties(BindingFlags.Instance | BindingFlags.Public) + where p.CanWrite && p.SetMethod.IsVirtual && !p.SetMethod.IsFinal && + p.GetCustomAttribute(false) != null + select p; #else if (type.IsSealed) @@ -86,33 +88,34 @@ public static class ResourceProxy select p; #endif - var assemblyName = new AssemblyName("Esiur.Proxy.T." + type.Assembly.GetName().Name);// type.Namespace); - assemblyName.Version = type.Assembly.GetName().Version; - assemblyName.CultureInfo = type.Assembly.GetName().CultureInfo; - //assemblyName.SetPublicKeyToken(null); + var assemblyName = new AssemblyName("Esiur.Proxy.T." + type.Assembly.GetName().Name);// type.Namespace); + assemblyName.Version = type.Assembly.GetName().Version; + assemblyName.CultureInfo = type.Assembly.GetName().CultureInfo; + //assemblyName.SetPublicKeyToken(null); - var assemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.Run); - var moduleBuilder = assemblyBuilder.DefineDynamicModule(assemblyName.Name); - var typeName = "Esiur.Proxy.T." + type.FullName;// Assembly.CreateQualifiedName(assemblyName.FullName, "Esiur.Proxy.T." + type.FullName); + var assemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.Run); + var moduleBuilder = assemblyBuilder.DefineDynamicModule(assemblyName.Name); + var typeName = "Esiur.Proxy.T." + type.FullName;// Assembly.CreateQualifiedName(assemblyName.FullName, "Esiur.Proxy.T." + type.FullName); - var typeBuilder = moduleBuilder.DefineType(typeName, - TypeAttributes.Public | TypeAttributes.Class, type); + var typeBuilder = moduleBuilder.DefineType(typeName, + TypeAttributes.Public | TypeAttributes.Class, type); - foreach (PropertyInfo propertyInfo in props) - CreateProperty(propertyInfo, typeBuilder, type); + foreach (PropertyInfo propertyInfo in props) + CreateProperty(propertyInfo, typeBuilder, type); #if NETSTANDARD - var t = typeBuilder.CreateTypeInfo().AsType(); - cache.Add(type, t); - return t; + var t = typeBuilder.CreateTypeInfo().AsType(); + cache.Add(type, t); + return t; #else var t = typeBuilder.CreateType(); cache.Add(type, t); return t; #endif + } } public static Type GetProxy() diff --git a/Libraries/Esiur/Resource/Instance.cs b/Libraries/Esiur/Resource/Instance.cs index e1b55a8..734a638 100644 --- a/Libraries/Esiur/Resource/Instance.cs +++ b/Libraries/Esiur/Resource/Instance.cs @@ -753,7 +753,7 @@ public class Instance } else { - this.definition = Warehouse.GetTypeDefByType(resource.GetType()); + this.definition = warehouse.GetTypeDefByType(resource.GetType()); } // set ages diff --git a/Libraries/Esiur/Resource/Warehouse.cs b/Libraries/Esiur/Resource/Warehouse.cs index eba5f53..316a283 100644 --- a/Libraries/Esiur/Resource/Warehouse.cs +++ b/Libraries/Esiur/Resource/Warehouse.cs @@ -67,6 +67,8 @@ public class Warehouse [TypeDefKind.Enum] = new KeyList(), }; + object typeDefsLock = new object(); + bool warehouseIsOpen = false; public delegate void StoreEvent(IStore store); @@ -508,10 +510,13 @@ public class Warehouse /// Resource type definition. public void RegisterTypeDef(TypeDef typeDef) { - if (typeDefs[typeDef.Kind].ContainsKey(typeDef.Id)) - throw new Exception($"TypeDef with same class Id already exists. {typeDefs[typeDef.Kind][typeDef.Id].Name} -> {typeDef.Name}"); + lock (typeDefsLock) + { + if (typeDefs[typeDef.Kind].ContainsKey(typeDef.Id)) + throw new Exception($"TypeDef with same class Id already exists. {typeDefs[typeDef.Kind][typeDef.Id].Name} -> {typeDef.Name}"); - typeDefs[typeDef.Kind][typeDef.Id] = typeDef; + typeDefs[typeDef.Kind][typeDef.Id] = typeDef; + } } @@ -522,9 +527,11 @@ public class Warehouse /// Resource TypeDef. public TypeDef GetTypeDefByType(Type type) { + if (!(type.IsClass || type.IsEnum)) return null; + var baseType = ResourceProxy.GetBaseType(type); if (baseType == typeof(IResource) @@ -541,15 +548,17 @@ public class Warehouse else return null; - var typeDef = typeDefs[typeDefKind].Values.FirstOrDefault(x => x.DefinedType == baseType); - if (typeDef != null) + lock (typeDefsLock) + { + var typeDef = typeDefs[typeDefKind].Values.FirstOrDefault(x => x.DefinedType == baseType); + if (typeDef != null) + return typeDef; + + // create new TypeDef for type + typeDef = new TypeDef(baseType, this); + TypeDef.GetDependencies(typeDef, this); return typeDef; - - // create new TypeDef for type - typeDef = new TypeDef(baseType, this); - TypeDef.GetDependencies(typeDef, this); - - return typeDef; + } } /// diff --git a/Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj b/Tests/Distribution/ConcurrentAttach/Esiur.Tests.ConcurrentAttach.csproj similarity index 72% rename from Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj rename to Tests/Distribution/ConcurrentAttach/Esiur.Tests.ConcurrentAttach.csproj index 44e8917..3143391 100644 --- a/Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj +++ b/Tests/Distribution/ConcurrentAttach/Esiur.Tests.ConcurrentAttach.csproj @@ -8,7 +8,7 @@ - + diff --git a/Tests/Distribution/ConcurrentAttach/Client/Program.cs b/Tests/Distribution/ConcurrentAttach/Program.cs similarity index 91% rename from Tests/Distribution/ConcurrentAttach/Client/Program.cs rename to Tests/Distribution/ConcurrentAttach/Program.cs index 5a946a9..df0f7cc 100644 --- a/Tests/Distribution/ConcurrentAttach/Client/Program.cs +++ b/Tests/Distribution/ConcurrentAttach/Program.cs @@ -17,9 +17,9 @@ // Usage (client only): dotnet run -- --mode client --host 127.0.0.1 --concurrent 50 --resources 200 // ============================================================ +using Esiur.Protocol; using Esiur.Resource; using Esiur.Stores; -using Esiur.Protocol; using System.Diagnostics; var mode = GetArg(args, "--mode", "both"); @@ -29,30 +29,30 @@ var concurrent = int.Parse(GetArg(args, "--concurrent", "50")); var resources = int.Parse(GetArg(args, "--resources", "200")); var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000")); var rounds = int.Parse(GetArg(args, "--rounds", "5")); -var wh = new Warehouse(); +var clientWh = new Warehouse(); +var serverWh = new Warehouse(); // ---------------------------------------------------------------- // SERVER SIDE // ---------------------------------------------------------------- if (mode == "server" || mode == "both") { - - await wh.Put("sys", new MemoryStore()); - await wh.Put("sys/server", new EpServer() { Port = (ushort)port }); + await serverWh.Put("sys", new MemoryStore()); + await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port }); for (int i = 0; i < resources; i++) { - await wh.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i }); + await serverWh.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i }); } - await wh.Open(); + await serverWh.Open(); Console.WriteLine($"[Server-T3] Ready: {resources} resources on port {port}"); if (mode == "server") { Console.WriteLine("Press ENTER to stop."); Console.ReadLine(); - await wh.Close(); + await serverWh.Close(); return; } @@ -90,8 +90,8 @@ for (int round = 0; round < rounds; round++) using var cts = new CancellationTokenSource(timeoutMs); try { - var proxy = await wh.Get( - $"iip://{host}:{port}/sys/sensor_{resourceIdx}"); + var proxy = await clientWh.Get( + $"ep://{host}:{port}/sys/sensor_{resourceIdx}"); sw.Stop(); latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; @@ -161,8 +161,11 @@ var csv = "round,concurrent,succeeded,failed,timed_out,total_wall_ms,min_ms,p50_ await File.WriteAllTextAsync("test3_concurrent_attach.csv", csv); Console.WriteLine("\n[Client-T3] Results written to test3_concurrent_attach.csv"); -if (mode == "both") - await wh.Close(); +if (mode == "server" || mode == "both") + await serverWh.Close(); + +if (mode == "client" || mode == "both") + await clientWh.Close(); // ---------------------------------------------------------------- diff --git a/Tests/Distribution/ConcurrentAttach/Client/SensorResource.cs b/Tests/Distribution/ConcurrentAttach/SensorResource.cs similarity index 100% rename from Tests/Distribution/ConcurrentAttach/Client/SensorResource.cs rename to Tests/Distribution/ConcurrentAttach/SensorResource.cs diff --git a/Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj b/Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj deleted file mode 100644 index 44e8917..0000000 --- a/Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj +++ /dev/null @@ -1,14 +0,0 @@ - - - - Exe - net10.0 - enable - enable - - - - - - - diff --git a/Tests/Distribution/ConcurrentAttach/Server/Program.cs b/Tests/Distribution/ConcurrentAttach/Server/Program.cs deleted file mode 100644 index ef092d2..0000000 --- a/Tests/Distribution/ConcurrentAttach/Server/Program.cs +++ /dev/null @@ -1,182 +0,0 @@ -// ============================================================ -// Test 3: Concurrent Attachments — COMBINED (server + clients -// in the same process for local stress testing, or run the -// server section separately for multi-machine testing). -// -// Fires N concurrent Warehouse.Get calls simultaneously and -// measures: -// - Time for all proxies to reach Ready state -// - Whether any attachments fail or deadlock (timeout) -// - Distribution of per-attachment latency -// -// This directly stress-tests Algorithm 1 (FETCH-RESOURCE) and -// the parallel deadlock detection mechanism from Section V.D. -// -// Usage (single process): dotnet run -- --mode both --concurrent 50 --resources 200 -// Usage (server only): dotnet run -- --mode server --resources 200 --port 10902 -// Usage (client only): dotnet run -- --mode client --host 127.0.0.1 --concurrent 50 --resources 200 -// ============================================================ - -using Esiur.Protocol; -using Esiur.Resource; -using Esiur.Stores; -using System.Diagnostics; - -var mode = GetArg(args, "--mode", "both"); -var host = GetArg(args, "--host", "127.0.0.1"); -var port = int.Parse(GetArg(args, "--port", "10902")); -var concurrent = int.Parse(GetArg(args, "--concurrent", "50")); -var resources = int.Parse(GetArg(args, "--resources", "200")); -var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000")); -var rounds = int.Parse(GetArg(args, "--rounds", "5")); - -var wh = new Warehouse(); -// ---------------------------------------------------------------- -// SERVER SIDE -// ---------------------------------------------------------------- -if (mode == "server" || mode == "both") -{ - await wh.Put("sys", new MemoryStore()); - await wh.Put("sys/server", new EpServer() { Port = (ushort)port }); - - for (int i = 0; i < resources; i++) - { - await wh.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i }); - } - - await wh.Open(); - Console.WriteLine($"[Server-T3] Ready: {resources} resources on port {port}"); - - if (mode == "server") - { - Console.WriteLine("Press ENTER to stop."); - Console.ReadLine(); - await wh.Close(); - return; - } - - // Give server a moment to fully initialise before client fires - await Task.Delay(500); -} - -// ---------------------------------------------------------------- -// CLIENT SIDE -// ---------------------------------------------------------------- -Console.WriteLine($"[Client-T3] concurrent={concurrent} resources={resources} rounds={rounds}"); - -var roundResults = new List(); - -for (int round = 0; round < rounds; round++) -{ - Console.WriteLine($"\n[Client-T3] Round {round + 1}/{rounds}"); - - // Pick `concurrent` random resource indices (may overlap — intentional, - // because overlapping triggers the "already in progress" path of Algorithm 1) - var rng = new Random(round); - var targets = Enumerable.Range(0, concurrent) - .Select(_ => rng.Next(resources)) - .ToArray(); - - long succeeded = 0, failed = 0, timedOut = 0; - var latencies = new double[concurrent]; - - var roundSw = Stopwatch.StartNew(); - - // Fire all attachments simultaneously - var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => - { - var sw = Stopwatch.StartNew(); - using var cts = new CancellationTokenSource(timeoutMs); - try - { - var proxy = await wh.Get( - $"iip://{host}:{port}/sys/sensor_{resourceIdx}"); - - sw.Stop(); - latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; - - if (proxy != null) - Interlocked.Increment(ref succeeded); - else - Interlocked.Increment(ref failed); - } - catch (OperationCanceledException) - { - sw.Stop(); - latencies[taskIdx] = timeoutMs; - Interlocked.Increment(ref timedOut); - Console.WriteLine($" [!] Timeout on sensor_{resourceIdx} after {timeoutMs}ms"); - } - catch (Exception ex) - { - sw.Stop(); - latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; - Interlocked.Increment(ref failed); - Console.WriteLine($" [!] Error on sensor_{resourceIdx}: {ex.Message}"); - } - })).ToArray(); - - await Task.WhenAll(tasks); - roundSw.Stop(); - - var sorted = latencies.OrderBy(x => x).ToArray(); - int n = sorted.Length; - - var result = new RoundResult - { - Round = round + 1, - Concurrent = concurrent, - Succeeded = succeeded, - Failed = failed, - TimedOut = timedOut, - TotalMs = roundSw.Elapsed.TotalMilliseconds, - MinMs = sorted[0], - P50Ms = sorted[(int)(n * 0.50)], - P95Ms = sorted[(int)(n * 0.95)], - P99Ms = sorted[(int)(n * 0.99)], - MaxMs = sorted[n - 1], - MeanMs = sorted.Average() - }; - roundResults.Add(result); - - Console.WriteLine($" succeeded={succeeded}/{concurrent} failed={failed} timedOut={timedOut}"); - Console.WriteLine($" total_wall={result.TotalMs:F0}ms"); - Console.WriteLine($" latency: min={result.MinMs:F1} p50={result.P50Ms:F1} p95={result.P95Ms:F1} " + - $"p99={result.P99Ms:F1} max={result.MaxMs:F1} mean={result.MeanMs:F1} (ms)"); - - // Release all proxies before next round to reset attachment state - GC.Collect(); - await Task.Delay(1000); -} - -// ---------------------------------------------------------------- -// CSV output -// ---------------------------------------------------------------- -var csv = "round,concurrent,succeeded,failed,timed_out,total_wall_ms,min_ms,p50_ms,p95_ms,p99_ms,max_ms,mean_ms\n" + - string.Join("\n", roundResults.Select(r => - $"{r.Round},{r.Concurrent},{r.Succeeded},{r.Failed},{r.TimedOut}," + - $"{r.TotalMs:F1},{r.MinMs:F2},{r.P50Ms:F2},{r.P95Ms:F2},{r.P99Ms:F2},{r.MaxMs:F2},{r.MeanMs:F2}")); - -await File.WriteAllTextAsync("test3_concurrent_attach.csv", csv); -Console.WriteLine("\n[Client-T3] Results written to test3_concurrent_attach.csv"); - -if (mode == "both") - await wh.Close(); - - -// ---------------------------------------------------------------- -// Helpers -// ---------------------------------------------------------------- -static string GetArg(string[] args, string key, string def) -{ - int i = Array.IndexOf(args, key); - return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; -} - -record RoundResult -{ - public int Round; - public int Concurrent; - public long Succeeded, Failed, TimedOut; - public double TotalMs, MinMs, P50Ms, P95Ms, P99Ms, MaxMs, MeanMs; -} diff --git a/Tests/Distribution/ConcurrentAttach/Server/SensorResource.cs b/Tests/Distribution/ConcurrentAttach/Server/SensorResource.cs deleted file mode 100644 index 8dd3474..0000000 --- a/Tests/Distribution/ConcurrentAttach/Server/SensorResource.cs +++ /dev/null @@ -1,15 +0,0 @@ -using Esiur.Resource; - -/// -/// Shared observable sensor resource used across all scalability tests. -/// Property changes via Value setter are automatically propagated -/// to all attached remote peers by the Esiur runtime. -/// -[Resource] -public partial class SensorResource : Resource -{ - public int SensorId { get; set; } - - [Export] - public double value; -} diff --git a/Tests/Distribution/NodeFanout/Client/Program.cs b/Tests/Distribution/NodeFanout/Client/Program.cs index 0108a4b..33e493a 100644 --- a/Tests/Distribution/NodeFanout/Client/Program.cs +++ b/Tests/Distribution/NodeFanout/Client/Program.cs @@ -29,7 +29,7 @@ long latencySamples = 0; var latencyLock = new object(); // --- Attach all resources ------------------------------------------- -var proxies = new dynamic[resourceCount]; +var proxies = new IResource[resourceCount]; var sw = Stopwatch.StartNew(); var wh = new Warehouse(); @@ -38,16 +38,18 @@ try { for (int i = 0; i < resourceCount; i++) { - proxies[i] = await wh.Get($"iip://{host}:{port}/sys/sensor_{i}"); + proxies[i] = await wh.Get($"ep://{host}:{port}/sys/sensor_{i}"); + + dynamic resource = proxies[i]; // Subscribe to property change notifications via the Esiur event model - double lastValue = (double)proxies[i].Value; + double lastValue = (double)resource.Value; long lastTick = Stopwatch.GetTimestamp(); int capturedI = i; - proxies[i].OnPropertyModified += (string propName, object oldVal, object newVal) => + proxies[i].Instance.PropertyModified += (PropertyModificationInfo data) => { - if (propName != "Value") return; + if (data.Name != "Value") return; long nowTick = Stopwatch.GetTimestamp(); double elapsedMs = (nowTick - lastTick) * 1000.0 / Stopwatch.Frequency; @@ -72,6 +74,7 @@ catch (Exception ex) Console.WriteLine($"[Client {clientId}] Attach error: {ex.Message}"); return; } + // --- Measurement window --------------------------------------------- sw.Restart(); diff --git a/Tests/Distribution/ResourceCount/Client/Program.cs b/Tests/Distribution/ResourceCount/Client/Program.cs index 890b6f6..8e28b92 100644 --- a/Tests/Distribution/ResourceCount/Client/Program.cs +++ b/Tests/Distribution/ResourceCount/Client/Program.cs @@ -8,24 +8,30 @@ // Usage: dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 // ============================================================ +using Esiur.Protocol; using Esiur.Resource; using System.Diagnostics; -var host = GetArg(args, "--host", "127.0.0.1"); -var port = int.Parse(GetArg(args, "--port", "10901")); +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10901")); var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); -var batchSize = int.Parse(GetArg(args, "--batch", "100")); +var batchSize = int.Parse(GetArg(args, "--batch", "100")); Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}"); +var wh = new Warehouse(); + +var connnection = await wh.Get( + $"ep://{host}:{port}"); + var attachLatencies = new List(resourceCount); -var proxies = new dynamic[resourceCount]; +var proxies = new IResource[resourceCount]; // --- Attach in batches to avoid overwhelming the runtime ------------- var totalSw = Stopwatch.StartNew(); -var wh = new Warehouse(); for (int batch = 0; batch < resourceCount; batch += batchSize) { + int end = Math.Min(batch + batchSize, resourceCount); var batchTasks = new Task[end - batch]; @@ -35,8 +41,8 @@ for (int batch = 0; batch < resourceCount; batch += batchSize) batchTasks[i - batch] = Task.Run(async () => { var sw = Stopwatch.StartNew(); - proxies[capturedI] = await wh.Get( - $"iip://{host}:{port}/sys/sensor_{capturedI}"); + proxies[capturedI] = await connnection.Get($"sys/sensor_{capturedI}"); + sw.Stop(); lock (attachLatencies) @@ -45,7 +51,7 @@ for (int batch = 0; batch < resourceCount; batch += batchSize) } await Task.WhenAll(batchTasks); - + //Console.WriteLine("D"); if (batch % 1000 == 0) Console.WriteLine($"[Client-T2] Attached {Math.Min(batch + batchSize, resourceCount)}/{resourceCount} " + $"elapsed={totalSw.Elapsed.TotalSeconds:F1}s"); @@ -60,10 +66,10 @@ int n = attachLatencies.Count; Console.WriteLine($"[Client-T2] Attach latency (ms):"); Console.WriteLine($" min={attachLatencies[0]:F2}"); -Console.WriteLine($" p50={attachLatencies[(int)(n*0.50)]:F2}"); -Console.WriteLine($" p95={attachLatencies[(int)(n*0.95)]:F2}"); -Console.WriteLine($" p99={attachLatencies[(int)(n*0.99)]:F2}"); -Console.WriteLine($" max={attachLatencies[n-1]:F2}"); +Console.WriteLine($" p50={attachLatencies[(int)(n * 0.50)]:F2}"); +Console.WriteLine($" p95={attachLatencies[(int)(n * 0.95)]:F2}"); +Console.WriteLine($" p99={attachLatencies[(int)(n * 0.99)]:F2}"); +Console.WriteLine($" max={attachLatencies[n - 1]:F2}"); Console.WriteLine($" mean={attachLatencies.Average():F2}"); // --- Notification round-trip after full load ------------------------ @@ -74,13 +80,15 @@ double sumLatencyMs = 0; for (int i = 0; i < Math.Min(500, resourceCount); i++) { int capturedI = i; - proxies[capturedI].OnPropertyModified += (string propName, object oldVal, object newVal) => + proxies[capturedI].Instance.PropertyModified += (PropertyModificationInfo data) => { - if (propName == "Value") + if (data.Name == "Value") Interlocked.Increment(ref received); }; } +await connnection.Call("UpdateValues"); + await Task.Delay(10000); // observe for 10s Console.WriteLine($"[Client-T2] Received {received} notifications in 10s from first 500 resources"); diff --git a/Tests/Distribution/ResourceCount/Server/Program.cs b/Tests/Distribution/ResourceCount/Server/Program.cs index 32f1860..63071ac 100644 --- a/Tests/Distribution/ResourceCount/Server/Program.cs +++ b/Tests/Distribution/ResourceCount/Server/Program.cs @@ -18,18 +18,29 @@ Console.WriteLine($"[Server-T2] Creating {resourceCount} resources on port {port var wh = new Warehouse(); await wh.Put("sys", new MemoryStore()); -await wh.Put("sys/server", new EpServer() { Port = (ushort)port }); +var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port }); long memBefore = GC.GetTotalMemory(forceFullCollection: true); +List sensors = new List(); + for (int i = 0; i < resourceCount; i++) { - var s = new SensorResource { SensorId = i, Value = i * 0.1 }; - await wh.Put($"sys/sensor_{i}", s); + var sensor = await wh.Put($"sys/sensor_{i}", + new SensorResource { SensorId = i, Value = i * 0.1 }); + sensors.Add(sensor); } await wh.Open(); +server.MapCall("UpdateValues", () => +{ + foreach(var sensor in sensors) + { + sensor.Value += 0.1; + } +}); + long memAfter = GC.GetTotalMemory(forceFullCollection: true); double memMB = (memAfter - memBefore) / (1024.0 * 1024.0);