Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/9568.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement Blue-Green deployment strategy
1 change: 1 addition & 0 deletions src/ai/backend/manager/data/deployment/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ class RouteInfo:
created_at: datetime | None
revision_id: UUID | None
traffic_status: RouteTrafficStatus
status_updated_at: datetime | None = None
error_data: dict[str, Any] = field(default_factory=dict)


Expand Down
7 changes: 7 additions & 0 deletions src/ai/backend/manager/models/routing/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ class RoutingRow(Base): # type: ignore[misc]

# Revision reference without FK (relationship only)
revision: Mapped[uuid.UUID | None] = mapped_column("revision", GUID, nullable=True)
status_updated_at: Mapped[datetime | None] = mapped_column(
"status_updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
)
traffic_status: Mapped[RouteTrafficStatus] = mapped_column(
"traffic_status",
EnumValueType(RouteTrafficStatus),
Expand Down Expand Up @@ -255,5 +261,6 @@ def to_route_info(self) -> RouteInfo:
created_at=self.created_at,
revision_id=self.revision,
traffic_status=self.traffic_status,
status_updated_at=self.status_updated_at,
error_data=self.error_data or {},
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, override

from ai.backend.manager.data.deployment.types import RouteStatus, RouteTrafficStatus
Expand Down Expand Up @@ -66,6 +67,7 @@ def build_values(self) -> dict[str, Any]:
values: dict[str, Any] = {}
if self.status is not None:
values["status"] = self.status
values["status_updated_at"] = datetime.now(UTC)
if self.traffic_ratio is not None:
values["traffic_ratio"] = self.traffic_ratio
if self.traffic_status is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1406,19 +1406,49 @@ async def fetch_active_routes_by_endpoint_ids(
routes_by_endpoint[row.endpoint].append(row.to_route_info())
return routes_by_endpoint

async def fetch_routes_by_endpoint_ids(
    self,
    endpoint_ids: set[uuid.UUID],
) -> Mapping[uuid.UUID, list[RouteInfo]]:
    """Fetch all routes for given endpoint IDs (no status filter).

    Unlike fetch_active_routes_by_endpoint_ids, this includes routes
    in all statuses (FAILED_TO_START, TERMINATED, etc.), which is
    required for blue-green rollback detection.

    :param endpoint_ids: Endpoint IDs to look up; an empty set short-circuits
        to an empty mapping without touching the database.
    :return: Mapping of endpoint ID to its routes (endpoints with no routes
        are simply absent from the mapping).
    """
    if not endpoint_ids:
        return {}

    async with self._begin_readonly_session_read_committed() as db_sess:
        query = sa.select(RoutingRow).where(
            RoutingRow.endpoint.in_(endpoint_ids),
        )
        result = await db_sess.execute(query)
        rows: Sequence[RoutingRow] = result.scalars().all()
        routes_by_endpoint: defaultdict[uuid.UUID, list[RouteInfo]] = defaultdict(list)
        for row in rows:
            # defaultdict(list) creates the bucket on first access, so no
            # explicit membership check is needed (the original guard was
            # redundant dead code).
            routes_by_endpoint[row.endpoint].append(row.to_route_info())
        return routes_by_endpoint

async def scale_routes(
    self,
    scale_out_creators: Sequence[Creator[RoutingRow]],
    scale_in_updater: BatchUpdater[RoutingRow] | None,
    promote_updater: BatchUpdater[RoutingRow] | None = None,
) -> None:
    """Scale out/in/promote routes based on provided creators and updaters.

    All three mutations run inside a single READ COMMITTED transaction,
    so the commit itself is atomic from the point of view of other
    committed-read transactions.

    :param scale_out_creators: Creators for new routes to insert.
    :param scale_in_updater: Optional batch update draining old routes.
    :param promote_updater: Optional batch update promoting new (green)
        routes to receive traffic; defaults to None for backward
        compatibility with plain scale-out/in callers.
    """
    async with self._begin_session_read_committed() as db_sess:
        # Scale out routes
        for creator in scale_out_creators:
            await execute_creator(db_sess, creator)
        # Scale in routes
        if scale_in_updater:
            await execute_batch_updater(db_sess, scale_in_updater)
        # Promote routes (blue-green)
        # NOTE(review): scale-in executes before promote here. Since all
        # statements commit atomically, concurrent readers should only see
        # the pre- or post-commit state — confirm no code path relies on
        # statement-level visibility before considering a reorder
        # (promote-first was suggested in review for availability safety).
        if promote_updater:
            await execute_batch_updater(db_sess, promote_updater)

# Route operations

Expand Down
11 changes: 10 additions & 1 deletion src/ai/backend/manager/repositories/deployment/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,22 @@ async def fetch_active_routes_by_endpoint_ids(
"""Fetch routes for multiple endpoints."""
return await self._db_source.fetch_active_routes_by_endpoint_ids(endpoint_ids)

@deployment_repository_resilience.apply()
async def fetch_routes_by_endpoint_ids(
    self,
    endpoint_ids: set[uuid.UUID],
) -> Mapping[uuid.UUID, list[RouteInfo]]:
    """Fetch all routes for multiple endpoints (no status filter)."""
    # Thin delegate: the db source performs the actual query.
    routes_by_endpoint = await self._db_source.fetch_routes_by_endpoint_ids(endpoint_ids)
    return routes_by_endpoint

@deployment_repository_resilience.apply()
async def scale_routes(
    self,
    scale_out_creators: Sequence[Creator[RoutingRow]],
    scale_in_updater: BatchUpdater[RoutingRow] | None,
    promote_updater: BatchUpdater[RoutingRow] | None = None,
) -> None:
    """Scale out/in/promote routes via the db source.

    :param scale_out_creators: Creators for new routes.
    :param scale_in_updater: Optional drain updater for old routes.
    :param promote_updater: Optional promote updater for green routes
        (defaults to None so existing two-argument callers keep working).
    """
    await self._db_source.scale_routes(scale_out_creators, scale_in_updater, promote_updater)

# Route operations

Expand Down
19 changes: 16 additions & 3 deletions src/ai/backend/manager/sokovan/deployment/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ async def _apply_route_changes(
) -> None:
"""Apply aggregated route mutations from the evaluation result."""
changes = eval_result.route_changes
if not changes.rollout_specs and not changes.drain_route_ids:
if not changes.rollout_specs and not changes.drain_route_ids and not changes.promote_route_ids:
return

scale_in_updater: BatchUpdater[RoutingRow] | None = None
Expand All @@ -512,11 +512,24 @@ async def _apply_route_changes(
conditions=[RouteConditions.by_ids(changes.drain_route_ids)],
)

await self._deployment_repository.scale_routes(changes.rollout_specs, scale_in_updater)
promote_updater: BatchUpdater[RoutingRow] | None = None
if changes.promote_route_ids:
promote_updater = BatchUpdater(
spec=RouteBatchUpdaterSpec(
traffic_status=RouteTrafficStatus.ACTIVE,
traffic_ratio=1.0,
),
conditions=[RouteConditions.by_ids(changes.promote_route_ids)],
)

await self._deployment_repository.scale_routes(
changes.rollout_specs, scale_in_updater, promote_updater
)
log.debug(
"Applied route changes: {} created, {} terminated",
"Applied route changes: {} created, {} terminated, {} promoted",
len(changes.rollout_specs),
len(changes.drain_route_ids),
len(changes.promote_route_ids),
)

async def _transition_completed_deployments(
Expand Down
178 changes: 173 additions & 5 deletions src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,194 @@
"""Blue-green deployment strategy evaluation for a single deployment cycle (BEP-1049).

Provisions a full set of new-revision routes, validates them, then atomically
switches traffic from the old revision to the new one.
Provisions a full set of new-revision routes (INACTIVE), validates them, then
atomically switches traffic from the old revision to the new one.
"""

from __future__ import annotations

import logging
from collections.abc import Sequence
from datetime import UTC, datetime

from ai.backend.logging import BraceStyleAdapter
from ai.backend.manager.data.deployment.types import (
DeploymentInfo,
DeploymentSubStep,
RouteInfo,
RouteStatus,
RouteTrafficStatus,
)
from ai.backend.manager.models.deployment_policy import BlueGreenSpec
from ai.backend.manager.models.routing import RoutingRow
from ai.backend.manager.repositories.base import Creator
from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec

from .types import CycleEvaluationResult
from .types import CycleEvaluationResult, RouteChanges

log = BraceStyleAdapter(logging.getLogger(__name__))


def blue_green_evaluate(
    deployment: DeploymentInfo,
    routes: Sequence[RouteInfo],
    spec: BlueGreenSpec,
) -> CycleEvaluationResult:
    """Evaluate one cycle of blue-green deployment for a single deployment.

    FSM flow:
    1. Classify routes into blue (old) / green (new) by revision_id.
    2. If no green routes → create all green (INACTIVE) → PROVISIONING.
    3. If any green PROVISIONING → PROVISIONING (wait).
    4. If all green failed → scale_in green → ROLLED_BACK.
    5. If not all green healthy → PROGRESSING (wait).
    6. If all green healthy + auto_promote=False → PROGRESSING (manual wait).
    7. If all green healthy + auto_promote=True + delay not elapsed → PROGRESSING.
    8. If all green healthy + auto_promote=True + delay elapsed → promote + completed.
    """
    deploying_rev = deployment.deploying_revision_id
    desired = deployment.replica_spec.target_replica_count

    # ── 1. Classify routes ──
    blue_active: list[RouteInfo] = []
    green_provisioning: list[RouteInfo] = []
    green_healthy: list[RouteInfo] = []
    green_failed: list[RouteInfo] = []

    for r in routes:
        is_green = r.revision_id == deploying_rev
        if not is_green:
            if r.status.is_active():
                blue_active.append(r)
            continue

        if r.status == RouteStatus.PROVISIONING:
            green_provisioning.append(r)
        elif r.status == RouteStatus.HEALTHY:
            green_healthy.append(r)
        elif r.status in (RouteStatus.FAILED_TO_START, RouteStatus.TERMINATED):
            green_failed.append(r)
        elif r.status.is_active():
            # NOTE(review): UNHEALTHY and DEGRADED routes fall through here
            # and are counted as promotable. Confirm this is intentional —
            # otherwise degraded routes may receive full production traffic
            # after promotion (consider waiting or counting them as failures).
            green_healthy.append(r)

    total_green_live = len(green_provisioning) + len(green_healthy)

    # ── 2. No green routes → create all green (INACTIVE) ──
    if total_green_live == 0 and not green_failed:
        log.debug(
            "deployment {}: no green routes — creating {} INACTIVE routes",
            deployment.id,
            desired,
        )
        route_changes = RouteChanges(
            rollout_specs=_build_route_creators(deployment, desired),
        )
        return CycleEvaluationResult(
            sub_step=DeploymentSubStep.PROVISIONING,
            route_changes=route_changes,
        )

    # ── 3. Green PROVISIONING → wait ──
    if green_provisioning:
        log.debug(
            "deployment {}: {} green routes still provisioning",
            deployment.id,
            len(green_provisioning),
        )
        return CycleEvaluationResult(sub_step=DeploymentSubStep.PROVISIONING)

    # ── 4. All green failed → rollback ──
    if total_green_live == 0 and green_failed:
        log.warning(
            "deployment {}: all {} green routes failed — rolling back",
            deployment.id,
            len(green_failed),
        )
        route_changes = RouteChanges(
            drain_route_ids=[r.route_id for r in green_failed],
        )
        return CycleEvaluationResult(
            sub_step=DeploymentSubStep.ROLLED_BACK,
            route_changes=route_changes,
        )

    # ── 5. Not all green healthy → PROGRESSING (wait) ──
    # NOTE(review): when some green routes are healthy and others are in
    # terminal failure states, this branch repeats forever — no replacement
    # routes are created and no rollback threshold exists. Confirm whether a
    # failure threshold or retry semantics should be added here.
    if len(green_healthy) < desired:
        log.debug(
            "deployment {}: green healthy={}/{} — waiting",
            deployment.id,
            len(green_healthy),
            desired,
        )
        return CycleEvaluationResult(sub_step=DeploymentSubStep.PROGRESSING)

    # ── All green healthy from here ──

    # ── 6. auto_promote=False → PROGRESSING (manual wait) ──
    if not spec.auto_promote:
        log.debug(
            "deployment {}: all green healthy, waiting for manual promotion",
            deployment.id,
        )
        return CycleEvaluationResult(sub_step=DeploymentSubStep.PROGRESSING)

    # ── 7. auto_promote=True + delay>0 → check elapsed time ──
    if spec.promote_delay_seconds > 0:
        # Delay is measured from the most recent status transition among the
        # healthy green routes (status_updated_at), so each evaluation cycle
        # can decide statelessly whether the delay window has passed.
        latest_healthy_at = _latest_status_updated_at(green_healthy)
        if latest_healthy_at is None:
            log.debug(
                "deployment {}: all green healthy but status_updated_at unknown — waiting",
                deployment.id,
            )
            return CycleEvaluationResult(sub_step=DeploymentSubStep.PROGRESSING)
        elapsed = (datetime.now(UTC) - latest_healthy_at).total_seconds()
        if elapsed < spec.promote_delay_seconds:
            log.debug(
                "deployment {}: promote delay {:.0f}/{} seconds elapsed — waiting",
                deployment.id,
                elapsed,
                spec.promote_delay_seconds,
            )
            return CycleEvaluationResult(sub_step=DeploymentSubStep.PROGRESSING)

    # ── 8. Promotion: green → ACTIVE, blue → TERMINATING ──
    log.info(
        "deployment {}: promoting {} green routes, terminating {} blue routes",
        deployment.id,
        len(green_healthy),
        len(blue_active),
    )
    route_changes = RouteChanges(
        promote_route_ids=[r.route_id for r in green_healthy],
        drain_route_ids=[r.route_id for r in blue_active],
    )
    return CycleEvaluationResult(
        sub_step=DeploymentSubStep.PROGRESSING,
        completed=True,
        route_changes=route_changes,
    )


def _latest_status_updated_at(routes: list[RouteInfo]) -> datetime | None:
    """Return the most recent status_updated_at among the given routes."""
    # Skip routes whose timestamp is unset; with nothing left, report None.
    known = (route.status_updated_at for route in routes)
    candidates = [ts for ts in known if ts is not None]
    if not candidates:
        return None
    return max(candidates)


def _build_route_creators(
    deployment: DeploymentInfo,
    count: int,
) -> list[Creator[RoutingRow]]:
    """Build route creator specs for green routes (INACTIVE, traffic_ratio=0.0)."""
    # One fresh spec per requested route; all start inactive with zero traffic.
    return [
        Creator(
            spec=RouteCreatorSpec(
                endpoint_id=deployment.id,
                session_owner_id=deployment.metadata.session_owner,
                domain=deployment.metadata.domain,
                project_id=deployment.metadata.project,
                revision_id=deployment.deploying_revision_id,
                traffic_status=RouteTrafficStatus.INACTIVE,
                traffic_ratio=0.0,
            )
        )
        for _ in range(count)
    ]
13 changes: 10 additions & 3 deletions src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def evaluate(
)
)
policy_map = {p.endpoint: p for p in policy_search.items}
route_map = await self._deployment_repo.fetch_active_routes_by_endpoint_ids(endpoint_ids)
route_map = await self._deployment_repo.fetch_routes_by_endpoint_ids(endpoint_ids)

# ── 2. Per-deployment evaluation ──
for deployment in deployments:
Expand All @@ -92,6 +92,7 @@ async def evaluate(
changes = cycle_result.route_changes
result.route_changes.rollout_specs.extend(changes.rollout_specs)
result.route_changes.drain_route_ids.extend(changes.drain_route_ids)
result.route_changes.promote_route_ids.extend(changes.promote_route_ids)
self._record_route_changes(deployment, changes)

# Group by sub-step
Expand All @@ -109,8 +110,8 @@ async def evaluate(

@staticmethod
def _record_route_changes(deployment: DeploymentInfo, changes: RouteChanges) -> None:
"""Record rollout/drain operations as sub-steps for observability."""
if not changes.rollout_specs and not changes.drain_route_ids:
"""Record rollout/drain/promote operations as sub-steps for observability."""
if not changes.rollout_specs and not changes.drain_route_ids and not changes.promote_route_ids:
return
pool = DeploymentRecorderContext.current_pool()
recorder = pool.recorder(deployment.id)
Expand All @@ -127,6 +128,12 @@ def _record_route_changes(deployment: DeploymentInfo, changes: RouteChanges) ->
success_detail=f"{len(changes.drain_route_ids)} route(s)",
):
pass
if changes.promote_route_ids:
with recorder.step(
"promote",
success_detail=f"{len(changes.promote_route_ids)} route(s)",
):
pass

def _evaluate_single(
self,
Expand Down
Loading
Loading