2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-05-06 03:32:57 +00:00

AutoReconnect

This commit is contained in:
Esiur Project 2022-08-11 21:28:16 +03:00
parent af94ce318a
commit 21a2061fc4
12 changed files with 355 additions and 154 deletions

View File

@ -38,7 +38,7 @@ namespace Esiur.Core;
[AsyncMethodBuilder(typeof(AsyncReplyBuilder))] [AsyncMethodBuilder(typeof(AsyncReplyBuilder))]
public class AsyncReply public class AsyncReply
{ {
public bool Debug = false; //public bool Debug = false;
protected List<Action<object>> callbacks = new List<Action<object>>(); protected List<Action<object>> callbacks = new List<Action<object>>();
protected object result; protected object result;
@ -75,14 +75,14 @@ public class AsyncReply
if (resultReady) if (resultReady)
return result; return result;
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Wait"); // Console.WriteLine($"AsyncReply: {Id} Wait");
//mutex = new AutoResetEvent(false); //mutex = new AutoResetEvent(false);
mutex.WaitOne(); mutex.WaitOne();
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Wait ended"); // Console.WriteLine($"AsyncReply: {Id} Wait ended");
if (exception != null) if (exception != null)
throw exception; throw exception;
@ -90,12 +90,19 @@ public class AsyncReply
return result; return result;
} }
public AsyncReply Timeout(int milliseconds, Action callback)
public AsyncReply Timeout(int milliseconds, Action callback = null)
{ {
Task.Delay(milliseconds).ContinueWith(x => Task.Delay(milliseconds).ContinueWith(x =>
{ {
if (!resultReady && exception == null) if (!resultReady && exception == null)
callback(); {
TriggerError(new AsyncException(ErrorType.Management,
(ushort)ExceptionCode.Timeout, "Execution timeout expired."));
callback?.Invoke();
}
}); });
return this; return this;
@ -106,8 +113,8 @@ public class AsyncReply
if (resultReady) if (resultReady)
return result; return result;
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Wait"); // Console.WriteLine($"AsyncReply: {Id} Wait");
if (!mutex.WaitOne(millisecondsTimeout)) if (!mutex.WaitOne(millisecondsTimeout))
{ {
@ -116,8 +123,8 @@ public class AsyncReply
throw e; throw e;
} }
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Wait ended"); // Console.WriteLine($"AsyncReply: {Id} Wait ended");
return result; return result;
} }
@ -138,8 +145,8 @@ public class AsyncReply
if (resultReady) if (resultReady)
{ {
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Then ready"); // Console.WriteLine($"AsyncReply: {Id} Then ready");
callback(result); callback(result);
return this; return this;
@ -159,8 +166,8 @@ public class AsyncReply
//}, null, 15000, 0); //}, null, 15000, 0);
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Then pending"); // Console.WriteLine($"AsyncReply: {Id} Then pending");
@ -210,8 +217,11 @@ public class AsyncReply
{ {
//timeout?.Dispose(); //timeout?.Dispose();
if (Debug) if (exception != null)
Console.WriteLine($"AsyncReply: {Id} Trigger"); return this;
//if (Debug)
// Console.WriteLine($"AsyncReply: {Id} Trigger");
if (resultReady) if (resultReady)
return this; return this;
@ -227,8 +237,8 @@ public class AsyncReply
cb(result); cb(result);
if (Debug) //if (Debug)
Console.WriteLine($"AsyncReply: {Id} Trigger ended"); // Console.WriteLine($"AsyncReply: {Id} Trigger ended");
} }

View File

@ -41,5 +41,6 @@ public enum ExceptionCode : ushort
AlreadyListened, AlreadyListened,
AlreadyUnlistened, AlreadyUnlistened,
NotListenable, NotListenable,
ParseError ParseError,
Timeout
} }

View File

@ -132,14 +132,12 @@ public static class DataDeserializer
{ {
fixed (byte* ptr = &data[offset]) fixed (byte* ptr = &data[offset])
return connection.Fetch(*(uint*)ptr, requestSequence); return connection.Fetch(*(uint*)ptr, requestSequence);
} }
public static unsafe AsyncReply LocalResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) public static unsafe AsyncReply LocalResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{ {
fixed (byte* ptr = &data[offset]) fixed (byte* ptr = &data[offset])
return Warehouse.GetById(*(uint*)ptr); return Warehouse.GetById(*(uint*)ptr);
} }

View File

