From 5b0fba89a4804bbfc990049d22b031f85c62b940 Mon Sep 17 00:00:00 2001 From: ahmed Date: Sat, 4 Apr 2026 15:30:06 +0300 Subject: [PATCH] dist --- Esiur.sln | 72 +++++++ ...Esiur.Tests.ConcurrentAttach.Client.csproj | 10 + .../ConcurrentAttach/Client/Program.cs | 181 ++++++++++++++++++ ...Esiur.Tests.ConcurrentAttach.Server.csproj | 10 + .../ConcurrentAttach/Server/Program.cs | 181 ++++++++++++++++++ .../Esiur.Tests.NodeFanout.Client.csproj | 10 + .../Distribution/NodeFanout/Client/Program.cs | 114 +++++++++++ .../Esiur.Tests.NodeFanout.Server.csproj | 10 + .../Distribution/NodeFanout/Server/Program.cs | 79 ++++++++ .../NodeFanout/Server/SensorResource.cs | 24 +++ .../Esiur.Tests.ResourceCount.Client.csproj | 10 + .../ResourceCount/Client/Program.cs | 97 ++++++++++ .../Esiur.Tests.ResourceCount.Server.csproj | 10 + .../ResourceCount/Server/Program.cs | 45 +++++ 14 files changed, 853 insertions(+) create mode 100644 Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj create mode 100644 Tests/Distribution/ConcurrentAttach/Client/Program.cs create mode 100644 Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj create mode 100644 Tests/Distribution/ConcurrentAttach/Server/Program.cs create mode 100644 Tests/Distribution/NodeFanout/Client/Esiur.Tests.NodeFanout.Client.csproj create mode 100644 Tests/Distribution/NodeFanout/Client/Program.cs create mode 100644 Tests/Distribution/NodeFanout/Server/Esiur.Tests.NodeFanout.Server.csproj create mode 100644 Tests/Distribution/NodeFanout/Server/Program.cs create mode 100644 Tests/Distribution/NodeFanout/Server/SensorResource.cs create mode 100644 Tests/Distribution/ResourceCount/Client/Esiur.Tests.ResourceCount.Client.csproj create mode 100644 Tests/Distribution/ResourceCount/Client/Program.cs create mode 100644 Tests/Distribution/ResourceCount/Server/Esiur.Tests.ResourceCount.Server.csproj create mode 100644 Tests/Distribution/ResourceCount/Server/Program.cs diff --git a/Esiur.sln b/Esiur.sln index 5bd937b..ae08b33 100644 --- a/Esiur.sln +++ b/Esiur.sln @@ -42,6 +42,38 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Stores.MongoDB", "Sto EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Stores.EntityCore", "Stores\Esiur.Stores.EntityCore\Esiur.Stores.EntityCore.csproj", "{2E5449E2-9A62-16CD-0068-90FE44ABEFEE}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Distribution", "Distribution", "{94C8CFDB-C7C6-40DF-A596-647FEEA3C917}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "NodeFanout", "NodeFanout", "{2E384DCC-3289-47C3-AEE8-48AB23589E41}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{5BF70037-FD23-4E4C-9EF8-AAF0BD83CE67}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{8AC1C925-068F-4E78-ADE4-4DA3CF996662}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanout.Server", "Tests\Distribution\NodeFanout\Server\Esiur.Tests.NodeFanout.Server.csproj", "{F072C376-70B4-B061-745B-0B1BDEBF8CDE}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanout.Client", "Tests\Distribution\NodeFanout\Client\Esiur.Tests.NodeFanout.Client.csproj", "{D8340DC7-5D27-2A71-74CC-634493847FF0}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ConcurrentAttach", "ConcurrentAttach", "{336B5CE1-95DA-4FDD-A876-0919E3C446CA}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ResourceCount", "ResourceCount", "{058F6BE3-A684-45F9-B61A-25839C64F503}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{17796DCC-760D-4AD7-BCA9-EFE4801B9044}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{FFF7D07F-BA9F-4129-B7AD-99861E11F05E}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{413A4292-C2B3-4096-94CF-D6F607C20939}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach.Client", "Tests\Distribution\ConcurrentAttach\Client\Esiur.Tests.ConcurrentAttach.Client.csproj", "{CD889154-4EA5-61D3-9FF4-E15F7B3D3573}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach.Server", "Tests\Distribution\ConcurrentAttach\Server\Esiur.Tests.ConcurrentAttach.Server.csproj", "{9A468603-1310-7434-3A2B-4528DA8221C6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ResourceCount.Client", "Tests\Distribution\ResourceCount\Client\Esiur.Tests.ResourceCount.Client.csproj", "{69A075E7-D924-59C6-0BF2-17A09201DDF3}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ResourceCount.Server", "Tests\Distribution\ResourceCount\Server\Esiur.Tests.ResourceCount.Server.csproj", "{D1DF309F-40DE-9C0E-A78B-2648544B77D2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -92,6 +124,30 @@ Global {2E5449E2-9A62-16CD-0068-90FE44ABEFEE}.Debug|Any CPU.Build.0 = Debug|Any CPU {2E5449E2-9A62-16CD-0068-90FE44ABEFEE}.Release|Any CPU.ActiveCfg = Release|Any CPU {2E5449E2-9A62-16CD-0068-90FE44ABEFEE}.Release|Any CPU.Build.0 = Release|Any CPU + {F072C376-70B4-B061-745B-0B1BDEBF8CDE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F072C376-70B4-B061-745B-0B1BDEBF8CDE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F072C376-70B4-B061-745B-0B1BDEBF8CDE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F072C376-70B4-B061-745B-0B1BDEBF8CDE}.Release|Any CPU.Build.0 = Release|Any CPU + {D8340DC7-5D27-2A71-74CC-634493847FF0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D8340DC7-5D27-2A71-74CC-634493847FF0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D8340DC7-5D27-2A71-74CC-634493847FF0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D8340DC7-5D27-2A71-74CC-634493847FF0}.Release|Any CPU.Build.0 = Release|Any CPU + {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CD889154-4EA5-61D3-9FF4-E15F7B3D3573}.Release|Any CPU.Build.0 = Release|Any CPU + {9A468603-1310-7434-3A2B-4528DA8221C6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9A468603-1310-7434-3A2B-4528DA8221C6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9A468603-1310-7434-3A2B-4528DA8221C6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9A468603-1310-7434-3A2B-4528DA8221C6}.Release|Any CPU.Build.0 = Release|Any CPU + {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {69A075E7-D924-59C6-0BF2-17A09201DDF3}.Release|Any CPU.Build.0 = Release|Any CPU + {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D1DF309F-40DE-9C0E-A78B-2648544B77D2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -111,6 +167,22 @@ Global {9AD6065B-F7FD-AC29-D9EC-153C2F084386} = {D4F8D620-2693-46F8-8D6B-E4E66FF2C511} {09B7271A-1C9B-FB05-019F-779462CB84A7} = {0CB116EC-47FC-4622-9EDD-3D3048F55E2A} {2E5449E2-9A62-16CD-0068-90FE44ABEFEE} = {0CB116EC-47FC-4622-9EDD-3D3048F55E2A} + {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} = {2769C4C3-2595-413B-B7FE-5903826770C1} + {2E384DCC-3289-47C3-AEE8-48AB23589E41} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {5BF70037-FD23-4E4C-9EF8-AAF0BD83CE67} = {2E384DCC-3289-47C3-AEE8-48AB23589E41} + {8AC1C925-068F-4E78-ADE4-4DA3CF996662} = {2E384DCC-3289-47C3-AEE8-48AB23589E41} + {F072C376-70B4-B061-745B-0B1BDEBF8CDE} = {5BF70037-FD23-4E4C-9EF8-AAF0BD83CE67} + {D8340DC7-5D27-2A71-74CC-634493847FF0} = {8AC1C925-068F-4E78-ADE4-4DA3CF996662} + {336B5CE1-95DA-4FDD-A876-0919E3C446CA} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {058F6BE3-A684-45F9-B61A-25839C64F503} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {17796DCC-760D-4AD7-BCA9-EFE4801B9044} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA} + {FFF7D07F-BA9F-4129-B7AD-99861E11F05E} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA} + {DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974} = {058F6BE3-A684-45F9-B61A-25839C64F503} + {413A4292-C2B3-4096-94CF-D6F607C20939} = {058F6BE3-A684-45F9-B61A-25839C64F503} + {CD889154-4EA5-61D3-9FF4-E15F7B3D3573} = {FFF7D07F-BA9F-4129-B7AD-99861E11F05E} + {9A468603-1310-7434-3A2B-4528DA8221C6} = {17796DCC-760D-4AD7-BCA9-EFE4801B9044} + {69A075E7-D924-59C6-0BF2-17A09201DDF3} = {413A4292-C2B3-4096-94CF-D6F607C20939} + {D1DF309F-40DE-9C0E-A78B-2648544B77D2} = {DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} diff --git a/Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj b/Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/ConcurrentAttach/Client/Esiur.Tests.ConcurrentAttach.Client.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/ConcurrentAttach/Client/Program.cs b/Tests/Distribution/ConcurrentAttach/Client/Program.cs new file mode 100644 index 0000000..48b699a --- /dev/null +++ b/Tests/Distribution/ConcurrentAttach/Client/Program.cs @@ -0,0 +1,181 @@ +// ============================================================ +// Test 3: Concurrent Attachments — COMBINED (server + clients +// in the same process for local stress testing, or run the +// server section separately for multi-machine testing). +// +// Fires N concurrent Warehouse.Get calls simultaneously and +// measures: +// - Time for all proxies to reach Ready state +// - Whether any attachments fail or deadlock (timeout) +// - Distribution of per-attachment latency +// +// This directly stress-tests Algorithm 1 (FETCH-RESOURCE) and +// the parallel deadlock detection mechanism from Section V.D. +// +// Usage (single process): dotnet run -- --mode both --concurrent 50 --resources 200 +// Usage (server only): dotnet run -- --mode server --resources 200 --port 10902 +// Usage (client only): dotnet run -- --mode client --host 127.0.0.1 --concurrent 50 --resources 200 +// ============================================================ + +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Net.IIP; +using System.Diagnostics; + +var mode = GetArg(args, "--mode", "both"); +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10902")); +var concurrent = int.Parse(GetArg(args, "--concurrent", "50")); +var resources = int.Parse(GetArg(args, "--resources", "200")); +var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000")); +var rounds = int.Parse(GetArg(args, "--rounds", "5")); + +// ---------------------------------------------------------------- +// SERVER SIDE +// ---------------------------------------------------------------- +if (mode == "server" || mode == "both") +{ + await Warehouse.Put("sys", new MemoryStore()); + await Warehouse.Put("sys/server", new DistributedServer() { Port = (ushort)port }); + + for (int i = 0; i < resources; i++) + { + await Warehouse.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i }); + } + + await Warehouse.Open(); + Console.WriteLine($"[Server-T3] Ready: {resources} resources on port {port}"); + + if (mode == "server") + { + Console.WriteLine("Press ENTER to stop."); + Console.ReadLine(); + await Warehouse.Close(); + return; + } + + // Give server a moment to fully initialise before client fires + await Task.Delay(500); +} + +// ---------------------------------------------------------------- +// CLIENT SIDE +// ---------------------------------------------------------------- +Console.WriteLine($"[Client-T3] concurrent={concurrent} resources={resources} rounds={rounds}"); + +var roundResults = new List(); + +for (int round = 0; round < rounds; round++) +{ + Console.WriteLine($"\n[Client-T3] Round {round + 1}/{rounds}"); + + // Pick `concurrent` random resource indices (may overlap — intentional, + // because overlapping triggers the "already in progress" path of Algorithm 1) + var rng = new Random(round); + var targets = Enumerable.Range(0, concurrent) + .Select(_ => rng.Next(resources)) + .ToArray(); + + long succeeded = 0, failed = 0, timedOut = 0; + var latencies = new double[concurrent]; + + var roundSw = Stopwatch.StartNew(); + + // Fire all attachments simultaneously + var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => + { + var sw = Stopwatch.StartNew(); + using var cts = new CancellationTokenSource(timeoutMs); + try + { + var proxy = await Warehouse.Get( + $"iip://{host}:{port}/sys/sensor_{resourceIdx}"); + + sw.Stop(); + latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; + + if (proxy != null) + Interlocked.Increment(ref succeeded); + else + Interlocked.Increment(ref failed); + } + catch (OperationCanceledException) + { + sw.Stop(); + latencies[taskIdx] = timeoutMs; + Interlocked.Increment(ref timedOut); + Console.WriteLine($" [!] Timeout on sensor_{resourceIdx} after {timeoutMs}ms"); + } + catch (Exception ex) + { + sw.Stop(); + latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; + Interlocked.Increment(ref failed); + Console.WriteLine($" [!] Error on sensor_{resourceIdx}: {ex.Message}"); + } + })).ToArray(); + + await Task.WhenAll(tasks); + roundSw.Stop(); + + var sorted = latencies.OrderBy(x => x).ToArray(); + int n = sorted.Length; + + var result = new RoundResult + { + Round = round + 1, + Concurrent = concurrent, + Succeeded = succeeded, + Failed = failed, + TimedOut = timedOut, + TotalMs = roundSw.Elapsed.TotalMilliseconds, + MinMs = sorted[0], + P50Ms = sorted[(int)(n * 0.50)], + P95Ms = sorted[(int)(n * 0.95)], + P99Ms = sorted[(int)(n * 0.99)], + MaxMs = sorted[n - 1], + MeanMs = sorted.Average() + }; + roundResults.Add(result); + + Console.WriteLine($" succeeded={succeeded}/{concurrent} failed={failed} timedOut={timedOut}"); + Console.WriteLine($" total_wall={result.TotalMs:F0}ms"); + Console.WriteLine($" latency: min={result.MinMs:F1} p50={result.P50Ms:F1} p95={result.P95Ms:F1} " + + $"p99={result.P99Ms:F1} max={result.MaxMs:F1} mean={result.MeanMs:F1} (ms)"); + + // Release all proxies before next round to reset attachment state + GC.Collect(); + await Task.Delay(1000); +} + +// ---------------------------------------------------------------- +// CSV output +// ---------------------------------------------------------------- +var csv = "round,concurrent,succeeded,failed,timed_out,total_wall_ms,min_ms,p50_ms,p95_ms,p99_ms,max_ms,mean_ms\n" + + string.Join("\n", roundResults.Select(r => + $"{r.Round},{r.Concurrent},{r.Succeeded},{r.Failed},{r.TimedOut}," + + $"{r.TotalMs:F1},{r.MinMs:F2},{r.P50Ms:F2},{r.P95Ms:F2},{r.P99Ms:F2},{r.MaxMs:F2},{r.MeanMs:F2}")); + +await File.WriteAllTextAsync("test3_concurrent_attach.csv", csv); +Console.WriteLine("\n[Client-T3] Results written to test3_concurrent_attach.csv"); + +if (mode == "both") + await Warehouse.Close(); + + +// ---------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------- +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} + +record RoundResult +{ + public int Round; + public int Concurrent; + public long Succeeded, Failed, TimedOut; + public double TotalMs, MinMs, P50Ms, P95Ms, P99Ms, MaxMs, MeanMs; +} diff --git a/Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj b/Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/ConcurrentAttach/Server/Esiur.Tests.ConcurrentAttach.Server.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/ConcurrentAttach/Server/Program.cs b/Tests/Distribution/ConcurrentAttach/Server/Program.cs new file mode 100644 index 0000000..48b699a --- /dev/null +++ b/Tests/Distribution/ConcurrentAttach/Server/Program.cs @@ -0,0 +1,181 @@ +// ============================================================ +// Test 3: Concurrent Attachments — COMBINED (server + clients +// in the same process for local stress testing, or run the +// server section separately for multi-machine testing). +// +// Fires N concurrent Warehouse.Get calls simultaneously and +// measures: +// - Time for all proxies to reach Ready state +// - Whether any attachments fail or deadlock (timeout) +// - Distribution of per-attachment latency +// +// This directly stress-tests Algorithm 1 (FETCH-RESOURCE) and +// the parallel deadlock detection mechanism from Section V.D. +// +// Usage (single process): dotnet run -- --mode both --concurrent 50 --resources 200 +// Usage (server only): dotnet run -- --mode server --resources 200 --port 10902 +// Usage (client only): dotnet run -- --mode client --host 127.0.0.1 --concurrent 50 --resources 200 +// ============================================================ + +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Net.IIP; +using System.Diagnostics; + +var mode = GetArg(args, "--mode", "both"); +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10902")); +var concurrent = int.Parse(GetArg(args, "--concurrent", "50")); +var resources = int.Parse(GetArg(args, "--resources", "200")); +var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000")); +var rounds = int.Parse(GetArg(args, "--rounds", "5")); + +// ---------------------------------------------------------------- +// SERVER SIDE +// ---------------------------------------------------------------- +if (mode == "server" || mode == "both") +{ + await Warehouse.Put("sys", new MemoryStore()); + await Warehouse.Put("sys/server", new DistributedServer() { Port = (ushort)port }); + + for (int i = 0; i < resources; i++) + { + await Warehouse.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i }); + } + + await Warehouse.Open(); + Console.WriteLine($"[Server-T3] Ready: {resources} resources on port {port}"); + + if (mode == "server") + { + Console.WriteLine("Press ENTER to stop."); + Console.ReadLine(); + await Warehouse.Close(); + return; + } + + // Give server a moment to fully initialise before client fires + await Task.Delay(500); +} + +// ---------------------------------------------------------------- +// CLIENT SIDE +// ---------------------------------------------------------------- +Console.WriteLine($"[Client-T3] concurrent={concurrent} resources={resources} rounds={rounds}"); + +var roundResults = new List(); + +for (int round = 0; round < rounds; round++) +{ + Console.WriteLine($"\n[Client-T3] Round {round + 1}/{rounds}"); + + // Pick `concurrent` random resource indices (may overlap — intentional, + // because overlapping triggers the "already in progress" path of Algorithm 1) + var rng = new Random(round); + var targets = Enumerable.Range(0, concurrent) + .Select(_ => rng.Next(resources)) + .ToArray(); + + long succeeded = 0, failed = 0, timedOut = 0; + var latencies = new double[concurrent]; + + var roundSw = Stopwatch.StartNew(); + + // Fire all attachments simultaneously + var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => + { + var sw = Stopwatch.StartNew(); + using var cts = new CancellationTokenSource(timeoutMs); + try + { + var proxy = await Warehouse.Get( + $"iip://{host}:{port}/sys/sensor_{resourceIdx}"); + + sw.Stop(); + latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; + + if (proxy != null) + Interlocked.Increment(ref succeeded); + else + Interlocked.Increment(ref failed); + } + catch (OperationCanceledException) + { + sw.Stop(); + latencies[taskIdx] = timeoutMs; + Interlocked.Increment(ref timedOut); + Console.WriteLine($" [!] Timeout on sensor_{resourceIdx} after {timeoutMs}ms"); + } + catch (Exception ex) + { + sw.Stop(); + latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; + Interlocked.Increment(ref failed); + Console.WriteLine($" [!] Error on sensor_{resourceIdx}: {ex.Message}"); + } + })).ToArray(); + + await Task.WhenAll(tasks); + roundSw.Stop(); + + var sorted = latencies.OrderBy(x => x).ToArray(); + int n = sorted.Length; + + var result = new RoundResult + { + Round = round + 1, + Concurrent = concurrent, + Succeeded = succeeded, + Failed = failed, + TimedOut = timedOut, + TotalMs = roundSw.Elapsed.TotalMilliseconds, + MinMs = sorted[0], + P50Ms = sorted[(int)(n * 0.50)], + P95Ms = sorted[(int)(n * 0.95)], + P99Ms = sorted[(int)(n * 0.99)], + MaxMs = sorted[n - 1], + MeanMs = sorted.Average() + }; + roundResults.Add(result); + + Console.WriteLine($" succeeded={succeeded}/{concurrent} failed={failed} timedOut={timedOut}"); + Console.WriteLine($" total_wall={result.TotalMs:F0}ms"); + Console.WriteLine($" latency: min={result.MinMs:F1} p50={result.P50Ms:F1} p95={result.P95Ms:F1} " + + $"p99={result.P99Ms:F1} max={result.MaxMs:F1} mean={result.MeanMs:F1} (ms)"); + + // Release all proxies before next round to reset attachment state + GC.Collect(); + await Task.Delay(1000); +} + +// ---------------------------------------------------------------- +// CSV output +// ---------------------------------------------------------------- +var csv = "round,concurrent,succeeded,failed,timed_out,total_wall_ms,min_ms,p50_ms,p95_ms,p99_ms,max_ms,mean_ms\n" + + string.Join("\n", roundResults.Select(r => + $"{r.Round},{r.Concurrent},{r.Succeeded},{r.Failed},{r.TimedOut}," + + $"{r.TotalMs:F1},{r.MinMs:F2},{r.P50Ms:F2},{r.P95Ms:F2},{r.P99Ms:F2},{r.MaxMs:F2},{r.MeanMs:F2}")); + +await File.WriteAllTextAsync("test3_concurrent_attach.csv", csv); +Console.WriteLine("\n[Client-T3] Results written to test3_concurrent_attach.csv"); + +if (mode == "both") + await Warehouse.Close(); + + +// ---------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------- +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} + +record RoundResult +{ + public int Round; + public int Concurrent; + public long Succeeded, Failed, TimedOut; + public double TotalMs, MinMs, P50Ms, P95Ms, P99Ms, MaxMs, MeanMs; +} diff --git a/Tests/Distribution/NodeFanout/Client/Esiur.Tests.NodeFanout.Client.csproj b/Tests/Distribution/NodeFanout/Client/Esiur.Tests.NodeFanout.Client.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/NodeFanout/Client/Esiur.Tests.NodeFanout.Client.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/NodeFanout/Client/Program.cs b/Tests/Distribution/NodeFanout/Client/Program.cs new file mode 100644 index 0000000..b774940 --- /dev/null +++ b/Tests/Distribution/NodeFanout/Client/Program.cs @@ -0,0 +1,114 @@ +// ============================================================ +// Test 1: Node Fan-Out — CLIENT NODE +// Connects to the server, attaches to all sensor resources, +// and counts received property-change notifications. +// +// Run N instances of this client simultaneously to simulate +// N subscriber nodes. The server's fan-out load grows with N. +// +// Usage: dotnet run -- --host 127.0.0.1 --port 10900 --resources 100 --duration 30 +// ============================================================ + +using Esiur.Resource; +using System.Diagnostics; + +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10900")); +var resourceCount = int.Parse(GetArg(args, "--resources", "100")); +var durationSec = int.Parse(GetArg(args, "--duration", "30")); +var clientId = GetArg(args, "--id", Environment.MachineName); + +Console.WriteLine($"[Client {clientId}] Connecting to {host}:{port}, resources={resourceCount}, duration={durationSec}s"); + +// Counters +long totalReceived = 0; +long lateCount = 0; // notifications arriving > 500ms after the previous +double sumLatencyMs = 0; +long latencySamples = 0; + +var latencyLock = new object(); + +// --- Attach all resources ------------------------------------------- +var proxies = new dynamic[resourceCount]; +var sw = Stopwatch.StartNew(); + +try +{ + for (int i = 0; i < resourceCount; i++) + { + proxies[i] = await Warehouse.Get($"iip://{host}:{port}/sys/sensor_{i}"); + + // Subscribe to property change notifications via the Esiur event model + double lastValue = (double)proxies[i].Value; + long lastTick = Stopwatch.GetTimestamp(); + int capturedI = i; + + proxies[i].OnPropertyModified += (string propName, object oldVal, object newVal) => + { + if (propName != "Value") return; + + long nowTick = Stopwatch.GetTimestamp(); + double elapsedMs = (nowTick - lastTick) * 1000.0 / Stopwatch.Frequency; + lastTick = nowTick; + + Interlocked.Increment(ref totalReceived); + + lock (latencyLock) + { + sumLatencyMs += elapsedMs; + latencySamples++; + if (elapsedMs > 500) lateCount++; + } + }; + } + + double attachTime = sw.Elapsed.TotalSeconds; + Console.WriteLine($"[Client {clientId}] All {resourceCount} resources attached in {attachTime:F2}s"); +} +catch (Exception ex) +{ + Console.WriteLine($"[Client {clientId}] Attach error: {ex.Message}"); + return; +} + +// --- Measurement window --------------------------------------------- +sw.Restart(); +long lastReceived = 0; +var results = new List<(double TimeSec, long ReceivedDelta, double AvgIntervalMs)>(); + +while (sw.Elapsed.TotalSeconds < durationSec) +{ + await Task.Delay(5000); + + long delta = totalReceived - lastReceived; + lastReceived = totalReceived; + + double avgInterval; + lock (latencyLock) + { + avgInterval = latencySamples > 0 ? sumLatencyMs / latencySamples : 0; + sumLatencyMs = 0; + latencySamples = 0; + } + + double t = sw.Elapsed.TotalSeconds; + results.Add((t, delta, avgInterval)); + Console.WriteLine($"[Client {clientId}] t={t:F0}s recv/5s={delta} rate={delta/5.0:F1}/s avg_interval={avgInterval:F1}ms late={lateCount}"); +} + +// --- CSV output ----------------------------------------------------- +string csv = $"time_s,received_per_5s,rate_per_s,avg_interval_ms\n" + + string.Join("\n", results.Select(r => + $"{r.TimeSec:F1},{r.ReceivedDelta},{r.ReceivedDelta/5.0:F1},{r.AvgIntervalMs:F2}")); + +string outFile = $"client_{clientId}_results.csv"; +await File.WriteAllTextAsync(outFile, csv); +Console.WriteLine($"[Client {clientId}] Results written to {outFile}"); +Console.WriteLine($"[Client {clientId}] Total received={totalReceived} late(>500ms)={lateCount}"); + + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/NodeFanout/Server/Esiur.Tests.NodeFanout.Server.csproj b/Tests/Distribution/NodeFanout/Server/Esiur.Tests.NodeFanout.Server.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/NodeFanout/Server/Esiur.Tests.NodeFanout.Server.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/NodeFanout/Server/Program.cs b/Tests/Distribution/NodeFanout/Server/Program.cs new file mode 100644 index 0000000..05d0304 --- /dev/null +++ b/Tests/Distribution/NodeFanout/Server/Program.cs @@ -0,0 +1,79 @@ +// ============================================================ +// Test 1: Node Fan-Out — SERVER NODE +// One server hosts N resources. Clients attach and subscribe. +// The server emits property updates at a fixed rate and measures +// notification throughput vs. subscriber count. +// ============================================================ +// Usage: dotnet run -- --resources 100 --interval 50 +// ============================================================ + +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Net.IIP; +using System.Diagnostics; + +var resourceCount = int.Parse(GetArg(args, "--resources", "100")); +var intervalMs = int.Parse(GetArg(args, "--interval", "50")); +var port = int.Parse(GetArg(args, "--port", "10900")); + +Console.WriteLine($"[Server] resources={resourceCount} interval={intervalMs}ms port={port}"); + +// --- Warehouse setup ------------------------------------------------- +await Warehouse.Put("sys", new MemoryStore()); +await Warehouse.Put("sys/server", new DistributedServer() { Port = (ushort)port }); + +// Create and register all sensor resources +var sensors = new SensorResource[resourceCount]; +for (int i = 0; i < resourceCount; i++) +{ + sensors[i] = new SensorResource { SensorId = i }; + await Warehouse.Put($"sys/sensor_{i}", sensors[i]); +} + +await Warehouse.Open(); +Console.WriteLine($"[Server] Listening on port {port} with {resourceCount} resources."); + +// --- Emit loop ------------------------------------------------------- +// Continuously update all resource properties at the given interval. +// This drives property-change notifications to all attached clients. +long totalEmitted = 0; +var sw = Stopwatch.StartNew(); + +_ = Task.Run(async () => +{ + while (true) + { + await Task.Delay(intervalMs); + + double value = sw.Elapsed.TotalSeconds; + foreach (var s in sensors) + s.Value = value; // triggers PropertyModified → propagate to peers + + totalEmitted += resourceCount; + } +}); + +// --- Stats reporter -------------------------------------------------- +_ = Task.Run(async () => +{ + long lastEmitted = 0; + while (true) + { + await Task.Delay(5000); + long delta = totalEmitted - lastEmitted; + lastEmitted = totalEmitted; + Console.WriteLine($"[Server] {DateTime.Now:HH:mm:ss} emitted/5s={delta} rate={delta/5.0:F0}/s"); + } +}); + +Console.WriteLine("Press ENTER to stop."); +Console.ReadLine(); +await Warehouse.Close(); + + +// --- Helpers --------------------------------------------------------- +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/NodeFanout/Server/SensorResource.cs b/Tests/Distribution/NodeFanout/Server/SensorResource.cs new file mode 100644 index 0000000..de25dae --- /dev/null +++ b/Tests/Distribution/NodeFanout/Server/SensorResource.cs @@ -0,0 +1,24 @@ +using Esiur.Resource; + +/// +/// A simple observable sensor resource. +/// Property changes are automatically propagated to all attached peers. +/// +[Resource] +public class SensorResource : Resource +{ + public int SensorId { get; set; } + + private double _value; + + [ResourceProperty] + public double Value + { + get => _value; + set + { + _value = value; + PropertyModified("Value"); // notifies Esiur runtime to propagate + } + } +} diff --git a/Tests/Distribution/ResourceCount/Client/Esiur.Tests.ResourceCount.Client.csproj b/Tests/Distribution/ResourceCount/Client/Esiur.Tests.ResourceCount.Client.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/ResourceCount/Client/Esiur.Tests.ResourceCount.Client.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/ResourceCount/Client/Program.cs b/Tests/Distribution/ResourceCount/Client/Program.cs new file mode 100644 index 0000000..30c5239 --- /dev/null +++ b/Tests/Distribution/ResourceCount/Client/Program.cs @@ -0,0 +1,97 @@ +// ============================================================ +// Test 2: Resource Count Scalability — CLIENT NODE +// Sequentially attaches to each resource on the server and +// records per-resource attach latency. Reports p50/p95/p99. +// +// Also tests notification latency after all resources are ready. +// +// Usage: dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 +// ============================================================ + +using Esiur.Resource; +using System.Diagnostics; + +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10901")); +var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); +var batchSize = int.Parse(GetArg(args, "--batch", "100")); + +Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}"); + +var attachLatencies = new List(resourceCount); +var proxies = new dynamic[resourceCount]; + +// --- Attach in batches to avoid overwhelming the runtime ------------- +var totalSw = Stopwatch.StartNew(); + +for (int batch = 0; batch < resourceCount; batch += batchSize) +{ + int end = Math.Min(batch + batchSize, resourceCount); + var batchTasks = new Task[end - batch]; + + for (int i = batch; i < end; i++) + { + int capturedI = i; + batchTasks[i - batch] = Task.Run(async () => + { + var sw = Stopwatch.StartNew(); + proxies[capturedI] = await Warehouse.Get( + $"iip://{host}:{port}/sys/sensor_{capturedI}"); + sw.Stop(); + + lock (attachLatencies) + attachLatencies.Add(sw.Elapsed.TotalMilliseconds); + }); + } + + await Task.WhenAll(batchTasks); + + if (batch % 1000 == 0) + Console.WriteLine($"[Client-T2] Attached {Math.Min(batch + batchSize, resourceCount)}/{resourceCount} " + + $"elapsed={totalSw.Elapsed.TotalSeconds:F1}s"); +} + +totalSw.Stop(); +Console.WriteLine($"[Client-T2] All attached in {totalSw.Elapsed.TotalSeconds:F2}s"); + +// --- Latency statistics --------------------------------------------- +attachLatencies.Sort(); +int n = attachLatencies.Count; + +Console.WriteLine($"[Client-T2] Attach latency (ms):"); +Console.WriteLine($" min={attachLatencies[0]:F2}"); +Console.WriteLine($" p50={attachLatencies[(int)(n*0.50)]:F2}"); +Console.WriteLine($" p95={attachLatencies[(int)(n*0.95)]:F2}"); +Console.WriteLine($" p99={attachLatencies[(int)(n*0.99)]:F2}"); +Console.WriteLine($" max={attachLatencies[n-1]:F2}"); +Console.WriteLine($" mean={attachLatencies.Average():F2}"); + +// --- Notification round-trip after full load ------------------------ +Console.WriteLine("[Client-T2] Measuring notification latency under full resource load..."); +long received = 0; +double sumLatencyMs = 0; + +for (int i = 0; i < Math.Min(500, resourceCount); i++) +{ + int capturedI = i; + proxies[capturedI].OnPropertyModified += (string propName, object oldVal, object newVal) => + { + if (propName == "Value") + Interlocked.Increment(ref received); + }; +} + +await Task.Delay(10000); // observe for 10s +Console.WriteLine($"[Client-T2] Received {received} notifications in 10s from first 500 resources"); + +// --- CSV output ----------------------------------------------------- +string csv = "attach_latency_ms\n" + string.Join("\n", attachLatencies.Select(l => l.ToString("F3"))); +await File.WriteAllTextAsync("test2_attach_latencies.csv", csv); +Console.WriteLine("[Client-T2] Attach latencies written to test2_attach_latencies.csv"); + + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/ResourceCount/Server/Esiur.Tests.ResourceCount.Server.csproj b/Tests/Distribution/ResourceCount/Server/Esiur.Tests.ResourceCount.Server.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/ResourceCount/Server/Esiur.Tests.ResourceCount.Server.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/ResourceCount/Server/Program.cs b/Tests/Distribution/ResourceCount/Server/Program.cs new file mode 100644 index 0000000..08d2cba --- /dev/null +++ b/Tests/Distribution/ResourceCount/Server/Program.cs @@ -0,0 +1,45 @@ +// ============================================================ +// Test 2: Resource Count Scalability — SERVER NODE +// Hosts an increasing number of resources to test warehouse +// lookup overhead and memory footprint as the store grows. +// +// Usage: dotnet run -- --resources 10000 --port 10901 +// ============================================================ + +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Net.IIP; + +var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); +var port = int.Parse(GetArg(args, "--port", "10901")); + +Console.WriteLine($"[Server-T2] Creating {resourceCount} resources on port {port}"); + +await Warehouse.Put("sys", new MemoryStore()); +await Warehouse.Put("sys/server", new DistributedServer() { Port = (ushort)port }); + +long memBefore = GC.GetTotalMemory(forceFullCollection: true); + +for (int i = 0; i < resourceCount; i++) +{ + var s = new SensorResource { SensorId = i, Value = i * 0.1 }; + await Warehouse.Put($"sys/sensor_{i}", s); +} + +await Warehouse.Open(); + +long memAfter = GC.GetTotalMemory(forceFullCollection: true); +double memMB = (memAfter - memBefore) / (1024.0 * 1024.0); + +Console.WriteLine($"[Server-T2] Ready. Resources={resourceCount} MemoryUsed={memMB:F1} MB"); +Console.WriteLine($"[Server-T2] Per-resource ≈ {(memAfter - memBefore) / (double)resourceCount:F0} bytes"); +Console.WriteLine("Press ENTER to stop."); +Console.ReadLine(); +await Warehouse.Close(); + + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +}