From 7b5201418f13a8f765e4e55260b2ceba59cf1690 Mon Sep 17 00:00:00 2001 From: ahmed Date: Sun, 5 Apr 2026 22:28:11 +0300 Subject: [PATCH] new TcpSocket --- Libraries/Esiur/Esiur.csproj | 2 + Libraries/Esiur/Net/Http/HttpServer.cs | 2 +- Libraries/Esiur/Net/Sockets/TCPSocket.cs | 20 +- Libraries/Esiur/Net/Sockets/TcpSocket2.cs | 614 ++++++++++++++++++++++ Libraries/Esiur/Net/Tcp/TcpServer.cs | 6 +- Libraries/Esiur/Protocol/EpConnection.cs | 2 +- Libraries/Esiur/Protocol/EpServer.cs | 6 +- 7 files changed, 634 insertions(+), 18 deletions(-) create mode 100644 Libraries/Esiur/Net/Sockets/TcpSocket2.cs diff --git a/Libraries/Esiur/Esiur.csproj b/Libraries/Esiur/Esiur.csproj index d7f5cf2..41d23b6 100644 --- a/Libraries/Esiur/Esiur.csproj +++ b/Libraries/Esiur/Esiur.csproj @@ -62,6 +62,7 @@ + @@ -78,6 +79,7 @@ + diff --git a/Libraries/Esiur/Net/Http/HttpServer.cs b/Libraries/Esiur/Net/Http/HttpServer.cs index 5f6453b..ecfc6b4 100644 --- a/Libraries/Esiur/Net/Http/HttpServer.cs +++ b/Libraries/Esiur/Net/Http/HttpServer.cs @@ -314,7 +314,7 @@ public class HttpServer : NetworkServer, IResource if (SSL) listener = new SSLSocket(new IPEndPoint(ipAdd, Port), new X509Certificate2(Certificate)); else - listener = new TCPSocket(new IPEndPoint(ipAdd, Port)); + listener = new TcpSocket(new IPEndPoint(ipAdd, Port)); Start(listener); } diff --git a/Libraries/Esiur/Net/Sockets/TCPSocket.cs b/Libraries/Esiur/Net/Sockets/TCPSocket.cs index 9e2574f..e07d2ee 100644 --- a/Libraries/Esiur/Net/Sockets/TCPSocket.cs +++ b/Libraries/Esiur/Net/Sockets/TCPSocket.cs @@ -36,7 +36,7 @@ using System.Threading.Tasks; using Esiur.Data; namespace Esiur.Net.Sockets; -public class TCPSocket : ISocket +public class TcpSocket : ISocket { public INetworkReceiver Receiver { get; set; } @@ -111,7 +111,7 @@ public class TCPSocket : ISocket private static void ReceiveCallback(IAsyncResult ar) { - var socket = ar.AsyncState as TCPSocket; + var socket = ar.AsyncState as TcpSocket; try { @@ -185,7 +185,7 @@ public class TCPSocket : ISocket get { return (IPEndPoint)sock.LocalEndPoint; } } - public TCPSocket() + public TcpSocket() { sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, @@ -195,7 +195,7 @@ public class TCPSocket : ISocket } - public TCPSocket(string hostname, ushort port) + public TcpSocket(string hostname, ushort port) { // create the socket sock = new Socket(AddressFamily.InterNetwork, @@ -210,7 +210,7 @@ public class TCPSocket : ISocket } - public TCPSocket(IPEndPoint localEndPoint) + public TcpSocket(IPEndPoint localEndPoint) { // create the socket sock = new Socket(AddressFamily.InterNetwork, @@ -245,7 +245,7 @@ public class TCPSocket : ISocket } - public TCPSocket(Socket socket) + public TcpSocket(Socket socket) { sock = socket; receiveBuffer = new byte[sock.ReceiveBufferSize]; @@ -330,7 +330,7 @@ public class TCPSocket : ISocket } - private static void Flush(TCPSocket socket) + private static void Flush(TcpSocket socket) { lock (socket.sendLock) { @@ -386,7 +386,7 @@ public class TCPSocket : ISocket try { - var socket = (TCPSocket)ar.AsyncState; + var socket = (TcpSocket)ar.AsyncState; socket.sock?.EndSend(ar); Flush(socket); @@ -429,7 +429,7 @@ public class TCPSocket : ISocket try { var s = sock.Accept(); - return new TCPSocket(s); + return new TcpSocket(s); } catch { @@ -443,7 +443,7 @@ public class TCPSocket : ISocket try { var s = await sock.AcceptAsync(); - return new TCPSocket(s); + return new TcpSocket(s); } catch { diff --git a/Libraries/Esiur/Net/Sockets/TcpSocket2.cs b/Libraries/Esiur/Net/Sockets/TcpSocket2.cs new file mode 100644 index 0000000..060588c --- /dev/null +++ b/Libraries/Esiur/Net/Sockets/TcpSocket2.cs @@ -0,0 +1,614 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using Esiur.Core; +using Esiur.Data; +using Esiur.Misc; +using Esiur.Resource; + +namespace Esiur.Net.Sockets; + +public class TcpSocket : ISocket +{ + private sealed class PendingSend + { + public byte[] Buffer; + public int Offset; + public int Count; + public AsyncReply Reply; + } + + public INetworkReceiver Receiver { get; set; } + public event DestroyedEvent OnDestroy; + + private readonly Socket sock; + private readonly byte[] receiveBuffer; + private readonly NetworkBuffer receiveNetworkBuffer = new NetworkBuffer(); + + private readonly object stateLock = new object(); + private readonly object sendLock = new object(); + + private readonly Queue sendQueue = new Queue(); + + private SocketAsyncEventArgs receiveArgs; + private SocketAsyncEventArgs sendArgs; + + private PendingSend currentSend; + private bool sendInProgress; + private bool began; + private bool held; + private bool destroyed; + private bool closeNotified; + + private int bytesSent; + private int bytesReceived; + + private SocketState state = SocketState.Initial; + + public Socket Socket => sock; + public SocketState State => state; + public int BytesSent => bytesSent; + public int BytesReceived => bytesReceived; + + public IPEndPoint LocalEndPoint => sock.LocalEndPoint as IPEndPoint; + public IPEndPoint RemoteEndPoint => sock.RemoteEndPoint as IPEndPoint; + + public TcpSocket() + { + sock = CreateSocket(); + receiveBuffer = new byte[Math.Max(sock.ReceiveBufferSize, 8192)]; + } + + public TcpSocket(string hostname, ushort port) : this() + { + Connect(hostname, port); + } + + public TcpSocket(IPEndPoint localEndPoint) + { + sock = CreateSocket(); + receiveBuffer = new byte[Math.Max(sock.ReceiveBufferSize, 8192)]; + + sock.Bind(localEndPoint); + sock.Listen(ushort.MaxValue); + state = SocketState.Listening; + } + + public TcpSocket(Socket socket) + { + if (socket == null) + throw new ArgumentNullException(nameof(socket)); + + sock = socket; + ConfigureSocket(sock); + receiveBuffer = new byte[Math.Max(sock.ReceiveBufferSize, 8192)]; + + if (sock.Connected) + state = SocketState.Established; + } + + private static Socket CreateSocket() + { + var s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + ConfigureSocket(s); + return s; + } + + private static void ConfigureSocket(Socket s) + { + s.NoDelay = true; + + try { s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); } catch { } + try { s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); } catch { } + } + + public AsyncReply Connect(string hostname, ushort port) + { + var rt = new AsyncReply(); + + lock (stateLock) + { + if (destroyed || state == SocketState.Closed) + { + rt.Trigger(false); + return rt; + } + + if (state == SocketState.Established) + { + rt.Trigger(true); + return rt; + } + + state = SocketState.Connecting; + } + + try + { + var args = new SocketAsyncEventArgs(); + args.RemoteEndPoint = new DnsEndPoint(hostname, port); + args.Completed += ConnectCompleted; + + bool pending = sock.ConnectAsync(args); + if (!pending) + ProcessConnect(args, rt); + else + args.UserToken = rt; + } + catch (Exception ex) + { + SafeClose(ex, false); + rt.TriggerError(ex); + } + + return rt; + } + + private void ConnectCompleted(object sender, SocketAsyncEventArgs e) + { + var rt = (AsyncReply)e.UserToken; + ProcessConnect(e, rt); + } + + private void ProcessConnect(SocketAsyncEventArgs e, AsyncReply rt) + { + try + { + if (e.SocketError == SocketError.Success) + { + lock (stateLock) + { + if (destroyed || state == SocketState.Closed) + { + rt.Trigger(false); + return; + } + + state = SocketState.Established; + } + + Receiver?.NetworkConnect(this); + Begin(); + rt.Trigger(true); + } + else + { + var ex = new SocketException((int)e.SocketError); + SafeClose(ex, false); + rt.TriggerError(ex); + } + } + finally + { + e.Dispose(); + } + } + + public bool Begin() + { + lock (stateLock) + { + if (destroyed || state != SocketState.Established || began) + return false; + + began = true; + } + + StartReceiveLoop(); + return true; + } + + public AsyncReply BeginAsync() + { + return new AsyncReply(Begin()); + } + + public void Send(byte[] message) + { + if (message == null) + throw new ArgumentNullException(nameof(message)); + + Send(message, 0, message.Length); + } + + public void Send(byte[] message, int offset, int length) + { + if (message == null) + throw new ArgumentNullException(nameof(message)); + + ValidateRange(message, offset, length); + + if (length == 0) + return; + + if (destroyed || state != SocketState.Established) + return; + + var copy = new byte[length]; + Buffer.BlockCopy(message, offset, copy, 0, length); + + lock (sendLock) + { + if (destroyed || state != SocketState.Established) + return; + + sendQueue.Enqueue(new PendingSend + { + Buffer = copy, + Offset = 0, + Count = copy.Length, + Reply = null + }); + + TryStartNextSend_NoLock(); + } + } + + public AsyncReply SendAsync(byte[] message, int offset, int length) + { + var rt = new AsyncReply(); + + if (message == null) + throw new ArgumentNullException(nameof(message)); + + ValidateRange(message, offset, length); + + if (length == 0) + { + rt.Trigger(true); + return rt; + } + + if (destroyed || state != SocketState.Established) + { + rt.Trigger(false); + return rt; + } + + var copy = new byte[length]; + Buffer.BlockCopy(message, offset, copy, 0, length); + + lock (sendLock) + { + if (destroyed || state != SocketState.Established) + { + rt.Trigger(false); + return rt; + } + + sendQueue.Enqueue(new PendingSend + { + Buffer = copy, + Offset = 0, + Count = copy.Length, + Reply = rt + }); + + TryStartNextSend_NoLock(); + } + + return rt; + } + + public ISocket Accept() + { + try + { + var s = sock.Accept(); + return new TcpSocket(s); + } + catch + { + state = SocketState.Closed; + return null; + } + } + + public async AsyncReply AcceptAsync() + { + try + { + var s = await Task.Factory.FromAsync(sock.BeginAccept, sock.EndAccept, null); + return new TcpSocket(s); + } + catch + { + state = SocketState.Closed; + return null; + } + } + + public void Hold() + { + held = true; + } + + public void Unhold() + { + held = false; + + lock (sendLock) + TryStartNextSend_NoLock(); + } + + public void Close() + { + SafeClose(null, true); + } + + public void Destroy() + { + if (destroyed) + return; + + destroyed = true; + SafeClose(null, true); + + try + { + if (receiveArgs != null) + { + receiveArgs.Completed -= IOCompleted; + receiveArgs.Dispose(); + receiveArgs = null; + } + } + catch { } + + try + { + if (sendArgs != null) + { + sendArgs.Completed -= IOCompleted; + sendArgs.Dispose(); + sendArgs = null; + } + } + catch { } + + OnDestroy?.Invoke(this); + OnDestroy = null; + } + + private void StartReceiveLoop() + { + if (receiveArgs != null) + return; + + receiveArgs = new SocketAsyncEventArgs(); + receiveArgs.SetBuffer(receiveBuffer, 0, receiveBuffer.Length); + receiveArgs.Completed += IOCompleted; + + StartReceive(); + } + + private void StartReceive() + { + try + { + if (destroyed || state != SocketState.Established) + return; + + bool pending = sock.ReceiveAsync(receiveArgs); + if (!pending) + ProcessReceive(receiveArgs); + } + catch (Exception ex) + { + SafeClose(ex, true); + } + } + + private void IOCompleted(object sender, SocketAsyncEventArgs e) + { + switch (e.LastOperation) + { + case SocketAsyncOperation.Receive: + ProcessReceive(e); + break; + + case SocketAsyncOperation.Send: + ProcessSend(e); + break; + } + } + + private void ProcessReceive(SocketAsyncEventArgs e) + { + try + { + if (e.SocketError != SocketError.Success) + { + SafeClose(new SocketException((int)e.SocketError), true); + return; + } + + if (e.BytesTransferred <= 0) + { + SafeClose(null, true); + return; + } + + bytesReceived += e.BytesTransferred; + receiveNetworkBuffer.Write(e.Buffer, (uint)e.Offset, (uint)e.BytesTransferred); + Receiver?.NetworkReceive(this, receiveNetworkBuffer); + + StartReceive(); + } + catch (Exception ex) + { + SafeClose(ex, true); + } + } + + private void TryStartNextSend_NoLock() + { + if (held || sendInProgress || destroyed || state != SocketState.Established) + return; + + if (sendQueue.Count == 0) + return; + + currentSend = sendQueue.Dequeue(); + sendInProgress = true; + + if (sendArgs == null) + { + sendArgs = new SocketAsyncEventArgs(); + sendArgs.Completed += IOCompleted; + } + + sendArgs.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); + + try + { + bool pending = sock.SendAsync(sendArgs); + if (!pending) + ProcessSend(sendArgs); + } + catch (Exception ex) + { + var reply = currentSend?.Reply; + currentSend = null; + sendInProgress = false; + reply?.TriggerError(ex); + FailPendingSends_NoLock(ex); + SafeClose(ex, true); + } + } + + private void ProcessSend(SocketAsyncEventArgs e) + { + lock (sendLock) + { + try + { + if (currentSend == null) + { + sendInProgress = false; + return; + } + + if (e.SocketError != SocketError.Success) + { + var ex = new SocketException((int)e.SocketError); + currentSend.Reply?.TriggerError(ex); + currentSend = null; + sendInProgress = false; + FailPendingSends_NoLock(ex); + SafeClose(ex, true); + return; + } + + if (e.BytesTransferred <= 0) + { + var ex = new SocketException((int)SocketError.ConnectionReset); + currentSend.Reply?.TriggerError(ex); + currentSend = null; + sendInProgress = false; + FailPendingSends_NoLock(ex); + SafeClose(ex, true); + return; + } + + currentSend.Offset += e.BytesTransferred; + currentSend.Count -= e.BytesTransferred; + + if (currentSend.Count > 0) + { + e.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); + + bool pending = sock.SendAsync(e); + if (!pending) + ProcessSend(e); + + return; + } + + currentSend.Reply?.Trigger(true); + currentSend = null; + sendInProgress = false; + + TryStartNextSend_NoLock(); + } + catch (Exception ex) + { + currentSend?.Reply?.TriggerError(ex); + currentSend = null; + sendInProgress = false; + FailPendingSends_NoLock(ex); + SafeClose(ex, true); + } + } + } + + private void FailPendingSends_NoLock(Exception ex) + { + while (sendQueue.Count > 0) + { + var item = sendQueue.Dequeue(); + try + { + item.Reply?.TriggerError(ex); + } + catch { } + } + } + + private void SafeClose(Exception ex, bool notifyReceiver) + { + bool notify = false; + + lock (stateLock) + { + if (state == SocketState.Closed) + return; + + state = SocketState.Closed; + notify = notifyReceiver && !closeNotified; + closeNotified = true; + } + + lock (sendLock) + { + sendInProgress = false; + + if (ex != null) + { + try { currentSend?.Reply?.TriggerError(ex); } catch { } + currentSend = null; + FailPendingSends_NoLock(ex); + } + else + { + currentSend = null; + while (sendQueue.Count > 0) + { + var item = sendQueue.Dequeue(); + try { item.Reply?.Trigger(false); } catch { } + } + } + } + + try { sock.Shutdown(SocketShutdown.Both); } catch { } + try { sock.Close(); } catch { } + try { sock.Dispose(); } catch { } + + if (ex != null) + Global.Log(ex); + + if (notify) + { + try { Receiver?.NetworkClose(this); } + catch (Exception e) { Global.Log(e); } + } + } + + private static void ValidateRange(byte[] message, int offset, int length) + { + if (offset < 0 || length < 0 || offset + length > message.Length) + throw new ArgumentOutOfRangeException(); + } +} \ No newline at end of file diff --git a/Libraries/Esiur/Net/Tcp/TcpServer.cs b/Libraries/Esiur/Net/Tcp/TcpServer.cs index b95ad6d..92de496 100644 --- a/Libraries/Esiur/Net/Tcp/TcpServer.cs +++ b/Libraries/Esiur/Net/Tcp/TcpServer.cs @@ -70,13 +70,13 @@ public class TcpServer : NetworkServer, IResource { if (trigger == ResourceTrigger.Initialize) { - TCPSocket listener; + TcpSocket listener; if (IP != null) - listener = new TCPSocket(new IPEndPoint(IPAddress.Parse(IP), Port)); + listener = new TcpSocket(new IPEndPoint(IPAddress.Parse(IP), Port)); else - listener = new TCPSocket(new IPEndPoint(IPAddress.Any, Port)); + listener = new TcpSocket(new IPEndPoint(IPAddress.Any, Port)); Start(listener); diff --git a/Libraries/Esiur/Protocol/EpConnection.cs b/Libraries/Esiur/Protocol/EpConnection.cs index 389ebd4..fa8bb66 100644 --- a/Libraries/Esiur/Protocol/EpConnection.cs +++ b/Libraries/Esiur/Protocol/EpConnection.cs @@ -1578,7 +1578,7 @@ public partial class EpConnection : NetworkConnection, IStore if (UseWebSocket || RuntimeInformation.OSDescription == "Browser") socket = new FrameworkWebSocket(); else - socket = new TCPSocket(); + socket = new TcpSocket(); } if (port > 0) diff --git a/Libraries/Esiur/Protocol/EpServer.cs b/Libraries/Esiur/Protocol/EpServer.cs index e846bab..bcd2f3d 100644 --- a/Libraries/Esiur/Protocol/EpServer.cs +++ b/Libraries/Esiur/Protocol/EpServer.cs @@ -111,12 +111,12 @@ public class EpServer : NetworkServer, IResource { if (trigger == ResourceTrigger.Initialize) { - TCPSocket listener; + TcpSocket listener; if (IP != null) - listener = new TCPSocket(new IPEndPoint(IPAddress.Parse(IP), Port)); + listener = new TcpSocket(new IPEndPoint(IPAddress.Parse(IP), Port)); else - listener = new TCPSocket(new IPEndPoint(IPAddress.Any, Port)); + listener = new TcpSocket(new IPEndPoint(IPAddress.Any, Port)); Start(listener); }