@ -41,6 +41,7 @@ using System.Diagnostics;
using static Esiur.Net.Packets.IIPPacket; using static Esiur.Net.Packets.IIPPacket;
using Esiur.Net.HTTP; using Esiur.Net.HTTP;
using System.Timers; using System.Timers;
using System.Threading.Tasks;
namespace Esiur.Net.IIP; namespace Esiur.Net.IIP;
public partial class DistributedConnection : NetworkConnection, IStore public partial class DistributedConnection : NetworkConnection, IStore
@ -374,7 +375,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
Jitter = (uint)x[1]; Jitter = (uint)x[1];
keepAliveTimer.Start(); keepAliveTimer.Start();
//Console.WriteLine($"Keep Alive Received {Jitter}"); //Console.WriteLine($"Keep Alive Received {Jitter}");
}).Error(ex => }).Error(ex =>
{ {
keepAliveTimer.Stop(); keepAliveTimer.Stop();
@ -1001,7 +1002,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
var r = new Random(); var r = new Random();
session.Id = new byte[32]; session.Id = new byte[32];
r.NextBytes(session.Id); r.NextBytes(session.Id);
//SendParams((byte)0x28, session.Id);
SendParams().AddUInt8(0x28) SendParams().AddUInt8(0x28)
.AddUInt8Array(session.Id) .AddUInt8Array(session.Id)
.Done(); .Done();
@ -1012,6 +1013,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
{ {
ready = true; ready = true;
openReply?.Trigger(true); openReply?.Trigger(true);
openReply = null;
OnReady?.Invoke(this); OnReady?.Invoke(this);
Server?.Membership?.Login(session); Server?.Membership?.Login(session);
@ -1020,12 +1022,16 @@ public partial class DistributedConnection : NetworkConnection, IStore
}).Error(x => }).Error(x =>
{ {
openReply?.TriggerError(x); openReply?.TriggerError(x);
openReply = null;
}); });
} }
else else
{ {
ready = true; ready = true;
openReply?.Trigger(true); openReply?.Trigger(true);
openReply = null;
OnReady?.Invoke(this); OnReady?.Invoke(this);
Server?.Membership?.Login(session); Server?.Membership?.Login(session);
} }
@ -1125,12 +1131,20 @@ public partial class DistributedConnection : NetworkConnection, IStore
{ {
openReply?.Trigger(true); openReply?.Trigger(true);
OnReady?.Invoke(this); OnReady?.Invoke(this);
openReply = null;
}).Error(x => openReply?.TriggerError(x));
}).Error(x =>
{
openReply?.TriggerError(x);
openReply = null;
});
} }
else else
{ {
openReply?.Trigger(true); openReply?.Trigger(true);
openReply = null;
OnReady?.Invoke(this); OnReady?.Invoke(this);
} }
@ -1140,7 +1154,9 @@ public partial class DistributedConnection : NetworkConnection, IStore
} }
else if (authPacket.Command == IIPAuthPacket.IIPAuthPacketCommand.Error) else if (authPacket.Command == IIPAuthPacket.IIPAuthPacketCommand.Error)
{ {
invalidCredentials = true;
openReply?.TriggerError(new AsyncException(ErrorType.Management, authPacket.ErrorCode, authPacket.ErrorMessage)); openReply?.TriggerError(new AsyncException(ErrorType.Management, authPacket.ErrorCode, authPacket.ErrorMessage));
openReply = null;
OnError?.Invoke(this, authPacket.ErrorCode, authPacket.ErrorMessage); OnError?.Invoke(this, authPacket.ErrorCode, authPacket.ErrorMessage);
Close(); Close();
} }
@ -1187,6 +1203,15 @@ public partial class DistributedConnection : NetworkConnection, IStore
} }
} }
[Attribute]
public ExceptionLevel ExceptionLevel { get; set; }
= ExceptionLevel.Code | ExceptionLevel.Message | ExceptionLevel.Source | ExceptionLevel.Trace;
bool invalidCredentials = false;
[Attribute]
public bool AutoReconnect { get; set; } = false;
[Attribute] [Attribute]
public string Username { get; set; } public string Username { get; set; }
@ -1261,6 +1286,8 @@ public partial class DistributedConnection : NetworkConnection, IStore
session.LocalAuthentication.Domain = domain; session.LocalAuthentication.Domain = domain;
session.LocalAuthentication.Username = username; session.LocalAuthentication.Username = username;
localPasswordOrToken = passwordOrToken; localPasswordOrToken = passwordOrToken;
invalidCredentials = false;
//localPassword = password; //localPassword = password;
} }
@ -1275,43 +1302,98 @@ public partial class DistributedConnection : NetworkConnection, IStore
if (hostname != null) if (hostname != null)
this._hostname = hostname; this._hostname = hostname;
connectSocket(socket);
return openReply;
}
void connectSocket(ISocket socket)
{
socket.Connect(this._hostname, this._port).Then(x => socket.Connect(this._hostname, this._port).Then(x =>
{ {
Assign(socket); Assign(socket);
}).Error((x) => }).Error((x) =>
{ {
openReply.TriggerError(x); if (AutoReconnect)
openReply = null; {
Console.Write("Reconnecting socket...");
Task.Delay(5000).ContinueWith((x) => connectSocket(socket));
}
else
{
openReply.TriggerError(x);
openReply = null;
}
}); });
return openReply;
} }
public async AsyncReply<bool> Reconnect() public async AsyncReply<bool> Reconnect()
{ {
try try
{ {
if (await Connect()) if (!await Connect())
return false;
try
{ {
try
{
var bag = new AsyncBag<IResource>();
for (var i = 0; i < resources.Keys.Count; i++) var toBeRestored = new List<DistributedResource>();
foreach (KeyValuePair<uint, WeakReference<DistributedResource>> kv in suspendedResources)
{
DistributedResource r;
if (kv.Value.TryGetTarget(out r))
toBeRestored.Add(r);
}
foreach (var r in toBeRestored)
{
var link = DC.ToBytes(r.Link);
Console.WriteLine("Restoreing " + r.Link);
try
{ {
var index = resources.Keys.ElementAt(i); var ar = await SendRequest(IIPPacket.IIPPacketAction.QueryLink)
bag.Add(Fetch(index, null)); .AddUInt16((ushort)link.Length)
} .AddUInt8Array(link)
.Done();
bag.Seal(); var dataType = (TransmissionType)ar[0];
await bag; var data = ar[1] as byte[];
}
catch (Exception ex) if (dataType.Identifier == TransmissionTypeIdentifier.ResourceList)
{ {
Global.Log(ex); // parse them as int
//print(ex.toString()); var id = data.GetUInt32(8, Endian.Little);
if (id != r.Id)
r.Id = id;
neededResources[id] = r;
suspendedResources.Remove(id);
await Fetch(id, null);
}
}
catch (AsyncException ex)
{
if (ex.Code == ExceptionCode.ResourceNotFound)
{
// skip this resource
}
else
{
break;
}
}
} }
} }
catch (Exception ex)
{
Global.Log(ex);
}
} }
catch catch
{ {
@ -1331,7 +1413,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
public AsyncReply<bool> Put(IResource resource) public AsyncReply<bool> Put(IResource resource)
{ {
if (Codec.IsLocalResource(resource, this)) if (Codec.IsLocalResource(resource, this))
resources.Add((resource as DistributedResource).Id, (DistributedResource)resource); neededResources.Add((resource as DistributedResource).Id, (DistributedResource)resource);
// else ... send it to the peer // else ... send it to the peer
return new AsyncReply<bool>(true); return new AsyncReply<bool>(true);
} }
@ -1394,6 +1476,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
{ {
if (session.LocalAuthentication.Type == AuthenticationType.Client) if (session.LocalAuthentication.Type == AuthenticationType.Client)
Declare(); Declare();
} }
protected override void Disconencted() protected override void Disconencted()
@ -1403,6 +1486,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
keepAliveTimer.Stop(); keepAliveTimer.Stop();
// @TODO: lock requests
foreach (var x in requests.Values) foreach (var x in requests.Values)
{ {
@ -1445,17 +1529,39 @@ public partial class DistributedConnection : NetworkConnection, IStore
templateRequests.Clear(); templateRequests.Clear();
foreach (var x in attachedResources.Values)
{
DistributedResource r;
if (x.TryGetTarget(out r))
{
r.Suspend();
suspendedResources[r.Id] = x;
}
}
if (Server != null)
{
suspendedResources.Clear();
if (Server != null) {
foreach (var x in resources.Values)
x.Suspend();
UnsubscribeAll(); UnsubscribeAll();
Warehouse.Remove(this); Warehouse.Remove(this);
if (ready) if (ready)
Server.Membership?.Logout(session); Server.Membership?.Logout(session);
};
}
else if (AutoReconnect && !invalidCredentials)
{
// reconnect
Task.Delay(5000).ContinueWith((x) => Reconnect());
}
else
{
suspendedResources.Clear();
}
attachedResources.Clear();
ready = false; ready = false;
} }

View File

@ -37,30 +37,31 @@ using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using Esiur.Misc;
namespace Esiur.Net.IIP; namespace Esiur.Net.IIP;
partial class DistributedConnection partial class DistributedConnection
{ {
KeyList<uint, DistributedResource> resources = new KeyList<uint, DistributedResource>(); KeyList<uint, DistributedResource> neededResources = new KeyList<uint, DistributedResource>();
KeyList<uint, WeakReference<DistributedResource>> attachedResources = new KeyList<uint, WeakReference<DistributedResource>>();
KeyList<uint, WeakReference<DistributedResource>> suspendedResources = new KeyList<uint, WeakReference<DistributedResource>>();
KeyList<uint, AsyncReply<DistributedResource>> resourceRequests = new KeyList<uint, AsyncReply<DistributedResource>>(); KeyList<uint, AsyncReply<DistributedResource>> resourceRequests = new KeyList<uint, AsyncReply<DistributedResource>>();
KeyList<Guid, AsyncReply<TypeTemplate>> templateRequests = new KeyList<Guid, AsyncReply<TypeTemplate>>(); KeyList<Guid, AsyncReply<TypeTemplate>> templateRequests = new KeyList<Guid, AsyncReply<TypeTemplate>>();
KeyList<string, AsyncReply<TypeTemplate>> templateByNameRequests = new KeyList<string, AsyncReply<TypeTemplate>>(); KeyList<string, AsyncReply<TypeTemplate>> templateByNameRequests = new KeyList<string, AsyncReply<TypeTemplate>>();
KeyList<string, AsyncReply<IResource>> pathRequests = new KeyList<string, AsyncReply<IResource>>();
Dictionary<Guid, TypeTemplate> templates = new Dictionary<Guid, TypeTemplate>(); Dictionary<Guid, TypeTemplate> templates = new Dictionary<Guid, TypeTemplate>();
KeyList<uint, AsyncReply> requests = new KeyList<uint, AsyncReply>(); KeyList<uint, AsyncReply> requests = new KeyList<uint, AsyncReply>();
volatile uint callbackCounter = 0; volatile uint callbackCounter = 0;
//List<IResource> subscriptions = new List<IResource>(); Dictionary<IResource, List<byte>> subscriptions = new Dictionary<IResource, List<byte>>();
Dictionary<IResource, List<byte>> subscriptions = new Dictionary<IResource, List<byte>>();// new List<IResource>();
// resources might get attched by the client
internal KeyList<IResource, DateTime> cache = new(); internal KeyList<IResource, DateTime> cache = new();
object subscriptionsLock = new object(); object subscriptionsLock = new object();
@ -217,6 +218,24 @@ partial class DistributedConnection
} }
} }
public async void DetachResource(uint instanceId)
{
try
{
if (attachedResources.ContainsKey(instanceId))
attachedResources.Remove(instanceId);
if (suspendedResources.ContainsKey(instanceId))
suspendedResources.Remove(instanceId);
await SendDetachRequest(instanceId);
}
catch
{
}
}
void SendError(ErrorType type, uint callbackId, ushort errorCode, string errorMessage = "") void SendError(ErrorType type, uint callbackId, ushort errorCode, string errorMessage = "")
{ {
var msg = DC.ToBytes(errorMessage); var msg = DC.ToBytes(errorMessage);
@ -306,11 +325,19 @@ partial class DistributedConnection
void IIPEventResourceDestroyed(uint resourceId) void IIPEventResourceDestroyed(uint resourceId)
{ {
if (resources.Contains(resourceId)) if (attachedResources.Contains(resourceId))
{ {
var r = resources[resourceId]; DistributedResource r;
resources.Remove(resourceId);
r.Destroy(); if (attachedResources[resourceId].TryGetTarget(out r))
r.Destroy();
attachedResources.Remove(resourceId);
}
else if (neededResources.Contains(resourceId))
{
// @TODO: handle this mess
neededResources.Remove(resourceId);
} }
} }
@ -1229,9 +1256,6 @@ partial class DistributedConnection
} }
[Attribute]
public ExceptionLevel ExceptionLevel { get; set; }
= ExceptionLevel.Code | ExceptionLevel.Message | ExceptionLevel.Source | ExceptionLevel.Trace;
private Tuple<ushort, string> SummerizeException(Exception ex) private Tuple<ushort, string> SummerizeException(Exception ex)
{ {
@ -2115,18 +2139,18 @@ partial class DistributedConnection
*/ */
} }
/// <summary> ///// <summary>
/// Retrive a resource by its instance Id. ///// Retrive a resource by its instance Id.
/// </summary> ///// </summary>
/// <param name="iid">Instance Id</param> ///// <param name="iid">Instance Id</param>
/// <returns>Resource</returns> ///// <returns>Resource</returns>
public AsyncReply<IResource> Retrieve(uint iid) //public AsyncReply<IResource> Retrieve(uint iid)
{ //{
foreach (var r in resources.Values) // foreach (var r in resources.Values)
if (r.Instance.Id == iid) // if (r.Instance.Id == iid)
return new AsyncReply<IResource>(r); // return new AsyncReply<IResource>(r);
return new AsyncReply<IResource>(null); // return new AsyncReply<IResource>(null);
} //}
public AsyncReply<TypeTemplate[]> GetLinkTemplates(string link) public AsyncReply<TypeTemplate[]> GetLinkTemplates(string link)
@ -2173,7 +2197,15 @@ partial class DistributedConnection
/// <returns>DistributedResource</returns> /// <returns>DistributedResource</returns>
public AsyncReply<DistributedResource> Fetch(uint id, uint[] requestSequence) public AsyncReply<DistributedResource> Fetch(uint id, uint[] requestSequence)
{ {
var resource = resources[id]; DistributedResource resource = null;
attachedResources[id]?.TryGetTarget(out resource);
if (resource != null)
return new AsyncReply<DistributedResource>(resource);
resource = neededResources[id];
var request = resourceRequests[id]; var request = resourceRequests[id];
if (request != null) if (request != null)
@ -2185,7 +2217,10 @@ partial class DistributedConnection
} }
else if (resource != null && !resource.Suspended) else if (resource != null && !resource.Suspended)
{ {
// @REVIEW: this should never happen
Global.Log("DCON", LogType.Error, "Resource not moved to attached.");
return new AsyncReply<DistributedResource>(resource); return new AsyncReply<DistributedResource>(resource);
} }
@ -2220,7 +2255,10 @@ partial class DistributedConnection
dr = new DistributedResource(this, id, (ulong)rt[1], (string)rt[2]); dr = new DistributedResource(this, id, (ulong)rt[1], (string)rt[2]);
} }
else else
{
dr = resource; dr = resource;
template = resource.Instance.Template;
}
var transmissionType = (TransmissionType)rt[3]; var transmissionType = (TransmissionType)rt[3];
var content = (byte[])rt[4]; var content = (byte[])rt[4];
@ -2239,6 +2277,9 @@ partial class DistributedConnection
dr._Attach(pvs.ToArray());// (PropertyValue[])pvs); dr._Attach(pvs.ToArray());// (PropertyValue[])pvs);
resourceRequests.Remove(id); resourceRequests.Remove(id);
// move from needed to attached.
neededResources.Remove(id);
attachedResources[id] = new WeakReference<DistributedResource>(dr);
reply.Trigger(dr); reply.Trigger(dr);
}).Error(ex => reply.TriggerError(ex)); }).Error(ex => reply.TriggerError(ex));

