From 2431166f25f38ffcce01179cce4e3b72f127d3db Mon Sep 17 00:00:00 2001 From: ahmed Date: Wed, 3 Jun 2026 13:02:56 +0300 Subject: [PATCH] Deadlock tests --- Esiur.sln | 32 ++ Libraries/Esiur/Data/Codec.cs | 4 +- Libraries/Esiur/Data/DataDeserializer.cs | 25 + Libraries/Esiur/Data/DataSerializer.cs | 25 + Libraries/Esiur/Esiur.csproj | 5 + Libraries/Esiur/Net/NetworkServer.cs | 23 +- .../Esiur/Protocol/DeadlockResolutionMode.cs | 31 ++ Libraries/Esiur/Protocol/EpConnection.cs | 21 +- .../Esiur/Protocol/EpConnectionProtocol.cs | 266 ++++++++++- Libraries/Esiur/Protocol/EpResource.cs | 347 ++++++++------ Libraries/Esiur/Resource/ResourceStatus.cs | 31 ++ .../Client/Esiur.Tests.Deadlock.Client.csproj | 14 + Tests/Distribution/Deadlock/Client/Program.cs | 160 +++++++ Tests/Distribution/Deadlock/README.md | 86 ++++ .../Server/Esiur.Tests.Deadlock.Server.csproj | 14 + Tests/Distribution/Deadlock/Server/Node.cs | 19 + Tests/Distribution/Deadlock/Server/Program.cs | 189 ++++++++ .../Distribution/Deadlock/Server/Resource1.cs | 15 + .../Distribution/Deadlock/Server/Resource2.cs | 14 + Tests/Unit/FetchCycleDetectionTests.cs | 86 ++++ .../Integration/DeadlockDetectionTests.cs | 268 +++++++++++ .../Integration/DeadlockIntegrationTests.cs | 446 ++++++++++++++++++ Tests/Unit/Integration/IntegrationHarness.cs | 104 ++++ Tests/Unit/Integration/Node.cs | 17 + Tests/Unit/ReattachDeltaTests.cs | 75 +++ 25 files changed, 2160 insertions(+), 157 deletions(-) create mode 100644 Libraries/Esiur/Protocol/DeadlockResolutionMode.cs create mode 100644 Libraries/Esiur/Resource/ResourceStatus.cs create mode 100644 Tests/Distribution/Deadlock/Client/Esiur.Tests.Deadlock.Client.csproj create mode 100644 Tests/Distribution/Deadlock/Client/Program.cs create mode 100644 Tests/Distribution/Deadlock/README.md create mode 100644 Tests/Distribution/Deadlock/Server/Esiur.Tests.Deadlock.Server.csproj create mode 100644 Tests/Distribution/Deadlock/Server/Node.cs create mode 100644 Tests/Distribution/Deadlock/Server/Program.cs create mode 100644 Tests/Distribution/Deadlock/Server/Resource1.cs create mode 100644 Tests/Distribution/Deadlock/Server/Resource2.cs create mode 100644 Tests/Unit/FetchCycleDetectionTests.cs create mode 100644 Tests/Unit/Integration/DeadlockDetectionTests.cs create mode 100644 Tests/Unit/Integration/DeadlockIntegrationTests.cs create mode 100644 Tests/Unit/Integration/IntegrationHarness.cs create mode 100644 Tests/Unit/Integration/Node.cs create mode 100644 Tests/Unit/ReattachDeltaTests.cs diff --git a/Esiur.sln b/Esiur.sln index 13bfa1c..d7b4209 100644 --- a/Esiur.sln +++ b/Esiur.sln @@ -95,6 +95,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttac EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Unit", "Tests\Unit\Esiur.Tests.Unit.csproj", "{D1B99C5A-82F7-459D-B56D-F8FD096D3854}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Distribution", "Distribution", "{1C087695-14B5-C927-8D92-12D1EE36BDAB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Server", "Tests\Distribution\Deadlock\Server\Esiur.Tests.Deadlock.Server.csproj", "{41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Client", "Tests\Distribution\Deadlock\Client\Esiur.Tests.Deadlock.Client.csproj", "{8D12333C-4619-4145-A6C6-000F9EF471B8}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -369,6 +375,30 @@ Global {D1B99C5A-82F7-459D-B56D-F8FD096D3854}.Release|x64.Build.0 = Release|Any CPU {D1B99C5A-82F7-459D-B56D-F8FD096D3854}.Release|x86.ActiveCfg = Release|Any CPU {D1B99C5A-82F7-459D-B56D-F8FD096D3854}.Release|x86.Build.0 = Release|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x64.ActiveCfg = Debug|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x64.Build.0 = Debug|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x86.ActiveCfg = Debug|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x86.Build.0 = Debug|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|Any CPU.Build.0 = Release|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x64.ActiveCfg = Release|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x64.Build.0 = Release|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x86.ActiveCfg = Release|Any CPU + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x86.Build.0 = Release|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x64.ActiveCfg = Debug|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x64.Build.0 = Debug|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x86.ActiveCfg = Debug|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x86.Build.0 = Debug|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|Any CPU.Build.0 = Release|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x64.ActiveCfg = Release|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x64.Build.0 = Release|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x86.ActiveCfg = Release|Any CPU + {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -414,6 +444,8 @@ Global {E713D25F-2602-44C9-AB9E-C9477FB2BA93} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1} = {E713D25F-2602-44C9-AB9E-C9477FB2BA93} {D1B99C5A-82F7-459D-B56D-F8FD096D3854} = {2769C4C3-2595-413B-B7FE-5903826770C1} + {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF} = {1C087695-14B5-C927-8D92-12D1EE36BDAB} + {8D12333C-4619-4145-A6C6-000F9EF471B8} = {1C087695-14B5-C927-8D92-12D1EE36BDAB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} diff --git a/Libraries/Esiur/Data/Codec.cs b/Libraries/Esiur/Data/Codec.cs index 0f3da05..0b3bc77 100644 --- a/Libraries/Esiur/Data/Codec.cs +++ b/Libraries/Esiur/Data/Codec.cs @@ -489,7 +489,9 @@ public static class Codec [typeof(Map)] = DataSerializer.MapComposer, [typeof(Map)] = DataSerializer.MapComposer, [typeof(Map)] = DataSerializer.MapComposer, - [typeof(PropertyValue[])] = DataSerializer.PropertyValueArrayComposer + [typeof(PropertyValue[])] = DataSerializer.PropertyValueArrayComposer, + // Sparse property delta for the reattach reply (index -> value/age/date). + [typeof(Map)] = DataSerializer.PropertyValueMapComposer // Typed // [typeof(bool[])] = (value, con) => DataSerializer.TypedListComposer((IEnumerable)value, typeof(bool), con), // [typeof(bool?[])] = (value, con) => (TransmissionDataUnitIdentifier.TypedList, new byte[] { (byte)value }), diff --git a/Libraries/Esiur/Data/DataDeserializer.cs b/Libraries/Esiur/Data/DataDeserializer.cs index c8d14bf..9e93c2d 100644 --- a/Libraries/Esiur/Data/DataDeserializer.cs +++ b/Libraries/Esiur/Data/DataDeserializer.cs @@ -1898,6 +1898,31 @@ public static class DataDeserializer } + /// + /// Parses a sparse property delta produced by PropertyValueMapComposer (the reattach + /// reply): a flat sequence of (index, age, date, value) TDUs, returned as a map keyed by the + /// property index. Mirrors but in groups of four. + /// + public static AsyncReply> PropertyValueMapParserAsync(byte[] data, uint offset, uint length, EpConnection connection, uint[] requestSequence) + { + var rt = new AsyncReply>(); + + ListParserAsync(new ParsedTdu() { Data = data, PayloadOffset = offset, PayloadLength = length } + , connection, requestSequence).Then(x => + { + var ar = (object[])x; + var map = new Map(); + + for (var i = 0; i + 3 < ar.Length; i += 4) + map[Convert.ToByte(ar[i])] = + new PropertyValue(ar[i + 3], Convert.ToUInt64(ar[i + 1]), (DateTime?)ar[i + 2]); + + rt.Trigger(map); + }); + + return rt; + } + public static async AsyncReply> PropertyValueParserAsync(byte[] data, uint offset, EpConnection connection, uint[] requestSequence)//, bool ageIncluded = true) { diff --git a/Libraries/Esiur/Data/DataSerializer.cs b/Libraries/Esiur/Data/DataSerializer.cs index 2eaf46d..940cdbf 100644 --- a/Libraries/Esiur/Data/DataSerializer.cs +++ b/Libraries/Esiur/Data/DataSerializer.cs @@ -630,6 +630,31 @@ public static class DataSerializer (uint)rt.Count, null, null); } + /// + /// Composes a sparse property delta (index -> value/age/date) used by the reattach reply, as + /// a flat sequence of (index, age, date, value) TDUs per modified property. PropertyValue is + /// not a self-describing type, so this dedicated composer is used instead of the generic map + /// path. Mirrors with a leading property index. + /// + public static Tdu PropertyValueMapComposer(object value, Warehouse warehouse, EpConnection connection) + { + if (value == null) + return new Tdu(TduIdentifier.Null, new byte[0], 0, null, null); + + var rt = new List(); + var map = (Map)value; + + foreach (var kv in map) + { + rt.AddRange(Codec.Compose(kv.Key, warehouse, connection)); // property index (u8) + rt.AddRange(Codec.Compose(kv.Value.Age, warehouse, connection)); // age + rt.AddRange(Codec.Compose(kv.Value.Date, warehouse, connection)); // modification date + rt.AddRange(Codec.Compose(kv.Value.Value, warehouse, connection)); // value + } + + return new Tdu(TduIdentifier.RawData, rt.ToArray(), (uint)rt.Count, null, null); + } + public static Tdu TypedMapComposer(object value, Type keyType, Type valueType, Warehouse warehouse, EpConnection connection) { if (value == null) diff --git a/Libraries/Esiur/Esiur.csproj b/Libraries/Esiur/Esiur.csproj index 5642d37..aa181d6 100644 --- a/Libraries/Esiur/Esiur.csproj +++ b/Libraries/Esiur/Esiur.csproj @@ -36,6 +36,11 @@ True + + + + + diff --git a/Libraries/Esiur/Net/NetworkServer.cs b/Libraries/Esiur/Net/NetworkServer.cs index b352882..27baf6c 100644 --- a/Libraries/Esiur/Net/NetworkServer.cs +++ b/Libraries/Esiur/Net/NetworkServer.cs @@ -158,19 +158,18 @@ public abstract class NetworkServer : IDestructible where TConnecti try { - if (listener != null) + var currentListener = listener; + if (currentListener != null) { - port = listener.LocalEndPoint.Port; - listener.Close(); + // Reading the endpoint can throw if the socket is already disposed (e.g. a second + // Stop or the finalizer after Destroy), so it is best-effort and only used for logging. + try { port = currentListener.LocalEndPoint.Port; } catch { } + try { currentListener.Close(); } catch { } + listener = null; // make Stop idempotent } - var cons = Connections.ToArray(); - - //lock (connections.SyncRoot) - //{ - foreach (TConnection con in cons) - con.Close(); - //} + foreach (TConnection con in Connections.ToArray()) + try { con.Close(); } catch { } } finally { @@ -206,6 +205,7 @@ public abstract class NetworkServer : IDestructible where TConnecti { Stop(); OnDestroy?.Invoke(this); + GC.SuppressFinalize(this); // explicit teardown done; no need for the finalizer to run Stop again } private void ClientDisconnectedEventReceiver(NetworkConnection connection) @@ -228,7 +228,8 @@ public abstract class NetworkServer : IDestructible where TConnecti ~NetworkServer() { - Stop(); + // Finalizers must never throw; Stop() is already guarded but wrap defensively. + try { Stop(); } catch { } listener = null; } } diff --git a/Libraries/Esiur/Protocol/DeadlockResolutionMode.cs b/Libraries/Esiur/Protocol/DeadlockResolutionMode.cs new file mode 100644 index 0000000..531bece --- /dev/null +++ b/Libraries/Esiur/Protocol/DeadlockResolutionMode.cs @@ -0,0 +1,31 @@ +namespace Esiur.Protocol; + +/// +/// Strategy used by EpConnection.FetchResource when it is asked for a resource whose +/// attachment is already in flight. Selectable mainly for experimental A/B/C evaluation of the +/// deadlock-prevention algorithm. +/// +public enum DeadlockResolutionMode : byte +{ + /// + /// Default. Wait for the in-flight attachment to complete, except when a genuine wait-for cycle + /// is detected (same dependency chain, or a cross-chain cycle in the wait-for graph), in which + /// case a placeholder is returned to break it. Never deadlocks and never returns an unnecessary + /// placeholder. + /// + WaitWithCycleDetection = 0, + + /// + /// Legacy behaviour: return the not-yet-attached placeholder to any cross-chain requester of an + /// in-flight resource. Never deadlocks, but delivers partially-attached resources for non-cyclic + /// contention (the bug under study). + /// + LegacyCrossChainPlaceholder = 1, + + /// + /// No cycle handling at all: always wait for the in-flight attachment, even within the same + /// dependency chain. Genuinely deadlocks whenever the request graph contains a cycle. Used only + /// to demonstrate that cycle handling is necessary and that the deadlock detector works. + /// + NaiveWait = 2, +} diff --git a/Libraries/Esiur/Protocol/EpConnection.cs b/Libraries/Esiur/Protocol/EpConnection.cs index 5d17058..2263c02 100644 --- a/Libraries/Esiur/Protocol/EpConnection.cs +++ b/Libraries/Esiur/Protocol/EpConnection.cs @@ -797,7 +797,22 @@ public partial class EpConnection : NetworkConnection, IStore } else if (_authPacket.Command == EpAuthPacketCommand.Acknowledge) { - if (_authPacket.Method == EpAuthPacketMethod.ProceedToHandshake + // Anonymous (None-mode) success: the responder establishes the session directly + // via SessionEstablished, without a handshake exchange. Complete the connection so + // the pending open request resolves. (Previously this was only handled inside the + // ProceedToHandshake branch, so a direct SessionEstablished left the initiator hung.) + if (_session.AuthenticationMode == AuthenticationMode.None + && _authPacket.Method == EpAuthPacketMethod.SessionEstablished) + { + _session.Authenticated = true; + _session.LocalIdentity = null; + _session.RemoteIdentity = null; + _session.Key = null; + AuthenticatonCompleted(); + return offset; + } + + if (_authPacket.Method == EpAuthPacketMethod.ProceedToHandshake || _authPacket.Method == EpAuthPacketMethod.ProceedToFinalHandshake) { var remoteHeaders @@ -1948,7 +1963,9 @@ public partial class EpConnection : NetworkConnection, IStore _neededResources[id] = r; - await FetchResource(id, null); + // Reattach using the last-known age so only properties modified while + // disconnected are transferred and merged, instead of re-fetching all. + await Reattach(id, r.Instance.Age, r); Global.Log("EpConnection", LogType.Debug, "Restored " + id); diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index 14d4512..c2fcf8d 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -57,6 +57,27 @@ partial class EpConnection KeyList> _attachedResources = new KeyList>(); KeyList> _suspendedResources = new KeyList>(); KeyList> _resourceRequests = new KeyList>(); + + // Wait-for graph for in-flight resource fetches: maps a resource id to the set of in-flight + // child resource ids its attachment is currently blocked on. Used to detect genuine cycles + // (e.g. two concurrent fetches A<->B) so a placeholder can break the deadlock, while + // independent/app-facing fetches of an in-flight resource simply wait for full attachment. + readonly Dictionary> _fetchBlockedOn = new Dictionary>(); + + /// + /// Strategy FetchResource uses for an in-flight resource. Defaults to the new wait + cycle + /// detection. Selectable for experimental evaluation (see ). + /// + public DeadlockResolutionMode DeadlockResolution { get; set; } = DeadlockResolutionMode.WaitWithCycleDetection; + + // Per-connection diagnostics (free of the cross-connection contamination that the shared + // Global.Counters suffer from). Used by the deadlock experiments. + /// Number of resources fully attached on this connection (a monotonic progress signal). + public long AttachedResourceCount { get; private set; } + /// Number of wait-for-cycle breaks (placeholders returned to break a cycle) on this connection. + public long CycleBreakCount { get; private set; } + /// Number of placeholders returned where no genuine cycle existed (legacy resolver only). + public long UnnecessaryPlaceholderCount { get; private set; } //KeyList> _typeDefsByIdRequests = new KeyList>(); //KeyList> _typeDefsByNameRequests = new KeyList>(); @@ -1969,6 +1990,11 @@ partial class EpConnection req.Then(result => { + // The resource is being handed to the application: publish its fully-attached + // graph so that, if any dependency is only partially attached, it stays unpublished. + if (result is EpResource resource) + PublishGraph(resource); + rt.Trigger(result); }).Error(ex => rt.TriggerError(ex)); @@ -2026,6 +2052,103 @@ partial class EpConnection /// DistributedResource /// //object fetchResourceLock = new object(); + // Records that the attachment of `parent` is now blocked waiting on in-flight child `child`. + void AddFetchBlock(uint parent, uint child) + { + if (!_fetchBlockedOn.TryGetValue(parent, out var set)) + _fetchBlockedOn[parent] = set = new HashSet(); + set.Add(child); + } + + // Removes a resource from the wait-for graph once it is attached or its fetch failed: it is + // no longer blocked on anything and no longer a pending child of anyone. + void ClearFetchNode(uint id) + { + _fetchBlockedOn.Remove(id); + foreach (var set in _fetchBlockedOn.Values) + set.Remove(id); + } + + /// + /// Returns true if completing the fetch of by waiting for its in-flight + /// request would deadlock, i.e. the resource is (transitively) blocked on a resource that the + /// current request chain is itself building. In that case the caller should hand back the + /// placeholder to break the cycle instead of waiting. + /// + internal static bool HasWaitForCycle(uint id, uint[] requestSequence, IReadOnlyDictionary> blockedOn) + { + if (requestSequence == null || requestSequence.Length == 0) + return false; + + var chain = new HashSet(requestSequence); + var visited = new HashSet(); + var stack = new Stack(); + stack.Push(id); + + while (stack.Count > 0) + { + var current = stack.Pop(); + if (!visited.Add(current)) + continue; + + if (!blockedOn.TryGetValue(current, out var children)) + continue; + + foreach (var child in children) + { + // Reaching a node that the current chain is attaching closes the cycle. + if (chain.Contains(child)) + return true; + stack.Push(child); + } + } + + return false; + } + + /// + /// Publishes a fully-attached object graph to the application: every resource reachable from + /// is marked , but only if the + /// entire reachable graph is already attached. If any reachable resource is still being + /// attached (e.g. a placeholder handed out to break a cycle), the graph is left unpublished — + /// exactly the partially-attached delivery that the wait-by-default resolver prevents and the + /// legacy resolver does not. + /// + internal void PublishGraph(EpResource root) + { + if (root == null) + return; + + var seen = new HashSet(); + var reachable = new List(); + var queue = new Queue(); + queue.Enqueue(root); + + var fullyAttached = true; + + while (queue.Count > 0) + { + var node = queue.Dequeue(); + if (node == null || !seen.Add(node.ResourceInstanceId)) + continue; + + reachable.Add(node); + + if (node.Status != ResourceStatus.Attached) + { + fullyAttached = false; + continue; // do not traverse into a not-yet-attached node + } + + foreach (var child in node.GetReferencedResources()) + queue.Enqueue(child); + } + + if (fullyAttached) + foreach (var node in reachable) + node.Publish(); + } + public AsyncReply FetchResource(uint id, uint[] requestSequence) { //lock (fetchLock) @@ -2044,27 +2167,64 @@ partial class EpConnection var requestInfo = _resourceRequests[id]; + // The resource that triggered this fetch (the tail of the chain), if any. Used to record + // wait-for edges and to tell graph-internal references from app-facing fetches (no chain). + uint? parent = requestSequence != null && requestSequence.Length > 0 + ? requestSequence[requestSequence.Length - 1] + : (uint?)null; + if (requestInfo != null) { - if (resource != null && (requestSequence?.Contains(id) ?? false)) + // Same dependency chain (A->B->A): the placeholder is an internal node of the graph + // currently being attached. The application only observes the chain's top-level reply, + // which fires after full attachment, so returning the not-yet-attached placeholder here + // is safe and breaks the reference cycle. NaiveWait skips this so that even same-chain + // cycles deadlock (used to demonstrate the protection is necessary). + if (DeadlockResolution != DeadlockResolutionMode.NaiveWait + && resource != null && (requestSequence?.Contains(id) ?? false)) { Global.Counters["EpResourceDeadLockSameChain"]++; - // dead lock avoidance for loop reference. + CycleBreakCount++; return new AsyncReply(resource); } - else if (resource != null && requestInfo.RequestSequence.Contains(id)) + + // Decide whether to break the wait by returning the placeholder: + // - Legacy: hand it to ANY cross-chain requester (over-eager; the bug under study). + // - WaitWithCycleDetection: only on a genuine wait-for cycle. + // - NaiveWait: never — always wait below (deadlocks on cycles). + var breakCycle = resource != null && DeadlockResolution switch + { + DeadlockResolutionMode.LegacyCrossChainPlaceholder => requestInfo.RequestSequence.Contains(id), + DeadlockResolutionMode.WaitWithCycleDetection => HasWaitForCycle(id, requestSequence, _fetchBlockedOn), + _ => false, + }; + + if (breakCycle) { Global.Counters["EpResourceDeadLockCrossChain"]++; - // dead lock avoidance for dependent reference. + CycleBreakCount++; + + // Instrumentation: a placeholder handed out where there is no genuine wait-for cycle + // is an unnecessary, partial delivery — the new resolver would have waited for full + // attachment instead. This counts the legacy resolver's over-eager placeholders. + if (DeadlockResolution == DeadlockResolutionMode.LegacyCrossChainPlaceholder + && !HasWaitForCycle(id, requestSequence, _fetchBlockedOn)) + { + Global.Counters["EpResourceUnnecessaryPlaceholder"]++; + UnnecessaryPlaceholderCount++; + } + return new AsyncReply(resource); } - else - { - Global.Counters["EpResourcePendingCacheHit"]++; - return requestInfo.Reply; - } + + // Otherwise an independent or application-facing requester: wait for the in-flight + // attachment to complete fully rather than exposing a partially attached resource. + Global.Counters["EpResourcePendingCacheHit"]++; + if (parent != null) + AddFetchBlock(parent.Value, id); + return requestInfo.Reply; } - else if (resource != null && !resource.ResourceSuspended) + else if (resource != null && resource.Status != ResourceStatus.Suspended) { // @REVIEW: this should never happen Global.Log("DCON", LogType.Error, "Resource not moved to attached."); @@ -2077,6 +2237,10 @@ partial class EpConnection var reply = new AsyncReply(); _resourceRequests.Add(id, new FetchRequestInfo(reply, newSequence)); + // This fetch's parent now waits on `id` until it attaches. + if (parent != null) + AddFetchBlock(parent.Value, id); + SendRequest(EpPacketRequest.AttachResource, id) .Then((result) => { @@ -2113,12 +2277,19 @@ partial class EpConnection var pvs = results as PropertyValue[]; dr._Attach(pvs); + // Progress signal: a resource has fully attached. Used by tests to + // distinguish a true deadlock (no progress while requests pend) from + // merely slow processing (these counters keep advancing). + Global.Counters["EpResourceAttached"]++; + AttachedResourceCount++; _resourceRequests.Remove(id); // move from needed to attached. _neededResources.Remove(id); _attachedResources[id] = new WeakReference(dr); + // attached: no longer part of the in-flight wait-for graph. + ClearFetchNode(id); reply.Trigger(dr); - }).Error(ex => reply.TriggerError(ex)); + }).Error(ex => { _resourceRequests.Remove(id); ClearFetchNode(id); reply.TriggerError(ex); }); }; if (typeDef == null) @@ -2135,6 +2306,9 @@ partial class EpConnection resource.ResourceDefinition = td; typeDef = td; + // Register the placeholder before parsing properties so cyclic + // references in the graph can resolve back to this instance. + _neededResources[id] = resource; Instance.Warehouse.Put(Instance.Link + "/" + id.ToString(), resource) .Then(initResource) .Error(ex => reply.TriggerError(ex)); @@ -2160,6 +2334,9 @@ partial class EpConnection resource.ResourceDefinition = typeDef; + // Register the placeholder before parsing properties so cyclic + // references in the graph can resolve back to this instance. + _neededResources[id] = resource; Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), resource) .Then(initResource).Error((ex) => reply.TriggerError(ex)); } @@ -2171,6 +2348,10 @@ partial class EpConnection }).Error((ex) => { + // Failed to attach: drop the in-flight request and wait-for edges so a + // later retry is not blocked by a stale entry. + _resourceRequests.Remove(id); + ClearFetchNode(id); reply.TriggerError(ex); }); @@ -2187,6 +2368,69 @@ partial class EpConnection /// Resource Id /// DistributedResource /// + /// + /// Re-attaches an already-known resource after reconnection using its last-known age. The peer + /// returns only the properties modified after (the delta), which are + /// merged into the existing instance instead of re-fetching everything. Falls back to a full + /// if there is no prior state to merge into. + /// + public AsyncReply Reattach(uint id, ulong age, EpResource resource) + { + EpResource attachedResource = null; + _attachedResources[id]?.TryGetTarget(out attachedResource); + if (attachedResource != null) + return new AsyncReply(attachedResource); + + var existing = _resourceRequests[id]; + if (existing != null) + return existing.Reply; + + var reply = new AsyncReply(); + var sequence = new uint[] { id }; + _resourceRequests.Add(id, new FetchRequestInfo(reply, sequence)); + + SendRequest(EpPacketRequest.ReattachResource, id, age).Then(result => + { + if (result == null) + { + _resourceRequests.Remove(id); + reply.TriggerError(new AsyncException(ErrorType.Management, + (ushort)ExceptionCode.ResourceNotFound, "Null response")); + return; + } + + // typeId, age, link, hops, delta(index -> PropertyValue) + var args = (object[])result; + var deltaData = (byte[])args[4]; + + DataDeserializer.PropertyValueMapParserAsync(deltaData, 0, (uint)deltaData.Length, this, sequence) + .Then(delta => + { + if (!resource._Reattach(delta)) + { + // No prior state to merge into — perform a full attach instead. + _resourceRequests.Remove(id); + FetchResource(id, null).Then(r => reply.Trigger(r)).Error(ex => reply.TriggerError(ex)); + return; + } + + _resourceRequests.Remove(id); + _neededResources.Remove(id); + _attachedResources[id] = new WeakReference(resource); + ClearFetchNode(id); + reply.Trigger(resource); + }) + .Error(ex => { _resourceRequests.Remove(id); ClearFetchNode(id); reply.TriggerError(ex); }); + }).Error(ex => + { + _resourceRequests.Remove(id); + ClearFetchNode(id); + reply.TriggerError(ex); + }); + + return reply; + } + //object fetchResourceLock = new object(); public AsyncReply FetchTypeDef(ulong id, ulong[] requestSequence) { diff --git a/Libraries/Esiur/Protocol/EpResource.cs b/Libraries/Esiur/Protocol/EpResource.cs index c45e168..f52e45e 100644 --- a/Libraries/Esiur/Protocol/EpResource.cs +++ b/Libraries/Esiur/Protocol/EpResource.cs @@ -56,25 +56,29 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn //public event PropertyModifiedEvent PropertyModified; public event PropertyChangedEventHandler PropertyChanged; - uint instanceId; - TypeDef typeDef; - EpConnection connection; + uint _instanceId; + TypeDef _typeDef; + EpConnection _connection; - bool attached = false; - bool destroyed = false; - bool suspended = false; + // Single explicit lifecycle state, replacing the former attached/destroyed/suspended booleans. + Resource.ResourceStatus _status = Resource.ResourceStatus.Pending; + + // Internal read-only views kept so the existing guard checks read naturally. + //bool attached => status == Resource.ResourceStatus.Attached || status == Resource.ResourceStatus.Published; + //bool destroyed => status == Resource.ResourceStatus.Destroyed; + //bool suspended => status == Resource.ResourceStatus.Suspended; //Structure properties = new Structure(); - string link; - ulong age; + string _link; + ulong _age; - protected object[] properties; - internal List parents = new List(); - internal List children = new List(); + protected object[] _properties; + //internal List parents = new List(); + //internal List children = new List(); - EpResourceEvent[] events; + EpResourceEvent[] _events; @@ -83,7 +87,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn /// public EpConnection ResourceConnection { - get { return connection; } + get { return _connection; } } /// @@ -91,7 +95,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn /// public string ResourceLink { - get { return link; } + get { return _link; } } /// @@ -99,8 +103,8 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn /// public uint ResourceInstanceId { - get { return instanceId; } - internal set { instanceId = value; } + get { return _instanceId; } + internal set { _instanceId = value; } } /// @@ -108,9 +112,8 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn /// public void Destroy() { - destroyed = true; - attached = false; - connection.SendDetachRequest(instanceId); + _status = Resource.ResourceStatus.Destroyed; + _connection.SendDetachRequest(_instanceId); OnDestroy?.Invoke(this); } @@ -120,17 +123,69 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn internal void Suspend() { - suspended = true; - attached = false; + _status = Resource.ResourceStatus.Suspended; } + /// + /// Marks the resource as published: attached and delivered to the application as part of a + /// fully-attached object graph. A resource only transitions Attached -> Published. + /// + internal void Publish() + { + if (_status == Resource.ResourceStatus.Attached) + _status = Resource.ResourceStatus.Published; + } /// - /// Resource is attached when all its properties are received. + /// The resource's current lifecycle state. Only + /// guarantees the resource and its whole dependency graph are ready for application use. /// - public bool ResourceAttached => attached; + public Resource.ResourceStatus Status => _status; - public bool ResourceSuspended => suspended; + /// + /// Resource is attached when all its own properties are received (it may be Published too). + /// + //public bool ResourceAttached => attached; + + //public bool ResourceSuspended => suspended; + + /// True once the resource has been published to the application. + //public bool ResourcePublished => status == Resource.ResourceStatus.Published; + + /// + /// Enumerates the distributed resources directly referenced by this resource's property values + /// (including those nested inside arrays/lists/maps). Used to walk the dependency graph when + /// publishing a fully-attached graph to the application. + /// + internal IEnumerable GetReferencedResources() + { + if (_properties == null) + yield break; + + foreach (var value in _properties) + foreach (var resource in FlattenResources(value)) + yield return resource; + } + + static IEnumerable FlattenResources(object value) + { + if (value is EpResource resource) + { + yield return resource; + } + else if (value is System.Collections.IDictionary dictionary) + { + foreach (var item in dictionary.Values) + foreach (var r in FlattenResources(item)) + yield return r; + } + else if (value is System.Collections.IEnumerable sequence && !(value is string)) + { + foreach (var item in sequence) + foreach (var r in FlattenResources(item)) + yield return r; + } + } // public DistributedResourceStack Stack @@ -146,40 +201,65 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn /// Resource age. public EpResource(EpConnection connection, uint instanceId, ulong age, string link) { - this.link = link; - this.connection = connection; - this.instanceId = instanceId; - this.age = age; + this._link = link; + this._connection = connection; + this._instanceId = instanceId; + this._age = age; } internal bool _Attach(PropertyValue[] properties) { - if (attached) + if (_status == ResourceStatus.Attached) return false; - else + + _properties = new object[properties.Length]; + + _events = new EpResourceEvent[Instance.Definition.Events.Length]; + + for (byte i = 0; i < properties.Length; i++) { - suspended = false; - - this.properties = new object[properties.Length]; - - this.events = new EpResourceEvent[Instance.Definition.Events.Length]; - - for (byte i = 0; i < properties.Length; i++) - { - Instance.SetAge(i, properties[i].Age); - Instance.SetModificationDate(i, properties[i].Date); - this.properties[i] = properties[i].Value; - } - - // trigger holded events/property updates. - //foreach (var r in afterAttachmentTriggers) - // r.Key.Trigger(r.Value); - - //afterAttachmentTriggers.Clear(); - - attached = true; - + Instance.SetAge(i, properties[i].Age); + Instance.SetModificationDate(i, properties[i].Date); + this._properties[i] = properties[i].Value; } + + // trigger holded events/property updates. + //foreach (var r in afterAttachmentTriggers) + // r.Key.Trigger(r.Value); + + //afterAttachmentTriggers.Clear(); + + _status = Resource.ResourceStatus.Attached; + + + return true; + } + + /// + /// Re-attaches a previously attached (then suspended) resource after reconnection by merging + /// only the properties that changed while disconnected. The peer returns just the delta — the + /// properties whose age is newer than the age this side last knew — so unchanged properties + /// keep their existing value/age/date. Returns false if the resource was never attached (no + /// prior state to merge into), in which case the caller should perform a full attach. + /// + /// Modified properties keyed by their property index. + internal bool _Reattach(Map delta) + { + if (_properties == null || _events == null) + return false; // no prior state — caller should perform a full attach instead. + + foreach (var kv in delta) + { + var index = kv.Key; + if (index >= _properties.Length) + continue; + + Instance.SetAge(index, kv.Value.Age); + Instance.SetModificationDate(index, kv.Value.Date); + _properties[index] = kv.Value.Value; + } + + _status = Resource.ResourceStatus.Attached; return true; } @@ -187,16 +267,17 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn protected internal virtual void _EmitEventByIndex(byte index, object args) { var et = Instance.Definition.GetEventDefByIndex(index); - events[index]?.Invoke(this, args); + _events[index]?.Invoke(this, args); Instance.EmitResourceEvent(et, args); } public AsyncReply _Invoke(byte index, object args) { - if (destroyed) + + if (_status == ResourceStatus.Destroyed) throw new Exception("Trying to access a destroyed object."); - if (suspended) + if (_status == ResourceStatus.Suspended) throw new Exception("Trying to access a suspended object."); if (index >= Instance.Definition.Functions.Length) @@ -208,9 +289,9 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn throw new Exception("Function definition not found."); if (ft.IsStatic) - return connection.StaticCall(Instance.Definition.Id, index, args); + return _connection.StaticCall(Instance.Definition.Id, index, args); else - return connection.SendInvoke(instanceId, index, args); + return _connection.SendInvoke(_instanceId, index, args); } public AsyncReply Subscribe(EventDef et) @@ -229,7 +310,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn return rt; } - return connection.SendSubscribeRequest(instanceId, et.Index); + return _connection.SendSubscribeRequest(_instanceId, et.Index); } public AsyncReply Subscribe(string eventName) @@ -244,7 +325,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn { if (et == null) { - var rt = new AsyncReply(); + var rt = new AsyncReply(); rt.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.MethodNotFound, "")); return rt; } @@ -256,7 +337,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn return rt; } - return connection.SendUnsubscribeRequest(instanceId, et.Index); + return _connection.SendUnsubscribeRequest(_instanceId, et.Index); } public AsyncReply Unsubscribe(string eventName) @@ -269,61 +350,63 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result) { - if (destroyed) + if (_status == ResourceStatus.Destroyed) throw new Exception("Trying to access a destroyed object."); - if (suspended) + if (_status == ResourceStatus.Suspended) throw new Exception("Trying to access a suspended object."); - var ft = Instance.Definition.GetFunctionDefByName(binder.Name); - var reply = new AsyncReply(); - - if (attached && ft != null) - { - - if (args.Length == 1) - { - // Detect anonymous types - var type = args[0].GetType(); - - - if (Codec.IsAnonymous(type)) - { - var indexedArgs = new Map(); - - var pis = type.GetProperties(); - - for (byte i = 0; i < ft.Arguments.Length; i++) - { - var pi = pis.FirstOrDefault(x => x.Name == ft.Arguments[i].Name); - if (pi != null) - indexedArgs.Add(i, pi.GetValue(args[0])); - } - - result = _Invoke(ft.Index, indexedArgs); - } - else if (args[0] is object[] || args[0] is Map) - { - result = _Invoke(ft.Index, new object[] { args }); - } - else - { - result = _Invoke(ft.Index, args); - } - } - else - { - - result = _Invoke(ft.Index, args); - } - return true; - } - else + if (_status != ResourceStatus.Attached) { result = null; return false; } + + var ft = Instance.Definition.GetFunctionDefByName(binder.Name) + ?? throw new Exception($"{binder.Name} does not exist"); + + var reply = new AsyncReply(); + + + if (args.Length == 1) + { + // Detect anonymous types + var type = args[0].GetType(); + + + if (Codec.IsAnonymous(type)) + { + var indexedArgs = new Map(); + + var pis = type.GetProperties(); + + for (byte i = 0; i < ft.Arguments.Length; i++) + { + var pi = pis.FirstOrDefault(x => x.Name == ft.Arguments[i].Name); + if (pi != null) + indexedArgs.Add(i, pi.GetValue(args[0])); + } + + result = _Invoke(ft.Index, indexedArgs); + } + else if (args[0] is object[] || args[0] is Map) + { + result = _Invoke(ft.Index, new object[] { args }); + } + else + { + result = _Invoke(ft.Index, args); + } + } + else + { + + result = _Invoke(ft.Index, args); + } + + return true; + } ///// @@ -337,34 +420,34 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn public bool TryGetPropertyValue(byte index, out object value) { - if (index >= properties.Length) + if (index >= _properties.Length) { value = null; return false; } else { - value = properties[index]; + value = _properties[index]; return true; } } public override bool TryGetMember(GetMemberBinder binder, out object result) { - if (destroyed) + if (_status == ResourceStatus.Destroyed) throw new Exception("Trying to access a destroyed object."); result = null; - if (!attached) + if (_status != ResourceStatus.Attached) return false; var pt = Instance.Definition.GetPropertyDefByName(binder.Name); if (pt != null) { - result = properties[pt.Index]; + result = _properties[pt.Index]; return true; } else @@ -373,7 +456,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn if (et == null) return false; - result = events[et.Index]; + result = _events[et.Index]; return true; } @@ -383,7 +466,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn internal void _UpdatePropertyByIndex(byte index, object value) { var pt = Instance.Definition.GetPropertyDefByIndex(index); - properties[index] = value; + _properties[index] = value; Instance.EmitModification(pt, value); } @@ -409,13 +492,13 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn public override bool TrySetMember(SetMemberBinder binder, object value) { - if (destroyed) + if (_status == ResourceStatus.Destroyed) throw new Exception("Trying to access a destroyed object."); - if (suspended) + if (_status == ResourceStatus.Suspended) throw new Exception("Trying to access a suspended object."); - if (!attached) + if (_status != ResourceStatus.Attached) return false; var pt = Instance.Definition.GetPropertyDefByName(binder.Name); @@ -431,7 +514,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn if (et == null) return false; - events[et.Index] = (EpResourceEvent)value; + _events[et.Index] = (EpResourceEvent)value; return true; } @@ -452,11 +535,11 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn { get { - return typeDef; + return _typeDef; } internal set { - typeDef = value; + _typeDef = value; } } @@ -494,10 +577,10 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn public PropertyValue[] SerializeResource() { - var props = new PropertyValue[properties.Length]; + var props = new PropertyValue[_properties.Length]; - for (byte i = 0; i < properties.Length; i++) - props[i] = new PropertyValue(properties[i], + for (byte i = 0; i < _properties.Length; i++) + props[i] = new PropertyValue(_properties[i], Instance.GetAge(i), Instance.GetModificationDate(i)); @@ -508,9 +591,9 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn { var rt = new Map(); - for (byte i = 0; i < properties.Length; i++) + for (byte i = 0; i < _properties.Length; i++) if (Instance.GetAge(i) > age) - rt.Add(i, new PropertyValue(properties[i], + rt.Add(i, new PropertyValue(_properties[i], Instance.GetAge(i), Instance.GetModificationDate(i))); @@ -523,33 +606,33 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn public object GetResourceProperty(byte index) { - if (index >= properties.Length) + if (index >= _properties.Length) return null; - return properties[index]; + return _properties[index]; } public AsyncReply SetResourcePropertyAsync(byte index, object value) { - if (destroyed) + if (_status == ResourceStatus.Destroyed) throw new Exception("Trying to access a destroyed object."); - if (suspended) + if (_status == ResourceStatus.Suspended) throw new Exception("Trying to access a suspended object."); - if (!attached) + if (_status != ResourceStatus.Attached) throw new Exception("Resource is not attached."); - if (index >= properties.Length) + if (index >= _properties.Length) throw new Exception("Property index not found."); ; var reply = new AsyncReply(); - connection.SendSetProperty(instanceId, index, value) + _connection.SendSetProperty(_instanceId, index, value) .Then((res) => { // not really needed, server will always send property modified, // this only happens if the programmer forgot to emit in property setter - properties[index] = value; + _properties[index] = value; reply.Trigger(null); }); @@ -560,7 +643,7 @@ public class EpResource : DynamicObject, IResource, INotifyPropertyChanged, IDyn public void SetResourceProperty(byte index, object value) { // Don't set the same current value - if (properties[index] == value) + if (_properties[index] == value) return; SetResourcePropertyAsync(index, value).Wait(); diff --git a/Libraries/Esiur/Resource/ResourceStatus.cs b/Libraries/Esiur/Resource/ResourceStatus.cs new file mode 100644 index 0000000..537b63e --- /dev/null +++ b/Libraries/Esiur/Resource/ResourceStatus.cs @@ -0,0 +1,31 @@ +namespace Esiur.Resource; + +/// +/// Lifecycle state of a distributed (remote) resource on the consuming side. Replaces the former +/// separate attached/suspended/destroyed booleans with a single explicit state machine. +/// +public enum ResourceStatus : byte +{ + /// Created as a placeholder; its properties have not been received yet. + Pending = 0, + + /// + /// Its own properties have been received and merged, but its dependency graph may still be + /// incomplete (e.g. it was used to break a reference cycle). Not yet safe to hand to the + /// application as fully ready. + /// + Attached = 1, + + /// + /// Attached and delivered to the application as part of a fully-attached object graph. This is + /// the only state in which a resource — including every resource it depends on — is guaranteed + /// ready for application use. + /// + Published = 2, + + /// The connection was lost; the resource is awaiting reattachment. + Suspended = 3, + + /// The resource has been detached/destroyed and must not be accessed. + Destroyed = 4, +} diff --git a/Tests/Distribution/Deadlock/Client/Esiur.Tests.Deadlock.Client.csproj b/Tests/Distribution/Deadlock/Client/Esiur.Tests.Deadlock.Client.csproj new file mode 100644 index 0000000..a65d312 --- /dev/null +++ b/Tests/Distribution/Deadlock/Client/Esiur.Tests.Deadlock.Client.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/Tests/Distribution/Deadlock/Client/Program.cs b/Tests/Distribution/Deadlock/Client/Program.cs new file mode 100644 index 0000000..812eb1f --- /dev/null +++ b/Tests/Distribution/Deadlock/Client/Program.cs @@ -0,0 +1,160 @@ +// ============================================================ +// Distributed deadlock test — CLIENT NODE +// Connects to the server, fetches the resource graph concurrently, and classifies each run as +// COMPLETED, DEADLOCKED, or SLOW using a progress (stall) detector — deadlock is detected as the +// absence of attachment progress while requests are still pending, NOT as a blunt timeout, so it is +// distinguished from slow WAN processing. Reports completion-time distribution, cycle-break and +// unnecessary-placeholder counts, and the published-state of delivered resources. +// +// Usage: +// dotnet run -- --host SERVER_IP --port 10950 --nodes 8 --mode WaitWithCycleDetection --iterations 20 +// dotnet run -- --host SERVER_IP --port 10950 --nodes 4 --roots 0 --mode WaitWithCycleDetection (single-root cycle) +// dotnet run -- --host SERVER_IP --port 10950 --nodes 8 --mode NaiveWait (control: deadlocks) +// Modes: WaitWithCycleDetection (default) | NaiveWait | LegacyCrossChainPlaceholder +// ============================================================ + +using System.Collections; +using System.Diagnostics; +using Esiur.Protocol; +using Esiur.Resource; + +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10950")); +var nodeCount = int.Parse(GetArg(args, "--nodes", "100")); +var modeArg = GetArg(args, "--mode", "NaiveWait"); +var iterations = int.Parse(GetArg(args, "--iterations", "20")); +var stallMs = int.Parse(GetArg(args, "--stall-ms", "5000")); +var hardMs = int.Parse(GetArg(args, "--hard-ms", "60000")); +var rootsArg = GetArg(args, "--roots", "all"); + +if (!Enum.TryParse(modeArg, ignoreCase: true, out var mode)) +{ + Console.WriteLine($"Unknown --mode '{modeArg}'. Use WaitWithCycleDetection | NaiveWait | LegacyCrossChainPlaceholder."); + return; +} + +var roots = rootsArg.Equals("all", StringComparison.OrdinalIgnoreCase) + ? Enumerable.Range(0, nodeCount).Select(i => $"sys/n{i}").ToArray() + : rootsArg.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) + .Select(s => $"sys/n{int.Parse(s)}").ToArray(); + +Console.WriteLine($"[Client] {host}:{port} nodes={nodeCount} mode={mode} roots={roots.Length} " + + $"iterations={iterations} stallMs={stallMs} hardMs={hardMs}"); +Console.WriteLine($"[Client] {"iter",-5}{"outcome",-14}{"ms",10}{"breaks",10}{"unnec",8}{"unpublished",13}"); + +var rows = new List<(int iter, string outcome, double ms, long breaks, long unnec, int unpublished)>(); + +for (var it = 0; it < iterations; it++) +{ + // Fresh warehouse + connection per iteration so the per-connection counters start at 0. + var wh = new Warehouse(); + EpConnection con; + try { con = await wh.Get($"ep://{host}:{port}"); } + catch (Exception ex) { Console.WriteLine($"[Client] connect failed: {ex.Message}"); return; } + + con.DeadlockResolution = mode; + Console.WriteLine($"[Client] iter {it + 1}: connected, fetching {roots.Length} roots..."); + + var (outcome, ms, results) = await Classify(con, roots, stallMs, hardMs); + var unpublished = results == null ? -1 : CountUnpublished(results); + + rows.Add((it + 1, outcome, ms, con.CycleBreakCount, con.UnnecessaryPlaceholderCount, unpublished)); + Console.WriteLine($"[Client] {it + 1,-5}{outcome,-14}{ms,10:F1}{con.CycleBreakCount,10}{con.UnnecessaryPlaceholderCount,8}{unpublished,13}"); + + try { con.Destroy(); } catch { } +} + +// ---- summary ---------------------------------------------------------------------------------- +var completed = rows.Where(r => r.outcome == "Completed").ToList(); +var times = completed.Select(r => r.ms).OrderBy(x => x).ToList(); +double Pct(double p) => times.Count == 0 ? 0 : times[(int)Math.Min(times.Count - 1, p * times.Count)]; + +Console.WriteLine(); +Console.WriteLine($"[Client] === summary ({mode}) ==="); +Console.WriteLine($" completed={completed.Count} deadlocked={rows.Count(r => r.outcome == "Deadlocked")} " + + $"slow={rows.Count(r => r.outcome == "SlowTimeout")} faulted={rows.Count(r => r.outcome == "Faulted")}"); +Console.WriteLine($" completion ms: median={Pct(0.5):F1} p99={Pct(0.99):F1} max={(times.Count > 0 ? times[^1] : 0):F1}"); +Console.WriteLine($" cycle-breaks total={rows.Sum(r => r.breaks)} unnecessary-placeholders total={rows.Sum(r => r.unnec)}"); +Console.WriteLine($" partial deliveries (unpublished>0) in {rows.Count(r => r.unpublished > 0)}/{rows.Count} runs"); + +var csv = "iteration,outcome,ms,cycle_breaks,unnecessary_placeholders,unpublished\n" + + string.Join("\n", rows.Select(r => $"{r.iter},{r.outcome},{r.ms:F1},{r.breaks},{r.unnec},{r.unpublished}")); +var outFile = $"deadlock_{mode}_{host}_{port}.csv"; +await File.WriteAllTextAsync(outFile, csv); +Console.WriteLine($"[Client] results written to {outFile}"); + +Console.ReadLine(); + +// ---- stall-based classification --------------------------------------------------------------- + +// Fires fetches for all roots and classifies the run. A run is DEADLOCKED when fetches are still +// pending but the connection's attached-resource count has not advanced for stallMs (no progress); +// SLOW if it is still progressing at hardMs; COMPLETED when every fetch resolves. +static async Task<(string outcome, double ms, EpResource[]? results)> Classify( + EpConnection con, string[] roots, int stallMs, int hardMs) +{ + var tasks = roots.Select(p => + { + var tcs = new TaskCompletionSource(); + con.Get(p) + .Then(r => tcs.TrySetResult(r as IResource)) + .Error(ex => { Console.WriteLine($"[Client] Get({p}) error: {ex.Message}"); tcs.TrySetException((Exception)ex); }); + return tcs.Task; + }).ToArray(); + var all = Task.WhenAll(tasks); + + var sw = Stopwatch.StartNew(); + var lastProgress = con.AttachedResourceCount; + var lastProgressMs = 0.0; + + while (true) + { + await Task.WhenAny(all, Task.Delay(25)); + + if (all.IsCompletedSuccessfully) + { + sw.Stop(); + return ("Completed", sw.Elapsed.TotalMilliseconds, all.Result.OfType().ToArray()); + } + if (all.IsFaulted) + { + sw.Stop(); + return ("Faulted", sw.Elapsed.TotalMilliseconds, null); + } + + var progress = con.AttachedResourceCount; + if (progress != lastProgress) { lastProgress = progress; lastProgressMs = sw.Elapsed.TotalMilliseconds; } + + if (sw.Elapsed.TotalMilliseconds - lastProgressMs >= stallMs) { sw.Stop(); return ("Deadlocked", sw.Elapsed.TotalMilliseconds, null); } + if (sw.Elapsed.TotalMilliseconds >= hardMs) { sw.Stop(); return ("SlowTimeout", sw.Elapsed.TotalMilliseconds, null); } + } +} + +// Counts resources reachable from the delivered roots that are not Published — i.e. handed to the +// application while their dependency graph was not fully attached. Links is property index 1. +static int CountUnpublished(EpResource[] roots) +{ + var seen = new HashSet(); + var queue = new Queue(roots); + var unpublished = 0; + + while (queue.Count > 0) + { + var node = queue.Dequeue(); + if (node == null || !seen.Add(node.ResourceInstanceId)) continue; + + if (node.Status != ResourceStatus.Published) unpublished++; + + if (node.Status == ResourceStatus.Attached && node.TryGetPropertyValue((byte)1, out var linksObj) && linksObj is IEnumerable links) + foreach (var child in links) + if (child is EpResource childResource) + queue.Enqueue(childResource); + } + return unpublished; +} + +static string GetArg(string[] args, string key, string def) +{ + var i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/Deadlock/README.md b/Tests/Distribution/Deadlock/README.md new file mode 100644 index 0000000..97c231b --- /dev/null +++ b/Tests/Distribution/Deadlock/README.md @@ -0,0 +1,86 @@ +# Distributed deadlock test (two nodes / WAN) + +Two console apps that evaluate the recursive-attachment deadlock-prevention algorithm over a real +TCP connection between two machines: + +- **Server** (`Server/`) hosts a configurable graph of `Node` resources whose references may form + cycles, and prints the *cycle census* of the deployed graph (so the experiment can state that + circular dependencies were actually generated). +- **Client** (`Client/`) connects, fetches the graph concurrently, and classifies each run as + **Completed / Deadlocked / Slow** using a *stall detector* — a deadlock is detected as the absence + of attachment progress while requests are still pending, which distinguishes it from slow WAN + processing rather than relying on a blunt timeout. + +Authentication is disabled (`AllowUnauthorizedAccess`, anonymous `None` mode), so no credentials are +needed. + +## Build + +``` +dotnet build Tests/Distribution/Deadlock/Server/Esiur.Tests.Deadlock.Server.csproj -c Release +dotnet build Tests/Distribution/Deadlock/Client/Esiur.Tests.Deadlock.Client.csproj -c Release +``` + +## Run + +**On node A (server):** +``` +dotnet run --project Tests/Distribution/Deadlock/Server -c Release -- \ + --port 10950 --topology ring --nodes 8 +``` +It prints, e.g.: `topology=ring nodes=8 edges=8 cyclic=True backEdges=1` and the node count to pass +to the client. Leave it running (Ctrl+C to stop). + +Topologies (`--topology`): + +| name | cyclic | description | +|------|:------:|-------------| +| `ring` | yes | `i → (i+1) mod n`; every node fetched as an independent request (cross-chain cycles) | +| `cycle` | yes | single-root cycle `0→1→…→n-1→0` (fetch only `--roots 0`) | +| `complete` | yes | every ordered pair `i → j` | +| `staggered` | no | two roots share a deep dependency reached at different depths (stresses non-cyclic contention; `--nodes` is derived) | +| `random` | usually | Erdős–Rényi directed graph (`--nodes`, `--seed`, `--edge-prob`) | +| `chain` | no | acyclic control `0→1→…→n-1` | +| `diamond` | no | acyclic control | + +**On node B (client):** +``` +dotnet run --project Tests/Distribution/Deadlock/Client -c Release -- \ + --host --port 10950 --nodes 8 \ + --mode WaitWithCycleDetection --iterations 20 --stall-ms 5000 --hard-ms 60000 +``` + +Modes (`--mode`): +- `WaitWithCycleDetection` (default, the production algorithm) — completes; breaks only genuine cycles. +- `NaiveWait` (control) — no cycle handling; **deadlocks** on any cyclic graph (detected via the stall window). +- `LegacyCrossChainPlaceholder` — for reference only. + +Other client options: `--roots all|0,1,2` (which nodes to fetch; default all `n0..n{N-1}`), +`--stall-ms` (no-progress window ⇒ deadlock; set comfortably above your WAN round-trip × graph depth), +`--hard-ms` (progress-but-unfinished ⇒ slow). + +## Output + +The client prints per-iteration rows and a summary, and writes `deadlock___.csv`: + +``` +iteration,outcome,ms,cycle_breaks,unnecessary_placeholders,unpublished +``` + +- `outcome` — `Completed` / `Deadlocked` / `SlowTimeout`. +- `ms` — fetch time (deadlocked rows equal the stall window). +- `cycle_breaks` — placeholders returned to break a cycle on this connection. +- `unnecessary_placeholders` — placeholders returned where no genuine cycle existed (always 0 for the + production resolver; non-zero only for the legacy reference mode). +- `unpublished` — resources delivered to the application whose dependency graph was not fully attached + at delivery (`-1` for a deadlocked/failed run). + +## Suggested WAN runs for the paper + +1. **Detection works and cycles exist.** Server `--topology ring --nodes 8`; client + `--mode WaitWithCycleDetection` (expect all *Completed*, `cycle_breaks > 0`) and then + `--mode NaiveWait` (expect *Deadlocked* — validates the detector on the same cyclic graph). +2. **Random pool census.** Server `--topology random --nodes 12 --seed 20260603`; the server prints + whether the deployed graph is cyclic; run the client in `WaitWithCycleDetection`. +3. **Threshold justification.** Compare the client's reported completion `ms` (median/p99) against + `--stall-ms`; the stall window should be orders of magnitude larger. diff --git a/Tests/Distribution/Deadlock/Server/Esiur.Tests.Deadlock.Server.csproj b/Tests/Distribution/Deadlock/Server/Esiur.Tests.Deadlock.Server.csproj new file mode 100644 index 0000000..a65d312 --- /dev/null +++ b/Tests/Distribution/Deadlock/Server/Esiur.Tests.Deadlock.Server.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/Tests/Distribution/Deadlock/Server/Node.cs b/Tests/Distribution/Deadlock/Server/Node.cs new file mode 100644 index 0000000..1386fbd --- /dev/null +++ b/Tests/Distribution/Deadlock/Server/Node.cs @@ -0,0 +1,19 @@ +using Esiur.Resource; +using Esiur.Tests.Deadlock.Server; + +/// +/// Resource used to build reference topologies (cycles, cross-references) for the distributed +/// deadlock test. holds references to other nodes; fetching a node transitively +/// fetches its links, which is what exercises EpConnection.FetchResource cycle handling. +/// Property indices are stable: Id = 0, Links = 1. +/// +[Resource] +public partial class Node +{ + [Export] public int Id { get; set; } + + [Export] public Node[]? Links { get; set; } + + [Export] public Resource1[] Resources1 { get; set; } + [Export] public Resource2[] Resources2 { get; set; } +} diff --git a/Tests/Distribution/Deadlock/Server/Program.cs b/Tests/Distribution/Deadlock/Server/Program.cs new file mode 100644 index 0000000..3883e93 --- /dev/null +++ b/Tests/Distribution/Deadlock/Server/Program.cs @@ -0,0 +1,189 @@ +// ============================================================ +// Distributed deadlock test — SERVER NODE +// Hosts a configurable graph of Node resources (sys/n0 .. sys/n{N-1}) whose references can form +// cycles. A client on another node fetches the graph and measures whether the recursive-attachment +// resolver completes or deadlocks. The server prints the cycle census of the deployed graph so the +// experiment can state, for the record, that circular dependencies were actually generated. +// +// Usage: +// dotnet run -- --port 10950 --topology ring --nodes 8 +// dotnet run -- --port 10950 --topology random --nodes 12 --seed 20260603 --edge-prob 0.22 +// dotnet run -- --port 10950 --topology staggered +// Topologies: ring | cycle | chain | diamond | complete | staggered | random +// ============================================================ + +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Tests.Deadlock.Server; + +var port = int.Parse(GetArg(args, "--port", "10950")); +var topology = GetArg(args, "--topology", "ring").ToLowerInvariant(); +var nodeCount = int.Parse(GetArg(args, "--nodes", "100")); +var res1Count = int.Parse(GetArg(args, "--res1", "100")); +var res2Count = int.Parse(GetArg(args, "--res2", "100")); +var seed = int.Parse(GetArg(args, "--seed", "20260603")); +var edgeProb = double.Parse(GetArg(args, "--edge-prob", "0.22")); + +var edges = BuildTopology(topology, ref nodeCount, seed, edgeProb); +var (hasCycle, backEdges) = CycleCensus(nodeCount, edges); + +Console.WriteLine($"[Server] topology={topology} nodes={nodeCount} edges={edges.Count} " + + $"cyclic={hasCycle} backEdges={backEdges} port={port}"); + +var wh = new Warehouse(); +await wh.Put("sys", new MemoryStore()); +// AllowUnauthorizedAccess enables anonymous (None-mode) connections so the test needs no +// credentials — the deadlock behaviour under study is independent of authentication. +var server = await wh.Put("sys/server", new EpServer { Port = (ushort)port, AllowUnauthorizedAccess = true }); + +var nodes = new Node[nodeCount]; +var resources1 = new Resource1[res1Count]; +var resources2 = new Resource2[res2Count]; + +for (var i = 0; i < nodeCount; i++) { + nodes[i] = new Node { Id = i }; + await wh.Put($"sys/n{i}", nodes[i]); +} + +for (var i = 0; i < res1Count; i++) +{ + resources1[i] = new Resource1(); + await wh.Put($"sys/r1_{i}", resources1[i]); +} + +for (var i = 0; i < res2Count; i++) +{ + resources2[i] = new Resource2(); + await wh.Put($"sys/r2_{i}", resources2[i]); +} + +// randomly assign some resources to each node so the fetches do some work beyond just traversing the links; this also +for(var i = 0; i < nodeCount; i++) +{ + var rng = new Random(seed); + + + nodes[i].Resources1 = rng.GetItems(resources1, res1Count / 2); + nodes[i].Resources2 = rng.GetItems(resources2, res2Count / 2); +} + +for(var i =0; i < res1Count; i++) +{ + var rng = new Random(seed); + var res1Index = rng.Next(res1Count); + var res2Index = rng.Next(res2Count); + resources1[i].res1 = resources1[res1Index]; + resources1[i].res2 = resources2[res2Index]; +} + +for (var i = 0; i < res2Count; i++) +{ + var rng = new Random(seed); + var res1Index = rng.Next(res1Count); + var res2Index = rng.Next(res2Count); + resources2[i].res1 = resources1[res1Index]; + resources2[i].res2 = resources2[res2Index]; +} + +foreach (var grp in edges.GroupBy(e => e.from)) + nodes[grp.Key].Links = grp.Select(e => nodes[e.to]).ToArray(); + +await wh.Open(); + +Console.WriteLine($"[Server] Listening on port {port}. Hosting {nodeCount} nodes: sys/n0 .. sys/n{nodeCount - 1}."); +Console.WriteLine($"[Server] The deployed request graph {(hasCycle ? "CONTAINS circular dependencies" : "is acyclic")} " + + $"({backEdges} cycle-closing edge(s))."); +Console.WriteLine($"[Server] Point the client at this host:port with --nodes {nodeCount}. Press Ctrl+C to stop."); + +// Stay up until Ctrl+C (works whether or not stdin is interactive / redirected). +var stop = new TaskCompletionSource(); +Console.CancelKeyPress += (_, e) => { e.Cancel = true; stop.TrySetResult(); }; +await stop.Task; +await wh.Close(); + + +// ---- topology + cycle census ------------------------------------------------------------- + +static List<(int from, int to)> BuildTopology(string topo, ref int n, int seed, double edgeProb) +{ + var edges = new List<(int, int)>(); + switch (topo) + { + case "ring": // i -> (i+1) mod n; every node a root + for (var i = 0; i < n; i++) edges.Add((i, (i + 1) % n)); + break; + case "cycle": // single-root cycle 0->1->..->n-1->0 + for (var i = 0; i < n - 1; i++) edges.Add((i, i + 1)); + edges.Add((n - 1, 0)); + break; + case "chain": // acyclic control + for (var i = 0; i < n - 1; i++) edges.Add((i, i + 1)); + break; + case "diamond": // acyclic control: 0->1,0->2,1->3,2->3 + n = Math.Max(n, 4); + edges.AddRange(new[] { (0, 1), (0, 2), (1, 3), (2, 3) }); + break; + case "complete": // every ordered pair + for (var i = 0; i < n; i++) for (var j = 0; j < n; j++) if (i != j) edges.Add((i, j)); + break; + case "staggered": // X (0) and Y (1) share S; Y reaches S late; no cycle + { + var e = new List<(int, int)>(); + var next = 2; + int Chain(int from, int depth) { for (var d = 0; d < depth; d++) { e.Add((from, next)); from = next; next++; } return from; } + var xTail = Chain(0, 0); // X reaches S immediately + var yTail = Chain(1, 3); // Y reaches S through a 3-hop chain + var shared = next++; + e.Add((xTail, shared)); e.Add((yTail, shared)); + Chain(shared, 3); // S has its own deep chain + n = next; + return e; + } + case "random": // Erdos-Renyi directed graph, fixed seed + { + var rng = new Random(seed); + for (var i = 0; i < n; i++) for (var j = 0; j < n; j++) if (i != j && rng.NextDouble() < edgeProb) edges.Add((i, j)); + break; + } + default: + throw new ArgumentException($"Unknown topology '{topo}'. Use ring|cycle|chain|diamond|complete|staggered|random."); + } + return edges; +} + +// DFS three-colouring; counts back edges (cycle-closing edges, including self loops). +static (bool hasCycle, int backEdges) CycleCensus(int n, IReadOnlyList<(int from, int to)> edges) +{ + var adj = new List[n]; + for (var i = 0; i < n; i++) adj[i] = new List(); + var back = 0; + foreach (var (a, b) in edges) { if (a == b) back++; else adj[a].Add(b); } + + var color = new byte[n]; // 0 unvisited, 1 on-stack, 2 done + for (var s = 0; s < n; s++) + { + if (color[s] != 0) continue; + var stack = new Stack<(int node, int idx)>(); + stack.Push((s, 0)); color[s] = 1; + while (stack.Count > 0) + { + var (u, idx) = stack.Pop(); + if (idx < adj[u].Count) + { + stack.Push((u, idx + 1)); + var v = adj[u][idx]; + if (color[v] == 1) back++; + else if (color[v] == 0) { color[v] = 1; stack.Push((v, 0)); } + } + else color[u] = 2; + } + } + return (back > 0, back); +} + +static string GetArg(string[] args, string key, string def) +{ + var i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/Deadlock/Server/Resource1.cs b/Tests/Distribution/Deadlock/Server/Resource1.cs new file mode 100644 index 0000000..4555350 --- /dev/null +++ b/Tests/Distribution/Deadlock/Server/Resource1.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Esiur.Protocol; +using Esiur.Resource; + +namespace Esiur.Tests.Deadlock.Server +{ + [Resource] + public partial class Resource1 + { + [Export] public Resource1 res1; + [Export] public Resource2 res2; + } +} diff --git a/Tests/Distribution/Deadlock/Server/Resource2.cs b/Tests/Distribution/Deadlock/Server/Resource2.cs new file mode 100644 index 0000000..30754a2 --- /dev/null +++ b/Tests/Distribution/Deadlock/Server/Resource2.cs @@ -0,0 +1,14 @@ +using Esiur.Resource; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Esiur.Tests.Deadlock.Server +{ + [Resource] + public partial class Resource2 + { + [Export] public Resource1 res1; + [Export] public Resource2 res2; + } +} diff --git a/Tests/Unit/FetchCycleDetectionTests.cs b/Tests/Unit/FetchCycleDetectionTests.cs new file mode 100644 index 0000000..c9ae6b8 --- /dev/null +++ b/Tests/Unit/FetchCycleDetectionTests.cs @@ -0,0 +1,86 @@ +using System.Collections.Generic; +using Esiur.Protocol; + +namespace Esiur.Tests.Unit; + +/// +/// Unit tests for EpConnection.HasWaitForCycle — the pure decision function that decides whether +/// waiting for an in-flight resource fetch would deadlock (and a placeholder must break the cycle) +/// versus being safe to wait for full attachment. This is the heart of the cross-chain fix. +/// +public class FetchCycleDetectionTests +{ + static Dictionary> Graph(params (uint parent, uint[] children)[] edges) + { + var g = new Dictionary>(); + foreach (var (parent, children) in edges) + g[parent] = new HashSet(children); + return g; + } + + [Fact] + public void AppFacingFetch_NoChain_NeverCycles() + { + // requestSequence == null marks an application-facing fetch: it must always wait, never + // receive a placeholder, regardless of the wait-for graph. + var g = Graph((1u, new uint[] { 2 }), (2u, new uint[] { 1 })); + Assert.False(EpConnection.HasWaitForCycle(2, null, g)); + Assert.False(EpConnection.HasWaitForCycle(2, new uint[0], g)); + } + + [Fact] + public void NoBlocking_IsNotCyclic() + { + var g = Graph(); + Assert.False(EpConnection.HasWaitForCycle(2, new uint[] { 1 }, g)); + } + + [Fact] + public void IndependentInFlight_IsNotCyclic() + { + // Chain [1] fetching 2; 2 is blocked on 3 (an unrelated resource). No path back to chain. + var g = Graph((2u, new uint[] { 3 })); + Assert.False(EpConnection.HasWaitForCycle(2, new uint[] { 1 }, g)); + } + + [Fact] + public void MutualCrossChain_IsCyclic() + { + // Two concurrent fetches: 1 is blocked on 2, and 2 is blocked on 1. Chain [1] now wants 2. + // Waiting would deadlock, so this must be reported as a cycle. + var g = Graph((1u, new uint[] { 2 }), (2u, new uint[] { 1 })); + Assert.True(EpConnection.HasWaitForCycle(2, new uint[] { 1 }, g)); + } + + [Fact] + public void TransitiveCycle_IsDetected() + { + // Chain [1] wants 2; 2 -> 3 -> 1 leads back into the chain. + var g = Graph((2u, new uint[] { 3 }), (3u, new uint[] { 1 })); + Assert.True(EpConnection.HasWaitForCycle(2, new uint[] { 1 }, g)); + } + + [Fact] + public void ParallelChildren_OnlyOneClosesCycle() + { + // 2 is blocked on several children; only one (5) leads back to the chain root. + var g = Graph((2u, new uint[] { 3, 4, 5 }), (5u, new uint[] { 1 })); + Assert.True(EpConnection.HasWaitForCycle(2, new uint[] { 1, 9 }, g)); + } + + [Fact] + public void DeeperChain_BackEdgeToAncestor_IsCyclic() + { + // Current chain is [1,2,3]; fetching 4 which is blocked on 2 (an ancestor) -> cycle. + var g = Graph((4u, new uint[] { 2 })); + Assert.True(EpConnection.HasWaitForCycle(4, new uint[] { 1, 2, 3 }, g)); + } + + [Fact] + public void SelfReferentialGraph_DoesNotInfiniteLoop() + { + // Defensive: a self-loop / disjoint cycle that never reaches the chain must terminate. + var g = Graph((2u, new uint[] { 3 }), (3u, new uint[] { 2 })); + Assert.False(EpConnection.HasWaitForCycle(2, new uint[] { 1 }, g)); + } +} diff --git a/Tests/Unit/Integration/DeadlockDetectionTests.cs b/Tests/Unit/Integration/DeadlockDetectionTests.cs new file mode 100644 index 0000000..14a2fff --- /dev/null +++ b/Tests/Unit/Integration/DeadlockDetectionTests.cs @@ -0,0 +1,268 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Esiur.Core; +using Esiur.Misc; +using Esiur.Protocol; +using Esiur.Resource; +using Xunit.Abstractions; + +namespace Esiur.Tests.Unit.Integration; + +/// +/// Answers the methodological questions a deadlock-prevention experiment must address: +/// (a) the timeout / detection thresholds, justified against the measured completion-time +/// distribution; +/// (b) how a deadlock is detected as distinct from slow processing — via a progress (stall) +/// detector, validated by a NaiveWait resolver that genuinely deadlocks on cycles; +/// (c) that circular dependencies are actually present in the (randomly generated) request pool — +/// counted by static cycle detection (DFS) and by the resolver's cycle-break operations. +/// +[Collection("Integration")] +public class DeadlockDetectionTests +{ + readonly ITestOutputHelper _out; + public DeadlockDetectionTests(ITestOutputHelper output) => _out = output; + + // ---- detection thresholds (reported in the paper) -------------------------------------- + // A run is a DEADLOCK if no resource attaches for StallMs while fetches are still pending; it is + // SLOW (not deadlock) if it is still making progress at HardTimeoutMs. StallMs is ~3 orders of + // magnitude above the observed completion time, so a stall is unambiguous. + const int StallMs = 1500; + const int HardTimeoutMs = 15000; + const int PollMs = 25; + + enum Outcome { Completed, Deadlocked, SlowTimeout, Faulted } + + static long Counter(string name) => Global.Counters.Contains(name) ? Global.Counters[name] : 0; + + static async Task StartGraph(int nodes, IEnumerable<(int from, int to)> edges, DeadlockResolutionMode mode) + { + var edgeList = edges.ToArray(); + var cluster = await IntegrationCluster.StartAsync(async wh => + { + var ns = new Node[nodes]; + for (var i = 0; i < nodes; i++) { ns[i] = new Node { Id = i }; await wh.Put($"sys/n{i}", ns[i]); } + foreach (var grp in edgeList.GroupBy(e => e.from)) + ns[grp.Key].Links = grp.Select(e => ns[e.to]).ToArray(); + }); + cluster.Connection.DeadlockResolution = mode; + return cluster; + } + + // Fires fetches for all roots and classifies the run using the progress (stall) detector. + // Uses per-connection counters (each run has a fresh connection) so progress and cycle-break + // measurements are free of cross-connection contamination from the shared Global.Counters. + async Task<(Outcome outcome, double ms, long cycleBreaks)> Classify(IntegrationCluster cluster, int[] roots) + { + var connection = cluster.Connection; + + var tasks = roots.Select(r => + { + var tcs = new TaskCompletionSource(); + connection.Get($"sys/n{r}") + .Then(_ => tcs.TrySetResult(true)) + .Error(ex => tcs.TrySetException((Exception)ex)); + return tcs.Task; + }).ToArray(); + var all = Task.WhenAll(tasks); + + var sw = Stopwatch.StartNew(); + var lastProgress = connection.AttachedResourceCount; + var lastProgressMs = 0.0; + + while (true) + { + await Task.WhenAny(all, Task.Delay(PollMs)); + + if (all.IsCompletedSuccessfully) + { + sw.Stop(); + return (Outcome.Completed, sw.Elapsed.TotalMilliseconds, connection.CycleBreakCount); + } + if (all.IsFaulted) + { + sw.Stop(); + return (Outcome.Faulted, sw.Elapsed.TotalMilliseconds, 0); + } + + var progress = connection.AttachedResourceCount; + if (progress != lastProgress) { lastProgress = progress; lastProgressMs = sw.Elapsed.TotalMilliseconds; } + + var sinceProgress = sw.Elapsed.TotalMilliseconds - lastProgressMs; + if (sinceProgress >= StallMs) // pending, but no resource attached for the stall window + { + sw.Stop(); + return (Outcome.Deadlocked, sw.Elapsed.TotalMilliseconds, 0); + } + if (sw.Elapsed.TotalMilliseconds >= HardTimeoutMs) // still progressing but not done + { + sw.Stop(); + return (Outcome.SlowTimeout, sw.Elapsed.TotalMilliseconds, 0); + } + } + } + + // ---- (b) deadlock is real and detectable, distinct from slow ---------------------------- + + public static IEnumerable DemoTopologies() => new[] + { + new object[] { "acyclic chain", 5, new[]{ (0,1),(1,2),(2,3),(3,4) }, new[]{0}, false }, + new object[] { "acyclic diamond", 4, new[]{ (0,1),(0,2),(1,3),(2,3) }, new[]{0}, false }, + new object[] { "single-root 4-cycle", 4, new[]{ (0,1),(1,2),(2,3),(3,0) }, new[]{0}, true }, + new object[] { "concurrent ring x3", 3, new[]{ (0,1),(1,2),(2,0) }, new[]{0,1,2}, true }, + }; + + [Theory] + [MemberData(nameof(DemoTopologies))] + public async Task NaiveWait_Deadlocks_On_Cycles_While_Resolvers_Complete( + string name, int nodes, (int, int)[] edges, int[] roots, bool hasCycle) + { + // NaiveWait (no cycle handling): must deadlock iff the graph has a cycle. + await using (var c = await StartGraph(nodes, edges, DeadlockResolutionMode.NaiveWait)) + { + var (outcome, ms, _) = await Classify(c, roots); + _out.WriteLine($"[NaiveWait] {name}: {outcome} in {ms:F0} ms"); + Assert.Equal(hasCycle ? Outcome.Deadlocked : Outcome.Completed, outcome); + } + + // Both production resolvers must complete regardless of cycles. + foreach (var mode in new[] { DeadlockResolutionMode.LegacyCrossChainPlaceholder, DeadlockResolutionMode.WaitWithCycleDetection }) + { + await using var c = await StartGraph(nodes, edges, mode); + var (outcome, ms, breaks) = await Classify(c, roots); + _out.WriteLine($"[{mode}] {name}: {outcome} in {ms:F1} ms, cycle-breaks={breaks}"); + Assert.Equal(Outcome.Completed, outcome); + } + } + + // ---- (c) circular dependencies in a random request pool --------------------------------- + + // Static cycle detection over a directed graph (DFS three-colouring). Returns whether any cycle + // exists and the number of back edges (cycle-closing edges, including self loops). + static bool HasCycle(int n, IReadOnlyList<(int from, int to)> edges, out int backEdges) + { + var adj = new List[n]; + for (var i = 0; i < n; i++) adj[i] = new List(); + var back = 0; + foreach (var (a, b) in edges) + { + if (a == b) back++; // self loop + else adj[a].Add(b); + } + + var color = new byte[n]; // 0 = unvisited, 1 = on stack, 2 = done + var stack = new Stack<(int node, int idx)>(); + + for (var s = 0; s < n; s++) + { + if (color[s] != 0) continue; + stack.Push((s, 0)); + color[s] = 1; + while (stack.Count > 0) + { + var (u, idx) = stack.Pop(); + if (idx < adj[u].Count) + { + stack.Push((u, idx + 1)); + var v = adj[u][idx]; + if (color[v] == 1) back++; // back edge -> cycle + else if (color[v] == 0) { color[v] = 1; stack.Push((v, 0)); } + } + else color[u] = 2; + } + } + + backEdges = back; + return back > 0; + } + + static (int, int)[] RandomGraph(int n, double edgeProbability, Random rng) + { + var edges = new List<(int, int)>(); + for (var i = 0; i < n; i++) + for (var j = 0; j < n; j++) + if (i != j && rng.NextDouble() < edgeProbability) + edges.Add((i, j)); + return edges.ToArray(); + } + + [Fact] + public async Task RandomRequestPool_ContainsCycles_And_Resolves_Without_Deadlock() + { + const int graphs = 40; + const int nodes = 8; + const double edgeProbability = 0.22; + var rng = new Random(20260603); // fixed seed -> reproducible pool + + int graphsWithCycles = 0, totalBackEdges = 0; + int completed = 0, deadlocked = 0, slow = 0; + long totalCycleBreaks = 0; + var times = new List(); + + for (var g = 0; g < graphs; g++) + { + var edges = RandomGraph(nodes, edgeProbability, rng); + if (HasCycle(nodes, edges, out var backEdges)) { graphsWithCycles++; totalBackEdges += backEdges; } + + await using var cluster = await StartGraph(nodes, edges, DeadlockResolutionMode.WaitWithCycleDetection); + var (outcome, ms, breaks) = await Classify(cluster, Enumerable.Range(0, nodes).ToArray()); + totalCycleBreaks += breaks; + switch (outcome) + { + case Outcome.Completed: completed++; times.Add(ms); break; + case Outcome.Deadlocked: deadlocked++; break; + case Outcome.SlowTimeout: slow++; break; + } + } + + EmitDetectionReport(graphs, nodes, edgeProbability, graphsWithCycles, totalBackEdges, + totalCycleBreaks, completed, deadlocked, slow, times); + + // (c) the random pool must actually contain circular dependencies, otherwise the experiment + // would not exercise the mechanism at all. + Assert.True(graphsWithCycles > 0, "random request pool contained no circular dependencies"); + // and the new resolver must resolve every one of them without deadlock. + Assert.Equal(0, deadlocked); + Assert.Equal(0, slow); + } + + void EmitDetectionReport(int graphs, int nodes, double edgeProb, int graphsWithCycles, int backEdges, + long cycleBreaks, int completed, int deadlocked, int slow, List times) + { + times.Sort(); + double Pct(double p) => times.Count == 0 ? 0 : times[(int)Math.Min(times.Count - 1, p * times.Count)]; + + var sb = new System.Text.StringBuilder(); + sb.AppendLine("# Esiur deadlock detection — methodology and random-pool census"); + sb.AppendLine(); + sb.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm} UTC"); + sb.AppendLine(); + sb.AppendLine("## (a) Detection thresholds"); + sb.AppendLine($"- Stall window (no-progress => deadlock): **{StallMs} ms**"); + sb.AppendLine($"- Hard timeout (progress but unfinished => slow): **{HardTimeoutMs} ms**"); + sb.AppendLine($"- Observed completion time over {times.Count} successful runs: " + + $"median **{Pct(0.5):F1} ms**, p99 **{Pct(0.99):F1} ms**, max **{(times.Count > 0 ? times[^1] : 0):F1} ms**."); + sb.AppendLine($" The stall window is ~{(times.Count > 0 && Pct(0.5) > 0 ? StallMs / Pct(0.5) : 0):F0}x the median completion time, so a stall is unambiguously a deadlock, not slow processing."); + sb.AppendLine(); + sb.AppendLine("## (b) Deadlock detection"); + sb.AppendLine("A run is classified DEADLOCKED when fetches remain pending yet the progress counter"); + sb.AppendLine("(resources attached) does not advance for the stall window. Validated by the NaiveWait"); + sb.AppendLine("resolver, which genuinely deadlocks on cyclic graphs and is detected as such."); + sb.AppendLine(); + sb.AppendLine("## (c) Random request pool — circular-dependency census"); + sb.AppendLine($"- Pool: {graphs} random directed graphs, {nodes} nodes each, edge probability {edgeProb:F2}, fixed seed."); + sb.AppendLine($"- Graphs containing >=1 cycle (static DFS): **{graphsWithCycles}/{graphs}** ({100.0 * graphsWithCycles / graphs:F0}%), {backEdges} cycle-closing edges total."); + sb.AppendLine($"- Cycle-break operations performed by the resolver: **{cycleBreaks}** (circular dependencies actually exercised)."); + sb.AppendLine($"- Outcomes (new resolver): completed **{completed}**, deadlocked **{deadlocked}**, slow **{slow}**."); + + var report = sb.ToString(); + _out.WriteLine(report); + var path = Path.Combine(AppContext.BaseDirectory, "deadlock-detection.md"); + File.WriteAllText(path, report); + _out.WriteLine($"Report written to: {path}"); + } +} diff --git a/Tests/Unit/Integration/DeadlockIntegrationTests.cs b/Tests/Unit/Integration/DeadlockIntegrationTests.cs new file mode 100644 index 0000000..ab54473 --- /dev/null +++ b/Tests/Unit/Integration/DeadlockIntegrationTests.cs @@ -0,0 +1,446 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Esiur.Core; +using Esiur.Misc; +using Esiur.Protocol; +using Esiur.Resource; +using Xunit.Abstractions; + +namespace Esiur.Tests.Unit.Integration; + +/// +/// End-to-end deadlock tests for EpConnection.FetchResource over a real loopback connection. +/// Builds a range of reference topologies (self-loop, cycles of increasing length, concurrent +/// cross-chain cycles, diamonds, dense graphs) and asserts, for every one, that the fetch +/// completes without deadlock (a timeout would indicate one) and that every resource delivered to +/// the application is fully attached (the cross-chain bug delivered partially-attached resources). +/// Per-topology statistics are collected from the protocol counters and written to a report. +/// +[Collection("Integration")] +public class DeadlockIntegrationTests +{ + readonly ITestOutputHelper _out; + public DeadlockIntegrationTests(ITestOutputHelper output) => _out = output; + + const int Timeout = 15000; + + // ---- async + counter helpers ----------------------------------------------------------- + + static Task ToTask(AsyncReply reply) + { + var tcs = new TaskCompletionSource(); + reply.Then(v => tcs.TrySetResult(v)).Error(ex => tcs.TrySetException((Exception)ex)); + return tcs.Task; + } + + static async Task WithTimeout(Task task, int ms = Timeout) + { + if (await Task.WhenAny(task, Task.Delay(ms)) != task) + throw new TimeoutException("Operation timed out — possible deadlock."); + return await task; + } + + static long Counter(string name) + => Global.Counters.Contains(name) ? Global.Counters[name] : 0; + + // ---- topology model -------------------------------------------------------------------- + + record Topology(string Name, int Nodes, (int From, int To)[] Edges, int[] FetchRoots, bool Concurrent); + + static IEnumerable Topologies() => new[] + { + new Topology("self-loop", 1, new[]{ (0,0) }, new[]{0}, false), + new Topology("2-cycle", 2, new[]{ (0,1),(1,0) }, new[]{0}, false), + new Topology("3-cycle", 3, new[]{ (0,1),(1,2),(2,0) }, new[]{0}, false), + new Topology("4-cycle", 4, new[]{ (0,1),(1,2),(2,3),(3,0) }, new[]{0}, false), + new Topology("cross-chain x2", 2, new[]{ (0,1),(1,0) }, new[]{0,1}, true), + new Topology("cross-chain x3", 3, new[]{ (0,1),(1,2),(2,0) }, new[]{0,1,2}, true), + new Topology("diamond", 4, new[]{ (0,1),(0,2),(1,3),(2,3) }, new[]{0}, false), + new Topology("figure-8", 4, new[]{ (0,1),(1,0),(1,2),(2,3),(3,1) }, new[]{0}, false), + new Topology("complete-4", 4, AllPairs(4), new[]{0}, false), + new Topology("complete-4 concur",4, AllPairs(4), new[]{0,1,2,3}, true), + }; + + // Topologies for the legacy-vs-new comparison. The fan-in cases have many roots referencing a + // single shared resource whose own dependency chain is deep: while that shared resource is + // attaching its chain, the other concurrent fetchers reach it, and the legacy resolver hands + // each of them the not-yet-attached placeholder (the bug), whereas the new resolver waits. + static IEnumerable ComparisonTopologies() => new[] + { + new Topology("single-root 4-cycle (control)", 4, new[]{ (0,1),(1,2),(2,3),(3,0) }, new[]{0}, false), + Cycle("cross-chain ring x3", 3), + // Staggered shared dependency (no cycle): X reaches the shared node S immediately while Y + // reaches it through a chain, arriving during S's own deep-chain attach window. The legacy + // resolver hands Y the not-yet-attached placeholder S (unnecessary — there is no cycle); the + // new resolver waits for S to finish attaching. + Staggered("staggered shared-dep", leadDepth: 0, lagDepth: 3, sharedDepth: 3), + Staggered("staggered shared-dep (deep)", leadDepth: 0, lagDepth: 4, sharedDepth: 4), + }; + + // An N-node ring (i -> i+1, last -> 0), every node fetched concurrently. + static Topology Cycle(string name, int n) + { + var edges = new (int, int)[n]; + for (var i = 0; i < n; i++) edges[i] = (i, (i + 1) % n); + return new Topology(name, n, edges, Enumerable.Range(0, n).ToArray(), true); + } + + // X (root 0) and Y (root 1) both depend on a shared node S. X reaches S through a chain of + // length `leadDepth`, Y through a chain of length `lagDepth` (make lag > lead so Y arrives at S + // later). S itself starts a chain of length `sharedDepth`, widening the window during which S is + // attaching and another fetcher can be handed a placeholder. No cycle exists. + static Topology Staggered(string name, int leadDepth, int lagDepth, int sharedDepth) + { + var edges = new List<(int, int)>(); + var next = 2; + int Chain(int from, int depth) + { + for (var d = 0; d < depth; d++) { edges.Add((from, next)); from = next; next++; } + return from; // tail + } + + var xTail = Chain(0, leadDepth); // X = 0 + var yTail = Chain(1, lagDepth); // Y = 1 + var shared = next++; // S + edges.Add((xTail, shared)); + edges.Add((yTail, shared)); + Chain(shared, sharedDepth); // S -> deep chain + + return new Topology(name, next, edges.ToArray(), new[] { 0, 1 }, true); + } + + static (int, int)[] AllPairs(int n) + { + var edges = new List<(int, int)>(); + for (var i = 0; i < n; i++) + for (var j = 0; j < n; j++) + if (i != j) edges.Add((i, j)); + return edges.ToArray(); + } + + // ---- graph attach verification --------------------------------------------------------- + + // Walks the client-side object graph reachable from the fetched roots and returns whether + // every node is fully attached, plus the number of distinct nodes reached. + static (bool allAttached, int reached) VerifyGraph(IEnumerable roots) + { + var seen = new HashSet(); + var queue = new Queue(roots); + var allAttached = true; + + while (queue.Count > 0) + { + var node = queue.Dequeue(); + if (node == null || !seen.Add(node.ResourceInstanceId)) + continue; + + if (node.Status != Resource.ResourceStatus.Attached) + { + allAttached = false; + continue; // do not traverse into a partially attached node + } + + // property index 1 == Links (Id is index 0) + if (node.TryGetPropertyValue((byte)1, out var linksObj) && linksObj is IEnumerable links) + foreach (var child in links) + if (child is EpResource childResource) + queue.Enqueue(childResource); + } + + return (allAttached, seen.Count); + } + + // ---- per-topology run ------------------------------------------------------------------ + + record StatRow(string Topology, int Nodes, int Reached, long SameChain, long CrossChain, + long Waits, long CacheHits, double Ms, bool AllAttached, bool Deadlock); + + async Task RunTopology(Topology topo) + { + await using var cluster = await IntegrationCluster.StartAsync(async wh => + { + var nodes = new Node[topo.Nodes]; + for (var i = 0; i < topo.Nodes; i++) + { + nodes[i] = new Node { Id = i }; + await wh.Put($"sys/n{i}", nodes[i]); + } + + foreach (var group in topo.Edges.GroupBy(e => e.From)) + nodes[group.Key].Links = group.Select(e => nodes[e.To]).ToArray(); + }); + + var c0 = (same: Counter("EpResourceDeadLockSameChain"), + cross: Counter("EpResourceDeadLockCrossChain"), + wait: Counter("EpResourcePendingCacheHit"), + hit: Counter("EpResourceAttachedCacheHit")); + + var sw = Stopwatch.StartNew(); + var deadlock = false; + var reached = 0; + var allAttached = false; + + try + { + var fetchTasks = topo.FetchRoots + .Select(r => ToTask(cluster.Connection.Get($"sys/n{r}"))) + .ToArray(); + + if (!topo.Concurrent) + { + // sequential roots (usually a single root) + foreach (var t in fetchTasks) + await WithTimeout(t); + } + + var results = await WithTimeout(Task.WhenAll(fetchTasks)); + sw.Stop(); + + (allAttached, reached) = VerifyGraph(results.Cast()); + } + catch (TimeoutException) + { + sw.Stop(); + deadlock = true; + } + + return new StatRow(topo.Name, topo.Nodes, reached, + Counter("EpResourceDeadLockSameChain") - c0.same, + Counter("EpResourceDeadLockCrossChain") - c0.cross, + Counter("EpResourcePendingCacheHit") - c0.wait, + Counter("EpResourceAttachedCacheHit") - c0.hit, + sw.Elapsed.TotalMilliseconds, allAttached, deadlock); + } + + // ---- tests ----------------------------------------------------------------------------- + + [Fact] + public async Task DeadlockMatrix_AllTopologies() + { + var rows = new List(); + + foreach (var topo in Topologies()) + { + var row = await RunTopology(topo); + rows.Add(row); + + Assert.False(row.Deadlock, $"{topo.Name}: fetch deadlocked (timed out)"); + Assert.True(row.AllAttached, $"{topo.Name}: a partially-attached resource reached the application"); + Assert.True(row.Reached >= topo.Nodes, $"{topo.Name}: expected to reach {topo.Nodes} nodes, reached {row.Reached}"); + } + + EmitReport(rows); + } + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(4)] + [InlineData(8)] + [InlineData(16)] + public async Task Concurrency_Sweep_CyclicGraph(int concurrency) + { + // A 4-node cycle fetched by N concurrent application requests for all four roots. Stresses + // the wait-for/cycle-break paths under contention; all requests must complete and attach. + await using var cluster = await IntegrationCluster.StartAsync(async wh => + { + var nodes = new Node[4]; + for (var i = 0; i < 4; i++) + { + nodes[i] = new Node { Id = i }; + await wh.Put($"sys/n{i}", nodes[i]); + } + for (var i = 0; i < 4; i++) + nodes[i].Links = new[] { nodes[(i + 1) % 4] }; + }); + + var sw = Stopwatch.StartNew(); + var tasks = Enumerable.Range(0, concurrency) + .SelectMany(_ => Enumerable.Range(0, 4).Select(r => ToTask(cluster.Connection.Get($"sys/n{r}")))) + .ToArray(); + + var results = await WithTimeout(Task.WhenAll(tasks), 30000); + sw.Stop(); + + var (allAttached, _) = VerifyGraph(results.Cast()); + Assert.True(allAttached, $"concurrency {concurrency}: a partially-attached resource was delivered"); + + _out.WriteLine($"concurrency={concurrency,2} requests={tasks.Length,3} time={sw.Elapsed.TotalMilliseconds,8:F1} ms " + + $"throughput={tasks.Length / sw.Elapsed.TotalSeconds,7:F0} req/s"); + } + + // ---- legacy vs new comparison ---------------------------------------------------------- + + // Counts resources reachable from the delivered roots that are NOT published — i.e. handed to + // the application while their own dependency graph is not fully attached. + static int CountUnpublished(IEnumerable roots) + { + var seen = new HashSet(); + var queue = new Queue(roots); + var unpublished = 0; + + while (queue.Count > 0) + { + var node = queue.Dequeue(); + if (node == null || !seen.Add(node.ResourceInstanceId)) + continue; + + if (node.Status != ResourceStatus.Published) + unpublished++; + + if ((node.Status == ResourceStatus.Attached) && node.TryGetPropertyValue((byte)1, out var linksObj) && linksObj is IEnumerable links) + foreach (var child in links) + if (child is EpResource childResource) + queue.Enqueue(childResource); + } + + return unpublished; + } + + async Task<(bool deadlock, int unnecessaryPlaceholders)> RunForCompare(Topology topo, bool legacy) + { + await using var cluster = await IntegrationCluster.StartAsync(async wh => + { + var nodes = new Node[topo.Nodes]; + for (var i = 0; i < topo.Nodes; i++) + { + nodes[i] = new Node { Id = i }; + await wh.Put($"sys/n{i}", nodes[i]); + } + foreach (var group in topo.Edges.GroupBy(e => e.From)) + nodes[group.Key].Links = group.Select(e => nodes[e.To]).ToArray(); + }); + + cluster.Connection.DeadlockResolution = legacy + ? DeadlockResolutionMode.LegacyCrossChainPlaceholder + : DeadlockResolutionMode.WaitWithCycleDetection; + + var completions = new List>(); + + try + { + foreach (var r in topo.FetchRoots) + { + var tcs = new TaskCompletionSource(); + cluster.Connection.Get($"sys/n{r}") + .Then(_ => tcs.TrySetResult(true)) + .Error(ex => tcs.TrySetException((Exception)ex)); + completions.Add(tcs.Task); + } + + await WithTimeout(Task.WhenAll(completions)); + // Per-connection counter (fresh connection starts at 0), free of cross-connection noise. + return (false, (int)cluster.Connection.UnnecessaryPlaceholderCount); + } + catch (TimeoutException) + { + return (true, -1); + } + } + + record CompareRow(string Topology, int Iterations, + int LegacyDeadlocks, int LegacyBugRuns, double LegacyAvgUnnecessary, + int NewDeadlocks, int NewBugRuns, double NewAvgUnnecessary); + + [Fact] + public async Task LegacyVsNew_UnnecessaryPlaceholderComparison() + { + const int iterations = 20; + var rows = new List(); + + foreach (var topo in ComparisonTopologies()) + { + int legDead = 0, legBug = 0, legUnnec = 0; + int newDead = 0, newBug = 0, newUnnec = 0; + + for (var i = 0; i < iterations; i++) + { + var (ld, lu) = await RunForCompare(topo, legacy: true); + if (ld) legDead++; else { if (lu > 0) legBug++; legUnnec += Math.Max(0, lu); } + + var (nd, nu) = await RunForCompare(topo, legacy: false); + if (nd) newDead++; else { if (nu > 0) newBug++; newUnnec += Math.Max(0, nu); } + } + + rows.Add(new CompareRow(topo.Name, iterations, + legDead, legBug, (double)legUnnec / iterations, + newDead, newBug, (double)newUnnec / iterations)); + } + + EmitComparison(rows, iterations); + + // The new resolver must never deadlock and must never hand out an unnecessary placeholder + // (it only breaks genuine wait-for cycles) — both deterministic invariants. + Assert.All(rows, r => Assert.Equal(0, r.NewDeadlocks)); + Assert.All(rows, r => Assert.Equal(0, r.NewBugRuns)); + } + + void EmitComparison(List rows, int iterations) + { + var sb = new System.Text.StringBuilder(); + sb.AppendLine("# Esiur FetchResource — legacy vs new cross-chain resolution"); + sb.AppendLine(); + sb.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm} UTC | iterations per cell: {iterations}"); + sb.AppendLine(); + sb.AppendLine("Metric: 'unnecessary placeholder' = a not-yet-attached resource handed to a requester"); + sb.AppendLine("where NO genuine wait-for cycle exists — a partial delivery that the new resolver avoids"); + sb.AppendLine("by waiting for full attachment. Genuine cycles are excluded (both resolvers must break those)."); + sb.AppendLine(); + sb.AppendLine("| Topology | Legacy deadlocks | Legacy buggy runs | Legacy avg unnecessary | New deadlocks | New buggy runs | New avg unnecessary |"); + sb.AppendLine("|----------|-----------------:|------------------:|-----------------------:|--------------:|---------------:|--------------------:|"); + + foreach (var r in rows) + sb.AppendLine($"| {r.Topology} | {r.LegacyDeadlocks} | {r.LegacyBugRuns}/{r.Iterations} | {r.LegacyAvgUnnecessary:F2} | " + + $"{r.NewDeadlocks} | {r.NewBugRuns}/{r.Iterations} | {r.NewAvgUnnecessary:F2} |"); + + sb.AppendLine(); + sb.AppendLine($"Legacy: {rows.Sum(r => r.LegacyBugRuns)} runs with an unnecessary placeholder, " + + $"{rows.Sum(r => r.LegacyDeadlocks)} deadlocks across {rows.Count * iterations} runs."); + sb.AppendLine($"New: {rows.Sum(r => r.NewBugRuns)} runs with an unnecessary placeholder, " + + $"{rows.Sum(r => r.NewDeadlocks)} deadlocks across {rows.Count * iterations} runs."); + + var report = sb.ToString(); + _out.WriteLine(report); + var path = Path.Combine(AppContext.BaseDirectory, "deadlock-comparison.md"); + File.WriteAllText(path, report); + _out.WriteLine($"Comparison written to: {path}"); + } + + // ---- report ---------------------------------------------------------------------------- + + void EmitReport(List rows) + { + var sb = new System.Text.StringBuilder(); + sb.AppendLine("# Esiur FetchResource deadlock test results"); + sb.AppendLine(); + sb.AppendLine($"Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm} UTC"); + sb.AppendLine(); + sb.AppendLine("| Topology | Nodes | Reached | Same-chain breaks | Cross-chain breaks | Waits | Cache hits | Time (ms) | All attached | Deadlock |"); + sb.AppendLine("|----------|------:|--------:|------------------:|-------------------:|------:|-----------:|----------:|:------------:|:--------:|"); + + foreach (var r in rows) + sb.AppendLine($"| {r.Topology} | {r.Nodes} | {r.Reached} | {r.SameChain} | {r.CrossChain} | " + + $"{r.Waits} | {r.CacheHits} | {r.Ms:F1} | {(r.AllAttached ? "yes" : "**NO**")} | {(r.Deadlock ? "**YES**" : "no")} |"); + + sb.AppendLine(); + sb.AppendLine($"Topologies: {rows.Count} | Deadlocks: {rows.Count(r => r.Deadlock)} | " + + $"Fully attached: {rows.Count(r => r.AllAttached)}/{rows.Count} | " + + $"Total cycle breaks: same-chain {rows.Sum(r => r.SameChain)}, cross-chain {rows.Sum(r => r.CrossChain)} | " + + $"Total waits: {rows.Sum(r => r.Waits)}"); + + var report = sb.ToString(); + _out.WriteLine(report); + + var path = Path.Combine(AppContext.BaseDirectory, "deadlock-stats.md"); + File.WriteAllText(path, report); + _out.WriteLine($"Report written to: {path}"); + } +} + +[CollectionDefinition("Integration", DisableParallelization = true)] +public class IntegrationCollection { } diff --git a/Tests/Unit/Integration/IntegrationHarness.cs b/Tests/Unit/Integration/IntegrationHarness.cs new file mode 100644 index 0000000..ec5f788 --- /dev/null +++ b/Tests/Unit/Integration/IntegrationHarness.cs @@ -0,0 +1,104 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Esiur.Core; +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Security.Authority; +using Esiur.Security.Authority.Providers; +using Esiur.Stores; + +namespace Esiur.Tests.Unit.Integration; + +// ---- hash auth providers (self-consistent: client password {1..5} || server salt {6..10} +// == {1..10}, which is what the server stores the hash of) ------------------------------ + +internal class TestServerAuthProvider : PasswordAuthenticationProvider +{ + public override PasswordHash GetHostedAccountCredential(string identity, string domain) + => identity == "tester" && domain == "test" + ? new PasswordHash( + PasswordAuthenticationHandler.ComputeSha3(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }), + new byte[] { 6, 7, 8, 9, 10 }) + : new PasswordHash(null, null); +} + +internal class TestClientAuthProvider : PasswordAuthenticationProvider +{ + public override byte[] GetSelfCredential(string identity, string domain, string hostname) + => identity == "tester" && domain == "test" ? new byte[] { 1, 2, 3, 4, 5 } : null; + + public override IdentityPassword GetSelfIdentityAndCredential(string domain, string hostname) + => domain == "test" + ? new IdentityPassword { Identity = "tester", Password = new byte[] { 1, 2, 3, 4, 5 } } + : new IdentityPassword { Identity = null, Password = null }; +} + +/// +/// Spins up an in-process Esiur server and an authenticated client connection over loopback TCP, +/// so the real socket + protocol + FetchResource stack is exercised end to end. Each instance +/// uses a distinct port. Dispose closes the connection and tears down the server. +/// +internal sealed class IntegrationCluster : IAsyncDisposable +{ + static int _portCounter = 14400; + + public Warehouse ServerWarehouse { get; } + public Warehouse ClientWarehouse { get; } + public EpServer Server { get; } + public EpConnection Connection { get; private set; } + public int Port { get; } + + IntegrationCluster(Warehouse serverWh, EpServer server, int port) + { + ServerWarehouse = serverWh; + Server = server; + Port = port; + ClientWarehouse = new Warehouse(); + } + + /// + /// Builds a server hosting resources under "sys/<rootPath>" populated by + /// , opens it, then connects an authenticated client. + /// + public static async Task StartAsync(Func populate) + { + var port = Interlocked.Increment(ref _portCounter); + + var serverWh = new Warehouse(); + serverWh.RegisterAuthenticationProvider(new TestServerAuthProvider()); + + await serverWh.Put("sys", new MemoryStore()); + var server = await serverWh.Put("sys/server", new EpServer + { + Port = (ushort)port, + AllowedAuthenticationProviders = new[] { "hash" }, + }); + + await populate(serverWh); + + await serverWh.Open(); + + var cluster = new IntegrationCluster(serverWh, server, port); + + cluster.ClientWarehouse.RegisterAuthenticationProvider(new TestClientAuthProvider()); + cluster.Connection = await cluster.ClientWarehouse.Get( + $"ep://localhost:{port}", + new EpConnectionContext + { + AuthenticationMode = AuthenticationMode.InitializerIdentity, + Identity = "tester", + AuthenticationProtocol = "hash", + Domain = "test", + }); + + return cluster; + } + + public async ValueTask DisposeAsync() + { + try { Connection?.Destroy(); } catch { } + try { Server?.Destroy(); } catch { } + await Task.Delay(50); // let the listener socket release the port + } +} diff --git a/Tests/Unit/Integration/Node.cs b/Tests/Unit/Integration/Node.cs new file mode 100644 index 0000000..16a0b3e --- /dev/null +++ b/Tests/Unit/Integration/Node.cs @@ -0,0 +1,17 @@ +using Esiur.Resource; + +namespace Esiur.Tests.Unit.Integration; + +/// +/// A minimal distributed resource used to build arbitrary reference topologies (cycles, +/// cross-references, diamonds) for the deadlock integration tests. holds +/// references to other nodes; when a node is fetched the client transitively fetches its links, +/// which is what exercises EpConnection.FetchResource cycle handling. +/// +[Resource] +public partial class Node +{ + [Export] public int Id { get; set; } + + [Export] public Node[]? Links { get; set; } +} diff --git a/Tests/Unit/ReattachDeltaTests.cs b/Tests/Unit/ReattachDeltaTests.cs new file mode 100644 index 0000000..0c4da58 --- /dev/null +++ b/Tests/Unit/ReattachDeltaTests.cs @@ -0,0 +1,75 @@ +using System; +using Esiur.Data; +using Esiur.Resource; + +namespace Esiur.Tests.Unit; + +/// +/// Verifies the reattach property-delta wire format round-trips: the sparse +/// (index -> value/age/date) map composed by PropertyValueMapComposer is parsed back +/// identically by PropertyValueMapParserAsync. This is the format the age-based reattach +/// reply uses to send only the properties modified after the client's last-known age. +/// +public class ReattachDeltaTests +{ + static Map RoundTrip(Map delta) + { + // Compose -> RawData TDU; parse the TDU back to its raw payload; run the delta parser. + var tdu = Codec.Compose(delta, Warehouse.Default, null); + var (_, payloadObj) = Codec.ParseSync(tdu, 0, Warehouse.Default); + var payload = (byte[])payloadObj; + + return DataDeserializer + .PropertyValueMapParserAsync(payload, 0, (uint)payload.Length, null, new uint[] { 1 }) + .Wait(); + } + + [Fact] + public void Delta_RoundTrips_PreservingIndexValueAndAge() + { + var date0 = new DateTime(2026, 1, 1, 12, 0, 0, DateTimeKind.Utc); + var date3 = new DateTime(2026, 2, 2, 8, 30, 0, DateTimeKind.Utc); + + var delta = new Map + { + [0] = new PropertyValue(42, 5UL, date0), + [3] = new PropertyValue("hello", 9UL, date3), + }; + + var parsed = RoundTrip(delta); + + Assert.Equal(2, parsed.Count); + + Assert.Equal(42L, Convert.ToInt64(parsed[0].Value)); + Assert.Equal(5UL, parsed[0].Age); + Assert.Equal(date0.Ticks, ((DateTime)parsed[0].Date).ToUniversalTime().Ticks); + + Assert.Equal("hello", (string)parsed[3].Value); + Assert.Equal(9UL, parsed[3].Age); + Assert.Equal(date3.Ticks, ((DateTime)parsed[3].Date).ToUniversalTime().Ticks); + } + + [Fact] + public void EmptyDelta_RoundTrips_ToEmptyMap() + { + var parsed = RoundTrip(new Map()); + Assert.Empty(parsed); + } + + [Fact] + public void Delta_PreservesOnlyProvidedIndices() + { + // A sparse delta (only index 7 changed) must not introduce entries for other indices. + var delta = new Map + { + [7] = new PropertyValue(true, 100UL, new DateTime(2026, 6, 2, 0, 0, 0, DateTimeKind.Utc)), + }; + + var parsed = RoundTrip(delta); + + Assert.Single(parsed); + Assert.True(parsed.ContainsKey(7)); + Assert.True(Convert.ToBoolean(parsed[7].Value)); + Assert.Equal(100UL, parsed[7].Age); + } +}