feat(worker): add WorkerCapacity tracker (4-tier capacity sourcing) #1534
First commit of the WorkerCapacity component. Tracks which precedence tier (Override / WorkerReported / Mixed / LegacyFallback) produced the current capacity value; exposed as a Prometheus gauge in later commits. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
CLI-flag-shaped config for WorkerCapacity: override (i32, 0 = derive), slots_per_worker default 64, legacy_max_concurrent_requests default 1024. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
Pure function mapping (settings, healthy workers) -> (capacity, source). Tiers in precedence order: Override, WorkerReported, Mixed, LegacyFallback. The Mixed tier covers both "some workers reported, some didn't" and "workers exist but none report" — non-reporters contribute slots_per_worker each, reporters contribute their reported value. Saturating arithmetic caps at u16::MAX. Six unit tests exercise all four tiers plus the u16 saturation boundary. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
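The tier precedence described in this commit can be sketched as a small pure function. This is a minimal illustration, not the code from the PR: the `Settings` struct, the `u16` type of `legacy_max_concurrent_requests`, and the `&[Option<u16>]` stand-in for the healthy-worker snapshot are assumptions made for the example.

```rust
/// Which precedence tier produced the capacity value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CapacitySource {
    Override,
    WorkerReported,
    Mixed,
    LegacyFallback,
}

/// Hypothetical settings shape; field types are assumed for this sketch.
struct Settings {
    override_capacity: Option<u16>,
    slots_per_worker: u16,
    legacy_max_concurrent_requests: u16,
}

/// `workers` has one entry per healthy worker: `Some(n)` if the worker
/// reported a capacity, `None` if it did not.
fn recompute(settings: &Settings, workers: &[Option<u16>]) -> (u16, CapacitySource) {
    // Tier 1: an operator override wins unconditionally.
    if let Some(n) = settings.override_capacity {
        return (n, CapacitySource::Override);
    }
    // Tier 4: no healthy workers at all, use the legacy limit.
    if workers.is_empty() {
        return (
            settings.legacy_max_concurrent_requests,
            CapacitySource::LegacyFallback,
        );
    }
    // Tiers 2 and 3: sum contributions, saturating at u16::MAX.
    let mut total: u16 = 0;
    let mut all_reported = true;
    for worker in workers {
        match worker {
            Some(n) => total = total.saturating_add(*n),
            None => {
                all_reported = false;
                total = total.saturating_add(settings.slots_per_worker);
            }
        }
    }
    let source = if all_reported {
        CapacitySource::WorkerReported
    } else {
        CapacitySource::Mixed
    };
    (total, source)
}
```

Keeping the function free of I/O and shared state is what makes the saturation boundary and each tier easy to unit-test in isolation.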
current() returns the latest computed capacity; source() returns the tier label; watch() returns a tokio::watch::Receiver<u16> for reactive consumers. Event task and spawn() constructor land in next commits. A test-only `for_test_with_value` constructor lets unit tests exercise the read API without spinning up a WorkerRegistry / event task. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
Synchronous initial compute against a snapshot of the registry's healthy workers, so callers see a valid `current()` immediately after spawn. Event-loop task wiring lands in the next commit. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
Spawns a tokio task that subscribes to WorkerRegistry::subscribe_events, recomputes capacity on each event, updates the atomic + watch channel. Panic in the inner loop restarts after 1s. broadcast::error::RecvError::Lagged triggers a re-snapshot; Closed (registry dropped) exits cleanly. Integration test exercises the full path: register a worker reporting 256 capacity → watch channel signals change → tracker.current() = 256, source = WorkerReported. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
Allows other modules to import via 'crate::worker::WorkerCapacity', 'crate::worker::CapacitySource', 'crate::worker::CapacityTrackerSettings' matching the pattern used for WorkerRegistry and other worker types. Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
- nightly rustfmt: split a long assert! across multiple lines
- clippy::unnecessary_qualification: drop redundant std::sync:: and crate::worker:: prefixes in the test module (Arc and Worker are already in scope via `use super::*;`)
Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
📝 Walkthrough
This PR introduces a multi-tier worker capacity tracking system that aggregates healthy worker capacity into a single scheduled value. The implementation spawns a supervised async task that recomputes capacity upon worker registry events and publishes updates via a watch channel, with 4-tier precedence covering operator override, reported capacities, mixed capacity, and legacy fallback.
Changes
Worker Capacity Tracking System
Sequence Diagram
sequenceDiagram
participant Client
participant WorkerCapacity
participant WorkerRegistry
participant SupervisedTask
participant WatchChannel
Client->>WorkerCapacity: spawn(registry, settings)
WorkerCapacity->>WorkerCapacity: recompute initial capacity
WorkerCapacity->>WatchChannel: publish initial value
WorkerCapacity->>SupervisedTask: spawn supervised event loop
loop on registry events
WorkerRegistry->>SupervisedTask: emit worker lifecycle event
SupervisedTask->>WorkerCapacity: recompute(registry, settings)
WorkerCapacity->>WorkerCapacity: update atomics (capacity, source)
WorkerCapacity->>WatchChannel: send update if capacity changed
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c5a4d6e9de
    if new_capacity != old_capacity {
        // send() returns Err if there are no subscribers; we don't care.
        let _ = tracker.watch_tx.send(new_capacity);
Persist channel state without active subscribers
Using watch::Sender::send here drops the new value when no receivers are currently subscribed, and this code explicitly ignores that error. Because the initial receiver from watch::channel is not retained, early capacity updates can be lost, so a later watch() subscriber may start from a stale capacity even though current() has already advanced; it will not self-correct until another capacity change occurs. This breaks the expected "latest value" behavior of the watch API for late subscribers.
Code Review
This pull request introduces a WorkerCapacity tracker to manage backend capacity using a tiered precedence model, including a background task that recomputes capacity based on worker registry events. Key feedback includes addressing a circular reference in the task logic that causes memory leaks, removing an unused struct field, and ensuring atomic updates and consistent logging for capacity and source state changes.
    let task_self = this.clone();
    let task_registry = registry;
    let task_settings = settings;
    #[expect(
        clippy::disallowed_methods,
        reason = "supervised long-lived task: panics are caught and restarted, lifetime tied to the registry"
    )]
    tokio::spawn(async move {
        loop {
            let result = std::panic::AssertUnwindSafe(run_event_loop(
                task_self.clone(),
                task_registry.clone(),
                task_settings.clone(),
            ))
            .catch_unwind()
            .await;
            match result {
                Ok(()) => break,
                Err(_) => {
                    tracing::error!("WorkerCapacity event task panicked; restarting in 1s");
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            }
        }
    });
This implementation creates a circular reference that leads to a memory and resource leak. The spawned task holds a strong Arc<WorkerCapacity>, which in turn holds an Arc<WorkerRegistry> via the _registry field. Because the task's loop only exits when the registry is dropped (via RecvError::Closed), and the registry will never be dropped as long as the task holds an Arc to it, the tracker and registry will persist indefinitely even after all external references are dropped.
To fix this, the task should hold a Weak<WorkerCapacity> and run_event_loop should be modified to upgrade the weak reference only when performing updates, ensuring it doesn't hold a strong reference while awaiting events.
References
- Data passed to spawned background tasks must have a 'static lifetime. Use owned types or reference-counted pointers like Arc instead of passing references to ensure the data outlives the task.
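The weak-reference fix suggested above can be illustrated with the standard library alone. The sketch below is an assumption-laden stand-in, not the PR's code: it uses an OS thread and an `mpsc` channel in place of the tokio task and the registry's broadcast channel, and the `Tracker` struct and `spawn_loop` function are hypothetical names. The point it shows is that the loop holds only a `Weak` while blocked and upgrades just long enough to apply one update.

```rust
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{mpsc, Arc, Weak};
use std::thread;

/// Hypothetical tracker holding only the published capacity.
struct Tracker {
    capacity: AtomicU16,
}

/// Spawns a background loop that applies capacity updates for as long as
/// some external owner keeps the tracker alive.
fn spawn_loop(tracker: &Arc<Tracker>, events: mpsc::Receiver<u16>) -> thread::JoinHandle<()> {
    // The loop captures a Weak, so it never keeps the tracker alive itself.
    let weak: Weak<Tracker> = Arc::downgrade(tracker);
    thread::spawn(move || {
        // recv() blocks while no strong reference is held by this thread.
        while let Ok(new_capacity) = events.recv() {
            // Upgrade only for the duration of one update; exit if the
            // tracker has already been dropped by its owners.
            let Some(t) = weak.upgrade() else { break };
            t.capacity.store(new_capacity, Ordering::Release);
        }
    })
}
```

With this shape, dropping the last external `Arc<Tracker>` lets the tracker be freed, and the loop ends on the next event or when the sender side is dropped.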
    // Held for the lifetime of the tracker so the registry stays alive
    // as long as the event task references it. Set to None in test builders.
    _registry: Option<Arc<WorkerRegistry>>,
    _settings: CapacityTrackerSettings,
    let old_capacity = tracker.capacity.swap(new_capacity, Ordering::AcqRel);
    tracker.source.store(new_source as u8, Ordering::Release);

    if new_capacity != old_capacity {
        // send() returns Err if there are no subscribers; we don't care.
        let _ = tracker.watch_tx.send(new_capacity);
        tracing::info!(
            old = old_capacity,
            new = new_capacity,
            source = ?new_source,
            "backend capacity updated"
        );
    }
The capacity and source atomics are updated using separate operations, which can lead to transient consistency issues where a reader sees a new capacity with an old source (or vice versa). Furthermore, if the capacity value remains unchanged but the source tier transitions (e.g., from Override to WorkerReported), the change is neither logged nor notified via the watch channel. Consider using swap for both and checking both values for changes to ensure logs and metrics stay in sync.
    - let old_capacity = tracker.capacity.swap(new_capacity, Ordering::AcqRel);
    - tracker.source.store(new_source as u8, Ordering::Release);
    - if new_capacity != old_capacity {
    -     // send() returns Err if there are no subscribers; we don't care.
    -     let _ = tracker.watch_tx.send(new_capacity);
    -     tracing::info!(
    -         old = old_capacity,
    -         new = new_capacity,
    -         source = ?new_source,
    -         "backend capacity updated"
    -     );
    - }
    + let old_capacity = tracker.capacity.swap(new_capacity, Ordering::AcqRel);
    + let old_source = tracker.source.swap(new_source as u8, Ordering::AcqRel);
    + if new_capacity != old_capacity || new_source as u8 != old_source {
    +     // send() returns Err if there are no subscribers; we don't care.
    +     let _ = tracker.watch_tx.send(new_capacity);
    +     tracing::info!(
    +         old_cap = old_capacity,
    +         new_cap = new_capacity,
    +         old_src = ?CapacitySource::from_u8(old_source),
    +         new_src = ?new_source,
    +         "backend capacity updated"
    +     );
    + }
References
- Maintain consistency with existing logging practices within the same crate for similar functionalities.
- Instead of silently ignoring potential failures or state changes, log them to aid in debugging and maintain visibility.
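Two separate `swap` calls still leave a brief window in which a reader can see the new capacity paired with the old source. One way to close that window, offered here as an alternative sketch rather than anything proposed in the review, is to pack both values into a single `AtomicU32`. The `PackedState` type and the bit layout (source in bits 16..24, capacity in the low 16 bits) are assumptions for illustration.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Packs a capacity and a source tag into one word.
fn pack(capacity: u16, source: u8) -> u32 {
    (u32::from(source) << 16) | u32::from(capacity)
}

/// Inverse of `pack`.
fn unpack(word: u32) -> (u16, u8) {
    ((word & 0xFFFF) as u16, (word >> 16) as u8)
}

/// Hypothetical holder for the (capacity, source) pair.
struct PackedState(AtomicU32);

impl PackedState {
    fn new(capacity: u16, source: u8) -> Self {
        PackedState(AtomicU32::new(pack(capacity, source)))
    }

    /// Reads both values with a single atomic load, so they always
    /// belong to the same update.
    fn load(&self) -> (u16, u8) {
        unpack(self.0.load(Ordering::Acquire))
    }

    /// Publishes a new pair and returns the previous one, which lets the
    /// caller detect a change in either field.
    fn swap(&self, capacity: u16, source: u8) -> (u16, u8) {
        unpack(self.0.swap(pack(capacity, source), Ordering::AcqRel))
    }
}
```

A caller would compare the pair returned by `swap` against the new pair to decide whether to log or notify, which covers source-only transitions as well.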
            }
        }
    }

    impl Default for CapacityTrackerSettings {
        fn default() -> Self {
🟡 Nit: u16::try_from(raw) silently maps values above 65 535 (and negative values) to None, meaning the override is quietly ignored and capacity falls back to dynamic computation. An operator setting --worker-capacity-override 70000 would get no error and no override.
Consider either returning a Result so the CLI layer can surface a startup error, or at minimum logging a warning here:
    -         }
    -     }
    - }
    - impl Default for CapacityTrackerSettings {
    -     fn default() -> Self {
    +     pub fn with_override(raw: i32) -> Self {
    +         let override_capacity = match u16::try_from(raw).ok().filter(|n| *n > 0) {
    +             v @ Some(_) => v,
    +             None if raw > 0 => {
    +                 tracing::warn!(
    +                     value = raw,
    +                     max = u16::MAX,
    +                     "worker-capacity-override out of u16 range; falling back to dynamic capacity"
    +                 );
    +                 None
    +             }
    +             None => None,
    +         };
    +         Self {
    +             override_capacity,
    +             ..Self::default()
    +         }
    +     }
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@model_gateway/src/worker/capacity.rs`:
- Around line 265-286: The local binding reporters is only read to increment but
never used later, so replace its declaration with an underscore-prefixed name
(e.g., change let mut reporters: usize = 0; to let mut _reporters: usize = 0;)
so the unused-warning is suppressed idiomatically while still allowing
increments in the match over workers and calls to max_running_requests();
alternatively remove the reporters variable entirely if you confirm its count
isn’t required (sum_reported already captures the reported contribution).
📒 Files selected for processing (2)
- model_gateway/src/worker/capacity.rs
- model_gateway/src/worker/mod.rs
    match result {
        Ok(()) => break,
        Err(_) => {
            tracing::error!("WorkerCapacity event task panicked; restarting in 1s");
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
🟡 Nit: The wildcard Err(_) discards the panic payload — the error log will say "panicked; restarting" with zero diagnostic context. Extracting the panic message is cheap and makes this debuggable in production:
      match result {
          Ok(()) => break,
    -     Err(_) => {
    -         tracing::error!("WorkerCapacity event task panicked; restarting in 1s");
    -         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    -     }
    +     Err(payload) => {
    +         let msg = payload
    +             .downcast_ref::<&str>()
    +             .copied()
    +             .map(String::from)
    +             .or_else(|| payload.downcast_ref::<String>().cloned())
    +             .unwrap_or_else(|| "(non-string panic)".into());
    +         tracing::error!(
    +             panic.message = %msg,
    +             "WorkerCapacity event task panicked; restarting in 1s"
    +         );
    +         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    +     }
    async fn test_event_task_recomputes_on_registered() {
        let registry = Arc::new(WorkerRegistry::new());
        let settings = CapacityTrackerSettings {
            slots_per_worker: 64,
            legacy_max_concurrent_requests: 0,
            ..Default::default()
        };
        let tracker = WorkerCapacity::spawn(registry.clone(), settings);

        // Initial: 0 workers → tier 4 fallback (0).
        assert_eq!(tracker.current(), 0);

        let mut rx = tracker.watch();
        rx.borrow_and_update();

        // Register a worker reporting 256 capacity.
        let mut labels = HashMap::new();
        labels.insert("max_running_requests".to_string(), "256".to_string());
        let worker: Arc<dyn Worker> = Arc::new(
            BasicWorkerBuilder::new("http://w1:8000")
                .labels(labels)
                .build(),
        );
        let worker_id = registry.register(worker).expect("registered");
        registry.transition_status(&worker_id, WorkerStatus::Ready);

        // Wait for the watch channel to update.
        let changed = timeout(Duration::from_secs(2), rx.changed()).await;
        assert!(changed.is_ok(), "watch channel did not signal a change");
        assert_eq!(*rx.borrow(), 256);
        assert_eq!(tracker.source(), CapacitySource::WorkerReported);
    }

    #[tokio::test]
    async fn test_spawn_computes_initial_capacity_from_empty_registry() {
🟡 Nit: This integration test covers the "add a worker → capacity goes up" path, but there's no corresponding test for "worker goes unhealthy → capacity goes down." Since the primary safety property of this module is shedding capacity when backends fail, consider adding a test that registers two workers, confirms the combined capacity, then transitions one to NotReady/Draining and asserts the watch channel fires with the reduced value.
Clean, well-structured addition. The 4-tier capacity model is well-decomposed, recompute is a pure function that's easy to reason about, and the supervised event loop handles edge cases (lagged broadcast, registry drop) correctly. The initial synchronous compute in spawn() ensures callers see a valid current() immediately — nice touch.
Review summary: 0 🔴 Important · 3 🟡 Nit · 0 🟣 Pre-existing
Nits posted:
- `with_override` silently drops out-of-range `i32` values; suggest logging a warning so operators notice misconfiguration.
- The `catch_unwind` handler discards the panic payload; extracting the message makes production debugging feasible.
- No integration test for capacity decreasing when a worker goes unhealthy; this is the module's primary safety property.
…city
Addresses review feedback on PR #1534:
- Memory leak / Arc cycle (Gemini, high): the event task held an Arc<WorkerCapacity> while WorkerCapacity held an Arc<WorkerRegistry> via the `_registry` field. The task only exited on RecvError::Closed, which required the registry to be dropped, and that could not happen because the task itself kept it alive. Fix: the task now holds Weak references to both the tracker and the registry, and upgrades them per iteration. The struct drops the `_registry` and `_settings` fields (the task owns its own settings clone).
- Source-only changes silently ignored (Gemini, medium): log on either capacity OR source change. The watch channel still only signals on capacity change (it carries u16), but operators see source transitions in logs.
- Panic payload discarded (Claude nit): extract the panic message in the catch_unwind handler instead of `Err(_)`. Makes production diagnostics actually useful.
- with_override silently drops out-of-range values (Claude nit): --worker-capacity-override 70000 now logs a warning that the value is out of u16 range and the override is being ignored.
- Dead `reporters` counter (CodeRabbit): the variable was incremented but never read. Removed.
- Adds an integration test for the primary safety property: capacity must shrink when a worker transitions to NotReady. Covers register-two → confirm 512 → mark one NotReady → confirm < 512.
17 unit tests, all 897 lib tests pass locally; fmt + clippy clean.
Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 238319e94e
    // send() returns Err if there are no subscribers; we don't care.
    // The stored watch value is still updated, so late subscribers
    // see the latest.
    let _ = t.watch_tx.send(new_capacity);
Persist watch value when no receivers are subscribed
When all watch() receivers are temporarily dropped, watch::Sender::send returns Err and does not retain the new value, so this update path can silently lose capacity changes and later subscribers will read stale state until another event arrives. This can happen in normal operation whenever reactive consumers restart. Fresh evidence: the repo’s WatchRegistry::publish uses send_replace specifically to preserve state across zero-subscriber windows (model_gateway/src/observability/metrics_ws/registry.rs).
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@model_gateway/src/worker/capacity.rs`:
- Around line 256-260: The code uses t.watch_tx.send(new_capacity) which fails
to update the stored watch value when there are zero receivers; replace that
call with t.watch_tx.send_replace(new_capacity) so the new_capacity is persisted
for future watch() subscribers even if send() would return Err — update the
block guarded by capacity_changed (where t.watch_tx is invoked) to call
send_replace instead of send.
📒 Files selected for processing (1)
model_gateway/src/worker/capacity.rs
    if capacity_changed {
        // send() returns Err if there are no subscribers; we don't care.
        // The stored watch value is still updated, so late subscribers
        // see the latest.
        let _ = t.watch_tx.send(new_capacity);
🧩 Analysis chain
🌐 Web query:
tokio::sync::watch::Sender::send behavior when there are zero receivers, and whether Sender::send_replace persists the latest value for future subscribers
💡 Result:
Answer:
1) tokio::sync::watch::Sender::send with zero receivers: Sender::send fails (returns SendError) when the channel is closed, which happens when every receiver has been dropped. If send fails, the value is not made available for future receivers.
2) Does Sender::send_replace persist the latest value for future subscribers? Yes: Sender::send_replace explicitly allows sending even when there are no receivers, and it updates the watched value in the channel. New receivers created later via Sender::subscribe will see the most recently sent value (messages sent before subscribe are initially marked as seen by the new receiver).
Grounded details:
- Sender::send returns Result<(), SendError<T>> and “fails if the channel is closed… when every receiver has been dropped”; “when send fails, the value isn’t made available for future receivers.” [1]
- Sender::send_replace returns the previous value and “permits sending values even when there are no receivers.” [1]
- Sender::subscribe creates a new Receiver connected to the Sender and “all messages sent before this call to subscribe are initially marked as seen by the new Receiver.” [1]
So the difference is:
- send: if there are no receivers (and thus the channel is closed), the send fails and nothing is retained for future subscribers.
- send_replace: the value is retained in the watch channel and a future subscribe will observe it as the latest value (marked seen immediately).
Citations:
Persist watch updates even when there are temporarily zero receivers
tokio::sync::watch::Sender::send() doesn’t update the stored watched value when the watch channel is closed (i.e., there are no remaining receivers)—it returns SendError, and late subscribers won’t observe the new new_capacity. Sender::send_replace() persists the latest value even with zero receivers, so future watch() subscribers see it.
🔧 Minimal fix
      if capacity_changed {
    -     // send() returns Err if there are no subscribers; we don't care.
    -     // The stored watch value is still updated, so late subscribers
    -     // see the latest.
    -     let _ = t.watch_tx.send(new_capacity);
    +     // Persist the latest value even when no receivers are currently attached.
    +     t.watch_tx.send_replace(new_capacity);
      }
Summary
Introduces `WorkerCapacity` (in `model_gateway/src/worker/capacity.rs`): a small component sitting next to `WorkerRegistry` that derives aggregate backend in-flight capacity from the current healthy worker fleet and exposes it via:
- `current() -> u16` getter (single atomic load on the hot path), and
- `tokio::sync::watch::Receiver<u16>` channel for reactive consumers.

A single supervised tokio task subscribes to `WorkerRegistry::subscribe_events` and recomputes capacity on every worker lifecycle change. Panic in the inner loop restarts the task after a 1s backoff; broadcast `Lagged` triggers a re-snapshot; `Closed` exits cleanly when the registry is dropped.

Capacity sourcing (4-tier precedence)
1. Override: `--worker-capacity-override` CLI flag pinned non-zero.
2. WorkerReported: `max_running_requests` via metadata discovery (SGLang gRPC populates this label today; SGLang HTTP starting in feat(workflow): surface max_running_requests on SGLang HTTP /server_info #1529; vLLM is currently None).
3. Mixed: `slots_per_worker` (default 64). Also covers the pure-tier-3 case where no worker reports.
4. LegacyFallback: `max_concurrent_requests`.

Public API
What this PR does NOT include
- CLI flags (`--worker-capacity-override`, `--worker-capacity-slots-per-worker`): added in the next PR
- `AppContext` wiring + `server.rs` startup construction: next PR
- Metrics (`smg_backend_capacity`, `smg_backend_capacity_source`, `smg_backend_workers_available`, `smg_backend_capacity_recompute_total`): separate PR
- `WorkerRegistry`: separate PR

This PR introduces only the module itself + unit tests. The new types are exported from `model_gateway::worker::*` so subsequent PRs can wire them up.

Test plan
- `CapacitySource` round-trip, `CapacityTrackerSettings::default` + `with_override`, `recompute` exhaustive over all four tiers + u16 saturation boundary, `WorkerCapacity` read accessors via test-only constructor, `spawn()` initial-value compute, and event-task end-to-end (register worker → watch channel signals → current() updates).
- `cargo +nightly fmt --all -- --check` clean.
- `cargo clippy --all-targets --all-features -- -D warnings` clean.
- `cargo test --package smg --lib`: 897 passing, 0 failed.

Design reference
`.claude/priority-scheduling/01-worker-capacity-design.md` (kept local; not in this PR).
Summary by CodeRabbit