2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-06-27 13:33:13 +00:00

deadlock prevention

This commit is contained in:
2022-03-31 02:27:30 +03:00
parent f258464063
commit 20fcaba518
7 changed files with 268 additions and 216 deletions

View File

@ -232,7 +232,7 @@ partial class DistributedConnection
{
var req = requests.Take(callbackId);
var (_, parsed) = Codec.Parse(content, 0, this, transmissionType);
var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType);
parsed.Then((rt) =>
{
req?.Trigger(rt);
@ -256,7 +256,7 @@ partial class DistributedConnection
if (requests.ContainsKey(callbackId))
{
var req = requests[callbackId];
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);
parsed.Then((x) =>
{
req.TriggerChunk(x);
@ -282,12 +282,12 @@ partial class DistributedConnection
void IIPEventPropertyUpdated(uint resourceId, byte index, TransmissionType dataType, byte[] data)
{
Fetch(resourceId).Then(r =>
Fetch(resourceId, null).Then(r =>
{
var item = new AsyncReply<DistributedResourceQueueItem>();
queue.Add(item);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);// 0, this);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);// 0, this);
parsed.Then((arguments) =>
{
var pt = r.Instance.Template.GetPropertyTemplateByIndex(index);
@ -345,13 +345,13 @@ partial class DistributedConnection
void IIPEventEventOccurred(uint resourceId, byte index, TransmissionType dataType, byte[] data)
{
Fetch(resourceId).Then(r =>
Fetch(resourceId, null).Then(r =>
{
// push to the queue to gaurantee serialization
var item = new AsyncReply<DistributedResourceQueueItem>();
queue.Add(item);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);//, 0, this);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);//, 0, this);
parsed.Then((arguments) =>
{
var et = r.Instance.Template.GetEventTemplateByIndex(index);
@ -406,9 +406,9 @@ partial class DistributedConnection
void IIPEventChildAdded(uint resourceId, uint childId)
{
Fetch(resourceId).Then(parent =>
Fetch(resourceId, null).Then(parent =>
{
Fetch(childId).Then(child =>
Fetch(childId, null).Then(child =>
{
parent.children.Add(child);
child.parents.Add(parent);
@ -420,9 +420,9 @@ partial class DistributedConnection
void IIPEventChildRemoved(uint resourceId, uint childId)
{
Fetch(resourceId).Then(parent =>
Fetch(resourceId, null).Then(parent =>
{
Fetch(childId).Then(child =>
Fetch(childId, null).Then(child =>
{
parent.children.Remove(child);
child.parents.Remove(parent);
@ -434,7 +434,7 @@ partial class DistributedConnection
void IIPEventRenamed(uint resourceId, string name)
{
Fetch(resourceId).Then(resource =>
Fetch(resourceId, null).Then(resource =>
{
resource.Instance.Variables["name"] = name;
});
@ -443,7 +443,7 @@ partial class DistributedConnection
void IIPEventAttributesUpdated(uint resourceId, byte[] attributes)
{
Fetch(resourceId).Then(resource =>
Fetch(resourceId, null).Then(resource =>
{
var attrs = attributes.GetStringArray(0, (uint)attributes.Length);
@ -724,19 +724,19 @@ partial class DistributedConnection
return;
}
DataDeserializer.ListParser(content, offset, cl, this).Then(parameters =>
DataDeserializer.ListParser(content, offset, cl, this, null).Then(parameters =>
{
offset += cl;
cl = content.GetUInt32(offset, Endian.Little);
//Codec.ParseStructure(content, offset, cl, this).Then(attributes =>
DataDeserializer.TypedMapParser(content, offset, cl, this).Then(attributes =>
DataDeserializer.TypedMapParser(content, offset, cl, this, null).Then(attributes =>
{
offset += cl;
cl = (uint)content.Length - offset;
//Codec.ParseStructure(content, offset, cl, this).Then(values =>
DataDeserializer.TypedMapParser(content, offset, cl, this).Then(values =>
DataDeserializer.TypedMapParser(content, offset, cl, this, null).Then(values =>
{
#if NETSTANDARD
@ -1049,7 +1049,7 @@ partial class DistributedConnection
}
DataDeserializer.TypedMapParser(attributes, 0, (uint)attributes.Length, this).Then(attrs =>
DataDeserializer.TypedMapParser(attributes, 0, (uint)attributes.Length, this, null).Then(attrs =>
{
if (r.Instance.SetAttributes((Map<string, object>)attrs, clearAttributes))
SendReply(clearAttributes ? IIPPacket.IIPPacketAction.ClearAllAttributes : IIPPacket.IIPPacketAction.ClearAttributes,
@ -1232,11 +1232,11 @@ partial class DistributedConnection
return;
}
var (_, parsed) = Codec.Parse(content, 0, this, transmissionType);
var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType);
parsed.Then(results =>
{
var arguments = (Map<byte, object>)results ;// (object[])results;
var arguments = (Map<byte, object>)results;// (object[])results;
// un hold the socket to send data immediately
this.Socket.Unhold();
@ -1281,14 +1281,14 @@ partial class DistributedConnection
{
if (pis.Last().ParameterType == typeof(DistributedConnection))
{
for(byte i = 0; i< pis.Length - 1; i++)
args[i] = arguments.ContainsKey(i) ?
for (byte i = 0; i < pis.Length - 1; i++)
args[i] = arguments.ContainsKey(i) ?
DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing;
args[args.Length - 1] = this;
}
else
{
for (byte i = 0; i < pis.Length ; i++)
for (byte i = 0; i < pis.Length; i++)
args[i] = arguments.ContainsKey(i) ?
DC.CastConvert(arguments[i], pis[i].ParameterType) : Type.Missing;
}
@ -1331,13 +1331,13 @@ partial class DistributedConnection
(rt as Task).ContinueWith(t =>
{
#if NETSTANDARD
var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t);
var res = t.GetType().GetTypeInfo().GetProperty("Result").GetValue(t);
#else
var res = t.GetType().GetProperty("Result").GetValue(t);
#endif
SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback)
.AddUInt8Array(Codec.Compose(res, this))
.Done();
SendReply(IIPPacket.IIPPacketAction.InvokeFunction, callback)
.AddUInt8Array(Codec.Compose(res, this))
.Done();
});
//await t;
@ -1631,7 +1631,7 @@ partial class DistributedConnection
var pt = r.Instance.Template.GetPropertyTemplateByIndex(index);
if (pt != null)
{
var (_, parsed) = Codec.Parse(content, 0, this, transmissionType);
var (_, parsed) = Codec.Parse(content, 0, this, null, transmissionType);
parsed.Then((value) =>
{
if (r is DistributedResource)
@ -2000,15 +2000,14 @@ partial class DistributedConnection
/// <param name="classId">Class GUID</param>
/// <param name="id">Resource Id</param>Guid classId
/// <returns>DistributedResource</returns>
public AsyncReply<DistributedResource> Fetch(uint id)
public AsyncReply<DistributedResource> Fetch(uint id, uint[] requestSequence)
{
var resource = resources[id];
var request = resourceRequests[id];
if (request != null)
{
if (resource != null)
// dig for dead locks // or not
if (resource != null && (requestSequence?.Contains(id) ?? false))
return new AsyncReply<DistributedResource>(resource);
else
return request;
@ -2022,17 +2021,28 @@ partial class DistributedConnection
var reply = new AsyncReply<DistributedResource>();
resourceRequests.Add(id, reply);
var newSequence = requestSequence != null ? requestSequence.Concat(new uint[] { id }).ToArray() : new uint[] { id };
SendRequest(IIPPacket.IIPPacketAction.AttachResource)
.AddUInt32(id)
.Done()
.Then((rt) =>
{
if (rt == null)
{
reply.TriggerError(new AsyncException(ErrorType.Management,
(ushort)ExceptionCode.ResourceNotFound, "Null response"));
return;
}
DistributedResource dr;
TypeTemplate template = null;
Guid classId = (Guid)rt[0];
if (resource == null)
{
var template = Warehouse.GetTemplateByClassId((Guid)rt[0], TemplateType.Wrapper);
template = Warehouse.GetTemplateByClassId(classId, TemplateType.Wrapper);
if (template?.DefinedType != null)
dr = Activator.CreateInstance(template.DefinedType, this, id, (ulong)rt[1], (string)rt[2]) as DistributedResource;
else
@ -2044,57 +2054,64 @@ partial class DistributedConnection
var transmissionType = (TransmissionType)rt[3];
var content = (byte[])rt[4];
GetTemplate((Guid)rt[0]).Then((tmp) =>
var initResource = (DistributedResource ok) =>
{
var (_, parsed) = Codec.Parse(content, 0, this, newSequence, transmissionType);
parsed.Then(results =>
{
var ar = results as object[];
var pvs = new List<PropertyValue>();
for (var i = 0; i < ar.Length; i += 3)
pvs.Add(new PropertyValue(ar[i + 2], (ulong?)ar[i], (DateTime?)ar[i + 1]));
dr._Attach(pvs.ToArray());// (PropertyValue[])pvs);
resourceRequests.Remove(id);
reply.Trigger(dr);
}).Error(ex => reply.TriggerError(ex));
};
if (template == null)
{
GetTemplate((Guid)rt[0]).Then((tmp) =>
{
// ClassId, ResourceAge, ResourceLink, Content
if (resource == null)
{
Warehouse.Put(id.ToString(), dr, this, null, tmp).Then(initResource).Error(ex => reply.TriggerError(ex));
}
else
{
initResource(resource);
}
}).Error((ex) =>
{
reply.TriggerError(ex);
});
}
else
{
// ClassId, ResourceAge, ResourceLink, Content
if (resource == null)
{
Warehouse.Put(id.ToString(), dr, this, null, tmp).Then((ok) =>
{
var (_, parsed) = Codec.Parse(content, 0, this, transmissionType);
parsed.Then(results =>
{
var ar = results as object[];
var pvs = new List<PropertyValue>();
for (var i = 0; i < ar.Length; i += 3)
pvs.Add(new PropertyValue(ar[i + 2], (ulong?)ar[i], (DateTime?)ar[i + 1]));
dr._Attach(pvs.ToArray());// (PropertyValue[])pvs);
resourceRequests.Remove(id);
reply.Trigger(dr);
}).Error(ex => reply.TriggerError(ex));
}).Error(ex => reply.TriggerError(ex));
Warehouse.Put(id.ToString(), dr, this, null, template)
.Then(initResource).Error((ex) => reply.TriggerError(ex));
}
else
{
var (_, parsed) = Codec.Parse(content, 0, this, transmissionType);
parsed.Then((results) =>
{
var ar = results as object[];
var pvs = new List<PropertyValue>();
for (var i = 0; i < ar.Length; i += 3)
pvs.Add(new PropertyValue(ar[i + 2], (ulong?)ar[i], (DateTime?)ar[i + 1]));
dr._Attach(pvs.ToArray());// (PropertyValue[])pvs);
// resourceRequests.Remove(id);
reply.Trigger(dr);
}).Error(ex => reply.TriggerError(ex));
initResource(resource);
}
}).Error((ex) =>
{
reply.TriggerError(ex);
});
}
}).Error((ex) =>
{
reply.TriggerError(ex);
});
return reply;
}
@ -2111,7 +2128,7 @@ partial class DistributedConnection
var dataType = (TransmissionType)ar[0];
var data = (byte[])ar[1];
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);
parsed.Then(resources => rt.Trigger(resources))
.Error(ex => rt.TriggerError(ex));
@ -2136,7 +2153,7 @@ partial class DistributedConnection
{
var dataType = (TransmissionType)ar[0];
var data = (byte[])ar[1];
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);
parsed.Then(resources => rt.Trigger(resources))
.Error(ex => rt.TriggerError(ex));
@ -2204,7 +2221,7 @@ partial class DistributedConnection
var dataType = (TransmissionType)ar[0];
var data = (byte[])ar[1];
//Codec.Parse(d, )
var (_, parsed) = Codec.Parse(data, 0, this, dataType);
var (_, parsed) = Codec.Parse(data, 0, this, null, dataType);
parsed.Then(st =>
{
@ -2227,7 +2244,7 @@ partial class DistributedConnection
var dataType = (TransmissionType)ar[0];
var data = (byte[])ar[1];
var (_, parsed) = Codec.Parse(data, 0, this, dataType);
var (_, parsed) = Codec.Parse(data, 0, this, null, dataType);
parsed.Then(st =>
{
@ -2268,7 +2285,7 @@ partial class DistributedConnection
{
var content = (byte[])rt[0];
DataDeserializer.HistoryParser(content, 0, (uint)content.Length, resource, this)
DataDeserializer.HistoryParser(content, 0, (uint)content.Length, resource, this, null)
.Then((history) => reply.Trigger(history));
}).Error((ex) => reply.TriggerError(ex));
@ -2298,7 +2315,7 @@ partial class DistributedConnection
var dataType = (TransmissionType)ar[0];
var data = ar[1] as byte[];
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, dataType);
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);
parsed.Then(resources => reply.Trigger(resources))
.Error(ex => reply.TriggerError(ex));
@ -2342,7 +2359,7 @@ partial class DistributedConnection
{
var rid = (uint)args[0];
Fetch(rid).Then((r) =>
Fetch(rid, null).Then((r) =>
{
reply.Trigger(r);
});