/*

Copyright (c) 2017-2025 Ahmed Kh. Zamil

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

*/

using Esiur.Core;
using Esiur.Data;
using Esiur.Misc;
using Esiur.Net.Packets;
using Esiur.Resource;
using Esiur.Resource.Template;
using Esiur.Security.Authority;
using Esiur.Security.Permissions;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;

namespace Esiur.Net.IIP;

partial class DistributedConnection
{
    // NOTE(review): the generic type arguments of the declarations below look like
    // they were lost when this file was extracted; they are kept exactly as found.
    // Restore them from the upstream source before compiling.

    // Resources that were requested but are not attached yet.
    KeyList neededResources = new KeyList();
    // Weak references to attached resources, looked up by resource id.
    KeyList> attachedResources = new KeyList>();
    KeyList> suspendedResources = new KeyList>();
    // In-flight resource requests, looked up by resource id.
    KeyList resourceRequests = new KeyList();
    // In-flight template requests, by class id and by class name.
    KeyList> templateRequests = new KeyList>();
    KeyList> templateByNameRequests = new KeyList>();
    // Templates already received from the other end.
    Dictionary templates = new Dictionary();
    // Pending requests, looked up by callback id.
    KeyList requests = new KeyList();
    volatile uint callbackCounter = 0;
    // Event indexes the peer subscribed to, per resource; guarded by subscriptionsLock.
    Dictionary> subscriptions = new Dictionary>();

    // resources might get attached by the client
    internal KeyList cache = new();

    object subscriptionsLock = new object();

    AsyncQueue queue = new AsyncQueue();

    /// <summary>
    /// Send IIP request.
    /// </summary>
    /// <param name="action">Packet action.</param>
    /// <param name="args">Arguments to send.</param>
    /// <returns>Reply registered in <c>requests</c> under the callback id that is sent with the packet.</returns>
    AsyncReply SendRequest(IIPPacketRequest action, params object[] args)
    {
        var reply = new AsyncReply();

        // NOTE(review): ++ on a volatile field is not an atomic read-modify-write;
        // confirm that requests are only ever sent from one thread at a time.
        var c = callbackCounter++; // avoid thread racing
        requests.Add(c, reply);

        // Exactly one packet must go out per request. These branches are mutually
        // exclusive: previously the zero-argument case fell through to the final
        // 'else' and transmitted a second packet.
        if (args.Length == 0)
        {
            // no payload
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0x40 | (byte)action))
              .AddUInt32(c);
            Send(bl.ToArray());
        }
        else if (args.Length == 1)
        {
            // a single argument is composed as is
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0x60 | (byte)action))
              .AddUInt32(c)
              .AddUInt8Array(Codec.Compose(args[0], this));
            Send(bl.ToArray());
        }
        else
        {
            // several arguments are composed as one list
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0x60 | (byte)action))
              .AddUInt32(c)
              .AddUInt8Array(Codec.Compose(args, this));
            Send(bl.ToArray());
        }

        return reply;
    }

    /// <summary>
    /// Send IIP notification.
    /// </summary>
    /// <param name="action">Packet action.</param>
    /// <param name="args">Arguments to send.</param>
