diff --git a/Esiur.sln b/Esiur.sln
index 20c603f..ac5e84a 100644
--- a/Esiur.sln
+++ b/Esiur.sln
@@ -85,10 +85,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{57E806
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{21D42B96-99F9-4E48-A499-5170A5A9597F}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Server", "Tests\Distribution\NodeFanoutSweep\Server\Esiur.Tests.NodeFanoutSweep.Server.csproj", "{9FF626DF-1AD4-2BE1-F834-F49121D65085}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Client", "Tests\Distribution\NodeFanoutSweep\Client\Esiur.Tests.NodeFanoutSweep.Client.csproj", "{550A20AB-8E97-BCDD-9F54-27823663120A}"
-EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ConcurrentAttachSweep", "ConcurrentAttachSweep", "{E713D25F-2602-44C9-AB9E-C9477FB2BA93}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttachSweep", "Tests\Distribution\ConcurrentAttachSweep\Esiur.Tests.ConcurrentAttachSweep.csproj", "{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}"
@@ -105,6 +101,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{967F62
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Server", "Tests\Distribution\Deadlock\Server\Esiur.Tests.Deadlock.Server.csproj", "{F2FE7C0B-58C1-D768-C37A-D428D2B85940}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Client", "Tests\Distribution\NodeFanoutSweep\Client\Esiur.Tests.NodeFanoutSweep.Client.csproj", "{550A20AB-8E97-BCDD-9F54-27823663120A}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Server", "Tests\Distribution\NodeFanoutSweep\Server\Esiur.Tests.NodeFanoutSweep.Server.csproj", "{9FF626DF-1AD4-2BE1-F834-F49121D65085}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -331,30 +331,6 @@ Global
{7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|x64.Build.0 = Release|Any CPU
{7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|x86.ActiveCfg = Release|Any CPU
{7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|x86.Build.0 = Release|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x64.ActiveCfg = Debug|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x64.Build.0 = Debug|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x86.ActiveCfg = Debug|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x86.Build.0 = Debug|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.Build.0 = Release|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x64.ActiveCfg = Release|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x64.Build.0 = Release|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x86.ActiveCfg = Release|Any CPU
- {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x86.Build.0 = Release|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x64.ActiveCfg = Debug|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x64.Build.0 = Debug|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x86.ActiveCfg = Debug|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x86.Build.0 = Debug|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.Build.0 = Release|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x64.ActiveCfg = Release|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x64.Build.0 = Release|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x86.ActiveCfg = Release|Any CPU
- {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x86.Build.0 = Release|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -403,6 +379,30 @@ Global
{F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x64.Build.0 = Release|Any CPU
{F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x86.ActiveCfg = Release|Any CPU
{F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x86.Build.0 = Release|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x64.Build.0 = Debug|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|x86.Build.0 = Debug|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x64.ActiveCfg = Release|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x64.Build.0 = Release|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x86.ActiveCfg = Release|Any CPU
+ {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|x86.Build.0 = Release|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x64.Build.0 = Debug|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|x86.Build.0 = Debug|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x64.ActiveCfg = Release|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x64.Build.0 = Release|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x86.ActiveCfg = Release|Any CPU
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -443,8 +443,6 @@ Global
{33D973D8-4D3E-47BA-8135-FCA0CFF7E210} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917}
{57E80693-7AFC-4446-87DE-25E97C036E2F} = {33D973D8-4D3E-47BA-8135-FCA0CFF7E210}
{21D42B96-99F9-4E48-A499-5170A5A9597F} = {33D973D8-4D3E-47BA-8135-FCA0CFF7E210}
- {9FF626DF-1AD4-2BE1-F834-F49121D65085} = {57E80693-7AFC-4446-87DE-25E97C036E2F}
- {550A20AB-8E97-BCDD-9F54-27823663120A} = {21D42B96-99F9-4E48-A499-5170A5A9597F}
{E713D25F-2602-44C9-AB9E-C9477FB2BA93} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917}
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1} = {E713D25F-2602-44C9-AB9E-C9477FB2BA93}
{D1B99C5A-82F7-459D-B56D-F8FD096D3854} = {2769C4C3-2595-413B-B7FE-5903826770C1}
@@ -453,6 +451,8 @@ Global
{28A7F758-951B-6502-6EA4-C216BA12F77C} = {22A76A25-333D-4516-8EA6-4D03E3023183}
{967F62B4-2815-473F-9F1E-E7F146EE8872} = {C5FB16A3-952C-4078-A15A-3C7CE42E73B5}
{F2FE7C0B-58C1-D768-C37A-D428D2B85940} = {967F62B4-2815-473F-9F1E-E7F146EE8872}
+ {550A20AB-8E97-BCDD-9F54-27823663120A} = {21D42B96-99F9-4E48-A499-5170A5A9597F}
+ {9FF626DF-1AD4-2BE1-F834-F49121D65085} = {57E80693-7AFC-4446-87DE-25E97C036E2F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2}
diff --git a/Libraries/Esiur/Protocol/EpResource.cs b/Libraries/Esiur/Protocol/EpResource.cs
index f52e45e..4754b93 100644
--- a/Libraries/Esiur/Protocol/EpResource.cs
+++ b/Libraries/Esiur/Protocol/EpResource.cs
@@ -357,7 +357,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn
throw new Exception("Trying to access a suspended object.");
- if (_status != ResourceStatus.Attached)
+ if (_status != ResourceStatus.Published)
{
result = null;
return false;
@@ -440,7 +440,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn
result = null;
- if (_status != ResourceStatus.Attached)
+ if (_status != ResourceStatus.Published)
return false;
var pt = Instance.Definition.GetPropertyDefByName(binder.Name);
@@ -498,7 +498,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn
if (_status == ResourceStatus.Suspended)
throw new Exception("Trying to access a suspended object.");
- if (_status != ResourceStatus.Attached)
+ if (_status != ResourceStatus.Published)
return false;
var pt = Instance.Definition.GetPropertyDefByName(binder.Name);
@@ -619,8 +619,8 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn
if (_status == ResourceStatus.Suspended)
throw new Exception("Trying to access a suspended object.");
- if (_status != ResourceStatus.Attached)
- throw new Exception("Resource is not attached.");
+ if (_status != ResourceStatus.Published)
+ throw new Exception("Resource is not published.");
if (index >= _properties.Length)
throw new Exception("Property index not found."); ;
diff --git a/Tests/Distribution/ConcurrentAttach/Program.cs b/Tests/Distribution/ConcurrentAttach/Program.cs
index d5a335f..10a2724 100644
--- a/Tests/Distribution/ConcurrentAttach/Program.cs
+++ b/Tests/Distribution/ConcurrentAttach/Program.cs
@@ -37,7 +37,7 @@ var serverWh = new Warehouse();
if (mode == "server" || mode == "both")
{
await serverWh.Put("sys", new MemoryStore());
- await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port });
+ await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port, AllowUnauthorizedAccess = true });
for (int i = 0; i < resources; i++)
{
diff --git a/Tests/Distribution/ConcurrentAttachSweep/Program.cs b/Tests/Distribution/ConcurrentAttachSweep/Program.cs
index 0ef1f09..3b0a5fa 100644
--- a/Tests/Distribution/ConcurrentAttachSweep/Program.cs
+++ b/Tests/Distribution/ConcurrentAttachSweep/Program.cs
@@ -46,7 +46,7 @@ var clientWh = new Warehouse();
if (mode == "server" || mode == "both")
{
await serverWh.Put("sys", new MemoryStore());
- await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port });
+ await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port, AllowUnauthorizedAccess = true });
for (int i = 0; i < resources; i++)
{
diff --git a/Tests/Distribution/NodeFanout/Server/Program.cs b/Tests/Distribution/NodeFanout/Server/Program.cs
index 2ba9191..b6ee4cb 100644
--- a/Tests/Distribution/NodeFanout/Server/Program.cs
+++ b/Tests/Distribution/NodeFanout/Server/Program.cs
@@ -21,7 +21,7 @@ Console.WriteLine($"[Server] resources={resourceCount} interval={intervalMs}ms
var wh = new Warehouse();
// --- Warehouse setup -------------------------------------------------
await wh.Put("sys", new MemoryStore());
-var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port });
+var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port, AllowUnauthorizedAccess = true });
// Create and register all sensor resources
var sensors = new SensorResource[resourceCount];
diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj b/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj
index 44e8917..634bddd 100644
--- a/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj
+++ b/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs
index 063a36b..b03d372 100644
--- a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs
+++ b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs
@@ -1,77 +1,427 @@
-// ============================================================
-// Scalability Extension: Fan-Out — SERVER NODE
-// Hosts M sensor resources and emits Value updates at a fixed interval (the fan-out source). Also
-// hosts sys/control, updated once per second with the server process CPU (% across all cores) and
-// the live subscriber count, which the sweep orchestrator reads to characterise saturation.
-// Anonymous (None-mode) access so subscribers connect without credentials.
+// ============================================================
+// Scalability Extension: Fan-Out — ORCHESTRATOR CLIENT
+// ------------------------------------------------------------
+// Drives a full sweep of subscriber counts N against a single
+// server instance. For each N value:
+// 1. Spawns N in-process subscriber tasks, each opening its
+// own EpConnection to the server.
+// 2. Each subscriber attaches to all M resources and counts
+// property-change notifications it receives over a fixed
+// measurement window.
+// 3. The orchestrator polls the server's sys/control resource
+// to capture server-side CPU during the window.
+// 4. Tears down all N subscribers and waits a settle interval
+// before the next sweep point.
+// 5. Repeats for `replications` rounds so the per-N mean and
+// 95% confidence interval can be computed.
+// 6. Auto-stops the sweep if either:
+// - mean per-subscriber rate drops below 10% of theoretical,
+// - or server CPU stays at >180% (>90% of 2 cores) for the
+// entire measurement window.
//
-// Usage: dotnet run -- --port 10900 --resources 100 --emit-interval-ms 50
-// (Run the orchestrator from the sibling "Server" project against this host:port.)
+// Note on in-process vs separate processes: subscribers are
+// tasks within a single client process to keep the test self-
+// contained and avoid spawning N OS processes. Each task uses
+// its own EpConnection (TCP connection) to the server, so from
+// the server's perspective the load looks identical to N
+// distinct subscriber nodes for the property-propagation path.
+// The single-client-process design does mean that the client
+// host's CPU is shared across all subscribers; the orchestrator
+// records this too so degradation can be attributed correctly.
+// ------------------------------------------------------------
+// Usage:
+// dotnet run -- --host 127.0.0.1 --port 10900 --resources 100 \
+// --emit-interval-ms 50 --window-sec 60 \
+// --warmup-sec 5 --replications 3 \
+// --n-values 2,5,10,20,50,100,200,500
// ============================================================
using Esiur.Protocol;
using Esiur.Resource;
-using Esiur.Stores;
+using System.Data.Common;
using System.Diagnostics;
+using System.Globalization;
-var port = int.Parse(GetArg(args, "--port", "10900"));
-var resources = int.Parse(GetArg(args, "--resources", "100"));
+var host = GetArg(args, "--host", "127.0.0.1");
+var port = int.Parse(GetArg(args, "--port", "10900"));
+var resources = int.Parse(GetArg(args, "--resources", "100"));
var emitIntervalMs = int.Parse(GetArg(args, "--emit-interval-ms", "50"));
+var windowSec = int.Parse(GetArg(args, "--window-sec", "60"));
+var warmupSec = int.Parse(GetArg(args, "--warmup-sec", "5"));
+var settleSec = int.Parse(GetArg(args, "--settle-sec", "5"));
+var replications = int.Parse(GetArg(args, "--replications", "3"));
+var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500");
+var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv");
-Console.WriteLine($"[Server] resources={resources} emit-interval={emitIntervalMs}ms port={port} cores={Environment.ProcessorCount}");
+var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray();
+double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources;
+double minAcceptableRate = theoreticalMaxRate * 0.10;
-var wh = new Warehouse();
-await wh.Put("sys", new MemoryStore());
-var server = await wh.Put("sys/server", new EpServer { Port = (ushort)port, AllowUnauthorizedAccess = true });
+Console.WriteLine($"[Orchestrator] resources={resources} interval={emitIntervalMs}ms "
+ + $"window={windowSec}s replications={replications}");
+Console.WriteLine($"[Orchestrator] theoretical_max_per_subscriber_rate={theoreticalMaxRate:F0} notif/s");
+Console.WriteLine($"[Orchestrator] saturation_threshold={minAcceptableRate:F0} notif/s");
+Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}");
-var sensors = new SensorResource[resources];
-for (var i = 0; i < resources; i++) { sensors[i] = new SensorResource { SensorId = i }; await wh.Put($"sys/sensor_{i}", sensors[i]); }
-
-var control = new ControlResource();
-await wh.Put("sys/control", control);
-
-await wh.Open();
-Console.WriteLine($"[Server] Listening on port {port} with {resources} sensors + sys/control. Press Ctrl+C to stop.");
-
-// Emit loop: drives property-change notifications to every attached subscriber.
-var sw = Stopwatch.StartNew();
-_ = Task.Run(async () =>
+// ----------------------------------------------------------------
+// Attach to the server's control resource once.
+// ----------------------------------------------------------------
+var controlWh = new Warehouse();
+EpResource? control = null;
+byte cpuIdx = 255, clientsIdx = 255;
+try
{
- while (true)
- {
- await Task.Delay(emitIntervalMs);
- var value = sw.Elapsed.TotalSeconds;
- foreach (var s in sensors) s.Value = value;
- }
-});
-
-// Telemetry loop: publish server CPU (% across all cores) and subscriber count once per second.
-_ = Task.Run(async () =>
+ var controlConn = await controlWh.Get($"ep://{host}:{port}");
+ control = (EpResource)await controlConn.Get("sys/control");
+ // Resolve property indices by name (EpResource exposes values by index, not dynamic member).
+ var props = control.Instance.Definition.Properties;
+ cpuIdx = (byte)Array.FindIndex(props, p => p.Name == "CpuPercent");
+ clientsIdx = (byte)Array.FindIndex(props, p => p.Name == "ConnectedClients");
+ Console.WriteLine($"[Orchestrator] sys/control attached (CpuPercent=idx {cpuIdx}, ConnectedClients=idx {clientsIdx}).");
+}
+catch (Exception ex)
{
- var proc = Process.GetCurrentProcess();
- var prevCpu = proc.TotalProcessorTime;
- var prevWall = DateTime.UtcNow;
- while (true)
- {
- await Task.Delay(1000);
- proc.Refresh();
- var nowCpu = proc.TotalProcessorTime;
- var nowWall = DateTime.UtcNow;
- var wallMs = (nowWall - prevWall).TotalMilliseconds;
- control.CpuPercent = wallMs > 0 ? (nowCpu - prevCpu).TotalMilliseconds / wallMs * 100.0 : 0;
- control.ConnectedClients = server.Connections.Count;
- prevCpu = nowCpu;
- prevWall = nowWall;
- }
-});
+ Console.WriteLine($"[Orchestrator] WARNING: could not attach to sys/control: {ex.Message}");
+ Console.WriteLine("[Orchestrator] Server CPU will be reported as N/A.");
+}
-var stop = new TaskCompletionSource();
-Console.CancelKeyPress += (_, e) => { e.Cancel = true; stop.TrySetResult(); };
-await stop.Task;
-await wh.Close();
+// ----------------------------------------------------------------
+// All sweep points x replications, with per-N early-stop logic.
+// ----------------------------------------------------------------
+var allResults = new List();
+bool saturatedDetected = false;
+
+foreach (int n in nValues)
+{
+ if (saturatedDetected)
+ {
+ Console.WriteLine($"\n[Orchestrator] N={n}: SKIPPED (saturation reached at lower N)");
+ continue;
+ }
+
+ var perRepResults = new List();
+
+ for (int rep = 0; rep < replications; rep++)
+ {
+ Console.WriteLine($"\n[Orchestrator] === N={n} rep={rep + 1}/{replications} ===");
+
+ var subscribers = new SubscriberTask[n];
+ var subscriberWhs = new Warehouse[n];
+
+ // ---------- spawn N subscribers ----------
+ Console.WriteLine($"[Orchestrator] Spawning {n} subscribers...");
+ var spawnSw = Stopwatch.StartNew();
+ var spawnTasks = new Task[n];
+ for (int i = 0; i < n; i++)
+ {
+ int captured = i;
+ subscriberWhs[i] = new Warehouse();
+ spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, captured);
+ }
+
+ await Task.WhenAll(spawnTasks);
+
+ bool spawnFailed = false;
+ for (int i = 0; i < n; i++)
+ {
+ if (spawnTasks[i].Result == null) { spawnFailed = true; break; }
+ subscribers[i] = spawnTasks[i].Result!;
+ }
+ spawnSw.Stop();
+
+ if (spawnFailed)
+ {
+ Console.WriteLine($"[Orchestrator] N={n}: spawn failed; treating as saturation.");
+ saturatedDetected = true;
+ await TeardownAll(subscriberWhs);
+ break;
+ }
+ Console.WriteLine($"[Orchestrator] All {n} subscribers attached in {spawnSw.Elapsed.TotalSeconds:F2}s");
+
+ // ---------- warmup ----------
+ Console.WriteLine($"[Orchestrator] Warmup {warmupSec}s...");
+ await Task.Delay(warmupSec * 1000);
+ foreach (var s in subscribers) s.ResetCounters();
+
+ // ---------- measurement window with CPU sampling ----------
+ Console.WriteLine($"[Orchestrator] Measurement window {windowSec}s...");
+ var cpuSamples = new List();
+ var connSamples = new List();
+ var clientCpuSamples = new List();
+ var clientProc = Process.GetCurrentProcess();
+ var prevClientCpu = clientProc.TotalProcessorTime;
+ var prevClientWall = DateTime.UtcNow;
+ var winSw = Stopwatch.StartNew();
+ while (winSw.Elapsed.TotalSeconds < windowSec)
+ {
+ await Task.Delay(1000);
+
+ // Server CPU + subscriber count via the control resource (read by property index;
+ // values arrive as variable-width numerics, hence Convert.*).
+ if (control != null && cpuIdx != 255)
+ {
+ try
+ {
+ if (control.TryGetPropertyValue(cpuIdx, out var cpuVal) && cpuVal != null)
+ cpuSamples.Add(Convert.ToDouble(cpuVal));
+ if (control.TryGetPropertyValue(clientsIdx, out var cliVal) && cliVal != null)
+ connSamples.Add(Convert.ToInt32(cliVal));
+ }
+ catch { /* control resource may not have a current value yet */ }
+ }
+
+ // This harness's own CPU (% across all cores). Recorded so saturation can be attributed
+ // to the server rather than to the single subscriber process driving N connections.
+ clientProc.Refresh();
+ var nowClientCpu = clientProc.TotalProcessorTime;
+ var nowClientWall = DateTime.UtcNow;
+ var wallMs = (nowClientWall - prevClientWall).TotalMilliseconds;
+ if (wallMs > 0) clientCpuSamples.Add((nowClientCpu - prevClientCpu).TotalMilliseconds / wallMs * 100.0);
+ prevClientCpu = nowClientCpu;
+ prevClientWall = nowClientWall;
+ }
+
+ double elapsedSec = winSw.Elapsed.TotalSeconds;
+
+ // ---------- collect per-subscriber counts ----------
+ var perSubRates = new double[n];
+ long totalReceived = 0;
+ long totalLate = 0;
+ for (int i = 0; i < n; i++)
+ {
+ perSubRates[i] = subscribers[i].Received / elapsedSec;
+ totalReceived += subscribers[i].Received;
+ totalLate += subscribers[i].LateDeliveries;
+ }
+
+ double meanPerSub = perSubRates.Average();
+ double stdPerSub = StdDev(perSubRates);
+ double minPerSub = perSubRates.Min();
+ double maxPerSub = perSubRates.Max();
+ double aggregate = perSubRates.Sum();
+ double avgServerCpu = cpuSamples.Count > 0 ? cpuSamples.Average() : double.NaN;
+ double peakServerCpu = cpuSamples.Count > 0 ? cpuSamples.Max() : double.NaN;
+ double avgClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Average() : double.NaN;
+ double peakClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Max() : double.NaN;
+
+ Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: "
+ + $"mean_per_sub={meanPerSub:F1}/s "
+ + $"aggregate={aggregate:F0}/s "
+ + $"late={totalLate} "
+ + $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% "
+ + $"client_cpu_avg={avgClientCpu:F1}%/peak={peakClientCpu:F1}%");
+
+ perRepResults.Add(new RepResult
+ {
+ N = n,
+ Rep = rep + 1,
+ MeanPerSub = meanPerSub,
+ StdPerSub = stdPerSub,
+ MinPerSub = minPerSub,
+ MaxPerSub = maxPerSub,
+ Aggregate = aggregate,
+ LateDeliveries = totalLate,
+ ServerCpuAvg = avgServerCpu,
+ ServerCpuPeak = peakServerCpu,
+ ClientCpuAvg = avgClientCpu,
+ ClientCpuPeak = peakClientCpu,
+ });
+
+ // ---------- teardown ----------
+ Console.WriteLine($"[Orchestrator] Tearing down {n} subscribers...");
+ await TeardownAll(subscriberWhs);
+ await Task.Delay(settleSec * 1000);
+ }
+
+ // ---------- per-N aggregation ----------
+ if (perRepResults.Count > 0)
+ {
+ double meanOfMeans = perRepResults.Average(r => r.MeanPerSub);
+ double ciHalfWidth = ConfidenceIntervalHalfWidth95(
+ perRepResults.Select(r => r.MeanPerSub).ToArray());
+
+ Console.WriteLine($"\n[Orchestrator] N={n} SUMMARY: "
+ + $"mean_per_sub={meanOfMeans:F1} ± {ciHalfWidth:F1} notif/s (95% CI)");
+
+ // Saturation detection: stop sweep if per-sub rate falls below
+ // 10% of theoretical OR server CPU peaked above 180% (>90% of 2 cores)
+ if (meanOfMeans < minAcceptableRate)
+ {
+ Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: rate {meanOfMeans:F0} < {minAcceptableRate:F0} ***");
+ saturatedDetected = true;
+ }
+ else if (perRepResults.Average(r => r.ServerCpuPeak) > 180.0)
+ {
+ Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server CPU peaked > 180% ***");
+ saturatedDetected = true;
+ }
+
+ // Aggregate row for CSV
+ allResults.Add(new SweepResult
+ {
+ N = n,
+ Replications = perRepResults.Count,
+ MeanPerSubRate = meanOfMeans,
+ Ci95HalfWidth = ciHalfWidth,
+ MeanAggregate = perRepResults.Average(r => r.Aggregate),
+ TotalLate = perRepResults.Sum(r => r.LateDeliveries),
+ MeanServerCpuAvg = perRepResults.Average(r => r.ServerCpuAvg),
+ MeanServerCpuPeak = perRepResults.Average(r => r.ServerCpuPeak),
+ MeanClientCpuAvg = perRepResults.Average(r => r.ClientCpuAvg),
+ MeanClientCpuPeak = perRepResults.Average(r => r.ClientCpuPeak),
+ });
+ }
+}
+
+// ----------------------------------------------------------------
+// Output
+// ----------------------------------------------------------------
+var sb = new System.Text.StringBuilder();
+sb.AppendLine("n,replications,mean_per_sub_rate,ci95_halfwidth,mean_aggregate," +
+ "total_late,mean_server_cpu_avg,mean_server_cpu_peak,mean_client_cpu_avg,mean_client_cpu_peak");
+foreach (var r in allResults)
+{
+ sb.AppendLine(string.Create(CultureInfo.InvariantCulture,
+ $"{r.N},{r.Replications},{r.MeanPerSubRate:F2},{r.Ci95HalfWidth:F2}," +
+ $"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}," +
+ $"{r.MeanClientCpuAvg:F2},{r.MeanClientCpuPeak:F2}"));
+}
+await File.WriteAllTextAsync(outputCsv, sb.ToString());
+Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}");
+
+// ----------------------------------------------------------------
+// Subscriber spawn / teardown
+// ----------------------------------------------------------------
+static async Task SpawnSubscriber(
+ Warehouse wh, string host, int port, int resources, int subId)
+{
+ try
+ {
+ var conn = await wh.Get($"ep://{host}:{port}");
+ var sub = new SubscriberTask { SubscriberId = subId };
+
+ for (int i = 0; i < resources; i++)
+ {
+ var proxy = await conn.Get($"sys/sensor_{i}");
+ long lastTick = Stopwatch.GetTimestamp();
+
+ proxy.Instance.PropertyModified += (PropertyModificationInfo data) =>
+ {
+ if (data.Name != "Value") return;
+ long now = Stopwatch.GetTimestamp();
+ double elapsedMs = (now - lastTick) * 1000.0 / Stopwatch.Frequency;
+ lastTick = now;
+ Interlocked.Increment(ref sub._received);
+ if (elapsedMs > 400) Interlocked.Increment(ref sub._lateDeliveries);
+ };
+ }
+
+ return sub;
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($" [Spawn-{subId}] FAILED: {ex.Message}");
+ return null;
+ }
+}
+
+static async Task TeardownAll(Warehouse[] whs)
+{
+ foreach (var wh in whs)
+ {
+ try { await wh.Close(); }
+ catch { /* ignore */ }
+ }
+}
+
+// ----------------------------------------------------------------
+// Stats helpers
+// ----------------------------------------------------------------
+static double StdDev(double[] xs)
+{
+ if (xs.Length < 2) return 0;
+ double mean = xs.Average();
+ double sumSq = xs.Sum(x => (x - mean) * (x - mean));
+ return Math.Sqrt(sumSq / (xs.Length - 1));
+}
+
+///
+/// 95% confidence interval half-width using Student's t-distribution.
+/// For very small samples (n < 3) returns 0 (not enough data).
+/// t values for 95% two-sided are hard-coded; see standard tables.
+///
+static double ConfidenceIntervalHalfWidth95(double[] xs)
+{
+ int n = xs.Length;
+ if (n < 2) return 0;
+ double std = StdDev(xs);
+ double sem = std / Math.Sqrt(n);
+ // t for df=n-1, two-sided 95%
+ double t = (n - 1) switch
+ {
+ 1 => 12.706,
+ 2 => 4.303,
+ 3 => 3.182,
+ 4 => 2.776,
+ 5 => 2.571,
+ 6 => 2.447,
+ 7 => 2.365,
+ 8 => 2.306,
+ 9 => 2.262,
+ _ => 1.960 // normal approximation
+ };
+ return t * sem;
+}
static string GetArg(string[] args, string key, string def)
{
- var i = Array.IndexOf(args, key);
+ int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
+
+// ----------------------------------------------------------------
+// Records
+// ----------------------------------------------------------------
+class SubscriberTask
+{
+ public int SubscriberId;
+ internal long _received;
+ internal long _lateDeliveries;
+ public long Received => Interlocked.Read(ref _received);
+ public long LateDeliveries => Interlocked.Read(ref _lateDeliveries);
+ public void ResetCounters()
+ {
+ Interlocked.Exchange(ref _received, 0);
+ Interlocked.Exchange(ref _lateDeliveries, 0);
+ }
+}
+
+record RepResult
+{
+ public int N;
+ public int Rep;
+ public double MeanPerSub;
+ public double StdPerSub;
+ public double MinPerSub;
+ public double MaxPerSub;
+ public double Aggregate;
+ public long LateDeliveries;
+ public double ServerCpuAvg;
+ public double ServerCpuPeak;
+ public double ClientCpuAvg;
+ public double ClientCpuPeak;
+}
+
+record SweepResult
+{
+ public int N;
+ public int Replications;
+ public double MeanPerSubRate;
+ public double Ci95HalfWidth;
+ public double MeanAggregate;
+ public long TotalLate;
+ public double MeanServerCpuAvg;
+ public double MeanServerCpuPeak;
+ public double MeanClientCpuAvg;
+ public double MeanClientCpuPeak;
+}
\ No newline at end of file
diff --git a/Tests/Distribution/NodeFanoutSweep/Client/ControlResource.cs b/Tests/Distribution/NodeFanoutSweep/Server/ControlResource.cs
similarity index 100%
rename from Tests/Distribution/NodeFanoutSweep/Client/ControlResource.cs
rename to Tests/Distribution/NodeFanoutSweep/Server/ControlResource.cs
diff --git a/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj b/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj
index 634bddd..44e8917 100644
--- a/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj
+++ b/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/Tests/Distribution/NodeFanoutSweep/Server/Program.cs b/Tests/Distribution/NodeFanoutSweep/Server/Program.cs
index b03d372..063a36b 100644
--- a/Tests/Distribution/NodeFanoutSweep/Server/Program.cs
+++ b/Tests/Distribution/NodeFanoutSweep/Server/Program.cs
@@ -1,427 +1,77 @@
-// ============================================================
-// Scalability Extension: Fan-Out — ORCHESTRATOR CLIENT
-// ------------------------------------------------------------
-// Drives a full sweep of subscriber counts N against a single
-// server instance. For each N value:
-// 1. Spawns N in-process subscriber tasks, each opening its
-// own EpConnection to the server.
-// 2. Each subscriber attaches to all M resources and counts
-// property-change notifications it receives over a fixed
-// measurement window.
-// 3. The orchestrator polls the server's sys/control resource
-// to capture server-side CPU during the window.
-// 4. Tears down all N subscribers and waits a settle interval
-// before the next sweep point.
-// 5. Repeats for `replications` rounds so the per-N mean and
-// 95% confidence interval can be computed.
-// 6. Auto-stops the sweep if either:
-// - mean per-subscriber rate drops below 10% of theoretical,
-// - or server CPU stays at >180% (>90% of 2 cores) for the
-// entire measurement window.
+// ============================================================
+// Scalability Extension: Fan-Out — SERVER NODE
+// Hosts M sensor resources and emits Value updates at a fixed interval (the fan-out source). Also
+// hosts sys/control, updated once per second with the server process CPU (% across all cores) and
+// the live subscriber count, which the sweep orchestrator reads to characterise saturation.
+// Anonymous (None-mode) access so subscribers connect without credentials.
//
-// Note on in-process vs separate processes: subscribers are
-// tasks within a single client process to keep the test self-
-// contained and avoid spawning N OS processes. Each task uses
-// its own EpConnection (TCP connection) to the server, so from
-// the server's perspective the load looks identical to N
-// distinct subscriber nodes for the property-propagation path.
-// The single-client-process design does mean that the client
-// host's CPU is shared across all subscribers; the orchestrator
-// records this too so degradation can be attributed correctly.
-// ------------------------------------------------------------
-// Usage:
-// dotnet run -- --host 127.0.0.1 --port 10900 --resources 100 \
-// --emit-interval-ms 50 --window-sec 60 \
-// --warmup-sec 5 --replications 3 \
-// --n-values 2,5,10,20,50,100,200,500
+// Usage: dotnet run -- --port 10900 --resources 100 --emit-interval-ms 50
+// (Run the orchestrator from the sibling "Server" project against this host:port.)
// ============================================================
using Esiur.Protocol;
using Esiur.Resource;
-using System.Data.Common;
+using Esiur.Stores;
using System.Diagnostics;
-using System.Globalization;
-var host = GetArg(args, "--host", "127.0.0.1");
-var port = int.Parse(GetArg(args, "--port", "10900"));
-var resources = int.Parse(GetArg(args, "--resources", "100"));
+var port = int.Parse(GetArg(args, "--port", "10900"));
+var resources = int.Parse(GetArg(args, "--resources", "100"));
var emitIntervalMs = int.Parse(GetArg(args, "--emit-interval-ms", "50"));
-var windowSec = int.Parse(GetArg(args, "--window-sec", "60"));
-var warmupSec = int.Parse(GetArg(args, "--warmup-sec", "5"));
-var settleSec = int.Parse(GetArg(args, "--settle-sec", "5"));
-var replications = int.Parse(GetArg(args, "--replications", "3"));
-var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500");
-var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv");
-var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray();
-double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources;
-double minAcceptableRate = theoreticalMaxRate * 0.10;
+Console.WriteLine($"[Server] resources={resources} emit-interval={emitIntervalMs}ms port={port} cores={Environment.ProcessorCount}");
-Console.WriteLine($"[Orchestrator] resources={resources} interval={emitIntervalMs}ms "
- + $"window={windowSec}s replications={replications}");
-Console.WriteLine($"[Orchestrator] theoretical_max_per_subscriber_rate={theoreticalMaxRate:F0} notif/s");
-Console.WriteLine($"[Orchestrator] saturation_threshold={minAcceptableRate:F0} notif/s");
-Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}");
+var wh = new Warehouse();
+await wh.Put("sys", new MemoryStore());
+var server = await wh.Put("sys/server", new EpServer { Port = (ushort)port, AllowUnauthorizedAccess = true });
-// ----------------------------------------------------------------
-// Attach to the server's control resource once.
-// ----------------------------------------------------------------
-var controlWh = new Warehouse();
-EpResource? control = null;
-byte cpuIdx = 255, clientsIdx = 255;
-try
+var sensors = new SensorResource[resources];
+for (var i = 0; i < resources; i++) { sensors[i] = new SensorResource { SensorId = i }; await wh.Put($"sys/sensor_{i}", sensors[i]); }
+
+var control = new ControlResource();
+await wh.Put("sys/control", control);
+
+await wh.Open();
+Console.WriteLine($"[Server] Listening on port {port} with {resources} sensors + sys/control. Press Ctrl+C to stop.");
+
+// Emit loop: drives property-change notifications to every attached subscriber.
+var sw = Stopwatch.StartNew();
+_ = Task.Run(async () =>
{
- var controlConn = await controlWh.Get($"ep://{host}:{port}");
- control = (EpResource)await controlConn.Get("sys/control");
- // Resolve property indices by name (EpResource exposes values by index, not dynamic member).
- var props = control.Instance.Definition.Properties;
- cpuIdx = (byte)Array.FindIndex(props, p => p.Name == "CpuPercent");
- clientsIdx = (byte)Array.FindIndex(props, p => p.Name == "ConnectedClients");
- Console.WriteLine($"[Orchestrator] sys/control attached (CpuPercent=idx {cpuIdx}, ConnectedClients=idx {clientsIdx}).");
-}
-catch (Exception ex)
-{
- Console.WriteLine($"[Orchestrator] WARNING: could not attach to sys/control: {ex.Message}");
- Console.WriteLine("[Orchestrator] Server CPU will be reported as N/A.");
-}
-
-// ----------------------------------------------------------------
-// All sweep points x replications, with per-N early-stop logic.
-// ----------------------------------------------------------------
-var allResults = new List();
-bool saturatedDetected = false;
-
-foreach (int n in nValues)
-{
- if (saturatedDetected)
+ while (true)
{
- Console.WriteLine($"\n[Orchestrator] N={n}: SKIPPED (saturation reached at lower N)");
- continue;
+ await Task.Delay(emitIntervalMs);
+ var value = sw.Elapsed.TotalSeconds;
+ foreach (var s in sensors) s.Value = value;
}
+});
- var perRepResults = new List();
-
- for (int rep = 0; rep < replications; rep++)
- {
- Console.WriteLine($"\n[Orchestrator] === N={n} rep={rep + 1}/{replications} ===");
-
- var subscribers = new SubscriberTask[n];
- var subscriberWhs = new Warehouse[n];
-
- // ---------- spawn N subscribers ----------
- Console.WriteLine($"[Orchestrator] Spawning {n} subscribers...");
- var spawnSw = Stopwatch.StartNew();
- var spawnTasks = new Task[n];
- for (int i = 0; i < n; i++)
- {
- int captured = i;
- subscriberWhs[i] = new Warehouse();
- spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, captured);
- }
-
- await Task.WhenAll(spawnTasks);
-
- bool spawnFailed = false;
- for (int i = 0; i < n; i++)
- {
- if (spawnTasks[i].Result == null) { spawnFailed = true; break; }
- subscribers[i] = spawnTasks[i].Result!;
- }
- spawnSw.Stop();
-
- if (spawnFailed)
- {
- Console.WriteLine($"[Orchestrator] N={n}: spawn failed; treating as saturation.");
- saturatedDetected = true;
- await TeardownAll(subscriberWhs);
- break;
- }
- Console.WriteLine($"[Orchestrator] All {n} subscribers attached in {spawnSw.Elapsed.TotalSeconds:F2}s");
-
- // ---------- warmup ----------
- Console.WriteLine($"[Orchestrator] Warmup {warmupSec}s...");
- await Task.Delay(warmupSec * 1000);
- foreach (var s in subscribers) s.ResetCounters();
-
- // ---------- measurement window with CPU sampling ----------
- Console.WriteLine($"[Orchestrator] Measurement window {windowSec}s...");
- var cpuSamples = new List();
- var connSamples = new List();
- var clientCpuSamples = new List();
- var clientProc = Process.GetCurrentProcess();
- var prevClientCpu = clientProc.TotalProcessorTime;
- var prevClientWall = DateTime.UtcNow;
- var winSw = Stopwatch.StartNew();
- while (winSw.Elapsed.TotalSeconds < windowSec)
- {
- await Task.Delay(1000);
-
- // Server CPU + subscriber count via the control resource (read by property index;
- // values arrive as variable-width numerics, hence Convert.*).
- if (control != null && cpuIdx != 255)
- {
- try
- {
- if (control.TryGetPropertyValue(cpuIdx, out var cpuVal) && cpuVal != null)
- cpuSamples.Add(Convert.ToDouble(cpuVal));
- if (control.TryGetPropertyValue(clientsIdx, out var cliVal) && cliVal != null)
- connSamples.Add(Convert.ToInt32(cliVal));
- }
- catch { /* control resource may not have a current value yet */ }
- }
-
- // This harness's own CPU (% across all cores). Recorded so saturation can be attributed
- // to the server rather than to the single subscriber process driving N connections.
- clientProc.Refresh();
- var nowClientCpu = clientProc.TotalProcessorTime;
- var nowClientWall = DateTime.UtcNow;
- var wallMs = (nowClientWall - prevClientWall).TotalMilliseconds;
- if (wallMs > 0) clientCpuSamples.Add((nowClientCpu - prevClientCpu).TotalMilliseconds / wallMs * 100.0);
- prevClientCpu = nowClientCpu;
- prevClientWall = nowClientWall;
- }
-
- double elapsedSec = winSw.Elapsed.TotalSeconds;
-
- // ---------- collect per-subscriber counts ----------
- var perSubRates = new double[n];
- long totalReceived = 0;
- long totalLate = 0;
- for (int i = 0; i < n; i++)
- {
- perSubRates[i] = subscribers[i].Received / elapsedSec;
- totalReceived += subscribers[i].Received;
- totalLate += subscribers[i].LateDeliveries;
- }
-
- double meanPerSub = perSubRates.Average();
- double stdPerSub = StdDev(perSubRates);
- double minPerSub = perSubRates.Min();
- double maxPerSub = perSubRates.Max();
- double aggregate = perSubRates.Sum();
- double avgServerCpu = cpuSamples.Count > 0 ? cpuSamples.Average() : double.NaN;
- double peakServerCpu = cpuSamples.Count > 0 ? cpuSamples.Max() : double.NaN;
- double avgClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Average() : double.NaN;
- double peakClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Max() : double.NaN;
-
- Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: "
- + $"mean_per_sub={meanPerSub:F1}/s "
- + $"aggregate={aggregate:F0}/s "
- + $"late={totalLate} "
- + $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% "
- + $"client_cpu_avg={avgClientCpu:F1}%/peak={peakClientCpu:F1}%");
-
- perRepResults.Add(new RepResult
- {
- N = n,
- Rep = rep + 1,
- MeanPerSub = meanPerSub,
- StdPerSub = stdPerSub,
- MinPerSub = minPerSub,
- MaxPerSub = maxPerSub,
- Aggregate = aggregate,
- LateDeliveries = totalLate,
- ServerCpuAvg = avgServerCpu,
- ServerCpuPeak = peakServerCpu,
- ClientCpuAvg = avgClientCpu,
- ClientCpuPeak = peakClientCpu,
- });
-
- // ---------- teardown ----------
- Console.WriteLine($"[Orchestrator] Tearing down {n} subscribers...");
- await TeardownAll(subscriberWhs);
- await Task.Delay(settleSec * 1000);
- }
-
- // ---------- per-N aggregation ----------
- if (perRepResults.Count > 0)
- {
- double meanOfMeans = perRepResults.Average(r => r.MeanPerSub);
- double ciHalfWidth = ConfidenceIntervalHalfWidth95(
- perRepResults.Select(r => r.MeanPerSub).ToArray());
-
- Console.WriteLine($"\n[Orchestrator] N={n} SUMMARY: "
- + $"mean_per_sub={meanOfMeans:F1} ± {ciHalfWidth:F1} notif/s (95% CI)");
-
- // Saturation detection: stop sweep if per-sub rate falls below
- // 10% of theoretical OR server CPU peaked above 180% (>90% of 2 cores)
- if (meanOfMeans < minAcceptableRate)
- {
- Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: rate {meanOfMeans:F0} < {minAcceptableRate:F0} ***");
- saturatedDetected = true;
- }
- else if (perRepResults.Average(r => r.ServerCpuPeak) > 180.0)
- {
- Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server CPU peaked > 180% ***");
- saturatedDetected = true;
- }
-
- // Aggregate row for CSV
- allResults.Add(new SweepResult
- {
- N = n,
- Replications = perRepResults.Count,
- MeanPerSubRate = meanOfMeans,
- Ci95HalfWidth = ciHalfWidth,
- MeanAggregate = perRepResults.Average(r => r.Aggregate),
- TotalLate = perRepResults.Sum(r => r.LateDeliveries),
- MeanServerCpuAvg = perRepResults.Average(r => r.ServerCpuAvg),
- MeanServerCpuPeak = perRepResults.Average(r => r.ServerCpuPeak),
- MeanClientCpuAvg = perRepResults.Average(r => r.ClientCpuAvg),
- MeanClientCpuPeak = perRepResults.Average(r => r.ClientCpuPeak),
- });
- }
-}
-
-// ----------------------------------------------------------------
-// Output
-// ----------------------------------------------------------------
-var sb = new System.Text.StringBuilder();
-sb.AppendLine("n,replications,mean_per_sub_rate,ci95_halfwidth,mean_aggregate," +
- "total_late,mean_server_cpu_avg,mean_server_cpu_peak,mean_client_cpu_avg,mean_client_cpu_peak");
-foreach (var r in allResults)
+// Telemetry loop: publish server CPU (% across all cores) and subscriber count once per second.
+_ = Task.Run(async () =>
{
- sb.AppendLine(string.Create(CultureInfo.InvariantCulture,
- $"{r.N},{r.Replications},{r.MeanPerSubRate:F2},{r.Ci95HalfWidth:F2}," +
- $"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}," +
- $"{r.MeanClientCpuAvg:F2},{r.MeanClientCpuPeak:F2}"));
-}
-await File.WriteAllTextAsync(outputCsv, sb.ToString());
-Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}");
-
-// ----------------------------------------------------------------
-// Subscriber spawn / teardown
-// ----------------------------------------------------------------
-static async Task SpawnSubscriber(
- Warehouse wh, string host, int port, int resources, int subId)
-{
- try
+ var proc = Process.GetCurrentProcess();
+ var prevCpu = proc.TotalProcessorTime;
+ var prevWall = DateTime.UtcNow;
+ while (true)
{
- var conn = await wh.Get($"ep://{host}:{port}");
- var sub = new SubscriberTask { SubscriberId = subId };
-
- for (int i = 0; i < resources; i++)
- {
- var proxy = await conn.Get($"sys/sensor_{i}");
- long lastTick = Stopwatch.GetTimestamp();
-
- proxy.Instance.PropertyModified += (PropertyModificationInfo data) =>
- {
- if (data.Name != "Value") return;
- long now = Stopwatch.GetTimestamp();
- double elapsedMs = (now - lastTick) * 1000.0 / Stopwatch.Frequency;
- lastTick = now;
- Interlocked.Increment(ref sub._received);
- if (elapsedMs > 400) Interlocked.Increment(ref sub._lateDeliveries);
- };
- }
-
- return sub;
+ await Task.Delay(1000);
+ proc.Refresh();
+ var nowCpu = proc.TotalProcessorTime;
+ var nowWall = DateTime.UtcNow;
+ var wallMs = (nowWall - prevWall).TotalMilliseconds;
+ control.CpuPercent = wallMs > 0 ? (nowCpu - prevCpu).TotalMilliseconds / wallMs * 100.0 : 0;
+ control.ConnectedClients = server.Connections.Count;
+ prevCpu = nowCpu;
+ prevWall = nowWall;
}
- catch (Exception ex)
- {
- Console.WriteLine($" [Spawn-{subId}] FAILED: {ex.Message}");
- return null;
- }
-}
+});
-static async Task TeardownAll(Warehouse[] whs)
-{
- foreach (var wh in whs)
- {
- try { await wh.Close(); }
- catch { /* ignore */ }
- }
-}
-
-// ----------------------------------------------------------------
-// Stats helpers
-// ----------------------------------------------------------------
-static double StdDev(double[] xs)
-{
- if (xs.Length < 2) return 0;
- double mean = xs.Average();
- double sumSq = xs.Sum(x => (x - mean) * (x - mean));
- return Math.Sqrt(sumSq / (xs.Length - 1));
-}
-
-///
-/// 95% confidence interval half-width using Student's t-distribution.
-/// For very small samples (n < 3) returns 0 (not enough data).
-/// t values for 95% two-sided are hard-coded; see standard tables.
-///
-static double ConfidenceIntervalHalfWidth95(double[] xs)
-{
- int n = xs.Length;
- if (n < 2) return 0;
- double std = StdDev(xs);
- double sem = std / Math.Sqrt(n);
- // t for df=n-1, two-sided 95%
- double t = (n - 1) switch
- {
- 1 => 12.706,
- 2 => 4.303,
- 3 => 3.182,
- 4 => 2.776,
- 5 => 2.571,
- 6 => 2.447,
- 7 => 2.365,
- 8 => 2.306,
- 9 => 2.262,
- _ => 1.960 // normal approximation
- };
- return t * sem;
-}
+var stop = new TaskCompletionSource();
+Console.CancelKeyPress += (_, e) => { e.Cancel = true; stop.TrySetResult(); };
+await stop.Task;
+await wh.Close();
static string GetArg(string[] args, string key, string def)
{
- int i = Array.IndexOf(args, key);
+ var i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
-
-// ----------------------------------------------------------------
-// Records
-// ----------------------------------------------------------------
-class SubscriberTask
-{
- public int SubscriberId;
- internal long _received;
- internal long _lateDeliveries;
- public long Received => Interlocked.Read(ref _received);
- public long LateDeliveries => Interlocked.Read(ref _lateDeliveries);
- public void ResetCounters()
- {
- Interlocked.Exchange(ref _received, 0);
- Interlocked.Exchange(ref _lateDeliveries, 0);
- }
-}
-
-record RepResult
-{
- public int N;
- public int Rep;
- public double MeanPerSub;
- public double StdPerSub;
- public double MinPerSub;
- public double MaxPerSub;
- public double Aggregate;
- public long LateDeliveries;
- public double ServerCpuAvg;
- public double ServerCpuPeak;
- public double ClientCpuAvg;
- public double ClientCpuPeak;
-}
-
-record SweepResult
-{
- public int N;
- public int Replications;
- public double MeanPerSubRate;
- public double Ci95HalfWidth;
- public double MeanAggregate;
- public long TotalLate;
- public double MeanServerCpuAvg;
- public double MeanServerCpuPeak;
- public double MeanClientCpuAvg;
- public double MeanClientCpuPeak;
-}
\ No newline at end of file
diff --git a/Tests/Distribution/NodeFanoutSweep/Client/SensorResource.cs b/Tests/Distribution/NodeFanoutSweep/Server/SensorResource.cs
similarity index 100%
rename from Tests/Distribution/NodeFanoutSweep/Client/SensorResource.cs
rename to Tests/Distribution/NodeFanoutSweep/Server/SensorResource.cs
diff --git a/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj
index 6faa089..634bddd 100644
--- a/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj
+++ b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj
@@ -7,14 +7,6 @@
enable
-
-
-
-
-
-
-
-
diff --git a/Tests/Distribution/Queueing/Client/Program.cs b/Tests/Distribution/Queueing/Client/Program.cs
index ad07006..fc3660f 100644
--- a/Tests/Distribution/Queueing/Client/Program.cs
+++ b/Tests/Distribution/Queueing/Client/Program.cs
@@ -1,7 +1,23 @@
-// ============================================================
-// Test 4: Fork-Join Queueing Test — CLIENT NODE
+// ============================================================
+// Test 4: Fork-Join Queueing Test — CLIENT NODE (REPLICATED)
//
-// Usage: dotnet run -- --host 127.0.0.1 --port 10901 --trials 10000
+// Extends the original single-shot client to run K independent
+// replications of each (delay, α) configuration so that 95%
+// confidence intervals can be reported for the metrics in
+// Table III (λ, μ, R̄, δ̄, D̄, P99(D), queue length, batch B).
+//
+// Each replication uses an identical configuration; the server
+// runs StartUpdatesLocal back-to-back, and the client snapshots
+// the cumulative finished-queue length between replications so
+// that each replication's evaluation sees only its own items.
+//
+// Usage:
+// dotnet run -- --host 127.0.0.1 --port 10901 \
+// --trials 1000 \
+// --delays 5:10:20:30:50:100 \
+// --alphas 0.0:0.25:0.5:0.75:1.0 \
+// --replications 5 \
+// --output forkjoin_replicated.csv
// ============================================================
using Esiur.Data;
@@ -9,85 +25,152 @@ using Esiur.Protocol;
using Esiur.Resource;
using Esiur.Tests.Queueing.Client;
using System.ComponentModel;
-using System.Diagnostics;
-using System.Diagnostics.Metrics;
-using System.Text.RegularExpressions;
-
-var results = new List();
-int counter = 0;
-
-
-
-int currentAlpha = 0;
-int currentDelay = 0;
+using System.Globalization;
+using System.IO;
+// ---------- arguments ----------
var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10901"));
var trials = int.Parse(GetArg(args, "--trials", "1000"));
-var delays = GetArg(args, "--delays", "5:8:10:20:30:100")
- .Split(":").Select(x => Convert.ToInt32(x)).ToArray();
-var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1")
- .Split(":").Select(y => Convert.ToDouble(y)).ToArray();
+var replications = int.Parse(GetArg(args, "--replications", "5"));
+var settleMs = int.Parse(GetArg(args, "--settle-ms", "1000"));
+var outputCsv = GetArg(args, "--output", "forkjoin_replicated.csv");
+var delays = GetArg(args, "--delays", "5:10:20:30:50:100")
+ .Split(':').Select(int.Parse).ToArray();
+var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1.0")
+ .Split(':').Select(s => double.Parse(s, CultureInfo.InvariantCulture)).ToArray();
+Console.WriteLine($"[Client-T4-R] Connecting to {host}:{port}");
+Console.WriteLine($"[Client-T4-R] trials/rep={trials} replications={replications} " +
+ $"settle={settleMs}ms");
+Console.WriteLine($"[Client-T4-R] delays={string.Join(",", delays)}");
+Console.WriteLine($"[Client-T4-R] alphas={string.Join(",", alphas.Select(a => a.ToString("F2", CultureInfo.InvariantCulture)))}");
+Console.WriteLine($"[Client-T4-R] {delays.Length * alphas.Length} configurations × {replications} reps " +
+ $"= {delays.Length * alphas.Length * replications} trial runs");
-Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, trials={trials}");
-
+// ---------- connect ----------
var wh = new Warehouse();
-
-var serviceResource = await wh.Get(
- $"ep://{host}:{port}/sys/queueing");
-
+var serviceResource = await wh.Get($"ep://{host}:{port}/sys/queueing");
var service = (dynamic)serviceResource;
-serviceResource.PropertyChanged += Service_PropertyChanged;
+// ---------- replication coordinator state ----------
+//
+// The server's StartUpdatesLocal fires `trials` PropertyChanged events
+// across a single call. We count incoming events; when `trials` arrive,
+// the current replication is complete. We then slice off this rep's
+// portion of the cumulative finished-queue and hand it to QueueEval.
+//
+// `repDone` is signaled once per replication so the orchestrator coroutine
+// can drive the next call.
+int eventsThisRep = 0;
+TaskCompletionSource repDone = new(TaskCreationOptions.RunContinuationsAsynchronously);
+int finishedQueueBaseline = 0; // cumulative length BEFORE current rep started
-
-Console.WriteLine("Starting test: Delay=" + delays[currentDelay] + " Alpha=" + alphas[currentAlpha]);
-
-service.StartUpdatesLocal(delays[currentDelay], trials, alphas[currentAlpha]);
-
-await Task.Delay(-1);
-
-
-void Service_PropertyChanged(object? sender, PropertyChangedEventArgs e)
+serviceResource.PropertyChanged += (object? sender, PropertyChangedEventArgs e) =>
{
- counter++;
-
- if (counter == trials)
+ int n = Interlocked.Increment(ref eventsThisRep);
+ if (n == trials)
{
- var queue = service.DistributedResourceConnection.GetFinishedQueue();
- var result = EsiurQueueEval.Evaluate(queue);
+ repDone.TrySetResult(true);
+ }
+};
- Console.WriteLine(result);
- counter = 0;
+// ---------- main sweep ----------
+var rows = new List();
- if (currentAlpha == alphas.Length - 1)
- {
- currentAlpha = 0;
- currentDelay++;
- }
- else
- {
- currentAlpha++;
- }
-
- if (currentDelay == delays.Length)
- {
- System.Environment.Exit(0);
- return;
- }
+using var writer = new StreamWriter(outputCsv);
+writer.WriteLine(ReplicatedEvalAggregator.CsvHeader);
+writer.Flush();
+foreach (var delay in delays)
+{
+ foreach (var alpha in alphas)
+ {
Console.WriteLine();
- Console.WriteLine("Starting next test: Delay=" + delays[currentDelay] + " Alpha=" + alphas[currentAlpha]);
+ Console.WriteLine($"[Client-T4-R] >>> delay={delay} ms α={alpha:F2} " +
+ $"(running {replications} replications) <<<");
- service.StartUpdatesLocal(delays[currentDelay], trials, alphas[currentAlpha]);//, 0, resourceLink);
+ var reps = new List(replications);
+ for (int rep = 0; rep < replications; rep++)
+ {
+ // Reset per-rep state
+ Interlocked.Exchange(ref eventsThisRep, 0);
+ repDone = new TaskCompletionSource(
+ TaskCreationOptions.RunContinuationsAsynchronously);
+
+ // Snapshot the cumulative finished-queue length right before this rep
+ // so we can slice off only this rep's portion afterwards.
+ var preQueue = service.ResourceConnection.GetFinishedQueue();
+ finishedQueueBaseline = preQueue.Count;
+
+ // Kick off the server-driven trial sequence (fire-and-forget;
+ // completion is signalled via PropertyChanged → repDone).
+ service.StartUpdatesLocal(delay, trials, alpha);
+
+ // Wait until `trials` PropertyChanged events have been received.
+ await repDone.Task;
+
+ // The server completed `trials` events; slice off this rep's
+ // portion of the cumulative finished-queue. GetFinishedQueue()
+ // returns IReadOnlyList>; we forward the
+ // typed sliced subset directly to Evaluate which is generic
+ // on T (the property's runtime payload type).
+ var fullQueue = service.ResourceConnection.GetFinishedQueue();
+ var typedQueue = SliceQueue(fullQueue, finishedQueueBaseline);
+
+ var repResult = EsiurQueueEval.Evaluate(typedQueue);
+ reps.Add(repResult);
+
+ Console.WriteLine($" rep {rep + 1}/{replications}: " +
+ $"λ={repResult.LambdaEventsPerSecond:F1}/s " +
+ $"R̄={repResult.Latency.ReadinessMs.Mean:F1}ms " +
+ $"δ̄={repResult.Latency.HolMs.Mean:F1}ms " +
+ $"D̄={repResult.Latency.EndToEndMs.Mean:F1}ms");
+
+ // Settle period between reps to let any straggler notifications drain
+ // and to keep the per-rep arrivals statistically independent of any
+ // residual server state from the previous rep.
+ await Task.Delay(settleMs);
+ }
+
+ var agg = ReplicatedEvalAggregator.Aggregate(delay, alpha, reps);
+ rows.Add(agg);
+
+ ReplicatedEvalAggregator.PrintSummary(agg);
+
+ // Append to CSV immediately so partial progress is preserved
+ // if the process is killed mid-sweep.
+ writer.WriteLine(ReplicatedEvalAggregator.ToCsvRow(agg));
+ writer.Flush();
}
}
+Console.WriteLine();
+Console.WriteLine($"[Client-T4-R] Done. {rows.Count} configurations written to {outputCsv}");
+Environment.Exit(0);
+
+
+// ----------------------------------------------------------------
static string GetArg(string[] args, string key, string def)
{
int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
+
+// ----------------------------------------------------------------
+// Slice the cumulative finished-queue down to only the items added
+// during the current replication.
+//
+// The queue is dynamically typed (returned from a dynamic-dispatched
+// member) and its element type is AsyncQueueItem where T is the
+// runtime payload type of the observed property. We rely on the DLR
+// to bind the LINQ Skip/ToList generic methods at runtime, just
+// as the original code does with the Evaluate call below it.
+// ----------------------------------------------------------------
+static dynamic SliceQueue(dynamic fullQueue, int skipCount)
+{
+ return System.Linq.Enumerable.ToList(
+ System.Linq.Enumerable.Skip(fullQueue, skipCount));
+}
\ No newline at end of file
diff --git a/Tests/Distribution/Queueing/Client/Program2.cs b/Tests/Distribution/Queueing/Client/Program2.cs
deleted file mode 100644
index b3a62b6..0000000
--- a/Tests/Distribution/Queueing/Client/Program2.cs
+++ /dev/null
@@ -1,176 +0,0 @@
-// ============================================================
-// Test 4: Fork-Join Queueing Test — CLIENT NODE (REPLICATED)
-//
-// Extends the original single-shot client to run K independent
-// replications of each (delay, α) configuration so that 95%
-// confidence intervals can be reported for the metrics in
-// Table III (λ, μ, R̄, δ̄, D̄, P99(D), queue length, batch B).
-//
-// Each replication uses an identical configuration; the server
-// runs StartUpdatesLocal back-to-back, and the client snapshots
-// the cumulative finished-queue length between replications so
-// that each replication's evaluation sees only its own items.
-//
-// Usage:
-// dotnet run -- --host 127.0.0.1 --port 10901 \
-// --trials 1000 \
-// --delays 5:10:20:30:50:100 \
-// --alphas 0.0:0.25:0.5:0.75:1.0 \
-// --replications 5 \
-// --output forkjoin_replicated.csv
-// ============================================================
-
-using Esiur.Data;
-using Esiur.Protocol;
-using Esiur.Resource;
-using Esiur.Tests.Queueing.Client;
-using System.ComponentModel;
-using System.Globalization;
-using System.IO;
-
-// ---------- arguments ----------
-var host = GetArg(args, "--host", "127.0.0.1");
-var port = int.Parse(GetArg(args, "--port", "10901"));
-var trials = int.Parse(GetArg(args, "--trials", "1000"));
-var replications = int.Parse(GetArg(args, "--replications", "5"));
-var settleMs = int.Parse(GetArg(args, "--settle-ms", "1000"));
-var outputCsv = GetArg(args, "--output", "forkjoin_replicated.csv");
-var delays = GetArg(args, "--delays", "5:10:20:30:50:100")
- .Split(':').Select(int.Parse).ToArray();
-var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1.0")
- .Split(':').Select(s => double.Parse(s, CultureInfo.InvariantCulture)).ToArray();
-
-Console.WriteLine($"[Client-T4-R] Connecting to {host}:{port}");
-Console.WriteLine($"[Client-T4-R] trials/rep={trials} replications={replications} " +
- $"settle={settleMs}ms");
-Console.WriteLine($"[Client-T4-R] delays={string.Join(",", delays)}");
-Console.WriteLine($"[Client-T4-R] alphas={string.Join(",", alphas.Select(a => a.ToString("F2", CultureInfo.InvariantCulture)))}");
-Console.WriteLine($"[Client-T4-R] {delays.Length * alphas.Length} configurations × {replications} reps " +
- $"= {delays.Length * alphas.Length * replications} trial runs");
-
-// ---------- connect ----------
-var wh = new Warehouse();
-var serviceResource = await wh.Get($"ep://{host}:{port}/sys/queueing");
-var service = (dynamic)serviceResource;
-
-// ---------- replication coordinator state ----------
-//
-// The server's StartUpdatesLocal fires `trials` PropertyChanged events
-// across a single call. We count incoming events; when `trials` arrive,
-// the current replication is complete. We then slice off this rep's
-// portion of the cumulative finished-queue and hand it to QueueEval.
-//
-// `repDone` is signaled once per replication so the orchestrator coroutine
-// can drive the next call.
-
-int eventsThisRep = 0;
-TaskCompletionSource repDone = new(TaskCreationOptions.RunContinuationsAsynchronously);
-int finishedQueueBaseline = 0; // cumulative length BEFORE current rep started
-
-serviceResource.PropertyChanged += (object? sender, PropertyChangedEventArgs e) =>
-{
- int n = Interlocked.Increment(ref eventsThisRep);
- if (n == trials)
- {
- repDone.TrySetResult(true);
- }
-};
-
-// ---------- main sweep ----------
-var rows = new List();
-
-using var writer = new StreamWriter(outputCsv);
-writer.WriteLine(ReplicatedEvalAggregator.CsvHeader);
-writer.Flush();
-
-foreach (var delay in delays)
-{
- foreach (var alpha in alphas)
- {
- Console.WriteLine();
- Console.WriteLine($"[Client-T4-R] >>> delay={delay} ms α={alpha:F2} " +
- $"(running {replications} replications) <<<");
-
- var reps = new List(replications);
-
- for (int rep = 0; rep < replications; rep++)
- {
- // Reset per-rep state
- Interlocked.Exchange(ref eventsThisRep, 0);
- repDone = new TaskCompletionSource(
- TaskCreationOptions.RunContinuationsAsynchronously);
-
- // Snapshot the cumulative finished-queue length right before this rep
- // so we can slice off only this rep's portion afterwards.
- var preQueue = service.DistributedResourceConnection.GetFinishedQueue();
- finishedQueueBaseline = preQueue.Count;
-
- // Kick off the server-driven trial sequence (fire-and-forget;
- // completion is signalled via PropertyChanged → repDone).
- service.StartUpdatesLocal(delay, trials, alpha);
-
- // Wait until `trials` PropertyChanged events have been received.
- await repDone.Task;
-
- // The server completed `trials` events; slice off this rep's
- // portion of the cumulative finished-queue. GetFinishedQueue()
- // returns IReadOnlyList>; we forward the
- // typed sliced subset directly to Evaluate which is generic
- // on T (the property's runtime payload type).
- var fullQueue = service.DistributedResourceConnection.GetFinishedQueue();
- var typedQueue = SliceQueue(fullQueue, finishedQueueBaseline);
-
- var repResult = EsiurQueueEval.Evaluate(typedQueue);
- reps.Add(repResult);
-
- Console.WriteLine($" rep {rep + 1}/{replications}: " +
- $"λ={repResult.LambdaEventsPerSecond:F1}/s " +
- $"R̄={repResult.Latency.ReadinessMs.Mean:F1}ms " +
- $"δ̄={repResult.Latency.HolMs.Mean:F1}ms " +
- $"D̄={repResult.Latency.EndToEndMs.Mean:F1}ms");
-
- // Settle period between reps to let any straggler notifications drain
- // and to keep the per-rep arrivals statistically independent of any
- // residual server state from the previous rep.
- await Task.Delay(settleMs);
- }
-
- var agg = ReplicatedEvalAggregator.Aggregate(delay, alpha, reps);
- rows.Add(agg);
-
- ReplicatedEvalAggregator.PrintSummary(agg);
-
- // Append to CSV immediately so partial progress is preserved
- // if the process is killed mid-sweep.
- writer.WriteLine(ReplicatedEvalAggregator.ToCsvRow(agg));
- writer.Flush();
- }
-}
-
-Console.WriteLine();
-Console.WriteLine($"[Client-T4-R] Done. {rows.Count} configurations written to {outputCsv}");
-Environment.Exit(0);
-
-
-// ----------------------------------------------------------------
-static string GetArg(string[] args, string key, string def)
-{
- int i = Array.IndexOf(args, key);
- return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
-}
-
-// ----------------------------------------------------------------
-// Slice the cumulative finished-queue down to only the items added
-// during the current replication.
-//
-// The queue is dynamically typed (returned from a dynamic-dispatched
-// member) and its element type is AsyncQueueItem where T is the
-// runtime payload type of the observed property. We rely on the DLR
-// to bind the LINQ Skip/ToList generic methods at runtime, just
-// as the original code does with the Evaluate call below it.
-// ----------------------------------------------------------------
-static dynamic SliceQueue(dynamic fullQueue, int skipCount)
-{
- return System.Linq.Enumerable.ToList(
- System.Linq.Enumerable.Skip(fullQueue, skipCount));
-}
\ No newline at end of file
diff --git a/Tests/Distribution/Queueing/Server/Program.cs b/Tests/Distribution/Queueing/Server/Program.cs
index 2f5e56c..76a691f 100644
--- a/Tests/Distribution/Queueing/Server/Program.cs
+++ b/Tests/Distribution/Queueing/Server/Program.cs
@@ -16,8 +16,9 @@ Console.WriteLine($"[Server] Listening on port {port}...");
var wh = Warehouse.Default;
var mem = await wh.Put("sys", new MemoryStore());
var service = await wh.Put("sys/queueing", new QueueingService());
-var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port,
- EntryPoint = service });
+var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port,
+ EntryPoint = service,
+ AllowUnauthorizedAccess = true });
long memBefore = GC.GetTotalMemory(forceFullCollection: true);