using Esiur.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Esiur.Tests.Queueing.Client
{
public static class EsiurQueueEval
{
public sealed record Stats(double Mean, double P50, double P95, double P99, double Max);
public sealed record LatencyDecomposition(Stats ReadinessMs, Stats HolMs, Stats EndToEndMs);
public sealed record ModelValidation(
Stats AbsErrorMs,
double MaxNegativeSlackMs, // worst case where Delivered < predicted (if happens)
int Count);
public sealed record EvalResult(
double Alpha,
double LambdaEventsPerSecond,
double MuEventsPerSecond, // <-- NEW
LatencyDecomposition Latency,
ModelValidation Validation,
Stats QueueLength,
Stats? FlushSizeStats);
///
/// Evaluates Esiur fork-join readiness + in-order resequencing using in-memory items.
/// Assumes items refer to a single ordered stream (per resource queue).
///
public static EvalResult Evaluate(
IReadOnlyList> items,
double flushWindowMs = 0.5,
bool computeFlush = true)
{
if (items == null) throw new ArgumentNullException(nameof(items));
if (items.Count == 0) throw new ArgumentException("items is empty.");
// Ensure deterministic order: prefer Sequence, then Arrival timestamp
var ordered = items
.OrderBy(x => x.Sequence)
.ThenBy(x => x.Arrival)
.ToArray();
int n = ordered.Length;
// Latency components in milliseconds
var readiness = new double[n]; // R = r-a
var hol = new double[n]; // Δ = d-r
var endToEnd = new double[n]; // D = d-a
int resCount = 0;
for (int i = 0; i < n; i++)
{
var e = ordered[i];
double Rms = (e.Ready - e.Arrival).TotalMilliseconds;
double Hms = (e.Delivered - e.Ready).TotalMilliseconds;
double Dms = (e.Delivered - e.Arrival).TotalMilliseconds;
// Robustness against logging placement or clock issues
if (Rms < 0) Rms = 0;
if (Hms < 0) Hms = 0;
if (Dms < 0) Dms = 0;
readiness[i] = Rms;
hol[i] = Hms;
endToEnd[i] = Dms;
if (e.HasResource) resCount++;
}
// α = P(HasResource)
double alpha = (double)resCount / n;
// Effective arrival rate λ̂ from arrival timeline
double lambda = EstimateLambda(ordered);
// Effective departure / readiness rate μ̂ from delivery timeline
double mu = EstimateMu(ordered);
var latency = new LatencyDecomposition(
ReadinessMs: ComputeStats(readiness),
HolMs: ComputeStats(hol),
EndToEndMs: ComputeStats(endToEnd));
// --- Resequencing validation: d_hat_i = max(r_i, d_hat_{i-1}) ---
// Use DateTime ticks for exactness, then convert to ms.
var absErrMs = new double[n];
long prevPredictedTicks = long.MinValue;
double maxNegativeSlackMs = 0;
for (int i = 0; i < n; i++)
{
long ri = ordered[i].Ready.Ticks;
long predicted = (i == 0)
? ri
: Math.Max(ri, prevPredictedTicks);
prevPredictedTicks = predicted;
long observed = ordered[i].Delivered.Ticks;
long errTicks = Math.Abs(observed - predicted);
absErrMs[i] = TicksToMs(errTicks);
// If observed delivery occurs earlier than model predicts (shouldn't, but track it)
long slackTicks = observed - predicted; // negative => earlier than predicted
if (slackTicks < 0)
{
double slackMs = TicksToMs(-slackTicks);
if (slackMs > maxNegativeSlackMs) maxNegativeSlackMs = slackMs;
}
}
var validation = new ModelValidation(
AbsErrorMs: ComputeStats(absErrMs),
MaxNegativeSlackMs: maxNegativeSlackMs,
Count: n);
// --- Flush sizes (optional): consecutive deliveries within window ---
//Stats? flushStats = null;
//if (computeFlush)
//{
// var flushSizes = ComputeFlushSizes(ordered, flushWindowMs);
// flushStats = ComputeStats(flushSizes.Select(x => (double)x).ToArray());
//}
var queueLength = ComputeStats(ordered.Select(x => (double)x.NotificationsCountWaitingInTheQueueAtEnqueueing).ToArray());
var flushStats = ComputeStats(ordered.GroupBy(x => x.FlushId).Select(x => (double)x.First().BatchSize).ToArray());
return new EvalResult(alpha, lambda, mu, latency, validation, queueLength, flushStats);
}
// ---------------- Helpers ----------------
private static double EstimateLambda(AsyncQueueItem[] ordered)
{
if (ordered.Length < 2) return 0;
DateTime first = ordered[0].Arrival;
DateTime last = ordered[^1].Arrival;
double seconds = (last - first).TotalSeconds;
if (seconds <= 0) return 0;
// N-1 arrivals over observed interval
return (ordered.Length - 1) / seconds;
}
private static double TicksToMs(long ticks) => ticks / 10_000.0; // 1 tick = 100ns
private static int[] ComputeFlushSizes(AsyncQueueItem[] ordered, double windowMs)
{
var sizes = new List(ordered.Length);
long windowTicks = (long)(windowMs * 10_000.0);
int i = 0;
while (i < ordered.Length)
{
int j = i + 1;
long baseD = ordered[i].Delivered.Ticks;
while (j < ordered.Length)
{
long dj = ordered[j].Delivered.Ticks;
if (Math.Abs(dj - baseD) <= windowTicks) j++;
else break;
}
sizes.Add(j - i);
i = j;
}
return sizes.ToArray();
}
private static Stats ComputeStats(double[] values)
{
if (values.Length == 0) return new Stats(0, 0, 0, 0, 0);
double mean = values.Average();
double max = values.Max();
var sorted = (double[])values.Clone();
Array.Sort(sorted);
return new Stats(
Mean: mean,
P50: QuantileSorted(sorted, 0.50),
P95: QuantileSorted(sorted, 0.95),
P99: QuantileSorted(sorted, 0.99),
Max: max);
}
// Linear interpolation quantile
private static double QuantileSorted(double[] sorted, double q)
{
if (sorted.Length == 1) return sorted[0];
double pos = (sorted.Length - 1) * q;
int lo = (int)Math.Floor(pos);
int hi = (int)Math.Ceiling(pos);
if (lo == hi) return sorted[lo];
double frac = pos - lo;
return sorted[lo] + (sorted[hi] - sorted[lo]) * frac;
}
// Compute the element-wise average of a sequence of EvalResult
public static EvalResult Average(IEnumerable results)
{
if (results == null) throw new ArgumentNullException(nameof(results));
var arr = results.ToArray();
if (arr.Length == 0) throw new ArgumentException("results is empty.", nameof(results));
double avgAlpha = arr.Average(r => r.Alpha);
double avgLambda = arr.Average(r => r.LambdaEventsPerSecond);
double avgMu = arr.Average(r => r.MuEventsPerSecond);
Stats avgReadiness = AverageStats(arr.Select(r => r.Latency.ReadinessMs));
Stats avgHol = AverageStats(arr.Select(r => r.Latency.HolMs));
Stats avgE2E = AverageStats(arr.Select(r => r.Latency.EndToEndMs));
var avgLatency = new LatencyDecomposition(avgReadiness, avgHol, avgE2E);
Stats avgAbsError = AverageStats(arr.Select(r => r.Validation.AbsErrorMs));
double worstNegativeSlack = arr.Max(r => r.Validation.MaxNegativeSlackMs);
int totalCount = arr.Sum(r => r.Validation.Count);
var avgValidation = new ModelValidation(avgAbsError, worstNegativeSlack, totalCount);
Stats? avgFlush = null;
var flushStatsSeq = arr.Select(r => r.FlushSizeStats).Where(s => s != null).Select(s => s!).ToArray();
if (flushStatsSeq.Length > 0) avgFlush = AverageStats(flushStatsSeq);
var avgQueue = AverageStats(arr.Select(x => x.QueueLength));
return new EvalResult(avgAlpha, avgLambda, avgMu, avgLatency, avgValidation, avgQueue, avgFlush);
}
private static double EstimateMu(AsyncQueueItem[] ordered)
{
if (ordered.Length < 2) return 0;
DateTime first = ordered[0].Delivered;
DateTime last = ordered[^1].Delivered;
double seconds = (last - first).TotalSeconds;
if (seconds <= 0) return 0;
// N-1 completions over observed interval
return (ordered.Length - 1) / seconds;
}
private static Stats AverageStats(IEnumerable seq)
{
var arr = seq.ToArray();
if (arr.Length == 0) return new Stats(0, 0, 0, 0, 0);
return new Stats(
Mean: arr.Average(s => s.Mean),
P50: arr.Average(s => s.P50),
P95: arr.Average(s => s.P95),
P99: arr.Average(s => s.P99),
Max: arr.Average(s => s.Max));
}
}
}