2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-04-29 06:48:41 +00:00

TryRegisterTypeDef

This commit is contained in:
2026-04-06 03:08:19 +03:00
parent f225e29f87
commit 9f322e5be5
6 changed files with 26 additions and 110 deletions
+1 -42
View File
@@ -35,15 +35,9 @@ public class NetworkBuffer
byte[] data; byte[] data;
uint neededDataLength = 0; uint neededDataLength = 0;
//bool trim;
object syncLock = new object(); object syncLock = new object();
//public object SyncLock
//{
// get { return syncLock; }
//}
public NetworkBuffer() public NetworkBuffer()
{ {
data = new byte[0]; data = new byte[0];
@@ -66,26 +60,14 @@ public class NetworkBuffer
} }
//public void HoldForAtLeast(byte[] src, uint offset, uint size, uint needed)
//{
// HoldFor(src, offset, size, needed);
// //trim = false;
//}
//public void HoldForAtLeast(byte[] src, uint needed)
//{
// HoldForAtLeast(src, 0, (uint)src.Length, needed);
//}
public void HoldForNextWrite(byte[] src) public void HoldForNextWrite(byte[] src)
{ {
//HoldForAtLeast(src, (uint)src.Length + 1);
HoldFor(src, (uint)src.Length + 1); HoldFor(src, (uint)src.Length + 1);
} }
public void HoldForNextWrite(byte[] src, uint offset, uint size) public void HoldForNextWrite(byte[] src, uint offset, uint size)
{ {
//HoldForAtLeast(src, offset, size, size + 1);
HoldFor(src, offset, size, size + 1); HoldFor(src, offset, size, size + 1);
} }
@@ -100,10 +82,6 @@ public class NetworkBuffer
//trim = true; //trim = true;
data = DC.Combine(src, offset, size, data, 0, (uint)data.Length); data = DC.Combine(src, offset, size, data, 0, (uint)data.Length);
neededDataLength = needed; neededDataLength = needed;
// Console.WriteLine("Hold StackTrace: '{0}'", Environment.StackTrace);
//Console.WriteLine("Holded {0} {1} {2} {3} - {4}", offset, size, needed, data.Length, GetHashCode());
} }
} }
@@ -112,17 +90,13 @@ public class NetworkBuffer
HoldFor(src, 0, (uint)src.Length, needed); HoldFor(src, 0, (uint)src.Length, needed);
} }
public bool Protect(byte[] data, uint offset, uint needed)//, bool exact = false) public bool Protect(byte[] data, uint offset, uint needed)
{ {
uint dataLength = (uint)data.Length - offset; uint dataLength = (uint)data.Length - offset;
// protection // protection
if (dataLength < needed) if (dataLength < needed)
{ {
//if (exact)
// HoldFor(data, offset, dataLength, needed);
//else
//HoldForAtLeast(data, offset, dataLength, needed);
HoldFor(data, offset, dataLength, needed); HoldFor(data, offset, dataLength, needed);
return true; return true;
} }
@@ -137,9 +111,6 @@ public class NetworkBuffer
public void Write(byte[] src, uint offset, uint length) public void Write(byte[] src, uint offset, uint length)
{ {
//if (data.Length > 0)
// Console.WriteLine();
lock (syncLock) lock (syncLock)
DC.Append(ref data, src, offset, length); DC.Append(ref data, src, offset, length);
} }
@@ -174,23 +145,11 @@ public class NetworkBuffer
} }
else else
{ {
//Console.WriteLine("P STATE:" + data.Length + " " + neededDataLength);
if (data.Length >= neededDataLength) if (data.Length >= neededDataLength)
{ {
//Console.WriteLine("data.Length >= neededDataLength " + data.Length + " >= " + neededDataLength + " " + trim);
//if (trim)
//{
// rt = DC.Clip(data, 0, neededDataLength);
// data = DC.Clip(data, neededDataLength, (uint)data.Length - neededDataLength);
//}
//else
//{
// return all data
rt = data; rt = data;
data = new byte[0]; data = new byte[0];
//}
neededDataLength = 0; neededDataLength = 0;
return rt; return rt;
-62
View File
@@ -37,23 +37,15 @@ namespace Esiur.Net;
public abstract class NetworkServer<TConnection> : IDestructible where TConnection : NetworkConnection, new() public abstract class NetworkServer<TConnection> : IDestructible where TConnection : NetworkConnection, new()
{ {
//private bool isRunning;
private Sockets.ISocket listener; private Sockets.ISocket listener;
public AutoList<TConnection, NetworkServer<TConnection>> Connections { get; internal set; } public AutoList<TConnection, NetworkServer<TConnection>> Connections { get; internal set; }
private Thread thread; private Thread thread;
//protected abstract void DataReceived(TConnection sender, NetworkBuffer data);
//protected abstract void ClientConnected(TConnection sender);
//protected abstract void ClientDisconnected(TConnection sender);
private Timer timer; private Timer timer;
//public KeyList<string, TSession> Sessions = new KeyList<string, TSession>();
public event DestroyedEvent OnDestroy; public event DestroyedEvent OnDestroy;
//public AutoList<TConnection, NetworkServer<TConnection>> Connections => connections;
private void MinuteThread(object state) private void MinuteThread(object state)
{ {
@@ -101,12 +93,6 @@ public abstract class NetworkServer<TConnection> : IDestructible where TConnecti
listener = socket; listener = socket;
// Start accepting
//var r = listener.Accept();
//r.Then(NewConnection);
//r.timeout?.Dispose();
//var rt = listener.Accept().Then()
thread = new Thread(new ThreadStart(() => thread = new Thread(new ThreadStart(() =>
{ {
while (true) while (true)
@@ -121,8 +107,6 @@ public abstract class NetworkServer<TConnection> : IDestructible where TConnecti
return; return;
} }
//Console.WriteLine("New Socket ... " + DateTime.Now);
var c = new TConnection(); var c = new TConnection();
c.Assign(s); c.Assign(s);
@@ -139,9 +123,6 @@ public abstract class NetworkServer<TConnection> : IDestructible where TConnecti
s.Begin(); s.Begin();
// Accept more
//listener.Accept().Then(NewConnection);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -182,15 +163,6 @@ public abstract class NetworkServer<TConnection> : IDestructible where TConnecti
port = listener.LocalEndPoint.Port; port = listener.LocalEndPoint.Port;
listener.Close(); listener.Close();
} }
// wait until the listener stops
//while (isRunning)
//{
// Thread.Sleep(100);
//}
//Console.WriteLine("Listener stopped");
var cons = Connections.ToArray(); var cons = Connections.ToArray();
//lock (connections.SyncRoot) //lock (connections.SyncRoot)
@@ -199,14 +171,6 @@ public abstract class NetworkServer<TConnection> : IDestructible where TConnecti
con.Close(); con.Close();
//} //}
//Console.WriteLine("Sockets Closed");
//while (connections.Count > 0)
//{
// Console.WriteLine("Waiting... " + connections.Count);
// Thread.Sleep(1000);
//}
} }
finally finally
{ {
@@ -233,35 +197,9 @@ public abstract class NetworkServer<TConnection> : IDestructible where TConnecti
get get
{ {
return listener.State == SocketState.Listening; return listener.State == SocketState.Listening;
//isRunning;
} }
} }
//public void OnDataReceived(ISocket sender, NetworkBuffer data)
//{
// DataReceived((TConnection)sender, data);
//}
//public void OnClientConnect(ISocket sender)
//{
// if (sender == null)
// return;
// if (sender.RemoteEndPoint == null || sender.LocalEndPoint == null)
// { }
// //Console.WriteLine("NULL");
// else
// Global.Log("Connections", LogType.Debug, sender.RemoteEndPoint.Address.ToString()
// + "->" + sender.LocalEndPoint.Port + " at " + DateTime.UtcNow.ToString("d")
// + " " + DateTime.UtcNow.ToString("d"), false);
// // Console.WriteLine("Connected " + sender.RemoteEndPoint.ToString());
// ClientConnected((TConnection)sender);
//}
//public void OnClientClose(ISocket sender)
//{
//}
public void Destroy() public void Destroy()
@@ -1742,7 +1742,7 @@ partial class EpConnection
var tt = TypeDef.Parse((byte[])result); var tt = TypeDef.Parse((byte[])result);
typeDefsByIdRequests.Remove(typeId); typeDefsByIdRequests.Remove(typeId);
typeDefs.Add(tt.Id, tt); typeDefs.Add(tt.Id, tt);
Instance.Warehouse.RegisterTypeDef(tt); Instance.Warehouse.TryRegisterTypeDef(tt);
reply.Trigger(tt); reply.Trigger(tt);
}).Error((ex) => }).Error((ex) =>
+13
View File
@@ -519,6 +519,19 @@ public class Warehouse
} }
} }
public bool TryRegisterTypeDef(TypeDef typeDef)
{
lock (typeDefsLock)
{
if (typeDefs[typeDef.Kind].ContainsKey(typeDef.Id))
return false;
typeDefs[typeDef.Kind][typeDef.Id] = typeDef;
return true;
}
}
/// <summary> /// <summary>
/// Get a TypeDef by type from the warehouse. If not in the warehouse, a new TypeDef is created and added to the warehouse. /// Get a TypeDef by type from the warehouse. If not in the warehouse, a new TypeDef is created and added to the warehouse.
@@ -21,7 +21,6 @@ using Esiur.Protocol;
using Esiur.Resource; using Esiur.Resource;
using Esiur.Stores; using Esiur.Stores;
using System.Diagnostics; using System.Diagnostics;
var mode = GetArg(args, "--mode", "both"); var mode = GetArg(args, "--mode", "both");
var host = GetArg(args, "--host", "127.0.0.1"); var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10902")); var port = int.Parse(GetArg(args, "--port", "10902"));
@@ -83,6 +82,8 @@ for (int round = 0; round < rounds; round++)
var roundSw = Stopwatch.StartNew(); var roundSw = Stopwatch.StartNew();
var connnection = await clientWh.Get<EpConnection>($"ep://{host}:{port}");
// Fire all attachments simultaneously // Fire all attachments simultaneously
var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () => var tasks = targets.Select((resourceIdx, taskIdx) => Task.Run(async () =>
{ {
@@ -90,8 +91,7 @@ for (int round = 0; round < rounds; round++)
using var cts = new CancellationTokenSource(timeoutMs); using var cts = new CancellationTokenSource(timeoutMs);
try try
{ {
var proxy = await clientWh.Get<IResource>( var proxy = await connnection.Get($"sys/sensor_{resourceIdx}");
$"ep://{host}:{port}/sys/sensor_{resourceIdx}");
sw.Stop(); sw.Stop();
latencies[taskIdx] = sw.Elapsed.TotalMilliseconds; latencies[taskIdx] = sw.Elapsed.TotalMilliseconds;
@@ -11,11 +11,12 @@
using Esiur.Protocol; using Esiur.Protocol;
using Esiur.Resource; using Esiur.Resource;
using System.Diagnostics; using System.Diagnostics;
using System.Text.RegularExpressions;
var host = GetArg(args, "--host", "127.0.0.1"); var host = GetArg(args, "--host", "127.0.0.1");
var port = int.Parse(GetArg(args, "--port", "10901")); var port = int.Parse(GetArg(args, "--port", "10901"));
var resourceCount = int.Parse(GetArg(args, "--resources", "10000")); var resourceCount = int.Parse(GetArg(args, "--resources", "10000"));
var batchSize = int.Parse(GetArg(args, "--batch", "100")); var batchSize = int.Parse(GetArg(args, "--batch", "10000"));
Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}"); Console.WriteLine($"[Client-T2] Connecting to {host}:{port}, resources={resourceCount}");
@@ -29,6 +30,7 @@ var proxies = new IResource[resourceCount];
// --- Attach in batches to avoid overwhelming the runtime ------------- // --- Attach in batches to avoid overwhelming the runtime -------------
var totalSw = Stopwatch.StartNew(); var totalSw = Stopwatch.StartNew();
for (int batch = 0; batch < resourceCount; batch += batchSize) for (int batch = 0; batch < resourceCount; batch += batchSize)
{ {
@@ -41,8 +43,12 @@ for (int batch = 0; batch < resourceCount; batch += batchSize)
batchTasks[i - batch] = Task.Run(async () => batchTasks[i - batch] = Task.Run(async () =>
{ {
var sw = Stopwatch.StartNew(); var sw = Stopwatch.StartNew();
Console.WriteLine(capturedI);
proxies[capturedI] = await connnection.Get($"sys/sensor_{capturedI}"); proxies[capturedI] = await connnection.Get($"sys/sensor_{capturedI}");
Console.WriteLine(proxies[capturedI].Instance.Link);
sw.Stop(); sw.Stop();
lock (attachLatencies) lock (attachLatencies)
@@ -77,7 +83,7 @@ Console.WriteLine("[Client-T2] Measuring notification latency under full resourc
long received = 0; long received = 0;
double sumLatencyMs = 0; double sumLatencyMs = 0;
for (int i = 0; i < Math.Min(500, resourceCount); i++) for (int i = 0; i < resourceCount; i++)
{ {
int capturedI = i; int capturedI = i;
proxies[capturedI].Instance.PropertyModified += (PropertyModificationInfo data) => proxies[capturedI].Instance.PropertyModified += (PropertyModificationInfo data) =>