diff --git a/README.md b/README.md index cdd9890..f88985f 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ replayable scheduling traces, and canary/shadow release decisions. - Deterministic workload replay with a machine-readable trace fingerprint. - Baseline/candidate release validation with `promote`, `hold`, and `rollback` outcomes. +- Backend mirror normalization for vLLM/SGLang-style serving observations + before the release gate runs. - Exact output checks, model-aware numeric tolerances for backend drift, per-segment release summaries, error-rate deltas, p95 latency regression policy, tests, and CI. @@ -40,12 +42,18 @@ cargo run --release -- gate \ cargo run --release -- gate \ --input fixtures/release_gate_numeric_tolerance.json \ --output artifacts/release-gate-numeric-tolerance.json + +cargo run --release -- mirror-gate \ + --input fixtures/backend_mirror_vllm_sglang.json \ + --output artifacts/backend-mirror-report.json ``` The safe fixture produces `promote`. The candidate with an output mismatch and an added error produces `rollback`. The numeric-tolerance fixture produces `promote` while reporting four tolerated numeric comparisons across a baseline-runtime to candidate-runtime segment. +The backend-mirror fixture converts vLLM/SGLang-style request observations into +the same release gate and produces `promote` with a vLLM to SGLang segment. The checked workload fixture completes four requests in 11 scheduler ticks, peaks at 12 of 20 KV pages, returns all pages on completion, and emits trace @@ -69,6 +77,18 @@ Every tick records: The replay report includes a stable trace fingerprint, peak KV pages, total ticks, and completion count. +## Backend Mirror Adapter + +`runtime-lab mirror` converts backend-specific mirrored observations into a +gate input. `runtime-lab mirror-gate` performs the conversion and immediately +evaluates the release policy. + +The adapter accepts per-request latency, health, model, backend, accelerator, +output token IDs, explicit output fingerprints, and optional numeric output +vectors. Successful observations must carry output material so correctness +checks remain auditable. Token IDs and numeric vectors are converted into +stable FNV-1a fingerprints when an engine-specific fingerprint is not supplied. + ## Release Policy The gate joins mirrored baseline and candidate observations by request ID. diff --git a/artifacts/backend-mirror-report.json b/artifacts/backend-mirror-report.json new file mode 100644 index 0000000..1e4891d --- /dev/null +++ b/artifacts/backend-mirror-report.json @@ -0,0 +1,38 @@ +{ + "schema_version": 2, + "decision": "promote", + "matched_requests": 4, + "baseline_requests": 4, + "candidate_requests": 4, + "coverage_rate": 1.0, + "output_mismatch_rate": 0.0, + "numeric_pairs": 0, + "tolerated_numeric_outputs": 0, + "numeric_drift_rate": 0.0, + "max_numeric_abs_error": null, + "max_numeric_rel_error": null, + "baseline_error_rate": 0.0, + "candidate_error_rate": 0.0, + "error_rate_increase": 0.0, + "baseline_p95_latency_ms": 28.0, + "candidate_p95_latency_ms": 27.2, + "p95_latency_regression_pct": -2.857143, + "segments": [ + { + "model": "decoder-7b", + "baseline_backend": "vllm", + "candidate_backend": "sglang", + "accelerator": "h100", + "matched_requests": 4, + "output_mismatch_rate": 0.0, + "baseline_error_rate": 0.0, + "candidate_error_rate": 0.0, + "baseline_p95_latency_ms": 28.0, + "candidate_p95_latency_ms": 27.2, + "p95_latency_regression_pct": -2.857143 + } + ], + "reasons": [ + "candidate stayed within correctness, reliability, and latency policy" + ] +} diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index adf43f8..ea3fc7c 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -50,3 +50,17 @@ within policy produces `promote`. This is a local validation component, not a deployment controller. Production integration would obtain observations from mirrored traffic, canary populations, telemetry, and an audited rollout system. + +## Backend Mirror Adapter + +The adapter sits before the release gate. It normalizes backend-specific +mirrored observations into `GateInput` without changing the gate policy. This +keeps ingestion concerns separate from rollout decisions. + +The adapter currently accepts compact vLLM/SGLang-style request summaries: +request ID, latency, health, model, backend, accelerator, output token IDs, +optional explicit fingerprints, and optional numeric vectors. If an engine does +not provide a fingerprint, the adapter computes a stable FNV-1a fingerprint +from token IDs or numeric values. Successful observations without output +material are rejected so a candidate cannot be promoted from latency-only +evidence. diff --git a/docs/RELEASE_VALIDATION.md b/docs/RELEASE_VALIDATION.md index 9f496f9..a6e1c2e 100644 --- a/docs/RELEASE_VALIDATION.md +++ b/docs/RELEASE_VALIDATION.md @@ -45,6 +45,20 @@ The report includes: - segment summaries by model, baseline backend, candidate backend, and accelerator. +## Backend Mirror Adapter + +The `mirror` command normalizes request observations from backend-specific +serving traces into the release gate input format. It is intended for mirrored +baseline/candidate comparisons such as vLLM versus SGLang, or a current +runtime versus a candidate runtime behind shadow traffic. + +Each observation records request ID, latency, health, model, backend, +accelerator, and output material. Engines may provide their own +`output_fingerprint`; otherwise the adapter hashes output token IDs or numeric +output vectors with a stable FNV-1a fingerprint. Successful observations +without output material are rejected because the release gate cannot audit +correctness from latency alone. + ## Production Extension Points A real rollout system should add: diff --git a/fixtures/backend_mirror_vllm_sglang.json b/fixtures/backend_mirror_vllm_sglang.json new file mode 100644 index 0000000..4a4da3b --- /dev/null +++ b/fixtures/backend_mirror_vllm_sglang.json @@ -0,0 +1,72 @@ +{ + "thresholds": { + "min_matched_requests": 4, + "max_output_mismatch_rate": 0.0, + "max_error_rate_increase": 0.01, + "max_p95_latency_regression_pct": 10.0, + "max_numeric_drift_rate": 0.0, + "numeric_tolerances": [] + }, + "baseline": { + "backend": "vllm", + "model": "decoder-7b", + "accelerator": "h100", + "observations": [ + { + "request_id": "prompt-a", + "latency_ms": 18.0, + "ok": true, + "output_token_ids": [101, 1402, 13] + }, + { + "request_id": "prompt-b", + "latency_ms": 21.0, + "ok": true, + "output_token_ids": [205, 778, 990] + }, + { + "request_id": "prompt-c", + "latency_ms": 24.0, + "ok": true, + "output_token_ids": [42, 42, 7] + }, + { + "request_id": "prompt-d", + "latency_ms": 28.0, + "ok": true, + "output_token_ids": [301, 302, 303, 2] + } + ] + }, + "candidate": { + "backend": "sglang", + "model": "decoder-7b", + "accelerator": "h100", + "observations": [ + { + "request_id": "prompt-a", + "latency_ms": 17.5, + "ok": true, + "output_token_ids": [101, 1402, 13] + }, + { + "request_id": "prompt-b", + "latency_ms": 20.6, + "ok": true, + "output_token_ids": [205, 778, 990] + }, + { + "request_id": "prompt-c", + "latency_ms": 23.5, + "ok": true, + "output_token_ids": [42, 42, 7] + }, + { + "request_id": "prompt-d", + "latency_ms": 27.2, + "ok": true, + "output_token_ids": [301, 302, 303, 2] + } + ] + } +} diff --git a/src/adapter.rs b/src/adapter.rs new file mode 100644 index 0000000..a782c4d --- /dev/null +++ b/src/adapter.rs @@ -0,0 +1,178 @@ +use std::error::Error; +use std::fmt; + +use serde::{Deserialize, Serialize}; + +use crate::release::{GateInput, GateThresholds, Observation}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BackendMirrorInput { + #[serde(default)] + pub thresholds: GateThresholds, + pub baseline: BackendObservationSet, + pub candidate: BackendObservationSet, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BackendObservationSet { + pub backend: String, + pub model: String, + #[serde(default)] + pub accelerator: Option, + pub observations: Vec, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BackendObservation { + pub request_id: String, + pub latency_ms: f64, + #[serde(default)] + pub ok: Option, + #[serde(default)] + pub output_fingerprint: Option, + #[serde(default)] + pub output_token_ids: Vec, + #[serde(default)] + pub output_values: Option>, + #[serde(default)] + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AdapterError { + message: String, +} + +impl AdapterError { + fn new(message: impl Into) -> Self { + Self { + message: message.into(), + } + } +} + +impl fmt::Display for AdapterError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str(&self.message) + } +} + +impl Error for AdapterError {} + +pub fn mirror_to_gate_input(input: &BackendMirrorInput) -> Result { + Ok(GateInput { + thresholds: input.thresholds.clone(), + baseline: normalize_set(&input.baseline)?, + candidate: normalize_set(&input.candidate)?, + }) +} + +fn normalize_set(set: &BackendObservationSet) -> Result, AdapterError> { + if set.backend.trim().is_empty() { + return Err(AdapterError::new("backend must not be empty")); + } + if set.model.trim().is_empty() { + return Err(AdapterError::new("model must not be empty")); + } + + set.observations + .iter() + .map(|observation| normalize_observation(set, observation)) + .collect() +} + +fn normalize_observation( + set: &BackendObservationSet, + observation: &BackendObservation, +) -> Result { + if observation.request_id.trim().is_empty() { + return Err(AdapterError::new("request_id must not be empty")); + } + if !observation.latency_ms.is_finite() || observation.latency_ms < 0.0 { + return Err(AdapterError::new(format!( + "request {} has invalid latency_ms", + observation.request_id + ))); + } + + let ok = observation.ok.unwrap_or_else(|| { + observation + .error + .as_ref() + .is_none_or(|error| error.trim().is_empty()) + }); + let output_fingerprint = output_fingerprint(observation, ok)?; + + Ok(Observation { + request_id: observation.request_id.clone(), + output_fingerprint, + latency_ms: observation.latency_ms, + ok, + model: Some(set.model.clone()), + backend: Some(set.backend.clone()), + accelerator: set.accelerator.clone(), + output_values: observation.output_values.clone(), + }) +} + +fn output_fingerprint(observation: &BackendObservation, ok: bool) -> Result { + if let Some(fingerprint) = observation.output_fingerprint.as_ref() + && !fingerprint.trim().is_empty() + { + return Ok(fingerprint.clone()); + } + + if !observation.output_token_ids.is_empty() { + return Ok(format!( + "tokens-fnv64:{:016x}", + hash_i64_values(&observation.output_token_ids) + )); + } + + if let Some(values) = observation.output_values.as_ref() + && !values.is_empty() + { + return Ok(format!("values-fnv64:{:016x}", hash_f64_values(values))); + } + + if !ok { + return Ok("error".into()); + } + + Err(AdapterError::new(format!( + "request {} is successful but has no output fingerprint, token ids, or numeric values", + observation.request_id + ))) +} + +fn hash_i64_values(values: &[i64]) -> u64 { + let mut hash = FNV_OFFSET_BASIS; + feed_usize(&mut hash, values.len()); + for value in values { + feed_bytes(&mut hash, &value.to_le_bytes()); + } + hash +} + +fn hash_f64_values(values: &[f64]) -> u64 { + let mut hash = FNV_OFFSET_BASIS; + feed_usize(&mut hash, values.len()); + for value in values { + feed_bytes(&mut hash, &value.to_bits().to_le_bytes()); + } + hash +} + +const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325; +const FNV_PRIME: u64 = 0x0000_0100_0000_01b3; + +fn feed_usize(hash: &mut u64, value: usize) { + feed_bytes(hash, &value.to_le_bytes()); +} + +fn feed_bytes(hash: &mut u64, bytes: &[u8]) { + for byte in bytes { + *hash ^= u64::from(*byte); + *hash = hash.wrapping_mul(FNV_PRIME); + } +} diff --git a/src/lib.rs b/src/lib.rs index c40e22d..67a26f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,11 @@ +pub mod adapter; pub mod release; pub mod scheduler; +pub use adapter::{ + AdapterError, BackendMirrorInput, BackendObservation, BackendObservationSet, + mirror_to_gate_input, +}; pub use release::{ GateDecision, GateInput, GateReport, GateThresholds, NumericTolerance, Observation, SegmentReport, evaluate_release, diff --git a/src/main.rs b/src/main.rs index 0c76900..d15b10f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,9 @@ use std::error::Error; use std::fs; use std::path::{Path, PathBuf}; -use rust_inference_runtime::{GateInput, ReplayInput, evaluate_release, run_replay}; +use rust_inference_runtime::{ + BackendMirrorInput, GateInput, ReplayInput, evaluate_release, mirror_to_gate_input, run_replay, +}; use serde::Serialize; use serde::de::DeserializeOwned; @@ -28,6 +30,15 @@ fn run() -> Result<(), Box> { let gate_input: GateInput = read_json(&input)?; write_json(&output, &evaluate_release(&gate_input))?; } + "mirror" => { + let mirror_input: BackendMirrorInput = read_json(&input)?; + write_json(&output, &mirror_to_gate_input(&mirror_input)?)?; + } + "mirror-gate" => { + let mirror_input: BackendMirrorInput = read_json(&input)?; + let gate_input = mirror_to_gate_input(&mirror_input)?; + write_json(&output, &evaluate_release(&gate_input))?; + } _ => return Err(usage().into()), } @@ -73,5 +84,5 @@ fn write_json(path: &Path, value: &impl Serialize) -> Result<(), Box> } fn usage() -> String { - "usage: runtime-lab --input --output ".into() + "usage: runtime-lab --input --output ".into() } diff --git a/tests/adapter.rs b/tests/adapter.rs new file mode 100644 index 0000000..84cfddd --- /dev/null +++ b/tests/adapter.rs @@ -0,0 +1,144 @@ +use rust_inference_runtime::{ + BackendMirrorInput, BackendObservation, BackendObservationSet, GateDecision, GateThresholds, + NumericTolerance, evaluate_release, mirror_to_gate_input, +}; + +fn backend_set(backend: &str, observations: Vec) -> BackendObservationSet { + BackendObservationSet { + backend: backend.into(), + model: "decoder-7b".into(), + accelerator: Some("h100".into()), + observations, + } +} + +fn token_observation(id: &str, latency_ms: f64, token_ids: &[i64]) -> BackendObservation { + BackendObservation { + request_id: id.into(), + latency_ms, + ok: Some(true), + output_fingerprint: None, + output_token_ids: token_ids.to_vec(), + output_values: None, + error: None, + } +} + +fn numeric_observation(id: &str, latency_ms: f64, values: &[f64]) -> BackendObservation { + BackendObservation { + request_id: id.into(), + latency_ms, + ok: Some(true), + output_fingerprint: None, + output_token_ids: Vec::new(), + output_values: Some(values.to_vec()), + error: None, + } +} + +#[test] +fn converts_vllm_and_sglang_mirrors_into_release_gate_input() { + let input = BackendMirrorInput { + thresholds: GateThresholds::default(), + baseline: backend_set( + "vllm", + vec![ + token_observation("prompt-a", 18.0, &[101, 1402, 13]), + token_observation("prompt-b", 21.0, &[205, 778, 990]), + token_observation("prompt-c", 24.0, &[42, 42, 7]), + ], + ), + candidate: backend_set( + "sglang", + vec![ + token_observation("prompt-a", 17.5, &[101, 1402, 13]), + token_observation("prompt-b", 20.6, &[205, 778, 990]), + token_observation("prompt-c", 23.5, &[42, 42, 7]), + ], + ), + }; + + let gate_input = mirror_to_gate_input(&input).expect("valid mirror input"); + assert_eq!(gate_input.baseline[0].backend.as_deref(), Some("vllm")); + assert_eq!(gate_input.candidate[0].backend.as_deref(), Some("sglang")); + assert_eq!( + gate_input.baseline[0].output_fingerprint, + gate_input.candidate[0].output_fingerprint + ); + + let report = evaluate_release(&gate_input); + assert_eq!(report.decision, GateDecision::Promote); + assert_eq!(report.segments.len(), 1); + assert_eq!(report.segments[0].baseline_backend, "vllm"); + assert_eq!(report.segments[0].candidate_backend, "sglang"); +} + +#[test] +fn numeric_mirror_outputs_use_model_backend_tolerance() { + let input = BackendMirrorInput { + thresholds: GateThresholds { + min_matched_requests: 3, + max_output_mismatch_rate: 0.0, + max_error_rate_increase: 0.01, + max_p95_latency_regression_pct: 10.0, + max_numeric_drift_rate: 0.0, + numeric_tolerances: vec![NumericTolerance { + model: Some("decoder-7b".into()), + candidate_backend: Some("sglang".into()), + candidate_accelerator: Some("h100".into()), + max_abs_error: 0.002, + max_rel_error: 0.01, + }], + }, + baseline: backend_set( + "vllm", + vec![ + numeric_observation("prompt-a", 18.0, &[0.125, 0.5, 1.25]), + numeric_observation("prompt-b", 21.0, &[0.25, 0.75, 1.5]), + numeric_observation("prompt-c", 24.0, &[0.375, 1.0, 1.75]), + ], + ), + candidate: backend_set( + "sglang", + vec![ + numeric_observation("prompt-a", 18.1, &[0.126, 0.501, 1.251]), + numeric_observation("prompt-b", 20.9, &[0.251, 0.751, 1.501]), + numeric_observation("prompt-c", 23.8, &[0.376, 1.001, 1.751]), + ], + ), + }; + + let report = evaluate_release(&mirror_to_gate_input(&input).expect("valid mirror input")); + + assert_eq!(report.decision, GateDecision::Promote); + assert_eq!(report.numeric_pairs, 3); + assert_eq!(report.tolerated_numeric_outputs, 3); + assert_eq!(report.output_mismatch_rate, 0.0); +} + +#[test] +fn rejects_successful_observation_without_output_material() { + let input = BackendMirrorInput { + thresholds: GateThresholds::default(), + baseline: backend_set( + "vllm", + vec![BackendObservation { + request_id: "prompt-a".into(), + latency_ms: 18.0, + ok: Some(true), + output_fingerprint: None, + output_token_ids: Vec::new(), + output_values: None, + error: None, + }], + ), + candidate: backend_set("sglang", vec![token_observation("prompt-a", 17.5, &[1])]), + }; + + let error = mirror_to_gate_input(&input).expect_err("missing output must fail"); + assert!( + error + .to_string() + .contains("no output fingerprint, token ids, or numeric values") + ); +}