2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-09-13 20:43:19 +00:00
This commit is contained in:
2025-08-26 06:12:59 +03:00
parent 4a7070ee6e
commit dde86c9af7
8 changed files with 60 additions and 45 deletions

View File

@@ -86,14 +86,15 @@ public class AsyncBag<T> : AsyncReply, IAsyncBag
for (var i = 0; i < replies.Count; i++) for (var i = 0; i < replies.Count; i++)
{ {
var k = replies[i]; var k = replies[i];
var index = i; var index = i;
if (k is AsyncReply) if (k is AsyncReply reply)
{ {
(k as AsyncReply).Then((r) => reply.Then((r) =>
{ {
results.SetValue(r, i); results.SetValue(r, index);
count++; count++;
if (count == replies.Count) if (count == replies.Count)
Trigger(results); Trigger(results);
@@ -101,7 +102,7 @@ public class AsyncBag<T> : AsyncReply, IAsyncBag
} }
else else
{ {
results.SetValue(replies[i], i); results.SetValue(replies[i], index);
count++; count++;
if (count == replies.Count) if (count == replies.Count)
Trigger(results); Trigger(results);

View File

@@ -60,30 +60,31 @@ public static class Codec
DataDeserializer.ResourceParser8Async, DataDeserializer.ResourceParser8Async,
}, },
new AsyncParser[]{ new AsyncParser[]{
DataDeserializer.Int16ParserAsync,
DataDeserializer.UInt16ParserAsync, DataDeserializer.UInt16ParserAsync,
DataDeserializer.Int16ParserAsync,
DataDeserializer.Char16ParserAsync, DataDeserializer.Char16ParserAsync,
DataDeserializer.LocalResourceParser16Async, DataDeserializer.LocalResourceParser16Async,
DataDeserializer.ResourceParser16Async, DataDeserializer.ResourceParser16Async,
}, },
new AsyncParser[]{ new AsyncParser[]{
DataDeserializer.Int32ParserAsync,
DataDeserializer.UInt32ParserAsync, DataDeserializer.UInt32ParserAsync,
DataDeserializer.Int32ParserAsync,
DataDeserializer.Float32ParserAsync, DataDeserializer.Float32ParserAsync,
DataDeserializer.LocalResourceParser32Async, DataDeserializer.LocalResourceParser32Async,
DataDeserializer.ResourceParser32Async, DataDeserializer.ResourceParser32Async,
}, },
new AsyncParser[]{ new AsyncParser[]{
DataDeserializer.Int64ParserAsync,
DataDeserializer.UInt64ParserAsync, DataDeserializer.UInt64ParserAsync,
DataDeserializer.Int64ParserAsync,
DataDeserializer.Float64ParserAsync, DataDeserializer.Float64ParserAsync,
DataDeserializer.DateTimeParserAsync, DataDeserializer.DateTimeParserAsync,
}, },
new AsyncParser[] new AsyncParser[]
{ {
DataDeserializer.Int128ParserAsync, // int 128
DataDeserializer.UInt128ParserAsync, // uint 128 DataDeserializer.UInt128ParserAsync, // uint 128
DataDeserializer.Float128ParserAsync, DataDeserializer.Int128ParserAsync, // int 128
DataDeserializer.Decimal128ParserAsync,
DataDeserializer.UUIDParserAsync
} }
}; };
@@ -146,7 +147,8 @@ public static class Codec
{ {
DataDeserializer.UInt128Parser, // uint 128 DataDeserializer.UInt128Parser, // uint 128
DataDeserializer.Int128Parser, // int 128 DataDeserializer.Int128Parser, // int 128
DataDeserializer.Float128Parser, DataDeserializer.Decimal128Parser,
DataDeserializer.UUIDParser
} }
}; };
@@ -197,7 +199,7 @@ public static class Codec
var tt = dataType.Value; var tt = dataType.Value;
Console.WriteLine("Parsing " + tt.Class + " " + tt.Identifier); //Console.WriteLine("Parsing " + tt.Class + " " + tt.Identifier);
if (tt.Class == TransmissionTypeClass.Fixed) if (tt.Class == TransmissionTypeClass.Fixed)
{ {
@@ -302,7 +304,7 @@ public static class Codec
[typeof(List<byte>)] = DataSerializer.RawDataComposerFromList, [typeof(List<byte>)] = DataSerializer.RawDataComposerFromList,
//[typeof(List<byte?>)] = DataSerializer.RawDataComposerFromList, //[typeof(List<byte?>)] = DataSerializer.RawDataComposerFromList,
[typeof(string)] = DataSerializer.StringComposer, [typeof(string)] = DataSerializer.StringComposer,
[typeof(UUID)] = DataSerializer.UUIDComposer,
// Special // Special
[typeof(object[])] = DataSerializer.ListComposer, [typeof(object[])] = DataSerializer.ListComposer,
[typeof(List<object>)] = DataSerializer.ListComposer, [typeof(List<object>)] = DataSerializer.ListComposer,

View File

@@ -169,18 +169,29 @@ public static class DataDeserializer
} }
public static unsafe object Float128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) public static unsafe object Decimal128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{ {
fixed (byte* ptr = &data[offset]) fixed (byte* ptr = &data[offset])
return *(decimal*)ptr; return *(decimal*)ptr;
} }
public static unsafe object Float128Parser(byte[] data, uint offset, uint length, Warehouse warehouse) public static unsafe object Decimal128Parser(byte[] data, uint offset, uint length, Warehouse warehouse)
{ {
fixed (byte* ptr = &data[offset]) fixed (byte* ptr = &data[offset])
return *(decimal*)ptr; return *(decimal*)ptr;
} }
public static unsafe object UUIDParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
return new UUID(data, offset);
}
public static unsafe object UUIDParser(byte[] data, uint offset, uint length, Warehouse warehouse)
{
return new UUID(data, offset);
}
public static unsafe object Int128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence) public static unsafe object Int128ParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{ {
@@ -962,6 +973,8 @@ public static class DataDeserializer
{ {
var (cs, reply) = Codec.ParseAsync(data, offset, connection, requestSequence); var (cs, reply) = Codec.ParseAsync(data, offset, connection, requestSequence);
rt.Add(reply); rt.Add(reply);
if (cs > 0) if (cs > 0)

View File

@@ -114,8 +114,6 @@ public static class DataSerializer
public static (TransmissionTypeIdentifier, byte[]) EnumComposer(object value, DistributedConnection connection) public static (TransmissionTypeIdentifier, byte[]) EnumComposer(object value, DistributedConnection connection)
{ {
Console.WriteLine(value.GetType().Name);
if (value == null) if (value == null)
return (TransmissionTypeIdentifier.Null, new byte[0]); return (TransmissionTypeIdentifier.Null, new byte[0]);
@@ -391,6 +389,12 @@ public static class DataSerializer
return (TransmissionTypeIdentifier.Map, rt.ToArray()); return (TransmissionTypeIdentifier.Map, rt.ToArray());
} }
public static unsafe (TransmissionTypeIdentifier, byte[]) UUIDComposer(object value, DistributedConnection connection)
{
return (TransmissionTypeIdentifier.UUID, ((UUID)value).Data);
}
public static unsafe (TransmissionTypeIdentifier, byte[]) RecordComposer(object value, DistributedConnection connection) public static unsafe (TransmissionTypeIdentifier, byte[]) RecordComposer(object value, DistributedConnection connection)
{ {
var rt = new List<byte>();// BinaryList(); var rt = new List<byte>();// BinaryList();

View File

@@ -39,6 +39,7 @@ public enum TransmissionTypeIdentifier : byte
UInt128 = 0x28, UInt128 = 0x28,
Int128 = 0x29, Int128 = 0x29,
Decimal128 = 0x2A, Decimal128 = 0x2A,
UUID = 0x2B,
RawData = 0x40, RawData = 0x40,
String = 0x41, String = 0x41,

View File

@@ -385,7 +385,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
SendRequest(IIPPacketRequest.KeepAlive, now, interval) SendRequest(IIPPacketRequest.KeepAlive, now, interval)
.Then(x => .Then(x =>
{ {
Jitter = (uint)x; Jitter = (uint)((object[])x)[1];
keepAliveTimer.Start(); keepAliveTimer.Start();
}).Error(ex => }).Error(ex =>
{ {
@@ -430,7 +430,7 @@ public partial class DistributedConnection : NetworkConnection, IStore
if (packet.DataType == null) if (packet.DataType == null)
return offset; return offset;
Console.WriteLine("Incoming: " + packet); //Console.WriteLine("Incoming: " + packet);
if (packet.Method == IIPPacketMethod.Notification) if (packet.Method == IIPPacketMethod.Notification)
{ {
var dt = packet.DataType.Value; var dt = packet.DataType.Value;

View File

@@ -78,7 +78,7 @@ partial class DistributedConnection
/// <returns></returns> /// <returns></returns>
AsyncReply SendRequest(IIPPacketRequest action, params object[] args) AsyncReply SendRequest(IIPPacketRequest action, params object[] args)
{ {
var reply = new AsyncReply<object[]>(); var reply = new AsyncReply();
var c = callbackCounter++; // avoid thread racing var c = callbackCounter++; // avoid thread racing
requests.Add(c, reply); requests.Add(c, reply);
@@ -547,7 +547,7 @@ partial class DistributedConnection
r.Instance.Age, r.Instance.Age,
r.Instance.Link, r.Instance.Link,
r.Instance.Hops, r.Instance.Hops,
dr._Serialize(), this); dr._Serialize());
} }
else else
{ {
@@ -557,7 +557,7 @@ partial class DistributedConnection
r.Instance.Age, r.Instance.Age,
r.Instance.Link, r.Instance.Link,
r.Instance.Hops, r.Instance.Hops,
r.Instance.Serialize(), this); r.Instance.Serialize());
} }
// subscribe // subscribe
@@ -852,7 +852,7 @@ partial class DistributedConnection
var (_, value) = Codec.ParseSync(data, 0, Instance.Warehouse, dataType); var (_, value) = Codec.ParseSync(data, 0, Instance.Warehouse, dataType);
var classId = new UUID((byte[])value); var classId = (UUID)value;
var t = Instance.Warehouse.GetTemplateByClassId(classId); var t = Instance.Warehouse.GetTemplateByClassId(classId);
@@ -1637,11 +1637,11 @@ partial class DistributedConnection
SendRequest(IIPPacketRequest.TemplateFromClassId, classId) SendRequest(IIPPacketRequest.TemplateFromClassId, classId)
.Then((result) => .Then((result) =>
{ {
var args = (object[])result; var tt = TypeTemplate.Parse((byte[])result);
templateRequests.Remove(classId); templateRequests.Remove(classId);
templates.Add(((TypeTemplate)args[0]).ClassId, (TypeTemplate)args[0]); templates.Add(tt.ClassId, tt);
Instance.Warehouse.PutTemplate(args[0] as TypeTemplate); Instance.Warehouse.PutTemplate(tt);
reply.Trigger(args[0]); reply.Trigger(tt);
}).Error((ex) => }).Error((ex) =>
{ {
reply.TriggerError(ex); reply.TriggerError(ex);
@@ -1766,30 +1766,22 @@ partial class DistributedConnection
{ {
if (resource != null && (requestSequence?.Contains(id) ?? false)) if (resource != null && (requestSequence?.Contains(id) ?? false))
{ {
Console.WriteLine("Fetching DL " + id);
// dead lock avoidance for loop reference. // dead lock avoidance for loop reference.
return new AsyncReply<DistributedResource>(resource); return new AsyncReply<DistributedResource>(resource);
} }
else if (resource != null && requestInfo.RequestSequence.Contains(id)) else if (resource != null && requestInfo.RequestSequence.Contains(id))
{ {
Console.WriteLine("Fetching DL2 " + id);
// dead lock avoidance for dependent reference. // dead lock avoidance for dependent reference.
return new AsyncReply<DistributedResource>(resource); return new AsyncReply<DistributedResource>(resource);
} }
else else
{ {
Console.WriteLine("Fetching DL3 " + id);
return requestInfo.Reply; return requestInfo.Reply;
} }
} }
else if (resource != null && !resource.DistributedResourceSuspended) else if (resource != null && !resource.DistributedResourceSuspended)
{ {
// @REVIEW: this should never happen // @REVIEW: this should never happen
Console.WriteLine("Fetching DLWWW " + id);
Global.Log("DCON", LogType.Error, "Resource not moved to attached."); Global.Log("DCON", LogType.Error, "Resource not moved to attached.");
return new AsyncReply<DistributedResource>(resource); return new AsyncReply<DistributedResource>(resource);
@@ -1799,7 +1791,6 @@ partial class DistributedConnection
var reply = new AsyncReply<DistributedResource>(); var reply = new AsyncReply<DistributedResource>();
resourceRequests.Add(id, new DistributedResourceAttachRequestInfo(reply, newSequence)); resourceRequests.Add(id, new DistributedResourceAttachRequestInfo(reply, newSequence));
Console.WriteLine("Fetching " + id);
SendRequest(IIPPacketRequest.AttachResource, id) SendRequest(IIPPacketRequest.AttachResource, id)
.Then((result) => .Then((result) =>
@@ -1819,7 +1810,6 @@ partial class DistributedConnection
var hops = (byte)args[3]; var hops = (byte)args[3];
var pvData = (byte[])args[4]; var pvData = (byte[])args[4];
Console.WriteLine("Fetching CL " + id);
DistributedResource dr; DistributedResource dr;
TypeTemplate template = null; TypeTemplate template = null;
@@ -1846,14 +1836,14 @@ partial class DistributedConnection
parsedReply.Then(results => parsedReply.Then(results =>
{ {
var ar = results as object[]; var pvs = results as PropertyValue[];
var pvs = new List<PropertyValue>(); //var pvs = new List<PropertyValue>();
for (var i = 0; i < ar.Length; i += 3) //for (var i = 0; i < ar.Length; i += 3)
pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1])); // pvs.Add(new PropertyValue(ar[i + 2], Convert.ToUInt64(ar[i]), (DateTime)ar[i + 1]));
dr._Attach(pvs.ToArray()); dr._Attach(pvs);
resourceRequests.Remove(id); resourceRequests.Remove(id);
// move from needed to attached. // move from needed to attached.
neededResources.Remove(id); neededResources.Remove(id);
@@ -2085,7 +2075,7 @@ partial class DistributedConnection
jitter = (uint)Math.Abs((int)diff - (int)interval); jitter = (uint)Math.Abs((int)diff - (int)interval);
} }
SendRequest(IIPPacketRequest.KeepAlive, now, jitter); SendReply(IIPPacketReply.Completed, callback, now, jitter);
lastKeepAliveReceived = now; lastKeepAliveReceived = now;
} }

View File

@@ -131,6 +131,10 @@ class IIPPacket : Packet
offset += (uint)size; offset += (uint)size;
} }
else
{
DataType = null;
}
return offset - originalOffset; return offset - originalOffset;
} }