mirror of
https://github.com/esiur/esiur-dotnet.git
synced 2026-06-23 10:38:41 +00:00
FetchTypeDef
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
namespace Esiur.Protocol;
|
||||
|
||||
/// <summary>
|
||||
/// Strategy used by <c>EpConnection.FetchResource</c> when it is asked for a resource whose
|
||||
/// attachment is already in flight. Selectable mainly for experimental A/B/C evaluation of the
|
||||
/// Strategy used by <c>EpConnection.FetchResource</c> and <c>EpConnection.FetchTypeDef</c> when
|
||||
/// asked for an object whose fetch is already in flight. Selectable mainly for experimental A/B/C evaluation of the
|
||||
/// deadlock-prevention algorithm.
|
||||
/// </summary>
|
||||
public enum DeadlockResolutionMode : byte
|
||||
|
||||
@@ -2162,6 +2162,8 @@ public partial class EpConnection : NetworkConnection, IStore
|
||||
_requests.Clear();
|
||||
_resourceRequests.Clear();
|
||||
_typeDefRequests.Clear();
|
||||
_fetchBlockedOn.Clear();
|
||||
_typeDefFetchBlockedOn.Clear();
|
||||
|
||||
//_typeDefsByIdRequests.Clear();
|
||||
//_typeDefsByNameRequests.Clear();
|
||||
|
||||
@@ -63,11 +63,14 @@ partial class EpConnection
|
||||
// (e.g. two concurrent fetches A<->B) so a placeholder can break the deadlock, while
|
||||
// independent/app-facing fetches of an in-flight resource simply wait for full attachment.
|
||||
readonly Dictionary<uint, HashSet<uint>> _fetchBlockedOn = new Dictionary<uint, HashSet<uint>>();
|
||||
|
||||
// Same wait-for graph as above, but for in-flight remote type definition parsing.
|
||||
readonly Dictionary<ulong, HashSet<ulong>> _typeDefFetchBlockedOn = new Dictionary<ulong, HashSet<ulong>>();
|
||||
readonly object _deliveredRootsLock = new object();
|
||||
readonly Dictionary<uint, WeakReference<EpResource>> _deliveredRoots = new Dictionary<uint, WeakReference<EpResource>>();
|
||||
|
||||
/// <summary>
|
||||
/// Strategy FetchResource uses for an in-flight resource. Defaults to the new wait + cycle
|
||||
/// Strategy fetches use for in-flight resources and type definitions. Defaults to the new wait + cycle
|
||||
/// detection. Selectable for experimental evaluation (see <see cref="DeadlockResolutionMode"/>).
|
||||
/// </summary>
|
||||
public DeadlockResolutionMode DeadlockResolution { get; set; } = DeadlockResolutionMode.WaitWithCycleDetection;
|
||||
@@ -2081,23 +2084,31 @@ partial class EpConnection
|
||||
/// <returns>DistributedResource</returns>
|
||||
///
|
||||
//object fetchResourceLock = new object();
|
||||
// Records that the attachment of `parent` is now blocked waiting on in-flight child `child`.
|
||||
void AddFetchBlock(uint parent, uint child)
|
||||
// Records that the fetch of `parent` is now blocked waiting on in-flight child `child`.
|
||||
static void AddFetchBlock<TId>(Dictionary<TId, HashSet<TId>> blockedOn, TId parent, TId child)
|
||||
{
|
||||
if (!_fetchBlockedOn.TryGetValue(parent, out var set))
|
||||
_fetchBlockedOn[parent] = set = new HashSet<uint>();
|
||||
if (!blockedOn.TryGetValue(parent, out var set))
|
||||
blockedOn[parent] = set = new HashSet<TId>();
|
||||
set.Add(child);
|
||||
}
|
||||
|
||||
void AddFetchBlock(uint parent, uint child) => AddFetchBlock(_fetchBlockedOn, parent, child);
|
||||
|
||||
void AddTypeDefFetchBlock(ulong parent, ulong child) => AddFetchBlock(_typeDefFetchBlockedOn, parent, child);
|
||||
|
||||
// Removes a resource from the wait-for graph once it is attached or its fetch failed: it is
|
||||
// no longer blocked on anything and no longer a pending child of anyone.
|
||||
void ClearFetchNode(uint id)
|
||||
static void ClearFetchNode<TId>(Dictionary<TId, HashSet<TId>> blockedOn, TId id)
|
||||
{
|
||||
_fetchBlockedOn.Remove(id);
|
||||
foreach (var set in _fetchBlockedOn.Values)
|
||||
blockedOn.Remove(id);
|
||||
foreach (var set in blockedOn.Values)
|
||||
set.Remove(id);
|
||||
}
|
||||
|
||||
void ClearFetchNode(uint id) => ClearFetchNode(_fetchBlockedOn, id);
|
||||
|
||||
void ClearTypeDefFetchNode(ulong id) => ClearFetchNode(_typeDefFetchBlockedOn, id);
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if completing the fetch of <paramref name="id"/> by waiting for its in-flight
|
||||
/// request would deadlock, i.e. the resource is (transitively) blocked on a resource that the
|
||||
@@ -2105,13 +2116,19 @@ partial class EpConnection
|
||||
/// placeholder to break the cycle instead of waiting.
|
||||
/// </summary>
|
||||
internal static bool HasWaitForCycle(uint id, uint[] requestSequence, IReadOnlyDictionary<uint, HashSet<uint>> blockedOn)
|
||||
=> HasWaitForCycleCore(id, requestSequence, blockedOn);
|
||||
|
||||
internal static bool HasWaitForCycle(ulong id, ulong[] requestSequence, IReadOnlyDictionary<ulong, HashSet<ulong>> blockedOn)
|
||||
=> HasWaitForCycleCore(id, requestSequence, blockedOn);
|
||||
|
||||
static bool HasWaitForCycleCore<TId>(TId id, TId[] requestSequence, IReadOnlyDictionary<TId, HashSet<TId>> blockedOn)
|
||||
{
|
||||
if (requestSequence == null || requestSequence.Length == 0)
|
||||
return false;
|
||||
|
||||
var chain = new HashSet<uint>(requestSequence);
|
||||
var visited = new HashSet<uint>();
|
||||
var stack = new Stack<uint>();
|
||||
var chain = new HashSet<TId>(requestSequence);
|
||||
var visited = new HashSet<TId>();
|
||||
var stack = new Stack<TId>();
|
||||
stack.Push(id);
|
||||
|
||||
while (stack.Count > 0)
|
||||
@@ -2505,23 +2522,39 @@ partial class EpConnection
|
||||
|
||||
var requestInfo = _typeDefRequests[id];
|
||||
|
||||
// The type definition that triggered this fetch (the tail of the chain), if any. Used to
|
||||
// record wait-for edges and to distinguish graph-internal typedef parsing from
|
||||
// application-facing fetches.
|
||||
ulong? parent = requestSequence != null && requestSequence.Length > 0
|
||||
? requestSequence[requestSequence.Length - 1]
|
||||
: (ulong?)null;
|
||||
|
||||
if (requestInfo != null)
|
||||
{
|
||||
if (typeDef != null && (requestSequence?.Contains(id) ?? false))
|
||||
if (DeadlockResolution != DeadlockResolutionMode.NaiveWait
|
||||
&& typeDef != null && (requestSequence?.Contains(id) ?? false))
|
||||
{
|
||||
// dead lock avoidance for loop reference.
|
||||
// Same dependency chain (A->B->A): return the in-progress placeholder to break
|
||||
// the reference cycle. NaiveWait skips this for deadlock detection experiments.
|
||||
return new AsyncReply<RemoteTypeDef>(typeDef);
|
||||
}
|
||||
else if (typeDef != null && requestInfo.RequestSequence.Contains(id))
|
||||
|
||||
var breakCycle = typeDef != null && DeadlockResolution switch
|
||||
{
|
||||
DeadlockResolutionMode.LegacyCrossChainPlaceholder => requestInfo.RequestSequence.Contains(id),
|
||||
DeadlockResolutionMode.WaitWithCycleDetection => HasWaitForCycle(id, requestSequence, _typeDefFetchBlockedOn),
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if (breakCycle)
|
||||
{
|
||||
// dead lock avoidance for dependent reference.
|
||||
return new AsyncReply<RemoteTypeDef>(typeDef);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
if (parent != null)
|
||||
AddTypeDefFetchBlock(parent.Value, id);
|
||||
return requestInfo.Reply;
|
||||
}
|
||||
}
|
||||
|
||||
//Console.WriteLine($"Sent typedef {id} {Instance.Warehouse.GetHashCode()}");
|
||||
|
||||
@@ -2530,11 +2563,16 @@ partial class EpConnection
|
||||
var reply = new AsyncReply<RemoteTypeDef>();
|
||||
_typeDefRequests.Add(id, new FetchRequestInfo<RemoteTypeDef, ulong>(reply, newSequence));
|
||||
|
||||
if (parent != null)
|
||||
AddTypeDefFetchBlock(parent.Value, id);
|
||||
|
||||
SendRequest(EpPacketRequest.TypeDefById, id)
|
||||
.Then((result) =>
|
||||
{
|
||||
if (result == null)
|
||||
{
|
||||
_typeDefRequests.Remove(id);
|
||||
ClearTypeDefFetchNode(id);
|
||||
reply.TriggerError(new AsyncException(ErrorType.Management,
|
||||
(ushort)ExceptionCode.ResourceNotFound, "Null response"));
|
||||
return;
|
||||
@@ -2553,12 +2591,24 @@ partial class EpConnection
|
||||
// move from needed to attached.
|
||||
_neededTypeDefs.Remove(id);
|
||||
_cachedTypeDefs[id] = td;
|
||||
ClearTypeDefFetchNode(id);
|
||||
|
||||
reply.Trigger(td);
|
||||
|
||||
}).Error(reply.TriggerError);
|
||||
}).Error(ex =>
|
||||
{
|
||||
_typeDefRequests.Remove(id);
|
||||
_neededTypeDefs.Remove(id);
|
||||
ClearTypeDefFetchNode(id);
|
||||
reply.TriggerError(ex);
|
||||
});
|
||||
|
||||
}).Error(reply.TriggerError);
|
||||
}).Error(ex =>
|
||||
{
|
||||
_typeDefRequests.Remove(id);
|
||||
ClearTypeDefFetchNode(id);
|
||||
reply.TriggerError(ex);
|
||||
});
|
||||
|
||||
return reply;
|
||||
|
||||
|
||||
@@ -18,6 +18,14 @@ public class FetchCycleDetectionTests
|
||||
return g;
|
||||
}
|
||||
|
||||
static Dictionary<ulong, HashSet<ulong>> Graph64(params (ulong parent, ulong[] children)[] edges)
|
||||
{
|
||||
var g = new Dictionary<ulong, HashSet<ulong>>();
|
||||
foreach (var (parent, children) in edges)
|
||||
g[parent] = new HashSet<ulong>(children);
|
||||
return g;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AppFacingFetch_NoChain_NeverCycles()
|
||||
{
|
||||
@@ -83,4 +91,11 @@ public class FetchCycleDetectionTests
|
||||
var g = Graph((2u, new uint[] { 3 }), (3u, new uint[] { 2 }));
|
||||
Assert.False(EpConnection.HasWaitForCycle(2, new uint[] { 1 }, g));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TypeDefIds_UseSameCycleDetection()
|
||||
{
|
||||
var g = Graph64((2ul, new ulong[] { 3 }), (3ul, new ulong[] { 1 }));
|
||||
Assert.True(EpConnection.HasWaitForCycle(2ul, new ulong[] { 1 }, g));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,17 @@ public class DeadlockDetectionTests
|
||||
foreach (var grp in edgeList.GroupBy(e => e.from))
|
||||
ns[grp.Key].Links = grp.Select(e => ns[e.to]).ToArray();
|
||||
});
|
||||
|
||||
if (mode == DeadlockResolutionMode.NaiveWait)
|
||||
{
|
||||
// Node.Links is self-referential at the typedef level. Warm it with the default
|
||||
// resolver so this test isolates NaiveWait behavior to the resource graph.
|
||||
var nodeTypeDef = cluster.ServerWarehouse.GetLocalTypeDefByName(typeof(Node).FullName ?? nameof(Node));
|
||||
if (nodeTypeDef == null)
|
||||
throw new InvalidOperationException("Node typedef was not registered.");
|
||||
await cluster.Connection.FetchTypeDef(nodeTypeDef.Id, null);
|
||||
}
|
||||
|
||||
cluster.Connection.DeadlockResolution = mode;
|
||||
return cluster;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user