// ============================================================
//  Scalability Extension: Fan-Out — ORCHESTRATOR CLIENT
// ------------------------------------------------------------
//  Drives a full sweep of subscriber counts N against a single
//  server instance. For each N value:
//    1. Spawns N in-process subscriber tasks, each opening its
//       own EpConnection to the server.
//    2. Each subscriber attaches to all M resources and counts
//       the property-change notifications it receives over a
//       fixed measurement window.
//    3. The orchestrator polls the server's sys/control resource
//       to capture server-side CPU during the window.
//    4. Tears down all N subscribers and waits a settle interval
//       before the next round.
//    5. Repeats steps 1-4 for `replications` rounds so the per-N
//       mean and 95% confidence interval can be computed.
//    6. Auto-stops the sweep (remaining N values are skipped) if
//       either:
//         - the mean per-subscriber rate drops below 10% of the
//           theoretical rate,
//         - or the server's peak CPU, averaged over the
//           replications, exceeds 95% (100% = one full core,
//           i.e. the single-threaded dispatch path is saturated),
//         - or any subscriber fails to attach.
//
//  Note on in-process vs separate processes: subscribers are
//  tasks within a single client process to keep the test self-
//  contained and avoid spawning N OS processes. Each task uses
//  its own EpConnection (TCP connection) to the server, so from
//  the server's perspective the load looks identical to N
//  distinct subscriber nodes for the property-propagation path.
//  The single-client-process design does mean that the client
//  host's CPU is shared across all subscribers; the orchestrator
//  records the client CPU too so degradation can be attributed
//  correctly.
// ------------------------------------------------------------
//  Usage:
//    dotnet run -- --host 127.0.0.1 --port 10900 --resources 100 \
//                  --emit-interval-ms 50 --window-sec 60 \
//                  --warmup-sec 5 --replications 3 \
//                  --n-values 2,5,10,20,50,100,200,500
//
//  Optional:
//    --settle-sec <s>          pause between rounds (default 5)
//    --late-threshold-ms <ms>  late-delivery gap (default 8 x emit interval)
//    --output <path>           per-N summary CSV (default fanout_sweep_results.csv)
//    --rep-output <path>       per-replication CSV (default: not written)
// ============================================================

using Esiur.Protocol;
using Esiur.Resource;
using System.Data.Common;
using System.Diagnostics;
using System.Globalization;

// All numeric options are parsed with the invariant culture so the command line
// means the same thing regardless of the host's regional settings.
var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10900"), CultureInfo.InvariantCulture);
var resources = int.Parse(GetArg(args, "--resources", "100"), CultureInfo.InvariantCulture);
var emitIntervalMs = int.Parse(GetArg(args, "--emit-interval-ms", "50"), CultureInfo.InvariantCulture);
var windowSec = int.Parse(GetArg(args, "--window-sec", "60"), CultureInfo.InvariantCulture);
var warmupSec = int.Parse(GetArg(args, "--warmup-sec", "5"), CultureInfo.InvariantCulture);
var settleSec = int.Parse(GetArg(args, "--settle-sec", "5"), CultureInfo.InvariantCulture);
var replications = int.Parse(GetArg(args, "--replications", "3"), CultureInfo.InvariantCulture);

// Late-delivery deadline. The paper defines a late delivery as a gap exceeding eight emission
// intervals (400 ms at the 50 ms default). When sweeping emit rate this MUST scale with the
// interval, otherwise low-rate runs (e.g. 1 Hz, where deliveries are 1000 ms apart by design)
// would flag every delivery as late. The default is therefore derived from --emit-interval-ms
// (8 x interval); an explicit --late-threshold-ms still overrides it.
var lateThresholdMs = double.Parse(
    GetArg(args, "--late-threshold-ms", (8.0 * emitIntervalMs).ToString(CultureInfo.InvariantCulture)),
    CultureInfo.InvariantCulture);

var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500");
var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv");
var repOutputCsv = GetArg(args, "--rep-output", "");

// Tolerate "2, 5, 10" and trailing commas in the list.
var nValues = nValuesStr
    .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
    .Select(s => int.Parse(s, CultureInfo.InvariantCulture))
    .ToArray();

