From 4193fc9ba24c2df27aed026bcf7fefc79697aa40 Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 11:58:38 -0700 Subject: [PATCH 1/9] feat(worker): introduce capacity module with CapacitySource enum First commit of the WorkerCapacity component. Tracks which precedence tier (Override / WorkerReported / Mixed / LegacyFallback) produced the current capacity value; exposed as a Prometheus gauge in later commits. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 57 ++++++++++++++++++++++++++++ model_gateway/src/worker/mod.rs | 1 + 2 files changed, 58 insertions(+) create mode 100644 model_gateway/src/worker/capacity.rs diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs new file mode 100644 index 000000000..1532ae21b --- /dev/null +++ b/model_gateway/src/worker/capacity.rs @@ -0,0 +1,57 @@ +//! Aggregate backend capacity tracking. +//! +//! See `.claude/priority-scheduling/01-worker-capacity-design.md` for the +//! full design rationale. + +/// Which precedence tier produced the most recently computed capacity value. +/// Exposed as a Prometheus gauge so operators can debug capacity decisions. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum CapacitySource { + /// Tier 4 — no workers known yet (or unhealthy fleet). Using legacy + /// `--max-concurrent-requests` value as a fallback. + LegacyFallback = 0, + /// Tier 1 — operator pinned via `--worker-capacity-override`. + Override = 1, + /// Tier 2 — every healthy worker reported `max_running_requests`. Sum + /// across the fleet. + WorkerReported = 2, + /// Tier 3 — at least one worker did not report; reporters contribute + /// their reported value, non-reporters contribute `slots_per_worker`. + /// Also used when no worker reports but workers exist (pure tier 3). 
+ Mixed = 3, +} + +impl CapacitySource { + pub fn from_u8(raw: u8) -> Self { + match raw { + 1 => Self::Override, + 2 => Self::WorkerReported, + 3 => Self::Mixed, + _ => Self::LegacyFallback, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_capacity_source_round_trips_through_u8() { + for src in [ + CapacitySource::LegacyFallback, + CapacitySource::Override, + CapacitySource::WorkerReported, + CapacitySource::Mixed, + ] { + let raw = src as u8; + assert_eq!(CapacitySource::from_u8(raw), src); + } + } + + #[test] + fn test_capacity_source_unknown_u8_decodes_to_legacy() { + assert_eq!(CapacitySource::from_u8(99), CapacitySource::LegacyFallback); + } +} diff --git a/model_gateway/src/worker/mod.rs b/model_gateway/src/worker/mod.rs index 11da2b23c..6fd546d1e 100644 --- a/model_gateway/src/worker/mod.rs +++ b/model_gateway/src/worker/mod.rs @@ -1,6 +1,7 @@ //! Worker domain — identity, registry, health, resilience, monitoring, service. pub mod builder; +pub mod capacity; pub mod circuit_breaker; pub mod error; pub mod event; From b20fbf826ee7db71693458b427905d87208118bc Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:02:09 -0700 Subject: [PATCH 2/9] feat(worker): add CapacityTrackerSettings CLI-flag-shaped config for WorkerCapacity: override (i32, 0 = derive), slots_per_worker default 64, legacy_max_concurrent_requests default 1024. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 61 ++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index 1532ae21b..4c6704f3f 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -33,6 +33,47 @@ impl CapacitySource { } } +/// Configuration for the WorkerCapacity tracker. +/// +/// Built once at gateway startup from CLI flags. 
None of the fields +/// change at runtime (use a future `ArcSwap` if hot-reload +/// is ever needed). +#[derive(Debug, Clone)] +pub struct CapacityTrackerSettings { + /// Tier 1 override. `Some(n)` with `n > 0` pins capacity to `n` + /// regardless of the fleet. `None` (or originally-zero) means + /// "derive from workers." + pub override_capacity: Option, + /// Per-worker slot count used for tier 3 (mixed) and pure tier-3 + /// (workers known, none report). + pub slots_per_worker: u16, + /// Tier 4 fallback when no healthy workers are known. + /// Typically sourced from the existing `--max-concurrent-requests` flag. + pub legacy_max_concurrent_requests: u16, +} + +impl CapacityTrackerSettings { + /// Constructor that maps an `i32` override (0 = derive) to the + /// internal `Option` representation. Matches the shape of + /// the `--worker-capacity-override` CLI flag. + pub fn with_override(raw: i32) -> Self { + Self { + override_capacity: u16::try_from(raw).ok().filter(|n| *n > 0), + ..Self::default() + } + } +} + +impl Default for CapacityTrackerSettings { + fn default() -> Self { + Self { + override_capacity: None, + slots_per_worker: 64, + legacy_max_concurrent_requests: 1024, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -54,4 +95,24 @@ mod tests { fn test_capacity_source_unknown_u8_decodes_to_legacy() { assert_eq!(CapacitySource::from_u8(99), CapacitySource::LegacyFallback); } + + #[test] + fn test_settings_default_has_sensible_values() { + let s = CapacityTrackerSettings::default(); + assert_eq!(s.override_capacity, None); + assert_eq!(s.slots_per_worker, 64); + assert_eq!(s.legacy_max_concurrent_requests, 1024); + } + + #[test] + fn test_settings_override_zero_treated_as_none() { + let s = CapacityTrackerSettings::with_override(0); + assert!(s.override_capacity.is_none(), "0 is the sentinel for 'derive'"); + } + + #[test] + fn test_settings_override_nonzero_preserved() { + let s = CapacityTrackerSettings::with_override(2048); + 
assert_eq!(s.override_capacity, Some(2048)); + } } From 61f95409dc9d6ccf4f1d03e63b6ca87b8cadd183 Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:04:37 -0700 Subject: [PATCH 3/9] feat(worker): implement 4-tier recompute for WorkerCapacity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure function mapping (settings, healthy workers) -> (capacity, source). Tiers in precedence order: Override, WorkerReported, Mixed, LegacyFallback. The Mixed tier covers both "some workers reported, some didn't" and "workers exist but none report" — non-reporters contribute slots_per_worker each, reporters contribute their reported value. Saturating arithmetic caps at u16::MAX. Six unit tests exercise all four tiers plus the u16 saturation boundary. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 149 +++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index 4c6704f3f..bc3128a04 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -3,6 +3,10 @@ //! See `.claude/priority-scheduling/01-worker-capacity-design.md` for the //! full design rationale. +use std::sync::Arc; + +use super::Worker; + /// Which precedence tier produced the most recently computed capacity value. /// Exposed as a Prometheus gauge so operators can debug capacity decisions. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -74,6 +78,58 @@ impl Default for CapacityTrackerSettings { } } +/// Compute capacity and source tier from settings + a snapshot of healthy workers. +/// +/// Pure function: no I/O, no atomics. Easily unit-testable. +/// Callers are responsible for filtering `workers` to only healthy ones. 
+pub(super) fn recompute( + settings: &CapacityTrackerSettings, + workers: &[Arc], +) -> (u16, CapacitySource) { + // Tier 1: operator override always wins. + if let Some(n) = settings.override_capacity { + if n > 0 { + return (n, CapacitySource::Override); + } + } + + let total_workers = workers.len(); + if total_workers == 0 { + return ( + settings.legacy_max_concurrent_requests, + CapacitySource::LegacyFallback, + ); + } + + let mut sum_reported: u32 = 0; + let mut reporters: usize = 0; + let mut non_reporters: usize = 0; + for w in workers { + match w.max_running_requests() { + Some(n) => { + sum_reported = sum_reported.saturating_add(u32::from(n)); + reporters += 1; + } + None => non_reporters += 1, + } + } + + if non_reporters == 0 { + // Tier 2: every worker reported. + let capped = sum_reported.min(u32::from(u16::MAX)) as u16; + return (capped, CapacitySource::WorkerReported); + } + + // Tier 3 (Mixed): reporters contribute reported, non-reporters contribute slots_per_worker. + // Also covers "zero reporters" since non_reporters > 0 here. 
+ let _ = reporters; // reporters count is implied; non_reporters drives the formula + let from_non_reporters = + (non_reporters as u32).saturating_mul(u32::from(settings.slots_per_worker)); + let total = sum_reported.saturating_add(from_non_reporters); + let capped = total.min(u32::from(u16::MAX)) as u16; + (capped, CapacitySource::Mixed) +} + #[cfg(test)] mod tests { use super::*; @@ -115,4 +171,97 @@ mod tests { let s = CapacityTrackerSettings::with_override(2048); assert_eq!(s.override_capacity, Some(2048)); } + + use std::collections::HashMap; + + use crate::worker::BasicWorkerBuilder; + + fn worker_with_capacity( + url: &str, + reported: Option, + ) -> std::sync::Arc { + let mut labels = HashMap::new(); + if let Some(n) = reported { + labels.insert("max_running_requests".to_string(), n.to_string()); + } + std::sync::Arc::new(BasicWorkerBuilder::new(url).labels(labels).build()) + } + + #[test] + fn test_recompute_tier1_override_wins_even_with_workers() { + let settings = CapacityTrackerSettings { + override_capacity: Some(512), + ..Default::default() + }; + let workers = vec![worker_with_capacity("http://w1", Some(256))]; + let (capacity, source) = recompute(&settings, &workers); + assert_eq!(capacity, 512); + assert_eq!(source, CapacitySource::Override); + } + + #[test] + fn test_recompute_tier4_no_workers_uses_legacy_fallback() { + let settings = CapacityTrackerSettings { + legacy_max_concurrent_requests: 256, + ..Default::default() + }; + let (capacity, source) = recompute(&settings, &[]); + assert_eq!(capacity, 256); + assert_eq!(source, CapacitySource::LegacyFallback); + } + + #[test] + fn test_recompute_tier2_all_workers_report() { + let settings = CapacityTrackerSettings::default(); + let workers = vec![ + worker_with_capacity("http://w1", Some(256)), + worker_with_capacity("http://w2", Some(128)), + ]; + let (capacity, source) = recompute(&settings, &workers); + assert_eq!(capacity, 384); + assert_eq!(source, CapacitySource::WorkerReported); + } + + 
#[test] + fn test_recompute_tier3_mixed_reporters_and_non_reporters() { + let settings = CapacityTrackerSettings { + slots_per_worker: 64, + ..Default::default() + }; + let workers = vec![ + worker_with_capacity("http://w1", Some(256)), + worker_with_capacity("http://w2", None), + ]; + let (capacity, source) = recompute(&settings, &workers); + assert_eq!(capacity, 256 + 64); + assert_eq!(source, CapacitySource::Mixed); + } + + #[test] + fn test_recompute_tier3_zero_reporters_uses_worker_count() { + let settings = CapacityTrackerSettings { + slots_per_worker: 64, + ..Default::default() + }; + let workers = vec![ + worker_with_capacity("http://w1", None), + worker_with_capacity("http://w2", None), + worker_with_capacity("http://w3", None), + ]; + let (capacity, source) = recompute(&settings, &workers); + assert_eq!(capacity, 3 * 64); + assert_eq!(source, CapacitySource::Mixed); + } + + #[test] + fn test_recompute_saturates_at_u16_max() { + let settings = CapacityTrackerSettings::default(); + // 1000 workers × 100 slots = 100_000, exceeds u16::MAX (65_535). + let workers: Vec<_> = (0..1000) + .map(|i| worker_with_capacity(&format!("http://w{i}"), Some(100))) + .collect(); + let (capacity, source) = recompute(&settings, &workers); + assert_eq!(capacity, u16::MAX); + assert_eq!(source, CapacitySource::WorkerReported); + } } From dc44e6f2ef20632ca97fd9063b13879e668e89d6 Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:08:24 -0700 Subject: [PATCH 4/9] feat(worker): add WorkerCapacity struct with read accessors current() returns the latest computed capacity; source() returns the tier label; watch() returns a tokio::watch::Receiver for reactive consumers. Event task and spawn() constructor land in next commits. A test-only `for_test_with_value` constructor lets unit tests exercise the read API without spinning up a WorkerRegistry / event task. 
Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 71 +++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index bc3128a04..ef2f64ba1 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -3,9 +3,14 @@ //! See `.claude/priority-scheduling/01-worker-capacity-design.md` for the //! full design rationale. -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicU16, AtomicU8, Ordering}, + Arc, +}; -use super::Worker; +use tokio::sync::watch; + +use super::{registry::WorkerRegistry, Worker}; /// Which precedence tier produced the most recently computed capacity value. /// Exposed as a Prometheus gauge so operators can debug capacity decisions. @@ -78,6 +83,54 @@ impl Default for CapacityTrackerSettings { } } +/// Tracks aggregate backend capacity derived from a `WorkerRegistry`. +/// +/// One supervised tokio task subscribes to worker lifecycle events and +/// recomputes capacity by the 4-tier precedence (see `recompute`). +/// Consumers read the current value via `current()` or react to changes +/// via `watch()`. +pub struct WorkerCapacity { + capacity: AtomicU16, + source: AtomicU8, + watch_tx: watch::Sender, + // Held for the lifetime of the tracker so the registry stays alive + // as long as the event task references it. Set to None in test builders. + _registry: Option>, + _settings: CapacityTrackerSettings, +} + +impl WorkerCapacity { + /// Synchronous current capacity. Hot-path safe — single atomic load. + #[inline] + pub fn current(&self) -> u16 { + self.capacity.load(Ordering::Acquire) + } + + /// Which tier produced the current capacity. + pub fn source(&self) -> CapacitySource { + CapacitySource::from_u8(self.source.load(Ordering::Acquire)) + } + + /// Receiver for reacting to capacity changes. Cheap to clone. 
+ pub fn watch(&self) -> watch::Receiver { + self.watch_tx.subscribe() + } + + /// Test-only constructor that bypasses the event task. Used in unit tests + /// that only need to exercise the read API. + #[cfg(test)] + pub(crate) fn for_test_with_value(capacity: u16, source: CapacitySource) -> Arc { + let (tx, _rx) = watch::channel(capacity); + Arc::new(Self { + capacity: AtomicU16::new(capacity), + source: AtomicU8::new(source as u8), + watch_tx: tx, + _registry: None, + _settings: CapacityTrackerSettings::default(), + }) + } +} + /// Compute capacity and source tier from settings + a snapshot of healthy workers. /// /// Pure function: no I/O, no atomics. Easily unit-testable. @@ -264,4 +317,18 @@ mod tests { assert_eq!(capacity, u16::MAX); assert_eq!(source, CapacitySource::WorkerReported); } + + #[test] + fn test_worker_capacity_initial_value_visible_via_current() { + let tracker = WorkerCapacity::for_test_with_value(777, CapacitySource::Override); + assert_eq!(tracker.current(), 777); + assert_eq!(tracker.source(), CapacitySource::Override); + } + + #[test] + fn test_worker_capacity_watch_returns_current_value() { + let tracker = WorkerCapacity::for_test_with_value(123, CapacitySource::Mixed); + let rx = tracker.watch(); + assert_eq!(*rx.borrow(), 123); + } } From d519c0d0d8447d5874174b91a892a7b8ba1d6d5d Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:09:25 -0700 Subject: [PATCH 5/9] feat(worker): WorkerCapacity::spawn computes initial value Synchronous initial compute against a snapshot of the registry's healthy workers, so callers see a valid `current()` immediately after spawn. Event-loop task wiring lands in the next commit. 
Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 68 ++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index ef2f64ba1..1830ea854 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -129,6 +129,45 @@ impl WorkerCapacity { _settings: CapacityTrackerSettings::default(), }) } + + /// Construct a `WorkerCapacity`, compute the initial value + /// synchronously, and spawn the supervised event-loop task. + /// + /// The returned `Arc` is referenced both by the event task + /// (which holds it until the registry is dropped) and by any + /// number of consumers. + pub fn spawn( + registry: Arc, + settings: CapacityTrackerSettings, + ) -> Arc { + // Initial compute: synchronous over the current registry state, + // so callers see a valid `current()` immediately after spawn. + let workers = healthy_workers(&registry); + let (initial_capacity, initial_source) = recompute(&settings, &workers); + + let (watch_tx, _initial_rx) = watch::channel(initial_capacity); + + let this = Arc::new(Self { + capacity: AtomicU16::new(initial_capacity), + source: AtomicU8::new(initial_source as u8), + watch_tx, + _registry: Some(registry), + _settings: settings, + }); + + // Event task wiring lands in the next commit. + this + } +} + +/// Filter the registry to healthy workers only. Allocates a Vec so we +/// don't hold the registry's internal locks while iterating. +fn healthy_workers(registry: &WorkerRegistry) -> Vec> { + registry + .get_all() + .into_iter() + .filter(|w| w.is_healthy()) + .collect() } /// Compute capacity and source tier from settings + a snapshot of healthy workers. 
@@ -331,4 +370,33 @@ mod tests { let rx = tracker.watch(); assert_eq!(*rx.borrow(), 123); } + + use crate::worker::WorkerRegistry; + + #[tokio::test] + async fn test_spawn_computes_initial_capacity_from_empty_registry() { + let registry = Arc::new(WorkerRegistry::new()); + let settings = CapacityTrackerSettings { + legacy_max_concurrent_requests: 999, + ..Default::default() + }; + let tracker = WorkerCapacity::spawn(registry, settings); + + // Empty registry → tier 4 fallback. + assert_eq!(tracker.current(), 999); + assert_eq!(tracker.source(), CapacitySource::LegacyFallback); + } + + #[tokio::test] + async fn test_spawn_watch_starts_at_initial_value() { + let registry = Arc::new(WorkerRegistry::new()); + let settings = CapacityTrackerSettings { + legacy_max_concurrent_requests: 42, + ..Default::default() + }; + let tracker = WorkerCapacity::spawn(registry, settings); + + let rx = tracker.watch(); + assert_eq!(*rx.borrow(), 42); + } } From 2793e7264d71130fd4225bd445853e481ebf3218 Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:11:01 -0700 Subject: [PATCH 6/9] feat(worker): wire WorkerCapacity event task with supervised restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spawns a tokio task that subscribes to WorkerRegistry::subscribe_events, recomputes capacity on each event, updates the atomic + watch channel. Panic in the inner loop restarts after 1s. broadcast::error::RecvError:: Lagged triggers a re-snapshot; Closed (registry dropped) exits cleanly. Integration test exercises the full path: register a worker reporting 256 capacity → watch channel signals change → tracker.current() = 256, source = WorkerReported. 
Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 119 ++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 3 deletions(-) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index 1830ea854..bfb65aaca 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -8,6 +8,7 @@ use std::sync::{ Arc, }; +use futures::FutureExt as _; use tokio::sync::watch; use super::{registry::WorkerRegistry, Worker}; @@ -151,11 +152,40 @@ impl WorkerCapacity { capacity: AtomicU16::new(initial_capacity), source: AtomicU8::new(initial_source as u8), watch_tx, - _registry: Some(registry), - _settings: settings, + _registry: Some(registry.clone()), + _settings: settings.clone(), + }); + + // Supervised loop: any panic in the inner future restarts the task + // after a 1s backoff. Graceful exit (Ok) breaks out. + let task_self = this.clone(); + let task_registry = registry; + let task_settings = settings; + #[expect( + clippy::disallowed_methods, + reason = "supervised long-lived task: panics are caught and restarted, lifetime tied to the registry" + )] + tokio::spawn(async move { + loop { + let result = std::panic::AssertUnwindSafe(run_event_loop( + task_self.clone(), + task_registry.clone(), + task_settings.clone(), + )) + .catch_unwind() + .await; + match result { + Ok(()) => break, + Err(_) => { + tracing::error!( + "WorkerCapacity event task panicked; restarting in 1s" + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } }); - // Event task wiring lands in the next commit. 
this } } @@ -170,6 +200,50 @@ fn healthy_workers(registry: &WorkerRegistry) -> Vec> { .collect() } +async fn run_event_loop( + tracker: Arc, + registry: Arc, + settings: CapacityTrackerSettings, +) { + use tokio::sync::broadcast::error::RecvError; + + let mut events = registry.subscribe_events(); + loop { + let workers = healthy_workers(&registry); + let (new_capacity, new_source) = recompute(&settings, &workers); + + let old_capacity = tracker.capacity.swap(new_capacity, Ordering::AcqRel); + tracker.source.store(new_source as u8, Ordering::Release); + + if new_capacity != old_capacity { + // send() returns Err if there are no subscribers; we don't care. + let _ = tracker.watch_tx.send(new_capacity); + tracing::info!( + old = old_capacity, + new = new_capacity, + source = ?new_source, + "backend capacity updated" + ); + } + + // Wait for the next worker lifecycle event. + match events.recv().await { + Ok(_event) => continue, + Err(RecvError::Lagged(n)) => { + tracing::warn!( + skipped = n, + "WorkerCapacity event task lagged; recomputing from snapshot" + ); + continue; + } + Err(RecvError::Closed) => { + tracing::info!("WorkerCapacity event task exiting: registry dropped"); + break; + } + } + } +} + /// Compute capacity and source tier from settings + a snapshot of healthy workers. /// /// Pure function: no I/O, no atomics. Easily unit-testable. @@ -371,8 +445,47 @@ mod tests { assert_eq!(*rx.borrow(), 123); } + use std::time::Duration; + + use openai_protocol::worker::WorkerStatus; + use tokio::time::timeout; + use crate::worker::WorkerRegistry; + #[tokio::test] + async fn test_event_task_recomputes_on_registered() { + let registry = Arc::new(WorkerRegistry::new()); + let settings = CapacityTrackerSettings { + slots_per_worker: 64, + legacy_max_concurrent_requests: 0, + ..Default::default() + }; + let tracker = WorkerCapacity::spawn(registry.clone(), settings); + + // Initial: 0 workers → tier 4 fallback (0). 
+ assert_eq!(tracker.current(), 0); + + let mut rx = tracker.watch(); + rx.borrow_and_update(); + + // Register a worker reporting 256 capacity. + let mut labels = HashMap::new(); + labels.insert("max_running_requests".to_string(), "256".to_string()); + let worker: Arc = Arc::new( + BasicWorkerBuilder::new("http://w1:8000") + .labels(labels) + .build(), + ); + let worker_id = registry.register(worker).expect("registered"); + registry.transition_status(&worker_id, WorkerStatus::Ready); + + // Wait for the watch channel to update. + let changed = timeout(Duration::from_secs(2), rx.changed()).await; + assert!(changed.is_ok(), "watch channel did not signal a change"); + assert_eq!(*rx.borrow(), 256); + assert_eq!(tracker.source(), CapacitySource::WorkerReported); + } + #[tokio::test] async fn test_spawn_computes_initial_capacity_from_empty_registry() { let registry = Arc::new(WorkerRegistry::new()); From 385b0333b1d2dd18bd0e23ee028b8672cc0af0a2 Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:12:16 -0700 Subject: [PATCH 7/9] feat(worker): re-export WorkerCapacity from worker module Allows other modules to import via 'crate::worker::WorkerCapacity', 'crate::worker::CapacitySource', 'crate::worker::CapacityTrackerSettings' matching the pattern used for WorkerRegistry and other worker types. 
Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/model_gateway/src/worker/mod.rs b/model_gateway/src/worker/mod.rs index 6fd546d1e..5a6e0fa5b 100644 --- a/model_gateway/src/worker/mod.rs +++ b/model_gateway/src/worker/mod.rs @@ -26,6 +26,7 @@ pub mod worker; // Re-export commonly used types for convenience pub use builder::BasicWorkerBuilder; +pub use capacity::{CapacitySource, CapacityTrackerSettings, WorkerCapacity}; pub use circuit_breaker::{CircuitBreaker, CircuitBreakerConfig}; pub use error::{WorkerError, WorkerResult}; pub use hash_ring::HashRing; From c5a4d6e9de70553d81e4620d11a93740b9dcf05b Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 12:16:22 -0700 Subject: [PATCH 8/9] style(worker): rustfmt + clippy fixes for capacity module - nightly rustfmt: split a long assert! across multiple lines - clippy::unnecessary_qualification: drop redundant std::sync:: and crate::worker:: prefixes in the test module (Arc and Worker are already in scope via `use super::*;`) Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index bfb65aaca..c79cc20aa 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -137,10 +137,7 @@ impl WorkerCapacity { /// The returned `Arc` is referenced both by the event task /// (which holds it until the registry is dropped) and by any /// number of consumers. 
- pub fn spawn( - registry: Arc, - settings: CapacityTrackerSettings, - ) -> Arc { + pub fn spawn(registry: Arc, settings: CapacityTrackerSettings) -> Arc { // Initial compute: synchronous over the current registry state, // so callers see a valid `current()` immediately after spawn. let workers = healthy_workers(&registry); @@ -177,9 +174,7 @@ impl WorkerCapacity { match result { Ok(()) => break, Err(_) => { - tracing::error!( - "WorkerCapacity event task panicked; restarting in 1s" - ); + tracing::error!("WorkerCapacity event task panicked; restarting in 1s"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } @@ -329,7 +324,10 @@ mod tests { #[test] fn test_settings_override_zero_treated_as_none() { let s = CapacityTrackerSettings::with_override(0); - assert!(s.override_capacity.is_none(), "0 is the sentinel for 'derive'"); + assert!( + s.override_capacity.is_none(), + "0 is the sentinel for 'derive'" + ); } #[test] @@ -342,15 +340,12 @@ mod tests { use crate::worker::BasicWorkerBuilder; - fn worker_with_capacity( - url: &str, - reported: Option, - ) -> std::sync::Arc { + fn worker_with_capacity(url: &str, reported: Option) -> Arc { let mut labels = HashMap::new(); if let Some(n) = reported { labels.insert("max_running_requests".to_string(), n.to_string()); } - std::sync::Arc::new(BasicWorkerBuilder::new(url).labels(labels).build()) + Arc::new(BasicWorkerBuilder::new(url).labels(labels).build()) } #[test] @@ -471,7 +466,7 @@ mod tests { // Register a worker reporting 256 capacity. 
let mut labels = HashMap::new(); labels.insert("max_running_requests".to_string(), "256".to_string()); - let worker: Arc<dyn crate::worker::Worker> = Arc::new( + let worker: Arc<dyn Worker> = Arc::new( BasicWorkerBuilder::new("http://w1:8000") .labels(labels) .build(), From 238319e94e9bf05885e8a62cdabfe41b7c0640f2 Mon Sep 17 00:00:00 2001 From: Simo Lin <25425177+slin1237@users.noreply.github.com> Date: Sun, 24 May 2026 13:11:47 -0700 Subject: [PATCH 9/9] fix(worker): break Arc cycle and clean up unused fields in WorkerCapacity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on PR #1534: - Memory leak / Arc cycle (Gemini, high): the event task held an Arc<WorkerCapacity> while WorkerCapacity held an Arc<WorkerRegistry> via the `_registry` field. The task only exited on RecvError::Closed, which required the registry to be dropped — which could not happen because the task itself kept it alive. Fix: the task now holds Weak references to both the tracker and the registry, and upgrades them per iteration. The struct drops the `_registry` and `_settings` fields (the task owns its own settings clone). - Source-only changes silently ignored (Gemini, medium): log on either capacity OR source change. The watch channel still only signals on capacity change (it carries u16), but operators see source transitions in logs. - Panic payload discarded (Claude nit): extract the panic message in the catch_unwind handler instead of `Err(_)`. Makes production diagnostics actually useful. - with_override silently drops out-of-range values (Claude nit): --worker-capacity-override 70000 now logs a warning that the value is out of u16 range and the override is being ignored. - Dead `reporters` counter (CodeRabbit): the variable was incremented but never read. Removed. - Adds an integration test for the primary safety property: capacity must shrink when a worker transitions to NotReady. Covers register-two → confirm 512 → mark one NotReady → confirm < 512.
17 unit tests, all 897 lib tests pass locally; fmt + clippy clean. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com> --- model_gateway/src/worker/capacity.rs | 188 +++++++++++++++++++++------ 1 file changed, 150 insertions(+), 38 deletions(-) diff --git a/model_gateway/src/worker/capacity.rs b/model_gateway/src/worker/capacity.rs index c79cc20aa..0fccd6d9d 100644 --- a/model_gateway/src/worker/capacity.rs +++ b/model_gateway/src/worker/capacity.rs @@ -5,7 +5,7 @@ use std::sync::{ atomic::{AtomicU16, AtomicU8, Ordering}, - Arc, + Arc, Weak, }; use futures::FutureExt as _; @@ -66,9 +66,21 @@ impl CapacityTrackerSettings { /// Constructor that maps an `i32` override (0 = derive) to the /// internal `Option` representation. Matches the shape of /// the `--worker-capacity-override` CLI flag. + /// + /// Values that don't fit in `u16` (negative or > 65 535) fall back + /// to "derive from workers" *and* log a warning so operators notice + /// that their override was ignored. pub fn with_override(raw: i32) -> Self { + let override_capacity = u16::try_from(raw).ok().filter(|n| *n > 0); + if override_capacity.is_none() && raw != 0 { + tracing::warn!( + value = raw, + max = u16::MAX, + "worker-capacity-override out of u16 range; falling back to dynamic capacity" + ); + } Self { - override_capacity: u16::try_from(raw).ok().filter(|n| *n > 0), + override_capacity, ..Self::default() } } @@ -90,14 +102,17 @@ impl Default for CapacityTrackerSettings { /// recomputes capacity by the 4-tier precedence (see `recompute`). /// Consumers read the current value via `current()` or react to changes /// via `watch()`. +/// +/// **Lifecycle.** The struct itself holds no `Arc` to the registry or +/// to its own settings; the event task holds `Weak` references and +/// upgrades them per iteration. When the caller drops their last +/// `Arc`, the next event in the task triggers an exit +/// (or the task exits on `RecvError::Closed` when the registry itself +/// drops). 
This avoids the obvious Arc cycle. pub struct WorkerCapacity { capacity: AtomicU16, source: AtomicU8, watch_tx: watch::Sender<u16>, - // Held for the lifetime of the tracker so the registry stays alive - // as long as the event task references it. Set to None in test builders. - _registry: Option<Arc<WorkerRegistry>>, - _settings: CapacityTrackerSettings, } impl WorkerCapacity { @@ -126,17 +141,16 @@ impl WorkerCapacity { capacity: AtomicU16::new(capacity), source: AtomicU8::new(source as u8), watch_tx: tx, - _registry: None, - _settings: CapacityTrackerSettings::default(), }) } /// Construct a `WorkerCapacity`, compute the initial value /// synchronously, and spawn the supervised event-loop task. /// - /// The returned `Arc` is referenced both by the event task - /// (which holds it until the registry is dropped) and by any - /// number of consumers. + /// The task holds only `Weak` references to the registry and tracker + /// (and an owned copy of the settings). When the caller drops their + /// last `Arc<WorkerCapacity>` or the registry is dropped, the task + /// exits on the next iteration — no Arc cycle. pub fn spawn(registry: Arc<WorkerRegistry>, settings: CapacityTrackerSettings) -> Arc<Self> { // Initial compute: synchronous over the current registry state, // so callers see a valid `current()` immediately after spawn. @@ -149,32 +163,39 @@ impl WorkerCapacity { capacity: AtomicU16::new(initial_capacity), source: AtomicU8::new(initial_source as u8), watch_tx, - _registry: Some(registry.clone()), - _settings: settings.clone(), }); // Supervised loop: any panic in the inner future restarts the task // after a 1s backoff. Graceful exit (Ok) breaks out.
- let task_self = this.clone(); - let task_registry = registry; + let weak_tracker = Arc::downgrade(&this); + let weak_registry = Arc::downgrade(&registry); let task_settings = settings; #[expect( clippy::disallowed_methods, - reason = "supervised long-lived task: panics are caught and restarted, lifetime tied to the registry" + reason = "supervised long-lived task: panics are caught and restarted; holds Weak refs so it cannot keep the tracker or registry alive" )] tokio::spawn(async move { loop { let result = std::panic::AssertUnwindSafe(run_event_loop( - task_self.clone(), - task_registry.clone(), + weak_tracker.clone(), + weak_registry.clone(), task_settings.clone(), )) .catch_unwind() .await; match result { Ok(()) => break, - Err(_) => { - tracing::error!("WorkerCapacity event task panicked; restarting in 1s"); + Err(payload) => { + let msg = payload + .downcast_ref::<&str>() + .copied() + .map(String::from) + .or_else(|| payload.downcast_ref::<String>().cloned()) + .unwrap_or_else(|| "(non-string panic)".into()); + tracing::error!( + panic.message = %msg, + "WorkerCapacity event task panicked; restarting in 1s" + ); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } @@ -196,32 +217,66 @@ fn healthy_workers(registry: &WorkerRegistry) -> Vec<Arc<dyn Worker>> { } async fn run_event_loop( - tracker: Arc<WorkerCapacity>, - registry: Arc<WorkerRegistry>, + tracker: Weak<WorkerCapacity>, + registry: Weak<WorkerRegistry>, settings: CapacityTrackerSettings, ) { use tokio::sync::broadcast::error::RecvError; - let mut events = registry.subscribe_events(); + // Acquire the receiver up front. If the registry is already gone we + // have nothing to do; the receiver does not keep the registry alive. + let mut events = match registry.upgrade() { + Some(r) => r.subscribe_events(), + None => return, + }; + loop { - let workers = healthy_workers(&registry); + // Upgrade weak refs briefly to do the recompute. If either is gone, + // exit cleanly. Holding only Weak means the task itself cannot keep + // the tracker or registry alive.
+ let Some(t) = tracker.upgrade() else { + tracing::info!("WorkerCapacity event task exiting: tracker dropped"); + break; + }; + let Some(r) = registry.upgrade() else { + tracing::info!("WorkerCapacity event task exiting: registry dropped"); + break; + }; + + let workers = healthy_workers(&r); let (new_capacity, new_source) = recompute(&settings, &workers); - let old_capacity = tracker.capacity.swap(new_capacity, Ordering::AcqRel); - tracker.source.store(new_source as u8, Ordering::Release); + let old_capacity = t.capacity.swap(new_capacity, Ordering::AcqRel); + let old_source_raw = t.source.swap(new_source as u8, Ordering::AcqRel); + let new_source_raw = new_source as u8; - if new_capacity != old_capacity { + let capacity_changed = new_capacity != old_capacity; + let source_changed = new_source_raw != old_source_raw; + + if capacity_changed { // send() returns Err if there are no subscribers; we don't care. - let _ = tracker.watch_tx.send(new_capacity); + // The stored watch value is still updated, so late subscribers + // see the latest. + let _ = t.watch_tx.send(new_capacity); + } + + if capacity_changed || source_changed { tracing::info!( old = old_capacity, new = new_capacity, - source = ?new_source, + old_source = ?CapacitySource::from_u8(old_source_raw), + new_source = ?new_source, "backend capacity updated" ); } - // Wait for the next worker lifecycle event. + // Drop the strong refs before awaiting so the task does not keep + // the tracker or registry alive while it sleeps. + drop(t); + drop(r); + + // Wait for the next worker lifecycle event. `RecvError::Closed` + // fires when the registry is fully dropped (sender gone). 
match events.recv().await { Ok(_event) => continue, Err(RecvError::Lagged(n)) => { @@ -263,14 +318,10 @@ pub(super) fn recompute( } let mut sum_reported: u32 = 0; - let mut reporters: usize = 0; let mut non_reporters: usize = 0; for w in workers { match w.max_running_requests() { - Some(n) => { - sum_reported = sum_reported.saturating_add(u32::from(n)); - reporters += 1; - } + Some(n) => sum_reported = sum_reported.saturating_add(u32::from(n)), None => non_reporters += 1, } } @@ -281,9 +332,8 @@ pub(super) fn recompute( return (capped, CapacitySource::WorkerReported); } - // Tier 3 (Mixed): reporters contribute reported, non-reporters contribute slots_per_worker. - // Also covers "zero reporters" since non_reporters > 0 here. - let _ = reporters; // reporters count is implied; non_reporters drives the formula + // Tier 3 (Mixed): reporters contribute reported, non-reporters contribute + // slots_per_worker. Also covers "zero reporters" since non_reporters > 0 here. let from_non_reporters = (non_reporters as u32).saturating_mul(u32::from(settings.slots_per_worker)); let total = sum_reported.saturating_add(from_non_reporters); @@ -481,6 +531,68 @@ mod tests { assert_eq!(tracker.source(), CapacitySource::WorkerReported); } + #[tokio::test] + async fn test_event_task_shrinks_capacity_when_worker_goes_unhealthy() { + // Primary safety property: capacity must shed when a worker stops being + // healthy, otherwise the gateway over-admits while the backend is hurt. + let registry = Arc::new(WorkerRegistry::new()); + let settings = CapacityTrackerSettings { + slots_per_worker: 64, + legacy_max_concurrent_requests: 0, + ..Default::default() + }; + let tracker = WorkerCapacity::spawn(registry.clone(), settings); + + // Register two reporting workers, mark both Ready, wait for both updates. 
+ let mut rx = tracker.watch(); + rx.borrow_and_update(); + let mk = |url: &str, cap: u16| -> Arc<dyn Worker> { + let mut labels = HashMap::new(); + labels.insert("max_running_requests".to_string(), cap.to_string()); + Arc::new(BasicWorkerBuilder::new(url).labels(labels).build()) + }; + let id1 = registry.register(mk("http://w1", 256)).expect("registered"); + registry.transition_status(&id1, WorkerStatus::Ready); + let id2 = registry.register(mk("http://w2", 256)).expect("registered"); + registry.transition_status(&id2, WorkerStatus::Ready); + + // Drain updates until we see 512 (the registrations may coalesce). + let deadline = Duration::from_secs(2); + let combined_seen = timeout(deadline, async { + loop { + if *rx.borrow_and_update() == 512 { + break; + } + rx.changed().await.expect("watch not closed"); + } + }) + .await; + assert!( + combined_seen.is_ok(), + "never observed combined capacity 512" + ); + assert_eq!(tracker.source(), CapacitySource::WorkerReported); + + // Take one worker unhealthy → capacity must drop. + registry.transition_status(&id1, WorkerStatus::NotReady); + + let shrunk = timeout(Duration::from_secs(2), async { + loop { + rx.changed().await.expect("watch not closed"); + if *rx.borrow() < 512 { + break; + } + } + }) + .await; + assert!(shrunk.is_ok(), "watch did not signal capacity shrink"); + assert_eq!( + tracker.current(), + 256, + "only the remaining healthy worker counts" + ); + } + #[tokio::test] async fn test_spawn_computes_initial_capacity_from_empty_registry() { let registry = Arc::new(WorkerRegistry::new());