From 20fcaba518d8f41027b26981b8c054e57e2e437b Mon Sep 17 00:00:00 2001 From: Ahmed Zamil Date: Thu, 31 Mar 2022 02:27:30 +0300 Subject: [PATCH] deadlock prevention --- Esiur/Data/Codec.cs | 10 +- Esiur/Data/DataDeserializer.cs | 121 +++++++------ Esiur/Net/HTTP/HTTPServer.cs | 11 +- Esiur/Net/IIP/DistributedConnection.cs | 2 +- .../Net/IIP/DistributedConnectionProtocol.cs | 169 ++++++++++-------- Esiur/Resource/Template/TypeTemplate.cs | 142 ++++++++++----- Test/Program.cs | 29 +-- 7 files changed, 268 insertions(+), 216 deletions(-) diff --git a/Esiur/Data/Codec.cs b/Esiur/Data/Codec.cs index bcba455..ac4b94c 100644 --- a/Esiur/Data/Codec.cs +++ b/Esiur/Data/Codec.cs @@ -43,7 +43,7 @@ namespace Esiur.Data; public static class Codec { - delegate AsyncReply Parser(byte[] data, uint offset, uint length, DistributedConnection connection); + delegate AsyncReply Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence); static Parser[][] FixedParsers = new Parser[][] @@ -114,7 +114,7 @@ public static class Codec /// DistributedConnection is required in case a structure in the array holds items at the other end. /// DataType, in case the data is not prepended with DataType /// Value - public static (uint, AsyncReply) Parse(byte[] data, uint offset, DistributedConnection connection, TransmissionType? dataType = null) + public static (uint, AsyncReply) Parse(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence, TransmissionType? dataType = null) { uint len = 0; @@ -132,15 +132,15 @@ public static class Codec if (tt.Class == TransmissionTypeClass.Fixed) { - return (len, FixedParsers[tt.Exponent][tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection)); + return (len, FixedParsers[tt.Exponent][tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection, requestSequence)); } else if (tt.Class == TransmissionTypeClass.Dynamic) { - return (len, DynamicParsers[tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection)); + return (len, DynamicParsers[tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection, requestSequence)); } else //if (tt.Class == TransmissionTypeClass.Typed) { - return (len, TypedParsers[tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection)); + return (len, TypedParsers[tt.Index](data, dataType.Value.Offset, (uint)tt.ContentLength, connection, requestSequence)); } } diff --git a/Esiur/Data/DataDeserializer.cs b/Esiur/Data/DataDeserializer.cs index c454acc..c01987b 100644 --- a/Esiur/Data/DataDeserializer.cs +++ b/Esiur/Data/DataDeserializer.cs @@ -12,115 +12,115 @@ namespace Esiur.Data; public static class DataDeserializer { - public static AsyncReply NullParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply NullParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(null); } - public static AsyncReply BooleanTrueParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply BooleanTrueParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(true); } - public static AsyncReply BooleanFalseParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply BooleanFalseParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(false); } - public static AsyncReply NotModifiedParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply NotModifiedParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(new NotModified()); } - public static AsyncReply ByteParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply ByteParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(data[offset]); } - public static AsyncReply SByteParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply SByteParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply((sbyte)data[offset]); } - public static unsafe AsyncReply Char16Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Char16Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(char*)ptr); } - public static AsyncReply Char8Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply Char8Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply((char)data[offset]); } - public static unsafe AsyncReply Int16Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Int16Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(short*)ptr); } - public static unsafe AsyncReply UInt16Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply UInt16Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(ushort*)ptr); } - public static unsafe AsyncReply Int32Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Int32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(int*)ptr); } - public static unsafe AsyncReply UInt32Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply UInt32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(uint*)ptr); } - public static unsafe AsyncReply Float32Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Float32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(float*)ptr); } - public static unsafe AsyncReply Float64Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Float64Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(double*)ptr); } - public static unsafe AsyncReply Float128Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Float128Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(decimal*)ptr); } - public static unsafe AsyncReply Int128Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Int128Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(decimal*)ptr); } - public static unsafe AsyncReply UInt128Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply UInt128Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(decimal*)ptr); } - public static unsafe AsyncReply Int64Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply Int64Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(long*)ptr); } - public static unsafe AsyncReply UInt64Parser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply UInt64Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(*(ulong*)ptr); } - public static unsafe AsyncReply DateTimeParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply DateTimeParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return new AsyncReply(new DateTime(*(long*)ptr, DateTimeKind.Utc)); @@ -128,14 +128,14 @@ public static class DataDeserializer } - public static unsafe AsyncReply ResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply ResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) - return connection.Fetch(*(uint*)ptr); + return connection.Fetch(*(uint*)ptr, requestSequence); } - public static unsafe AsyncReply LocalResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply LocalResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { fixed (byte* ptr = &data[offset]) return Warehouse.GetById(*(uint*)ptr); @@ -143,17 +143,17 @@ public static class DataDeserializer } - public static unsafe AsyncReply RawDataParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply RawDataParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(data.Clip(offset, length)); } - public static unsafe AsyncReply StringParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply StringParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { return new AsyncReply(data.GetString(offset, length)); } - public static unsafe AsyncReply RecordParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply RecordParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var reply = new AsyncReply(); @@ -165,10 +165,9 @@ public static class DataDeserializer var template = Warehouse.GetTemplateByClassId((Guid)classId, TemplateType.Record); - if (template != null) + var initRecord = (TypeTemplate template) => { - //ListParser(data, offset, length, connection) - ListParser(data, offset, length, connection).Then(r => + ListParser(data, offset, length, connection, requestSequence).Then(r => { var ar = (object[])r; @@ -181,7 +180,8 @@ public static class DataDeserializer { var v = Convert.ChangeType(ar[i], template.Properties[i].PropertyInfo.PropertyType); template.Properties[i].PropertyInfo.SetValue(record, v); - } catch ( Exception ex) + } + catch (Exception ex) { Console.WriteLine(ex); } @@ -199,21 +199,24 @@ public static class DataDeserializer reply.Trigger(record); } }); + }; + + if (template != null) + { + initRecord(template); } else { connection.GetTemplate((Guid)classId).Then(tmp => { - ListParser(data, offset, length, connection).Then(r => + ListParser(data, offset, length, connection, requestSequence).Then(r => { - var ar = (object[])r; + if (tmp == null) + reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.TemplateNotFound, + "Template not found for record.")); + else + initRecord(tmp); - var record = new Record(); - - for (var i = 0; i < tmp.Properties.Length; i++) - record.Add(tmp.Properties[i].Name, ar[i]); - - reply.Trigger(record); }); }).Error(x => reply.TriggerError(x)); } @@ -221,12 +224,12 @@ public static class DataDeserializer return reply; } - public static unsafe AsyncReply ConstantParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply ConstantParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { throw new NotImplementedException(); } - public static unsafe AsyncReply EnumParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static unsafe AsyncReply EnumParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var classId = data.GetGuid(offset); @@ -245,7 +248,7 @@ public static class DataDeserializer connection.GetTemplate((Guid)classId).Then(tmp => { - reply.Trigger(tmp.Constants[index].Value); + reply.Trigger(tmp.Constants[index].Value); }).Error(x => reply.TriggerError(x)); return reply; @@ -254,13 +257,13 @@ public static class DataDeserializer - public static AsyncReply RecordListParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply RecordListParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var rt = new AsyncBag(); while (length > 0) { - var (cs, reply) = Codec.Parse(data, offset, connection); + var (cs, reply) = Codec.Parse(data, offset, connection, requestSequence); rt.Add(reply); @@ -278,13 +281,13 @@ public static class DataDeserializer return rt; } - public static AsyncReply ResourceListParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply ResourceListParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var rt = new AsyncBag(); while (length > 0) { - var (cs, reply) = Codec.Parse(data, offset, connection); + var (cs, reply) = Codec.Parse(data, offset, connection, requestSequence); rt.Add(reply); @@ -303,13 +306,13 @@ public static class DataDeserializer } - public static AsyncBag ListParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncBag ListParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var rt = new AsyncBag(); while (length > 0) { - var (cs, reply) = Codec.Parse(data, offset, connection); + var (cs, reply) = Codec.Parse(data, offset, connection, requestSequence); rt.Add(reply); @@ -327,7 +330,7 @@ public static class DataDeserializer return rt; } - public static AsyncReply TypedMapParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply TypedMapParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { // get key type var (keyCs, keyRepType) = RepresentationType.Parse(data, offset); @@ -346,7 +349,7 @@ public static class DataDeserializer while (length > 0) { - var (cs, reply) = Codec.Parse(data, offset, connection); + var (cs, reply) = Codec.Parse(data, offset, connection, requestSequence); results.Add(reply); @@ -376,7 +379,7 @@ public static class DataDeserializer } - public static AsyncReply TupleParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply TupleParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var results = new AsyncBag(); var rt = new AsyncReply(); @@ -396,7 +399,7 @@ public static class DataDeserializer while (length > 0) { - var (cs, reply) = Codec.Parse(data, offset, connection); + var (cs, reply) = Codec.Parse(data, offset, connection, requestSequence); results.Add(reply); @@ -435,7 +438,7 @@ public static class DataDeserializer return rt; } - public static AsyncReply TypedListParser(byte[] data, uint offset, uint length, DistributedConnection connection) + public static AsyncReply TypedListParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) { var rt = new AsyncBag(); @@ -451,7 +454,7 @@ public static class DataDeserializer while (length > 0) { - var (cs, reply) = Codec.Parse(data, offset, connection); + var (cs, reply) = Codec.Parse(data, offset, connection, requestSequence); rt.Add(reply); @@ -470,12 +473,12 @@ public static class DataDeserializer } - public static AsyncBag PropertyValueArrayParser(byte[] data, uint offset, uint length, DistributedConnection connection)//, bool ageIncluded = true) + public static AsyncBag PropertyValueArrayParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true) { var rt = new AsyncBag(); - ListParser(data, offset, length, connection).Then(x => + ListParser(data, offset, length, connection, requestSequence).Then(x => { var ar = (object[])x; var pvs = new List(); @@ -491,7 +494,7 @@ public static class DataDeserializer } - public static (uint, AsyncReply) PropertyValueParser(byte[] data, uint offset, DistributedConnection connection)//, bool ageIncluded = true) + public static (uint, AsyncReply) PropertyValueParser(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true) { var reply = new AsyncReply(); @@ -502,7 +505,7 @@ public static class DataDeserializer offset += 8; - var (valueSize, results) = Codec.Parse(data, offset, connection); + var (valueSize, results) = Codec.Parse(data, offset, connection, requestSequence); results.Then(value => { @@ -512,7 +515,7 @@ public static class DataDeserializer return (16 + valueSize, reply); } - public static AsyncReply> HistoryParser(byte[] data, uint offset, uint length, IResource resource, DistributedConnection connection) + public static AsyncReply> HistoryParser(byte[] data, uint offset, uint length, IResource resource, DistributedConnection connection, uint[] requestSequence) { //var count = (int)toAge - (int)fromAge; @@ -531,7 +534,7 @@ public static class DataDeserializer var cs = data.GetUInt32(offset, Endian.Little); offset += 4; - var (len, pv) = PropertyValueParser(data, offset, connection); + var (len, pv) = PropertyValueParser(data, offset, connection, requestSequence); bagOfBags.Add(pv);// ParsePropertyValueArray(data, offset, cs, connection)); offset += len; diff --git a/Esiur/Net/HTTP/HTTPServer.cs b/Esiur/Net/HTTP/HTTPServer.cs index d26df7f..0b73ba5 100644 --- a/Esiur/Net/HTTP/HTTPServer.cs +++ b/Esiur/Net/HTTP/HTTPServer.cs @@ -57,7 +57,7 @@ public class HTTPServer : NetworkServer, IResource [HTTPRequestPacket.HTTPMethod.UNKNOWN] = new List(), [HTTPRequestPacket.HTTPMethod.DELETE] = new List(), [HTTPRequestPacket.HTTPMethod.TRACE] = new List(), - [HTTPRequestPacket.HTTPMethod.CONNECT] = new List(), + [HTTPRequestPacket.HTTPMethod.CONNECT] = new List(), [HTTPRequestPacket.HTTPMethod.PUT] = new List() }; @@ -274,11 +274,10 @@ public class HTTPServer : NetworkServer, IResource internal bool Execute(HTTPConnection sender) { - foreach (var route in routes[sender.Request.Method]) - { - if (route.Invoke(sender)) - return true; - } + if (!sender.WSMode) + foreach (var route in routes[sender.Request.Method]) + if (route.Invoke(sender)) + return true; foreach (var resource in filters) diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs index 396e8c9..5cde61e 100644 --- a/Esiur/Net/IIP/DistributedConnection.cs +++ b/Esiur/Net/IIP/DistributedConnection.cs @@ -1225,7 +1225,7 @@ public partial class DistributedConnection : NetworkConnection, IStore for (var i = 0; i < resources.Keys.Count; i++) { var index = resources.Keys.ElementAt(i); - bag.Add(Fetch(index)); + bag.Add(Fetch(index, null)); } bag.Seal(); diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index 7e0f8be..a4b2df4 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -232,7 +232,7 @@ partial class DistributedConnection { var req = requests.Take(callbackId); - var (_, parsed) = Codec.Parse(content, 0, this, transmissionType); + var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); parsed.Then((rt) => { req?.Trigger(rt); @@ -256,7 +256,7 @@ partial class DistributedConnection if (requests.ContainsKey(callbackId)) { var req = requests[callbackId]; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType); + var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); parsed.Then((x) => { req.TriggerChunk(x); @@ -282,12 +282,12 @@ partial class DistributedConnection void IIPEventPropertyUpdated(uint resourceId, byte index, TransmissionType dataType, byte[] data) { - Fetch(resourceId).Then(r => + Fetch(resourceId, null).Then(r => { var item = new AsyncReply(); queue.Add(item); - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);// 0, this); + var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);// 0, this); parsed.Then((arguments) => { var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); @@ -345,13 +345,13 @@ partial class DistributedConnection void IIPEventEventOccurred(uint resourceId, byte index, TransmissionType dataType, byte[] data) { - Fetch(resourceId).Then(r => + Fetch(resourceId, null).Then(r => { // push to the queue to gaurantee serialization var item = new AsyncReply(); queue.Add(item); - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);//, 0, this); + var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);//, 0, this); parsed.Then((arguments) => { var et = r.Instance.Template.GetEventTemplateByIndex(index); @@ -406,9 +406,9 @@ partial class DistributedConnection void IIPEventChildAdded(uint resourceId, uint childId) { - Fetch(resourceId).Then(parent => + Fetch(resourceId, null).Then(parent => { - Fetch(childId).Then(child => + Fetch(childId, null).Then(child => { parent.children.Add(child); child.parents.Add(parent); @@ -420,9 +420,9 @@ partial class DistributedConnection void IIPEventChildRemoved(uint resourceId, uint childId) { - Fetch(resourceId).Then(parent => + Fetch(resourceId, null).Then(parent => { - Fetch(childId).Then(child => + Fetch(childId, null).Then(child => { parent.children.Remove(child); child.parents.Remove(parent); @@ -434,7 +434,7 @@ partial class DistributedConnection void IIPEventRenamed(uint resourceId, string name) { - Fetch(resourceId).Then(resource => + Fetch(resourceId, null).Then(resource => { resource.Instance.Variables["name"] = name; }); @@ -443,7 +443,7 @@ partial class DistributedConnection void IIPEventAttributesUpdated(uint resourceId, byte[] attributes) { - Fetch(resourceId).Then(resource => + Fetch(resourceId, null).Then(resource => { var attrs = attributes.GetStringArray(0, (uint)attributes.Length); @@ -724,19 +724,19 @@ partial class DistributedConnection return; } - DataDeserializer.ListParser(content, offset, cl, this).Then(parameters => + DataDeserializer.ListParser(content, offset, cl, this, null).Then(parameters => { offset += cl; cl = content.GetUInt32(offset, Endian.Little); //Codec.ParseStructure(content, offset, cl, this).Then(attributes => - DataDeserializer.TypedMapParser(content, offset, cl, this).Then(attributes => + DataDeserializer.TypedMapParser(content, offset, cl, this, null).Then(attributes => { offset += cl; cl = (uint)content.Length - offset; //Codec.ParseStructure(content, offset, cl, this).Then(values => - DataDeserializer.TypedMapParser(content, offset, cl, this).Then(values => + DataDeserializer.TypedMapParser(content, offset, cl, this, null).Then(values => { #if NETSTANDARD @@ -1049,7 +1049,7 @@ partial class DistributedConnection } - DataDeserializer.TypedMapParser(attributes, 0, (uint)attributes.Length, this).Then(attrs => + DataDeserializer.TypedMapParser(attributes, 0, (uint)attributes.Length, this, null).Then(attrs => { if (r.Instance.SetAttributes((Map)attrs, clearAttributes)) SendReply(clearAttributes ? IIPPacket.IIPPacketAction.ClearAllAttributes : IIPPacket.IIPPacketAction.ClearAttributes, @@ -1232,11 +1232,11 @@ partial class DistributedConnection return; } - var (_, parsed) = Codec.Parse(content, 0, this, transmissionType); + var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); parsed.Then(results => { - var arguments = (Map)results ;// (object[])results; + var arguments = (Map)results;// (object[])results; // un hold the socket to send data immediately this.Socket.Unhold(); @@ -1281,14 +1281,14 @@ partial class DistributedConnection { if (pis.Last().ParameterType == typeof(DistributedConnection)) { - for(byte i = 0; i< pis.Length - 1; i++) - args[i] = arguments.ContainsKey(i) ? + for (byte i = 0; i < pis.Length - 1; i++) + args[i] = arguments.ContainsKey(i) ? DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing; args[args.Length - 1] = this; } else { - for (byte i = 0; i < pis.Length ; i++) + for (byte i = 0; i < pis.Length; i++) args[i] = arguments.ContainsKey(i) ? DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing; } @@ -1331,13 +1331,13 @@ partial class DistributedConnection (rt as Task).ContinueWith(t => { #if NETSTANDARD - var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t); + var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t); #else var res = t.GetType().GetProperty("Result").GetValue(t); #endif - SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback) - .AddUInt8Array(Codec.Compose(res, this)) - .Done(); + SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback) + .AddUInt8Array(Codec.Compose(res, this)) + .Done(); }); //await t; @@ -1631,7 +1631,7 @@ partial class DistributedConnection var pt = r.Instance.Template.GetPropertyTemplateByIndex(index); if (pt != null) { - var (_, parsed) = Codec.Parse(content, 0, this, transmissionType); + var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType); parsed.Then((value) => { if (r is DistributedResource) @@ -2000,15 +2000,14 @@ partial class DistributedConnection /// Class GUID /// Resource IdGuid classId /// DistributedResource - public AsyncReply Fetch(uint id) + public AsyncReply Fetch(uint id, uint[] requestSequence) { var resource = resources[id]; var request = resourceRequests[id]; if (request != null) { - if (resource != null) - // dig for dead locks // or not + if (resource != null && (requestSequence?.Contains(id) ?? false)) return new AsyncReply(resource); else return request; @@ -2022,17 +2021,28 @@ partial class DistributedConnection var reply = new AsyncReply(); resourceRequests.Add(id, reply); + var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id }; + SendRequest(IIPPacket.IIPPacketAction.AttachResource) .AddUInt32(id) .Done() .Then((rt) => { + if (rt == null) + { + reply.TriggerError(new AsyncException(ErrorType.Management, + (ushort)ExceptionCode.ResourceNotFound, "Null response")); + return; + } + DistributedResource dr; + TypeTemplate template = null; + Guid classId = (Guid)rt[0]; if (resource == null) { - var template = Warehouse.GetTemplateByClassId((Guid)rt[0], TemplateType.Wrapper); + template = Warehouse.GetTemplateByClassId(classId, TemplateType.Wrapper); if (template?.DefinedType != null) dr = Activator.CreateInstance(template.DefinedType, this, id, (ulong)rt[1], (string)rt[2]) as DistributedResource; else @@ -2044,57 +2054,64 @@ partial class DistributedConnection var transmissionType = (TransmissionType)rt[3]; var content = (byte[])rt[4]; - GetTemplate((Guid)rt[0]).Then((tmp) => + var initResource = (DistributedResource ok) => + { + var (_, parsed) = Codec.Parse(content, 0, this, newSequence, transmissionType); + parsed.Then(results => + { + var ar = results as object[]; + + var pvs = new List(); + + for (var i = 0; i < ar.Length; i += 3) + pvs.Add(new PropertyValue(ar[i + 2], (ulong?)ar[i], (DateTime?)ar[i + 1])); + + dr._Attach(pvs.ToArray());// (PropertyValue[])pvs); + resourceRequests.Remove(id); + reply.Trigger(dr); + }).Error(ex => reply.TriggerError(ex)); + + }; + + if (template == null) + { + GetTemplate((Guid)rt[0]).Then((tmp) => + { + // ClassId, ResourceAge, ResourceLink, Content + if (resource == null) + { + Warehouse.Put(id.ToString(), dr, this, null, tmp).Then(initResource).Error(ex => reply.TriggerError(ex)); + } + else + { + initResource(resource); + } + }).Error((ex) => + { + reply.TriggerError(ex); + }); + + } + else { - // ClassId, ResourceAge, ResourceLink, Content if (resource == null) { - Warehouse.Put(id.ToString(), dr, this, null, tmp).Then((ok) => - { - var (_, parsed) = Codec.Parse(content, 0, this, transmissionType); - parsed.Then(results => - { - var ar = results as object[]; - - var pvs = new List(); - - for (var i = 0; i < ar.Length; i += 3) - pvs.Add(new PropertyValue(ar[i + 2], (ulong?)ar[i], (DateTime?)ar[i + 1])); - - dr._Attach(pvs.ToArray());// (PropertyValue[])pvs); - resourceRequests.Remove(id); - reply.Trigger(dr); - }).Error(ex => reply.TriggerError(ex)); - - }).Error(ex => reply.TriggerError(ex)); + Warehouse.Put(id.ToString(), dr, this, null, template) + .Then(initResource).Error((ex) => reply.TriggerError(ex)); } else { - var (_, parsed) = Codec.Parse(content, 0, this, transmissionType); - parsed.Then((results) => - { - var ar = results as object[]; - - var pvs = new List(); - - for (var i = 0; i < ar.Length; i += 3) - pvs.Add(new PropertyValue(ar[i + 2], (ulong?)ar[i], (DateTime?)ar[i + 1])); - - dr._Attach(pvs.ToArray());// (PropertyValue[])pvs); - - // resourceRequests.Remove(id); - reply.Trigger(dr); - }).Error(ex => reply.TriggerError(ex)); + initResource(resource); } - }).Error((ex) => - { - reply.TriggerError(ex); - }); + + } + }).Error((ex) => { reply.TriggerError(ex); }); + return reply; } @@ -2111,7 +2128,7 @@ partial class DistributedConnection var dataType = (TransmissionType)ar[0]; var data = (byte[])ar[1]; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType); + var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); parsed.Then(resources => rt.Trigger(resources)) .Error(ex => rt.TriggerError(ex)); @@ -2136,7 +2153,7 @@ partial class DistributedConnection { var dataType = (TransmissionType)ar[0]; var data = (byte[])ar[1]; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType); + var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); parsed.Then(resources => rt.Trigger(resources)) .Error(ex => rt.TriggerError(ex)); @@ -2204,7 +2221,7 @@ partial class DistributedConnection var dataType = (TransmissionType)ar[0]; var data = (byte[])ar[1]; //Codec.Parse(d, ) - var (_, parsed) = Codec.Parse(data, 0, this, dataType); + var (_, parsed) = Codec.Parse(data, 0, this, null, dataType); parsed.Then(st => { @@ -2227,7 +2244,7 @@ partial class DistributedConnection var dataType = (TransmissionType)ar[0]; var data = (byte[])ar[1]; - var (_, parsed) = Codec.Parse(data, 0, this, dataType); + var (_, parsed) = Codec.Parse(data, 0, this, null, dataType); parsed.Then(st => { @@ -2268,7 +2285,7 @@ partial class DistributedConnection { var content = (byte[])rt[0]; - DataDeserializer.HistoryParser(content, 0, (uint)content.Length, resource, this) + DataDeserializer.HistoryParser(content, 0, (uint)content.Length, resource, this, null) .Then((history) => reply.Trigger(history)); }).Error((ex) => reply.TriggerError(ex)); @@ -2298,7 +2315,7 @@ partial class DistributedConnection var dataType = (TransmissionType)ar[0]; var data = ar[1] as byte[]; - var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType); + var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType); parsed.Then(resources => reply.Trigger(resources)) .Error(ex => reply.TriggerError(ex)); @@ -2342,7 +2359,7 @@ partial class DistributedConnection { var rid = (uint)args[0]; - Fetch(rid).Then((r) => + Fetch(rid, null).Then((r) => { reply.Trigger(r); }); diff --git a/Esiur/Resource/Template/TypeTemplate.cs b/Esiur/Resource/Template/TypeTemplate.cs index 22b4405..961dc77 100644 --- a/Esiur/Resource/Template/TypeTemplate.cs +++ b/Esiur/Resource/Template/TypeTemplate.cs @@ -167,13 +167,45 @@ public class TypeTemplate return new Guid(hash); } - static Type GetElementType(Type type) => type switch + static Type[] GetDistributedTypes(Type type) { - { IsArray: true } => type.GetElementType(), - // { IsEnum: true } => type.GetEnumUnderlyingType(), - (_) => type - }; + if (type.IsArray) + return GetDistributedTypes(type.GetElementType()); + else if (type.IsEnum) + return new Type[] { type }; + else if (type.IsGenericType) + { + var genericType = type.GetGenericTypeDefinition(); + var genericTypeArgs = type.GetGenericArguments(); + if (genericType == typeof(List<>) + || genericType == typeof(DistributedPropertyContext<>)) + { + return GetDistributedTypes(genericTypeArgs[0]); + } + else if (genericType == typeof(Tuple<>) + || genericType == typeof(Map<,>)) + { + var rt = new List(); + for (var i = 0; i < genericTypeArgs.Length; i++) + { + var depTypes = GetDistributedTypes(genericTypeArgs[i]); + foreach (var depType in depTypes) + if (!rt.Contains(depType)) + rt.Add(depType); + } + + return rt.ToArray(); + } + } + else if (Codec.ImplementsInterface(type, typeof(IRecord)) + || Codec.ImplementsInterface(type, typeof(IResource))) + { + return new Type[] { type }; + } + + return new Type[0]; + } public static TypeTemplate[] GetDependencies(TypeTemplate template) @@ -205,13 +237,20 @@ public class TypeTemplate // functions foreach (var f in tmp.functions) { - var frtt = Warehouse.GetTemplateByType(GetElementType(f.MethodInfo.ReturnType)); - if (frtt != null) + var functionReturnTypes = GetDistributedTypes(f.MethodInfo.ReturnType); + //.Select(x => Warehouse.GetTemplateByType(x)) + //.Where(x => x != null && !bag.Contains(x)) + + foreach (var functionReturnType in functionReturnTypes) { - if (!bag.Contains(frtt)) + var functionReturnTemplate = Warehouse.GetTemplateByType(functionReturnType); + if (functionReturnTemplate != null) { - list.Add(frtt); - getDependenciesFunc(frtt, bag); + if (!bag.Contains(functionReturnTemplate)) + { + list.Add(functionReturnTemplate); + getDependenciesFunc(functionReturnTemplate, bag); + } } } @@ -219,24 +258,11 @@ public class TypeTemplate for (var i = 0; i < args.Length - 1; i++) { - var fpt = Warehouse.GetTemplateByType(GetElementType(args[i].ParameterType)); - if (fpt != null) - { - if (!bag.Contains(fpt)) - { - bag.Add(fpt); - getDependenciesFunc(fpt, bag); - } - } - } + var fpTypes = GetDistributedTypes(args[i].ParameterType); - // skip DistributedConnection argument - if (args.Length > 0) - { - var last = args.Last(); - if (last.ParameterType != typeof(DistributedConnection)) + foreach (var fpType in fpTypes) { - var fpt = Warehouse.GetTemplateByType(GetElementType(last.ParameterType)); + var fpt = Warehouse.GetTemplateByType(fpType); if (fpt != null) { if (!bag.Contains(fpt)) @@ -248,18 +274,47 @@ public class TypeTemplate } } + // skip DistributedConnection argument + if (args.Length > 0) + { + var last = args.Last(); + if (last.ParameterType != typeof(DistributedConnection)) + { + + var fpTypes = GetDistributedTypes(last.ParameterType); + + foreach (var fpType in fpTypes) + { + var fpt = Warehouse.GetTemplateByType(fpType); + if (fpt != null) + { + if (!bag.Contains(fpt)) + { + bag.Add(fpt); + getDependenciesFunc(fpt, bag); + } + } + } + } + } + } // properties foreach (var p in tmp.properties) { - var pt = Warehouse.GetTemplateByType(GetElementType(p.PropertyInfo.PropertyType)); - if (pt != null) + var propertyTypes = GetDistributedTypes(p.PropertyInfo.PropertyType); + + foreach (var propertyType in propertyTypes) { - if (!bag.Contains(pt)) + var propertyTemplate = Warehouse.GetTemplateByType(propertyType); + if (propertyTemplate != null) { - bag.Add(pt); - getDependenciesFunc(pt, bag); + if (!bag.Contains(propertyTemplate)) + { + bag.Add(propertyTemplate); + getDependenciesFunc(propertyTemplate, bag); + } } } } @@ -267,14 +322,19 @@ public class TypeTemplate // events foreach (var e in tmp.events) { - var et = Warehouse.GetTemplateByType(GetElementType(e.EventInfo.EventHandlerType.GenericTypeArguments[0])); + var eventTypes = GetDistributedTypes(e.EventInfo.EventHandlerType.GenericTypeArguments[0]); - if (et != null) + foreach (var eventType in eventTypes) { - if (!bag.Contains(et)) + var eventTemplate = Warehouse.GetTemplateByType(eventType); + + if (eventTemplate != null) { - bag.Add(et); - getDependenciesFunc(et, bag); + if (!bag.Contains(eventTemplate)) + { + bag.Add(eventTemplate); + getDependenciesFunc(eventTemplate, bag); + } } } } @@ -323,7 +383,7 @@ public class TypeTemplate if (addToWarehouse) Warehouse.PutTemplate(this); - + PropertyInfo[] propsInfo = type.GetProperties(BindingFlags.Public | BindingFlags.Instance);// | BindingFlags.DeclaredOnly); EventInfo[] eventsInfo = type.GetEvents(BindingFlags.Public | BindingFlags.Instance);// | BindingFlags.DeclaredOnly); @@ -358,7 +418,7 @@ public class TypeTemplate if (templateType == TemplateType.Enum) value = Convert.ChangeType(value, ci.FieldType.GetEnumUnderlyingType()); - + var ct = new ConstantTemplate(this, i++, ci.Name, ci.DeclaringType != type, valueType, value, annotationAttr?.Annotation); @@ -707,11 +767,11 @@ public class TypeTemplate } - public static bool HasParent (Type type) + public static bool HasParent(Type type) { var parent = type.BaseType; - if (parent == typeof(Resource) + if (parent == typeof(Resource) || parent == typeof(Record)) return false; @@ -887,7 +947,7 @@ public class TypeTemplate offset += dts; - (dts, var value) = Codec.Parse(data, offset, null); + (dts, var value) = Codec.Parse(data, offset, null, null); offset += dts; diff --git a/Test/Program.cs b/Test/Program.cs index 3ec926b..667c7b7 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -50,34 +50,7 @@ namespace Test static async Task Main(string[] args) { - - //foreach (var a in ma) - // Console.WriteLine(a); - - //var route = "users/{id:int:min(1)}/{name:string}.jpg"; - //var route = "users/{id}/{name}.jpg"; - //var rr = getRouteRegex(route); - - - - // var m = rr.Match("users/222/fun.jpg"); - -// Console.WriteLine(m.Value); - - // var escaped = Regex.Escape(route); - - // var replace = Regex.Replace(route, @"\{([^}]*)\}", @"\{([^}]*)\}"); - - // var ss = route.Split('{', '}'); - - //var regex = "users/\" - //Regex regex = new Regex(@"\(([^()]+)\)*"); - - //foreach (Match match in regex.Matches("You id is (1) and your number is (0000000000)")) - //{ - // Console.WriteLine(match.Value); - //} - + // Create stores to keep objects. var system = await Warehouse.Put("mem", new MemoryStore()); var server = await Warehouse.Put("mem/server", new DistributedServer());