// Each subscriber attaches to every resource, and each resource emits once per interval.
double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources;
double minAcceptableRate = theoreticalMaxRate * 0.10;

Console.WriteLine($"[Orchestrator] resources={resources} interval={emitIntervalMs}ms " +
                  $"window={windowSec}s replications={replications}");
Console.WriteLine($"[Orchestrator] theoretical_max_per_subscriber_rate={theoreticalMaxRate:F0} notif/s");
Console.WriteLine($"[Orchestrator] late_threshold={lateThresholdMs:F0} ms");
Console.WriteLine($"[Orchestrator] saturation_threshold={minAcceptableRate:F0} notif/s");
Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}");

// ----------------------------------------------------------------
//  Attach to the server's control resource once.
// ----------------------------------------------------------------
const byte NoIndex = 255; // sentinel: property not found / control resource unavailable

var controlWh = new Warehouse();
EpResource? control = null;
byte cpuIdx = NoIndex, clientsIdx = NoIndex;
try
{
    var controlConn = await controlWh.Get<EpConnection>($"ep://{host}:{port}");
    control = (EpResource)await controlConn.Get("sys/control");

    // Resolve property indices by name (EpResource exposes values by index, not dynamic member).
    // Array.FindIndex returns -1 when the name is absent; map that explicitly to the sentinel
    // instead of relying on the unchecked (byte)(-1) wrap-around.
    var props = control.Instance.Definition.Properties;
    int cpuPos = Array.FindIndex(props, p => p.Name == "CpuPercent");
    int clientsPos = Array.FindIndex(props, p => p.Name == "ConnectedClients");
    cpuIdx = cpuPos is >= 0 and < NoIndex ? (byte)cpuPos : NoIndex;
    clientsIdx = clientsPos is >= 0 and < NoIndex ? (byte)clientsPos : NoIndex;

    Console.WriteLine($"[Orchestrator] sys/control attached (CpuPercent=idx {cpuIdx}, ConnectedClients=idx {clientsIdx}).");
}
catch (Exception ex)
{
    Console.WriteLine($"[Orchestrator] WARNING: could not attach to sys/control: {ex.Message}");
    Console.WriteLine("[Orchestrator] Server CPU will be reported as N/A.");
}

// ----------------------------------------------------------------
//  All sweep points x replications, with per-N early-stop logic.
// ----------------------------------------------------------------
var allResults = new List<SweepResult>();
var allRepResults = new List<RepResult>();
bool saturatedDetected = false;

