From 7e27d3cfac15b467a59e3231bda557933b185e42 Mon Sep 17 00:00:00 2001 From: ahmed Date: Mon, 25 May 2026 14:12:56 +0300 Subject: [PATCH] new tests --- Esiur.sln | 33 ++ Libraries/Esiur/Data/DataDeserializer.cs | 38 +- Program2.cs | 8 + .../Esiur.Tests.ConcurrentAttachSweep.csproj | 14 + .../ConcurrentAttachSweep/Program.cs | 278 ++++++++++ .../ConcurrentAttachSweep/SensorResource.cs | 16 + .../Esiur.Tests.NodeFanoutSweep.Client.csproj | 10 + .../NodeFanoutSweep/Client/Program.cs | 1 + .../Esiur.Tests.NodeFanoutSweep.Server.csproj | 14 + .../NodeFanoutSweep/Server/Program.cs | 390 +++++++++++++ .../Client/Esiur.Tests.Queueing.Client.csproj | 8 + .../Distribution/Queueing/Client/Program2.cs | 176 ++++++ .../Queueing/Client/ReplicateState.cs | 183 +++++++ .../ComplexObject/DdsCdrCodec.cs | 511 ++++++++++++++++++ .../ComplexObject/ModelRunner.cs | 12 +- .../ComplexObject/Xcdr2Writer.cs | 332 ++++++++++++ 16 files changed, 1982 insertions(+), 42 deletions(-) create mode 100644 Program2.cs create mode 100644 Tests/Distribution/ConcurrentAttachSweep/Esiur.Tests.ConcurrentAttachSweep.csproj create mode 100644 Tests/Distribution/ConcurrentAttachSweep/Program.cs create mode 100644 Tests/Distribution/ConcurrentAttachSweep/SensorResource.cs create mode 100644 Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj create mode 100644 Tests/Distribution/NodeFanoutSweep/Client/Program.cs create mode 100644 Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj create mode 100644 Tests/Distribution/NodeFanoutSweep/Server/Program.cs create mode 100644 Tests/Distribution/Queueing/Client/Program2.cs create mode 100644 Tests/Distribution/Queueing/Client/ReplicateState.cs create mode 100644 Tests/Serialization/ComplexObject/DdsCdrCodec.cs create mode 100644 Tests/Serialization/ComplexObject/Xcdr2Writer.cs diff --git a/Esiur.sln b/Esiur.sln index e59c225..600224a 100644 --- a/Esiur.sln +++ b/Esiur.sln @@ -78,6 +78,20 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{02A07E EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Server", "Tests\Distribution\Queueing\Server\Esiur.Tests.Queueing.Server.csproj", "{7FD57668-2AD8-0F53-4006-03927B5A385C}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "NodeFanoutSweep", "NodeFanoutSweep", "{33D973D8-4D3E-47BA-8135-FCA0CFF7E210}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{57E80693-7AFC-4446-87DE-25E97C036E2F}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{21D42B96-99F9-4E48-A499-5170A5A9597F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Server", "Tests\Distribution\NodeFanoutSweep\Server\Esiur.Tests.NodeFanoutSweep.Server.csproj", "{9FF626DF-1AD4-2BE1-F834-F49121D65085}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Client", "Tests\Distribution\NodeFanoutSweep\Client\Esiur.Tests.NodeFanoutSweep.Client.csproj", "{550A20AB-8E97-BCDD-9F54-27823663120A}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ConcurrentAttachSweep", "ConcurrentAttachSweep", "{E713D25F-2602-44C9-AB9E-C9477FB2BA93}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttachSweep", "Tests\Distribution\ConcurrentAttachSweep\Esiur.Tests.ConcurrentAttachSweep.csproj", "{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -156,6 +170,18 @@ Global {7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.Build.0 = Debug|Any CPU {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.ActiveCfg = Release|Any CPU {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.Build.0 = Release|Any CPU + {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.Build.0 = Release|Any CPU + {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.Build.0 = Release|Any CPU + {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -193,6 +219,13 @@ Global {543158AD-BCB6-44A5-91AB-FE97B42A9C95} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} {02A07E3C-67DB-4489-88E3-D568C477F540} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} {7FD57668-2AD8-0F53-4006-03927B5A385C} = {02A07E3C-67DB-4489-88E3-D568C477F540} + {33D973D8-4D3E-47BA-8135-FCA0CFF7E210} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {57E80693-7AFC-4446-87DE-25E97C036E2F} = {33D973D8-4D3E-47BA-8135-FCA0CFF7E210} + {21D42B96-99F9-4E48-A499-5170A5A9597F} = {33D973D8-4D3E-47BA-8135-FCA0CFF7E210} + {9FF626DF-1AD4-2BE1-F834-F49121D65085} = {57E80693-7AFC-4446-87DE-25E97C036E2F} + {550A20AB-8E97-BCDD-9F54-27823663120A} = {21D42B96-99F9-4E48-A499-5170A5A9597F} + {E713D25F-2602-44C9-AB9E-C9477FB2BA93} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1} = {E713D25F-2602-44C9-AB9E-C9477FB2BA93} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} diff --git a/Libraries/Esiur/Data/DataDeserializer.cs b/Libraries/Esiur/Data/DataDeserializer.cs index faf91f2..e35bdfa 100644 --- a/Libraries/Esiur/Data/DataDeserializer.cs +++ b/Libraries/Esiur/Data/DataDeserializer.cs @@ -1805,8 +1805,6 @@ public static class DataDeserializer } } - - public static AsyncReply TypedParserAsync(ParsedTdu tdu, EpConnection connection, uint[] requestSequence) { var tru = tdu.Metadata; @@ -1831,20 +1829,6 @@ public static class DataDeserializer else if (tru is TruTypeDef truTypeDef) { return TypedObjectParserAsync(tdu, truTypeDef.TypeDef, connection, requestSequence); -// return tru.Identifier switch -// { -// TruIdentifier.LocalType8 or -// TruIdentifier.LocalType16 or -// TruIdentifier.LocalType32 or -// TruIdentifier.LocalType64 or -// TruIdentifier.RemoteType8 or -// TruIdentifier.RemoteType16 or -// TruIdentifier.RemoteType32 or -// TruIdentifier.RemoteType64 => TypedObjectParserAsync(tdu, truTypeDef.TypeDef, connection, requestSequence); -//, - -// _ => throw new Exception("Unsupported type for typed parser.") -// }; } throw new Exception("Unknown TRU."); @@ -1874,32 +1858,12 @@ public static class DataDeserializer } else if (tru is TruTypeDef truTypeDef) { - return tru.Identifier switch - { - TruIdentifier.LocalType8 or - TruIdentifier.LocalType16 or - TruIdentifier.LocalType32 or - TruIdentifier.LocalType64 or - TruIdentifier.RemoteType8 or - TruIdentifier.RemoteType16 or - TruIdentifier.RemoteType32 or - TruIdentifier.RemoteType64 => TypedObjectParser(tdu, truTypeDef.TypeDef, warehouse), - _ => throw new Exception("Unsupported type for typed parser.") - }; - + return TypedObjectParser(tdu, truTypeDef.TypeDef, warehouse); } throw new Exception("Unknown TRU."); } - //public static object TypedListParser(ParsedTdu tdu, Warehouse warehouse) - //{ - // // get the type - // var (hdrCs, tru) = Tru.Parse(tdu.Metadata, 0); - - // return TypedArrayParser(tdu, tru, warehouse); - //} - public static AsyncBag PropertyValueArrayParserAsync(byte[] data, uint offset, uint length, EpConnection connection, uint[] requestSequence)//, bool ageIncluded = true) { var rt = new AsyncBag(); diff --git a/Program2.cs b/Program2.cs new file mode 100644 index 0000000..d555ecb --- /dev/null +++ b/Program2.cs @@ -0,0 +1,8 @@ +using System; + +public class Class1 +{ + public Class1() + { + } +} diff --git a/Tests/Distribution/ConcurrentAttachSweep/Esiur.Tests.ConcurrentAttachSweep.csproj b/Tests/Distribution/ConcurrentAttachSweep/Esiur.Tests.ConcurrentAttachSweep.csproj new file mode 100644 index 0000000..5a5b0cd --- /dev/null +++ b/Tests/Distribution/ConcurrentAttachSweep/Esiur.Tests.ConcurrentAttachSweep.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/Tests/Distribution/ConcurrentAttachSweep/Program.cs b/Tests/Distribution/ConcurrentAttachSweep/Program.cs new file mode 100644 index 0000000..0ef1f09 --- /dev/null +++ b/Tests/Distribution/ConcurrentAttachSweep/Program.cs @@ -0,0 +1,278 @@ +// ============================================================ +// Scalability Extension: Concurrent Attach Sweep +// ------------------------------------------------------------ +// Extends Tests/Distribution/ConcurrentAttach with: +// - Sweep over a wider range of concurrent request counts A. +// - More rounds per A for confidence-interval reporting. +// - Auto-stop when timeouts or failures appear (the +// saturation signal for concurrent attach is different +// from fan-out: it's *correctness* failure, not slowdown). +// - Unified CSV output suitable for direct plotting. +// +// Server: re-use the existing +// Tests/Distribution/ConcurrentAttach with --mode server. +// Or run this binary with --mode both. +// ------------------------------------------------------------ +// Usage: +// dotnet run -- --mode both --resources 200 \ +// --a-values 10,25,50,100,250,500,1000,2000 \ +// --rounds 10 --timeout 10000 +// ============================================================ + +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Tests.ConcurrentAttachSweep; +using System.Data.Common; +using System.Diagnostics; +using System.Globalization; + +var mode = GetArg(args, "--mode", "both"); +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10902")); +var resources = int.Parse(GetArg(args, "--resources", "200")); +var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000")); +var rounds = int.Parse(GetArg(args, "--rounds", "10")); +var aValStr = GetArg(args, "--a-values", "10,25,50,100,250,500,1000,2000"); +var outCsv = GetArg(args, "--output", "concurrent_attach_sweep.csv"); +var aValues = aValStr.Split(',').Select(int.Parse).ToArray(); + +var serverWh = new Warehouse(); +var clientWh = new Warehouse(); + +// ---------------------------------------------------------------- +// SERVER SIDE +// ---------------------------------------------------------------- +if (mode == "server" || mode == "both") +{ + await serverWh.Put("sys", new MemoryStore()); + await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port }); + + for (int i = 0; i < resources; i++) + { + await serverWh.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i }); + } + + await serverWh.Open(); + Console.WriteLine($"[Server-T3+] Ready: {resources} resources on port {port}"); + + if (mode == "server") + { + Console.WriteLine("Press ENTER to stop."); + Console.ReadLine(); + await serverWh.Close(); + return; + } + + await Task.Delay(500); +} + +// ---------------------------------------------------------------- +// CLIENT SIDE: sweep over A values +// ---------------------------------------------------------------- +Console.WriteLine($"[Client-T3+] resources={resources} timeout={timeoutMs}ms rounds={rounds}"); +Console.WriteLine($"[Client-T3+] A values: {string.Join(",", aValues)}"); + +var summary = new List(); +bool failureDetected = false; + +foreach (int A in aValues) +{ + if (failureDetected) + { + Console.WriteLine($"\n[Client-T3+] A={A}: SKIPPED (failure at lower A)"); + continue; + } + + Console.WriteLine($"\n[Client-T3+] === A={A} ==="); + var roundResults = new List(); + + for (int round = 0; round < rounds; round++) + { + var rng = new Random(round * 1000 + A); + var targets = Enumerable.Range(0, A) + .Select(_ => rng.Next(resources)) + .ToArray(); + + long succeeded = 0, failed = 0, timedOut = 0; + var latencies = new double[A]; + var roundSw = Stopwatch.StartNew(); + + // One shared connection per round, matching the existing test methodology + var connection = await clientWh.Get($"ep://{host}:{port}"); + + var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => + { + var sw = Stopwatch.StartNew(); + using var cts = new CancellationTokenSource(timeoutMs); + try + { + var proxy = await connection.Get($"sys/sensor_{resourceIdx}"); + sw.Stop(); + latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; + if (proxy != null) Interlocked.Increment(ref succeeded); + else Interlocked.Increment(ref failed); + } + catch (OperationCanceledException) + { + sw.Stop(); + latencies[taskIdx] = timeoutMs; + Interlocked.Increment(ref timedOut); + } + catch + { + sw.Stop(); + latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; + Interlocked.Increment(ref failed); + } + })).ToArray(); + + await Task.WhenAll(tasks); + roundSw.Stop(); + + var sorted = latencies.OrderBy(x => x).ToArray(); + int n = sorted.Length; + + var rr = new RoundResult + { + Round = round + 1, + A = A, + Succeeded = succeeded, + Failed = failed, + TimedOut = timedOut, + WallMs = roundSw.Elapsed.TotalMilliseconds, + P50 = sorted[Math.Min(n - 1, (int)(n * 0.50))], + P95 = sorted[Math.Min(n - 1, (int)(n * 0.95))], + P99 = sorted[Math.Min(n - 1, (int)(n * 0.99))], + Max = sorted[n - 1], + }; + roundResults.Add(rr); + + Console.WriteLine($" round {round + 1}/{rounds}: ok={succeeded}/{A} fail={failed} " + + $"timeout={timedOut} wall={rr.WallMs:F0}ms p50={rr.P50:F0} p99={rr.P99:F0}"); + + // Round 1 of each A is conventionally excluded from latency + // aggregation due to connection-establishment overhead (matches + // the existing test methodology). + + GC.Collect(); + await Task.Delay(500); + } + + var steady = roundResults.Skip(1).ToList(); // exclude round 1 + if (steady.Count == 0) steady = roundResults; + + var anyFailure = roundResults.Any(r => r.Failed > 0 || r.TimedOut > 0); + + var s = new ASummary + { + A = A, + Rounds = roundResults.Count, + AnyFailures = anyFailure, + TotalSucceeded = roundResults.Sum(r => r.Succeeded), + TotalFailed = roundResults.Sum(r => r.Failed), + TotalTimedOut = roundResults.Sum(r => r.TimedOut), + MeanP50 = steady.Average(r => r.P50), + Ci95P50 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P50).ToArray()), + MeanP99 = steady.Average(r => r.P99), + Ci95P99 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P99).ToArray()), + MeanWall = steady.Average(r => r.WallMs), + Ci95Wall = ConfidenceIntervalHalfWidth95(steady.Select(r => r.WallMs).ToArray()), + }; + summary.Add(s); + + Console.WriteLine($" [A={A}] SUMMARY: " + + $"p50={s.MeanP50:F1}±{s.Ci95P50:F1} " + + $"p99={s.MeanP99:F1}±{s.Ci95P99:F1} " + + $"wall={s.MeanWall:F0}±{s.Ci95Wall:F0}ms " + + $"failures={s.TotalFailed + s.TotalTimedOut}/{s.TotalSucceeded + s.TotalFailed + s.TotalTimedOut}"); + + if (anyFailure) + { + Console.WriteLine($" [A={A}] *** FAILURE DETECTED: stopping sweep ***"); + failureDetected = true; + } +} + +// ---------------------------------------------------------------- +// CSV output +// ---------------------------------------------------------------- +var sb = new System.Text.StringBuilder(); +sb.AppendLine("a,rounds,any_failures,total_succeeded,total_failed,total_timed_out," + + "mean_p50_ms,ci95_p50,mean_p99_ms,ci95_p99,mean_wall_ms,ci95_wall"); +foreach (var r in summary) +{ + sb.AppendLine(string.Create(CultureInfo.InvariantCulture, + $"{r.A},{r.Rounds},{r.AnyFailures},{r.TotalSucceeded},{r.TotalFailed},{r.TotalTimedOut}," + + $"{r.MeanP50:F2},{r.Ci95P50:F2},{r.MeanP99:F2},{r.Ci95P99:F2}," + + $"{r.MeanWall:F2},{r.Ci95Wall:F2}")); +} +await File.WriteAllTextAsync(outCsv, sb.ToString()); +Console.WriteLine($"\n[Client-T3+] Results written to {outCsv}"); + +if (mode == "server" || mode == "both") await serverWh.Close(); +if (mode == "client" || mode == "both") await clientWh.Close(); + + +// ---------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------- +static double ConfidenceIntervalHalfWidth95(double[] xs) +{ + int n = xs.Length; + if (n < 2) return 0; + double mean = xs.Average(); + double sumSq = xs.Sum(x => (x - mean) * (x - mean)); + double std = Math.Sqrt(sumSq / (n - 1)); + double sem = std / Math.Sqrt(n); + double t = (n - 1) switch + { + 1 => 12.706, + 2 => 4.303, + 3 => 3.182, + 4 => 2.776, + 5 => 2.571, + 6 => 2.447, + 7 => 2.365, + 8 => 2.306, + 9 => 2.262, + _ => 1.960 + }; + return t * sem; +} + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} + +record RoundResult +{ + public int Round; + public int A; + public long Succeeded; + public long Failed; + public long TimedOut; + public double WallMs; + public double P50; + public double P95; + public double P99; + public double Max; +} + +record ASummary +{ + public int A; + public int Rounds; + public bool AnyFailures; + public long TotalSucceeded; + public long TotalFailed; + public long TotalTimedOut; + public double MeanP50; + public double Ci95P50; + public double MeanP99; + public double Ci95P99; + public double MeanWall; + public double Ci95Wall; +} \ No newline at end of file diff --git a/Tests/Distribution/ConcurrentAttachSweep/SensorResource.cs b/Tests/Distribution/ConcurrentAttachSweep/SensorResource.cs new file mode 100644 index 0000000..5455881 --- /dev/null +++ b/Tests/Distribution/ConcurrentAttachSweep/SensorResource.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Tests.ConcurrentAttachSweep; + +using Esiur.Resource; + +[Resource] +public partial class SensorResource : Resource +{ + public int SensorId { get; set; } + + [Export] + public double value; +} \ No newline at end of file diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj b/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj new file mode 100644 index 0000000..ed9781c --- /dev/null +++ b/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs new file mode 100644 index 0000000..1bc52a6 --- /dev/null +++ b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs @@ -0,0 +1 @@ +Console.WriteLine("Hello, World!"); diff --git a/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj b/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj new file mode 100644 index 0000000..4d18603 --- /dev/null +++ b/Tests/Distribution/NodeFanoutSweep/Server/Esiur.Tests.NodeFanoutSweep.Server.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/Tests/Distribution/NodeFanoutSweep/Server/Program.cs b/Tests/Distribution/NodeFanoutSweep/Server/Program.cs new file mode 100644 index 0000000..3cf6ff2 --- /dev/null +++ b/Tests/Distribution/NodeFanoutSweep/Server/Program.cs @@ -0,0 +1,390 @@ +// ============================================================ +// Scalability Extension: Fan-Out — ORCHESTRATOR CLIENT +// ------------------------------------------------------------ +// Drives a full sweep of subscriber counts N against a single +// server instance. For each N value: +// 1. Spawns N in-process subscriber tasks, each opening its +// own EpConnection to the server. +// 2. Each subscriber attaches to all M resources and counts +// property-change notifications it receives over a fixed +// measurement window. +// 3. The orchestrator polls the server's sys/control resource +// to capture server-side CPU during the window. +// 4. Tears down all N subscribers and waits a settle interval +// before the next sweep point. +// 5. Repeats for `replications` rounds so the per-N mean and +// 95% confidence interval can be computed. +// 6. Auto-stops the sweep if either: +// - mean per-subscriber rate drops below 10% of theoretical, +// - or server CPU stays at >180% (>90% of 2 cores) for the +// entire measurement window. +// +// Note on in-process vs separate processes: subscribers are +// tasks within a single client process to keep the test self- +// contained and avoid spawning N OS processes. Each task uses +// its own EpConnection (TCP connection) to the server, so from +// the server's perspective the load looks identical to N +// distinct subscriber nodes for the property-propagation path. +// The single-client-process design does mean that the client +// host's CPU is shared across all subscribers; the orchestrator +// records this too so degradation can be attributed correctly. +// ------------------------------------------------------------ +// Usage: +// dotnet run -- --host 127.0.0.1 --port 10900 --resources 100 \ +// --emit-interval-ms 50 --window-sec 60 \ +// --warmup-sec 5 --replications 3 \ +// --n-values 2,5,10,20,50,100,200,500 +// ============================================================ + +using Esiur.Protocol; +using Esiur.Resource; +using System.Data.Common; +using System.Diagnostics; +using System.Globalization; + +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10900")); +var resources = int.Parse(GetArg(args, "--resources", "100")); +var emitIntervalMs = int.Parse(GetArg(args, "--emit-interval-ms", "50")); +var windowSec = int.Parse(GetArg(args, "--window-sec", "60")); +var warmupSec = int.Parse(GetArg(args, "--warmup-sec", "5")); +var settleSec = int.Parse(GetArg(args, "--settle-sec", "5")); +var replications = int.Parse(GetArg(args, "--replications", "3")); +var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500"); +var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv"); + +var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray(); +double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources; +double minAcceptableRate = theoreticalMaxRate * 0.10; + +Console.WriteLine($"[Orchestrator] resources={resources} interval={emitIntervalMs}ms " + + $"window={windowSec}s replications={replications}"); +Console.WriteLine($"[Orchestrator] theoretical_max_per_subscriber_rate={theoreticalMaxRate:F0} notif/s"); +Console.WriteLine($"[Orchestrator] saturation_threshold={minAcceptableRate:F0} notif/s"); +Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}"); + +// ---------------------------------------------------------------- +// Attach to the server's control resource once. +// ---------------------------------------------------------------- +var controlWh = new Warehouse(); +dynamic? control = null; +try +{ + var controlConn = await controlWh.Get($"ep://{host}:{port}"); + control = await controlConn.Get("sys/control"); +} +catch (Exception ex) +{ + Console.WriteLine($"[Orchestrator] WARNING: could not attach to sys/control: {ex.Message}"); + Console.WriteLine("[Orchestrator] Server CPU will be reported as N/A."); +} + +// ---------------------------------------------------------------- +// All sweep points x replications, with per-N early-stop logic. +// ---------------------------------------------------------------- +var allResults = new List(); +bool saturatedDetected = false; + +foreach (int n in nValues) +{ + if (saturatedDetected) + { + Console.WriteLine($"\n[Orchestrator] N={n}: SKIPPED (saturation reached at lower N)"); + continue; + } + + var perRepResults = new List(); + + for (int rep = 0; rep < replications; rep++) + { + Console.WriteLine($"\n[Orchestrator] === N={n} rep={rep + 1}/{replications} ==="); + + var subscribers = new SubscriberTask[n]; + var subscriberWhs = new Warehouse[n]; + + // ---------- spawn N subscribers ---------- + Console.WriteLine($"[Orchestrator] Spawning {n} subscribers..."); + var spawnSw = Stopwatch.StartNew(); + var spawnTasks = new Task[n]; + for (int i = 0; i < n; i++) + { + int captured = i; + subscriberWhs[i] = new Warehouse(); + spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, captured); + } + + await Task.WhenAll(spawnTasks); + + bool spawnFailed = false; + for (int i = 0; i < n; i++) + { + if (spawnTasks[i].Result == null) { spawnFailed = true; break; } + subscribers[i] = spawnTasks[i].Result!; + } + spawnSw.Stop(); + + if (spawnFailed) + { + Console.WriteLine($"[Orchestrator] N={n}: spawn failed; treating as saturation."); + saturatedDetected = true; + await TeardownAll(subscriberWhs); + break; + } + Console.WriteLine($"[Orchestrator] All {n} subscribers attached in {spawnSw.Elapsed.TotalSeconds:F2}s"); + + // ---------- warmup ---------- + Console.WriteLine($"[Orchestrator] Warmup {warmupSec}s..."); + await Task.Delay(warmupSec * 1000); + foreach (var s in subscribers) s.ResetCounters(); + + // ---------- measurement window with CPU sampling ---------- + Console.WriteLine($"[Orchestrator] Measurement window {windowSec}s..."); + var cpuSamples = new List(); + var connSamples = new List(); + var winSw = Stopwatch.StartNew(); + while (winSw.Elapsed.TotalSeconds < windowSec) + { + await Task.Delay(1000); + if (control != null) + { + try + { + cpuSamples.Add((double)control.CpuPercent); + connSamples.Add((int)control.ConnectedClients); + } + catch { /* control resource may not have current value yet */ } + } + } + + double elapsedSec = winSw.Elapsed.TotalSeconds; + + // ---------- collect per-subscriber counts ---------- + var perSubRates = new double[n]; + long totalReceived = 0; + long totalLate = 0; + for (int i = 0; i < n; i++) + { + perSubRates[i] = subscribers[i].Received / elapsedSec; + totalReceived += subscribers[i].Received; + totalLate += subscribers[i].LateDeliveries; + } + + double meanPerSub = perSubRates.Average(); + double stdPerSub = StdDev(perSubRates); + double minPerSub = perSubRates.Min(); + double maxPerSub = perSubRates.Max(); + double aggregate = perSubRates.Sum(); + double avgServerCpu = cpuSamples.Count > 0 ? cpuSamples.Average() : double.NaN; + double peakServerCpu = cpuSamples.Count > 0 ? cpuSamples.Max() : double.NaN; + + Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: " + + $"mean_per_sub={meanPerSub:F1}/s " + + $"aggregate={aggregate:F0}/s " + + $"late={totalLate} " + + $"server_cpu_avg={avgServerCpu:F1}% peak={peakServerCpu:F1}%"); + + perRepResults.Add(new RepResult + { + N = n, + Rep = rep + 1, + MeanPerSub = meanPerSub, + StdPerSub = stdPerSub, + MinPerSub = minPerSub, + MaxPerSub = maxPerSub, + Aggregate = aggregate, + LateDeliveries = totalLate, + ServerCpuAvg = avgServerCpu, + ServerCpuPeak = peakServerCpu, + }); + + // ---------- teardown ---------- + Console.WriteLine($"[Orchestrator] Tearing down {n} subscribers..."); + await TeardownAll(subscriberWhs); + await Task.Delay(settleSec * 1000); + } + + // ---------- per-N aggregation ---------- + if (perRepResults.Count > 0) + { + double meanOfMeans = perRepResults.Average(r => r.MeanPerSub); + double ciHalfWidth = ConfidenceIntervalHalfWidth95( + perRepResults.Select(r => r.MeanPerSub).ToArray()); + + Console.WriteLine($"\n[Orchestrator] N={n} SUMMARY: " + + $"mean_per_sub={meanOfMeans:F1} ± {ciHalfWidth:F1} notif/s (95% CI)"); + + // Saturation detection: stop sweep if per-sub rate falls below + // 10% of theoretical OR server CPU peaked above 180% (>90% of 2 cores) + if (meanOfMeans < minAcceptableRate) + { + Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: rate {meanOfMeans:F0} < {minAcceptableRate:F0} ***"); + saturatedDetected = true; + } + else if (perRepResults.Average(r => r.ServerCpuPeak) > 180.0) + { + Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server CPU peaked > 180% ***"); + saturatedDetected = true; + } + + // Aggregate row for CSV + allResults.Add(new SweepResult + { + N = n, + Replications = perRepResults.Count, + MeanPerSubRate = meanOfMeans, + Ci95HalfWidth = ciHalfWidth, + MeanAggregate = perRepResults.Average(r => r.Aggregate), + TotalLate = perRepResults.Sum(r => r.LateDeliveries), + MeanServerCpuAvg = perRepResults.Average(r => r.ServerCpuAvg), + MeanServerCpuPeak = perRepResults.Average(r => r.ServerCpuPeak), + }); + } +} + +// ---------------------------------------------------------------- +// Output +// ---------------------------------------------------------------- +var sb = new System.Text.StringBuilder(); +sb.AppendLine("n,replications,mean_per_sub_rate,ci95_halfwidth,mean_aggregate," + + "total_late,mean_server_cpu_avg,mean_server_cpu_peak"); +foreach (var r in allResults) +{ + sb.AppendLine(string.Create(CultureInfo.InvariantCulture, + $"{r.N},{r.Replications},{r.MeanPerSubRate:F2},{r.Ci95HalfWidth:F2}," + + $"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}")); +} +await File.WriteAllTextAsync(outputCsv, sb.ToString()); +Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}"); + +// ---------------------------------------------------------------- +// Subscriber spawn / teardown +// ---------------------------------------------------------------- +static async Task SpawnSubscriber( + Warehouse wh, string host, int port, int resources, int subId) +{ + try + { + var conn = await wh.Get($"ep://{host}:{port}"); + var sub = new SubscriberTask { SubscriberId = subId }; + + for (int i = 0; i < resources; i++) + { + var proxy = await conn.Get($"sys/sensor_{i}"); + long lastTick = Stopwatch.GetTimestamp(); + + proxy.Instance.PropertyModified += (PropertyModificationInfo data) => + { + if (data.Name != "Value") return; + long now = Stopwatch.GetTimestamp(); + double elapsedMs = (now - lastTick) * 1000.0 / Stopwatch.Frequency; + lastTick = now; + Interlocked.Increment(ref sub._received); + if (elapsedMs > 400) Interlocked.Increment(ref sub._lateDeliveries); + }; + } + + return sub; + } + catch (Exception ex) + { + Console.WriteLine($" [Spawn-{subId}] FAILED: {ex.Message}"); + return null; + } +} + +static async Task TeardownAll(Warehouse[] whs) +{ + foreach (var wh in whs) + { + try { await wh.Close(); } + catch { /* ignore */ } + } +} + +// ---------------------------------------------------------------- +// Stats helpers +// ---------------------------------------------------------------- +static double StdDev(double[] xs) +{ + if (xs.Length < 2) return 0; + double mean = xs.Average(); + double sumSq = xs.Sum(x => (x - mean) * (x - mean)); + return Math.Sqrt(sumSq / (xs.Length - 1)); +} + +/// +/// 95% confidence interval half-width using Student's t-distribution. +/// For very small samples (n < 3) returns 0 (not enough data). +/// t values for 95% two-sided are hard-coded; see standard tables. +/// +static double ConfidenceIntervalHalfWidth95(double[] xs) +{ + int n = xs.Length; + if (n < 2) return 0; + double std = StdDev(xs); + double sem = std / Math.Sqrt(n); + // t for df=n-1, two-sided 95% + double t = (n - 1) switch + { + 1 => 12.706, + 2 => 4.303, + 3 => 3.182, + 4 => 2.776, + 5 => 2.571, + 6 => 2.447, + 7 => 2.365, + 8 => 2.306, + 9 => 2.262, + _ => 1.960 // normal approximation + }; + return t * sem; +} + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} + +// ---------------------------------------------------------------- +// Records +// ---------------------------------------------------------------- +class SubscriberTask +{ + public int SubscriberId; + internal long _received; + internal long _lateDeliveries; + public long Received => Interlocked.Read(ref _received); + public long LateDeliveries => Interlocked.Read(ref _lateDeliveries); + public void ResetCounters() + { + Interlocked.Exchange(ref _received, 0); + Interlocked.Exchange(ref _lateDeliveries, 0); + } +} + +record RepResult +{ + public int N; + public int Rep; + public double MeanPerSub; + public double StdPerSub; + public double MinPerSub; + public double MaxPerSub; + public double Aggregate; + public long LateDeliveries; + public double ServerCpuAvg; + public double ServerCpuPeak; +} + +record SweepResult +{ + public int N; + public int Replications; + public double MeanPerSubRate; + public double Ci95HalfWidth; + public double MeanAggregate; + public long TotalLate; + public double MeanServerCpuAvg; + public double MeanServerCpuPeak; +} \ No newline at end of file diff --git a/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj index 634bddd..6faa089 100644 --- a/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj +++ b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj @@ -7,6 +7,14 @@ enable + + + + + + + + diff --git a/Tests/Distribution/Queueing/Client/Program2.cs b/Tests/Distribution/Queueing/Client/Program2.cs new file mode 100644 index 0000000..b3a62b6 --- /dev/null +++ b/Tests/Distribution/Queueing/Client/Program2.cs @@ -0,0 +1,176 @@ +// ============================================================ +// Test 4: Fork-Join Queueing Test — CLIENT NODE (REPLICATED) +// +// Extends the original single-shot client to run K independent +// replications of each (delay, α) configuration so that 95% +// confidence intervals can be reported for the metrics in +// Table III (λ, μ, R̄, δ̄, D̄, P99(D), queue length, batch B). +// +// Each replication uses an identical configuration; the server +// runs StartUpdatesLocal back-to-back, and the client snapshots +// the cumulative finished-queue length between replications so +// that each replication's evaluation sees only its own items. +// +// Usage: +// dotnet run -- --host 127.0.0.1 --port 10901 \ +// --trials 1000 \ +// --delays 5:10:20:30:50:100 \ +// --alphas 0.0:0.25:0.5:0.75:1.0 \ +// --replications 5 \ +// --output forkjoin_replicated.csv +// ============================================================ + +using Esiur.Data; +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Tests.Queueing.Client; +using System.ComponentModel; +using System.Globalization; +using System.IO; + +// ---------- arguments ---------- +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10901")); +var trials = int.Parse(GetArg(args, "--trials", "1000")); +var replications = int.Parse(GetArg(args, "--replications", "5")); +var settleMs = int.Parse(GetArg(args, "--settle-ms", "1000")); +var outputCsv = GetArg(args, "--output", "forkjoin_replicated.csv"); +var delays = GetArg(args, "--delays", "5:10:20:30:50:100") + .Split(':').Select(int.Parse).ToArray(); +var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1.0") + .Split(':').Select(s => double.Parse(s, CultureInfo.InvariantCulture)).ToArray(); + +Console.WriteLine($"[Client-T4-R] Connecting to {host}:{port}"); +Console.WriteLine($"[Client-T4-R] trials/rep={trials} replications={replications} " + + $"settle={settleMs}ms"); +Console.WriteLine($"[Client-T4-R] delays={string.Join(",", delays)}"); +Console.WriteLine($"[Client-T4-R] alphas={string.Join(",", alphas.Select(a => a.ToString("F2", CultureInfo.InvariantCulture)))}"); +Console.WriteLine($"[Client-T4-R] {delays.Length * alphas.Length} configurations × {replications} reps " + + $"= {delays.Length * alphas.Length * replications} trial runs"); + +// ---------- connect ---------- +var wh = new Warehouse(); +var serviceResource = await wh.Get($"ep://{host}:{port}/sys/queueing"); +var service = (dynamic)serviceResource; + +// ---------- replication coordinator state ---------- +// +// The server's StartUpdatesLocal fires `trials` PropertyChanged events +// across a single call. We count incoming events; when `trials` arrive, +// the current replication is complete. We then slice off this rep's +// portion of the cumulative finished-queue and hand it to QueueEval. +// +// `repDone` is signaled once per replication so the orchestrator coroutine +// can drive the next call. + +int eventsThisRep = 0; +TaskCompletionSource repDone = new(TaskCreationOptions.RunContinuationsAsynchronously); +int finishedQueueBaseline = 0; // cumulative length BEFORE current rep started + +serviceResource.PropertyChanged += (object? sender, PropertyChangedEventArgs e) => +{ + int n = Interlocked.Increment(ref eventsThisRep); + if (n == trials) + { + repDone.TrySetResult(true); + } +}; + +// ---------- main sweep ---------- +var rows = new List(); + +using var writer = new StreamWriter(outputCsv); +writer.WriteLine(ReplicatedEvalAggregator.CsvHeader); +writer.Flush(); + +foreach (var delay in delays) +{ + foreach (var alpha in alphas) + { + Console.WriteLine(); + Console.WriteLine($"[Client-T4-R] >>> delay={delay} ms α={alpha:F2} " + + $"(running {replications} replications) <<<"); + + var reps = new List(replications); + + for (int rep = 0; rep < replications; rep++) + { + // Reset per-rep state + Interlocked.Exchange(ref eventsThisRep, 0); + repDone = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + // Snapshot the cumulative finished-queue length right before this rep + // so we can slice off only this rep's portion afterwards. + var preQueue = service.DistributedResourceConnection.GetFinishedQueue(); + finishedQueueBaseline = preQueue.Count; + + // Kick off the server-driven trial sequence (fire-and-forget; + // completion is signalled via PropertyChanged → repDone). + service.StartUpdatesLocal(delay, trials, alpha); + + // Wait until `trials` PropertyChanged events have been received. + await repDone.Task; + + // The server completed `trials` events; slice off this rep's + // portion of the cumulative finished-queue. GetFinishedQueue() + // returns IReadOnlyList>; we forward the + // typed sliced subset directly to Evaluate which is generic + // on T (the property's runtime payload type). + var fullQueue = service.DistributedResourceConnection.GetFinishedQueue(); + var typedQueue = SliceQueue(fullQueue, finishedQueueBaseline); + + var repResult = EsiurQueueEval.Evaluate(typedQueue); + reps.Add(repResult); + + Console.WriteLine($" rep {rep + 1}/{replications}: " + + $"λ={repResult.LambdaEventsPerSecond:F1}/s " + + $"R̄={repResult.Latency.ReadinessMs.Mean:F1}ms " + + $"δ̄={repResult.Latency.HolMs.Mean:F1}ms " + + $"D̄={repResult.Latency.EndToEndMs.Mean:F1}ms"); + + // Settle period between reps to let any straggler notifications drain + // and to keep the per-rep arrivals statistically independent of any + // residual server state from the previous rep. + await Task.Delay(settleMs); + } + + var agg = ReplicatedEvalAggregator.Aggregate(delay, alpha, reps); + rows.Add(agg); + + ReplicatedEvalAggregator.PrintSummary(agg); + + // Append to CSV immediately so partial progress is preserved + // if the process is killed mid-sweep. + writer.WriteLine(ReplicatedEvalAggregator.ToCsvRow(agg)); + writer.Flush(); + } +} + +Console.WriteLine(); +Console.WriteLine($"[Client-T4-R] Done. {rows.Count} configurations written to {outputCsv}"); +Environment.Exit(0); + + +// ---------------------------------------------------------------- +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} + +// ---------------------------------------------------------------- +// Slice the cumulative finished-queue down to only the items added +// during the current replication. +// +// The queue is dynamically typed (returned from a dynamic-dispatched +// member) and its element type is AsyncQueueItem where T is the +// runtime payload type of the observed property. We rely on the DLR +// to bind the LINQ Skip/ToList generic methods at runtime, just +// as the original code does with the Evaluate call below it. +// ---------------------------------------------------------------- +static dynamic SliceQueue(dynamic fullQueue, int skipCount) +{ + return System.Linq.Enumerable.ToList( + System.Linq.Enumerable.Skip(fullQueue, skipCount)); +} \ No newline at end of file diff --git a/Tests/Distribution/Queueing/Client/ReplicateState.cs b/Tests/Distribution/Queueing/Client/ReplicateState.cs new file mode 100644 index 0000000..2afab70 --- /dev/null +++ b/Tests/Distribution/Queueing/Client/ReplicateState.cs @@ -0,0 +1,183 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; + +namespace Esiur.Tests.Queueing.Client +{ + /// + /// Point estimate accompanied by a 95% confidence-interval half-width + /// (computed with Student's t for small samples). Use ToString() to + /// render as "mean ± half" in print output. + /// + public readonly record struct MeanCi(double Mean, double Ci95HalfWidth, int N) + { + public static MeanCi From(IEnumerable xs) + { + var arr = xs.ToArray(); + int n = arr.Length; + if (n == 0) return new MeanCi(0, 0, 0); + if (n == 1) return new MeanCi(arr[0], 0, 1); + + double mean = arr.Average(); + double sumSq = 0; + for (int i = 0; i < n; i++) + { + double d = arr[i] - mean; + sumSq += d * d; + } + double std = Math.Sqrt(sumSq / (n - 1)); + double sem = std / Math.Sqrt(n); + + // Student's t two-sided 95% for small df. df = n - 1. + // Values from standard tables; ≥10 falls back to normal (1.960). + double t = (n - 1) switch + { + 1 => 12.706, + 2 => 4.303, + 3 => 3.182, + 4 => 2.776, + 5 => 2.571, + 6 => 2.447, + 7 => 2.365, + 8 => 2.306, + 9 => 2.262, + 10 => 2.228, + 11 => 2.201, + 12 => 2.179, + 13 => 2.160, + 14 => 2.145, + 15 => 2.131, + 16 => 2.120, + 17 => 2.110, + 18 => 2.101, + 19 => 2.093, + 20 => 2.086, + _ => 1.960 // normal approximation for df > 20 + }; + return new MeanCi(mean, t * sem, n); + } + + public override string ToString() => + N <= 1 + ? Mean.ToString("F2", CultureInfo.InvariantCulture) + : string.Create(CultureInfo.InvariantCulture, + $"{Mean:F2}±{Ci95HalfWidth:F2}"); + } + + /// + /// Aggregated result over K replications of the same (delay, alpha) + /// configuration. Carries point estimates plus per-metric 95% CI + /// half-widths for the headline metrics reported in the paper: + /// arrival rate λ, service rate μ, mean readiness R̄, mean HOL δ̄, + /// and mean end-to-end latency D̄. + /// + /// The companion field + /// (PerRepMean) holds the existing-style averaged point estimates + /// so downstream code that already consumed EvalResult continues + /// to work unchanged. + /// + public sealed record ReplicatedResult( + int Delay, + double Alpha, + int Replications, + MeanCi Lambda, + MeanCi Mu, + MeanCi ReadinessMeanMs, + MeanCi HolMeanMs, + MeanCi EndToEndMeanMs, + MeanCi EndToEndP99Ms, + MeanCi QueueLengthMean, + MeanCi BatchSizeMean, + EsiurQueueEval.EvalResult PerRepMean); + + public static class ReplicatedEvalAggregator + { + /// + /// Combine K per-replication EvalResult objects into a single + /// ReplicatedResult, computing point estimates and 95% CIs. + /// + public static ReplicatedResult Aggregate( + int delay, + double alpha, + IReadOnlyList reps) + { + if (reps == null) throw new ArgumentNullException(nameof(reps)); + if (reps.Count == 0) throw new ArgumentException("reps is empty.", nameof(reps)); + + var lambda = MeanCi.From(reps.Select(r => r.LambdaEventsPerSecond)); + var mu = MeanCi.From(reps.Select(r => r.MuEventsPerSecond)); + var readiness = MeanCi.From(reps.Select(r => r.Latency.ReadinessMs.Mean)); + var hol = MeanCi.From(reps.Select(r => r.Latency.HolMs.Mean)); + var e2eMean = MeanCi.From(reps.Select(r => r.Latency.EndToEndMs.Mean)); + var e2eP99 = MeanCi.From(reps.Select(r => r.Latency.EndToEndMs.P99)); + var qLen = MeanCi.From(reps.Select(r => r.QueueLength.Mean)); + var batch = MeanCi.From(reps.Select( + r => r.FlushSizeStats?.Mean ?? double.NaN) + .Where(v => !double.IsNaN(v))); + + // Use the existing Average helper for the carry-along point estimates. + var perRepMean = EsiurQueueEval.Average(reps); + + return new ReplicatedResult( + Delay: delay, + Alpha: alpha, + Replications: reps.Count, + Lambda: lambda, + Mu: mu, + ReadinessMeanMs: readiness, + HolMeanMs: hol, + EndToEndMeanMs: e2eMean, + EndToEndP99Ms: e2eP99, + QueueLengthMean: qLen, + BatchSizeMean: batch, + PerRepMean: perRepMean); + } + + public static string CsvHeader => + "delay_ms,alpha,replications," + + "lambda_mean,lambda_ci95," + + "mu_mean,mu_ci95," + + "readiness_mean_ms,readiness_ci95," + + "hol_mean_ms,hol_ci95," + + "e2e_mean_ms,e2e_ci95," + + "e2e_p99_ms,e2e_p99_ci95," + + "queue_len_mean,queue_len_ci95," + + "batch_mean,batch_ci95"; + + public static string ToCsvRow(ReplicatedResult r) + { + var inv = CultureInfo.InvariantCulture; + return string.Create(inv, + $"{r.Delay},{r.Alpha:F3},{r.Replications}," + + $"{r.Lambda.Mean:F3},{r.Lambda.Ci95HalfWidth:F3}," + + $"{r.Mu.Mean:F3},{r.Mu.Ci95HalfWidth:F3}," + + $"{r.ReadinessMeanMs.Mean:F3},{r.ReadinessMeanMs.Ci95HalfWidth:F3}," + + $"{r.HolMeanMs.Mean:F3},{r.HolMeanMs.Ci95HalfWidth:F3}," + + $"{r.EndToEndMeanMs.Mean:F3},{r.EndToEndMeanMs.Ci95HalfWidth:F3}," + + $"{r.EndToEndP99Ms.Mean:F3},{r.EndToEndP99Ms.Ci95HalfWidth:F3}," + + $"{r.QueueLengthMean.Mean:F3},{r.QueueLengthMean.Ci95HalfWidth:F3}," + + $"{r.BatchSizeMean.Mean:F3},{r.BatchSizeMean.Ci95HalfWidth:F3}"); + } + + /// + /// Console-friendly compact summary, one configuration per call. + /// + public static void PrintSummary(ReplicatedResult r) + { + Console.WriteLine(); + Console.WriteLine($"=== Configuration: delay={r.Delay} ms, α={r.Alpha:F2}, " + + $"replications={r.Replications} ==="); + Console.WriteLine("Metric | Mean ± 95% CI half-width"); + Console.WriteLine("----------------+----------------------------------------"); + Console.WriteLine($"λ (/s) | {r.Lambda}"); + Console.WriteLine($"μ (/s) | {r.Mu}"); + Console.WriteLine($"R̄ (ms) | {r.ReadinessMeanMs}"); + Console.WriteLine($"δ̄ (ms) | {r.HolMeanMs}"); + Console.WriteLine($"D̄ (ms) | {r.EndToEndMeanMs}"); + Console.WriteLine($"P99(D) (ms) | {r.EndToEndP99Ms}"); + Console.WriteLine($"Queue length | {r.QueueLengthMean}"); + Console.WriteLine($"Batch size B | {r.BatchSizeMean}"); + } + } +} \ No newline at end of file diff --git a/Tests/Serialization/ComplexObject/DdsCdrCodec.cs b/Tests/Serialization/ComplexObject/DdsCdrCodec.cs new file mode 100644 index 0000000..0f41154 --- /dev/null +++ b/Tests/Serialization/ComplexObject/DdsCdrCodec.cs @@ -0,0 +1,511 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +#nullable enable + +namespace Esiur.Tests.ComplexModel; + +// ============================================================================ +// DdsCdrCodec.cs +// ---------------------------------------------------------------------------- +// Implements ICodec by encoding BusinessDocument as OMG XCDR2 (Extended CDR +// version 2, PLAIN_CDR2, all types FINAL). This is the on-the-wire payload +// format used by every conformant DDS implementation per DDS-XTypes 1.3 for +// final-extensibility types. +// +// The corresponding IDL definition lives in BusinessDocument.idl alongside +// this file. The struct layout, member order, and union discriminator +// values below MUST match the IDL exactly; a divergence between this code +// and the IDL would produce a wire format incompatible with real DDS +// implementations, defeating the purpose of the benchmark. +// +// Encoding choices documented inline. Where the spec offered multiple +// equivalent options (e.g., DateTime as int64 ticks vs. struct), the choice +// that minimizes wire size was selected, so that this measurement reports +// the most favorable DDS payload size achievable for this schema. +// ============================================================================ + +public sealed class DdsCdrCodec : ICodec +{ + public string Name => "DDS-XCDR2"; + + public byte[]? Serialize(BusinessDocument obj) + { + var w = new Xcdr2Writer(capacity: 8192); + WriteBusinessDocument(w, obj); + var bytes = w.ToArray(); + + // Optional self-check (cheap): decode and compare. Mirrors the + // pattern used by FlatBuffersCodec and ProtobufCodec in + // ModelRunner.cs for outside-the-timing-loop validation. + // Disabled by default to keep parity with most other codecs; the + // outer harness performs equality testing. + return bytes; + } + + public BusinessDocument Deserialize(byte[] data) + { + var r = new Xcdr2Reader(data); + return ReadBusinessDocument(r); + } + + // ---- BusinessDocument (FINAL struct, top-level) ----------------------- + // + // In XCDR2 the top-level FINAL struct does NOT carry a DHEADER. It is + // emitted as a plain concatenation of members in declaration order. + // (DDS-XTypes 1.3 §7.4.3.5.3 rule 7: FINAL aggregated types use + // PLAIN_CDR2 without delimiter.) + + private static void WriteBusinessDocument(Xcdr2Writer w, BusinessDocument d) + { + // member 0: optional Header + WriteOptionalStruct(w, d.Header, WriteDocumentHeader); + // member 1: optional Seller + WriteOptionalStruct(w, d.Seller, WriteParty); + // member 2: optional Buyer + WriteOptionalStruct(w, d.Buyer, WriteParty); + // member 3: sequence Items + WriteSequenceOfStruct(w, d.Items, WriteLineItem); + // member 4: sequence Payments + WriteSequenceOfStruct(w, d.Payments, WritePayment); + // member 5: sequence Attachments + WriteSequenceOfStruct(w, d.Attachments, WriteAttachment); + // member 6: sequence RiskScores (primitive, no DHEADER) + WriteSequenceOfInt32(w, d.RiskScores); + } + + private static BusinessDocument ReadBusinessDocument(Xcdr2Reader r) + { + var d = new BusinessDocument + { + Header = ReadOptionalStruct(r, ReadDocumentHeader), + Seller = ReadOptionalStruct(r, ReadParty), + Buyer = ReadOptionalStruct(r, ReadParty), + Items = ReadSequenceOfStruct(r, ReadLineItem), + Payments = ReadSequenceOfStruct(r, ReadPayment), + Attachments = ReadSequenceOfStruct(r, ReadAttachment), + RiskScores = ReadSequenceOfInt32(r), + }; + return d; + } + + // ---- DocumentHeader --------------------------------------------------- + + private static void WriteDocumentHeader(Xcdr2Writer w, DocumentHeader h) + { + // member 0: sequence DocId (primitive seq, no DHEADER) + w.WriteOctetSequence(h.DocId ?? Array.Empty()); + // member 1: DocType Type (enum -> int32) + w.WriteInt32((int)h.Type); + // member 2: int32 Version + w.WriteInt32(h.Version); + // member 3: int64 CreatedAtTicks + w.WriteInt64(h.CreatedAt.Ticks); + // member 4: optional UpdatedAtTicks + if (h.UpdatedAt.HasValue) + { + w.WriteBool(true); + w.WriteInt64(h.UpdatedAt.Value.Ticks); + } + else + { + w.WriteBool(false); + } + // member 5: Currency (enum -> int32) + w.WriteInt32((int)h.Currency); + // member 6: optional Notes + WriteOptionalString(w, h.Notes); + // member 7: sequence Meta (non-primitive seq -> DHEADER) + WriteVariantDictionary(w, h.Meta); + } + + private static DocumentHeader ReadDocumentHeader(Xcdr2Reader r) + { + var h = new DocumentHeader(); + h.DocId = r.ReadOctetSequence(); + h.Type = (DocType)r.ReadInt32(); + h.Version = r.ReadInt32(); + h.CreatedAt = new DateTime(r.ReadInt64()); + h.UpdatedAt = r.ReadBool() ? new DateTime(r.ReadInt64()) : (DateTime?)null; + h.Currency = (Currency)r.ReadInt32(); + h.Notes = ReadOptionalString(r); + h.Meta = ReadVariantDictionary(r); + if (h.Meta != null) + { + h.MetaKeys = h.Meta.Keys.ToArray(); + h.MetaValues = h.Meta.Values.ToArray(); + } + return h; + } + + // ---- Party ------------------------------------------------------------ + + private static void WriteParty(Xcdr2Writer w, Party p) + { + w.WriteUInt64(p.Id); + w.WriteString(p.Name); + WriteOptionalString(w, p.TaxId); + WriteOptionalString(w, p.Email); + WriteOptionalString(w, p.Phone); + // optional
+ if (p.Address != null) + { + w.WriteBool(true); + WriteAddress(w, p.Address); + } + else + { + w.WriteBool(false); + } + WriteOptionalString(w, p.PreferredLanguage); + } + + private static Party ReadParty(Xcdr2Reader r) + { + return new Party + { + Id = r.ReadUInt64(), + Name = r.ReadString(), + TaxId = ReadOptionalString(r), + Email = ReadOptionalString(r), + Phone = ReadOptionalString(r), + Address = r.ReadBool() ? ReadAddress(r) : null, + PreferredLanguage = ReadOptionalString(r), + }; + } + + // ---- Address ---------------------------------------------------------- + + private static void WriteAddress(Xcdr2Writer w, Address a) + { + w.WriteString(a.Line1); + WriteOptionalString(w, a.Line2); + w.WriteString(a.City); + w.WriteString(a.Region); + w.WriteString(a.Country); + WriteOptionalString(w, a.PostalCode); + } + + private static Address ReadAddress(Xcdr2Reader r) + { + return new Address + { + Line1 = r.ReadString(), + Line2 = ReadOptionalString(r), + City = r.ReadString(), + Region = r.ReadString(), + Country = r.ReadString(), + PostalCode = ReadOptionalString(r), + }; + } + + // ---- LineItem --------------------------------------------------------- + + private static void WriteLineItem(Xcdr2Writer w, LineItem li) + { + w.WriteInt32(li.LineNo); + w.WriteInt32((int)li.Type); + w.WriteString(li.SKU); + w.WriteString(li.Description); + w.WriteDouble(li.Qty); + w.WriteString(li.QtyUnit); + w.WriteDouble(li.UnitPrice); + WriteOptionalDouble(w, li.VatRate); + WriteOptionalDouble(w, li.Discount); + WriteVariantDictionary(w, li.Ext); + } + + private static LineItem ReadLineItem(Xcdr2Reader r) + { + var li = new LineItem + { + LineNo = r.ReadInt32(), + Type = (LineType)r.ReadInt32(), + SKU = r.ReadString(), + Description = r.ReadString(), + Qty = r.ReadDouble(), + QtyUnit = r.ReadString(), + UnitPrice = r.ReadDouble(), + VatRate = ReadOptionalDouble(r), + Discount = ReadOptionalDouble(r), + Ext = ReadVariantDictionary(r), + }; + if (li.Ext != null) + { + li.ExtKeys = li.Ext.Keys.ToArray(); + li.ExtValues = li.Ext.Values.ToArray(); + } + return li; + } + + // ---- Payment ---------------------------------------------------------- + + private static void WritePayment(Xcdr2Writer w, Payment p) + { + w.WriteInt32((int)p.Method); + w.WriteDouble(p.Amount); + WriteOptionalString(w, p.Reference); + w.WriteInt64(p.Timestamp.Ticks); + WriteOptionalDouble(w, p.Fee); + } + + private static Payment ReadPayment(Xcdr2Reader r) + { + return new Payment + { + Method = (PaymentMethod)r.ReadInt32(), + Amount = r.ReadDouble(), + Reference = ReadOptionalString(r), + Timestamp = new DateTime(r.ReadInt64()), + Fee = ReadOptionalDouble(r), + }; + } + + // ---- Attachment ------------------------------------------------------- + + private static void WriteAttachment(Xcdr2Writer w, Attachment a) + { + w.WriteString(a.Name); + w.WriteString(a.MimeType); + w.WriteOctetSequence(a.Data); + } + + private static Attachment ReadAttachment(Xcdr2Reader r) + { + return new Attachment + { + Name = r.ReadString(), + MimeType = r.ReadString(), + Data = r.ReadOctetSequence(), + }; + } + + // ---- Variant (union discriminated by Kind) ---------------------------- + // + // IDL mapping (see BusinessDocument.idl): + // union Variant switch(int32 /* Kind */) { + // case 0 (Null): /* no member */; + // case 1 (Bool): boolean b; + // case 2 (Int64): int64 i64; + // case 3 (UInt64): uint64 u64; + // case 4 (Double): double d; + // case 6 (String): string s; + // case 7 (Bytes): sequence by; + // case 8 (DateTime): int64 dt; // ticks + // case 9 (Guid): octet[16] g; + // }; + // + // XCDR2 union encoding: int32 discriminator + the selected branch. + + private static void WriteVariant(Xcdr2Writer w, Variant v) + { + int tag = (int)v.Tag; + w.WriteInt32(tag); + switch (v.Tag) + { + case Variant.Kind.Null: + break; + case Variant.Kind.Bool: + w.WriteBool(v.Bool ?? false); + break; + case Variant.Kind.Int64: + w.WriteInt64(v.I64 ?? 0); + break; + case Variant.Kind.UInt64: + w.WriteUInt64(v.U64 ?? 0); + break; + case Variant.Kind.Double: + case Variant.Kind.Decimal: // Decimal mapped to double in IDL + w.WriteDouble(v.F64 ?? 0.0); + break; + case Variant.Kind.String: + w.WriteString(v.Str ?? ""); + break; + case Variant.Kind.Bytes: + w.WriteOctetSequence(v.Bytes ?? Array.Empty()); + break; + case Variant.Kind.DateTime: + w.WriteInt64(v.Dt?.Ticks ?? v.DtAsLong); + break; + case Variant.Kind.Guid: + w.WriteOctetArrayFixed(v.Guid ?? new byte[16], 16); + break; + default: + throw new InvalidOperationException($"Unknown Variant kind {v.Tag}"); + } + } + + private static Variant ReadVariant(Xcdr2Reader r) + { + var tag = (Variant.Kind)r.ReadInt32(); + var v = new Variant { Tag = tag }; + switch (tag) + { + case Variant.Kind.Null: + break; + case Variant.Kind.Bool: + v.Bool = r.ReadBool(); + break; + case Variant.Kind.Int64: + v.I64 = r.ReadInt64(); + break; + case Variant.Kind.UInt64: + v.U64 = r.ReadUInt64(); + break; + case Variant.Kind.Double: + case Variant.Kind.Decimal: + v.F64 = r.ReadDouble(); + break; + case Variant.Kind.String: + v.Str = r.ReadString(); + break; + case Variant.Kind.Bytes: + v.Bytes = r.ReadOctetSequence(); + break; + case Variant.Kind.DateTime: + { + long ticks = r.ReadInt64(); + v.Dt = new DateTime(ticks); + v.DtAsLong = ticks; + break; + } + case Variant.Kind.Guid: + v.Guid = r.ReadOctetArrayFixed(16); + break; + default: + throw new InvalidOperationException($"Unknown Variant kind {tag}"); + } + return v; + } + + // ---- Dictionary -> sequence --------------- + // + // IDL: struct MetaEntry { string key; Variant value; }; + // sequence ... + // + // Non-primitive sequence: per XCDR2 rule 15, MUST emit DHEADER before + // the sequence length and elements. + + private static void WriteVariantDictionary(Xcdr2Writer w, Dictionary? dict) + { + int token = w.BeginDHeader(); + if (dict == null) + { + w.WriteUInt32(0); + } + else + { + w.WriteUInt32((uint)dict.Count); + foreach (var kv in dict) + { + w.WriteString(kv.Key); + WriteVariant(w, kv.Value); + } + } + w.EndDHeader(token); + } + + private static Dictionary? ReadVariantDictionary(Xcdr2Reader r) + { + _ = r.ReadDHeader(); // size, ignored (schema-driven decode) + uint n = r.ReadUInt32(); + if (n == 0) return new Dictionary(); + var d = new Dictionary((int)n); + for (uint i = 0; i < n; i++) + { + var k = r.ReadString(); + var v = ReadVariant(r); + d[k] = v; + } + return d; + } + + // ---- helpers: optional and sequence ----------------------------- + + private static void WriteOptionalString(Xcdr2Writer w, string? s) + { + if (s is null) { w.WriteBool(false); } + else { w.WriteBool(true); w.WriteString(s); } + } + + private static string? ReadOptionalString(Xcdr2Reader r) + => r.ReadBool() ? r.ReadString() : null; + + private static void WriteOptionalDouble(Xcdr2Writer w, double? v) + { + if (v is null) { w.WriteBool(false); } + else { w.WriteBool(true); w.WriteDouble(v.Value); } + } + + private static double? ReadOptionalDouble(Xcdr2Reader r) + => r.ReadBool() ? r.ReadDouble() : null; + + private static void WriteOptionalStruct( + Xcdr2Writer w, T? value, Action writeInner) + where T : class + { + if (value is null) { w.WriteBool(false); } + else { w.WriteBool(true); writeInner(w, value); } + } + + private static T? ReadOptionalStruct( + Xcdr2Reader r, Func readInner) + where T : class + { + return r.ReadBool() ? readInner(r) : null; + } + + private static void WriteSequenceOfStruct( + Xcdr2Writer w, T[]? arr, Action writeInner) + { + // Non-primitive sequence: DHEADER required. + int token = w.BeginDHeader(); + if (arr is null) + { + w.WriteUInt32(0); + } + else + { + w.WriteUInt32((uint)arr.Length); + for (int i = 0; i < arr.Length; i++) + writeInner(w, arr[i]); + } + w.EndDHeader(token); + } + + private static T[] ReadSequenceOfStruct( + Xcdr2Reader r, Func readInner) + { + _ = r.ReadDHeader(); + uint n = r.ReadUInt32(); + var arr = new T[n]; + for (uint i = 0; i < n; i++) + arr[i] = readInner(r); + return arr; + } + + private static void WriteSequenceOfInt32(Xcdr2Writer w, int[]? arr) + { + // Primitive sequence: NO DHEADER per XCDR2 rule 14. + if (arr is null) + { + w.WriteUInt32(0); + } + else + { + w.WriteUInt32((uint)arr.Length); + for (int i = 0; i < arr.Length; i++) + w.WriteInt32(arr[i]); + } + } + + private static int[] ReadSequenceOfInt32(Xcdr2Reader r) + { + uint n = r.ReadUInt32(); + var arr = new int[n]; + for (uint i = 0; i < n; i++) + arr[i] = r.ReadInt32(); + return arr; + } +} \ No newline at end of file diff --git a/Tests/Serialization/ComplexObject/ModelRunner.cs b/Tests/Serialization/ComplexObject/ModelRunner.cs index 31b2cc1..a9f4da9 100644 --- a/Tests/Serialization/ComplexObject/ModelRunner.cs +++ b/Tests/Serialization/ComplexObject/ModelRunner.cs @@ -279,13 +279,14 @@ public sealed class ModelRunner _codecs = new ICodec[] { new JsonCodec(), - new EsiurCodec(), + new EsiurCodec(), + new DdsCdrCodec(), new MessagePackCodec(), new ProtobufCodec(), new FlatBuffersCodec(), new CborCodec(), new BsonCodec(), - new AvroCodec() + new AvroCodec(), }; } @@ -380,7 +381,7 @@ public sealed class ModelRunner Console.WriteLine(); Console.WriteLine("{0,-14} {1,12} {2,12} {3,10} {4,26} {5,18} {6,18}", - "Codec", "Mean(B)", "Median(B)", "Ratio", "Class vs JSON", "Enc CPU (µs)", "Dec CPU (µs)"); + "Codec", "Mean(B)", "Median(B)", "Ratio", "Reduction", "Enc CPU (µs)", "Dec CPU (µs)"); Console.WriteLine(new string('-', 118)); foreach (var c in _codecs) @@ -397,18 +398,19 @@ public sealed class ModelRunner string meanS = double.IsNaN(mean) ? "N/A" : mean.ToString("F1"); string medS = double.IsNaN(med) ? "N/A" : med.ToString("F1"); string ratioS = double.IsNaN(ratio) ? "N/A" : ratio.ToString("F3"); + string reduction = double.IsNaN(ratio) ? "N/A" : ((1 - ratio) * 100).ToString("F3"); // average CPU µs/op across samples where serialization succeeded string encCpuS = (r.Samples == 0) ? "N/A" : (r.EncodeCpuUsSum / r.Samples).ToString("F1"); string decCpuS = (r.Samples == 0) ? "N/A" : (r.DecodeCpuUsSum / r.Samples).ToString("F1"); Console.WriteLine("{0,-14} {1,12} {2,12} {3,10} {4,26} {5,18} {6,18}", - c.Name, meanS, medS, ratioS, cls, encCpuS, decCpuS); + c.Name, meanS, medS, ratioS, reduction, encCpuS, decCpuS); } Console.WriteLine(); - Console.ReadLine(); + //Console.ReadLine(); } } diff --git a/Tests/Serialization/ComplexObject/Xcdr2Writer.cs b/Tests/Serialization/ComplexObject/Xcdr2Writer.cs new file mode 100644 index 0000000..5534a2c --- /dev/null +++ b/Tests/Serialization/ComplexObject/Xcdr2Writer.cs @@ -0,0 +1,332 @@ +using System; +using System.Buffers.Binary; +using System.IO; +using System.Text; + +#nullable enable + +namespace Esiur.Tests.ComplexModel; + +// ============================================================================ +// Xcdr2Stream.cs +// ---------------------------------------------------------------------------- +// OMG Extended Common Data Representation (XCDR) Version 2 encoder/decoder. +// +// Implements PLAIN_CDR2 for FINAL-extensibility structures, the most compact +// XCDR2 mode defined by DDS-XTypes 1.3 (OMG formal/2024-04-01). This mode is +// the on-the-wire format used by every conformant DDS implementation when +// the @final annotation is applied (or no extensibility annotation is given +// and the implementation defaults to final). +// +// Implemented rules (DDS-XTypes 1.3, §7.4.3.5.3 Complete Serialization Rules): +// - Encapsulation header (4 bytes): representation_identifier (2B) + +// options (2B). We use CDR2_LE = 0x00 0x09 for the identifier and +// 0x00 0x00 for the options field. +// - Maximum alignment is 4 bytes (vs 8 in XCDR1). 64-bit primitives align +// to 4, not 8. +// - Strings: uint32 length-including-null + UTF-8 bytes + 0x00 terminator. +// - Sequences of primitives: uint32 length + elements (rule 14, no DHEADER). +// - Sequences of non-primitives: DHEADER (uint32, bytes-remaining) + +// uint32 length + elements (rule 15). +// - Optionals: 1-byte is_present + value if present (rule 9). +// - Unions (Variant): int32 discriminator aligned to 4 + selected branch. +// - Octet arrays of fixed length: emitted as-is, no length prefix. +// +// Reference implementations consulted: +// - foxglove/cdr (https://github.com/foxglove/cdr) +// - eclipse-cyclonedds dds_cdrstream.c +// - eProsima Fast-CDR +// ============================================================================ + +internal sealed class Xcdr2Writer +{ + private byte[] _buf; + private int _pos; + + public Xcdr2Writer(int capacity = 4096) + { + _buf = new byte[capacity]; + _pos = 0; + WriteEncapsulationHeader(); + } + + public int Position => _pos; + + public byte[] ToArray() + { + var result = new byte[_pos]; + Buffer.BlockCopy(_buf, 0, result, 0, _pos); + return result; + } + + private void WriteEncapsulationHeader() + { + // CDR2_LE representation_identifier (DDS-RTPS table 10.3) + Ensure(4); + _buf[_pos++] = 0x00; + _buf[_pos++] = 0x09; + _buf[_pos++] = 0x00; + _buf[_pos++] = 0x00; + } + + // The encapsulation header is NOT counted when computing alignment, per + // DDS-XTypes 1.3 §7.4.3.4: alignment is measured from the start of the + // user payload (byte 4). + private int PayloadPos => _pos - 4; + + private void Align(int n) + { + // XCDR2 caps max alignment at 4. Callers pass 1, 2, 4 only. + int mis = PayloadPos & (n - 1); + if (mis == 0) return; + int pad = n - mis; + Ensure(pad); + for (int i = 0; i < pad; i++) _buf[_pos++] = 0x00; + } + + private void Ensure(int extra) + { + if (_pos + extra <= _buf.Length) return; + int newCap = _buf.Length * 2; + while (newCap < _pos + extra) newCap *= 2; + var nb = new byte[newCap]; + Buffer.BlockCopy(_buf, 0, nb, 0, _pos); + _buf = nb; + } + + // ---- primitive writers ---- + + public void WriteByte(byte v) + { + Ensure(1); + _buf[_pos++] = v; + } + + public void WriteBool(bool v) => WriteByte(v ? (byte)1 : (byte)0); + + public void WriteInt16(short v) + { + Align(2); + Ensure(2); + BinaryPrimitives.WriteInt16LittleEndian(_buf.AsSpan(_pos), v); + _pos += 2; + } + + public void WriteUInt16(ushort v) + { + Align(2); + Ensure(2); + BinaryPrimitives.WriteUInt16LittleEndian(_buf.AsSpan(_pos), v); + _pos += 2; + } + + public void WriteInt32(int v) + { + Align(4); + Ensure(4); + BinaryPrimitives.WriteInt32LittleEndian(_buf.AsSpan(_pos), v); + _pos += 4; + } + + public void WriteUInt32(uint v) + { + Align(4); + Ensure(4); + BinaryPrimitives.WriteUInt32LittleEndian(_buf.AsSpan(_pos), v); + _pos += 4; + } + + // XCDR2: 64-bit primitives align to 4, NOT 8 (per max-alignment rule) + public void WriteInt64(long v) + { + Align(4); + Ensure(8); + BinaryPrimitives.WriteInt64LittleEndian(_buf.AsSpan(_pos), v); + _pos += 8; + } + + public void WriteUInt64(ulong v) + { + Align(4); + Ensure(8); + BinaryPrimitives.WriteUInt64LittleEndian(_buf.AsSpan(_pos), v); + _pos += 8; + } + + public void WriteDouble(double v) => WriteInt64(BitConverter.DoubleToInt64Bits(v)); + + public void WriteString(string s) + { + var bytes = Encoding.UTF8.GetBytes(s); + WriteUInt32((uint)(bytes.Length + 1)); // includes null terminator + Ensure(bytes.Length + 1); + Buffer.BlockCopy(bytes, 0, _buf, _pos, bytes.Length); + _pos += bytes.Length; + _buf[_pos++] = 0x00; // null terminator + } + + // Fixed-length octet array (e.g., 16-byte GUID). + // No length prefix; just the bytes. + public void WriteOctetArrayFixed(byte[] data, int expectedLen) + { + if (data.Length != expectedLen) + throw new ArgumentException($"Expected {expectedLen} bytes, got {data.Length}"); + Ensure(expectedLen); + Buffer.BlockCopy(data, 0, _buf, _pos, expectedLen); + _pos += expectedLen; + } + + // Variable-length octet sequence: uint32 length + bytes. + // No DHEADER (octet is primitive). + public void WriteOctetSequence(byte[] data) + { + WriteUInt32((uint)data.Length); + Ensure(data.Length); + Buffer.BlockCopy(data, 0, _buf, _pos, data.Length); + _pos += data.Length; + } + + // DHEADER for sequences of non-primitive types and for non-final structs + // and for optionals containing complex types. Reserves 4 bytes now, + // returns a token used by EndDHeader to backfill the size. + public int BeginDHeader() + { + Align(4); + Ensure(4); + int token = _pos; + // placeholder, will be backfilled + _buf[_pos++] = 0; _buf[_pos++] = 0; _buf[_pos++] = 0; _buf[_pos++] = 0; + return token; + } + + public void EndDHeader(int token) + { + // Size = number of bytes after the DHEADER, exclusive of the DHEADER + // itself. (DDS-XTypes 1.3 §7.4.3.5.1 D-HEADER definition.) + int sizeAfter = _pos - (token + 4); + BinaryPrimitives.WriteUInt32LittleEndian(_buf.AsSpan(token), (uint)sizeAfter); + } +} + +internal sealed class Xcdr2Reader +{ + private readonly byte[] _buf; + private int _pos; + private readonly bool _littleEndian; + + public Xcdr2Reader(byte[] data) + { + _buf = data; + // Encapsulation header (4 bytes) + if (data.Length < 4) throw new IOException("Truncated XCDR2 stream"); + if (data[0] != 0x00 || (data[1] != 0x09 && data[1] != 0x0A)) + throw new IOException( + $"Not an XCDR2 stream (representation_identifier {data[0]:X2} {data[1]:X2})"); + _littleEndian = data[1] == 0x09; + if (!_littleEndian) + throw new NotSupportedException("Only CDR2_LE is implemented in this benchmark."); + _pos = 4; + } + + private int PayloadPos => _pos - 4; + + private void Align(int n) + { + int mis = PayloadPos & (n - 1); + if (mis == 0) return; + _pos += (n - mis); + } + + public byte ReadByte() => _buf[_pos++]; + + public bool ReadBool() => ReadByte() != 0; + + public short ReadInt16() + { + Align(2); + var v = BinaryPrimitives.ReadInt16LittleEndian(_buf.AsSpan(_pos)); + _pos += 2; + return v; + } + + public ushort ReadUInt16() + { + Align(2); + var v = BinaryPrimitives.ReadUInt16LittleEndian(_buf.AsSpan(_pos)); + _pos += 2; + return v; + } + + public int ReadInt32() + { + Align(4); + var v = BinaryPrimitives.ReadInt32LittleEndian(_buf.AsSpan(_pos)); + _pos += 4; + return v; + } + + public uint ReadUInt32() + { + Align(4); + var v = BinaryPrimitives.ReadUInt32LittleEndian(_buf.AsSpan(_pos)); + _pos += 4; + return v; + } + + public long ReadInt64() + { + Align(4); // XCDR2 max alignment + var v = BinaryPrimitives.ReadInt64LittleEndian(_buf.AsSpan(_pos)); + _pos += 8; + return v; + } + + public ulong ReadUInt64() + { + Align(4); + var v = BinaryPrimitives.ReadUInt64LittleEndian(_buf.AsSpan(_pos)); + _pos += 8; + return v; + } + + public double ReadDouble() => BitConverter.Int64BitsToDouble(ReadInt64()); + + public string ReadString() + { + uint lenIncNull = ReadUInt32(); + if (lenIncNull == 0) + throw new IOException("XCDR2 string length must include null terminator (>= 1)"); + int payloadLen = (int)lenIncNull - 1; + var s = Encoding.UTF8.GetString(_buf, _pos, payloadLen); + _pos += payloadLen; + // consume null terminator + if (_buf[_pos] != 0x00) + throw new IOException("XCDR2 string missing null terminator"); + _pos += 1; + return s; + } + + public byte[] ReadOctetArrayFixed(int len) + { + var result = new byte[len]; + Buffer.BlockCopy(_buf, _pos, result, 0, len); + _pos += len; + return result; + } + + public byte[] ReadOctetSequence() + { + uint len = ReadUInt32(); + var result = new byte[len]; + Buffer.BlockCopy(_buf, _pos, result, 0, (int)len); + _pos += (int)len; + return result; + } + + public int ReadDHeader() + { + // We don't actually need to use the size for decoding because we know + // the schema; we just consume the 4 bytes. + return (int)ReadUInt32(); + } +} \ No newline at end of file