From 3375a814e68428c768b63f77be14c80372bff07d Mon Sep 17 00:00:00 2001 From: Ahmed Zamil Date: Tue, 31 Dec 2019 12:44:54 +0300 Subject: [PATCH] New AsyncReply --- Esyur.Stores.MongoDB/MongoDBStore.cs | 11 +- Esyur/Core/AsyncAwaiter.cs | 4 +- Esyur/Core/AsyncBag.cs | 37 ++- Esyur/Core/AsyncBagGeneric.cs | 70 +++++ Esyur/Core/AsyncReply.cs | 295 +++++++++++++++++- Esyur/Core/AsyncReplyGeneric.cs | 63 +++- Esyur/Data/BinaryList.cs | 2 +- Esyur/Data/Codec.cs | 8 +- Esyur/Esyur.csproj | 2 + Esyur/Net/IIP/DistributedConnection.cs | 2 +- .../Net/IIP/DistributedConnectionProtocol.cs | 10 +- Esyur/Net/SendList.cs | 6 +- Esyur/Resource/Warehouse.cs | 21 +- 13 files changed, 470 insertions(+), 61 deletions(-) create mode 100644 Esyur/Core/AsyncBagGeneric.cs diff --git a/Esyur.Stores.MongoDB/MongoDBStore.cs b/Esyur.Stores.MongoDB/MongoDBStore.cs index 79deb66..2f72f1f 100644 --- a/Esyur.Stores.MongoDB/MongoDBStore.cs +++ b/Esyur.Stores.MongoDB/MongoDBStore.cs @@ -202,6 +202,7 @@ namespace Esyur.Stores.MongoDB x); }); + bag.Add(av); } @@ -222,7 +223,7 @@ namespace Esyur.Stores.MongoDB return rt; } - IAsyncReply Parse(BsonValue value) + AsyncReply Parse(BsonValue value) { if (value.BsonType == BsonType.Document) { @@ -850,8 +851,8 @@ namespace Esyur.Stores.MongoDB foreach (var child in children) { var r = Warehouse.Get(child); - if (r is IAsyncReply) - rt.Add((IAsyncReply)r); + if (r is AsyncReply) + rt.Add(r);// (AsyncReply)r); } rt.Seal(); @@ -882,8 +883,8 @@ namespace Esyur.Stores.MongoDB foreach (var parent in parents) { var r = Warehouse.Get(parent); - if (r is IAsyncReply) - rt.Add((IAsyncReply)r); + if (r is AsyncReply) + rt.Add(r);// (AsyncReply)r); } diff --git a/Esyur/Core/AsyncAwaiter.cs b/Esyur/Core/AsyncAwaiter.cs index cd6953f..9766ed2 100644 --- a/Esyur/Core/AsyncAwaiter.cs +++ b/Esyur/Core/AsyncAwaiter.cs @@ -14,12 +14,12 @@ namespace Esyur.Core T result; - public AsyncAwaiter(AsyncReply reply) + public AsyncAwaiter(AsyncReply reply) { reply.Then(x => { this.IsCompleted = true; - this.result = x; + this.result = (T)x; this.callback?.Invoke(); }).Error(x => { diff --git a/Esyur/Core/AsyncBag.cs b/Esyur/Core/AsyncBag.cs index c27ab99..6b44a92 100644 --- a/Esyur/Core/AsyncBag.cs +++ b/Esyur/Core/AsyncBag.cs @@ -30,23 +30,27 @@ using System.Threading.Tasks; namespace Esyur.Core { - public class AsyncBag: AsyncReply + public class AsyncBag: AsyncReply { - //Dictionary results = new Dictionary(); - List> replies = new List>(); - List results = new List(); + protected List replies = new List(); + List results = new List(); int count = 0; bool sealedBag = false; - /* - public AsyncBag Then(Action callback) + + public AsyncBag Then(Action callback) { - base.Then(new Action(o => callback((T[])o))); + base.Then(new Action(o => callback((object[])o))); return this; } - */ + + public new AsyncAwaiter GetAwaiter() + { + return new AsyncAwaiter(this); + } + public void Seal() { @@ -56,7 +60,7 @@ namespace Esyur.Core sealedBag = true; if (results.Count == 0) - Trigger(new T[0]); + Trigger(new object[0]); for (var i = 0; i < results.Count; i++) //foreach(var reply in results.Keys) @@ -66,7 +70,7 @@ namespace Esyur.Core k.Then((r) => { - results[index] = (T)r; + results[index] = r; count++; if (count == results.Count) Trigger(results.ToArray()); @@ -74,17 +78,16 @@ namespace Esyur.Core } } - public void Add(IAsyncReply reply) + public void Add(AsyncReply reply) { if (!sealedBag) { - results.Add(default(T)); + results.Add(null); replies.Add(reply); } - //results.Add(reply, default(T)); } - public void AddBag(AsyncBag bag) + public void AddBag(AsyncBag bag) { foreach (var r in bag.replies) Add(r); @@ -97,10 +100,10 @@ namespace Esyur.Core } - public AsyncBag(T[] results) + public AsyncBag(object[] results) + : base(results) { - resultReady = true; - base.result = results; + } } diff --git a/Esyur/Core/AsyncBagGeneric.cs b/Esyur/Core/AsyncBagGeneric.cs new file mode 100644 index 0000000..ad12c08 --- /dev/null +++ b/Esyur/Core/AsyncBagGeneric.cs @@ -0,0 +1,70 @@ +/* + +Copyright (c) 2017 Ahmed Kh. Zamil + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +*/ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Esyur.Core +{ + public class AsyncBag: AsyncBag + { + public AsyncBag Then(Action callback) + { + base.Then(new Action((o) => callback(((object[])o).Select(x=>(T)x).ToArray()))); + return this; + } + + + public void Add(AsyncReply reply) + { + base.Add(reply); + } + + public void AddBag(AsyncBag bag) + { + foreach (var r in bag.replies) + Add(r); + } + + + public new AsyncAwaiter GetAwaiter() + { + return new AsyncAwaiter(this); + } + + public AsyncBag() + { + + } + + public AsyncBag(T[] results) + : base(results.Select(x=>(object)x).ToArray()) + { + + } + } +} diff --git a/Esyur/Core/AsyncReply.cs b/Esyur/Core/AsyncReply.cs index 5ab4dc8..0533a3c 100644 --- a/Esyur/Core/AsyncReply.cs +++ b/Esyur/Core/AsyncReply.cs @@ -36,8 +36,301 @@ using System.Diagnostics; namespace Esyur.Core { [AsyncMethodBuilder(typeof(AsyncReplyBuilder))] - public class AsyncReply : AsyncReply + public class AsyncReply { + public bool Debug = false; + + protected List> callbacks = new List>(); + protected object result; + + protected List> errorCallbacks = new List>(); + + protected List> progressCallbacks = new List>(); + + protected List> chunkCallbacks = new List>(); + + //List awaiters = new List(); + + object asyncLock = new object(); + + //public Timer timeout;// = new Timer() + protected bool resultReady = false; + AsyncException exception; + // StackTrace trace; + AutoResetEvent mutex = new AutoResetEvent(false); + + public static int MaxId; + + public int Id; + + public bool Ready + { + get { return resultReady; } + + } + + + public object Wait() + { + + if (resultReady) + return result; + + if (Debug) + Console.WriteLine($"AsyncReply: {Id} Wait"); + + //mutex = new AutoResetEvent(false); + mutex.WaitOne(); + + if (Debug) + Console.WriteLine($"AsyncReply: {Id} Wait ended"); + + + return result; + } + + + public object Result + { + get { return result; } + } + + + public AsyncReply Then(Action callback) + { + //lock (callbacksLock) + //{ + lock (asyncLock) + { + // trace = new StackTrace(); + + if (resultReady) + { + if (Debug) + Console.WriteLine($"AsyncReply: {Id} Then ready"); + + callback(result); + return this; + } + + + //timeout = new Timer(x => + //{ + // // Get calling method name + // Console.WriteLine(trace.GetFrame(1).GetMethod().Name); + + // var tr = String.Join("\r\n", trace.GetFrames().Select(f => f.GetMethod().Name)); + // timeout.Dispose(); + + // tr = trace.ToString(); + // throw new Exception("Request timeout " + Id); + //}, null, 15000, 0); + + + if (Debug) + Console.WriteLine($"AsyncReply: {Id} Then pending"); + + + + callbacks.Add(callback); + + return this; + } + } + + + + public AsyncReply Error(Action callback) + { + // lock (callbacksLock) + // { + errorCallbacks.Add(callback); + + if (exception != null) + callback(exception); + + return this; + //} + } + + public AsyncReply Progress(Action callback) + { + //lock (callbacksLock) + //{ + progressCallbacks.Add(callback); + return this; + //} + } + + + public AsyncReply Chunk(Action callback) + { + // lock (callbacksLock) + // { + chunkCallbacks.Add(callback); + return this; + // } + } + + public void Trigger(object result) + { + lock (asyncLock) + { + //timeout?.Dispose(); + + if (Debug) + Console.WriteLine($"AsyncReply: {Id} Trigger"); + + if (resultReady) + return; + + this.result = result; + + resultReady = true; + + //if (mutex != null) + mutex.Set(); + + foreach (var cb in callbacks) + cb(result); + + + if (Debug) + Console.WriteLine($"AsyncReply: {Id} Trigger ended"); + + } + } + + public void TriggerError(Exception exception) + { + //timeout?.Dispose(); + + if (resultReady) + return; + + if (exception is AsyncException) + this.exception = exception as AsyncException; + else + this.exception = new AsyncException(ErrorType.Management, 0, exception.Message); + + + // lock (callbacksLock) + // { + foreach (var cb in errorCallbacks) + cb(this.exception); + // } + + mutex?.Set(); + + } + + public void TriggerProgress(ProgressType type, int value, int max) + { + //timeout?.Dispose(); + + if (resultReady) + return; + + //lock (callbacksLock) + //{ + foreach (var cb in progressCallbacks) + cb(type, value, max); + + //} + } + + + public void TriggerChunk(object value) + { + + //timeout?.Dispose(); + + if (resultReady) + return; + + //lock (callbacksLock) + //{ + foreach (var cb in chunkCallbacks) + cb(value); + + //} + } + + public AsyncAwaiter GetAwaiter() + { + return new AsyncAwaiter(this); + } + + + + public AsyncReply() + { + // this.Debug = true; + Id = MaxId++; + } + + public AsyncReply(object result) + { + // this.Debug = true; + resultReady = true; + this.result = result; + + Id = MaxId++; + } + + /* + public AsyncReply Then(Action callback) + { + base.Then(new Action(o => callback((T)o))); + return this; + } + + public void Trigger(T result) + { + Trigger((object)result); + } + + public Task MoveNext(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public void Dispose() + { + } + + public AsyncReply() + { + + } + + public new Task Task + { + get + { + return base.Task.ContinueWith((t) => + { + +#if NETSTANDARD + return (T)t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t); +#else + return (T)t.GetType().GetProperty("Result").GetValue(t); +#endif + }); + } + } + + public T Current => throw new NotImplementedException(); + + public AsyncReply(T result) + : base(result) + { + + } + + */ + + } } diff --git a/Esyur/Core/AsyncReplyGeneric.cs b/Esyur/Core/AsyncReplyGeneric.cs index ed362f2..c91a1a7 100644 --- a/Esyur/Core/AsyncReplyGeneric.cs +++ b/Esyur/Core/AsyncReplyGeneric.cs @@ -36,28 +36,64 @@ using System.Diagnostics; namespace Esyur.Core { [AsyncMethodBuilder(typeof(AsyncReplyBuilder<>))] - public class AsyncReply : IAsyncReply + public class AsyncReply : AsyncReply { - public bool Debug = false; + public AsyncReply Then(Action callback) + { + base.Then((x)=>callback((T)x)); + return this; + } - protected List> callbacks = new List>(); - protected T result; + public new AsyncReply Progress(Action callback) + { + base.Progress(callback); + return this; + } - protected List> errorCallbacks = new List>(); - protected List> progressCallbacks = new List>(); + public AsyncReply Chunk(Action callback) + { + chunkCallbacks.Add((x)=>callback((T)x)); + return this; + } - protected List> chunkCallbacks = new List>(); + public AsyncReply(T result) + : base(result) + { + + } + + public AsyncReply() + :base() + { + + } + + public new AsyncAwaiter GetAwaiter() + { + return new AsyncAwaiter(this); + } + + + /* + protected new List callbacks = new List(); + protected new object result; + + protected new List> errorCallbacks = new List>(); + + protected new List> progressCallbacks = new List>(); + + protected new List chunkCallbacks = new List(); //List awaiters = new List(); object asyncLock = new object(); //public Timer timeout;// = new Timer() - protected bool resultReady = false; + AsyncException exception; - // StackTrace trace; + // StackTrace trace; AutoResetEvent mutex = new AutoResetEvent(false); public static int MaxId; @@ -103,7 +139,7 @@ namespace Esyur.Core //{ lock (asyncLock) { - // trace = new StackTrace(); + // trace = new StackTrace(); if (resultReady) { @@ -323,15 +359,12 @@ namespace Esyur.Core public T Current => throw new NotImplementedException(); - public AsyncReply(T result) - : base(result) - { - - } + */ + } } diff --git a/Esyur/Data/BinaryList.cs b/Esyur/Data/BinaryList.cs index f38ec2f..69b2634 100644 --- a/Esyur/Data/BinaryList.cs +++ b/Esyur/Data/BinaryList.cs @@ -705,7 +705,7 @@ namespace Esyur.Data return list.ToArray(); } - public virtual IAsyncReply Done() + public virtual AsyncReply Done() { return null; // diff --git a/Esyur/Data/Codec.cs b/Esyur/Data/Codec.cs index 34f3631..a838565 100644 --- a/Esyur/Data/Codec.cs +++ b/Esyur/Data/Codec.cs @@ -365,7 +365,7 @@ namespace Esyur.Data /// DistributedConnection is required in case a structure in the array holds items at the other end. /// DataType, in case the data is not prepended with DataType /// Structure - public static IAsyncReply Parse(byte[] data, uint offset, DistributedConnection connection, DataType dataType = DataType.Unspecified) + public static AsyncReply Parse(byte[] data, uint offset, DistributedConnection connection, DataType dataType = DataType.Unspecified) { uint size; return Parse(data, offset, out size, connection); @@ -380,7 +380,7 @@ namespace Esyur.Data /// DistributedConnection is required in case a structure in the array holds items at the other end. /// DataType, in case the data is not prepended with DataType /// Value - public static IAsyncReply Parse(byte[] data, uint offset, out uint size, DistributedConnection connection, DataType dataType = DataType.Unspecified) + public static AsyncReply Parse(byte[] data, uint offset, out uint size, DistributedConnection connection, DataType dataType = DataType.Unspecified) { bool isArray; @@ -697,7 +697,7 @@ namespace Esyur.Data // var result = (ResourceComparisonResult)data[offset++]; - IAsyncReply previous = null; + AsyncReply previous = null; if (result == ResourceComparisonResult.Null) previous = new AsyncReply(null); @@ -719,7 +719,7 @@ namespace Esyur.Data { result = (ResourceComparisonResult)data[offset++]; - IAsyncReply current = null; + AsyncReply current = null; if (result == ResourceComparisonResult.Null) { diff --git a/Esyur/Esyur.csproj b/Esyur/Esyur.csproj index 3cd7563..b9b0552 100644 --- a/Esyur/Esyur.csproj +++ b/Esyur/Esyur.csproj @@ -28,6 +28,7 @@ + @@ -37,6 +38,7 @@ + diff --git a/Esyur/Net/IIP/DistributedConnection.cs b/Esyur/Net/IIP/DistributedConnection.cs index cabe6f1..a119978 100644 --- a/Esyur/Net/IIP/DistributedConnection.cs +++ b/Esyur/Net/IIP/DistributedConnection.cs @@ -114,7 +114,7 @@ namespace Esyur.Net.IIP /// Send data to the other end as parameters /// /// Values will be converted to bytes then sent. - internal SendList SendParams(IAsyncReply reply = null)//params object[] values) + internal SendList SendParams(AsyncReply reply = null)//params object[] values) { return new SendList(this, reply); diff --git a/Esyur/Net/IIP/DistributedConnectionProtocol.cs b/Esyur/Net/IIP/DistributedConnectionProtocol.cs index db4ae78..42486f7 100644 --- a/Esyur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esyur/Net/IIP/DistributedConnectionProtocol.cs @@ -50,7 +50,7 @@ namespace Esyur.Net.IIP Dictionary templates = new Dictionary(); - KeyList> requests = new KeyList>(); + KeyList requests = new KeyList(); volatile uint callbackCounter = 0; @@ -1196,10 +1196,10 @@ namespace Esyur.Net.IIP //await t; //SendParams((byte)0x90, callback, Codec.Compose(res, this)); } - else if (rt is IAsyncReply)// Codec.ImplementsInterface(rt.GetType(), typeof(IAsyncReply<>)))// rt.GetType().GetTypeInfo().IsGenericType + else if (rt is AsyncReply)// Codec.ImplementsInterface(rt.GetType(), typeof(IAsyncReply<>)))// rt.GetType().GetTypeInfo().IsGenericType //&& rt.GetType().GetGenericTypeDefinition() == typeof(IAsyncReply<>)) { - (rt as IAsyncReply).Then(res => + (rt as AsyncReply).Then(res => { SendReply(IIPPacket.IIPPacketAction.InvokeFunctionArrayArguments, callback) .AddUInt8Array(Codec.Compose(res, this)) @@ -1345,9 +1345,9 @@ namespace Esyur.Net.IIP }); } - else if (rt is IAsyncReply) + else if (rt is AsyncReply) { - (rt as IAsyncReply).Then(res => + (rt as AsyncReply).Then(res => { SendReply(IIPPacket.IIPPacketAction.InvokeFunctionNamedArguments, callback) .AddUInt8Array(Codec.Compose(res, this)) diff --git a/Esyur/Net/SendList.cs b/Esyur/Net/SendList.cs index 8428551..61e9d43 100644 --- a/Esyur/Net/SendList.cs +++ b/Esyur/Net/SendList.cs @@ -9,15 +9,15 @@ namespace Esyur.Net public class SendList : BinaryList { NetworkConnection connection; - IAsyncReply reply; + AsyncReply reply; - public SendList(NetworkConnection connection, IAsyncReply reply) + public SendList(NetworkConnection connection, AsyncReply reply) { this.reply = reply; this.connection = connection; } - public override IAsyncReply Done() + public override AsyncReply Done() { connection.Send(this.ToArray()); return reply; diff --git a/Esyur/Resource/Warehouse.cs b/Esyur/Resource/Warehouse.cs index f91f833..a921331 100644 --- a/Esyur/Resource/Warehouse.cs +++ b/Esyur/Resource/Warehouse.cs @@ -60,6 +60,7 @@ namespace Esyur.Resource private static Regex urlRegex = new Regex(@"^(?:([\S]*)://([^/]*)/?)"); + private static object resourcesLock = new object(); static KeyList> getSupportedProtocols() { @@ -591,12 +592,13 @@ namespace Esyur.Resource public static bool Remove(IResource resource) { - + if (resource.Instance == null) return false; if (resources.ContainsKey(resource.Instance.Id)) - resources.Remove(resource.Instance.Id); + lock(resourcesLock) + resources.Remove(resource.Instance.Id); else return false; @@ -604,12 +606,17 @@ namespace Esyur.Resource { stores.Remove(resource as IStore); - // remove all objects associated with the store - var toBeRemoved = resources.Values.Where(x => + WeakReference[] toBeRemoved; + + lock (resourcesLock) { - IResource r; - return x.TryGetTarget(out r) && r.Instance.Store == resource; - }).ToArray(); + // remove all objects associated with the store + toBeRemoved = resources.Values.Where(x => + { + IResource r; + return x.TryGetTarget(out r) && r.Instance.Store == resource; + }).ToArray(); + } foreach (var o in toBeRemoved) {