diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs
index ac15763..6f6c8d3 100644
--- a/Esiur/Net/IIP/DistributedConnection.cs
+++ b/Esiur/Net/IIP/DistributedConnection.cs
@@ -42,6 +42,7 @@ using static Esiur.Net.Packets.IIPPacket;
using Esiur.Net.HTTP;
using System.Timers;
using System.Threading.Tasks;
+using System.Runtime.InteropServices;
namespace Esiur.Net.IIP;
public partial class DistributedConnection : NetworkConnection, IStore
@@ -1005,7 +1006,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
var r = new Random();
session.Id = new byte[32];
r.NextBytes(session.Id);
-
+
SendParams().AddUInt8(0x28)
.AddUInt8Array(session.Id)
.Done();
@@ -1227,6 +1228,12 @@ public partial class DistributedConnection : NetworkConnection, IStore
[Attribute]
public string Username { get; set; }
+ [Attribute]
+ public bool UseWebSocket { get; set; }
+
+ [Attribute]
+ public bool SecureWebSocket { get; set; }
+
[Attribute]
public string Password { get; set; }
@@ -1308,7 +1315,13 @@ public partial class DistributedConnection : NetworkConnection, IStore
throw new AsyncException(ErrorType.Exception, 0, "Session not initialized");
if (socket == null)
- socket = new TCPSocket();
+ {
+ var os = RuntimeInformation.FrameworkDescription;
+ if (UseWebSocket || RuntimeInformation.OSDescription == "Browser")
+ socket = new ClientWSocket();
+ else
+ socket = new TCPSocket();
+ }
if (port > 0)
this._port = port;
@@ -1378,16 +1391,22 @@ public partial class DistributedConnection : NetworkConnection, IStore
if (dataType.Identifier == TransmissionTypeIdentifier.ResourceList)
{
+
+ // remove from suspended.
+ suspendedResources.Remove(r.DistributedResourceInstanceId);
+
// parse them as int
var id = data.GetUInt32(8, Endian.Little);
+
+ // id changed ?
if (id != r.DistributedResourceInstanceId)
r.DistributedResourceInstanceId = id;
neededResources[id] = r;
- suspendedResources.Remove(id);
await Fetch(id, null);
+ Console.WriteLine("Restored " + id);
}
}
catch (AsyncException ex)
@@ -1398,6 +1417,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
}
else
{
+ Global.Log(ex);
break;
}
}
diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs
index 8ea6f27..40460c0 100644
--- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs
+++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs
@@ -1798,6 +1798,10 @@ partial class DistributedConnection
void IIPRequestSetProperty(uint callback, uint resourceId, byte index, TransmissionType transmissionType, byte[] content)
{
+
+ // un hold the socket to send data immediately
+ this.Socket.Unhold();
+
Warehouse.GetById(resourceId).Then((r) =>
{
if (r != null)
@@ -1863,6 +1867,7 @@ partial class DistributedConnection
try
{
+
pi.SetValue(r, value);
SendReply(IIPPacket.IIPPacketAction.SetProperty, callback).Done();
}
diff --git a/Esiur/Net/IIP/DistributedResource.cs b/Esiur/Net/IIP/DistributedResource.cs
index 5263201..a297a4a 100644
--- a/Esiur/Net/IIP/DistributedResource.cs
+++ b/Esiur/Net/IIP/DistributedResource.cs
@@ -412,6 +412,8 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan
/// Indicator when the property is set.
protected object _SetSync(byte index, object value)
{
+ Console.WriteLine("Setting..." + index + " " + value);
+
if (destroyed)
throw new Exception("Trying to access a destroyed object.");
@@ -428,7 +430,10 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan
if (properties[index] == value)
return value;
- return _Set(index, value).Wait();
+ var rt = _Set(index, value).Wait();
+
+ Console.WriteLine("Done Setting");
+ return rt;
}
///
diff --git a/Esiur/Net/Sockets/ClientWSocket.cs b/Esiur/Net/Sockets/ClientWSocket.cs
new file mode 100644
index 0000000..d23716c
--- /dev/null
+++ b/Esiur/Net/Sockets/ClientWSocket.cs
@@ -0,0 +1,221 @@
+using Esiur.Core;
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Text;
+using System.Net.WebSockets;
+using Esiur.Net.Packets;
+using Esiur.Resource;
+using Esiur.Misc;
+using System.Drawing;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Net.Sockets;
+
+namespace Esiur.Net.Sockets
+{
+ public class ClientWSocket : ISocket
+ {
+
+ ClientWebSocket sock;
+
+ NetworkBuffer receiveNetworkBuffer = new NetworkBuffer();
+ NetworkBuffer sendNetworkBuffer = new NetworkBuffer();
+
+ byte[] websocketReceiveBuffer = new byte[10240];
+ ArraySegment websocketReceiveBufferSegment;
+
+ object sendLock = new object();
+ bool held;
+
+ public event DestroyedEvent OnDestroy;
+
+ long totalSent, totalReceived;
+
+
+ public IPEndPoint LocalEndPoint { get; } = new IPEndPoint(IPAddress.Any, 0);
+
+ public IPEndPoint RemoteEndPoint { get; } = new IPEndPoint(IPAddress.Any, 0);
+
+
+ public SocketState State { get; internal set; } = SocketState.Closed;
+
+ public INetworkReceiver Receiver { get; set; }
+
+ public ClientWSocket()
+ {
+ websocketReceiveBufferSegment = new ArraySegment(websocketReceiveBuffer);
+ }
+
+
+ public void Send(byte[] message)
+ {
+
+ lock (sendLock)
+ {
+ if (held)
+ {
+ sendNetworkBuffer.Write(message);
+ }
+ else
+ {
+ totalSent += message.Length;
+ sock.SendAsync(new ArraySegment(message), WebSocketMessageType.Binary,
+ true, new System.Threading.CancellationToken());
+ }
+ }
+ }
+
+
+ public void Send(byte[] message, int offset, int size)
+ {
+ lock (sendLock)
+ {
+ if (held)
+ {
+ sendNetworkBuffer.Write(message, (uint)offset, (uint)size);
+ }
+ else
+ {
+ totalSent += size;
+
+ sock.SendAsync(new ArraySegment(message, offset, size),
+ WebSocketMessageType.Binary, true, new System.Threading.CancellationToken());
+ }
+ }
+ }
+
+
+ public void Close()
+ {
+ sock.CloseAsync(WebSocketCloseStatus.NormalClosure, "", new System.Threading.CancellationToken());
+ }
+
+ public bool Secure { get; set; }
+
+ public async AsyncReply Connect(string hostname, ushort port)
+ {
+ var url = new Uri($"{(Secure ? "wss" : "ws")}://{hostname}:{port}");
+
+ sock = new ClientWebSocket();
+ await sock.ConnectAsync(url, new CancellationToken());
+
+ State = SocketState.Established;
+
+ sock.ReceiveAsync(websocketReceiveBufferSegment, new CancellationToken())
+ .ContinueWith(NetworkReceive);
+
+ return true;
+
+
+ }
+
+
+ public bool Begin()
+ {
+ return true;
+ }
+
+ public bool Trigger(ResourceTrigger trigger)
+ {
+ return true;
+ }
+
+ public void Destroy()
+ {
+ Close();
+
+ receiveNetworkBuffer = null;
+ Receiver = null;
+
+ sock = null;
+ OnDestroy?.Invoke(this);
+ OnDestroy = null;
+ }
+
+ public AsyncReply AcceptAsync()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Hold()
+ {
+ held = true;
+ }
+
+ public void Unhold()
+ {
+ lock (sendLock)
+ {
+ held = false;
+
+ var message = sendNetworkBuffer.Read();
+
+ if (message == null)
+ return;
+
+ totalSent += message.Length;
+
+ sock.SendAsync(new ArraySegment(message), WebSocketMessageType.Binary,
+ true, new System.Threading.CancellationToken());
+
+ }
+ }
+
+ public async AsyncReply SendAsync(byte[] message, int offset, int length)
+ {
+ if (held)
+ {
+ sendNetworkBuffer.Write(message, (uint)offset, (uint)length);
+ }
+ else
+ {
+ totalSent += length;
+
+ await sock.SendAsync(new ArraySegment(message, offset, length),
+ WebSocketMessageType.Binary, true, new System.Threading.CancellationToken());
+ }
+
+
+ return true;
+ }
+
+ public ISocket Accept()
+ {
+ throw new NotImplementedException();
+ }
+
+ public AsyncReply BeginAsync()
+ {
+ return new AsyncReply(true);
+ }
+
+
+ private void NetworkReceive(Task task)
+ {
+
+ if (sock.State == WebSocketState.Closed)
+ {
+ Receiver?.NetworkClose(this);
+ return;
+ }
+
+
+ var receivedLength = task.Result.Count;
+
+ receiveNetworkBuffer.Write(websocketReceiveBuffer, 0, (uint)receivedLength);
+
+ Receiver?.NetworkReceive(this, receiveNetworkBuffer);
+
+ sock.ReceiveAsync(websocketReceiveBufferSegment, new CancellationToken())
+ .ContinueWith(NetworkReceive);
+
+ }
+
+
+ public void NetworkConnect(ISocket sender)
+ {
+ Receiver?.NetworkConnect(this);
+ }
+ }
+}
diff --git a/Esiur/Net/Sockets/SocketState.cs b/Esiur/Net/Sockets/SocketState.cs
index 3e85465..aac61cf 100644
--- a/Esiur/Net/Sockets/SocketState.cs
+++ b/Esiur/Net/Sockets/SocketState.cs
@@ -36,5 +36,4 @@ public enum SocketState
Connecting,
Established,
Closed,
- //Terminated
}
diff --git a/Esiur/Net/Sockets/TCPSocket.cs b/Esiur/Net/Sockets/TCPSocket.cs
index b097ead..822a0b6 100644
--- a/Esiur/Net/Sockets/TCPSocket.cs
+++ b/Esiur/Net/Sockets/TCPSocket.cs
@@ -172,118 +172,7 @@ public class TCPSocket : ISocket
return rt;
}
-
-
- //private void DataReceived(Task task)
- //{
- // try
- // {
- // // SocketError err;
-
- // if (state == SocketState.Closed || state == SocketState.Terminated)
- // return;
-
- // if (task.Result <= 0)
- // {
- // Close();
- // return;
- // }
-
- // receiveNetworkBuffer.Write(receiveBuffer, 0, (uint)task.Result);
- // //OnReceive?.Invoke(receiveNetworkBuffer);
- // Receiver?.NetworkReceive(this, receiveNetworkBuffer);
- // if (state == SocketState.Established)
- // sock.ReceiveAsync(receiveBufferSegment, SocketFlags.None).ContinueWith(DataReceived);
-
- // }
- // catch (Exception ex)
- // {
- // if (state != SocketState.Closed && !sock.Connected)
- // {
- // state = SocketState.Terminated;
- // Close();
- // }
-
- // Global.Log("TCPSocket", LogType.Error, ex.ToString());
- // }
- //}
-
- //private void SocketArgs_Completed(object sender, SocketAsyncEventArgs e)
- //{
- // try
- // {
- // if (state != SocketState.Established)
- // return;
-
- // if (e.BytesTransferred <= 0)
- // {
- // Close();
- // return;
- // }
- // else if (e.SocketError != SocketError.Success)
- // {
- // Close();
- // return;
-
- // }
-
- // var recCount = e.BytesTransferred > e.Count ? e.Count : e.BytesTransferred;
- // receiveNetworkBuffer.Write(receiveBuffer, 0, (uint)recCount);
-
- // //OnReceive?.Invoke(receiveNetworkBuffer);
- // Receiver?.NetworkReceive(this, receiveNetworkBuffer);
-
- // if (state == SocketState.Established)
- // while (!sock.ReceiveAsync(e))
- // {
- // if (e.SocketError != SocketError.Success)
- // {
- // Close();
- // return;
- // }
-
- // if (State != SocketState.Established)
- // return;
-
- // //if (e.BytesTransferred < 0)
- // // Console.WriteLine("BytesTransferred is less than zero");
-
- // if (e.BytesTransferred <= 0)
- // {
- // Close();
- // return;
- // }
- // else if (e.SocketError != SocketError.Success)
- // {
- // Close();
- // return;
- // }
-
-
- // //if (e.BytesTransferred > 100000)
- // // Console.WriteLine("BytesTransferred is large " + e.BytesTransferred);
-
- // recCount = e.BytesTransferred > e.Count ? e.Count : e.BytesTransferred;
-
- // receiveNetworkBuffer.Write(receiveBuffer, 0, (uint)recCount);
-
- // //OnReceive?.Invoke(receiveNetworkBuffer);
- // Receiver?.NetworkReceive(this, receiveNetworkBuffer);
- // }
-
- // }
- // catch (Exception ex)
- // {
- // if (state != SocketState.Closed && !sock.Connected)
- // {
- // state = SocketState.Terminated;
- // Close();
- // }
-
- // Global.Log("TCPSocket", LogType.Error, ex.ToString());
- // }
- //}
-
+
public IPEndPoint LocalEndPoint
{
get { return (IPEndPoint)sock.LocalEndPoint; }
@@ -312,40 +201,7 @@ public class TCPSocket : ISocket
Connect(hostname, port);
}
-
- //private void DataSent(Task task)
- //{
- // try
- // {
- // lock (sendLock)
- // {
-
- // if (sendBufferQueue.Count > 0)
- // {
- // byte[] data = sendBufferQueue.Dequeue();
- // //Console.WriteLine(Encoding.UTF8.GetString(data));
- // sock.SendAsync(new ArraySegment(data), SocketFlags.None).ContinueWith(DataSent);
- // }
-
- // else
- // {
- // asyncSending = false;
- // }
- // }
- // }
- // catch (Exception ex)
- // {
- // if (state != SocketState.Closed && !sock.Connected)
- // {
- // state = SocketState.Terminated;
- // Close();
- // }
-
- // asyncSending = false;
-
- // Global.Log("TCPSocket", LogType.Error, ex.ToString());
- // }
- //}
+
public TCPSocket(IPEndPoint localEndPoint)
{
@@ -368,9 +224,6 @@ public class TCPSocket : ISocket
}
-
-
-
public IPEndPoint RemoteEndPoint
{
get { return (IPEndPoint)sock.RemoteEndPoint; }
@@ -616,14 +469,14 @@ public class TCPSocket : ISocket
public AsyncReply SendAsync(byte[] message, int offset, int length)
{
- if (state == SocketState.Closed)// || state == SocketState.Terminated)
+ if (state == SocketState.Closed)
return new AsyncReply(false);
var msg = message.Clip((uint)offset, (uint)length);
lock (sendLock)
{
- if (state == SocketState.Closed)// || state == SocketState.Terminated)
+ if (state == SocketState.Closed)
return new AsyncReply(false);
if (!sock.Connected)
@@ -641,13 +494,12 @@ public class TCPSocket : ISocket
try
{
currentReply = rt;
- sock.BeginSend(msg, 0, msg.Length, SocketFlags.None, sendCallback, this);// null);
+ sock.BeginSend(msg, 0, msg.Length, SocketFlags.None, sendCallback, this);
}
catch (Exception ex)
{
rt.TriggerError(ex);
asyncSending = false;
- //state = SocketState.Terminated;
Close();
}
//sock.SendAsync(new ArraySegment(msg), SocketFlags.None).ContinueWith(DataSent);
diff --git a/Esiur/Net/Sockets/WSocket.cs b/Esiur/Net/Sockets/WSocket.cs
index 86c9ad2..dc6e75d 100644
--- a/Esiur/Net/Sockets/WSocket.cs
+++ b/Esiur/Net/Sockets/WSocket.cs
@@ -56,7 +56,7 @@ public class WSocket : ISocket, INetworkReceiver
long totalSent, totalReceived;
- bool processing = false;
+
public IPEndPoint LocalEndPoint
{
@@ -254,14 +254,12 @@ public class WSocket : ISocket, INetworkReceiver
public void NetworkReceive(ISocket sender, NetworkBuffer buffer)
{
- if (sock.State == SocketState.Closed)// || sock.State == SocketState.Terminated)
+ if (sock.State == SocketState.Closed)
return;
if (buffer.Protected)
return;
- if (processing)
- return;
var msg = buffer.Read();
@@ -270,7 +268,6 @@ public class WSocket : ISocket, INetworkReceiver
return;
var wsPacketLength = pkt_receive.Parse(msg, 0, (uint)msg.Length);
- //Console.WriteLine("WSP: " + wsPacketLength);
if (wsPacketLength < 0)
{
@@ -289,12 +286,14 @@ public class WSocket : ISocket, INetworkReceiver
}
else if (pkt_receive.Opcode == WebsocketPacket.WSOpcode.Ping)
{
- var pkt_pong = new WebsocketPacket();
+ var pkt_pong = new WebsocketPacket()
+ {
+ FIN = true,
+ Mask = false,
+ Opcode = WebsocketPacket.WSOpcode.Pong,
+ Message = pkt_receive.Message
+ };
- pkt_pong.FIN = true;
- pkt_pong.Mask = false;
- pkt_pong.Opcode = WebsocketPacket.WSOpcode.Pong;
- pkt_pong.Message = pkt_receive.Message;
offset += (uint)wsPacketLength;
Send(pkt_pong);
@@ -322,7 +321,6 @@ public class WSocket : ISocket, INetworkReceiver
if (offset == msg.Length)
{
- //OnReceive?.Invoke(receiveNetworkBuffer);
Receiver?.NetworkReceive(this, receiveNetworkBuffer);
return;
}
@@ -330,25 +328,20 @@ public class WSocket : ISocket, INetworkReceiver
wsPacketLength = pkt_receive.Parse(msg, offset, (uint)msg.Length);
}
- if (wsPacketLength < 0)//(offset < msg.Length) && (offset > 0))
+ if (wsPacketLength < 0)
{
- //receiveNetworkBuffer.HoldFor(msg, offset, (uint)(msg.Length - offset), (uint)msg.Length + (uint)-wsPacketLength);
// save the incomplete packet to the heldBuffer queue
-
buffer.HoldFor(msg, offset, (uint)(msg.Length - offset), (uint)(msg.Length - offset) + (uint)-wsPacketLength);
}
//Console.WriteLine("WS IN: " + receiveNetworkBuffer.Available);
- //OnReceive?.Invoke(receiveNetworkBuffer);
Receiver?.NetworkReceive(this, receiveNetworkBuffer);
- processing = false;
if (buffer.Available > 0 && !buffer.Protected)
- Receiver?.NetworkReceive(this, buffer);
- //Sock_OnReceive(buffer);
+ NetworkReceive(this, buffer);
}
diff --git a/Esiur/Proxy/ResourceGenerator.cs b/Esiur/Proxy/ResourceGenerator.cs
index 814931d..9e8fb3f 100644
--- a/Esiur/Proxy/ResourceGenerator.cs
+++ b/Esiur/Proxy/ResourceGenerator.cs
@@ -43,7 +43,7 @@ public class ResourceGenerator : ISourceGenerator
{
if (tmp.Type == TemplateType.Resource)
{
- var source = TemplateGenerator.GenerateClass(tmp, templates);
+ var source = TemplateGenerator.GenerateClass(tmp, templates, false);
// File.WriteAllText($@"C:\gen\{tmp.ClassName}.cs", source);
context.AddSource(tmp.ClassName + ".Generated.cs", source);
}
@@ -100,7 +100,6 @@ public class ResourceGenerator : ISourceGenerator
continue;
- //File.WriteAllLines("C:\\gen\\ref.log", context.Compilation.ReferencedAssemblyNames.Select(x => x.ToString()));
if (cache.Contains(path))
{
@@ -131,7 +130,6 @@ public class ResourceGenerator : ISourceGenerator
catch (Exception ex)
{
ReportError(context, ex.Source, ex.Message, "Esiur");
- //System.IO.File.AppendAllText("c:\\gen\\error.log", ex.ToString() + "\r\n");
}
//inProgress.Remove(path);
diff --git a/Esiur/Proxy/TemplateGenerator.cs b/Esiur/Proxy/TemplateGenerator.cs
index 04d89ee..5a00fab 100644
--- a/Esiur/Proxy/TemplateGenerator.cs
+++ b/Esiur/Proxy/TemplateGenerator.cs
@@ -170,7 +170,7 @@ public static class TemplateGenerator
return (representationType.Nullable) ? name + "?" : name;
}
- public static string GetTemplate(string url, string dir = null, string username = null, string password = null)
+ public static string GetTemplate(string url, string dir = null, string username = null, string password = null, bool asyncSetters = false)
{
try
{
@@ -209,7 +209,7 @@ public static class TemplateGenerator
{
if (tmp.Type == TemplateType.Resource)
{
- var source = GenerateClass(tmp, templates);
+ var source = GenerateClass(tmp, templates, asyncSetters);
File.WriteAllText(tempDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".Generated.cs", source);
}
else if (tmp.Type == TemplateType.Record)
@@ -253,7 +253,7 @@ public static class TemplateGenerator
}
}
- internal static string GenerateClass(TypeTemplate template, TypeTemplate[] templates)
+ internal static string GenerateClass(TypeTemplate template, TypeTemplate[] templates, bool asyncSetters)
{
var cls = template.ClassName.Split('.');
@@ -353,7 +353,10 @@ public static class TemplateGenerator
var ptTypeName = GetTypeName(p.ValueType, templates);
rt.AppendLine($"[Public] public {ptTypeName} {p.Name} {{");
rt.AppendLine($"get => ({ptTypeName})properties[{p.Index}];");
- rt.AppendLine($"set => _SetSync({p.Index}, value);");
+ if (asyncSetters)
+ rt.AppendLine($"set => _Set({p.Index}, value);");
+ else
+ rt.AppendLine($"set => _SetSync({p.Index}, value);");
rt.AppendLine("}");
}
diff --git a/Esiur/Tools/Esiur.psd1 b/Esiur/Tools/Esiur.psd1
index 06ef1a2..3d425cd 100644
Binary files a/Esiur/Tools/Esiur.psd1 and b/Esiur/Tools/Esiur.psd1 differ
diff --git a/Esiur/Tools/Esiur.psm1 b/Esiur/Tools/Esiur.psm1
index 7f1a4d2..90f3c3a 100644
--- a/Esiur/Tools/Esiur.psm1
+++ b/Esiur/Tools/Esiur.psm1
@@ -1,11 +1,11 @@
-function Get-Template($url, $dir, $username, $password)
+function Get-Template($url, $dir, $username, $password, $asyncSetters)
{
$lib = Resolve-Path -Path "$($PSScriptRoot)\..\lib\netstandard2.0\Esiur.dll"
#write-host "Lib is at $($lib)"
$assembly = [Reflection.Assembly]::LoadFile($lib)
- $tempPath = [Esiur.Proxy.TemplateGenerator]::GetTemplate($url, $dir, $username,$password);
+ $tempPath = [Esiur.Proxy.TemplateGenerator]::GetTemplate($url, $dir, $username,$password, $asyncSetters);
$startupProject = GetStartupProject
diff --git a/Test/Program.cs b/Test/Program.cs
index c8d6fcc..32f8ecb 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -53,8 +53,7 @@ namespace Test
{
static async Task Main(string[] args)
- {
-
+ {
// Create stores to keep objects.
var system = await Warehouse.Put("sys", new MemoryStore());