diff --git a/README.md b/README.md index bb816f1..e534235 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,8 @@ replayable scheduling traces, and canary/shadow release decisions. outcomes. - Backend mirror normalization for vLLM/SGLang-style serving observations before the release gate runs. +- Streaming token-event normalization with route and scheduler provenance for + mirrored serving traces. - Exact output checks, model-aware numeric tolerances for backend drift, per-segment release summaries, error-rate deltas, p95 latency regression policy, TTFT and decode-token p95 checks, KV memory-pressure reporting, @@ -54,6 +56,10 @@ cargo run --release -- gate \ cargo run --release -- mirror-gate \ --input fixtures/backend_mirror_vllm_sglang.json \ --output artifacts/backend-mirror-report.json + +cargo run --release -- mirror-gate \ + --input fixtures/backend_mirror_streaming_vllm_sglang.json \ + --output artifacts/backend-mirror-streaming-report.json ``` The safe fixture produces `promote`. The candidate with an output mismatch and @@ -64,6 +70,11 @@ The backend-mirror fixture converts vLLM/SGLang-style request observations into the same release gate and produces `promote` with a vLLM to SGLang segment, model-version transition metadata, queue depth, KV memory pressure, TTFT, and decode-token p95 telemetry. +The streaming mirror fixture uses per-token stream events instead of compact +token arrays and requires complete candidate route and scheduler provenance. It +produces `promote` with `candidate_routing_provenance_rate: 1.0`, +`candidate_streaming_trace_rate: 1.0`, two candidate routes, and +`continuous-batching` scheduler evidence. The checked workload fixture completes four requests in 11 scheduler ticks, accounts for 224 prompt tokens, 18 decode tokens, and 18 reserved KV pages, @@ -108,14 +119,18 @@ gate input. `runtime-lab mirror-gate` performs the conversion and immediately evaluates the release policy. 
The adapter accepts per-request latency, health, model, backend, accelerator, -output token IDs, explicit output fingerprints, and optional numeric output -vectors. Successful observations must carry output material so correctness -checks remain auditable. Token IDs and numeric vectors are converted into -stable FNV-1a fingerprints when an engine-specific fingerprint is not supplied. -Observations may also carry model version, queue depth, KV page usage, TTFT, -decode-token latencies, and token-trace fingerprints. Those fields let the gate -surface rollout context and hold a candidate when latency or memory-pressure -telemetry crosses policy even if output correctness is intact. +output token IDs, explicit output fingerprints, optional numeric output +vectors, and optional streaming token events. Successful observations must +carry output material so correctness checks remain auditable. Token IDs, +streaming token events, and numeric vectors are converted into stable FNV-1a +fingerprints when an engine-specific fingerprint is not supplied. Observations +may also carry model version, route ID, replica ID, scheduler policy, queue +depth, KV page usage, TTFT, decode-token latencies, and token-trace +fingerprints. When per-token stream events are provided, the adapter derives +TTFT and decode-token gaps from their elapsed timestamps. Those fields let the +gate surface rollout context and hold a candidate when latency, memory +pressure, routing provenance, or streaming trace coverage crosses policy even +if output correctness is intact. ## Release Policy @@ -134,6 +149,7 @@ hint, and the next investigation action. 
| Error-rate increase above policy | `rollback` | | p95 latency regression above policy | `hold` | | TTFT, decode-token p95, or memory-pressure regression above policy | `hold` | +| Missing required candidate route/scheduler or streaming-token evidence | `hold` | | Missing or insufficient matched traffic | `hold` | | Complete evidence within policy | `promote` | diff --git a/artifacts/backend-mirror-report.json b/artifacts/backend-mirror-report.json index fac4fd0..c2bfa36 100644 --- a/artifacts/backend-mirror-report.json +++ b/artifacts/backend-mirror-report.json @@ -1,5 +1,5 @@ { - "schema_version": 3, + "schema_version": 4, "decision": "promote", "matched_requests": 4, "baseline_requests": 4, @@ -25,6 +25,10 @@ "decode_token_p95_regression_pct": -6.666667, "max_candidate_queue_depth": 6, "max_candidate_memory_pressure_pct": 60.0, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 4, "token_trace_mismatch_rate": 0.0, "segments": [ @@ -50,6 +54,10 @@ "decode_token_p95_regression_pct": -6.666667, "max_candidate_queue_depth": 6, "max_candidate_memory_pressure_pct": 60.0, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 4, "token_trace_mismatch_rate": 0.0 } diff --git a/artifacts/backend-mirror-streaming-report.json b/artifacts/backend-mirror-streaming-report.json new file mode 100644 index 0000000..9799cdc --- /dev/null +++ b/artifacts/backend-mirror-streaming-report.json @@ -0,0 +1,73 @@ +{ + "schema_version": 4, + "decision": "promote", + "matched_requests": 4, + "baseline_requests": 4, + "candidate_requests": 4, + "coverage_rate": 1.0, + "output_mismatch_rate": 0.0, + "numeric_pairs": 0, + "tolerated_numeric_outputs": 0, + "numeric_drift_rate": 0.0, + "max_numeric_abs_error": null, + "max_numeric_rel_error": null, + 
"baseline_error_rate": 0.0, + "candidate_error_rate": 0.0, + "error_rate_increase": 0.0, + "baseline_p95_latency_ms": 28.0, + "candidate_p95_latency_ms": 27.2, + "p95_latency_regression_pct": -2.857143, + "baseline_p95_ttft_ms": 9.0, + "candidate_p95_ttft_ms": 8.5, + "ttft_regression_pct": -5.555556, + "baseline_decode_token_p95_ms": 7.0, + "candidate_decode_token_p95_ms": 6.5, + "decode_token_p95_regression_pct": -7.142857, + "max_candidate_queue_depth": 6, + "max_candidate_memory_pressure_pct": 60.0, + "candidate_routing_provenance_rate": 1.0, + "candidate_streaming_trace_rate": 1.0, + "candidate_route_count": 2, + "candidate_scheduler_policies": [ + "continuous-batching" + ], + "token_trace_pairs": 4, + "token_trace_mismatch_rate": 0.0, + "segments": [ + { + "model": "decoder-7b", + "baseline_backend": "vllm", + "candidate_backend": "sglang", + "accelerator": "h100", + "baseline_model_version": "decoder-7b@baseline-2026-06-28", + "candidate_model_version": "decoder-7b@candidate-2026-06-28", + "matched_requests": 4, + "output_mismatch_rate": 0.0, + "baseline_error_rate": 0.0, + "candidate_error_rate": 0.0, + "baseline_p95_latency_ms": 28.0, + "candidate_p95_latency_ms": 27.2, + "p95_latency_regression_pct": -2.857143, + "baseline_p95_ttft_ms": 9.0, + "candidate_p95_ttft_ms": 8.5, + "ttft_regression_pct": -5.555556, + "baseline_decode_token_p95_ms": 7.0, + "candidate_decode_token_p95_ms": 6.5, + "decode_token_p95_regression_pct": -7.142857, + "max_candidate_queue_depth": 6, + "max_candidate_memory_pressure_pct": 60.0, + "candidate_routing_provenance_rate": 1.0, + "candidate_streaming_trace_rate": 1.0, + "candidate_route_count": 2, + "candidate_scheduler_policies": [ + "continuous-batching" + ], + "token_trace_pairs": 4, + "token_trace_mismatch_rate": 0.0 + } + ], + "triage": [], + "reasons": [ + "candidate stayed within correctness, reliability, latency, and telemetry policy" + ] +} diff --git a/artifacts/release-gate-numeric-tolerance.json 
b/artifacts/release-gate-numeric-tolerance.json index 0507ae5..e0209a0 100644 --- a/artifacts/release-gate-numeric-tolerance.json +++ b/artifacts/release-gate-numeric-tolerance.json @@ -1,5 +1,5 @@ { - "schema_version": 3, + "schema_version": 4, "decision": "promote", "matched_requests": 4, "baseline_requests": 4, @@ -25,6 +25,10 @@ "decode_token_p95_regression_pct": null, "max_candidate_queue_depth": null, "max_candidate_memory_pressure_pct": null, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 0, "token_trace_mismatch_rate": 0.0, "segments": [ @@ -50,6 +54,10 @@ "decode_token_p95_regression_pct": null, "max_candidate_queue_depth": null, "max_candidate_memory_pressure_pct": null, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 0, "token_trace_mismatch_rate": 0.0 } diff --git a/artifacts/release-gate-promote.json b/artifacts/release-gate-promote.json index 9936d06..e462ea0 100644 --- a/artifacts/release-gate-promote.json +++ b/artifacts/release-gate-promote.json @@ -1,5 +1,5 @@ { - "schema_version": 3, + "schema_version": 4, "decision": "promote", "matched_requests": 4, "baseline_requests": 4, @@ -25,6 +25,10 @@ "decode_token_p95_regression_pct": null, "max_candidate_queue_depth": null, "max_candidate_memory_pressure_pct": null, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 0, "token_trace_mismatch_rate": 0.0, "segments": [ @@ -50,6 +54,10 @@ "decode_token_p95_regression_pct": null, "max_candidate_queue_depth": null, "max_candidate_memory_pressure_pct": null, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + 
"candidate_scheduler_policies": [], "token_trace_pairs": 0, "token_trace_mismatch_rate": 0.0 } diff --git a/artifacts/release-gate-rollback.json b/artifacts/release-gate-rollback.json index 8a6a625..9c7b48c 100644 --- a/artifacts/release-gate-rollback.json +++ b/artifacts/release-gate-rollback.json @@ -1,5 +1,5 @@ { - "schema_version": 3, + "schema_version": 4, "decision": "rollback", "matched_requests": 4, "baseline_requests": 4, @@ -25,6 +25,10 @@ "decode_token_p95_regression_pct": null, "max_candidate_queue_depth": null, "max_candidate_memory_pressure_pct": null, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 0, "token_trace_mismatch_rate": 0.0, "segments": [ @@ -50,6 +54,10 @@ "decode_token_p95_regression_pct": null, "max_candidate_queue_depth": null, "max_candidate_memory_pressure_pct": null, + "candidate_routing_provenance_rate": 0.0, + "candidate_streaming_trace_rate": 0.0, + "candidate_route_count": 0, + "candidate_scheduler_policies": [], "token_trace_pairs": 0, "token_trace_mismatch_rate": 0.0 } diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 01689bb..65af203 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -48,13 +48,15 @@ It computes: - successful-request p95 latency; and - candidate p95 regression; - TTFT p95 and decode-token p95 regression; -- candidate queue depth and KV memory pressure; and +- candidate queue depth and KV memory pressure; +- candidate route/scheduler provenance and streaming trace coverage; and - segment summaries by model, model version, baseline backend, candidate backend, and accelerator. Correctness, numeric drift, or reliability regressions produce `rollback`. -Latency, token-path, memory-pressure regressions, or incomplete evidence produce -`hold`. A complete candidate within policy produces `promote`. 
+Latency, token-path, or memory-pressure regressions, missing routing +provenance, missing streaming trace coverage, or incomplete evidence produce +`hold`. A complete candidate within policy produces `promote`. Hold and rollback reports include structured triage items so CI or rollout tooling can route the failed signal to a likely owner without parsing prose. @@ -68,11 +70,13 @@ The adapter sits before the release gate. It normalizes backend-specific mirrored observations into `GateInput` without changing the gate policy. This keeps ingestion concerns separate from rollout decisions. -The adapter currently accepts compact vLLM/SGLang-style request summaries: -request ID, latency, health, model, backend, accelerator, output token IDs, -optional explicit fingerprints, optional numeric vectors, model version, queue -depth, KV page usage, TTFT, decode-token latencies, and optional token-trace -fingerprints. If an engine does not provide an output fingerprint, the adapter -computes a stable FNV-1a fingerprint from token IDs or numeric values. -Successful observations without output material are rejected so a candidate -cannot be promoted from latency-only evidence. +The adapter currently accepts compact vLLM/SGLang-style request summaries and +streaming request traces: request ID, latency, health, model, backend, +accelerator, output token IDs, streaming token events, optional explicit +fingerprints, optional numeric vectors, model version, route ID, replica ID, +scheduler policy, queue depth, KV page usage, TTFT, decode-token latencies, and +optional token-trace fingerprints. If an engine does not provide an output +fingerprint, the adapter computes a stable FNV-1a fingerprint from token IDs, +streaming token events, or numeric values. Successful observations without +output material are rejected so a candidate cannot be promoted from +latency-only evidence. 
diff --git a/docs/RELEASE_VALIDATION.md b/docs/RELEASE_VALIDATION.md index a67b638..66c2337 100644 --- a/docs/RELEASE_VALIDATION.md +++ b/docs/RELEASE_VALIDATION.md @@ -12,6 +12,8 @@ Use `promote` when: - the candidate error-rate increase stays within policy; and - candidate p95 latency, TTFT p95, decode-token p95, and KV memory pressure stay within the configured regression budgets. +- required candidate route/scheduler provenance and streaming token traces are + complete when those checks are enabled. ## Hold @@ -21,6 +23,9 @@ output pairs, or a p95 latency regression without a correctness failure. The same response is used for excessive TTFT regression, decode-token p95 regression, or candidate KV memory pressure because those are operational signals that need investigation before rollout. +If route/scheduler provenance or streaming token traces are required but +missing, the gate also returns `hold`; the candidate may still be correct, but +the rollout evidence is not complete enough to trust the serving path. ## Rollback @@ -54,17 +59,20 @@ The report includes: Mirrored observations can include rollout context and token-path telemetry: - model version; +- route ID, replica ID, and scheduler policy; - queue depth; - KV pages used and available; - time to first token; - per-token decode latencies; and -- token-trace fingerprints. +- streaming token events and token-trace fingerprints. The gate reports aggregate and per-segment TTFT p95, decode-token p95, maximum -candidate queue depth, maximum candidate memory pressure, and token-trace -mismatch rate. Correct outputs with excessive latency or memory pressure produce -`hold`, not `rollback`, because the evidence points to performance or capacity -risk rather than a correctness failure. +candidate queue depth, maximum candidate memory pressure, candidate +route/scheduler provenance coverage, candidate streaming-trace coverage, +candidate route count, scheduler policies, and token-trace mismatch rate. 
+Correct outputs with excessive latency, memory pressure, missing provenance, or +missing streaming traces produce `hold`, not `rollback`, because the evidence +points to operational risk rather than a correctness failure. ## Triage Output @@ -87,11 +95,13 @@ baseline/candidate comparisons such as vLLM versus SGLang, or a current runtime versus a candidate runtime behind shadow traffic. Each observation records request ID, latency, health, model, backend, -accelerator, output material, and optional operational telemetry. Engines may provide their own -`output_fingerprint`; otherwise the adapter hashes output token IDs or numeric -output vectors with a stable FNV-1a fingerprint. Successful observations -without output material are rejected because the release gate cannot audit -correctness from latency alone. +accelerator, output material, and optional operational telemetry. Engines may +provide their own `output_fingerprint`; otherwise the adapter hashes output +token IDs, streaming token events, or numeric output vectors with a stable +FNV-1a fingerprint. Streaming token events also let the adapter derive TTFT and +decode-token gaps from elapsed timestamps. Successful observations without +output material are rejected because the release gate cannot audit correctness +from latency alone. ## Production Extension Points @@ -101,7 +111,8 @@ A real rollout system should add: - prompt-class and region segmentation; - SLO burn-rate and saturation signals; - canary population controls and audited rollback execution; and -- provenance linking every decision to build, model, and configuration IDs. +- provenance linking every decision to build, model, route, scheduler, and + configuration IDs. The checked fixtures are synthetic and exist to make the policy executable in CI. They are not claims about production traffic or fleet scale. 
diff --git a/fixtures/backend_mirror_streaming_vllm_sglang.json b/fixtures/backend_mirror_streaming_vllm_sglang.json new file mode 100644 index 0000000..b509413 --- /dev/null +++ b/fixtures/backend_mirror_streaming_vllm_sglang.json @@ -0,0 +1,167 @@ +{ + "thresholds": { + "min_matched_requests": 4, + "max_output_mismatch_rate": 0.0, + "max_error_rate_increase": 0.01, + "max_p95_latency_regression_pct": 10.0, + "max_ttft_regression_pct": 10.0, + "max_decode_token_p95_regression_pct": 10.0, + "max_candidate_memory_pressure_pct": 90.0, + "max_numeric_drift_rate": 0.0, + "numeric_tolerances": [], + "require_candidate_route_provenance": true, + "require_streaming_token_trace": true + }, + "baseline": { + "backend": "vllm", + "model": "decoder-7b", + "accelerator": "h100", + "observations": [ + { + "request_id": "prompt-a", + "latency_ms": 18.0, + "ok": true, + "model_version": "decoder-7b@baseline-2026-06-28", + "route_id": "vllm-prefill-a", + "replica_id": "vllm-replica-0", + "scheduler_policy": "continuous-batching", + "queue_depth": 2, + "kv_pages_used": 8, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 101, "elapsed_ms": 6.0 }, + { "sequence": 1, "token_id": 1402, "elapsed_ms": 10.0 }, + { "sequence": 2, "token_id": 13, "elapsed_ms": 14.5 } + ] + }, + { + "request_id": "prompt-b", + "latency_ms": 21.0, + "ok": true, + "model_version": "decoder-7b@baseline-2026-06-28", + "route_id": "vllm-prefill-a", + "replica_id": "vllm-replica-1", + "scheduler_policy": "continuous-batching", + "queue_depth": 3, + "kv_pages_used": 9, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 205, "elapsed_ms": 7.0 }, + { "sequence": 1, "token_id": 778, "elapsed_ms": 12.0 }, + { "sequence": 2, "token_id": 990, "elapsed_ms": 17.5 } + ] + }, + { + "request_id": "prompt-c", + "latency_ms": 24.0, + "ok": true, + "model_version": "decoder-7b@baseline-2026-06-28", + "route_id": "vllm-prefill-b", + "replica_id": 
"vllm-replica-0", + "scheduler_policy": "continuous-batching", + "queue_depth": 4, + "kv_pages_used": 10, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 42, "elapsed_ms": 8.0 }, + { "sequence": 1, "token_id": 42, "elapsed_ms": 14.0 }, + { "sequence": 2, "token_id": 7, "elapsed_ms": 20.5 } + ] + }, + { + "request_id": "prompt-d", + "latency_ms": 28.0, + "ok": true, + "model_version": "decoder-7b@baseline-2026-06-28", + "route_id": "vllm-prefill-b", + "replica_id": "vllm-replica-1", + "scheduler_policy": "continuous-batching", + "queue_depth": 5, + "kv_pages_used": 11, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 301, "elapsed_ms": 9.0 }, + { "sequence": 1, "token_id": 302, "elapsed_ms": 15.5 }, + { "sequence": 2, "token_id": 303, "elapsed_ms": 22.5 }, + { "sequence": 3, "token_id": 2, "elapsed_ms": 28.0 } + ] + } + ] + }, + "candidate": { + "backend": "sglang", + "model": "decoder-7b", + "accelerator": "h100", + "observations": [ + { + "request_id": "prompt-a", + "latency_ms": 17.5, + "ok": true, + "model_version": "decoder-7b@candidate-2026-06-28", + "route_id": "sglang-prefill-a", + "replica_id": "sglang-replica-0", + "scheduler_policy": "continuous-batching", + "queue_depth": 3, + "kv_pages_used": 9, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 101, "elapsed_ms": 5.5 }, + { "sequence": 1, "token_id": 1402, "elapsed_ms": 9.3 }, + { "sequence": 2, "token_id": 13, "elapsed_ms": 13.3 } + ] + }, + { + "request_id": "prompt-b", + "latency_ms": 20.6, + "ok": true, + "model_version": "decoder-7b@candidate-2026-06-28", + "route_id": "sglang-prefill-a", + "replica_id": "sglang-replica-1", + "scheduler_policy": "continuous-batching", + "queue_depth": 4, + "kv_pages_used": 10, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 205, "elapsed_ms": 6.5 }, + { "sequence": 1, "token_id": 778, "elapsed_ms": 11.2 }, + { 
"sequence": 2, "token_id": 990, "elapsed_ms": 16.2 } + ] + }, + { + "request_id": "prompt-c", + "latency_ms": 23.5, + "ok": true, + "model_version": "decoder-7b@candidate-2026-06-28", + "route_id": "sglang-prefill-b", + "replica_id": "sglang-replica-0", + "scheduler_policy": "continuous-batching", + "queue_depth": 5, + "kv_pages_used": 11, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 42, "elapsed_ms": 7.5 }, + { "sequence": 1, "token_id": 42, "elapsed_ms": 13.3 }, + { "sequence": 2, "token_id": 7, "elapsed_ms": 19.3 } + ] + }, + { + "request_id": "prompt-d", + "latency_ms": 27.2, + "ok": true, + "model_version": "decoder-7b@candidate-2026-06-28", + "route_id": "sglang-prefill-b", + "replica_id": "sglang-replica-1", + "scheduler_policy": "continuous-batching", + "queue_depth": 6, + "kv_pages_used": 12, + "kv_pages_capacity": 20, + "streaming_token_events": [ + { "sequence": 0, "token_id": 301, "elapsed_ms": 8.5 }, + { "sequence": 1, "token_id": 302, "elapsed_ms": 14.7 }, + { "sequence": 2, "token_id": 303, "elapsed_ms": 21.2 }, + { "sequence": 3, "token_id": 2, "elapsed_ms": 27.2 } + ] + } + ] + } +} diff --git a/src/adapter.rs b/src/adapter.rs index 041bfab..b0574e4 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -39,6 +39,14 @@ pub struct BackendObservation { #[serde(default)] pub model_version: Option, #[serde(default)] + pub route_id: Option, + #[serde(default)] + pub replica_id: Option, + #[serde(default)] + pub scheduler_policy: Option, + #[serde(default)] + pub streaming_token_events: Vec, + #[serde(default)] pub queue_depth: Option, #[serde(default)] pub kv_pages_used: Option, @@ -52,6 +60,13 @@ pub struct BackendObservation { pub token_trace_fingerprint: Option, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct StreamingTokenEvent { + pub sequence: usize, + pub token_id: i64, + pub elapsed_ms: f64, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct AdapterError { message: String, @@ 
-149,6 +164,7 @@ fn validate_operational_telemetry(observation: &BackendObservation) -> Result<() ))); } } + validate_streaming_token_events(observation)?; match (observation.kv_pages_used, observation.kv_pages_capacity) { (Some(_), Some(0)) => Err(AdapterError::new(format!( @@ -163,14 +179,56 @@ fn validate_operational_telemetry(observation: &BackendObservation) -> Result<() } } +fn validate_streaming_token_events(observation: &BackendObservation) -> Result<(), AdapterError> { + let mut last_sequence = None; + let mut last_elapsed_ms = None; + for event in &observation.streaming_token_events { + if !event.elapsed_ms.is_finite() || event.elapsed_ms < 0.0 { + return Err(AdapterError::new(format!( + "request {} has invalid streaming_token_events elapsed_ms", + observation.request_id + ))); + } + if let Some(last_sequence) = last_sequence + && event.sequence <= last_sequence + { + return Err(AdapterError::new(format!( + "request {} has non-increasing streaming token sequence", + observation.request_id + ))); + } + if let Some(last_elapsed_ms) = last_elapsed_ms + && event.elapsed_ms < last_elapsed_ms + { + return Err(AdapterError::new(format!( + "request {} has decreasing streaming token elapsed_ms", + observation.request_id + ))); + } + last_sequence = Some(event.sequence); + last_elapsed_ms = Some(event.elapsed_ms); + } + Ok(()) +} + fn operational_telemetry(observation: &BackendObservation) -> OperationalTelemetry { OperationalTelemetry { model_version: observation.model_version.clone(), + route_id: normalized_label(&observation.route_id), + replica_id: normalized_label(&observation.replica_id), + scheduler_policy: normalized_label(&observation.scheduler_policy), + streamed_token_count: observation.streaming_token_events.len(), queue_depth: observation.queue_depth, kv_pages_used: observation.kv_pages_used, kv_pages_capacity: observation.kv_pages_capacity, - time_to_first_token_ms: observation.time_to_first_token_ms, - decode_token_latencies_ms: 
observation.decode_token_latencies_ms.clone(), + time_to_first_token_ms: observation + .time_to_first_token_ms + .or_else(|| first_stream_token_ms(observation)), + decode_token_latencies_ms: if observation.decode_token_latencies_ms.is_empty() { + decode_latencies_from_stream(observation) + } else { + observation.decode_token_latencies_ms.clone() + }, token_trace_fingerprint: token_trace_fingerprint(observation), } } @@ -182,12 +240,13 @@ fn token_trace_fingerprint(observation: &BackendObservation) -> Option { return Some(fingerprint.clone()); } - if observation.output_token_ids.is_empty() { + let token_ids = observed_token_ids(observation); + if token_ids.is_empty() { None } else { Some(format!( "token-trace-fnv64:{:016x}", - hash_i64_values(&observation.output_token_ids) + hash_i64_values(&token_ids) )) } } @@ -199,11 +258,9 @@ fn output_fingerprint(observation: &BackendObservation, ok: bool) -> Result Result Vec { + if !observation.output_token_ids.is_empty() { + observation.output_token_ids.clone() + } else { + observation + .streaming_token_events + .iter() + .map(|event| event.token_id) + .collect() + } +} + +fn first_stream_token_ms(observation: &BackendObservation) -> Option { + observation + .streaming_token_events + .first() + .map(|event| event.elapsed_ms) +} + +fn decode_latencies_from_stream(observation: &BackendObservation) -> Vec { + observation + .streaming_token_events + .windows(2) + .map(|events| round6(events[1].elapsed_ms - events[0].elapsed_ms)) + .collect() +} + +fn normalized_label(value: &Option) -> Option { + value + .as_ref() + .map(|value| value.trim()) + .filter(|value| !value.is_empty()) + .map(str::to_owned) +} + +fn round6(value: f64) -> f64 { + (value * 1_000_000.0).round() / 1_000_000.0 +} + fn hash_i64_values(values: &[i64]) -> u64 { let mut hash = FNV_OFFSET_BASIS; feed_usize(&mut hash, values.len()); diff --git a/src/lib.rs b/src/lib.rs index 2f8e644..e87f07c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,7 @@ pub mod 
scheduler; pub use adapter::{ AdapterError, BackendMirrorInput, BackendObservation, BackendObservationSet, - mirror_to_gate_input, + StreamingTokenEvent, mirror_to_gate_input, }; pub use release::{ GateDecision, GateInput, GateReport, GateThresholds, NumericTolerance, Observation, diff --git a/src/release.rs b/src/release.rs index 62e997b..dd20a8d 100644 --- a/src/release.rs +++ b/src/release.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use serde::{Deserialize, Serialize}; @@ -25,6 +25,14 @@ pub struct OperationalTelemetry { #[serde(default)] pub model_version: Option, #[serde(default)] + pub route_id: Option, + #[serde(default)] + pub replica_id: Option, + #[serde(default)] + pub scheduler_policy: Option, + #[serde(default)] + pub streamed_token_count: usize, + #[serde(default)] pub queue_depth: Option, #[serde(default)] pub kv_pages_used: Option, @@ -104,6 +112,10 @@ pub struct GateThresholds { pub max_numeric_drift_rate: f64, #[serde(default)] pub numeric_tolerances: Vec, + #[serde(default)] + pub require_candidate_route_provenance: bool, + #[serde(default)] + pub require_streaming_token_trace: bool, } impl Default for GateThresholds { @@ -118,6 +130,8 @@ impl Default for GateThresholds { max_candidate_memory_pressure_pct: default_max_candidate_memory_pressure_pct(), max_numeric_drift_rate: 0.0, numeric_tolerances: Vec::new(), + require_candidate_route_provenance: false, + require_streaming_token_trace: false, } } } @@ -182,6 +196,10 @@ pub struct SegmentReport { pub decode_token_p95_regression_pct: Option, pub max_candidate_queue_depth: Option, pub max_candidate_memory_pressure_pct: Option, + pub candidate_routing_provenance_rate: f64, + pub candidate_streaming_trace_rate: f64, + pub candidate_route_count: usize, + pub candidate_scheduler_policies: Vec, pub token_trace_pairs: usize, pub token_trace_mismatch_rate: f64, } @@ -214,6 +232,10 @@ pub struct GateReport { pub decode_token_p95_regression_pct: Option, 
pub max_candidate_queue_depth: Option, pub max_candidate_memory_pressure_pct: Option, + pub candidate_routing_provenance_rate: f64, + pub candidate_streaming_trace_rate: f64, + pub candidate_route_count: usize, + pub candidate_scheduler_policies: Vec, pub token_trace_pairs: usize, pub token_trace_mismatch_rate: f64, pub segments: Vec, @@ -361,6 +383,31 @@ pub fn evaluate_release(input: &GateInput) -> GateReport { .filter_map(|(_, candidate)| candidate.telemetry.memory_pressure_pct()) .map(Some), ); + let successful_candidate_requests = + matched.iter().filter(|(_, candidate)| candidate.ok).count(); + let candidate_routing_provenance_count = matched + .iter() + .filter(|(_, candidate)| candidate.ok && has_candidate_route_provenance(candidate)) + .count(); + let candidate_streaming_trace_count = matched + .iter() + .filter(|(_, candidate)| candidate.ok && has_streaming_token_trace(candidate)) + .count(); + let candidate_routing_provenance_rate = ratio( + candidate_routing_provenance_count, + successful_candidate_requests, + ); + let candidate_streaming_trace_rate = ratio( + candidate_streaming_trace_count, + successful_candidate_requests, + ); + let candidate_route_count = unique_telemetry_values(&matched, |candidate| { + candidate.telemetry.route_id.as_deref() + }) + .len(); + let candidate_scheduler_policies = unique_telemetry_values(&matched, |candidate| { + candidate.telemetry.scheduler_policy.as_deref() + }); let token_trace_pairs = matched .iter() .filter(|(baseline, candidate)| { @@ -531,6 +578,38 @@ pub fn evaluate_release(input: &GateInput) -> GateReport { ), ); } + let route_provenance_failed = input.thresholds.require_candidate_route_provenance + && candidate_routing_provenance_rate < 1.0; + if route_provenance_failed { + add_triage( + &mut reasons, + &mut triage, + "routing_provenance", + GateDecision::Hold, + "serving_runtime", + "collect route and scheduler provenance for every successful candidate request", + format!( + "candidate routing provenance 
rate {:.4}; expected complete route and scheduler coverage", + candidate_routing_provenance_rate + ), + ); + } + let streaming_trace_failed = + input.thresholds.require_streaming_token_trace && candidate_streaming_trace_rate < 1.0; + if streaming_trace_failed { + add_triage( + &mut reasons, + &mut triage, + "streaming_token_trace", + GateDecision::Hold, + "decode_runtime", + "collect streaming token events before trusting decode-path rollout evidence", + format!( + "candidate streaming-token trace rate {:.4}; expected complete streaming trace coverage", + candidate_streaming_trace_rate + ), + ); + } let decision = if correctness_failed || numeric_correctness_failed || reliability_failed { GateDecision::Rollback @@ -540,6 +619,8 @@ pub fn evaluate_release(input: &GateInput) -> GateReport { || ttft_failed || decode_token_latency_failed || memory_pressure_failed + || route_provenance_failed + || streaming_trace_failed { GateDecision::Hold } else { @@ -551,7 +632,7 @@ pub fn evaluate_release(input: &GateInput) -> GateReport { }; GateReport { - schema_version: 3, + schema_version: 4, decision, matched_requests, baseline_requests, @@ -577,6 +658,10 @@ pub fn evaluate_release(input: &GateInput) -> GateReport { decode_token_p95_regression_pct, max_candidate_queue_depth, max_candidate_memory_pressure_pct, + candidate_routing_provenance_rate, + candidate_streaming_trace_rate, + candidate_route_count, + candidate_scheduler_policies, token_trace_pairs, token_trace_mismatch_rate, segments: segment_reports(&matched, &matched_comparisons), @@ -733,6 +818,11 @@ struct SegmentAccumulator { candidate_decode_token_latencies: Vec, candidate_queue_depths: Vec, candidate_memory_pressure_pct: Vec, + candidate_successes: usize, + candidate_routing_provenance_count: usize, + candidate_streaming_trace_count: usize, + candidate_routes: BTreeSet, + candidate_scheduler_policies: BTreeSet, token_trace_pairs: usize, token_trace_mismatches: usize, } @@ -753,6 +843,21 @@ fn segment_reports( if 
!candidate.ok { entry.candidate_errors += 1; } + if candidate.ok { + entry.candidate_successes += 1; + if has_candidate_route_provenance(candidate) { + entry.candidate_routing_provenance_count += 1; + } + if has_streaming_token_trace(candidate) { + entry.candidate_streaming_trace_count += 1; + } + if let Some(route_id) = candidate.telemetry.route_id.as_ref() { + entry.candidate_routes.insert(route_id.clone()); + } + if let Some(policy) = candidate.telemetry.scheduler_policy.as_ref() { + entry.candidate_scheduler_policies.insert(policy.clone()); + } + } if baseline.ok && candidate.ok { entry.output_pairs += 1; if comparison.is_some_and(|comparison| !comparison.matches) { @@ -859,6 +964,19 @@ fn segment_reports( .into_iter() .map(Some), ), + candidate_routing_provenance_rate: ratio( + accumulator.candidate_routing_provenance_count, + accumulator.candidate_successes, + ), + candidate_streaming_trace_rate: ratio( + accumulator.candidate_streaming_trace_count, + accumulator.candidate_successes, + ), + candidate_route_count: accumulator.candidate_routes.len(), + candidate_scheduler_policies: accumulator + .candidate_scheduler_policies + .into_iter() + .collect(), token_trace_pairs: accumulator.token_trace_pairs, token_trace_mismatch_rate: ratio( accumulator.token_trace_mismatches, @@ -898,6 +1016,33 @@ fn segment_key(baseline: &Observation, candidate: &Observation) -> SegmentKey { } } +fn has_candidate_route_provenance(candidate: &Observation) -> bool { + has_value(&candidate.telemetry.route_id) && has_value(&candidate.telemetry.scheduler_policy) +} + +fn has_streaming_token_trace(candidate: &Observation) -> bool { + candidate.telemetry.streamed_token_count > 0 + && candidate.telemetry.token_trace_fingerprint.is_some() +} + +fn unique_telemetry_values( + matched: &[(&Observation, &Observation)], + selector: impl Fn(&Observation) -> Option<&str>, +) -> Vec { + matched + .iter() + .filter_map(|(_, candidate)| selector(candidate)) + .filter(|value| !value.trim().is_empty()) 
+ .map(str::to_owned) + .collect::>() + .into_iter() + .collect() +} + +fn has_value(value: &Option) -> bool { + value.as_ref().is_some_and(|value| !value.trim().is_empty()) +} + fn optional_label(value: &Option) -> String { value.clone().unwrap_or_else(|| "unspecified".into()) } diff --git a/tests/adapter.rs b/tests/adapter.rs index 9c91247..01a8d42 100644 --- a/tests/adapter.rs +++ b/tests/adapter.rs @@ -1,6 +1,6 @@ use rust_inference_runtime::{ BackendMirrorInput, BackendObservation, BackendObservationSet, GateDecision, GateThresholds, - NumericTolerance, evaluate_release, mirror_to_gate_input, + NumericTolerance, StreamingTokenEvent, evaluate_release, mirror_to_gate_input, }; fn backend_set(backend: &str, observations: Vec) -> BackendObservationSet { @@ -22,6 +22,10 @@ fn token_observation(id: &str, latency_ms: f64, token_ids: &[i64]) -> BackendObs output_values: None, error: None, model_version: None, + route_id: None, + replica_id: None, + scheduler_policy: None, + streaming_token_events: Vec::new(), queue_depth: None, kv_pages_used: None, kv_pages_capacity: None, @@ -49,6 +53,10 @@ fn telemetry_observation( output_values: None, error: None, model_version: Some("decoder-7b@2026-06-24".into()), + route_id: None, + replica_id: None, + scheduler_policy: None, + streaming_token_events: Vec::new(), queue_depth: Some(queue_depth), kv_pages_used: Some(kv_pages_used), kv_pages_capacity: Some(20), @@ -68,6 +76,10 @@ fn numeric_observation(id: &str, latency_ms: f64, values: &[f64]) -> BackendObse output_values: Some(values.to_vec()), error: None, model_version: None, + route_id: None, + replica_id: None, + scheduler_policy: None, + streaming_token_events: Vec::new(), queue_depth: None, kv_pages_used: None, kv_pages_capacity: None, @@ -77,6 +89,45 @@ fn numeric_observation(id: &str, latency_ms: f64, values: &[f64]) -> BackendObse } } +fn streaming_observation( + id: &str, + latency_ms: f64, + token_ids: &[i64], + elapsed_ms: &[f64], + route_id: &str, + scheduler_policy: 
&str, +) -> BackendObservation { + BackendObservation { + request_id: id.into(), + latency_ms, + ok: Some(true), + output_fingerprint: None, + output_token_ids: Vec::new(), + output_values: None, + error: None, + model_version: Some("decoder-7b@stream-2026-06-28".into()), + route_id: Some(route_id.into()), + replica_id: Some(format!("{route_id}-replica-0")), + scheduler_policy: Some(scheduler_policy.into()), + streaming_token_events: token_ids + .iter() + .zip(elapsed_ms.iter()) + .enumerate() + .map(|(sequence, (token_id, elapsed_ms))| StreamingTokenEvent { + sequence, + token_id: *token_id, + elapsed_ms: *elapsed_ms, + }) + .collect(), + queue_depth: Some(3), + kv_pages_used: Some(10), + kv_pages_capacity: Some(20), + time_to_first_token_ms: None, + decode_token_latencies_ms: Vec::new(), + token_trace_fingerprint: None, + } +} + #[test] fn converts_vllm_and_sglang_mirrors_into_release_gate_input() { let input = BackendMirrorInput { @@ -147,6 +198,160 @@ fn carries_operational_telemetry_into_release_report() { assert_eq!(report.candidate_decode_token_p95_ms, Some(6.0)); } +#[test] +fn derives_streaming_events_and_route_provenance_into_release_report() { + let input = BackendMirrorInput { + thresholds: GateThresholds { + min_matched_requests: 3, + require_candidate_route_provenance: true, + require_streaming_token_trace: true, + ..GateThresholds::default() + }, + baseline: backend_set( + "vllm", + vec![ + streaming_observation( + "prompt-a", + 18.0, + &[101, 1402, 13], + &[6.0, 10.0, 14.5], + "route-prefill-a", + "continuous-batching", + ), + streaming_observation( + "prompt-b", + 21.0, + &[205, 778, 990], + &[6.5, 11.0, 15.5], + "route-prefill-a", + "continuous-batching", + ), + streaming_observation( + "prompt-c", + 24.0, + &[42, 42, 7], + &[7.0, 11.5, 16.0], + "route-prefill-b", + "continuous-batching", + ), + ], + ), + candidate: backend_set( + "sglang", + vec![ + streaming_observation( + "prompt-a", + 17.5, + &[101, 1402, 13], + &[5.1, 9.0, 12.9], + 
"route-prefill-a", + "continuous-batching", + ), + streaming_observation( + "prompt-b", + 20.6, + &[205, 778, 990], + &[5.4, 9.5, 13.5], + "route-prefill-a", + "continuous-batching", + ), + streaming_observation( + "prompt-c", + 23.5, + &[42, 42, 7], + &[5.8, 10.2, 14.5], + "route-prefill-b", + "continuous-batching", + ), + ], + ), + }; + + let report = evaluate_release(&mirror_to_gate_input(&input).expect("valid mirror input")); + + assert_eq!(report.decision, GateDecision::Promote); + assert_eq!(report.output_mismatch_rate, 0.0); + assert_eq!(report.candidate_routing_provenance_rate, 1.0); + assert_eq!(report.candidate_streaming_trace_rate, 1.0); + assert_eq!(report.candidate_route_count, 2); + assert_eq!( + report.candidate_scheduler_policies, + vec!["continuous-batching".to_string()] + ); + assert_eq!(report.candidate_p95_ttft_ms, Some(5.8)); + assert_eq!(report.candidate_decode_token_p95_ms, Some(4.4)); + assert_eq!(report.token_trace_pairs, 3); + assert_eq!(report.segments[0].candidate_route_count, 2); + assert_eq!(report.segments[0].candidate_streaming_trace_rate, 1.0); +} + +#[test] +fn holds_when_required_streaming_or_route_provenance_is_missing() { + let input = BackendMirrorInput { + thresholds: GateThresholds { + min_matched_requests: 3, + require_candidate_route_provenance: true, + require_streaming_token_trace: true, + ..GateThresholds::default() + }, + baseline: backend_set( + "vllm", + vec![ + streaming_observation( + "prompt-a", + 18.0, + &[101], + &[6.0], + "route-prefill-a", + "continuous-batching", + ), + streaming_observation( + "prompt-b", + 21.0, + &[205], + &[6.5], + "route-prefill-a", + "continuous-batching", + ), + streaming_observation( + "prompt-c", + 24.0, + &[42], + &[7.0], + "route-prefill-b", + "continuous-batching", + ), + ], + ), + candidate: backend_set( + "sglang", + vec![ + token_observation("prompt-a", 17.5, &[101]), + token_observation("prompt-b", 20.6, &[205]), + token_observation("prompt-c", 23.5, &[42]), + ], + ), + }; + + 
let report = evaluate_release(&mirror_to_gate_input(&input).expect("valid mirror input")); + + assert_eq!(report.decision, GateDecision::Hold); + assert_eq!(report.candidate_routing_provenance_rate, 0.0); + assert_eq!(report.candidate_streaming_trace_rate, 0.0); + assert!( + report + .triage + .iter() + .any(|item| item.signal == "routing_provenance") + ); + assert!( + report + .triage + .iter() + .any(|item| item.signal == "streaming_token_trace") + ); +} + #[test] fn holds_when_candidate_telemetry_exceeds_policy() { let input = BackendMirrorInput { @@ -226,6 +431,8 @@ fn numeric_mirror_outputs_use_model_backend_tolerance() { max_abs_error: 0.002, max_rel_error: 0.01, }], + require_candidate_route_provenance: false, + require_streaming_token_trace: false, }, baseline: backend_set( "vllm", @@ -268,6 +475,10 @@ fn rejects_successful_observation_without_output_material() { output_values: None, error: None, model_version: None, + route_id: None, + replica_id: None, + scheduler_policy: None, + streaming_token_events: Vec::new(), queue_depth: None, kv_pages_used: None, kv_pages_capacity: None, diff --git a/tests/release.rs b/tests/release.rs index 6191c70..36df9c5 100644 --- a/tests/release.rs +++ b/tests/release.rs @@ -130,6 +130,8 @@ fn promotes_candidate_with_numeric_outputs_inside_model_tolerance() { max_abs_error: 0.002, max_rel_error: 0.01, }], + require_candidate_route_provenance: false, + require_streaming_token_trace: false, }, baseline: vec![ numeric_observation("a", "baseline-a", "baseline", &[1.0, 2.0], 10.0), @@ -171,6 +173,8 @@ fn rolls_back_numeric_outputs_outside_model_tolerance() { max_abs_error: 0.002, max_rel_error: 0.01, }], + require_candidate_route_provenance: false, + require_streaming_token_trace: false, }, baseline: vec![ numeric_observation("a", "baseline-a", "baseline", &[1.0, 2.0], 10.0),