diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index 76123f4..3ca1264 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -755,7 +755,7 @@ partial class EpConnection var resourceId = Convert.ToUInt32(args[0]); - var age = (ulong)args[1]; + var age = Convert.ToUInt64(args[1]); Instance.Warehouse.GetById(resourceId).Then((res) => { diff --git a/Tests/Serialization/Gvwie/GeneratorPattern.cs b/Tests/Serialization/Gvwie/GeneratorPattern.cs index 7134da3..bea7ad3 100644 --- a/Tests/Serialization/Gvwie/GeneratorPattern.cs +++ b/Tests/Serialization/Gvwie/GeneratorPattern.cs @@ -11,6 +11,7 @@ namespace Esiur.Tests.Gvwie Negative, Alternating, Small, + Medium, Ascending, Clustering, } diff --git a/Tests/Serialization/Gvwie/IntArrayGenerator.cs b/Tests/Serialization/Gvwie/IntArrayGenerator.cs index fbe8e9d..6dfdc93 100644 --- a/Tests/Serialization/Gvwie/IntArrayGenerator.cs +++ b/Tests/Serialization/Gvwie/IntArrayGenerator.cs @@ -24,7 +24,7 @@ public static class IntArrayGenerator /// - minGap / maxGap: approximate gap between runs (large gaps produce the jump examples) /// /// - public static void InitRng(int seed= 24241564) => rng = new Random(seed); + public static void InitRng(int seed = 24241564) => rng = new Random(seed); public static long[] GenerateRuns(int length, int minRunSize = 3, @@ -140,6 +140,17 @@ public static class IntArrayGenerator data[i] = rng.Next(0, int.MaxValue); break; + case GeneratorPattern.Medium: + for (int i = 0; i < length; i++) + data[i] = rng.Next(0, short.MaxValue); + break; + + //case GeneratorPattern.Large: + // for (int i = 0; i < length; i++) + // data[i] = rng.Next(0, int.MaxValue); + // break; + + case GeneratorPattern.Negative: for (int i = 0; i < length; i++) data[i] = -rng.Next(int.MinValue, 0); @@ -156,7 +167,8 @@ public static class IntArrayGenerator case GeneratorPattern.Small: // Focused on small magnitudes to test ZigZag fast path for (int i = 0; i < length; i++) - data[i] = rng.Next(-64, 65); + //data[i] = rng.Next(-64, 65); + data[i] = rng.Next(sbyte.MinValue, sbyte.MaxValue); break; diff --git a/Tests/Serialization/Gvwie/IntArrayRunner.cs b/Tests/Serialization/Gvwie/IntArrayRunner.cs index d608a69..4c02d27 100644 --- a/Tests/Serialization/Gvwie/IntArrayRunner.cs +++ b/Tests/Serialization/Gvwie/IntArrayRunner.cs @@ -29,7 +29,7 @@ namespace Esiur.Tests.Gvwie public void Run() { - const int TEST_ITERATIONS = 100; + const int TEST_ITERATIONS = 10; const int SAMPLE_SIZE = 100; Console.WriteLine(",Esiur,Aligned,FlatBuffer,ProtoBuffer,MessagePack,BSON,CBOR,Avro,Optimal"); @@ -41,15 +41,15 @@ namespace Esiur.Tests.Gvwie Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Clustering)), TEST_ITERATIONS) ); - Console.Write("Positive (Int32);"); + Console.Write("Large (Int32);"); PrintAverage( Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Uniform)), TEST_ITERATIONS) ); - Console.Write("Negative (Int32);"); + Console.Write("Medium (Int32);"); PrintAverage( - Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Negative)), TEST_ITERATIONS) + Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Medium)), TEST_ITERATIONS) ); Console.Write("Small (Int32);"); @@ -57,6 +57,15 @@ namespace Esiur.Tests.Gvwie Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Small)), TEST_ITERATIONS) ); + + Console.Write("Negative (Int32);"); + PrintAverage( + Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Negative)), TEST_ITERATIONS) + ); + + + + Console.Write("Alternating (Int32);"); PrintAverage( Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Alternating)), TEST_ITERATIONS) @@ -68,6 +77,10 @@ namespace Esiur.Tests.Gvwie Average(() => CompareInt(IntArrayGenerator.GenerateInt32(SAMPLE_SIZE, GeneratorPattern.Ascending)), TEST_ITERATIONS) ); + + + + Console.Write("Int64;"); PrintAverage( @@ -450,7 +463,7 @@ namespace Esiur.Tests.Gvwie public static byte[] SerializeFlatBuffers(ArrayRoot array) { - var buffer = new byte[1000000000]; + var buffer = new byte[1000000]; var len = FlatBufferSerializer.Default.Serialize(array, buffer); return buffer.Take(len).ToArray(); } diff --git a/Tests/Serialization/Gvwie/Program.cs b/Tests/Serialization/Gvwie/Program.cs index 0dc26d2..f8c38d3 100644 --- a/Tests/Serialization/Gvwie/Program.cs +++ b/Tests/Serialization/Gvwie/Program.cs @@ -23,6 +23,6 @@ MessagePack.MessagePackSerializer.DefaultOptions = MessagePackSerializerOptions. var ints = new IntArrayRunner(); IntArrayGenerator.InitRng(); ints.Run(); -IntArrayGenerator.InitRng(); -ints.RunChart(); +//IntArrayGenerator.InitRng(); +//ints.RunChart(); diff --git a/experiments/CrossLanguageRecoveryServer/CrossLanguageRecoveryServer.csproj b/experiments/CrossLanguageRecoveryServer/CrossLanguageRecoveryServer.csproj new file mode 100644 index 0000000..62bb4c5 --- /dev/null +++ b/experiments/CrossLanguageRecoveryServer/CrossLanguageRecoveryServer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net10.0 + enable + enable + + + + + + + diff --git a/experiments/CrossLanguageRecoveryServer/Program.cs b/experiments/CrossLanguageRecoveryServer/Program.cs new file mode 100644 index 0000000..ba98863 --- /dev/null +++ b/experiments/CrossLanguageRecoveryServer/Program.cs @@ -0,0 +1,108 @@ +using System.Text.Json; +using Esiur.Protocol; +using Esiur.Resource; +using Esiur.Stores; +using Esiur.Experiments.CrossLanguageRecoveryServer; + +var host = GetArg(args, "--host", "127.0.0.1"); +var port = int.Parse(GetArg(args, "--port", "10901")); +var updatePeriodMs = int.Parse(GetArg(args, "--update-period", "100")); +var outputDirectory = Path.GetFullPath(GetArg(args, "--output", Path.Combine("results", "cross-language-recovery"))); +var waitForStdin = !HasFlag(args, "--no-stdin"); + +Directory.CreateDirectory(outputDirectory); + +var shutdown = new CancellationTokenSource(); +Console.CancelKeyPress += (_, e) => +{ + e.Cancel = true; + shutdown.Cancel(); +}; + +var wh = new Warehouse(); +await wh.Put("sys", new MemoryStore()); +var epServer = new EpServer +{ + Port = (ushort)port, + AllowUnauthorizedAccess = true +}; + +if (!string.IsNullOrWhiteSpace(host) && host != "0.0.0.0") + epServer.IP = host; + +var server = await wh.Put("sys/server", epServer); + +var resource = await wh.Put("sys/recovery", new RecoveryTestResource(outputDirectory)); +await wh.Open(); +resource.SetStatus("ready"); +resource.StartPeriodicUpdates(updatePeriodMs, shutdown.Token); +resource.AppendLog("server_started"); + +var ready = new +{ + host, + port, + url = $"ep://{host}:{port}", + websocket_url = $"ws://{host}:{port}", + resource_path = "sys/recovery", + update_period_ms = updatePeriodMs, + output_directory = outputDirectory, + started_utc = DateTimeOffset.UtcNow +}; + +var readyPath = Path.Combine(outputDirectory, "server-ready.json"); +await File.WriteAllTextAsync(readyPath, JsonSerializer.Serialize(ready, new JsonSerializerOptions { WriteIndented = true })); + +Console.WriteLine($"[CrossLanguageRecoveryServer] listening ep://{host}:{port}/sys/recovery"); +Console.WriteLine($"[CrossLanguageRecoveryServer] updatePeriodMs={updatePeriodMs}"); +Console.WriteLine($"[CrossLanguageRecoveryServer] output={outputDirectory}"); +Console.WriteLine(waitForStdin + ? "[CrossLanguageRecoveryServer] Press ENTER or Ctrl+C to stop." + : "[CrossLanguageRecoveryServer] Running until the process is stopped."); + +_ = Task.Run(async () => +{ + try + { + while (!shutdown.IsCancellationRequested) + { + await Task.Delay(1000, shutdown.Token).ConfigureAwait(false); + var state = resource.CreateSnapshot(); + Console.WriteLine($"[CrossLanguageRecoveryServer] counter={state.Counter} status={state.Status} age={state.Age} clients={server.Connections.Count}"); + } + } + catch (TaskCanceledException) + { + } +}); + +if (waitForStdin) +{ + _ = Task.Run(() => + { + Console.ReadLine(); + shutdown.Cancel(); + }); +} + +try +{ + await Task.Delay(Timeout.InfiniteTimeSpan, shutdown.Token); +} +catch (TaskCanceledException) +{ +} +finally +{ + resource.AppendLog("server_stopping"); + resource.StopPeriodicUpdates(); + await wh.Close(); +} + +static string GetArg(string[] args, string key, string defaultValue) +{ + var i = Array.IndexOf(args, key); + return i >= 0 && i + 1 < args.Length ? args[i + 1] : defaultValue; +} + +static bool HasFlag(string[] args, string key) => args.Contains(key); diff --git a/experiments/CrossLanguageRecoveryServer/RecoveryTestResource.cs b/experiments/CrossLanguageRecoveryServer/RecoveryTestResource.cs new file mode 100644 index 0000000..0d08284 --- /dev/null +++ b/experiments/CrossLanguageRecoveryServer/RecoveryTestResource.cs @@ -0,0 +1,128 @@ +using System.Text.Json; +using Esiur.Resource; + +namespace Esiur.Experiments.CrossLanguageRecoveryServer; + +[Resource] +[Annotation("experiment", "Cross-Language TypeDef Discovery and Reattachment Recovery")] +public partial class RecoveryTestResource +{ + readonly object sync = new(); + readonly string logPath; + CancellationTokenSource? updatesCts; + volatile bool updatesPaused; + + [Export] + [Annotation("semantic", "monotonic counter incremented by the C# server")] + int counter; + + [Export] + [Annotation("semantic", "client-visible service status")] + string status = "starting"; + + [Export] + [Annotation("semantic", "UTC tick timestamp of the last counter update")] + long lastUpdateTicks; + + [Export] + [Annotation("semantic", "raised whenever Counter is incremented")] + public event ResourceEventHandler? CounterChanged; + + public RecoveryTestResource(string outputDirectory) + { + Directory.CreateDirectory(outputDirectory); + logPath = Path.Combine(outputDirectory, "server_log.jsonl"); + File.WriteAllText(logPath, ""); + } + + [Export] + [Annotation("semantic", "returns a + b; used to verify dynamic function invocation")] + public int Add(int a, int b) => a + b; + + [Export] + [Annotation("semantic", "sets Status and returns true")] + public bool SetStatus(string value) + { + Status = value; + return true; + } + + [Export] + [Annotation("semantic", "pauses or resumes periodic server updates")] + public bool SetUpdatesPaused(bool paused) + { + updatesPaused = paused; + return true; + } + + [Export] + [Annotation("semantic", "authoritative C# state snapshot encoded as JSON")] + public string GetAuthoritativeStateJson() => JsonSerializer.Serialize(CreateSnapshot()); + + public void StartPeriodicUpdates(int updatePeriodMs, CancellationToken shutdown) + { + updatesCts = CancellationTokenSource.CreateLinkedTokenSource(shutdown); + var token = updatesCts.Token; + _ = Task.Run(async () => + { + while (!token.IsCancellationRequested) + { + await Task.Delay(updatePeriodMs, token).ConfigureAwait(false); + if (updatesPaused) + continue; + + int value; + lock (sync) + { + Counter = Counter + 1; + LastUpdateTicks = DateTime.UtcNow.Ticks; + value = Counter; + } + + CounterChanged?.Invoke(value); + AppendLog("tick"); + } + }, token); + } + + public void StopPeriodicUpdates() + { + updatesCts?.Cancel(); + } + + public RecoveryStateSnapshot CreateSnapshot() + { + lock (sync) + { + return new RecoveryStateSnapshot + { + Counter = Counter, + Status = Status, + LastUpdateTicks = LastUpdateTicks, + Age = Instance?.Age ?? 0, + TimestampUtc = DateTimeOffset.UtcNow, + UpdatesPaused = updatesPaused + }; + } + } + + public void AppendLog(string eventName) + { + var payload = new + { + event_name = eventName, + state = CreateSnapshot() + }; + File.AppendAllText(logPath, JsonSerializer.Serialize(payload) + Environment.NewLine); + } +} + +public sealed class RecoveryStateSnapshot +{ + public int Counter { get; set; } + public string Status { get; set; } = ""; + public long LastUpdateTicks { get; set; } + public ulong Age { get; set; } + public DateTimeOffset TimestampUtc { get; set; } + public bool UpdatesPaused { get; set; } +}