diff --git a/Libraries/Esiur/Net/NetworkBuffer.cs b/Libraries/Esiur/Net/NetworkBuffer.cs index 54a32b8..d525473 100644 --- a/Libraries/Esiur/Net/NetworkBuffer.cs +++ b/Libraries/Esiur/Net/NetworkBuffer.cs @@ -35,15 +35,9 @@ public class NetworkBuffer byte[] data; uint neededDataLength = 0; - //bool trim; object syncLock = new object(); - //public object SyncLock - //{ - // get { return syncLock; } - //} - public NetworkBuffer() { data = new byte[0]; @@ -66,26 +60,14 @@ public class NetworkBuffer } - //public void HoldForAtLeast(byte[] src, uint offset, uint size, uint needed) - //{ - // HoldFor(src, offset, size, needed); - // //trim = false; - //} - - //public void HoldForAtLeast(byte[] src, uint needed) - //{ - // HoldForAtLeast(src, 0, (uint)src.Length, needed); - //} public void HoldForNextWrite(byte[] src) { - //HoldForAtLeast(src, (uint)src.Length + 1); HoldFor(src, (uint)src.Length + 1); } public void HoldForNextWrite(byte[] src, uint offset, uint size) { - //HoldForAtLeast(src, offset, size, size + 1); HoldFor(src, offset, size, size + 1); } @@ -100,10 +82,6 @@ public class NetworkBuffer //trim = true; data = DC.Combine(src, offset, size, data, 0, (uint)data.Length); neededDataLength = needed; - - // Console.WriteLine("Hold StackTrace: '{0}'", Environment.StackTrace); - - //Console.WriteLine("Holded {0} {1} {2} {3} - {4}", offset, size, needed, data.Length, GetHashCode()); } } @@ -112,17 +90,13 @@ public class NetworkBuffer HoldFor(src, 0, (uint)src.Length, needed); } - public bool Protect(byte[] data, uint offset, uint needed)//, bool exact = false) + public bool Protect(byte[] data, uint offset, uint needed) { uint dataLength = (uint)data.Length - offset; // protection if (dataLength < needed) { - //if (exact) - // HoldFor(data, offset, dataLength, needed); - //else - //HoldForAtLeast(data, offset, dataLength, needed); HoldFor(data, offset, dataLength, needed); return true; } @@ -137,9 +111,6 @@ public class NetworkBuffer public void Write(byte[] src, uint offset, uint length) { - //if (data.Length > 0) - // Console.WriteLine(); - lock (syncLock) DC.Append(ref data, src, offset, length); } @@ -174,23 +145,11 @@ public class NetworkBuffer } else { - //Console.WriteLine("P STATE:" + data.Length + " " + neededDataLength); if (data.Length >= neededDataLength) { - //Console.WriteLine("data.Length >= neededDataLength " + data.Length + " >= " + neededDataLength + " " + trim); - - //if (trim) - //{ - // rt = DC.Clip(data, 0, neededDataLength); - // data = DC.Clip(data, neededDataLength, (uint)data.Length - neededDataLength); - //} - //else - //{ - // return all data rt = data; data = new byte[0]; - //} neededDataLength = 0; return rt; diff --git a/Libraries/Esiur/Net/NetworkServer.cs b/Libraries/Esiur/Net/NetworkServer.cs index 0f763a0..dead829 100644 --- a/Libraries/Esiur/Net/NetworkServer.cs +++ b/Libraries/Esiur/Net/NetworkServer.cs @@ -37,23 +37,15 @@ namespace Esiur.Net; public abstract class NetworkServer : IDestructible where TConnection : NetworkConnection, new() { - //private bool isRunning; private Sockets.ISocket listener; public AutoList> Connections { get; internal set; } private Thread thread; - //protected abstract void DataReceived(TConnection sender, NetworkBuffer data); - //protected abstract void ClientConnected(TConnection sender); - //protected abstract void ClientDisconnected(TConnection sender); - - private Timer timer; - //public KeyList Sessions = new KeyList(); public event DestroyedEvent OnDestroy; - //public AutoList> Connections => connections; private void MinuteThread(object state) { @@ -101,12 +93,6 @@ public abstract class NetworkServer : IDestructible where TConnecti listener = socket; - // Start accepting - //var r = listener.Accept(); - //r.Then(NewConnection); - //r.timeout?.Dispose(); - - //var rt = listener.Accept().Then() thread = new Thread(new ThreadStart(() => { while (true) @@ -121,8 +107,6 @@ public abstract class NetworkServer : IDestructible where TConnecti return; } - //Console.WriteLine("New Socket ... " + DateTime.Now); - var c = new TConnection(); c.Assign(s); @@ -139,9 +123,6 @@ public abstract class NetworkServer : IDestructible where TConnecti s.Begin(); - // Accept more - //listener.Accept().Then(NewConnection); - } catch (Exception ex) { @@ -182,15 +163,6 @@ public abstract class NetworkServer : IDestructible where TConnecti port = listener.LocalEndPoint.Port; listener.Close(); } - - // wait until the listener stops - //while (isRunning) - //{ - // Thread.Sleep(100); - //} - - //Console.WriteLine("Listener stopped"); - var cons = Connections.ToArray(); //lock (connections.SyncRoot) @@ -199,14 +171,6 @@ public abstract class NetworkServer : IDestructible where TConnecti con.Close(); //} - //Console.WriteLine("Sockets Closed"); - - //while (connections.Count > 0) - //{ - // Console.WriteLine("Waiting... " + connections.Count); - // Thread.Sleep(1000); - //} - } finally { @@ -233,35 +197,9 @@ public abstract class NetworkServer : IDestructible where TConnecti get { return listener.State == SocketState.Listening; - //isRunning; } } - //public void OnDataReceived(ISocket sender, NetworkBuffer data) - //{ - // DataReceived((TConnection)sender, data); - //} - - //public void OnClientConnect(ISocket sender) - //{ - // if (sender == null) - // return; - - // if (sender.RemoteEndPoint == null || sender.LocalEndPoint == null) - // { } - // //Console.WriteLine("NULL"); - // else - // Global.Log("Connections", LogType.Debug, sender.RemoteEndPoint.Address.ToString() - // + "->" + sender.LocalEndPoint.Port + " at " + DateTime.UtcNow.ToString("d") - // + " " + DateTime.UtcNow.ToString("d"), false); - - // // Console.WriteLine("Connected " + sender.RemoteEndPoint.ToString()); - // ClientConnected((TConnection)sender); - //} - - //public void OnClientClose(ISocket sender) - //{ - //} public void Destroy() diff --git a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs index 4d9098d..bfcfd7d 100644 --- a/Libraries/Esiur/Protocol/EpConnectionProtocol.cs +++ b/Libraries/Esiur/Protocol/EpConnectionProtocol.cs @@ -1742,7 +1742,7 @@ partial class EpConnection var tt = TypeDef.Parse((byte[])result); typeDefsByIdRequests.Remove(typeId); typeDefs.Add(tt.Id, tt); - Instance.Warehouse.RegisterTypeDef(tt); + Instance.Warehouse.TryRegisterTypeDef(tt); reply.Trigger(tt); }).Error((ex) => diff --git a/Libraries/Esiur/Resource/Warehouse.cs b/Libraries/Esiur/Resource/Warehouse.cs index 316a283..f538a12 100644 --- a/Libraries/Esiur/Resource/Warehouse.cs +++ b/Libraries/Esiur/Resource/Warehouse.cs @@ -519,6 +519,19 @@ public class Warehouse } } + public bool TryRegisterTypeDef(TypeDef typeDef) + { + lock (typeDefsLock) + { + if (typeDefs[typeDef.Kind].ContainsKey(typeDef.Id)) + return false; + + typeDefs[typeDef.Kind][typeDef.Id] = typeDef; + + return true; + } + } + /// /// Get a TypeDef by type from the warehouse. If not in the warehouse, a new TypeDef is created and added to the warehouse. diff --git a/Tests/Distribution/ConcurrentAttach/Program.cs b/Tests/Distribution/ConcurrentAttach/Program.cs index df0f7cc..d5a335f 100644 --- a/Tests/Distribution/ConcurrentAttach/Program.cs +++ b/Tests/Distribution/ConcurrentAttach/Program.cs @@ -21,7 +21,6 @@ using Esiur.Protocol; using Esiur.Resource; using Esiur.Stores; using System.Diagnostics; - var mode = GetArg(args, "--mode", "both"); var host = GetArg(args, "--host", "127.0.0.1"); var port = int.Parse(GetArg(args, "--port", "10902")); @@ -83,6 +82,8 @@ for (int round = 0; round < rounds; round++) var roundSw = Stopwatch.StartNew(); + var connnection = await clientWh.Get($"ep://{host}:{port}"); + // Fire all attachments simultaneously var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => { @@ -90,8 +91,7 @@ for (int round = 0; round < rounds; round++) using var cts = new CancellationTokenSource(timeoutMs); try { - var proxy = await clientWh.Get( - $"ep://{host}:{port}/sys/sensor_{resourceIdx}"); + var proxy = await connnection.Get($"sys/sensor_{resourceIdx}"); sw.Stop(); latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; diff --git a/Tests/Distribution/ResourceCount/Client/Program.cs b/Tests/Distribution/ResourceCount/Client/Program.cs index 8e28b92..4d92958 100644 --- a/Tests/Distribution/ResourceCount/Client/Program.cs +++ b/Tests/Distribution/ResourceCount/Client/Program.cs @@ -11,11 +11,12 @@ using Esiur.Protocol; using Esiur.Resource; using System.Diagnostics; +using System.Text.RegularExpressions; var host = GetArg(args, "--host", "127.0.0.1"); var port = int.Parse(GetArg(args, "--port", "10901")); var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); -var batchSize = int.Parse(GetArg(args, "--batch", "100")); +var batchSize = int.Parse(GetArg(args, "--batch", "10000")); Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}"); @@ -29,6 +30,7 @@ var proxies = new IResource[resourceCount]; // --- Attach in batches to avoid overwhelming the runtime ------------- var totalSw = Stopwatch.StartNew(); + for (int batch = 0; batch < resourceCount; batch += batchSize) { @@ -41,8 +43,12 @@ for (int batch = 0; batch < resourceCount; batch += batchSize) batchTasks[i - batch] = Task.Run(async () => { var sw = Stopwatch.StartNew(); + + Console.WriteLine(capturedI); proxies[capturedI] = await connnection.Get($"sys/sensor_{capturedI}"); + Console.WriteLine(proxies[capturedI].Instance.Link); + sw.Stop(); lock (attachLatencies) @@ -77,7 +83,7 @@ Console.WriteLine("[Client-T2] Measuring notification latency under full resourc long received = 0; double sumLatencyMs = 0; -for (int i = 0; i < Math.Min(500, resourceCount); i++) +for (int i = 0; i < resourceCount; i++) { int capturedI = i; proxies[capturedI].Instance.PropertyModified += (PropertyModificationInfo data) =>