2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-06-13 14:38:43 +00:00

new tests

This commit is contained in:
2026-05-25 14:12:56 +03:00
parent eb323e8bf8
commit 7e27d3cfac
16 changed files with 1982 additions and 42 deletions
+33
View File
@@ -78,6 +78,20 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{02A07E
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Server", "Tests\Distribution\Queueing\Server\Esiur.Tests.Queueing.Server.csproj", "{7FD57668-2AD8-0F53-4006-03927B5A385C}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Server", "Tests\Distribution\Queueing\Server\Esiur.Tests.Queueing.Server.csproj", "{7FD57668-2AD8-0F53-4006-03927B5A385C}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "NodeFanoutSweep", "NodeFanoutSweep", "{33D973D8-4D3E-47BA-8135-FCA0CFF7E210}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{57E80693-7AFC-4446-87DE-25E97C036E2F}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{21D42B96-99F9-4E48-A499-5170A5A9597F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Server", "Tests\Distribution\NodeFanoutSweep\Server\Esiur.Tests.NodeFanoutSweep.Server.csproj", "{9FF626DF-1AD4-2BE1-F834-F49121D65085}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.NodeFanoutSweep.Client", "Tests\Distribution\NodeFanoutSweep\Client\Esiur.Tests.NodeFanoutSweep.Client.csproj", "{550A20AB-8E97-BCDD-9F54-27823663120A}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ConcurrentAttachSweep", "ConcurrentAttachSweep", "{E713D25F-2602-44C9-AB9E-C9477FB2BA93}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttachSweep", "Tests\Distribution\ConcurrentAttachSweep\Esiur.Tests.ConcurrentAttachSweep.csproj", "{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@@ -156,6 +170,18 @@ Global
{7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.Build.0 = Debug|Any CPU {7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.ActiveCfg = Release|Any CPU {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.Build.0 = Release|Any CPU {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.Build.0 = Release|Any CPU
{9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9FF626DF-1AD4-2BE1-F834-F49121D65085}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9FF626DF-1AD4-2BE1-F834-F49121D65085}.Release|Any CPU.Build.0 = Release|Any CPU
{550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{550A20AB-8E97-BCDD-9F54-27823663120A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{550A20AB-8E97-BCDD-9F54-27823663120A}.Release|Any CPU.Build.0 = Release|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@@ -193,6 +219,13 @@ Global
{543158AD-BCB6-44A5-91AB-FE97B42A9C95} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} {543158AD-BCB6-44A5-91AB-FE97B42A9C95} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9}
{02A07E3C-67DB-4489-88E3-D568C477F540} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} {02A07E3C-67DB-4489-88E3-D568C477F540} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9}
{7FD57668-2AD8-0F53-4006-03927B5A385C} = {02A07E3C-67DB-4489-88E3-D568C477F540} {7FD57668-2AD8-0F53-4006-03927B5A385C} = {02A07E3C-67DB-4489-88E3-D568C477F540}
{33D973D8-4D3E-47BA-8135-FCA0CFF7E210} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917}
{57E80693-7AFC-4446-87DE-25E97C036E2F} = {33D973D8-4D3E-47BA-8135-FCA0CFF7E210}
{21D42B96-99F9-4E48-A499-5170A5A9597F} = {33D973D8-4D3E-47BA-8135-FCA0CFF7E210}
{9FF626DF-1AD4-2BE1-F834-F49121D65085} = {57E80693-7AFC-4446-87DE-25E97C036E2F}
{550A20AB-8E97-BCDD-9F54-27823663120A} = {21D42B96-99F9-4E48-A499-5170A5A9597F}
{E713D25F-2602-44C9-AB9E-C9477FB2BA93} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917}
{3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1} = {E713D25F-2602-44C9-AB9E-C9477FB2BA93}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2}
+1 -37
View File
@@ -1805,8 +1805,6 @@ public static class DataDeserializer
} }
} }
public static AsyncReply TypedParserAsync(ParsedTdu tdu, EpConnection connection, uint[] requestSequence) public static AsyncReply TypedParserAsync(ParsedTdu tdu, EpConnection connection, uint[] requestSequence)
{ {
var tru = tdu.Metadata; var tru = tdu.Metadata;
@@ -1831,20 +1829,6 @@ public static class DataDeserializer
else if (tru is TruTypeDef truTypeDef) else if (tru is TruTypeDef truTypeDef)
{ {
return TypedObjectParserAsync(tdu, truTypeDef.TypeDef, connection, requestSequence); return TypedObjectParserAsync(tdu, truTypeDef.TypeDef, connection, requestSequence);
// return tru.Identifier switch
// {
// TruIdentifier.LocalType8 or
// TruIdentifier.LocalType16 or
// TruIdentifier.LocalType32 or
// TruIdentifier.LocalType64 or
// TruIdentifier.RemoteType8 or
// TruIdentifier.RemoteType16 or
// TruIdentifier.RemoteType32 or
// TruIdentifier.RemoteType64 => TypedObjectParserAsync(tdu, truTypeDef.TypeDef, connection, requestSequence);
//,
// _ => throw new Exception("Unsupported type for typed parser.")
// };
} }
throw new Exception("Unknown TRU."); throw new Exception("Unknown TRU.");
@@ -1874,32 +1858,12 @@ public static class DataDeserializer
} }
else if (tru is TruTypeDef truTypeDef) else if (tru is TruTypeDef truTypeDef)
{ {
return tru.Identifier switch return TypedObjectParser(tdu, truTypeDef.TypeDef, warehouse);
{
TruIdentifier.LocalType8 or
TruIdentifier.LocalType16 or
TruIdentifier.LocalType32 or
TruIdentifier.LocalType64 or
TruIdentifier.RemoteType8 or
TruIdentifier.RemoteType16 or
TruIdentifier.RemoteType32 or
TruIdentifier.RemoteType64 => TypedObjectParser(tdu, truTypeDef.TypeDef, warehouse),
_ => throw new Exception("Unsupported type for typed parser.")
};
} }
throw new Exception("Unknown TRU."); throw new Exception("Unknown TRU.");
} }
//public static object TypedListParser(ParsedTdu tdu, Warehouse warehouse)
//{
// // get the type
// var (hdrCs, tru) = Tru.Parse(tdu.Metadata, 0);
// return TypedArrayParser(tdu, tru, warehouse);
//}
public static AsyncBag<PropertyValue> PropertyValueArrayParserAsync(byte[] data, uint offset, uint length, EpConnection connection, uint[] requestSequence)//, bool ageIncluded = true) public static AsyncBag<PropertyValue> PropertyValueArrayParserAsync(byte[] data, uint offset, uint length, EpConnection connection, uint[] requestSequence)//, bool ageIncluded = true)
{ {
var rt = new AsyncBag<PropertyValue>(); var rt = new AsyncBag<PropertyValue>();
+8
View File
@@ -0,0 +1,8 @@
using System;
public class Class1
{
public Class1()
{
}
}
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\Libraries\Esiur\Esiur.csproj" OutputItemType="Analyzer"/>
</ItemGroup>
</Project>
@@ -0,0 +1,278 @@
// ============================================================
// Scalability Extension: Concurrent Attach Sweep
// ------------------------------------------------------------
// Extends Tests/Distribution/ConcurrentAttach with:
// - Sweep over a wider range of concurrent request counts A.
// - More rounds per A for confidence-interval reporting.
// - Auto-stop when timeouts or failures appear (the
// saturation signal for concurrent attach is different
// from fan-out: it's *correctness* failure, not slowdown).
// - Unified CSV output suitable for direct plotting.
//
// Server: re-use the existing
// Tests/Distribution/ConcurrentAttach with --mode server.
// Or run this binary with --mode both.
// ------------------------------------------------------------
// Usage:
// dotnet run -- --mode both --resources 200 \
// --a-values 10,25,50,100,250,500,1000,2000 \
// --rounds 10 --timeout 10000
// ============================================================
using Esiur.Protocol;
using Esiur.Resource;
using Esiur.Stores;
using Esiur.Tests.ConcurrentAttachSweep;
using System.Data.Common;
using System.Diagnostics;
using System.Globalization;
var mode = GetArg(args, "--mode", "both");
var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10902"));
var resources = int.Parse(GetArg(args, "--resources", "200"));
var timeoutMs = int.Parse(GetArg(args, "--timeout", "10000"));
var rounds = int.Parse(GetArg(args, "--rounds", "10"));
var aValStr = GetArg(args, "--a-values", "10,25,50,100,250,500,1000,2000");
var outCsv = GetArg(args, "--output", "concurrent_attach_sweep.csv");
var aValues = aValStr.Split(',').Select(int.Parse).ToArray();
var serverWh = new Warehouse();
var clientWh = new Warehouse();
// ----------------------------------------------------------------
// SERVER SIDE
// ----------------------------------------------------------------
if (mode == "server" || mode == "both")
{
await serverWh.Put("sys", new MemoryStore());
await serverWh.Put("sys/server", new EpServer() { Port = (ushort)port });
for (int i = 0; i < resources; i++)
{
await serverWh.Put($"sys/sensor_{i}", new SensorResource { SensorId = i, Value = i });
}
await serverWh.Open();
Console.WriteLine($"[Server-T3+] Ready: {resources} resources on port {port}");
if (mode == "server")
{
Console.WriteLine("Press ENTER to stop.");
Console.ReadLine();
await serverWh.Close();
return;
}
await Task.Delay(500);
}
// ----------------------------------------------------------------
// CLIENT SIDE: sweep over A values
// ----------------------------------------------------------------
Console.WriteLine($"[Client-T3+] resources={resources} timeout={timeoutMs}ms rounds={rounds}");
Console.WriteLine($"[Client-T3+] A values: {string.Join(",", aValues)}");
var summary = new List<ASummary>();
bool failureDetected = false;
foreach (int A in aValues)
{
if (failureDetected)
{
Console.WriteLine($"\n[Client-T3+] A={A}: SKIPPED (failure at lower A)");
continue;
}
Console.WriteLine($"\n[Client-T3+] === A={A} ===");
var roundResults = new List<RoundResult>();
for (int round = 0; round < rounds; round++)
{
var rng = new Random(round * 1000 + A);
var targets = Enumerable.Range(0, A)
.Select(_ => rng.Next(resources))
.ToArray();
long succeeded = 0, failed = 0, timedOut = 0;
var latencies = new double[A];
var roundSw = Stopwatch.StartNew();
// One shared connection per round, matching the existing test methodology
var connection = await clientWh.Get<EpConnection>($"ep://{host}:{port}");
var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () =>
{
var sw = Stopwatch.StartNew();
using var cts = new CancellationTokenSource(timeoutMs);
try
{
var proxy = await connection.Get($"sys/sensor_{resourceIdx}");
sw.Stop();
latencies[taskIdx] = sw.Elapsed.TotalMilliseconds;
if (proxy != null) Interlocked.Increment(ref succeeded);
else Interlocked.Increment(ref failed);
}
catch (OperationCanceledException)
{
sw.Stop();
latencies[taskIdx] = timeoutMs;
Interlocked.Increment(ref timedOut);
}
catch
{
sw.Stop();
latencies[taskIdx] = sw.Elapsed.TotalMilliseconds;
Interlocked.Increment(ref failed);
}
})).ToArray();
await Task.WhenAll(tasks);
roundSw.Stop();
var sorted = latencies.OrderBy(x => x).ToArray();
int n = sorted.Length;
var rr = new RoundResult
{
Round = round + 1,
A = A,
Succeeded = succeeded,
Failed = failed,
TimedOut = timedOut,
WallMs = roundSw.Elapsed.TotalMilliseconds,
P50 = sorted[Math.Min(n - 1, (int)(n * 0.50))],
P95 = sorted[Math.Min(n - 1, (int)(n * 0.95))],
P99 = sorted[Math.Min(n - 1, (int)(n * 0.99))],
Max = sorted[n - 1],
};
roundResults.Add(rr);
Console.WriteLine($" round {round + 1}/{rounds}: ok={succeeded}/{A} fail={failed} "
+ $"timeout={timedOut} wall={rr.WallMs:F0}ms p50={rr.P50:F0} p99={rr.P99:F0}");
// Round 1 of each A is conventionally excluded from latency
// aggregation due to connection-establishment overhead (matches
// the existing test methodology).
GC.Collect();
await Task.Delay(500);
}
var steady = roundResults.Skip(1).ToList(); // exclude round 1
if (steady.Count == 0) steady = roundResults;
var anyFailure = roundResults.Any(r => r.Failed > 0 || r.TimedOut > 0);
var s = new ASummary
{
A = A,
Rounds = roundResults.Count,
AnyFailures = anyFailure,
TotalSucceeded = roundResults.Sum(r => r.Succeeded),
TotalFailed = roundResults.Sum(r => r.Failed),
TotalTimedOut = roundResults.Sum(r => r.TimedOut),
MeanP50 = steady.Average(r => r.P50),
Ci95P50 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P50).ToArray()),
MeanP99 = steady.Average(r => r.P99),
Ci95P99 = ConfidenceIntervalHalfWidth95(steady.Select(r => r.P99).ToArray()),
MeanWall = steady.Average(r => r.WallMs),
Ci95Wall = ConfidenceIntervalHalfWidth95(steady.Select(r => r.WallMs).ToArray()),
};
summary.Add(s);
Console.WriteLine($" [A={A}] SUMMARY: "
+ $"p50={s.MeanP50:F1}±{s.Ci95P50:F1} "
+ $"p99={s.MeanP99:F1}±{s.Ci95P99:F1} "
+ $"wall={s.MeanWall:F0}±{s.Ci95Wall:F0}ms "
+ $"failures={s.TotalFailed + s.TotalTimedOut}/{s.TotalSucceeded + s.TotalFailed + s.TotalTimedOut}");
if (anyFailure)
{
Console.WriteLine($" [A={A}] *** FAILURE DETECTED: stopping sweep ***");
failureDetected = true;
}
}
// ----------------------------------------------------------------
// CSV output
// ----------------------------------------------------------------
var sb = new System.Text.StringBuilder();
sb.AppendLine("a,rounds,any_failures,total_succeeded,total_failed,total_timed_out," +
"mean_p50_ms,ci95_p50,mean_p99_ms,ci95_p99,mean_wall_ms,ci95_wall");
foreach (var r in summary)
{
sb.AppendLine(string.Create(CultureInfo.InvariantCulture,
$"{r.A},{r.Rounds},{r.AnyFailures},{r.TotalSucceeded},{r.TotalFailed},{r.TotalTimedOut}," +
$"{r.MeanP50:F2},{r.Ci95P50:F2},{r.MeanP99:F2},{r.Ci95P99:F2}," +
$"{r.MeanWall:F2},{r.Ci95Wall:F2}"));
}
await File.WriteAllTextAsync(outCsv, sb.ToString());
Console.WriteLine($"\n[Client-T3+] Results written to {outCsv}");
if (mode == "server" || mode == "both") await serverWh.Close();
if (mode == "client" || mode == "both") await clientWh.Close();
// ----------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------
static double ConfidenceIntervalHalfWidth95(double[] xs)
{
int n = xs.Length;
if (n < 2) return 0;
double mean = xs.Average();
double sumSq = xs.Sum(x => (x - mean) * (x - mean));
double std = Math.Sqrt(sumSq / (n - 1));
double sem = std / Math.Sqrt(n);
double t = (n - 1) switch
{
1 => 12.706,
2 => 4.303,
3 => 3.182,
4 => 2.776,
5 => 2.571,
6 => 2.447,
7 => 2.365,
8 => 2.306,
9 => 2.262,
_ => 1.960
};
return t * sem;
}
static string GetArg(string[] args, string key, string def)
{
int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
record RoundResult
{
public int Round;
public int A;
public long Succeeded;
public long Failed;
public long TimedOut;
public double WallMs;
public double P50;
public double P95;
public double P99;
public double Max;
}
record ASummary
{
public int A;
public int Rounds;
public bool AnyFailures;
public long TotalSucceeded;
public long TotalFailed;
public long TotalTimedOut;
public double MeanP50;
public double Ci95P50;
public double MeanP99;
public double Ci95P99;
public double MeanWall;
public double Ci95Wall;
}
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Tests.ConcurrentAttachSweep;
using Esiur.Resource;
[Resource]
public partial class SensorResource : Resource
{
public int SensorId { get; set; }
[Export]
public double value;
}
@@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>
@@ -0,0 +1 @@
Console.WriteLine("Hello, World!");
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\..\Libraries\Esiur\Esiur.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,390 @@
// ============================================================
// Scalability Extension: Fan-Out — ORCHESTRATOR CLIENT
// ------------------------------------------------------------
// Drives a full sweep of subscriber counts N against a single
// server instance. For each N value:
// 1. Spawns N in-process subscriber tasks, each opening its
// own EpConnection to the server.
// 2. Each subscriber attaches to all M resources and counts
// property-change notifications it receives over a fixed
// measurement window.
// 3. The orchestrator polls the server's sys/control resource
// to capture server-side CPU during the window.
// 4. Tears down all N subscribers and waits a settle interval
// before the next sweep point.
// 5. Repeats for `replications` rounds so the per-N mean and
// 95% confidence interval can be computed.
// 6. Auto-stops the sweep if either:
// - mean per-subscriber rate drops below 10% of theoretical,
// - or server CPU stays at >180% (>90% of 2 cores) for the
// entire measurement window.
//
// Note on in-process vs separate processes: subscribers are
// tasks within a single client process to keep the test self-
// contained and avoid spawning N OS processes. Each task uses
// its own EpConnection (TCP connection) to the server, so from
// the server's perspective the load looks identical to N
// distinct subscriber nodes for the property-propagation path.
// The single-client-process design does mean that the client
// host's CPU is shared across all subscribers; the orchestrator
// records this too so degradation can be attributed correctly.
// ------------------------------------------------------------
// Usage:
// dotnet run -- --host 127.0.0.1 --port 10900 --resources 100 \
// --emit-interval-ms 50 --window-sec 60 \
// --warmup-sec 5 --replications 3 \
// --n-values 2,5,10,20,50,100,200,500
// ============================================================
using Esiur.Protocol;
using Esiur.Resource;
using System.Data.Common;
using System.Diagnostics;
using System.Globalization;
var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10900"));
var resources = int.Parse(GetArg(args, "--resources", "100"));
var emitIntervalMs = int.Parse(GetArg(args, "--emit-interval-ms", "50"));
var windowSec = int.Parse(GetArg(args, "--window-sec", "60"));
var warmupSec = int.Parse(GetArg(args, "--warmup-sec", "5"));
var settleSec = int.Parse(GetArg(args, "--settle-sec", "5"));
var replications = int.Parse(GetArg(args, "--replications", "3"));
var nValuesStr = GetArg(args, "--n-values", "2,5,10,20,50,100,200,500");
var outputCsv = GetArg(args, "--output", "fanout_sweep_results.csv");
var nValues = nValuesStr.Split(',').Select(int.Parse).ToArray();
double theoreticalMaxRate = 1000.0 / emitIntervalMs * resources;
double minAcceptableRate = theoreticalMaxRate * 0.10;
Console.WriteLine($"[Orchestrator] resources={resources} interval={emitIntervalMs}ms "
+ $"window={windowSec}s replications={replications}");
Console.WriteLine($"[Orchestrator] theoretical_max_per_subscriber_rate={theoreticalMaxRate:F0} notif/s");
Console.WriteLine($"[Orchestrator] saturation_threshold={minAcceptableRate:F0} notif/s");
Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}");
// ----------------------------------------------------------------
// Attach to the server's control resource once.
// ----------------------------------------------------------------
var controlWh = new Warehouse();
dynamic? control = null;
try
{
var controlConn = await controlWh.Get<EpConnection>($"ep://{host}:{port}");
control = await controlConn.Get("sys/control");
}
catch (Exception ex)
{
Console.WriteLine($"[Orchestrator] WARNING: could not attach to sys/control: {ex.Message}");
Console.WriteLine("[Orchestrator] Server CPU will be reported as N/A.");
}
// ----------------------------------------------------------------
// All sweep points x replications, with per-N early-stop logic.
// ----------------------------------------------------------------
var allResults = new List<SweepResult>();
bool saturatedDetected = false;
foreach (int n in nValues)
{
if (saturatedDetected)
{
Console.WriteLine($"\n[Orchestrator] N={n}: SKIPPED (saturation reached at lower N)");
continue;
}
var perRepResults = new List<RepResult>();
for (int rep = 0; rep < replications; rep++)
{
Console.WriteLine($"\n[Orchestrator] === N={n} rep={rep + 1}/{replications} ===");
var subscribers = new SubscriberTask[n];
var subscriberWhs = new Warehouse[n];
// ---------- spawn N subscribers ----------
Console.WriteLine($"[Orchestrator] Spawning {n} subscribers...");
var spawnSw = Stopwatch.StartNew();
var spawnTasks = new Task<SubscriberTask?>[n];
for (int i = 0; i < n; i++)
{
int captured = i;
subscriberWhs[i] = new Warehouse();
spawnTasks[i] = SpawnSubscriber(subscriberWhs[i], host, port, resources, captured);
}
await Task.WhenAll(spawnTasks);
bool spawnFailed = false;
for (int i = 0; i < n; i++)
{
if (spawnTasks[i].Result == null) { spawnFailed = true; break; }
subscribers[i] = spawnTasks[i].Result!;
}
spawnSw.Stop();
if (spawnFailed)
{
Console.WriteLine($"[Orchestrator] N={n}: spawn failed; treating as saturation.");
saturatedDetected = true;
await TeardownAll(subscriberWhs);
break;
}
Console.WriteLine($"[Orchestrator] All {n} subscribers attached in {spawnSw.Elapsed.TotalSeconds:F2}s");
// ---------- warmup ----------
Console.WriteLine($"[Orchestrator] Warmup {warmupSec}s...");
await Task.Delay(warmupSec * 1000);
foreach (var s in subscribers) s.ResetCounters();
// ---------- measurement window with CPU sampling ----------
Console.WriteLine($"[Orchestrator] Measurement window {windowSec}s...");
var cpuSamples = new List<double>();
var connSamples = new List<int>();
var winSw = Stopwatch.StartNew();
while (winSw.Elapsed.TotalSeconds < windowSec)
{
await Task.Delay(1000);
if (control != null)
{
try
{
cpuSamples.Add((double)control.CpuPercent);
connSamples.Add((int)control.ConnectedClients);
}
catch { /* control resource may not have current value yet */ }
}
}
double elapsedSec = winSw.Elapsed.TotalSeconds;
// ---------- collect per-subscriber counts ----------
var perSubRates = new double[n];
long totalReceived = 0;
long totalLate = 0;
for (int i = 0; i < n; i++)
{
perSubRates[i] = subscribers[i].Received / elapsedSec;
totalReceived += subscribers[i].Received;
totalLate += subscribers[i].LateDeliveries;
}
double meanPerSub = perSubRates.Average();
double stdPerSub = StdDev(perSubRates);
double minPerSub = perSubRates.Min();
double maxPerSub = perSubRates.Max();
double aggregate = perSubRates.Sum();
double avgServerCpu = cpuSamples.Count > 0 ? cpuSamples.Average() : double.NaN;
double peakServerCpu = cpuSamples.Count > 0 ? cpuSamples.Max() : double.NaN;
Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: "
+ $"mean_per_sub={meanPerSub:F1}/s "
+ $"aggregate={aggregate:F0}/s "
+ $"late={totalLate} "
+ $"server_cpu_avg={avgServerCpu:F1}% peak={peakServerCpu:F1}%");
perRepResults.Add(new RepResult
{
N = n,
Rep = rep + 1,
MeanPerSub = meanPerSub,
StdPerSub = stdPerSub,
MinPerSub = minPerSub,
MaxPerSub = maxPerSub,
Aggregate = aggregate,
LateDeliveries = totalLate,
ServerCpuAvg = avgServerCpu,
ServerCpuPeak = peakServerCpu,
});
// ---------- teardown ----------
Console.WriteLine($"[Orchestrator] Tearing down {n} subscribers...");
await TeardownAll(subscriberWhs);
await Task.Delay(settleSec * 1000);
}
// ---------- per-N aggregation ----------
if (perRepResults.Count > 0)
{
double meanOfMeans = perRepResults.Average(r => r.MeanPerSub);
double ciHalfWidth = ConfidenceIntervalHalfWidth95(
perRepResults.Select(r => r.MeanPerSub).ToArray());
Console.WriteLine($"\n[Orchestrator] N={n} SUMMARY: "
+ $"mean_per_sub={meanOfMeans:F1} ± {ciHalfWidth:F1} notif/s (95% CI)");
// Saturation detection: stop sweep if per-sub rate falls below
// 10% of theoretical OR server CPU peaked above 180% (>90% of 2 cores)
if (meanOfMeans < minAcceptableRate)
{
Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: rate {meanOfMeans:F0} < {minAcceptableRate:F0} ***");
saturatedDetected = true;
}
else if (perRepResults.Average(r => r.ServerCpuPeak) > 180.0)
{
Console.WriteLine($"[Orchestrator] *** SATURATION DETECTED: server CPU peaked > 180% ***");
saturatedDetected = true;
}
// Aggregate row for CSV
allResults.Add(new SweepResult
{
N = n,
Replications = perRepResults.Count,
MeanPerSubRate = meanOfMeans,
Ci95HalfWidth = ciHalfWidth,
MeanAggregate = perRepResults.Average(r => r.Aggregate),
TotalLate = perRepResults.Sum(r => r.LateDeliveries),
MeanServerCpuAvg = perRepResults.Average(r => r.ServerCpuAvg),
MeanServerCpuPeak = perRepResults.Average(r => r.ServerCpuPeak),
});
}
}
// ----------------------------------------------------------------
// Output
// ----------------------------------------------------------------
var sb = new System.Text.StringBuilder();
sb.AppendLine("n,replications,mean_per_sub_rate,ci95_halfwidth,mean_aggregate," +
"total_late,mean_server_cpu_avg,mean_server_cpu_peak");
foreach (var r in allResults)
{
sb.AppendLine(string.Create(CultureInfo.InvariantCulture,
$"{r.N},{r.Replications},{r.MeanPerSubRate:F2},{r.Ci95HalfWidth:F2}," +
$"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}"));
}
await File.WriteAllTextAsync(outputCsv, sb.ToString());
Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}");
// ----------------------------------------------------------------
// Subscriber spawn / teardown
// ----------------------------------------------------------------
static async Task<SubscriberTask?> SpawnSubscriber(
Warehouse wh, string host, int port, int resources, int subId)
{
try
{
var conn = await wh.Get<EpConnection>($"ep://{host}:{port}");
var sub = new SubscriberTask { SubscriberId = subId };
for (int i = 0; i < resources; i++)
{
var proxy = await conn.Get($"sys/sensor_{i}");
long lastTick = Stopwatch.GetTimestamp();
proxy.Instance.PropertyModified += (PropertyModificationInfo data) =>
{
if (data.Name != "Value") return;
long now = Stopwatch.GetTimestamp();
double elapsedMs = (now - lastTick) * 1000.0 / Stopwatch.Frequency;
lastTick = now;
Interlocked.Increment(ref sub._received);
if (elapsedMs > 400) Interlocked.Increment(ref sub._lateDeliveries);
};
}
return sub;
}
catch (Exception ex)
{
Console.WriteLine($" [Spawn-{subId}] FAILED: {ex.Message}");
return null;
}
}
static async Task TeardownAll(Warehouse[] whs)
{
foreach (var wh in whs)
{
try { await wh.Close(); }
catch { /* ignore */ }
}
}
// ----------------------------------------------------------------
// Stats helpers
// ----------------------------------------------------------------
static double StdDev(double[] xs)
{
if (xs.Length < 2) return 0;
double mean = xs.Average();
double sumSq = xs.Sum(x => (x - mean) * (x - mean));
return Math.Sqrt(sumSq / (xs.Length - 1));
}
/// <summary>
/// 95% confidence interval half-width using Student's t-distribution.
/// For very small samples (n &lt; 3) returns 0 (not enough data).
/// t values for 95% two-sided are hard-coded; see standard tables.
/// </summary>
static double ConfidenceIntervalHalfWidth95(double[] xs)
{
int n = xs.Length;
if (n < 2) return 0;
double std = StdDev(xs);
double sem = std / Math.Sqrt(n);
// t for df=n-1, two-sided 95%
double t = (n - 1) switch
{
1 => 12.706,
2 => 4.303,
3 => 3.182,
4 => 2.776,
5 => 2.571,
6 => 2.447,
7 => 2.365,
8 => 2.306,
9 => 2.262,
_ => 1.960 // normal approximation
};
return t * sem;
}
static string GetArg(string[] args, string key, string def)
{
int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
// ----------------------------------------------------------------
// Records
// ----------------------------------------------------------------
class SubscriberTask
{
public int SubscriberId;
internal long _received;
internal long _lateDeliveries;
public long Received => Interlocked.Read(ref _received);
public long LateDeliveries => Interlocked.Read(ref _lateDeliveries);
public void ResetCounters()
{
Interlocked.Exchange(ref _received, 0);
Interlocked.Exchange(ref _lateDeliveries, 0);
}
}
record RepResult
{
public int N;
public int Rep;
public double MeanPerSub;
public double StdPerSub;
public double MinPerSub;
public double MaxPerSub;
public double Aggregate;
public long LateDeliveries;
public double ServerCpuAvg;
public double ServerCpuPeak;
}
record SweepResult
{
public int N;
public int Replications;
public double MeanPerSubRate;
public double Ci95HalfWidth;
public double MeanAggregate;
public long TotalLate;
public double MeanServerCpuAvg;
public double MeanServerCpuPeak;
}
@@ -7,6 +7,14 @@
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<Compile Remove="Program.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Program.cs" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\..\..\Libraries\Esiur\Esiur.csproj" /> <ProjectReference Include="..\..\..\..\Libraries\Esiur\Esiur.csproj" />
</ItemGroup> </ItemGroup>
@@ -0,0 +1,176 @@
// ============================================================
// Test 4: Fork-Join Queueing Test — CLIENT NODE (REPLICATED)
//
// Extends the original single-shot client to run K independent
// replications of each (delay, α) configuration so that 95%
// confidence intervals can be reported for the metrics in
// Table III (λ, μ, R̄, δ̄, D̄, P99(D), queue length, batch B).
//
// Each replication uses an identical configuration; the server
// runs StartUpdatesLocal back-to-back, and the client snapshots
// the cumulative finished-queue length between replications so
// that each replication's evaluation sees only its own items.
//
// Usage:
// dotnet run -- --host 127.0.0.1 --port 10901 \
// --trials 1000 \
// --delays 5:10:20:30:50:100 \
// --alphas 0.0:0.25:0.5:0.75:1.0 \
// --replications 5 \
// --output forkjoin_replicated.csv
// ============================================================
using Esiur.Data;
using Esiur.Protocol;
using Esiur.Resource;
using Esiur.Tests.Queueing.Client;
using System.ComponentModel;
using System.Globalization;
using System.IO;
// ---------- arguments ----------
var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10901"));
var trials = int.Parse(GetArg(args, "--trials", "1000"));
var replications = int.Parse(GetArg(args, "--replications", "5"));
var settleMs = int.Parse(GetArg(args, "--settle-ms", "1000"));
var outputCsv = GetArg(args, "--output", "forkjoin_replicated.csv");
var delays = GetArg(args, "--delays", "5:10:20:30:50:100")
.Split(':').Select(int.Parse).ToArray();
var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1.0")
.Split(':').Select(s => double.Parse(s, CultureInfo.InvariantCulture)).ToArray();
Console.WriteLine($"[Client-T4-R] Connecting to {host}:{port}");
Console.WriteLine($"[Client-T4-R] trials/rep={trials} replications={replications} " +
$"settle={settleMs}ms");
Console.WriteLine($"[Client-T4-R] delays={string.Join(",", delays)}");
Console.WriteLine($"[Client-T4-R] alphas={string.Join(",", alphas.Select(a => a.ToString("F2", CultureInfo.InvariantCulture)))}");
Console.WriteLine($"[Client-T4-R] {delays.Length * alphas.Length} configurations × {replications} reps " +
$"= {delays.Length * alphas.Length * replications} trial runs");
// ---------- connect ----------
var wh = new Warehouse();
var serviceResource = await wh.Get<EpResource>($"ep://{host}:{port}/sys/queueing");
var service = (dynamic)serviceResource;
// ---------- replication coordinator state ----------
//
// The server's StartUpdatesLocal fires `trials` PropertyChanged events
// across a single call. We count incoming events; when `trials` arrive,
// the current replication is complete. We then slice off this rep's
// portion of the cumulative finished-queue and hand it to QueueEval.
//
// `repDone` is signaled once per replication so the orchestrator coroutine
// can drive the next call.
int eventsThisRep = 0;
TaskCompletionSource<bool> repDone = new(TaskCreationOptions.RunContinuationsAsynchronously);
int finishedQueueBaseline = 0; // cumulative length BEFORE current rep started
serviceResource.PropertyChanged += (object? sender, PropertyChangedEventArgs e) =>
{
int n = Interlocked.Increment(ref eventsThisRep);
if (n == trials)
{
repDone.TrySetResult(true);
}
};
// ---------- main sweep ----------
var rows = new List<ReplicatedResult>();
using var writer = new StreamWriter(outputCsv);
writer.WriteLine(ReplicatedEvalAggregator.CsvHeader);
writer.Flush();
foreach (var delay in delays)
{
foreach (var alpha in alphas)
{
Console.WriteLine();
Console.WriteLine($"[Client-T4-R] >>> delay={delay} ms α={alpha:F2} " +
$"(running {replications} replications) <<<");
var reps = new List<EsiurQueueEval.EvalResult>(replications);
for (int rep = 0; rep < replications; rep++)
{
// Reset per-rep state
Interlocked.Exchange(ref eventsThisRep, 0);
repDone = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
// Snapshot the cumulative finished-queue length right before this rep
// so we can slice off only this rep's portion afterwards.
var preQueue = service.DistributedResourceConnection.GetFinishedQueue();
finishedQueueBaseline = preQueue.Count;
// Kick off the server-driven trial sequence (fire-and-forget;
// completion is signalled via PropertyChanged → repDone).
service.StartUpdatesLocal(delay, trials, alpha);
// Wait until `trials` PropertyChanged events have been received.
await repDone.Task;
// The server completed `trials` events; slice off this rep's
// portion of the cumulative finished-queue. GetFinishedQueue()
// returns IReadOnlyList<AsyncQueueItem<T>>; we forward the
// typed sliced subset directly to Evaluate which is generic
// on T (the property's runtime payload type).
var fullQueue = service.DistributedResourceConnection.GetFinishedQueue();
var typedQueue = SliceQueue(fullQueue, finishedQueueBaseline);
var repResult = EsiurQueueEval.Evaluate(typedQueue);
reps.Add(repResult);
Console.WriteLine($" rep {rep + 1}/{replications}: " +
$"λ={repResult.LambdaEventsPerSecond:F1}/s " +
$"R̄={repResult.Latency.ReadinessMs.Mean:F1}ms " +
$"δ̄={repResult.Latency.HolMs.Mean:F1}ms " +
$"D̄={repResult.Latency.EndToEndMs.Mean:F1}ms");
// Settle period between reps to let any straggler notifications drain
// and to keep the per-rep arrivals statistically independent of any
// residual server state from the previous rep.
await Task.Delay(settleMs);
}
var agg = ReplicatedEvalAggregator.Aggregate(delay, alpha, reps);
rows.Add(agg);
ReplicatedEvalAggregator.PrintSummary(agg);
// Append to CSV immediately so partial progress is preserved
// if the process is killed mid-sweep.
writer.WriteLine(ReplicatedEvalAggregator.ToCsvRow(agg));
writer.Flush();
}
}
Console.WriteLine();
Console.WriteLine($"[Client-T4-R] Done. {rows.Count} configurations written to {outputCsv}");
Environment.Exit(0);
// ----------------------------------------------------------------
static string GetArg(string[] args, string key, string def)
{
int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
// ----------------------------------------------------------------
// Slice the cumulative finished-queue down to only the items added
// during the current replication.
//
// The queue is dynamically typed (returned from a dynamic-dispatched
// member) and its element type is AsyncQueueItem<T> where T is the
// runtime payload type of the observed property. We rely on the DLR
// to bind the LINQ Skip<T>/ToList<T> generic methods at runtime, just
// as the original code does with the Evaluate<T> call below it.
// ----------------------------------------------------------------
static dynamic SliceQueue(dynamic fullQueue, int skipCount)
{
return System.Linq.Enumerable.ToList(
System.Linq.Enumerable.Skip(fullQueue, skipCount));
}
@@ -0,0 +1,183 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
namespace Esiur.Tests.Queueing.Client
{
/// <summary>
/// Point estimate accompanied by a 95% confidence-interval half-width
/// (computed with Student's t for small samples). Use ToString() to
/// render as "mean ± half" in print output.
/// </summary>
public readonly record struct MeanCi(double Mean, double Ci95HalfWidth, int N)
{
public static MeanCi From(IEnumerable<double> xs)
{
var arr = xs.ToArray();
int n = arr.Length;
if (n == 0) return new MeanCi(0, 0, 0);
if (n == 1) return new MeanCi(arr[0], 0, 1);
double mean = arr.Average();
double sumSq = 0;
for (int i = 0; i < n; i++)
{
double d = arr[i] - mean;
sumSq += d * d;
}
double std = Math.Sqrt(sumSq / (n - 1));
double sem = std / Math.Sqrt(n);
// Student's t two-sided 95% for small df. df = n - 1.
// Values from standard tables; ≥10 falls back to normal (1.960).
double t = (n - 1) switch
{
1 => 12.706,
2 => 4.303,
3 => 3.182,
4 => 2.776,
5 => 2.571,
6 => 2.447,
7 => 2.365,
8 => 2.306,
9 => 2.262,
10 => 2.228,
11 => 2.201,
12 => 2.179,
13 => 2.160,
14 => 2.145,
15 => 2.131,
16 => 2.120,
17 => 2.110,
18 => 2.101,
19 => 2.093,
20 => 2.086,
_ => 1.960 // normal approximation for df > 20
};
return new MeanCi(mean, t * sem, n);
}
public override string ToString() =>
N <= 1
? Mean.ToString("F2", CultureInfo.InvariantCulture)
: string.Create(CultureInfo.InvariantCulture,
$"{Mean:F2}±{Ci95HalfWidth:F2}");
}
/// <summary>
/// Aggregated result over K replications of the same (delay, alpha)
/// configuration. Carries point estimates plus per-metric 95% CI
/// half-widths for the headline metrics reported in the paper:
/// arrival rate λ, service rate μ, mean readiness R̄, mean HOL δ̄,
/// and mean end-to-end latency D̄.
///
/// The companion <see cref="EsiurQueueEval.EvalResult"/> field
/// (PerRepMean) holds the existing-style averaged point estimates
/// so downstream code that already consumed EvalResult continues
/// to work unchanged.
/// </summary>
public sealed record ReplicatedResult(
int Delay,
double Alpha,
int Replications,
MeanCi Lambda,
MeanCi Mu,
MeanCi ReadinessMeanMs,
MeanCi HolMeanMs,
MeanCi EndToEndMeanMs,
MeanCi EndToEndP99Ms,
MeanCi QueueLengthMean,
MeanCi BatchSizeMean,
EsiurQueueEval.EvalResult PerRepMean);
public static class ReplicatedEvalAggregator
{
/// <summary>
/// Combine K per-replication EvalResult objects into a single
/// ReplicatedResult, computing point estimates and 95% CIs.
/// </summary>
public static ReplicatedResult Aggregate(
int delay,
double alpha,
IReadOnlyList<EsiurQueueEval.EvalResult> reps)
{
if (reps == null) throw new ArgumentNullException(nameof(reps));
if (reps.Count == 0) throw new ArgumentException("reps is empty.", nameof(reps));
var lambda = MeanCi.From(reps.Select(r => r.LambdaEventsPerSecond));
var mu = MeanCi.From(reps.Select(r => r.MuEventsPerSecond));
var readiness = MeanCi.From(reps.Select(r => r.Latency.ReadinessMs.Mean));
var hol = MeanCi.From(reps.Select(r => r.Latency.HolMs.Mean));
var e2eMean = MeanCi.From(reps.Select(r => r.Latency.EndToEndMs.Mean));
var e2eP99 = MeanCi.From(reps.Select(r => r.Latency.EndToEndMs.P99));
var qLen = MeanCi.From(reps.Select(r => r.QueueLength.Mean));
var batch = MeanCi.From(reps.Select(
r => r.FlushSizeStats?.Mean ?? double.NaN)
.Where(v => !double.IsNaN(v)));
// Use the existing Average helper for the carry-along point estimates.
var perRepMean = EsiurQueueEval.Average(reps);
return new ReplicatedResult(
Delay: delay,
Alpha: alpha,
Replications: reps.Count,
Lambda: lambda,
Mu: mu,
ReadinessMeanMs: readiness,
HolMeanMs: hol,
EndToEndMeanMs: e2eMean,
EndToEndP99Ms: e2eP99,
QueueLengthMean: qLen,
BatchSizeMean: batch,
PerRepMean: perRepMean);
}
public static string CsvHeader =>
"delay_ms,alpha,replications," +
"lambda_mean,lambda_ci95," +
"mu_mean,mu_ci95," +
"readiness_mean_ms,readiness_ci95," +
"hol_mean_ms,hol_ci95," +
"e2e_mean_ms,e2e_ci95," +
"e2e_p99_ms,e2e_p99_ci95," +
"queue_len_mean,queue_len_ci95," +
"batch_mean,batch_ci95";
public static string ToCsvRow(ReplicatedResult r)
{
var inv = CultureInfo.InvariantCulture;
return string.Create(inv,
$"{r.Delay},{r.Alpha:F3},{r.Replications}," +
$"{r.Lambda.Mean:F3},{r.Lambda.Ci95HalfWidth:F3}," +
$"{r.Mu.Mean:F3},{r.Mu.Ci95HalfWidth:F3}," +
$"{r.ReadinessMeanMs.Mean:F3},{r.ReadinessMeanMs.Ci95HalfWidth:F3}," +
$"{r.HolMeanMs.Mean:F3},{r.HolMeanMs.Ci95HalfWidth:F3}," +
$"{r.EndToEndMeanMs.Mean:F3},{r.EndToEndMeanMs.Ci95HalfWidth:F3}," +
$"{r.EndToEndP99Ms.Mean:F3},{r.EndToEndP99Ms.Ci95HalfWidth:F3}," +
$"{r.QueueLengthMean.Mean:F3},{r.QueueLengthMean.Ci95HalfWidth:F3}," +
$"{r.BatchSizeMean.Mean:F3},{r.BatchSizeMean.Ci95HalfWidth:F3}");
}
/// <summary>
/// Console-friendly compact summary, one configuration per call.
/// </summary>
public static void PrintSummary(ReplicatedResult r)
{
Console.WriteLine();
Console.WriteLine($"=== Configuration: delay={r.Delay} ms, α={r.Alpha:F2}, " +
$"replications={r.Replications} ===");
Console.WriteLine("Metric | Mean ± 95% CI half-width");
Console.WriteLine("----------------+----------------------------------------");
Console.WriteLine($"λ (/s) | {r.Lambda}");
Console.WriteLine($"μ (/s) | {r.Mu}");
Console.WriteLine($"R̄ (ms) | {r.ReadinessMeanMs}");
Console.WriteLine($"δ̄ (ms) | {r.HolMeanMs}");
Console.WriteLine($"D̄ (ms) | {r.EndToEndMeanMs}");
Console.WriteLine($"P99(D) (ms) | {r.EndToEndP99Ms}");
Console.WriteLine($"Queue length | {r.QueueLengthMean}");
Console.WriteLine($"Batch size B | {r.BatchSizeMean}");
}
}
}
@@ -0,0 +1,511 @@
using System;
using System.Collections.Generic;
using System.Linq;
#nullable enable
namespace Esiur.Tests.ComplexModel;
// ============================================================================
// DdsCdrCodec.cs
// ----------------------------------------------------------------------------
// Implements ICodec by encoding BusinessDocument as OMG XCDR2 (Extended CDR
// version 2, PLAIN_CDR2, all types FINAL). This is the on-the-wire payload
// format used by every conformant DDS implementation per DDS-XTypes 1.3 for
// final-extensibility types.
//
// The corresponding IDL definition lives in BusinessDocument.idl alongside
// this file. The struct layout, member order, and union discriminator
// values below MUST match the IDL exactly; a divergence between this code
// and the IDL would produce a wire format incompatible with real DDS
// implementations, defeating the purpose of the benchmark.
//
// Encoding choices documented inline. Where the spec offered multiple
// equivalent options (e.g., DateTime as int64 ticks vs. struct), the choice
// that minimizes wire size was selected, so that this measurement reports
// the most favorable DDS payload size achievable for this schema.
// ============================================================================
public sealed class DdsCdrCodec : ICodec
{
public string Name => "DDS-XCDR2";
public byte[]? Serialize(BusinessDocument obj)
{
var w = new Xcdr2Writer(capacity: 8192);
WriteBusinessDocument(w, obj);
var bytes = w.ToArray();
// Optional self-check (cheap): decode and compare. Mirrors the
// pattern used by FlatBuffersCodec and ProtobufCodec in
// ModelRunner.cs for outside-the-timing-loop validation.
// Disabled by default to keep parity with most other codecs; the
// outer harness performs equality testing.
return bytes;
}
public BusinessDocument Deserialize(byte[] data)
{
var r = new Xcdr2Reader(data);
return ReadBusinessDocument(r);
}
// ---- BusinessDocument (FINAL struct, top-level) -----------------------
//
// In XCDR2 the top-level FINAL struct does NOT carry a DHEADER. It is
// emitted as a plain concatenation of members in declaration order.
// (DDS-XTypes 1.3 §7.4.3.5.3 rule 7: FINAL aggregated types use
// PLAIN_CDR2 without delimiter.)
private static void WriteBusinessDocument(Xcdr2Writer w, BusinessDocument d)
{
// member 0: optional<DocumentHeader> Header
WriteOptionalStruct(w, d.Header, WriteDocumentHeader);
// member 1: optional<Party> Seller
WriteOptionalStruct(w, d.Seller, WriteParty);
// member 2: optional<Party> Buyer
WriteOptionalStruct(w, d.Buyer, WriteParty);
// member 3: sequence<LineItem> Items
WriteSequenceOfStruct(w, d.Items, WriteLineItem);
// member 4: sequence<Payment> Payments
WriteSequenceOfStruct(w, d.Payments, WritePayment);
// member 5: sequence<Attachment> Attachments
WriteSequenceOfStruct(w, d.Attachments, WriteAttachment);
// member 6: sequence<int32> RiskScores (primitive, no DHEADER)
WriteSequenceOfInt32(w, d.RiskScores);
}
private static BusinessDocument ReadBusinessDocument(Xcdr2Reader r)
{
var d = new BusinessDocument
{
Header = ReadOptionalStruct(r, ReadDocumentHeader),
Seller = ReadOptionalStruct(r, ReadParty),
Buyer = ReadOptionalStruct(r, ReadParty),
Items = ReadSequenceOfStruct(r, ReadLineItem),
Payments = ReadSequenceOfStruct(r, ReadPayment),
Attachments = ReadSequenceOfStruct(r, ReadAttachment),
RiskScores = ReadSequenceOfInt32(r),
};
return d;
}
// ---- DocumentHeader ---------------------------------------------------
private static void WriteDocumentHeader(Xcdr2Writer w, DocumentHeader h)
{
// member 0: sequence<octet> DocId (primitive seq, no DHEADER)
w.WriteOctetSequence(h.DocId ?? Array.Empty<byte>());
// member 1: DocType Type (enum -> int32)
w.WriteInt32((int)h.Type);
// member 2: int32 Version
w.WriteInt32(h.Version);
// member 3: int64 CreatedAtTicks
w.WriteInt64(h.CreatedAt.Ticks);
// member 4: optional<int64> UpdatedAtTicks
if (h.UpdatedAt.HasValue)
{
w.WriteBool(true);
w.WriteInt64(h.UpdatedAt.Value.Ticks);
}
else
{
w.WriteBool(false);
}
// member 5: Currency (enum -> int32)
w.WriteInt32((int)h.Currency);
// member 6: optional<string> Notes
WriteOptionalString(w, h.Notes);
// member 7: sequence<MetaEntry> Meta (non-primitive seq -> DHEADER)
WriteVariantDictionary(w, h.Meta);
}
private static DocumentHeader ReadDocumentHeader(Xcdr2Reader r)
{
var h = new DocumentHeader();
h.DocId = r.ReadOctetSequence();
h.Type = (DocType)r.ReadInt32();
h.Version = r.ReadInt32();
h.CreatedAt = new DateTime(r.ReadInt64());
h.UpdatedAt = r.ReadBool() ? new DateTime(r.ReadInt64()) : (DateTime?)null;
h.Currency = (Currency)r.ReadInt32();
h.Notes = ReadOptionalString(r);
h.Meta = ReadVariantDictionary(r);
if (h.Meta != null)
{
h.MetaKeys = h.Meta.Keys.ToArray();
h.MetaValues = h.Meta.Values.ToArray();
}
return h;
}
// ---- Party ------------------------------------------------------------
private static void WriteParty(Xcdr2Writer w, Party p)
{
w.WriteUInt64(p.Id);
w.WriteString(p.Name);
WriteOptionalString(w, p.TaxId);
WriteOptionalString(w, p.Email);
WriteOptionalString(w, p.Phone);
// optional<Address>
if (p.Address != null)
{
w.WriteBool(true);
WriteAddress(w, p.Address);
}
else
{
w.WriteBool(false);
}
WriteOptionalString(w, p.PreferredLanguage);
}
private static Party ReadParty(Xcdr2Reader r)
{
return new Party
{
Id = r.ReadUInt64(),
Name = r.ReadString(),
TaxId = ReadOptionalString(r),
Email = ReadOptionalString(r),
Phone = ReadOptionalString(r),
Address = r.ReadBool() ? ReadAddress(r) : null,
PreferredLanguage = ReadOptionalString(r),
};
}
// ---- Address ----------------------------------------------------------
private static void WriteAddress(Xcdr2Writer w, Address a)
{
w.WriteString(a.Line1);
WriteOptionalString(w, a.Line2);
w.WriteString(a.City);
w.WriteString(a.Region);
w.WriteString(a.Country);
WriteOptionalString(w, a.PostalCode);
}
private static Address ReadAddress(Xcdr2Reader r)
{
return new Address
{
Line1 = r.ReadString(),
Line2 = ReadOptionalString(r),
City = r.ReadString(),
Region = r.ReadString(),
Country = r.ReadString(),
PostalCode = ReadOptionalString(r),
};
}
// ---- LineItem ---------------------------------------------------------
private static void WriteLineItem(Xcdr2Writer w, LineItem li)
{
w.WriteInt32(li.LineNo);
w.WriteInt32((int)li.Type);
w.WriteString(li.SKU);
w.WriteString(li.Description);
w.WriteDouble(li.Qty);
w.WriteString(li.QtyUnit);
w.WriteDouble(li.UnitPrice);
WriteOptionalDouble(w, li.VatRate);
WriteOptionalDouble(w, li.Discount);
WriteVariantDictionary(w, li.Ext);
}
private static LineItem ReadLineItem(Xcdr2Reader r)
{
var li = new LineItem
{
LineNo = r.ReadInt32(),
Type = (LineType)r.ReadInt32(),
SKU = r.ReadString(),
Description = r.ReadString(),
Qty = r.ReadDouble(),
QtyUnit = r.ReadString(),
UnitPrice = r.ReadDouble(),
VatRate = ReadOptionalDouble(r),
Discount = ReadOptionalDouble(r),
Ext = ReadVariantDictionary(r),
};
if (li.Ext != null)
{
li.ExtKeys = li.Ext.Keys.ToArray();
li.ExtValues = li.Ext.Values.ToArray();
}
return li;
}
// ---- Payment ----------------------------------------------------------
private static void WritePayment(Xcdr2Writer w, Payment p)
{
w.WriteInt32((int)p.Method);
w.WriteDouble(p.Amount);
WriteOptionalString(w, p.Reference);
w.WriteInt64(p.Timestamp.Ticks);
WriteOptionalDouble(w, p.Fee);
}
private static Payment ReadPayment(Xcdr2Reader r)
{
return new Payment
{
Method = (PaymentMethod)r.ReadInt32(),
Amount = r.ReadDouble(),
Reference = ReadOptionalString(r),
Timestamp = new DateTime(r.ReadInt64()),
Fee = ReadOptionalDouble(r),
};
}
// ---- Attachment -------------------------------------------------------
private static void WriteAttachment(Xcdr2Writer w, Attachment a)
{
w.WriteString(a.Name);
w.WriteString(a.MimeType);
w.WriteOctetSequence(a.Data);
}
private static Attachment ReadAttachment(Xcdr2Reader r)
{
return new Attachment
{
Name = r.ReadString(),
MimeType = r.ReadString(),
Data = r.ReadOctetSequence(),
};
}
// ---- Variant (union discriminated by Kind) ----------------------------
//
// IDL mapping (see BusinessDocument.idl):
// union Variant switch(int32 /* Kind */) {
// case 0 (Null): /* no member */;
// case 1 (Bool): boolean b;
// case 2 (Int64): int64 i64;
// case 3 (UInt64): uint64 u64;
// case 4 (Double): double d;
// case 6 (String): string s;
// case 7 (Bytes): sequence<octet> by;
// case 8 (DateTime): int64 dt; // ticks
// case 9 (Guid): octet[16] g;
// };
//
// XCDR2 union encoding: int32 discriminator + the selected branch.
private static void WriteVariant(Xcdr2Writer w, Variant v)
{
int tag = (int)v.Tag;
w.WriteInt32(tag);
switch (v.Tag)
{
case Variant.Kind.Null:
break;
case Variant.Kind.Bool:
w.WriteBool(v.Bool ?? false);
break;
case Variant.Kind.Int64:
w.WriteInt64(v.I64 ?? 0);
break;
case Variant.Kind.UInt64:
w.WriteUInt64(v.U64 ?? 0);
break;
case Variant.Kind.Double:
case Variant.Kind.Decimal: // Decimal mapped to double in IDL
w.WriteDouble(v.F64 ?? 0.0);
break;
case Variant.Kind.String:
w.WriteString(v.Str ?? "");
break;
case Variant.Kind.Bytes:
w.WriteOctetSequence(v.Bytes ?? Array.Empty<byte>());
break;
case Variant.Kind.DateTime:
w.WriteInt64(v.Dt?.Ticks ?? v.DtAsLong);
break;
case Variant.Kind.Guid:
w.WriteOctetArrayFixed(v.Guid ?? new byte[16], 16);
break;
default:
throw new InvalidOperationException($"Unknown Variant kind {v.Tag}");
}
}
private static Variant ReadVariant(Xcdr2Reader r)
{
var tag = (Variant.Kind)r.ReadInt32();
var v = new Variant { Tag = tag };
switch (tag)
{
case Variant.Kind.Null:
break;
case Variant.Kind.Bool:
v.Bool = r.ReadBool();
break;
case Variant.Kind.Int64:
v.I64 = r.ReadInt64();
break;
case Variant.Kind.UInt64:
v.U64 = r.ReadUInt64();
break;
case Variant.Kind.Double:
case Variant.Kind.Decimal:
v.F64 = r.ReadDouble();
break;
case Variant.Kind.String:
v.Str = r.ReadString();
break;
case Variant.Kind.Bytes:
v.Bytes = r.ReadOctetSequence();
break;
case Variant.Kind.DateTime:
{
long ticks = r.ReadInt64();
v.Dt = new DateTime(ticks);
v.DtAsLong = ticks;
break;
}
case Variant.Kind.Guid:
v.Guid = r.ReadOctetArrayFixed(16);
break;
default:
throw new InvalidOperationException($"Unknown Variant kind {tag}");
}
return v;
}
// ---- Dictionary<string, Variant> -> sequence<MetaEntry> ---------------
//
// IDL: struct MetaEntry { string key; Variant value; };
// sequence<MetaEntry> ...
//
// Non-primitive sequence: per XCDR2 rule 15, MUST emit DHEADER before
// the sequence length and elements.
private static void WriteVariantDictionary(Xcdr2Writer w, Dictionary<string, Variant>? dict)
{
int token = w.BeginDHeader();
if (dict == null)
{
w.WriteUInt32(0);
}
else
{
w.WriteUInt32((uint)dict.Count);
foreach (var kv in dict)
{
w.WriteString(kv.Key);
WriteVariant(w, kv.Value);
}
}
w.EndDHeader(token);
}
private static Dictionary<string, Variant>? ReadVariantDictionary(Xcdr2Reader r)
{
_ = r.ReadDHeader(); // size, ignored (schema-driven decode)
uint n = r.ReadUInt32();
if (n == 0) return new Dictionary<string, Variant>();
var d = new Dictionary<string, Variant>((int)n);
for (uint i = 0; i < n; i++)
{
var k = r.ReadString();
var v = ReadVariant(r);
d[k] = v;
}
return d;
}
// ---- helpers: optional<T> and sequence<T> -----------------------------
private static void WriteOptionalString(Xcdr2Writer w, string? s)
{
if (s is null) { w.WriteBool(false); }
else { w.WriteBool(true); w.WriteString(s); }
}
private static string? ReadOptionalString(Xcdr2Reader r)
=> r.ReadBool() ? r.ReadString() : null;
private static void WriteOptionalDouble(Xcdr2Writer w, double? v)
{
if (v is null) { w.WriteBool(false); }
else { w.WriteBool(true); w.WriteDouble(v.Value); }
}
private static double? ReadOptionalDouble(Xcdr2Reader r)
=> r.ReadBool() ? r.ReadDouble() : null;
private static void WriteOptionalStruct<T>(
Xcdr2Writer w, T? value, Action<Xcdr2Writer, T> writeInner)
where T : class
{
if (value is null) { w.WriteBool(false); }
else { w.WriteBool(true); writeInner(w, value); }
}
private static T? ReadOptionalStruct<T>(
Xcdr2Reader r, Func<Xcdr2Reader, T> readInner)
where T : class
{
return r.ReadBool() ? readInner(r) : null;
}
private static void WriteSequenceOfStruct<T>(
Xcdr2Writer w, T[]? arr, Action<Xcdr2Writer, T> writeInner)
{
// Non-primitive sequence: DHEADER required.
int token = w.BeginDHeader();
if (arr is null)
{
w.WriteUInt32(0);
}
else
{
w.WriteUInt32((uint)arr.Length);
for (int i = 0; i < arr.Length; i++)
writeInner(w, arr[i]);
}
w.EndDHeader(token);
}
private static T[] ReadSequenceOfStruct<T>(
Xcdr2Reader r, Func<Xcdr2Reader, T> readInner)
{
_ = r.ReadDHeader();
uint n = r.ReadUInt32();
var arr = new T[n];
for (uint i = 0; i < n; i++)
arr[i] = readInner(r);
return arr;
}
private static void WriteSequenceOfInt32(Xcdr2Writer w, int[]? arr)
{
// Primitive sequence: NO DHEADER per XCDR2 rule 14.
if (arr is null)
{
w.WriteUInt32(0);
}
else
{
w.WriteUInt32((uint)arr.Length);
for (int i = 0; i < arr.Length; i++)
w.WriteInt32(arr[i]);
}
}
private static int[] ReadSequenceOfInt32(Xcdr2Reader r)
{
uint n = r.ReadUInt32();
var arr = new int[n];
for (uint i = 0; i < n; i++)
arr[i] = r.ReadInt32();
return arr;
}
}
@@ -280,12 +280,13 @@ public sealed class ModelRunner
{ {
new JsonCodec(), new JsonCodec(),
new EsiurCodec(), new EsiurCodec(),
new DdsCdrCodec(),
new MessagePackCodec(), new MessagePackCodec(),
new ProtobufCodec(), new ProtobufCodec(),
new FlatBuffersCodec(), new FlatBuffersCodec(),
new CborCodec(), new CborCodec(),
new BsonCodec(), new BsonCodec(),
new AvroCodec() new AvroCodec(),
}; };
} }
@@ -380,7 +381,7 @@ public sealed class ModelRunner
Console.WriteLine(); Console.WriteLine();
Console.WriteLine("{0,-14} {1,12} {2,12} {3,10} {4,26} {5,18} {6,18}", Console.WriteLine("{0,-14} {1,12} {2,12} {3,10} {4,26} {5,18} {6,18}",
"Codec", "Mean(B)", "Median(B)", "Ratio", "Class vs JSON", "Enc CPU (µs)", "Dec CPU (µs)"); "Codec", "Mean(B)", "Median(B)", "Ratio", "Reduction", "Enc CPU (µs)", "Dec CPU (µs)");
Console.WriteLine(new string('-', 118)); Console.WriteLine(new string('-', 118));
foreach (var c in _codecs) foreach (var c in _codecs)
@@ -397,18 +398,19 @@ public sealed class ModelRunner
string meanS = double.IsNaN(mean) ? "N/A" : mean.ToString("F1"); string meanS = double.IsNaN(mean) ? "N/A" : mean.ToString("F1");
string medS = double.IsNaN(med) ? "N/A" : med.ToString("F1"); string medS = double.IsNaN(med) ? "N/A" : med.ToString("F1");
string ratioS = double.IsNaN(ratio) ? "N/A" : ratio.ToString("F3"); string ratioS = double.IsNaN(ratio) ? "N/A" : ratio.ToString("F3");
string reduction = double.IsNaN(ratio) ? "N/A" : ((1 - ratio) * 100).ToString("F3");
// average CPU µs/op across samples where serialization succeeded // average CPU µs/op across samples where serialization succeeded
string encCpuS = (r.Samples == 0) ? "N/A" : (r.EncodeCpuUsSum / r.Samples).ToString("F1"); string encCpuS = (r.Samples == 0) ? "N/A" : (r.EncodeCpuUsSum / r.Samples).ToString("F1");
string decCpuS = (r.Samples == 0) ? "N/A" : (r.DecodeCpuUsSum / r.Samples).ToString("F1"); string decCpuS = (r.Samples == 0) ? "N/A" : (r.DecodeCpuUsSum / r.Samples).ToString("F1");
Console.WriteLine("{0,-14} {1,12} {2,12} {3,10} {4,26} {5,18} {6,18}", Console.WriteLine("{0,-14} {1,12} {2,12} {3,10} {4,26} {5,18} {6,18}",
c.Name, meanS, medS, ratioS, cls, encCpuS, decCpuS); c.Name, meanS, medS, ratioS, reduction, encCpuS, decCpuS);
} }
Console.WriteLine(); Console.WriteLine();
Console.ReadLine(); //Console.ReadLine();
} }
} }
@@ -0,0 +1,332 @@
using System;
using System.Buffers.Binary;
using System.IO;
using System.Text;
#nullable enable
namespace Esiur.Tests.ComplexModel;
// ============================================================================
// Xcdr2Stream.cs
// ----------------------------------------------------------------------------
// OMG Extended Common Data Representation (XCDR) Version 2 encoder/decoder.
//
// Implements PLAIN_CDR2 for FINAL-extensibility structures, the most compact
// XCDR2 mode defined by DDS-XTypes 1.3 (OMG formal/2024-04-01). This mode is
// the on-the-wire format used by every conformant DDS implementation when
// the @final annotation is applied (or no extensibility annotation is given
// and the implementation defaults to final).
//
// Implemented rules (DDS-XTypes 1.3, §7.4.3.5.3 Complete Serialization Rules):
// - Encapsulation header (4 bytes): representation_identifier (2B) +
// options (2B). We use CDR2_LE = 0x00 0x09 for the identifier and
// 0x00 0x00 for the options field.
// - Maximum alignment is 4 bytes (vs 8 in XCDR1). 64-bit primitives align
// to 4, not 8.
// - Strings: uint32 length-including-null + UTF-8 bytes + 0x00 terminator.
// - Sequences of primitives: uint32 length + elements (rule 14, no DHEADER).
// - Sequences of non-primitives: DHEADER (uint32, bytes-remaining) +
// uint32 length + elements (rule 15).
// - Optionals: 1-byte is_present + value if present (rule 9).
// - Unions (Variant): int32 discriminator aligned to 4 + selected branch.
// - Octet arrays of fixed length: emitted as-is, no length prefix.
//
// Reference implementations consulted:
// - foxglove/cdr (https://github.com/foxglove/cdr)
// - eclipse-cyclonedds dds_cdrstream.c
// - eProsima Fast-CDR
// ============================================================================
internal sealed class Xcdr2Writer
{
private byte[] _buf;
private int _pos;
public Xcdr2Writer(int capacity = 4096)
{
_buf = new byte[capacity];
_pos = 0;
WriteEncapsulationHeader();
}
public int Position => _pos;
public byte[] ToArray()
{
var result = new byte[_pos];
Buffer.BlockCopy(_buf, 0, result, 0, _pos);
return result;
}
private void WriteEncapsulationHeader()
{
// CDR2_LE representation_identifier (DDS-RTPS table 10.3)
Ensure(4);
_buf[_pos++] = 0x00;
_buf[_pos++] = 0x09;
_buf[_pos++] = 0x00;
_buf[_pos++] = 0x00;
}
// The encapsulation header is NOT counted when computing alignment, per
// DDS-XTypes 1.3 §7.4.3.4: alignment is measured from the start of the
// user payload (byte 4).
private int PayloadPos => _pos - 4;
private void Align(int n)
{
// XCDR2 caps max alignment at 4. Callers pass 1, 2, 4 only.
int mis = PayloadPos & (n - 1);
if (mis == 0) return;
int pad = n - mis;
Ensure(pad);
for (int i = 0; i < pad; i++) _buf[_pos++] = 0x00;
}
private void Ensure(int extra)
{
if (_pos + extra <= _buf.Length) return;
int newCap = _buf.Length * 2;
while (newCap < _pos + extra) newCap *= 2;
var nb = new byte[newCap];
Buffer.BlockCopy(_buf, 0, nb, 0, _pos);
_buf = nb;
}
// ---- primitive writers ----
public void WriteByte(byte v)
{
Ensure(1);
_buf[_pos++] = v;
}
public void WriteBool(bool v) => WriteByte(v ? (byte)1 : (byte)0);
public void WriteInt16(short v)
{
Align(2);
Ensure(2);
BinaryPrimitives.WriteInt16LittleEndian(_buf.AsSpan(_pos), v);
_pos += 2;
}
public void WriteUInt16(ushort v)
{
Align(2);
Ensure(2);
BinaryPrimitives.WriteUInt16LittleEndian(_buf.AsSpan(_pos), v);
_pos += 2;
}
public void WriteInt32(int v)
{
Align(4);
Ensure(4);
BinaryPrimitives.WriteInt32LittleEndian(_buf.AsSpan(_pos), v);
_pos += 4;
}
public void WriteUInt32(uint v)
{
Align(4);
Ensure(4);
BinaryPrimitives.WriteUInt32LittleEndian(_buf.AsSpan(_pos), v);
_pos += 4;
}
// XCDR2: 64-bit primitives align to 4, NOT 8 (per max-alignment rule)
public void WriteInt64(long v)
{
Align(4);
Ensure(8);
BinaryPrimitives.WriteInt64LittleEndian(_buf.AsSpan(_pos), v);
_pos += 8;
}
public void WriteUInt64(ulong v)
{
Align(4);
Ensure(8);
BinaryPrimitives.WriteUInt64LittleEndian(_buf.AsSpan(_pos), v);
_pos += 8;
}
public void WriteDouble(double v) => WriteInt64(BitConverter.DoubleToInt64Bits(v));
public void WriteString(string s)
{
var bytes = Encoding.UTF8.GetBytes(s);
WriteUInt32((uint)(bytes.Length + 1)); // includes null terminator
Ensure(bytes.Length + 1);
Buffer.BlockCopy(bytes, 0, _buf, _pos, bytes.Length);
_pos += bytes.Length;
_buf[_pos++] = 0x00; // null terminator
}
// Fixed-length octet array (e.g., 16-byte GUID).
// No length prefix; just the bytes.
public void WriteOctetArrayFixed(byte[] data, int expectedLen)
{
if (data.Length != expectedLen)
throw new ArgumentException($"Expected {expectedLen} bytes, got {data.Length}");
Ensure(expectedLen);
Buffer.BlockCopy(data, 0, _buf, _pos, expectedLen);
_pos += expectedLen;
}
// Variable-length octet sequence: uint32 length + bytes.
// No DHEADER (octet is primitive).
public void WriteOctetSequence(byte[] data)
{
WriteUInt32((uint)data.Length);
Ensure(data.Length);
Buffer.BlockCopy(data, 0, _buf, _pos, data.Length);
_pos += data.Length;
}
// DHEADER for sequences of non-primitive types and for non-final structs
// and for optionals containing complex types. Reserves 4 bytes now,
// returns a token used by EndDHeader to backfill the size.
public int BeginDHeader()
{
Align(4);
Ensure(4);
int token = _pos;
// placeholder, will be backfilled
_buf[_pos++] = 0; _buf[_pos++] = 0; _buf[_pos++] = 0; _buf[_pos++] = 0;
return token;
}
public void EndDHeader(int token)
{
// Size = number of bytes after the DHEADER, exclusive of the DHEADER
// itself. (DDS-XTypes 1.3 §7.4.3.5.1 D-HEADER definition.)
int sizeAfter = _pos - (token + 4);
BinaryPrimitives.WriteUInt32LittleEndian(_buf.AsSpan(token), (uint)sizeAfter);
}
}
internal sealed class Xcdr2Reader
{
private readonly byte[] _buf;
private int _pos;
private readonly bool _littleEndian;
public Xcdr2Reader(byte[] data)
{
_buf = data;
// Encapsulation header (4 bytes)
if (data.Length < 4) throw new IOException("Truncated XCDR2 stream");
if (data[0] != 0x00 || (data[1] != 0x09 && data[1] != 0x0A))
throw new IOException(
$"Not an XCDR2 stream (representation_identifier {data[0]:X2} {data[1]:X2})");
_littleEndian = data[1] == 0x09;
if (!_littleEndian)
throw new NotSupportedException("Only CDR2_LE is implemented in this benchmark.");
_pos = 4;
}
private int PayloadPos => _pos - 4;
private void Align(int n)
{
int mis = PayloadPos & (n - 1);
if (mis == 0) return;
_pos += (n - mis);
}
public byte ReadByte() => _buf[_pos++];
public bool ReadBool() => ReadByte() != 0;
public short ReadInt16()
{
Align(2);
var v = BinaryPrimitives.ReadInt16LittleEndian(_buf.AsSpan(_pos));
_pos += 2;
return v;
}
public ushort ReadUInt16()
{
Align(2);
var v = BinaryPrimitives.ReadUInt16LittleEndian(_buf.AsSpan(_pos));
_pos += 2;
return v;
}
public int ReadInt32()
{
Align(4);
var v = BinaryPrimitives.ReadInt32LittleEndian(_buf.AsSpan(_pos));
_pos += 4;
return v;
}
public uint ReadUInt32()
{
Align(4);
var v = BinaryPrimitives.ReadUInt32LittleEndian(_buf.AsSpan(_pos));
_pos += 4;
return v;
}
public long ReadInt64()
{
Align(4); // XCDR2 max alignment
var v = BinaryPrimitives.ReadInt64LittleEndian(_buf.AsSpan(_pos));
_pos += 8;
return v;
}
public ulong ReadUInt64()
{
Align(4);
var v = BinaryPrimitives.ReadUInt64LittleEndian(_buf.AsSpan(_pos));
_pos += 8;
return v;
}
public double ReadDouble() => BitConverter.Int64BitsToDouble(ReadInt64());
public string ReadString()
{
uint lenIncNull = ReadUInt32();
if (lenIncNull == 0)
throw new IOException("XCDR2 string length must include null terminator (>= 1)");
int payloadLen = (int)lenIncNull - 1;
var s = Encoding.UTF8.GetString(_buf, _pos, payloadLen);
_pos += payloadLen;
// consume null terminator
if (_buf[_pos] != 0x00)
throw new IOException("XCDR2 string missing null terminator");
_pos += 1;
return s;
}
public byte[] ReadOctetArrayFixed(int len)
{
var result = new byte[len];
Buffer.BlockCopy(_buf, _pos, result, 0, len);
_pos += len;
return result;
}
public byte[] ReadOctetSequence()
{
uint len = ReadUInt32();
var result = new byte[len];
Buffer.BlockCopy(_buf, _pos, result, 0, (int)len);
_pos += (int)len;
return result;
}
public int ReadDHeader()
{
// We don't actually need to use the size for decoding because we know
// the schema; we just consume the 4 bytes.
return (int)ReadUInt32();
}
}