/// <returns>A new reply object; nothing in this method stores or triggers it.</returns>
    AsyncReply SendNotification(IIPPacketNotification action, params object[] args)
    {
        var reply = new AsyncReply();

        // Exactly one packet per notification: the branches are mutually exclusive.
        // Previously the zero-argument case also ran the final 'else' and sent twice.
        if (args.Length == 0)
        {
            Send(new byte[] { (byte)action });
        }
        else if (args.Length == 1)
        {
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0x20 | (byte)action))
              .AddUInt8Array(Codec.Compose(args[0], this));
            Send(bl.ToArray());
        }
        else
        {
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0x20 | (byte)action))
              .AddUInt8Array(Codec.Compose(args, this));
            Send(bl.ToArray());
        }

        return reply;
    }

    /// <summary>
    /// Send an IIP reply for the request identified by <paramref name="callbackId"/>.
    /// </summary>
    /// <param name="action">Reply kind.</param>
    /// <param name="callbackId">Callback id taken from the request being answered.</param>
    /// <param name="args">Values to send; none, one (composed as is) or many (composed as a list).</param>
    void SendReply(IIPPacketReply action, uint callbackId, params object[] args)
    {
        // Exactly one packet per reply: the branches are mutually exclusive.
        // Previously an argument-less reply was followed by a second 0xA0 packet.
        if (args.Length == 0)
        {
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0x80 | (byte)action))
              .AddUInt32(callbackId);
            Send(bl.ToArray());
        }
        else if (args.Length == 1)
        {
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0xA0 | (byte)action))
              .AddUInt32(callbackId)
              .AddUInt8Array(Codec.Compose(args[0], this));
            Send(bl.ToArray());
        }
        else
        {
            var bl = new BinaryList();
            bl.AddUInt8((byte)(0xA0 | (byte)action))
              .AddUInt32(callbackId)
              .AddUInt8Array(Codec.Compose(args, this));
            Send(bl.ToArray());
        }
    }

    /// <summary>Request a subscription to the event at <paramref name="index"/> of a remote resource.</summary>
    internal AsyncReply SendSubscribeRequest(uint instanceId, byte index)
    {
        return SendRequest(IIPPacketRequest.Subscribe, instanceId, index);
    }

    /// <summary>Request cancelling the subscription to the event at <paramref name="index"/> of a remote resource.</summary>
    internal AsyncReply SendUnsubscribeRequest(uint instanceId, byte index)
    {
        return SendRequest(IIPPacketRequest.Unsubscribe, instanceId, index);
    }

    /// <summary>Call the function at <paramref name="index"/> of the class identified by <paramref name="classId"/>.</summary>
    public AsyncReply StaticCall(UUID classId, byte index, Map parameters)
    {
        return SendRequest(IIPPacketRequest.StaticCall, classId, index, parameters);
    }

    /// <summary>
    /// Call a named procedure, passing <paramref name="parameters"/> positionally
    /// (each value is keyed by its position in the array).
    /// </summary>
    public AsyncReply Call(string procedureCall, params object[] parameters)
    {
        var args = new Map();

        // NOTE(review): the byte index wraps around if more than 255 parameters
        // are passed - presumably never the case; confirm.
        for (byte i = 0; i < parameters.Length; i++)
            args.Add(i, parameters[i]);

        return Call(procedureCall, args);
    }

    /// <summary>Call a named procedure with an already built argument map.</summary>
    public AsyncReply Call(string procedureCall, Map parameters)
    {
        return SendRequest(IIPPacketRequest.ProcedureCall, procedureCall, parameters);
    }

    /// <summary>Invoke the function at <paramref name="index"/> of a remote resource.</summary>
    internal AsyncReply SendInvoke(uint instanceId, byte index, Map parameters)
    {
        return SendRequest(IIPPacketRequest.InvokeFunction, instanceId, index, parameters);
    }

    /// <summary>Set the property at <paramref name="index"/> of a remote resource.</summary>
    internal AsyncReply SendSetProperty(uint instanceId, byte index, object value)
    {
        return SendRequest(IIPPacketRequest.SetProperty, instanceId, index, value);
    }

    /// <summary>
    /// Forget a resource locally and, if it was attached or suspended, ask the
    /// other end to detach it.
    /// </summary>
    /// <returns>The request reply, or null when nothing was sent or an exception occurred.</returns>
    internal AsyncReply SendDetachRequest(uint instanceId)
    {
        try
        {
            var sendDetach = false;

            if (attachedResources.ContainsKey(instanceId))
            {
                attachedResources.Remove(instanceId);
                sendDetach = true;
            }

            if (suspendedResources.ContainsKey(instanceId))
            {
                suspendedResources.Remove(instanceId);
                sendDetach = true;
            }

            if (sendDetach)
                return SendRequest(IIPPacketRequest.DetachResource, instanceId);

            return null; // no one is waiting for this
        }
        catch
        {
            // best effort: any failure is reported to the caller as "nothing sent"
            return null;
        }
    }

    /// <summary>
    /// Report an error or a warning for a request. Other error types are ignored.
    /// </summary>
    /// <param name="errorCodeOrWarningLevel">Error code; for warnings it is narrowed to a byte level.</param>
    void SendError(ErrorType type, uint callbackId, ushort errorCodeOrWarningLevel, string message = "")
    {
        if (type == ErrorType.Management)
            SendReply(IIPPacketReply.PermissionError, callbackId, errorCodeOrWarningLevel, message);
        else if (type == ErrorType.Exception)
            SendReply(IIPPacketReply.ExecutionError, callbackId, errorCodeOrWarningLevel, message);
        else if (type == ErrorType.Warning)
            SendReply(IIPPacketReply.Warning, callbackId, (byte)errorCodeOrWarningLevel, message);
    }

    /// <summary>Report the progress of a request.</summary>
    internal void SendProgress(uint callbackId, uint value, uint max)
    {
        SendReply(IIPPacketReply.Progress, callbackId, value, max);
    }

    /// <summary>Send one chunk of a chunked reply.</summary>
    internal void SendChunk(uint callbackId, object chunk)
    {
        SendReply(IIPPacketReply.Chunk, callbackId, chunk);
    }

    /// <summary>
    /// A request completed: take it out of the pending list and trigger it with
    /// the parsed result.
    /// </summary>
    void IIPReplyCompleted(uint callbackId, TransmissionType dataType, byte[] data)
    {
        var req = requests.Take(callbackId);

        if (req == null)
        {
            // @TODO: Send general failure
            return;
        }

        var (_, parsed) = Codec.ParseAsync(data, 0, this, null);

        // the parser hands back either the value or a reply that yields it later
        if (parsed is AsyncReply reply)
        {
            reply.Then(result =>
            {
                req.Trigger(result);
            });
        }
        else
        {
            req.Trigger(parsed);
        }
    }

    void IIPExtensionAction(byte actionId, TransmissionType? dataType, byte[] data)
    {
        // nothing is supported now
    }

    /// <summary>A propagated value arrived; the request stays in the pending list.</summary>
    void IIPReplyPropagated(uint callbackId, TransmissionType dataType, byte[] data)
    {
        var req = requests[callbackId];

        if (req == null)
        {
            // @TODO: Send general failure
            return;
        }

        var (_, parsed) = Codec.ParseAsync(data, 0, this, null);

        if (parsed is AsyncReply reply)
        {
            reply.Then(result =>
            {
                req.TriggerPropagation(result);
            });
        }
        else
        {
            req.TriggerPropagation(parsed);
        }
    }

    /// <summary>
    /// A request failed: take it out of the pending list and trigger its error with
    /// the received code and message.
    /// </summary>
    void IIPReplyError(uint callbackId, TransmissionType dataType, byte[] data, ErrorType type)
    {
        var req = requests.Take(callbackId);

        if (req == null)
        {
            // @TODO: Send general failure
            return;
        }

        var args = DataDeserializer.ListParser(data, dataType.Offset, (uint)dataType.ContentLength) as object[];

        var errorCode = (ushort)args[0];
        var errorMsg = (string)args[1];

        req.TriggerError(new AsyncException(type, errorCode, errorMsg));
    }

    /// <summary>Progress report for a pending request (current, total).</summary>
    void IIPReplyProgress(uint callbackId, TransmissionType dataType, byte[] data)
    {
        var req = requests[callbackId];

        if (req == null)
        {
            // @TODO: Send general failure
            return;
        }

        var args = DataDeserializer.ListParser(data, dataType.Offset, (uint)dataType.ContentLength) as object[];

        var current = (uint)args[0];
        var total = (uint)args[1];

        req.TriggerProgress(ProgressType.Execution, current, total);
    }

    /// <summary>Warning for a pending request (level, message).</summary>
    void IIPReplyWarning(uint callbackId, TransmissionType dataType, byte[] data)
    {
        var req = requests[callbackId];

        if (req == null)
        {
            // @TODO: Send general failure
            return;
        }

        var args = DataDeserializer.ListParser(data, dataType.Offset, (uint)dataType.ContentLength) as object[];

        var level = (byte)args[0];
        var message = (string)args[1];

        req.TriggerWarning(level, message);
    }

    /// <summary>One chunk of a chunked reply arrived; the request stays in the pending list.</summary>
    void IIPReplyChunk(uint callbackId, TransmissionType dataType, byte[] data)
    {
        var req = requests[callbackId];

        if (req == null)
            return;

        var (_, parsed) = Codec.ParseAsync(data, dataType.Offset, this, null, dataType);

        if (parsed is AsyncReply reply)
            reply.Then(result => req.TriggerChunk(result));
        else
            req.TriggerChunk(parsed);
    }

    void
