2
0
mirror of https://github.com/esiur/esiur-dart.git synced 2025-05-07 12:22:57 +00:00

Resume connection

This commit is contained in:
Ahmed Zamil 2020-03-25 04:59:27 +03:00
parent 586088aad9
commit 09b010612a
5 changed files with 210 additions and 77 deletions

View File

@ -21,6 +21,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
import '../../Core/AsyncBag.dart';
import '../Sockets/TCPSocket.dart'; import '../Sockets/TCPSocket.dart';
import 'DistributedPropertyContext.dart'; import 'DistributedPropertyContext.dart';
import '../../Data/PropertyValue.dart'; import '../../Data/PropertyValue.dart';
@ -97,15 +100,17 @@ class DistributedConnection extends NetworkConnection with IStore
DC _localPassword; DC _localPassword;
DC _localNonce, _remoteNonce; DC _localNonce, _remoteNonce;
bool _ready = false, _readyToEstablish = false; String _hostname;
int _port;
DateTime _loginDate; bool _ready = false, _readyToEstablish = false;
KeyList<int, DistributedResource> _resources = new KeyList<int, DistributedResource>(); KeyList<int, DistributedResource> _resources = new KeyList<int, DistributedResource>();
KeyList<int, AsyncReply<DistributedResource>> _resourceRequests = new KeyList<int, AsyncReply<DistributedResource>>(); KeyList<int, AsyncReply<DistributedResource>> _resourceRequests = new KeyList<int, AsyncReply<DistributedResource>>();
KeyList<Guid, AsyncReply<ResourceTemplate>> _templateRequests = new KeyList<Guid, AsyncReply<ResourceTemplate>>(); KeyList<Guid, AsyncReply<ResourceTemplate>> _templateRequests = new KeyList<Guid, AsyncReply<ResourceTemplate>>();
KeyList<String, AsyncReply<IResource>> _pathRequests = new KeyList<String, AsyncReply<IResource>>(); //KeyList<String, AsyncReply<IResource>> _pathRequests = new KeyList<String, AsyncReply<IResource>>();
Map<Guid, ResourceTemplate> _templates = new Map<Guid, ResourceTemplate>(); Map<Guid, ResourceTemplate> _templates = new Map<Guid, ResourceTemplate>();
KeyList<int, AsyncReply<dynamic>> _requests = new KeyList<int, AsyncReply<dynamic>>(); KeyList<int, AsyncReply<dynamic>> _requests = new KeyList<int, AsyncReply<dynamic>>();
int _callbackCounter = 0; int _callbackCounter = 0;
@ -177,7 +182,7 @@ class DistributedConnection extends NetworkConnection with IStore
{ {
var host = instance.name.split(":");// ("://").skip(1).join("://").split("/")[0]; var host = instance.name.split(":");
// assign domain from hostname if not provided // assign domain from hostname if not provided
var address = host[0]; var address = host[0];
@ -186,29 +191,107 @@ class DistributedConnection extends NetworkConnection with IStore
var domain = instance.attributes.containsKey("domain") ? instance.attributes["domain"] : address; var domain = instance.attributes.containsKey("domain") ? instance.attributes["domain"] : address;
_session = new Session(new ClientAuthentication() var password = DC.stringToBytes(instance.attributes["password"].toString());
, new HostAuthentication());
_session.localAuthentication.domain = domain;
_session.localAuthentication.username = username;
_localPassword = DC.stringToBytes(instance.attributes["password"].toString());
_openReply = new AsyncReply<bool>();
var sock = new TCPSocket();
sock.connect(address, port).then<dynamic>((x){
assign(sock);
//rt.trigger(true);
}).error((x)=>_openReply.triggerError(x));
return _openReply; return connect(domain: domain, hostname: address, port: port, password: password, username: username);
} }
} }
return new AsyncReply<bool>.ready(true); return new AsyncReply<bool>.ready(true);
} }
AsyncReply<bool> connect({ISocket socket, String hostname, int port, String username, DC password, String domain})
{
if (_openReply != null)
throw AsyncException(ErrorType.Exception, 0, "Connection in progress");
_openReply = new AsyncReply<bool>();
if (hostname != null)
{
_session = new Session(new ClientAuthentication()
, new HostAuthentication());
_session.localAuthentication.domain = domain;
_session.localAuthentication.username = username;
_localPassword = password;
}
if (_session == null)
throw AsyncException(ErrorType.Exception, 0, "Session not initialized");
if (socket == null)
socket = new TCPSocket();
_port = port ?? _port;
_hostname = hostname ?? _hostname;
socket.connect(_hostname, _port).then<dynamic>((x){
assign(socket);
}).error((x){
_openReply.triggerError(x);
_openReply = null;
});
return _openReply;
}
@override
void connectionClosed()
{
// clean up
_ready = false;
_readyToEstablish = false;
_requests.values.forEach((x)=>x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")));
_resourceRequests.values.forEach((x)=>x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")));
_templateRequests.values.forEach((x)=>x.triggerError(AsyncException(ErrorType.Management, 0, "Connection closed")));
_requests.clear();
_resourceRequests.clear();
_templateRequests.clear();
_resources.values.forEach((x)=>x.suspend());
}
Future<bool> reconnect() async
{
try
{
if (await connect())
{
try
{
var bag = AsyncBag();
for(var i = 0; i < _resources.keys.length; i++)
{
var index = _resources.keys.elementAt(i);
// print("Re $i ${_resources[index].instance.template.className}");
bag.add(fetch(index));
}
bag.seal();
await bag;
}
catch(ex)
{
print(ex.toString());
}
}
}
catch(ex)
{
return false;
}
return true;
}
/// <summary> /// <summary>
/// KeyList to store user variables related to this connection. /// KeyList to store user variables related to this connection.
/// </summary> /// </summary>
@ -227,10 +310,10 @@ class DistributedConnection extends NetworkConnection with IStore
{ {
super.assign(socket); super.assign(socket);
session.remoteAuthentication.source.attributes.add(SourceAttributeType.IPv4, socket.remoteEndPoint.address); session.remoteAuthentication.source.attributes[SourceAttributeType.IPv4] = socket.remoteEndPoint.address;
session.remoteAuthentication.source.attributes.add(SourceAttributeType.Port, socket.remoteEndPoint.port); session.remoteAuthentication.source.attributes[SourceAttributeType.Port] = socket.remoteEndPoint.port;
session.localAuthentication.source.attributes.add(SourceAttributeType.IPv4, socket.localEndPoint.address); session.localAuthentication.source.attributes[SourceAttributeType.IPv4] = socket.localEndPoint.address;
session.localAuthentication.source.attributes.add(SourceAttributeType.Port, socket.localEndPoint.port); session.localAuthentication.source.attributes[SourceAttributeType.Port] = socket.localEndPoint.port;
if (session.localAuthentication.type == AuthenticationType.Client) if (session.localAuthentication.type == AuthenticationType.Client)
{ {
@ -680,6 +763,7 @@ class DistributedConnection extends NetworkConnection with IStore
_ready = true; _ready = true;
_openReply.trigger(true); _openReply.trigger(true);
_openReply = null;
emitArgs("ready", []); emitArgs("ready", []);
//OnReady?.Invoke(this); //OnReady?.Invoke(this);
// server.membership.login(session); // server.membership.login(session);
@ -747,6 +831,7 @@ class DistributedConnection extends NetworkConnection with IStore
_ready = true; _ready = true;
_openReply.trigger(true); _openReply.trigger(true);
_openReply = null;
emitArgs("ready", []); emitArgs("ready", []);
//OnReady?.Invoke(this); //OnReady?.Invoke(this);
@ -757,6 +842,7 @@ class DistributedConnection extends NetworkConnection with IStore
{ {
var ex = AsyncException(ErrorType.Management, _authPacket.errorCode, _authPacket.errorMessage); var ex = AsyncException(ErrorType.Management, _authPacket.errorCode, _authPacket.errorMessage);
_openReply.triggerError(ex); _openReply.triggerError(ex);
_openReply = null;
emitArgs("error", [ex]); emitArgs("error", [ex]);
//OnError?.Invoke(this, authPacket.ErrorCode, authPacket.ErrorMessage); //OnError?.Invoke(this, authPacket.ErrorCode, authPacket.ErrorMessage);
close(); close();
@ -815,6 +901,10 @@ class DistributedConnection extends NetworkConnection with IStore
return true; return true;
} }
bool record(IResource resource, String propertyName, value, int age, DateTime dateTime) bool record(IResource resource, String propertyName, value, int age, DateTime dateTime)
{ {
// nothing to do // nothing to do
@ -874,6 +964,18 @@ class DistributedConnection extends NetworkConnection with IStore
return reply; return reply;
} }
AsyncReply<dynamic> sendDetachRequest(int instanceId)
{
try
{
return sendRequest(IIPPacketAction.DetachResource).addUint32(instanceId).done();
}
catch(ex)
{
return null;
}
}
AsyncReply<dynamic> sendInvokeByNamedArguments(int instanceId, int index, Structure parameters) AsyncReply<dynamic> sendInvokeByNamedArguments(int instanceId, int index, Structure parameters)
{ {
var pb = Codec.composeStructure(parameters, this, true, true, true); var pb = Codec.composeStructure(parameters, this, true, true, true);
@ -2148,48 +2250,44 @@ class DistributedConnection extends NetworkConnection with IStore
/// <returns>DistributedResource</returns> /// <returns>DistributedResource</returns>
AsyncReply<DistributedResource> fetch(int id) AsyncReply<DistributedResource> fetch(int id)
{ {
if (_resourceRequests.containsKey(id) && _resources.containsKey(id)) var resource = _resources[id];
{ var request = _resourceRequests[id];
//Console.WriteLine("DEAD LOCK " + id);
return new AsyncReply<DistributedResource>.ready(_resources[id]); if (request != null)
{
// dig for dead locks // dig for dead locks
//return resourceRequests[id]; if (resource != null) // dead lock
return new AsyncReply<DistributedResource>.ready(_resources[id]);
else
return request;
} }
else if (_resourceRequests.containsKey(id)) else if (resource != null && !resource.suspended)
return _resourceRequests[id]; return new AsyncReply<DistributedResource>.ready(resource);
else if (_resources.containsKey(id))
return new AsyncReply<DistributedResource>.ready(_resources[id]);
var reply = new AsyncReply<DistributedResource>(); var reply = new AsyncReply<DistributedResource>();
_resourceRequests.add(id, reply); _resourceRequests.add(id, reply);
//print("fetch ${id}");
sendRequest(IIPPacketAction.AttachResource) sendRequest(IIPPacketAction.AttachResource)
.addUint32(id) .addUint32(id)
.done() .done()
.then<dynamic>((rt) .then<dynamic>((rt)
{ {
var dr = resource ?? new DistributedResource(this, id, rt[1], rt[2]);
//print("fetched ${id}");
var dr = new DistributedResource(this, id, rt[1], rt[2]);
getTemplate(rt[0] as Guid).then<dynamic>((tmp) getTemplate(rt[0] as Guid).then<dynamic>((tmp)
{ {
//print("New template "); //print("New template ");
// ClassId, ResourceAge, ResourceLink, Content // ClassId, ResourceAge, ResourceLink, Content
Warehouse.put(dr, id.toString(), this, null, tmp); if (resource == null)
Warehouse.put(dr, id.toString(), this, null, tmp);
var d = rt[3] as DC; var d = rt[3] as DC;
Codec.parsePropertyValueArray(d, 0, d.length, this).then((ar) Codec.parsePropertyValueArray(d, 0, d.length, this).then((ar)
{ {
//print("attached"); //print("attached");
dr.attached(ar); dr.attach(ar);
_resourceRequests.remove(id); _resourceRequests.remove(id);
reply.trigger(dr); reply.trigger(dr);
}); });

View File

@ -22,6 +22,8 @@ SOFTWARE.
*/ */
import 'package:esyur/esyur.dart';
import '../../Resource/IResource.dart'; import '../../Resource/IResource.dart';
import '../../Core/AsyncReply.dart'; import '../../Core/AsyncReply.dart';
import '../../Data/PropertyValue.dart'; import '../../Data/PropertyValue.dart';
@ -36,11 +38,8 @@ class DistributedResource extends IResource
int _instanceId; int _instanceId;
DistributedConnection _connection; DistributedConnection _connection;
bool _attached = false;
bool _isAttached = false; //bool _isReady = false;
bool _isReady = false;
String _link; String _link;
List _properties; List _properties;
@ -63,24 +62,37 @@ class DistributedResource extends IResource
/// </summary> /// </summary>
int get id => _instanceId; int get id => _instanceId;
//bool get destroyed => _destroyed;
bool get suspended => _suspended;
bool _suspended = true;
/// <summary> /// <summary>
/// IDestructible interface. /// IDestructible interface.
/// </summary> /// </summary>
void destroy() void destroy()
{ {
_destroyed = true; _destroyed = true;
_attached = false;
_connection.sendDetachRequest(_instanceId);
emitArgs("destroy", [this]); emitArgs("destroy", [this]);
} }
void suspend()
{
_suspended = true;
_attached = false;
}
/// <summary> /// <summary>
/// Resource is ready when all its properties are attached. /// Resource is ready when all its properties are attached.
/// </summary> /// </summary>
bool get isReady => _isReady; // bool get isReady => _isReady;
/// <summary> /// <summary>
/// Resource is attached when all its properties are received. /// Resource is attached when all its properties are received.
/// </summary> /// </summary>
bool get isAttached => _isAttached; bool get attached => _attached;
// public DistributedResourceStack Stack // public DistributedResourceStack Stack
@ -102,10 +114,10 @@ class DistributedResource extends IResource
this._instanceId = instanceId; this._instanceId = instanceId;
} }
void _ready() //void _ready()
{ //{
_isReady = true; // _isReady = true;
} // }
/// <summary> /// <summary>
/// Export all properties with ResourceProperty attributed as bytes array. /// Export all properties with ResourceProperty attributed as bytes array.
@ -123,12 +135,14 @@ class DistributedResource extends IResource
return props; return props;
} }
bool attached(List<PropertyValue> properties) bool attach(List<PropertyValue> properties)
{ {
if (_isAttached) if (_attached)
return false; return false;
else else
{ {
_suspended = false;
_properties = new List(properties.length);// object[properties.Length]; _properties = new List(properties.length);// object[properties.Length];
//_events = new DistributedResourceEvent[Instance.Template.Events.Length]; //_events = new DistributedResourceEvent[Instance.Template.Events.Length];
@ -146,7 +160,7 @@ class DistributedResource extends IResource
//afterAttachmentTriggers.Clear(); //afterAttachmentTriggers.Clear();
_isAttached = true; _attached = true;
} }
return true; return true;
@ -166,6 +180,9 @@ class DistributedResource extends IResource
if (_destroyed) if (_destroyed)
throw new Exception("Trying to access destroyed object"); throw new Exception("Trying to access destroyed object");
if (_suspended)
throw new Exception("Trying to access suspended object");
if (index >= instance.template.functions.length) if (index >= instance.template.functions.length)
throw new Exception("Function index is incorrect"); throw new Exception("Function index is incorrect");
@ -173,11 +190,16 @@ class DistributedResource extends IResource
return connection.sendInvokeByNamedArguments(_instanceId, index, namedArgs); return connection.sendInvokeByNamedArguments(_instanceId, index, namedArgs);
} }
AsyncReply<dynamic> invokeByArrayArguments(int index, List<dynamic> args) AsyncReply<dynamic> invokeByArrayArguments(int index, List<dynamic> args)
{ {
if (_destroyed) if (_destroyed)
throw new Exception("Trying to access destroyed object"); throw new Exception("Trying to access destroyed object");
if (_suspended)
throw new Exception("Trying to access suspended object");
if (index >= instance.template.functions.length) if (index >= instance.template.functions.length)
throw new Exception("Function index is incorrect"); throw new Exception("Function index is incorrect");
@ -217,7 +239,7 @@ class DistributedResource extends IResource
{ {
var ft = instance.template.getFunctionTemplateByName(memberName); var ft = instance.template.getFunctionTemplateByName(memberName);
if (_isAttached && ft!=null) if (_attached && ft!=null)
{ {
if (invocation.namedArguments.length > 0) if (invocation.namedArguments.length > 0)
{ {

View File

@ -48,11 +48,17 @@ class NetworkConnection extends IDestructible
bool _processing = false; bool _processing = false;
// to be overridden
void connectionClosed()
{
}
void destroy() void destroy()
{ {
// if (connected) // if (connected)
close(); close();
emitArgs("close", [this]); //emitArgs("close", [this]);
//OnDestroy?.Invoke(this); //OnDestroy?.Invoke(this);
} }
@ -73,7 +79,6 @@ class NetworkConnection extends IDestructible
socket.on("receive", socket_OnReceive); socket.on("receive", socket_OnReceive);
socket.on("close", socket_OnClose); socket.on("close", socket_OnClose);
socket.on("connect", socket_OnConnect); socket.on("connect", socket_OnConnect);
} }
@ -84,6 +89,7 @@ class NetworkConnection extends IDestructible
void socket_OnClose() void socket_OnClose()
{ {
connectionClosed();
emitArgs("close", [this]); emitArgs("close", [this]);
} }
@ -153,10 +159,8 @@ class NetworkConnection extends IDestructible
void close() void close()
{ {
try try
{ {
if (_sock != null) if (_sock != null)
_sock.close(); _sock.close();
} }

View File

@ -246,19 +246,10 @@ class Warehouse
var rt = new AsyncReply<IResource>(); var rt = new AsyncReply<IResource>();
// Should we create a new store ? // Should we create a new store ?
if (_urlRegex.hasMatch(path)) if (_urlRegex.hasMatch(path))
//if (path.contains("://"))
{ {
var url = _urlRegex.allMatches(path).first; var url = _urlRegex.allMatches(path).first;
//var url = path.split(_urlRegex);
//var url = path.split(new string[] { "://" }, 2, StringSplitOptions.None);
//var hostname = url[1].Split(new char[] { '/' }, 2)[0];
//var pathname = string.Join("/", url[1].Split(new char[] { '/' }).Skip(1));
if (protocols.containsKey(url[1])) if (protocols.containsKey(url[1]))
{ {
var handler = protocols[url[1]]; var handler = protocols[url[1]];
@ -266,7 +257,6 @@ class Warehouse
var store = handler(); var store = handler();
put(store, url[2], null, parent, null, 0, manager, attributes); put(store, url[2], null, parent, null, 0, manager, attributes);
store.trigger(ResourceTrigger.Open).then<dynamic>((x) store.trigger(ResourceTrigger.Open).then<dynamic>((x)
{ {

View File

@ -2,20 +2,39 @@ import "package:test/test.dart";
import 'package:esyur/esyur.dart'; import 'package:esyur/esyur.dart';
import 'dart:io'; import 'dart:io';
main() main() async
{ {
test("Connect to server", () async { //test("Connect to server", () async {
// connect to the server // connect to the server
// var x = await Warehouse.get("iip://localhost:5000/sys/su", {"username": "admin", "password": "1234" var x = await Warehouse.get("iip://localhost:5000/sys/su", {"username": "admin", "password": "1234"
// , "domain": "example.com"}); , "domain": "example.com"});
var now = DateTime.now();
// desc(x); // desc(x);
// List<dynamic> trackers = await x.getMyTrackers(); List<dynamic> trackers = await x.getMyTrackers();
print("Time ${DateTime.now().difference(now).inSeconds}");
print(x.suspended);
DistributedConnection con = x.connection;
con.close();
print(x.suspended);
now = DateTime.now();
await con.reconnect();
print("Time ${DateTime.now().difference(now).inSeconds}");
print(x.suspended);
var u = await x.getMyTrackers();
print(trackers[0].suspended);
// for(var i = 0; i < trackers.length; i++) // for(var i = 0; i < trackers.length; i++)
// print(trackers[i].name); // print(trackers[i].name);
@ -53,7 +72,7 @@ main()
//print("Done"); //print("Done");
}); //});
} }