diff --git a/Libraries/Esiur/Data/Tru.cs b/Libraries/Esiur/Data/Tru.cs index e253705..2d397f6 100644 --- a/Libraries/Esiur/Data/Tru.cs +++ b/Libraries/Esiur/Data/Tru.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Data; using System.Dynamic; using System.Linq; +using System.Reflection; using System.Text; #nullable enable @@ -368,13 +369,21 @@ namespace Esiur.Data || type.IsEnum) { - var typeDef = warehouse.GetLocalTypeDefByType(type); + var remoteAttr = type.GetCustomAttribute(); - if (typeDef == null) - throw new Exception("Unregistered type: " + type.FullName + "."); + if (remoteAttr != null) + { + var typeDef = warehouse.GetRemoteTypeDefByName(remoteAttr.Domains, remoteAttr.FullName); + } + else + { + var typeDef = warehouse.GetLocalTypeDefByType(type); - return new TruTypeDef(nullable, typeDef); + if (typeDef == null) + throw new Exception("Unregistered type: " + type.FullName + "."); + return new TruTypeDef(nullable, typeDef); + } } else if (type.IsGenericType) { @@ -392,10 +401,10 @@ namespace Esiur.Data else { var subType = FromType(args[0], warehouse); - + if (subType == null) // unrecongnized type throw new Exception("Unrecognized type: " + args[0].FullName); - + return new TruComposite(TruIdentifier.TypedList, nullable, new Tru[] { subType }, type); @@ -867,9 +876,9 @@ namespace Esiur.Data { var td = connection.Instance.Warehouse.GetLocalTypeDefById(typeDefId); - if ( td == null) + if (td == null) throw new Exception("TypeDef not found."); - + return new ParseResult( new TruTypeDef(nullable, td), offset - oOffset); @@ -900,7 +909,7 @@ namespace Esiur.Data if (runtimeType != null && runtimeType.IsValueType && nullable) { - //if (runtimeType.IsValueType)// && Nullable.GetUnderlyingType(runtimeType) == null) + //if (runtimeType.IsValueType)// && Nullable.GetUnderlyingType(runtimeType) == null) runtimeType = typeof(Nullable<>).MakeGenericType(runtimeType); } @@ -911,7 +920,7 @@ namespace Esiur.Data } else { - var runtimeType = nullable ? Tru.NullableTypesMapping[identifier] + var runtimeType = nullable ? Tru.NullableTypesMapping[identifier] : Tru.TypesMapping[identifier]; return new ParseResult( new TruPrimitive(identifier, nullable, runtimeType), diff --git a/Libraries/Esiur/Data/Types/LocalTypeDef.cs b/Libraries/Esiur/Data/Types/LocalTypeDef.cs index 56a59e5..b65dace 100644 --- a/Libraries/Esiur/Data/Types/LocalTypeDef.cs +++ b/Libraries/Esiur/Data/Types/LocalTypeDef.cs @@ -264,7 +264,8 @@ public class LocalTypeDef:TypeDef else throw new Exception("Type must implement IResource, IRecord or inherit from DistributedResource."); - //IsWrapper = Codec.InheritsClass(type, typeof(EpResource)); + if (type.GetCustomAttribute() != null) + throw new Exception("Remote types are not supported as local type definitions."); type = ResourceProxy.GetBaseType(type); diff --git a/Libraries/Esiur/Net/Packets/EpPacketRequest.cs b/Libraries/Esiur/Net/Packets/EpPacketRequest.cs index 72a0b88..516d335 100644 --- a/Libraries/Esiur/Net/Packets/EpPacketRequest.cs +++ b/Libraries/Esiur/Net/Packets/EpPacketRequest.cs @@ -13,7 +13,7 @@ namespace Esiur.Net.Packets Unsubscribe = 0x3, // Request Inquire - TypeDefByName = 0x8, + TypeDefIdsByNames = 0x8, TypeDefById = 0x9, TypeDefByResourceId = 0xA, Query = 0xB, diff --git a/Libraries/Esiur/Protocol/EpConnection.cs b/Libraries/Esiur/Protocol/EpConnection.cs index 9dfbcbe..5168e55 100644 --- a/Libraries/Esiur/Protocol/EpConnection.cs +++ b/Libraries/Esiur/Protocol/EpConnection.cs @@ -532,8 +532,8 @@ public partial class EpConnection : NetworkConnection, IStore EpRequestUnsubscribe(_packet.CallbackId, dt); break; // Inquire - case EpPacketRequest.TypeDefByName: - EpRequestTypeDefByName(_packet.CallbackId, dt); + case EpPacketRequest.TypeDefIdsByNames: + EpRequestTypeDefIdsByNames(_packet.CallbackId, dt); break; case EpPacketRequest.TypeDefById: EpRequestTypeDefById(_packet.CallbackId, dt); @@ -736,7 +736,7 @@ public partial class EpConnection : NetworkConnection, IStore } SendAuthHeaders(EpAuthPacketMethod.SessionEstablished, localHeaders); - + _session.Authenticated = true; _session.LocalIdentity = null; _session.RemoteIdentity = null; @@ -1031,7 +1031,7 @@ public partial class EpConnection : NetworkConnection, IStore - void AuthenticatonCompleted() + async Task AuthenticatonCompleted() { if (this.Instance == null) @@ -1062,12 +1062,50 @@ public partial class EpConnection : NetworkConnection, IStore { _authenticated = true; Status = EpConnectionStatus.Connected; - _openReply?.Trigger(true); - _openReply = null; - OnReady?.Invoke(this); _session.AuthenticationHandler?.Provider?.Login(_session); - //Server?.Membership?.Login(_session); + + + OnReady?.Invoke(this); + + var proxyTypes = Instance.Warehouse.GetProxyTypesByDomain(_remoteDomain); + + var typeDefNames = new List(); + + foreach (var kk in proxyTypes) + { + foreach (var kv in kk.Value) + { + typeDefNames.Add(kv.Key); + } + } + if (typeDefNames.Count > 0) + { + GetTypeDefIds(typeDefNames.ToArray()).Then(ids => + { + var bag = new AsyncBag(); + foreach (var id in ids) + bag.Add(FetchTypeDef(id, null)); + + bag.Seal(); + + bag.Then((o) => + { + _openReply?.Trigger(true); + _openReply = null; + }); + }).Error(ex => + { + _openReply.TriggerError(ex); + // do nothing, proxies won't work but connection is established + }); + } + else + { + _openReply?.Trigger(true); + _openReply = null; + } + } } //private void ProcessClientAuth(byte[] data) diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index 5062817..a9feb9a 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -1011,17 +1011,25 @@ partial class EpConnection Instance.Warehouse.Query(resourceLink).Then(queryCallback); } - void EpRequestTypeDefByName(uint callback, PlainTdu tdu) + void EpRequestTypeDefIdsByNames(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); - var className = (string)value; + var classNames = (string[])value; - var typeDef = Instance.Warehouse.GetRemoteTypeDefByName(_remoteDomain, className); + var typeDefs = new List(); - if (typeDef != null) + foreach (var className in classNames) { - SendReply(EpPacketReply.Completed, callback, typeDef.Compose(this)); + //@TODO: need to search in remoteTypeDefs as well + var typeDef = Instance.Warehouse.GetLocalTypeDefByName(className); + if (typeDef != null) + typeDefs.Add(typeDef.Id); + } + + if (typeDefs.Count > 0) + { + SendReply(EpPacketReply.Completed, callback, typeDefs.ToArray()); } else { @@ -2567,6 +2575,18 @@ partial class EpConnection return reply; } + public AsyncReply GetTypeDefIds(string[] fullNames) + { + var reply = new AsyncReply(); + + SendRequest(EpPacketRequest.TypeDefIdsByNames, fullNames) + .Then(result => + { + reply.Trigger((ulong[])result); + }).Error(ex => reply.TriggerError(ex)); + + return reply; + } /// /// Create a new resource. diff --git a/Libraries/Esiur/Resource/Warehouse.cs b/Libraries/Esiur/Resource/Warehouse.cs index dc1768f..a1c9546 100644 --- a/Libraries/Esiur/Resource/Warehouse.cs +++ b/Libraries/Esiur/Resource/Warehouse.cs @@ -713,6 +713,11 @@ public class Warehouse } } + internal KeyList> GetProxyTypesByDomain(string domain) + { + return _proxyTypeDefs[domain]; + } + public bool TryRegisterRemoteTypeDef(string domain, RemoteTypeDef typeDef) { lock (_typeDefsLock) @@ -843,6 +848,14 @@ public class Warehouse return _remoteTypeDefs[domain].Values.FirstOrDefault(x => x.Name == typeName); } + public TypeDef GetProxyTypeDef(string domain, string typeName, TypeDefKind? typeDefKind = null) + { + if (!string.IsNullOrEmpty(domain) || !_remoteTypeDefs.ContainsKey(domain)) + return null; + sdcsdc + return _remoteTypeDefs[domain].Values.FirstOrDefault(x => x.Name == typeName); + } + //public TypeDef GetRemoteTypeDefByType(Type type) //{ // var remoteAttr = type.GetCustomAttribute(); diff --git a/Tests/RPC/Client/DocGenerator.cs b/Tests/RPC/Client/DocGenerator.cs index 7ffe294..4abb7fe 100644 --- a/Tests/RPC/Client/DocGenerator.cs +++ b/Tests/RPC/Client/DocGenerator.cs @@ -12,6 +12,8 @@ namespace Esiur.Tests.RPC.Client; public static class DocGenerator { + private static readonly DateTime BaseUtc = new(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc); + public sealed class GenOptions { public int Lines { get; init; } = 20; // items count @@ -35,12 +37,12 @@ public static class DocGenerator var seller = MakeParty(rng, opt.IncludeV2Fields, isSeller: true, opt.IncludeUnicode); var buyer = MakeParty(rng, opt.IncludeV2Fields, isSeller: false, opt.IncludeUnicode); - var createdAt = DateTime.UtcNow.AddMinutes(-rng.Next(0, 60 * 24)); + var createdAt = BaseUtc.AddMinutes(rng.Next(0, 60 * 24 * 365)); var doc = new BusinessDocument { Header = new DocumentHeader { - DocId = Guid.NewGuid().ToByteArray(), + DocId = RandomBytes(rng, 16), Type = (DocType)rng.Next(0, 4), Version = 1, CreatedAt = createdAt, @@ -102,7 +104,7 @@ public static class DocGenerator var rng = new Random(seed); var v2 = DeepClone(v1); - v2.Header.UpdatedAt = DateTime.UtcNow; + v2.Header.UpdatedAt = BaseUtc.AddMinutes(seed); var toChange = Math.Max(1, (int)Math.Round(v2.Items.Length * changeRatio)); // change random lines @@ -113,7 +115,7 @@ public static class DocGenerator li.Qty = RoundQty(li.Qty + (double)(rng.NextDouble() * 2.0 - 1.0)); // ±1 li.UnitPrice = RoundMoney(li.UnitPrice * (double)(0.95 + rng.NextDouble() * 0.1)); // ±5% if (li.Ext == null) li.Ext = new Map(); - li.Ext["lastEdit"] = VDate(DateTime.UtcNow); + li.Ext["lastEdit"] = VDate(BaseUtc.AddMinutes(seed + i)); } @@ -201,7 +203,7 @@ public static class DocGenerator { 0 => VStr(rng.Next(0, 3) switch { 0 => "red", 1 => "blue", _ => "green" }), 1 => VStr(rng.Next(0, 3) switch { 0 => "S", 1 => "M", _ => "L" }), - 2 => VGuid(Guid.NewGuid()), + 2 => VGuid(RandomGuid(rng)), _ => VInt(rng.Next(0, 1000)) }); } @@ -216,7 +218,7 @@ public static class DocGenerator Method = (PaymentMethod)rng.Next(0, 5), Amount = RoundMoney(amount), Reference = "REF-" + rng.Next(100_000, 999_999), - Timestamp = DateTime.UtcNow.AddMinutes(-rng.Next(0, 60 * 24)), + Timestamp = BaseUtc.AddMinutes(rng.Next(0, 60 * 24 * 365)), Fee = includeV2 && rng.Next(0, 2) == 0 ? RoundMoney((double)rng.NextDouble() * 2.0) : null, //CurrencyOverride = includeV2 && rng.Next(0, 2) == 0 ? Currency.IQD : Currency.USD }; @@ -254,6 +256,14 @@ public static class DocGenerator // -------------------------- Utils -------------------------- private static double RoundMoney(double v) => Math.Round(v, 2, MidpointRounding.AwayFromZero); private static double RoundQty(double v) => Math.Round(v, 3, MidpointRounding.AwayFromZero); + private static byte[] RandomBytes(Random rng, int bytes) + { + var data = new byte[bytes]; + rng.NextBytes(data); + return data; + } + + private static Guid RandomGuid(Random rng) => new(RandomBytes(rng, 16)); /// /// Simple deep clone via manual copy to stay serializer-agnostic. @@ -371,7 +381,7 @@ public static class DocGenerator IncludeV2Fields = (i % 2 == 0), IncludeUnicode = true, RiskScores = 100, - Seed = 1000 + i + Seed = seed + i }); items.Add(doc); @@ -394,7 +404,7 @@ public static class DocGenerator IncludeV2Fields = (i % 3 == 0), IncludeUnicode = true, RiskScores = 1000, - Seed = 2000 + i + Seed = seed + 10_000 + i }); items.Add(doc); @@ -418,7 +428,7 @@ public static class DocGenerator IncludeV2Fields = (i % 2 == 1), IncludeUnicode = true, RiskScores = 3000, - Seed = 3000 + i + Seed = seed + 20_000 + i }); items.Add(doc); diff --git a/Tests/RPC/Client/EsiurTest.cs b/Tests/RPC/Client/EsiurTest.cs index e5f1da4..e2240a7 100644 --- a/Tests/RPC/Client/EsiurTest.cs +++ b/Tests/RPC/Client/EsiurTest.cs @@ -10,13 +10,16 @@ namespace Esiur.Tests.RPC.Client public static async Task DoTest(string address, Dictionary docsWorkloads, Dictionary dataWorkloads, - Dictionary intWorkloads ) + Dictionary intWorkloads, + int warmupDelayMs = 3000, + int postHandshakeDelayMs = 2000, + int sampleDelayMs = 3000) { var rt = new TestResults(); using var mon = new PerProcessNetMonitor(Process.GetCurrentProcess().Id); - mon.Start(); + //mon.Start(); Console.WriteLine($"\n== Esiur @ {address} =="); @@ -28,13 +31,13 @@ namespace Esiur.Tests.RPC.Client var sock = service.ResourceConnection.Socket as TcpSocket; - Thread.Sleep(3000); + Thread.Sleep(warmupDelayMs); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); Console.WriteLine($"Handshake {ctx}/{crx}"); - await Task.Delay(2000); + await Task.Delay(postHandshakeDelayMs); foreach (var w in docsWorkloads) { @@ -47,7 +50,7 @@ namespace Esiur.Tests.RPC.Client throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -67,7 +70,7 @@ namespace Esiur.Tests.RPC.Client throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -87,7 +90,7 @@ namespace Esiur.Tests.RPC.Client throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -97,7 +100,7 @@ namespace Esiur.Tests.RPC.Client } - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx) = mon.GetTotals(); Console.WriteLine($"Transfer {tx}/{rx}"); diff --git a/Tests/RPC/Client/ExperimentResultWriter.cs b/Tests/RPC/Client/ExperimentResultWriter.cs new file mode 100644 index 0000000..a837043 --- /dev/null +++ b/Tests/RPC/Client/ExperimentResultWriter.cs @@ -0,0 +1,250 @@ +using System.Globalization; +using System.Runtime.InteropServices; +using System.Text; + +namespace Esiur.Tests.RPC.Client; + +public sealed record ExperimentRunSettings( + int Rounds, + int BaseSeed, + int SerializationIterations, + int WarmupDelayMs, + int PostHandshakeDelayMs, + int SampleDelayMs, + int ProtocolTimeoutMs, + bool RanRpc, + bool RanSerialization); + +public sealed record TransferSample( + int Round, + string Protocol, + string Category, + string Workload, + long TxBytes, + long RxBytes); + +public static class ExperimentResultWriter +{ + public static string WriteAll( + string outputDirectory, + ExperimentRunSettings settings, + IReadOnlyDictionary> transferResults, + IReadOnlyList serializationSamples) + { + Directory.CreateDirectory(outputDirectory); + + var transferSamples = FlattenTransferResults(transferResults).ToList(); + var transferSummary = SummarizeTransfer(transferSamples).ToList(); + var serializationSummary = SummarizeSerialization(serializationSamples).ToList(); + + WriteTransferDetail(Path.Combine(outputDirectory, "transfer-detail.csv"), transferSamples); + WriteTransferSummary(Path.Combine(outputDirectory, "transfer-summary.csv"), transferSummary); + WriteSerializationDetail(Path.Combine(outputDirectory, "serialization-detail.csv"), serializationSamples); + WriteSerializationSummary(Path.Combine(outputDirectory, "serialization-summary.csv"), serializationSummary); + + var reportPath = Path.Combine(outputDirectory, "report.md"); + WriteReport(reportPath, settings, transferSummary, serializationSummary); + return reportPath; + } + + private static IEnumerable FlattenTransferResults(IReadOnlyDictionary> results) + { + foreach (var protocol in results.OrderBy(x => x.Key)) + { + for (var round = 0; round < protocol.Value.Count; round++) + { + foreach (var sample in FlattenCategory(protocol.Value[round].Docs, round + 1, protocol.Key, "Docs")) + yield return sample; + foreach (var sample in FlattenCategory(protocol.Value[round].Bytes, round + 1, protocol.Key, "Bytes")) + yield return sample; + foreach (var sample in FlattenCategory(protocol.Value[round].Ints, round + 1, protocol.Key, "Ints")) + yield return sample; + } + } + } + + private static IEnumerable FlattenCategory( + Dictionary category, + int round, + string protocol, + string categoryName) + { + foreach (var sample in category.OrderBy(x => x.Key)) + yield return new TransferSample(round, protocol, categoryName, sample.Key, sample.Value.txBytes, sample.Value.rxBytes); + } + + private static IEnumerable<(string Protocol, string Category, string Workload, int Count, NumberStats Tx, NumberStats Rx)> SummarizeTransfer( + IReadOnlyList samples) + { + return samples + .GroupBy(x => new { x.Protocol, x.Category, x.Workload }) + .OrderBy(x => x.Key.Protocol) + .ThenBy(x => x.Key.Category) + .ThenBy(x => x.Key.Workload) + .Select(x => ( + x.Key.Protocol, + x.Key.Category, + x.Key.Workload, + x.Count(), + NumberStats.From(x.Select(v => (double)v.TxBytes)), + NumberStats.From(x.Select(v => (double)v.RxBytes)))); + } + + private static IEnumerable<(string Protocol, string Category, string Workload, int Count, NumberStats Payload, NumberStats Serialize, NumberStats Deserialize)> SummarizeSerialization( + IReadOnlyList samples) + { + return samples + .GroupBy(x => new { x.Protocol, x.Category, x.Workload }) + .OrderBy(x => x.Key.Protocol) + .ThenBy(x => x.Key.Category) + .ThenBy(x => x.Key.Workload) + .Select(x => ( + x.Key.Protocol, + x.Key.Category, + x.Key.Workload, + x.Count(), + NumberStats.From(x.Select(v => (double)v.PayloadBytes)), + NumberStats.From(x.Select(v => v.SerializeMs)), + NumberStats.From(x.Select(v => v.DeserializeMs)))); + } + + private static void WriteTransferDetail(string path, IReadOnlyList samples) + { + var csv = new StringBuilder(); + csv.AppendLine("round,protocol,category,workload,tx_bytes,rx_bytes"); + foreach (var x in samples) + csv.AppendLine(string.Join(",", x.Round, Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), x.TxBytes, x.RxBytes)); + File.WriteAllText(path, csv.ToString()); + } + + private static void WriteTransferSummary( + string path, + IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Tx, NumberStats Rx)> rows) + { + var csv = new StringBuilder(); + csv.AppendLine("protocol,category,workload,samples,tx_avg_bytes,tx_min_bytes,tx_max_bytes,tx_median_bytes,rx_avg_bytes,rx_min_bytes,rx_max_bytes,rx_median_bytes"); + foreach (var x in rows) + { + csv.AppendLine(string.Join(",", + Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), x.Count, + D(x.Tx.Average), D(x.Tx.Minimum), D(x.Tx.Maximum), D(x.Tx.Median), + D(x.Rx.Average), D(x.Rx.Minimum), D(x.Rx.Maximum), D(x.Rx.Median))); + } + File.WriteAllText(path, csv.ToString()); + } + + private static void WriteSerializationDetail(string path, IReadOnlyList samples) + { + var csv = new StringBuilder(); + csv.AppendLine("round,seed,protocol,category,workload,payload_bytes,serialize_ms,deserialize_ms"); + foreach (var x in samples) + { + csv.AppendLine(string.Join(",", + x.Round, x.Seed, Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), + x.PayloadBytes, D(x.SerializeMs), D(x.DeserializeMs))); + } + File.WriteAllText(path, csv.ToString()); + } + + private static void WriteSerializationSummary( + string path, + IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Payload, NumberStats Serialize, NumberStats Deserialize)> rows) + { + var csv = new StringBuilder(); + csv.AppendLine("protocol,category,workload,samples,payload_avg_bytes,payload_min_bytes,payload_max_bytes,payload_median_bytes,serialize_avg_ms,serialize_min_ms,serialize_max_ms,serialize_median_ms,deserialize_avg_ms,deserialize_min_ms,deserialize_max_ms,deserialize_median_ms"); + foreach (var x in rows) + { + csv.AppendLine(string.Join(",", + Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), x.Count, + D(x.Payload.Average), D(x.Payload.Minimum), D(x.Payload.Maximum), D(x.Payload.Median), + D(x.Serialize.Average), D(x.Serialize.Minimum), D(x.Serialize.Maximum), D(x.Serialize.Median), + D(x.Deserialize.Average), D(x.Deserialize.Minimum), D(x.Deserialize.Maximum), D(x.Deserialize.Median))); + } + File.WriteAllText(path, csv.ToString()); + } + + private static void WriteReport( + string path, + ExperimentRunSettings settings, + IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Tx, NumberStats Rx)> transferRows, + IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Payload, NumberStats Serialize, NumberStats Deserialize)> serializationRows) + { + var md = new StringBuilder(); + md.AppendLine("# RPC Serialization Supplementary Experiment"); + md.AppendLine(); + md.AppendLine("## Run Configuration"); + md.AppendLine(); + md.AppendLine($"- Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC"); + md.AppendLine($"- Rounds: {settings.Rounds}"); + md.AppendLine($"- Base seed: {settings.BaseSeed}"); + md.AppendLine($"- Serialization iterations per workload sample: {settings.SerializationIterations}"); + md.AppendLine($"- RPC warmup/post-handshake/sample delays: {settings.WarmupDelayMs}/{settings.PostHandshakeDelayMs}/{settings.SampleDelayMs} ms"); + md.AppendLine($"- Per-protocol RPC timeout: {settings.ProtocolTimeoutMs} ms"); + md.AppendLine($"- Runtime: {RuntimeInformation.FrameworkDescription}"); + md.AppendLine($"- OS: {RuntimeInformation.OSDescription}"); + md.AppendLine($"- Architecture: {RuntimeInformation.ProcessArchitecture}"); + md.AppendLine($"- Logical processors: {Environment.ProcessorCount}"); + md.AppendLine(); + md.AppendLine("The document workloads are synthetic but deterministic. Each round uses `baseSeed + (round - 1) * 1000`; workloads cover small, medium, and large business documents with nested records, enums, nullable values, maps, Unicode text, integer arrays, and attachments."); + md.AppendLine(); + md.AppendLine("Serialization measurements are local codec payload measurements. They include Esiur, gRPC, JSON, and SignalR JSON model payloads. Thrift is included in RPC transfer measurements; local Thrift codec timing is not emitted because the current Thrift package does not expose a stable in-memory transport in this test project."); + md.AppendLine(); + md.AppendLine("RPC transfer counters use Windows ETW kernel network tracing. If the client is not run with Administrator rights, the RPC calls still execute but transfer counters are recorded as zero. Protocol failures or timeouts are printed to the console and omitted from the transfer summary."); + md.AppendLine(); + + md.AppendLine("## Serialization Payload Summary"); + md.AppendLine(); + md.AppendLine("| Protocol | Category | Workload | Samples | Payload avg bytes | Encode avg ms | Decode avg ms |"); + md.AppendLine("|---|---|---:|---:|---:|---:|---:|"); + foreach (var row in serializationRows) + { + md.AppendLine($"| {row.Protocol} | {row.Category} | {row.Workload} | {row.Count} | {D(row.Payload.Average)} | {D(row.Serialize.Average)} | {D(row.Deserialize.Average)} |"); + } + md.AppendLine(); + + md.AppendLine("## RPC Transfer Summary"); + md.AppendLine(); + md.AppendLine("| Protocol | Category | Workload | Samples | TX avg bytes | RX avg bytes |"); + md.AppendLine("|---|---|---:|---:|---:|---:|"); + foreach (var row in transferRows) + { + md.AppendLine($"| {row.Protocol} | {row.Category} | {row.Workload} | {row.Count} | {D(row.Tx.Average)} | {D(row.Rx.Average)} |"); + } + md.AppendLine(); + md.AppendLine("## Output Files"); + md.AppendLine(); + md.AppendLine("- `serialization-detail.csv`: per-round codec payload bytes and encode/decode timing."); + md.AppendLine("- `serialization-summary.csv`: aggregate codec payload and timing statistics."); + md.AppendLine("- `transfer-detail.csv`: per-round process network TX/RX deltas from the RPC client."); + md.AppendLine("- `transfer-summary.csv`: aggregate RPC transfer statistics."); + + File.WriteAllText(path, md.ToString()); + } + + private static string Csv(string value) + { + if (value.Contains(',') || value.Contains('"') || value.Contains('\n') || value.Contains('\r')) + return "\"" + value.Replace("\"", "\"\"") + "\""; + + return value; + } + + private static string D(double value) + => value.ToString("0.###", CultureInfo.InvariantCulture); + + public readonly record struct NumberStats(double Average, double Minimum, double Maximum, double Median) + { + public static NumberStats From(IEnumerable values) + { + var sorted = values.OrderBy(x => x).ToArray(); + if (sorted.Length == 0) + return new NumberStats(double.NaN, double.NaN, double.NaN, double.NaN); + + var median = sorted.Length % 2 == 1 + ? sorted[sorted.Length / 2] + : (sorted[sorted.Length / 2 - 1] + sorted[sorted.Length / 2]) / 2.0; + + return new NumberStats(sorted.Average(), sorted[0], sorted[^1], median); + } + } +} diff --git a/Tests/RPC/Client/GrpcTest.cs b/Tests/RPC/Client/GrpcTest.cs index 7aa44c4..eab4a90 100644 --- a/Tests/RPC/Client/GrpcTest.cs +++ b/Tests/RPC/Client/GrpcTest.cs @@ -16,7 +16,10 @@ public class GrpcTest public static async Task DoTest(string address, Dictionary docsWorkloads, Dictionary dataWorkloads, - Dictionary intWorkloads) + Dictionary intWorkloads, + int warmupDelayMs = 3000, + int postHandshakeDelayMs = 2000, + int sampleDelayMs = 3000) { var rt = new TestResults(); @@ -29,13 +32,13 @@ public class GrpcTest var service = new Client.Grpc.EchoService.EchoServiceClient(channel); - Thread.Sleep(3000); + Thread.Sleep(warmupDelayMs); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); Console.WriteLine($"Handshake {ctx}/{crx}"); - await Task.Delay(2000); + await Task.Delay(postHandshakeDelayMs); foreach (var w in docsWorkloads) { @@ -50,7 +53,7 @@ public class GrpcTest // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); @@ -72,7 +75,7 @@ public class GrpcTest // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -96,7 +99,7 @@ public class GrpcTest // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -105,7 +108,7 @@ public class GrpcTest } - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx) = mon.GetTotals(); Console.WriteLine($"Transfer {tx}/{rx}"); @@ -115,4 +118,4 @@ public class GrpcTest return rt; } -} \ No newline at end of file +} diff --git a/Tests/RPC/Client/JsonTest.cs b/Tests/RPC/Client/JsonTest.cs index 4ded7c5..411faf9 100644 --- a/Tests/RPC/Client/JsonTest.cs +++ b/Tests/RPC/Client/JsonTest.cs @@ -15,7 +15,10 @@ public class JsonTest public static async Task DoTest(string address, Dictionary docsWorkloads, Dictionary dataWorkloads, - Dictionary intWorkloads) + Dictionary intWorkloads, + int warmupDelayMs = 3000, + int postHandshakeDelayMs = 2000, + int sampleDelayMs = 3000) { var rt = new TestResults(); @@ -28,20 +31,20 @@ public class JsonTest using var http = new HttpClient { BaseAddress = new Uri(address) }; - Thread.Sleep(3000); + Thread.Sleep(warmupDelayMs); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); Console.WriteLine($"Handshake {ctx}/{crx}"); - await Task.Delay(2000); + await Task.Delay(postHandshakeDelayMs); foreach (var w in docsWorkloads) { Console.Write("Workload: " + w.Key); var docs = await JsonRpcCallAsync(http, "EchoDocuments", w.Value); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); @@ -62,7 +65,7 @@ public class JsonTest // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -82,7 +85,7 @@ public class JsonTest // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -91,7 +94,7 @@ public class JsonTest } - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx) = mon.GetTotals(); Console.WriteLine($"Transfer {tx}/{rx}"); @@ -125,4 +128,4 @@ public class JsonTest using var doc = JsonDocument.Parse(raw); return (doc.RootElement.Clone(), raw); } -} \ No newline at end of file +} diff --git a/Tests/RPC/Client/Model/Esiur/Esiur.Tests.RPC.EsiurServer.Service.g.cs b/Tests/RPC/Client/Model/Esiur/Esiur.Tests.RPC.EsiurServer.Service.g.cs index 46bf3de..7f7e12d 100644 --- a/Tests/RPC/Client/Model/Esiur/Esiur.Tests.RPC.EsiurServer.Service.g.cs +++ b/Tests/RPC/Client/Model/Esiur/Esiur.Tests.RPC.EsiurServer.Service.g.cs @@ -35,6 +35,7 @@ namespace Esiur.Tests.RPC.EsiurServer .Chunk(x => rt.TriggerChunk(x)); return rt; } + [Annotation("", "([BusinessDocument[]] payload) -> BusinessDocument[]")] [Export] public AsyncReply EchoDocuments(Esiur.Tests.RPC.EsiurServer.BusinessDocument[] payload) @@ -47,6 +48,7 @@ namespace Esiur.Tests.RPC.EsiurServer .Chunk(x => rt.TriggerChunk(x)); return rt; } + [Annotation("", "([DocType[]] payload) -> DocType[]")] [Export] public AsyncReply EchoEnumArray(Esiur.Tests.RPC.EsiurServer.DocType[] payload) diff --git a/Tests/RPC/Client/Monitor.cs b/Tests/RPC/Client/Monitor.cs index b3bea0a..2525c96 100644 --- a/Tests/RPC/Client/Monitor.cs +++ b/Tests/RPC/Client/Monitor.cs @@ -17,6 +17,7 @@ public class PerProcessNetMonitor : IDisposable private long _txBytes; private long _rxBytes; private volatile bool _running; + private static int _warningWritten; public PerProcessNetMonitor(int pid) { @@ -25,12 +26,25 @@ public class PerProcessNetMonitor : IDisposable public void Start() { - // Use a unique session name - string sessionName = "NetMon_" + Guid.NewGuid(); - _session = new TraceEventSession(sessionName); + try + { + // Use a unique session name + string sessionName = "NetMon_" + Guid.NewGuid(); + _session = new TraceEventSession(sessionName); - // Enable kernel network provider - _session.EnableKernelProvider(KernelTraceEventParser.Keywords.NetworkTCPIP); + // Enable kernel network provider + _session.EnableKernelProvider(KernelTraceEventParser.Keywords.NetworkTCPIP); + } + catch (UnauthorizedAccessException ex) + { + _session?.Dispose(); + _session = null; + + if (Interlocked.Exchange(ref _warningWritten, 1) == 0) + Console.WriteLine($"Network monitor disabled: {ex.Message}"); + + return; + } _running = true; _listenTask = Task.Run(() => diff --git a/Tests/RPC/Client/Program.cs b/Tests/RPC/Client/Program.cs index 8df58a9..f62b57c 100644 --- a/Tests/RPC/Client/Program.cs +++ b/Tests/RPC/Client/Program.cs @@ -1,63 +1,215 @@ -using MQTTnet; using Esiur.Tests.RPC.Client; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +var rounds = ReadIntArg(args, "--rounds", 10); +var baseSeed = ReadIntArg(args, "--seed", 1000); +var serializationIterations = ReadIntArg(args, "--serialization-iterations", 3); +var warmupDelayMs = ReadIntArg(args, "--warmup-ms", 3000); +var postHandshakeDelayMs = ReadIntArg(args, "--post-handshake-ms", 2000); +var sampleDelayMs = ReadIntArg(args, "--sample-ms", 3000); +var protocolTimeoutMs = ReadIntArg(args, "--protocol-timeout-ms", 120000); +var runRpc = !HasArg(args, "--skip-rpc"); +var runSerialization = false;// !HasArg(args, "--skip-serialization"); +var outputDirectory = Path.GetFullPath(ReadStringArg( + args, + "--output", + Path.Combine("Tests", "RPC", "Results", DateTime.UtcNow.ToString("yyyyMMdd-HHmmss")))); -var results = new Dictionary>(); - -results.Add("esiur", new List()); -results.Add("grpc", new List()); -results.Add("thrift", new List()); -results.Add("json", new List()); -results.Add("signalr", new List()); - -for (var i = 0; i < 10; i++) +var results = new Dictionary> { - var seed = 1000 + (i * 1000); + ["esiur"] = new(), + ["grpc"] = new(), + ["thrift"] = new(), + ["json"] = new(), + ["signalr"] = new() +}; - var docsWorkloads = new Dictionary();// RPC.Client.Tests.DocGenerator.BuildWorkloads(seed); +var serializationSamples = new List(); + +Console.WriteLine("RPC supplementary experiment"); +Console.WriteLine($"Rounds={rounds}, seed={baseSeed}, serialization iterations={serializationIterations}"); +Console.WriteLine($"Output={outputDirectory}"); + +for (var i = 0; i < rounds; i++) +{ + var round = i + 1; + var seed = baseSeed + (i * 1000); + + Console.WriteLine($"\n# Round {round}/{rounds}, seed {seed}"); + + var docsWorkloads = DocGenerator.BuildWorkloads(seed); var dataWorkLoads = Shared.BuildBytesWorkLoads(seed); var intWorkloads = Shared.BuildIntWorkloads(seed); - results["esiur"].Add( - await EsiurTest.DoTest("ep://localhost:5005/sys/service", docsWorkloads, dataWorkLoads, intWorkloads) - ); + if (runSerialization) + { + Console.WriteLine("Collecting local serialization samples..."); + serializationSamples.AddRange(SerializationExperiment.RunRound( + round, + seed, + docsWorkloads, + dataWorkLoads, + intWorkloads, + serializationIterations)); + } - results["thrift"].Add( - await ThriftTest.DoTest("127.0.0.1", 5400, - docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToThrift()).ToArray()), - dataWorkLoads, - intWorkloads - ) - ); + if (!runRpc) + continue; - results["signalr"].Add(await SignalRTest.DoTest("http://127.0.0.1:5200/hub/echo", - docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToShared()).ToArray()), - dataWorkLoads, - intWorkloads - )); + var thriftDocs = docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToThrift()).ToArray()); + var signalRDocs = docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToShared()).ToArray()); + var grpcDocs = docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToGrpc()).ToArray()); - results["json"].Add( await JsonTest.DoTest("http://127.0.0.1:5100", + if (await RunProtocol("esiur", () => EsiurTest.DoTest( + "ep://localhost:5005/sys/service", docsWorkloads, dataWorkLoads, - intWorkloads - ) ); + intWorkloads, + warmupDelayMs, + postHandshakeDelayMs, + sampleDelayMs), + protocolTimeoutMs) is { } esiurResults) + { + results["esiur"].Add(esiurResults); + } - results["grpc"].Add(await GrpcTest.DoTest("http://127.0.0.1:5300", - docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToGrpc()).ToArray()), + if (await RunProtocol("thrift", () => ThriftTest.DoTest( + "127.0.0.1", + 5400, + thriftDocs, dataWorkLoads, - intWorkloads - )); + intWorkloads, + warmupDelayMs, + postHandshakeDelayMs, + sampleDelayMs), + protocolTimeoutMs) is { } thriftResults) + { + results["thrift"].Add(thriftResults); + } + + if (await RunProtocol("signalr", () => SignalRTest.DoTest( + "http://127.0.0.1:5200/hub/echo", + signalRDocs, + dataWorkLoads, + intWorkloads, + warmupDelayMs, + postHandshakeDelayMs, + sampleDelayMs), + protocolTimeoutMs) is { } signalRResults) + { + results["signalr"].Add(signalRResults); + } + + if (await RunProtocol("json", () => JsonTest.DoTest( + "http://127.0.0.1:5100", + docsWorkloads, + dataWorkLoads, + intWorkloads, + warmupDelayMs, + postHandshakeDelayMs, + sampleDelayMs), + protocolTimeoutMs) is { } jsonResults) + { + results["json"].Add(jsonResults); + } + + if (await RunProtocol("grpc", () => GrpcTest.DoTest( + "http://127.0.0.1:5300", + grpcDocs, + dataWorkLoads, + intWorkloads, + warmupDelayMs, + postHandshakeDelayMs, + sampleDelayMs), + protocolTimeoutMs) is { } grpcResults) + { + results["grpc"].Add(grpcResults); + } +} + +if (runRpc) + PrintTransferStats(results); + +var reportPath = ExperimentResultWriter.WriteAll( + outputDirectory, + new ExperimentRunSettings( + rounds, + baseSeed, + serializationIterations, + warmupDelayMs, + postHandshakeDelayMs, + sampleDelayMs, + protocolTimeoutMs, + runRpc, + runSerialization), + results, + serializationSamples); + +Console.WriteLine($"\nReport written to {reportPath}"); + +static void PrintTransferStats(Dictionary> results) +{ + foreach (var transport in results.Keys) + { + Console.WriteLine($"\n== Stats for {transport} =="); + + var rounds = results[transport]; + if (rounds.Count == 0) + { + Console.WriteLine("No results."); + continue; + } + + var categories = new Dictionary>> + { + { "Docs", tr => tr.Docs }, + { "Bytes", tr => tr.Bytes }, + { "Ints", tr => tr.Ints } + }; + + foreach (var cat in categories) + { + Console.WriteLine($"-- {cat.Key} --"); + + var allKeys = new HashSet(); + foreach (var r in rounds) + { + foreach (var k in cat.Value(r).Keys) + allKeys.Add(k); + } + + foreach (var key in allKeys.OrderBy(k => k)) + { + var txList = new List(); + var rxList = new List(); + foreach (var r in rounds) + { + if (cat.Value(r).TryGetValue(key, out var tup)) + { + txList.Add(tup.Item1); + rxList.Add(tup.Item2); + } + } + + if (txList.Count == 0) + { + Console.WriteLine($"{key}: no samples"); + continue; + } + + var sTx = StatsLongs(txList); + var sRx = StatsLongs(rxList); + + Console.WriteLine($"{key}: TX avg={sTx.avg:0.##}, min={sTx.min}, max={sTx.max}, med={sTx.median:0.##} | RX avg={sRx.avg:0.##}, min={sRx.min}, max={sRx.max}, med={sRx.median:0.##}"); + } + } + } } -// Compute statistics: average, min, max, median for tx/rx per transport and workload static (double avg, long min, long max, double median) StatsLongs(List xs) { - if (xs == null || xs.Count == 0) return (double.NaN, 0, 0, double.NaN); + if (xs == null || xs.Count == 0) + return (double.NaN, 0, 0, double.NaN); + xs.Sort(); double avg = xs.Average(x => (double)x); long min = xs.First(); @@ -66,60 +218,53 @@ static (double avg, long min, long max, double median) StatsLongs(List xs) return (avg, min, max, median); } -foreach (var transport in results.Keys) +static async Task RunProtocol(string protocol, Func> action, int timeoutMs) { - Console.WriteLine($"\n== Stats for {transport} =="); - - var rounds = results[transport]; - if (rounds.Count == 0) + try { - Console.WriteLine("No results."); - continue; + var task = action(); + + if (timeoutMs > 0) + { + var completed = await Task.WhenAny(task, Task.Delay(timeoutMs)); + if (completed != task) + { + Console.WriteLine($"{protocol} failed: timed out after {timeoutMs} ms"); + return null; + } + } + + return await task; } - - // categories: Docs, Bytes, Ints - var categories = new Dictionary>>() + catch (Exception ex) { - { "Docs", tr => tr.Docs }, - { "Bytes", tr => tr.Bytes }, - { "Ints", tr => tr.Ints } - }; - - foreach (var cat in categories) - { - Console.WriteLine($"-- {cat.Key} --"); - - // collect all workload keys seen in any round - var allKeys = new HashSet(); - foreach (var r in rounds) - { - foreach (var k in cat.Value(r).Keys) allKeys.Add(k); - } - - foreach (var key in allKeys.OrderBy(k => k)) - { - var txList = new List(); - var rxList = new List(); - foreach (var r in rounds) - { - if (cat.Value(r).TryGetValue(key, out var tup)) - { - txList.Add(tup.Item1); - rxList.Add(tup.Item2); - } - } - - if (txList.Count == 0) - { - Console.WriteLine($"{key}: no samples"); - continue; - } - - var sTx = StatsLongs(txList); - var sRx = StatsLongs(rxList); - - Console.WriteLine($"{key}: TX avg={sTx.avg:0.##}, min={sTx.min}, max={sTx.max}, med={sTx.median:0.##} | RX avg={sRx.avg:0.##}, min={sRx.min}, max={sRx.max}, med={sRx.median:0.##}"); - } + Console.WriteLine($"{protocol} failed: {ex.GetType().Name}: {ex.Message}"); + return null; } } +static int ReadIntArg(string[] args, string name, int defaultValue) +{ + var raw = TryReadStringArg(args, name); + return raw == null ? defaultValue : int.Parse(raw); +} + +static string ReadStringArg(string[] args, string name, string defaultValue) + => TryReadStringArg(args, name) ?? defaultValue; + +static string? TryReadStringArg(string[] args, string name) +{ + var prefix = name + "="; + var inline = args.FirstOrDefault(x => x.StartsWith(prefix, StringComparison.OrdinalIgnoreCase)); + if (inline != null) + return inline[prefix.Length..]; + + var index = Array.FindIndex(args, x => string.Equals(x, name, StringComparison.OrdinalIgnoreCase)); + if (index >= 0 && index + 1 < args.Length) + return args[index + 1]; + + return null; +} + +static bool HasArg(string[] args, string name) + => args.Any(x => string.Equals(x, name, StringComparison.OrdinalIgnoreCase)); diff --git a/Tests/RPC/Client/SerializationExperiment.cs b/Tests/RPC/Client/SerializationExperiment.cs new file mode 100644 index 0000000..d6cc4ff --- /dev/null +++ b/Tests/RPC/Client/SerializationExperiment.cs @@ -0,0 +1,178 @@ +using Esiur.Data; +using Esiur.Resource; +using Google.Protobuf; +using System.Diagnostics; +using System.Text.Json; +using System.Text.Json.Serialization; +using EsiurModel = Esiur.Tests.RPC.EsiurServer; +using GrpcModel = Esiur.Tests.RPC.Client.Grpc; +using SharedModel = Esiur.Tests.RPC.Client.SharedModel; + +namespace Esiur.Tests.RPC.Client; + +public sealed record SerializationSample( + int Round, + int Seed, + string Protocol, + string Category, + string Workload, + long PayloadBytes, + double SerializeMs, + double DeserializeMs); + +public static class SerializationExperiment +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }; + + public static List RunRound( + int round, + int seed, + Dictionary docsWorkloads, + Dictionary dataWorkloads, + Dictionary intWorkloads, + int iterations) + { + if (iterations <= 0) + throw new ArgumentOutOfRangeException(nameof(iterations)); + + EsiurModel.Initialization.RegisterTypes(Warehouse.Default); + + var samples = new List(); + + foreach (var workload in docsWorkloads) + { + var docs = workload.Value; + var grpcDocs = docs.Select(x => x.ToGrpc()).ToArray(); + var sharedDocs = docs.Select(x => x.ToShared()).ToArray(); + + samples.Add(Measure( + round, seed, "esiur", "Docs", workload.Key, iterations, + () => Codec.Compose(docs, Warehouse.Default, null), + payload => GC.KeepAlive(Codec.ParseSync(payload, 0, Warehouse.Default).Item2))); + + samples.Add(Measure( + round, seed, "grpc", "Docs", workload.Key, iterations, + () => + { + var request = new GrpcModel.DocumentsRequest(); + request.Docs.AddRange(grpcDocs); + return request.ToByteArray(); + }, + payload => GC.KeepAlive(GrpcModel.DocumentsRequest.Parser.ParseFrom(payload)))); + + samples.Add(Measure( + round, seed, "json", "Docs", workload.Key, iterations, + () => JsonSerializer.SerializeToUtf8Bytes(docs, JsonOptions), + payload => GC.KeepAlive(JsonSerializer.Deserialize(payload, JsonOptions)))); + + samples.Add(Measure( + round, seed, "signalr", "Docs", workload.Key, iterations, + () => JsonSerializer.SerializeToUtf8Bytes(sharedDocs, JsonOptions), + payload => GC.KeepAlive(JsonSerializer.Deserialize(payload, JsonOptions)))); + } + + foreach (var workload in dataWorkloads) + { + var data = workload.Value; + + samples.Add(Measure( + round, seed, "esiur", "Bytes", workload.Key, iterations, + () => Codec.Compose(data, Warehouse.Default, null), + payload => GC.KeepAlive(Codec.ParseSync(payload, 0, Warehouse.Default).Item2))); + + samples.Add(Measure( + round, seed, "grpc", "Bytes", workload.Key, iterations, + () => new GrpcModel.BytesRequest { Data = ByteString.CopyFrom(data) }.ToByteArray(), + payload => GC.KeepAlive(GrpcModel.BytesRequest.Parser.ParseFrom(payload)))); + + samples.Add(Measure( + round, seed, "json", "Bytes", workload.Key, iterations, + () => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions), + payload => GC.KeepAlive(JsonSerializer.Deserialize(payload, JsonOptions)))); + + samples.Add(Measure( + round, seed, "signalr", "Bytes", workload.Key, iterations, + () => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions), + payload => GC.KeepAlive(JsonSerializer.Deserialize(payload, JsonOptions)))); + } + + foreach (var workload in intWorkloads) + { + var data = workload.Value; + + samples.Add(Measure( + round, seed, "esiur", "Ints", workload.Key, iterations, + () => Codec.Compose(data, Warehouse.Default, null), + payload => GC.KeepAlive(Codec.ParseSync(payload, 0, Warehouse.Default).Item2))); + + samples.Add(Measure( + round, seed, "grpc", "Ints", workload.Key, iterations, + () => + { + var request = new GrpcModel.IntArrayRequest(); + request.Array.AddRange(data); + return request.ToByteArray(); + }, + payload => GC.KeepAlive(GrpcModel.IntArrayRequest.Parser.ParseFrom(payload)))); + + samples.Add(Measure( + round, seed, "json", "Ints", workload.Key, iterations, + () => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions), + payload => GC.KeepAlive(JsonSerializer.Deserialize(payload, JsonOptions)))); + + samples.Add(Measure( + round, seed, "signalr", "Ints", workload.Key, iterations, + () => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions), + payload => GC.KeepAlive(JsonSerializer.Deserialize(payload, JsonOptions)))); + } + + return samples; + } + + private static SerializationSample Measure( + int round, + int seed, + string protocol, + string category, + string workload, + int iterations, + Func serialize, + Action deserialize) + { + var payload = serialize(); + deserialize(payload); + + long payloadBytes = payload.Length; + long serializeTicks = 0; + long deserializeTicks = 0; + + for (var i = 0; i < iterations; i++) + { + var started = Stopwatch.GetTimestamp(); + payload = serialize(); + serializeTicks += Stopwatch.GetTimestamp() - started; + payloadBytes = payload.Length; + + started = Stopwatch.GetTimestamp(); + deserialize(payload); + deserializeTicks += Stopwatch.GetTimestamp() - started; + } + + return new SerializationSample( + round, + seed, + protocol, + category, + workload, + payloadBytes, + TicksToMs(serializeTicks) / iterations, + TicksToMs(deserializeTicks) / iterations); + } + + private static double TicksToMs(long ticks) + => ticks * 1000.0 / Stopwatch.Frequency; +} diff --git a/Tests/RPC/Client/Shared.cs b/Tests/RPC/Client/Shared.cs index ce66b3b..af3b2d3 100644 --- a/Tests/RPC/Client/Shared.cs +++ b/Tests/RPC/Client/Shared.cs @@ -12,33 +12,34 @@ namespace Esiur.Tests.RPC.Client // Generate random int array of given length and distribution public static int[] GenerateInt32(int length, string pattern = "uniform", - int range = int.MaxValue) + int range = int.MaxValue, Random? random = null) { var data = new int[length]; + var source = random ?? rng; switch (pattern.ToLower()) { case "uniform": // Random values in [-range, range] for (int i = 0; i < length; i++) - data[i] = rng.Next(-range, range); + data[i] = source.Next(-range, range); break; case "positive": for (int i = 0; i < length; i++) - data[i] = rng.Next(0, range); + data[i] = source.Next(0, range); break; case "negative": for (int i = 0; i < length; i++) - data[i] = -rng.Next(0, range); + data[i] = -source.Next(0, range); break; case "alternating": for (int i = 0; i < length; i++) { - int val = rng.Next(0, range); + int val = source.Next(0, range); data[i] = (i % 2 == 0) ? val : -val; } break; @@ -46,13 +47,13 @@ namespace Esiur.Tests.RPC.Client case "small": // Focused on small magnitudes to test ZigZag fast path for (int i = 0; i < length; i++) - data[i] = rng.Next(-64, 65); + data[i] = source.Next(-64, 65); break; case "ascending": { - int start = rng.Next(-range, range); + int start = source.Next(-range, range); for (int i = 0; i < length; i++) data[i] = start + i; } @@ -103,10 +104,10 @@ namespace Esiur.Tests.RPC.Client var result = new Dictionary(); var r = new Random(seed); - result.Add("uniform", GenerateInt32(1000, "uniform")); - result.Add("small", GenerateInt32(1000, "small")); - result.Add("alternating", GenerateInt32(1000, "alternating")); - result.Add("ascending", GenerateInt32(1000, "ascending")); + result.Add("uniform", GenerateInt32(1000, "uniform", random: r)); + result.Add("small", GenerateInt32(1000, "small", random: r)); + result.Add("alternating", GenerateInt32(1000, "alternating", random: r)); + result.Add("ascending", GenerateInt32(1000, "ascending", random: r)); return result; diff --git a/Tests/RPC/Client/SignalRTest.cs b/Tests/RPC/Client/SignalRTest.cs index 21161cc..8206071 100644 --- a/Tests/RPC/Client/SignalRTest.cs +++ b/Tests/RPC/Client/SignalRTest.cs @@ -20,7 +20,10 @@ namespace Esiur.Tests.RPC.Client public static async Task DoTest(string address, Dictionary docsWorkloads, Dictionary dataWorkloads, - Dictionary intWorkloads) + Dictionary intWorkloads, + int warmupDelayMs = 3000, + int postHandshakeDelayMs = 2000, + int sampleDelayMs = 3000) { var rt = new TestResults(); @@ -39,13 +42,13 @@ namespace Esiur.Tests.RPC.Client await service.StartAsync(); - Thread.Sleep(3000); + Thread.Sleep(warmupDelayMs); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); Console.WriteLine($"Handshake {ctx}/{crx}"); - await Task.Delay(2000); + await Task.Delay(postHandshakeDelayMs); foreach (var w in docsWorkloads) { @@ -58,7 +61,7 @@ namespace Esiur.Tests.RPC.Client // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -79,7 +82,7 @@ namespace Esiur.Tests.RPC.Client // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -99,7 +102,7 @@ namespace Esiur.Tests.RPC.Client // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -108,7 +111,7 @@ namespace Esiur.Tests.RPC.Client } - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx) = mon.GetTotals(); Console.WriteLine($"Transfer {tx}/{rx}"); diff --git a/Tests/RPC/Client/ThriftTest.cs b/Tests/RPC/Client/ThriftTest.cs index 401aa0e..2983b34 100644 --- a/Tests/RPC/Client/ThriftTest.cs +++ b/Tests/RPC/Client/ThriftTest.cs @@ -12,7 +12,10 @@ public class ThriftTest public static async Task DoTest(string host, int port, Dictionary docsWorkloads, Dictionary dataWorkloads, - Dictionary intWorkloads) + Dictionary intWorkloads, + int warmupDelayMs = 3000, + int postHandshakeDelayMs = 2000, + int sampleDelayMs = 3000) { var rt = new TestResults(); @@ -28,14 +31,14 @@ public class ThriftTest var service = new EchoService.Client(proto); - Thread.Sleep(3000); + Thread.Sleep(warmupDelayMs); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); Console.WriteLine($"Handshake {ctx}/{crx}"); - await Task.Delay(2000); + await Task.Delay(postHandshakeDelayMs); foreach (var w in docsWorkloads) { @@ -48,7 +51,7 @@ public class ThriftTest // throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); @@ -68,7 +71,7 @@ public class ThriftTest throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -89,7 +92,7 @@ public class ThriftTest throw new Exception("No match"); - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); @@ -98,7 +101,7 @@ public class ThriftTest } - await Task.Delay(3000); + await Task.Delay(sampleDelayMs); (tx, rx) = mon.GetTotals(); Console.WriteLine($"Transfer {tx}/{rx}"); @@ -108,4 +111,4 @@ public class ThriftTest return rt; } -} \ No newline at end of file +} diff --git a/Tests/RPC/Esiur/Program.cs b/Tests/RPC/Esiur/Program.cs index 63da94f..338f4d4 100644 --- a/Tests/RPC/Esiur/Program.cs +++ b/Tests/RPC/Esiur/Program.cs @@ -11,8 +11,9 @@ using System.Text; ushort port = 5005; -if (args.Count() > 0) - port = ushort.Parse(args[0]); +var portArg = args.FirstOrDefault(x => ushort.TryParse(x, out _)); +if (portArg != null) + port = ushort.Parse(portArg); Console.WriteLine($"Esiur server listening on port {port}..."); @@ -27,7 +28,12 @@ await wh.Open(); Console.WriteLine("Open"); -if (!Directory.Exists("template")) - Directory.CreateDirectory("template"); +if (args.Contains("--generate-client")) +{ + if (!Directory.Exists("template")) + Directory.CreateDirectory("template"); -TypeDefGenerator.GetTypes("ep://localhost:5005/sys/service", "template"); \ No newline at end of file + TypeDefGenerator.GetTypes($"ep://localhost:{port}/sys/service", "template"); +} + +await Task.Delay(Timeout.Infinite);