2
0
mirror of https://github.com/esiur/esiur-dotnet.git synced 2025-10-31 07:51:36 +00:00

Pull Stream

This commit is contained in:
2025-10-30 12:45:29 +03:00
parent a63a9b5511
commit 746f12320e
25 changed files with 1637 additions and 704 deletions

View File

@@ -1,5 +1,6 @@
using Esiur.Core;
using Esiur.Data;
using Esiur.Data.GVWIE;
using Esiur.Misc;
using Esiur.Net.IIP;
using Esiur.Resource;
@@ -742,8 +743,8 @@ public static class DataDeserializer
{
// get key type
var (keyCs, keyRepType) = RepresentationType.Parse(tdu.Metadata, 0);
var (valueCs, valueRepType) = RepresentationType.Parse(tdu.Metadata, keyCs);
var (keyCs, keyRepType) = TRU.Parse(tdu.Metadata, 0);
var (valueCs, valueRepType) = TRU.Parse(tdu.Metadata, keyCs);
var wh = connection.Instance.Warehouse;
@@ -792,8 +793,8 @@ public static class DataDeserializer
{
// get key type
var (keyCs, keyRepType) = RepresentationType.Parse(tdu.Metadata, 0);
var (valueCs, valueRepType) = RepresentationType.Parse(tdu.Metadata, keyCs);
var (keyCs, keyRepType) = TRU.Parse(tdu.Metadata, 0);
var (valueCs, valueRepType) = TRU.Parse(tdu.Metadata, keyCs);
var map = (IMap)Activator.CreateInstance(typeof(Map<,>).MakeGenericType(keyRepType.GetRuntimeType(warehouse), valueRepType.GetRuntimeType(warehouse)));
@@ -838,7 +839,7 @@ public static class DataDeserializer
uint mtOffset = 1;
for (var i = 0; i < tupleSize; i++)
{
var (cs, rep) = RepresentationType.Parse(tdu.Metadata, mtOffset);
var (cs, rep) = TRU.Parse(tdu.Metadata, mtOffset);
types.Add(rep.GetRuntimeType(connection.Instance.Warehouse));
mtOffset += cs;
}
@@ -913,7 +914,7 @@ public static class DataDeserializer
uint mtOffset = 1;
for (var i = 0; i < tupleSize; i++)
{
var (cs, rep) = RepresentationType.Parse(tdu.Metadata, mtOffset);
var (cs, rep) = TRU.Parse(tdu.Metadata, mtOffset);
types.Add(rep.GetRuntimeType(warehouse));
mtOffset += cs;
}
@@ -975,111 +976,152 @@ public static class DataDeserializer
public static AsyncReply TypedListParserAsync(ParsedTDU tdu, DistributedConnection connection, uint[] requestSequence)
{
var rt = new AsyncBag<object>();
// get the type
var (hdrCs, rep) = RepresentationType.Parse(tdu.Metadata, 0);
var (hdrCs, rep) = TRU.Parse(tdu.Metadata, 0);
var runtimeType = rep.GetRuntimeType(connection.Instance.Warehouse);
rt.ArrayType = runtimeType;
ParsedTDU current;
ParsedTDU? previous = null;
var offset = tdu.Offset;
var length = tdu.ContentLength;
var ends = offset + (uint)length;
while (length > 0)
switch (rep.Identifier)
{
case TRUIdentifier.Int32:
return new AsyncReply(GroupInt32Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength)));
case TRUIdentifier.Int64:
return new AsyncReply(GroupInt64Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength)));
case TRUIdentifier.Int16:
return new AsyncReply(GroupInt16Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength)));
case TRUIdentifier.UInt32:
return new AsyncReply(GroupUInt32Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength)));
case TRUIdentifier.UInt64:
return new AsyncReply(GroupUInt64Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength)));
case TRUIdentifier.UInt16:
return new AsyncReply(GroupUInt16Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength)));
default:
current = ParsedTDU.Parse(tdu.Data, offset, ends);
var rt = new AsyncBag<object>();
if (current.Class == TDUClass.Invalid)
throw new Exception("Unknown type.");
var runtimeType = rep.GetRuntimeType(connection.Instance.Warehouse);
rt.ArrayType = runtimeType;
ParsedTDU current;
ParsedTDU? previous = null;
var offset = tdu.Offset;
var length = tdu.ContentLength;
var ends = offset + (uint)length;
while (length > 0)
{
current = ParsedTDU.Parse(tdu.Data, offset, ends);
if (current.Class == TDUClass.Invalid)
throw new Exception("Unknown type.");
if (current.Identifier == TDUIdentifier.TypeContinuation)
{
current.Class = previous.Value.Class;
current.Identifier = previous.Value.Identifier;
current.Metadata = previous.Value.Metadata;
}
if (current.Identifier == TDUIdentifier.TypeContinuation)
{
current.Class = previous.Value.Class;
current.Identifier = previous.Value.Identifier;
current.Metadata = previous.Value.Metadata;
}
var (cs, reply) = Codec.ParseAsync(tdu.Data, offset, connection, requestSequence);
var (cs, reply) = Codec.ParseAsync(tdu.Data, offset, connection, requestSequence);
rt.Add(reply);
rt.Add(reply);
if (cs > 0)
{
offset += (uint)cs;
length -= (uint)cs;
}
else
throw new Exception("Error while parsing structured data");
if (cs > 0)
{
offset += (uint)cs;
length -= (uint)cs;
}
else
throw new Exception("Error while parsing structured data");
}
rt.Seal();
return rt;
}
rt.Seal();
return rt;
}
public static object TypedListParser(ParsedTDU tdu, Warehouse warehouse)
{
// get the type
var (hdrCs, rep) = RepresentationType.Parse(tdu.Metadata, 0);
var (hdrCs, rep) = TRU.Parse(tdu.Metadata, 0);
//offset += hdrCs;
//length -= hdrCs;
var runtimeType = rep.GetRuntimeType(warehouse);
var list = new List<object>();
ParsedTDU current;
ParsedTDU? previous = null;
var offset = tdu.Offset;
var length = tdu.ContentLength;
var ends = offset + (uint)length;
while (length > 0)
switch (rep.Identifier)
{
current = ParsedTDU.Parse(tdu.Data, offset, ends);
if (current.Class == TDUClass.Invalid)
throw new Exception("Unknown type.");
case TRUIdentifier.Int32:
return GroupInt32Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength));
case TRUIdentifier.Int64:
return GroupInt64Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength));
case TRUIdentifier.Int16:
return GroupInt16Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength));
case TRUIdentifier.UInt32:
return GroupUInt32Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength));
case TRUIdentifier.UInt64:
return GroupUInt64Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength));
case TRUIdentifier.UInt16:
return GroupUInt16Codec.Decode(tdu.Data.AsSpan(
(int)tdu.Offset, (int)tdu.ContentLength));
default:
if (current.Identifier == TDUIdentifier.TypeContinuation)
{
current.Class = previous.Value.Class;
current.Identifier = previous.Value.Identifier;
current.Metadata = previous.Value.Metadata;
}
var list = new List<object>();
var (cs, reply) = Codec.ParseSync(current, warehouse);
ParsedTDU current;
ParsedTDU? previous = null;
list.Add(reply);
var offset = tdu.Offset;
var length = tdu.ContentLength;
var ends = offset + (uint)length;
if (cs > 0)
{
offset += (uint)cs;
length -= (uint)cs;
previous = current;
}
else
throw new Exception("Error while parsing structured data");
while (length > 0)
{
current = ParsedTDU.Parse(tdu.Data, offset, ends);
if (current.Class == TDUClass.Invalid)
throw new Exception("Unknown type.");
if (current.Identifier == TDUIdentifier.TypeContinuation)
{
current.Class = previous.Value.Class;
current.Identifier = previous.Value.Identifier;
current.Metadata = previous.Value.Metadata;
}
var (cs, reply) = Codec.ParseSync(current, warehouse);
list.Add(reply);
if (cs > 0)
{
offset += (uint)cs;
length -= (uint)cs;
previous = current;
}
else
throw new Exception("Error while parsing structured data");
}
var runtimeType = rep.GetRuntimeType(warehouse);
var rt = Array.CreateInstance(runtimeType, list.Count);
Array.Copy(list.ToArray(), rt, rt.Length);
return rt;
}
var rt = Array.CreateInstance(runtimeType, list.Count);
Array.Copy(list.ToArray(), rt, rt.Length);
return rt;
}
public static AsyncBag<PropertyValue> PropertyValueArrayParserAsync(byte[] data, uint offset, uint length, DistributedConnection connection, uint[] requestSequence)//, bool ageIncluded = true)