2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-09-13 12:43:17 +00:00
This commit is contained in:
2025-08-13 11:42:34 +03:00
parent 24a2eadec5
commit c475f403ce
6 changed files with 145 additions and 88 deletions

View File

@@ -38,8 +38,8 @@ interface IAsyncBag
public class AsyncBag<T> : AsyncReply, IAsyncBag
{
protected List<AsyncReply> replies = new List<AsyncReply>();
List<T> results = new();
protected List<object> replies = new List<object>();
//List<T> results = new();
int count = 0;
bool sealedBag = false;
@@ -75,62 +75,54 @@ public class AsyncBag<T> : AsyncReply, IAsyncBag
sealedBag = true;
if (results.Count == 0)
var results = ArrayType == null ? new T[replies.Count]
: Array.CreateInstance(ArrayType, replies.Count);
if (replies.Count == 0)
{
if (ArrayType != null)
{
var ar = Array.CreateInstance(ArrayType, 0);
Trigger(ar);
}
else
{
Trigger(new object[0]);
}
Trigger(results);
return;
}
for (var i = 0; i < results.Count; i++)
//foreach(var reply in results.Keys)
for (var i = 0; i < replies.Count; i++)
{
var k = replies[i];// results.Keys.ElementAt(i);
var k = replies[i];
var index = i;
k.Then((r) =>
if (k is AsyncReply)
{
results[index] = (T)r;
(k as AsyncReply).Then((r) =>
{
results.SetValue(r, i);
count++;
if (count == results.Count)
{
if (ArrayType != null)
{
try
{
// @TODO: Safe casting check
var ar = Array.CreateInstance(ArrayType, count);
for (var i = 0; i < count; i++)
ar.SetValue(results[i], i);
Trigger(ar);
}
catch
{
Trigger(results.ToArray());
}
if (count == replies.Count)
Trigger(results);
});
}
else
Trigger(results.ToArray());
{
results.SetValue(replies[i], i);
count++;
if (count == replies.Count)
Trigger(results);
}
});
}
}
public void Add(AsyncReply reply)
public void Add(object valueOrReply)
{
if (!sealedBag)
{
results.Add(default(T));
replies.Add(reply);
//if (valueOrReply is AsyncReply)
//{
// results.Add(default(T));
replies.Add(valueOrReply);
//}
}
}
public void AddBag(AsyncBag<T> bag)
{
foreach (var r in bag.replies)

View File

@@ -39,42 +39,46 @@ namespace Esiur.Data;
public static class Codec
{
delegate AsyncReply AsyncParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence);
//delegate AsyncReply AsyncParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence);
delegate object SyncParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence);
static AsyncParser[][] FixedParsers = new AsyncParser[][]
static SyncParser[][] FixedParsers = new SyncParser[][]
{
new AsyncParser[]{
new SyncParser[]{
DataDeserializer.NullParser,
DataDeserializer.BooleanFalseParser,
DataDeserializer.BooleanTrueParser,
DataDeserializer.NotModifiedParser,
},
new AsyncParser[]{
new SyncParser[]{
DataDeserializer.ByteParser,
DataDeserializer.SByteParser,
DataDeserializer.Char8Parser,
DataDeserializer.LocalResourceParser8,
DataDeserializer.ResourceParser8,
},
new AsyncParser[]{
new SyncParser[]{
DataDeserializer.Int16Parser,
DataDeserializer.UInt16Parser,
DataDeserializer.Char16Parser,
DataDeserializer.LocalResourceParser16,
DataDeserializer.ResourceParser16,
},
new AsyncParser[]{
new SyncParser[]{
DataDeserializer.Int32Parser,
DataDeserializer.UInt32Parser,
DataDeserializer.Float32Parser,
DataDeserializer.ResourceParser,
DataDeserializer.LocalResourceParser,
DataDeserializer.LocalResourceParser32,
DataDeserializer.ResourceParser32,
},
new AsyncParser[]{
new SyncParser[]{
DataDeserializer.Int64Parser,
DataDeserializer.UInt64Parser,
DataDeserializer.Float64Parser,
DataDeserializer.DateTimeParser,
},
new AsyncParser[]
new SyncParser[]
{
DataDeserializer.Int128Parser, // int 128
DataDeserializer.UInt128Parser, // uint 128
@@ -82,7 +86,7 @@ public static class Codec
}
};
static AsyncParser[] DynamicParsers = new AsyncParser[]
static SyncParser[] DynamicParsers = new SyncParser[]
{
DataDeserializer.RawDataParser,
DataDeserializer.StringParser,
@@ -91,7 +95,7 @@ public static class Codec
DataDeserializer.RecordListParser,
};
static AsyncParser[] TypedParsers = new AsyncParser[]
static SyncParser[] TypedParsers = new SyncParser[]
{
DataDeserializer.RecordParser,
DataDeserializer.TypedListParser,
@@ -111,7 +115,7 @@ public static class Codec
/// <param name="connection">DistributedConnection is required in case a structure in the array holds items at the other end.</param>
/// <param name="dataType">DataType, in case the data is not prepended with DataType</param>
/// <returns>Value</returns>
public static (uint, AsyncReply) ParseAsync(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence, TransmissionType? dataType = null)
public static (uint, object) ParseAsync(byte[] data, uint offset, DistributedConnection connection, uint[] requestSequence, TransmissionType? dataType = null)
{
uint len = 0;
@@ -144,10 +148,6 @@ public static class Codec
}
}
public static uint ParseSync(byte[] data, uint offset, TransmissionType? dataType = null)
{
}
/// <summary>
/// Check if a resource is local to a given connection.

View File

@@ -72,7 +72,7 @@ public static class DataDeserializer
return *(int*)ptr;
}
public static unsafe uint UInt32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
public static unsafe object UInt32Parser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
fixed (byte* ptr = &data[offset])
return *(uint*)ptr;
@@ -129,16 +129,55 @@ public static class DataDeserializer
}
public static unsafe object ResourceParser8(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
if (connection == null)
return new ResourceId(false, data[offset]);
else
return connection.Fetch(data[offset], requestSequence);
}
public static unsafe object ResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
public static unsafe object LocalResourceParser8(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
if (connection == null)
return new ResourceId(true, data[offset]);
else
return Warehouse.GetById(data[offset]);
}
public static unsafe object ResourceParser16(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
fixed (byte* ptr = &data[offset])
if (connection == null)
return new ResourceId(false, *(ushort*)ptr);
else
return connection.Fetch(*(ushort*)ptr, requestSequence);
}
public static unsafe object LocalResourceParser16(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
fixed (byte* ptr = &data[offset])
if (connection == null)
return new ResourceId(true, *(ushort*)ptr);
else
return Warehouse.GetById(*(ushort*)ptr);
}
public static unsafe object ResourceParser32(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
fixed (byte* ptr = &data[offset])
if (connection == null)
return new ResourceId(false, *(uint*)ptr);
else
return connection.Fetch(*(uint*)ptr, requestSequence);
}
public static unsafe object LocalResourceParser(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
public static unsafe object LocalResourceParser32(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)
{
fixed (byte* ptr = &data[offset])
if (connection == null)
return new ResourceId(true, *(uint*)ptr);
else
return Warehouse.GetById(*(uint*)ptr);
}

18
Esiur/Data/ResourceId.cs Normal file
View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Data
{
public struct ResourceId
{
public bool Local;
public uint Id;
public ResourceId(bool local, uint id)
{
this.Id = id;
this.Local = local;
}
}
}

View File

@@ -14,21 +14,31 @@ public enum TransmissionTypeIdentifier : byte
UInt8 = 0x8,
Int8 = 0x9,
Char8 = 0xA,
Int16 = 0x10,
UInt16 = 0x11,
LocalResource8 = 0xB,
RemoteResource8 = 0xC,
LocalProcedure8 = 0xD,
RemoteProcedure8 = 0xE,
UInt16 = 0x10,
Int16 = 0x11,
Char16 = 0x12,
Int32 = 0x18,
UInt32 = 0x19,
LocalResource16 = 0x13,
RemoteResource16 = 0x14,
LocalProcedure16 = 0x15,
RemoteProcedure16 = 0x16,
UInt32 = 0x18,
Int32 = 0x19,
Float32 = 0x1A,
Resource = 0x1B,
ResourceLocal = 0x1C,
Int64 = 0x20,
UInt64 = 0x21,
LocalResource32 = 0x1B,
RemoteResource32 = 0x1C,
LocalProcedure32 = 0x1D,
RemoteProcedure32 = 0x1E,
UInt64 = 0x20,
Int64 = 0x21,
Float64 = 0x22,
DateTime = 0x23,
Int128 = 0x28,
UInt128 = 0x29,
Float128 = 0x2A,
UInt128 = 0x28,
Int128 = 0x29,
Decimal128 = 0x2A,
RawData = 0x40,
String = 0x41,
@@ -37,17 +47,15 @@ public enum TransmissionTypeIdentifier : byte
RecordList = 0x44,
Map = 0x45,
MapList = 0x46,
//Tuple = 0x47,
Record = 0x80,
TypedList = 0x81,
TypedMap = 0x82,
Tuple = 0x83,
Enum = 0x84,
Constant = 0x85
//TypedResourceList = 0x81,
//TypedRecordList = 0x82,
TypedTuple = 0x83,
TypedEnum = 0x84,
TypedConstant = 0x85,
ResourceLink = 0xC0
}
public enum TransmissionTypeClass

View File

@@ -2152,7 +2152,7 @@ partial class DistributedConnection
var dataType = (TransmissionType)ar[0];
var data = (byte[])ar[1];
var (_, parsed) = Codec.Parse(data, dataType.Offset, this, null, dataType);
var (_, parsed) = Codec.ParseAsync(data, dataType.Offset, this, null, dataType);
parsed.Then(resources => rt.Trigger(resources))
.Error(ex => rt.TriggerError(ex));