From f1bbe8b6cd7d4774153e888afaaa565341c6b0d3 Mon Sep 17 00:00:00 2001 From: ahmed Date: Fri, 5 Jun 2026 18:10:53 +0300 Subject: [PATCH] Queue test --- Libraries/Esiur/Core/AsyncQueue.cs | 5 +++- .../Esiur/Protocol/EpConnectionProtocol.cs | 30 ++++++++++++++----- Tests/Distribution/Queueing/Server/Program.cs | 6 ++-- Tests/Unit/AsyncQueueTests.cs | 20 +++++++++++++ 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/Libraries/Esiur/Core/AsyncQueue.cs b/Libraries/Esiur/Core/AsyncQueue.cs index 617105a..8929485 100644 --- a/Libraries/Esiur/Core/AsyncQueue.cs +++ b/Libraries/Esiur/Core/AsyncQueue.cs @@ -93,6 +93,9 @@ public class AsyncQueue : AsyncReply } public void Add(AsyncReply reply) + => Add(reply, !reply.Ready); + + public void Add(AsyncReply reply, bool hasResource) { lock (queueLock) { @@ -103,7 +106,7 @@ public class AsyncQueue : AsyncReply NotificationsCountWaitingInTheQueueAtEnqueueing = list.Count, Reply = reply, Arrival = DateTime.Now, - HasResource = !reply.Ready + HasResource = hasResource }); } diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index 0e894af..76123f4 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -606,12 +606,13 @@ partial class EpConnection if (pt == null) return; - Codec.ParseAsync(tdu.Data, valueOffset, this, null).Then(pr => + void EnqueueParsedProperty(ParsedTdu parsed) { - if (pr.Value is AsyncReply asyncReply) + var value = Codec.ParseAsync(parsed, this, null); + if (value is AsyncReply asyncReply) { var item = new AsyncReply(); - _queue.Add(item); + _queue.Add(item, hasResource: true); asyncReply.Then((result) => { @@ -624,14 +625,27 @@ partial class EpConnection { _queue.Add(new AsyncReply(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Propery, - pr.Value, index))); + value, index)), hasResource: false); } + } - }).Error((ex) => + var parsed = ParsedTdu.Parse(tdu.Data, valueOffset, (uint)tdu.Data.Length, this); + if (parsed is ParsedTdu parsedTdu) { - //.Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); - throw ex; - }); + EnqueueParsedProperty(parsedTdu); + } + else if (parsed is AsyncReply parsedReply) + { + parsedReply.Then(EnqueueParsedProperty).Error((ex) => + { + //.Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); + throw ex; + }); + } + else + { + throw new NullReferenceException("DataType can't be parsed."); + } }); } diff --git a/Tests/Distribution/Queueing/Server/Program.cs b/Tests/Distribution/Queueing/Server/Program.cs index 76a691f..1c43907 100644 --- a/Tests/Distribution/Queueing/Server/Program.cs +++ b/Tests/Distribution/Queueing/Server/Program.cs @@ -29,8 +29,10 @@ await wh.Open(); long memAfter = GC.GetTotalMemory(forceFullCollection: true); double memMB = (memAfter - memBefore) / (1024.0 * 1024.0); -Console.WriteLine("Press ENTER to stop."); -Console.ReadLine(); +Console.WriteLine("Ready. Press Ctrl+C to stop."); +var stop = new TaskCompletionSource(); +Console.CancelKeyPress += (_, e) => { e.Cancel = true; stop.TrySetResult(); }; +await stop.Task; await wh.Close(); diff --git a/Tests/Unit/AsyncQueueTests.cs b/Tests/Unit/AsyncQueueTests.cs index 0cdaa41..f345fcc 100644 --- a/Tests/Unit/AsyncQueueTests.cs +++ b/Tests/Unit/AsyncQueueTests.cs @@ -35,6 +35,26 @@ public class AsyncQueueTests Assert.Empty(queue.DrainProcessed()); } + [Fact] + public void DrainProcessed_PreservesExplicitResourceWorkFlag() + { + var queue = new AsyncQueue(); + queue.SetProcessedCapture(true); + + queue.Add(new AsyncReply(1), hasResource: false); + + var pending = new AsyncReply(); + queue.Add(pending, hasResource: true); + pending.Trigger(2); + + var processed = queue.DrainProcessed(); + + Assert.Collection( + processed, + x => Assert.False(x.HasResource), + x => Assert.True(x.HasResource)); + } + [Fact] public void DisablingCapture_DiscardsHistoryAndStopsCapturing() {