IIPNotificationResourceReassigned(TransmissionType dataType, byte[] data)
    {
        // uint resourceId, uint newResourceId
        // not handled yet
    }

    void IIPNotificationResourceMoved(TransmissionType dataType, byte[] data)
    {
        // not handled yet
    }

    void IIPNotificationSystemFailure(TransmissionType dataType, byte[] data)
    {
        // not handled yet
    }

    /// <summary>
    /// The other end destroyed a resource: forget it and destroy the local proxy
    /// when it is still alive.
    /// </summary>
    void IIPNotificationResourceDestroyed(TransmissionType dataType, byte[] data)
    {
        var (size, rt) = Codec.ParseSync(data, dataType.Offset, dataType);

        var resourceId = (uint)rt;

        if (attachedResources.Contains(resourceId))
        {
            DistributedResource r;
            var alive = attachedResources[resourceId].TryGetTarget(out r);

            // remove from attached (before destroying) to avoid sending unnecessary
            // detach request when Destroy() is called
            attachedResources.Remove(resourceId);

            if (alive)
                r.Destroy();
        }
        else if (neededResources.Contains(resourceId))
        {
            // @TODO: handle this mess
            neededResources.Remove(resourceId);
        }
    }

    /// <summary>
    /// A property of an attached resource changed on the other end. The update is
    /// pushed through <c>queue</c> as a queue item.
    /// </summary>
    void IIPNotificationPropertyModified(TransmissionType dataType, byte[] data)
    {
        // resourceId, index, value
        var (valueOffset, valueSize, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2);

        var rid = (uint)args[0];
        var index = (byte)args[1];

        Fetch(rid, null).Then(r =>
        {
            var pt = r.Instance.Template.GetPropertyTemplateByIndex(index);

            // Ignore indexes the template does not define. The test used to be
            // inverted, which discarded every update of a known property.
            if (pt == null)
                return;

            // the item is queued before the value is parsed
            var item = new AsyncReply();
            queue.Add(item);

            var (_, parsed) = Codec.ParseAsync(data, valueOffset, this, null);

            if (parsed is AsyncReply pending)
            {
                pending.Then((result) =>
                {
                    item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
                        DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery,
                        result, index));
                });
            }
            else
            {
                item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
                    DistributedResourceQueueItem.DistributedResourceQueueItemType.Propery,
                    parsed, index));
            }
        });
    }

    /// <summary>
    /// An event of an attached resource occurred on the other end. It is pushed
    /// through <c>queue</c> as a queue item.
    /// </summary>
    void IIPNotificationEventOccurred(TransmissionType dataType, byte[] data)
    {
        // resourceId, index, value
        var (valueOffset, valueSize, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2);

        var resourceId = (uint)args[0];
        var index = (byte)args[1];

        Fetch(resourceId, null).Then(r =>
        {
            var et = r.Instance.Template.GetEventTemplateByIndex(index);

            if (et == null) // this should never happen
                return;

            // push to the queue to guarantee serialization
            var item = new AsyncReply();
            queue.Add(item);

            var (_, parsed) = Codec.ParseAsync(data, valueOffset, this, null);

            if (parsed is AsyncReply pending)
            {
                pending.Then((result) =>
                {
                    item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
                        DistributedResourceQueueItem.DistributedResourceQueueItemType.Event,
                        result, index));
                });
            }
            else
            {
                item.Trigger(new DistributedResourceQueueItem((DistributedResource)r,
                    DistributedResourceQueueItem.DistributedResourceQueueItemType.Event,
                    parsed, index));
            }
        });
    }

    /// <summary>Store the new name of a remote resource in its instance variables.</summary>
    void IIPEventRenamed(uint resourceId, string name)
    {
        Fetch(resourceId, null).Then(resource =>
        {
            resource.Instance.Variables["name"] = name;
        });
    }

    /// <summary>
    /// The peer asks to attach a local resource. Replies with class id, age, link,
    /// hops and the serialized properties, and (re)subscribes to the resource.
    /// </summary>
    void IIPRequestAttachResource(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var resourceId = (uint)value;

        Instance.Warehouse.GetById(resourceId).Then((res) =>
        {
            if (res == null)
            {
                // reply failed
                Global.Log("DistributedConnection", LogType.Debug, "Not found " + resourceId);
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            if (res.Instance.Applicable(session, ActionType.Attach, null) == Ruling.Denied)
            {
                // NOTE(review): 6 is sent as a bare number; presumably it maps to
                // an ExceptionCode member - confirm and use the name.
                SendError(ErrorType.Management, callback, 6);
                return;
            }

            var r = res as IResource;

            // unsubscribe first, so that an attach repeated for the same resource
            // is not subscribed twice
            Unsubscribe(r);

            // reply ok; a distributed resource is serialized through its own method
            if (r is DistributedResource dr)
            {
                SendReply(IIPPacketReply.Completed, callback,
                    r.Instance.Template.ClassId,
                    r.Instance.Age,
                    r.Instance.Link,
                    r.Instance.Hops,
                    dr._Serialize());
            }
            else
            {
                SendReply(IIPPacketReply.Completed, callback,
                    r.Instance.Template.ClassId,
                    r.Instance.Age,
                    r.Instance.Link,
                    r.Instance.Hops,
                    r.Instance.Serialize());
            }

            // subscribe
            Subscribe(r);
        });
    }

    void
IIPRequestReattachResource(uint callback, TransmissionType dataType, byte[] data)
    {
        // resourceId, age
        var (valueOffset, valueSize, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2);

        var resourceId = (uint)args[0];
        var age = (ulong)args[1];

        Instance.Warehouse.GetById(resourceId).Then((res) =>
        {
            if (res == null)
            {
                // reply failed
                Global.Log("DistributedConnection", LogType.Debug, "Not found " + resourceId);
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            if (res.Instance.Applicable(session, ActionType.Attach, null) == Ruling.Denied)
            {
                // NOTE(review): 6 is sent as a bare number; presumably it maps to
                // an ExceptionCode member - confirm and use the name.
                SendError(ErrorType.Management, callback, 6);
                return;
            }

            var r = res as IResource;

            // unsubscribe first, so that a repeated attach is not subscribed twice
            Unsubscribe(r);

            // reply ok; same reply as a plain attach, but serialized with the
            // age the peer supplied
            if (r is DistributedResource dr)
            {
                SendReply(IIPPacketReply.Completed, callback,
                    r.Instance.Template.ClassId,
                    r.Instance.Age,
                    r.Instance.Link,
                    r.Instance.Hops,
                    dr._SerializeAfter(age));
            }
            else
            {
                SendReply(IIPPacketReply.Completed, callback,
                    r.Instance.Template.ClassId,
                    r.Instance.Age,
                    r.Instance.Link,
                    r.Instance.Hops,
                    r.Instance.SerializeAfter(age));
            }

            // subscribe
            Subscribe(r);
        });
    }

    /// <summary>The peer detaches a local resource: unsubscribe and drop it from the cache.</summary>
    void IIPRequestDetachResource(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var resourceId = (uint)value;

        Instance.Warehouse.GetById(resourceId).Then((res) =>
        {
            if (res == null)
            {
                // reply failed
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            // unsubscribe
            Unsubscribe(res);

            // remove from cache
            cache.Remove(res);

            // reply ok
            SendReply(IIPPacketReply.Completed, callback);
        });
    }

    /// <summary>
    /// The peer asks to create a resource. Arguments: path, class (id or name),
    /// properties, attributes. Replies with the id of the new resource.
    /// </summary>
    void IIPRequestCreateResource(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, parsed) = Codec.ParseAsync(data, 0, this, null, dataType);

        // NOTE(review): assumes the arguments always parse synchronously; a
        // pending AsyncReply would make this cast throw - confirm.
        var args = (object[])parsed;

        var path = (string)args[0];

        // the class is given either by id or by name
        TypeTemplate type = null;

        if (args[1] is UUID)
            type = Instance.Warehouse.GetTemplateByClassId((UUID)args[1]);
        else if (args[1] is string)
            type = Instance.Warehouse.GetTemplateByClassName((string)args[1]);

        if (type == null)
        {
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ClassNotFound);
            return;
        }

        var props = (Map)args[2];
        var attrs = (Map)args[3];

        // Get store: everything before the last path segment
        var sc = path.Split('/');

        Instance.Warehouse.Get(string.Join("/", sc.Take(sc.Length - 1)))
            .Then(r =>
            {
                if (r == null)
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.StoreNotFound);
                    return;
                }

                var store = r.Instance.Store;

                // check security
                if (store.Instance.Applicable(session, ActionType.CreateResource, null) != Ruling.Allowed)
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.CreateDenied);
                    return;
                }

                Instance.Warehouse.New(type.DefinedType, path, null, attrs, props).Then(resource =>
                {
                    SendReply(IIPPacketReply.Completed, callback, resource.Instance.Id);
                }).Error(e =>
                {
                    SendError(e.Type, callback, (ushort)e.Code, e.Message);
                });
            }).Error(e =>
            {
                SendError(e.Type, callback, (ushort)e.Code, e.Message);
            });
    }

    /// <summary>The peer asks to delete a local resource.</summary>
    void IIPRequestDeleteResource(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var resourceId = (uint)value;

        Instance.Warehouse.GetById(resourceId).Then(r =>
        {
            if (r == null)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            // the permission is checked on the store holding the resource
            if (r.Instance.Store.Instance.Applicable(session, ActionType.Delete, null) != Ruling.Allowed)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteDenied);
                return;
            }

            if (Instance.Warehouse.Remove(r))
                SendReply(IIPPacketReply.Completed, callback);
            else
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.DeleteFailed);
        });
    }

    /// <summary>
    /// The peer asks to rename a local resource. Names containing '/' are rejected
    /// as not supported.
    /// </summary>
    void IIPRequestMoveResource(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength);

        var resourceId = (uint)args[0];
        var name = (string)args[1];

        if (name.Contains("/"))
        {
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported);
            return;
        }

        Instance.Warehouse.GetById(resourceId).Then(resource =>
        {
            if (resource == null)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            if (resource.Instance.Applicable(this.session, ActionType.Rename, null) != Ruling.Allowed)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.RenameDenied);
                return;
            }

            resource.Instance.Name = name;
            SendReply(IIPPacketReply.Completed, callback);
        });
    }

    void IIPRequestToken(uint callback, TransmissionType dataType, byte[] data)
    {
        // @TODO: To be implemented
    }

    /// <summary>
    /// The peer asks for the templates a linked resource depends on. Replies with
    /// the content of each template.
    /// </summary>
    void IIPRequestLinkTemplates(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var resourceLink = (string)value;

        Action queryCallback = (r) =>
        {
            if (r == null)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            if (r.Instance.Applicable(session, ActionType.ViewTemplate, null) == Ruling.Denied)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAllowed);
                return;
            }

            var templates = TypeTemplate.GetDependencies(r.Instance.Template, Instance.Warehouse);

            // Send
            SendReply(IIPPacketReply.Completed, callback, templates.Select(x => x.Content).ToArray());
        };

        // A failed query is reported back as well (as IIPRequestQueryResources
        // does); otherwise the peer would never get any reply.
        if (Server?.EntryPoint != null)
            Server.EntryPoint.Query(resourceLink, this)
                .Then(queryCallback)
                .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message));
        else
            Instance.Warehouse.Query(resourceLink)
                .Then(queryCallback)
                .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message));
    }

    /// <summary>The peer asks for the template of a class given by name.</summary>
    void IIPRequestTemplateFromClassName(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var className = (string)value;

        var t = Instance.Warehouse.GetTemplateByClassName(className);

        if (t != null)
        {
            SendReply(IIPPacketReply.Completed, callback, t.Content);
        }
        else
        {
            // reply failed
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TemplateNotFound);
        }
    }

    void
