From 90ecc5ccc5cf9d76c193ba440c9fda7e37166578 Mon Sep 17 00:00:00 2001 From: Chang Su <8605658+CatherineSue@users.noreply.github.com> Date: Sun, 24 May 2026 18:04:51 -0700 Subject: [PATCH 1/5] feat(scheduler): add Class enum + priority header parser 4 classes (Bulk < Default < Interactive < System) with #[repr(u8)] and Ord derived so tenant-clamp is min(header_class, max_class). parse_header is case-insensitive, whitespace-tolerant, and degrades unknown values to Default. PRIORITY_HEADER constant "x-smg-priority". Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com> --- model_gateway/src/middleware/mod.rs | 1 + .../src/middleware/scheduler/class.rs | 132 ++++++++++++++++++ model_gateway/src/middleware/scheduler/mod.rs | 9 ++ 3 files changed, 142 insertions(+) create mode 100644 model_gateway/src/middleware/scheduler/class.rs create mode 100644 model_gateway/src/middleware/scheduler/mod.rs diff --git a/model_gateway/src/middleware/mod.rs b/model_gateway/src/middleware/mod.rs index b771c28e8..0067ee469 100644 --- a/model_gateway/src/middleware/mod.rs +++ b/model_gateway/src/middleware/mod.rs @@ -9,6 +9,7 @@ pub mod concurrency; pub mod logging; pub mod metrics; pub mod request_id; +pub mod scheduler; pub mod storage_context; pub mod tenant_resolution; pub mod token_bucket; diff --git a/model_gateway/src/middleware/scheduler/class.rs b/model_gateway/src/middleware/scheduler/class.rs new file mode 100644 index 000000000..bca9f1188 --- /dev/null +++ b/model_gateway/src/middleware/scheduler/class.rs @@ -0,0 +1,132 @@ +//! Priority class + `X-SMG-Priority` header parser. + +/// HTTP request header that conveys the desired priority class. +/// +/// Case-insensitive; unknown values degrade to [`Class::Default`]. +pub const PRIORITY_HEADER: &str = "x-smg-priority"; + +/// Service class assigned to an inbound request. 
/// Service class assigned to an inbound request.
///
/// Variants are declared in ascending priority order and `Ord` is derived,
/// so `Bulk < Default < Interactive < System` and a tenant clamp is simply
/// `std::cmp::min(header_class, max_class)`.
///
/// The explicit discriminants and `repr(u8)` are load-bearing: callers index
/// per-class tables with `class as usize`, and the scheduler packs per-class
/// inflight counts into a single `AtomicU64`. Do not renumber or reorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum Class {
    /// Background / batch jobs. Lowest priority; preemptible by everyone else.
    Bulk = 0,
    /// Unlabeled traffic. What every request gets when no header is set or
    /// the header value is not recognized.
    Default = 1,
    /// Latency-sensitive traffic (chat completions, autocomplete).
    Interactive = 2,
    /// Internal control-plane traffic. Highest priority.
    System = 3,
}

impl Class {
    /// All four variants in ascending priority order.
    ///
    /// Position `i` holds the variant whose discriminant is `i`, so this
    /// array can seed tables that are later indexed with `class as usize`.
    pub const ALL: [Class; 4] = [Self::Bulk, Self::Default, Self::Interactive, Self::System];

    /// Parse a header value into a class.
    ///
    /// Surrounding whitespace is ignored and the comparison is ASCII
    /// case-insensitive. Anything that is not one of the names returned by
    /// [`Class::as_str`] (including the empty string) maps to
    /// [`Class::Default`]: admission shouldn't fail because of a typo in a
    /// non-essential header.
    ///
    /// The lookup compares against [`Class::as_str`] in place, so it never
    /// allocates and the accepted spellings cannot drift from the names
    /// used for metrics and logs.
    #[must_use]
    pub fn parse_header(value: &str) -> Class {
        let wanted = value.trim();
        Self::ALL
            .iter()
            .copied()
            .find(|class| wanted.eq_ignore_ascii_case(class.as_str()))
            .unwrap_or(Self::Default)
    }

