diff --git a/Esiur.sln b/Esiur.sln index d7b4209..20c603f 100644 --- a/Esiur.sln +++ b/Esiur.sln @@ -95,11 +95,15 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.ConcurrentAttac EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Unit", "Tests\Unit\Esiur.Tests.Unit.csproj", "{D1B99C5A-82F7-459D-B56D-F8FD096D3854}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Distribution", "Distribution", "{1C087695-14B5-C927-8D92-12D1EE36BDAB}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deadlock", "Deadlock", "{C5FB16A3-952C-4078-A15A-3C7CE42E73B5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Server", "Tests\Distribution\Deadlock\Server\Esiur.Tests.Deadlock.Server.csproj", "{41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Client", "Client", "{22A76A25-333D-4516-8EA6-4D03E3023183}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Client", "Tests\Distribution\Deadlock\Client\Esiur.Tests.Deadlock.Client.csproj", "{8D12333C-4619-4145-A6C6-000F9EF471B8}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Client", "Tests\Distribution\Deadlock\Client\Esiur.Tests.Deadlock.Client.csproj", "{28A7F758-951B-6502-6EA4-C216BA12F77C}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Server", "Server", "{967F62B4-2815-473F-9F1E-E7F146EE8872}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Esiur.Tests.Deadlock.Server", "Tests\Distribution\Deadlock\Server\Esiur.Tests.Deadlock.Server.csproj", "{F2FE7C0B-58C1-D768-C37A-D428D2B85940}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -375,30 +379,30 @@ Global {D1B99C5A-82F7-459D-B56D-F8FD096D3854}.Release|x64.Build.0 = Release|Any CPU {D1B99C5A-82F7-459D-B56D-F8FD096D3854}.Release|x86.ActiveCfg = Release|Any CPU {D1B99C5A-82F7-459D-B56D-F8FD096D3854}.Release|x86.Build.0 = Release|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|Any CPU.Build.0 = Debug|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x64.ActiveCfg = Debug|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x64.Build.0 = Debug|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x86.ActiveCfg = Debug|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Debug|x86.Build.0 = Debug|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|Any CPU.ActiveCfg = Release|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|Any CPU.Build.0 = Release|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x64.ActiveCfg = Release|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x64.Build.0 = Release|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x86.ActiveCfg = Release|Any CPU - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF}.Release|x86.Build.0 = Release|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|Any CPU.Build.0 = Debug|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x64.ActiveCfg = Debug|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x64.Build.0 = Debug|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x86.ActiveCfg = Debug|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Debug|x86.Build.0 = Debug|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|Any CPU.ActiveCfg = Release|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|Any CPU.Build.0 = Release|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x64.ActiveCfg = Release|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x64.Build.0 = Release|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x86.ActiveCfg = Release|Any CPU - {8D12333C-4619-4145-A6C6-000F9EF471B8}.Release|x86.Build.0 = Release|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Debug|x64.ActiveCfg = Debug|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Debug|x64.Build.0 = Debug|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Debug|x86.ActiveCfg = Debug|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Debug|x86.Build.0 = Debug|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Release|Any CPU.Build.0 = Release|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Release|x64.ActiveCfg = Release|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Release|x64.Build.0 = Release|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Release|x86.ActiveCfg = Release|Any CPU + {28A7F758-951B-6502-6EA4-C216BA12F77C}.Release|x86.Build.0 = Release|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Debug|x64.ActiveCfg = Debug|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Debug|x64.Build.0 = Debug|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Debug|x86.ActiveCfg = Debug|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Debug|x86.Build.0 = Debug|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|Any CPU.Build.0 = Release|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x64.ActiveCfg = Release|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x64.Build.0 = Release|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x86.ActiveCfg = Release|Any CPU + {F2FE7C0B-58C1-D768-C37A-D428D2B85940}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -444,8 +448,11 @@ Global {E713D25F-2602-44C9-AB9E-C9477FB2BA93} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} {3FFB2BF4-159E-3073-4BDF-08AE93C7A2C1} = {E713D25F-2602-44C9-AB9E-C9477FB2BA93} {D1B99C5A-82F7-459D-B56D-F8FD096D3854} = {2769C4C3-2595-413B-B7FE-5903826770C1} - {41FD182A-2A7E-4E3A-BEDE-F55C0D9C83EF} = {1C087695-14B5-C927-8D92-12D1EE36BDAB} - {8D12333C-4619-4145-A6C6-000F9EF471B8} = {1C087695-14B5-C927-8D92-12D1EE36BDAB} + {C5FB16A3-952C-4078-A15A-3C7CE42E73B5} = {94C8CFDB-C7C6-40DF-A596-647FEEA3C917} + {22A76A25-333D-4516-8EA6-4D03E3023183} = {C5FB16A3-952C-4078-A15A-3C7CE42E73B5} + {28A7F758-951B-6502-6EA4-C216BA12F77C} = {22A76A25-333D-4516-8EA6-4D03E3023183} + {967F62B4-2815-473F-9F1E-E7F146EE8872} = {C5FB16A3-952C-4078-A15A-3C7CE42E73B5} + {F2FE7C0B-58C1-D768-C37A-D428D2B85940} = {967F62B4-2815-473F-9F1E-E7F146EE8872} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C584421D-5EC0-4821-B7D8-2633D8D405F2} diff --git a/Tests/Distribution/Deadlock/Client/Program.cs b/Tests/Distribution/Deadlock/Client/Program.cs index 812eb1f..f37c4be 100644 --- a/Tests/Distribution/Deadlock/Client/Program.cs +++ b/Tests/Distribution/Deadlock/Client/Program.cs @@ -40,9 +40,9 @@ var roots = rootsArg.Equals("all", StringComparison.OrdinalIgnoreCase) Console.WriteLine($"[Client] {host}:{port} nodes={nodeCount} mode={mode} roots={roots.Length} " + $"iterations={iterations} stallMs={stallMs} hardMs={hardMs}"); -Console.WriteLine($"[Client] {"iter",-5}{"outcome",-14}{"ms",10}{"breaks",10}{"unnec",8}{"unpublished",13}"); +Console.WriteLine($"[Client] {"iter",-5}{"outcome",-13}{"ms",10}{"attached",10}{"breaks",9}{"unnec",8}{"unpub",8}"); -var rows = new List<(int iter, string outcome, double ms, long breaks, long unnec, int unpublished)>(); +var rows = new List<(int iter, string outcome, double ms, long attached, long breaks, long unnec, int unpublished)>(); for (var it = 0; it < iterations; it++) { @@ -58,8 +58,8 @@ for (var it = 0; it < iterations; it++) var (outcome, ms, results) = await Classify(con, roots, stallMs, hardMs); var unpublished = results == null ? -1 : CountUnpublished(results); - rows.Add((it + 1, outcome, ms, con.CycleBreakCount, con.UnnecessaryPlaceholderCount, unpublished)); - Console.WriteLine($"[Client] {it + 1,-5}{outcome,-14}{ms,10:F1}{con.CycleBreakCount,10}{con.UnnecessaryPlaceholderCount,8}{unpublished,13}"); + rows.Add((it + 1, outcome, ms, con.AttachedResourceCount, con.CycleBreakCount, con.UnnecessaryPlaceholderCount, unpublished)); + Console.WriteLine($"[Client] {it + 1,-5}{outcome,-13}{ms,10:F1}{con.AttachedResourceCount,10}{con.CycleBreakCount,9}{con.UnnecessaryPlaceholderCount,8}{unpublished,8}"); try { con.Destroy(); } catch { } } @@ -73,17 +73,20 @@ Console.WriteLine(); Console.WriteLine($"[Client] === summary ({mode}) ==="); Console.WriteLine($" completed={completed.Count} deadlocked={rows.Count(r => r.outcome == "Deadlocked")} " + $"slow={rows.Count(r => r.outcome == "SlowTimeout")} faulted={rows.Count(r => r.outcome == "Faulted")}"); +Console.WriteLine($" resources attached per run (max)={rows.Max(r => r.attached)}"); Console.WriteLine($" completion ms: median={Pct(0.5):F1} p99={Pct(0.99):F1} max={(times.Count > 0 ? times[^1] : 0):F1}"); Console.WriteLine($" cycle-breaks total={rows.Sum(r => r.breaks)} unnecessary-placeholders total={rows.Sum(r => r.unnec)}"); Console.WriteLine($" partial deliveries (unpublished>0) in {rows.Count(r => r.unpublished > 0)}/{rows.Count} runs"); -var csv = "iteration,outcome,ms,cycle_breaks,unnecessary_placeholders,unpublished\n" + - string.Join("\n", rows.Select(r => $"{r.iter},{r.outcome},{r.ms:F1},{r.breaks},{r.unnec},{r.unpublished}")); +var csv = "iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished\n" + + string.Join("\n", rows.Select(r => $"{r.iter},{r.outcome},{r.ms:F1},{r.attached},{r.breaks},{r.unnec},{r.unpublished}")); var outFile = $"deadlock_{mode}_{host}_{port}.csv"; await File.WriteAllTextAsync(outFile, csv); Console.WriteLine($"[Client] results written to {outFile}"); -Console.ReadLine(); +// Keep the window open only when run interactively; scripted/redirected runs exit immediately. +if (!Console.IsInputRedirected) + Console.ReadLine(); // ---- stall-based classification --------------------------------------------------------------- @@ -131,7 +134,9 @@ static async Task<(string outcome, double ms, EpResource[]? results)> Classify( } // Counts resources reachable from the delivered roots that are not Published — i.e. handed to the -// application while their dependency graph was not fully attached. Links is property index 1. +// application while their dependency graph was not fully attached. Traverses every reference-typed +// property (Node.Links/Resources1/Resources2 and the Resource1/Resource2 cross-references) so the +// whole delivered graph is checked, not just the node links. static int CountUnpublished(EpResource[] roots) { var seen = new HashSet(); @@ -145,14 +150,26 @@ static int CountUnpublished(EpResource[] roots) if (node.Status != ResourceStatus.Published) unpublished++; - if (node.Status == ResourceStatus.Attached && node.TryGetPropertyValue((byte)1, out var linksObj) && linksObj is IEnumerable links) - foreach (var child in links) - if (child is EpResource childResource) - queue.Enqueue(childResource); + // Only attached/published resources can be safely read for further references. + if (node.Status != ResourceStatus.Attached && node.Status != ResourceStatus.Published) continue; + + var properties = node.Instance.Definition.Properties.Length; + for (byte p = 0; p < properties; p++) + if (node.TryGetPropertyValue(p, out var value)) + Flatten(value, queue); } return unpublished; } +static void Flatten(object? value, Queue queue) +{ + if (value is EpResource resource) + queue.Enqueue(resource); + else if (value is IEnumerable sequence && value is not string) + foreach (var item in sequence) + Flatten(item, queue); +} + static string GetArg(string[] args, string key, string def) { var i = Array.IndexOf(args, key); diff --git a/Tests/Distribution/Deadlock/Server/Program.cs b/Tests/Distribution/Deadlock/Server/Program.cs index 3883e93..e568638 100644 --- a/Tests/Distribution/Deadlock/Server/Program.cs +++ b/Tests/Distribution/Deadlock/Server/Program.cs @@ -25,11 +25,45 @@ var res2Count = int.Parse(GetArg(args, "--res2", "100")); var seed = int.Parse(GetArg(args, "--seed", "20260603")); var edgeProb = double.Parse(GetArg(args, "--edge-prob", "0.22")); -var edges = BuildTopology(topology, ref nodeCount, seed, edgeProb); -var (hasCycle, backEdges) = CycleCensus(nodeCount, edges); +var nodeEdges = BuildTopology(topology, ref nodeCount, seed, edgeProb); -Console.WriteLine($"[Server] topology={topology} nodes={nodeCount} edges={edges.Count} " + - $"cyclic={hasCycle} backEdges={backEdges} port={port}"); +// One RNG, seeded once, for all random assignment. (Previously a new Random(seed) was created +// inside each loop, so every node/resource pointed at the same target and the cycle structure +// collapsed; one RNG yields a genuinely random, densely cyclic resource graph.) +var rng = new Random(seed); + +// Plan the resource cross-references as indices first, so the FULL graph (nodes + Resource1 + +// Resource2 + every reference) can be censused for circular dependencies before it is wired. +var nodeRes1 = new int[nodeCount][]; +var nodeRes2 = new int[nodeCount][]; +for (var i = 0; i < nodeCount; i++) +{ + nodeRes1[i] = Sample(rng, res1Count, res1Count / 2); + nodeRes2[i] = Sample(rng, res2Count, res2Count / 2); +} + +var res1Ref1 = new int[res1Count]; +var res1Ref2 = new int[res1Count]; +for (var i = 0; i < res1Count; i++) +{ + res1Ref1[i] = res1Count > 0 ? rng.Next(res1Count) : -1; + res1Ref2[i] = res2Count > 0 ? rng.Next(res2Count) : -1; +} + +var res2Ref1 = new int[res2Count]; +var res2Ref2 = new int[res2Count]; +for (var i = 0; i < res2Count; i++) +{ + res2Ref1[i] = res1Count > 0 ? rng.Next(res1Count) : -1; + res2Ref2[i] = res2Count > 0 ? rng.Next(res2Count) : -1; +} + +var totalResources = nodeCount + res1Count + res2Count; +var (hasCycle, backEdges, totalEdges) = FullCensus( + nodeCount, res1Count, res2Count, nodeEdges, nodeRes1, nodeRes2, res1Ref1, res1Ref2, res2Ref1, res2Ref2); + +Console.WriteLine($"[Server] topology={topology} nodes={nodeCount} res1={res1Count} res2={res2Count} " + + $"totalResources={totalResources} edges={totalEdges} cyclic={hasCycle} backEdges={backEdges} port={port}"); var wh = new Warehouse(); await wh.Put("sys", new MemoryStore()); @@ -41,52 +75,28 @@ var nodes = new Node[nodeCount]; var resources1 = new Resource1[res1Count]; var resources2 = new Resource2[res2Count]; -for (var i = 0; i < nodeCount; i++) { - nodes[i] = new Node { Id = i }; - await wh.Put($"sys/n{i}", nodes[i]); -} +for (var i = 0; i < nodeCount; i++) { nodes[i] = new Node { Id = i }; await wh.Put($"sys/n{i}", nodes[i]); } +for (var i = 0; i < res1Count; i++) { resources1[i] = new Resource1(); await wh.Put($"sys/r1_{i}", resources1[i]); } +for (var i = 0; i < res2Count; i++) { resources2[i] = new Resource2(); await wh.Put($"sys/r2_{i}", resources2[i]); } +// Wire the planned references: each Node also pulls in a random subset of Resource1/Resource2, and +// the resources cross-reference one another, creating dense cycles for the fetch to resolve. +for (var i = 0; i < nodeCount; i++) +{ + nodes[i].Resources1 = nodeRes1[i].Select(k => resources1[k]).ToArray(); + nodes[i].Resources2 = nodeRes2[i].Select(k => resources2[k]).ToArray(); +} for (var i = 0; i < res1Count; i++) { - resources1[i] = new Resource1(); - await wh.Put($"sys/r1_{i}", resources1[i]); + if (res1Ref1[i] >= 0) resources1[i].res1 = resources1[res1Ref1[i]]; + if (res1Ref2[i] >= 0) resources1[i].res2 = resources2[res1Ref2[i]]; } - for (var i = 0; i < res2Count; i++) { - resources2[i] = new Resource2(); - await wh.Put($"sys/r2_{i}", resources2[i]); + if (res2Ref1[i] >= 0) resources2[i].res1 = resources1[res2Ref1[i]]; + if (res2Ref2[i] >= 0) resources2[i].res2 = resources2[res2Ref2[i]]; } - -// randomly assign some resources to each node so the fetches do some work beyond just traversing the links; this also -for(var i = 0; i < nodeCount; i++) -{ - var rng = new Random(seed); - - - nodes[i].Resources1 = rng.GetItems(resources1, res1Count / 2); - nodes[i].Resources2 = rng.GetItems(resources2, res2Count / 2); -} - -for(var i =0; i < res1Count; i++) -{ - var rng = new Random(seed); - var res1Index = rng.Next(res1Count); - var res2Index = rng.Next(res2Count); - resources1[i].res1 = resources1[res1Index]; - resources1[i].res2 = resources2[res2Index]; -} - -for (var i = 0; i < res2Count; i++) -{ - var rng = new Random(seed); - var res1Index = rng.Next(res1Count); - var res2Index = rng.Next(res2Count); - resources2[i].res1 = resources1[res1Index]; - resources2[i].res2 = resources2[res2Index]; -} - -foreach (var grp in edges.GroupBy(e => e.from)) +foreach (var grp in nodeEdges.GroupBy(e => e.from)) nodes[grp.Key].Links = grp.Select(e => nodes[e.to]).ToArray(); await wh.Open(); @@ -152,16 +162,54 @@ static List<(int from, int to)> BuildTopology(string topo, ref int n, int seed, return edges; } -// DFS three-colouring; counts back edges (cycle-closing edges, including self loops). -static (bool hasCycle, int backEdges) CycleCensus(int n, IReadOnlyList<(int from, int to)> edges) +// k indices drawn (with replacement) from [0, count); empty if count or k is 0. +static int[] Sample(Random rng, int count, int k) { - var adj = new List[n]; - for (var i = 0; i < n; i++) adj[i] = new List(); - var back = 0; - foreach (var (a, b) in edges) { if (a == b) back++; else adj[a].Add(b); } + if (count <= 0 || k <= 0) return Array.Empty(); + var result = new int[k]; + for (var i = 0; i < k; i++) result[i] = rng.Next(count); + return result; +} - var color = new byte[n]; // 0 unvisited, 1 on-stack, 2 done - for (var s = 0; s < n; s++) +// Censuses the FULL request graph — Node Links + Node->Resource1/2 + Resource1/2 cross-references — +// for circular dependencies via DFS three-colouring. Vertices: [0..nodes) nodes, then res1, then +// res2. Returns whether the graph is cyclic, the number of cycle-closing (back) edges, and the +// total edge count. +static (bool hasCycle, int backEdges, int totalEdges) FullCensus( + int nodes, int r1, int r2, + IReadOnlyList<(int from, int to)> nodeEdges, + int[][] nodeRes1, int[][] nodeRes2, + int[] res1Ref1, int[] res1Ref2, int[] res2Ref1, int[] res2Ref2) +{ + var v = nodes + r1 + r2; + int R1(int i) => nodes + i; + int R2(int i) => nodes + r1 + i; + + var adj = new List[v]; + for (var i = 0; i < v; i++) adj[i] = new List(); + var total = 0; + void Add(int a, int b) { adj[a].Add(b); total++; } + + foreach (var (a, b) in nodeEdges) Add(a, b); + for (var i = 0; i < nodes; i++) + { + foreach (var k in nodeRes1[i]) Add(i, R1(k)); + foreach (var k in nodeRes2[i]) Add(i, R2(k)); + } + for (var i = 0; i < r1; i++) + { + if (res1Ref1[i] >= 0) Add(R1(i), R1(res1Ref1[i])); + if (res1Ref2[i] >= 0) Add(R1(i), R2(res1Ref2[i])); + } + for (var i = 0; i < r2; i++) + { + if (res2Ref1[i] >= 0) Add(R2(i), R1(res2Ref1[i])); + if (res2Ref2[i] >= 0) Add(R2(i), R2(res2Ref2[i])); + } + + var back = 0; + var color = new byte[v]; // 0 unvisited, 1 on-stack, 2 done + for (var s = 0; s < v; s++) { if (color[s] != 0) continue; var stack = new Stack<(int node, int idx)>(); @@ -172,14 +220,15 @@ static (bool hasCycle, int backEdges) CycleCensus(int n, IReadOnlyList<(int from if (idx < adj[u].Count) { stack.Push((u, idx + 1)); - var v = adj[u][idx]; - if (color[v] == 1) back++; - else if (color[v] == 0) { color[v] = 1; stack.Push((v, 0)); } + var w = adj[u][idx]; + if (w == u) back++; // self-loop + else if (color[w] == 1) back++; // back edge -> cycle + else if (color[w] == 0) { color[w] = 1; stack.Push((w, 0)); } } else color[u] = 2; } } - return (back > 0, back); + return (back > 0, back, total); } static string GetArg(string[] args, string key, string def) diff --git a/Tests/Distribution/Deadlock/sweep-results.csv b/Tests/Distribution/Deadlock/sweep-results.csv new file mode 100644 index 0000000..0b4425d --- /dev/null +++ b/Tests/Distribution/Deadlock/sweep-results.csv @@ -0,0 +1,7 @@ +"nodes","res1","res2","totalResources","backEdges","attached","cycleBreaks","medianMs","completed","deadlocked" +"25","25","25","75","20","75","42","23.3","3","0" +"50","50","50","150","39","150","90","79.8","3","0" +"100","100","100","300","88","300","159","273.7","3","0" +"200","200","200","600","149","600","243","670.0","3","0" +"400","400","400","1200","329","1200","486","5649.9","3","0" +"800","800","800","2400","614","2400","975","56724.2","3","0" diff --git a/Tests/Distribution/Deadlock/sweep-results.md b/Tests/Distribution/Deadlock/sweep-results.md new file mode 100644 index 0000000..4e58782 --- /dev/null +++ b/Tests/Distribution/Deadlock/sweep-results.md @@ -0,0 +1,9 @@ +| nodes | res1 | res2 | total resources | back-edges | attached/run | cycle-breaks(3 runs) | median ms | completed | deadlocked | +|------:|-----:|-----:|----------------:|-----------:|-------------:|---------------------:|----------:|----------:|-----------:| +| 25 | 25 | 25 | 75 | 20 | 75 | 42 | 23.3 | 3 | 0 | +| 50 | 50 | 50 | 150 | 39 | 150 | 90 | 79.8 | 3 | 0 | +| 100 | 100 | 100 | 300 | 88 | 300 | 159 | 273.7 | 3 | 0 | +| 200 | 200 | 200 | 600 | 149 | 600 | 243 | 670.0 | 3 | 0 | +| 400 | 400 | 400 | 1200 | 329 | 1200 | 486 | 5649.9 | 3 | 0 | +| 800 | 800 | 800 | 2400 | 614 | 2400 | 975 | 56724.2 | 3 | 0 | + diff --git a/Tests/Distribution/Deadlock/sweep.ps1 b/Tests/Distribution/Deadlock/sweep.ps1 new file mode 100644 index 0000000..34c576c --- /dev/null +++ b/Tests/Distribution/Deadlock/sweep.ps1 @@ -0,0 +1,55 @@ +# Scalability sweep for the distributed deadlock test (loopback). +# For each size, starts a fresh server hosting a ring of `nodes` plus `res1`+`res2` densely +# cross-referencing resources, then runs the client (WaitWithCycleDetection) and records the +# graph size, back-edges, cycle-breaks, and completion time. Output: sweep-results.csv / .md +$ErrorActionPreference = "SilentlyContinue" +Set-Location $PSScriptRoot\..\..\.. + +$srv = "Tests\Distribution\Deadlock\Server\bin\Release\net10.0\Esiur.Tests.Deadlock.Server.exe" +$cli = "Tests\Distribution\Deadlock\Client\bin\Release\net10.0\Esiur.Tests.Deadlock.Client.exe" + +$sizes = @(25, 50, 100, 200, 400, 800) +$port = 11200 +$rows = @() + +foreach ($s in $sizes) { + $port++ + Get-Process -Name "Esiur.Tests.Deadlock.Server" -ErrorAction SilentlyContinue | Stop-Process -Force + $so = "Tests\Distribution\Deadlock\sweep_srv_$s.txt" + Remove-Item $so -ErrorAction SilentlyContinue + $p = Start-Process -FilePath $srv -ArgumentList "--port $port --topology ring --nodes $s --res1 $s --res2 $s" -PassThru -NoNewWindow -RedirectStandardOutput $so + + # Wait until the server reports it is listening (poll up to 90s). + $census = "" + for ($t = 0; $t -lt 180; $t++) { + Start-Sleep -Milliseconds 500 + $c = Get-Content $so -ErrorAction SilentlyContinue + if ($c -match "Listening") { $census = ($c | Select-Object -First 1); break } + } + + $backEdges = if ($census -match "backEdges=(\d+)") { $matches[1] } else { "?" } + + # Run the client; parse its summary. + $out = & $cli --host 127.0.0.1 --port $port --nodes $s --mode WaitWithCycleDetection --iterations 3 --stall-ms 30000 --hard-ms 180000 2>&1 + $total = $s * 3 + $median = ($out | Select-String "completion ms: median=([\d.]+)").Matches.Groups[1].Value + $attached = ($out | Select-String "resources attached per run \(max\)=(\d+)").Matches.Groups[1].Value + $breaks = ($out | Select-String "cycle-breaks total=(\d+)").Matches.Groups[1].Value + $completed = ($out | Select-String "completed=(\d+)").Matches.Groups[1].Value + $deadlocked = ($out | Select-String "deadlocked=(\d+)").Matches.Groups[1].Value + + $rows += [pscustomobject]@{ nodes=$s; res1=$s; res2=$s; totalResources=$total; backEdges=$backEdges; attached=$attached; cycleBreaks=$breaks; medianMs=$median; completed=$completed; deadlocked=$deadlocked } + Write-Output "size=$s total=$total back=$backEdges attached=$attached breaks=$breaks median=$median ms completed=$completed" + + Stop-Process -Id $p.Id -Force -ErrorAction SilentlyContinue + Get-Process -Name "Esiur.Tests.Deadlock.Server" -ErrorAction SilentlyContinue | Stop-Process -Force + Start-Sleep -Seconds 1 +} + +$rows | Export-Csv -Path "Tests\Distribution\Deadlock\sweep-results.csv" -NoTypeInformation +$md = "| nodes | res1 | res2 | total resources | back-edges | attached/run | cycle-breaks(3 runs) | median ms | completed | deadlocked |`n" +$md += "|------:|-----:|-----:|----------------:|-----------:|-------------:|---------------------:|----------:|----------:|-----------:|`n" +foreach ($r in $rows) { $md += "| $($r.nodes) | $($r.res1) | $($r.res2) | $($r.totalResources) | $($r.backEdges) | $($r.attached) | $($r.cycleBreaks) | $($r.medianMs) | $($r.completed) | $($r.deadlocked) |`n" } +Set-Content -Path "Tests\Distribution\Deadlock\sweep-results.md" -Value $md -Encoding utf8 +Write-Output "=== DONE ===" +Write-Output $md diff --git a/Tests/Distribution/NodeFanoutSweep/Client/ControlResource.cs b/Tests/Distribution/NodeFanoutSweep/Client/ControlResource.cs new file mode 100644 index 0000000..2ed192e --- /dev/null +++ b/Tests/Distribution/NodeFanoutSweep/Client/ControlResource.cs @@ -0,0 +1,15 @@ +using Esiur.Resource; + +/// +/// Server-side telemetry resource the sweep orchestrator attaches to (sys/control). The server +/// updates these once per second; the updates propagate to the orchestrator so it can attribute +/// fan-out saturation to the server (CPU across all cores, can exceed 100%) and confirm the +/// connected-subscriber count. Exported as fields so the runtime generates change-notifying +/// properties (CpuPercent, ConnectedClients). +/// +[Resource] +public partial class ControlResource : Resource +{ + [Export] public double cpuPercent; + [Export] public int connectedClients; +} diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj b/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj index 634bddd..44e8917 100644 --- a/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj +++ b/Tests/Distribution/NodeFanoutSweep/Client/Esiur.Tests.NodeFanoutSweep.Client.csproj @@ -8,7 +8,7 @@ - + diff --git a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs index 1bc52a6..063a36b 100644 --- a/Tests/Distribution/NodeFanoutSweep/Client/Program.cs +++ b/Tests/Distribution/NodeFanoutSweep/Client/Program.cs @@ -1 +1,77 @@ -Console.WriteLine("Hello, World!"); +// ============================================================ +// Scalability Extension: Fan-Out — SERVER NODE +// Hosts M sensor resources and emits Value updates at a fixed interval (the fan-out source). Also +// hosts sys/control, updated once per second with the server process CPU (% across all cores) and +// the live subscriber count, which the sweep orchestrator reads to characterise saturation. +// Anonymous (None-mode) access so subscribers connect without credentials. +// +// Usage: dotnet run -- --port 10900 --resources 100 --emit-interval-ms 50 +// (Run the orchestrator from the sibling "Server" project against this host:port.) +// ============================================================ + +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Stores; +using System.Diagnostics; + +var port = int.Parse(GetArg(args, "--port", "10900")); +var resources = int.Parse(GetArg(args, "--resources", "100")); +var emitIntervalMs = int.Parse(GetArg(args, "--emit-interval-ms", "50")); + +Console.WriteLine($"[Server] resources={resources} emit-interval={emitIntervalMs}ms port={port} cores={Environment.ProcessorCount}"); + +var wh = new Warehouse(); +await wh.Put("sys", new MemoryStore()); +var server = await wh.Put("sys/server", new EpServer { Port = (ushort)port, AllowUnauthorizedAccess = true }); + +var sensors = new SensorResource[resources]; +for (var i = 0; i < resources; i++) { sensors[i] = new SensorResource { SensorId = i }; await wh.Put($"sys/sensor_{i}", sensors[i]); } + +var control = new ControlResource(); +await wh.Put("sys/control", control); + +await wh.Open(); +Console.WriteLine($"[Server] Listening on port {port} with {resources} sensors + sys/control. Press Ctrl+C to stop."); + +// Emit loop: drives property-change notifications to every attached subscriber. +var sw = Stopwatch.StartNew(); +_ = Task.Run(async () => +{ + while (true) + { + await Task.Delay(emitIntervalMs); + var value = sw.Elapsed.TotalSeconds; + foreach (var s in sensors) s.Value = value; + } +}); + +// Telemetry loop: publish server CPU (% across all cores) and subscriber count once per second. +_ = Task.Run(async () => +{ + var proc = Process.GetCurrentProcess(); + var prevCpu = proc.TotalProcessorTime; + var prevWall = DateTime.UtcNow; + while (true) + { + await Task.Delay(1000); + proc.Refresh(); + var nowCpu = proc.TotalProcessorTime; + var nowWall = DateTime.UtcNow; + var wallMs = (nowWall - prevWall).TotalMilliseconds; + control.CpuPercent = wallMs > 0 ? (nowCpu - prevCpu).TotalMilliseconds / wallMs * 100.0 : 0; + control.ConnectedClients = server.Connections.Count; + prevCpu = nowCpu; + prevWall = nowWall; + } +}); + +var stop = new TaskCompletionSource(); +Console.CancelKeyPress += (_, e) => { e.Cancel = true; stop.TrySetResult(); }; +await stop.Task; +await wh.Close(); + +static string GetArg(string[] args, string key, string def) +{ + var i = Array.IndexOf(args, key); + return (i >= 0 && i + 1 < args.Length) ? args[i + 1] : def; +} diff --git a/Tests/Distribution/NodeFanoutSweep/Client/SensorResource.cs b/Tests/Distribution/NodeFanoutSweep/Client/SensorResource.cs new file mode 100644 index 0000000..8940580 --- /dev/null +++ b/Tests/Distribution/NodeFanoutSweep/Client/SensorResource.cs @@ -0,0 +1,14 @@ +using Esiur.Resource; + +/// +/// Observable sensor resource. Setting value raises a property-change notification that the +/// Esiur runtime propagates to every attached subscriber — the fan-out path under measurement. +/// (The generated property is named Value; subscribers filter on that name.) +/// +[Resource] +public partial class SensorResource : Resource +{ + public int SensorId { get; set; } + + [Export] public double value; +} diff --git a/Tests/Distribution/NodeFanoutSweep/Server/Program.cs b/Tests/Distribution/NodeFanoutSweep/Server/Program.cs index 3cf6ff2..b03d372 100644 --- a/Tests/Distribution/NodeFanoutSweep/Server/Program.cs +++ b/Tests/Distribution/NodeFanoutSweep/Server/Program.cs @@ -67,11 +67,17 @@ Console.WriteLine($"[Orchestrator] N values: {string.Join(",", nValues)}"); // Attach to the server's control resource once. // ---------------------------------------------------------------- var controlWh = new Warehouse(); -dynamic? control = null; +EpResource? control = null; +byte cpuIdx = 255, clientsIdx = 255; try { var controlConn = await controlWh.Get($"ep://{host}:{port}"); - control = await controlConn.Get("sys/control"); + control = (EpResource)await controlConn.Get("sys/control"); + // Resolve property indices by name (EpResource exposes values by index, not dynamic member). + var props = control.Instance.Definition.Properties; + cpuIdx = (byte)Array.FindIndex(props, p => p.Name == "CpuPercent"); + clientsIdx = (byte)Array.FindIndex(props, p => p.Name == "ConnectedClients"); + Console.WriteLine($"[Orchestrator] sys/control attached (CpuPercent=idx {cpuIdx}, ConnectedClients=idx {clientsIdx})."); } catch (Exception ex) { @@ -141,19 +147,38 @@ foreach (int n in nValues) Console.WriteLine($"[Orchestrator] Measurement window {windowSec}s..."); var cpuSamples = new List(); var connSamples = new List(); + var clientCpuSamples = new List(); + var clientProc = Process.GetCurrentProcess(); + var prevClientCpu = clientProc.TotalProcessorTime; + var prevClientWall = DateTime.UtcNow; var winSw = Stopwatch.StartNew(); while (winSw.Elapsed.TotalSeconds < windowSec) { await Task.Delay(1000); - if (control != null) + + // Server CPU + subscriber count via the control resource (read by property index; + // values arrive as variable-width numerics, hence Convert.*). + if (control != null && cpuIdx != 255) { try { - cpuSamples.Add((double)control.CpuPercent); - connSamples.Add((int)control.ConnectedClients); + if (control.TryGetPropertyValue(cpuIdx, out var cpuVal) && cpuVal != null) + cpuSamples.Add(Convert.ToDouble(cpuVal)); + if (control.TryGetPropertyValue(clientsIdx, out var cliVal) && cliVal != null) + connSamples.Add(Convert.ToInt32(cliVal)); } - catch { /* control resource may not have current value yet */ } + catch { /* control resource may not have a current value yet */ } } + + // This harness's own CPU (% across all cores). Recorded so saturation can be attributed + // to the server rather than to the single subscriber process driving N connections. + clientProc.Refresh(); + var nowClientCpu = clientProc.TotalProcessorTime; + var nowClientWall = DateTime.UtcNow; + var wallMs = (nowClientWall - prevClientWall).TotalMilliseconds; + if (wallMs > 0) clientCpuSamples.Add((nowClientCpu - prevClientCpu).TotalMilliseconds / wallMs * 100.0); + prevClientCpu = nowClientCpu; + prevClientWall = nowClientWall; } double elapsedSec = winSw.Elapsed.TotalSeconds; @@ -176,12 +201,15 @@ foreach (int n in nValues) double aggregate = perSubRates.Sum(); double avgServerCpu = cpuSamples.Count > 0 ? cpuSamples.Average() : double.NaN; double peakServerCpu = cpuSamples.Count > 0 ? cpuSamples.Max() : double.NaN; + double avgClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Average() : double.NaN; + double peakClientCpu = clientCpuSamples.Count > 0 ? clientCpuSamples.Max() : double.NaN; Console.WriteLine($"[Orchestrator] N={n} rep={rep + 1}: " + $"mean_per_sub={meanPerSub:F1}/s " + $"aggregate={aggregate:F0}/s " + $"late={totalLate} " - + $"server_cpu_avg={avgServerCpu:F1}% peak={peakServerCpu:F1}%"); + + $"server_cpu_avg={avgServerCpu:F1}%/peak={peakServerCpu:F1}% " + + $"client_cpu_avg={avgClientCpu:F1}%/peak={peakClientCpu:F1}%"); perRepResults.Add(new RepResult { @@ -195,6 +223,8 @@ foreach (int n in nValues) LateDeliveries = totalLate, ServerCpuAvg = avgServerCpu, ServerCpuPeak = peakServerCpu, + ClientCpuAvg = avgClientCpu, + ClientCpuPeak = peakClientCpu, }); // ---------- teardown ---------- @@ -237,6 +267,8 @@ foreach (int n in nValues) TotalLate = perRepResults.Sum(r => r.LateDeliveries), MeanServerCpuAvg = perRepResults.Average(r => r.ServerCpuAvg), MeanServerCpuPeak = perRepResults.Average(r => r.ServerCpuPeak), + MeanClientCpuAvg = perRepResults.Average(r => r.ClientCpuAvg), + MeanClientCpuPeak = perRepResults.Average(r => r.ClientCpuPeak), }); } } @@ -246,12 +278,13 @@ foreach (int n in nValues) // ---------------------------------------------------------------- var sb = new System.Text.StringBuilder(); sb.AppendLine("n,replications,mean_per_sub_rate,ci95_halfwidth,mean_aggregate," + - "total_late,mean_server_cpu_avg,mean_server_cpu_peak"); + "total_late,mean_server_cpu_avg,mean_server_cpu_peak,mean_client_cpu_avg,mean_client_cpu_peak"); foreach (var r in allResults) { sb.AppendLine(string.Create(CultureInfo.InvariantCulture, $"{r.N},{r.Replications},{r.MeanPerSubRate:F2},{r.Ci95HalfWidth:F2}," + - $"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}")); + $"{r.MeanAggregate:F1},{r.TotalLate},{r.MeanServerCpuAvg:F2},{r.MeanServerCpuPeak:F2}," + + $"{r.MeanClientCpuAvg:F2},{r.MeanClientCpuPeak:F2}")); } await File.WriteAllTextAsync(outputCsv, sb.ToString()); Console.WriteLine($"\n[Orchestrator] Results written to {outputCsv}"); @@ -375,6 +408,8 @@ record RepResult public long LateDeliveries; public double ServerCpuAvg; public double ServerCpuPeak; + public double ClientCpuAvg; + public double ClientCpuPeak; } record SweepResult @@ -387,4 +422,6 @@ record SweepResult public long TotalLate; public double MeanServerCpuAvg; public double MeanServerCpuPeak; + public double MeanClientCpuAvg; + public double MeanClientCpuPeak; } \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11001.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11001.csv new file mode 100644 index 0000000..490e4da --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11001.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,394.7,300,53,0,0 +2,Completed,508.0,300,53,0,0 +3,Completed,335.2,300,53,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11201.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11201.csv new file mode 100644 index 0000000..b1a836f --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11201.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,71.4,75,14,0,0 +2,Completed,16.7,75,14,0,0 +3,Completed,23.3,75,14,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11202.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11202.csv new file mode 100644 index 0000000..6904ab4 --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11202.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,115.2,150,30,0,0 +2,Completed,74.1,150,30,0,0 +3,Completed,79.8,150,30,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11203.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11203.csv new file mode 100644 index 0000000..7a107e6 --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11203.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,344.4,300,53,0,0 +2,Completed,254.2,300,53,0,0 +3,Completed,273.7,300,53,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11204.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11204.csv new file mode 100644 index 0000000..1d089bf --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11204.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,975.6,600,81,0,0 +2,Completed,623.9,600,81,0,0 +3,Completed,670.0,600,81,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11205.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11205.csv new file mode 100644 index 0000000..2bbacc2 --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11205.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,5956.2,1200,162,0,0 +2,Completed,5649.9,1200,162,0,0 +3,Completed,4763.6,1200,162,0,0 \ No newline at end of file diff --git a/deadlock_WaitWithCycleDetection_127.0.0.1_11206.csv b/deadlock_WaitWithCycleDetection_127.0.0.1_11206.csv new file mode 100644 index 0000000..e3ea074 --- /dev/null +++ b/deadlock_WaitWithCycleDetection_127.0.0.1_11206.csv @@ -0,0 +1,4 @@ +iteration,outcome,ms,attached,cycle_breaks,unnecessary_placeholders,unpublished +1,Completed,56724.2,2400,325,0,0 +2,Completed,64782.9,2400,325,0,0 +3,Completed,50450.8,2400,325,0,0 \ No newline at end of file