IIPRequestTemplateFromClassId(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var classId = new UUID((byte[])value);

        var t = Instance.Warehouse.GetTemplateByClassId(classId);

        if (t != null)
        {
            SendReply(IIPPacketReply.Completed, callback, t.Content);
        }
        else
        {
            // reply failed
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TemplateNotFound);
        }
    }

    /// <summary>The peer asks for the template of a local resource given by id.</summary>
    void IIPRequestTemplateFromResourceId(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, value) = Codec.ParseSync(data, 0, dataType);

        var resourceId = (uint)value;

        Instance.Warehouse.GetById(resourceId).Then((r) =>
        {
            if (r != null)
            {
                SendReply(IIPPacketReply.Completed, callback, r.Instance.Template.Content);
            }
            else
            {
                // reply failed
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
            }
        });
    }

    /// <summary>
    /// The peer asks for the resource behind a link. A resource the session may not
    /// attach is reported as not found.
    /// </summary>
    void IIPRequestGetResourceIdByLink(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, parsed) = Codec.ParseSync(data, 0, dataType);
        var resourceLink = (string)parsed;

        Action queryCallback = (r) =>
        {
            if (r == null)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            if (r.Instance.Applicable(session, ActionType.Attach, null) == Ruling.Denied)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            SendReply(IIPPacketReply.Completed, callback, r);
        };

        // A failed query is reported back as well (as IIPRequestQueryResources
        // does); otherwise the peer would never get any reply.
        if (Server?.EntryPoint != null)
            Server.EntryPoint.Query(resourceLink, this)
                .Then(queryCallback)
                .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message));
        else
            Instance.Warehouse.Query(resourceLink)
                .Then(queryCallback)
                .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message));
    }

    /// <summary>
    /// The peer asks for the children of the resource behind a link. Children the
    /// session may not attach are left out of the reply.
    /// </summary>
    void IIPRequestQueryResources(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, parsed) = Codec.ParseSync(data, 0, dataType);

        var resourceLink = (string)parsed;

        Action queryCallback = (r) =>
        {
            if (r == null)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            if (r.Instance.Applicable(session, ActionType.Attach, null) == Ruling.Denied)
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAllowed);
                return;
            }

            r.Instance.Children().Then(children =>
            {
                var list = children
                    .Where(x => x.Instance.Applicable(session, ActionType.Attach, null) != Ruling.Denied)
                    .ToArray();

                SendReply(IIPPacketReply.Completed, callback, list);
            }).Error(e =>
            {
                SendError(e.Type, callback, (ushort)e.Code, e.Message);
            });
        };

        if (Server?.EntryPoint != null)
            Server.EntryPoint.Query(resourceLink, this)
                .Then(queryCallback)
                .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message));
        else
            Instance.Warehouse.Query(resourceLink)
                .Then(queryCallback)
                .Error(e => SendError(e.Type, callback, (ushort)e.Code, e.Message));
    }

    void IIPRequestResourceAttribute(uint callback, uint resourceId)
    {
        // not implemented
    }

    /// <summary>
    /// Reduce an exception to an (error code, text) pair. The inner exception is
    /// preferred when there is one; each of code, message, source and stack trace
    /// is included only when the matching <c>ExceptionLevel</c> flag is set.
    /// </summary>
    private Tuple SummerizeException(Exception ex)
    {
        ex = ex.InnerException != null ? ex.InnerException : ex;

        // only an AsyncException carries a code; anything else reports 0
        var code = (ExceptionLevel & ExceptionLevel.Code) == 0 ? 0 : ex is AsyncException ae ? ae.Code : 0;
        var msg = (ExceptionLevel & ExceptionLevel.Message) == 0 ? "" : ex.Message;
        var source = (ExceptionLevel & ExceptionLevel.Source) == 0 ? "" : ex.Source;
        var trace = (ExceptionLevel & ExceptionLevel.Trace) == 0 ? "" : ex.StackTrace;

        return new Tuple((ushort)code, $"{source}: {msg}\n{trace}");
    }

    /// <summary>
    /// The peer calls a procedure registered in <c>Server.Calls</c> by name.
    /// </summary>
    void IIPRequestProcedureCall(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 1);

        var procedureCall = (string)args[0];

        if (Server == null)
        {
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotSupported);
            return;
        }

        var call = Server.Calls[procedureCall];

        if (call == null)
        {
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
            return;
        }

        // runs the procedure once its arguments are available
        void Execute(object parsedArguments)
        {
            var arguments = (Map)parsedArguments;

            // un hold the socket to send data immediately
            this.Socket.Unhold();

            // @TODO: Make managers for procedure calls (no permission check is done here yet)
            InvokeFunction(call.Value.Template, callback, arguments,
                IIPPacketRequest.ProcedureCall, call.Value.Delegate.Target);
        }

        var (_, parsed) = Codec.ParseAsync(data, offset, this, null);

        if (parsed is AsyncReply reply)
        {
            reply.Then(results => Execute(results)).Error(x =>
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
            });
        }
        else
        {
            Execute(parsed);
        }
    }

    /// <summary>
    /// The peer calls the function at a given index of a class given by id; the
    /// function is invoked without a target object.
    /// </summary>
    void IIPRequestStaticCall(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2);

        var classId = new UUID((byte[])args[0]);
        var index = (byte)args[1];

        var template = Instance.Warehouse.GetTemplateByClassId(classId);

        if (template == null)
        {
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.TemplateNotFound);
            return;
        }

        var ft = template.GetFunctionTemplateByIndex(index);

        if (ft == null)
        {
            // no function at this index
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
            return;
        }

        var fi = ft.MethodInfo;

        if (fi == null)
        {
            // ft found, fi not found, this should never happen
            SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
            return;
        }

        // runs the function once its arguments are available
        void Execute(object parsedArguments)
        {
            var arguments = (Map)parsedArguments;

            // un hold the socket to send data immediately
            this.Socket.Unhold();

            // @TODO: Make managers for static calls (no permission check is done here yet)
            InvokeFunction(ft, callback, arguments, IIPPacketRequest.StaticCall, null);
        }

        var (_, parsed) = Codec.ParseAsync(data, offset, this, null);

        if (parsed is AsyncReply reply)
        {
            reply.Then(results => Execute(results)).Error(x =>
            {
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
            });
        }
        else
        {
            Execute(parsed);
        }
    }

    /// <summary>
    /// The peer invokes the function at a given index of a local resource. A
    /// distributed resource forwards the call through its <c>_Invoke</c>; any other
    /// resource is permission checked and invoked through <c>InvokeFunction</c>.
    /// </summary>
    void IIPRequestInvokeFunction(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2);

        var resourceId = (uint)args[0];
        var index = (byte)args[1];

        Instance.Warehouse.GetById(resourceId).Then((r) =>
        {
            if (r == null)
            {
                // no resource with this id
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            var ft = r.Instance.Template.GetFunctionTemplateByIndex(index);

            if (ft == null)
            {
                // no function at this index
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
                return;
            }

            // runs the function once its arguments are available
            void Execute(object parsedArguments)
            {
                var arguments = (Map)parsedArguments;

                // un hold the socket to send data immediately
                this.Socket.Unhold();

                if (r is DistributedResource dr)
                {
                    var rt = dr._Invoke(index, arguments);

                    if (rt != null)
                    {
                        // a failure of the forwarded call is reported too, so
                        // that the peer is never left without a reply
                        rt.Then(res =>
                        {
                            SendReply(IIPPacketReply.Completed, callback, res);
                        }).Error(e =>
                        {
                            SendError(e.Type, callback, (ushort)e.Code, e.Message);
                        });
                    }
                    else
                    {
                        // function not found on a distributed object
                        SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
                    }

                    return;
                }

                if (r.Instance.Applicable(session, ActionType.Execute, ft) == Ruling.Denied)
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.InvokeDenied);
                    return;
                }

                InvokeFunction(ft, callback, arguments, IIPPacketRequest.InvokeFunction, r);
            }

            var (_, parsed) = Codec.ParseAsync(data, offset, this, null);

            if (parsed is AsyncReply reply)
            {
                // parse failures are reported the same way as in the static call handler
                reply.Then(result => Execute(result)).Error(x =>
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
                });
            }
            else
            {
                Execute(parsed);
            }
        });
    }

    /// <summary>
    /// Invoke a function and send its outcome to the peer.
    /// </summary>
    /// <remarks>
    /// Arguments are taken from <paramref name="arguments"/> by position. A missing
    /// argument becomes null when the template marks it nullable, otherwise
    /// <c>Type.Missing</c>. When the last parameter of the method is a
    /// <c>DistributedConnection</c> or an <c>InvocationContext</c> it is supplied by
    /// this method. Enumerable results (other than arrays, maps and strings) are
    /// sent as chunks; tasks and async replies are answered when they complete.
    /// </remarks>
    /// <param name="ft">Template of the function to invoke.</param>
    /// <param name="callback">Callback id of the request being answered.</param>
    /// <param name="arguments">Arguments keyed by parameter position.</param>
    /// <param name="actionType">Kind of request that caused the invocation (not used by this method).</param>
    /// <param name="target">Object to invoke on; null for static methods.</param>
    void InvokeFunction(FunctionTemplate ft, uint callback, Map arguments, IIPPacketRequest actionType, object target = null)
    {
        // cast arguments
        ParameterInfo[] pis = ft.MethodInfo.GetParameters();

        object[] args = new object[pis.Length];

        InvocationContext context = null;

        // number of leading parameters that are filled from the received arguments
        var suppliedCount = pis.Length;

        if (pis.Length > 0)
        {
            var lastType = pis.Last().ParameterType;

            if (lastType == typeof(DistributedConnection))
            {
                args[args.Length - 1] = this;
                suppliedCount--;
            }
            else if (lastType == typeof(InvocationContext))
            {
                context = new InvocationContext(this, callback);
                args[args.Length - 1] = context;
                suppliedCount--;
            }
        }

        for (byte i = 0; i < suppliedCount; i++)
        {
            if (arguments.ContainsKey(i))
                args[i] = DC.CastConvert(arguments[i], pis[i].ParameterType);
            else if (ft.Arguments[i].Type.Nullable)
                args[i] = null;
            else
                args[i] = Type.Missing;
        }

        object rt;

        try
        {
            rt = ft.MethodInfo.Invoke(target, args);
        }
        catch (Exception ex)
        {
            var (code, msg) = SummerizeException(ex);
            msg = "Arguments: " + string.Join(", ", args.Select(x => x?.ToString() ?? "[Null]").ToArray()) + "\r\n" + msg;

            SendError(ErrorType.Exception, callback, code, msg);
            return;
        }

        if (rt is System.Collections.IEnumerable enu && !(rt is Array || rt is Map || rt is string))
        {
            // a sequence is streamed one chunk per item
            try
            {
                foreach (var v in enu)
                    SendChunk(callback, v);

                SendReply(IIPPacketReply.Completed, callback);

                if (context != null)
                    context.Ended = true;
            }
            catch (Exception ex)
            {
                if (context != null)
                    context.Ended = true;

                var (code, msg) = SummerizeException(ex);
                SendError(ErrorType.Exception, callback, code, msg);
            }
        }
        else if (rt is Task task)
        {
            task.ContinueWith(t =>
            {
                if (context != null)
                    context.Ended = true;

                // Reading Result of a faulted task throws inside this continuation
                // and the peer would never be answered; report the failure instead.
                if (t.IsFaulted)
                {
                    var (code, msg) = SummerizeException(t.Exception);
                    SendError(ErrorType.Exception, callback, code, msg);
                    return;
                }

                // a task type without a Result property yields null
#if NETSTANDARD
                var res = t.GetType().GetTypeInfo().GetProperty("Result")?.GetValue(t);
#else
                var res = t.GetType().GetProperty("Result")?.GetValue(t);
#endif
                SendReply(IIPPacketReply.Completed, callback, res);
            });
        }
        else if (rt is AsyncReply asyncReply)
        {
            asyncReply.Then(res =>
            {
                if (context != null)
                    context.Ended = true;

                SendReply(IIPPacketReply.Completed, callback, res);
            }).Error(ex =>
            {
                // the invocation is over on failure as well (as in the sequence branch)
                if (context != null)
                    context.Ended = true;

                var (code, msg) = SummerizeException(ex);
                SendError(ErrorType.Exception, callback, code, msg);
            }).Progress((pt, pv, pm) =>
            {
                SendProgress(callback, pv, pm);
            }).Chunk(v =>
            {
                SendChunk(callback, v);
            }).Warning((level, message) =>
            {
                SendError(ErrorType.Warning, callback, level, message);
            });
        }
        else
        {
            if (context != null)
                context.Ended = true;

            SendReply(IIPPacketReply.Completed, callback, rt);
        }
    }

    /// <summary>
    /// The peer subscribes to the event at a given index of a local resource.
    /// </summary>
    void IIPRequestSubscribe(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength);

        var resourceId = (uint)args[0];
        var index = (byte)args[1];

        Instance.Warehouse.GetById(resourceId).Then((r) =>
        {
            if (r == null)
            {
                // resource not found
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            var et = r.Instance.Template.GetEventTemplateByIndex(index);

            // The test used to be inverted: every existing event was rejected and
            // a missing one was passed on as null.
            if (et == null)
            {
                // et not found
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
                return;
            }

            if (r is DistributedResource dr)
            {
                // forward the subscription
                dr.Subscribe(et).Then(x =>
                {
                    SendReply(IIPPacketReply.Completed, callback);
                }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure));

                return;
            }

            lock (subscriptionsLock)
            {
                if (!subscriptions.TryGetValue(r, out var indexes))
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached);
                    return;
                }

                if (indexes.Contains(index))
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyListened);
                    return;
                }

                indexes.Add(index);

                SendReply(IIPPacketReply.Completed, callback);
            }
        });
    }

    /// <summary>
    /// The peer cancels its subscription to the event at a given index of a local resource.
    /// </summary>
    void IIPRequestUnsubscribe(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength);

        var resourceId = (uint)args[0];
        var index = (byte)args[1];

        Instance.Warehouse.GetById(resourceId).Then((r) =>
        {
            if (r == null)
            {
                // resource not found
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            var et = r.Instance.Template.GetEventTemplateByIndex(index);

            if (et == null)
            {
                // et not found
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.MethodNotFound);
                return;
            }

            if (r is DistributedResource dr)
            {
                // forward the cancellation
                dr.Unsubscribe(et).Then(x =>
                {
                    SendReply(IIPPacketReply.Completed, callback);
                }).Error(x => SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.GeneralFailure));

                return;
            }

            lock (subscriptionsLock)
            {
                if (!subscriptions.TryGetValue(r, out var indexes))
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.NotAttached);
                    return;
                }

                if (!indexes.Contains(index))
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.AlreadyUnsubscribed);
                    return;
                }

                indexes.Remove(index);

                SendReply(IIPPacketReply.Completed, callback);
            }
        });
    }

    /// <summary>
    /// The peer sets the property at a given index of a local resource. A
    /// distributed resource forwards the value through its <c>_Set</c>; any other
    /// resource is permission checked and written through reflection.
    /// </summary>
    void IIPRequestSetProperty(uint callback, TransmissionType dataType, byte[] data)
    {
        var (offset, length, args) =
            DataDeserializer.LimitedCountListParser(data, dataType.Offset, dataType.ContentLength, 2);

        var rid = (uint)args[0];
        var index = (byte)args[1];

        // un hold the socket to send data immediately
        this.Socket.Unhold();

        Instance.Warehouse.GetById(rid).Then((r) =>
        {
            if (r == null)
            {
                // resource not found
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ResourceNotFound);
                return;
            }

            var pt = r.Instance.Template.GetPropertyTemplateByIndex(index);

            // The test used to be inverted: every existing property was rejected
            // and a missing one was dereferenced below.
            if (pt == null)
            {
                // property not found
                SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound);
                return;
            }

            // what to do with the value once it is parsed
            Action<object> assign;

            if (r is DistributedResource dr)
            {
                assign = value =>
                {
                    // propagation
                    dr._Set(index, value).Then((x) =>
                    {
                        SendReply(IIPPacketReply.Completed, callback);
                    }).Error(x =>
                    {
                        SendError(x.Type, callback, (ushort)x.Code, x.Message);
                    });
                };
            }
            else
            {
                var pi = pt.PropertyInfo;

                if (pi == null)
                {
                    // pt found, pi not found, this should never happen
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.PropertyNotFound);
                    return;
                }

                if (r.Instance.Applicable(session, ActionType.SetProperty, pt, this) == Ruling.Denied)
                {
                    SendError(ErrorType.Exception, callback, (ushort)ExceptionCode.SetPropertyDenied);
                    return;
                }

                if (!pi.CanWrite)
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ReadOnlyProperty);
                    return;
                }

                assign = value =>
                {
                    if (pi.PropertyType.IsGenericType
                        && pi.PropertyType.GetGenericTypeDefinition() == typeof(PropertyContext<>))
                    {
                        // a property context is built from this connection and the value
                        value = Activator.CreateInstance(pi.PropertyType, this, value);
                    }
                    else
                    {
                        // cast new value type to property type
                        value = DC.CastConvert(value, pi.PropertyType);
                    }

                    try
                    {
                        pi.SetValue(r, value);
                        SendReply(IIPPacketReply.Completed, callback);
                    }
                    catch (Exception ex)
                    {
                        SendError(ErrorType.Exception, callback, 0, ex.Message);
                    }
                };
            }

            var (_, parsed) = Codec.ParseAsync(data, offset, this, null);

            if (parsed is AsyncReply reply)
            {
                reply.Then(value => assign(value)).Error(x =>
                {
                    SendError(ErrorType.Management, callback, (ushort)ExceptionCode.ParseError);
                });
            }
            else
            {
                // The value was parsed synchronously. A distributed resource used
                // to be left without any reply in this case.
                assign(parsed);
            }
        });
    }

    /// <summary>
    /// Get the ResourceTemplate for a given class Id.
    /// </summary>
    /// <param name="classId">Class GUID.</param>
    /// <returns>ResourceTemplate.</returns>
