2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-06-13 22:48:42 +00:00
This commit is contained in:
2026-06-09 01:40:29 +03:00
parent 2bdd5d5022
commit a741013621
19 changed files with 887 additions and 185 deletions
+15 -6
View File
@@ -9,6 +9,7 @@ using System.Collections.Generic;
using System.Data; using System.Data;
using System.Dynamic; using System.Dynamic;
using System.Linq; using System.Linq;
using System.Reflection;
using System.Text; using System.Text;
#nullable enable #nullable enable
@@ -368,13 +369,21 @@ namespace Esiur.Data
|| type.IsEnum) || type.IsEnum)
{ {
var typeDef = warehouse.GetLocalTypeDefByType(type); var remoteAttr = type.GetCustomAttribute<RemoteAttribute>();
if (typeDef == null) if (remoteAttr != null)
throw new Exception("Unregistered type: " + type.FullName + "."); {
var typeDef = warehouse.GetRemoteTypeDefByName(remoteAttr.Domains, remoteAttr.FullName);
}
else
{
var typeDef = warehouse.GetLocalTypeDefByType(type);
return new TruTypeDef(nullable, typeDef); if (typeDef == null)
throw new Exception("Unregistered type: " + type.FullName + ".");
return new TruTypeDef(nullable, typeDef);
}
} }
else if (type.IsGenericType) else if (type.IsGenericType)
{ {
@@ -867,7 +876,7 @@ namespace Esiur.Data
{ {
var td = connection.Instance.Warehouse.GetLocalTypeDefById(typeDefId); var td = connection.Instance.Warehouse.GetLocalTypeDefById(typeDefId);
if ( td == null) if (td == null)
throw new Exception("TypeDef not found."); throw new Exception("TypeDef not found.");
return new ParseResult<TruTypeDef>( return new ParseResult<TruTypeDef>(
@@ -900,7 +909,7 @@ namespace Esiur.Data
if (runtimeType != null && runtimeType.IsValueType && nullable) if (runtimeType != null && runtimeType.IsValueType && nullable)
{ {
//if (runtimeType.IsValueType)// && Nullable.GetUnderlyingType(runtimeType) == null) //if (runtimeType.IsValueType)// && Nullable.GetUnderlyingType(runtimeType) == null)
runtimeType = typeof(Nullable<>).MakeGenericType(runtimeType); runtimeType = typeof(Nullable<>).MakeGenericType(runtimeType);
} }
+2 -1
View File
@@ -264,7 +264,8 @@ public class LocalTypeDef:TypeDef
else else
throw new Exception("Type must implement IResource, IRecord or inherit from DistributedResource."); throw new Exception("Type must implement IResource, IRecord or inherit from DistributedResource.");
//IsWrapper = Codec.InheritsClass(type, typeof(EpResource)); if (type.GetCustomAttribute<RemoteAttribute>() != null)
throw new Exception("Remote types are not supported as local type definitions.");
type = ResourceProxy.GetBaseType(type); type = ResourceProxy.GetBaseType(type);
@@ -13,7 +13,7 @@ namespace Esiur.Net.Packets
Unsubscribe = 0x3, Unsubscribe = 0x3,
// Request Inquire // Request Inquire
TypeDefByName = 0x8, TypeDefIdsByNames = 0x8,
TypeDefById = 0x9, TypeDefById = 0x9,
TypeDefByResourceId = 0xA, TypeDefByResourceId = 0xA,
Query = 0xB, Query = 0xB,
+45 -7
View File
@@ -532,8 +532,8 @@ public partial class EpConnection : NetworkConnection, IStore
EpRequestUnsubscribe(_packet.CallbackId, dt); EpRequestUnsubscribe(_packet.CallbackId, dt);
break; break;
// Inquire // Inquire
case EpPacketRequest.TypeDefByName: case EpPacketRequest.TypeDefIdsByNames:
EpRequestTypeDefByName(_packet.CallbackId, dt); EpRequestTypeDefIdsByNames(_packet.CallbackId, dt);
break; break;
case EpPacketRequest.TypeDefById: case EpPacketRequest.TypeDefById:
EpRequestTypeDefById(_packet.CallbackId, dt); EpRequestTypeDefById(_packet.CallbackId, dt);
@@ -1031,7 +1031,7 @@ public partial class EpConnection : NetworkConnection, IStore
void AuthenticatonCompleted() async Task AuthenticatonCompleted()
{ {
if (this.Instance == null) if (this.Instance == null)
@@ -1062,12 +1062,50 @@ public partial class EpConnection : NetworkConnection, IStore
{ {
_authenticated = true; _authenticated = true;
Status = EpConnectionStatus.Connected; Status = EpConnectionStatus.Connected;
_openReply?.Trigger(true);
_openReply = null;
OnReady?.Invoke(this);
_session.AuthenticationHandler?.Provider?.Login(_session); _session.AuthenticationHandler?.Provider?.Login(_session);
//Server?.Membership?.Login(_session);
OnReady?.Invoke(this);
var proxyTypes = Instance.Warehouse.GetProxyTypesByDomain(_remoteDomain);
var typeDefNames = new List<string>();
foreach (var kk in proxyTypes)
{
foreach (var kv in kk.Value)
{
typeDefNames.Add(kv.Key);
}
}
if (typeDefNames.Count > 0)
{
GetTypeDefIds(typeDefNames.ToArray()).Then(ids =>
{
var bag = new AsyncBag<object>();
foreach (var id in ids)
bag.Add(FetchTypeDef(id, null));
bag.Seal();
bag.Then((o) =>
{
_openReply?.Trigger(true);
_openReply = null;
});
}).Error(ex =>
{
_openReply.TriggerError(ex);
// do nothing, proxies won't work but connection is established
});
}
else
{
_openReply?.Trigger(true);
_openReply = null;
}
} }
} }
//private void ProcessClientAuth(byte[] data) //private void ProcessClientAuth(byte[] data)
@@ -1011,17 +1011,25 @@ partial class EpConnection
Instance.Warehouse.Query(resourceLink).Then(queryCallback); Instance.Warehouse.Query(resourceLink).Then(queryCallback);
} }
void EpRequestTypeDefByName(uint callback, PlainTdu tdu) void EpRequestTypeDefIdsByNames(uint callback, PlainTdu tdu)
{ {
var value = Codec.ParseSync(tdu, Instance.Warehouse); var value = Codec.ParseSync(tdu, Instance.Warehouse);
var className = (string)value; var classNames = (string[])value;
var typeDef = Instance.Warehouse.GetRemoteTypeDefByName(_remoteDomain, className); var typeDefs = new List<ulong>();
if (typeDef != null) foreach (var className in classNames)
{ {
SendReply(EpPacketReply.Completed, callback, typeDef.Compose(this)); //@TODO: need to search in remoteTypeDefs as well
var typeDef = Instance.Warehouse.GetLocalTypeDefByName(className);
if (typeDef != null)
typeDefs.Add(typeDef.Id);
}
if (typeDefs.Count > 0)
{
SendReply(EpPacketReply.Completed, callback, typeDefs.ToArray());
} }
else else
{ {
@@ -2567,6 +2575,18 @@ partial class EpConnection
return reply; return reply;
} }
public AsyncReply<ulong[]> GetTypeDefIds(string[] fullNames)
{
var reply = new AsyncReply<ulong[]>();
SendRequest(EpPacketRequest.TypeDefIdsByNames, fullNames)
.Then(result =>
{
reply.Trigger((ulong[])result);
}).Error(ex => reply.TriggerError(ex));
return reply;
}
/// <summary> /// <summary>
/// Create a new resource. /// Create a new resource.
+13
View File
@@ -713,6 +713,11 @@ public class Warehouse
} }
} }
internal KeyList<TypeDefKind, KeyList<string , Type>> GetProxyTypesByDomain(string domain)
{
return _proxyTypeDefs[domain];
}
public bool TryRegisterRemoteTypeDef(string domain, RemoteTypeDef typeDef) public bool TryRegisterRemoteTypeDef(string domain, RemoteTypeDef typeDef)
{ {
lock (_typeDefsLock) lock (_typeDefsLock)
@@ -843,6 +848,14 @@ public class Warehouse
return _remoteTypeDefs[domain].Values.FirstOrDefault(x => x.Name == typeName); return _remoteTypeDefs[domain].Values.FirstOrDefault(x => x.Name == typeName);
} }
public TypeDef GetProxyTypeDef(string domain, string typeName, TypeDefKind? typeDefKind = null)
{
if (!string.IsNullOrEmpty(domain) || !_remoteTypeDefs.ContainsKey(domain))
return null;
sdcsdc
return _remoteTypeDefs[domain].Values.FirstOrDefault(x => x.Name == typeName);
}
//public TypeDef GetRemoteTypeDefByType(Type type) //public TypeDef GetRemoteTypeDefByType(Type type)
//{ //{
// var remoteAttr = type.GetCustomAttribute<RemoteAttribute>(); // var remoteAttr = type.GetCustomAttribute<RemoteAttribute>();
+19 -9
View File
@@ -12,6 +12,8 @@ namespace Esiur.Tests.RPC.Client;
public static class DocGenerator public static class DocGenerator
{ {
private static readonly DateTime BaseUtc = new(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
public sealed class GenOptions public sealed class GenOptions
{ {
public int Lines { get; init; } = 20; // items count public int Lines { get; init; } = 20; // items count
@@ -35,12 +37,12 @@ public static class DocGenerator
var seller = MakeParty(rng, opt.IncludeV2Fields, isSeller: true, opt.IncludeUnicode); var seller = MakeParty(rng, opt.IncludeV2Fields, isSeller: true, opt.IncludeUnicode);
var buyer = MakeParty(rng, opt.IncludeV2Fields, isSeller: false, opt.IncludeUnicode); var buyer = MakeParty(rng, opt.IncludeV2Fields, isSeller: false, opt.IncludeUnicode);
var createdAt = DateTime.UtcNow.AddMinutes(-rng.Next(0, 60 * 24)); var createdAt = BaseUtc.AddMinutes(rng.Next(0, 60 * 24 * 365));
var doc = new BusinessDocument var doc = new BusinessDocument
{ {
Header = new DocumentHeader Header = new DocumentHeader
{ {
DocId = Guid.NewGuid().ToByteArray(), DocId = RandomBytes(rng, 16),
Type = (DocType)rng.Next(0, 4), Type = (DocType)rng.Next(0, 4),
Version = 1, Version = 1,
CreatedAt = createdAt, CreatedAt = createdAt,
@@ -102,7 +104,7 @@ public static class DocGenerator
var rng = new Random(seed); var rng = new Random(seed);
var v2 = DeepClone(v1); var v2 = DeepClone(v1);
v2.Header.UpdatedAt = DateTime.UtcNow; v2.Header.UpdatedAt = BaseUtc.AddMinutes(seed);
var toChange = Math.Max(1, (int)Math.Round(v2.Items.Length * changeRatio)); var toChange = Math.Max(1, (int)Math.Round(v2.Items.Length * changeRatio));
// change random lines // change random lines
@@ -113,7 +115,7 @@ public static class DocGenerator
li.Qty = RoundQty(li.Qty + (double)(rng.NextDouble() * 2.0 - 1.0)); // ±1 li.Qty = RoundQty(li.Qty + (double)(rng.NextDouble() * 2.0 - 1.0)); // ±1
li.UnitPrice = RoundMoney(li.UnitPrice * (double)(0.95 + rng.NextDouble() * 0.1)); // ±5% li.UnitPrice = RoundMoney(li.UnitPrice * (double)(0.95 + rng.NextDouble() * 0.1)); // ±5%
if (li.Ext == null) li.Ext = new Map<string, Variant>(); if (li.Ext == null) li.Ext = new Map<string, Variant>();
li.Ext["lastEdit"] = VDate(DateTime.UtcNow); li.Ext["lastEdit"] = VDate(BaseUtc.AddMinutes(seed + i));
} }
@@ -201,7 +203,7 @@ public static class DocGenerator
{ {
0 => VStr(rng.Next(0, 3) switch { 0 => "red", 1 => "blue", _ => "green" }), 0 => VStr(rng.Next(0, 3) switch { 0 => "red", 1 => "blue", _ => "green" }),
1 => VStr(rng.Next(0, 3) switch { 0 => "S", 1 => "M", _ => "L" }), 1 => VStr(rng.Next(0, 3) switch { 0 => "S", 1 => "M", _ => "L" }),
2 => VGuid(Guid.NewGuid()), 2 => VGuid(RandomGuid(rng)),
_ => VInt(rng.Next(0, 1000)) _ => VInt(rng.Next(0, 1000))
}); });
} }
@@ -216,7 +218,7 @@ public static class DocGenerator
Method = (PaymentMethod)rng.Next(0, 5), Method = (PaymentMethod)rng.Next(0, 5),
Amount = RoundMoney(amount), Amount = RoundMoney(amount),
Reference = "REF-" + rng.Next(100_000, 999_999), Reference = "REF-" + rng.Next(100_000, 999_999),
Timestamp = DateTime.UtcNow.AddMinutes(-rng.Next(0, 60 * 24)), Timestamp = BaseUtc.AddMinutes(rng.Next(0, 60 * 24 * 365)),
Fee = includeV2 && rng.Next(0, 2) == 0 ? RoundMoney((double)rng.NextDouble() * 2.0) : null, Fee = includeV2 && rng.Next(0, 2) == 0 ? RoundMoney((double)rng.NextDouble() * 2.0) : null,
//CurrencyOverride = includeV2 && rng.Next(0, 2) == 0 ? Currency.IQD : Currency.USD //CurrencyOverride = includeV2 && rng.Next(0, 2) == 0 ? Currency.IQD : Currency.USD
}; };
@@ -254,6 +256,14 @@ public static class DocGenerator
// -------------------------- Utils -------------------------- // -------------------------- Utils --------------------------
private static double RoundMoney(double v) => Math.Round(v, 2, MidpointRounding.AwayFromZero); private static double RoundMoney(double v) => Math.Round(v, 2, MidpointRounding.AwayFromZero);
private static double RoundQty(double v) => Math.Round(v, 3, MidpointRounding.AwayFromZero); private static double RoundQty(double v) => Math.Round(v, 3, MidpointRounding.AwayFromZero);
private static byte[] RandomBytes(Random rng, int bytes)
{
var data = new byte[bytes];
rng.NextBytes(data);
return data;
}
private static Guid RandomGuid(Random rng) => new(RandomBytes(rng, 16));
/// <summary> /// <summary>
/// Simple deep clone via manual copy to stay serializer-agnostic. /// Simple deep clone via manual copy to stay serializer-agnostic.
@@ -371,7 +381,7 @@ public static class DocGenerator
IncludeV2Fields = (i % 2 == 0), IncludeV2Fields = (i % 2 == 0),
IncludeUnicode = true, IncludeUnicode = true,
RiskScores = 100, RiskScores = 100,
Seed = 1000 + i Seed = seed + i
}); });
items.Add(doc); items.Add(doc);
@@ -394,7 +404,7 @@ public static class DocGenerator
IncludeV2Fields = (i % 3 == 0), IncludeV2Fields = (i % 3 == 0),
IncludeUnicode = true, IncludeUnicode = true,
RiskScores = 1000, RiskScores = 1000,
Seed = 2000 + i Seed = seed + 10_000 + i
}); });
items.Add(doc); items.Add(doc);
@@ -418,7 +428,7 @@ public static class DocGenerator
IncludeV2Fields = (i % 2 == 1), IncludeV2Fields = (i % 2 == 1),
IncludeUnicode = true, IncludeUnicode = true,
RiskScores = 3000, RiskScores = 3000,
Seed = 3000 + i Seed = seed + 20_000 + i
}); });
items.Add(doc); items.Add(doc);
+11 -8
View File
@@ -10,13 +10,16 @@ namespace Esiur.Tests.RPC.Client
public static async Task<TestResults> DoTest(string address, public static async Task<TestResults> DoTest(string address,
Dictionary<string, BusinessDocument[]> docsWorkloads, Dictionary<string, BusinessDocument[]> docsWorkloads,
Dictionary<string, byte[]> dataWorkloads, Dictionary<string, byte[]> dataWorkloads,
Dictionary<string, int[]> intWorkloads ) Dictionary<string, int[]> intWorkloads,
int warmupDelayMs = 3000,
int postHandshakeDelayMs = 2000,
int sampleDelayMs = 3000)
{ {
var rt = new TestResults(); var rt = new TestResults();
using var mon = new PerProcessNetMonitor(Process.GetCurrentProcess().Id); using var mon = new PerProcessNetMonitor(Process.GetCurrentProcess().Id);
mon.Start(); //mon.Start();
Console.WriteLine($"\n== Esiur @ {address} =="); Console.WriteLine($"\n== Esiur @ {address} ==");
@@ -28,13 +31,13 @@ namespace Esiur.Tests.RPC.Client
var sock = service.ResourceConnection.Socket as TcpSocket; var sock = service.ResourceConnection.Socket as TcpSocket;
Thread.Sleep(3000); Thread.Sleep(warmupDelayMs);
var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0);
Console.WriteLine($"Handshake {ctx}/{crx}"); Console.WriteLine($"Handshake {ctx}/{crx}");
await Task.Delay(2000); await Task.Delay(postHandshakeDelayMs);
foreach (var w in docsWorkloads) foreach (var w in docsWorkloads)
{ {
@@ -47,7 +50,7 @@ namespace Esiur.Tests.RPC.Client
throw new Exception("No match"); throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -67,7 +70,7 @@ namespace Esiur.Tests.RPC.Client
throw new Exception("No match"); throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -87,7 +90,7 @@ namespace Esiur.Tests.RPC.Client
throw new Exception("No match"); throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -97,7 +100,7 @@ namespace Esiur.Tests.RPC.Client
} }
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx) = mon.GetTotals(); (tx, rx) = mon.GetTotals();
Console.WriteLine($"Transfer {tx}/{rx}"); Console.WriteLine($"Transfer {tx}/{rx}");
+250
View File
@@ -0,0 +1,250 @@
using System.Globalization;
using System.Runtime.InteropServices;
using System.Text;
namespace Esiur.Tests.RPC.Client;
public sealed record ExperimentRunSettings(
int Rounds,
int BaseSeed,
int SerializationIterations,
int WarmupDelayMs,
int PostHandshakeDelayMs,
int SampleDelayMs,
int ProtocolTimeoutMs,
bool RanRpc,
bool RanSerialization);
public sealed record TransferSample(
int Round,
string Protocol,
string Category,
string Workload,
long TxBytes,
long RxBytes);
public static class ExperimentResultWriter
{
public static string WriteAll(
string outputDirectory,
ExperimentRunSettings settings,
IReadOnlyDictionary<string, List<TestResults>> transferResults,
IReadOnlyList<SerializationSample> serializationSamples)
{
Directory.CreateDirectory(outputDirectory);
var transferSamples = FlattenTransferResults(transferResults).ToList();
var transferSummary = SummarizeTransfer(transferSamples).ToList();
var serializationSummary = SummarizeSerialization(serializationSamples).ToList();
WriteTransferDetail(Path.Combine(outputDirectory, "transfer-detail.csv"), transferSamples);
WriteTransferSummary(Path.Combine(outputDirectory, "transfer-summary.csv"), transferSummary);
WriteSerializationDetail(Path.Combine(outputDirectory, "serialization-detail.csv"), serializationSamples);
WriteSerializationSummary(Path.Combine(outputDirectory, "serialization-summary.csv"), serializationSummary);
var reportPath = Path.Combine(outputDirectory, "report.md");
WriteReport(reportPath, settings, transferSummary, serializationSummary);
return reportPath;
}
private static IEnumerable<TransferSample> FlattenTransferResults(IReadOnlyDictionary<string, List<TestResults>> results)
{
foreach (var protocol in results.OrderBy(x => x.Key))
{
for (var round = 0; round < protocol.Value.Count; round++)
{
foreach (var sample in FlattenCategory(protocol.Value[round].Docs, round + 1, protocol.Key, "Docs"))
yield return sample;
foreach (var sample in FlattenCategory(protocol.Value[round].Bytes, round + 1, protocol.Key, "Bytes"))
yield return sample;
foreach (var sample in FlattenCategory(protocol.Value[round].Ints, round + 1, protocol.Key, "Ints"))
yield return sample;
}
}
}
private static IEnumerable<TransferSample> FlattenCategory(
Dictionary<string, (long txBytes, long rxBytes)> category,
int round,
string protocol,
string categoryName)
{
foreach (var sample in category.OrderBy(x => x.Key))
yield return new TransferSample(round, protocol, categoryName, sample.Key, sample.Value.txBytes, sample.Value.rxBytes);
}
private static IEnumerable<(string Protocol, string Category, string Workload, int Count, NumberStats Tx, NumberStats Rx)> SummarizeTransfer(
IReadOnlyList<TransferSample> samples)
{
return samples
.GroupBy(x => new { x.Protocol, x.Category, x.Workload })
.OrderBy(x => x.Key.Protocol)
.ThenBy(x => x.Key.Category)
.ThenBy(x => x.Key.Workload)
.Select(x => (
x.Key.Protocol,
x.Key.Category,
x.Key.Workload,
x.Count(),
NumberStats.From(x.Select(v => (double)v.TxBytes)),
NumberStats.From(x.Select(v => (double)v.RxBytes))));
}
private static IEnumerable<(string Protocol, string Category, string Workload, int Count, NumberStats Payload, NumberStats Serialize, NumberStats Deserialize)> SummarizeSerialization(
IReadOnlyList<SerializationSample> samples)
{
return samples
.GroupBy(x => new { x.Protocol, x.Category, x.Workload })
.OrderBy(x => x.Key.Protocol)
.ThenBy(x => x.Key.Category)
.ThenBy(x => x.Key.Workload)
.Select(x => (
x.Key.Protocol,
x.Key.Category,
x.Key.Workload,
x.Count(),
NumberStats.From(x.Select(v => (double)v.PayloadBytes)),
NumberStats.From(x.Select(v => v.SerializeMs)),
NumberStats.From(x.Select(v => v.DeserializeMs))));
}
private static void WriteTransferDetail(string path, IReadOnlyList<TransferSample> samples)
{
var csv = new StringBuilder();
csv.AppendLine("round,protocol,category,workload,tx_bytes,rx_bytes");
foreach (var x in samples)
csv.AppendLine(string.Join(",", x.Round, Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), x.TxBytes, x.RxBytes));
File.WriteAllText(path, csv.ToString());
}
private static void WriteTransferSummary(
string path,
IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Tx, NumberStats Rx)> rows)
{
var csv = new StringBuilder();
csv.AppendLine("protocol,category,workload,samples,tx_avg_bytes,tx_min_bytes,tx_max_bytes,tx_median_bytes,rx_avg_bytes,rx_min_bytes,rx_max_bytes,rx_median_bytes");
foreach (var x in rows)
{
csv.AppendLine(string.Join(",",
Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), x.Count,
D(x.Tx.Average), D(x.Tx.Minimum), D(x.Tx.Maximum), D(x.Tx.Median),
D(x.Rx.Average), D(x.Rx.Minimum), D(x.Rx.Maximum), D(x.Rx.Median)));
}
File.WriteAllText(path, csv.ToString());
}
private static void WriteSerializationDetail(string path, IReadOnlyList<SerializationSample> samples)
{
var csv = new StringBuilder();
csv.AppendLine("round,seed,protocol,category,workload,payload_bytes,serialize_ms,deserialize_ms");
foreach (var x in samples)
{
csv.AppendLine(string.Join(",",
x.Round, x.Seed, Csv(x.Protocol), Csv(x.Category), Csv(x.Workload),
x.PayloadBytes, D(x.SerializeMs), D(x.DeserializeMs)));
}
File.WriteAllText(path, csv.ToString());
}
private static void WriteSerializationSummary(
string path,
IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Payload, NumberStats Serialize, NumberStats Deserialize)> rows)
{
var csv = new StringBuilder();
csv.AppendLine("protocol,category,workload,samples,payload_avg_bytes,payload_min_bytes,payload_max_bytes,payload_median_bytes,serialize_avg_ms,serialize_min_ms,serialize_max_ms,serialize_median_ms,deserialize_avg_ms,deserialize_min_ms,deserialize_max_ms,deserialize_median_ms");
foreach (var x in rows)
{
csv.AppendLine(string.Join(",",
Csv(x.Protocol), Csv(x.Category), Csv(x.Workload), x.Count,
D(x.Payload.Average), D(x.Payload.Minimum), D(x.Payload.Maximum), D(x.Payload.Median),
D(x.Serialize.Average), D(x.Serialize.Minimum), D(x.Serialize.Maximum), D(x.Serialize.Median),
D(x.Deserialize.Average), D(x.Deserialize.Minimum), D(x.Deserialize.Maximum), D(x.Deserialize.Median)));
}
File.WriteAllText(path, csv.ToString());
}
private static void WriteReport(
string path,
ExperimentRunSettings settings,
IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Tx, NumberStats Rx)> transferRows,
IReadOnlyList<(string Protocol, string Category, string Workload, int Count, NumberStats Payload, NumberStats Serialize, NumberStats Deserialize)> serializationRows)
{
var md = new StringBuilder();
md.AppendLine("# RPC Serialization Supplementary Experiment");
md.AppendLine();
md.AppendLine("## Run Configuration");
md.AppendLine();
md.AppendLine($"- Generated: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC");
md.AppendLine($"- Rounds: {settings.Rounds}");
md.AppendLine($"- Base seed: {settings.BaseSeed}");
md.AppendLine($"- Serialization iterations per workload sample: {settings.SerializationIterations}");
md.AppendLine($"- RPC warmup/post-handshake/sample delays: {settings.WarmupDelayMs}/{settings.PostHandshakeDelayMs}/{settings.SampleDelayMs} ms");
md.AppendLine($"- Per-protocol RPC timeout: {settings.ProtocolTimeoutMs} ms");
md.AppendLine($"- Runtime: {RuntimeInformation.FrameworkDescription}");
md.AppendLine($"- OS: {RuntimeInformation.OSDescription}");
md.AppendLine($"- Architecture: {RuntimeInformation.ProcessArchitecture}");
md.AppendLine($"- Logical processors: {Environment.ProcessorCount}");
md.AppendLine();
md.AppendLine("The document workloads are synthetic but deterministic. Each round uses `baseSeed + (round - 1) * 1000`; workloads cover small, medium, and large business documents with nested records, enums, nullable values, maps, Unicode text, integer arrays, and attachments.");
md.AppendLine();
md.AppendLine("Serialization measurements are local codec payload measurements. They include Esiur, gRPC, JSON, and SignalR JSON model payloads. Thrift is included in RPC transfer measurements; local Thrift codec timing is not emitted because the current Thrift package does not expose a stable in-memory transport in this test project.");
md.AppendLine();
md.AppendLine("RPC transfer counters use Windows ETW kernel network tracing. If the client is not run with Administrator rights, the RPC calls still execute but transfer counters are recorded as zero. Protocol failures or timeouts are printed to the console and omitted from the transfer summary.");
md.AppendLine();
md.AppendLine("## Serialization Payload Summary");
md.AppendLine();
md.AppendLine("| Protocol | Category | Workload | Samples | Payload avg bytes | Encode avg ms | Decode avg ms |");
md.AppendLine("|---|---|---:|---:|---:|---:|---:|");
foreach (var row in serializationRows)
{
md.AppendLine($"| {row.Protocol} | {row.Category} | {row.Workload} | {row.Count} | {D(row.Payload.Average)} | {D(row.Serialize.Average)} | {D(row.Deserialize.Average)} |");
}
md.AppendLine();
md.AppendLine("## RPC Transfer Summary");
md.AppendLine();
md.AppendLine("| Protocol | Category | Workload | Samples | TX avg bytes | RX avg bytes |");
md.AppendLine("|---|---|---:|---:|---:|---:|");
foreach (var row in transferRows)
{
md.AppendLine($"| {row.Protocol} | {row.Category} | {row.Workload} | {row.Count} | {D(row.Tx.Average)} | {D(row.Rx.Average)} |");
}
md.AppendLine();
md.AppendLine("## Output Files");
md.AppendLine();
md.AppendLine("- `serialization-detail.csv`: per-round codec payload bytes and encode/decode timing.");
md.AppendLine("- `serialization-summary.csv`: aggregate codec payload and timing statistics.");
md.AppendLine("- `transfer-detail.csv`: per-round process network TX/RX deltas from the RPC client.");
md.AppendLine("- `transfer-summary.csv`: aggregate RPC transfer statistics.");
File.WriteAllText(path, md.ToString());
}
private static string Csv(string value)
{
if (value.Contains(',') || value.Contains('"') || value.Contains('\n') || value.Contains('\r'))
return "\"" + value.Replace("\"", "\"\"") + "\"";
return value;
}
private static string D(double value)
=> value.ToString("0.###", CultureInfo.InvariantCulture);
public readonly record struct NumberStats(double Average, double Minimum, double Maximum, double Median)
{
public static NumberStats From(IEnumerable<double> values)
{
var sorted = values.OrderBy(x => x).ToArray();
if (sorted.Length == 0)
return new NumberStats(double.NaN, double.NaN, double.NaN, double.NaN);
var median = sorted.Length % 2 == 1
? sorted[sorted.Length / 2]
: (sorted[sorted.Length / 2 - 1] + sorted[sorted.Length / 2]) / 2.0;
return new NumberStats(sorted.Average(), sorted[0], sorted[^1], median);
}
}
}
+10 -7
View File
@@ -16,7 +16,10 @@ public class GrpcTest
public static async Task<TestResults> DoTest(string address, public static async Task<TestResults> DoTest(string address,
Dictionary<string, BusinessDocument[]> docsWorkloads, Dictionary<string, BusinessDocument[]> docsWorkloads,
Dictionary<string, byte[]> dataWorkloads, Dictionary<string, byte[]> dataWorkloads,
Dictionary<string, int[]> intWorkloads) Dictionary<string, int[]> intWorkloads,
int warmupDelayMs = 3000,
int postHandshakeDelayMs = 2000,
int sampleDelayMs = 3000)
{ {
var rt = new TestResults(); var rt = new TestResults();
@@ -29,13 +32,13 @@ public class GrpcTest
var service = new Client.Grpc.EchoService.EchoServiceClient(channel); var service = new Client.Grpc.EchoService.EchoServiceClient(channel);
Thread.Sleep(3000); Thread.Sleep(warmupDelayMs);
var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0);
Console.WriteLine($"Handshake {ctx}/{crx}"); Console.WriteLine($"Handshake {ctx}/{crx}");
await Task.Delay(2000); await Task.Delay(postHandshakeDelayMs);
foreach (var w in docsWorkloads) foreach (var w in docsWorkloads)
{ {
@@ -50,7 +53,7 @@ public class GrpcTest
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
@@ -72,7 +75,7 @@ public class GrpcTest
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -96,7 +99,7 @@ public class GrpcTest
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -105,7 +108,7 @@ public class GrpcTest
} }
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx) = mon.GetTotals(); (tx, rx) = mon.GetTotals();
Console.WriteLine($"Transfer {tx}/{rx}"); Console.WriteLine($"Transfer {tx}/{rx}");
+10 -7
View File
@@ -15,7 +15,10 @@ public class JsonTest
public static async Task<TestResults> DoTest(string address, public static async Task<TestResults> DoTest(string address,
Dictionary<string, BusinessDocument[]> docsWorkloads, Dictionary<string, BusinessDocument[]> docsWorkloads,
Dictionary<string, byte[]> dataWorkloads, Dictionary<string, byte[]> dataWorkloads,
Dictionary<string, int[]> intWorkloads) Dictionary<string, int[]> intWorkloads,
int warmupDelayMs = 3000,
int postHandshakeDelayMs = 2000,
int sampleDelayMs = 3000)
{ {
var rt = new TestResults(); var rt = new TestResults();
@@ -28,20 +31,20 @@ public class JsonTest
using var http = new HttpClient { BaseAddress = new Uri(address) }; using var http = new HttpClient { BaseAddress = new Uri(address) };
Thread.Sleep(3000); Thread.Sleep(warmupDelayMs);
var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0);
Console.WriteLine($"Handshake {ctx}/{crx}"); Console.WriteLine($"Handshake {ctx}/{crx}");
await Task.Delay(2000); await Task.Delay(postHandshakeDelayMs);
foreach (var w in docsWorkloads) foreach (var w in docsWorkloads)
{ {
Console.Write("Workload: " + w.Key); Console.Write("Workload: " + w.Key);
var docs = await JsonRpcCallAsync(http, "EchoDocuments", w.Value); var docs = await JsonRpcCallAsync(http, "EchoDocuments", w.Value);
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
@@ -62,7 +65,7 @@ public class JsonTest
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -82,7 +85,7 @@ public class JsonTest
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -91,7 +94,7 @@ public class JsonTest
} }
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx) = mon.GetTotals(); (tx, rx) = mon.GetTotals();
Console.WriteLine($"Transfer {tx}/{rx}"); Console.WriteLine($"Transfer {tx}/{rx}");
@@ -35,6 +35,7 @@ namespace Esiur.Tests.RPC.EsiurServer
.Chunk(x => rt.TriggerChunk(x)); .Chunk(x => rt.TriggerChunk(x));
return rt; return rt;
} }
[Annotation("", "([BusinessDocument[]] payload) -> BusinessDocument[]")] [Annotation("", "([BusinessDocument[]] payload) -> BusinessDocument[]")]
[Export] [Export]
public AsyncReply<Esiur.Tests.RPC.EsiurServer.BusinessDocument[]> EchoDocuments(Esiur.Tests.RPC.EsiurServer.BusinessDocument[] payload) public AsyncReply<Esiur.Tests.RPC.EsiurServer.BusinessDocument[]> EchoDocuments(Esiur.Tests.RPC.EsiurServer.BusinessDocument[] payload)
@@ -47,6 +48,7 @@ namespace Esiur.Tests.RPC.EsiurServer
.Chunk(x => rt.TriggerChunk(x)); .Chunk(x => rt.TriggerChunk(x));
return rt; return rt;
} }
[Annotation("", "([DocType[]] payload) -> DocType[]")] [Annotation("", "([DocType[]] payload) -> DocType[]")]
[Export] [Export]
public AsyncReply<Esiur.Tests.RPC.EsiurServer.DocType[]> EchoEnumArray(Esiur.Tests.RPC.EsiurServer.DocType[] payload) public AsyncReply<Esiur.Tests.RPC.EsiurServer.DocType[]> EchoEnumArray(Esiur.Tests.RPC.EsiurServer.DocType[] payload)
+19 -5
View File
@@ -17,6 +17,7 @@ public class PerProcessNetMonitor : IDisposable
private long _txBytes; private long _txBytes;
private long _rxBytes; private long _rxBytes;
private volatile bool _running; private volatile bool _running;
private static int _warningWritten;
public PerProcessNetMonitor(int pid) public PerProcessNetMonitor(int pid)
{ {
@@ -25,12 +26,25 @@ public class PerProcessNetMonitor : IDisposable
public void Start() public void Start()
{ {
// Use a unique session name try
string sessionName = "NetMon_" + Guid.NewGuid(); {
_session = new TraceEventSession(sessionName); // Use a unique session name
string sessionName = "NetMon_" + Guid.NewGuid();
_session = new TraceEventSession(sessionName);
// Enable kernel network provider // Enable kernel network provider
_session.EnableKernelProvider(KernelTraceEventParser.Keywords.NetworkTCPIP); _session.EnableKernelProvider(KernelTraceEventParser.Keywords.NetworkTCPIP);
}
catch (UnauthorizedAccessException ex)
{
_session?.Dispose();
_session = null;
if (Interlocked.Exchange(ref _warningWritten, 1) == 0)
Console.WriteLine($"Network monitor disabled: {ex.Message}");
return;
}
_running = true; _running = true;
_listenTask = Task.Run(() => _listenTask = Task.Run(() =>
+236 -91
View File
@@ -1,63 +1,215 @@
using MQTTnet;
using Esiur.Tests.RPC.Client; using Esiur.Tests.RPC.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
var rounds = ReadIntArg(args, "--rounds", 10);
var baseSeed = ReadIntArg(args, "--seed", 1000);
var serializationIterations = ReadIntArg(args, "--serialization-iterations", 3);
var warmupDelayMs = ReadIntArg(args, "--warmup-ms", 3000);
var postHandshakeDelayMs = ReadIntArg(args, "--post-handshake-ms", 2000);
var sampleDelayMs = ReadIntArg(args, "--sample-ms", 3000);
var protocolTimeoutMs = ReadIntArg(args, "--protocol-timeout-ms", 120000);
var runRpc = !HasArg(args, "--skip-rpc");
var runSerialization = false;// !HasArg(args, "--skip-serialization");
var outputDirectory = Path.GetFullPath(ReadStringArg(
args,
"--output",
Path.Combine("Tests", "RPC", "Results", DateTime.UtcNow.ToString("yyyyMMdd-HHmmss"))));
var results = new Dictionary<string, List<TestResults>>(); var results = new Dictionary<string, List<TestResults>>
results.Add("esiur", new List<TestResults>());
results.Add("grpc", new List<TestResults>());
results.Add("thrift", new List<TestResults>());
results.Add("json", new List<TestResults>());
results.Add("signalr", new List<TestResults>());
for (var i = 0; i < 10; i++)
{ {
var seed = 1000 + (i * 1000); ["esiur"] = new(),
["grpc"] = new(),
["thrift"] = new(),
["json"] = new(),
["signalr"] = new()
};
var docsWorkloads = new Dictionary<string, Esiur.Tests.RPC.EsiurServer.BusinessDocument[]>();// RPC.Client.Tests.DocGenerator.BuildWorkloads(seed); var serializationSamples = new List<SerializationSample>();
Console.WriteLine("RPC supplementary experiment");
Console.WriteLine($"Rounds={rounds}, seed={baseSeed}, serialization iterations={serializationIterations}");
Console.WriteLine($"Output={outputDirectory}");
for (var i = 0; i < rounds; i++)
{
var round = i + 1;
var seed = baseSeed + (i * 1000);
Console.WriteLine($"\n# Round {round}/{rounds}, seed {seed}");
var docsWorkloads = DocGenerator.BuildWorkloads(seed);
var dataWorkLoads = Shared.BuildBytesWorkLoads(seed); var dataWorkLoads = Shared.BuildBytesWorkLoads(seed);
var intWorkloads = Shared.BuildIntWorkloads(seed); var intWorkloads = Shared.BuildIntWorkloads(seed);
results["esiur"].Add( if (runSerialization)
await EsiurTest.DoTest("ep://localhost:5005/sys/service", docsWorkloads, dataWorkLoads, intWorkloads) {
); Console.WriteLine("Collecting local serialization samples...");
serializationSamples.AddRange(SerializationExperiment.RunRound(
round,
seed,
docsWorkloads,
dataWorkLoads,
intWorkloads,
serializationIterations));
}
results["thrift"].Add( if (!runRpc)
await ThriftTest.DoTest("127.0.0.1", 5400, continue;
docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToThrift()).ToArray()),
dataWorkLoads,
intWorkloads
)
);
results["signalr"].Add(await SignalRTest.DoTest("http://127.0.0.1:5200/hub/echo", var thriftDocs = docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToThrift()).ToArray());
docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToShared()).ToArray()), var signalRDocs = docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToShared()).ToArray());
dataWorkLoads, var grpcDocs = docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToGrpc()).ToArray());
intWorkloads
));
results["json"].Add( await JsonTest.DoTest("http://127.0.0.1:5100", if (await RunProtocol("esiur", () => EsiurTest.DoTest(
"ep://localhost:5005/sys/service",
docsWorkloads, docsWorkloads,
dataWorkLoads, dataWorkLoads,
intWorkloads intWorkloads,
) ); warmupDelayMs,
postHandshakeDelayMs,
sampleDelayMs),
protocolTimeoutMs) is { } esiurResults)
{
results["esiur"].Add(esiurResults);
}
results["grpc"].Add(await GrpcTest.DoTest("http://127.0.0.1:5300", if (await RunProtocol("thrift", () => ThriftTest.DoTest(
docsWorkloads.ToDictionary(x => x.Key, v => v.Value.Select(x => x.ToGrpc()).ToArray()), "127.0.0.1",
5400,
thriftDocs,
dataWorkLoads, dataWorkLoads,
intWorkloads intWorkloads,
)); warmupDelayMs,
postHandshakeDelayMs,
sampleDelayMs),
protocolTimeoutMs) is { } thriftResults)
{
results["thrift"].Add(thriftResults);
}
if (await RunProtocol("signalr", () => SignalRTest.DoTest(
"http://127.0.0.1:5200/hub/echo",
signalRDocs,
dataWorkLoads,
intWorkloads,
warmupDelayMs,
postHandshakeDelayMs,
sampleDelayMs),
protocolTimeoutMs) is { } signalRResults)
{
results["signalr"].Add(signalRResults);
}
if (await RunProtocol("json", () => JsonTest.DoTest(
"http://127.0.0.1:5100",
docsWorkloads,
dataWorkLoads,
intWorkloads,
warmupDelayMs,
postHandshakeDelayMs,
sampleDelayMs),
protocolTimeoutMs) is { } jsonResults)
{
results["json"].Add(jsonResults);
}
if (await RunProtocol("grpc", () => GrpcTest.DoTest(
"http://127.0.0.1:5300",
grpcDocs,
dataWorkLoads,
intWorkloads,
warmupDelayMs,
postHandshakeDelayMs,
sampleDelayMs),
protocolTimeoutMs) is { } grpcResults)
{
results["grpc"].Add(grpcResults);
}
}
if (runRpc)
PrintTransferStats(results);
var reportPath = ExperimentResultWriter.WriteAll(
outputDirectory,
new ExperimentRunSettings(
rounds,
baseSeed,
serializationIterations,
warmupDelayMs,
postHandshakeDelayMs,
sampleDelayMs,
protocolTimeoutMs,
runRpc,
runSerialization),
results,
serializationSamples);
Console.WriteLine($"\nReport written to {reportPath}");
static void PrintTransferStats(Dictionary<string, List<TestResults>> results)
{
foreach (var transport in results.Keys)
{
Console.WriteLine($"\n== Stats for {transport} ==");
var rounds = results[transport];
if (rounds.Count == 0)
{
Console.WriteLine("No results.");
continue;
}
var categories = new Dictionary<string, Func<TestResults, Dictionary<string, (long, long)>>>
{
{ "Docs", tr => tr.Docs },
{ "Bytes", tr => tr.Bytes },
{ "Ints", tr => tr.Ints }
};
foreach (var cat in categories)
{
Console.WriteLine($"-- {cat.Key} --");
var allKeys = new HashSet<string>();
foreach (var r in rounds)
{
foreach (var k in cat.Value(r).Keys)
allKeys.Add(k);
}
foreach (var key in allKeys.OrderBy(k => k))
{
var txList = new List<long>();
var rxList = new List<long>();
foreach (var r in rounds)
{
if (cat.Value(r).TryGetValue(key, out var tup))
{
txList.Add(tup.Item1);
rxList.Add(tup.Item2);
}
}
if (txList.Count == 0)
{
Console.WriteLine($"{key}: no samples");
continue;
}
var sTx = StatsLongs(txList);
var sRx = StatsLongs(rxList);
Console.WriteLine($"{key}: TX avg={sTx.avg:0.##}, min={sTx.min}, max={sTx.max}, med={sTx.median:0.##} | RX avg={sRx.avg:0.##}, min={sRx.min}, max={sRx.max}, med={sRx.median:0.##}");
}
}
}
} }
// Compute statistics: average, min, max, median for tx/rx per transport and workload
static (double avg, long min, long max, double median) StatsLongs(List<long> xs) static (double avg, long min, long max, double median) StatsLongs(List<long> xs)
{ {
if (xs == null || xs.Count == 0) return (double.NaN, 0, 0, double.NaN); if (xs == null || xs.Count == 0)
return (double.NaN, 0, 0, double.NaN);
xs.Sort(); xs.Sort();
double avg = xs.Average(x => (double)x); double avg = xs.Average(x => (double)x);
long min = xs.First(); long min = xs.First();
@@ -66,60 +218,53 @@ static (double avg, long min, long max, double median) StatsLongs(List<long> xs)
return (avg, min, max, median); return (avg, min, max, median);
} }
foreach (var transport in results.Keys) static async Task<TestResults?> RunProtocol(string protocol, Func<Task<TestResults>> action, int timeoutMs)
{ {
Console.WriteLine($"\n== Stats for {transport} =="); try
var rounds = results[transport];
if (rounds.Count == 0)
{ {
Console.WriteLine("No results."); var task = action();
continue;
if (timeoutMs > 0)
{
var completed = await Task.WhenAny(task, Task.Delay(timeoutMs));
if (completed != task)
{
Console.WriteLine($"{protocol} failed: timed out after {timeoutMs} ms");
return null;
}
}
return await task;
} }
catch (Exception ex)
// categories: Docs, Bytes, Ints
var categories = new Dictionary<string, Func<TestResults, Dictionary<string, (long, long)>>>()
{ {
{ "Docs", tr => tr.Docs }, Console.WriteLine($"{protocol} failed: {ex.GetType().Name}: {ex.Message}");
{ "Bytes", tr => tr.Bytes }, return null;
{ "Ints", tr => tr.Ints }
};
foreach (var cat in categories)
{
Console.WriteLine($"-- {cat.Key} --");
// collect all workload keys seen in any round
var allKeys = new HashSet<string>();
foreach (var r in rounds)
{
foreach (var k in cat.Value(r).Keys) allKeys.Add(k);
}
foreach (var key in allKeys.OrderBy(k => k))
{
var txList = new List<long>();
var rxList = new List<long>();
foreach (var r in rounds)
{
if (cat.Value(r).TryGetValue(key, out var tup))
{
txList.Add(tup.Item1);
rxList.Add(tup.Item2);
}
}
if (txList.Count == 0)
{
Console.WriteLine($"{key}: no samples");
continue;
}
var sTx = StatsLongs(txList);
var sRx = StatsLongs(rxList);
Console.WriteLine($"{key}: TX avg={sTx.avg:0.##}, min={sTx.min}, max={sTx.max}, med={sTx.median:0.##} | RX avg={sRx.avg:0.##}, min={sRx.min}, max={sRx.max}, med={sRx.median:0.##}");
}
} }
} }
static int ReadIntArg(string[] args, string name, int defaultValue)
{
var raw = TryReadStringArg(args, name);
return raw == null ? defaultValue : int.Parse(raw);
}
static string ReadStringArg(string[] args, string name, string defaultValue)
=> TryReadStringArg(args, name) ?? defaultValue;
static string? TryReadStringArg(string[] args, string name)
{
var prefix = name + "=";
var inline = args.FirstOrDefault(x => x.StartsWith(prefix, StringComparison.OrdinalIgnoreCase));
if (inline != null)
return inline[prefix.Length..];
var index = Array.FindIndex(args, x => string.Equals(x, name, StringComparison.OrdinalIgnoreCase));
if (index >= 0 && index + 1 < args.Length)
return args[index + 1];
return null;
}
static bool HasArg(string[] args, string name)
=> args.Any(x => string.Equals(x, name, StringComparison.OrdinalIgnoreCase));
+178
View File
@@ -0,0 +1,178 @@
using Esiur.Data;
using Esiur.Resource;
using Google.Protobuf;
using System.Diagnostics;
using System.Text.Json;
using System.Text.Json.Serialization;
using EsiurModel = Esiur.Tests.RPC.EsiurServer;
using GrpcModel = Esiur.Tests.RPC.Client.Grpc;
using SharedModel = Esiur.Tests.RPC.Client.SharedModel;
namespace Esiur.Tests.RPC.Client;
public sealed record SerializationSample(
int Round,
int Seed,
string Protocol,
string Category,
string Workload,
long PayloadBytes,
double SerializeMs,
double DeserializeMs);
public static class SerializationExperiment
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
public static List<SerializationSample> RunRound(
int round,
int seed,
Dictionary<string, EsiurModel.BusinessDocument[]> docsWorkloads,
Dictionary<string, byte[]> dataWorkloads,
Dictionary<string, int[]> intWorkloads,
int iterations)
{
if (iterations <= 0)
throw new ArgumentOutOfRangeException(nameof(iterations));
EsiurModel.Initialization.RegisterTypes(Warehouse.Default);
var samples = new List<SerializationSample>();
foreach (var workload in docsWorkloads)
{
var docs = workload.Value;
var grpcDocs = docs.Select(x => x.ToGrpc()).ToArray();
var sharedDocs = docs.Select(x => x.ToShared()).ToArray();
samples.Add(Measure(
round, seed, "esiur", "Docs", workload.Key, iterations,
() => Codec.Compose(docs, Warehouse.Default, null),
payload => GC.KeepAlive(Codec.ParseSync(payload, 0, Warehouse.Default).Item2)));
samples.Add(Measure(
round, seed, "grpc", "Docs", workload.Key, iterations,
() =>
{
var request = new GrpcModel.DocumentsRequest();
request.Docs.AddRange(grpcDocs);
return request.ToByteArray();
},
payload => GC.KeepAlive(GrpcModel.DocumentsRequest.Parser.ParseFrom(payload))));
samples.Add(Measure(
round, seed, "json", "Docs", workload.Key, iterations,
() => JsonSerializer.SerializeToUtf8Bytes(docs, JsonOptions),
payload => GC.KeepAlive(JsonSerializer.Deserialize<EsiurModel.BusinessDocument[]>(payload, JsonOptions))));
samples.Add(Measure(
round, seed, "signalr", "Docs", workload.Key, iterations,
() => JsonSerializer.SerializeToUtf8Bytes(sharedDocs, JsonOptions),
payload => GC.KeepAlive(JsonSerializer.Deserialize<SharedModel.BusinessDocument[]>(payload, JsonOptions))));
}
foreach (var workload in dataWorkloads)
{
var data = workload.Value;
samples.Add(Measure(
round, seed, "esiur", "Bytes", workload.Key, iterations,
() => Codec.Compose(data, Warehouse.Default, null),
payload => GC.KeepAlive(Codec.ParseSync(payload, 0, Warehouse.Default).Item2)));
samples.Add(Measure(
round, seed, "grpc", "Bytes", workload.Key, iterations,
() => new GrpcModel.BytesRequest { Data = ByteString.CopyFrom(data) }.ToByteArray(),
payload => GC.KeepAlive(GrpcModel.BytesRequest.Parser.ParseFrom(payload))));
samples.Add(Measure(
round, seed, "json", "Bytes", workload.Key, iterations,
() => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions),
payload => GC.KeepAlive(JsonSerializer.Deserialize<byte[]>(payload, JsonOptions))));
samples.Add(Measure(
round, seed, "signalr", "Bytes", workload.Key, iterations,
() => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions),
payload => GC.KeepAlive(JsonSerializer.Deserialize<byte[]>(payload, JsonOptions))));
}
foreach (var workload in intWorkloads)
{
var data = workload.Value;
samples.Add(Measure(
round, seed, "esiur", "Ints", workload.Key, iterations,
() => Codec.Compose(data, Warehouse.Default, null),
payload => GC.KeepAlive(Codec.ParseSync(payload, 0, Warehouse.Default).Item2)));
samples.Add(Measure(
round, seed, "grpc", "Ints", workload.Key, iterations,
() =>
{
var request = new GrpcModel.IntArrayRequest();
request.Array.AddRange(data);
return request.ToByteArray();
},
payload => GC.KeepAlive(GrpcModel.IntArrayRequest.Parser.ParseFrom(payload))));
samples.Add(Measure(
round, seed, "json", "Ints", workload.Key, iterations,
() => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions),
payload => GC.KeepAlive(JsonSerializer.Deserialize<int[]>(payload, JsonOptions))));
samples.Add(Measure(
round, seed, "signalr", "Ints", workload.Key, iterations,
() => JsonSerializer.SerializeToUtf8Bytes(data, JsonOptions),
payload => GC.KeepAlive(JsonSerializer.Deserialize<int[]>(payload, JsonOptions))));
}
return samples;
}
private static SerializationSample Measure(
int round,
int seed,
string protocol,
string category,
string workload,
int iterations,
Func<byte[]> serialize,
Action<byte[]> deserialize)
{
var payload = serialize();
deserialize(payload);
long payloadBytes = payload.Length;
long serializeTicks = 0;
long deserializeTicks = 0;
for (var i = 0; i < iterations; i++)
{
var started = Stopwatch.GetTimestamp();
payload = serialize();
serializeTicks += Stopwatch.GetTimestamp() - started;
payloadBytes = payload.Length;
started = Stopwatch.GetTimestamp();
deserialize(payload);
deserializeTicks += Stopwatch.GetTimestamp() - started;
}
return new SerializationSample(
round,
seed,
protocol,
category,
workload,
payloadBytes,
TicksToMs(serializeTicks) / iterations,
TicksToMs(deserializeTicks) / iterations);
}
private static double TicksToMs(long ticks)
=> ticks * 1000.0 / Stopwatch.Frequency;
}
+12 -11
View File
@@ -12,33 +12,34 @@ namespace Esiur.Tests.RPC.Client
// Generate random int array of given length and distribution // Generate random int array of given length and distribution
public static int[] GenerateInt32(int length, string pattern = "uniform", public static int[] GenerateInt32(int length, string pattern = "uniform",
int range = int.MaxValue) int range = int.MaxValue, Random? random = null)
{ {
var data = new int[length]; var data = new int[length];
var source = random ?? rng;
switch (pattern.ToLower()) switch (pattern.ToLower())
{ {
case "uniform": case "uniform":
// Random values in [-range, range] // Random values in [-range, range]
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
data[i] = rng.Next(-range, range); data[i] = source.Next(-range, range);
break; break;
case "positive": case "positive":
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
data[i] = rng.Next(0, range); data[i] = source.Next(0, range);
break; break;
case "negative": case "negative":
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
data[i] = -rng.Next(0, range); data[i] = -source.Next(0, range);
break; break;
case "alternating": case "alternating":
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
{ {
int val = rng.Next(0, range); int val = source.Next(0, range);
data[i] = (i % 2 == 0) ? val : -val; data[i] = (i % 2 == 0) ? val : -val;
} }
break; break;
@@ -46,13 +47,13 @@ namespace Esiur.Tests.RPC.Client
case "small": case "small":
// Focused on small magnitudes to test ZigZag fast path // Focused on small magnitudes to test ZigZag fast path
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
data[i] = rng.Next(-64, 65); data[i] = source.Next(-64, 65);
break; break;
case "ascending": case "ascending":
{ {
int start = rng.Next(-range, range); int start = source.Next(-range, range);
for (int i = 0; i < length; i++) for (int i = 0; i < length; i++)
data[i] = start + i; data[i] = start + i;
} }
@@ -103,10 +104,10 @@ namespace Esiur.Tests.RPC.Client
var result = new Dictionary<string, int[]>(); var result = new Dictionary<string, int[]>();
var r = new Random(seed); var r = new Random(seed);
result.Add("uniform", GenerateInt32(1000, "uniform")); result.Add("uniform", GenerateInt32(1000, "uniform", random: r));
result.Add("small", GenerateInt32(1000, "small")); result.Add("small", GenerateInt32(1000, "small", random: r));
result.Add("alternating", GenerateInt32(1000, "alternating")); result.Add("alternating", GenerateInt32(1000, "alternating", random: r));
result.Add("ascending", GenerateInt32(1000, "ascending")); result.Add("ascending", GenerateInt32(1000, "ascending", random: r));
return result; return result;
+10 -7
View File
@@ -20,7 +20,10 @@ namespace Esiur.Tests.RPC.Client
public static async Task<TestResults> DoTest(string address, public static async Task<TestResults> DoTest(string address,
Dictionary<string, SharedModel.BusinessDocument[]> docsWorkloads, Dictionary<string, SharedModel.BusinessDocument[]> docsWorkloads,
Dictionary<string, byte[]> dataWorkloads, Dictionary<string, byte[]> dataWorkloads,
Dictionary<string, int[]> intWorkloads) Dictionary<string, int[]> intWorkloads,
int warmupDelayMs = 3000,
int postHandshakeDelayMs = 2000,
int sampleDelayMs = 3000)
{ {
var rt = new TestResults(); var rt = new TestResults();
@@ -39,13 +42,13 @@ namespace Esiur.Tests.RPC.Client
await service.StartAsync(); await service.StartAsync();
Thread.Sleep(3000); Thread.Sleep(warmupDelayMs);
var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0);
Console.WriteLine($"Handshake {ctx}/{crx}"); Console.WriteLine($"Handshake {ctx}/{crx}");
await Task.Delay(2000); await Task.Delay(postHandshakeDelayMs);
foreach (var w in docsWorkloads) foreach (var w in docsWorkloads)
{ {
@@ -58,7 +61,7 @@ namespace Esiur.Tests.RPC.Client
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -79,7 +82,7 @@ namespace Esiur.Tests.RPC.Client
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -99,7 +102,7 @@ namespace Esiur.Tests.RPC.Client
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -108,7 +111,7 @@ namespace Esiur.Tests.RPC.Client
} }
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx) = mon.GetTotals(); (tx, rx) = mon.GetTotals();
Console.WriteLine($"Transfer {tx}/{rx}"); Console.WriteLine($"Transfer {tx}/{rx}");
+10 -7
View File
@@ -12,7 +12,10 @@ public class ThriftTest
public static async Task<TestResults> DoTest(string host, int port, public static async Task<TestResults> DoTest(string host, int port,
Dictionary<string, BusinessDocument[]> docsWorkloads, Dictionary<string, BusinessDocument[]> docsWorkloads,
Dictionary<string, byte[]> dataWorkloads, Dictionary<string, byte[]> dataWorkloads,
Dictionary<string, int[]> intWorkloads) Dictionary<string, int[]> intWorkloads,
int warmupDelayMs = 3000,
int postHandshakeDelayMs = 2000,
int sampleDelayMs = 3000)
{ {
var rt = new TestResults(); var rt = new TestResults();
@@ -28,14 +31,14 @@ public class ThriftTest
var service = new EchoService.Client(proto); var service = new EchoService.Client(proto);
Thread.Sleep(3000); Thread.Sleep(warmupDelayMs);
var (tx, rx, ctx, crx) = mon.GetDiff(0, 0); var (tx, rx, ctx, crx) = mon.GetDiff(0, 0);
Console.WriteLine($"Handshake {ctx}/{crx}"); Console.WriteLine($"Handshake {ctx}/{crx}");
await Task.Delay(2000); await Task.Delay(postHandshakeDelayMs);
foreach (var w in docsWorkloads) foreach (var w in docsWorkloads)
{ {
@@ -48,7 +51,7 @@ public class ThriftTest
// throw new Exception("No match"); // throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
@@ -68,7 +71,7 @@ public class ThriftTest
throw new Exception("No match"); throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -89,7 +92,7 @@ public class ThriftTest
throw new Exception("No match"); throw new Exception("No match");
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx, ctx, crx) = mon.GetDiff(tx, rx); (tx, rx, ctx, crx) = mon.GetDiff(tx, rx);
Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}"); Console.WriteLine($", {tx}/{rx}, {ctx}/{crx}");
//Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}"); //Console.WriteLine($"Socket {sock.BytesSent}/{sock.BytesReceived}");
@@ -98,7 +101,7 @@ public class ThriftTest
} }
await Task.Delay(3000); await Task.Delay(sampleDelayMs);
(tx, rx) = mon.GetTotals(); (tx, rx) = mon.GetTotals();
Console.WriteLine($"Transfer {tx}/{rx}"); Console.WriteLine($"Transfer {tx}/{rx}");
+11 -5
View File
@@ -11,8 +11,9 @@ using System.Text;
ushort port = 5005; ushort port = 5005;
if (args.Count() > 0) var portArg = args.FirstOrDefault(x => ushort.TryParse(x, out _));
port = ushort.Parse(args[0]); if (portArg != null)
port = ushort.Parse(portArg);
Console.WriteLine($"Esiur server listening on port {port}..."); Console.WriteLine($"Esiur server listening on port {port}...");
@@ -27,7 +28,12 @@ await wh.Open();
Console.WriteLine("Open"); Console.WriteLine("Open");
if (!Directory.Exists("template")) if (args.Contains("--generate-client"))
Directory.CreateDirectory("template"); {
if (!Directory.Exists("template"))
Directory.CreateDirectory("template");
TypeDefGenerator.GetTypes("ep://localhost:5005/sys/service", "template"); TypeDefGenerator.GetTypes($"ep://localhost:{port}/sys/service", "template");
}
await Task.Delay(Timeout.Infinite);