foreach (int n in nValues)
{
    if (saturatedDetected)
    {
        Console.WriteLine($"\n[Orchestrator] N={n}: SKIPPED (saturation reached at lower N)");
        continue;
    }

    // A non-positive N would throw on the empty-array statistics below and abort the sweep
    // before any CSV is written; skip it instead.
    if (n <= 0)
    {
        Console.WriteLine($"\n[Orchestrator] N={n}: SKIPPED (subscriber count must be positive)");
        continue;
    }

    var perRepResults = new List<RepResult>();

    for (int rep = 0; rep < replications; rep++)
    {
        Console.WriteLine($"\n[Orchestrator] === N={n} rep={rep + 1}/{replications} ===");

        var subscribers = new SubscriberTask[n];
        var subscriberWhs = new Warehouse[n];

        // ---------- spawn N subscribers ----------
        Console.WriteLine($"[Orchestrator] Spawning {n} subscribers...");
        var spawnSw = Stopwatch.StartNew();

        // Every subscriber gets its own Warehouse; all of them connect concurrently.
        var spawnTasks = new Task<SubscriberTask?>[n];
        for (int i = 0; i < n; i++)
        {
            subscriberWhs[i] = new Warehouse();
            spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, i, lateThresholdMs);
        }

        // SpawnSubscriber reports failure by returning null rather than throwing.
        var spawned = await Task.WhenAll(spawnTasks);

        bool spawnFailed = false;
        for (int i = 0; i < n; i++)
        {
            if (spawned[i] is { } attached)
            {
                subscribers[i] = attached;
            }
            else
            {
                spawnFailed = true;
            }
        }
        spawnSw.Stop();

        if (spawnFailed)
        {
            Console.WriteLine($"[Orchestrator] N={n}: spawn failed; treating as saturation.");
            saturatedDetected = true;
            await TeardownAll(subscribers, subscriberWhs);
            break;
        }

        Console.WriteLine($"[Orchestrator] All {n} subscribers attached in {spawnSw.Elapsed.TotalSeconds:F2}s");

        // ---------- warmup ----------
        Console.WriteLine($"[Orchestrator] Warmup {warmupSec}s...");
        await Task.Delay(TimeSpan.FromSeconds(warmupSec));
        foreach (var s in subscribers)
        {
            s.ResetCounters();
        }

        // ---------- measurement window with CPU sampling ----------
        Console.WriteLine($"[Orchestrator] Measurement window {windowSec}s...");
        var cpuSamples = new List<double>();
        // NOTE(review): connected-client samples are collected but not yet written to either CSV.
        var connSamples = new List<int>();
        var clientCpuSamples = new List<double>();

        // Process holds an OS handle; dispose it at the end of every round.
        using var clientProc = Process.GetCurrentProcess();
        var prevClientCpu = clientProc.TotalProcessorTime;

        // One monotonic stopwatch serves both as the window timer and as the wall clock for the
        // client-CPU deltas, so system clock adjustments cannot skew the samples.
        var winSw = Stopwatch.StartNew();
        var prevClientWall = winSw.Elapsed;

        while (winSw.Elapsed.TotalSeconds < windowSec)
        {
            await Task.Delay(1000);

            // Server CPU + subscriber count via the control resource (read by property index;
            // values arrive as variable-width numerics, hence Convert.*).
            if (control != null)
            {
                try
                {
                    if (cpuIdx != NoIndex
                        && control.TryGetPropertyValue(cpuIdx, out var cpuVal) && cpuVal != null)
                    {
                        cpuSamples.Add(Convert.ToDouble(cpuVal, CultureInfo.InvariantCulture));
                    }

                    if (clientsIdx != NoIndex
                        && control.TryGetPropertyValue(clientsIdx, out var cliVal) && cliVal != null)
                    {
                        connSamples.Add(Convert.ToInt32(cliVal, CultureInfo.InvariantCulture));
                    }
                }
                catch
                {
                    /* control resource may not have a current value yet */
                }
            }

            // This harness's own CPU (100% = one full core). Recorded so saturation can be attributed
            // to the server rather than to the single subscriber process driving N connections.
            clientProc.Refresh();
            var nowClientCpu = clientProc.TotalProcessorTime;
            var nowClientWall = winSw.Elapsed;
            double wallMs = (nowClientWall - prevClientWall).TotalMilliseconds;
            if (wallMs > 0)
            {
                clientCpuSamples.Add((nowClientCpu - prevClientCpu).TotalMilliseconds / wallMs * 100.0);
            }
            prevClientCpu = nowClientCpu;
            prevClientWall = nowClientWall;
        }

        // The loop sleeps in whole seconds, so use the measured duration rather than windowSec.
        double elapsedSec = winSw.Elapsed.TotalSeconds;

        // ---------- collect per-subscriber counts ----------
        var perSubRates = new double[n];
        long totalLate = 0;
        for (int i = 0; i < n; i++)
        {
            perSubRates[i] = subscribers[i].Received / elapsedSec;
            totalLate += subscribers[i].LateDeliveries;
        }

        double meanPerSub = perSubRates.Average();
        double stdPerSub = StdDev(perSubRates);
        double minPerSub = perSubRates.Min();
        double maxPerSub = perSubRates.Max();
        double aggregate = perSubRates.Sum();

        // NaN marks "no samples" (e.g. sys/control could not be attached).
        double avgServerCpu = cpuSamples.Count > 0 ? cpuSamples.Average() : double.NaN;
        double peakServerCpu = cpuSamples.Count > 0 ? cpuSamples.Max() : double.NaN;
        double avgClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Average() : double.NaN;
        double peakClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Max() : double.NaN;

        Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: " +
                          $"mean_per_sub={meanPerSub:F1}/s " +
                          $"std_per_sub={stdPerSub:F1}/s " +
                          $"aggregate={aggregate:F0}/s " +
                          $"late={totalLate} " +
                          $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% " +
                          $"client_cpu_avg={avgClientCpu:F1}%/peak={peakClientCpu:F1}%");

        perRepResults.Add(new RepResult
        {
            N = n,
            Rep = rep + 1,
            MeanPerSub = meanPerSub,
            StdPerSub = stdPerSub,
            MinPerSub = minPerSub,
            MaxPerSub = maxPerSub,
            Aggregate = aggregate,
            LateDeliveries = totalLate,
            ServerCpuAvg = avgServerCpu,
            ServerCpuPeak = peakServerCpu,
            ClientCpuAvg = avgClientCpu,
            ClientCpuPeak = peakClientCpu,
        });

        // ---------- teardown ----------
        Console.WriteLine($"[Orchestrator] Tearing down {n} subscribers...");
        await TeardownAll(subscribers, subscriberWhs);
        await Task.Delay(TimeSpan.FromSeconds(settleSec));
    }

    // ---------- per-N aggregation ----------
    if (perRepResults.Count > 0)
    {
        allRepResults.AddRange(perRepResults);

        double meanOfMeans = perRepResults.Average(r => r.MeanPerSub);
        double ciHalfWidth = ConfidenceIntervalHalfWidth95(
            perRepResults.Select(r => r.MeanPerSub).ToArray());

        Console.WriteLine($"\n[Orchestrator] N={n} SUMMARY: " +
                          $"mean_per_sub={meanOfMeans:F1} ± {ciHalfWidth:F1} notif/s (95% CI)");

        // Saturation detection: stop sweep if per-sub rate falls below 10% of theoretical OR the
        // server's dispatch path saturates a core. CPU is "% across all cores" (100% = one full
        // core), and fan-out is single-threaded, so the relevant signal is a per-core plateau
        // near 100%, not 2-core saturation. A NaN peak (no server CPU data) never triggers.
        if (meanOfMeans < minAcceptableRate)
        {
            Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: rate {meanOfMeans:F0} < {minAcceptableRate:F0} ***");
            saturatedDetected = true;
        }
        else if (perRepResults.Average(r => r.ServerCpuPeak) > 95.0)
        {
            Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server dispatch core saturated (peak > 95%) ***");
            saturatedDetected = true;
        }

        // Aggregate row for CSV
        allResults.Add(new SweepResult
        {
            N = n,
            Replications = perRepResults.Count,
            MeanPerSubRate = meanOfMeans,
            Ci95HalfWidth = ciHalfWidth,
            MeanAggregate = perRepResults.Average(r => r.Aggregate),
            TotalLate = perRepResults.Sum(r => r.LateDeliveries),
            MeanServerCpuAvg = perRepResults.Average(r => r.ServerCpuAvg),
            MeanServerCpuPeak = perRepResults.Average(r => r.ServerCpuPeak),
            MeanClientCpuAvg = perRepResults.Average(r => r.ClientCpuAvg),
            MeanClientCpuPeak = perRepResults.Average(r => r.ClientCpuPeak),
        });
    }
}

