diff --git a/changes/9566.feature.md b/changes/9566.feature.md new file mode 100644 index 00000000000..66ace8236c9 --- /dev/null +++ b/changes/9566.feature.md @@ -0,0 +1 @@ +Add the DEPLOYING lifecycle with strategy evaluator framework, sub-step handlers (PROVISIONING, PROGRESSING, ROLLED_BACK), and coordinator integration for BEP-1049. \ No newline at end of file diff --git a/proposals/BEP-1049-deployment-strategy-handler.md b/proposals/BEP-1049-deployment-strategy-handler.md index 3de4e24338a..10e7d14f9ea 100644 --- a/proposals/BEP-1049-deployment-strategy-handler.md +++ b/proposals/BEP-1049-deployment-strategy-handler.md @@ -28,15 +28,15 @@ Blue-Green deployment spans multiple coordinator cycles through several phases: Rolling Update similarly progresses gradually across cycles. Both strategies **keep the deployment in `DEPLOYING` state across multiple processing cycles until strategy completion or rollback.** -### Evaluator + Sub-Step Handler Pattern +### Composite Handler Pattern (DeployingHandler) -A single `evaluate()` call may produce different sub-steps for different deployments — some completed, others still PROGRESSING. To handle this, a **strategy evaluator** groups deployments by sub-step, and **per-sub-step handlers** process each group. Completed deployments are returned separately in `EvaluationResult.completed` and processed via the PROGRESSING handler's `post_process`. +A single `evaluate()` call may produce different sub-steps for different deployments — some completed, others still PROGRESSING. To handle this, DEPLOYING is represented as a **composite handler** (`DeployingHandler`) that internally owns the strategy evaluator and sub-step handlers. The coordinator treats DEPLOYING identically to every other lifecycle type through the unified `prepare → execute → finalize → post_process` flow. 
| Aspect | How it works | |--------|-------------| -| **State transition** | Each sub-step handler returns explicit `next_status()` → coordinator's generic path handles all transitions | -| **Routing** | Coordinator branches to evaluator path for `DeploymentLifecycleType.DEPLOYING` | -| **Cycles** | Evaluator runs strategy FSM + applies route changes → handlers process results → coordinator records history | +| **State transition** | Each sub-step handler returns explicit `status_transitions()` → coordinator's generic path handles all transitions | +| **Routing** | No special branching — `DeployingHandler.prepare()` runs the evaluator and returns sub-step handler tasks | +| **Cycles** | `prepare()`: evaluator runs strategy FSM + applies route changes → coordinator executes sub-step handlers → `finalize()`: records evaluation outcomes + transitions completed deployments | ## Sub-documents @@ -50,7 +50,7 @@ A single `evaluate()` call may produce different sub-steps for different deploym ### Overall Architecture -Core idea: A **strategy evaluator** evaluates DEPLOYING-state deployments and groups them by sub-step, then **per-sub-step handlers** process each group. The coordinator's generic `_handle_status_transitions()` path handles all history recording and lifecycle transitions. +Core idea: All lifecycle types — including DEPLOYING — follow a **single coordinator code path**: `prepare → execute → finalize → post_process`. The base `DeploymentHandler` provides default `prepare()` (returns self as single task) and `finalize()` (no-op). The composite `DeployingHandler` overrides these to run strategy evaluation, apply route changes, dispatch to sub-step handlers, and transition completed deployments. 
``` ┌──────────────────────────────────────────────────────────────────────────────┐ @@ -76,14 +76,20 @@ Core idea: A **strategy evaluator** evaluates DEPLOYING-state deployments and gr ┌──────────────────────────────────────────────────────────────────────────────┐ │ DeploymentCoordinator │ │ │ -│ process_deployment_lifecycle(type) │ -│ evaluator = evaluators.get(type) │ -│ ├─ evaluator exists → _process_with_evaluator() (evaluator path) │ -│ └─ no evaluator → existing single-handler path │ +│ process_deployment_lifecycle(type) ← single unified code path │ +│ handler = handlers[type] │ +│ ├─ handler.prepare(deployments) → handler_tasks │ +│ ├─ _execute_and_transition_handlers(handler_tasks) │ +│ ├─ handler.finalize(records) │ +│ └─ _post_process_handlers(results) │ │ │ -│ Handler map key: HandlerKey │ -│ DeploymentLifecycleType ← single handlers │ -│ | (DeploymentLifecycleType, DeploymentSubStep) ← sub-step handlers │ +│ Handler map: Mapping[DeploymentLifecycleType, DeploymentHandler] │ +│ ├─ CHECK_PENDING → CheckPendingHandler │ +│ ├─ CHECK_REPLICA → CheckReplicaHandler │ +│ ├─ SCALING → ScalingHandler │ +│ ├─ RECONCILE → ReconcileHandler │ +│ ├─ DEPLOYING → DeployingHandler (composite) │ +│ └─ DESTROYING → DestroyingHandler │ │ │ │ Result handling (same generic path for all handlers): │ │ successes → next_status (transition + history) │ @@ -91,35 +97,28 @@ Core idea: A **strategy evaluator** evaluates DEPLOYING-state deployments and gr │ skipped → keep (no transition) │ └────────────────┬─────────────────────────────────────────────────────────────┘ │ - ┌──────────┴──────────────────────────┐ - ▼ ▼ -┌─────────────────────┐ ┌──────────────────────────────────────────────────┐ -│ DeploymentHandler │ │ DeploymentStrategyEvaluator │ -│ (single-handler) │ │ (evaluator path — DEPLOYING only) │ -│ │ │ │ -│ Implementations: │ │ evaluate(deployments) → EvaluationResult │ -│ ├─ CheckPending │ │ 1. Load policies/routes │ -│ ├─ Scaling │ │ 2. 
Run strategy FSM → CycleEvaluationResult │ -│ ├─ CheckReplica │ │ 3. Apply route changes (scale_out/scale_in) │ -│ ├─ Reconcile │ │ 4. Group by sub-step │ -│ └─ Destroying │ └───────────────┬──────────────────────────────────┘ -└─────────────────────┘ │ - ▼ - ┌──────────────────────────────────────┐ - │ Per-Sub-Step Handlers (composite) │ - │ │ - │ (DEPLOYING, PROVISIONING) │ - │ → DeployingProvisioningHandler │ - │ (DEPLOYING, PROGRESSING) │ - │ → DeployingProgressingHandler │ - │ next_status: DEPLOYING │ - │ post_process: revision swap │ - │ for completed deployments │ - │ │ - │ (DEPLOYING, ROLLED_BACK) │ - │ → DeployingRolledBackHandler │ - │ next_status: READY │ - └──────────────────────────────────────┘ + ▼ +┌──────────────────────────────────────────────────────────────────────────────┐ +│ DeploymentHandler (base) │ +│ ├─ prepare(deployments) → [(self, deployments)] ← default: single task │ +│ ├─ execute(deployments) → result ← abstract │ +│ ├─ finalize(records) → no-op ← default │ +│ └─ post_process(result) → ... ← abstract │ +│ │ +│ Simple handlers (CheckPending, Scaling, etc.): │ +│ Use defaults — prepare returns self, finalize is no-op │ +│ │ +│ DeployingHandler (composite): │ +│ ├─ prepare(): evaluator.evaluate() + apply route changes │ +│ │ → [(sub_handler, subset), ...] 
for each sub-step │ +│ ├─ finalize(): record evaluation outcomes + transition completed │ +│ └─ owns: │ +│ ├─ DeploymentStrategyEvaluator │ +│ └─ sub_step_handlers: │ +│ ├─ PROVISIONING → DeployingProvisioningHandler │ +│ ├─ PROGRESSING → DeployingProgressingHandler │ +│ └─ ROLLED_BACK → DeployingRolledBackHandler │ +└──────────────────────────────────────────────────────────────────────────────┘ ``` ### Revision Activation Trigger Branching @@ -168,11 +167,11 @@ Both Blue-Green and Rolling Update cycle FSMs share a common set of **sub-step v | **progressing** | Strategy making active progress — health checks pending, promotion waiting, or routes being replaced | DeployingProgressingHandler | DEPLOYING → DEPLOYING | | **rolled_back** | Strategy failed — rolled back to previous revision | DeployingRolledBackHandler | DEPLOYING → READY | -Completion is not a sub-step but a signal on `CycleEvaluationResult.completed`. When the strategy FSM detects that all new routes are healthy and no old routes remain, it returns `CycleEvaluationResult(sub_step=PROGRESSING, completed=True)`. The evaluator collects these into `EvaluationResult.completed`, and the coordinator passes them to the PROGRESSING handler's `post_process` for revision swap, then transitions to READY. +Completion is not a sub-step but a signal on `CycleEvaluationResult.completed`. When the strategy FSM detects that all new routes are healthy and no old routes remain, it returns `CycleEvaluationResult(sub_step=PROGRESSING, completed=True)`. The evaluator collects these into `EvaluationResult.completed`, and the coordinator directly calls `_transition_completed_deployments()` which atomically performs the revision swap and DEPLOYING→READY transition. ### DeploymentStrategyEvaluator -`DeploymentStrategyEvaluator` evaluates DEPLOYING-state deployments and groups them by sub-step. It is a separate component (not a handler) that the coordinator invokes before handler execution. 
+`DeploymentStrategyEvaluator` evaluates DEPLOYING-state deployments and groups them by sub-step. It is owned by `DeployingHandler`, which invokes it during `prepare()`. The coordinator does not interact with the evaluator directly. #### Execution Flow @@ -182,7 +181,7 @@ DeploymentStrategyEvaluator.evaluate(deployments) │ Phase 1: Load policies and routes │ ┌─────────────────────────────────────────────────────────┐ │ │ policy_map = load_policies(deployments) │ - │ │ route_map = fetch_active_routes_by_endpoint_ids(...) │ + │ │ route_map = fetch_routes_by_endpoint_ids(...) │ │ └─────────────────────────────────────────────────────────┘ │ │ Phase 2: Run per-deployment strategy FSM @@ -191,10 +190,8 @@ DeploymentStrategyEvaluator.evaluate(deployments) │ │ policy = policy_map[deployment.id] │ │ │ routes = route_map[deployment.id] │ │ │ │ - │ │ if policy.strategy == ROLLING: │ - │ │ cycle_result = rolling_update_evaluate(...) │ - │ │ elif policy.strategy == BLUE_GREEN: │ - │ │ cycle_result = blue_green_evaluate(...) │ + │ │ strategy_fsm = create_strategy(policy) │ + │ │ cycle_result = strategy_fsm.evaluate_cycle(...) 
│ │ │ │ │ │ if cycle_result.completed: │ │ │ completed.append(deployment) │ @@ -202,14 +199,6 @@ DeploymentStrategyEvaluator.evaluate(deployments) │ │ groups[cycle_result.sub_step].append(deployment) │ │ └─────────────────────────────────────────────────────────┘ │ - │ Phase 3: Apply route changes (in-progress only) - │ ┌─────────────────────────────────────────────────────────┐ - │ │ Collect route changes from PROVISIONING/PROGRESSING: │ - │ │ scale_out_creators → create new routes │ - │ │ scale_in_updater → terminate old routes │ - │ │ repo.scale_routes(scale_out, scale_in) │ - │ └─────────────────────────────────────────────────────────┘ - │ ▼ EvaluationResult { groups: { @@ -219,18 +208,23 @@ DeploymentStrategyEvaluator.evaluate(deployments) completed: [deploy_D], # strategy completed (revision swap pending) skipped: [deploy_E], # no policy / unsupported strategy errors: [error_F], # exception during evaluation + route_changes: RouteChanges { + rollout_specs: [Creator, ...], # new routes to create + drain_route_ids: [UUID, ...], # old routes to terminate + promote_route_ids: [UUID, ...], # green routes to activate (Blue-Green) + }, } ``` #### Key Design Principles -1. **Route changes are applied by the evaluator**: scale_out/scale_in are applied once in the evaluator. Individual handlers do not touch routes. -2. **Strategy FSMs live in the evaluator**: `_rolling_update_evaluate()`, `_blue_green_evaluate()` and other strategy FSM logic are internal helper methods of the evaluator. -3. **Only grouping is returned**: The evaluator classifies deployments by sub-step; actual processing (revision swap, deploying_revision cleanup, etc.) is delegated to handlers. +1. **Route changes are aggregated by the evaluator, applied by `DeployingHandler`**: The evaluator collects route mutations (rollout/drain/promote) from each strategy FSM into `EvaluationResult.route_changes`. `DeployingHandler._apply_route_changes()` applies them during `prepare()`. 
Individual sub-step handlers do not touch routes. +2. **Strategy FSMs implement a common interface via registry**: All strategy implementations extend the `BaseDeploymentStrategy` abstract base class and implement `evaluate_cycle()`. Concrete classes (`RollingUpdateStrategy`, `BlueGreenStrategy`) live in dedicated module files (`strategy/rolling_update.py`, `strategy/blue_green.py`). `DeployingHandler` creates and owns the `DeploymentStrategyRegistry`, which is injected into the evaluator to instantiate the appropriate strategy per deployment. +3. **Only grouping is returned**: The evaluator classifies deployments by sub-step; actual processing (revision swap, deploying_revision cleanup, etc.) is delegated to `DeployingHandler.finalize()` and sub-step handlers. ### Per-Sub-Step Handlers -Each handler is registered with a `(DeploymentLifecycleType, DeploymentSubStep)` composite key in the coordinator. +Each sub-step handler is owned by `DeployingHandler` and registered in its `sub_step_handlers` map keyed by `DeploymentSubStep`. They are not directly visible to the coordinator. 
#### State Transition Type: `DeploymentLifecycleStatus` @@ -247,7 +241,7 @@ The coordinator's `_handle_status_transitions()` extracts `.lifecycle` for DB up #### DeployingInProgressHandler (base) → Provisioning / Progressing -PROVISIONING and PROGRESSING share the same logic (evaluator already applied route changes; handler returns success + reschedules), so `DeployingInProgressHandler` base class defines common behavior, and subclasses hard-code their sub-step-specific `next_status()` and `status_transitions()`: +PROVISIONING and PROGRESSING share the same logic (coordinator already applied route changes; handler returns success + reschedules), so `DeployingInProgressHandler` base class defines common behavior, and subclasses hard-code their sub-step-specific `next_status()` and `status_transitions()`: ```python class DeployingInProgressHandler(DeploymentHandler): @@ -262,24 +256,18 @@ class DeployingInProgressHandler(DeploymentHandler): return None async def execute(self, deployments): - # Route changes already applied by evaluator + # Route changes already applied by coordinator return DeploymentExecutionResult(successes=list(deployments)) async def post_process(self, result): - if result.successes: - await self._deployment_controller.mark_lifecycle_needed( - DeploymentLifecycleType.DEPLOYING # reschedule next cycle - ) + # Re-schedule DEPLOYING for the next coordinator cycle + await self._deployment_controller.mark_lifecycle_needed( + DeploymentLifecycleType.DEPLOYING + ) + # Trigger route provisioning so new routes get sessions await self._route_controller.mark_lifecycle_needed( - RouteLifecycleType.PROVISIONING # trigger new route provisioning + RouteLifecycleType.PROVISIONING ) - # Revision swap for completed deployments - # (coordinator attaches eval_result.completed to result.completed) - if result.completed: - swap_ids = [d.id for d in result.completed - if d.deploying_revision_id is not None] - if swap_ids: - await 
repo.complete_deployment_revision_swap(swap_ids) class DeployingProvisioningHandler(DeployingInProgressHandler): @@ -302,7 +290,7 @@ class DeployingProgressingHandler(DeployingInProgressHandler): `next_status().lifecycle == DEPLOYING` so the coordinator records DEPLOYING→DEPLOYING SUCCESS history for in-progress deployments. The deployment stays in DEPLOYING state and is re-evaluated next cycle. -For completed deployments, the coordinator passes `EvaluationResult.completed` to the PROGRESSING handler's `post_process` via `result.completed`. The handler performs the revision swap, then the coordinator transitions the deployment to READY with history recording. +For completed deployments, the coordinator directly calls `_transition_completed_deployments()` after all handler post-processing. This method atomically performs the revision swap (`complete_deployment_revision_swap`) and transitions the deployment to READY with history recording. #### DeployingRolledBackHandler (ROLLED_BACK) @@ -329,52 +317,51 @@ class DeployingRolledBackHandler(DeploymentHandler): On rollback, only `deploying_revision` is cleared; `current_revision` is preserved. The coordinator transitions to READY. -### Coordinator Evaluator Path (`_process_with_evaluator`) +### Unified Coordinator Flow -The coordinator takes a separate path for lifecycle types that have an evaluator registered in `_deployment_evaluators`: +The coordinator uses a single code path for all lifecycle types, including DEPLOYING: ``` -_process_with_evaluator(lifecycle_type, evaluator) +process_deployment_lifecycle(lifecycle_type) │ - │ 1. Acquire distributed lock (evaluator.lock_id) - │ 2. Query DEPLOYING-state deployments + │ 1. Look up handler by lifecycle_type (simple enum key) + │ 2. Acquire distributed lock if handler.lock_id is set + │ 3. Query deployments by handler.target_statuses() │ - │ 3. Enter DeploymentRecorderContext.scope() + │ 4. 
Enter DeploymentRecorderContext.scope() │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ - │ │ eval_result = evaluator.evaluate(deployments) │ + │ │ handler_tasks = handler.prepare(deployments) │ + │ │ ↑ simple handlers: [(self, deployments)] │ + │ │ ↑ DeployingHandler: evaluator.evaluate() │ + │ │ + _apply_route_changes() │ + │ │ → [(sub_handler_A, subset_A), (sub_handler_B, subset_B)] │ │ │ │ - │ │ for sub_step, group in eval_result.groups: │ - │ │ handler = handlers[(lifecycle_type, sub_step)] │ - │ │ result = handler.execute(group) │ - │ │ handler_results[sub_step] = (handler, result) │ + │ │ for (h, deps) in handler_tasks: │ + │ │ result = h.execute(deps) │ │ │ │ │ │ all_records = pool.build_all_records() │ │ │ │ - │ │ for sub_step, (handler, result) in handler_results: │ - │ │ _handle_status_transitions(handler, result, all_records) │ - │ │ ↑ same generic transition logic as single-handler path │ + │ │ for (h, result) in handler_results: │ + │ │ _handle_status_transitions(h, result, all_records) │ + │ │ ↑ same generic transition logic for ALL handlers │ │ │ │ │ └───────────────────────────────────────────────────────────────┘ │ - │ 4. Attach completed deployments to PROGRESSING handler's result - │ if eval_result.completed: - │ handler_results[PROGRESSING].result.completed = eval_result.completed - │ - │ 5. Post-process outside RecorderContext scope - │ for sub_step, (handler, result) in handler_results: - │ handler.post_process(result) - │ ↑ PROGRESSING handler performs revision swap for result.completed + │ 5. handler.finalize(all_records) + │ ↑ simple handlers: no-op + │ ↑ DeployingHandler: record evaluation outcomes + │ + transition completed deployments (atomic revision swap + │ + DEPLOYING → READY + history recording) │ - │ 6. Lifecycle transition for completed deployments - │ if eval_result.completed: - │ _transition_completed_deployments(completed, all_records) - │ ↑ DEPLOYING → READY + history recording + │ 6. 
Post-process outside RecorderContext scope + │ for (h, result) in handler_results: + │ h.post_process(result) │ ▼ ``` -Key: `_handle_status_transitions()` uses the **exact same generic method** as the single-handler path. It performs batch updates and history recording based on each handler's `next_status()`/`failure_status()`. Completed deployments bypass this path — their lifecycle transition is handled by `_transition_completed_deployments()` after the revision swap in `post_process`. +Key: The coordinator has **no DEPLOYING-specific logic**. `_handle_status_transitions()` uses the same generic method for all handlers. DEPLOYING-specific concerns (evaluator invocation, route mutations, completion transitions) are fully encapsulated in `DeployingHandler.prepare()` and `DeployingHandler.finalize()`. ### Sub-Step Recording @@ -382,45 +369,50 @@ Each cycle evaluation produces sub-step variants recorded via the existing `Depl The coordinator's `_handle_status_transitions()` calls `extract_sub_steps_for_entity()` for each handler's result, including the deployment's sub-step information in the history. -#### Rolling Update Per-Cycle Recording Examples +#### Sub-Step Recording: Route Mutation Granularity -**PROVISIONING cycle** — new routes still being provisioned: +Sub-steps are recorded at the **route mutation level** by the evaluator's `_record_route_changes()`. Each route mutation type (rollout, drain, promote) is recorded as a separate sub-step entry with the count of affected routes. 
+ +**PROVISIONING cycle** — new routes created: ``` sub_steps: - [rolling_update_evaluate] classify_routes → success - [rolling_update_evaluate] wait_provisioning → success - [strategy_result] determine_sub_step → success (message: "provisioning") + rollout → SUCCESS (message: "3 new route(s)") + provisioning → SUCCESS ``` **PROGRESSING cycle** — creating new routes / terminating old routes: ``` sub_steps: - [rolling_update_evaluate] classify_routes → success - [rolling_update_evaluate] check_completion → success - [rolling_update_evaluate] calculate_surge → success - [rolling_update_evaluate] build_route_changes → success - [strategy_result] determine_sub_step → success (message: "progressing") + rollout → SUCCESS (message: "1 new route(s)") + drain → SUCCESS (message: "1 route(s)") + progressing → SUCCESS ``` -**COMPLETED cycle** — all new routes healthy, no old routes remaining: +**COMPLETED cycle (Blue-Green)** — promotion executed: ``` sub_steps: - [rolling_update_evaluate] classify_routes → success - [rolling_update_evaluate] check_completion → success - [strategy_result] determine_sub_step → success (message: "completed") + drain → SUCCESS (message: "3 route(s)") + promote → SUCCESS (message: "3 route(s)") ``` -The revision swap (`complete_deployment_revision_swap`) is performed by the PROGRESSING handler's `post_process` outside the recorder scope, so it does not appear in sub_steps. The coordinator then transitions the deployment to READY with history recording. +**COMPLETED cycle (Rolling Update)** — final drain: -Format is `[phase] step`. The `determine_sub_step` step's `message` field records the determined sub-step value. This information is stored as JSON in the `deployment_history` table's `sub_steps` column and is queryable via API/CLI. +``` +sub_steps: + drain → SUCCESS (message: "1 route(s)") +``` + +Route mutation sub-steps are recorded within the `DeploymentRecorderContext` scope. 
For in-progress deployments, handlers add their own sub-step (e.g., `provisioning`, `progressing`) to the same record. For completed deployments, `_transition_completed_deployments()` receives the recorder pool's `all_records` and includes the current cycle's route mutation sub-steps in the completion history. + +The revision swap (`complete_deployment_revision_swap`) is an atomic DB operation that does not appear as a sub-step. This enables: -- **Observability**: Each deployment's progress is tracked per-entity with sub-step granularity (e.g., "provisioning", "progressing", "completed") -- **Debugging**: The sub-step history shows exactly which phase each deployment was in at each cycle +- **Observability**: Each deployment's progress is tracked per-entity with route mutation granularity +- **Debugging**: The sub-step history shows exactly which route mutations occurred at each cycle - **Consistency**: All handlers use the same coordinator generic path ### Per-Strategy Configuration @@ -434,6 +426,29 @@ This enables: On strategy failure (all new routes fail), automatic rollback always occurs. +## Decision Log + +### 2026-03-04: Unified coordinator code path via composite handler pattern + +**Context**: PR #9566 review identified that the coordinator treated DEPLOYING as a special case with a separate method (`process_deploying_lifecycle`) and separate code path. This created two parallel flows, a union type for handler keys (`DeploymentLifecycleType | (DeploymentLifecycleType, DeploymentSubStep)`), and DEPLOYING-specific branching in the event handler. + +**Decision**: Refactor to a single unified code path using the composite handler pattern. + +Three design principles drove the change: + +1. **DEPLOYING generalization**: DEPLOYING is no longer a special lifecycle type. The coordinator processes it through the same `process_deployment_lifecycle()` as all other types. No `if DEPLOYING` branches exist in the coordinator or event handler. + +2. 
**Sub-step unification via `prepare()`/`finalize()`**: The base `DeploymentHandler` gains two concrete methods with defaults — `prepare()` returns `[(self, deployments)]` (treat self as single task) and `finalize()` is a no-op. Simple handlers use these defaults unchanged. The composite `DeployingHandler` overrides `prepare()` to run the evaluator and return sub-step handler tasks, and `finalize()` to record evaluation outcomes and transition completed deployments. + +3. **Evaluator interface integration**: The evaluator is no longer called directly by the coordinator. Instead, `DeployingHandler` owns the evaluator and invokes it within `prepare()`. The coordinator has no knowledge of strategy evaluation, route mutations, or completion transitions — these are fully encapsulated in the handler. + +**Changes**: +- `DeploymentHandler` base: added `prepare()`, `finalize()` with defaults +- New `DeployingHandler` composite class: owns evaluator, sub-step handlers, route mutation logic, completion transition logic +- `DeploymentCoordinator`: removed `process_deploying_lifecycle()`, `DeploymentHandlerKey` type, `_strategy_registry`/`_deploying_evaluator` fields, and four private methods moved to `DeployingHandler` +- Handler map key simplified: `Mapping[DeploymentLifecycleType, DeploymentHandler]` +- Event handler: removed DEPLOYING branch + ## References - [BEP-1006: Service Deployment Strategy](BEP-1006-service-deployment-strategy.md) — High-level design for Blue-Green and Rolling Update diff --git a/proposals/BEP-1049/blue-green.md b/proposals/BEP-1049/blue-green.md index 1f21b8fbc64..a5d25f0f68e 100644 --- a/proposals/BEP-1049/blue-green.md +++ b/proposals/BEP-1049/blue-green.md @@ -77,15 +77,15 @@ The `DeploymentStrategyEvaluator` periodically evaluates each Blue-Green deploym ### Sub-Step Variants -Each cycle evaluation directly returns one of the shared sub-step variants: +Each cycle evaluation directly returns one of the shared sub-step variants. 
Completion is not a sub-step but a signal on `CycleEvaluationResult(sub_step=PROGRESSING, completed=True)` — the coordinator handles revision swap and READY transition directly. | Sub-Step | Condition | Handler Action | |----------|-----------|----------------| -| **provisioning** | No Green routes → created all as INACTIVE | DeployingInProgressHandler → DEPLOYING→DEPLOYING, reschedule | -| **provisioning** | Green routes are PROVISIONING | DeployingInProgressHandler → DEPLOYING→DEPLOYING, reschedule | -| **progressing** | Not all Green healthy (mixed state, no PROVISIONING) | DeployingInProgressHandler → DEPLOYING→DEPLOYING, reschedule | -| **progressing** | All Green healthy, waiting for promotion trigger (manual or delay) | DeployingInProgressHandler → DEPLOYING→DEPLOYING, reschedule | -| **completed** | Promotion executed (Green→ACTIVE, Blue→TERMINATING) | DeployingCompletedHandler → DEPLOYING→READY, revision swap | +| **provisioning** | No Green routes → created all as INACTIVE | DeployingProvisioningHandler → DEPLOYING→DEPLOYING, reschedule | +| **provisioning** | Green routes are PROVISIONING | DeployingProvisioningHandler → DEPLOYING→DEPLOYING, reschedule | +| **progressing** | Not all Green healthy (mixed state, no PROVISIONING) | DeployingProgressingHandler → DEPLOYING→DEPLOYING, reschedule | +| **progressing** | All Green healthy, waiting for promotion trigger (manual or delay) | DeployingProgressingHandler → DEPLOYING→DEPLOYING, reschedule | +| **progressing** (`completed=True`) | Promotion executed (Green→ACTIVE, Blue→TERMINATING) | Coordinator → atomic revision swap + DEPLOYING→READY | | **rolled_back** | All Green failed → terminate Green | DeployingRolledBackHandler → DEPLOYING→READY, deploying_revision=NULL | ## promote_delay_seconds Handling @@ -220,8 +220,8 @@ With `auto_promote=False`: │ strategy = policy.strategy │ │ 3. Dispatch by strategy: │ │ BLUE_GREEN → blue_green_evaluate(...) │ - │ 4. Group by sub_step and return │ - │ 5. 
Apply route changes (scale_out + scale_in) │ + │ 4. Aggregate route changes + group by sub_step │ + │ Coordinator applies route changes after evaluation │ └──────────────────────────┬───────────────────────────────────┘ │ ▼ @@ -240,27 +240,23 @@ With `auto_promote=False`: │ │ blue_active: blue + is_active() │ │ │ └────────────────────────────────────────────────────┘ │ │ │ - │ Actions applied: │ + │ Route changes returned (applied by coordinator): │ │ ┌────────────────────────────────────────────────────┐ │ - │ │ ● Green creation: │ │ + │ │ ● Green creation (rollout_specs): │ │ │ │ RouteCreatorSpec( │ │ │ │ revision_id = deploying_revision, │ │ │ │ traffic_status = INACTIVE ← differs from RU │ │ │ │ ) × target_count │ │ │ │ │ │ │ │ ● Promotion (traffic switch): │ │ - │ │ Green: RouteBatchUpdaterSpec( │ │ - │ │ traffic_status = ACTIVE │ │ - │ │ ) │ │ - │ │ Blue: RouteBatchUpdaterSpec( │ │ - │ │ status = TERMINATING, │ │ - │ │ traffic_status = INACTIVE │ │ - │ │ ) │ │ + │ │ promote_route_ids: Green route IDs │ │ + │ │ → traffic_status = ACTIVE │ │ + │ │ drain_route_ids: Blue route IDs │ │ + │ │ → status = TERMINATING │ │ │ │ │ │ │ │ ● Rollback: │ │ - │ │ Green: RouteBatchUpdaterSpec( │ │ - │ │ status = TERMINATING │ │ - │ │ ) │ │ + │ │ drain_route_ids: Green route IDs │ │ + │ │ → status = TERMINATING │ │ │ └────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────┘ │ @@ -268,11 +264,12 @@ With `auto_promote=False`: ┌──────────────────────────────────────────────────────────────┐ │ Per-Sub-Step Handlers (coordinator generic path) │ │ │ - │ PROVISIONING/PROGRESSING → DeployingInProgressHandler │ + │ PROVISIONING → DeployingProvisioningHandler │ │ next_status: DEPLOYING → coordinator records history │ │ │ - │ COMPLETED → DeployingCompletedHandler │ - │ next_status: READY → revision swap + coordinator transit │ + │ PROGRESSING → DeployingProgressingHandler │ + │ next_status: DEPLOYING → coordinator records 
history │ + │ completed=True → coordinator atomic revision swap + READY │ │ │ │ ROLLED_BACK → DeployingRolledBackHandler │ │ next_status: READY → clear dep_rev + coordinator transit │ @@ -287,14 +284,13 @@ When all Green routes become ACTIVE and Blue routes are terminated: completed determination (evaluator) │ ▼ - DeployingCompletedHandler.execute() - → complete_deployment_revision_swap(ids) - current_revision = deploying_revision - deploying_revision = NULL - │ - ▼ - Coordinator generic path - → DEPLOYING → READY history recording + lifecycle transition + Coordinator._transition_completed_deployments() + → Atomic transaction: + 1. complete_deployment_revision_swap(ids) + current_revision = deploying_revision + deploying_revision = NULL + 2. DEPLOYING → READY lifecycle transition + 3. History recording ``` ## Comparison with Rolling Update diff --git a/proposals/BEP-1049/rolling-update.md b/proposals/BEP-1049/rolling-update.md index ed9aab3edbc..1714f9b72ab 100644 --- a/proposals/BEP-1049/rolling-update.md +++ b/proposals/BEP-1049/rolling-update.md @@ -55,13 +55,13 @@ The `DeploymentStrategyEvaluator` periodically evaluates each Rolling Update dep ### Sub-Step Variants -Each cycle evaluation directly returns one of the shared sub-step variants: +Each cycle evaluation directly returns one of the shared sub-step variants. Completion is not a sub-step but a signal on `CycleEvaluationResult(sub_step=PROGRESSING, completed=True)` — the coordinator handles revision swap and READY transition directly. 
| Sub-Step | Condition | Handler Action | |----------|-----------|----------------| -| **provisioning** | New routes are PROVISIONING | DeployingInProgressHandler → DEPLOYING→DEPLOYING, reschedule | -| **progressing** | Calculated surge/unavailable, created/terminated routes | DeployingInProgressHandler → DEPLOYING→DEPLOYING, reschedule | -| **completed** | No Old routes and New healthy >= desired_replicas | DeployingCompletedHandler → DEPLOYING→READY, revision swap | +| **provisioning** | New routes are PROVISIONING | DeployingProvisioningHandler → DEPLOYING→DEPLOYING, reschedule | +| **progressing** | Calculated surge/unavailable, created/terminated routes | DeployingProgressingHandler → DEPLOYING→DEPLOYING, reschedule | +| **progressing** (`completed=True`) | No Old routes and New healthy >= desired_replicas | Coordinator → atomic revision swap + DEPLOYING→READY | ## max_surge / max_unavailable Calculation @@ -190,8 +190,8 @@ Example with `desired_replicas = 3`, `max_surge = 1`, `max_unavailable = 1`: │ strategy = policy.strategy │ │ 3. Dispatch by strategy: │ │ ROLLING → rolling_update_evaluate(...) │ - │ 4. Group by sub_step and return │ - │ 5. Apply route changes (scale_out + scale_in) │ + │ 4. 
Aggregate route changes + group by sub_step │ + │ Coordinator applies route changes after evaluation │ └──────────────────────────┬───────────────────────────────────┘ │ ▼ @@ -209,17 +209,15 @@ Example with `desired_replicas = 3`, `max_surge = 1`, `max_unavailable = 1`: │ │ old_active: old + is_active() │ │ │ └────────────────────────────────────────────────────┘ │ │ │ - │ Actions applied: │ + │ Route changes returned (applied by coordinator): │ │ ┌────────────────────────────────────────────────────┐ │ - │ │ scale_out: RouteCreatorSpec( │ │ + │ │ rollout_specs: RouteCreatorSpec( │ │ │ │ revision_id = deploying_revision, │ │ │ │ traffic_status = ACTIVE ← differs from BG │ │ │ │ ) │ │ │ │ │ │ - │ │ scale_in: RouteBatchUpdaterSpec( │ │ - │ │ status = TERMINATING, │ │ - │ │ traffic_status = INACTIVE │ │ - │ │ ) │ │ + │ │ drain_route_ids: old route IDs │ │ + │ │ → status = TERMINATING │ │ │ └────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────┘ │ @@ -227,11 +225,12 @@ Example with `desired_replicas = 3`, `max_surge = 1`, `max_unavailable = 1`: ┌──────────────────────────────────────────────────────────────┐ │ Per-Sub-Step Handlers (coordinator generic path) │ │ │ - │ PROVISIONING/PROGRESSING → DeployingInProgressHandler │ + │ PROVISIONING → DeployingProvisioningHandler │ │ next_status: DEPLOYING → coordinator records history │ │ │ - │ COMPLETED → DeployingCompletedHandler │ - │ next_status: READY → revision swap + coordinator transit │ + │ PROGRESSING → DeployingProgressingHandler │ + │ next_status: DEPLOYING → coordinator records history │ + │ completed=True → coordinator atomic revision swap + READY │ └──────────────────────────────────────────────────────────────┘ ``` @@ -243,12 +242,11 @@ When all Old routes are removed and New routes reach desired_replicas or above a completed determination (evaluator) │ ▼ - DeployingCompletedHandler.execute() - → complete_deployment_revision_swap(ids) - 
current_revision = deploying_revision - deploying_revision = NULL - │ - ▼ - Coordinator generic path - → DEPLOYING → READY history recording + lifecycle transition + Coordinator._transition_completed_deployments() + → Atomic transaction: + 1. complete_deployment_revision_swap(ids) + current_revision = deploying_revision + deploying_revision = NULL + 2. DEPLOYING → READY lifecycle transition + 3. History recording ``` diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index 2b2d00caaf7..ccee6a29ed1 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -157,6 +157,19 @@ class DeploymentSubStatus(enum.StrEnum): """ +class DeploymentSubStep(DeploymentSubStatus): + """Sub-steps for the DEPLOYING lifecycle phase. + + - PROVISIONING: New revision routes are being provisioned; waiting for readiness. + - PROGRESSING: Actively replacing old routes with new routes. + - ROLLED_BACK: All new routes failed; deployment rolled back to previous revision. + """ + + PROVISIONING = "provisioning" + PROGRESSING = "progressing" + ROLLED_BACK = "rolled_back" + + @dataclass(frozen=True) class DeploymentLifecycleStatus: """Target lifecycle state for a deployment status transition. 
@@ -353,6 +366,7 @@ class DeploymentInfo: network: DeploymentNetworkSpec model_revisions: list[ModelRevisionSpec] current_revision_id: UUID | None = None + deploying_revision_id: UUID | None = None def target_revision(self) -> ModelRevisionSpec | None: if self.model_revisions: diff --git a/src/ai/backend/manager/defs.py b/src/ai/backend/manager/defs.py index c9a1f9b073c..67d0bd066ae 100644 --- a/src/ai/backend/manager/defs.py +++ b/src/ai/backend/manager/defs.py @@ -110,6 +110,7 @@ class LockID(enum.IntEnum): LOCKID_DEPLOYMENT_CHECK_PENDING = 226 # For operations checking PENDING sessions LOCKID_DEPLOYMENT_CHECK_REPLICA = 227 # For operations checking REPLICA sessions LOCKID_DEPLOYMENT_DESTROYING = 228 # For operations destroying deployments + LOCKID_DEPLOYMENT_DEPLOYING = 229 # For operations deploying (rolling update) deployments # Sokovan target status locks (prevent concurrent operations on same status) LOCKID_SOKOVAN_TARGET_PENDING = 230 # For operations targeting PENDING sessions LOCKID_SOKOVAN_TARGET_PREPARING = 231 # For operations targeting PREPARING/PULLING sessions diff --git a/src/ai/backend/manager/models/endpoint/row.py b/src/ai/backend/manager/models/endpoint/row.py index 21c6a36849f..e27daca3095 100644 --- a/src/ai/backend/manager/models/endpoint/row.py +++ b/src/ai/backend/manager/models/endpoint/row.py @@ -837,6 +837,7 @@ def _to_deployment_info_from_revision( ), ], current_revision_id=self.current_revision, + deploying_revision_id=self.deploying_revision, ) def _to_deployment_info_legacy(self) -> DeploymentInfo: @@ -898,6 +899,7 @@ def _to_deployment_info_legacy(self) -> DeploymentInfo: ), ], current_revision_id=self.current_revision, + deploying_revision_id=self.deploying_revision, ) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 7f4b479f82a..4864d72f048 100644 --- 
a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -2087,24 +2087,30 @@ async def update_endpoint( return row.to_deployment_info() - async def update_current_revision( + async def start_deploying_revision( self, endpoint_id: uuid.UUID, revision_id: uuid.UUID, ) -> uuid.UUID | None: - """Update the current_revision of an endpoint and return the previous revision ID.""" + """Set deploying_revision and transition lifecycle to DEPLOYING. + + Returns the current (previous) revision ID for reference. + The coordinator will swap deploying_revision → current_revision on completion. + """ async with self._begin_session_read_committed() as db_sess: # Get current revision first query = sa.select(EndpointRow.current_revision).where(EndpointRow.id == endpoint_id) result = await db_sess.execute(query) - row = result.scalar_one_or_none() - previous_revision_id = row + previous_revision_id = result.scalar_one_or_none() - # Update to new revision + # Set deploying_revision and transition to DEPLOYING update_query = ( sa.update(EndpointRow) .where(EndpointRow.id == endpoint_id) - .values(current_revision=revision_id) + .values( + deploying_revision=revision_id, + lifecycle_stage=EndpointLifecycle.DEPLOYING, + ) ) await db_sess.execute(update_query) @@ -2248,6 +2254,125 @@ async def delete_deployment_policy( async with self._begin_session_read_committed() as db_sess: return await execute_purger(db_sess, purger) + async def complete_deployment_revision_swap( + self, + endpoint_ids: set[uuid.UUID], + ) -> None: + """Swap deploying_revision to current_revision for completed deployments. + + Sets current_revision = deploying_revision and deploying_revision = NULL + for the given endpoints. + + Args: + endpoint_ids: Set of endpoint IDs to swap revisions for. 
+ """ + if not endpoint_ids: + return + async with self._begin_session_read_committed() as db_sess: + stmt = ( + sa.update(EndpointRow) + .where( + EndpointRow.id.in_(endpoint_ids), + EndpointRow.deploying_revision.isnot(None), + ) + .values( + current_revision=EndpointRow.deploying_revision, + deploying_revision=None, + ) + ) + await db_sess.execute(stmt) + + async def complete_deployment_and_transition_to_ready( + self, + endpoint_ids: set[uuid.UUID], + batch_updaters: Sequence[BatchUpdater[EndpointRow]], + bulk_creator: BulkCreator[DeploymentHistoryRow], + ) -> None: + """Atomically swap revisions, update lifecycle, and record history. + + Performs all three operations in a single transaction to prevent + inconsistent state if the process crashes between steps. + + The revision swap includes an idempotency guard + (deploying_revision IS NOT NULL) to prevent double-call issues. + + Args: + endpoint_ids: Set of endpoint IDs to swap revisions for. + batch_updaters: Sequence of BatchUpdaters for lifecycle status updates. + bulk_creator: BulkCreator containing all history records. + """ + if not endpoint_ids: + return + async with self._begin_session_read_committed() as db_sess: + # 1. Swap revisions with idempotency guard + swap_stmt = ( + sa.update(EndpointRow) + .where( + EndpointRow.id.in_(endpoint_ids), + EndpointRow.deploying_revision.isnot(None), + ) + .values( + current_revision=EndpointRow.deploying_revision, + deploying_revision=None, + ) + ) + await db_sess.execute(swap_stmt) + + # 2. Execute all lifecycle status updates + for batch_updater in batch_updaters: + await execute_batch_updater(db_sess, batch_updater) + + # 3. 
Record history (same logic as update_endpoint_lifecycle_bulk_with_history) + if not bulk_creator.specs: + return + + new_rows = [spec.build_row() for spec in bulk_creator.specs] + deployment_ids = [row.deployment_id for row in new_rows] + + last_records = await self._get_last_deployment_histories_bulk(db_sess, deployment_ids) + + merge_ids: list[uuid.UUID] = [] + create_rows: list[DeploymentHistoryRow] = [] + + for new_row in new_rows: + last_row = last_records.get(new_row.deployment_id) + if last_row is not None and last_row.should_merge_with(new_row): + merge_ids.append(last_row.id) + else: + create_rows.append(new_row) + + if merge_ids: + await db_sess.execute( + sa.update(DeploymentHistoryRow) + .where(DeploymentHistoryRow.id.in_(merge_ids)) + .values(attempts=DeploymentHistoryRow.attempts + 1) + ) + + if create_rows: + db_sess.add_all(create_rows) + await db_sess.flush() + + async def clear_deploying_revision( + self, + endpoint_ids: set[uuid.UUID], + ) -> None: + """Clear deploying_revision for rolled-back deployments. + + Sets deploying_revision = NULL without modifying current_revision. + + Args: + endpoint_ids: Set of endpoint IDs to clear deploying revision for. 
+ """ + if not endpoint_ids: + return + async with self._begin_session_read_committed() as db_sess: + stmt = ( + sa.update(EndpointRow) + .where(EndpointRow.id.in_(endpoint_ids)) + .values(deploying_revision=None) + ) + await db_sess.execute(stmt) + # ========== Access Token Operations ========== async def create_access_token( diff --git a/src/ai/backend/manager/repositories/deployment/options/__init__.py b/src/ai/backend/manager/repositories/deployment/options/__init__.py index aec91afa3b1..4705a1d2879 100644 --- a/src/ai/backend/manager/repositories/deployment/options/__init__.py +++ b/src/ai/backend/manager/repositories/deployment/options/__init__.py @@ -3,6 +3,7 @@ from .access_token import AccessTokenConditions, AccessTokenOrders from .auto_scaling_rule import AutoScalingRuleConditions, AutoScalingRuleOrders from .deployment import DeploymentConditions, DeploymentOrders +from .deployment_policy import DeploymentPolicyConditions from .revision import RevisionConditions, RevisionOrders from .route import RouteConditions, RouteOrders @@ -16,6 +17,8 @@ # Deployment "DeploymentConditions", "DeploymentOrders", + # DeploymentPolicy + "DeploymentPolicyConditions", # Revision "RevisionConditions", "RevisionOrders", diff --git a/src/ai/backend/manager/repositories/deployment/options/deployment_policy.py b/src/ai/backend/manager/repositories/deployment/options/deployment_policy.py new file mode 100644 index 00000000000..306074d03d2 --- /dev/null +++ b/src/ai/backend/manager/repositories/deployment/options/deployment_policy.py @@ -0,0 +1,22 @@ +"""Query conditions and orders for deployment policies.""" + +from __future__ import annotations + +import uuid +from collections.abc import Collection + +import sqlalchemy as sa + +from ai.backend.manager.models.deployment_policy import DeploymentPolicyRow +from ai.backend.manager.repositories.base import QueryCondition + + +class DeploymentPolicyConditions: + """Query conditions for deployment policies.""" + + @staticmethod + def 
by_endpoint_ids(endpoint_ids: Collection[uuid.UUID]) -> QueryCondition: + def inner() -> sa.sql.expression.ColumnElement[bool]: + return DeploymentPolicyRow.endpoint.in_(endpoint_ids) + + return inner diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index 7849839480a..d78f05ffe04 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -1114,17 +1114,17 @@ async def update_endpoint( return await self._db_source.update_endpoint(updater) @deployment_repository_resilience.apply() - async def update_current_revision( + async def start_deploying_revision( self, endpoint_id: uuid.UUID, revision_id: uuid.UUID, ) -> uuid.UUID | None: - """Update the current revision of a deployment. + """Set deploying_revision and transition lifecycle to DEPLOYING. Returns: - The previous revision ID, or None if there was no previous revision. + The current (previous) revision ID, or None if there was no previous revision. 
""" - return await self._db_source.update_current_revision(endpoint_id, revision_id) + return await self._db_source.start_deploying_revision(endpoint_id, revision_id) # ========== Deployment Auto-Scaling Policy Operations ========== @@ -1216,6 +1216,34 @@ async def delete_deployment_policy( """ return await self._db_source.delete_deployment_policy(purger) + @deployment_repository_resilience.apply() + async def complete_deployment_revision_swap( + self, + endpoint_ids: set[uuid.UUID], + ) -> None: + """Swap deploying_revision to current_revision for completed deployments.""" + await self._db_source.complete_deployment_revision_swap(endpoint_ids) + + @deployment_repository_resilience.apply() + async def complete_deployment_and_transition_to_ready( + self, + endpoint_ids: set[uuid.UUID], + batch_updaters: list[BatchUpdater[EndpointRow]], + bulk_creator: BulkCreator[DeploymentHistoryRow], + ) -> None: + """Atomically swap revisions, update lifecycle, and record history.""" + await self._db_source.complete_deployment_and_transition_to_ready( + endpoint_ids, batch_updaters, bulk_creator + ) + + @deployment_repository_resilience.apply() + async def clear_deploying_revision( + self, + endpoint_ids: set[uuid.UUID], + ) -> None: + """Clear deploying_revision for rolled-back deployments.""" + await self._db_source.clear_deploying_revision(endpoint_ids) + # =================== # Route operations # =================== diff --git a/src/ai/backend/manager/services/deployment/service.py b/src/ai/backend/manager/services/deployment/service.py index 2b887aab2ad..c0ac60680b4 100644 --- a/src/ai/backend/manager/services/deployment/service.py +++ b/src/ai/backend/manager/services/deployment/service.py @@ -516,7 +516,11 @@ async def search_revisions(self, action: SearchRevisionsAction) -> SearchRevisio async def activate_revision( self, action: ActivateRevisionAction ) -> ActivateRevisionActionResult: - """Activate a specific revision to be the current revision. 
+ """Activate a specific revision by initiating the deployment strategy. + + Sets deploying_revision and transitions the deployment to DEPLOYING state. + The coordinator will execute the configured deployment strategy (rolling update, + blue-green, etc.) and swap deploying_revision → current_revision on completion. Args: action: Action containing deployment and revision IDs @@ -527,18 +531,16 @@ async def activate_revision( # 1. Validate revision exists (raises exception if not found) _revision = await self._deployment_repository.get_revision(action.revision_id) - # 2. Update endpoint.current_revision and get previous revision - previous_revision_id = await self._deployment_repository.update_current_revision( + # 2. Set deploying_revision and transition to DEPLOYING lifecycle + previous_revision_id = await self._deployment_repository.start_deploying_revision( action.deployment_id, action.revision_id ) - # 3. Trigger lifecycle check to update routes with new revision - await self._deployment_controller.mark_lifecycle_needed( - DeploymentLifecycleType.CHECK_REPLICA - ) + # 3. 
Trigger DEPLOYING lifecycle to start strategy execution + await self._deployment_controller.mark_lifecycle_needed(DeploymentLifecycleType.DEPLOYING) log.info( - "Activated revision {} for deployment {} (previous: {})", + "Started deploying revision {} for deployment {} (current: {})", action.revision_id, action.deployment_id, previous_revision_id, diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 4ca8247d8fe..9781f084ce3 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -5,7 +5,7 @@ from __future__ import annotations import logging -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from contextlib import AsyncExitStack from dataclasses import dataclass from datetime import UTC, datetime @@ -14,9 +14,7 @@ from ai.backend.common.clients.http_client.client_pool import ClientPool from ai.backend.common.clients.valkey_client.valkey_schedule import ValkeyScheduleClient from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient -from ai.backend.common.data.endpoint.types import EndpointLifecycle -from ai.backend.common.data.notification import NotificationRuleType -from ai.backend.common.data.notification.messages import EndpointLifecycleChangedMessage +from ai.backend.common.data.model_deployment.types import DeploymentStrategy from ai.backend.common.events.dispatcher import EventProducer from ai.backend.common.events.event_types.notification import NotificationTriggeredEvent from ai.backend.common.events.event_types.schedule.anycast import ( @@ -26,8 +24,13 @@ from ai.backend.common.leader.tasks.event_task import EventTaskSpec from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.config.provider import ManagerConfigProvider -from ai.backend.manager.data.deployment.types import DeploymentInfo -from 
ai.backend.manager.data.session.types import SchedulingResult +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentSubStatus, + DeploymentSubStep, +) +from ai.backend.manager.data.session.types import SchedulingResult, SubStepResult +from ai.backend.manager.models.deployment_policy import BlueGreenSpec, RollingUpdateSpec from ai.backend.manager.models.endpoint import EndpointRow from ai.backend.manager.repositories.base.creator import BulkCreator from ai.backend.manager.repositories.base.updater import BatchUpdater @@ -35,10 +38,13 @@ DeploymentConditions, DeploymentRepository, ) -from ai.backend.manager.repositories.deployment.creators import EndpointLifecycleBatchUpdaterSpec +from ai.backend.manager.repositories.deployment.creators import ( + EndpointLifecycleBatchUpdaterSpec, +) from ai.backend.manager.repositories.scheduling_history.creators import DeploymentHistoryCreatorSpec from ai.backend.manager.sokovan.deployment.recorder import DeploymentRecorderContext from ai.backend.manager.sokovan.deployment.route.route_controller import RouteController +from ai.backend.manager.sokovan.recorder.pool import RecordPool from ai.backend.manager.sokovan.recorder.types import ExecutionRecord from ai.backend.manager.sokovan.recorder.utils import extract_sub_steps_for_entity from ai.backend.manager.sokovan.scheduling_controller.scheduling_controller import ( @@ -51,11 +57,19 @@ from .handlers import ( CheckPendingDeploymentHandler, CheckReplicaDeploymentHandler, + DeployingHandler, + DeployingProgressingHandler, + DeployingProvisioningHandler, + DeployingRolledBackHandler, DeploymentHandler, DestroyingDeploymentHandler, ReconcileDeploymentHandler, ScalingDeploymentHandler, + build_lifecycle_notification_event, ) +from .strategy.blue_green import BlueGreenStrategy +from .strategy.evaluator import DeploymentStrategyEvaluator, DeploymentStrategyRegistry +from .strategy.rolling_update import RollingUpdateStrategy from .types import 
DeploymentExecutionResult, DeploymentLifecycleType log = BraceStyleAdapter(logging.getLogger(__name__)) @@ -132,11 +146,41 @@ def __init__( ) self._deployment_handlers = self._init_handlers(executor) + @staticmethod + def _init_deployment_strategy_registry() -> DeploymentStrategyRegistry: + """Initialize the strategy registry with all supported deployment strategies.""" + registry = DeploymentStrategyRegistry() + registry.register(DeploymentStrategy.ROLLING, RollingUpdateStrategy, RollingUpdateSpec) + registry.register(DeploymentStrategy.BLUE_GREEN, BlueGreenStrategy, BlueGreenSpec) + return registry + def _init_handlers( self, executor: DeploymentExecutor ) -> Mapping[DeploymentLifecycleType, DeploymentHandler]: - """Initialize and return the mapping of deployment lifecycle types to their handlers.""" - return { + """Initialize and return the mapping of lifecycle types to their handlers.""" + # Strategy registry + evaluator for DEPLOYING composite handler + strategy_registry = self._init_deployment_strategy_registry() + evaluator = DeploymentStrategyEvaluator( + deployment_repo=self._deployment_repository, + strategy_registry=strategy_registry, + ) + + # Sub-step handlers used internally by DeployingHandler + sub_step_handlers: Mapping[DeploymentSubStep, DeploymentHandler] = { + DeploymentSubStep.PROVISIONING: DeployingProvisioningHandler( + deployment_controller=self._deployment_controller, + route_controller=self._route_controller, + ), + DeploymentSubStep.PROGRESSING: DeployingProgressingHandler( + deployment_controller=self._deployment_controller, + route_controller=self._route_controller, + ), + DeploymentSubStep.ROLLED_BACK: DeployingRolledBackHandler( + deployment_repo=self._deployment_repository, + ), + } + + handlers: dict[DeploymentLifecycleType, DeploymentHandler] = { DeploymentLifecycleType.CHECK_PENDING: CheckPendingDeploymentHandler( deployment_executor=executor, deployment_controller=self._deployment_controller, @@ -154,12 +198,19 @@ def 
_init_handlers( deployment_executor=executor, deployment_controller=self._deployment_controller, ), + DeploymentLifecycleType.DEPLOYING: DeployingHandler( + evaluator=evaluator, + sub_step_handlers=sub_step_handlers, + deployment_repo=self._deployment_repository, + event_producer=self._event_producer, + ), DeploymentLifecycleType.DESTROYING: DestroyingDeploymentHandler( deployment_executor=executor, deployment_controller=self._deployment_controller, route_controller=self._route_controller, ), } + return handlers async def process_deployment_lifecycle( self, @@ -181,21 +232,17 @@ async def process_deployment_lifecycle( return log.info("handler: {} - processing {} deployments", handler.name(), len(deployments)) - # Execute handler with recorder context deployment_ids = [d.id for d in deployments] with DeploymentRecorderContext.scope( lifecycle_type.value, entity_ids=deployment_ids ) as pool: - result = await handler.execute(deployments) - all_records = pool.build_all_records() - - # Handle status transitions with history recording - await self._handle_status_transitions(handler, result, all_records) + handler_tasks = await handler.prepare(deployments) + handler_results, all_records = await self._execute_and_transition_handlers( + handler_tasks, pool + ) - try: - await handler.post_process(result) - except Exception as e: - log.error("Error during post-processing: {}", e) + await handler.finalize(all_records) + await self._post_process_handlers(handler_results) async def _handle_status_transitions( self, @@ -228,6 +275,7 @@ async def _handle_status_transitions( next_lifecycle_status = transitions.success if next_lifecycle_status is not None and result.successes: next_lifecycle = next_lifecycle_status.lifecycle + sub_status = next_lifecycle_status.sub_status endpoint_ids = [d.id for d in result.successes] success_history_specs = [ DeploymentHistoryCreatorSpec( @@ -237,7 +285,9 @@ async def _handle_status_transitions( message=f"{handler_name} completed successfully", 
from_status=from_status, to_status=next_lifecycle, - sub_steps=extract_sub_steps_for_entity(d.id, records), + sub_steps=self._build_history_sub_steps( + d.id, records, sub_status, SchedulingResult.SUCCESS + ), ) for d in result.successes ] @@ -252,7 +302,7 @@ async def _handle_status_transitions( ) all_history_specs.extend(success_history_specs) notification_events.extend([ - self._build_lifecycle_notification_event( + build_lifecycle_notification_event( deployment=d, from_status=from_status, to_status=next_lifecycle, @@ -266,6 +316,7 @@ async def _handle_status_transitions( failure_lifecycle_status = transitions.failure if failure_lifecycle_status is not None and result.errors: failure_lifecycle = failure_lifecycle_status.lifecycle + failure_sub_status = failure_lifecycle_status.sub_status endpoint_ids = [e.deployment_info.id for e in result.errors] failure_history_specs = [ DeploymentHistoryCreatorSpec( @@ -276,7 +327,9 @@ async def _handle_status_transitions( from_status=from_status, to_status=failure_lifecycle, error_code=e.error_code, - sub_steps=extract_sub_steps_for_entity(e.deployment_info.id, records), + sub_steps=self._build_history_sub_steps( + e.deployment_info.id, records, failure_sub_status, SchedulingResult.FAILURE + ), ) for e in result.errors ] @@ -291,7 +344,7 @@ async def _handle_status_transitions( ) all_history_specs.extend(failure_history_specs) notification_events.extend([ - self._build_lifecycle_notification_event( + build_lifecycle_notification_event( deployment=e.deployment_info, from_status=from_status, to_status=failure_lifecycle, @@ -314,43 +367,72 @@ async def _handle_status_transitions( except Exception as e: log.warning("Failed to send lifecycle notification: {}", e) - def _build_lifecycle_notification_event( + async def _execute_and_transition_handlers( self, - deployment: DeploymentInfo, - from_status: EndpointLifecycle | None, - to_status: EndpointLifecycle, - transition_result: str, - timestamp: str, - ) -> 
NotificationTriggeredEvent: - """Build a notification event for a lifecycle transition.""" - message = EndpointLifecycleChangedMessage( - endpoint_id=str(deployment.id), - endpoint_name=deployment.metadata.name, - domain=deployment.metadata.domain, - project_id=str(deployment.metadata.project), - resource_group=deployment.metadata.resource_group, - from_status=from_status.value if from_status else None, - to_status=to_status.value, - transition_result=transition_result, - event_timestamp=timestamp, - ) - return NotificationTriggeredEvent( - rule_type=NotificationRuleType.ENDPOINT_LIFECYCLE_CHANGED.value, - timestamp=datetime.now(UTC), - notification_data=message.model_dump(), - ) + handler_tasks: Sequence[tuple[DeploymentHandler, Sequence[DeploymentInfo]]], + pool: RecordPool[UUID], + ) -> tuple[ + list[tuple[DeploymentHandler, DeploymentExecutionResult]], + Mapping[UUID, ExecutionRecord], + ]: + """Execute handlers, build records, and handle status transitions. - async def process_if_needed(self, lifecycle_type: DeploymentLifecycleType) -> None: - """ - Process deployment lifecycle operation if needed (based on internal state). + Must be called within a recorder scope. Records are built after all + handlers have executed to capture all execution records. Args: - lifecycle_type: Type of deployment lifecycle operation + handler_tasks: Sequence of (handler, deployments) pairs to execute. + pool: The recorder pool for building execution records. Returns: - True if operation was performed, False otherwise + Tuple of (handler results, execution records). 
""" - # Check internal state (uses Redis marks) + handler_results: list[tuple[DeploymentHandler, DeploymentExecutionResult]] = [] + for handler, handler_deployments in handler_tasks: + result = await handler.execute(handler_deployments) + handler_results.append((handler, result)) + + all_records = pool.build_all_records() + + for handler, result in handler_results: + await self._handle_status_transitions(handler, result, all_records) + + return handler_results, all_records + + async def _post_process_handlers( + self, + handler_results: Sequence[tuple[DeploymentHandler, DeploymentExecutionResult]], + ) -> None: + """Run post-processing for handlers outside the recorder scope.""" + for handler, result in handler_results: + try: + await handler.post_process(result) + except Exception as e: + log.error("Error during post-processing for {}: {}", handler.name(), e) + + @staticmethod + def _build_history_sub_steps( + entity_id: UUID, + records: Mapping[UUID, ExecutionRecord], + sub_status: DeploymentSubStatus | None, + scheduling_result: SchedulingResult, + ) -> list[SubStepResult]: + """Build sub_steps list, appending sub_status as an entry if present.""" + sub_steps = extract_sub_steps_for_entity(entity_id, records) + if sub_status is not None: + now = datetime.now(UTC) + sub_steps.append( + SubStepResult( + step=sub_status.value, + result=scheduling_result, + started_at=now, + ended_at=now, + ) + ) + return sub_steps + + async def process_if_needed(self, lifecycle_type: DeploymentLifecycleType) -> None: + """Process deployment lifecycle operation if needed (based on internal state).""" if not await self._valkey_schedule.load_and_delete_deployment_mark(lifecycle_type.value): return await self.process_deployment_lifecycle(lifecycle_type) @@ -386,6 +468,13 @@ def _create_task_specs() -> list[DeploymentTaskSpec]: long_interval=30.0, initial_delay=10.0, ), + # Deploying (rolling update) - both short and long cycles + DeploymentTaskSpec( + DeploymentLifecycleType.DEPLOYING, 
+ short_interval=5.0, + long_interval=30.0, + initial_delay=10.0, + ), # Check destroying deployments - only long cycle DeploymentTaskSpec( DeploymentLifecycleType.DESTROYING, diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py index 90f4cc62fe7..a38922ad64a 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py @@ -3,6 +3,14 @@ """ from .base import DeploymentHandler +from .deploying import ( + DeployingHandler, + DeployingInProgressHandler, + DeployingProgressingHandler, + DeployingProvisioningHandler, + DeployingRolledBackHandler, + build_lifecycle_notification_event, +) from .destroying import DestroyingDeploymentHandler from .pending import CheckPendingDeploymentHandler from .reconcile import ReconcileDeploymentHandler @@ -12,8 +20,14 @@ __all__ = [ "CheckPendingDeploymentHandler", "CheckReplicaDeploymentHandler", + "DeployingHandler", + "DeployingInProgressHandler", + "DeployingProgressingHandler", + "DeployingProvisioningHandler", + "DeployingRolledBackHandler", "DeploymentHandler", "DestroyingDeploymentHandler", "ReconcileDeploymentHandler", "ScalingDeploymentHandler", + "build_lifecycle_notification_event", ] diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/base.py b/src/ai/backend/manager/sokovan/deployment/handlers/base.py index d3a40cb31dd..2cf2f16bf89 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/base.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/base.py @@ -1,5 +1,8 @@ +from __future__ import annotations + from abc import abstractmethod -from collections.abc import Sequence +from collections.abc import Mapping, Sequence +from uuid import UUID from ai.backend.manager.data.deployment.types import ( DeploymentInfo, @@ -8,6 +11,7 @@ from ai.backend.manager.data.model_serving.types import EndpointLifecycle from ai.backend.manager.defs 
import LockID from ai.backend.manager.sokovan.deployment.types import DeploymentExecutionResult +from ai.backend.manager.sokovan.recorder.types import ExecutionRecord class DeploymentHandler: @@ -53,6 +57,16 @@ def status_transitions(cls) -> DeploymentStatusTransitions: """ raise NotImplementedError("Subclasses must implement status_transitions()") + async def prepare( + self, deployments: Sequence[DeploymentInfo] + ) -> list[tuple[DeploymentHandler, Sequence[DeploymentInfo]]]: + """Prepare handler tasks for execution. + + Default: treat self as a single sub-step. + Override for composite handlers (e.g., DeployingHandler) that dispatch to sub-handlers. + """ + return [(self, deployments)] + @abstractmethod async def execute(self, deployments: Sequence[DeploymentInfo]) -> DeploymentExecutionResult: """Execute the scheduling operation. @@ -64,9 +78,24 @@ async def execute(self, deployments: Sequence[DeploymentInfo]) -> DeploymentExec @abstractmethod async def post_process(self, result: DeploymentExecutionResult) -> None: - """Handle post-processing after the operation. + """Per-handler post-processing after execute(). + + Called for each (handler, result) pair returned by prepare(). + For composite handlers, this means each sub-step handler's post_process + is called individually — not the composite handler itself. + + Typical use: reschedule the next lifecycle cycle, trigger dependent lifecycles. Args: - result: The result from execute() + result: The result from this handler's execute() """ raise NotImplementedError("Subclasses must implement post_process()") + + async def finalize(self, records: Mapping[UUID, ExecutionRecord]) -> None: + """Post-execution finalization with access to execution records. + + Called after all handler tasks have been executed and status transitions recorded, + but before post_process. Default: no-op. + Override for composite handlers that need atomic completion transitions. 
+ """ + pass diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py new file mode 100644 index 00000000000..c3f1be8562b --- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -0,0 +1,469 @@ +"""Handlers for DEPLOYING sub-steps (BEP-1049). + +In-progress handlers (PROVISIONING, PROGRESSING) run *after* the coordinator +has applied route mutations from the evaluation result. Their ``execute`` +simply returns success. ``post_process`` triggers the next DEPLOYING cycle +and route provisioning. + +The rolled-back handler clears ``deploying_revision`` and transitions the +deployment back to READY. + +The composite ``DeployingHandler`` encapsulates strategy evaluation, route +mutations, sub-step dispatch, and completed deployment transitions so that +the coordinator can treat DEPLOYING identically to every other lifecycle type. +""" + +from __future__ import annotations + +import logging +from collections.abc import Mapping, Sequence +from datetime import UTC, datetime +from typing import override +from uuid import UUID + +from ai.backend.common.data.notification import NotificationRuleType +from ai.backend.common.data.notification.messages import EndpointLifecycleChangedMessage +from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.common.events.event_types.notification import NotificationTriggeredEvent +from ai.backend.logging import BraceStyleAdapter +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentLifecycleStatus, + DeploymentStatusTransitions, + DeploymentSubStep, + RouteStatus, + RouteTrafficStatus, +) +from ai.backend.manager.data.model_serving.types import EndpointLifecycle +from ai.backend.manager.data.session.types import SchedulingResult +from ai.backend.manager.defs import LockID +from ai.backend.manager.models.routing import RoutingRow +from 
ai.backend.manager.repositories.base.creator import BulkCreator +from ai.backend.manager.repositories.base.updater import BatchUpdater +from ai.backend.manager.repositories.deployment import DeploymentConditions +from ai.backend.manager.repositories.deployment.creators import ( + EndpointLifecycleBatchUpdaterSpec, + RouteBatchUpdaterSpec, +) +from ai.backend.manager.repositories.deployment.options import RouteConditions +from ai.backend.manager.repositories.deployment.repository import DeploymentRepository +from ai.backend.manager.repositories.scheduling_history.creators import DeploymentHistoryCreatorSpec +from ai.backend.manager.sokovan.deployment.deployment_controller import DeploymentController +from ai.backend.manager.sokovan.deployment.route.route_controller import RouteController +from ai.backend.manager.sokovan.deployment.route.types import RouteLifecycleType +from ai.backend.manager.sokovan.deployment.strategy.evaluator import DeploymentStrategyEvaluator +from ai.backend.manager.sokovan.deployment.strategy.types import EvaluationGroup, EvaluationResult +from ai.backend.manager.sokovan.deployment.types import ( + DeploymentExecutionResult, + DeploymentLifecycleType, +) +from ai.backend.manager.sokovan.recorder.types import ExecutionRecord +from ai.backend.manager.sokovan.recorder.utils import extract_sub_steps_for_entity + +from .base import DeploymentHandler + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +# --------------------------------------------------------------------------- +# In-progress handlers (PROVISIONING / PROGRESSING) +# --------------------------------------------------------------------------- + + +class DeployingInProgressHandler(DeploymentHandler): + """Base handler for in-progress DEPLOYING sub-steps. + + execute() returns success for all supplied deployments. + post_process() re-schedules the DEPLOYING cycle and triggers route provisioning. 
+ """ + + def __init__( + self, + deployment_controller: DeploymentController, + route_controller: RouteController, + ) -> None: + self._deployment_controller = deployment_controller + self._route_controller = route_controller + + @classmethod + @override + def name(cls) -> str: + return "deploying-in-progress" + + @property + @override + def lock_id(self) -> LockID | None: + return None # Lock is managed by the coordinator's _process_with_evaluator + + @classmethod + @override + def target_statuses(cls) -> list[EndpointLifecycle]: + return [EndpointLifecycle.DEPLOYING] + + @classmethod + @override + def status_transitions(cls) -> DeploymentStatusTransitions: + # Stay in DEPLOYING — no automatic transition here. + return DeploymentStatusTransitions(success=None, failure=None) + + @override + async def execute(self, deployments: Sequence[DeploymentInfo]) -> DeploymentExecutionResult: + return DeploymentExecutionResult(successes=list(deployments)) + + @override + async def post_process(self, result: DeploymentExecutionResult) -> None: + # Re-schedule DEPLOYING for the next coordinator cycle + await self._deployment_controller.mark_lifecycle_needed(DeploymentLifecycleType.DEPLOYING) + # Trigger route provisioning so new routes get sessions + await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) + + +class DeployingProvisioningHandler(DeployingInProgressHandler): + """Handler for DEPLOYING / PROVISIONING sub-step. + + New-revision routes are being created; waiting for them to become HEALTHY. 
+ """ + + @classmethod + @override + def name(cls) -> str: + return "deploying-provisioning" + + @classmethod + @override + def status_transitions(cls) -> DeploymentStatusTransitions: + return DeploymentStatusTransitions( + success=DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.DEPLOYING, + sub_status=DeploymentSubStep.PROVISIONING, + ), + failure=None, + ) + + +class DeployingProgressingHandler(DeployingInProgressHandler): + """Handler for DEPLOYING / PROGRESSING sub-step. + + Actively replacing old routes with new routes. + """ + + @classmethod + @override + def name(cls) -> str: + return "deploying-progressing" + + @classmethod + @override + def status_transitions(cls) -> DeploymentStatusTransitions: + return DeploymentStatusTransitions( + success=DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.DEPLOYING, + sub_status=DeploymentSubStep.PROGRESSING, + ), + failure=None, + ) + + +# --------------------------------------------------------------------------- +# Rolled-back handler +# --------------------------------------------------------------------------- + + +class DeployingRolledBackHandler(DeploymentHandler): + """Handler for DEPLOYING / ROLLED_BACK sub-step. + + Clears ``deploying_revision`` and transitions to READY / ROLLED_BACK. 
+ """ + + def __init__(self, deployment_repo: DeploymentRepository) -> None: + self._deployment_repo = deployment_repo + + @classmethod + @override + def name(cls) -> str: + return "deploying-rolled-back" + + @property + @override + def lock_id(self) -> LockID | None: + return None # Lock is managed by the coordinator + + @classmethod + @override + def target_statuses(cls) -> list[EndpointLifecycle]: + return [EndpointLifecycle.DEPLOYING] + + @classmethod + @override + def status_transitions(cls) -> DeploymentStatusTransitions: + return DeploymentStatusTransitions( + success=DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.READY, + sub_status=DeploymentSubStep.ROLLED_BACK, + ), + failure=None, + ) + + @override + async def execute(self, deployments: Sequence[DeploymentInfo]) -> DeploymentExecutionResult: + endpoint_ids = {d.id for d in deployments} + await self._deployment_repo.clear_deploying_revision(endpoint_ids) + log.info("Cleared deploying_revision for {} rolled-back deployments", len(endpoint_ids)) + return DeploymentExecutionResult(successes=list(deployments)) + + @override + async def post_process(self, result: DeploymentExecutionResult) -> None: + pass + + +# --------------------------------------------------------------------------- +# Composite handler +# --------------------------------------------------------------------------- + + +def build_lifecycle_notification_event( + deployment: DeploymentInfo, + from_status: EndpointLifecycle | None, + to_status: EndpointLifecycle, + transition_result: str, + timestamp: str, +) -> NotificationTriggeredEvent: + """Build a notification event for a lifecycle transition.""" + message = EndpointLifecycleChangedMessage( + endpoint_id=str(deployment.id), + endpoint_name=deployment.metadata.name, + domain=deployment.metadata.domain, + project_id=str(deployment.metadata.project), + resource_group=deployment.metadata.resource_group, + from_status=from_status.value if from_status else None, + 
to_status=to_status.value, + transition_result=transition_result, + event_timestamp=timestamp, + ) + return NotificationTriggeredEvent( + rule_type=NotificationRuleType.ENDPOINT_LIFECYCLE_CHANGED.value, + timestamp=datetime.now(UTC), + notification_data=message.model_dump(), + ) + + +class DeployingHandler(DeploymentHandler): + """Composite handler for DEPLOYING lifecycle. + + Encapsulates strategy evaluation, route mutations, sub-step dispatch, + and completed deployment transitions so the coordinator treats DEPLOYING + identically to every other lifecycle type. + """ + + def __init__( + self, + evaluator: DeploymentStrategyEvaluator, + sub_step_handlers: Mapping[DeploymentSubStep, DeploymentHandler], + deployment_repo: DeploymentRepository, + event_producer: EventProducer, + ) -> None: + self._evaluator = evaluator + self._sub_step_handlers = sub_step_handlers + self._deployment_repo = deployment_repo + self._event_producer = event_producer + self._eval_result: EvaluationResult | None = None + + @classmethod + @override + def name(cls) -> str: + return "deploying" + + @property + @override + def lock_id(self) -> LockID | None: + return LockID.LOCKID_DEPLOYMENT_DEPLOYING + + @classmethod + @override + def target_statuses(cls) -> list[EndpointLifecycle]: + return [EndpointLifecycle.DEPLOYING] + + @classmethod + @override + def status_transitions(cls) -> DeploymentStatusTransitions: + return DeploymentStatusTransitions(success=None, failure=None) + + @override + async def prepare( + self, deployments: Sequence[DeploymentInfo] + ) -> list[tuple[DeploymentHandler, Sequence[DeploymentInfo]]]: + """Run evaluator, apply route changes, return sub-step handler tasks.""" + eval_result = await self._evaluator.evaluate(deployments) + self._eval_result = eval_result + await self._apply_route_changes(eval_result) + return self._resolve_handler_tasks(eval_result.groups) + + @override + async def execute(self, deployments: Sequence[DeploymentInfo]) -> DeploymentExecutionResult: + 
# Not called directly; prepare() returns sub-step handlers + return DeploymentExecutionResult(successes=list(deployments)) + + @override + async def post_process(self, result: DeploymentExecutionResult) -> None: + # Not called directly; sub-step handlers handle post-processing + pass + + @override + async def finalize(self, records: Mapping[UUID, ExecutionRecord]) -> None: + """Record evaluation outcomes and transition completed deployments.""" + eval_result = self._eval_result + if eval_result is None: + return + await self._record_evaluation_outcomes(eval_result) + if eval_result.completed: + await self._transition_completed_deployments(eval_result, records) + self._eval_result = None + + # -- Private helpers (moved from coordinator) -- + + async def _apply_route_changes(self, eval_result: EvaluationResult) -> None: + """Apply aggregated route mutations from the evaluation result.""" + changes = eval_result.route_changes + if not changes.rollout_specs and not changes.drain_route_ids: + return + + scale_in_updater: BatchUpdater[RoutingRow] | None = None + if changes.drain_route_ids: + scale_in_updater = BatchUpdater( + spec=RouteBatchUpdaterSpec( + status=RouteStatus.TERMINATING, + traffic_ratio=0.0, + traffic_status=RouteTrafficStatus.INACTIVE, + ), + conditions=[RouteConditions.by_ids(changes.drain_route_ids)], + ) + + await self._deployment_repo.scale_routes(changes.rollout_specs, scale_in_updater) + log.debug( + "Applied route changes: {} created, {} terminated", + len(changes.rollout_specs), + len(changes.drain_route_ids), + ) + + def _resolve_handler_tasks( + self, groups: dict[DeploymentSubStep, EvaluationGroup] + ) -> list[tuple[DeploymentHandler, Sequence[DeploymentInfo]]]: + """Resolve sub-step groups into handler-deployment pairs.""" + tasks: list[tuple[DeploymentHandler, Sequence[DeploymentInfo]]] = [] + for sub_step, group in groups.items(): + handler = self._sub_step_handlers.get(sub_step) + if handler is None: + log.warning("No handler for DEPLOYING 
sub-step {}", sub_step.value) + continue + tasks.append((handler, group.deployments)) + return tasks + + async def _record_evaluation_outcomes(self, eval_result: EvaluationResult) -> None: + """Record history for evaluation errors and skipped deployments.""" + lifecycle_value = DeploymentLifecycleType.DEPLOYING.value + + if eval_result.errors: + error_history_specs = [ + DeploymentHistoryCreatorSpec( + deployment_id=deployment.id, + phase=lifecycle_value, + result=SchedulingResult.NEED_RETRY, + message=f"Evaluation error: {reason}", + from_status=EndpointLifecycle.DEPLOYING, + to_status=None, + sub_steps=[], + ) + for deployment, reason in eval_result.errors + ] + await self._deployment_repo.update_endpoint_lifecycle_bulk_with_history( + [], BulkCreator(specs=error_history_specs) + ) + for deployment, reason in eval_result.errors: + log.error("Deployment {} evaluation error: {}", deployment.id, reason) + + if eval_result.skipped: + skipped_history_specs = [ + DeploymentHistoryCreatorSpec( + deployment_id=deployment.id, + phase=lifecycle_value, + result=SchedulingResult.SKIPPED, + message="No deployment policy found", + from_status=EndpointLifecycle.DEPLOYING, + to_status=None, + sub_steps=[], + ) + for deployment in eval_result.skipped + ] + await self._deployment_repo.update_endpoint_lifecycle_bulk_with_history( + [], BulkCreator(specs=skipped_history_specs) + ) + for deployment in eval_result.skipped: + log.warning("Deployment {} skipped: no deployment policy found", deployment.id) + + async def _transition_completed_deployments( + self, + eval_result: EvaluationResult, + records: Mapping[UUID, ExecutionRecord], + ) -> None: + """Transition completed DEPLOYING deployments to READY. + + Atomically: + 1. Swap deploying_revision -> current_revision (with idempotency guard). + 2. Update lifecycle to READY with history recording. + 3. Send notification events. 
+ """ + completed = eval_result.completed + strategies = eval_result.completed_strategies + endpoint_ids = {deployment.id for deployment in completed} + lifecycle_value = DeploymentLifecycleType.DEPLOYING.value + + target_statuses = [EndpointLifecycle.DEPLOYING] + from_status = EndpointLifecycle.DEPLOYING + to_status = EndpointLifecycle.READY + + batch_updater = BatchUpdater( + spec=EndpointLifecycleBatchUpdaterSpec(lifecycle_stage=to_status), + conditions=[ + DeploymentConditions.by_ids(list(endpoint_ids)), + DeploymentConditions.by_lifecycle_stages(target_statuses), + ], + ) + + timestamp_now = datetime.now(UTC).isoformat() + history_specs = [ + DeploymentHistoryCreatorSpec( + deployment_id=deployment.id, + phase=lifecycle_value, + result=SchedulingResult.SUCCESS, + message=f"Deployment completed successfully (strategy: {strategies[deployment.id].value})" + if deployment.id in strategies + else "Deployment completed successfully", + from_status=from_status, + to_status=to_status, + sub_steps=extract_sub_steps_for_entity(deployment.id, records), + ) + for deployment in completed + ] + + # Atomic: revision swap + lifecycle update + history recording + await self._deployment_repo.complete_deployment_and_transition_to_ready( + endpoint_ids, [batch_updater], BulkCreator(specs=history_specs) + ) + log.info( + "Atomically swapped revision and transitioned {} deployments to READY", + len(endpoint_ids), + ) + + # Send notifications + for deployment in completed: + try: + event = build_lifecycle_notification_event( + deployment=deployment, + from_status=from_status, + to_status=to_status, + transition_result="success", + timestamp=timestamp_now, + ) + await self._event_producer.anycast_event(event) + except Exception as e: + log.warning("Failed to send lifecycle notification: {}", e) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/__init__.py b/src/ai/backend/manager/sokovan/deployment/strategy/__init__.py new file mode 100644 index 00000000000..964ab31c132 
--- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/strategy/__init__.py @@ -0,0 +1,11 @@ +"""Deployment strategy evaluation for rolling update and blue-green deployments (BEP-1049).""" + +from .blue_green import BlueGreenStrategy +from .rolling_update import RollingUpdateStrategy +from .types import BaseDeploymentStrategy + +__all__ = [ + "BaseDeploymentStrategy", + "BlueGreenStrategy", + "RollingUpdateStrategy", +] diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py new file mode 100644 index 00000000000..a282f0f8095 --- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py @@ -0,0 +1,33 @@ +"""Blue-green deployment strategy evaluation for a single deployment cycle (BEP-1049). + +Provisions a full set of new-revision routes, validates them, then atomically +switches traffic from the old revision to the new one. +""" + +from __future__ import annotations + +from collections.abc import Sequence + +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + RouteInfo, +) +from ai.backend.manager.models.deployment_policy import BlueGreenSpec + +from .types import BaseDeploymentStrategy, CycleEvaluationResult + + +class BlueGreenStrategy(BaseDeploymentStrategy): + """Blue-green deployment strategy FSM.""" + + def __init__(self, spec: BlueGreenSpec) -> None: + super().__init__(spec) + self._spec = spec + + def evaluate_cycle( + self, + deployment: DeploymentInfo, + routes: Sequence[RouteInfo], + ) -> CycleEvaluationResult: + """Evaluate one cycle of blue-green deployment for a single deployment.""" + raise NotImplementedError("Blue-green deployment strategy is not yet implemented") diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py new file mode 100644 index 00000000000..ffa95df5eb0 --- /dev/null +++ 
b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py @@ -0,0 +1,183 @@ +"""Deployment strategy evaluator — orchestrates per-deployment FSM evaluation (BEP-1049). + +Loads policies and routes in bulk, dispatches each deployment to the appropriate +strategy FSM, and aggregates route mutations. ``DeployingHandler.prepare()`` is responsible for +applying the aggregated route changes after evaluation. +""" + +from __future__ import annotations + +import logging +from collections.abc import Sequence +from dataclasses import dataclass + +from pydantic import BaseModel + +from ai.backend.common.data.model_deployment.types import DeploymentStrategy +from ai.backend.logging import BraceStyleAdapter +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentPolicyData, + RouteInfo, +) +from ai.backend.manager.errors.deployment import ( + InvalidDeploymentStrategy, + InvalidDeploymentStrategySpec, +) +from ai.backend.manager.repositories.base import BatchQuerier, NoPagination +from ai.backend.manager.repositories.deployment.options import ( + DeploymentPolicyConditions, +) +from ai.backend.manager.repositories.deployment.repository import DeploymentRepository +from ai.backend.manager.sokovan.deployment.recorder import DeploymentRecorderContext + +from .types import BaseDeploymentStrategy, EvaluationGroup, EvaluationResult, RouteChanges + + +@dataclass(frozen=True) +class DeploymentStrategyRegistryEntry: + """Maps a deployment strategy to its implementation class and expected spec type.""" + + strategy_cls: type[BaseDeploymentStrategy] + spec_type: type[BaseModel] + + +class DeploymentStrategyRegistry: + """Registry of deployment strategy implementations.""" + + def __init__(self) -> None: + self._entries: dict[DeploymentStrategy, DeploymentStrategyRegistryEntry] = {} + + def register( + self, + strategy: DeploymentStrategy, + strategy_cls: type[BaseDeploymentStrategy], + spec_type: type[BaseModel], + ) -> None: + self._entries[strategy] =
DeploymentStrategyRegistryEntry(strategy_cls, spec_type) + + def get(self, strategy: DeploymentStrategy) -> DeploymentStrategyRegistryEntry | None: + return self._entries.get(strategy) + + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +class DeploymentStrategyEvaluator: + """Evaluates DEPLOYING deployments and produces grouped results + route mutations.""" + + def __init__( + self, + deployment_repo: DeploymentRepository, + strategy_registry: DeploymentStrategyRegistry, + ) -> None: + self._deployment_repo = deployment_repo + self._strategy_registry = strategy_registry + + async def evaluate( + self, + deployments: Sequence[DeploymentInfo], + ) -> EvaluationResult: + """Evaluate all DEPLOYING deployments in a single cycle. + + Steps: + 1. Bulk-load policies and active routes. + 2. Per-deployment: dispatch to strategy FSM. + 3. Aggregate route changes into result (applied by ``DeployingHandler.prepare()``). + 4. Group deployments by sub-step and return. + """ + result = EvaluationResult() + + if not deployments: + return result + + endpoint_ids = {d.id for d in deployments} + + # ── 1. Bulk-load policies and routes ── + policy_search = await self._deployment_repo.search_deployment_policies( + BatchQuerier( + pagination=NoPagination(), + conditions=[DeploymentPolicyConditions.by_endpoint_ids(endpoint_ids)], + ) + ) + policy_map = {p.endpoint: p for p in policy_search.items} + route_map = await self._deployment_repo.fetch_active_routes_by_endpoint_ids(endpoint_ids) + + # ── 2.
Per-deployment evaluation ── + for deployment in deployments: + policy = policy_map.get(deployment.id) + if policy is None: + log.warning("deployment {}: no policy found — skipping", deployment.id) + result.skipped.append(deployment) + continue + + routes: list[RouteInfo] = list(route_map.get(deployment.id, [])) + + try: + strategy = self._create_strategy(policy.strategy, policy) + cycle_result = strategy.evaluate_cycle(deployment, routes) + except Exception as e: + log.warning("deployment {}: evaluation error — {}", deployment.id, e) + result.errors.append((deployment, str(e))) + continue + + # ── 3. Aggregate route changes and record sub-steps ── + changes = cycle_result.route_changes + result.route_changes.rollout_specs.extend(changes.rollout_specs) + result.route_changes.drain_route_ids.extend(changes.drain_route_ids) + self._record_route_changes(deployment, changes) + + # Group by sub-step + if cycle_result.completed: + result.completed.append(deployment) + result.completed_strategies[deployment.id] = policy.strategy + else: + group = result.groups.setdefault( + cycle_result.sub_step, + EvaluationGroup(sub_step=cycle_result.sub_step), + ) + group.deployments.append(deployment) + + return result + + @staticmethod + def _record_route_changes(deployment: DeploymentInfo, changes: RouteChanges) -> None: + """Record rollout/drain operations as sub-steps for observability.""" + if not changes.rollout_specs and not changes.drain_route_ids: + return + pool = DeploymentRecorderContext.current_pool() + recorder = pool.recorder(deployment.id) + with recorder.phase("route_mutations"): + if changes.rollout_specs: + with recorder.step( + "rollout", + success_detail=f"{len(changes.rollout_specs)} new route(s)", + ): + pass + if changes.drain_route_ids: + with recorder.step( + "drain", + success_detail=f"{len(changes.drain_route_ids)} route(s)", + ): + pass + + def _create_strategy( + self, + strategy: DeploymentStrategy, + policy: DeploymentPolicyData, + ) -> 
BaseDeploymentStrategy: + """Create a strategy instance for the given deployment policy.""" + entry = self._strategy_registry.get(strategy) + if entry is None: + raise InvalidDeploymentStrategy( + extra_msg=f"Unsupported deployment strategy: {strategy}" + ) + spec = policy.strategy_spec + if not isinstance(spec, entry.spec_type): + raise InvalidDeploymentStrategySpec( + extra_msg=( + f"Expected {entry.spec_type.__name__} for {strategy.name} strategy," + f" got {type(spec).__name__}" + ), + ) + return entry.strategy_cls(spec) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py new file mode 100644 index 00000000000..923254ab388 --- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/strategy/rolling_update.py @@ -0,0 +1,33 @@ +"""Rolling update strategy evaluation for a single deployment cycle (BEP-1049). + +Classifies routes by revision (old/new) and status, then decides the next +sub-step and route mutations based on ``max_surge`` / ``max_unavailable``. 
+""" + +from __future__ import annotations + +from collections.abc import Sequence + +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + RouteInfo, +) +from ai.backend.manager.models.deployment_policy import RollingUpdateSpec + +from .types import BaseDeploymentStrategy, CycleEvaluationResult + + +class RollingUpdateStrategy(BaseDeploymentStrategy): + """Rolling update deployment strategy FSM.""" + + def __init__(self, spec: RollingUpdateSpec) -> None: + super().__init__(spec) + self._spec = spec + + def evaluate_cycle( + self, + deployment: DeploymentInfo, + routes: Sequence[RouteInfo], + ) -> CycleEvaluationResult: + """Evaluate one cycle of rolling update for a single deployment.""" + raise NotImplementedError("Rolling update strategy is not yet implemented") diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/types.py b/src/ai/backend/manager/sokovan/deployment/strategy/types.py new file mode 100644 index 00000000000..6adcb204b24 --- /dev/null +++ b/src/ai/backend/manager/sokovan/deployment/strategy/types.py @@ -0,0 +1,92 @@ +"""Types for deployment strategy evaluation (BEP-1049).""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Sequence +from dataclasses import dataclass, field +from uuid import UUID + +from pydantic import BaseModel + +from ai.backend.common.data.model_deployment.types import DeploymentStrategy +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentSubStep, + RouteInfo, +) +from ai.backend.manager.models.routing import RoutingRow +from ai.backend.manager.repositories.base import Creator + + +@dataclass +class RouteChanges: + """Route mutations to apply for a single deployment cycle.""" + + rollout_specs: list[Creator[RoutingRow]] = field(default_factory=list) + drain_route_ids: list[UUID] = field(default_factory=list) + + +@dataclass +class CycleEvaluationResult: + """Result of evaluating a single deployment's 
strategy cycle.""" + + sub_step: DeploymentSubStep + completed: bool = False + route_changes: RouteChanges = field(default_factory=RouteChanges) + + +@dataclass +class EvaluationGroup: + """Deployments grouped by their sub-step result.""" + + sub_step: DeploymentSubStep + deployments: list[DeploymentInfo] = field(default_factory=list) + + +@dataclass +class EvaluationResult: + """Aggregate result of evaluating all DEPLOYING deployments.""" + + # In-progress deployments grouped by sub-step (PROVISIONING, PROGRESSING, etc.). + # DeployingHandler.prepare() resolves the sub-step handler for each group. + groups: dict[DeploymentSubStep, EvaluationGroup] = field(default_factory=dict) + + # Deployments that satisfied all strategy FSM conditions and are ready to finish. + # DeployingHandler.finalize() performs an atomic revision swap + READY transition. + completed: list[DeploymentInfo] = field(default_factory=list) + + # Maps each completed deployment to the strategy (ROLLING, BLUE_GREEN) it used. + # Included in the history message for observability. + completed_strategies: dict[UUID, DeploymentStrategy] = field(default_factory=dict) + + # Deployments skipped because no deployment policy was found. + # DeployingHandler.finalize() records SKIPPED history and emits a warning log. + skipped: list[DeploymentInfo] = field(default_factory=list) + + # Deployments that raised an exception during strategy FSM evaluation, paired + # with the error message. DeployingHandler.finalize() records NEED_RETRY history + # and keeps the lifecycle at DEPLOYING so the next cycle can retry. + errors: list[tuple[DeploymentInfo, str]] = field(default_factory=list) + + # Aggregated route mutations from all per-deployment evaluations. + # DeployingHandler.prepare() applies these after evaluation completes. + route_changes: RouteChanges = field(default_factory=RouteChanges) + + +class BaseDeploymentStrategy(ABC): + """Base interface for deployment strategy cycle evaluation. 
+ + Each concrete strategy (Blue-Green, Rolling Update) implements this interface. + The spec is injected via ``__init__`` — one instance per deployment. + """ + + def __init__(self, spec: BaseModel) -> None: + self._spec = spec + + @abstractmethod + def evaluate_cycle( + self, + deployment: DeploymentInfo, + routes: Sequence[RouteInfo], + ) -> CycleEvaluationResult: ... diff --git a/src/ai/backend/manager/sokovan/deployment/types.py b/src/ai/backend/manager/sokovan/deployment/types.py index 508534fc850..bf6e6dd2744 100644 --- a/src/ai/backend/manager/sokovan/deployment/types.py +++ b/src/ai/backend/manager/sokovan/deployment/types.py @@ -16,6 +16,7 @@ class DeploymentLifecycleType(StrEnum): CHECK_REPLICA = "check_replica" SCALING = "scaling" RECONCILE = "reconcile" + DEPLOYING = "deploying" DESTROYING = "destroying" @@ -34,6 +35,7 @@ class DeploymentExecutionResult: successes: list[DeploymentInfo] = field(default_factory=list) errors: list[DeploymentExecutionError] = field(default_factory=list) skipped: list[DeploymentInfo] = field(default_factory=list) + completed: list[DeploymentInfo] = field(default_factory=list) @dataclass diff --git a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py index ca3e9259b9e..73f3811b42e 100644 --- a/tests/unit/manager/sokovan/deployment/test_coordinator_history.py +++ b/tests/unit/manager/sokovan/deployment/test_coordinator_history.py @@ -171,6 +171,7 @@ def mock_handler_with_success( failure=None, ) ) + mock.prepare = AsyncMock(side_effect=lambda deployments: [(mock, deployments)]) mock.execute = AsyncMock( return_value=DeploymentExecutionResult( successes=[sample_deployment_info], @@ -178,6 +179,7 @@ def mock_handler_with_success( ) ) mock.post_process = AsyncMock() + mock.finalize = AsyncMock() return mock @@ -196,6 +198,7 @@ def mock_handler_with_failure( failure=DeploymentLifecycleStatus(lifecycle=EndpointLifecycle.DESTROYED), ) ) + 
mock.prepare = AsyncMock(side_effect=lambda deployments: [(mock, deployments)]) mock.execute = AsyncMock( return_value=DeploymentExecutionResult( successes=[], @@ -203,6 +206,7 @@ def mock_handler_with_failure( ) ) mock.post_process = AsyncMock() + mock.finalize = AsyncMock() return mock @@ -219,8 +223,10 @@ def mock_handler_with_empty_result() -> MagicMock: failure=None, ) ) + mock.prepare = AsyncMock(side_effect=lambda deployments: [(mock, deployments)]) mock.execute = AsyncMock(return_value=DeploymentExecutionResult()) mock.post_process = AsyncMock() + mock.finalize = AsyncMock() return mock