2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-06-13 14:38:43 +00:00
This commit is contained in:
2026-06-10 17:47:21 +03:00
parent 2f39aabf7e
commit f16d40d1c5
12 changed files with 310 additions and 160 deletions
+2
View File
@@ -1,3 +1,5 @@
experiments/
## Ignore Visual Studio temporary files, build results, and ## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons. ## files generated by popular Visual Studio add-ons.
@@ -2140,10 +2140,10 @@ partial class EpConnection
/// exactly the partially-attached delivery that the wait-by-default resolver prevents and the /// exactly the partially-attached delivery that the wait-by-default resolver prevents and the
/// legacy resolver does not. /// legacy resolver does not.
/// </summary> /// </summary>
internal void PublishGraph(EpResource root) internal bool PublishGraph(EpResource root)
{ {
if (root == null) if (root == null)
return; return true;
var seen = new HashSet<uint>(); var seen = new HashSet<uint>();
var reachable = new List<EpResource>(); var reachable = new List<EpResource>();
@@ -2173,10 +2173,15 @@ partial class EpConnection
if (fullyAttached) if (fullyAttached)
foreach (var node in reachable) foreach (var node in reachable)
node.Publish(); node.Publish();
return fullyAttached;
} }
void TrackDeliveredRoot(EpResource root) void TrackDeliveredRoot(EpResource root)
{ {
if (PublishGraph(root))
return;
lock (_deliveredRootsLock) lock (_deliveredRootsLock)
_deliveredRoots[root.ResourceInstanceId] = new WeakReference<EpResource>(root); _deliveredRoots[root.ResourceInstanceId] = new WeakReference<EpResource>(root);
@@ -2190,9 +2195,7 @@ partial class EpConnection
var stale = new List<uint>(); var stale = new List<uint>();
foreach (var pair in _deliveredRoots) foreach (var pair in _deliveredRoots)
{ {
if (pair.Value.TryGetTarget(out var root)) if (!pair.Value.TryGetTarget(out var root) || PublishGraph(root))
PublishGraph(root);
else
stale.Add(pair.Key); stale.Add(pair.Key);
} }
@@ -3,7 +3,7 @@
// ------------------------------------------------------------ // ------------------------------------------------------------
// Extends Tests/Distribution/ConcurrentAttach with: // Extends Tests/Distribution/ConcurrentAttach with:
// - Sweep over a wider range of concurrent request counts A. // - Sweep over a wider range of concurrent request counts A.
// - More rounds per A for confidence-interval reporting. // - More rounds per A for sample-standard-deviation reporting.
// - Auto-stop when timeouts or failures appear (the // - Auto-stop when timeouts or failures appear (the
// saturation signal for concurrent attach is different // saturation signal for concurrent attach is different
// from fan-out: it's *correctness* failure, not slowdown). // from fan-out: it's *correctness* failure, not slowdown).
@@ -35,10 +35,10 @@ var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000"));
var rounds = int.Parse(GetArg(args, "--rounds", "10")); var rounds = int.Parse(GetArg(args, "--rounds", "10"));
var aValStr = GetArg(args, "--a-values", "10,25,50,100,250,500,1000,2000"); var aValStr = GetArg(args, "--a-values", "10,25,50,100,250,500,1000,2000");
var outCsv = GetArg(args, "--output", "concurrent_attach_sweep.csv"); var outCsv = GetArg(args, "--output", "concurrent_attach_sweep.csv");
var roundsCsv = GetArg(args, "--round-output", Path.ChangeExtension(outCsv, ".rounds.csv"));
var aValues = aValStr.Split(',').Select(int.Parse).ToArray(); var aValues = aValStr.Split(',').Select(int.Parse).ToArray();
var serverWh = new Warehouse(); var serverWh = new Warehouse();
var clientWh = new Warehouse();
// ---------------------------------------------------------------- // ----------------------------------------------------------------
// SERVER SIDE // SERVER SIDE
@@ -72,8 +72,11 @@ if (mode == "server" || mode == "both")
// ---------------------------------------------------------------- // ----------------------------------------------------------------
Console.WriteLine($"[Client-T3+] resources={resources} timeout={timeoutMs}ms rounds={rounds}"); Console.WriteLine($"[Client-T3+] resources={resources} timeout={timeoutMs}ms rounds={rounds}");
Console.WriteLine($"[Client-T3+] A values: {string.Join(",", aValues)}"); Console.WriteLine($"[Client-T3+] A values: {string.Join(",", aValues)}");
Console.WriteLine($"[Client-T3+] output={outCsv}");
Console.WriteLine($"[Client-T3+] round-output={roundsCsv}");
var summary = new List<ASummary>(); var summary = new List<ASummary>();
var allRoundResults = new List<RoundResult>();
bool failureDetected = false; bool failureDetected = false;
foreach (int A in aValues) foreach (int A in aValues)
@@ -98,7 +101,9 @@ foreach (int A in aValues)
var latencies = new double[A]; var latencies = new double[A];
var roundSw = Stopwatch.StartNew(); var roundSw = Stopwatch.StartNew();
// One shared connection per round, matching the existing test methodology // A fresh client warehouse per round keeps the ten samples independent:
// previous attaches cannot turn later rounds into local cache hits.
var clientWh = new Warehouse();
var connection = await clientWh.Get<EpConnection>($"ep://{host}:{port}"); var connection = await clientWh.Get<EpConnection>($"ep://{host}:{port}");
var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () =>
@@ -141,29 +146,32 @@ foreach (int A in aValues)
Failed = failed, Failed = failed,
TimedOut = timedOut, TimedOut = timedOut,
WallMs = roundSw.Elapsed.TotalMilliseconds, WallMs = roundSw.Elapsed.TotalMilliseconds,
P50 = sorted[Math.Min(n - 1, (int)(n * 0.50))], P50 = Quantile(sorted, 0.50),
P95 = sorted[Math.Min(n - 1, (int)(n * 0.95))], P90 = Quantile(sorted, 0.90),
P99 = sorted[Math.Min(n - 1, (int)(n * 0.99))], P95 = Quantile(sorted, 0.95),
P99 = Quantile(sorted, 0.99),
Max = sorted[n - 1], Max = sorted[n - 1],
Mean = sorted.Average(),
}; };
roundResults.Add(rr); roundResults.Add(rr);
allRoundResults.Add(rr);
Console.WriteLine($" round {round + 1}/{rounds}: ok={succeeded}/{A} fail={failed} " Console.WriteLine($" round {round + 1}/{rounds}: ok={succeeded}/{A} fail={failed} "
+ $"timeout={timedOut} wall={rr.WallMs:F0}ms p50={rr.P50:F0} p99={rr.P99:F0}"); + $"timeout={timedOut} wall={rr.WallMs:F0}ms p50={rr.P50:F0} p90={rr.P90:F0} p99={rr.P99:F0}");
// Round 1 of each A is conventionally excluded from latency
// aggregation due to connection-establishment overhead (matches
// the existing test methodology).
await clientWh.Close();
GC.Collect(); GC.Collect();
await Task.Delay(500); await Task.Delay(500);
} }
var steady = roundResults.Skip(1).ToList(); // exclude round 1
if (steady.Count == 0) steady = roundResults;
var anyFailure = roundResults.Any(r => r.Failed > 0 || r.TimedOut > 0); var anyFailure = roundResults.Any(r => r.Failed > 0 || r.TimedOut > 0);
var p50 = MeanStdDev(roundResults.Select(r => r.P50));
var p90 = MeanStdDev(roundResults.Select(r => r.P90));
var p99 = MeanStdDev(roundResults.Select(r => r.P99));
var mean = MeanStdDev(roundResults.Select(r => r.Mean));
var wall = MeanStdDev(roundResults.Select(r => r.WallMs));
var s = new ASummary var s = new ASummary
{ {
A = A, A = A,
@@ -172,19 +180,25 @@ foreach (int A in aValues)
TotalSucceeded = roundResults.Sum(r => r.Succeeded), TotalSucceeded = roundResults.Sum(r => r.Succeeded),
TotalFailed = roundResults.Sum(r => r.Failed), TotalFailed = roundResults.Sum(r => r.Failed),
TotalTimedOut = roundResults.Sum(r => r.TimedOut), TotalTimedOut = roundResults.Sum(r => r.TimedOut),
MeanP50 = steady.Average(r => r.P50), MeanP50 = p50.Mean,
Ci95P50 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P50).ToArray()), SdP50 = p50.SampleStdDev,
MeanP99 = steady.Average(r => r.P99), MeanP90 = p90.Mean,
Ci95P99 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P99).ToArray()), SdP90 = p90.SampleStdDev,
MeanWall = steady.Average(r => r.WallMs), MeanP99 = p99.Mean,
Ci95Wall = ConfidenceIntervalHalfWidth95(steady.Select(r => r.WallMs).ToArray()), SdP99 = p99.SampleStdDev,
MeanLatency = mean.Mean,
SdLatency = mean.SampleStdDev,
MeanWall = wall.Mean,
SdWall = wall.SampleStdDev,
}; };
summary.Add(s); summary.Add(s);
Console.WriteLine($" [A={A}] SUMMARY: " Console.WriteLine($" [A={A}] SUMMARY: "
+ $"p50={s.MeanP50:F1}±{s.Ci95P50:F1} " + $"p50={s.MeanP50:F1}±{s.SdP50:F1} "
+ $"p99={s.MeanP99:F1}±{s.Ci95P99:F1} " + $"p90={s.MeanP90:F1}±{s.SdP90:F1} "
+ $"wall={s.MeanWall:F0}±{s.Ci95Wall:F0}ms " + $"p99={s.MeanP99:F1}±{s.SdP99:F1} "
+ $"mean={s.MeanLatency:F1}±{s.SdLatency:F1} "
+ $"wall={s.MeanWall:F0}±{s.SdWall:F0}ms "
+ $"failures={s.TotalFailed + s.TotalTimedOut}/{s.TotalSucceeded + s.TotalFailed + s.TotalTimedOut}"); + $"failures={s.TotalFailed + s.TotalTimedOut}/{s.TotalSucceeded + s.TotalFailed + s.TotalTimedOut}");
if (anyFailure) if (anyFailure)
@@ -199,46 +213,56 @@ foreach (int A in aValues)
// ---------------------------------------------------------------- // ----------------------------------------------------------------
var sb = new System.Text.StringBuilder(); var sb = new System.Text.StringBuilder();
sb.AppendLine("a,rounds,any_failures,total_succeeded,total_failed,total_timed_out," + sb.AppendLine("a,rounds,any_failures,total_succeeded,total_failed,total_timed_out," +
"mean_p50_ms,ci95_p50,mean_p99_ms,ci95_p99,mean_wall_ms,ci95_wall"); "mean_p50_ms,sd_p50_ms,mean_p90_ms,sd_p90_ms,mean_p99_ms,sd_p99_ms," +
"mean_latency_ms,sd_latency_ms,mean_wall_ms,sd_wall_ms");
foreach (var r in summary) foreach (var r in summary)
{ {
sb.AppendLine(string.Create(CultureInfo.InvariantCulture, sb.AppendLine(string.Create(CultureInfo.InvariantCulture,
$"{r.A},{r.Rounds},{r.AnyFailures},{r.TotalSucceeded},{r.TotalFailed},{r.TotalTimedOut}," + $"{r.A},{r.Rounds},{r.AnyFailures},{r.TotalSucceeded},{r.TotalFailed},{r.TotalTimedOut}," +
$"{r.MeanP50:F2},{r.Ci95P50:F2},{r.MeanP99:F2},{r.Ci95P99:F2}," + $"{r.MeanP50:F2},{r.SdP50:F2},{r.MeanP90:F2},{r.SdP90:F2}," +
$"{r.MeanWall:F2},{r.Ci95Wall:F2}")); $"{r.MeanP99:F2},{r.SdP99:F2},{r.MeanLatency:F2},{r.SdLatency:F2}," +
$"{r.MeanWall:F2},{r.SdWall:F2}"));
} }
await File.WriteAllTextAsync(outCsv, sb.ToString()); await File.WriteAllTextAsync(outCsv, sb.ToString());
Console.WriteLine($"\n[Client-T3+] Results written to {outCsv}"); Console.WriteLine($"\n[Client-T3+] Results written to {outCsv}");
var rsb = new System.Text.StringBuilder();
rsb.AppendLine("a,round,succeeded,failed,timed_out,wall_ms,p50_ms,p90_ms,p95_ms,p99_ms,max_ms,mean_ms");
foreach (var r in allRoundResults)
{
rsb.AppendLine(string.Create(CultureInfo.InvariantCulture,
$"{r.A},{r.Round},{r.Succeeded},{r.Failed},{r.TimedOut}," +
$"{r.WallMs:F2},{r.P50:F2},{r.P90:F2},{r.P95:F2},{r.P99:F2},{r.Max:F2},{r.Mean:F2}"));
}
await File.WriteAllTextAsync(roundsCsv, rsb.ToString());
Console.WriteLine($"[Client-T3+] Round results written to {roundsCsv}");
if (mode == "server" || mode == "both") await serverWh.Close(); if (mode == "server" || mode == "both") await serverWh.Close();
if (mode == "client" || mode == "both") await clientWh.Close();
// ---------------------------------------------------------------- // ----------------------------------------------------------------
// Helpers // Helpers
// ---------------------------------------------------------------- // ----------------------------------------------------------------
static double ConfidenceIntervalHalfWidth95(double[] xs) static double Quantile(double[] sorted, double p)
{ {
int n = xs.Length; return sorted[Math.Min(sorted.Length - 1, (int)(sorted.Length * p))];
if (n < 2) return 0; }
double mean = xs.Average();
double sumSq = xs.Sum(x => (x - mean) * (x - mean)); static (double Mean, double SampleStdDev) MeanStdDev(IEnumerable<double> values)
double std = Math.Sqrt(sumSq / (n - 1)); {
double sem = std / Math.Sqrt(n); var xs = values.ToArray();
double t = (n - 1) switch var mean = xs.Average();
if (xs.Length < 2)
return (mean, 0);
var sumSq = xs.Sum(x =>
{ {
1 => 12.706, var d = x - mean;
2 => 4.303, return d * d;
3 => 3.182, });
4 => 2.776,
5 => 2.571, return (mean, Math.Sqrt(sumSq / (xs.Length - 1)));
6 => 2.447,
7 => 2.365,
8 => 2.306,
9 => 2.262,
_ => 1.960
};
return t * sem;
} }
static string GetArg(string[] args, string key, string def) static string GetArg(string[] args, string key, string def)
@@ -256,9 +280,11 @@ record RoundResult
public long TimedOut; public long TimedOut;
public double WallMs; public double WallMs;
public double P50; public double P50;
public double P90;
public double P95; public double P95;
public double P99; public double P99;
public double Max; public double Max;
public double Mean;
} }
record ASummary record ASummary
@@ -270,9 +296,13 @@ record ASummary
public long TotalFailed; public long TotalFailed;
public long TotalTimedOut; public long TotalTimedOut;
public double MeanP50; public double MeanP50;
public double Ci95P50; public double SdP50;
public double MeanP90;
public double SdP90;
public double MeanP99; public double MeanP99;
public double Ci95P99; public double SdP99;
public double MeanLatency;
public double SdLatency;
public double MeanWall; public double MeanWall;
public double Ci95Wall; public double SdWall;
} }
@@ -52,6 +52,7 @@ var settleSec = int.Parse(GetArg(args, "--settle-sec", "5"));
var replications = int.Parse(GetArg(args, "--replications", "3")); var replications = int.Parse(GetArg(args, "--replications", "3"));
var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500"); var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500");
var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv"); var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv");
var repOutputCsv = GetArg(args, "--rep-output", "");
var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray(); var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray();
double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources; double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources;
@@ -89,6 +90,7 @@ catch (Exception ex)
// All sweep points x replications, with per-N early-stop logic. // All sweep points x replications, with per-N early-stop logic.
// ---------------------------------------------------------------- // ----------------------------------------------------------------
var allResults = new List<SweepResult>(); var allResults = new List<SweepResult>();
var allRepResults = new List<RepResult>();
bool saturatedDetected = false; bool saturatedDetected = false;
foreach (int n in nValues) foreach (int n in nValues)
@@ -208,6 +210,7 @@ foreach (int n in nValues)
Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: " Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: "
+ $"mean_per_sub={meanPerSub:F1}/s " + $"mean_per_sub={meanPerSub:F1}/s "
+ $"std_per_sub={stdPerSub:F1}/s "
+ $"aggregate={aggregate:F0}/s " + $"aggregate={aggregate:F0}/s "
+ $"late={totalLate} " + $"late={totalLate} "
+ $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% " + $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% "
@@ -238,6 +241,8 @@ foreach (int n in nValues)
// ---------- per-N aggregation ---------- // ---------- per-N aggregation ----------
if (perRepResults.Count > 0) if (perRepResults.Count > 0)
{ {
allRepResults.AddRange(perRepResults);
double meanOfMeans = perRepResults.Average(r => r.MeanPerSub); double meanOfMeans = perRepResults.Average(r => r.MeanPerSub);
double ciHalfWidth = ConfidenceIntervalHalfWidth95( double ciHalfWidth = ConfidenceIntervalHalfWidth95(
perRepResults.Select(r => r.MeanPerSub).ToArray()); perRepResults.Select(r => r.MeanPerSub).ToArray());
@@ -291,6 +296,24 @@ foreach (var r in allResults)
await File.WriteAllTextAsync(outputCsv, sb.ToString()); await File.WriteAllTextAsync(outputCsv, sb.ToString());
Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}"); Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}");
if (!string.IsNullOrWhiteSpace(repOutputCsv))
{
var repSb = new System.Text.StringBuilder();
repSb.AppendLine("n,rep,mean_per_sub_rate,std_per_sub_rate,min_per_sub_rate,max_per_sub_rate," +
"aggregate,late_deliveries,server_cpu_avg,server_cpu_peak,client_cpu_avg,client_cpu_peak");
foreach (var r in allRepResults)
{
repSb.AppendLine(string.Create(CultureInfo.InvariantCulture,
$"{r.N},{r.Rep},{r.MeanPerSub:F2},{r.StdPerSub:F2},{r.MinPerSub:F2},{r.MaxPerSub:F2}," +
$"{r.Aggregate:F1},{r.LateDeliveries},{r.ServerCpuAvg:F2},{r.ServerCpuPeak:F2}," +
$"{r.ClientCpuAvg:F2},{r.ClientCpuPeak:F2}"));
}
await File.WriteAllTextAsync(repOutputCsv, repSb.ToString());
Console.WriteLine($"[Orchestrator] Replication rows written to {repOutputCsv}");
}
// ---------------------------------------------------------------- // ----------------------------------------------------------------
// Subscriber spawn / teardown // Subscriber spawn / teardown
// ---------------------------------------------------------------- // ----------------------------------------------------------------
@@ -5,35 +5,104 @@
// //
// Also tests notification latency after all resources are ready. // Also tests notification latency after all resources are ready.
// //
// Usage: dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 // Usage:
// dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 --batch 10000
// dotnet run -- --host 127.0.0.1 --port 10901 --resources 10000 --batch 10 --rounds 10
//
// Outputs:
// test2_attach_rounds.csv per-round P50/P95/P99/mean/wall time
// test2_attach_aggregate.csv mean + sample standard deviation across rounds
// test2_attach_latencies.csv raw per-attach latencies, tagged by round
// ============================================================ // ============================================================
using Esiur.Protocol; using Esiur.Protocol;
using Esiur.Resource; using Esiur.Resource;
using System.Diagnostics; using System.Diagnostics;
using System.Text.RegularExpressions; using System.Globalization;
using System.Text;
var host = GetArg(args, "--host", "127.0.0.1"); var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10901")); var port = int.Parse(GetArg(args, "--port", "10901"));
var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); var resourceCount = int.Parse(GetArg(args, "--resources", "10000"));
var batchSize = int.Parse(GetArg(args, "--batch", "10000")); var batchSize = int.Parse(GetArg(args, "--batch", "10000"));
var rounds = int.Parse(GetArg(args, "--rounds", "1"));
var measureNotifications = bool.Parse(GetArg(args, "--notifications", rounds == 1 ? "true" : "false"));
Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}"); if (rounds < 1)
throw new ArgumentOutOfRangeException(nameof(rounds), "--rounds must be >= 1.");
var wh = new Warehouse(); Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}, batch={batchSize}, rounds={rounds}");
var connnection = await wh.Get<EpConnection>( var roundResults = new List<RoundResult>(rounds);
$"ep://{host}:{port}"); var latencyRows = new List<(int Round, double LatencyMs)>(resourceCount * rounds);
var attachLatencies = new List<double>(resourceCount); for (int round = 1; round <= rounds; round++)
var proxies = new IResource[resourceCount];
// --- Attach in batches to avoid overwhelming the runtime -------------
var totalSw = Stopwatch.StartNew();
for (int batch = 0; batch < resourceCount; batch += batchSize)
{ {
var result = await RunRound(round);
roundResults.Add(result);
latencyRows.AddRange(result.Latencies.Select(l => (round, l)));
Console.WriteLine(
$"[Client-T2] Round {round}/{rounds}: " +
$"wall={result.WallSeconds:F2}s " +
$"p50={result.P50:F2}ms p99={result.P99:F2}ms mean={result.Mean:F2}ms");
}
Console.WriteLine("[Client-T2] Aggregate across rounds (sample standard deviation):");
PrintAggregate("wall_s", roundResults.Select(r => r.WallSeconds));
PrintAggregate("min_ms", roundResults.Select(r => r.Min));
PrintAggregate("p50_ms", roundResults.Select(r => r.P50));
PrintAggregate("p95_ms", roundResults.Select(r => r.P95));
PrintAggregate("p99_ms", roundResults.Select(r => r.P99));
PrintAggregate("max_ms", roundResults.Select(r => r.Max));
PrintAggregate("mean_ms", roundResults.Select(r => r.Mean));
// --- CSV output -----------------------------------------------------
var roundCsv = new StringBuilder();
roundCsv.AppendLine("round,resources,batch,wall_s,min_ms,p50_ms,p95_ms,p99_ms,max_ms,mean_ms,notifications_received");
foreach (var r in roundResults)
{
roundCsv.AppendLine(
$"{r.Round},{resourceCount},{batchSize},{F(r.WallSeconds)},{F(r.Min)},{F(r.P50)},{F(r.P95)},{F(r.P99)},{F(r.Max)},{F(r.Mean)},{r.NotificationsReceived}");
}
await File.WriteAllTextAsync("test2_attach_rounds.csv", roundCsv.ToString());
var aggregateCsv = new StringBuilder();
aggregateCsv.AppendLine("metric,rounds,mean,sample_stddev");
AppendAggregateCsv(aggregateCsv, "wall_s", roundResults.Select(r => r.WallSeconds));
AppendAggregateCsv(aggregateCsv, "min_ms", roundResults.Select(r => r.Min));
AppendAggregateCsv(aggregateCsv, "p50_ms", roundResults.Select(r => r.P50));
AppendAggregateCsv(aggregateCsv, "p95_ms", roundResults.Select(r => r.P95));
AppendAggregateCsv(aggregateCsv, "p99_ms", roundResults.Select(r => r.P99));
AppendAggregateCsv(aggregateCsv, "max_ms", roundResults.Select(r => r.Max));
AppendAggregateCsv(aggregateCsv, "mean_ms", roundResults.Select(r => r.Mean));
await File.WriteAllTextAsync("test2_attach_aggregate.csv", aggregateCsv.ToString());
var latencyCsv = new StringBuilder();
latencyCsv.AppendLine("round,attach_latency_ms");
foreach (var (round, latencyMs) in latencyRows)
latencyCsv.AppendLine($"{round},{F(latencyMs)}");
await File.WriteAllTextAsync("test2_attach_latencies.csv", latencyCsv.ToString());
Console.WriteLine("[Client-T2] Round summaries written to test2_attach_rounds.csv");
Console.WriteLine("[Client-T2] Aggregate statistics written to test2_attach_aggregate.csv");
Console.WriteLine("[Client-T2] Attach latencies written to test2_attach_latencies.csv");
async Task<RoundResult> RunRound(int round)
{
var wh = new Warehouse();
var connection = await wh.Get<EpConnection>($"ep://{host}:{port}");
var attachLatencies = new List<double>(resourceCount);
var proxies = new IResource[resourceCount];
// --- Attach in batches to avoid overwhelming the runtime -------------
var totalSw = Stopwatch.StartNew();
for (int batch = 0; batch < resourceCount; batch += batchSize)
{
int end = Math.Min(batch + batchSize, resourceCount); int end = Math.Min(batch + batchSize, resourceCount);
var batchTasks = new Task[end - batch]; var batchTasks = new Task[end - batch];
@@ -43,12 +112,7 @@ for (int batch = 0; batch < resourceCount; batch += batchSize)
batchTasks[i - batch] = Task.Run(async () => batchTasks[i - batch] = Task.Run(async () =>
{ {
var sw = Stopwatch.StartNew(); var sw = Stopwatch.StartNew();
proxies[capturedI] = await connection.Get($"sys/sensor_{capturedI}");
//Console.WriteLine(capturedI);
proxies[capturedI] = await connnection.Get($"sys/sensor_{capturedI}");
//Console.WriteLine(proxies[capturedI].Instance.Link);
sw.Stop(); sw.Stop();
lock (attachLatencies) lock (attachLatencies)
@@ -57,51 +121,95 @@ for (int batch = 0; batch < resourceCount; batch += batchSize)
} }
await Task.WhenAll(batchTasks); await Task.WhenAll(batchTasks);
//Console.WriteLine("D");
if (batch % 1000 == 0) if (batch % 1000 == 0)
Console.WriteLine($"[Client-T2] Attached {Math.Min(batch + batchSize, resourceCount)}/{resourceCount} " + Console.WriteLine($"[Client-T2] Round {round}/{rounds}: attached {Math.Min(batch + batchSize, resourceCount)}/{resourceCount} " +
$"elapsed={totalSw.Elapsed.TotalSeconds:F1}s"); $"elapsed={totalSw.Elapsed.TotalSeconds:F1}s");
} }
totalSw.Stop(); totalSw.Stop();
Console.WriteLine($"[Client-T2] All attached in {totalSw.Elapsed.TotalSeconds:F2}s");
// --- Latency statistics --------------------------------------------- attachLatencies.Sort();
attachLatencies.Sort(); int n = attachLatencies.Count;
int n = attachLatencies.Count;
Console.WriteLine($"[Client-T2] Attach latency (ms):"); long received = 0;
Console.WriteLine($" min={attachLatencies[0]:F2}"); if (measureNotifications)
Console.WriteLine($" p50={attachLatencies[(int)(n * 0.50)]:F2}"); {
Console.WriteLine($" p95={attachLatencies[(int)(n * 0.95)]:F2}"); Console.WriteLine($"[Client-T2] Round {round}/{rounds}: measuring notification latency under full resource load...");
Console.WriteLine($" p99={attachLatencies[(int)(n * 0.99)]:F2}");
Console.WriteLine($" max={attachLatencies[n - 1]:F2}");
Console.WriteLine($" mean={attachLatencies.Average():F2}");
// --- Notification round-trip after full load ------------------------ for (int i = 0; i < resourceCount; i++)
Console.WriteLine("[Client-T2] Measuring notification latency under full resource load..."); {
long received = 0;
double sumLatencyMs = 0;
for (int i = 0; i < resourceCount; i++)
{
int capturedI = i; int capturedI = i;
proxies[capturedI].Instance.PropertyModified += (PropertyModificationInfo data) => var proxy = proxies[capturedI] ?? throw new InvalidOperationException($"Resource {capturedI} was not attached.");
var instance = proxy.Instance ?? throw new InvalidOperationException($"Resource {capturedI} has no instance.");
instance.PropertyModified += (PropertyModificationInfo data) =>
{ {
if (data.Name == "Value") if (data.Name == "Value")
Interlocked.Increment(ref received); Interlocked.Increment(ref received);
}; };
}
await connection.Call("UpdateValues");
await Task.Delay(10000);
Console.WriteLine($"[Client-T2] Round {round}/{rounds}: received {received} notifications in 10s");
}
await wh.Close();
return new RoundResult(
round,
totalSw.Elapsed.TotalSeconds,
attachLatencies[0],
Quantile(attachLatencies, 0.50),
Quantile(attachLatencies, 0.95),
Quantile(attachLatencies, 0.99),
attachLatencies[n - 1],
attachLatencies.Average(),
received,
attachLatencies);
} }
await connnection.Call("UpdateValues"); static double Quantile(IReadOnlyList<double> sorted, double p)
{
return sorted[Math.Min(sorted.Count - 1, (int)(sorted.Count * p))];
}
await Task.Delay(10000); // observe for 10s static void PrintAggregate(string metric, IEnumerable<double> values)
Console.WriteLine($"[Client-T2] Received {received} notifications in 10s from first 500 resources"); {
var arr = values.ToArray();
Console.WriteLine($" {metric,-8} mean={Mean(arr):F3} sample_stddev={SampleStdDev(arr):F3}");
}
// --- CSV output ----------------------------------------------------- static void AppendAggregateCsv(StringBuilder csv, string metric, IEnumerable<double> values)
string csv = "attach_latency_ms\n" + string.Join("\n", attachLatencies.Select(l => l.ToString("F3"))); {
await File.WriteAllTextAsync("test2_attach_latencies.csv", csv); var arr = values.ToArray();
Console.WriteLine("[Client-T2] Attach latencies written to test2_attach_latencies.csv"); csv.AppendLine($"{metric},{arr.Length},{F(Mean(arr))},{F(SampleStdDev(arr))}");
}
static double Mean(IReadOnlyList<double> values)
{
return values.Average();
}
static double SampleStdDev(IReadOnlyList<double> values)
{
if (values.Count < 2)
return double.NaN;
var mean = Mean(values);
var sumSquares = values.Sum(x =>
{
var d = x - mean;
return d * d;
});
return Math.Sqrt(sumSquares / (values.Count - 1));
}
static string F(double value)
{
return value.ToString("F3", CultureInfo.InvariantCulture);
}
static string GetArg(string[] args, string key, string def) static string GetArg(string[] args, string key, string def)
@@ -109,3 +217,15 @@ static string GetArg(string[] args, string key, string def)
int i = Array.IndexOf(args, key); int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
} }
record RoundResult(
int Round,
double WallSeconds,
double Min,
double P50,
double P95,
double P99,
double Max,
double Mean,
long NotificationsReceived,
List<double> Latencies);
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,394.7,300,53,0,0
2,Completed,508.0,300,53,0,0
3,Completed,335.2,300,53,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 394.7 300 53 0 0
3 2 Completed 508.0 300 53 0 0
4 3 Completed 335.2 300 53 0 0
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,71.4,75,14,0,0
2,Completed,16.7,75,14,0,0
3,Completed,23.3,75,14,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 71.4 75 14 0 0
3 2 Completed 16.7 75 14 0 0
4 3 Completed 23.3 75 14 0 0
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,115.2,150,30,0,0
2,Completed,74.1,150,30,0,0
3,Completed,79.8,150,30,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 115.2 150 30 0 0
3 2 Completed 74.1 150 30 0 0
4 3 Completed 79.8 150 30 0 0
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,344.4,300,53,0,0
2,Completed,254.2,300,53,0,0
3,Completed,273.7,300,53,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 344.4 300 53 0 0
3 2 Completed 254.2 300 53 0 0
4 3 Completed 273.7 300 53 0 0
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,975.6,600,81,0,0
2,Completed,623.9,600,81,0,0
3,Completed,670.0,600,81,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 975.6 600 81 0 0
3 2 Completed 623.9 600 81 0 0
4 3 Completed 670.0 600 81 0 0
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,5956.2,1200,162,0,0
2,Completed,5649.9,1200,162,0,0
3,Completed,4763.6,1200,162,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 5956.2 1200 162 0 0
3 2 Completed 5649.9 1200 162 0 0
4 3 Completed 4763.6 1200 162 0 0
@@ -1,4 +0,0 @@
iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished
1,Completed,56724.2,2400,325,0,0
2,Completed,64782.9,2400,325,0,0
3,Completed,50450.8,2400,325,0,0
1 iteration outcome ms attached cycle_breaks unnecessary_placeholders unpublished
2 1 Completed 56724.2 2400 325 0 0
3 2 Completed 64782.9 2400 325 0 0
4 3 Completed 50450.8 2400 325 0 0