diff --git a/Esiur.sln b/Esiur.sln
index 03080a6..e59c225 100644
--- a/Esiur.sln
+++ b/Esiur.sln
@@ -68,6 +68,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ResourceCount.S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach", "Tests\Distribution\ConcurrentAttach\Esiur.Tests.ConcurrentAttach.csproj", "{7D88CAF1-1887-A011-BA72-F38C87C1A7D9}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Queueing", "Queueing", "{6F173323-75C1-4142-A4FE-0CEC2EB5EAF9}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Client", "Tests\Distribution\Queueing\Client\Esiur.Tests.Queueing.Client.csproj", "{E7BF2911-582D-C403-254F-F7FC895BFD68}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{543158AD-BCB6-44A5-91AB-FE97B42A9C95}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{02A07E3C-67DB-4489-88E3-D568C477F540}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Server", "Tests\Distribution\Queueing\Server\Esiur.Tests.Queueing.Server.csproj", "{7FD57668-2AD8-0F53-4006-03927B5A385C}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -138,6 +148,14 @@ Global
{7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E7BF2911-582D-C403-254F-F7FC895BFD68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E7BF2911-582D-C403-254F-F7FC895BFD68}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E7BF2911-582D-C403-254F-F7FC895BFD68}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E7BF2911-582D-C403-254F-F7FC895BFD68}.Release|Any CPU.Build.0 = Release|Any CPU
+ {7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -170,6 +188,11 @@ Global
{69A075E7-D924-59C6-0BF2-17A09201DDF3} = {413A4292-C2B3-4096-94CF-D6F607C20939}
{D1DF309F-40DE-9C0E-A78B-2648544B77D2} = {DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974}
{7D88CAF1-1887-A011-BA72-F38C87C1A7D9} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA}
+ {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917}
+ {E7BF2911-582D-C403-254F-F7FC895BFD68} = {543158AD-BCB6-44A5-91AB-FE97B42A9C95}
+ {543158AD-BCB6-44A5-91AB-FE97B42A9C95} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9}
+ {02A07E3C-67DB-4489-88E3-D568C477F540} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9}
+ {7FD57668-2AD8-0F53-4006-03927B5A385C} = {02A07E3C-67DB-4489-88E3-D568C477F540}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2}
diff --git a/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj
new file mode 100644
index 0000000..634bddd
--- /dev/null
+++ b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/Tests/Distribution/Queueing/Client/Program.cs b/Tests/Distribution/Queueing/Client/Program.cs
new file mode 100644
index 0000000..4c4c4d8
--- /dev/null
+++ b/Tests/Distribution/Queueing/Client/Program.cs
@@ -0,0 +1,92 @@
+// ============================================================
+// Test 4: Fork-Join Queueing Test — CLIENT NODE
+//
+// Usage: dotnet run -- --host 127.0.0.1 --port 10901 --trials 10000
+// ============================================================
+
+using Esiur.Data;
+using Esiur.Protocol;
+using Esiur.Resource;
+using Esiur.Tests.Queueing.Client;
+using System.ComponentModel;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using System.Text.RegularExpressions;
+
+var results = new List();
+int counter = 0;
+
+
+
+int currentAlpha = 0;
+int currentDelay = 0;
+
+var host = GetArg(args, "--host", "127.0.0.1");
+var port = int.Parse(GetArg(args, "--port", "10901"));
+var trials = int.Parse(GetArg(args, "--trials", "1000"));
+var delays = GetArg(args, "--delays", "5:8:10:20:30:100")
+ .Split(":").Select(x => Convert.ToInt32(x)).ToArray();
+var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1")
+ .Split(":").Select(y => Convert.ToDouble(y)).ToArray();
+
+
+Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, trials={trials}");
+
+var wh = new Warehouse();
+
+var serviceResource = await wh.Get(
+ $"ep://{host}:{port}/queueing");
+
+var service = (dynamic)serviceResource;
+
+serviceResource.PropertyChanged += Service_PropertyChanged;
+
+
+
+Console.WriteLine("Starting next test: Delay=" + delays[currentDelay] + " Alpha=" + alphas[currentAlpha]);
+
+service.StartUpdatesLocal(delays[currentDelay], trials, alphas[currentAlpha]);
+
+
+
+void Service_PropertyChanged(object? sender, PropertyChangedEventArgs e)
+{
+ counter++;
+
+ if (counter == trials)
+ {
+ var queue = service.DistributedResourceConnection.GetFinishedQueue();
+ var result = EsiurQueueEval.Evaluate(queue);
+
+ Console.WriteLine(result);
+ counter = 0;
+
+ if (currentAlpha == delays.Length - 1)
+ {
+ currentAlpha = 0;
+ currentDelay++;
+ }
+ else
+ {
+ currentAlpha++;
+ }
+
+ if (currentDelay == delays.Length)
+ {
+ System.Environment.Exit(0);
+ return;
+ }
+
+ Console.WriteLine();
+ Console.WriteLine("Starting next test: Delay=" + delays[currentDelay] + " Alpha=" + alphas[currentAlpha]);
+
+ service.StartUpdatesLocal(delays[currentDelay], trials, alphas[currentAlpha]);//, 0, resourceLink);
+
+ }
+}
+
+static string GetArg(string[] args, string key, string def)
+{
+ int i = Array.IndexOf(args, key);
+ return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
+}
diff --git a/Tests/Distribution/Queueing/Client/QueueEval.cs b/Tests/Distribution/Queueing/Client/QueueEval.cs
new file mode 100644
index 0000000..dd3ea78
--- /dev/null
+++ b/Tests/Distribution/Queueing/Client/QueueEval.cs
@@ -0,0 +1,349 @@
+using Esiur.Core;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Esiur.Tests.Queueing.Client
+{
+ public static class EsiurQueueEval
+ {
+
+ public static class EvalPrinter
+ {
+ public static void Print(EsiurQueueEval.EvalResult r)
+ {
+ Console.WriteLine("=== Evaluation Result ===");
+ Console.WriteLine($"α (HasResource probability): {r.Alpha:F3}");
+ Console.WriteLine($"λ̂ (effective arrival rate): {r.LambdaEventsPerSecond:F2} events/s");
+ Console.WriteLine();
+
+ PrintLatencyTable(r.Latency);
+
+ Console.WriteLine();
+ PrintValidation(r.Validation);
+
+ if (r.FlushSizeStats != null)
+ {
+ Console.WriteLine();
+ PrintStats("Flush size (≤ window)", r.FlushSizeStats);
+ }
+ }
+
+ private static void PrintLatencyTable(EsiurQueueEval.LatencyDecomposition l)
+ {
+ Console.WriteLine("Latency Decomposition (ms)");
+ Console.WriteLine("-----------------------------------------------");
+ Console.WriteLine($"{"Metric",-16} {"Mean",8} {"P50",8} {"P95",8} {"P99",8} {"Max",8}");
+ Console.WriteLine("-----------------------------------------------");
+
+ PrintRow("Readiness R", l.ReadinessMs);
+ PrintRow("HOL Δ", l.HolMs);
+ PrintRow("End-to-End D", l.EndToEndMs);
+
+ Console.WriteLine("-----------------------------------------------");
+ }
+
+ private static void PrintValidation(EsiurQueueEval.ModelValidation v)
+ {
+ Console.WriteLine("Resequencing Model Validation");
+ Console.WriteLine("-----------------------------------------------");
+ Console.WriteLine("Absolute error |d − d̂| (ms)");
+ PrintStats("Error", v.AbsErrorMs);
+
+ if (v.MaxNegativeSlackMs > 0)
+ {
+ Console.WriteLine($"Max negative slack (delivered earlier than model): {v.MaxNegativeSlackMs:F4} ms");
+ }
+ else
+ {
+ Console.WriteLine("No negative slack observed (model is conservative).");
+ }
+ }
+
+ private static void PrintRow(string name, EsiurQueueEval.Stats s)
+ {
+ Console.WriteLine(
+ $"{name,-16} " +
+ $"{s.Mean,8:F2} " +
+ $"{s.P50,8:F2} " +
+ $"{s.P95,8:F2} " +
+ $"{s.P99,8:F2} " +
+ $"{s.Max,8:F2}"
+ );
+ }
+
+ private static void PrintStats(string name, EsiurQueueEval.Stats s)
+ {
+ Console.WriteLine(
+ $"{name,-20} " +
+ $"mean={s.Mean,8:F02} " +
+ $"p50={s.P50,8:F02} " +
+ $"p95={s.P95,8:F02} " +
+ $"p99={s.P99,8:F02} " +
+ $"max={s.Max,8:F02}"
+ );
+ }
+ }
+
+ public sealed record Stats(double Mean, double P50, double P95, double P99, double Max);
+
+ public sealed record LatencyDecomposition(Stats ReadinessMs, Stats HolMs, Stats EndToEndMs);
+
+ public sealed record ModelValidation(
+ Stats AbsErrorMs,
+ double MaxNegativeSlackMs, // worst case where Delivered < predicted (if happens)
+ int Count);
+
+ public sealed record EvalResult(
+ double Alpha,
+ double LambdaEventsPerSecond,
+ double MuEventsPerSecond, // <-- NEW
+ LatencyDecomposition Latency,
+ ModelValidation Validation,
+ Stats QueueLength,
+ Stats? FlushSizeStats);
+
+ ///
+ /// Evaluates Esiur fork-join readiness + in-order resequencing using in-memory items.
+ /// Assumes items refer to a single ordered stream (per resource queue).
+ ///
+ public static EvalResult Evaluate(
+ IReadOnlyList> items,
+ double flushWindowMs = 0.5,
+ bool computeFlush = true)
+ {
+ if (items == null) throw new ArgumentNullException(nameof(items));
+ if (items.Count == 0) throw new ArgumentException("items is empty.");
+
+ // Ensure deterministic order: prefer Sequence, then Arrival timestamp
+ var ordered = items
+ .OrderBy(x => x.Sequence)
+ .ThenBy(x => x.Arrival)
+ .ToArray();
+
+ int n = ordered.Length;
+
+ // Latency components in milliseconds
+ var readiness = new double[n]; // R = r-a
+ var hol = new double[n]; // Δ = d-r
+ var endToEnd = new double[n]; // D = d-a
+
+ int resCount = 0;
+ for (int i = 0; i < n; i++)
+ {
+ var e = ordered[i];
+
+ double Rms = (e.Ready - e.Arrival).TotalMilliseconds;
+ double Hms = (e.Delivered - e.Ready).TotalMilliseconds;
+ double Dms = (e.Delivered - e.Arrival).TotalMilliseconds;
+
+ // Robustness against logging placement or clock issues
+ if (Rms < 0) Rms = 0;
+ if (Hms < 0) Hms = 0;
+ if (Dms < 0) Dms = 0;
+
+ readiness[i] = Rms;
+ hol[i] = Hms;
+ endToEnd[i] = Dms;
+
+ if (e.HasResource) resCount++;
+ }
+
+ // α = P(HasResource)
+ double alpha = (double)resCount / n;
+
+
+ // Effective arrival rate λ̂ from arrival timeline
+ double lambda = EstimateLambda(ordered);
+
+ // Effective departure / readiness rate μ̂ from delivery timeline
+ double mu = EstimateMu(ordered);
+
+ var latency = new LatencyDecomposition(
+ ReadinessMs: ComputeStats(readiness),
+ HolMs: ComputeStats(hol),
+ EndToEndMs: ComputeStats(endToEnd));
+
+ // --- Resequencing validation: d_hat_i = max(r_i, d_hat_{i-1}) ---
+ // Use DateTime ticks for exactness, then convert to ms.
+ var absErrMs = new double[n];
+ long prevPredictedTicks = long.MinValue;
+ double maxNegativeSlackMs = 0;
+
+ for (int i = 0; i < n; i++)
+ {
+ long ri = ordered[i].Ready.Ticks;
+
+ long predicted = (i == 0)
+ ? ri
+ : Math.Max(ri, prevPredictedTicks);
+
+ prevPredictedTicks = predicted;
+
+ long observed = ordered[i].Delivered.Ticks;
+ long errTicks = Math.Abs(observed - predicted);
+ absErrMs[i] = TicksToMs(errTicks);
+
+ // If observed delivery occurs earlier than model predicts (shouldn't, but track it)
+ long slackTicks = observed - predicted; // negative => earlier than predicted
+ if (slackTicks < 0)
+ {
+ double slackMs = TicksToMs(-slackTicks);
+ if (slackMs > maxNegativeSlackMs) maxNegativeSlackMs = slackMs;
+ }
+ }
+
+ var validation = new ModelValidation(
+ AbsErrorMs: ComputeStats(absErrMs),
+ MaxNegativeSlackMs: maxNegativeSlackMs,
+ Count: n);
+
+ // --- Flush sizes (optional): consecutive deliveries within window ---
+ //Stats? flushStats = null;
+ //if (computeFlush)
+ //{
+ // var flushSizes = ComputeFlushSizes(ordered, flushWindowMs);
+ // flushStats = ComputeStats(flushSizes.Select(x => (double)x).ToArray());
+ //}
+
+ var queueLength = ComputeStats(ordered.Select(x => (double)x.NotificationsCountWaitingInTheQueueAtEnqueueing).ToArray());
+
+ var flushStats = ComputeStats(ordered.GroupBy(x => x.FlushId).Select(x => (double)x.First().BatchSize).ToArray());
+
+ return new EvalResult(alpha, lambda, mu, latency, validation, queueLength, flushStats);
+ }
+
+ // ---------------- Helpers ----------------
+
+ private static double EstimateLambda(AsyncQueueItem[] ordered)
+ {
+ if (ordered.Length < 2) return 0;
+
+ DateTime first = ordered[0].Arrival;
+ DateTime last = ordered[^1].Arrival;
+ double seconds = (last - first).TotalSeconds;
+ if (seconds <= 0) return 0;
+
+ // N-1 arrivals over observed interval
+ return (ordered.Length - 1) / seconds;
+ }
+
+ private static double TicksToMs(long ticks) => ticks / 10_000.0; // 1 tick = 100ns
+
+ private static int[] ComputeFlushSizes(AsyncQueueItem[] ordered, double windowMs)
+ {
+ var sizes = new List(ordered.Length);
+ long windowTicks = (long)(windowMs * 10_000.0);
+
+ int i = 0;
+ while (i < ordered.Length)
+ {
+ int j = i + 1;
+ long baseD = ordered[i].Delivered.Ticks;
+
+ while (j < ordered.Length)
+ {
+ long dj = ordered[j].Delivered.Ticks;
+ if (Math.Abs(dj - baseD) <= windowTicks) j++;
+ else break;
+ }
+
+ sizes.Add(j - i);
+ i = j;
+ }
+
+ return sizes.ToArray();
+ }
+
+ private static Stats ComputeStats(double[] values)
+ {
+ if (values.Length == 0) return new Stats(0, 0, 0, 0, 0);
+
+ double mean = values.Average();
+ double max = values.Max();
+
+ var sorted = (double[])values.Clone();
+ Array.Sort(sorted);
+
+ return new Stats(
+ Mean: mean,
+ P50: QuantileSorted(sorted, 0.50),
+ P95: QuantileSorted(sorted, 0.95),
+ P99: QuantileSorted(sorted, 0.99),
+ Max: max);
+ }
+
+ // Linear interpolation quantile
+ private static double QuantileSorted(double[] sorted, double q)
+ {
+ if (sorted.Length == 1) return sorted[0];
+
+ double pos = (sorted.Length - 1) * q;
+ int lo = (int)Math.Floor(pos);
+ int hi = (int)Math.Ceiling(pos);
+ if (lo == hi) return sorted[lo];
+
+ double frac = pos - lo;
+ return sorted[lo] + (sorted[hi] - sorted[lo]) * frac;
+ }
+
+ // Compute the element-wise average of a sequence of EvalResult
+ public static EvalResult Average(IEnumerable results)
+ {
+ if (results == null) throw new ArgumentNullException(nameof(results));
+ var arr = results.ToArray();
+ if (arr.Length == 0) throw new ArgumentException("results is empty.", nameof(results));
+
+ double avgAlpha = arr.Average(r => r.Alpha);
+ double avgLambda = arr.Average(r => r.LambdaEventsPerSecond);
+ double avgMu = arr.Average(r => r.MuEventsPerSecond);
+
+ Stats avgReadiness = AverageStats(arr.Select(r => r.Latency.ReadinessMs));
+ Stats avgHol = AverageStats(arr.Select(r => r.Latency.HolMs));
+ Stats avgE2E = AverageStats(arr.Select(r => r.Latency.EndToEndMs));
+
+ var avgLatency = new LatencyDecomposition(avgReadiness, avgHol, avgE2E);
+
+ Stats avgAbsError = AverageStats(arr.Select(r => r.Validation.AbsErrorMs));
+ double worstNegativeSlack = arr.Max(r => r.Validation.MaxNegativeSlackMs);
+ int totalCount = arr.Sum(r => r.Validation.Count);
+
+ var avgValidation = new ModelValidation(avgAbsError, worstNegativeSlack, totalCount);
+
+ Stats? avgFlush = null;
+ var flushStatsSeq = arr.Select(r => r.FlushSizeStats).Where(s => s != null).Select(s => s!).ToArray();
+ if (flushStatsSeq.Length > 0) avgFlush = AverageStats(flushStatsSeq);
+
+
+ var avgQueue = AverageStats(arr.Select(x => x.QueueLength));
+
+ return new EvalResult(avgAlpha, avgLambda, avgMu, avgLatency, avgValidation, avgQueue, avgFlush);
+ }
+
+ private static double EstimateMu(AsyncQueueItem[] ordered)
+ {
+ if (ordered.Length < 2) return 0;
+
+ DateTime first = ordered[0].Delivered;
+ DateTime last = ordered[^1].Delivered;
+ double seconds = (last - first).TotalSeconds;
+ if (seconds <= 0) return 0;
+
+ // N-1 completions over observed interval
+ return (ordered.Length - 1) / seconds;
+ }
+
+ private static Stats AverageStats(IEnumerable seq)
+ {
+ var arr = seq.ToArray();
+ if (arr.Length == 0) return new Stats(0, 0, 0, 0, 0);
+ return new Stats(
+ Mean: arr.Average(s => s.Mean),
+ P50: arr.Average(s => s.P50),
+ P95: arr.Average(s => s.P95),
+ P99: arr.Average(s => s.P99),
+ Max: arr.Average(s => s.Max));
+ }
+ }
+
+}
diff --git a/Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj b/Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj
new file mode 100644
index 0000000..0efb788
--- /dev/null
+++ b/Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/Tests/Distribution/Queueing/Server/Program.cs b/Tests/Distribution/Queueing/Server/Program.cs
new file mode 100644
index 0000000..2f5e56c
--- /dev/null
+++ b/Tests/Distribution/Queueing/Server/Program.cs
@@ -0,0 +1,40 @@
+// ============================================================
+// Test 4: Fork-Join Queueing Test — SERVER NODE//
+// Usage: dotnet run -- --port 10901
+// ============================================================
+
+using Esiur.Resource;
+using Esiur.Stores;
+using Esiur.Protocol;
+using Esiur.Tests.Queueing.Server;
+
+var port = int.Parse(GetArg(args, "--port", "10901"));
+
+
+Console.WriteLine($"[Server] Listening on port {port}...");
+
+var wh = Warehouse.Default;
+var mem = await wh.Put("sys", new MemoryStore());
+var service = await wh.Put("sys/queueing", new QueueingService());
+var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port,
+ EntryPoint = service });
+
+
+long memBefore = GC.GetTotalMemory(forceFullCollection: true);
+
+await wh.Open();
+
+
+long memAfter = GC.GetTotalMemory(forceFullCollection: true);
+double memMB = (memAfter - memBefore) / (1024.0 * 1024.0);
+
+Console.WriteLine("Press ENTER to stop.");
+Console.ReadLine();
+await wh.Close();
+
+
+static string GetArg(string[] args, string key, string def)
+{
+ int i = Array.IndexOf(args, key);
+ return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
+}
diff --git a/Tests/Distribution/Queueing/Server/QueueingService.cs b/Tests/Distribution/Queueing/Server/QueueingService.cs
new file mode 100644
index 0000000..745d59f
--- /dev/null
+++ b/Tests/Distribution/Queueing/Server/QueueingService.cs
@@ -0,0 +1,204 @@
+using Esiur.Core;
+using Esiur.Data;
+using Esiur.Protocol;
+using Esiur.Resource;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Esiur.Tests.Queueing.Server
+{
+ [Resource]
+ public partial class QueueingService : EntryPoint
+ {
+ static Random rand = new Random(322221);
+ public List TestObjects = new List();
+
+ [Export]
+ public object testProperty;
+
+ [Export]
+ public async AsyncReply StartUpdatesLocal(int interval, int count, double localProbability)
+ {
+
+ var dis = GenerateRandomBoolSequence(count, localProbability, new Random(2222));
+
+ for (var i = 0; i < count; i++)
+ {
+ if (dis[i])
+ {
+ var o = await Warehouse.Default.New("sys/anything");
+
+ o.Value = i;
+ o.Name = "Update " + i;
+
+ TestObjects.Add(o);
+
+ TestProperty = o;
+ }
+ else
+ {
+ TestProperty = i;
+ }
+
+ await Task.Delay(interval);
+
+ }
+
+ return null;
+ }
+
+ [Export]
+ public async AsyncReply> StartUpdatesRemote(int interval, int count, double remoteProbability, string remoteLink)
+ {
+ for (var i = 0; i < count; i++)
+ {
+ var probability = rand.NextDouble();
+
+ if (probability <= remoteProbability)
+ {
+ TestProperty = remoteLink;
+ }
+ else
+ {
+ TestProperty = i;
+ }
+
+ await Task.Delay(interval);
+
+ }
+
+ return null;
+ }
+
+
+ [Export]
+ public async AsyncReply StartUpdatesMirror(int interval, int count, double remoteProbability, string remoteNode, string remoteLink)
+ {
+
+ var remoteCon = await Warehouse.Default.Get(remoteNode);
+
+ for (var i = 0; i < count; i++)
+ {
+ var probability = rand.NextDouble();
+
+ if (probability <= remoteProbability)
+ {
+ var o = await remoteCon.Get(remoteLink);
+ TestObjects.Add(o as TestObject);
+
+ TestProperty = o;
+ }
+ else
+ {
+ TestProperty = i;
+ }
+
+ await Task.Delay(interval);
+
+ }
+
+ return null;
+ }
+
+
+
+ [Export]
+ public async AsyncReply> StartUpdates(int interval, int count, double localProbability, double remoteProbability, string remoteHostLink)
+ {
+ for (var i = 0; i < count; i++)
+ {
+ var probability = rand.NextDouble();
+
+ if ((localProbability != 0 && probability <= localProbability) || localProbability == 1)
+ {
+ var o = await Warehouse.Default.New("sys/anything");
+
+ o.Value = i;
+ o.Name = "Update " + i;
+
+ TestObjects.Add(o);
+
+ TestProperty = o;
+ }
+ else if (probability < localProbability + remoteProbability)
+ {
+ TestProperty = new ResourceLink(remoteHostLink);
+ }
+ else
+ {
+ TestProperty = i;
+ }
+
+ await Task.Delay(interval);
+ }
+
+ TestObjects.Clear();
+
+ return null;
+ }
+
+ public override async AsyncReply Query(string path, EpConnection sender)
+ {
+ if (path == "gen")
+ {
+ var o = await Warehouse.Default.New("sys/anything");
+ o.Value = rand.Next();
+ o.Name = "Update " + o.Value;
+ TestObjects.Add(o);
+ return o;
+ }
+ else
+ {
+ if (this.Instance != null)
+ return await this.Instance.Warehouse.Query(path);
+ else
+ return null;
+ }
+ }
+
+ public static bool[] GenerateRandomBoolSequence(
+ int length,
+ double probabilityTrue,
+ Random? rng = null)
+ {
+ if (length <= 0)
+ throw new ArgumentOutOfRangeException(nameof(length));
+
+ if (probabilityTrue < 0.0 || probabilityTrue > 1.0)
+ throw new ArgumentOutOfRangeException(nameof(probabilityTrue));
+
+ rng ??= Random.Shared;
+
+ int trueTarget = (int)Math.Round(length * probabilityTrue);
+ int remaining = length;
+ int remainingTrue = trueTarget;
+
+ var result = new bool[length];
+
+ for (int i = 0; i < length; i++)
+ {
+ // Probability adjusted to guarantee exact total
+ double p = (double)remainingTrue / remaining;
+ bool value = rng.NextDouble() < p;
+
+ result[i] = value;
+
+ if (value)
+ remainingTrue--;
+
+ remaining--;
+ }
+
+ return result;
+ }
+
+
+ protected override bool Create()
+ {
+ return true;
+ }
+
+
+ }
+}
diff --git a/Tests/Distribution/Queueing/Server/TestObject.cs b/Tests/Distribution/Queueing/Server/TestObject.cs
new file mode 100644
index 0000000..b215a83
--- /dev/null
+++ b/Tests/Distribution/Queueing/Server/TestObject.cs
@@ -0,0 +1,15 @@
+using Esiur.Resource;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Esiur.Tests.Queueing.Server
+{
+ [Resource]
+ public partial class TestObject
+ {
+ [Export] int size;
+ [Export] string name;
+ [Export] object value;
+ }
+}