// ----------------------------------------------------------------
//  Output
// ----------------------------------------------------------------
// Rows are formatted with the invariant culture so the CSV always uses '.' decimals.
var sb = new System.Text.StringBuilder();
sb.AppendLine("n,replications,mean_per_sub_rate,ci95_halfwidth,mean_aggregate," +
              "total_late,mean_server_cpu_avg,mean_server_cpu_peak,mean_client_cpu_avg,mean_client_cpu_peak");
foreach (var r in allResults)
{
    sb.AppendLine(string.Create(CultureInfo.InvariantCulture,
        $"{r.N},{r.Replications},{r.MeanPerSubRate:F2},{r.Ci95HalfWidth:F2}," +
        $"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}," +
        $"{r.MeanClientCpuAvg:F2},{r.MeanClientCpuPeak:F2}"));
}
await File.WriteAllTextAsync(outputCsv, sb.ToString());
Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}");

if (!string.IsNullOrWhiteSpace(repOutputCsv))
{
    var repSb = new System.Text.StringBuilder();
    repSb.AppendLine("n,rep,mean_per_sub_rate,std_per_sub_rate,min_per_sub_rate,max_per_sub_rate," +
                     "aggregate,late_deliveries,server_cpu_avg,server_cpu_peak,client_cpu_avg,client_cpu_peak");
    foreach (var r in allRepResults)
    {
        repSb.AppendLine(string.Create(CultureInfo.InvariantCulture,
            $"{r.N},{r.Rep},{r.MeanPerSub:F2},{r.StdPerSub:F2},{r.MinPerSub:F2},{r.MaxPerSub:F2}," +
            $"{r.Aggregate:F1},{r.LateDeliveries},{r.ServerCpuAvg:F2},{r.ServerCpuPeak:F2}," +
            $"{r.ClientCpuAvg:F2},{r.ClientCpuPeak:F2}"));
    }
    await File.WriteAllTextAsync(repOutputCsv, repSb.ToString());
    Console.WriteLine($"[Orchestrator] Replication rows written to {repOutputCsv}");
}

