2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-05-06 11:32:59 +00:00

AsyncReply thread safe

This commit is contained in:
Ahmed Zamil 2019-11-12 17:25:37 +03:00
parent 238ac2563b
commit 4f68c08640
3 changed files with 89 additions and 48 deletions

View File

@ -31,6 +31,7 @@ using Esiur.Resource;
using System.Reflection; using System.Reflection;
using System.Threading; using System.Threading;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Diagnostics;
namespace Esiur.Core namespace Esiur.Core
{ {
@ -51,19 +52,25 @@ namespace Esiur.Core
//List<AsyncAwaiter> awaiters = new List<AsyncAwaiter>(); //List<AsyncAwaiter> awaiters = new List<AsyncAwaiter>();
// object callbacksLock = new object(); object asyncLock = new object();
//public Timer timeout;// = new Timer()
protected bool resultReady = false; protected bool resultReady = false;
AsyncException exception; AsyncException exception;
StackTrace trace;
AutoResetEvent mutex = new AutoResetEvent(false); AutoResetEvent mutex = new AutoResetEvent(false);
public static int MaxId;
public int Id;
public bool Ready public bool Ready
{ {
get { return resultReady; } get { return resultReady; }
} }
public T Wait() public T Wait()
{ {
@ -71,13 +78,14 @@ namespace Esiur.Core
return result; return result;
if (Debug) if (Debug)
Console.WriteLine($"AsyncReply: {GetHashCode()} Wait"); Console.WriteLine($"AsyncReply: {Id} Wait");
//mutex = new AutoResetEvent(false); //mutex = new AutoResetEvent(false);
mutex.WaitOne(); mutex.WaitOne();
if (Debug) if (Debug)
Console.WriteLine($"AsyncReply: {GetHashCode()} Wait ended"); Console.WriteLine($"AsyncReply: {Id} Wait ended");
return result; return result;
} }
@ -88,33 +96,51 @@ namespace Esiur.Core
get { return result; } get { return result; }
} }
public IAsyncReply<T> Then(Action<T> callback) public IAsyncReply<T> Then(Action<T> callback)
{ {
//lock (callbacksLock) //lock (callbacksLock)
//{ //{
lock (asyncLock)
{
trace = new StackTrace();
if (resultReady)
{
if (Debug)
Console.WriteLine($"AsyncReply: {Id} Then ready");
callback(result);
return this;
}
//timeout = new Timer(x =>
//{
// // Get calling method name
// Console.WriteLine(trace.GetFrame(1).GetMethod().Name);
// var tr = String.Join("\r\n", trace.GetFrames().Select(f => f.GetMethod().Name));
// timeout.Dispose();
// tr = trace.ToString();
// throw new Exception("Request timeout " + Id);
//}, null, 15000, 0);
if (resultReady)
{
if (Debug) if (Debug)
Console.WriteLine($"AsyncReply: {GetHashCode()} Then ready"); Console.WriteLine($"AsyncReply: {Id} Then pending");
callbacks.Add(callback);
callback(result);
return this; return this;
} }
if (Debug)
Console.WriteLine($"AsyncReply: {GetHashCode()} Then pending");
callbacks.Add(callback);
return this;
//}
} }
public IAsyncReply<T> Error(Action<AsyncException> callback) public IAsyncReply<T> Error(Action<AsyncException> callback)
{ {
// lock (callbacksLock) // lock (callbacksLock)
@ -149,37 +175,37 @@ namespace Esiur.Core
public void Trigger(object result) public void Trigger(object result)
{ {
lock (asyncLock)
{
//timeout?.Dispose();
if (Debug) if (Debug)
Console.WriteLine($"AsyncReply: {GetHashCode()} Trigger"); Console.WriteLine($"AsyncReply: {Id} Trigger");
if (resultReady)
return;
this.result = (T)result;
resultReady = true;
//if (mutex != null)
mutex.Set();
foreach (var cb in callbacks)
cb((T)result);
//lock (callbacksLock) if (Debug)
//{ Console.WriteLine($"AsyncReply: {Id} Trigger ended");
if (resultReady)
return;
this.result = (T)result;
resultReady = true;
//if (mutex != null)
mutex.Set();
foreach (var cb in callbacks)
cb((T)result);
if (Debug)
Console.WriteLine($"AsyncReply: {GetHashCode()} Trigger ended");
//}
}
} }
public void TriggerError(Exception exception) public void TriggerError(Exception exception)
{ {
//timeout?.Dispose();
if (resultReady) if (resultReady)
return; return;
@ -201,6 +227,8 @@ namespace Esiur.Core
public void TriggerProgress(ProgressType type, int value, int max) public void TriggerProgress(ProgressType type, int value, int max)
{ {
//timeout?.Dispose();
if (resultReady) if (resultReady)
return; return;
@ -215,6 +243,9 @@ namespace Esiur.Core
public void TriggerChunk(object value) public void TriggerChunk(object value)
{ {
//timeout?.Dispose();
if (resultReady) if (resultReady)
return; return;
@ -235,14 +266,17 @@ namespace Esiur.Core
public AsyncReply() public AsyncReply()
{ {
//this.Debug = true; // this.Debug = true;
Id = MaxId++;
} }
public AsyncReply(T result) public AsyncReply(T result)
{ {
//this.Debug = true; // this.Debug = true;
resultReady = true; resultReady = true;
this.result = result; this.result = result;
Id = MaxId++;
} }
/* /*

View File

@ -285,13 +285,15 @@ namespace Esiur.Net.IIP
void init() void init()
{ {
queue.Then((x) => var q = queue;
q.Then((x) =>
{ {
if (x.Type == DistributedResourceQueueItem.DistributedResourceQueueItemType.Event) if (x.Type == DistributedResourceQueueItem.DistributedResourceQueueItemType.Event)
x.Resource._EmitEventByIndex(x.Index, (object[])x.Value); x.Resource._EmitEventByIndex(x.Index, (object[])x.Value);
else else
x.Resource._UpdatePropertyByIndex(x.Index, x.Value); x.Resource._UpdatePropertyByIndex(x.Index, x.Value);
}); });
//q.timeout?.Dispose();
var r = new Random(); var r = new Random();
localNonce = new byte[32]; localNonce = new byte[32];

View File

@ -179,7 +179,9 @@ namespace Esiur.Net
listener = socket; listener = socket;
// Start accepting // Start accepting
listener.Accept().Then(NewConnection); var r = listener.Accept();
r.Then(NewConnection);
//r.timeout?.Dispose();
//var rt = listener.Accept().Then() //var rt = listener.Accept().Then()
//thread = new Thread(new System.Threading.ThreadStart(ListenForConnections)); //thread = new Thread(new System.Threading.ThreadStart(ListenForConnections));
@ -303,8 +305,11 @@ namespace Esiur.Net
// something wrong with the child. // something wrong with the child.
} }
// Accept more // Accept more
listener.Accept().Then(NewConnection); var l = listener.Accept();
l.Then(NewConnection);
//l.timeout?.Dispose();
sock.Begin(); sock.Begin();