From f225e29f87741d38237c56bdd3035e2fa25af22c Mon Sep 17 00:00:00 2001 From: ahmed Date: Sun, 5 Apr 2026 23:07:33 +0300 Subject: [PATCH] Socket2 --- Libraries/Esiur/Net/Sockets/TcpSocket2.cs | 227 ++++++++++++++-------- 1 file changed, 143 insertions(+), 84 deletions(-) diff --git a/Libraries/Esiur/Net/Sockets/TcpSocket2.cs b/Libraries/Esiur/Net/Sockets/TcpSocket2.cs index 060588c..f77a1d6 100644 --- a/Libraries/Esiur/Net/Sockets/TcpSocket2.cs +++ b/Libraries/Esiur/Net/Sockets/TcpSocket2.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Net; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using Esiur.Core; using Esiur.Data; @@ -129,13 +130,12 @@ public class TcpSocket : ISocket { var args = new SocketAsyncEventArgs(); args.RemoteEndPoint = new DnsEndPoint(hostname, port); + args.UserToken = rt; args.Completed += ConnectCompleted; bool pending = sock.ConnectAsync(args); if (!pending) - ProcessConnect(args, rt); - else - args.UserToken = rt; + ProcessConnect(args); } catch (Exception ex) { @@ -148,12 +148,13 @@ public class TcpSocket : ISocket private void ConnectCompleted(object sender, SocketAsyncEventArgs e) { - var rt = (AsyncReply)e.UserToken; - ProcessConnect(e, rt); + ProcessConnect(e); } - private void ProcessConnect(SocketAsyncEventArgs e, AsyncReply rt) + private void ProcessConnect(SocketAsyncEventArgs e) { + var rt = e.UserToken as AsyncReply; + try { if (e.SocketError == SocketError.Success) @@ -162,7 +163,7 @@ public class TcpSocket : ISocket { if (destroyed || state == SocketState.Closed) { - rt.Trigger(false); + rt?.Trigger(false); return; } @@ -171,13 +172,13 @@ public class TcpSocket : ISocket Receiver?.NetworkConnect(this); Begin(); - rt.Trigger(true); + rt?.Trigger(true); } else { var ex = new SocketException((int)e.SocketError); SafeClose(ex, false); - rt.TriggerError(ex); + rt?.TriggerError(ex); } } finally @@ -431,7 +432,7 @@ public class TcpSocket : ISocket return; } - bytesReceived += e.BytesTransferred; + Interlocked.Add(ref bytesReceived, e.BytesTransferred); receiveNetworkBuffer.Write(e.Buffer, (uint)e.Offset, (uint)e.BytesTransferred); Receiver?.NetworkReceive(this, receiveNetworkBuffer); @@ -445,37 +446,65 @@ public class TcpSocket : ISocket private void TryStartNextSend_NoLock() { - if (held || sendInProgress || destroyed || state != SocketState.Established) + if (held || destroyed || state != SocketState.Established || sendInProgress) return; - if (sendQueue.Count == 0) - return; - - currentSend = sendQueue.Dequeue(); sendInProgress = true; + PumpSendQueue_NoLock(); + } - if (sendArgs == null) + private void PumpSendQueue_NoLock() + { + while (true) { - sendArgs = new SocketAsyncEventArgs(); - sendArgs.Completed += IOCompleted; - } + if (held || destroyed || state != SocketState.Established) + { + sendInProgress = false; + return; + } - sendArgs.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); + if (currentSend == null) + { + if (sendQueue.Count == 0) + { + sendInProgress = false; + return; + } - try - { - bool pending = sock.SendAsync(sendArgs); - if (!pending) - ProcessSend(sendArgs); - } - catch (Exception ex) - { - var reply = currentSend?.Reply; - currentSend = null; - sendInProgress = false; - reply?.TriggerError(ex); - FailPendingSends_NoLock(ex); - SafeClose(ex, true); + currentSend = sendQueue.Dequeue(); + } + + if (sendArgs == null) + { + sendArgs = new SocketAsyncEventArgs(); + sendArgs.Completed += IOCompleted; + } + + sendArgs.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); + + bool pending; + try + { + pending = sock.SendAsync(sendArgs); + } + catch (Exception ex) + { + var reply = currentSend?.Reply; + currentSend = null; + sendInProgress = false; + reply?.TriggerError(ex); + FailPendingSends_NoLock(ex); + CloseDueToSendError_NoLock(ex); + return; + } + + if (pending) + { + return; + } + + if (!ProcessSendCompletion_NoLock(sendArgs)) + return; } } @@ -483,64 +512,67 @@ public class TcpSocket : ISocket { lock (sendLock) { - try + if (!ProcessSendCompletion_NoLock(e)) + return; + + PumpSendQueue_NoLock(); + } + } + + private bool ProcessSendCompletion_NoLock(SocketAsyncEventArgs e) + { + try + { + if (currentSend == null) { - if (currentSend == null) - { - sendInProgress = false; - return; - } - - if (e.SocketError != SocketError.Success) - { - var ex = new SocketException((int)e.SocketError); - currentSend.Reply?.TriggerError(ex); - currentSend = null; - sendInProgress = false; - FailPendingSends_NoLock(ex); - SafeClose(ex, true); - return; - } - - if (e.BytesTransferred <= 0) - { - var ex = new SocketException((int)SocketError.ConnectionReset); - currentSend.Reply?.TriggerError(ex); - currentSend = null; - sendInProgress = false; - FailPendingSends_NoLock(ex); - SafeClose(ex, true); - return; - } - - currentSend.Offset += e.BytesTransferred; - currentSend.Count -= e.BytesTransferred; - - if (currentSend.Count > 0) - { - e.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); - - bool pending = sock.SendAsync(e); - if (!pending) - ProcessSend(e); - - return; - } - - currentSend.Reply?.Trigger(true); - currentSend = null; sendInProgress = false; - - TryStartNextSend_NoLock(); + return false; } - catch (Exception ex) + + if (e.SocketError != SocketError.Success) { - currentSend?.Reply?.TriggerError(ex); + var ex = new SocketException((int)e.SocketError); + currentSend.Reply?.TriggerError(ex); currentSend = null; sendInProgress = false; FailPendingSends_NoLock(ex); - SafeClose(ex, true); + CloseDueToSendError_NoLock(ex); + return false; } + + if (e.BytesTransferred <= 0) + { + var ex = new SocketException((int)SocketError.ConnectionReset); + currentSend.Reply?.TriggerError(ex); + currentSend = null; + sendInProgress = false; + FailPendingSends_NoLock(ex); + CloseDueToSendError_NoLock(ex); + return false; + } + + Interlocked.Add(ref bytesSent, e.BytesTransferred); + + currentSend.Offset += e.BytesTransferred; + currentSend.Count -= e.BytesTransferred; + + if (currentSend.Count > 0) + { + return true; + } + + currentSend.Reply?.Trigger(true); + currentSend = null; + return true; + } + catch (Exception ex) + { + currentSend?.Reply?.TriggerError(ex); + currentSend = null; + sendInProgress = false; + FailPendingSends_NoLock(ex); + CloseDueToSendError_NoLock(ex); + return false; } } @@ -557,6 +589,33 @@ public class TcpSocket : ISocket } } + private void CloseDueToSendError_NoLock(Exception ex) + { + bool notify = false; + + lock (stateLock) + { + if (state == SocketState.Closed) + return; + + state = SocketState.Closed; + notify = !closeNotified; + closeNotified = true; + } + + try { sock.Shutdown(SocketShutdown.Both); } catch { } + try { sock.Close(); } catch { } + try { sock.Dispose(); } catch { } + + Global.Log(ex); + + if (notify) + { + try { Receiver?.NetworkClose(this); } + catch (Exception e) { Global.Log(e); } + } + } + private void SafeClose(Exception ex, bool notifyReceiver) { bool notify = false;