From 25d7449d4454f9cca89234d19104fdb0f787737d Mon Sep 17 00:00:00 2001 From: ahmed Date: Thu, 4 Jun 2026 18:45:11 +0300 Subject: [PATCH] Close fanout sweep subscriber connections --- .../NodeFanoutSweep/Client/Program.cs | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs index 3559cd9..bd0273a 100644 --- a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs +++ b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs @@ -124,8 +124,10 @@ foreach (int n in nValues) bool spawnFailed = false; for (int i = 0; i < n; i++) { - if (spawnTasks[i].Result == null) { spawnFailed = true; break; } - subscribers[i] = spawnTasks[i].Result!; + if (spawnTasks[i].Result == null) + spawnFailed = true; + else + subscribers[i] = spawnTasks[i].Result!; } spawnSw.Stop(); @@ -133,7 +135,7 @@ foreach (int n in nValues) { Console.WriteLine($"[Orchestrator] N={n}: spawn failed; treating as saturation."); saturatedDetected = true; - await TeardownAll(subscriberWhs); + await TeardownAll(subscribers, subscriberWhs); break; } Console.WriteLine($"[Orchestrator] All {n} subscribers attached in {spawnSw.Elapsed.TotalSeconds:F2}s"); @@ -229,7 +231,7 @@ foreach (int n in nValues) // ---------- teardown ---------- Console.WriteLine($"[Orchestrator] Tearing down {n} subscribers..."); - await TeardownAll(subscriberWhs); + await TeardownAll(subscribers, subscriberWhs); await Task.Delay(settleSec * 1000); } @@ -298,7 +300,7 @@ static async Task SpawnSubscriber( try { var conn = await wh.Get($"ep://{host}:{port}"); - var sub = new SubscriberTask { SubscriberId = subId }; + var sub = new SubscriberTask { SubscriberId = subId, Connection = conn }; for (int i = 0; i < resources; i++) { @@ -326,8 +328,20 @@ static async Task SpawnSubscriber( } } -static async Task TeardownAll(Warehouse[] whs) +static async Task TeardownAll(SubscriberTask[] subscribers, Warehouse[] whs) { + foreach (var subscriber in subscribers) + { + if (subscriber == null) + continue; + + try { subscriber.Connection?.Close(); } + catch { /* ignore */ } + + subscriber.Connection = null; + subscriber.Resources.Clear(); + } + foreach (var wh in whs) { try { await wh.Close(); } @@ -386,6 +400,7 @@ static string GetArg(string[] args, string key, string def) class SubscriberTask { public int SubscriberId; + public EpConnection? Connection; public readonly List Resources = new(); internal long _received; internal long _lateDeliveries;