From 216b891bf8d28f675e67d6c1eb2fb43e15fd1ecd Mon Sep 17 00:00:00 2001 From: Esiur Project Date: Tue, 30 Aug 2022 22:28:19 +0300 Subject: [PATCH] Blazor support --- Esiur/Net/IIP/DistributedConnection.cs | 26 ++- .../Net/IIP/DistributedConnectionProtocol.cs | 5 + Esiur/Net/IIP/DistributedResource.cs | 7 +- Esiur/Net/Sockets/ClientWSocket.cs | 221 ++++++++++++++++++ Esiur/Net/Sockets/SocketState.cs | 1 - Esiur/Net/Sockets/TCPSocket.cs | 158 +------------ Esiur/Net/Sockets/WSocket.cs | 29 +-- Esiur/Proxy/ResourceGenerator.cs | 4 +- Esiur/Proxy/TemplateGenerator.cs | 11 +- Esiur/Tools/Esiur.psd1 | Bin 7786 -> 7796 bytes Esiur/Tools/Esiur.psm1 | 4 +- Test/Program.cs | 3 +- 12 files changed, 282 insertions(+), 187 deletions(-) create mode 100644 Esiur/Net/Sockets/ClientWSocket.cs diff --git a/Esiur/Net/IIP/DistributedConnection.cs b/Esiur/Net/IIP/DistributedConnection.cs index ac15763..6f6c8d3 100644 --- a/Esiur/Net/IIP/DistributedConnection.cs +++ b/Esiur/Net/IIP/DistributedConnection.cs @@ -42,6 +42,7 @@ using static Esiur.Net.Packets.IIPPacket; using Esiur.Net.HTTP; using System.Timers; using System.Threading.Tasks; +using System.Runtime.InteropServices; namespace Esiur.Net.IIP; public partial class DistributedConnection : NetworkConnection, IStore @@ -1005,7 +1006,7 @@ public partial class DistributedConnection : NetworkConnection, IStore var r = new Random(); session.Id = new byte[32]; r.NextBytes(session.Id); - + SendParams().AddUInt8(0x28) .AddUInt8Array(session.Id) .Done(); @@ -1227,6 +1228,12 @@ public partial class DistributedConnection : NetworkConnection, IStore [Attribute] public string Username { get; set; } + [Attribute] + public bool UseWebSocket { get; set; } + + [Attribute] + public bool SecureWebSocket { get; set; } + [Attribute] public string Password { get; set; } @@ -1308,7 +1315,13 @@ public partial class DistributedConnection : NetworkConnection, IStore throw new AsyncException(ErrorType.Exception, 0, "Session not initialized"); if (socket == null) - socket = new TCPSocket(); + { + var os = RuntimeInformation.FrameworkDescription; + if (UseWebSocket || RuntimeInformation.OSDescription == "Browser") + socket = new ClientWSocket(); + else + socket = new TCPSocket(); + } if (port > 0) this._port = port; @@ -1378,16 +1391,22 @@ public partial class DistributedConnection : NetworkConnection, IStore if (dataType.Identifier == TransmissionTypeIdentifier.ResourceList) { + + // remove from suspended. + suspendedResources.Remove(r.DistributedResourceInstanceId); + // parse them as int var id = data.GetUInt32(8, Endian.Little); + + // id changed ? if (id != r.DistributedResourceInstanceId) r.DistributedResourceInstanceId = id; neededResources[id] = r; - suspendedResources.Remove(id); await Fetch(id, null); + Console.WriteLine("Restored " + id); } } catch (AsyncException ex) @@ -1398,6 +1417,7 @@ public partial class DistributedConnection : NetworkConnection, IStore } else { + Global.Log(ex); break; } } diff --git a/Esiur/Net/IIP/DistributedConnectionProtocol.cs b/Esiur/Net/IIP/DistributedConnectionProtocol.cs index 8ea6f27..40460c0 100644 --- a/Esiur/Net/IIP/DistributedConnectionProtocol.cs +++ b/Esiur/Net/IIP/DistributedConnectionProtocol.cs @@ -1798,6 +1798,10 @@ partial class DistributedConnection void IIPRequestSetProperty(uint callback, uint resourceId, byte index, TransmissionType transmissionType, byte[] content) { + + // un hold the socket to send data immediately + this.Socket.Unhold(); + Warehouse.GetById(resourceId).Then((r) => { if (r != null) @@ -1863,6 +1867,7 @@ partial class DistributedConnection try { + pi.SetValue(r, value); SendReply(IIPPacket.IIPPacketAction.SetProperty, callback).Done(); } diff --git a/Esiur/Net/IIP/DistributedResource.cs b/Esiur/Net/IIP/DistributedResource.cs index 5263201..a297a4a 100644 --- a/Esiur/Net/IIP/DistributedResource.cs +++ b/Esiur/Net/IIP/DistributedResource.cs @@ -412,6 +412,8 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan /// Indicator when the property is set. protected object _SetSync(byte index, object value) { + Console.WriteLine("Setting..." + index + " " + value); + if (destroyed) throw new Exception("Trying to access a destroyed object."); @@ -428,7 +430,10 @@ public class DistributedResource : DynamicObject, IResource, INotifyPropertyChan if (properties[index] == value) return value; - return _Set(index, value).Wait(); + var rt = _Set(index, value).Wait(); + + Console.WriteLine("Done Setting"); + return rt; } /// diff --git a/Esiur/Net/Sockets/ClientWSocket.cs b/Esiur/Net/Sockets/ClientWSocket.cs new file mode 100644 index 0000000..d23716c --- /dev/null +++ b/Esiur/Net/Sockets/ClientWSocket.cs @@ -0,0 +1,221 @@ +using Esiur.Core; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Net.WebSockets; +using Esiur.Net.Packets; +using Esiur.Resource; +using Esiur.Misc; +using System.Drawing; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Sockets; + +namespace Esiur.Net.Sockets +{ + public class ClientWSocket : ISocket + { + + ClientWebSocket sock; + + NetworkBuffer receiveNetworkBuffer = new NetworkBuffer(); + NetworkBuffer sendNetworkBuffer = new NetworkBuffer(); + + byte[] websocketReceiveBuffer = new byte[10240]; + ArraySegment websocketReceiveBufferSegment; + + object sendLock = new object(); + bool held; + + public event DestroyedEvent OnDestroy; + + long totalSent, totalReceived; + + + public IPEndPoint LocalEndPoint { get; } = new IPEndPoint(IPAddress.Any, 0); + + public IPEndPoint RemoteEndPoint { get; } = new IPEndPoint(IPAddress.Any, 0); + + + public SocketState State { get; internal set; } = SocketState.Closed; + + public INetworkReceiver Receiver { get; set; } + + public ClientWSocket() + { + websocketReceiveBufferSegment = new ArraySegment(websocketReceiveBuffer); + } + + + public void Send(byte[] message) + { + + lock (sendLock) + { + if (held) + { + sendNetworkBuffer.Write(message); + } + else + { + totalSent += message.Length; + sock.SendAsync(new ArraySegment(message), WebSocketMessageType.Binary, + true, new System.Threading.CancellationToken()); + } + } + } + + + public void Send(byte[] message, int offset, int size) + { + lock (sendLock) + { + if (held) + { + sendNetworkBuffer.Write(message, (uint)offset, (uint)size); + } + else + { + totalSent += size; + + sock.SendAsync(new ArraySegment(message, offset, size), + WebSocketMessageType.Binary, true, new System.Threading.CancellationToken()); + } + } + } + + + public void Close() + { + sock.CloseAsync(WebSocketCloseStatus.NormalClosure, "", new System.Threading.CancellationToken()); + } + + public bool Secure { get; set; } + + public async AsyncReply Connect(string hostname, ushort port) + { + var url = new Uri($"{(Secure ? "wss" : "ws")}://{hostname}:{port}"); + + sock = new ClientWebSocket(); + await sock.ConnectAsync(url, new CancellationToken()); + + State = SocketState.Established; + + sock.ReceiveAsync(websocketReceiveBufferSegment, new CancellationToken()) + .ContinueWith(NetworkReceive); + + return true; + + + } + + + public bool Begin() + { + return true; + } + + public bool Trigger(ResourceTrigger trigger) + { + return true; + } + + public void Destroy() + { + Close(); + + receiveNetworkBuffer = null; + Receiver = null; + + sock = null; + OnDestroy?.Invoke(this); + OnDestroy = null; + } + + public AsyncReply AcceptAsync() + { + throw new NotImplementedException(); + } + + public void Hold() + { + held = true; + } + + public void Unhold() + { + lock (sendLock) + { + held = false; + + var message = sendNetworkBuffer.Read(); + + if (message == null) + return; + + totalSent += message.Length; + + sock.SendAsync(new ArraySegment(message), WebSocketMessageType.Binary, + true, new System.Threading.CancellationToken()); + + } + } + + public async AsyncReply SendAsync(byte[] message, int offset, int length) + { + if (held) + { + sendNetworkBuffer.Write(message, (uint)offset, (uint)length); + } + else + { + totalSent += length; + + await sock.SendAsync(new ArraySegment(message, offset, length), + WebSocketMessageType.Binary, true, new System.Threading.CancellationToken()); + } + + + return true; + } + + public ISocket Accept() + { + throw new NotImplementedException(); + } + + public AsyncReply BeginAsync() + { + return new AsyncReply(true); + } + + + private void NetworkReceive(Task task) + { + + if (sock.State == WebSocketState.Closed) + { + Receiver?.NetworkClose(this); + return; + } + + + var receivedLength = task.Result.Count; + + receiveNetworkBuffer.Write(websocketReceiveBuffer, 0, (uint)receivedLength); + + Receiver?.NetworkReceive(this, receiveNetworkBuffer); + + sock.ReceiveAsync(websocketReceiveBufferSegment, new CancellationToken()) + .ContinueWith(NetworkReceive); + + } + + + public void NetworkConnect(ISocket sender) + { + Receiver?.NetworkConnect(this); + } + } +} diff --git a/Esiur/Net/Sockets/SocketState.cs b/Esiur/Net/Sockets/SocketState.cs index 3e85465..aac61cf 100644 --- a/Esiur/Net/Sockets/SocketState.cs +++ b/Esiur/Net/Sockets/SocketState.cs @@ -36,5 +36,4 @@ public enum SocketState Connecting, Established, Closed, - //Terminated } diff --git a/Esiur/Net/Sockets/TCPSocket.cs b/Esiur/Net/Sockets/TCPSocket.cs index b097ead..822a0b6 100644 --- a/Esiur/Net/Sockets/TCPSocket.cs +++ b/Esiur/Net/Sockets/TCPSocket.cs @@ -172,118 +172,7 @@ public class TCPSocket : ISocket return rt; } - - - //private void DataReceived(Task task) - //{ - // try - // { - // // SocketError err; - - // if (state == SocketState.Closed || state == SocketState.Terminated) - // return; - - // if (task.Result <= 0) - // { - // Close(); - // return; - // } - - // receiveNetworkBuffer.Write(receiveBuffer, 0, (uint)task.Result); - // //OnReceive?.Invoke(receiveNetworkBuffer); - // Receiver?.NetworkReceive(this, receiveNetworkBuffer); - // if (state == SocketState.Established) - // sock.ReceiveAsync(receiveBufferSegment, SocketFlags.None).ContinueWith(DataReceived); - - // } - // catch (Exception ex) - // { - // if (state != SocketState.Closed && !sock.Connected) - // { - // state = SocketState.Terminated; - // Close(); - // } - - // Global.Log("TCPSocket", LogType.Error, ex.ToString()); - // } - //} - - //private void SocketArgs_Completed(object sender, SocketAsyncEventArgs e) - //{ - // try - // { - // if (state != SocketState.Established) - // return; - - // if (e.BytesTransferred <= 0) - // { - // Close(); - // return; - // } - // else if (e.SocketError != SocketError.Success) - // { - // Close(); - // return; - - // } - - // var recCount = e.BytesTransferred > e.Count ? e.Count : e.BytesTransferred; - // receiveNetworkBuffer.Write(receiveBuffer, 0, (uint)recCount); - - // //OnReceive?.Invoke(receiveNetworkBuffer); - // Receiver?.NetworkReceive(this, receiveNetworkBuffer); - - // if (state == SocketState.Established) - // while (!sock.ReceiveAsync(e)) - // { - // if (e.SocketError != SocketError.Success) - // { - // Close(); - // return; - // } - - // if (State != SocketState.Established) - // return; - - // //if (e.BytesTransferred < 0) - // // Console.WriteLine("BytesTransferred is less than zero"); - - // if (e.BytesTransferred <= 0) - // { - // Close(); - // return; - // } - // else if (e.SocketError != SocketError.Success) - // { - // Close(); - // return; - // } - - - // //if (e.BytesTransferred > 100000) - // // Console.WriteLine("BytesTransferred is large " + e.BytesTransferred); - - // recCount = e.BytesTransferred > e.Count ? e.Count : e.BytesTransferred; - - // receiveNetworkBuffer.Write(receiveBuffer, 0, (uint)recCount); - - // //OnReceive?.Invoke(receiveNetworkBuffer); - // Receiver?.NetworkReceive(this, receiveNetworkBuffer); - // } - - // } - // catch (Exception ex) - // { - // if (state != SocketState.Closed && !sock.Connected) - // { - // state = SocketState.Terminated; - // Close(); - // } - - // Global.Log("TCPSocket", LogType.Error, ex.ToString()); - // } - //} - + public IPEndPoint LocalEndPoint { get { return (IPEndPoint)sock.LocalEndPoint; } @@ -312,40 +201,7 @@ public class TCPSocket : ISocket Connect(hostname, port); } - - //private void DataSent(Task task) - //{ - // try - // { - // lock (sendLock) - // { - - // if (sendBufferQueue.Count > 0) - // { - // byte[] data = sendBufferQueue.Dequeue(); - // //Console.WriteLine(Encoding.UTF8.GetString(data)); - // sock.SendAsync(new ArraySegment(data), SocketFlags.None).ContinueWith(DataSent); - // } - - // else - // { - // asyncSending = false; - // } - // } - // } - // catch (Exception ex) - // { - // if (state != SocketState.Closed && !sock.Connected) - // { - // state = SocketState.Terminated; - // Close(); - // } - - // asyncSending = false; - - // Global.Log("TCPSocket", LogType.Error, ex.ToString()); - // } - //} + public TCPSocket(IPEndPoint localEndPoint) { @@ -368,9 +224,6 @@ public class TCPSocket : ISocket } - - - public IPEndPoint RemoteEndPoint { get { return (IPEndPoint)sock.RemoteEndPoint; } @@ -616,14 +469,14 @@ public class TCPSocket : ISocket public AsyncReply SendAsync(byte[] message, int offset, int length) { - if (state == SocketState.Closed)// || state == SocketState.Terminated) + if (state == SocketState.Closed) return new AsyncReply(false); var msg = message.Clip((uint)offset, (uint)length); lock (sendLock) { - if (state == SocketState.Closed)// || state == SocketState.Terminated) + if (state == SocketState.Closed) return new AsyncReply(false); if (!sock.Connected) @@ -641,13 +494,12 @@ public class TCPSocket : ISocket try { currentReply = rt; - sock.BeginSend(msg, 0, msg.Length, SocketFlags.None, sendCallback, this);// null); + sock.BeginSend(msg, 0, msg.Length, SocketFlags.None, sendCallback, this); } catch (Exception ex) { rt.TriggerError(ex); asyncSending = false; - //state = SocketState.Terminated; Close(); } //sock.SendAsync(new ArraySegment(msg), SocketFlags.None).ContinueWith(DataSent); diff --git a/Esiur/Net/Sockets/WSocket.cs b/Esiur/Net/Sockets/WSocket.cs index 86c9ad2..dc6e75d 100644 --- a/Esiur/Net/Sockets/WSocket.cs +++ b/Esiur/Net/Sockets/WSocket.cs @@ -56,7 +56,7 @@ public class WSocket : ISocket, INetworkReceiver long totalSent, totalReceived; - bool processing = false; + public IPEndPoint LocalEndPoint { @@ -254,14 +254,12 @@ public class WSocket : ISocket, INetworkReceiver public void NetworkReceive(ISocket sender, NetworkBuffer buffer) { - if (sock.State == SocketState.Closed)// || sock.State == SocketState.Terminated) + if (sock.State == SocketState.Closed) return; if (buffer.Protected) return; - if (processing) - return; var msg = buffer.Read(); @@ -270,7 +268,6 @@ public class WSocket : ISocket, INetworkReceiver return; var wsPacketLength = pkt_receive.Parse(msg, 0, (uint)msg.Length); - //Console.WriteLine("WSP: " + wsPacketLength); if (wsPacketLength < 0) { @@ -289,12 +286,14 @@ public class WSocket : ISocket, INetworkReceiver } else if (pkt_receive.Opcode == WebsocketPacket.WSOpcode.Ping) { - var pkt_pong = new WebsocketPacket(); + var pkt_pong = new WebsocketPacket() + { + FIN = true, + Mask = false, + Opcode = WebsocketPacket.WSOpcode.Pong, + Message = pkt_receive.Message + }; - pkt_pong.FIN = true; - pkt_pong.Mask = false; - pkt_pong.Opcode = WebsocketPacket.WSOpcode.Pong; - pkt_pong.Message = pkt_receive.Message; offset += (uint)wsPacketLength; Send(pkt_pong); @@ -322,7 +321,6 @@ public class WSocket : ISocket, INetworkReceiver if (offset == msg.Length) { - //OnReceive?.Invoke(receiveNetworkBuffer); Receiver?.NetworkReceive(this, receiveNetworkBuffer); return; } @@ -330,25 +328,20 @@ public class WSocket : ISocket, INetworkReceiver wsPacketLength = pkt_receive.Parse(msg, offset, (uint)msg.Length); } - if (wsPacketLength < 0)//(offset < msg.Length) && (offset > 0)) + if (wsPacketLength < 0) { - //receiveNetworkBuffer.HoldFor(msg, offset, (uint)(msg.Length - offset), (uint)msg.Length + (uint)-wsPacketLength); // save the incomplete packet to the heldBuffer queue - buffer.HoldFor(msg, offset, (uint)(msg.Length - offset), (uint)(msg.Length - offset) + (uint)-wsPacketLength); } //Console.WriteLine("WS IN: " + receiveNetworkBuffer.Available); - //OnReceive?.Invoke(receiveNetworkBuffer); Receiver?.NetworkReceive(this, receiveNetworkBuffer); - processing = false; if (buffer.Available > 0 && !buffer.Protected) - Receiver?.NetworkReceive(this, buffer); - //Sock_OnReceive(buffer); + NetworkReceive(this, buffer); } diff --git a/Esiur/Proxy/ResourceGenerator.cs b/Esiur/Proxy/ResourceGenerator.cs index 814931d..9e8fb3f 100644 --- a/Esiur/Proxy/ResourceGenerator.cs +++ b/Esiur/Proxy/ResourceGenerator.cs @@ -43,7 +43,7 @@ public class ResourceGenerator : ISourceGenerator { if (tmp.Type == TemplateType.Resource) { - var source = TemplateGenerator.GenerateClass(tmp, templates); + var source = TemplateGenerator.GenerateClass(tmp, templates, false); // File.WriteAllText($@"C:\gen\{tmp.ClassName}.cs", source); context.AddSource(tmp.ClassName + ".Generated.cs", source); } @@ -100,7 +100,6 @@ public class ResourceGenerator : ISourceGenerator continue; - //File.WriteAllLines("C:\\gen\\ref.log", context.Compilation.ReferencedAssemblyNames.Select(x => x.ToString())); if (cache.Contains(path)) { @@ -131,7 +130,6 @@ public class ResourceGenerator : ISourceGenerator catch (Exception ex) { ReportError(context, ex.Source, ex.Message, "Esiur"); - //System.IO.File.AppendAllText("c:\\gen\\error.log", ex.ToString() + "\r\n"); } //inProgress.Remove(path); diff --git a/Esiur/Proxy/TemplateGenerator.cs b/Esiur/Proxy/TemplateGenerator.cs index 04d89ee..5a00fab 100644 --- a/Esiur/Proxy/TemplateGenerator.cs +++ b/Esiur/Proxy/TemplateGenerator.cs @@ -170,7 +170,7 @@ public static class TemplateGenerator return (representationType.Nullable) ? name + "?" : name; } - public static string GetTemplate(string url, string dir = null, string username = null, string password = null) + public static string GetTemplate(string url, string dir = null, string username = null, string password = null, bool asyncSetters = false) { try { @@ -209,7 +209,7 @@ public static class TemplateGenerator { if (tmp.Type == TemplateType.Resource) { - var source = GenerateClass(tmp, templates); + var source = GenerateClass(tmp, templates, asyncSetters); File.WriteAllText(tempDir.FullName + Path.DirectorySeparatorChar + tmp.ClassName + ".Generated.cs", source); } else if (tmp.Type == TemplateType.Record) @@ -253,7 +253,7 @@ public static class TemplateGenerator } } - internal static string GenerateClass(TypeTemplate template, TypeTemplate[] templates) + internal static string GenerateClass(TypeTemplate template, TypeTemplate[] templates, bool asyncSetters) { var cls = template.ClassName.Split('.'); @@ -353,7 +353,10 @@ public static class TemplateGenerator var ptTypeName = GetTypeName(p.ValueType, templates); rt.AppendLine($"[Public] public {ptTypeName} {p.Name} {{"); rt.AppendLine($"get => ({ptTypeName})properties[{p.Index}];"); - rt.AppendLine($"set => _SetSync({p.Index}, value);"); + if (asyncSetters) + rt.AppendLine($"set => _Set({p.Index}, value);"); + else + rt.AppendLine($"set => _SetSync({p.Index}, value);"); rt.AppendLine("}"); } diff --git a/Esiur/Tools/Esiur.psd1 b/Esiur/Tools/Esiur.psd1 index 06ef1a21d5c9625c7213ad3b8c1562e7a5bb3d04..3d425cd606e84555d68191a23597ab535cb64d20 100644 GIT binary patch delta 22 ccmaE5^TlSvG8Rr<1|tRoAU4{(l;x--09pG6=Kufz delta 12 Ucmexj^U7w!GM3F7SdK^n04rVv^#A|> diff --git a/Esiur/Tools/Esiur.psm1 b/Esiur/Tools/Esiur.psm1 index 7f1a4d2..90f3c3a 100644 --- a/Esiur/Tools/Esiur.psm1 +++ b/Esiur/Tools/Esiur.psm1 @@ -1,11 +1,11 @@ -function Get-Template($url, $dir, $username, $password) +function Get-Template($url, $dir, $username, $password, $asyncSetters) { $lib = Resolve-Path -Path "$($PSScriptRoot)\..\lib\netstandard2.0\Esiur.dll" #write-host "Lib is at $($lib)" $assembly = [Reflection.Assembly]::LoadFile($lib) - $tempPath = [Esiur.Proxy.TemplateGenerator]::GetTemplate($url, $dir, $username,$password); + $tempPath = [Esiur.Proxy.TemplateGenerator]::GetTemplate($url, $dir, $username,$password, $asyncSetters); $startupProject = GetStartupProject diff --git a/Test/Program.cs b/Test/Program.cs index c8d6fcc..32f8ecb 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -53,8 +53,7 @@ namespace Test { static async Task Main(string[] args) - { - + { // Create stores to keep objects. var system = await Warehouse.Put("sys", new MemoryStore());