/* Copyright (c) 2017-2025 Ahmed Kh. Zamil Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ using Esiur.Core; using Esiur.Data; using Esiur.Data.Types; using Esiur.Misc; using Esiur.Net; using Esiur.Net.Packets; using Esiur.Resource; using Esiur.Security.Authority; using Esiur.Security.Permissions; using Microsoft.CodeAnalysis.CSharp.Syntax; using System; using System.Collections; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; using System.Linq; using System.Reflection; using System.Reflection.Emit; using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Tasks; namespace Esiur.Protocol; partial class EpConnection { KeyList _neededTypeDefs = new KeyList(); KeyList _cachedTypeDefs = new KeyList(); KeyList> _typeDefRequests = new KeyList>(); //KeyList> _typeDefsByIdRequests = new KeyList>(); KeyList _neededResources = new KeyList(); KeyList> _attachedResources = new KeyList>(); KeyList> _suspendedResources = new KeyList>(); KeyList> _resourceRequests = new KeyList>(); // Wait-for graph for in-flight resource fetches: maps a resource id to the set of in-flight // child resource ids its attachment is currently blocked on. Used to detect genuine cycles // (e.g. two concurrent fetches A<->B) so a placeholder can break the deadlock, while // independent/app-facing fetches of an in-flight resource simply wait for full attachment. readonly Dictionary> _fetchBlockedOn = new Dictionary>(); /// /// Strategy FetchResource uses for an in-flight resource. Defaults to the new wait + cycle /// detection. Selectable for experimental evaluation (see ). /// public DeadlockResolutionMode DeadlockResolution { get; set; } = DeadlockResolutionMode.WaitWithCycleDetection; // Per-connection diagnostics (free of the cross-connection contamination that the shared // Global.Counters suffer from). Used by the deadlock experiments. /// Number of resources fully attached on this connection (a monotonic progress signal). public long AttachedResourceCount { get; private set; } /// Number of wait-for-cycle breaks (placeholders returned to break a cycle) on this connection. public long CycleBreakCount { get; private set; } /// Number of placeholders returned where no genuine cycle existed (legacy resolver only). public long UnnecessaryPlaceholderCount { get; private set; } //KeyList> _typeDefsByIdRequests = new KeyList>(); //KeyList> _typeDefsByNameRequests = new KeyList>(); //Dictionary typeDefs = new Dictionary(); object _typeDefsLock = new object(); KeyList _requests = new KeyList(); volatile int _callbackCounter = 0; Dictionary> _subscriptions = new Dictionary>(); // resources might get attached by the client internal KeyList _cache = new(); object _subscriptionsLock = new object(); AsyncQueue _queue = new(); /// /// Send EP request. /// /// Packet action. /// Arguments to send. /// /// AsyncReply SendRequest(EpPacketRequest action, params object[] args) { var reply = new AsyncReply(); var c = (uint)Interlocked.Increment(ref _callbackCounter); //callbackCounter++; // avoid thread racing _requests.Add(c, reply); if (args.Length == 0) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x40 | (byte)action)) .AddUInt32(c); Send(bl.ToArray()); } if (args.Length == 1) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x60 | (byte)action)) .AddUInt32(c) .AddUInt8Array(Codec.Compose(args[0], this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } else { var bl = new BinaryList(); bl.AddUInt8((byte)(0x60 | (byte)action)) .AddUInt32(c) .AddUInt8Array(Codec.Compose(args, this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } return reply; } //void SendAuthMaterials(EpAuthPacketMethod method, AuthenticationMaterial[] authenticationMaterials) //{ // if (authenticationMaterials != null) // { // var authMap = new Map(); // foreach (var material in authenticationMaterials) // authMap.Add(material.Type, material.Value); // var bl = new BinaryList(); // bl.AddUInt8((byte)((byte)method | 0x20)); // bl.AddUInt8Array(Codec.Compose(authMap, Instance.Warehouse, this)); // Send(bl.ToArray()); // } // else // { // Send(new byte[] { (byte)method }); // } //} void SendAuthData(EpAuthPacketMethod method, object data) { if (data != null) { var bl = new BinaryList(); bl.AddUInt8((byte)((byte)method | 0x20)); bl.AddUInt8Array(Codec.Compose(data, this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } else { Send(new byte[] { (byte)method }); } } void SendAuth(EpAuthPacketMethod method) { Send(new byte[] { (byte)method }); } void SendAuthMessage(EpAuthPacketMethod method, string message) { var bl = new BinaryList(); bl.AddUInt8((byte)((byte)method | 0x20)); bl.AddUInt8Array(Codec.Compose(message, this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } void SendAuthHeaders(EpAuthPacketMethod method, Map authHeaders) { if (authHeaders != null) { //var authMap = new Map(); //foreach (var header in authHeaders) //{ // authMap.Add(header.Key, header.Value); //} var bl = new BinaryList(); bl.AddUInt8((byte)((byte)method | 0x20)); bl.AddUInt8Array(Codec.Compose(authHeaders, this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } else { Send(new byte[] { (byte)method }); } } /// /// Send EP notification. /// /// Packet action. /// Arguments to send. /// AsyncReply SendNotification(EpPacketNotification action, params object[] args) { var reply = new AsyncReply(); if (args.Length == 0) { Send(new byte[] { (byte)action }); } if (args.Length == 1) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x20 | (byte)action)) .AddUInt8Array(Codec.Compose(args[0], this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } else { var bl = new BinaryList(); bl.AddUInt8((byte)(0x20 | (byte)action)) .AddUInt8Array(Codec.Compose(args, this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } return reply; } void SendReply(EpPacketReply action, uint callbackId, params object[] args) { if (Instance == null) return; if (args.Length == 0) { var bl = new BinaryList(); bl.AddUInt8((byte)(0x80 | (byte)action)) .AddUInt32(callbackId); Send(bl.ToArray()); } if (args.Length == 1) { var bl = new BinaryList(); bl.AddUInt8((byte)(0xA0 | (byte)action)) .AddUInt32(callbackId) .AddUInt8Array(Codec.Compose(args[0], this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } else { var bl = new BinaryList(); bl.AddUInt8((byte)(0xA0 | (byte)action)) .AddUInt32(callbackId) .AddUInt8Array(Codec.Compose(args, this.Instance?.Warehouse ?? _serverWarehouse, this)); Send(bl.ToArray()); } } internal AsyncReply SendSubscribeRequest(uint instanceId, byte index) { return SendRequest(EpPacketRequest.Subscribe, instanceId, index); } internal AsyncReply SendUnsubscribeRequest(uint instanceId, byte index) { return SendRequest(EpPacketRequest.Unsubscribe, instanceId, index); } public AsyncReply StaticCall(ulong typeId, byte index, object parameters) { return SendRequest(EpPacketRequest.StaticCall, typeId, index, parameters); } public AsyncReply Call(string procedureCall, params object[] parameters) { //var args = new Map(); //for (byte i = 0; i < parameters.Length; i++) // args.Add(i, parameters[i]); // return Call(procedureCall, parameters); return SendRequest(EpPacketRequest.ProcedureCall, procedureCall, parameters); } public AsyncReply Call(string procedureCall, Map parameters) { return SendRequest(EpPacketRequest.ProcedureCall, procedureCall, parameters); } internal AsyncReply SendInvoke(uint instanceId, byte index, object parameters) { return SendRequest(EpPacketRequest.InvokeFunction, instanceId, index, parameters); } internal AsyncReply SendSetProperty(uint instanceId, byte index, object value) { return SendRequest(EpPacketRequest.SetProperty, instanceId, index, value); } internal AsyncReply SendDetachRequest(uint instanceId) { try { var sendDetach = false; if (_attachedResources.ContainsKey(instanceId)) { _attachedResources.Remove(instanceId); sendDetach = true; } if (_suspendedResources.ContainsKey(instanceId)) { _suspendedResources.Remove(instanceId); sendDetach = true; } if (sendDetach) return SendRequest(EpPacketRequest.DetachResource, instanceId); return null; // no one is waiting for this } catch { return null; } } void SendError(ErrorType type, uint callbackId, ushort errorCodeOrWarningLevel, string message = "") { if (type == ErrorType.Management) SendReply(EpPacketReply.PermissionError, callbackId, errorCodeOrWarningLevel, message); else if (type == ErrorType.Exception) SendReply(EpPacketReply.ExecutionError, callbackId, errorCodeOrWarningLevel, message); else if (type == ErrorType.Warning) SendReply(EpPacketReply.Warning, callbackId, (byte)errorCodeOrWarningLevel, message); } internal void SendProgress(uint callbackId, uint value, uint max) { SendReply(EpPacketReply.Progress, callbackId, value, max); } internal void SendWarning(uint callbackId, byte level, string message) { SendReply(EpPacketReply.Warning, callbackId, level, message); } internal void SendChunk(uint callbackId, object chunk) { SendReply(EpPacketReply.Chunk, callbackId, chunk); } void EpReplyCompleted(uint callbackId, PlainTdu tdu) { var req = _requests.Take(callbackId); //Console.WriteLine("Completed " + callbackId); if (req == null) { // @TODO: Send general failure return; } var pr = Codec.Parse(tdu, this, null); if (pr is AsyncReply asyncReply) { asyncReply.Then(req.Trigger) .Error(req.TriggerError); } else { req.Trigger(pr); } //var pr = Codec.ParseAsync(dataType, this, null).Then(pr => //{ // if (pr.Value is AsyncReply asyncReply) // { // asyncReply.Then(req.Trigger) // .Error(req.TriggerError); // } // else // { // req.Trigger(pr.Value); // } //}).Error(req.TriggerError); } void EpExtensionAction(byte actionId, PlainTdu? tdu) { // nothing is supported now } void EpReplyPropagated(uint callbackId, PlainTdu tdu) { var req = _requests[callbackId]; if (req == null) { // @TODO: Send general failure return; } var value = Codec.Parse(tdu, this, null); if (value is AsyncReply reply) { reply.Then(req.TriggerPropagation) .Error(req.TriggerError); } else { req.TriggerPropagation(value); } //var pr = Codec.ParseAsync(dataType, this, null).Then(pr => //{ // if (pr.Value is AsyncReply reply) // { // reply.Then(req.TriggerPropagation) // .Error(req.TriggerError); // } // else // { // req.TriggerPropagation(pr.Value); // } //}).Error(req.TriggerError); } void EpReplyError(uint callbackId, PlainTdu plainTdu, ErrorType type) { var req = _requests.Take(callbackId); if (req == null) { // @TODO: Send general failure return; } var tdu = ParsedTdu.ParseSync(plainTdu.Data, plainTdu.TduOffset, plainTdu.Ends, Instance.Warehouse); var args = DataDeserializer.ListParser(tdu, Instance.Warehouse) as object[]; var errorCode = Convert.ToUInt16(args[0]); var errorMsg = (string)args[1]; req.TriggerError(new AsyncException(type, errorCode, errorMsg)); } void EpReplyProgress(uint callbackId, PlainTdu plainTdu) { var req = _requests[callbackId]; if (req == null) { // @TODO: Send general failure return; } var tdu = ParsedTdu.ParseSync(plainTdu.Data, plainTdu.TduOffset, plainTdu.Ends, Instance.Warehouse); var args = DataDeserializer.ListParser(tdu, Instance.Warehouse) as object[]; var current = (uint)args[0]; var total = (uint)args[1]; req.TriggerProgress(ProgressType.Execution, current, total); } void EpReplyWarning(uint callbackId, PlainTdu plainTdu) { var req = _requests[callbackId]; if (req == null) { // @TODO: Send general failure return; } var tdu = ParsedTdu.ParseSync(plainTdu.Data, plainTdu.TduOffset, plainTdu.Ends, Instance.Warehouse); var args = DataDeserializer.ListParser(tdu, Instance.Warehouse) as object[]; var level = (byte)args[0]; var message = (string)args[1]; req.TriggerWarning(level, message); } void EpReplyChunk(uint callbackId, PlainTdu tdu) { var req = _requests[callbackId]; if (req == null) return; var value = Codec.Parse(tdu, this, null); if (value is AsyncReply asyncReply) { asyncReply.Then(req.TriggerChunk) .Error(req.TriggerError); } else { req.TriggerChunk(value); } //Codec.ParseAsync(dataType, this, null).Then(pr => //{ // if (pr.Value is AsyncReply asyncReply) // { // asyncReply.Then(req.TriggerChunk) // .Error(req.TriggerError); // } // else // { // req.TriggerChunk(pr.Value); // } //}).Error(req.TriggerError); } void EpNotificationResourceReassigned(PlainTdu dataType) { // uint resourceId, uint newResourceId } void EpNotificationResourceMoved(PlainTdu tdu) { } void EpNotificationSystemFailure(PlainTdu tdu) { } void EpNotificationResourceDestroyed(PlainTdu tdu) { var (size, rt) = Codec.ParseSync(tdu.Data, tdu.TduOffset, Instance.Warehouse); var resourceId = Convert.ToUInt32(rt); if (_attachedResources.Contains(resourceId)) { EpResource r; if (_attachedResources[resourceId].TryGetTarget(out r)) { // remove from attached to avoid sending unnecessary detach request when Destroy() is called _attachedResources.Remove(resourceId); r.Destroy(); } else { _attachedResources.Remove(resourceId); } } else if (_neededResources.Contains(resourceId)) { // @TODO: handle this mess _neededResources.Remove(resourceId); } } void EpNotificationPropertyModified(PlainTdu tdu) { // resourceId, index, value var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var rid = Convert.ToUInt32(args[0]); var index = (byte)args[1]; FetchResource(rid, null).Then(r => { var pt = r.Instance.Definition.GetPropertyDefByIndex(index); if (pt == null) return; Codec.ParseAsync(tdu.Data, valueOffset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { var item = new AsyncReply(); _queue.Add(item); asyncReply.Then((result) => { item.Trigger(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Propery, result, index)); }); } else { _queue.Add(new AsyncReply(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Propery, pr.Value, index))); } }).Error((ex) => { //.Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); throw ex; }); }); } void EpNotificationEventOccurred(PlainTdu tdu) { // resourceId, index, value var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; FetchResource(resourceId, null).Then(r => { var et = r.Instance.Definition.GetEventDefByIndex(index); if (et == null) // this should never happen return; // push to the queue to guarantee serialization var item = new AsyncReply(); _queue.Add(item); Codec.ParseAsync(tdu.Data, valueOffset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then((result) => { item.Trigger(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Event, result, index)); }); } else { item.Trigger(new EpResourceQueueItem((EpResource)r, EpResourceQueueItem.DistributedResourceQueueItemType.Event, pr.Value, index)); } }).Error((ex) => throw ex); // @TODO: Send general error //.Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); }); } void EpEventRenamed(uint resourceId, string name) { FetchResource(resourceId, null).Then(resource => { resource.Instance.Variables["name"] = name; }); } void EpRequestAttachResource(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then((res) => { if (res != null) { if (res.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, 6); return; } var r = res as IResource; // unsubscribe Unsubscribe(r); // reply ok SendReply(EpPacketReply.Completed, callback, r.Instance.Definition.Id, r.Instance.Age, r.Instance.Link, r.Instance.Hops, r.Instance.Serialize()); // subscribe Subscribe(r); } else { // reply failed Global.Log("EpConnection", LogType.Debug, "Not found " + resourceId); SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestReattachResource(uint callback, PlainTdu tdu) { // resourceId, index, value var (valueOffset, valueSize, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var resourceId = Convert.ToUInt32(args[0]); var age = (ulong)args[1]; Instance.Warehouse.GetById(resourceId).Then((res) => { if (res != null) { if (res.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, 6); return; } var r = res as IResource; // unsubscribe Unsubscribe(r); // reply ok SendReply(EpPacketReply.Completed, callback, r.Instance.Definition.Id, r.Instance.Age, r.Instance.Link, r.Instance.Hops, r.Instance.SerializeAfter(age)); // subscribe Subscribe(r); } else { // reply failed Global.Log("EpConnection", LogType.Debug, "Not found " + resourceId); SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestDetachResource(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then((res) => { if (res != null) { // unsubscribe Unsubscribe(res); // remove from cache _cache.Remove(res); // remove from attached resources //attachedResources.Remove(res); // reply ok SendReply(EpPacketReply.Completed, callback); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestCreateResource(uint callback, PlainTdu tdu) { Codec.ParseAsync(tdu.Data, tdu.TduOffset, this, null).Then(pr => { var args = (object[])pr.Value; var path = (string)args[0]; TypeDef typeDef = null; if (args[1] is uint || args[1] is byte || args[1] is ushort) // @TODO: this is a mess, we should have a better way to distinguish between type id and name typeDef = Instance.Warehouse.GetLocalTypeDefById(Convert.ToUInt64(args[1])); else if (args[1] is string) typeDef = Instance.Warehouse.GetLocalTypeDefByName((string)args[1]); if (typeDef == null || typeDef is not LocalTypeDef) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ClassNotFound); return; } var localTypeDef = typeDef as LocalTypeDef; var props = (Map)((object[])args)[2]; var attrs = (Map)((object[])args)[3]; // Get store var sc = path.Split('/'); Instance.Warehouse.Get(string.Join("/", sc.Take(sc.Length - 1))) .Then(r => { if (r == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.StoreNotFound); return; } var store = r.Instance.Store; // check security if (store.Instance.Applicable(_session, ActionType.CreateResource, null) != Ruling.Allowed) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.CreateDenied); return; } Instance.Warehouse.New(localTypeDef.DefinedType, path, new ResourceContext(0, attrs, props.Select(x => new KeyValuePair (localTypeDef.GetPropertyDefByIndex(x.Key).Name, x.Value)), null)) .Then(resource => { SendReply(EpPacketReply.Completed, callback, resource.Instance.Id); }).Error(e => { SendError(e.Type, callback, (ushort)e.Code, e.Message); }); }).Error(e => { SendError(e.Type, callback, (ushort)e.Code, e.Message); }); }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } void EpRequestDeleteResource(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then(r => { if (r == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } if (r.Instance.Store.Instance.Applicable(_session, ActionType.Delete, null) != Ruling.Allowed) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteDenied); return; } if (Instance.Warehouse.Remove(r)) SendReply(EpPacketReply.Completed, callback); else SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteFailed); }); } void EpRequestMoveResource(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var resourceId = Convert.ToUInt32(args[0]); var name = (string)args[1]; if (name.Contains("/")) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); return; } Instance.Warehouse.GetById(resourceId).Then(resource => { if (resource == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } if (resource.Instance.Applicable(this._session, ActionType.Rename, null) != Ruling.Allowed) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.RenameDenied); return; } resource.Instance.Name = name; SendReply(EpPacketReply.Completed, callback); }); } void EpRequestToken(uint callback, PlainTdu tdu) { // @TODO: To be implemented } void EpRequestLinkTypeDefs(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceLink = (string)value; Action queryCallback = (r) => { if (r == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } if (r.Instance.Applicable(_session, ActionType.ViewTypeDef, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAllowed); return; } // make sure the resource is a local type def. if (r.Instance.Definition is LocalTypeDef localTypeDef) { var typeDefs = LocalTypeDef.GetDependencies(localTypeDef, Instance.Warehouse); // Send SendReply(EpPacketReply.Completed, callback, typeDefs.Select(x => x.Compose(this)).ToArray()); } else { // @TODO: Add support for remote type defs SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); } }; if (Server?.EntryPoint != null) Server.EntryPoint.Query(resourceLink, this).Then(queryCallback); else Instance.Warehouse.Query(resourceLink).Then(queryCallback); } void EpRequestTypeDefByName(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var className = (string)value; var typeDef = Instance.Warehouse.GetRemoteTypeDefByName(_remoteDomain, className); if (typeDef != null) { SendReply(EpPacketReply.Completed, callback, typeDef.Compose(this)); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TypeDefNotFound); } } void EpRequestTypeDefById(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var typeId = Convert.ToUInt32(value); var t = Instance.Warehouse.GetLocalTypeDefById(typeId); if (t != null) { SendReply(EpPacketReply.Completed, callback, t.Compose(this)); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TypeDefNotFound); } } void EpRequestTypeDefByResourceId(uint callback, PlainTdu tdu) { var value = Codec.ParseSync(tdu, Instance.Warehouse); var resourceId = Convert.ToUInt32(value); Instance.Warehouse.GetById(resourceId).Then((r) => { if (r != null) { SendReply(EpPacketReply.Completed, callback, r.Instance.Definition.Compose(this)); } else { // reply failed SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); } }); } void EpRequestGetResourceIdByLink(uint callback, PlainTdu tdu) { var parsed = Codec.ParseSync(tdu, Instance.Warehouse); var resourceLink = (string)parsed; Action queryCallback = (r) => { if (r == null) SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); else { if (r.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } SendReply(EpPacketReply.Completed, callback, r); } }; if (Server?.EntryPoint != null) Server.EntryPoint.Query(resourceLink, this).Then(queryCallback); else Instance.Warehouse.Query(resourceLink).Then(queryCallback); } void EpRequestQueryResources(uint callback, PlainTdu tdu) { var parsed = Codec.ParseSync(tdu, Instance.Warehouse); var resourceLink = (string)parsed; Action queryCallback = (r) => { if (r == null) SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); else { if (r.Instance.Applicable(_session, ActionType.Attach, null) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAllowed); return; } r.Instance.Children().Then(children => { var list = children.Where(x => x.Instance.Applicable(_session, ActionType.Attach, null) != Ruling.Denied).ToArray(); SendReply(EpPacketReply.Completed, callback, list); }).Error(e => { SendError(e.Type, callback, (ushort)e.Code, e.Message); }); } }; if (Server?.EntryPoint != null) Server.EntryPoint.Query(resourceLink, this) .Then(queryCallback) .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message)); else Instance.Warehouse.Query(resourceLink) .Then(queryCallback) .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message)); } void EpRequestResourceAttribute(uint callback, uint resourceId) { } private Tuple SummerizeException(Exception ex) { ex = ex.InnerException != null ? ex.InnerException : ex; var code = (ExceptionLevel & ExceptionLevel.Code) == 0 ? 0 : ex is AsyncException ae ? ae.Code : 0; var msg = (ExceptionLevel & ExceptionLevel.Message) == 0 ? "" : ex.Message; var source = (ExceptionLevel & ExceptionLevel.Source) == 0 ? "" : ex.Source; var trace = (ExceptionLevel & ExceptionLevel.Trace) == 0 ? "" : ex.StackTrace; return new Tuple((ushort)code, $"{source}: {msg}\n{trace}"); } void EpRequestProcedureCall(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 1); var procedureCall = (string)args[0]; if (Server == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported); return; } var call = Server.Calls[procedureCall]; if (call == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply reply) { reply.Then(results => { //var arguments = (Map)results; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for procedure calls //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) //{ // SendError(ErrorType.Management, callback, // (ushort)ExceptionCode.InvokeDenied); // return; //} InvokeFunction(call.Value.Definition, callback, results, EpPacketRequest.ProcedureCall, call.Value.Delegate.Target); }).Error(x => { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); }); } else { //var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for procedure calls InvokeFunction(call.Value.Definition, callback, pr.Value, EpPacketRequest.ProcedureCall, call.Value.Delegate.Target); } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } void EpRequestStaticCall(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var typeId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; var typeDef = Instance.Warehouse.GetLocalTypeDefById(typeId); if (typeDef == null) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TypeDefNotFound); return; } var fd = typeDef.GetFunctionDefByIndex(index); if (fd == null) { // no function at this index SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } var fi = fd.MethodInfo; if (fi == null) { // ft found, fi not found, this should never happen SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply reply) { reply.Then(results => { //var arguments = (Map)results; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for static calls //if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied) //{ // SendError(ErrorType.Management, callback, // (ushort)ExceptionCode.InvokeDenied); // return; //} InvokeFunction(fd, callback, results, EpPacketRequest.StaticCall, null); }).Error(x => { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError); }); } else { //var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); // @TODO: Make managers for static calls InvokeFunction(fd, callback, pr.Value, EpPacketRequest.StaticCall, null); } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } void EpRequestInvokeFunction(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; Instance.Warehouse.GetById(resourceId).Then((r) => { if (r == null) { // no resource with this id SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var ft = r.Instance.Definition.GetFunctionDefByIndex(index); if (ft == null) { // no function at this index SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then(result => { // var arguments = result; // un hold the socket to send data immediately this.Socket.Unhold(); if (r is EpResource) { var rt = (r as EpResource)._Invoke(index, result); if (rt != null) { rt.Then(res => { SendReply(EpPacketReply.Completed, callback, res); }); } else { // function not found on a distributed object SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); } } else { if (r.Instance.Applicable(_session, ActionType.Execute, ft) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.InvokeDenied); return; } InvokeFunction(ft, callback, result, EpPacketRequest.InvokeFunction, r); } }); } else { //var arguments = (Map)parsed; // un hold the socket to send data immediately this.Socket.Unhold(); if (r is EpResource) { var rt = (r as EpResource)._Invoke(index, pr.Value); if (rt != null) { rt.Then(res => { SendReply(EpPacketReply.Completed, callback, res); }); } else { // function not found on a distributed object SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); } } else { if (r.Instance.Applicable(_session, ActionType.Execute, ft) == Ruling.Denied) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.InvokeDenied); return; } InvokeFunction(ft, callback, pr.Value, EpPacketRequest.InvokeFunction, r); } } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); ; }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); ; } void InvokeFunction(FunctionDef ft, uint callback, object arguments, EpPacketRequest actionType, object target = null) { // cast arguments ParameterInfo[] pis = ft.MethodInfo.GetParameters(); object[] args = new object[pis.Length]; InvocationContext context = null; if (pis.Length > 0) { if (pis.Last().ParameterType == typeof(EpConnection)) { if (arguments is Map indexedArguments) { for (byte i = 0; i < pis.Length - 1; i++) { if (indexedArguments.ContainsKey(i)) args[i] = RuntimeCaster.Cast(indexedArguments[i], pis[i].ParameterType); else if (ft.Arguments[i].Type.Nullable) args[i] = null; else args[i] = Type.Missing; } } else if (arguments is object[] arrayArguments) { for (var i = 0; (i < arrayArguments.Length) && (i < pis.Length - 1); i++) { args[i] = RuntimeCaster.Cast(arrayArguments[i], pis[i].ParameterType); } for (var i = arrayArguments.Length; i < pis.Length - 1; i++) { args[i] = Type.Missing; } } else { // assume first argument // Note: if object[] is intended, sender should send nest it withing object[] { object[] } if (pis.Length > 1) args[0] = RuntimeCaster.Cast(arguments, pis[0].ParameterType); } args[args.Length - 1] = this; } else if (pis.Last().ParameterType == typeof(InvocationContext)) { context = new InvocationContext(this, callback); if (arguments is Map indexedArguments) { for (byte i = 0; i < pis.Length - 1; i++) { if (indexedArguments.ContainsKey(i)) args[i] = RuntimeCaster.Cast(indexedArguments[i], pis[i].ParameterType); else if (ft.Arguments[i].Type.Nullable) args[i] = null; else args[i] = Type.Missing; } } else if (arguments is object[] arrayArguments) { for (var i = 0; (i < arrayArguments.Length) && (i < pis.Length - 1); i++) { args[i] = RuntimeCaster.Cast(arrayArguments[i], pis[i].ParameterType); } for (var i = arrayArguments.Length; i < pis.Length - 1; i++) { args[i] = Type.Missing; } } else { // assume first argument // Note: if object[] is intended, sender should send nest it withing object[] { object[] } if (pis.Length > 1) args[0] = RuntimeCaster.Cast(arguments, pis[0].ParameterType); //throw new NotImplementedException("Arguments type not supported."); } args[args.Length - 1] = context; } else { if (arguments is Map indexedArguments) { for (byte i = 0; i < pis.Length; i++) { if (indexedArguments.ContainsKey(i)) args[i] = RuntimeCaster.Cast(indexedArguments[i], pis[i].ParameterType); else if (ft.Arguments[i].Type.Nullable) //Nullable.GetUnderlyingType(pis[i].ParameterType) != null) args[i] = null; else args[i] = Type.Missing; } } else if (arguments is object[] arrayArguments) { for (var i = 0; (i < arrayArguments.Length) && (i < pis.Length); i++) { args[i] = RuntimeCaster.Cast(arrayArguments[i], pis[i].ParameterType); } for (var i = arrayArguments.Length; i < pis.Length; i++) { args[i] = Type.Missing; } } else { // assume first argument // Note: if object[] is intended, sender should send nest it withing object[] { object[] } if (pis.Length > 0) args[0] = RuntimeCaster.Cast(arguments, pis[0].ParameterType); } } } object rt; try { rt = ft.MethodInfo.Invoke(target, args); } catch (Exception ex) { var (code, msg) = SummerizeException(ex); msg = "Arguments: " + string.Join(", ", args.Select(x => x?.ToString() ?? "[Null]").ToArray()) + "\r\n" + msg; SendError(ErrorType.Exception, callback, code, msg); return; } if (rt is IAsyncEnumerable) { var enu = rt as IAsyncEnumerable; var enumerator = enu.GetAsyncEnumerator(); Task.Run(async () => { try { while (await enumerator.MoveNextAsync()) { var v = enumerator.Current; SendChunk(callback, v); } SendReply(EpPacketReply.Completed, callback); if (context != null) context.Ended = true; } catch (Exception ex) { if (context != null) context.Ended = true; var (code, msg) = SummerizeException(ex); SendError(ErrorType.Exception, callback, code, msg); } }); } else if (rt is System.Collections.IEnumerable && !(rt is Array || rt is Map || rt is string)) { var enu = rt as System.Collections.IEnumerable; try { foreach (var v in enu) SendChunk(callback, v); SendReply(EpPacketReply.Completed, callback); if (context != null) context.Ended = true; } catch (Exception ex) { if (context != null) context.Ended = true; var (code, msg) = SummerizeException(ex); SendError(ErrorType.Exception, callback, code, msg); } } else if (rt is Task) { (rt as Task).ContinueWith(t => { if (context != null) context.Ended = true; #if NETSTANDARD var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t); #else var res = t.GetType().GetProperty("Result").GetValue(t); #endif SendReply(EpPacketReply.Completed, callback, res); }); } else if (rt is AsyncReply) { (rt as AsyncReply).Then(res => { if (context != null) context.Ended = true; SendReply(EpPacketReply.Completed, callback, res); }).Error(ex => { var (code, msg) = SummerizeException(ex); SendError(ErrorType.Exception, callback, code, msg); }).Progress((pt, pv, pm) => { SendProgress(callback, pv, pm); }).Chunk(v => { SendChunk(callback, v); }).Warning((level, message) => { SendError(ErrorType.Warning, callback, level, message); }); } else { if (context != null) context.Ended = true; SendReply(EpPacketReply.Completed, callback, rt); } } void EpRequestSubscribe(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; Instance.Warehouse.GetById(resourceId).Then((r) => { if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var et = r.Instance.Definition.GetEventDefByIndex(index); if (et != null) { // et not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } if (r is EpResource) { (r as EpResource).Subscribe(et).Then(x => { SendReply(EpPacketReply.Completed, callback); }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); } else { lock (_subscriptionsLock) { if (!_subscriptions.ContainsKey(r)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); return; } if (_subscriptions[r].Contains(index)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyListened); return; } _subscriptions[r].Add(index); SendReply(EpPacketReply.Completed, callback); } } }); } void EpRequestUnsubscribe(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var resourceId = Convert.ToUInt32(args[0]); var index = (byte)args[1]; Instance.Warehouse.GetById(resourceId).Then((r) => { if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var et = r.Instance.Definition.GetEventDefByIndex(index); if (et == null) { // pt not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound); return; } if (r is EpResource) { (r as EpResource).Unsubscribe(et).Then(x => { SendReply(EpPacketReply.Completed, callback); }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure)); } else { lock (_subscriptionsLock) { if (!_subscriptions.ContainsKey(r)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached); return; } if (!_subscriptions[r].Contains(index)) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyUnsubscribed); return; } _subscriptions[r].Remove(index); SendReply(EpPacketReply.Completed, callback); } } }); } void EpRequestSetProperty(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse, 2); var rid = (uint)args[0]; var index = (byte)args[1]; // un hold the socket to send data immediately this.Socket.Unhold(); Instance.Warehouse.GetById(rid).Then((r) => { if (r == null) { // resource not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound); return; } var pt = r.Instance.Definition.GetPropertyDefByIndex(index); if (pt != null) { // property not found SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); return; } if (r is IDynamicResource dynamicResource) { Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then((value) => { // propagation dynamicResource.SetResourcePropertyAsync(index, value).Then((x) => { SendReply(EpPacketReply.Completed, callback); }).Error(x => { SendError(x.Type, callback, (ushort)x.Code, x.Message); }); }); } else { // propagation dynamicResource.SetResourcePropertyAsync(index, pr.Value).Then((x) => { SendReply(EpPacketReply.Completed, callback); }).Error(x => { SendError(x.Type, callback, (ushort)x.Code, x.Message); }); } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); ; } else { var pi = pt.PropertyInfo; if (pi == null) { // pt found, pi not found, this should never happen SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound); return; } if (r.Instance.Applicable(_session, ActionType.SetProperty, pt, this) == Ruling.Denied) { SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.SetPropertyDenied); return; } if (!pi.CanWrite) { SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ReadOnlyProperty); return; } Codec.ParseAsync(tdu.Data, offset, this, null).Then(pr => { if (pr.Value is AsyncReply asyncReply) { asyncReply.Then((value) => { if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() == typeof(PropertyContext<>)) { value = Activator.CreateInstance(pi.PropertyType, this, value); } else { // cast new value type to property type value = RuntimeCaster.Cast(value, pi.PropertyType); } try { pi.SetValue(r, value); SendReply(EpPacketReply.Completed, callback); } catch (Exception ex) { SendError(ErrorType.Exception, callback, 0, ex.Message); } }); } else { var value = pr.Value; if (pi.PropertyType.IsGenericType && pi.PropertyType.GetGenericTypeDefinition() == typeof(PropertyContext<>)) { value = Activator.CreateInstance(pi.PropertyType, this, value); //value = new DistributedPropertyContext(this, value); } else { // cast new value type to property type value = RuntimeCaster.Cast(value, pi.PropertyType); } try { pi.SetValue(r, value); SendReply(EpPacketReply.Completed, callback); } catch (Exception ex) { SendError(ErrorType.Exception, callback, 0, ex.Message); } } }).Error(x => SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError)); } }); } /// /// Get the TypeDef for a given type Id. /// /// Type UUID. /// TypeDef. //public AsyncReply GetTypeDefById(ulong typeId) //{ // lock (_typeDefsLock) // { // if (_remoteTypeDefs.ContainsKey(typeId)) // return new AsyncReply(_remoteTypeDefs[typeId]); // else if (_typeDefsByIdRequests.ContainsKey(typeId)) // return _typeDefsByIdRequests[typeId]; // var reply = new AsyncReply(); // _typeDefsByIdRequests.Add(typeId, reply); // SendRequest(EpPacketRequest.TypeDefById, typeId) // .Then((result) => // { // // @TODO: Solve for dependency deadlock // RemoteTypeDef.Parse(_remoteDomain, (byte[])result, this).Then(td => // { // _typeDefsByIdRequests.Remove(typeId); // _remoteTypeDefs.Add(td.Id, td); // // register all remote TypeDefs to warehouse to be used in future parsing before the actual request for them arrives. // Instance.Warehouse.TryRegisterRemoteTypeDef(_remoteDomain, td); // reply.Trigger(td); // }); // }).Error((ex) => // { // reply.TriggerError(ex); // }); // return reply; // } //} //public AsyncReply GetTypeDefByName(string typeName) //{ // lock (_typeDefsLock) // { // var typeDef = _remoteTypeDefs.Values.FirstOrDefault(x => x.Name == typeName); // if (typeDef != null) // return new AsyncReply(typeDef); // if (_typeDefsByNameRequests.ContainsKey(typeName)) // return _typeDefsByNameRequests[typeName]; // var reply = new AsyncReply(); // _typeDefsByNameRequests.Add(typeName, reply); // SendRequest(EpPacketRequest.TypeDefByName, typeName) // .Then((result) => // { // RemoteTypeDef.Parse(_remoteDomain, (byte[])result, this).Then(td => // { // _typeDefsByNameRequests.Remove(typeName); // _remoteTypeDefs.Add(td.Id, td); // Instance.Warehouse.TryRegisterRemoteTypeDef(_remoteDomain, td); // reply.Trigger(td); // }); // }).Error((ex) => // { // reply.TriggerError(ex); // }); // return reply; // } //} // IStore interface /// /// Get a resource by its path. /// /// Path to the resource. /// Resource public AsyncReply Get(string path) { var rt = new AsyncReply(); var req = SendRequest(EpPacketRequest.GetResourceIdByLink, path); req.Then(result => { // The resource is being handed to the application: publish its fully-attached // graph so that, if any dependency is only partially attached, it stays unpublished. if (result is EpResource resource) PublishGraph(resource); rt.Trigger(result); }).Error(ex => rt.TriggerError(ex)); //Query(path).Then(ar => //{ // //if (filter != null) // // ar = ar?.Where(filter).ToArray(); // // MISSING: should dispatch the unused resources. // if (ar?.Length > 0) // rt.Trigger(ar[0]); // else // rt.Trigger(null); //}).Error(ex => rt.TriggerError(ex)); return rt; } public AsyncReply GetLinkDefinitions(string link) { throw new NotImplementedException(); //var reply = new AsyncReply(); //SendRequest(EpPacketRequest.LinkTypeDefs, link) //.Then((result) => //{ // var defs = new List(); // foreach (var def in (byte[][])result) // { // defs.Add(RemoteTypeDef.Parse(_remoteDomain, def)); // } // reply.Trigger(defs.ToArray()); //}).Error((ex) => //{ // reply.TriggerError(ex); //}); //return reply; } /// /// Fetch a resource from the other end /// /// Resource Id /// DistributedResource /// //object fetchResourceLock = new object(); // Records that the attachment of `parent` is now blocked waiting on in-flight child `child`. void AddFetchBlock(uint parent, uint child) { if (!_fetchBlockedOn.TryGetValue(parent, out var set)) _fetchBlockedOn[parent] = set = new HashSet(); set.Add(child); } // Removes a resource from the wait-for graph once it is attached or its fetch failed: it is // no longer blocked on anything and no longer a pending child of anyone. void ClearFetchNode(uint id) { _fetchBlockedOn.Remove(id); foreach (var set in _fetchBlockedOn.Values) set.Remove(id); } /// /// Returns true if completing the fetch of by waiting for its in-flight /// request would deadlock, i.e. the resource is (transitively) blocked on a resource that the /// current request chain is itself building. In that case the caller should hand back the /// placeholder to break the cycle instead of waiting. /// internal static bool HasWaitForCycle(uint id, uint[] requestSequence, IReadOnlyDictionary> blockedOn) { if (requestSequence == null || requestSequence.Length == 0) return false; var chain = new HashSet(requestSequence); var visited = new HashSet(); var stack = new Stack(); stack.Push(id); while (stack.Count > 0) { var current = stack.Pop(); if (!visited.Add(current)) continue; if (!blockedOn.TryGetValue(current, out var children)) continue; foreach (var child in children) { // Reaching a node that the current chain is attaching closes the cycle. if (chain.Contains(child)) return true; stack.Push(child); } } return false; } /// /// Publishes a fully-attached object graph to the application: every resource reachable from /// is marked , but only if the /// entire reachable graph is already attached. If any reachable resource is still being /// attached (e.g. a placeholder handed out to break a cycle), the graph is left unpublished — /// exactly the partially-attached delivery that the wait-by-default resolver prevents and the /// legacy resolver does not. /// internal void PublishGraph(EpResource root) { if (root == null) return; var seen = new HashSet(); var reachable = new List(); var queue = new Queue(); queue.Enqueue(root); var fullyAttached = true; while (queue.Count > 0) { var node = queue.Dequeue(); if (node == null || !seen.Add(node.ResourceInstanceId)) continue; reachable.Add(node); if (node.Status != ResourceStatus.Attached) { fullyAttached = false; continue; // do not traverse into a not-yet-attached node } foreach (var child in node.GetReferencedResources()) queue.Enqueue(child); } if (fullyAttached) foreach (var node in reachable) node.Publish(); } public AsyncReply FetchResource(uint id, uint[] requestSequence) { //lock (fetchLock) //{ EpResource resource = null; _attachedResources[id]?.TryGetTarget(out resource); if (resource != null) { Global.Counters["EpResourceAttachedCacheHit"]++; return new AsyncReply(resource); } resource = _neededResources[id]; var requestInfo = _resourceRequests[id]; // The resource that triggered this fetch (the tail of the chain), if any. Used to record // wait-for edges and to tell graph-internal references from app-facing fetches (no chain). uint? parent = requestSequence != null && requestSequence.Length > 0 ? requestSequence[requestSequence.Length - 1] : (uint?)null; if (requestInfo != null) { // Same dependency chain (A->B->A): the placeholder is an internal node of the graph // currently being attached. The application only observes the chain's top-level reply, // which fires after full attachment, so returning the not-yet-attached placeholder here // is safe and breaks the reference cycle. NaiveWait skips this so that even same-chain // cycles deadlock (used to demonstrate the protection is necessary). if (DeadlockResolution != DeadlockResolutionMode.NaiveWait && resource != null && (requestSequence?.Contains(id) ?? false)) { Global.Counters["EpResourceDeadLockSameChain"]++; CycleBreakCount++; return new AsyncReply(resource); } // Decide whether to break the wait by returning the placeholder: // - Legacy: hand it to ANY cross-chain requester (over-eager; the bug under study). // - WaitWithCycleDetection: only on a genuine wait-for cycle. // - NaiveWait: never — always wait below (deadlocks on cycles). var breakCycle = resource != null && DeadlockResolution switch { DeadlockResolutionMode.LegacyCrossChainPlaceholder => requestInfo.RequestSequence.Contains(id), DeadlockResolutionMode.WaitWithCycleDetection => HasWaitForCycle(id, requestSequence, _fetchBlockedOn), _ => false, }; if (breakCycle) { Global.Counters["EpResourceDeadLockCrossChain"]++; CycleBreakCount++; // Instrumentation: a placeholder handed out where there is no genuine wait-for cycle // is an unnecessary, partial delivery — the new resolver would have waited for full // attachment instead. This counts the legacy resolver's over-eager placeholders. if (DeadlockResolution == DeadlockResolutionMode.LegacyCrossChainPlaceholder && !HasWaitForCycle(id, requestSequence, _fetchBlockedOn)) { Global.Counters["EpResourceUnnecessaryPlaceholder"]++; UnnecessaryPlaceholderCount++; } return new AsyncReply(resource); } // Otherwise an independent or application-facing requester: wait for the in-flight // attachment to complete fully rather than exposing a partially attached resource. Global.Counters["EpResourcePendingCacheHit"]++; if (parent != null) AddFetchBlock(parent.Value, id); return requestInfo.Reply; } else if (resource != null && resource.Status != ResourceStatus.Suspended) { // @REVIEW: this should never happen Global.Log("DCON", LogType.Error, "Resource not moved to attached."); return new AsyncReply(resource); } var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id }; var reply = new AsyncReply(); _resourceRequests.Add(id, new FetchRequestInfo(reply, newSequence)); // This fetch's parent now waits on `id` until it attaches. if (parent != null) AddFetchBlock(parent.Value, id); SendRequest(EpPacketRequest.AttachResource, id) .Then((result) => { if (result == null) { reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.ResourceNotFound, "Null response")); return; } // TypeId, Age, Link, Hops, PropertyValue[] var args = (object[])result; var typeId = Convert.ToUInt32(args[0]); var age = Convert.ToUInt64(args[1]); var link = (string)args[2]; var hops = (byte)args[3]; var pvData = (byte[])args[4]; var typeDef = resource != null ? resource.Instance.Definition as RemoteTypeDef : Instance.Warehouse.GetRemoteTypeDefById( _remoteDomain, typeId ); var initResource = (EpResource dr) => { var parsedReply = DataDeserializer.PropertyValueArrayParserAsync(pvData, 0, (uint)pvData.Length, this, newSequence); parsedReply.Then(results => { var pvs = results as PropertyValue[]; dr._Attach(pvs); // Progress signal: a resource has fully attached. Used by tests to // distinguish a true deadlock (no progress while requests pend) from // merely slow processing (these counters keep advancing). Global.Counters["EpResourceAttached"]++; AttachedResourceCount++; _resourceRequests.Remove(id); // move from needed to attached. _neededResources.Remove(id); _attachedResources[id] = new WeakReference(dr); // attached: no longer part of the in-flight wait-for graph. ClearFetchNode(id); reply.Trigger(dr); }).Error(ex => { _resourceRequests.Remove(id); ClearFetchNode(id); reply.TriggerError(ex); }); }; if (typeDef == null) { FetchTypeDef(typeId, null).Then((td) => { // typeId, ResourceAge, ResourceLink, Content if (resource == null) { if (td.ProxyType != null) resource = Activator.CreateInstance(td.ProxyType, this, id, Convert.ToUInt64(args[1]), (string)args[2]) as EpResource; else resource = new EpResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); resource.ResourceDefinition = td; typeDef = td; // Register the placeholder before parsing properties so cyclic // references in the graph can resolve back to this instance. _neededResources[id] = resource; Instance.Warehouse.Put(Instance.Link + "/" + id.ToString(), resource) .Then(initResource) .Error(ex => reply.TriggerError(ex)); } else { initResource(resource); } }).Error((ex) => { reply.TriggerError(ex); }); } else { if (resource == null) { if (typeDef.ProxyType != null) resource = Activator.CreateInstance(typeDef.ProxyType, this, id, Convert.ToUInt64(args[1]), (string)args[2]) as EpResource; else resource = new EpResource(this, id, Convert.ToUInt64(args[1]), (string)args[2]); resource.ResourceDefinition = typeDef; // Register the placeholder before parsing properties so cyclic // references in the graph can resolve back to this instance. _neededResources[id] = resource; Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), resource) .Then(initResource).Error((ex) => reply.TriggerError(ex)); } else { initResource(resource); } } }).Error((ex) => { // Failed to attach: drop the in-flight request and wait-for edges so a // later retry is not blocked by a stale entry. _resourceRequests.Remove(id); ClearFetchNode(id); reply.TriggerError(ex); }); return reply; //} } /// /// Fetch a resource from the other end /// /// Resource Id /// DistributedResource /// /// /// Re-attaches an already-known resource after reconnection using its last-known age. The peer /// returns only the properties modified after (the delta), which are /// merged into the existing instance instead of re-fetching everything. Falls back to a full /// if there is no prior state to merge into. /// public AsyncReply Reattach(uint id, ulong age, EpResource resource) { EpResource attachedResource = null; _attachedResources[id]?.TryGetTarget(out attachedResource); if (attachedResource != null) return new AsyncReply(attachedResource); var existing = _resourceRequests[id]; if (existing != null) return existing.Reply; var reply = new AsyncReply(); var sequence = new uint[] { id }; _resourceRequests.Add(id, new FetchRequestInfo(reply, sequence)); SendRequest(EpPacketRequest.ReattachResource, id, age).Then(result => { if (result == null) { _resourceRequests.Remove(id); reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.ResourceNotFound, "Null response")); return; } // typeId, age, link, hops, delta(index -> PropertyValue) var args = (object[])result; var deltaData = (byte[])args[4]; DataDeserializer.PropertyValueMapParserAsync(deltaData, 0, (uint)deltaData.Length, this, sequence) .Then(delta => { if (!resource._Reattach(delta)) { // No prior state to merge into — perform a full attach instead. _resourceRequests.Remove(id); FetchResource(id, null).Then(r => reply.Trigger(r)).Error(ex => reply.TriggerError(ex)); return; } _resourceRequests.Remove(id); _neededResources.Remove(id); _attachedResources[id] = new WeakReference(resource); ClearFetchNode(id); reply.Trigger(resource); }) .Error(ex => { _resourceRequests.Remove(id); ClearFetchNode(id); reply.TriggerError(ex); }); }).Error(ex => { _resourceRequests.Remove(id); ClearFetchNode(id); reply.TriggerError(ex); }); return reply; } //object fetchResourceLock = new object(); public AsyncReply FetchTypeDef(ulong id, ulong[] requestSequence) { //Console.WriteLine($"Fetching typedef {id}"); RemoteTypeDef typeDef = _cachedTypeDefs[id]; if (typeDef != null) return new AsyncReply(typeDef); typeDef = _neededTypeDefs[id]; var requestInfo = _typeDefRequests[id]; if (requestInfo != null) { if (typeDef != null && (requestSequence?.Contains(id) ?? false)) { // dead lock avoidance for loop reference. return new AsyncReply(typeDef); } else if (typeDef != null && requestInfo.RequestSequence.Contains(id)) { // dead lock avoidance for dependent reference. return new AsyncReply(typeDef); } else { return requestInfo.Reply; } } var newSequence = requestSequence != null ? requestSequence.Concat(new ulong[] { id }).ToArray() : new ulong[] { id }; var reply = new AsyncReply(); _typeDefRequests.Add(id, new FetchRequestInfo(reply, newSequence)); SendRequest(EpPacketRequest.TypeDefById, id) .Then((result) => { if (result == null) { reply.TriggerError(new AsyncException(ErrorType.Management, (ushort)ExceptionCode.ResourceNotFound, "Null response")); return; } // TypeDef Data //var args = (object[])result; var typeDefData = (byte[])result; var od = new RemoteTypeDef(); _neededTypeDefs[id] = od; RemoteTypeDef.Parse(od, this.RemoteDomain, typeDefData, this, newSequence).Then(td => { _typeDefRequests.Remove(id); // move from needed to attached. _neededTypeDefs.Remove(id); _cachedTypeDefs[id] = td; reply.Trigger(td); }).Error(reply.TriggerError); }).Error(reply.TriggerError); return reply; } /// /// Query resources at specific link. /// /// Link path. /// public AsyncReply Query(string path) { var str = DC.ToBytes(path); var reply = new AsyncReply(); SendRequest(EpPacketRequest.Query, path) .Then(result => { reply.Trigger((IResource[])result); }).Error(ex => reply.TriggerError(ex)); return reply; } /// /// Create a new resource. /// /// Resource path. /// Type definition. /// Values for the resource properties. /// Resource attributes. /// New resource instance public AsyncReply Create(string path, TypeDef type, Map properties, Map attributes) { var reply = new AsyncReply(); SendRequest(EpPacketRequest.CreateResource, path, type.Id, type.CastProperties(properties), attributes) .Then(r => reply.Trigger((EpResource)r)) .Warning((l, m) => reply.TriggerWarning(l, m)) .Error(e => reply.TriggerError(e)); return reply; } private void Subscribe(IResource resource) { lock (_subscriptionsLock) { resource.Instance.EventOccurred += Instance_EventOccurred; resource.Instance.CustomEventOccurred += Instance_CustomEventOccurred; resource.Instance.PropertyModified += Instance_PropertyModified; resource.Instance.Destroyed += Instance_ResourceDestroyed; _subscriptions.Add(resource, new List()); } } private void Unsubscribe(IResource resource) { lock (_subscriptionsLock) { // do something with the list... resource.Instance.EventOccurred -= Instance_EventOccurred; resource.Instance.CustomEventOccurred -= Instance_CustomEventOccurred; resource.Instance.PropertyModified -= Instance_PropertyModified; resource.Instance.Destroyed -= Instance_ResourceDestroyed; _subscriptions.Remove(resource); } } private void UnsubscribeAll() { lock (_subscriptionsLock) { foreach (var resource in _subscriptions.Keys) { resource.Instance.EventOccurred -= Instance_EventOccurred; resource.Instance.CustomEventOccurred -= Instance_CustomEventOccurred; resource.Instance.PropertyModified -= Instance_PropertyModified; resource.Instance.Destroyed -= Instance_ResourceDestroyed; } _subscriptions.Clear(); } } private void Instance_ResourceDestroyed(IResource resource) { Unsubscribe(resource); // compose the packet SendNotification(EpPacketNotification.ResourceDestroyed, resource.Instance.Id); } private void Instance_PropertyModified(PropertyModificationInfo info) { SendNotification(EpPacketNotification.PropertyModified, info.Resource.Instance.Id, info.PropertyDef.Index, info.Value); } private void Instance_CustomEventOccurred(CustomEventOccurredInfo info) { if (info.EventDef.Subscribable) { lock (_subscriptionsLock) { // check the client requested listen if (!_subscriptions.ContainsKey(info.Resource)) return; if (!_subscriptions[info.Resource].Contains(info.EventDef.Index)) return; } } if (!info.Receivers(_session)) return; if (info.Resource.Instance.Applicable(_session, ActionType.ReceiveEvent, info.EventDef, info.Issuer) == Ruling.Denied) return; // compose the packet SendNotification(EpPacketNotification.EventOccurred, info.Resource.Instance.Id, info.EventDef.Index, info.Value); } private void Instance_EventOccurred(EventOccurredInfo info) { if (info.Definition.Subscribable) { lock (_subscriptionsLock) { // check the client requested listen if (!_subscriptions.ContainsKey(info.Resource)) return; if (!_subscriptions[info.Resource].Contains(info.Definition.Index)) return; } } if (info.Resource.Instance.Applicable(_session, ActionType.ReceiveEvent, info.Definition, null) == Ruling.Denied) return; // compose the packet SendNotification(EpPacketNotification.EventOccurred, info.Resource.Instance.Id, info.Definition.Index, info.Value); } void EpRequestKeepAlive(uint callback, PlainTdu tdu) { var (offset, length, args) = DataDeserializer.LimitedCountListParser(tdu.Data, tdu.PayloadOffset, tdu.PayloadLength, Instance.Warehouse); var peerTime = (DateTime)args[0]; var interval = Convert.ToUInt32(args[1]); uint jitter = 0; var now = DateTime.UtcNow; if (_lastKeepAliveReceived != null) { var diff = (uint)(now - (DateTime)_lastKeepAliveReceived).TotalMilliseconds; jitter = (uint)Math.Abs((int)diff - (int)interval); } SendReply(EpPacketReply.Completed, callback, now, jitter); _lastKeepAliveReceived = now; } }