public AsyncReply GetTemplate(UUID classId)
    {
        // Already cached on this connection.
        if (templates.TryGetValue(classId, out var known))
            return new AsyncReply(known);

        // A request for the same class is already in flight: share its reply.
        if (templateRequests.ContainsKey(classId))
            return templateRequests[classId];

        var reply = new AsyncReply();
        templateRequests.Add(classId, reply);

        SendRequest(IIPPacketRequest.TemplateFromClassId, classId)
            .Then((result) =>
            {
                var args = (object[])result;
                var received = (TypeTemplate)args[0];

                templateRequests.Remove(classId);

                // Indexer instead of Add: the same class may already have been cached
                // by a concurrent GetTemplateByClassName, and Add would throw on it.
                templates[received.ClassId] = received;
                Instance.Warehouse.PutTemplate(received);
                reply.Trigger(received);
            })
            .Error((ex) =>
            {
                // Drop the pending entry so a later call can retry instead of being
                // handed this failed reply again.
                templateRequests.Remove(classId);
                reply.TriggerError(ex);
            });

        return reply;
    }

    /// <summary>
    /// Get the type template for a given class name, from the local cache when
    /// available, otherwise by requesting it from the other end.
    /// </summary>
    /// <param name="className">Class name to look up.</param>
    /// <returns>Reply triggered with the template, or with the error of the request.</returns>
    public AsyncReply GetTemplateByClassName(string className)
    {
        var template = templates.Values.FirstOrDefault(x => x.ClassName == className);
        if (template != null)
            return new AsyncReply(template);

        // A request for the same name is already in flight: share its reply.
        if (templateByNameRequests.ContainsKey(className))
            return templateByNameRequests[className];

        var reply = new AsyncReply();
        templateByNameRequests.Add(className, reply);

        SendRequest(IIPPacketRequest.TemplateFromClassName, className)
            .Then((result) =>
            {
                var tt = TypeTemplate.Parse((byte[])result);

                templateByNameRequests.Remove(className);

                // Indexer instead of Add: a concurrent GetTemplate for the same class
                // may already have cached it, and Add would throw on the duplicate key.
                templates[tt.ClassId] = tt;
                Instance.Warehouse.PutTemplate(tt);
                reply.Trigger(tt);
            })
            .Error((ex) =>
            {
                // Drop the pending entry so a later call can retry.
                templateByNameRequests.Remove(className);
                reply.TriggerError(ex);
            });

        return reply;
    }

    // IStore interface

    /// <summary>
    /// Get a resource by its path.
    /// </summary>
    /// <param name="path">Path to the resource.</param>
    /// <returns>Reply triggered with the first resource returned by <see cref="Query"/>, or null when the query yields none.</returns>
    public AsyncReply Get(string path)
    {
        var rt = new AsyncReply();

        Query(path).Then(ar =>
        {
            // MISSING: should dispatch the unused resources.
            if (ar?.Length > 0)
                rt.Trigger(ar[0]);
            else
                rt.Trigger(null);
        }).Error(ex => rt.TriggerError(ex));

        return rt;
    }

    /// <summary>
    /// Request the type templates for a link and parse each returned entry.
    /// </summary>
    /// <param name="link">Link sent with the LinkTemplates request.</param>
    /// <returns>Reply triggered with an array of the parsed templates.</returns>
    public AsyncReply GetLinkTemplates(string link)
    {
        var reply = new AsyncReply();

        SendRequest(IIPPacketRequest.LinkTemplates, link)
            .Then((result) =>
            {
                // One serialized template per entry; the local is named so that it
                // does not shadow the "templates" cache field.
                var parsedTemplates = ((byte[][])result)
                    .Select(raw => TypeTemplate.Parse(raw))
                    .ToArray();

                reply.Trigger(parsedTemplates);
            })
            .Error((ex) =>
            {
                reply.TriggerError(ex);
            });

        return reply;
    }

    /// <summary>
    /// Fetch a resource from the other end
    /// </summary>
    /// <param name="id">Resource Id</param>
    /// <param name="requestSequence">Ids of the resources whose attachment led to this fetch (may be null); used to break reference loops.</param>
    /// <returns>DistributedResource</returns>
    public AsyncReply Fetch(uint id, uint[] requestSequence)
    {
        DistributedResource resource = null;

        // Already attached and still alive: answer immediately.
        attachedResources[id]?.TryGetTarget(out resource);

        if (resource != null)
            return new AsyncReply(resource);

        resource = neededResources[id];

        var requestInfo = resourceRequests[id];

        if (requestInfo != null)
        {
            if (resource != null && (requestSequence?.Contains(id) ?? false))
            {
                // dead lock avoidance for loop reference.
                return new AsyncReply(resource);
            }
            else if (resource != null && requestInfo.RequestSequence.Contains(id))
            {
                // dead lock avoidance for dependent reference.
                return new AsyncReply(resource);
            }
            else
            {
                // An attach request for this id is already pending: share its reply.
                return requestInfo.Reply;
            }
        }
        else if (resource != null && !resource.DistributedResourceSuspended)
        {
            // @REVIEW: this should never happen
            Global.Log("DCON", LogType.Error, "Resource not moved to attached.");
            return new AsyncReply(resource);
        }

        // Extend the chain of ids being resolved with this one.
        var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id };

        var reply = new AsyncReply();
        resourceRequests.Add(id, new DistributedResourceAttachRequestInfo(reply, newSequence));

        // NOTE(review): none of the failure paths below removes the entry from
        // resourceRequests, so a failed attach keeps answering later Fetch calls
        // with the same failed reply - confirm whether that is intended.
        SendRequest(IIPPacketRequest.AttachResource, id)
            .Then((result) =>
            {
                if (result == null)
                {
                    reply.TriggerError(new AsyncException(ErrorType.Management,
                        (ushort)ExceptionCode.ResourceNotFound, "Null response"));
                    return;
                }

                // ClassId, Age, Link, Hops, PropertyValue[]
                // NOTE(review): age, link, hops and pv are not used below (args[1] and
                // args[2] are re-read directly).
                var args = (object[])result;
                var classId = (UUID)args[0];
                var age = (ulong)args[1];
                var link = (string)args[2];
                var hops = (byte)args[3];
                var pv = (PropertyValue[])args[4];

                DistributedResource dr;
                TypeTemplate template = null;

                if (resource == null)
                {
                    // No local instance yet: create a typed wrapper when the warehouse
                    // knows one for this class, otherwise a plain DistributedResource.
                    template = Instance.Warehouse.GetTemplateByClassId(classId, TemplateType.Resource);
                    if (template?.DefinedType != null && template.IsWrapper)
                        dr = Activator.CreateInstance(template.DefinedType, this, id, (ulong)args[1], (string)args[2]) as DistributedResource;
                    else
                        dr = new DistributedResource(this, id, (ulong)args[1], (string)args[2]);
                }
                else
                {
                    dr = resource;
                    template = resource.Instance.Template;
                }

                // Parses the property values, attaches them to dr and moves the id from
                // the needed/requested tables to the attached table.
                // NOTE(review): "content" and "transmissionType" are not declared anywhere
                // in the visible part of this method - confirm where they come from.
                var initResource = (DistributedResource ok) =>
                {
                    var (_, parsed) = Codec.ParseAsync(content, 0, this, newSequence, transmissionType);
                    if (parsed is AsyncReply parsedReply)
                    {
                        parsedReply.Then(results =>
                        {
                            // The parsed array is consumed in groups of three entries,
                            // passed to PropertyValue as (ar[i + 2], ar[i], ar[i + 1]).
                            var ar = results as object[];

                            var pvs = new List();

                            for (var i = 0; i < ar.Length; i += 3)
                                pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1]));

                            dr._Attach(pvs.ToArray());
                            resourceRequests.Remove(id);
                            // move from needed to attached.
                            neededResources.Remove(id);
                            attachedResources[id] = new WeakReference(dr);
                            reply.Trigger(dr);
                        }).Error(ex => reply.TriggerError(ex));
                    }
                    else
                    {
                        // Same steps for a value that was parsed synchronously.
                        var ar = parsed as object[];

                        var pvs = new List();

                        for (var i = 0; i < ar.Length; i += 3)
                            pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1]));

                        dr._Attach(pvs.ToArray());
                        resourceRequests.Remove(id);
                        // move from needed to attached.
                        neededResources.Remove(id);
                        attachedResources[id] = new WeakReference(dr);
                        reply.Trigger(dr);
                    }
                };

                if (template == null)
                {
                    // Template unknown locally: fetch it first, then register and initialize.
                    GetTemplate(classId).Then((tmp) =>
                    {
                        // ClassId, ResourceAge, ResourceLink, Content
                        if (resource == null)
                        {
                            Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), dr, tmp)
                                .Then(initResource)
                                .Error(ex => reply.TriggerError(ex));
                        }
                        else
                        {
                            initResource(resource);
                        }
                    }).Error((ex) =>
                    {
                        reply.TriggerError(ex);
                    });
                }
                else
                {
                    if (resource == null)
                    {
                        Instance.Warehouse.Put(this.Instance.Link + "/" + id.ToString(), dr, template)
                            .Then(initResource).Error((ex) => reply.TriggerError(ex));
                    }
                    else
                    {
                        initResource(resource);
                    }
                }
            }).Error((ex) =>
            {
                reply.TriggerError(ex);
            });

        return reply;
    }

    /// <summary>
    /// Query resources at specific link.
    /// </summary>
    /// <param name="path">Link path.</param>
    /// <returns>Reply triggered with the resources returned by the other end.</returns>
    public AsyncReply Query(string path)
    {
        var reply = new AsyncReply();

        // The path is sent as-is; the byte conversion previously computed here
        // was never used.
        SendRequest(IIPPacketRequest.Query, path)
            .Then(result =>
            {
                reply.Trigger((IResource[])result);
            }).Error(ex => reply.TriggerError(ex));

        return reply;
    }

    /// <summary>
    /// Create a new resource.
    /// </summary>
    /// <param name="path">Resource path.</param>
    /// <param name="type">Type template.</param>
    /// <param name="properties">Values for the resource properties.</param>
    /// <param name="attributes">Resource attributes.</param>