// Release the control connection now that the sweep is finished (best effort).
try
{
    await controlWh.Close();
}
catch
{
    /* ignore */
}

// ----------------------------------------------------------------
//  Subscriber spawn / teardown
// ----------------------------------------------------------------

// Opens one connection through `wh`, attaches to sys/sensor_0 .. sys/sensor_{resources-1} and
// hooks a counting handler on each. Returns the populated subscriber, or null when connecting
// or attaching throws (the failure is logged, not propagated).
static async Task<SubscriberTask?> SpawnSubscriber(
    Warehouse wh, string host, int port, int resources, int subId, double lateThresholdMs)
{
    try
    {
        var conn = await wh.Get<EpConnection>($"ep://{host}:{port}");
        var sub = new SubscriberTask { SubscriberId = subId, Connection = conn };

        for (int i = 0; i < resources; i++)
        {
            var proxy = await conn.Get($"sys/sensor_{i}");
            sub.Resources.Add(proxy);

            // Per-resource timestamp of the previous delivery, captured by the handler below.
            // NOTE(review): read and written without synchronisation — assumes notifications for
            // a single resource are delivered one at a time; confirm against Esiur's dispatch.
            long lastTick = Stopwatch.GetTimestamp();

            proxy.Instance.PropertyModified += (PropertyModificationInfo data) =>
            {
                if (data.Name != "Value")
                {
                    return;
                }

                long now = Stopwatch.GetTimestamp();
                double elapsedMs = (now - lastTick) * 1000.0 / Stopwatch.Frequency;
                lastTick = now;

                Interlocked.Increment(ref sub._received);
                if (elapsedMs > lateThresholdMs)
                {
                    Interlocked.Increment(ref sub._lateDeliveries);
                }
            };
        }

        return sub;
    }
    catch (Exception ex)
    {
        Console.WriteLine($" [Spawn-{subId}] FAILED: {ex.Message}");
        return null;
    }
}

// Closes every subscriber connection and then every warehouse. Entries of `subscribers` may be
// null (a subscriber that failed to spawn). Close errors are deliberately ignored so one bad
// connection cannot prevent the rest from being torn down.
static async Task TeardownAll(SubscriberTask[] subscribers, Warehouse[] whs)
{
    foreach (var subscriber in subscribers)
    {
        if (subscriber == null)
        {
            continue;
        }

        try
        {
            subscriber.Connection?.Close();
        }
        catch
        {
            /* ignore */
        }

        subscriber.Connection = null;
        subscriber.Resources.Clear();
    }

    foreach (var wh in whs)
    {
        try
        {
            await wh.Close();
        }
        catch
        {
            /* ignore */
        }
    }
}
// ----------------------------------------------------------------
//  Stats helpers
// ----------------------------------------------------------------

// Sample standard deviation (n - 1 denominator). Returns 0 when fewer than two values are
// supplied, since the spread of a single observation is undefined.
static double StdDev(double[] xs)
{
    if (xs.Length < 2)
    {
        return 0;
    }

    double mean = xs.Average();
    double sumSq = xs.Sum(x => (x - mean) * (x - mean));
    return Math.Sqrt(sumSq / (xs.Length - 1));
}

