diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs index 5353b2a..70d41cd 100644 --- a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs +++ b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs @@ -50,6 +50,11 @@ var windowSec = int.Parse(GetArg(args, "--window-sec", "60")); var warmupSec = int.Parse(GetArg(args, "--warmup-sec", "5")); var settleSec = int.Parse(GetArg(args, "--settle-sec", "5")); var replications = int.Parse(GetArg(args, "--replications", "3")); +// Late-delivery deadline. The paper defines a late delivery as a gap exceeding eight emission +// intervals (400 ms at the 50 ms default). When sweeping emit rate this MUST scale with the +// interval, otherwise low-rate runs (e.g. 1 Hz, where deliveries are 1000 ms apart by design) +// would flag every delivery as late. Orchestrator passes 8 x emit-interval-ms. +var lateThresholdMs = double.Parse(GetArg(args, "--late-threshold-ms", "400"), CultureInfo.InvariantCulture); var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500"); var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv"); var repOutputCsv = GetArg(args, "--rep-output", ""); @@ -61,6 +66,7 @@ double minAcceptableRate = theoreticalMaxRate * 0.10; Console.WriteLine($"[Orchestrator] resources={resources} interval={emitIntervalMs}ms " + $"window={windowSec}s replications={replications}"); Console.WriteLine($"[Orchestrator] theoretical_max_per_subscriber_rate={theoreticalMaxRate:F0} notif/s"); +Console.WriteLine($"[Orchestrator] late_threshold={lateThresholdMs:F0} ms"); Console.WriteLine($"[Orchestrator] saturation_threshold={minAcceptableRate:F0} notif/s"); Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}"); @@ -118,7 +124,7 @@ foreach (int n in nValues) { int captured = i; subscriberWhs[i] = new Warehouse(); - spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, captured); + spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, captured, lateThresholdMs); } await Task.WhenAll(spawnTasks); @@ -250,16 +256,18 @@ foreach (int n in nValues) Console.WriteLine($"\n[Orchestrator] N={n} SUMMARY: " + $"mean_per_sub={meanOfMeans:F1} ± {ciHalfWidth:F1} notif/s (95% CI)"); - // Saturation detection: stop sweep if per-sub rate falls below - // 10% of theoretical OR server CPU peaked above 180% (>90% of 2 cores) + // Saturation detection: stop sweep if per-sub rate falls below 10% of theoretical OR the + // server's dispatch path saturates a core. CPU is "% across all cores" (100% = one full + // core), and fan-out is single-threaded, so the relevant signal is a per-core plateau + // near 100%, not 2-core saturation. if (meanOfMeans < minAcceptableRate) { Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: rate {meanOfMeans:F0} < {minAcceptableRate:F0} ***"); saturatedDetected = true; } - else if (perRepResults.Average(r => r.ServerCpuPeak) > 180.0) + else if (perRepResults.Average(r => r.ServerCpuPeak) > 95.0) { - Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server CPU peaked > 180% ***"); + Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server dispatch core saturated (peak > 95%) ***"); saturatedDetected = true; } @@ -318,7 +326,7 @@ if (!string.IsNullOrWhiteSpace(repOutputCsv)) // Subscriber spawn / teardown // ---------------------------------------------------------------- static async Task SpawnSubscriber( - Warehouse wh, string host, int port, int resources, int subId) + Warehouse wh, string host, int port, int resources, int subId, double lateThresholdMs) { try { @@ -338,7 +346,7 @@ static async Task SpawnSubscriber( double elapsedMs = (now - lastTick) * 1000.0 / Stopwatch.Frequency; lastTick = now; Interlocked.Increment(ref sub._received); - if (elapsedMs > 400) Interlocked.Increment(ref sub._lateDeliveries); + if (elapsedMs > lateThresholdMs) Interlocked.Increment(ref sub._lateDeliveries); }; }