/// <returns>New resource instance</returns>
    public AsyncReply Create(string path, TypeTemplate type, Map properties, Map attributes)
    {
        var reply = new AsyncReply();

        // Properties are cast through the type template before being sent.
        SendRequest(IIPPacketRequest.CreateResource, path, type.ClassId, type.CastProperties(properties), attributes)
            .Then(r => reply.Trigger((DistributedResource)r))
            .Error(e => reply.TriggerError(e))
            .Warning((l, m) => reply.TriggerWarning(l, m));

        return reply;
    }

    /// <summary>
    /// Hook this connection to a resource's instance events and register an empty
    /// subscription list for it.
    /// </summary>
    /// <param name="resource">Resource to observe.</param>
    private void Subscribe(IResource resource)
    {
        lock (subscriptionsLock)
        {
            resource.Instance.EventOccurred += Instance_EventOccurred;
            resource.Instance.CustomEventOccurred += Instance_CustomEventOccurred;
            resource.Instance.PropertyModified += Instance_PropertyModified;
            resource.Instance.Destroyed += Instance_ResourceDestroyed;

            subscriptions.Add(resource, new List());
        }
    }

    /// <summary>
    /// Detach this connection from a resource's instance events and forget its
    /// subscription list.
    /// </summary>
    /// <param name="resource">Resource to stop observing.</param>
    private void Unsubscribe(IResource resource)
    {
        lock (subscriptionsLock)
        {
            // do something with the list...
            resource.Instance.EventOccurred -= Instance_EventOccurred;
            resource.Instance.CustomEventOccurred -= Instance_CustomEventOccurred;
            resource.Instance.PropertyModified -= Instance_PropertyModified;
            resource.Instance.Destroyed -= Instance_ResourceDestroyed;

            subscriptions.Remove(resource);
        }
    }

    /// <summary>
    /// Detach this connection from every resource it is subscribed to and clear
    /// the subscription table.
    /// </summary>
    private void UnsubscribeAll()
    {
        lock (subscriptionsLock)
        {
            foreach (var resource in subscriptions.Keys)
            {
                resource.Instance.EventOccurred -= Instance_EventOccurred;
                resource.Instance.CustomEventOccurred -= Instance_CustomEventOccurred;
                resource.Instance.PropertyModified -= Instance_PropertyModified;
                resource.Instance.Destroyed -= Instance_ResourceDestroyed;
            }

            subscriptions.Clear();
        }
    }

    /// <summary>
    /// Drop the subscription of a destroyed resource and notify the peer.
    /// </summary>
    /// <param name="resource">Resource that was destroyed.</param>
    private void Instance_ResourceDestroyed(IResource resource)
    {
        Unsubscribe(resource);

        // compose the packet
        SendNotification(IIPPacketNotification.ResourceDestroyed, resource.Instance.Id);
    }

    /// <summary>
    /// Notify the peer that a property of a subscribed resource changed.
    /// </summary>
    /// <param name="info">Resource, property template and new value of the modification.</param>
    private void Instance_PropertyModified(PropertyModificationInfo info)
    {
        SendNotification(IIPPacketNotification.PropertyModified,
                         info.Resource.Instance.Id,
                         info.PropertyTemplate.Index,
                         info.Value);
    }

    /// <summary>
    /// Forward a custom event to the peer when this session is one of its receivers,
    /// is subscribed (for subscribable events) and is not denied by permissions.
    /// </summary>
    /// <param name="info">Event description, including its receivers filter and issuer.</param>
    private void Instance_CustomEventOccurred(CustomEventOccurredInfo info)
    {
        if (info.EventTemplate.Subscribable)
        {
            lock (subscriptionsLock)
            {
                // check the client requested listen
                if (!subscriptions.ContainsKey(info.Resource))
                    return;

                if (!subscriptions[info.Resource].Contains(info.EventTemplate.Index))
                    return;
            }
        }

        if (!info.Receivers(this.session))
            return;

        if (info.Resource.Instance.Applicable(this.session, ActionType.ReceiveEvent, info.EventTemplate, info.Issuer) == Ruling.Denied)
            return;

        // compose the packet
        SendNotification(IIPPacketNotification.EventOccurred,
                         info.Resource.Instance.Id,
                         info.EventTemplate.Index,
                         info.Value);
    }

    /// <summary>
    /// Forward an event to the peer when it is subscribed (for subscribable events)
    /// and not denied by permissions.
    /// </summary>
    /// <param name="info">Event description.</param>
    private void Instance_EventOccurred(EventOccurredInfo info)
    {
        if (info.EventTemplate.Subscribable)
        {
            lock (subscriptionsLock)
            {
                // check the client requested listen
                if (!subscriptions.ContainsKey(info.Resource))
                    return;

                if (!subscriptions[info.Resource].Contains(info.EventTemplate.Index))
                    return;
            }
        }

        if (info.Resource.Instance.Applicable(this.session, ActionType.ReceiveEvent, info.EventTemplate, null) == Ruling.Denied)
            return;

        // compose the packet
        SendNotification(IIPPacketNotification.EventOccurred,
                         info.Resource.Instance.Id,
                         info.EventTemplate.Index,
                         info.Value);
    }

    /// <summary>
    /// Handle a KeepAlive request: compute the jitter between the announced interval
    /// and the time since the previous keep-alive, and reply with the local time and
    /// that jitter.
    /// </summary>
    /// <param name="callback">Callback id of the request; echoed in the reply.</param>
    /// <param name="dataType">Transmission type giving the offset and content length of the argument list inside <paramref name="data"/>.</param>
    /// <param name="data">Raw packet data holding the peer time and the keep-alive interval.</param>
    void IIPRequestKeepAlive(uint callback, TransmissionType dataType, byte[] data)
    {
        var (_, _, args) = DataDeserializer.LimitedCountListParser(data, dataType.Offset,
                                dataType.ContentLength);

        // args[0] is the peer's timestamp (currently unused, the cast only checks its
        // type); args[1] is the interval. Reading args[0] for both, as before, could
        // never work: one value cannot be both a DateTime and a uint.
        var peerTime = (DateTime)args[0];
        var interval = (uint)args[1];

        uint jitter = 0;

        var now = DateTime.UtcNow;

        if (lastKeepAliveReceived != null)
        {
            // interval is compared against elapsed milliseconds
            var diff = (uint)(now - (DateTime)lastKeepAliveReceived).TotalMilliseconds;

            jitter = (uint)Math.Abs((int)diff - (int)interval);
        }

        // Answer the pending request rather than opening a new KeepAlive request:
        // a new request would leave the peer's callback unanswered and would have
        // this jitter value read by the peer as an interval.
        SendReply(IIPPacketReply.Completed, callback, now, jitter);

        lastKeepAliveReceived = now;
    }
}