2
0
mirror of https://github.com/esiur/esiur-dart.git synced 2025-09-13 20:13:19 +00:00
This commit is contained in:
2022-04-02 22:36:52 +03:00
parent 9be227a65e
commit 872da0a2bf
9 changed files with 281 additions and 227 deletions

View File

@@ -123,6 +123,7 @@ class DistributedConnection extends NetworkConnection with IStore {
KeyList<int, AsyncReply<DistributedResource>> _resourceRequests =
new KeyList<int, AsyncReply<DistributedResource>>();
KeyList<Guid, AsyncReply<TypeTemplate?>> _templateRequests =
new KeyList<Guid, AsyncReply<TypeTemplate?>>();
//KeyList<String, AsyncReply<IResource>> _pathRequests = new KeyList<String, AsyncReply<IResource>>();
@@ -325,7 +326,7 @@ class DistributedConnection extends NetworkConnection with IStore {
for (var i = 0; i < _resources.keys.length; i++) {
var index = _resources.keys.elementAt(i);
// print("Re $i ${_resources[index].instance.template.className}");
bag.add(fetch(index));
bag.add(fetch(index, null));
}
bag.seal();
@@ -461,7 +462,9 @@ class DistributedConnection extends NetworkConnection with IStore {
String? link(IResource resource) {
if (resource is DistributedResource) {
if (resource.instance?.store == this)
return (this.instance?.name ?? "") + "/" + resource.distributedResourceInstanceId.toString();
return (this.instance?.name ?? "") +
"/" +
resource.distributedResourceInstanceId.toString();
}
return null;
@@ -486,11 +489,11 @@ class DistributedConnection extends NetworkConnection with IStore {
var packet = new IIPPacket();
if (_ready) {
print("Inc " + msg.length.toString());
//print("Inc " + msg.length.toString());
var rt = packet.parse(msg, offset, ends);
print("Packet " + packet.toString());
//print("Packet " + packet.toString());
if (rt <= 0) {
// print("hold");
@@ -960,8 +963,7 @@ class DistributedConnection extends NetworkConnection with IStore {
@override
void dataReceived(NetworkBuffer data) {
print("dataReceived");
// Console.WriteLine("DR " + hostType + " " + data.Available + " " + RemoteEndPoint.ToString());
// print("dataReceived");
var msg = data.read();
int offset = 0;
@@ -996,7 +998,9 @@ class DistributedConnection extends NetworkConnection with IStore {
AsyncReply<bool> put(IResource resource) {
if (Codec.isLocalResource(resource, this))
_resources.add(
(resource as DistributedResource).distributedResourceInstanceId as int, resource);
(resource as DistributedResource).distributedResourceInstanceId
as int,
resource);
// else .. put it in the server....
return AsyncReply.ready(true);
}
@@ -1140,10 +1144,11 @@ class DistributedConnection extends NetworkConnection with IStore {
req?.trigger(results);
}
// @TODO: check for deadlocks
void iipReplyInvoke(int callbackId, TransmissionType dataType, DC data) {
var req = _requests.take(callbackId);
Codec.parse(data, 0, this, dataType).reply.then((rt) {
Codec.parse(data, 0, this, null, dataType).reply.then((rt) {
req?.trigger(rt);
});
}
@@ -1160,10 +1165,11 @@ class DistributedConnection extends NetworkConnection with IStore {
req?.triggerProgress(type, value, max);
}
// @TODO: Check for deadlocks
void iipReportChunk(int callbackId, TransmissionType dataType, DC data) {
if (_requests.containsKey(callbackId)) {
var req = _requests[callbackId];
Codec.parse(data, 0, this, dataType).reply.then((x) {
Codec.parse(data, 0, this, null, dataType).reply.then((x) {
req?.triggerChunk(x);
});
}
@@ -1179,13 +1185,14 @@ class DistributedConnection extends NetworkConnection with IStore {
}
}
// @TODO: Check for deadlocks
void iipEventPropertyUpdated(
int resourceId, int index, TransmissionType dataType, DC data) {
fetch(resourceId).then((r) {
fetch(resourceId, null).then((r) {
var item = new AsyncReply<DistributedResourceQueueItem>();
_queue.add(item);
Codec.parse(data, 0, this, dataType).reply.then((arguments) {
Codec.parse(data, 0, this, null, dataType).reply.then((arguments) {
var pt = r.instance?.template.getPropertyTemplateByIndex(index);
if (pt != null) {
item.trigger(DistributedResourceQueueItem(
@@ -1234,14 +1241,15 @@ class DistributedConnection extends NetworkConnection with IStore {
*/
}
// @TODO: Check for deadlocks
void iipEventEventOccurred(
int resourceId, int index, TransmissionType dataType, DC data) {
fetch(resourceId).then((r) {
fetch(resourceId, null).then((r) {
// push to the queue to gaurantee serialization
var item = new AsyncReply<DistributedResourceQueueItem>();
_queue.add(item);
Codec.parse(data, 0, this, dataType).reply.then((arguments) {
Codec.parse(data, 0, this, null, dataType).reply.then((arguments) {
var et = r.instance?.template.getEventTemplateByIndex(index);
if (et != null) {
item.trigger(new DistributedResourceQueueItem(
@@ -1289,26 +1297,29 @@ class DistributedConnection extends NetworkConnection with IStore {
*/
}
// @TODO: check for deadlocks
void iipEventChildAdded(int resourceId, int childId) {
fetch(resourceId).then((parent) {
fetch(resourceId, null).then((parent) {
if (parent != null)
fetch(childId).then((child) {
fetch(childId, null).then((child) {
if (child != null) parent.instance?.children.add(child);
});
});
}
// @TODO: check for deadlocks
void iipEventChildRemoved(int resourceId, int childId) {
fetch(resourceId).then((parent) {
fetch(resourceId, null).then((parent) {
if (parent != null)
fetch(childId).then((child) {
fetch(childId, null).then((child) {
if (child != null) parent.instance?.children.remove(child);
});
});
}
// @TODO: check for deadlocks
void iipEventRenamed(int resourceId, String name) {
fetch(resourceId)
fetch(resourceId, null)
..then((resource) {
if (resource != null) {
resource.instance?.attributes["name"] = name;
@@ -1316,8 +1327,9 @@ class DistributedConnection extends NetworkConnection with IStore {
});
}
// @TODO: check for deadlocks
void iipEventAttributesUpdated(int resourceId, DC attributes) {
fetch(resourceId)
fetch(resourceId, null)
..then((resource) {
if (resource != null) {
var attrs = attributes.getStringArray(0, attributes.length);
@@ -1511,16 +1523,17 @@ class DistributedConnection extends NetworkConnection with IStore {
return;
}
DataDeserializer.listParser(content, offset, cl, this)
// @TODO: check for deadlocks
DataDeserializer.listParser(content, offset, cl, this, null)
.then((parameters) {
offset += cl;
cl = content.getUint32(offset);
DataDeserializer.typedMapParser(content, offset, cl, this)
DataDeserializer.typedMapParser(content, offset, cl, this, null)
.then((attributes) {
offset += cl;
cl = content.length - offset;
DataDeserializer.typedMapParser(content, offset, cl, this)
DataDeserializer.typedMapParser(content, offset, cl, this, null)
.then((values) {
var constructors =
[]; //Type.GetType(className).GetTypeInfo().GetConstructors();
@@ -1813,7 +1826,8 @@ class DistributedConnection extends NetworkConnection with IStore {
return;
}
DataDeserializer.typedListParser(attributes, 0, attributes.length, this)
DataDeserializer.typedListParser(
attributes, 0, attributes.length, this, null)
.then((attrs) {
if (r.instance?.setAttributes(
attrs as Map<String, dynamic>, clearAttributes) ==
@@ -1947,11 +1961,12 @@ class DistributedConnection extends NetworkConnection with IStore {
void IIPRequestResourceAttribute(int callback, int resourceId) {}
// @TODO: Check for deadlocks
void iipRequestInvokeFunction(int callback, int resourceId, int index,
TransmissionType dataType, DC data) {
Warehouse.getById(resourceId).then((r) {
if (r != null) {
Codec.parse(data, 0, this, dataType).reply.then((arguments) {
Codec.parse(data, 0, this, null, dataType).reply.then((arguments) {
var ft = r.instance?.template.getFunctionTemplateByIndex(index);
if (ft != null) {
if (r is DistributedResource) {
@@ -2160,13 +2175,14 @@ class DistributedConnection extends NetworkConnection with IStore {
// });
// }
// @TODO: Check for deadlocks
void iipRequestSetProperty(int callback, int resourceId, int index,
TransmissionType dataType, DC data) {
Warehouse.getById(resourceId).then((r) {
if (r != null) {
var pt = r.instance?.template.getPropertyTemplateByIndex(index);
if (pt != null) {
Codec.parse(data, 0, this, dataType).reply.then((value) {
Codec.parse(data, 0, this, null, dataType).reply.then((value) {
if (r is DistributedResource) {
// propagation
(r as DistributedResource).set(index, value).then<dynamic>((x) {
@@ -2238,6 +2254,8 @@ class DistributedConnection extends NetworkConnection with IStore {
/// <param name="classId">Class GUID.</param>
/// <returns>TypeTemplate.</returns>
AsyncReply<TypeTemplate?> getTemplate(Guid classId) {
//Warehouse.getTemplateByClassId(classId)
if (_templates.containsKey(classId))
return AsyncReply<TypeTemplate?>.ready(_templates[classId]);
else if (_templateRequests.containsKey(classId))
@@ -2338,99 +2356,110 @@ class DistributedConnection extends NetworkConnection with IStore {
/// <param name="classId">Class GUID</param>
/// <param name="id">Resource Id</param>Guid classId
/// <returns>DistributedResource</returns>
AsyncReply<DistributedResource> fetch(int id) {
AsyncReply<DistributedResource> fetch(int id, List<int>? requestSequence) {
var resource = _resources[id];
var request = _resourceRequests[id];
//print("fetch $id");
if (request != null) {
// dig for dead locks
if (resource != null) // dead lock
if (resource != null && (requestSequence?.contains(id) ?? false))
return AsyncReply<DistributedResource>.ready(resource);
else
return request;
return request;
} else if (resource != null && !resource.distributedResourceSuspended)
return new AsyncReply<DistributedResource>.ready(resource);
var reply = new AsyncReply<DistributedResource>();
_resourceRequests.add(id, reply);
//print("AttachResource sent ${id}");
var newSequence =
requestSequence != null ? List<int>.from(requestSequence) : <int>[];
newSequence.add(id);
(sendRequest(IIPPacketAction.AttachResource)..addUint32(id)).done()
..then((rt) {
if (rt != null) {
// @TODO: Generator code
DistributedResource dr;
//print("AttachResource rec ${id}");
if (resource == null) {
var template = Warehouse.getTemplateByClassId(
rt[0] as Guid, TemplateType.Wrapper);
if (template?.definedType != null) {
dr = Warehouse.createInstance(template?.definedType as Type);
dr.internal_init(this, id, rt[1] as int, rt[2] as String);
} else {
dr = new DistributedResource();
dr.internal_init(this, id, rt[1] as int, rt[2] as String);
}
} else
dr = resource;
// Resource not found (null)
if (rt == null) {
//print("Null response");
reply.triggerError(AsyncException(ErrorType.Management,
ExceptionCode.ResourceNotFound.index, "Null response"));
return;
}
//var dr = resource ?? new DistributedResource(this, id, rt[1], rt[2]);
DistributedResource dr;
TypeTemplate? template;
TransmissionType transmissionType = rt[3] as TransmissionType;
DC content = rt[4] as DC;
Guid classId = rt[0] as Guid;
if (resource == null) {
template =
Warehouse.getTemplateByClassId(classId, TemplateType.Wrapper);
if (template?.definedType != null) {
dr = Warehouse.createInstance(template?.definedType as Type);
dr.internal_init(this, id, rt[1] as int, rt[2] as String);
} else {
dr = new DistributedResource();
dr.internal_init(this, id, rt[1] as int, rt[2] as String);
}
} else
dr = resource;
TransmissionType transmissionType = rt[3] as TransmissionType;
DC content = rt[4] as DC;
var initResource = (ok) {
print("parse req ${id}");
Codec.parse(content, 0, this, newSequence, transmissionType)
.reply
.then((results) {
//print("parsed ${id}");
var pvs = <PropertyValue>[];
var ar = results as List;
for (var i = 0; i < ar.length; i += 3)
pvs.add(new PropertyValue(
ar[i + 2], ar[i] as int, ar[i + 1] as DateTime));
dr.internal_attach(pvs);
_resourceRequests.remove(id);
reply.trigger(dr);
})
..error((ex) => reply.triggerError(ex));
};
if (template == null) {
//print("tmp == null");
getTemplate(rt[0] as Guid)
..then((tmp) {
//print("New template ");
// ClassId, ResourceAge, ResourceLink, Content
if (resource == null) {
Warehouse.put(id.toString(), dr, this, null, tmp)
..then((ok) {
Codec.parse(content, 0, this, transmissionType)
.reply
.then((results) {
var pvs = <PropertyValue>[];
var ar = results as List;
for (var i = 0; i < ar.length; i += 3)
pvs.add(new PropertyValue(
ar[i + 2], ar[i] as int, ar[i + 1] as DateTime));
dr.internal_attach(pvs);
_resourceRequests.remove(id);
reply.trigger(dr);
})
..error((ex) => reply.triggerError(ex));
})
..then(initResource)
..error((ex) => reply.triggerError(ex));
} else {
Codec.parse(content, 0, this, transmissionType)
.reply
.then((results) {
//print("attached");
if (results != null) {
var pvs = <PropertyValue>[];
var ar = results as List;
for (var i = 0; i < ar.length; i += 3)
pvs.add(new PropertyValue(
ar[i + 2], ar[i] as int, ar[i + 1] as DateTime));
dr.internal_attach(pvs);
}
_resourceRequests.remove(id);
reply.trigger(dr);
})
..error((ex) => reply.triggerError(ex));
initResource(resource);
}
})
..error((ex) {
reply.triggerError(ex);
});
} else {
reply.triggerError(Exception("Null response"));
//print("tmp != null");
if (resource == null) {
Warehouse.put(id.toString(), dr, this, null, template)
..then(initResource)
..error((ex) => reply.triggerError(ex));
} else {
initResource(resource);
}
}
})
..error((ex) {
@@ -2440,6 +2469,7 @@ class DistributedConnection extends NetworkConnection with IStore {
return reply;
}
// @TODO: Check for deadlocks
AsyncReply<List<IResource?>> getChildren(IResource resource) {
var rt = new AsyncReply<List<IResource?>>();
@@ -2450,7 +2480,7 @@ class DistributedConnection extends NetworkConnection with IStore {
TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC;
Codec.parse(data, 0, this, dataType).reply.then((resources) {
Codec.parse(data, 0, this, null, dataType).reply.then((resources) {
rt.trigger(resources as List<IResource?>);
})
..error((ex) => rt.triggerError(ex));
@@ -2462,6 +2492,7 @@ class DistributedConnection extends NetworkConnection with IStore {
return rt;
}
// @TODO: Check for deadlocks
AsyncReply<List<IResource?>> getParents(IResource resource) {
var rt = new AsyncReply<List<IResource?>>();
@@ -2471,7 +2502,7 @@ class DistributedConnection extends NetworkConnection with IStore {
if (ar != null) {
TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC;
Codec.parse(data, 0, this, dataType).reply.then((resources) {
Codec.parse(data, 0, this, null, dataType).reply.then((resources) {
rt.trigger(resources as List<IResource>);
})
..error((ex) => rt.triggerError(ex));
@@ -2524,6 +2555,7 @@ class DistributedConnection extends NetworkConnection with IStore {
return rt;
}
// @TODO: Check for deadlocks
AsyncReply<Map<String, dynamic>> getAttributes(IResource resource,
[List<String>? attributes = null]) {
var rt = new AsyncReply<Map<String, dynamic>>();
@@ -2537,7 +2569,7 @@ class DistributedConnection extends NetworkConnection with IStore {
TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC;
Codec.parse(data, 0, this, dataType).reply.then((st) {
Codec.parse(data, 0, this, null, dataType).reply.then((st) {
resource.instance?.setAttributes(st as Map<String, dynamic>);
rt.trigger(st as Map<String, dynamic>);
})
@@ -2560,7 +2592,7 @@ class DistributedConnection extends NetworkConnection with IStore {
TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC;
Codec.parse(data, 0, this, dataType).reply
Codec.parse(data, 0, this, null, dataType).reply
..then((st) {
resource.instance?.setAttributes(st as Map<String, dynamic>);
@@ -2606,7 +2638,7 @@ class DistributedConnection extends NetworkConnection with IStore {
var content = rt[0] as DC;
DataDeserializer.historyParser(
content, 0, content.length, resource, this)
content, 0, content.length, resource, this, null)
.then((history) => reply.trigger(history));
} else {
reply.triggerError(Exception("Null response"));
@@ -2624,6 +2656,7 @@ class DistributedConnection extends NetworkConnection with IStore {
/// </summary>
/// <param name="path">Link path.</param>
/// <returns></returns>
// @TODO: Check for deadlocks
AsyncReply<List<IResource?>> query(String path) {
var str = DC.stringToBytes(path);
var reply = new AsyncReply<List<IResource?>>();
@@ -2636,7 +2669,7 @@ class DistributedConnection extends NetworkConnection with IStore {
TransmissionType dataType = ar[0] as TransmissionType;
DC data = ar[1] as DC;
Codec.parse(data, 0, this, dataType).reply.then((resources) =>
Codec.parse(data, 0, this, null, dataType).reply.then((resources) =>
reply.trigger((resources as List).cast<IResource?>()))
..error((ex) => reply.triggerError(ex));
} else {
@@ -2680,7 +2713,7 @@ class DistributedConnection extends NetworkConnection with IStore {
if (args != null) {
var rid = args[0];
fetch(rid as int).then((r) {
fetch(rid as int, null).then((r) {
reply.trigger(r);
});
} else {

View File

@@ -45,6 +45,8 @@ import '../Packets/IIPPacketAction.dart';
import '../../Resource/Template/EventTemplate.dart';
class DistributedResource extends IResource {
int? _instanceId;
DistributedConnection? _connection;
@@ -105,7 +107,7 @@ class DistributedResource extends IResource {
/// <summary>
/// Resource is attached when all its properties are received.
/// </summary>
bool get attached => _attached;
bool get distributedResourceAttached => _attached;
// public DistributedResourceStack Stack
//{

View File

@@ -397,6 +397,13 @@ class IIPPacket {
if (parsed.type == null) return -parsed.size;
//print("Not enough ${parsed.size}");
// } else {
// print(
// "attach parsed ${parsed.size} ${cl} ${data.length} ${ends} ${offset}");
// }
dataType = parsed.type;
offset += parsed.size;
} else if (action == IIPPacketAction.DetachResource) {