diff --git a/.gitignore b/.gitignore index d527a3c..c02784d 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,9 @@ bld/ [Tt]est[Rr]esult*/ [Bb]uild[Ll]og.* +# Generated test result artifacts +Tests/**/[Rr]esults/ + # NUNIT *.VisualState.xml TestResult.xml @@ -261,4 +264,4 @@ paket-files/ # Python Tools for Visual Studio (PTVS) __pycache__/ -*.pyc \ No newline at end of file +*.pyc diff --git a/Libraries/Esiur/Core/AsyncQueue.cs b/Libraries/Esiur/Core/AsyncQueue.cs index 470f421..617105a 100644 --- a/Libraries/Esiur/Core/AsyncQueue.cs +++ b/Libraries/Esiur/Core/AsyncQueue.cs @@ -50,7 +50,8 @@ public class AsyncQueue : AsyncReply int currentId = 0; int currentFlushId; - public List> Processed = new(); + bool captureProcessedItems; + List> processed = new(); List> list = new List>(); //Action callback; @@ -63,6 +64,34 @@ public class AsyncQueue : AsyncReply //return this; //} + /// + /// Enables or disables retaining delivered queue items for diagnostics. + /// Capture is disabled by default, and disabling it discards retained items. + /// + public void SetProcessedCapture(bool enabled) + { + lock (queueLock) + { + captureProcessedItems = enabled; + + if (!enabled) + processed = new(); + } + } + + /// + /// Atomically returns and removes the delivered queue items retained since the previous drain. + /// + public List> DrainProcessed() + { + lock (queueLock) + { + var result = processed; + processed = new(); + return result; + } + } + public void Add(AsyncReply reply) { lock (queueLock) @@ -121,13 +150,16 @@ public class AsyncQueue : AsyncReply Trigger(list[i].Reply.Result); resultReady = false; - var p = list[i]; - p.Delivered = DateTime.Now; - p.Ready = p.Reply.ReadyTime; - p.BatchSize = batchSize; - p.FlushId = flushId; - //p.HasResource = p.Reply. (p.Ready - p.Arrival).TotalMilliseconds > 5; - Processed.Add(p); + if (captureProcessedItems) + { + var p = list[i]; + p.Delivered = DateTime.Now; + p.Ready = p.Reply.ReadyTime; + p.BatchSize = batchSize; + p.FlushId = flushId; + //p.HasResource = p.Reply. (p.Ready - p.Arrival).TotalMilliseconds > 5; + processed.Add(p); + } list.RemoveAt(i); diff --git a/Libraries/Esiur/Protocol/EpConnection.cs b/Libraries/Esiur/Protocol/EpConnection.cs index 2263c02..d9c4438 100644 --- a/Libraries/Esiur/Protocol/EpConnection.cs +++ b/Libraries/Esiur/Protocol/EpConnection.cs @@ -378,11 +378,21 @@ public partial class EpConnection : NetworkConnection, IStore } + /// + /// Enables or disables retaining delivered resource queue items for diagnostics. + /// Capture is disabled by default. + /// + public void SetFinishedQueueCapture(bool enabled) + { + _queue.SetProcessedCapture(enabled); + } + + /// + /// Atomically returns and removes the retained delivered resource queue items. + /// public List> GetFinishedQueue() { - var l = _queue.Processed.ToArray().ToList(); - _queue.Processed.Clear(); - return l; + return _queue.DrainProcessed(); } void init() diff --git a/Tests/Distribution/Queueing/Client/Program.cs b/Tests/Distribution/Queueing/Client/Program.cs index fc3660f..44f950a 100644 --- a/Tests/Distribution/Queueing/Client/Program.cs +++ b/Tests/Distribution/Queueing/Client/Program.cs @@ -7,9 +7,9 @@ // Table III (λ, μ, R̄, δ̄, D̄, P99(D), queue length, batch B). // // Each replication uses an identical configuration; the server -// runs StartUpdatesLocal back-to-back, and the client snapshots -// the cumulative finished-queue length between replications so -// that each replication's evaluation sees only its own items. +// runs StartUpdatesLocal back-to-back, and the client drains the +// captured finished queue between replications so that each +// replication's evaluation sees only its own items. // // Usage: // dotnet run -- --host 127.0.0.1 --port 10901 \ @@ -52,20 +52,20 @@ Console.WriteLine($"[Client-T4-R] {delays.Length * alphas.Length} configurations var wh = new Warehouse(); var serviceResource = await wh.Get($"ep://{host}:{port}/sys/queueing"); var service = (dynamic)serviceResource; +serviceResource.ResourceConnection.SetFinishedQueueCapture(true); // ---------- replication coordinator state ---------- // // The server's StartUpdatesLocal fires `trials` PropertyChanged events // across a single call. We count incoming events; when `trials` arrive, -// the current replication is complete. We then slice off this rep's -// portion of the cumulative finished-queue and hand it to QueueEval. +// the current replication is complete. We then drain this rep's +// finished-queue items and hand them to QueueEval. // // `repDone` is signaled once per replication so the orchestrator coroutine // can drive the next call. int eventsThisRep = 0; TaskCompletionSource repDone = new(TaskCreationOptions.RunContinuationsAsynchronously); -int finishedQueueBaseline = 0; // cumulative length BEFORE current rep started serviceResource.PropertyChanged += (object? sender, PropertyChangedEventArgs e) => { @@ -100,10 +100,8 @@ foreach (var delay in delays) repDone = new TaskCompletionSource( TaskCreationOptions.RunContinuationsAsynchronously); - // Snapshot the cumulative finished-queue length right before this rep - // so we can slice off only this rep's portion afterwards. - var preQueue = service.ResourceConnection.GetFinishedQueue(); - finishedQueueBaseline = preQueue.Count; + // Discard any straggler notifications from the previous replication. + serviceResource.ResourceConnection.GetFinishedQueue(); // Kick off the server-driven trial sequence (fire-and-forget; // completion is signalled via PropertyChanged → repDone). @@ -112,15 +110,10 @@ foreach (var delay in delays) // Wait until `trials` PropertyChanged events have been received. await repDone.Task; - // The server completed `trials` events; slice off this rep's - // portion of the cumulative finished-queue. GetFinishedQueue() - // returns IReadOnlyList>; we forward the - // typed sliced subset directly to Evaluate which is generic - // on T (the property's runtime payload type). - var fullQueue = service.ResourceConnection.GetFinishedQueue(); - var typedQueue = SliceQueue(fullQueue, finishedQueueBaseline); - - var repResult = EsiurQueueEval.Evaluate(typedQueue); + // The server completed `trials` events; drain this replication's + // captured queue items and evaluate them directly. + var repQueue = serviceResource.ResourceConnection.GetFinishedQueue(); + var repResult = EsiurQueueEval.Evaluate(repQueue); reps.Add(repResult); Console.WriteLine($" rep {rep + 1}/{replications}: " + @@ -158,19 +151,3 @@ static string GetArg(string[] args, string key, string def) int i = Array.IndexOf(args, key); return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; } - -// ---------------------------------------------------------------- -// Slice the cumulative finished-queue down to only the items added -// during the current replication. -// -// The queue is dynamically typed (returned from a dynamic-dispatched -// member) and its element type is AsyncQueueItem where T is the -// runtime payload type of the observed property. We rely on the DLR -// to bind the LINQ Skip/ToList generic methods at runtime, just -// as the original code does with the Evaluate call below it. -// ---------------------------------------------------------------- -static dynamic SliceQueue(dynamic fullQueue, int skipCount) -{ - return System.Linq.Enumerable.ToList( - System.Linq.Enumerable.Skip(fullQueue, skipCount)); -} \ No newline at end of file diff --git a/Tests/Unit/AsyncQueueTests.cs b/Tests/Unit/AsyncQueueTests.cs new file mode 100644 index 0000000..0cdaa41 --- /dev/null +++ b/Tests/Unit/AsyncQueueTests.cs @@ -0,0 +1,77 @@ +using Esiur.Core; + +namespace Esiur.Tests.Unit; + +public class AsyncQueueTests +{ + [Fact] + public void ProcessedCapture_IsDisabledByDefault() + { + var queue = new AsyncQueue(); + var delivered = new List(); + queue.Then(delivered.Add); + + for (var i = 0; i < 100; i++) + queue.Add(new AsyncReply(i)); + + Assert.Equal(Enumerable.Range(0, 100), delivered); + Assert.Empty(queue.DrainProcessed()); + } + + [Fact] + public void DrainProcessed_ReturnsCapturedItemsExactlyOnce() + { + var queue = new AsyncQueue(); + queue.SetProcessedCapture(true); + + for (var i = 0; i < 100; i++) + queue.Add(new AsyncReply(i)); + + var processed = queue.DrainProcessed(); + + Assert.Equal(100, processed.Count); + Assert.Equal(Enumerable.Range(1, 100), processed.Select(x => x.Sequence)); + Assert.All(processed, x => Assert.NotEqual(default, x.Delivered)); + Assert.Empty(queue.DrainProcessed()); + } + + [Fact] + public void DisablingCapture_DiscardsHistoryAndStopsCapturing() + { + var queue = new AsyncQueue(); + queue.SetProcessedCapture(true); + queue.Add(new AsyncReply(1)); + + queue.SetProcessedCapture(false); + queue.Add(new AsyncReply(2)); + + Assert.Empty(queue.DrainProcessed()); + } + + [Fact] + public async Task DrainProcessed_DoesNotLoseItemsDuringConcurrentAdds() + { + const int itemCount = 1000; + var queue = new AsyncQueue(); + var sequences = new HashSet(); + queue.SetProcessedCapture(true); + + var producer = Task.Run(() => + Parallel.For(0, itemCount, i => queue.Add(new AsyncReply(i)))); + + while (!producer.IsCompleted) + { + foreach (var item in queue.DrainProcessed()) + Assert.True(sequences.Add(item.Sequence)); + + await Task.Yield(); + } + + await producer; + + foreach (var item in queue.DrainProcessed()) + Assert.True(sequences.Add(item.Sequence)); + + Assert.Equal(Enumerable.Range(1, itemCount), sequences.OrderBy(x => x)); + } +}