From b45c205d12c7febfcf16b252aab4846f1350878b Mon Sep 17 00:00:00 2001 From: ahmed Date: Mon, 13 Apr 2026 15:46:44 +0300 Subject: [PATCH] Queue --- Esiur.sln | 23 ++ .../Client/Esiur.Tests.Queueing.Client.csproj | 14 + Tests/Distribution/Queueing/Client/Program.cs | 92 +++++ .../Distribution/Queueing/Client/QueueEval.cs | 349 ++++++++++++++++++ .../Server/Esiur.Tests.Queueing.Server.csproj | 14 + Tests/Distribution/Queueing/Server/Program.cs | 40 ++ .../Queueing/Server/QueueingService.cs | 204 ++++++++++ .../Queueing/Server/TestObject.cs | 15 + 8 files changed, 751 insertions(+) create mode 100644 Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj create mode 100644 Tests/Distribution/Queueing/Client/Program.cs create mode 100644 Tests/Distribution/Queueing/Client/QueueEval.cs create mode 100644 Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj create mode 100644 Tests/Distribution/Queueing/Server/Program.cs create mode 100644 Tests/Distribution/Queueing/Server/QueueingService.cs create mode 100644 Tests/Distribution/Queueing/Server/TestObject.cs diff --git a/Esiur.sln b/Esiur.sln index 03080a6..e59c225 100644 --- a/Esiur.sln +++ b/Esiur.sln @@ -68,6 +68,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ResourceCount.S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttach", "Tests\Distribution\ConcurrentAttach\Esiur.Tests.ConcurrentAttach.csproj", "{7D88CAF1-1887-A011-BA72-F38C87C1A7D9}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Queueing", "Queueing", "{6F173323-75C1-4142-A4FE-0CEC2EB5EAF9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Client", "Tests\Distribution\Queueing\Client\Esiur.Tests.Queueing.Client.csproj", "{E7BF2911-582D-C403-254F-F7FC895BFD68}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{543158AD-BCB6-44A5-91AB-FE97B42A9C95}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{02A07E3C-67DB-4489-88E3-D568C477F540}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Queueing.Server", "Tests\Distribution\Queueing\Server\Esiur.Tests.Queueing.Server.csproj", "{7FD57668-2AD8-0F53-4006-03927B5A385C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -138,6 +148,14 @@ Global {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Debug|Any CPU.Build.0 = Debug|Any CPU {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Release|Any CPU.ActiveCfg = Release|Any CPU {7D88CAF1-1887-A011-BA72-F38C87C1A7D9}.Release|Any CPU.Build.0 = Release|Any CPU + {E7BF2911-582D-C403-254F-F7FC895BFD68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E7BF2911-582D-C403-254F-F7FC895BFD68}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E7BF2911-582D-C403-254F-F7FC895BFD68}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E7BF2911-582D-C403-254F-F7FC895BFD68}.Release|Any CPU.Build.0 = Release|Any CPU + {7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7FD57668-2AD8-0F53-4006-03927B5A385C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7FD57668-2AD8-0F53-4006-03927B5A385C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -170,6 +188,11 @@ Global {69A075E7-D924-59C6-0BF2-17A09201DDF3} = {413A4292-C2B3-4096-94CF-D6F607C20939} {D1DF309F-40DE-9C0E-A78B-2648544B77D2} = {DA2EC9AF-E2D9-4B8D-8EC3-CC65CFD3B974} {7D88CAF1-1887-A011-BA72-F38C87C1A7D9} = {336B5CE1-95DA-4FDD-A876-0919E3C446CA} + {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {E7BF2911-582D-C403-254F-F7FC895BFD68} = {543158AD-BCB6-44A5-91AB-FE97B42A9C95} + {543158AD-BCB6-44A5-91AB-FE97B42A9C95} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} + {02A07E3C-67DB-4489-88E3-D568C477F540} = {6F173323-75C1-4142-A4FE-0CEC2EB5EAF9} + {7FD57668-2AD8-0F53-4006-03927B5A385C} = {02A07E3C-67DB-4489-88E3-D568C477F540} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} diff --git a/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj new file mode 100644 index 0000000..634bddd --- /dev/null +++ b/Tests/Distribution/Queueing/Client/Esiur.Tests.Queueing.Client.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/Tests/Distribution/Queueing/Client/Program.cs b/Tests/Distribution/Queueing/Client/Program.cs new file mode 100644 index 0000000..4c4c4d8 --- /dev/null +++ b/Tests/Distribution/Queueing/Client/Program.cs @@ -0,0 +1,92 @@ +// ============================================================ +// Test 4: Fork-Join Queueing Test — CLIENT NODE +// +// Usage: dotnet run -- --host 127.0.0.1 --port 10901 --trials 10000 +// ============================================================ + +using Esiur.Data; +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Tests.Queueing.Client; +using System.ComponentModel; +using System.Diagnostics; +using System.Diagnostics.Metrics; +using System.Text.RegularExpressions; + +var results = new List(); +int counter = 0; + + + +int currentAlpha = 0; +int currentDelay = 0; + +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10901")); +var trials = int.Parse(GetArg(args, "--trials", "1000")); +var delays = GetArg(args, "--delays", "5:8:10:20:30:100") + .Split(":").Select(x => Convert.ToInt32(x)).ToArray(); +var alphas = GetArg(args, "--alphas", "0.0:0.25:0.5:0.75:1") + .Split(":").Select(y => Convert.ToDouble(y)).ToArray(); + + +Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, trials={trials}"); + +var wh = new Warehouse(); + +var serviceResource = await wh.Get( + $"ep://{host}:{port}/queueing"); + +var service = (dynamic)serviceResource; + +serviceResource.PropertyChanged += Service_PropertyChanged; + + + +Console.WriteLine("Starting next test: Delay=" + delays[currentDelay] + " Alpha=" + alphas[currentAlpha]); + +service.StartUpdatesLocal(delays[currentDelay], trials, alphas[currentAlpha]); + + + +void Service_PropertyChanged(object? sender, PropertyChangedEventArgs e) +{ + counter++; + + if (counter == trials) + { + var queue = service.DistributedResourceConnection.GetFinishedQueue(); + var result = EsiurQueueEval.Evaluate(queue); + + Console.WriteLine(result); + counter = 0; + + if (currentAlpha == delays.Length - 1) + { + currentAlpha = 0; + currentDelay++; + } + else + { + currentAlpha++; + } + + if (currentDelay == delays.Length) + { + System.Environment.Exit(0); + return; + } + + Console.WriteLine(); + Console.WriteLine("Starting next test: Delay=" + delays[currentDelay] + " Alpha=" + alphas[currentAlpha]); + + service.StartUpdatesLocal(delays[currentDelay], trials, alphas[currentAlpha]);//, 0, resourceLink); + + } +} + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/Queueing/Client/QueueEval.cs b/Tests/Distribution/Queueing/Client/QueueEval.cs new file mode 100644 index 0000000..dd3ea78 --- /dev/null +++ b/Tests/Distribution/Queueing/Client/QueueEval.cs @@ -0,0 +1,349 @@ +using Esiur.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Tests.Queueing.Client +{ + public static class EsiurQueueEval + { + + public static class EvalPrinter + { + public static void Print(EsiurQueueEval.EvalResult r) + { + Console.WriteLine("=== Evaluation Result ==="); + Console.WriteLine($"α (HasResource probability): {r.Alpha:F3}"); + Console.WriteLine($"λ̂ (effective arrival rate): {r.LambdaEventsPerSecond:F2} events/s"); + Console.WriteLine(); + + PrintLatencyTable(r.Latency); + + Console.WriteLine(); + PrintValidation(r.Validation); + + if (r.FlushSizeStats != null) + { + Console.WriteLine(); + PrintStats("Flush size (≤ window)", r.FlushSizeStats); + } + } + + private static void PrintLatencyTable(EsiurQueueEval.LatencyDecomposition l) + { + Console.WriteLine("Latency Decomposition (ms)"); + Console.WriteLine("-----------------------------------------------"); + Console.WriteLine($"{"Metric",-16} {"Mean",8} {"P50",8} {"P95",8} {"P99",8} {"Max",8}"); + Console.WriteLine("-----------------------------------------------"); + + PrintRow("Readiness R", l.ReadinessMs); + PrintRow("HOL Δ", l.HolMs); + PrintRow("End-to-End D", l.EndToEndMs); + + Console.WriteLine("-----------------------------------------------"); + } + + private static void PrintValidation(EsiurQueueEval.ModelValidation v) + { + Console.WriteLine("Resequencing Model Validation"); + Console.WriteLine("-----------------------------------------------"); + Console.WriteLine("Absolute error |d − d̂| (ms)"); + PrintStats("Error", v.AbsErrorMs); + + if (v.MaxNegativeSlackMs > 0) + { + Console.WriteLine($"Max negative slack (delivered earlier than model): {v.MaxNegativeSlackMs:F4} ms"); + } + else + { + Console.WriteLine("No negative slack observed (model is conservative)."); + } + } + + private static void PrintRow(string name, EsiurQueueEval.Stats s) + { + Console.WriteLine( + $"{name,-16} " + + $"{s.Mean,8:F2} " + + $"{s.P50,8:F2} " + + $"{s.P95,8:F2} " + + $"{s.P99,8:F2} " + + $"{s.Max,8:F2}" + ); + } + + private static void PrintStats(string name, EsiurQueueEval.Stats s) + { + Console.WriteLine( + $"{name,-20} " + + $"mean={s.Mean,8:F02} " + + $"p50={s.P50,8:F02} " + + $"p95={s.P95,8:F02} " + + $"p99={s.P99,8:F02} " + + $"max={s.Max,8:F02}" + ); + } + } + + public sealed record Stats(double Mean, double P50, double P95, double P99, double Max); + + public sealed record LatencyDecomposition(Stats ReadinessMs, Stats HolMs, Stats EndToEndMs); + + public sealed record ModelValidation( + Stats AbsErrorMs, + double MaxNegativeSlackMs, // worst case where Delivered < predicted (if happens) + int Count); + + public sealed record EvalResult( + double Alpha, + double LambdaEventsPerSecond, + double MuEventsPerSecond, // <-- NEW + LatencyDecomposition Latency, + ModelValidation Validation, + Stats QueueLength, + Stats? FlushSizeStats); + + /// + /// Evaluates Esiur fork-join readiness + in-order resequencing using in-memory items. + /// Assumes items refer to a single ordered stream (per resource queue). + /// + public static EvalResult Evaluate( + IReadOnlyList> items, + double flushWindowMs = 0.5, + bool computeFlush = true) + { + if (items == null) throw new ArgumentNullException(nameof(items)); + if (items.Count == 0) throw new ArgumentException("items is empty."); + + // Ensure deterministic order: prefer Sequence, then Arrival timestamp + var ordered = items + .OrderBy(x => x.Sequence) + .ThenBy(x => x.Arrival) + .ToArray(); + + int n = ordered.Length; + + // Latency components in milliseconds + var readiness = new double[n]; // R = r-a + var hol = new double[n]; // Δ = d-r + var endToEnd = new double[n]; // D = d-a + + int resCount = 0; + for (int i = 0; i < n; i++) + { + var e = ordered[i]; + + double Rms = (e.Ready - e.Arrival).TotalMilliseconds; + double Hms = (e.Delivered - e.Ready).TotalMilliseconds; + double Dms = (e.Delivered - e.Arrival).TotalMilliseconds; + + // Robustness against logging placement or clock issues + if (Rms < 0) Rms = 0; + if (Hms < 0) Hms = 0; + if (Dms < 0) Dms = 0; + + readiness[i] = Rms; + hol[i] = Hms; + endToEnd[i] = Dms; + + if (e.HasResource) resCount++; + } + + // α = P(HasResource) + double alpha = (double)resCount / n; + + + // Effective arrival rate λ̂ from arrival timeline + double lambda = EstimateLambda(ordered); + + // Effective departure / readiness rate μ̂ from delivery timeline + double mu = EstimateMu(ordered); + + var latency = new LatencyDecomposition( + ReadinessMs: ComputeStats(readiness), + HolMs: ComputeStats(hol), + EndToEndMs: ComputeStats(endToEnd)); + + // --- Resequencing validation: d_hat_i = max(r_i, d_hat_{i-1}) --- + // Use DateTime ticks for exactness, then convert to ms. + var absErrMs = new double[n]; + long prevPredictedTicks = long.MinValue; + double maxNegativeSlackMs = 0; + + for (int i = 0; i < n; i++) + { + long ri = ordered[i].Ready.Ticks; + + long predicted = (i == 0) + ? ri + : Math.Max(ri, prevPredictedTicks); + + prevPredictedTicks = predicted; + + long observed = ordered[i].Delivered.Ticks; + long errTicks = Math.Abs(observed - predicted); + absErrMs[i] = TicksToMs(errTicks); + + // If observed delivery occurs earlier than model predicts (shouldn't, but track it) + long slackTicks = observed - predicted; // negative => earlier than predicted + if (slackTicks < 0) + { + double slackMs = TicksToMs(-slackTicks); + if (slackMs > maxNegativeSlackMs) maxNegativeSlackMs = slackMs; + } + } + + var validation = new ModelValidation( + AbsErrorMs: ComputeStats(absErrMs), + MaxNegativeSlackMs: maxNegativeSlackMs, + Count: n); + + // --- Flush sizes (optional): consecutive deliveries within window --- + //Stats? flushStats = null; + //if (computeFlush) + //{ + // var flushSizes = ComputeFlushSizes(ordered, flushWindowMs); + // flushStats = ComputeStats(flushSizes.Select(x => (double)x).ToArray()); + //} + + var queueLength = ComputeStats(ordered.Select(x => (double)x.NotificationsCountWaitingInTheQueueAtEnqueueing).ToArray()); + + var flushStats = ComputeStats(ordered.GroupBy(x => x.FlushId).Select(x => (double)x.First().BatchSize).ToArray()); + + return new EvalResult(alpha, lambda, mu, latency, validation, queueLength, flushStats); + } + + // ---------------- Helpers ---------------- + + private static double EstimateLambda(AsyncQueueItem[] ordered) + { + if (ordered.Length < 2) return 0; + + DateTime first = ordered[0].Arrival; + DateTime last = ordered[^1].Arrival; + double seconds = (last - first).TotalSeconds; + if (seconds <= 0) return 0; + + // N-1 arrivals over observed interval + return (ordered.Length - 1) / seconds; + } + + private static double TicksToMs(long ticks) => ticks / 10_000.0; // 1 tick = 100ns + + private static int[] ComputeFlushSizes(AsyncQueueItem[] ordered, double windowMs) + { + var sizes = new List(ordered.Length); + long windowTicks = (long)(windowMs * 10_000.0); + + int i = 0; + while (i < ordered.Length) + { + int j = i + 1; + long baseD = ordered[i].Delivered.Ticks; + + while (j < ordered.Length) + { + long dj = ordered[j].Delivered.Ticks; + if (Math.Abs(dj - baseD) <= windowTicks) j++; + else break; + } + + sizes.Add(j - i); + i = j; + } + + return sizes.ToArray(); + } + + private static Stats ComputeStats(double[] values) + { + if (values.Length == 0) return new Stats(0, 0, 0, 0, 0); + + double mean = values.Average(); + double max = values.Max(); + + var sorted = (double[])values.Clone(); + Array.Sort(sorted); + + return new Stats( + Mean: mean, + P50: QuantileSorted(sorted, 0.50), + P95: QuantileSorted(sorted, 0.95), + P99: QuantileSorted(sorted, 0.99), + Max: max); + } + + // Linear interpolation quantile + private static double QuantileSorted(double[] sorted, double q) + { + if (sorted.Length == 1) return sorted[0]; + + double pos = (sorted.Length - 1) * q; + int lo = (int)Math.Floor(pos); + int hi = (int)Math.Ceiling(pos); + if (lo == hi) return sorted[lo]; + + double frac = pos - lo; + return sorted[lo] + (sorted[hi] - sorted[lo]) * frac; + } + + // Compute the element-wise average of a sequence of EvalResult + public static EvalResult Average(IEnumerable results) + { + if (results == null) throw new ArgumentNullException(nameof(results)); + var arr = results.ToArray(); + if (arr.Length == 0) throw new ArgumentException("results is empty.", nameof(results)); + + double avgAlpha = arr.Average(r => r.Alpha); + double avgLambda = arr.Average(r => r.LambdaEventsPerSecond); + double avgMu = arr.Average(r => r.MuEventsPerSecond); + + Stats avgReadiness = AverageStats(arr.Select(r => r.Latency.ReadinessMs)); + Stats avgHol = AverageStats(arr.Select(r => r.Latency.HolMs)); + Stats avgE2E = AverageStats(arr.Select(r => r.Latency.EndToEndMs)); + + var avgLatency = new LatencyDecomposition(avgReadiness, avgHol, avgE2E); + + Stats avgAbsError = AverageStats(arr.Select(r => r.Validation.AbsErrorMs)); + double worstNegativeSlack = arr.Max(r => r.Validation.MaxNegativeSlackMs); + int totalCount = arr.Sum(r => r.Validation.Count); + + var avgValidation = new ModelValidation(avgAbsError, worstNegativeSlack, totalCount); + + Stats? avgFlush = null; + var flushStatsSeq = arr.Select(r => r.FlushSizeStats).Where(s => s != null).Select(s => s!).ToArray(); + if (flushStatsSeq.Length > 0) avgFlush = AverageStats(flushStatsSeq); + + + var avgQueue = AverageStats(arr.Select(x => x.QueueLength)); + + return new EvalResult(avgAlpha, avgLambda, avgMu, avgLatency, avgValidation, avgQueue, avgFlush); + } + + private static double EstimateMu(AsyncQueueItem[] ordered) + { + if (ordered.Length < 2) return 0; + + DateTime first = ordered[0].Delivered; + DateTime last = ordered[^1].Delivered; + double seconds = (last - first).TotalSeconds; + if (seconds <= 0) return 0; + + // N-1 completions over observed interval + return (ordered.Length - 1) / seconds; + } + + private static Stats AverageStats(IEnumerable seq) + { + var arr = seq.ToArray(); + if (arr.Length == 0) return new Stats(0, 0, 0, 0, 0); + return new Stats( + Mean: arr.Average(s => s.Mean), + P50: arr.Average(s => s.P50), + P95: arr.Average(s => s.P95), + P99: arr.Average(s => s.P99), + Max: arr.Average(s => s.Max)); + } + } + +} diff --git a/Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj b/Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj new file mode 100644 index 0000000..0efb788 --- /dev/null +++ b/Tests/Distribution/Queueing/Server/Esiur.Tests.Queueing.Server.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/Tests/Distribution/Queueing/Server/Program.cs b/Tests/Distribution/Queueing/Server/Program.cs new file mode 100644 index 0000000..2f5e56c --- /dev/null +++ b/Tests/Distribution/Queueing/Server/Program.cs @@ -0,0 +1,40 @@ +// ============================================================ +// Test 4: Fork-Join Queueing Test — SERVER NODE// +// Usage: dotnet run -- --port 10901 +// ============================================================ + +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Protocol; +using Esiur.Tests.Queueing.Server; + +var port = int.Parse(GetArg(args, "--port", "10901")); + + +Console.WriteLine($"[Server] Listening on port {port}..."); + +var wh = Warehouse.Default; +var mem = await wh.Put("sys", new MemoryStore()); +var service = await wh.Put("sys/queueing", new QueueingService()); +var server = await wh.Put("sys/server", new EpServer() { Port = (ushort)port, + EntryPoint = service }); + + +long memBefore = GC.GetTotalMemory(forceFullCollection: true); + +await wh.Open(); + + +long memAfter = GC.GetTotalMemory(forceFullCollection: true); +double memMB = (memAfter - memBefore) / (1024.0 * 1024.0); + +Console.WriteLine("Press ENTER to stop."); +Console.ReadLine(); +await wh.Close(); + + +static string GetArg(string[] args, string key, string def) +{ + int i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/Queueing/Server/QueueingService.cs b/Tests/Distribution/Queueing/Server/QueueingService.cs new file mode 100644 index 0000000..745d59f --- /dev/null +++ b/Tests/Distribution/Queueing/Server/QueueingService.cs @@ -0,0 +1,204 @@ +using Esiur.Core; +using Esiur.Data; +using Esiur.Protocol; +using Esiur.Resource; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Tests.Queueing.Server +{ + [Resource] + public partial class QueueingService : EntryPoint + { + static Random rand = new Random(322221); + public List TestObjects = new List(); + + [Export] + public object testProperty; + + [Export] + public async AsyncReply StartUpdatesLocal(int interval, int count, double localProbability) + { + + var dis = GenerateRandomBoolSequence(count, localProbability, new Random(2222)); + + for (var i = 0; i < count; i++) + { + if (dis[i]) + { + var o = await Warehouse.Default.New("sys/anything"); + + o.Value = i; + o.Name = "Update " + i; + + TestObjects.Add(o); + + TestProperty = o; + } + else + { + TestProperty = i; + } + + await Task.Delay(interval); + + } + + return null; + } + + [Export] + public async AsyncReply> StartUpdatesRemote(int interval, int count, double remoteProbability, string remoteLink) + { + for (var i = 0; i < count; i++) + { + var probability = rand.NextDouble(); + + if (probability <= remoteProbability) + { + TestProperty = remoteLink; + } + else + { + TestProperty = i; + } + + await Task.Delay(interval); + + } + + return null; + } + + + [Export] + public async AsyncReply StartUpdatesMirror(int interval, int count, double remoteProbability, string remoteNode, string remoteLink) + { + + var remoteCon = await Warehouse.Default.Get(remoteNode); + + for (var i = 0; i < count; i++) + { + var probability = rand.NextDouble(); + + if (probability <= remoteProbability) + { + var o = await remoteCon.Get(remoteLink); + TestObjects.Add(o as TestObject); + + TestProperty = o; + } + else + { + TestProperty = i; + } + + await Task.Delay(interval); + + } + + return null; + } + + + + [Export] + public async AsyncReply> StartUpdates(int interval, int count, double localProbability, double remoteProbability, string remoteHostLink) + { + for (var i = 0; i < count; i++) + { + var probability = rand.NextDouble(); + + if ((localProbability != 0 && probability <= localProbability) || localProbability == 1) + { + var o = await Warehouse.Default.New("sys/anything"); + + o.Value = i; + o.Name = "Update " + i; + + TestObjects.Add(o); + + TestProperty = o; + } + else if (probability < localProbability + remoteProbability) + { + TestProperty = new ResourceLink(remoteHostLink); + } + else + { + TestProperty = i; + } + + await Task.Delay(interval); + } + + TestObjects.Clear(); + + return null; + } + + public override async AsyncReply Query(string path, EpConnection sender) + { + if (path == "gen") + { + var o = await Warehouse.Default.New("sys/anything"); + o.Value = rand.Next(); + o.Name = "Update " + o.Value; + TestObjects.Add(o); + return o; + } + else + { + if (this.Instance != null) + return await this.Instance.Warehouse.Query(path); + else + return null; + } + } + + public static bool[] GenerateRandomBoolSequence( + int length, + double probabilityTrue, + Random? rng = null) + { + if (length <= 0) + throw new ArgumentOutOfRangeException(nameof(length)); + + if (probabilityTrue < 0.0 || probabilityTrue > 1.0) + throw new ArgumentOutOfRangeException(nameof(probabilityTrue)); + + rng ??= Random.Shared; + + int trueTarget = (int)Math.Round(length * probabilityTrue); + int remaining = length; + int remainingTrue = trueTarget; + + var result = new bool[length]; + + for (int i = 0; i < length; i++) + { + // Probability adjusted to guarantee exact total + double p = (double)remainingTrue / remaining; + bool value = rng.NextDouble() < p; + + result[i] = value; + + if (value) + remainingTrue--; + + remaining--; + } + + return result; + } + + + protected override bool Create() + { + return true; + } + + + } +} diff --git a/Tests/Distribution/Queueing/Server/TestObject.cs b/Tests/Distribution/Queueing/Server/TestObject.cs new file mode 100644 index 0000000..b215a83 --- /dev/null +++ b/Tests/Distribution/Queueing/Server/TestObject.cs @@ -0,0 +1,15 @@ +using Esiur.Resource; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Tests.Queueing.Server +{ + [Resource] + public partial class TestObject + { + [Export] int size; + [Export] string name; + [Export] object value; + } +}