/// <summary>
/// 95% confidence interval half-width using Student's t-distribution:
/// t(df = n - 1) * s / sqrt(n). Returns 0 when fewer than two samples are
/// supplied (not enough data to estimate a spread).
/// </summary>
static double ConfidenceIntervalHalfWidth95(double[] xs)
{
    int n = xs.Length;
    if (n < 2)
    {
        return 0;
    }

    double sem = StdDev(xs) / Math.Sqrt(n);
    return StudentT975(n - 1) * sem;
}

// Two-sided 95% critical value of Student's t for `df` degrees of freedom.
// df 1..30 come from the standard table. Beyond that a two-term Cornish-Fisher expansion
// around the normal quantile is used; it converges to 1.960 as df grows. Jumping straight
// to 1.960 at df = 10 would understate the interval for 11 or more samples.
static double StudentT975(int df)
{
    return df switch
    {
        1 => 12.706,
        2 => 4.303,
        3 => 3.182,
        4 => 2.776,
        5 => 2.571,
        6 => 2.447,
        7 => 2.365,
        8 => 2.306,
        9 => 2.262,
        10 => 2.228,
        11 => 2.201,
        12 => 2.179,
        13 => 2.160,
        14 => 2.145,
        15 => 2.131,
        16 => 2.120,
        17 => 2.110,
        18 => 2.101,
        19 => 2.093,
        20 => 2.086,
        21 => 2.080,
        22 => 2.074,
        23 => 2.069,
        24 => 2.064,
        25 => 2.060,
        26 => 2.056,
        27 => 2.052,
        28 => 2.048,
        29 => 2.045,
        30 => 2.042,
        _ => LargeDfApproximation(df),
    };

    static double LargeDfApproximation(int degrees)
    {
        const double z = 1.959964; // standard normal 97.5% quantile
        double d = degrees;
        double z3 = z * z * z;
        double z5 = z3 * z * z;
        return z
               + (z3 + z) / (4.0 * d)
               + (5.0 * z5 + 16.0 * z3 + 3.0 * z) / (96.0 * d * d);
    }
}

// Returns the token that follows `key` in `args`, or `def` when the key is absent or is the
// last token. Only the "--key value" form is recognised.
static string GetArg(string[] args, string key, string def)
{
    int i = Array.IndexOf(args, key);
    return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}

// ----------------------------------------------------------------
//  Records
// ----------------------------------------------------------------

// State of one in-process subscriber: its connection, the attached resource proxies and the
// delivery counters updated by the notification handlers.
class SubscriberTask
{
    public int SubscriberId;
    public EpConnection? Connection;

    // NOTE(review): element type assumed to be IResource (what the connection's Get appears to
    // return); confirm against the Esiur API.
    public readonly List<IResource> Resources = new();

    // Plain fields (not properties) because the handlers update them via Interlocked with `ref`.
    internal long _received;
    internal long _lateDeliveries;

    public long Received => Interlocked.Read(ref _received);
    public long LateDeliveries => Interlocked.Read(ref _lateDeliveries);

    // Zeroes both counters. The two resets are individually atomic, not atomic as a pair.
    public void ResetCounters()
    {
        Interlocked.Exchange(ref _received, 0);
        Interlocked.Exchange(ref _lateDeliveries, 0);
    }
}

// Metrics of a single replication at one subscriber count N.
record RepResult
{
    public int N { get; init; }
    public int Rep { get; init; }
    public double MeanPerSub { get; init; }
    public double StdPerSub { get; init; }
    public double MinPerSub { get; init; }
    public double MaxPerSub { get; init; }
    public double Aggregate { get; init; }
    public long LateDeliveries { get; init; }
    public double ServerCpuAvg { get; init; }
    public double ServerCpuPeak { get; init; }
    public double ClientCpuAvg { get; init; }
    public double ClientCpuPeak { get; init; }
}

// Per-N summary aggregated over all completed replications.
record SweepResult
{
    public int N { get; init; }
    public int Replications { get; init; }
    public double MeanPerSubRate { get; init; }
    public double Ci95HalfWidth { get; init; }
    public double MeanAggregate { get; init; }
    public long TotalLate { get; init; }
    public double MeanServerCpuAvg { get; init; }
    public double MeanServerCpuPeak { get; init; }
    public double MeanClientCpuAvg { get; init; }
    public double MeanClientCpuPeak { get; init; }
}