diff --git a/.gitignore b/.gitignore index c02784d..e339994 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +experiments/ + ## Ignore Visual Studio temporary files, build results, and ## files generated by popular Visual Studio add-ons. diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index a9feb9a..c693c9a 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -2140,10 +2140,10 @@ partial class EpConnection /// exactly the partially-attached delivery that the wait-by-default resolver prevents and the /// legacy resolver does not. /// - internal void PublishGraph(EpResource root) + internal bool PublishGraph(EpResource root) { if (root == null) - return; + return true; var seen = new HashSet(); var reachable = new List(); @@ -2173,10 +2173,15 @@ partial class EpConnection if (fullyAttached) foreach (var node in reachable) node.Publish(); + + return fullyAttached; } void TrackDeliveredRoot(EpResource root) { + if (PublishGraph(root)) + return; + lock (_deliveredRootsLock) _deliveredRoots[root.ResourceInstanceId] = new WeakReference(root); @@ -2190,9 +2195,7 @@ partial class EpConnection var stale = new List(); foreach (var pair in _deliveredRoots) { - if (pair.Value.TryGetTarget(out var root)) - PublishGraph(root); - else + if (!pair.Value.TryGetTarget(out var root) || PublishGraph(root)) stale.Add(pair.Key); } diff --git a/Tests/Distribution/ConcurrentAttachSweep/Program.cs b/Tests/Distribution/ConcurrentAttachSweep/Program.cs index 3b0a5fa..03976a1 100644 --- a/Tests/Distribution/ConcurrentAttachSweep/Program.cs +++ b/Tests/Distribution/ConcurrentAttachSweep/Program.cs @@ -3,7 +3,7 @@ // ------------------------------------------------------------ // Extends Tests/Distribution/ConcurrentAttach with: // - Sweep over a wider range of concurrent request counts A. -// - More rounds per A for confidence-interval reporting. +// - More rounds per A for sample-standard-deviation reporting. // - Auto-stop when timeouts or failures appear (the // saturation signal for concurrent attach is different // from fan-out: it's *correctness* failure, not slowdown). @@ -35,10 +35,10 @@ var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000")); var rounds = int.Parse(GetArg(args, "--rounds", "10")); var aValStr = GetArg(args, "--a-values", "10,25,50,100,250,500,1000,2000"); var outCsv = GetArg(args, "--output", "concurrent_attach_sweep.csv"); +var roundsCsv = GetArg(args, "--round-output", Path.ChangeExtension(outCsv, ".rounds.csv")); var aValues = aValStr.Split(',').Select(int.Parse).ToArray(); var serverWh = new Warehouse(); -var clientWh = new Warehouse(); // ---------------------------------------------------------------- // SERVER SIDE @@ -72,8 +72,11 @@ if (mode == "server" || mode == "both") // ---------------------------------------------------------------- Console.WriteLine($"[Client-T3+] resources={resources} timeout={timeoutMs}ms rounds={rounds}"); Console.WriteLine($"[Client-T3+] A values: {string.Join(",", aValues)}"); +Console.WriteLine($"[Client-T3+] output={outCsv}"); +Console.WriteLine($"[Client-T3+] round-output={roundsCsv}"); var summary = new List(); +var allRoundResults = new List(); bool failureDetected = false; foreach (int A in aValues) @@ -98,7 +101,9 @@ foreach (int A in aValues) var latencies = new double[A]; var roundSw = Stopwatch.StartNew(); - // One shared connection per round, matching the existing test methodology + // A fresh client warehouse per round keeps the ten samples independent: + // previous attaches cannot turn later rounds into local cache hits. + var clientWh = new Warehouse(); var connection = await clientWh.Get($"ep://{host}:{port}"); var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => @@ -141,29 +146,32 @@ foreach (int A in aValues) Failed = failed, TimedOut = timedOut, WallMs = roundSw.Elapsed.TotalMilliseconds, - P50 = sorted[Math.Min(n - 1, (int)(n * 0.50))], - P95 = sorted[Math.Min(n - 1, (int)(n * 0.95))], - P99 = sorted[Math.Min(n - 1, (int)(n * 0.99))], + P50 = Quantile(sorted, 0.50), + P90 = Quantile(sorted, 0.90), + P95 = Quantile(sorted, 0.95), + P99 = Quantile(sorted, 0.99), Max = sorted[n - 1], + Mean = sorted.Average(), }; roundResults.Add(rr); + allRoundResults.Add(rr); Console.WriteLine($" round {round + 1}/{rounds}: ok={succeeded}/{A} fail={failed} " - + $"timeout={timedOut} wall={rr.WallMs:F0}ms p50={rr.P50:F0} p99={rr.P99:F0}"); - - // Round 1 of each A is conventionally excluded from latency - // aggregation due to connection-establishment overhead (matches - // the existing test methodology). + + $"timeout={timedOut} wall={rr.WallMs:F0}ms p50={rr.P50:F0} p90={rr.P90:F0} p99={rr.P99:F0}"); + await clientWh.Close(); GC.Collect(); await Task.Delay(500); } - var steady = roundResults.Skip(1).ToList(); // exclude round 1 - if (steady.Count == 0) steady = roundResults; - var anyFailure = roundResults.Any(r => r.Failed > 0 || r.TimedOut > 0); + var p50 = MeanStdDev(roundResults.Select(r => r.P50)); + var p90 = MeanStdDev(roundResults.Select(r => r.P90)); + var p99 = MeanStdDev(roundResults.Select(r => r.P99)); + var mean = MeanStdDev(roundResults.Select(r => r.Mean)); + var wall = MeanStdDev(roundResults.Select(r => r.WallMs)); + var s = new ASummary { A = A, @@ -172,19 +180,25 @@ foreach (int A in aValues) TotalSucceeded = roundResults.Sum(r => r.Succeeded), TotalFailed = roundResults.Sum(r => r.Failed), TotalTimedOut = roundResults.Sum(r => r.TimedOut), - MeanP50 = steady.Average(r => r.P50), - Ci95P50 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P50).ToArray()), - MeanP99 = steady.Average(r => r.P99), - Ci95P99 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P99).ToArray()), - MeanWall = steady.Average(r => r.WallMs), - Ci95Wall = ConfidenceIntervalHalfWidth95(steady.Select(r => r.WallMs).ToArray()), + MeanP50 = p50.Mean, + SdP50 = p50.SampleStdDev, + MeanP90 = p90.Mean, + SdP90 = p90.SampleStdDev, + MeanP99 = p99.Mean, + SdP99 = p99.SampleStdDev, + MeanLatency = mean.Mean, + SdLatency = mean.SampleStdDev, + MeanWall = wall.Mean, + SdWall = wall.SampleStdDev, }; summary.Add(s); Console.WriteLine($" [A={A}] SUMMARY: " - + $"p50={s.MeanP50:F1}±{s.Ci95P50:F1} " - + $"p99={s.MeanP99:F1}±{s.Ci95P99:F1} " - + $"wall={s.MeanWall:F0}±{s.Ci95Wall:F0}ms " + + $"p50={s.MeanP50:F1}±{s.SdP50:F1} " + + $"p90={s.MeanP90:F1}±{s.SdP90:F1} " + + $"p99={s.MeanP99:F1}±{s.SdP99:F1} " + + $"mean={s.MeanLatency:F1}±{s.SdLatency:F1} " + + $"wall={s.MeanWall:F0}±{s.SdWall:F0}ms " + $"failures={s.TotalFailed + s.TotalTimedOut}/{s.TotalSucceeded + s.TotalFailed + s.TotalTimedOut}"); if (anyFailure) @@ -199,46 +213,56 @@ foreach (int A in aValues) // ---------------------------------------------------------------- var sb = new System.Text.StringBuilder(); sb.AppendLine("a,rounds,any_failures,total_succeeded,total_failed,total_timed_out," + - "mean_p50_ms,ci95_p50,mean_p99_ms,ci95_p99,mean_wall_ms,ci95_wall"); + "mean_p50_ms,sd_p50_ms,mean_p90_ms,sd_p90_ms,mean_p99_ms,sd_p99_ms," + + "mean_latency_ms,sd_latency_ms,mean_wall_ms,sd_wall_ms"); foreach (var r in summary) { sb.AppendLine(string.Create(CultureInfo.InvariantCulture, $"{r.A},{r.Rounds},{r.AnyFailures},{r.TotalSucceeded},{r.TotalFailed},{r.TotalTimedOut}," + - $"{r.MeanP50:F2},{r.Ci95P50:F2},{r.MeanP99:F2},{r.Ci95P99:F2}," + - $"{r.MeanWall:F2},{r.Ci95Wall:F2}")); + $"{r.MeanP50:F2},{r.SdP50:F2},{r.MeanP90:F2},{r.SdP90:F2}," + + $"{r.MeanP99:F2},{r.SdP99:F2},{r.MeanLatency:F2},{r.SdLatency:F2}," + + $"{r.MeanWall:F2},{r.SdWall:F2}")); } await File.WriteAllTextAsync(outCsv, sb.ToString()); Console.WriteLine($"\n[Client-T3+] Results written to {outCsv}"); +var rsb = new System.Text.StringBuilder(); +rsb.AppendLine("a,round,succeeded,failed,timed_out,wall_ms,p50_ms,p90_ms,p95_ms,p99_ms,max_ms,mean_ms"); +foreach (var r in allRoundResults) +{ + rsb.AppendLine(string.Create(CultureInfo.InvariantCulture, + $"{r.A},{r.Round},{r.Succeeded},{r.Failed},{r.TimedOut}," + + $"{r.WallMs:F2},{r.P50:F2},{r.P90:F2},{r.P95:F2},{r.P99:F2},{r.Max:F2},{r.Mean:F2}")); +} +await File.WriteAllTextAsync(roundsCsv, rsb.ToString()); +Console.WriteLine($"[Client-T3+] Round results written to {roundsCsv}"); + if (mode == "server" || mode == "both") await serverWh.Close(); -if (mode == "client" || mode == "both") await clientWh.Close(); // ---------------------------------------------------------------- // Helpers // ---------------------------------------------------------------- -static double ConfidenceIntervalHalfWidth95(double[] xs) +static double Quantile(double[] sorted, double p) { - int n = xs.Length; - if (n < 2) return 0; - double mean = xs.Average(); - double sumSq = xs.Sum(x => (x - mean) * (x - mean)); - double std = Math.Sqrt(sumSq / (n - 1)); - double sem = std / Math.Sqrt(n); - double t = (n - 1) switch + return sorted[Math.Min(sorted.Length - 1, (int)(sorted.Length * p))]; +} + +static (double Mean, double SampleStdDev) MeanStdDev(IEnumerable values) +{ + var xs = values.ToArray(); + var mean = xs.Average(); + + if (xs.Length < 2) + return (mean, 0); + + var sumSq = xs.Sum(x => { - 1 => 12.706, - 2 => 4.303, - 3 => 3.182, - 4 => 2.776, - 5 => 2.571, - 6 => 2.447, - 7 => 2.365, - 8 => 2.306, - 9 => 2.262, - _ => 1.960 - }; - return t * sem; + var d = x - mean; + return d * d; + }); + + return (mean, Math.Sqrt(sumSq / (xs.Length - 1))); } static string GetArg(string[] args, string key, string def) @@ -256,9 +280,11 @@ record RoundResult public long TimedOut; public double WallMs; public double P50; + public double P90; public double P95; public double P99; public double Max; + public double Mean; } record ASummary @@ -270,9 +296,13 @@ record ASummary public long TotalFailed; public long TotalTimedOut; public double MeanP50; - public double Ci95P50; + public double SdP50; + public double MeanP90; + public double SdP90; public double MeanP99; - public double Ci95P99; + public double SdP99; + public double MeanLatency; + public double SdLatency; public double MeanWall; - public double Ci95Wall; -} \ No newline at end of file + public double SdWall; +} diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs index bd0273a..5353b2a 100644 --- a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs +++ b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs @@ -52,6 +52,7 @@ var settleSec = int.Parse(GetArg(args, "--settle-sec", "5")); var replications = int.Parse(GetArg(args, "--replications", "3")); var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500"); var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv"); +var repOutputCsv = GetArg(args, "--rep-output", ""); var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray(); double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources; @@ -89,6 +90,7 @@ catch (Exception ex) // All sweep points x replications, with per-N early-stop logic. // ---------------------------------------------------------------- var allResults = new List(); +var allRepResults = new List(); bool saturatedDetected = false; foreach (int n in nValues) @@ -208,6 +210,7 @@ foreach (int n in nValues) Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: " + $"mean_per_sub={meanPerSub:F1}/s " + + $"std_per_sub={stdPerSub:F1}/s " + $"aggregate={aggregate:F0}/s " + $"late={totalLate} " + $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% " @@ -238,6 +241,8 @@ foreach (int n in nValues) // ---------- per-N aggregation ---------- if (perRepResults.Count > 0) { + allRepResults.AddRange(perRepResults); + double meanOfMeans = perRepResults.Average(r => r.MeanPerSub); double ciHalfWidth = ConfidenceIntervalHalfWidth95( perRepResults.Select(r => r.MeanPerSub).ToArray()); @@ -291,6 +296,24 @@ foreach (var r in allResults) await File.WriteAllTextAsync(outputCsv, sb.ToString()); Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}"); +if (!string.IsNullOrWhiteSpace(repOutputCsv)) +{ + var repSb = new System.Text.StringBuilder(); + repSb.AppendLine("n,rep,mean_per_sub_rate,std_per_sub_rate,min_per_sub_rate,max_per_sub_rate," + + "aggregate,late_deliveries,server_cpu_avg,server_cpu_peak,client_cpu_avg,client_cpu_peak"); + + foreach (var r in allRepResults) + { + repSb.AppendLine(string.Create(CultureInfo.InvariantCulture, + $"{r.N},{r.Rep},{r.MeanPerSub:F2},{r.StdPerSub:F2},{r.MinPerSub:F2},{r.MaxPerSub:F2}," + + $"{r.Aggregate:F1},{r.LateDeliveries},{r.ServerCpuAvg:F2},{r.ServerCpuPeak:F2}," + + $"{r.ClientCpuAvg:F2},{r.ClientCpuPeak:F2}")); + } + + await File.WriteAllTextAsync(repOutputCsv, repSb.ToString()); + Console.WriteLine($"[Orchestrator] Replication rows written to {repOutputCsv}"); +} + // ---------------------------------------------------------------- // Subscriber spawn / teardown // ---------------------------------------------------------------- diff --git a/Tests/Distribution/ResourceCount/Client/Program.cs b/Tests/Distribution/ResourceCount/Client/Program.cs index 299a49b..c628aeb 100644 --- a/Tests/Distribution/ResourceCount/Client/Program.cs +++ b/Tests/Distribution/ResourceCount/Client/Program.cs @@ -5,107 +5,227 @@ // // Also tests notification latency after all resources are ready. // -// Usage: dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 +// Usage: +// dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 --batch 10000 +// dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 --batch 10 --rounds 10 +// +// Outputs: +// test2_attach_rounds.csv per-round P50/P95/P99/mean/wall time +// test2_attach_aggregate.csv mean + sample standard deviation across rounds +// test2_attach_latencies.csv raw per-attach latencies, tagged by round // ============================================================ using Esiur.Protocol; using Esiur.Resource; using System.Diagnostics; -using System.Text.RegularExpressions; +using System.Globalization; +using System.Text; var host = GetArg(args, "--host", "127.0.0.1"); var port = int.Parse(GetArg(args, "--port", "10901")); var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); var batchSize = int.Parse(GetArg(args, "--batch", "10000")); +var rounds = int.Parse(GetArg(args, "--rounds", "1")); +var measureNotifications = bool.Parse(GetArg(args, "--notifications", rounds == 1 ? "true" : "false")); -Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}"); +if (rounds < 1) + throw new ArgumentOutOfRangeException(nameof(rounds), "--rounds must be >= 1."); -var wh = new Warehouse(); +Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}, batch={batchSize}, rounds={rounds}"); -var connnection = await wh.Get( - $"ep://{host}:{port}"); +var roundResults = new List(rounds); +var latencyRows = new List<(int Round, double LatencyMs)>(resourceCount * rounds); -var attachLatencies = new List(resourceCount); -var proxies = new IResource[resourceCount]; - -// --- Attach in batches to avoid overwhelming the runtime ------------- -var totalSw = Stopwatch.StartNew(); - -for (int batch = 0; batch < resourceCount; batch += batchSize) +for (int round = 1; round <= rounds; round++) { + var result = await RunRound(round); + roundResults.Add(result); + latencyRows.AddRange(result.Latencies.Select(l => (round, l))); - int end = Math.Min(batch + batchSize, resourceCount); - var batchTasks = new Task[end - batch]; - - for (int i = batch; i < end; i++) - { - int capturedI = i; - batchTasks[i - batch] = Task.Run(async () => - { - var sw = Stopwatch.StartNew(); - - //Console.WriteLine(capturedI); - proxies[capturedI] = await connnection.Get($"sys/sensor_{capturedI}"); - - //Console.WriteLine(proxies[capturedI].Instance.Link); - - sw.Stop(); - - lock (attachLatencies) - attachLatencies.Add(sw.Elapsed.TotalMilliseconds); - }); - } - - await Task.WhenAll(batchTasks); - //Console.WriteLine("D"); - if (batch % 1000 == 0) - Console.WriteLine($"[Client-T2] Attached {Math.Min(batch + batchSize, resourceCount)}/{resourceCount} " + - $"elapsed={totalSw.Elapsed.TotalSeconds:F1}s"); + Console.WriteLine( + $"[Client-T2] Round {round}/{rounds}: " + + $"wall={result.WallSeconds:F2}s " + + $"p50={result.P50:F2}ms p99={result.P99:F2}ms mean={result.Mean:F2}ms"); } -totalSw.Stop(); -Console.WriteLine($"[Client-T2] All attached in {totalSw.Elapsed.TotalSeconds:F2}s"); - -// --- Latency statistics --------------------------------------------- -attachLatencies.Sort(); -int n = attachLatencies.Count; - -Console.WriteLine($"[Client-T2] Attach latency (ms):"); -Console.WriteLine($" min={attachLatencies[0]:F2}"); -Console.WriteLine($" p50={attachLatencies[(int)(n * 0.50)]:F2}"); -Console.WriteLine($" p95={attachLatencies[(int)(n * 0.95)]:F2}"); -Console.WriteLine($" p99={attachLatencies[(int)(n * 0.99)]:F2}"); -Console.WriteLine($" max={attachLatencies[n - 1]:F2}"); -Console.WriteLine($" mean={attachLatencies.Average():F2}"); - -// --- Notification round-trip after full load ------------------------ -Console.WriteLine("[Client-T2] Measuring notification latency under full resource load..."); -long received = 0; -double sumLatencyMs = 0; - -for (int i = 0; i < resourceCount; i++) -{ - int capturedI = i; - proxies[capturedI].Instance.PropertyModified += (PropertyModificationInfo data) => - { - if (data.Name == "Value") - Interlocked.Increment(ref received); - }; -} - -await connnection.Call("UpdateValues"); - -await Task.Delay(10000); // observe for 10s -Console.WriteLine($"[Client-T2] Received {received} notifications in 10s from first 500 resources"); +Console.WriteLine("[Client-T2] Aggregate across rounds (sample standard deviation):"); +PrintAggregate("wall_s", roundResults.Select(r => r.WallSeconds)); +PrintAggregate("min_ms", roundResults.Select(r => r.Min)); +PrintAggregate("p50_ms", roundResults.Select(r => r.P50)); +PrintAggregate("p95_ms", roundResults.Select(r => r.P95)); +PrintAggregate("p99_ms", roundResults.Select(r => r.P99)); +PrintAggregate("max_ms", roundResults.Select(r => r.Max)); +PrintAggregate("mean_ms", roundResults.Select(r => r.Mean)); // --- CSV output ----------------------------------------------------- -string csv = "attach_latency_ms\n" + string.Join("\n", attachLatencies.Select(l => l.ToString("F3"))); -await File.WriteAllTextAsync("test2_attach_latencies.csv", csv); +var roundCsv = new StringBuilder(); +roundCsv.AppendLine("round,resources,batch,wall_s,min_ms,p50_ms,p95_ms,p99_ms,max_ms,mean_ms,notifications_received"); +foreach (var r in roundResults) +{ + roundCsv.AppendLine( + $"{r.Round},{resourceCount},{batchSize},{F(r.WallSeconds)},{F(r.Min)},{F(r.P50)},{F(r.P95)},{F(r.P99)},{F(r.Max)},{F(r.Mean)},{r.NotificationsReceived}"); +} + +await File.WriteAllTextAsync("test2_attach_rounds.csv", roundCsv.ToString()); + +var aggregateCsv = new StringBuilder(); +aggregateCsv.AppendLine("metric,rounds,mean,sample_stddev"); +AppendAggregateCsv(aggregateCsv, "wall_s", roundResults.Select(r => r.WallSeconds)); +AppendAggregateCsv(aggregateCsv, "min_ms", roundResults.Select(r => r.Min)); +AppendAggregateCsv(aggregateCsv, "p50_ms", roundResults.Select(r => r.P50)); +AppendAggregateCsv(aggregateCsv, "p95_ms", roundResults.Select(r => r.P95)); +AppendAggregateCsv(aggregateCsv, "p99_ms", roundResults.Select(r => r.P99)); +AppendAggregateCsv(aggregateCsv, "max_ms", roundResults.Select(r => r.Max)); +AppendAggregateCsv(aggregateCsv, "mean_ms", roundResults.Select(r => r.Mean)); +await File.WriteAllTextAsync("test2_attach_aggregate.csv", aggregateCsv.ToString()); + +var latencyCsv = new StringBuilder(); +latencyCsv.AppendLine("round,attach_latency_ms"); +foreach (var (round, latencyMs) in latencyRows) + latencyCsv.AppendLine($"{round},{F(latencyMs)}"); + +await File.WriteAllTextAsync("test2_attach_latencies.csv", latencyCsv.ToString()); + +Console.WriteLine("[Client-T2] Round summaries written to test2_attach_rounds.csv"); +Console.WriteLine("[Client-T2] Aggregate statistics written to test2_attach_aggregate.csv"); Console.WriteLine("[Client-T2] Attach latencies written to test2_attach_latencies.csv"); +async Task RunRound(int round) +{ + var wh = new Warehouse(); + var connection = await wh.Get($"ep://{host}:{port}"); + + var attachLatencies = new List(resourceCount); + var proxies = new IResource[resourceCount]; + + // --- Attach in batches to avoid overwhelming the runtime ------------- + var totalSw = Stopwatch.StartNew(); + + for (int batch = 0; batch < resourceCount; batch += batchSize) + { + int end = Math.Min(batch + batchSize, resourceCount); + var batchTasks = new Task[end - batch]; + + for (int i = batch; i < end; i++) + { + int capturedI = i; + batchTasks[i - batch] = Task.Run(async () => + { + var sw = Stopwatch.StartNew(); + proxies[capturedI] = await connection.Get($"sys/sensor_{capturedI}"); + sw.Stop(); + + lock (attachLatencies) + attachLatencies.Add(sw.Elapsed.TotalMilliseconds); + }); + } + + await Task.WhenAll(batchTasks); + + if (batch % 1000 == 0) + Console.WriteLine($"[Client-T2] Round {round}/{rounds}: attached {Math.Min(batch + batchSize, resourceCount)}/{resourceCount} " + + $"elapsed={totalSw.Elapsed.TotalSeconds:F1}s"); + } + + totalSw.Stop(); + + attachLatencies.Sort(); + int n = attachLatencies.Count; + + long received = 0; + if (measureNotifications) + { + Console.WriteLine($"[Client-T2] Round {round}/{rounds}: measuring notification latency under full resource load..."); + + for (int i = 0; i < resourceCount; i++) + { + int capturedI = i; + var proxy = proxies[capturedI] ?? throw new InvalidOperationException($"Resource {capturedI} was not attached."); + var instance = proxy.Instance ?? throw new InvalidOperationException($"Resource {capturedI} has no instance."); + instance.PropertyModified += (PropertyModificationInfo data) => + { + if (data.Name == "Value") + Interlocked.Increment(ref received); + }; + } + + await connection.Call("UpdateValues"); + await Task.Delay(10000); + Console.WriteLine($"[Client-T2] Round {round}/{rounds}: received {received} notifications in 10s"); + } + + await wh.Close(); + + return new RoundResult( + round, + totalSw.Elapsed.TotalSeconds, + attachLatencies[0], + Quantile(attachLatencies, 0.50), + Quantile(attachLatencies, 0.95), + Quantile(attachLatencies, 0.99), + attachLatencies[n - 1], + attachLatencies.Average(), + received, + attachLatencies); +} + +static double Quantile(IReadOnlyList sorted, double p) +{ + return sorted[Math.Min(sorted.Count - 1, (int)(sorted.Count * p))]; +} + +static void PrintAggregate(string metric, IEnumerable values) +{ + var arr = values.ToArray(); + Console.WriteLine($" {metric,-8} mean={Mean(arr):F3} sample_stddev={SampleStdDev(arr):F3}"); +} + +static void AppendAggregateCsv(StringBuilder csv, string metric, IEnumerable values) +{ + var arr = values.ToArray(); + csv.AppendLine($"{metric},{arr.Length},{F(Mean(arr))},{F(SampleStdDev(arr))}"); +} + +static double Mean(IReadOnlyList values) +{ + return values.Average(); +} + +static double SampleStdDev(IReadOnlyList values) +{ + if (values.Count < 2) + return double.NaN; + + var mean = Mean(values); + var sumSquares = values.Sum(x => + { + var d = x - mean; + return d * d; + }); + + return Math.Sqrt(sumSquares / (values.Count - 1)); +} + +static string F(double value) +{ + return value.ToString("F3", CultureInfo.InvariantCulture); +} + static string GetArg(string[] args, string key, string def) { int i = Array.IndexOf(args, key); return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; } + +record RoundResult( + int Round, + double WallSeconds, + double Min, + double P50, + double P95, + double P99, + double Max, + double Mean, + long NotificationsReceived, + List Latencies); diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11001.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11001.csv deleted file mode 100644 index 490e4da..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11001.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,394.7,300,53,0,0 -2,Completed,508.0,300,53,0,0 -3,Completed,335.2,300,53,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11201.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11201.csv deleted file mode 100644 index b1a836f..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11201.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,71.4,75,14,0,0 -2,Completed,16.7,75,14,0,0 -3,Completed,23.3,75,14,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11202.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11202.csv deleted file mode 100644 index 6904ab4..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11202.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,115.2,150,30,0,0 -2,Completed,74.1,150,30,0,0 -3,Completed,79.8,150,30,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11203.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11203.csv deleted file mode 100644 index 7a107e6..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11203.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,344.4,300,53,0,0 -2,Completed,254.2,300,53,0,0 -3,Completed,273.7,300,53,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11204.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11204.csv deleted file mode 100644 index 1d089bf..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11204.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,975.6,600,81,0,0 -2,Completed,623.9,600,81,0,0 -3,Completed,670.0,600,81,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11205.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11205.csv deleted file mode 100644 index 2bbacc2..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11205.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,5956.2,1200,162,0,0 -2,Completed,5649.9,1200,162,0,0 -3,Completed,4763.6,1200,162,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11206.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11206.csv deleted file mode 100644 index e3ea074..0000000 --- a/deadlock_WaitWithCycleDetection_127.0.0.1_11206.csv +++ /dev/null @@ -1,4 +0,0 @@ -iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished -1,Completed,56724.2,2400,325,0,0 -2,Completed,64782.9,2400,325,0,0 -3,Completed,50450.8,2400,325,0,0 \ No newline at end of file