feat(scheduler): foundation — class, config, policy (PR 2 M1)#1541
feat(scheduler): foundation — class, config, policy (PR 2 M1)#1541CatherineSue wants to merge 5 commits into
Conversation
4 classes (Bulk < Default < Interactive < System) with #[repr(u8)] and Ord derived so tenant-clamp is min(header_class, max_class). parse_header is case-insensitive, whitespace-tolerant, and degrades unknown values to Default. PRIORITY_HEADER constant "x-smg-priority". Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
ClassConfig is the YAML-shaped on-disk form (u64 secs, f32 multiplier); ClassRuntimeConfig is the runtime form (Duration). Built-in per-class defaults match the table in 02-priority-scheduler-design.md §3. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
Optional YAML file loaded via --priority-scheduler-config <path>. Classes and tenant_policies are both optional; absent keys fall back to built-in defaults in a later commit. Class enum serializes as lowercase strings. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
Merges built-in defaults + optional YAML + CLI flags into one runtime config. Validates per-field invariants (non-negative multiplier, positive timeouts); capacity-vs-reserved validation deferred to PriorityScheduler. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
Trait-shaped so a future async store-backed impl can land without touching the admission path. v1 impl is a HashMap lookup with a default fallback. TenantPolicy is the future growth point (weight, slot_quota, rps_cap). Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
📝 WalkthroughWalkthroughThis PR introduces a new priority-aware admission scheduler middleware for model_gateway. It adds a complete scheduler infrastructure module with priority class definitions, configuration management with YAML and CLI integration, per-tenant policy enforcement, and modular re-exports. All new code includes comprehensive unit test coverage. ChangesPriority-aware admission scheduler middleware
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a9943ba220
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
|
|
||
| for class in Class::ALL { | ||
| let cfg = &classes[class as usize]; | ||
| if cfg.queue_size_per_slot < 0.0 { |
There was a problem hiding this comment.
Validate queue_size_per_slot is finite
from_cli_and_yaml only rejects queue_size_per_slot < 0.0, so YAML values like .nan (and potentially inf) pass validation even though they are not meaningful multipliers. This can silently produce invalid effective queue limits when the scheduler applies the documented formula ceil(queue_size_per_slot * capacity), leading to unpredictable admission behavior once the feature is wired in. Please reject non-finite values (e.g., !is_finite()) in the same validation block.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Code Review
This pull request introduces a priority-aware admission scheduler, including the definition of priority classes, configuration management for per-class and per-tenant policies, and a static policy resolver. The review feedback identifies a performance optimization opportunity in the header parsing logic to avoid unnecessary heap allocations and suggests enhancing configuration validation to handle non-finite floating-point values for queue size multipliers.
| pub fn parse_header(value: &str) -> Class { | ||
| match value.trim().to_ascii_lowercase().as_str() { | ||
| "system" => Self::System, | ||
| "interactive" => Self::Interactive, | ||
| "bulk" => Self::Bulk, | ||
| _ => Self::Default, | ||
| } | ||
| } |
There was a problem hiding this comment.
The parse_header function currently allocates a new String via to_ascii_lowercase() on every call. Since this is intended for the request admission path, it is more efficient to avoid heap allocations by using eq_ignore_ascii_case on the trimmed string slice.
pub fn parse_header(value: &str) -> Class {
let trimmed = value.trim();
if trimmed.eq_ignore_ascii_case("system") {
Self::System
} else if trimmed.eq_ignore_ascii_case("interactive") {
Self::Interactive
} else if trimmed.eq_ignore_ascii_case("bulk") {
Self::Bulk
} else {
Self::Default
}
}References
- Avoid heap allocations (like Vec or String) within performance-critical comparison logic. Use allocation-free helpers to reduce memory pressure.
|
|
||
| for class in Class::ALL { | ||
| let cfg = &classes[class as usize]; | ||
| if cfg.queue_size_per_slot < 0.0 { |
There was a problem hiding this comment.
The validation for queue_size_per_slot only checks if the value is negative. It should also check for non-finite values like NaN or Infinity to ensure that downstream capacity calculations (e.g., using ceil) remain deterministic and safe.
| if cfg.queue_size_per_slot < 0.0 { | |
| if cfg.queue_size_per_slot < 0.0 || !cfg.queue_size_per_slot.is_finite() { |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@model_gateway/src/middleware/scheduler/config.rs`:
- Around line 208-210: Update the validation in the scheduler settings so
non-finite queue_size_per_slot values are rejected: in the code that currently
checks cfg.queue_size_per_slot < 0.0 (the branch that returns
SettingsValidationError::NegativeMultiplier), additionally test
cfg.queue_size_per_slot.is_finite() and reject when false (or explicitly check
for NaN/inf) so NaN/+inf do not pass; update the error path to return the same
SettingsValidationError (or add a new variant if you prefer) and ensure this
logic is inside the same validation routine invoked by
SchedulerSettings::from_cli_and_yaml. Also add unit tests that load YAML with
queue_size_per_slot: .nan and queue_size_per_slot: .inf and assert
SchedulerSettings::from_cli_and_yaml returns an Err for those inputs.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9a761abb-7598-44ac-b21d-85e13b3132d5
📒 Files selected for processing (5)
model_gateway/src/middleware/mod.rsmodel_gateway/src/middleware/scheduler/class.rsmodel_gateway/src/middleware/scheduler/config.rsmodel_gateway/src/middleware/scheduler/mod.rsmodel_gateway/src/middleware/scheduler/policy.rs
| if cfg.queue_size_per_slot < 0.0 { | ||
| return Err(SettingsValidationError::NegativeMultiplier { class }); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and print the relevant validation block
rg -n "queue_size_per_slot" model_gateway/src/middleware/scheduler/config.rs
sed -n '160,260p' model_gateway/src/middleware/scheduler/config.rs
# Find where SettingsValidationError is defined/used
rg -n "SettingsValidationError" model_gateway/src/middleware/scheduler/config.rs
rg -n "enum SettingsValidationError|SettingsValidationError::" model_gateway/src/middleware/scheduler -S
# Search for tests covering YAML parsing / validation errors
fd -t f -e rs model_gateway | rg -n "queue_size_per_slot|NegativeMultiplier|SettingsValidationError" -S
fd -t f -e rs model_gateway | rg -n "yaml|serde_yaml|Number|NaN|inf|infinity|non-finite|is_finite" -S
# If there are YAML fixtures, inspect them
fd -t f model_gateway -g "*.yaml" -g "*.yml" | head -n 50
fd -t f model_gateway -g "*.yaml" -g "*.yml" | xargs -I{} sh -c 'echo "---- {}"; sed -n "1,200p" "{}" | rg -n "queue_size_per_slot|nan|inf|infinity" || true'Repository: lightseekorg/smg
Length of output: 5836
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show the relevant validation block and error enum
sed -n '120,240p' model_gateway/src/middleware/scheduler/config.rs
# Show existing tests around queue_size_per_slot validation
sed -n '240,360p' model_gateway/src/middleware/scheduler/config.rs
# Show tests that assert SettingsValidationError variants
sed -n '420,540p' model_gateway/src/middleware/scheduler/config.rs
# Locate YAML structs / deserialization types
rg -n "struct PrioritySchedulerYaml|PrioritySchedulerYaml|serde_yaml|serde.*yaml" model_gateway/src/middleware/scheduler/config.rs model_gateway/src/middleware/scheduler -S
# If PrioritySchedulerYaml is defined in another file, show it
fd -t f -e rs model_gateway/src/middleware/scheduler | xargs -I{} sh -c 'rg -n "struct PrioritySchedulerYaml|PrioritySchedulerYaml" "{}" && true'
# Print the file(s) where PrioritySchedulerYaml is defined
fd -t f -e rs model_gateway/src/middleware/scheduler | xargs -I{} sh -c 'rg -n "struct PrioritySchedulerYaml|PrioritySchedulerYaml" "{}" | head -n 5 || true'Repository: lightseekorg/smg
Length of output: 15628
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for non-finite handling in this module and repo
rg -n "is_finite|is_nan|NaN|nan|INFINITY|inf|infinity|non-finite" model_gateway/src -S || true
rg -n "queue_size_per_slot.*(is_finite|nan|inf)" model_gateway/src -S || true
# Check whether other float validations exist
rg -n "must be.*finite|finite" model_gateway/src -S || true
# Look for tests that parse YAML containing NaN/inf
rg -n "queue_size_per_slot:.*(nan|inf|infinity)" model_gateway/src -S || true
rg -n "from_str\\(.*nan|from_str\\(.*inf|\\.nan|\\.inf|NaN|Infinity" model_gateway/src -S || true
# If there are other YAML float validation tests, inspect them
fd -t f -e rs model_gateway/src | xargs -I{} rg -n "serde_yaml::from_str\\(" {} | head -n 50 || trueRepository: lightseekorg/smg
Length of output: 50765
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find all usages of queue_size_per_slot (text search, but scoped)
rg -n "queue_size_per_slot" model_gateway/src/middleware/scheduler -S
# Inspect the code that computes queue limits (likely uses queue_size_per_slot * capacity)
rg -n "queue_size_per_slot.*capacity|capacity.*queue_size_per_slot|ceil\\(queue_size_per_slot|queue_size_per_slot.*ceil" model_gateway/src/middleware/scheduler -S
# Search broader for conversions of f32 -> integer / as-casts involving queue sizes
rg -n "queue_size.*per_slot|per_slot.*queue_size|queue_limit|queue.*limit|ceil\\(" model_gateway/src/middleware/scheduler -S
# Look for serde_yaml float tests (NaN/inf) across repo
rg -n "NaN|nan|Infinity|infinity|\\binf\\b|non-finite|is_finite|is_nan|is_infinite" model_gateway/src -S || trueRepository: lightseekorg/smg
Length of output: 50372
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find computations that use queue_size_per_slot
rg -n "queue_size_per_slot.*(capacity|cap|ceil|floor|max|min)|ceil\\(.*queue_size_per_slot|queue_size_per_slot.*\\*" model_gateway/src/middleware/scheduler -S
# Inspect nearby code in scheduler files around those usages
rg -n "queue_size_per_slot" model_gateway/src/middleware/scheduler -S | cut -d: -f1 | sort -u
# Show the first 250 lines around each usage site of queue_size_per_slot (to avoid huge output)
rg -n "queue_size_per_slot" model_gateway/src/middleware/scheduler -S | head -n 50
# Use sed to show ranges in config.rs and any other file identified
sed -n '1,220p' model_gateway/src/middleware/scheduler/config.rsRepository: lightseekorg/smg
Length of output: 10513
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Inspect scheduler module files to find where queue_size_per_slot affects queue limits
ls -1 model_gateway/src/middleware/scheduler
# Search for queue-limit computations using queue_size_per_slot
rg -n "queue_size_per_slot" model_gateway/src/middleware/scheduler -S
# Search specifically for ceil/casts based on the config comment
rg -n "ceil\\(|floor\\(|max\\(|min\\(|as u32|as usize|as i32" model_gateway/src/middleware/scheduler -S
# Narrow to files likely doing the computation: scheduler.rs / mod.rs / any file with "PriorityScheduler"
fd -t f model_gateway/src/middleware/scheduler | xargs -I{} sh -c 'rg -n "PriorityScheduler" "{}" || true'
# Find the constructor `PriorityScheduler::new` mentioned in comments
fd -t f model_gateway/src/middleware/scheduler | xargs -I{} sh -c 'rg -n "PriorityScheduler::new|fn new\\(" "{}" || true'
# Print any matches around those likely computations
rg -n "queue_size_per_slot|effective limit|effective limit =|max\\(queue_size|queue_size_per_slot \\* capacity" model_gateway/src/middleware/scheduler -S -n --context 3
# 2) Check serde_yaml parsing of non-finite floats (for grounding tests)
# Search in Cargo.lock / docs? limited; we'll just find any yaml parsing patterns in tests.
rg -n "serde_yaml::from_str\\(|from_str\\(y.*yaml|NaN|nan|Infinity|inf|is_finite" model_gateway/src -S || trueRepository: lightseekorg/smg
Length of output: 3501
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Where is the field used anywhere in the repo?
rg -n "queue_size_per_slot" . -S
# Check for effective queue limit computation: look for ceil/floor and casts around queue_size/per_slot
rg -n "ceil\\(|floor\\(|max\\(|min\\(|effective.*queue|queue.*limit|queue.*depth|queue.*size" model_gateway/src -S | rg -n "queue" -S || true
# Narrow to queue_size computations (any file mentioning ceil with queue_size)
rg -n "ceil\\(.*queue_size|queue_size.*ceil\\(|queue_size_per_slot.*ceil|ceil\\(.*capacity|capacity.*queue_size_per_slot" model_gateway/src -S || true
# Inspect scheduler module files for queue depth computations by searching for u32 casts
rg -n "as u32|as i32|as usize|queue.*as" model_gateway/src/middleware/scheduler -S || trueRepository: lightseekorg/smg
Length of output: 8700
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect scheduler module files
sed -n '1,220p' model_gateway/src/middleware/scheduler/mod.rs
sed -n '1,260p' model_gateway/src/middleware/scheduler/class.rs
sed -n '1,260p' model_gateway/src/middleware/scheduler/policy.rs
# Search for effective queue/limit calculations mentioning capacity or queue_size_per_slot math
rg -n "capacity|queue limit|effective|ceil\\(|queue_size_per_slot|queue_size" model_gateway/src/middleware/scheduler -SRepository: lightseekorg/smg
Length of output: 13891
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find the actual scheduler implementation
rg -n "struct PriorityScheduler|PriorityScheduler::new|fn new\\(|PriorityScheduler" model_gateway/src -S
# Look for the effective queue limit formula mentioned in config.rs
rg -n "queue_size_per_slot.*\\*|\\*.*queue_size_per_slot|ceil\\(|floor\\(|max\\(|effective.*queue|queue.*size" model_gateway/src/middleware -S
# Search for uses of ClassConfig fields outside config.rs
rg -n "queue_size_per_slot|queue_size:" model_gateway/src/middleware -S
# Also narrow to any file that mentions WorkerCapacity / admission scheduler computation
rg -n "WorkerCapacity|watch channel|AtomicU16|capacity" model_gateway/src/middleware -SRepository: lightseekorg/smg
Length of output: 20000
🌐 Web query:
Does serde_yaml parse YAML scalars like ".nan", "nan", ".inf", "inf", and "Infinity" into f32/f64 (and what tokens are supported)?
💡 Result:
Yes. serde_yaml can parse YAML scalars representing IEEE-754 special floats into f32/f64. Supported tokens (case-sensitive) shown by serde_yaml’s number parsing: 1) ".nan" -> NaN (e.g., v.as_f64.unwrap.is_nan) [1] 2) ".inf" -> +Infinity (e.g., v.as_f64 == Some(f64::INFINITY)) [1] 3) "-.inf" -> -Infinity (e.g., v.as_f64 == Some(f64::NEG_INFINITY)) [1] So, among the tokens you listed: - ".nan": supported [1] - "nan": not evidenced as supported in the sources I found (serde_yaml’s docs/examples explicitly use ".nan") [1] - ".inf": supported [1] - "inf": not evidenced as supported (examples use ".inf") [1] - "Infinity": not evidenced as supported [1] Notes on f32/f64: - serde_yaml’s Number type stores floats as an f64 internally (“May be infinite or NaN”); the parsing examples validate values via as_f64 [1]. (I didn’t find an explicit token-by-token f32 conversion table, but the float-special handling is part of serde_yaml’s Number parsing.) [1]
Citations:
Reject non-finite queue_size_per_slot during settings validation (NaN/+inf currently slip through)
In model_gateway/src/middleware/scheduler/config.rs (around lines 208-210), the check cfg.queue_size_per_slot < 0.0 rejects only negative values/-inf, but it does not reject NaN (comparisons are false) or +inf—so those can pass validation and break the later ceil(queue_size_per_slot * capacity) behavior.
Suggested fix
- if cfg.queue_size_per_slot < 0.0 {
+ if !cfg.queue_size_per_slot.is_finite() || cfg.queue_size_per_slot < 0.0 {
return Err(SettingsValidationError::NegativeMultiplier { class });
}Add unit tests that parse YAML with queue_size_per_slot: .nan and queue_size_per_slot: .inf and assert SchedulerSettings::from_cli_and_yaml rejects them.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if cfg.queue_size_per_slot < 0.0 { | |
| return Err(SettingsValidationError::NegativeMultiplier { class }); | |
| } | |
| if !cfg.queue_size_per_slot.is_finite() || cfg.queue_size_per_slot < 0.0 { | |
| return Err(SettingsValidationError::NegativeMultiplier { class }); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@model_gateway/src/middleware/scheduler/config.rs` around lines 208 - 210,
Update the validation in the scheduler settings so non-finite
queue_size_per_slot values are rejected: in the code that currently checks
cfg.queue_size_per_slot < 0.0 (the branch that returns
SettingsValidationError::NegativeMultiplier), additionally test
cfg.queue_size_per_slot.is_finite() and reject when false (or explicitly check
for NaN/inf) so NaN/+inf do not pass; update the error path to return the same
SettingsValidationError (or add a new variant if you prefer) and ensure this
logic is inside the same validation routine invoked by
SchedulerSettings::from_cli_and_yaml. Also add unit tests that load YAML with
queue_size_per_slot: .nan and queue_size_per_slot: .inf and assert
SchedulerSettings::from_cli_and_yaml returns an Err for those inputs.
Description
Problem
The model gateway has no notion of priority on inbound traffic. Today's
concurrency_limit_middlewareadmits requests in FIFO order against a single global slot pool — interactive chat completions queue behind whatever bulk batch happened to arrive first.The full design lives in
.claude/docs/scheduler/02-priority-scheduler-design.md. Splitting the implementation into milestones lets each chunk land reviewably without flipping the master--priority-scheduler-enabledflag totrueuntil everything is wired up.Solution
This PR is Milestone 1 of 6 for PR 2 (priority scheduler). It introduces the vocabulary used by every later milestone: the
Classenum, per-class config (YAML + runtime), tenant policy resolver. No call sites yet — these types are reachable fromcrate::middleware::scheduler::*but no production code path constructs them. Behavior onmainis unchanged.Subsequent milestones (each its own PR):
AppContextwiring,priority_admission_middleware, cancel-token plumbing across handlersThe implementation plan lives at
.claude/docs/scheduler/02-priority-scheduler-plan.md(also in this PR's diff — it's a new file).Changes
Five commits, one per task in M1:
feat(scheduler): add Class enum + priority header parser—Class { Bulk=0, Default=1, Interactive=2, System=3 }withOrdderived (tenant clamp =min(header, max)) andrepr(u8)(later packed intoAtomicU64).parse_headeris case-insensitive, whitespace-tolerant, lenient on unknown values.PRIORITY_HEADER = "x-smg-priority".feat(scheduler): add ClassConfig + ClassRuntimeConfig with defaults—ClassConfigis the YAML-shaped on-disk form (u64secs,f32multiplier);ClassRuntimeConfigis the runtime form (Duration).default_for(Class)returns the built-in per-class defaults from the design's §3 table.feat(scheduler): PrioritySchedulerYaml serde schema—PrioritySchedulerYaml { classes, tenant_policies }, both#[serde(default)].ClassderivesSerialize/Deserializewith#[serde(rename_all = "lowercase")]. Empty YAML round-trips to default.feat(scheduler): SchedulerSettings::from_cli_and_yaml builder— Merges built-in defaults + optional YAML + CLI flags into a single read-onlySchedulerSettings. Per-field validation (non-negative multiplier, positive timeouts). Capacity-vs-reserved check is deferred toPriorityScheduler::new(M2).feat(scheduler): tenant policy resolver trait + static impl—trait TenantPolicyResolver(lets a future async store-backed impl swap in without touching the admission path) +StaticTenantPolicyResolver(HashMap with default fallback).Plus:
.claude/docs/scheduler/02-priority-scheduler-plan.md(the implementation plan for PR 2).Test Plan
Each commit follows TDD discipline: failing tests first, verify they fail with the expected error, then implement, verify pass, commit. 29 new unit tests cover:
Class::parse_headerover all four valid values, case-insensitivity, whitespace tolerance, unknown→Default, and theOrd-derived clamp semantics.ClassConfig::default_formatches the design §3 defaults table exactly for all four classes.ClassRuntimeConfig::from_class_configconverts seconds toDurationand preservescan_preempt.PrioritySchedulerYamlserde round-trips: empty document, partial class override, tenant policy map, unknown-class serde error, lowercase serialization.SchedulerSettings::from_cli_and_yaml: defaults-only, partial YAML override merges with defaults, tenant policies propagate, validation rejects negative multiplier / zero queue timeout / zero starvation threshold.StaticTenantPolicyResolver: empty resolver returns default, known tenant overrides default, trait-object dispatch, admission clamp viamin(header, policy.max_class).$ cargo clippy --package smg --all-targets --all-features -- -D warnings ... Finished `dev` profile [unoptimized + debuginfo] target(s)Checklist
cargo +nightly fmtpasses (enforced by pre-commit hook)cargo clippy --all-targets --all-features -- -D warningspasses.claude/docs/scheduler/Summary by CodeRabbit