2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2026-04-29 06:48:41 +00:00
This commit is contained in:
2026-04-05 23:07:33 +03:00
parent 7b5201418f
commit f225e29f87
+93 -34
View File
@@ -2,6 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Esiur.Core; using Esiur.Core;
using Esiur.Data; using Esiur.Data;
@@ -129,13 +130,12 @@ public class TcpSocket : ISocket
{ {
var args = new SocketAsyncEventArgs(); var args = new SocketAsyncEventArgs();
args.RemoteEndPoint = new DnsEndPoint(hostname, port); args.RemoteEndPoint = new DnsEndPoint(hostname, port);
args.UserToken = rt;
args.Completed += ConnectCompleted; args.Completed += ConnectCompleted;
bool pending = sock.ConnectAsync(args); bool pending = sock.ConnectAsync(args);
if (!pending) if (!pending)
ProcessConnect(args, rt); ProcessConnect(args);
else
args.UserToken = rt;
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -148,12 +148,13 @@ public class TcpSocket : ISocket
private void ConnectCompleted(object sender, SocketAsyncEventArgs e) private void ConnectCompleted(object sender, SocketAsyncEventArgs e)
{ {
var rt = (AsyncReply<bool>)e.UserToken; ProcessConnect(e);
ProcessConnect(e, rt);
} }
private void ProcessConnect(SocketAsyncEventArgs e, AsyncReply<bool> rt) private void ProcessConnect(SocketAsyncEventArgs e)
{ {
var rt = e.UserToken as AsyncReply<bool>;
try try
{ {
if (e.SocketError == SocketError.Success) if (e.SocketError == SocketError.Success)
@@ -162,7 +163,7 @@ public class TcpSocket : ISocket
{ {
if (destroyed || state == SocketState.Closed) if (destroyed || state == SocketState.Closed)
{ {
rt.Trigger(false); rt?.Trigger(false);
return; return;
} }
@@ -171,13 +172,13 @@ public class TcpSocket : ISocket
Receiver?.NetworkConnect(this); Receiver?.NetworkConnect(this);
Begin(); Begin();
rt.Trigger(true); rt?.Trigger(true);
} }
else else
{ {
var ex = new SocketException((int)e.SocketError); var ex = new SocketException((int)e.SocketError);
SafeClose(ex, false); SafeClose(ex, false);
rt.TriggerError(ex); rt?.TriggerError(ex);
} }
} }
finally finally
@@ -431,7 +432,7 @@ public class TcpSocket : ISocket
return; return;
} }
bytesReceived += e.BytesTransferred; Interlocked.Add(ref bytesReceived, e.BytesTransferred);
receiveNetworkBuffer.Write(e.Buffer, (uint)e.Offset, (uint)e.BytesTransferred); receiveNetworkBuffer.Write(e.Buffer, (uint)e.Offset, (uint)e.BytesTransferred);
Receiver?.NetworkReceive(this, receiveNetworkBuffer); Receiver?.NetworkReceive(this, receiveNetworkBuffer);
@@ -445,14 +446,33 @@ public class TcpSocket : ISocket
private void TryStartNextSend_NoLock() private void TryStartNextSend_NoLock()
{ {
if (held || sendInProgress || destroyed || state != SocketState.Established) if (held || destroyed || state != SocketState.Established || sendInProgress)
return; return;
if (sendQueue.Count == 0) sendInProgress = true;
PumpSendQueue_NoLock();
}
private void PumpSendQueue_NoLock()
{
while (true)
{
if (held || destroyed || state != SocketState.Established)
{
sendInProgress = false;
return; return;
}
if (currentSend == null)
{
if (sendQueue.Count == 0)
{
sendInProgress = false;
return;
}
currentSend = sendQueue.Dequeue(); currentSend = sendQueue.Dequeue();
sendInProgress = true; }
if (sendArgs == null) if (sendArgs == null)
{ {
@@ -462,11 +482,10 @@ public class TcpSocket : ISocket
sendArgs.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); sendArgs.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count);
bool pending;
try try
{ {
bool pending = sock.SendAsync(sendArgs); pending = sock.SendAsync(sendArgs);
if (!pending)
ProcessSend(sendArgs);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -475,20 +494,39 @@ public class TcpSocket : ISocket
sendInProgress = false; sendInProgress = false;
reply?.TriggerError(ex); reply?.TriggerError(ex);
FailPendingSends_NoLock(ex); FailPendingSends_NoLock(ex);
SafeClose(ex, true); CloseDueToSendError_NoLock(ex);
return;
}
if (pending)
{
return;
}
if (!ProcessSendCompletion_NoLock(sendArgs))
return;
} }
} }
private void ProcessSend(SocketAsyncEventArgs e) private void ProcessSend(SocketAsyncEventArgs e)
{ {
lock (sendLock) lock (sendLock)
{
if (!ProcessSendCompletion_NoLock(e))
return;
PumpSendQueue_NoLock();
}
}
private bool ProcessSendCompletion_NoLock(SocketAsyncEventArgs e)
{ {
try try
{ {
if (currentSend == null) if (currentSend == null)
{ {
sendInProgress = false; sendInProgress = false;
return; return false;
} }
if (e.SocketError != SocketError.Success) if (e.SocketError != SocketError.Success)
@@ -498,8 +536,8 @@ public class TcpSocket : ISocket
currentSend = null; currentSend = null;
sendInProgress = false; sendInProgress = false;
FailPendingSends_NoLock(ex); FailPendingSends_NoLock(ex);
SafeClose(ex, true); CloseDueToSendError_NoLock(ex);
return; return false;
} }
if (e.BytesTransferred <= 0) if (e.BytesTransferred <= 0)
@@ -509,29 +547,23 @@ public class TcpSocket : ISocket
currentSend = null; currentSend = null;
sendInProgress = false; sendInProgress = false;
FailPendingSends_NoLock(ex); FailPendingSends_NoLock(ex);
SafeClose(ex, true); CloseDueToSendError_NoLock(ex);
return; return false;
} }
Interlocked.Add(ref bytesSent, e.BytesTransferred);
currentSend.Offset += e.BytesTransferred; currentSend.Offset += e.BytesTransferred;
currentSend.Count -= e.BytesTransferred; currentSend.Count -= e.BytesTransferred;
if (currentSend.Count > 0) if (currentSend.Count > 0)
{ {
e.SetBuffer(currentSend.Buffer, currentSend.Offset, currentSend.Count); return true;
bool pending = sock.SendAsync(e);
if (!pending)
ProcessSend(e);
return;
} }
currentSend.Reply?.Trigger(true); currentSend.Reply?.Trigger(true);
currentSend = null; currentSend = null;
sendInProgress = false; return true;
TryStartNextSend_NoLock();
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -539,8 +571,8 @@ public class TcpSocket : ISocket
currentSend = null; currentSend = null;
sendInProgress = false; sendInProgress = false;
FailPendingSends_NoLock(ex); FailPendingSends_NoLock(ex);
SafeClose(ex, true); CloseDueToSendError_NoLock(ex);
} return false;
} }
} }
@@ -557,6 +589,33 @@ public class TcpSocket : ISocket
} }
} }
private void CloseDueToSendError_NoLock(Exception ex)
{
bool notify = false;
lock (stateLock)
{
if (state == SocketState.Closed)
return;
state = SocketState.Closed;
notify = !closeNotified;
closeNotified = true;
}
try { sock.Shutdown(SocketShutdown.Both); } catch { }
try { sock.Close(); } catch { }
try { sock.Dispose(); } catch { }
Global.Log(ex);
if (notify)
{
try { Receiver?.NetworkClose(this); }
catch (Exception e) { Global.Log(e); }
}
}
private void SafeClose(Exception ex, bool notifyReceiver) private void SafeClose(Exception ex, bool notifyReceiver)
{ {
bool notify = false; bool notify = false;