View File

@ -108,6 +108,7 @@ public class DistributedResource : DynamicObject, IResource
public uint Id public uint Id
{ {
get { return instanceId; } get { return instanceId; }
internal set { instanceId = value; }
} }
/// <summary> /// <summary>
@ -117,7 +118,7 @@ public class DistributedResource : DynamicObject, IResource
{ {
destroyed = true; destroyed = true;
attached = false; attached = false;
connection.SendDetachRequest(instanceId); connection.DetachResource(instanceId);
OnDestroy?.Invoke(this); OnDestroy?.Invoke(this);
} }
@ -223,13 +224,13 @@ public class DistributedResource : DynamicObject, IResource
public AsyncReply<object> _Invoke(byte index, Map<byte, object> args) public AsyncReply<object> _Invoke(byte index, Map<byte, object> args)
{ {
if (destroyed) if (destroyed)
throw new Exception("Trying to access a destroyed object"); throw new Exception("Trying to access a destroyed object.");
if (suspended) if (suspended)
throw new Exception("Trying to access suspended object"); throw new Exception("Trying to access a suspended object.");
if (index >= Instance.Template.Functions.Length) if (index >= Instance.Template.Functions.Length)
throw new Exception("Function index is incorrect"); throw new Exception("Function index is incorrect.");
var ft = Instance.Template.GetFunctionTemplateByIndex(index); var ft = Instance.Template.GetFunctionTemplateByIndex(index);
@ -282,6 +283,12 @@ public class DistributedResource : DynamicObject, IResource
public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result) public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result)
{ {
if (destroyed)
throw new Exception("Trying to access a destroyed object.");
if (suspended)
throw new Exception("Trying to access a suspended object.");
var ft = Instance.Template.GetFunctionTemplateByName(binder.Name); var ft = Instance.Template.GetFunctionTemplateByName(binder.Name);
var reply = new AsyncReply<object>(); var reply = new AsyncReply<object>();
@ -361,7 +368,7 @@ public class DistributedResource : DynamicObject, IResource
public override bool TryGetMember(GetMemberBinder binder, out object result) public override bool TryGetMember(GetMemberBinder binder, out object result)
{ {
if (destroyed) if (destroyed)
throw new Exception("Trying to access destroyed object"); throw new Exception("Trying to access a destroyed object.");
result = null; result = null;
@ -430,10 +437,10 @@ public class DistributedResource : DynamicObject, IResource
public override bool TrySetMember(SetMemberBinder binder, object value) public override bool TrySetMember(SetMemberBinder binder, object value)
{ {
if (destroyed) if (destroyed)
throw new Exception("Trying to access destroyed object"); throw new Exception("Trying to access a destroyed object.");
if (suspended) if (suspended)
throw new Exception("Trying to access suspended object"); throw new Exception("Trying to access a suspended object.");
if (!attached) if (!attached)
return false; return false;
@ -519,4 +526,9 @@ public class DistributedResource : DynamicObject, IResource
// do nothing. // do nothing.
return new AsyncReply<bool>(true); return new AsyncReply<bool>(true);
} }
~DistributedResource()
{
Destroy();
}
} }

View File

@ -34,6 +34,7 @@ using Esiur.Core;
using System.Net; using System.Net;
using Esiur.Resource; using Esiur.Resource;
using Esiur.Security.Membership; using Esiur.Security.Membership;
using System.Threading.Tasks;
namespace Esiur.Net.IIP; namespace Esiur.Net.IIP;
public class DistributedServer : NetworkServer<DistributedConnection>, IResource public class DistributedServer : NetworkServer<DistributedConnection>, IResource
@ -148,11 +149,20 @@ public class DistributedServer : NetworkServer<DistributedConnection>, IResource
// base.AddConnection(connection); // base.AddConnection(connection);
//} //}
bool one = false;
protected override void ClientConnected(DistributedConnection connection) protected override void ClientConnected(DistributedConnection connection)
{ {
// if (!one)
//connection.OnReady += ConnectionReadyEventReceiver; //connection.OnReady += ConnectionReadyEventReceiver;
Task.Delay(10000).ContinueWith((x) =>
{
Console.WriteLine("By bye");
// Remove me from here
connection.Close();
one = true;
});
} }
public override void Add(DistributedConnection connection) public override void Add(DistributedConnection connection)
@ -178,9 +188,10 @@ public class DistributedServer : NetworkServer<DistributedConnection>, IResource
public KeyList<string, Delegate> Calls { get; } = new KeyList<string, Delegate>(); public KeyList<string, Delegate> Calls { get; } = new KeyList<string, Delegate>();
public void MapCall(string call, Delegate handler) public DistributedServer MapCall(string call, Delegate handler)
{ {
Calls.Add(call, handler); Calls.Add(call, handler);
return this;
} }
} }