    /// Lowercase variant name. Used as a metrics label and structured-log
    /// field, and as the set of spellings accepted by
    /// [`Class::parse_header`].
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Bulk => "bulk",
            Self::Default => "default",
            Self::Interactive => "interactive",
            Self::System => "system",
        }
    }
}
+ assert!(Class::Bulk < Class::Default); + assert!(Class::Default < Class::Interactive); + assert!(Class::Interactive < Class::System); + } + + #[test] + fn test_as_str_returns_lowercase_variant_name() { + assert_eq!(Class::Bulk.as_str(), "bulk"); + assert_eq!(Class::Default.as_str(), "default"); + assert_eq!(Class::Interactive.as_str(), "interactive"); + assert_eq!(Class::System.as_str(), "system"); + } + + #[test] + fn test_priority_header_constant() { + assert_eq!(PRIORITY_HEADER, "x-smg-priority"); + } +} diff --git a/model_gateway/src/middleware/scheduler/mod.rs b/model_gateway/src/middleware/scheduler/mod.rs new file mode 100644 index 000000000..444a07909 --- /dev/null +++ b/model_gateway/src/middleware/scheduler/mod.rs @@ -0,0 +1,9 @@ +//! Priority-aware admission scheduler. +//! +//! See `.claude/docs/scheduler/02-priority-scheduler-design.md` for the +//! full design rationale and `02-priority-scheduler-plan.md` for the +//! implementation sequencing. + +pub mod class; + +pub use class::{Class, PRIORITY_HEADER}; From 4ab5275a6998c769a7eb5449294d6152c95c2ca7 Mon Sep 17 00:00:00 2001 From: Chang Su <8605658+CatherineSue@users.noreply.github.com> Date: Sun, 24 May 2026 18:10:35 -0700 Subject: [PATCH 2/5] feat(scheduler): add ClassConfig + ClassRuntimeConfig with defaults MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ClassConfig is the YAML-shaped on-disk form (u64 secs, f32 multiplier); ClassRuntimeConfig is the runtime form (Duration). Built-in per-class defaults match the table in 02-priority-scheduler-design.md §3. 
Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com> --- .../src/middleware/scheduler/config.rs | 172 ++++++++++++++++++ model_gateway/src/middleware/scheduler/mod.rs | 2 + 2 files changed, 174 insertions(+) create mode 100644 model_gateway/src/middleware/scheduler/config.rs diff --git a/model_gateway/src/middleware/scheduler/config.rs b/model_gateway/src/middleware/scheduler/config.rs new file mode 100644 index 000000000..79afd95f2 --- /dev/null +++ b/model_gateway/src/middleware/scheduler/config.rs @@ -0,0 +1,172 @@ +//! Scheduler config: on-disk YAML shape + runtime form. + +use std::time::Duration; + +use super::Class; + +/// Per-class configuration as it appears in the optional YAML file. +/// +/// Lives separately from [`ClassRuntimeConfig`] because the YAML form +/// uses primitive types friendly to serde and human editing, while the +/// runtime form pre-converts seconds into [`Duration`] so hot paths +/// don't repeat the conversion. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct ClassConfig { + /// Slots reserved for this class. Higher-class reservations are + /// honored by lower-class admissions via the packed-CAS slot + /// accounting in [`super::scheduler`]. + pub reserved: u16, + /// Absolute floor on the per-class queue depth. + pub queue_size: u32, + /// Optional multiplier: effective limit = + /// `max(queue_size, ceil(queue_size_per_slot * capacity))`. + /// `0.0` disables the multiplier (use the absolute floor only). + pub queue_size_per_slot: f32, + /// How long a queued waiter waits before the admission middleware + /// returns 408. Seconds at rest; converted to [`Duration`] in + /// [`ClassRuntimeConfig`]. + pub queue_timeout_secs: u64, + /// Head-of-queue age past which the dispatcher promotes a waiter + /// out of normal priority order to avoid starvation. Seconds at + /// rest; converted to [`Duration`] in [`ClassRuntimeConfig`]. 
+ pub starvation_threshold_secs: u64, + /// Whether admissions in this class are allowed to preempt a + /// lower-class inflight request that has not yet emitted its first + /// byte. Higher classes default to `true`; lower classes default + /// to `false`. + pub can_preempt: bool, +} + +impl ClassConfig { + /// Built-in defaults per `02-priority-scheduler-design.md` §3. + /// These are what every class gets when no YAML file supplies an + /// override. + pub fn default_for(class: Class) -> Self { + match class { + Class::System => Self { + reserved: 32, + queue_size: 64, + queue_size_per_slot: 0.0, + queue_timeout_secs: 30, + starvation_threshold_secs: 5, + can_preempt: true, + }, + Class::Interactive => Self { + reserved: 128, + queue_size: 256, + queue_size_per_slot: 0.25, + queue_timeout_secs: 30, + starvation_threshold_secs: 5, + can_preempt: true, + }, + Class::Default => Self { + reserved: 0, + queue_size: 512, + queue_size_per_slot: 0.5, + queue_timeout_secs: 60, + starvation_threshold_secs: 30, + can_preempt: false, + }, + Class::Bulk => Self { + reserved: 0, + queue_size: 1024, + queue_size_per_slot: 1.0, + queue_timeout_secs: 300, + starvation_threshold_secs: 120, + can_preempt: false, + }, + } + } +} + +/// Runtime view of [`ClassConfig`] — only the fields the dispatcher +/// reads on its hot path, with seconds pre-converted to [`Duration`]. +/// `reserved`, `queue_size`, and `queue_size_per_slot` live elsewhere +/// (the packed-CAS array and the per-class queue impl), so they don't +/// appear here. 
+#[derive(Debug, Clone, Copy, PartialEq)] +pub struct ClassRuntimeConfig { + pub queue_timeout: Duration, + pub starvation_threshold: Duration, + pub can_preempt: bool, +} + +impl ClassRuntimeConfig { + pub fn from_class_config(cfg: &ClassConfig) -> Self { + Self { + queue_timeout: Duration::from_secs(cfg.queue_timeout_secs), + starvation_threshold: Duration::from_secs(cfg.starvation_threshold_secs), + can_preempt: cfg.can_preempt, + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::middleware::scheduler::Class; + + #[test] + fn test_default_for_system() { + let cfg = ClassConfig::default_for(Class::System); + assert_eq!(cfg.reserved, 32); + assert_eq!(cfg.queue_size, 64); + assert_eq!(cfg.queue_size_per_slot, 0.0); + assert_eq!(cfg.queue_timeout_secs, 30); + assert_eq!(cfg.starvation_threshold_secs, 5); + assert!(cfg.can_preempt); + } + + #[test] + fn test_default_for_interactive() { + let cfg = ClassConfig::default_for(Class::Interactive); + assert_eq!(cfg.reserved, 128); + assert_eq!(cfg.queue_size, 256); + assert_eq!(cfg.queue_size_per_slot, 0.25); + assert_eq!(cfg.queue_timeout_secs, 30); + assert_eq!(cfg.starvation_threshold_secs, 5); + assert!(cfg.can_preempt); + } + + #[test] + fn test_default_for_default() { + let cfg = ClassConfig::default_for(Class::Default); + assert_eq!(cfg.reserved, 0); + assert_eq!(cfg.queue_size, 512); + assert_eq!(cfg.queue_size_per_slot, 0.5); + assert_eq!(cfg.queue_timeout_secs, 60); + assert_eq!(cfg.starvation_threshold_secs, 30); + assert!(!cfg.can_preempt); + } + + #[test] + fn test_default_for_bulk() { + let cfg = ClassConfig::default_for(Class::Bulk); + assert_eq!(cfg.reserved, 0); + assert_eq!(cfg.queue_size, 1024); + assert_eq!(cfg.queue_size_per_slot, 1.0); + assert_eq!(cfg.queue_timeout_secs, 300); + assert_eq!(cfg.starvation_threshold_secs, 120); + assert!(!cfg.can_preempt); + } + + #[test] + fn test_runtime_config_converts_seconds_to_duration() { + let cfg = 
ClassConfig::default_for(Class::Default); + let runtime = ClassRuntimeConfig::from_class_config(&cfg); + assert_eq!(runtime.queue_timeout, Duration::from_secs(60)); + assert_eq!(runtime.starvation_threshold, Duration::from_secs(30)); + assert!(!runtime.can_preempt); + } + + #[test] + fn test_runtime_config_preserves_can_preempt_flag() { + let interactive = + ClassRuntimeConfig::from_class_config(&ClassConfig::default_for(Class::Interactive)); + assert!(interactive.can_preempt); + let bulk = ClassRuntimeConfig::from_class_config(&ClassConfig::default_for(Class::Bulk)); + assert!(!bulk.can_preempt); + } +} diff --git a/model_gateway/src/middleware/scheduler/mod.rs b/model_gateway/src/middleware/scheduler/mod.rs index 444a07909..5fad027c5 100644 --- a/model_gateway/src/middleware/scheduler/mod.rs +++ b/model_gateway/src/middleware/scheduler/mod.rs @@ -5,5 +5,7 @@ //! implementation sequencing. pub mod class; +pub mod config; pub use class::{Class, PRIORITY_HEADER}; +pub use config::{ClassConfig, ClassRuntimeConfig}; From b5ab09e003fa377a177467fa7185b98421259a23 Mon Sep 17 00:00:00 2001 From: Chang Su <8605658+CatherineSue@users.noreply.github.com> Date: Sun, 24 May 2026 18:16:26 -0700 Subject: [PATCH 3/5] feat(scheduler): PrioritySchedulerYaml serde schema Optional YAML file loaded via --priority-scheduler-config . Classes and tenant_policies are both optional; absent keys fall back to built-in defaults in a later commit. Class enum serializes as lowercase strings. 
Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com> --- .../src/middleware/scheduler/class.rs | 8 +- .../src/middleware/scheduler/config.rs | 109 +++++++++++++++++- model_gateway/src/middleware/scheduler/mod.rs | 2 +- 3 files changed, 114 insertions(+), 5 deletions(-) diff --git a/model_gateway/src/middleware/scheduler/class.rs b/model_gateway/src/middleware/scheduler/class.rs index bca9f1188..56bbc5130 100644 --- a/model_gateway/src/middleware/scheduler/class.rs +++ b/model_gateway/src/middleware/scheduler/class.rs @@ -9,8 +9,12 @@ pub const PRIORITY_HEADER: &str = "x-smg-priority"; /// /// Numeric values are load-bearing: `Ord` is derived so the tenant clamp is /// `std::cmp::min(header_class, max_class)`, and `repr(u8)` lets the scheduler -/// pack per-class inflight counts into a single `AtomicU64`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +/// pack per-class inflight counts into a single `AtomicU64`. Serde encoding is +/// lowercase so YAML files use `system`/`interactive`/`default`/`bulk`. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +#[serde(rename_all = "lowercase")] #[repr(u8)] pub enum Class { /// Background / batch jobs. Lowest priority; preemptible by everyone else. diff --git a/model_gateway/src/middleware/scheduler/config.rs b/model_gateway/src/middleware/scheduler/config.rs index 79afd95f2..18718cae4 100644 --- a/model_gateway/src/middleware/scheduler/config.rs +++ b/model_gateway/src/middleware/scheduler/config.rs @@ -1,6 +1,8 @@ //! Scheduler config: on-disk YAML shape + runtime form. -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; + +use serde::{Deserialize, Serialize}; use super::Class; @@ -10,7 +12,7 @@ use super::Class; /// uses primitive types friendly to serde and human editing, while the /// runtime form pre-converts seconds into [`Duration`] so hot paths /// don't repeat the conversion. 
-#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub struct ClassConfig { /// Slots reserved for this class. Higher-class reservations are /// honored by lower-class admissions via the packed-CAS slot @@ -101,6 +103,30 @@ impl ClassRuntimeConfig { } } +/// Per-tenant policy entry in the YAML file. +/// +/// Future fields (`weight`, `slot_quota`, `rps_cap`) are additive: adding +/// them is non-breaking because the trait +/// [`super::policy::TenantPolicyResolver`] returns the whole struct. +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub struct TenantPolicyConfig { + pub max_class: Class, +} + +/// Optional YAML config loaded via `--priority-scheduler-config `. +/// +/// Both maps are absent-as-empty: an empty document parses to +/// `PrioritySchedulerYaml::default()`, and downstream +/// [`super::SchedulerSettings::from_cli_and_yaml`] fills in built-in +/// defaults for any class that wasn't overridden. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct PrioritySchedulerYaml { + #[serde(default)] + pub classes: HashMap, + #[serde(default)] + pub tenant_policies: HashMap, +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -169,4 +195,83 @@ mod tests { let bulk = ClassRuntimeConfig::from_class_config(&ClassConfig::default_for(Class::Bulk)); assert!(!bulk.can_preempt); } + + // ── PrioritySchedulerYaml serde ─────────────────────────────────── + + #[test] + fn test_yaml_empty_document_yields_default() { + let parsed: PrioritySchedulerYaml = serde_yaml::from_str("").unwrap(); + assert!(parsed.classes.is_empty()); + assert!(parsed.tenant_policies.is_empty()); + } + + #[test] + fn test_yaml_partial_class_override_round_trips() { + let yaml = r" +classes: + interactive: + reserved: 200 + queue_size: 256 + queue_size_per_slot: 0.25 + queue_timeout_secs: 30 + starvation_threshold_secs: 5 + can_preempt: true +"; + let parsed: PrioritySchedulerYaml = 
serde_yaml::from_str(yaml).unwrap(); + let interactive = parsed + .classes + .get(&Class::Interactive) + .expect("interactive present"); + assert_eq!(interactive.reserved, 200); + // Only one class entry — others are absent (settings layer fills defaults). + assert_eq!(parsed.classes.len(), 1); + assert!(parsed.tenant_policies.is_empty()); + } + + #[test] + fn test_yaml_tenant_policy_round_trips() { + let yaml = r#" +tenant_policies: + "auth:acme": + max_class: interactive + "auth:internal-cron": + max_class: system +"#; + let parsed: PrioritySchedulerYaml = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(parsed.tenant_policies.len(), 2); + assert_eq!( + parsed.tenant_policies["auth:acme"].max_class, + Class::Interactive + ); + assert_eq!( + parsed.tenant_policies["auth:internal-cron"].max_class, + Class::System + ); + } + + #[test] + fn test_yaml_unknown_class_value_is_serde_error() { + let yaml = r#" +tenant_policies: + "auth:acme": + max_class: garbage +"#; + let result: Result = serde_yaml::from_str(yaml); + assert!(result.is_err(), "expected serde error for unknown class"); + } + + #[test] + fn test_yaml_class_name_serializes_as_lowercase() { + let mut classes = HashMap::new(); + classes.insert(Class::Bulk, ClassConfig::default_for(Class::Bulk)); + let yaml = PrioritySchedulerYaml { + classes, + tenant_policies: Default::default(), + }; + let rendered = serde_yaml::to_string(&yaml).unwrap(); + assert!( + rendered.contains("bulk:"), + "class key should serialize as lowercase: {rendered}" + ); + } } diff --git a/model_gateway/src/middleware/scheduler/mod.rs b/model_gateway/src/middleware/scheduler/mod.rs index 5fad027c5..580edbeda 100644 --- a/model_gateway/src/middleware/scheduler/mod.rs +++ b/model_gateway/src/middleware/scheduler/mod.rs @@ -8,4 +8,4 @@ pub mod class; pub mod config; pub use class::{Class, PRIORITY_HEADER}; -pub use config::{ClassConfig, ClassRuntimeConfig}; +pub use config::{ClassConfig, ClassRuntimeConfig, PrioritySchedulerYaml, 
TenantPolicyConfig}; From 7daf997dfab999b283b090350a29f4519fb4103b Mon Sep 17 00:00:00 2001 From: Chang Su <8605658+CatherineSue@users.noreply.github.com> Date: Sun, 24 May 2026 18:36:04 -0700 Subject: [PATCH 4/5] feat(scheduler): SchedulerSettings::from_cli_and_yaml builder Merges built-in defaults + optional YAML + CLI flags into one runtime config. Validates per-field invariants (non-negative multiplier, positive timeouts); capacity-vs-reserved validation deferred to PriorityScheduler. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com> --- .../src/middleware/scheduler/config.rs | 224 +++++++++++++++++- model_gateway/src/middleware/scheduler/mod.rs | 5 +- 2 files changed, 226 insertions(+), 3 deletions(-) diff --git a/model_gateway/src/middleware/scheduler/config.rs b/model_gateway/src/middleware/scheduler/config.rs index 18718cae4..4356ecdc4 100644 --- a/model_gateway/src/middleware/scheduler/config.rs +++ b/model_gateway/src/middleware/scheduler/config.rs @@ -5,6 +5,7 @@ use std::{collections::HashMap, time::Duration}; use serde::{Deserialize, Serialize}; use super::Class; +use crate::tenant::TenantKey; /// Per-class configuration as it appears in the optional YAML file. /// @@ -117,8 +118,8 @@ pub struct TenantPolicyConfig { /// /// Both maps are absent-as-empty: an empty document parses to /// `PrioritySchedulerYaml::default()`, and downstream -/// [`super::SchedulerSettings::from_cli_and_yaml`] fills in built-in -/// defaults for any class that wasn't overridden. +/// [`SchedulerSettings::from_cli_and_yaml`] fills in built-in defaults +/// for any class that wasn't overridden. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct PrioritySchedulerYaml { #[serde(default)] @@ -127,6 +128,113 @@ pub struct PrioritySchedulerYaml { pub tenant_policies: HashMap, } +/// Per-field validation failures discovered while assembling +/// [`SchedulerSettings`]. 
Capacity-vs-reserved validation lives later +/// (the scheduler doesn't know the backend capacity until +/// [`super::scheduler::PriorityScheduler::new`] receives it). +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum SettingsValidationError { + #[error("class {class:?}: queue_size_per_slot must be >= 0.0")] + NegativeMultiplier { class: Class }, + #[error("class {class:?}: queue_timeout_secs must be > 0")] + ZeroQueueTimeout { class: Class }, + #[error("class {class:?}: starvation_threshold_secs must be > 0")] + ZeroStarvationThreshold { class: Class }, +} + +/// Runtime scheduler configuration assembled from CLI flags + the +/// optional YAML file + built-in defaults. Built once at startup and +/// then read-only. +/// +/// Capacity is *not* stored here — it lives on +/// [`super::scheduler::PriorityScheduler`] as a live `AtomicU16` fed by +/// [`crate::worker::WorkerCapacity`] (PR 1). The scheduler reacts to +/// capacity changes via the watch channel; settings stay fixed. +#[derive(Debug, Clone)] +pub struct SchedulerSettings { + /// Master switch. When `false`, the legacy `concurrency_limit_middleware` + /// stays wired in `server.rs` and none of the scheduler types are + /// constructed. + pub enabled: bool, + /// Tenant policy applied when a tenant is not in `tenant_policies`. + pub default_max_class: Class, + /// Per-class config, indexed by `class as usize`. Always populated + /// for all four classes — YAML overrides land on top of + /// [`ClassConfig::default_for`]. + classes: [ClassConfig; 4], + /// Per-tenant clamp lookup. Keys come from + /// [`crate::tenant::RouteRequestMeta::tenant_key`]. + pub tenant_policies: HashMap, + /// Cap on the number of tenants emitted as labels for + /// `scheduler_tenant_*` gauges. Everything past the cap is + /// bucketed under `tenant="other"`. + pub tenant_metric_top_n: u32, +} + +impl SchedulerSettings { + /// Indexed accessor for the per-class config. 
+ pub fn class_config(&self, class: Class) -> &ClassConfig { + &self.classes[class as usize] + } + + /// Assemble settings from CLI flags + optional YAML, validating + /// per-field invariants. Capacity-vs-reserved validation is the + /// scheduler's job at construction time. + /// + /// Merge order: built-in defaults → YAML overrides per class. CLI + /// flags supply the top-level fields (`enabled`, `default_max_class`, + /// `tenant_metric_top_n`) since they aren't representable in the YAML. + pub fn from_cli_and_yaml( + enabled: bool, + default_max_class: Class, + tenant_metric_top_n: u32, + yaml: Option<&PrioritySchedulerYaml>, + ) -> Result { + let mut classes: [ClassConfig; 4] = [ + ClassConfig::default_for(Class::Bulk), + ClassConfig::default_for(Class::Default), + ClassConfig::default_for(Class::Interactive), + ClassConfig::default_for(Class::System), + ]; + + if let Some(yaml) = yaml { + for (class, override_cfg) in &yaml.classes { + classes[*class as usize] = *override_cfg; + } + } + + for class in Class::ALL { + let cfg = &classes[class as usize]; + if cfg.queue_size_per_slot < 0.0 { + return Err(SettingsValidationError::NegativeMultiplier { class }); + } + if cfg.queue_timeout_secs == 0 { + return Err(SettingsValidationError::ZeroQueueTimeout { class }); + } + if cfg.starvation_threshold_secs == 0 { + return Err(SettingsValidationError::ZeroStarvationThreshold { class }); + } + } + + let tenant_policies = yaml + .map(|y| { + y.tenant_policies + .iter() + .map(|(k, v)| (TenantKey::new(k), *v)) + .collect() + }) + .unwrap_or_default(); + + Ok(Self { + enabled, + default_max_class, + classes, + tenant_policies, + tenant_metric_top_n, + }) + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -274,4 +382,116 @@ tenant_policies: "class key should serialize as lowercase: {rendered}" ); } + + // ── SchedulerSettings::from_cli_and_yaml ───────────────────────── + + use crate::tenant::TenantKey; + + #[test] + fn 
test_settings_no_yaml_uses_builtin_defaults() { + let s = SchedulerSettings::from_cli_and_yaml(false, Class::Default, 32, None).unwrap(); + assert!(!s.enabled); + assert_eq!(s.default_max_class, Class::Default); + assert_eq!(s.tenant_metric_top_n, 32); + for class in Class::ALL { + assert_eq!(s.class_config(class), &ClassConfig::default_for(class)); + } + assert!(s.tenant_policies.is_empty()); + } + + #[test] + fn test_settings_yaml_partial_override_merges_with_defaults() { + let mut classes = HashMap::new(); + let mut interactive = ClassConfig::default_for(Class::Interactive); + interactive.reserved = 64; + classes.insert(Class::Interactive, interactive); + let yaml = PrioritySchedulerYaml { + classes, + tenant_policies: Default::default(), + }; + let s = + SchedulerSettings::from_cli_and_yaml(true, Class::Default, 32, Some(&yaml)).unwrap(); + assert_eq!(s.class_config(Class::Interactive).reserved, 64); + // Bulk untouched — still equal to built-in default. + assert_eq!( + s.class_config(Class::Bulk), + &ClassConfig::default_for(Class::Bulk) + ); + } + + #[test] + fn test_settings_yaml_tenant_policy_propagates() { + let mut tenant_policies = HashMap::new(); + tenant_policies.insert( + "auth:acme".to_string(), + TenantPolicyConfig { + max_class: Class::Interactive, + }, + ); + let yaml = PrioritySchedulerYaml { + classes: Default::default(), + tenant_policies, + }; + let s = + SchedulerSettings::from_cli_and_yaml(true, Class::Default, 32, Some(&yaml)).unwrap(); + assert_eq!(s.tenant_policies.len(), 1); + let key = TenantKey::new("auth:acme"); + assert_eq!(s.tenant_policies[&key].max_class, Class::Interactive); + } + + #[test] + fn test_settings_rejects_negative_multiplier() { + let mut classes = HashMap::new(); + let mut interactive = ClassConfig::default_for(Class::Interactive); + interactive.queue_size_per_slot = -1.0; + classes.insert(Class::Interactive, interactive); + let yaml = PrioritySchedulerYaml { + classes, + tenant_policies: Default::default(), + }; + 
let err = SchedulerSettings::from_cli_and_yaml(true, Class::Default, 32, Some(&yaml)) + .unwrap_err(); + assert!(matches!( + err, + SettingsValidationError::NegativeMultiplier { + class: Class::Interactive + } + )); + } + + #[test] + fn test_settings_rejects_zero_queue_timeout() { + let mut classes = HashMap::new(); + let mut bulk = ClassConfig::default_for(Class::Bulk); + bulk.queue_timeout_secs = 0; + classes.insert(Class::Bulk, bulk); + let yaml = PrioritySchedulerYaml { + classes, + tenant_policies: Default::default(), + }; + let err = SchedulerSettings::from_cli_and_yaml(true, Class::Default, 32, Some(&yaml)) + .unwrap_err(); + assert!(matches!( + err, + SettingsValidationError::ZeroQueueTimeout { class: Class::Bulk } + )); + } + + #[test] + fn test_settings_rejects_zero_starvation_threshold() { + let mut classes = HashMap::new(); + let mut bulk = ClassConfig::default_for(Class::Bulk); + bulk.starvation_threshold_secs = 0; + classes.insert(Class::Bulk, bulk); + let yaml = PrioritySchedulerYaml { + classes, + tenant_policies: Default::default(), + }; + let err = SchedulerSettings::from_cli_and_yaml(true, Class::Default, 32, Some(&yaml)) + .unwrap_err(); + assert!(matches!( + err, + SettingsValidationError::ZeroStarvationThreshold { class: Class::Bulk } + )); + } } diff --git a/model_gateway/src/middleware/scheduler/mod.rs b/model_gateway/src/middleware/scheduler/mod.rs index 580edbeda..dbd8fc2ef 100644 --- a/model_gateway/src/middleware/scheduler/mod.rs +++ b/model_gateway/src/middleware/scheduler/mod.rs @@ -8,4 +8,7 @@ pub mod class; pub mod config; pub use class::{Class, PRIORITY_HEADER}; -pub use config::{ClassConfig, ClassRuntimeConfig, PrioritySchedulerYaml, TenantPolicyConfig}; +pub use config::{ + ClassConfig, ClassRuntimeConfig, PrioritySchedulerYaml, SchedulerSettings, + SettingsValidationError, TenantPolicyConfig, +}; From a9943ba220534d9a72e70f39252252f4cbe07dc4 Mon Sep 17 00:00:00 2001 From: Chang Su <8605658+CatherineSue@users.noreply.github.com> 
Date: Sun, 24 May 2026 18:41:38 -0700 Subject: [PATCH 5/5] feat(scheduler): tenant policy resolver trait + static impl Trait-shaped so a future async store-backed impl can land without touching the admission path. v1 impl is a HashMap lookup with a default fallback. TenantPolicy is the future growth point (weight, slot_quota, rps_cap). Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com> --- model_gateway/src/middleware/scheduler/mod.rs | 2 + .../src/middleware/scheduler/policy.rs | 140 ++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 model_gateway/src/middleware/scheduler/policy.rs diff --git a/model_gateway/src/middleware/scheduler/mod.rs b/model_gateway/src/middleware/scheduler/mod.rs index dbd8fc2ef..0ee5ccc9d 100644 --- a/model_gateway/src/middleware/scheduler/mod.rs +++ b/model_gateway/src/middleware/scheduler/mod.rs @@ -6,9 +6,11 @@ pub mod class; pub mod config; +pub mod policy; pub use class::{Class, PRIORITY_HEADER}; pub use config::{ ClassConfig, ClassRuntimeConfig, PrioritySchedulerYaml, SchedulerSettings, SettingsValidationError, TenantPolicyConfig, }; +pub use policy::{StaticTenantPolicyResolver, TenantPolicy, TenantPolicyResolver}; diff --git a/model_gateway/src/middleware/scheduler/policy.rs b/model_gateway/src/middleware/scheduler/policy.rs new file mode 100644 index 000000000..9764d4425 --- /dev/null +++ b/model_gateway/src/middleware/scheduler/policy.rs @@ -0,0 +1,140 @@ +//! Tenant policy resolver: maps a `TenantKey` to a [`TenantPolicy`]. +//! +//! Trait-shaped so a future store-backed implementation (with async lookup + +//! sync cache) can land without touching the admission middleware. + +use std::{collections::HashMap, sync::Arc}; + +use super::{Class, SchedulerSettings}; +use crate::tenant::TenantKey; + +/// Per-tenant policy. The single field today is [`max_class`]; future +/// non-breaking additions: `weight: u32`, `slot_quota: Option`, +/// `rps_cap: Option`. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TenantPolicy { + pub max_class: Class, +} + +/// Lookup interface used by the admission middleware. A future async +/// store-backed impl can land via a sync cache wrapper without changing +/// the call site. +pub trait TenantPolicyResolver: Send + Sync { + fn policy(&self, tenant: &TenantKey) -> TenantPolicy; +} + +/// Static in-memory resolver — what gets used at v1. +/// +/// Built once from [`SchedulerSettings`] at gateway startup; lookups are +/// pure `HashMap::get` with a default-policy fallback. +pub struct StaticTenantPolicyResolver { + policies: HashMap, + default: TenantPolicy, +} + +impl StaticTenantPolicyResolver { + /// Build from the runtime settings. Reads `tenant_policies` and + /// `default_max_class`; never returns an error (validation happens + /// inside [`SchedulerSettings::from_cli_and_yaml`]). + pub fn from_settings(settings: &SchedulerSettings) -> Self { + let policies = settings + .tenant_policies + .iter() + .map(|(key, cfg)| { + ( + key.clone(), + TenantPolicy { + max_class: cfg.max_class, + }, + ) + }) + .collect(); + Self { + policies, + default: TenantPolicy { + max_class: settings.default_max_class, + }, + } + } + + /// Test-only constructor used by call sites that don't want to build + /// a full [`SchedulerSettings`] just to exercise the resolver. 
+ #[cfg(test)] + pub(crate) fn with_default(default_max_class: Class) -> Self { + Self { + policies: HashMap::new(), + default: TenantPolicy { + max_class: default_max_class, + }, + } + } +} + +impl TenantPolicyResolver for StaticTenantPolicyResolver { + fn policy(&self, tenant: &TenantKey) -> TenantPolicy { + self.policies.get(tenant).copied().unwrap_or(self.default) + } +} + +impl TenantPolicyResolver for Arc { + fn policy(&self, tenant: &TenantKey) -> TenantPolicy { + (**self).policy(tenant) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::middleware::scheduler::{PrioritySchedulerYaml, TenantPolicyConfig}; + + #[test] + fn test_empty_resolver_returns_default() { + let resolver = StaticTenantPolicyResolver::with_default(Class::Default); + let policy = resolver.policy(&TenantKey::new("anonymous")); + assert_eq!(policy.max_class, Class::Default); + } + + #[test] + fn test_known_tenant_overrides_default() { + let mut tenant_policies = HashMap::new(); + tenant_policies.insert( + "auth:acme".to_string(), + TenantPolicyConfig { + max_class: Class::Interactive, + }, + ); + let yaml = PrioritySchedulerYaml { + classes: Default::default(), + tenant_policies, + }; + let settings = + SchedulerSettings::from_cli_and_yaml(true, Class::Default, 32, Some(&yaml)).unwrap(); + let resolver = StaticTenantPolicyResolver::from_settings(&settings); + + let known = resolver.policy(&TenantKey::new("auth:acme")); + assert_eq!(known.max_class, Class::Interactive); + + // Unknown tenant still gets the default. + let unknown = resolver.policy(&TenantKey::new("anonymous")); + assert_eq!(unknown.max_class, Class::Default); + } + + #[test] + fn test_resolver_trait_object_dispatches() { + let resolver: Arc = + Arc::new(StaticTenantPolicyResolver::with_default(Class::Interactive)); + // Dispatch through the &dyn TenantPolicyResolver impl on Arc. 
+ let policy = resolver.policy(&TenantKey::new("anything")); + assert_eq!(policy.max_class, Class::Interactive); + } + + #[test] + fn test_clamp_via_min_with_resolved_policy() { + // Demonstrate the admission-time clamp: effective = min(header, policy.max_class). + let resolver = StaticTenantPolicyResolver::with_default(Class::Default); + let header_class = Class::Interactive; // client asked for interactive + let policy = resolver.policy(&TenantKey::new("anonymous")); + let effective = std::cmp::min(header_class, policy.max_class); + assert_eq!(effective, Class::Default); + } +}