/* Copyright (c) 2017-2025 Ahmed Kh. Zamil Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ using Esiur.Core; using Esiur.Data; using Esiur.Data.Types; using Esiur.Misc; using Esiur.Net; using Esiur.Net.Packets; using Esiur.Resource; using Esiur.Security.Authority; using Esiur.Security.Permissions; using Microsoft.CodeAnalysis.CSharp.Syntax; using System; using System.Collections; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; using System.Linq; using System.Reflection; using System.Reflection.Emit; using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Tasks; namespace Esiur.Protocol; partial class EpConnection { KeyList _neededTypeDefs = new KeyList(); KeyList _cachedTypeDefs = new KeyList(); KeyList> _typeDefRequests = new KeyList>(); //KeyList> _typeDefsByIdRequests = new KeyList>(); KeyList _neededResources = new KeyList(); KeyList> _attachedResources = new KeyList>(); KeyList> _suspendedResources = new KeyList>(); KeyList> _resourceRequests = new KeyList>(); //KeyList> _typeDefsByIdRequests = new KeyList>(); //KeyList> _typeDefsByNameRequests = new KeyList>(); //Dictionary typeDefs = new Dictionary(); object _typeDefsLock = new object(); KeyList _requests = new KeyList(); volatile int _callbackCounter = 0; Dictionary> _subscriptions = new Dictionary>(); // resources might get attached by the client internal KeyList _cache = new(); object _subscriptionsLock = new object(); AsyncQueue _queue = new(); /// /// Send EP request. /// /// Packet action. /// Arguments to send. /// /// AsyncReply SendRequest(EpPacketRequest action, params object[] args) { var reply = new AsyncReply(); var c = (uint)Interlocked.Increment(ref _callbackCounter); //callbackCounter++; // avoid thread racing _requests.Add(c, reply); if (args.Length == 0) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x40 | (byte)action)) .AddUInt32(c); Send(bl.ToArray()); } if (args.Length == 1) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x60 | (byte)action)) .AddUInt32(c) .AddUInt8Array(Codec.Compose(args[0], this.Instance.Warehouse, this)); Send(bl.ToArray()); } else { var bl = new BinaryList(); bl.AddUInt8((byte)(0x60 | (byte)action)) .AddUInt32(c) .AddUInt8Array(Codec.Compose(args, this.Instance.Warehouse, this)); Send(bl.ToArray()); } return reply; } //void SendAuthMaterials(EpAuthPacketMethod method, AuthenticationMaterial[] authenticationMaterials) //{ // if (authenticationMaterials != null) // { // var authMap = new Map(); // foreach (var material in authenticationMaterials) // authMap.Add(material.Type, material.Value); // var bl = new BinaryList(); // bl.AddUInt8((byte)((byte)method | 0x20)); // bl.AddUInt8Array(Codec.Compose(authMap, Instance.Warehouse, this)); // Send(bl.ToArray()); // } // else // { // Send(new byte[] { (byte)method }); // } //} void SendAuthData(EpAuthPacketMethod method, object data) { if (data != null) { var bl = new BinaryList(); bl.AddUInt8((byte)((byte)method | 0x20)); bl.AddUInt8Array(Codec.Compose(data, null, this)); Send(bl.ToArray()); } else { Send(new byte[] { (byte)method }); } } void SendAuth(EpAuthPacketMethod method) { Send(new byte[] { (byte)method }); } void SendAuthHeaders(EpAuthPacketMethod method, Map authHeaders) { if (authHeaders != null) { //var authMap = new Map(); //foreach (var header in authHeaders) //{ // authMap.Add(header.Key, header.Value); //} var bl = new BinaryList(); bl.AddUInt8((byte)((byte)method | 0x20)); bl.AddUInt8Array(Codec.Compose(authHeaders, null, this)); Send(bl.ToArray()); } else { Send(new byte[] { (byte)method }); } } /// /// Send EP notification. /// /// Packet action. /// Arguments to send. /// AsyncReply SendNotification(EpPacketNotification action, params object[] args) { var reply = new AsyncReply(); if (args.Length == 0) { Send(new byte[] { (byte)action }); } if (args.Length == 1) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x20 | (byte)action)) .AddUInt8Array(Codec.Compose(args[0], this.Instance.Warehouse, this)); Send(bl.ToArray()); } else { var bl = new BinaryList(); bl.AddUInt8((byte)(0x20 | (byte)action)) .AddUInt8Array(Codec.Compose(args, this.Instance.Warehouse, this)); Send(bl.ToArray()); } return reply; } void SendReply(EpPacketReply action, uint callbackId, params object[] args) { if (Instance == null) return; if (args.Length == 0) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x80 | (byte)action)) .AddUInt32(callbackId); Send(bl.ToArray()); } if (args.Length == 1) { var bl = new BinaryList(); bl.AddUInt8((byte)(0xA0 | (byte)action)) .AddUInt32(callbackId) .AddUInt8Array(Codec.Compose(args[0], this.Instance.Warehouse, this)); Send(bl.ToArray()); } else { var bl = new BinaryList(); bl.AddUInt8((byte)(0xA0 | (byte)action)) .AddUInt32(callbackId) .AddUInt8Array(Codec.Compose(args, this.Instance.Warehouse, this)); Send(bl.ToArray()); } } internal AsyncReply SendSubscribeRequest(uint instanceId, byte index) { return SendRequest(EpPacketRequest.Subscribe, instanceId, index); } internal AsyncReply SendUnsubscribeRequest(uint instanceId, byte index) { return SendRequest(EpPacketRequest.Unsubscribe, instanceId, index); } public AsyncReply StaticCall(ulong typeId, byte index, object parameters) { return SendRequest(EpPacketRequest.StaticCall, typeId, index, parameters); } public AsyncReply Call(string procedureCall, params object[] parameters) { //var args = new Map(); //for (byte i = 0; i < parameters.Length; i++) // args.Add(i, parameters[i]); // return Call(procedureCall, parameters); return SendRequest(EpPacketRequest.ProcedureCall, procedureCall, parameters); } public AsyncReply Call(string procedureCall, Map parameters) { return SendRequest(EpPacketRequest.ProcedureCall, procedureCall, parameters); } internal AsyncReply SendInvoke(uint instanceId, byte index, object parameters) { return SendRequest(EpPacketRequest.InvokeFunction, instanceId, index, parameters); } internal AsyncReply SendSetProperty(uint instanceId, byte index, object value) { return SendRequest(EpPacketRequest.SetProperty, instanceId, index, value); } internal AsyncReply SendDetachRequest(uint instanceId) { try { var sendDetach = false; if (_attachedResources.ContainsKey(instanceId)) { _attachedResources.Remove(instanceId); sendDetach = true; } if (_suspendedResources.ContainsKey(instanceId)) { _suspendedResources.Remove(instanceId); sendDetach = true; } if (sendDetach) return SendRequest(EpPacketRequest.DetachResource, instanceId); return null; // no one is waiting for this } catch { return null; } } void SendError(ErrorType type, uint callbackId, ushort errorCodeOrWarningLevel, string message = "") { if (type == ErrorType.Management) SendReply(EpPacketReply.PermissionError, callbackId, errorCodeOrWarningLevel, message); else if (type == ErrorType.Exception) SendReply(EpPacketReply.ExecutionError, callbackId, errorCodeOrWarningLevel, message); else if (type == ErrorType.Warning) SendReply(EpPacketReply.Warning, callbackId, (byte)errorCodeOrWarningLevel, message); } internal void SendProgress(uint callbackId, uint value, uint max) { SendReply(EpPacketReply.Progress, callbackId, value, max); } internal void SendWarning(uint callbackId, byte level, string message) { SendReply(EpPacketReply.Warning, callbackId, level, message); } internal void SendChunk(uint callbackId, object chunk) { SendReply(EpPacketReply.Chunk, callbackId, chunk); } void EpReplyCompleted(uint callbackId, PlainTdu tdu) { var req = _requests.Take(callbackId); //Console.WriteLine("Completed " + callbackId); if (req == null) { // @TODO: Send general failure return; } var pr = Codec.Parse(tdu, this, null); if (pr is AsyncReply asyncReply) { asyncReply.Then(req.Trigger) .Error(req.TriggerError); } else { req.Trigger(pr); } //var pr = Codec.ParseAsync(dataType, this, null).Then(pr => //{ // if (pr.Value is AsyncReply asyncReply) // { // asyncReply.Then(req.Trigger) // .Error(req.TriggerError); // } // else // { // req.Trigger(pr.Value); // } //}).Error(req.TriggerError); } void EpExtensionAction(byte actionId, PlainTdu? tdu) { // nothing is supported now } void EpReplyPropagated(uint callbackId, PlainTdu tdu) { var req = _requests[callbackId]; if (req == null) { // @TODO: Send general failure return; } var value = Codec.Parse(tdu, this, null); if (value is AsyncReply reply) { reply.Then(req.TriggerPropagation) .Error(req.TriggerError); } else { req.TriggerPropagation(value); } //var pr = Codec.ParseAsync(dataType, this, null).Then(pr => //{ // if (pr.Value is AsyncReply reply) // { // reply.Then(req.TriggerPropagation) // .Error(req.TriggerError); // } // else // { // req.TriggerPropagation(pr.Value); // } //}).Error(req.TriggerError); } void EpReplyError(uint callbackId, PlainTdu plainTdu, ErrorType type) { var req = _requests.Take(callbackId); if (req == null) { // @TODO: Send general failure return; } var tdu = ParsedTdu.ParseSync(plainTdu.Data, plainTdu.TduOffset, plainTdu.Ends, Instance.Warehouse); var args = DataDeserializer.ListParser(tdu, Instance.Warehouse) as object[]; var errorCode = Convert.ToUInt16(args[0]); var errorMsg = (string)args[1]; req.TriggerError(new AsyncException(type, errorCode, errorMsg)); } void EpReplyProgress(uint callbackId, PlainTdu plainTdu) { var req = _requests[callbackId]; if (req == null) { // @TODO: Send general failure return; } var tdu = ParsedTdu.ParseSync(plainTdu.Data, plainTdu.TduOffset, plainTdu.Ends, Instance.Warehouse); var args = DataDeserializer.ListParser(tdu, Instance.Warehouse) as object[]; var current = (uint)args[0]; var total = (uint)args[1]; req.TriggerProgress(ProgressType.Execution, current, total); } void EpReplyWarning(uint callbackId, PlainTdu plainTdu) { var req = _requests[callbackId]; if (req == null) { // @TODO: Send general failure return; } var tdu = ParsedTdu.ParseSync(plainTdu.Data, plainTdu.TduOffset, plainTdu.Ends, Instance.Warehouse); var args = DataDeserializer.ListParser(tdu, Instance.Warehouse) as object[]; var level = (byte)args[0]; var message = (string)args[1]; req.TriggerWarning(level, message); } void EpReplyChunk(uint callbackId, PlainTdu tdu) { var req = _requests[callbackId]; if (req == null) return; var value = Codec.Parse(tdu, this, null); if (value is AsyncReply asyncReply) { asyncReply.Then(req.TriggerChunk) .Error(req.TriggerError); } else { req.TriggerChunk(value); } //Codec.ParseAsync(dataType, this, null).Then(pr => //{ // if (pr.Value is AsyncReply asyncReply) // { // asyncReply.Then(req.TriggerChunk) // .Error(req.TriggerError); // } // else // { // req.TriggerChunk(pr.Value); // } //}).Error(req.TriggerError); } void EpNotificationResourceReassigned(PlainTdu dataType) { // uint resourceId, uint newResourceId } void EpNotificationResourceMoved(PlainTdu tdu) { } void EpNotificationSystemFailure(PlainTdu tdu) { } void EpNotificationResourceDestroyed(PlainTdu tdu) { var (size, rt) = Codec.ParseSync(tdu.Data, tdu.TduOffset, Instance.Warehouse); var resourceId = Convert.ToUInt32(rt); if (_attachedResources.Contains(resourceId)) { EpResource r; if (_attachedResources[resourceId].TryGetTarget(out r)) { // remove from attached to avoid sending unnecessary detach request when Destroy() is called _attachedResources.Remove(resourceId); r.Destroy(); } else { _attachedResources.Remove(resourceId); } } else if (_neededResources.Contains(resourceId)) { // @TODO: handle this mess _neededResources.Remove(resourceId); } } void EpNotificationPropertyModified(PlainTdu tdu) { // resourceId, index, value var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var rid = Convert.ToUInt32(args[0]); var index = (byte)args[1]; FetchResource(rid, null).Then(r => { var pt = r.Instance.Definition.GetPropertyDefByIndex(index); if (pt == null) return; Codec.ParseAsync(tdu.Data, valueOffset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { var item = new AsyncReply(); _queue.Add(item); asyncReply.Then((result) => { item.Trigger(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Propery, result, index)); }); } else { _queue.Add(new AsyncReply(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Propery, pr.Value, index))); } }).Error((ex) => { //.Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); throw ex; }); }); } void EpNotificationEventOccurred(PlainTdu tdu) { // resourceId, index, value var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; FetchResource(resourceId, null).Then(r => { var et = r.Instance.Definition.GetEventDefByIndex(index); if (et == null) // this should never happen return; // push to the queue to guarantee serialization var item = new AsyncReply(); _queue.Add(item); Codec.ParseAsync(tdu.Data, valueOffset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then((result) => { item.Trigger(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Event, result, index)); }); } else { item.Trigger(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Event, pr.Value, index)); } }).Error((ex) => throw ex); // @TODO: Send general error //.Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); }); } void EpEventRenamed(uint resourceId, string name) { FetchResource(resourceId, null).Then(resource => { resource.Instance.Variables["name"] = name; }); } void EpRequestAttachResource(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then((res) => { if (res != null) { if (res.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, 6); return; } var r = res as IResource; // unsubscribe Unsubscribe(r); // reply ok SendReply(EpPacketReply.Completed, callback, r.Instance.Definition.Id, r.Instance.Age, r.Instance.Link, r.Instance.Hops, r.Instance.Serialize()); // subscribe Subscribe(r); } else { // reply failed Global.Log("EpConnection", LogType.Debug, "Not found " + resourceId); SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestReattachResource(uint callback, PlainTdu tdu) { // resourceId, index, value var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var resourceId = Convert.ToUInt32(args[0]); var age = (ulong)args[1]; Instance.Warehouse.GetById(resourceId).Then((res) => { if (res != null) { if (res.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, 6); return; } var r = res as IResource; // unsubscribe Unsubscribe(r); // reply ok SendReply(EpPacketReply.Completed, callback, r.Instance.Definition.Id, r.Instance.Age, r.Instance.Link, r.Instance.Hops, r.Instance.SerializeAfter(age)); // subscribe Subscribe(r); } else { // reply failed Global.Log("EpConnection", LogType.Debug, "Not found " + resourceId); SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestDetachResource(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then((res) => { if (res != null) { // unsubscribe Unsubscribe(res); // remove from cache _cache.Remove(res); // remove from attached resources //attachedResources.Remove(res); // reply ok SendReply(EpPacketReply.Completed, callback); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestCreateResource(uint callback, PlainTdu tdu) { Codec.ParseAsync(tdu.Data, tdu.TduOffset, this, null).Then(pr => { var args = (object[])pr.Value; var path = (string)args[0]; TypeDef typeDef = null; if (args[1] is uint || args[1] is byte || args[1] is ushort) // @TODO: this is a mess, we should have a better way to distinguish between type id and name typeDef = Instance.Warehouse.GetLocalTypeDefById(Convert.ToUInt64(args[1])); else if (args[1] is string) typeDef = Instance.Warehouse.GetLocalTypeDefByName((string)args[1]); if (typeDef == null || typeDef is not LocalTypeDef) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ClassNotFound); return; } var localTypeDef = typeDef as LocalTypeDef; var props = (Map)((object[])args)[2]; var attrs = (Map)((object[])args)[3]; // Get store var sc = path.Split('/'); Instance.Warehouse.Get(string.Join("/", sc.Take(sc.Length - 1))) .Then(r => { if (r == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.StoreNotFound); return; } var store = r.Instance.Store; // check security if (store.Instance.Applicable(_session, ActionType.CreateResource, null) != Ruling.Allowed) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.CreateDenied); return; } Instance.Warehouse.New(localTypeDef.DefinedType, path, new ResourceContext(0, attrs, props.Select(x => new KeyValuePair (localTypeDef.GetPropertyDefByIndex(x.Key).Name, x.Value)), null)) .Then(resource => { SendReply(EpPacketReply.Completed, callback, resource.Instance.Id); }).Error(e => { SendError(e.Type, callback, (ushort)e.Code, e.Message); }); }).Error(e => { SendError(e.Type, callback, (ushort)e.Code, e.Message); }); }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } void EpRequestDeleteResource(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then(r => { if (r == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } if (r.Instance.Store.Instance.Applicable(_session, ActionType.Delete, null) != Ruling.Allowed) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteDenied); return; } if (Instance.Warehouse.Remove(r)) SendReply(EpPacketReply.Completed, callback); else SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteFailed); }); } void EpRequestMoveResource(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var resourceId = Convert.ToUInt32(args[0]); var name = (string)args[1]; if (name.Contains("/")) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); return; } Instance.Warehouse.GetById(resourceId).Then(resource => { if (resource == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } if (resource.Instance.Applicable(this._session, ActionType.Rename, null) != Ruling.Allowed) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.RenameDenied); return; } resource.Instance.Name = name; SendReply(EpPacketReply.Completed, callback); }); } void EpRequestToken(uint callback, PlainTdu tdu) { // @TODO: To be implemented } void EpRequestLinkTypeDefs(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceLink = (string)value; Action queryCallback = (r) => { if (r == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } if (r.Instance.Applicable(_session, ActionType.ViewTypeDef, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAllowed); return; } // make sure the resource is a local type def. if (r.Instance.Definition is LocalTypeDef localTypeDef) { var typeDefs = LocalTypeDef.GetDependencies(localTypeDef, Instance.Warehouse); // Send SendReply(EpPacketReply.Completed, callback, typeDefs.Select(x => x.Compose(this)).ToArray()); } else { // @TODO: Add support for remote type defs SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); } }; if (Server?.EntryPoint != null) Server.EntryPoint.Query(resourceLink, this).Then(queryCallback); else Instance.Warehouse.Query(resourceLink).Then(queryCallback); } void EpRequestTypeDefByName(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var className = (string)value; var typeDef = Instance.Warehouse.GetRemoteTypeDefByName(_remoteDomain, className); if (typeDef != null) { SendReply(EpPacketReply.Completed, callback, typeDef.Compose(this)); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TypeDefNotFound); } } void EpRequestTypeDefById(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var typeId = Convert.ToUInt32(value); var t = Instance.Warehouse.GetLocalTypeDefById(typeId); if (t != null) { SendReply(EpPacketReply.Completed, callback, t.Compose(this)); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TypeDefNotFound); } } void EpRequestTypeDefByResourceId(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then((r) => { if (r != null) { SendReply(EpPacketReply.Completed, callback, r.Instance.Definition.Compose(this)); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestGetResourceIdByLink(uint callback, PlainTdu tdu) { var parsed = Codec.ParseSync(tdu, Instance.Warehouse); var resourceLink = (string)parsed; Action queryCallback = (r) => { if (r == null) SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); else { if (r.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } SendReply(EpPacketReply.Completed, callback, r); } }; if (Server?.EntryPoint != null) Server.EntryPoint.Query(resourceLink, this).Then(queryCallback); else Instance.Warehouse.Query(resourceLink).Then(queryCallback); } void EpRequestQueryResources(uint callback, PlainTdu tdu) { var parsed = Codec.ParseSync(tdu, Instance.Warehouse); var resourceLink = (string)parsed; Action queryCallback = (r) => { if (r == null) SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); else { if (r.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAllowed); return; } r.Instance.Children().Then(children => { var list = children.Where(x => x.Instance.Applicable(_session, ActionType.Attach, null) != Ruling.Denied).ToArray(); SendReply(EpPacketReply.Completed, callback, list); }).Error(e => { SendError(e.Type, callback, (ushort)e.Code, e.Message); }); } }; if (Server?.EntryPoint != null) Server.EntryPoint.Query(resourceLink, this) .Then(queryCallback) .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message)); else Instance.Warehouse.Query(resourceLink) .Then(queryCallback) .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message)); } void EpRequestResourceAttribute(uint callback, uint resourceId) { } private Tuple SummerizeException(Exception ex) { ex = ex.InnerException != null ? ex.InnerException : ex; var code = (ExceptionLevel & ExceptionLevel.Code) == 0 ? 0 : ex is AsyncException ae ? ae.Code : 0; var msg = (ExceptionLevel & ExceptionLevel.Message) == 0 ? "" : ex.Message; var source = (ExceptionLevel & ExceptionLevel.Source) == 0 ? "" : ex.Source; var trace = (ExceptionLevel & ExceptionLevel.Trace) == 0 ? "" : ex.StackTrace; return new Tuple((ushort)code, $"{source}: {msg}\n{trace}"); } void EpRequestProcedureCall(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 1); var procedureCall = (string)args[0]; if (Server == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); return; } var call = Server.Calls[procedureCall]; if (call == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply reply) { reply.Then(results => { //var arguments = (Map)results; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for procedure calls //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) //{ // SendError(ErrorType.Management, callback, // (ushort)ExceptionCode.InvokeDenied); // return; //} InvokeFunction(call.Value.Definition, callback, results, EpPacketRequest.ProcedureCall, call.Value.Delegate.Target); }).Error(x => { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); }); } else { //var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for procedure calls InvokeFunction(call.Value.Definition, callback, pr.Value, EpPacketRequest.ProcedureCall, call.Value.Delegate.Target); } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } void EpRequestStaticCall(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var typeId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; var typeDef = Instance.Warehouse.GetLocalTypeDefById(typeId); if (typeDef == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TypeDefNotFound); return; } var fd = typeDef.GetFunctionDefByIndex(index); if (fd == null) { // no function at this index SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } var fi = fd.MethodInfo; if (fi == null) { // ft found, fi not found, this should never happen SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply reply) { reply.Then(results => { //var arguments = (Map)results; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for static calls //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) //{ // SendError(ErrorType.Management, callback, // (ushort)ExceptionCode.InvokeDenied); // return; //} InvokeFunction(fd, callback, results, EpPacketRequest.StaticCall, null); }).Error(x => { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); }); } else { //var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for static calls InvokeFunction(fd, callback, pr.Value, EpPacketRequest.StaticCall, null); } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } void EpRequestInvokeFunction(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; Instance.Warehouse.GetById(resourceId).Then((r) => { if (r == null) { // no resource with this id SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var ft = r.Instance.Definition.GetFunctionDefByIndex(index); if (ft == null) { // no function at this index SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then(result => { // var arguments = result; // un hold the socket to send data immediately this.Socket.Unhold(); if (r is EpResource) { var rt = (r as EpResource)._Invoke(index, result); if (rt != null) { rt.Then(res => { SendReply(EpPacketReply.Completed, callback, res); }); } else { // function not found on a distributed object SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); } } else { if (r.Instance.Applicable(_session, ActionType.Execute, ft) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.InvokeDenied); return; } InvokeFunction(ft, callback, result, EpPacketRequest.InvokeFunction, r); } }); } else { //var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); if (r is EpResource) { var rt = (r as EpResource)._Invoke(index, pr.Value); if (rt != null) { rt.Then(res => { SendReply(EpPacketReply.Completed, callback, res); }); } else { // function not found on a distributed object SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); } } else { if (r.Instance.Applicable(_session, ActionType.Execute, ft) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.InvokeDenied); return; } InvokeFunction(ft, callback, pr.Value, EpPacketRequest.InvokeFunction, r); } } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); ; }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); ; } void InvokeFunction(FunctionDef ft, uint callback, object arguments, EpPacketRequest actionType, object target = null) { // cast arguments ParameterInfo[] pis = ft.MethodInfo.GetParameters(); object[] args = new object[pis.Length]; InvocationContext context = null; if (pis.Length > 0) { if (pis.Last().ParameterType == typeof(EpConnection)) { if (arguments is Map indexedArguments) { for (byte i = 0; i < pis.Length - 1; i++) { if (indexedArguments.ContainsKey(i)) args[i] = RuntimeCaster.Cast(indexedArguments[i], pis[i].ParameterType); else if (ft.Arguments[i].Type.Nullable) args[i] = null; else args[i] = Type.Missing; } } else if (arguments is object[] arrayArguments) { for (var i = 0; (i < arrayArguments.Length) && (i < pis.Length - 1); i++) { args[i] = RuntimeCaster.Cast(arrayArguments[i], pis[i].ParameterType); } for (var i = arrayArguments.Length; i < pis.Length - 1; i++) { args[i] = Type.Missing; } } else { // assume first argument // Note: if object[] is intended, sender should send nest it withing object[] { object[] } if (pis.Length > 1) args[0] = RuntimeCaster.Cast(arguments, pis[0].ParameterType); } args[args.Length - 1] = this; } else if (pis.Last().ParameterType == typeof(InvocationContext)) { context = new InvocationContext(this, callback); if (arguments is Map indexedArguments) { for (byte i = 0; i < pis.Length - 1; i++) { if (indexedArguments.ContainsKey(i)) args[i] = RuntimeCaster.Cast(indexedArguments[i], pis[i].ParameterType); else if (ft.Arguments[i].Type.Nullable) args[i] = null; else args[i] = Type.Missing; } } else if (arguments is object[] arrayArguments) { for (var i = 0; (i < arrayArguments.Length) && (i < pis.Length - 1); i++) { args[i] = RuntimeCaster.Cast(arrayArguments[i], pis[i].ParameterType); } for (var i = arrayArguments.Length; i < pis.Length - 1; i++) { args[i] = Type.Missing; } } else { // assume first argument // Note: if object[] is intended, sender should send nest it withing object[] { object[] } if (pis.Length > 1) args[0] = RuntimeCaster.Cast(arguments, pis[0].ParameterType); //throw new NotImplementedException("Arguments type not supported."); } args[args.Length - 1] = context; } else { if (arguments is Map indexedArguments) { for (byte i = 0; i < pis.Length; i++) { if (indexedArguments.ContainsKey(i)) args[i] = RuntimeCaster.Cast(indexedArguments[i], pis[i].ParameterType); else if (ft.Arguments[i].Type.Nullable) //Nullable.GetUnderlyingType(pis[i].ParameterType) != null) args[i] = null; else args[i] = Type.Missing; } } else if (arguments is object[] arrayArguments) { for (var i = 0; (i < arrayArguments.Length) && (i < pis.Length); i++) { args[i] = RuntimeCaster.Cast(arrayArguments[i], pis[i].ParameterType); } for (var i = arrayArguments.Length; i < pis.Length; i++) { args[i] = Type.Missing; } } else { // assume first argument // Note: if object[] is intended, sender should send nest it withing object[] { object[] } if (pis.Length > 0) args[0] = RuntimeCaster.Cast(arguments, pis[0].ParameterType); } } } object rt; try { rt = ft.MethodInfo.Invoke(target, args); } catch (Exception ex) { var (code, msg) = SummerizeException(ex); msg = "Arguments: " + string.Join(", ", args.Select(x => x?.ToString() ?? "[Null]").ToArray()) + "\r\n" + msg; SendError(ErrorType.Exception, callback, code, msg); return; } if (rt is IAsyncEnumerable) { var enu = rt as IAsyncEnumerable; var enumerator = enu.GetAsyncEnumerator(); Task.Run(async () => { try { while (await enumerator.MoveNextAsync()) { var v = enumerator.Current; SendChunk(callback, v); } SendReply(EpPacketReply.Completed, callback); if (context != null) context.Ended = true; } catch (Exception ex) { if (context != null) context.Ended = true; var (code, msg) = SummerizeException(ex); SendError(ErrorType.Exception, callback, code, msg); } }); } else if (rt is System.Collections.IEnumerable && !(rt is Array || rt is Map || rt is string)) { var enu = rt as System.Collections.IEnumerable; try { foreach (var v in enu) SendChunk(callback, v); SendReply(EpPacketReply.Completed, callback); if (context != null) context.Ended = true; } catch (Exception ex) { if (context != null) context.Ended = true; var (code, msg) = SummerizeException(ex); SendError(ErrorType.Exception, callback, code, msg); } } else if (rt is Task) { (rt as Task).ContinueWith(t => { if (context != null) context.Ended = true; #if NETSTANDARD var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t); #else var res = t.GetType().GetProperty("Result").GetValue(t); #endif SendReply(EpPacketReply.Completed, callback, res); }); } else if (rt is AsyncReply) { (rt as AsyncReply).Then(res => { if (context != null) context.Ended = true; SendReply(EpPacketReply.Completed, callback, res); }).Error(ex => { var (code, msg) = SummerizeException(ex); SendError(ErrorType.Exception, callback, code, msg); }).Progress((pt, pv, pm) => { SendProgress(callback, pv, pm); }).Chunk(v => { SendChunk(callback, v); }).Warning((level, message) => { SendError(ErrorType.Warning, callback, level, message); }); } else { if (context != null) context.Ended = true; SendReply(EpPacketReply.Completed, callback, rt); } } void EpRequestSubscribe(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; Instance.Warehouse.GetById(resourceId).Then((r) => { if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var et = r.Instance.Definition.GetEventDefByIndex(index); if (et != null) { // et not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } if (r is EpResource) { (r as EpResource).Subscribe(et).Then(x => { SendReply(EpPacketReply.Completed, callback); }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); } else { lock (_subscriptionsLock) { if (!_subscriptions.ContainsKey(r)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); return; } if (_subscriptions[r].Contains(index)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyListened); return; } _subscriptions[r].Add(index); SendReply(EpPacketReply.Completed, callback); } } }); } void EpRequestUnsubscribe(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; Instance.Warehouse.GetById(resourceId).Then((r) => { if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var et = r.Instance.Definition.GetEventDefByIndex(index); if (et == null) { // pt not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } if (r is EpResource) { (r as EpResource).Unsubscribe(et).Then(x => { SendReply(EpPacketReply.Completed, callback); }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); } else { lock (_subscriptionsLock) { if (!_subscriptions.ContainsKey(r)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); return; } if (!_subscriptions[r].Contains(index)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyUnsubscribed); return; } _subscriptions[r].Remove(index); SendReply(EpPacketReply.Completed, callback); } } }); } void EpRequestSetProperty(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var rid = (uint)args[0]; var index = (byte)args[1]; // un hold the socket to send data immediately this.Socket.Unhold(); Instance.Warehouse.GetById(rid).Then((r) => { if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var pt = r.Instance.Definition.GetPropertyDefByIndex(index); if (pt != null) { // property not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); return; } if (r is IDynamicResource dynamicResource) { Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then((value) => { // propagation dynamicResource.SetResourcePropertyAsync(index, value).Then((x) => { SendReply(EpPacketReply.Completed, callback); }).Error(x => { SendError(x.Type, callback, (ushort)x.Code, x.Message); }); }); } else { // propagation dynamicResource.SetResourcePropertyAsync(index, pr.Value).Then((x) => { SendReply(EpPacketReply.Completed, callback); }).Error(x => { SendError(x.Type, callback, (ushort)x.Code, x.Message); }); } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); ; } else { var pi = pt.PropertyInfo; if (pi == null) { // pt found, pi not found, this should never happen SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); return; } if (r.Instance.Applicable(_session, ActionType.SetProperty, pt, this) == Ruling.Denied) { SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.SetPropertyDenied); return; } if (!pi.CanWrite) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ReadOnlyProperty); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then((value) => { if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() == typeof(PropertyContext<>)) { value = Activator.CreateInstance(pi.PropertyType, this, value); } else { // cast new value type to property type value = RuntimeCaster.Cast(value, pi.PropertyType); } try { pi.SetValue(r, value); SendReply(EpPacketReply.Completed, callback); } catch (Exception ex) { SendError(ErrorType.Exception, callback, 0, ex.Message); } }); } else { var value = pr.Value; if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() == typeof(PropertyContext<>)) { value = Activator.CreateInstance(pi.PropertyType, this, value); //value = new DistributedPropertyContext(this, value); } else { // cast new value type to property type value = RuntimeCaster.Cast(value, pi.PropertyType); } try { pi.SetValue(r, value); SendReply(EpPacketReply.Completed, callback); } catch (Exception ex) { SendError(ErrorType.Exception, callback, 0, ex.Message); } } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } }); } /// /// Get the TypeDef for a given type Id. /// /// Type UUID. /// TypeDef. //public AsyncReply GetTypeDefById(ulong typeId) //{ // lock (_typeDefsLock) // { // if (_remoteTypeDefs.ContainsKey(typeId)) // return new AsyncReply(_remoteTypeDefs[typeId]); // else if (_typeDefsByIdRequests.ContainsKey(typeId)) // return _typeDefsByIdRequests[typeId]; // var reply = new AsyncReply(); // _typeDefsByIdRequests.Add(typeId, reply); // SendRequest(EpPacketRequest.TypeDefById, typeId) // .Then((result) => // { // // @TODO: Solve for dependency deadlock // RemoteTypeDef.Parse(_remoteDomain, (byte[])result, this).Then(td => // { // _typeDefsByIdRequests.Remove(typeId); // _remoteTypeDefs.Add(td.Id, td); // // register all remote TypeDefs to warehouse to be used in future parsing before the actual request for them arrives. // Instance.Warehouse.TryRegisterRemoteTypeDef(_remoteDomain, td); // reply.Trigger(td); // }); // }).Error((ex) => // { // reply.TriggerError(ex); // }); // return reply; // } //} //public AsyncReply GetTypeDefByName(string typeName) //{ // lock (_typeDefsLock) // { // var typeDef = _remoteTypeDefs.Values.FirstOrDefault(x => x.Name == typeName); // if (typeDef != null) // return new AsyncReply(typeDef); // if (_typeDefsByNameRequests.ContainsKey(typeName)) // return _typeDefsByNameRequests[typeName]; // var reply = new AsyncReply(); // _typeDefsByNameRequests.Add(typeName, reply); // SendRequest(EpPacketRequest.TypeDefByName, typeName) // .Then((result) => // { // RemoteTypeDef.Parse(_remoteDomain, (byte[])result, this).Then(td => // { // _typeDefsByNameRequests.Remove(typeName); // _remoteTypeDefs.Add(td.Id, td); // Instance.Warehouse.TryRegisterRemoteTypeDef(_remoteDomain, td); // reply.Trigger(td); // }); // }).Error((ex) => // { // reply.TriggerError(ex); // }); // return reply; // } //} // IStore interface /// /// Get a resource by its path. /// /// Path to the resource. /// Resource public AsyncReply Get(string path) { var rt = new AsyncReply(); var req = SendRequest(EpPacketRequest.GetResourceIdByLink, path); req.Then(result => { rt.Trigger(result); }).Error(ex => rt.TriggerError(ex)); //Query(path).Then(ar => //{ // //if (filter != null) // // ar = ar?.Where(filter).ToArray(); // // MISSING: should dispatch the unused resources. // if (ar?.Length > 0) // rt.Trigger(ar[0]); // else // rt.Trigger(null); //}).Error(ex => rt.TriggerError(ex)); return rt; } public AsyncReply GetLinkDefinitions(string link) { throw new NotImplementedException(); //var reply = new AsyncReply(); //SendRequest(EpPacketRequest.LinkTypeDefs, link) //.Then((result) => //{ // var defs = new List(); // foreach (var def in (byte[][])result) // { // defs.Add(RemoteTypeDef.Parse(_remoteDomain, def)); // } // reply.Trigger(defs.ToArray()); //}).Error((ex) => //{ // reply.TriggerError(ex); //}); //return reply; } /// /// Fetch a resource from the other end /// /// Resource Id /// DistributedResource /// //object fetchResourceLock = new object(); public AsyncReply FetchResource(uint id, uint[] requestSequence) { //lock (fetchLock) //{ EpResource resource = null; _attachedResources[id]?.TryGetTarget(out resource); if (resource != null) return new AsyncReply(resource); resource = _neededResources[id]; var requestInfo = _resourceRequests[id]; if (requestInfo != null) { if (resource != null && (requestSequence?.Contains(id) ?? false)) { // dead lock avoidance for loop reference. return new AsyncReply(resource); } else if (resource != null && requestInfo.RequestSequence.Contains(id)) { // dead lock avoidance for dependent reference. return new AsyncReply(resource); } else { return requestInfo.Reply; } } else if (resource != null && !resource.ResourceSuspended) { // @REVIEW: this should never happen Global.Log("DCON", LogType.Error, "Resource not moved to attached."); return new AsyncReply(resource); } var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id }; var reply = new AsyncReply(); _resourceRequests.Add(id, new FetchRequestInfo(reply, newSequence)); SendRequest(EpPacketRequest.AttachResource, id) .Then((result) => { if (result == null) { reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.ResourceNotFound, "Null response")); return; } // TypeId, Age, Link, Hops, PropertyValue[] var args = (object[])result; var typeId = Convert.ToUInt32(args[0]); var age = Convert.ToUInt64(args[1]); var link = (string)args[2]; var hops = (byte)args[3]; var pvData = (byte[])args[4]; var typeDef = resource != null ? resource.Instance.Definition as RemoteTypeDef : Instance.Warehouse.GetRemoteTypeDefById( _remoteDomain, typeId ); var initResource = (EpResource dr) => { var parsedReply = DataDeserializer.PropertyValueArrayParserAsync(pvData, 0, (uint)pvData.Length, this, newSequence); parsedReply.Then(results => { var pvs = results as PropertyValue[]; dr._Attach(pvs); _resourceRequests.Remove(id); // move from needed to attached. _neededResources.Remove(id); _attachedResources[id] = new WeakReference(dr); reply.Trigger(dr); }).Error(ex => reply.TriggerError(ex)); }; if (typeDef == null) { FetchTypeDef(typeId, null).Then((td) => { // typeId, ResourceAge, ResourceLink, Content if (resource == null) { if (td.ProxyType != null) resource = Activator.CreateInstance(td.ProxyType, this, id, Convert.ToUInt64(args[1]), (string)args[2]) as EpResource; else resource = new EpResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); resource.ResourceDefinition = td; typeDef = td; Instance.Warehouse.Put(Instance.Link + "/" + id.ToString(), resource) .Then(initResource) .Error(ex => reply.TriggerError(ex)); } else { initResource(resource); } }).Error((ex) => { reply.TriggerError(ex); }); } else { if (resource == null) { if (typeDef.ProxyType != null) resource = Activator.CreateInstance(typeDef.ProxyType, this, id, Convert.ToUInt64(args[1]), (string)args[2]) as EpResource; else resource = new EpResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); resource.ResourceDefinition = typeDef; Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), resource) .Then(initResource).Error((ex) => reply.TriggerError(ex)); } else { initResource(resource); } } }).Error((ex) => { reply.TriggerError(ex); }); return reply; //} } /// /// Fetch a resource from the other end /// /// Resource Id /// DistributedResource /// //object fetchResourceLock = new object(); public AsyncReply FetchTypeDef(ulong id, ulong[] requestSequence) { //Console.WriteLine($"Fetching typedef {id}"); RemoteTypeDef typeDef = _cachedTypeDefs[id]; if (typeDef != null) return new AsyncReply(typeDef); typeDef = _neededTypeDefs[id]; var requestInfo = _typeDefRequests[id]; if (requestInfo != null) { if (typeDef != null && (requestSequence?.Contains(id) ?? false)) { // dead lock avoidance for loop reference. return new AsyncReply(typeDef); } else if (typeDef != null && requestInfo.RequestSequence.Contains(id)) { // dead lock avoidance for dependent reference. return new AsyncReply(typeDef); } else { return requestInfo.Reply; } } var newSequence = requestSequence != null ? requestSequence.Concat(new ulong[] { id }).ToArray() : new ulong[] { id }; var reply = new AsyncReply(); _typeDefRequests.Add(id, new FetchRequestInfo(reply, newSequence)); SendRequest(EpPacketRequest.TypeDefById, id) .Then((result) => { if (result == null) { reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.ResourceNotFound, "Null response")); return; } // TypeDef Data //var args = (object[])result; var typeDefData = (byte[])result; var od = new RemoteTypeDef(); _neededTypeDefs[id] = od; RemoteTypeDef.Parse(od, this.RemoteDomain, typeDefData, this, newSequence).Then(td => { _typeDefRequests.Remove(id); // move from needed to attached. _neededTypeDefs.Remove(id); _cachedTypeDefs[id] = td; reply.Trigger(td); }).Error(reply.TriggerError); }).Error(reply.TriggerError); return reply; } /// /// Query resources at specific link. /// /// Link path. /// public AsyncReply Query(string path) { var str = DC.ToBytes(path); var reply = new AsyncReply(); SendRequest(EpPacketRequest.Query, path) .Then(result => { reply.Trigger((IResource[])result); }).Error(ex => reply.TriggerError(ex)); return reply; } /// /// Create a new resource. /// /// Resource path. /// Type definition. /// Values for the resource properties. /// Resource attributes. /// New resource instance public AsyncReply Create(string path, TypeDef type, Map properties, Map attributes) { var reply = new AsyncReply(); SendRequest(EpPacketRequest.CreateResource, path, type.Id, type.CastProperties(properties), attributes) .Then(r => reply.Trigger((EpResource)r)) .Warning((l, m) => reply.TriggerWarning(l, m)) .Error(e => reply.TriggerError(e)); return reply; } private void Subscribe(IResource resource) { lock (_subscriptionsLock) { resource.Instance.EventOccurred += Instance_EventOccurred; resource.Instance.CustomEventOccurred += Instance_CustomEventOccurred; resource.Instance.PropertyModified += Instance_PropertyModified; resource.Instance.Destroyed += Instance_ResourceDestroyed; _subscriptions.Add(resource, new List()); } } private void Unsubscribe(IResource resource) { lock (_subscriptionsLock) { // do something with the list... resource.Instance.EventOccurred -= Instance_EventOccurred; resource.Instance.CustomEventOccurred -= Instance_CustomEventOccurred; resource.Instance.PropertyModified -= Instance_PropertyModified; resource.Instance.Destroyed -= Instance_ResourceDestroyed; _subscriptions.Remove(resource); } } private void UnsubscribeAll() { lock (_subscriptionsLock) { foreach (var resource in _subscriptions.Keys) { resource.Instance.EventOccurred -= Instance_EventOccurred; resource.Instance.CustomEventOccurred -= Instance_CustomEventOccurred; resource.Instance.PropertyModified -= Instance_PropertyModified; resource.Instance.Destroyed -= Instance_ResourceDestroyed; } _subscriptions.Clear(); } } private void Instance_ResourceDestroyed(IResource resource) { Unsubscribe(resource); // compose the packet SendNotification(EpPacketNotification.ResourceDestroyed, resource.Instance.Id); } private void Instance_PropertyModified(PropertyModificationInfo info) { SendNotification(EpPacketNotification.PropertyModified, info.Resource.Instance.Id, info.PropertyDef.Index, info.Value); } private void Instance_CustomEventOccurred(CustomEventOccurredInfo info) { if (info.EventDef.Subscribable) { lock (_subscriptionsLock) { // check the client requested listen if (!_subscriptions.ContainsKey(info.Resource)) return; if (!_subscriptions[info.Resource].Contains(info.EventDef.Index)) return; } } if (!info.Receivers(_session)) return; if (info.Resource.Instance.Applicable(_session, ActionType.ReceiveEvent, info.EventDef, info.Issuer) == Ruling.Denied) return; // compose the packet SendNotification(EpPacketNotification.EventOccurred, info.Resource.Instance.Id, info.EventDef.Index, info.Value); } private void Instance_EventOccurred(EventOccurredInfo info) { if (info.Definition.Subscribable) { lock (_subscriptionsLock) { // check the client requested listen if (!_subscriptions.ContainsKey(info.Resource)) return; if (!_subscriptions[info.Resource].Contains(info.Definition.Index)) return; } } if (info.Resource.Instance.Applicable(_session, ActionType.ReceiveEvent, info.Definition, null) == Ruling.Denied) return; // compose the packet SendNotification(EpPacketNotification.EventOccurred, info.Resource.Instance.Id, info.Definition.Index, info.Value); } void EpRequestKeepAlive(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var peerTime = (DateTime)args[0]; var interval = Convert.ToUInt32(args[1]); uint jitter = 0; var now = DateTime.UtcNow; if (_lastKeepAliveReceived != null) { var diff = (uint)(now - (DateTime)_lastKeepAliveReceived).TotalMilliseconds; jitter = (uint)Math.Abs((int)diff - (int)interval); } SendReply(EpPacketReply.Completed, callback, now, jitter); _lastKeepAliveReceived = now; } }