View File

@ -158,8 +158,8 @@ public class TCPSocket : ISocket
{ {
state = SocketState.Established; state = SocketState.Established;
//OnConnect?.Invoke(); //OnConnect?.Invoke();
Receiver?.NetworkConnect(this); Receiver?.NetworkConnect(this);
Begin(); Begin();
rt.Trigger(true); rt.Trigger(true);
} }
@ -396,32 +396,32 @@ public class TCPSocket : ISocket
public void Close() public void Close()
{ {
if (state != SocketState.Closed)// && state != SocketState.Terminated) if (state == SocketState.Closed)
return; // && state != SocketState.Terminated)
state = SocketState.Closed;
if (sock.Connected)
{ {
state = SocketState.Closed;
if (sock.Connected)
{
try
{
sock.Shutdown(SocketShutdown.Both);
}
catch
{
}
}
try try
{ {
sendBufferQueue?.Clear(); sock.Shutdown(SocketShutdown.Both);
Receiver?.NetworkClose(this);
} }
catch (Exception ex) catch
{ {
Global.Log(ex);
} }
} }
try
{
sendBufferQueue?.Clear();
Receiver?.NetworkClose(this);
}
catch (Exception ex)
{
Global.Log(ex);
}
} }
public void Send(byte[] message) public void Send(byte[] message)
@ -543,7 +543,7 @@ public class TCPSocket : ISocket
public void Destroy() public void Destroy()
{ {
Close(); Close();
receiveNetworkBuffer = null; receiveNetworkBuffer = null;
@ -560,7 +560,7 @@ public class TCPSocket : ISocket
OnDestroy?.Invoke(this); OnDestroy?.Invoke(this);
OnDestroy = null; OnDestroy = null;
} }
public ISocket Accept() public ISocket Accept()
{ {

View File

@ -36,8 +36,7 @@ using Esiur.Security.Authority;
namespace Esiur.Resource; namespace Esiur.Resource;
public interface IStore : IResource public interface IStore : IResource
{ {
AsyncReply<IResource> Get(string path);//, Func<IResource, bool> filter = null); AsyncReply<IResource> Get(string path);
//AsyncReply<IResource> Retrieve(uint iid);
AsyncReply<bool> Put(IResource resource); AsyncReply<bool> Put(IResource resource);
string Link(IResource resource); string Link(IResource resource);
bool Record(IResource resource, string propertyName, object value, ulong? age, DateTime? dateTime); bool Record(IResource resource, string propertyName, object value, ulong? age, DateTime? dateTime);

View File

@ -873,7 +873,7 @@ public static class Warehouse
resources.TryRemove(resource.Instance.Id, out resourceReference); resources.TryRemove(resource.Instance.Id, out resourceReference);
else else
return false; return false;
//}
if (resource != resource.Instance.Store) if (resource != resource.Instance.Store)
{ {
@ -884,30 +884,16 @@ public static class Warehouse
lock (((ICollection)list).SyncRoot) lock (((ICollection)list).SyncRoot)
list.Remove(resourceReference); list.Remove(resourceReference);
//list.TryTake(resourceReference); }
}//.Remove(resourceReference);
} }
if (resource is IStore) if (resource is IStore)
{ {
var store = resource as IStore; var store = resource as IStore;
List<WeakReference<IResource>> toBeRemoved;// = stores[store]; List<WeakReference<IResource>> toBeRemoved;
stores.TryRemove(store, out toBeRemoved); stores.TryRemove(store, out toBeRemoved);
//lock (resourcesLock)
//{
// // remove all objects associated with the store
// toBeRemoved = resources.Values.Where(x =>
// {
// IResource r;
// if (x.TryGetTarget(out r))
// return r.Instance.Store == resource;
// else
// return false;
// }).ToArray();
//}
foreach (var o in toBeRemoved) foreach (var o in toBeRemoved)
{ {

View File

@ -27,17 +27,30 @@ public class MemoryStore : IStore
public string Link(IResource resource) public string Link(IResource resource)
{ {
if (resource.Instance.Store == this) if (resource.Instance.Store == this)
return this.Instance.Name + "/" + resource.Instance.Id; return this.Instance.Name + "/$" + resource.Instance.Id;
return null; return null;
} }
public AsyncReply<IResource> Get(string path) public AsyncReply<IResource> Get(string path)
{ {
foreach (var r in resources)
if (r.Value.Instance.Name == path)
return new AsyncReply<IResource>(r.Value);
if (path.StartsWith("$"))
{
uint id;
if (uint.TryParse(path.Substring(1), out id))
{
foreach (var r in resources)
if (r.Value.Instance.Id == id)
return new AsyncReply<IResource>(r.Value);
}
}
else
{
foreach (var r in resources)
if (r.Value.Instance.Name == path)
return new AsyncReply<IResource>(r.Value);
}
return new AsyncReply<IResource>(null); return new AsyncReply<IResource>(null);
} }

View File

@ -30,7 +30,7 @@ using Esiur.Net.Sockets;
using Esiur.Resource; using Esiur.Resource;
using Esiur.Security.Permissions; using Esiur.Security.Permissions;
using Esiur.Stores; using Esiur.Stores;
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -50,7 +50,7 @@ namespace Test
class Program class Program
{ {
static async Task Main(string[] args) static async Task Main(string[] args)
{ {
@ -58,19 +58,20 @@ namespace Test
var system = await Warehouse.Put("sys", new MemoryStore()); var system = await Warehouse.Put("sys", new MemoryStore());
var server = await Warehouse.Put("sys/server", new DistributedServer()); var server = await Warehouse.Put("sys/server", new DistributedServer());
server.MapCall("Hello", (string msg, DateTime time, DistributedConnection sender) =>
{
Console.WriteLine(msg);
return "Hi " + DateTime.UtcNow;
});
var web = await Warehouse.Put("sys/web", new HTTPServer() { Port = 8088}); var web = await Warehouse.Put("sys/web", new HTTPServer() { Port = 8088 });
var service = await Warehouse.Put("sys/service", new MyService()); var service = await Warehouse.Put("sys/service", new MyService());
var res1 = await Warehouse.Put("sys/service/r1", new MyResource() { Description = "Testing 1", CategoryId = 10 }); var res1 = await Warehouse.Put("sys/service/r1", new MyResource() { Description = "Testing 1", CategoryId = 10 });
var res2 = await Warehouse.Put("sys/service/r2", new MyResource() { Description = "Testing 2", CategoryId = 11 }); var res2 = await Warehouse.Put("sys/service/r2", new MyResource() { Description = "Testing 2", CategoryId = 11 });
var res3 = await Warehouse.Put("sys/service/c1", new MyChildResource() { ChildName = "Child 1", Description = "Child Testing 3", CategoryId = 12 }); var res3 = await Warehouse.Put("sys/service/c1", new MyChildResource() { ChildName = "Child 1", Description = "Child Testing 3", CategoryId = 12 });
var res4 = await Warehouse.Put("sys/service/c2", new MyChildResource() { ChildName = "Child 2 Destroy", Description = "Testing Destroy Handler", CategoryId = 12 });
server.MapCall("Hello", (string msg, DateTime time, DistributedConnection sender) =>
{
Console.WriteLine(msg);
return "Hi " + DateTime.UtcNow;
}).MapCall("temp", () => res4);
service.Resource = res1; service.Resource = res1;
service.ChildResource = res3; service.ChildResource = res3;
@ -84,7 +85,8 @@ namespace Test
// sender.Send("Not found"); // sender.Send("Not found");
//}); //});
web.MapGet("/", (HTTPConnection sender) => { web.MapGet("/", (HTTPConnection sender) =>
{
sender.Send("Hello"); sender.Send("Hello");
}); });
@ -92,19 +94,26 @@ namespace Test
// Start testing // Start testing
TestClient(service); TestClient(service);
// TestClient(service); // TestClient(service);
} }
private static async void TestClient(IResource local) private static async void TestClient(IResource local)
{ {
dynamic remote = await Warehouse.Get<IResource>("iip://localhost/sys/service"); var con = await Warehouse.Get<DistributedConnection>("iip://localhost", new { AutoReconnect = true });
var con = remote.Connection as DistributedConnection; //dynamic remote = await Warehouse.Get<IResource>("iip://localhost/sys/service", new { AutoReconnect = true });
//var con = remote.Connection as DistributedConnection;
dynamic remote = await con.Get("sys/service");
var pcall = await con.Call("Hello", "whats up ?", DateTime.UtcNow); var pcall = await con.Call("Hello", "whats up ?", DateTime.UtcNow);
var temp = await con.Call("temp");
Console.WriteLine("Temp: " + temp.GetHashCode());
var template = await con.GetTemplateByClassName("Test.MyResource"); var template = await con.GetTemplateByClassName("Test.MyResource");
@ -141,30 +150,44 @@ namespace Test
await remote.InvokeEvents("Hello"); await remote.InvokeEvents("Hello");
//var childTemplate = remote.Child.Instance.Template;
var path = TemplateGenerator.GetTemplate("iip://localhost/sys/service", "Generated"); //var path = TemplateGenerator.GetTemplate("iip://localhost/sys/service", "Generated");
Console.WriteLine(path); //Console.WriteLine(path);
perodicTimer = new Timer(new TimerCallback(perodicTimerElapsed), remote, 0, 1000);
} }
static async void perodicTimerElapsed(object state)
{
GC.Collect();
try
{
dynamic remote = state;
Console.WriteLine("Perodic : " + await remote.AsyncHello());
}
catch (Exception ex)
{
Console.WriteLine("Perodic : " + ex.ToString());
}
}
static Timer perodicTimer;
static void TestObjectProps(IResource local, DistributedResource remote) static void TestObjectProps(IResource local, DistributedResource remote)
{ {
foreach(var pt in local.Instance.Template.Properties) foreach (var pt in local.Instance.Template.Properties)
{ {
var lv = pt.PropertyInfo.GetValue(local); var lv = pt.PropertyInfo.GetValue(local);
object v; object v;
var rv = remote.TryGetPropertyValue(pt.Index, out v); var rv = remote.TryGetPropertyValue(pt.Index, out v);
if (!rv) if (!rv)
Console.WriteLine($" ** {pt.Name } Failed"); Console.WriteLine($" ** {pt.Name} Failed");
else else
Console.WriteLine($"{pt.Name } {GetString(lv)} == {GetString(v)}"); Console.WriteLine($"{pt.Name} {GetString(lv)} == {GetString(v)}");
} }
} }
@ -189,15 +212,16 @@ namespace Test
rt += GetString(ar.GetValue(ar.Length - 1)) + "]"; rt += GetString(ar.GetValue(ar.Length - 1)) + "]";
return rt; return rt;
} else if (value is IRecord)
{
return "{" + String.Join(", ", t.GetProperties().Select(x => x.Name + ": " + x.GetValue(value))) + "}";
} }
else if (value is IRecord)
{
return "{" + String.Join(", ", t.GetProperties().Select(x => x.Name + ": " + x.GetValue(value))) + "}";
}
else else
return value.ToString(); return value.ToString();
} }
} }