2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-06-27 13:33:13 +00:00

AsyncReply is awaitable

This commit is contained in:
2019-07-21 05:29:58 +03:00
parent 48e450ffc4
commit 2d9f61c0d9
26 changed files with 561 additions and 213 deletions

View File

@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
namespace Esiur.Engine
{
public class AsyncAwaiter<T> : INotifyCompletion
{
Action callback = null;
T result;
private bool completed;
public AsyncAwaiter(AsyncReply<T> reply)
{
reply.Then(x =>
{
completed = true;
result = x;
callback?.Invoke();
});
}
public T GetResult()
{
return result;
}
public bool IsCompleted => completed;
//From INotifyCompletion
public void OnCompleted(Action continuation)
{
Console.WriteLine("Continue....");
}
}
}

View File

@ -30,21 +30,23 @@ using System.Threading.Tasks;
namespace Esiur.Engine
{
public class AsyncBag<T>:AsyncReply
public class AsyncBag<T>: AsyncReply<T[]>
{
//Dictionary<AsyncReply, T> results = new Dictionary<AsyncReply, T>();
List<AsyncReply> replies = new List<AsyncReply>();
List<IAsyncReply<T>> replies = new List<IAsyncReply<T>>();
List<T> results = new List<T>();
int count = 0;
bool sealedBag = false;
/*
public AsyncBag<T> Then(Action<T[]> callback)
{
base.Then(new Action<object>(o => callback((T[])o)));
return this;
}
*/
public void Seal()
{
@ -72,7 +74,7 @@ namespace Esiur.Engine
}
}
public void Add(AsyncReply reply)
public void Add(IAsyncReply<T> reply)
{
if (!sealedBag)
{
@ -82,6 +84,12 @@ namespace Esiur.Engine
//results.Add(reply, default(T));
}
public void AddBag(AsyncBag<T> bag)
{
foreach (var r in bag.replies)
Add(r);
}
public AsyncBag()
{

View File

@ -63,23 +63,21 @@ namespace Esiur.Engine
{
AsyncReply.ErrorType type;
ExceptionCode code;
public readonly ErrorType Type;
public readonly ExceptionCode Code;
public AsyncReply.ErrorType Type => type;
public ExceptionCode Code => code;
public AsyncException(AsyncReply.ErrorType type, ushort code, string message)
: base(type == AsyncReply.ErrorType.Management ? ((ExceptionCode)code).ToString() : message)
public AsyncException(ErrorType type, ushort code, string message)
: base(type == ErrorType.Management ? ((ExceptionCode)code).ToString() : message)
{
this.type = type;
this.code = (ExceptionCode)code;
this.Type = type;
this.Code = (ExceptionCode)code;
}
public override string ToString()
{
return code.ToString() + ": " + Message;
return Code.ToString() + ": " + Message;
}
}
}

View File

@ -30,18 +30,18 @@ using System.Threading.Tasks;
namespace Esiur.Engine
{
public class AsyncQueue<T> : AsyncReply
public class AsyncQueue<T> : AsyncReply<T>
{
List<AsyncReply<T>> list = new List<AsyncReply<T>>();
//Action<T> callback;
object queueLock = new object();
public AsyncQueue<T> Then(Action<T> callback)
{
base.Then(new Action<object>(o => callback((T)o)));
//public AsyncQueue<T> Then(Action<T> callback)
//{
// base.Then(new Action<object>(o => callback((T)o)));
return this;
}
//return this;
//}
public void Add(AsyncReply<T> reply)
{

View File

@ -32,17 +32,9 @@ namespace Esiur.Engine
{
public class AsyncReply
{
public enum ErrorType
{
Management,
Exception
}
public enum ProgressType
{
Execution,
Network,
}
protected List<Action<object>> callbacks = new List<Action<object>>();
protected object result;

View File

@ -29,13 +29,175 @@ using System.Text;
using System.Threading.Tasks;
using Esiur.Resource;
using System.Reflection;
using System.Threading;
using System.Runtime.CompilerServices;
namespace Esiur.Engine
{
public class AsyncReply<T>: AsyncReply
public class AsyncReply<T>: IAsyncReply<T>
{
protected List<Action<T>> callbacks = new List<Action<T>>();
protected T result;
protected List<Action<AsyncException>> errorCallbacks = new List<Action<AsyncException>>();
public AsyncReply<T> Then(Action<T> callback)
protected List<Action<ProgressType, int, int>> progressCallbacks = new List<Action<ProgressType, int, int>>();
protected List<Action<T>> chunkCallbacks = new List<Action<T>>();
//List<AsyncAwaiter> awaiters = new List<AsyncAwaiter>();
object callbacksLock = new object();
protected bool resultReady = false;
AsyncException exception;
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
public bool Ready
{
get { return resultReady; }
}
public object Result
{
get { return result; }
}
public IAsyncReply<T> Then(Action<T> callback)
{
callbacks.Add(callback);
if (resultReady)
callback(result);
return this;
}
public IAsyncReply<T> Error(Action<AsyncException> callback)
{
errorCallbacks.Add(callback);
if (exception != null)
{
callback(exception);
tcs.SetException(exception);
}
return this;
}
public IAsyncReply<T> Progress(Action<ProgressType, int, int> callback)
{
progressCallbacks.Add(callback);
return this;
}
public IAsyncReply<T> Chunk(Action<T> callback)
{
chunkCallbacks.Add(callback);
return this;
}
public void Trigger(object result)
{
lock (callbacksLock)
{
if (resultReady)
return;
this.result = (T)result;
resultReady = true;
foreach (var cb in callbacks)
cb((T)result);
tcs.TrySetResult(result);
}
}
public void TriggerError(AsyncException exception)
{
if (resultReady)
return;
this.exception = exception;
lock (callbacksLock)
{
foreach (var cb in errorCallbacks)
cb(exception);
}
tcs.TrySetException(exception);
}
public void TriggerProgress(ProgressType type, int value, int max)
{
if (resultReady)
return;
lock (callbacksLock)
{
foreach (var cb in progressCallbacks)
cb(type, value, max);
}
}
public void TriggerChunk(object value)
{
if (resultReady)
return;
lock (callbacksLock)
{
foreach (var cb in chunkCallbacks)
cb((T)value);
}
}
public AsyncAwaiter<T> GetAwaiter()
{
return new AsyncAwaiter<T>(this);
}
public Task Task
{
get
{
return tcs.Task;
}
}
public AsyncReply()
{
}
public AsyncReply(T result)
{
resultReady = true;
tcs.SetResult(result);
this.result = result;
}
/*
public AsyncReply<T> Then(Action<T> callback)
{
base.Then(new Action<object>(o => callback((T)o)));
return this;
@ -46,6 +208,15 @@ namespace Esiur.Engine
Trigger((object)result);
}
public Task<bool> MoveNext(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public void Dispose()
{
}
public AsyncReply()
{
@ -67,13 +238,15 @@ namespace Esiur.Engine
}
}
public T Current => throw new NotImplementedException();
public AsyncReply(T result)
: base(result)
{
}
*/
}
}

12
Esiur/Engine/ErrorType.cs Normal file
View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Engine
{
public enum ErrorType
{
Management,
Exception
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
namespace Esiur.Engine
{
public interface IAsyncReply<out T>//IAsyncEnumerator<T>
{
IAsyncReply<T> Then(Action<T> callback);
IAsyncReply<T> Error(Action<AsyncException> callback);
IAsyncReply<T> Progress(Action<ProgressType, int, int> callback);
IAsyncReply<T> Chunk(Action<T> callback);
void Trigger(object result);
void TriggerError(AsyncException exception);
void TriggerProgress(ProgressType type, int value, int max);
void TriggerChunk(object value);
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Engine
{
public enum ProgressType
{
Execution,
Network,
}
}