2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-06-13 14:38:43 +00:00

Prevent unbounded queue history retention

This commit is contained in:
2026-06-04 18:23:10 +03:00
parent 3cd611970a
commit 2963c0505b
5 changed files with 146 additions and 47 deletions
+3
View File
@@ -34,6 +34,9 @@ bld/
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
# Generated test result artifacts
Tests/**/[Rr]esults/
# NUNIT
*.VisualState.xml
TestResult.xml
+34 -2
View File
@@ -50,7 +50,8 @@ public class AsyncQueue<T> : AsyncReply<T>
int currentId = 0;
int currentFlushId;
public List<AsyncQueueItem<T>> Processed = new();
bool captureProcessedItems;
List<AsyncQueueItem<T>> processed = new();
List<AsyncQueueItem<T>> list = new List<AsyncQueueItem<T>>();
//Action<T> callback;
@@ -63,6 +64,34 @@ public class AsyncQueue<T> : AsyncReply<T>
//return this;
//}
/// <summary>
/// Enables or disables retaining delivered queue items for diagnostics.
/// Capture is disabled by default, and disabling it discards retained items.
/// </summary>
public void SetProcessedCapture(bool enabled)
{
lock (queueLock)
{
captureProcessedItems = enabled;
if (!enabled)
processed = new();
}
}
/// <summary>
/// Atomically returns and removes the delivered queue items retained since the previous drain.
/// </summary>
public List<AsyncQueueItem<T>> DrainProcessed()
{
lock (queueLock)
{
var result = processed;
processed = new();
return result;
}
}
public void Add(AsyncReply<T> reply)
{
lock (queueLock)
@@ -121,13 +150,16 @@ public class AsyncQueue<T> : AsyncReply<T>
Trigger(list[i].Reply.Result);
resultReady = false;
if (captureProcessedItems)
{
var p = list[i];
p.Delivered = DateTime.Now;
p.Ready = p.Reply.ReadyTime;
p.BatchSize = batchSize;
p.FlushId = flushId;
//p.HasResource = p.Reply. (p.Ready - p.Arrival).TotalMilliseconds > 5;
Processed.Add(p);
processed.Add(p);
}
list.RemoveAt(i);
+13 -3
View File
@@ -378,11 +378,21 @@ public partial class EpConnection : NetworkConnection, IStore
}
/// <summary>
/// Enables or disables retaining delivered resource queue items for diagnostics.
/// Capture is disabled by default.
/// </summary>
public void SetFinishedQueueCapture(bool enabled)
{
_queue.SetProcessedCapture(enabled);
}
/// <summary>
/// Atomically returns and removes the retained delivered resource queue items.
/// </summary>
public List<AsyncQueueItem<EpResourceQueueItem>> GetFinishedQueue()
{
var l = _queue.Processed.ToArray().ToList();
_queue.Processed.Clear();
return l;
return _queue.DrainProcessed();
}
void init()
+12 -35
View File
@@ -7,9 +7,9 @@
// Table III (λ, μ, R̄, δ̄, D̄, P99(D), queue length, batch B).
//
// Each replication uses an identical configuration; the server
// runs StartUpdatesLocal back-to-back, and the client snapshots
// the cumulative finished-queue length between replications so
// that each replication's evaluation sees only its own items.
// runs StartUpdatesLocal back-to-back, and the client drains the
// captured finished queue between replications so that each
// replication's evaluation sees only its own items.
//
// Usage:
// dotnet run -- --host 127.0.0.1 --port 10901 \
@@ -52,20 +52,20 @@ Console.WriteLine($"[Client-T4-R] {delays.Length * alphas.Length} configurations
var wh = new Warehouse();
var serviceResource = await wh.Get<EpResource>($"ep://{host}:{port}/sys/queueing");
var service = (dynamic)serviceResource;
serviceResource.ResourceConnection.SetFinishedQueueCapture(true);
// ---------- replication coordinator state ----------
//
// The server's StartUpdatesLocal fires `trials` PropertyChanged events
// across a single call. We count incoming events; when `trials` arrive,
// the current replication is complete. We then slice off this rep's
// portion of the cumulative finished-queue and hand it to QueueEval.
// the current replication is complete. We then drain this rep's
// finished-queue items and hand them to QueueEval.
//
// `repDone` is signaled once per replication so the orchestrator coroutine
// can drive the next call.
int eventsThisRep = 0;
TaskCompletionSource<bool> repDone = new(TaskCreationOptions.RunContinuationsAsynchronously);
int finishedQueueBaseline = 0; // cumulative length BEFORE current rep started
serviceResource.PropertyChanged += (object? sender, PropertyChangedEventArgs e) =>
{
@@ -100,10 +100,8 @@ foreach (var delay in delays)
repDone = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
// Snapshot the cumulative finished-queue length right before this rep
// so we can slice off only this rep's portion afterwards.
var preQueue = service.ResourceConnection.GetFinishedQueue();
finishedQueueBaseline = preQueue.Count;
// Discard any straggler notifications from the previous replication.
serviceResource.ResourceConnection.GetFinishedQueue();
// Kick off the server-driven trial sequence (fire-and-forget;
// completion is signalled via PropertyChanged → repDone).
@@ -112,15 +110,10 @@ foreach (var delay in delays)
// Wait until `trials` PropertyChanged events have been received.
await repDone.Task;
// The server completed `trials` events; slice off this rep's
// portion of the cumulative finished-queue. GetFinishedQueue()
// returns IReadOnlyList<AsyncQueueItem<T>>; we forward the
// typed sliced subset directly to Evaluate which is generic
// on T (the property's runtime payload type).
var fullQueue = service.ResourceConnection.GetFinishedQueue();
var typedQueue = SliceQueue(fullQueue, finishedQueueBaseline);
var repResult = EsiurQueueEval.Evaluate(typedQueue);
// The server completed `trials` events; drain this replication's
// captured queue items and evaluate them directly.
var repQueue = serviceResource.ResourceConnection.GetFinishedQueue();
var repResult = EsiurQueueEval.Evaluate(repQueue);
reps.Add(repResult);
Console.WriteLine($" rep {rep + 1}/{replications}: " +
@@ -158,19 +151,3 @@ static string GetArg(string[] args, string key, string def)
int i = Array.IndexOf(args, key);
return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def;
}
// ----------------------------------------------------------------
// Slice the cumulative finished-queue down to only the items added
// during the current replication.
//
// The queue is dynamically typed (returned from a dynamic-dispatched
// member) and its element type is AsyncQueueItem<T> where T is the
// runtime payload type of the observed property. We rely on the DLR
// to bind the LINQ Skip<T>/ToList<T> generic methods at runtime, just
// as the original code does with the Evaluate<T> call below it.
// ----------------------------------------------------------------
static dynamic SliceQueue(dynamic fullQueue, int skipCount)
{
return System.Linq.Enumerable.ToList(
System.Linq.Enumerable.Skip(fullQueue, skipCount));
}
+77
View File
@@ -0,0 +1,77 @@
using Esiur.Core;
namespace Esiur.Tests.Unit;
public class AsyncQueueTests
{
[Fact]
public void ProcessedCapture_IsDisabledByDefault()
{
var queue = new AsyncQueue<int>();
var delivered = new List<int>();
queue.Then(delivered.Add);
for (var i = 0; i < 100; i++)
queue.Add(new AsyncReply<int>(i));
Assert.Equal(Enumerable.Range(0, 100), delivered);
Assert.Empty(queue.DrainProcessed());
}
[Fact]
public void DrainProcessed_ReturnsCapturedItemsExactlyOnce()
{
var queue = new AsyncQueue<int>();
queue.SetProcessedCapture(true);
for (var i = 0; i < 100; i++)
queue.Add(new AsyncReply<int>(i));
var processed = queue.DrainProcessed();
Assert.Equal(100, processed.Count);
Assert.Equal(Enumerable.Range(1, 100), processed.Select(x => x.Sequence));
Assert.All(processed, x => Assert.NotEqual(default, x.Delivered));
Assert.Empty(queue.DrainProcessed());
}
[Fact]
public void DisablingCapture_DiscardsHistoryAndStopsCapturing()
{
var queue = new AsyncQueue<int>();
queue.SetProcessedCapture(true);
queue.Add(new AsyncReply<int>(1));
queue.SetProcessedCapture(false);
queue.Add(new AsyncReply<int>(2));
Assert.Empty(queue.DrainProcessed());
}
[Fact]
public async Task DrainProcessed_DoesNotLoseItemsDuringConcurrentAdds()
{
const int itemCount = 1000;
var queue = new AsyncQueue<int>();
var sequences = new HashSet<int>();
queue.SetProcessedCapture(true);
var producer = Task.Run(() =>
Parallel.For(0, itemCount, i => queue.Add(new AsyncReply<int>(i))));
while (!producer.IsCompleted)
{
foreach (var item in queue.DrainProcessed())
Assert.True(sequences.Add(item.Sequence));
await Task.Yield();
}
await producer;
foreach (var item in queue.DrainProcessed())
Assert.True(sequences.Add(item.Sequence));
Assert.Equal(Enumerable.Range(1, itemCount), sequences.OrderBy(x => x));
}
}