From 2af00ae0a4127ac7e9b6692584db1e0cb83bff42 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 00:36:02 +0000
Subject: [PATCH 01/13] Track partially failed release state

---
 src/api/routes/cluster_agents/app_routes.zig  | 45 +++++++++++++++++++
 .../routes/cluster_agents/deploy_routes.zig   | 26 ++++++++++-
 src/manifest/apply_release.zig                |  1 +
 src/manifest/update.zig                       |  2 +
 src/manifest/update/common.zig                |  3 ++
 5 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/src/api/routes/cluster_agents/app_routes.zig b/src/api/routes/cluster_agents/app_routes.zig
index 6f0892f3..7c890c97 100644
--- a/src/api/routes/cluster_agents/app_routes.zig
+++ b/src/api/routes/cluster_agents/app_routes.zig
@@ -530,6 +530,51 @@ test "app apply route preserves failed release metadata across reads" {
     try expectJsonContains(history_response.body, "\"message\":\"one or more placements failed\"");
 }
 
+test "app apply route preserves partially failed release metadata across reads" {
+    const alloc = std.testing.allocator;
+    const apply_body =
+        \\{"app_name":"demo-app","services":[{"name":"web","image":"alpine","command":["echo","hello"]},{"name":"db","image":"alpine","command":["echo","db"],"cpu_limit":999999,"memory_limit_mb":999999}]}
+    ;
+
+    var harness = try RouteFlowHarness.init(alloc);
+    defer harness.deinit();
+
+    const apply_response = harness.appApply(apply_body);
+    defer freeResponse(alloc, apply_response);
+
+    try expectResponseOk(apply_response);
+    try expectJsonContains(apply_response.body, "\"trigger\":\"apply\"");
+    try expectJsonContains(apply_response.body, "\"status\":\"partially_failed\"");
+    try expectJsonContains(apply_response.body, "\"placed\":1");
+    try expectJsonContains(apply_response.body, "\"failed\":1");
+    try expectJsonContains(apply_response.body, "\"source_release_id\":null");
+    try expectJsonContains(apply_response.body, "\"message\":\"one or more placements failed\"");
+
+    const release_id = json_helpers.extractJsonString(apply_response.body, "release_id").?;
+
+    const status_response = harness.status("demo-app");
+    defer freeResponse(alloc, status_response);
+
+    try expectResponseOk(status_response);
+    try expectJsonContains(status_response.body, "\"release_id\":\"");
+    try expectJsonContains(status_response.body, release_id);
+    try expectJsonContains(status_response.body, "\"trigger\":\"apply\"");
+    try expectJsonContains(status_response.body, "\"status\":\"partially_failed\"");
+    try expectJsonContains(status_response.body, "\"source_release_id\":null");
+    try expectJsonContains(status_response.body, "\"message\":\"one or more placements failed\"");
+
+    const history_response = harness.history("demo-app");
+    defer freeResponse(alloc, history_response);
+
+    try expectResponseOk(history_response);
+    try expectJsonContains(history_response.body, "\"id\":\"");
+    try expectJsonContains(history_response.body, release_id);
+    try expectJsonContains(history_response.body, "\"trigger\":\"apply\"");
+    try expectJsonContains(history_response.body, "\"status\":\"partially_failed\"");
+    try expectJsonContains(history_response.body, "\"source_release_id\":null");
+    try expectJsonContains(history_response.body, "\"message\":\"one or more placements failed\"");
+}
+
 test "route rejects app rollback without cluster" {
     const body = "{\"release_id\":\"abc123def456\"}";
     const request = http.Request{
diff --git a/src/api/routes/cluster_agents/deploy_routes.zig b/src/api/routes/cluster_agents/deploy_routes.zig
index ef5cf76e..0963e9a7 100644
--- a/src/api/routes/cluster_agents/deploy_routes.zig
+++ b/src/api/routes/cluster_agents/deploy_routes.zig
@@ -158,7 +158,12 @@ const ClusterApplyBackend = struct {
         }
 
         return .{
-            .status = if (failed == 0) .completed else .failed,
+            .status = if (failed == 0)
+                .completed
+            else if (placed > 0)
+                .partially_failed
+            else
+                .failed,
             .message = if (failed == 0) "all placements succeeded" else "one or more placements failed",
             .placed = placed,
             .failed = failed,
@@ -337,6 +342,25 @@ test "formatAppApplyResponse includes rollback trigger metadata" {
     try std.testing.expect(std.mem.indexOf(u8, json, "\"message\":\"rollback to dep-1 completed: all placements succeeded\"") != null);
 }
 
+test "formatAppApplyResponse includes partially failed status" {
+    const alloc = std.testing.allocator;
+    const json = try formatAppApplyResponse(alloc, .{
+        .app_name = "demo-app",
+        .release_id = "dep-3",
+        .status = .partially_failed,
+        .service_count = 2,
+        .placed = 1,
+        .failed = 1,
+        .message = "one or more placements failed",
+    });
+    defer alloc.free(json);
+
+    try std.testing.expect(std.mem.indexOf(u8, json, "\"status\":\"partially_failed\"") != null);
+    try std.testing.expect(std.mem.indexOf(u8, json, "\"placed\":1") != null);
+    try std.testing.expect(std.mem.indexOf(u8, json, "\"failed\":1") != null);
+    try std.testing.expect(std.mem.indexOf(u8, json, "\"message\":\"one or more placements failed\"") != null);
+}
+
 test "formatLegacyApplyResponse preserves compact deploy shape" {
     const alloc = std.testing.allocator;
     const json = try formatLegacyApplyResponse(alloc, 1, 1);
diff --git a/src/manifest/apply_release.zig b/src/manifest/apply_release.zig
index 6aa687eb..abfdfc77 100644
--- a/src/manifest/apply_release.zig
+++ b/src/manifest/apply_release.zig
@@ -148,6 +148,7 @@ pub fn materializeMessage(
         try alloc.dupe(u8, message)
     else switch (status) {
         .completed => try alloc.dupe(u8, "apply completed"),
+        .partially_failed => try alloc.dupe(u8, "apply partially failed"),
         .failed => try alloc.dupe(u8, "apply failed"),
         else => try alloc.dupe(u8, status_text),
     },
diff --git a/src/manifest/update.zig b/src/manifest/update.zig
index 10b7799b..2db8fb3f 100644
--- a/src/manifest/update.zig
+++ b/src/manifest/update.zig
@@ -270,11 +270,13 @@ const test_callbacks = UpdateCallbacks{
 test "deployment status round-trip" {
     try std.testing.expectEqualStrings("pending", DeploymentStatus.pending.toString());
     try std.testing.expectEqualStrings("in_progress", DeploymentStatus.in_progress.toString());
+    try std.testing.expectEqualStrings("partially_failed", DeploymentStatus.partially_failed.toString());
     try std.testing.expectEqualStrings("completed", DeploymentStatus.completed.toString());
     try std.testing.expectEqualStrings("failed", DeploymentStatus.failed.toString());
     try std.testing.expectEqualStrings("rolled_back", DeploymentStatus.rolled_back.toString());
 
     try std.testing.expectEqual(DeploymentStatus.pending, DeploymentStatus.fromString("pending").?);
+    try std.testing.expectEqual(DeploymentStatus.partially_failed, DeploymentStatus.fromString("partially_failed").?);
     try std.testing.expectEqual(DeploymentStatus.completed, DeploymentStatus.fromString("completed").?);
     try std.testing.expect(DeploymentStatus.fromString("unknown") == null);
 }
diff --git a/src/manifest/update/common.zig b/src/manifest/update/common.zig
index fcc17ae3..0cce4791 100644
--- a/src/manifest/update/common.zig
+++ b/src/manifest/update/common.zig
@@ -8,6 +8,7 @@ pub const FailureAction = enum {
 pub const DeploymentStatus = enum {
     pending,
     in_progress,
+    partially_failed,
     completed,
     failed,
     rolled_back,
@@ -16,6 +17,7 @@
         return switch (self) {
             .pending => "pending",
             .in_progress => "in_progress",
+            .partially_failed => "partially_failed",
             .completed => "completed",
             .failed => "failed",
             .rolled_back => "rolled_back",
@@ -25,6 +27,7 @@
     pub fn fromString(s: []const u8) ?DeploymentStatus {
         if (std.mem.eql(u8, s, "pending")) return .pending;
         if (std.mem.eql(u8, s, "in_progress")) return .in_progress;
+        if (std.mem.eql(u8, s, "partially_failed")) return .partially_failed;
        if (std.mem.eql(u8, s, "completed")) return .completed;
         if (std.mem.eql(u8, s, "failed")) return .failed;
         if (std.mem.eql(u8, s, "rolled_back")) return .rolled_back;

From 5ab0608320cbbe29e4c588190a1d2688a94baf9c Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 02:02:53 +0000
Subject: [PATCH 02/13] Track partial failure state in rolling updates

---
 src/manifest/update.zig               | 62 +++++++++++++++++++++++++++
 src/manifest/update/batch_runtime.zig | 11 +++--
 2 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/src/manifest/update.zig b/src/manifest/update.zig
index 2db8fb3f..2ddaae30 100644
--- a/src/manifest/update.zig
+++ b/src/manifest/update.zig
@@ -153,6 +153,7 @@ pub fn performRollingUpdate(
         );
 
         if (!all_healthy) {
+            progress.failed += batch_new_ids.items.len;
             return batch_runtime.handleBatchFailure(
                 strategy,
                 context,
@@ -402,6 +403,67 @@ test "start failure triggers pause when configured" {
     try std.testing.expectError(UpdateError.UpdatePaused, result);
 }
 
+test "pause failure stays failed before any replacement" {
+    var progress = UpdateProgress{
+        .total_containers = 2,
+        .replaced = 0,
+        .failed = 1,
+        .status = .in_progress,
+        .message = null,
+    };
+    var new_container_ids = std.ArrayList([12]u8).empty;
+    const context = UpdateContext{
+        .service_name = "web",
+        .manifest_hash = "sha256:bad",
+        .config_snapshot = "{}",
+        .old_container_ids = &.{ "old-1", "old-2" },
+        .callbacks = test_callbacks,
+    };
+
+    const result = batch_runtime.handleBatchFailure(
+        .{ .failure_action = .pause },
+        &context,
+        null,
+        &new_container_ids,
+        &progress,
+        "one or more containers failed to start",
+    );
+
+    try std.testing.expectEqual(UpdateError.UpdatePaused, result);
+    try std.testing.expectEqual(DeploymentStatus.failed, progress.status);
+}
+
+test "pause failure after cutover becomes partially failed" {
+    var progress = UpdateProgress{
+        .total_containers = 2,
+        .replaced = 1,
+        .failed = 1,
+        .status = .in_progress,
+        .message = null,
+    };
+    var new_container_ids = std.ArrayList([12]u8).empty;
+    const context = UpdateContext{
+        .service_name = "web",
+        .manifest_hash = "sha256:partial",
+        .config_snapshot = "{}",
+        .old_container_ids = &.{ "old-1", "old-2" },
+        .callbacks = test_callbacks,
+    };
+
+    const result = batch_runtime.handleBatchFailure(
+        .{ .failure_action = .pause },
+        &context,
+        null,
+        &new_container_ids,
+        &progress,
+        "health checks failed for new containers",
+    );
+
+    try std.testing.expectEqual(UpdateError.UpdatePaused, result);
+    try std.testing.expectEqual(DeploymentStatus.partially_failed, progress.status);
+    try std.testing.expectEqualStrings("health checks failed for new containers", progress.message.?);
+}
+
 test "partial batch start failure rolls back before stopping old containers" {
     resetTestState();
     test_fail_start_call = 1;
diff --git a/src/manifest/update/batch_runtime.zig b/src/manifest/update/batch_runtime.zig
index 95072b9f..49a86ac7 100644
--- a/src/manifest/update/batch_runtime.zig
+++ b/src/manifest/update/batch_runtime.zig
@@ -3,6 +3,10 @@ const log = @import("../../lib/log.zig");
 const deployment_store = @import("deployment_store.zig");
 const common = @import("common.zig");
 
+pub fn pausedFailureStatus(progress: *const common.UpdateProgress) common.DeploymentStatus {
+    return if (progress.replaced > 0) .partially_failed else .failed;
+}
+
 pub fn handleBatchFailure(
     strategy: common.UpdateStrategy,
     context: *const common.UpdateContext,
@@ -33,12 +37,13 @@
             return common.UpdateError.BatchFailed;
         },
         .pause => {
-            progress.status = .failed;
+            const status = pausedFailureStatus(progress);
+            progress.status = status;
             progress.message = reason;
 
             if (deployment_id) |id| {
-                deployment_store.updateDeploymentStatus(id, .failed, reason) catch |e| {
-                    log.warn("failed to update deployment status to failed: {}", .{e});
+                deployment_store.updateDeploymentStatus(id, status, reason) catch |e| {
+                    log.warn("failed to update deployment status to {s}: {}", .{ status.toString(), e });
                 };
             }

From 61486260c791c99bb341dfd4716f01d3fdc1b6a8 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 02:54:52 +0000
Subject: [PATCH 03/13] Persist richer app release states

---
 src/manifest/apply_release.zig       | 20 ++++++++++++-
 src/manifest/local_apply_backend.zig |  6 +---
 src/manifest/release_history.zig     | 42 ++++++++++++++++++++++++--
 3 files changed, 60 insertions(+), 8 deletions(-)

diff --git a/src/manifest/apply_release.zig b/src/manifest/apply_release.zig
index abfdfc77..29094f99 100644
--- a/src/manifest/apply_release.zig
+++ b/src/manifest/apply_release.zig
@@ -147,10 +147,12 @@ pub fn materializeMessage(
     .apply => if (explicit) |message|
         try alloc.dupe(u8, message)
     else switch (status) {
+        .pending => try alloc.dupe(u8, "apply pending"),
+        .in_progress => try alloc.dupe(u8, "apply in progress"),
         .completed => try alloc.dupe(u8, "apply completed"),
         .partially_failed => try alloc.dupe(u8, "apply partially failed"),
         .failed => try alloc.dupe(u8, "apply failed"),
-        else => try alloc.dupe(u8, status_text),
+        .rolled_back => try alloc.dupe(u8, "apply rolled back"),
     },
     .rollback => if (context.source_release_id) |source_id|
         if (explicit) |message|
@@ -343,6 +345,22 @@ test "materializeMessage contextualizes rollback transitions" {
     );
 }
 
+test "materializeMessage defaults are operator friendly for orchestration states" {
+    const alloc = std.testing.allocator;
+
+    const pending = try materializeMessage(alloc, .{}, .pending, null);
+    defer alloc.free(pending.?);
+    try std.testing.expectEqualStrings("apply pending", pending.?);
+
+    const partial = try materializeMessage(alloc, .{}, .partially_failed, null);
+    defer alloc.free(partial.?);
+    try std.testing.expectEqualStrings("apply partially failed", partial.?);
+
+    const rolled_back = try materializeMessage(alloc, .{}, .rolled_back, null);
+    defer alloc.free(rolled_back.?);
+    try std.testing.expectEqualStrings("apply rolled back", rolled_back.?);
+}
+
 test "reportFromDeployment preserves release metadata and counts services" {
     const dep = store.DeploymentRecord{
         .id = "dep-22",
diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index 23637203..f36650c8 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -147,11 +147,7 @@ const LocalReleaseTracker = struct {
         const resolved_message = try apply_release.materializeMessage(self.plan.alloc, self.context, status, message);
         defer if (resolved_message) |msg| self.plan.alloc.free(msg);
 
-        switch (status) {
-            .completed => release_history.markAppReleaseCompleted(id, resolved_message) catch {},
-            .failed => release_history.markAppReleaseFailed(id, resolved_message) catch {},
-            else => {},
-        }
+        release_history.markAppReleaseStatus(id, status, resolved_message) catch {};
     }
 
     pub fn freeReleaseId(self: *const LocalReleaseTracker, id: []const u8) void {
diff --git a/src/manifest/release_history.zig b/src/manifest/release_history.zig
index 8561c964..ed6a76ce 100644
--- a/src/manifest/release_history.zig
+++ b/src/manifest/release_history.zig
@@ -2,6 +2,7 @@ const std = @import("std");
 const store = @import("../state/store.zig");
 const deployment_store = @import("update/deployment_store.zig");
 const release_plan = @import("release_plan.zig");
+const update_common = @import("update/common.zig");
 
 pub fn recordAppReleaseStart(plan: *const release_plan.ReleasePlan) ![]const u8 {
     const id = try deployment_store.generateDeploymentId(plan.alloc);
@@ -19,12 +20,16 @@ pub fn recordAppReleaseStart(plan: *const release_plan.ReleasePlan) ![]const u8
     return id;
 }
 
+pub fn markAppReleaseStatus(id: []const u8, status: update_common.DeploymentStatus, message: ?[]const u8) !void {
+    try deployment_store.updateDeploymentStatus(id, status, message);
+}
+
 pub fn markAppReleaseCompleted(id: []const u8, message: ?[]const u8) !void {
-    try deployment_store.updateDeploymentStatus(id, .completed, message);
+    try markAppReleaseStatus(id, .completed, message);
 }
 
 pub fn markAppReleaseFailed(id: []const u8, message: ?[]const u8) !void {
-    try deployment_store.updateDeploymentStatus(id, .failed, message);
+    try markAppReleaseStatus(id, .failed, message);
 }
 
 pub fn rollbackApp(alloc: std.mem.Allocator, app_name: []const u8) ![]const u8 {
@@ -71,3 +76,36 @@
     try std.testing.expectEqualStrings("demo-app", deployments.items[0].service_name);
     try std.testing.expectEqualStrings("in_progress", deployments.items[0].status);
 }
+
+test "markAppReleaseStatus persists partially failed state" {
+    const alloc = std.testing.allocator;
+    try store.initTestDb();
+    defer store.deinitTestDb();
+
+    const app_spec = @import("app_spec.zig");
+    const loader = @import("loader.zig");
+
+    var manifest = try loader.loadFromString(alloc,
+        \\[service.web]
+        \\image = "nginx:latest"
+    );
+    defer manifest.deinit();
+
+    var app = try app_spec.fromManifest(alloc, "demo-app", &manifest);
+    defer app.deinit();
+
+    var plan = try release_plan.ReleasePlan.fromAppSpec(alloc, &app, &.{});
+    defer plan.deinit();
+
+    const id = try recordAppReleaseStart(&plan);
+    defer alloc.free(id);
+
+    try markAppReleaseStatus(id, .partially_failed, "one or more placements failed");
+
+    const dep = try store.getLatestDeploymentByApp(alloc, "demo-app");
+    defer dep.deinit(alloc);
+
+    try std.testing.expectEqualStrings(id, dep.id);
+    try std.testing.expectEqualStrings("partially_failed", dep.status);
+    try std.testing.expectEqualStrings("one or more placements failed", dep.message.?);
+}

From 7214ce7afe3a829ed18894b264f8b0e089ed01a9 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 03:26:33 +0000
Subject: [PATCH 04/13] Cover local app status for partial failures

---
 src/runtime/cli/status_command.zig | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/src/runtime/cli/status_command.zig b/src/runtime/cli/status_command.zig
index ed1bf743..1ffab7e2 100644
--- a/src/runtime/cli/status_command.zig
+++ b/src/runtime/cli/status_command.zig
@@ -477,3 +477,31 @@ test "writeAppStatusJsonObject round-trips through remote parser" {
     try std.testing.expectEqualStrings(snapshot.source_release_id.?, parsed.source_release_id.?);
     try std.testing.expectEqualStrings(snapshot.message.?, parsed.message.?);
 }
+
+test "appStatusFromReport preserves partially failed local release state" {
+    const dep = store.DeploymentRecord{
+        .id = "dep-3",
+        .app_name = "demo-app",
+        .service_name = "demo-app",
+        .manifest_hash = "sha256:333",
+        .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"},{\"name\":\"db\"}]}",
+        .status = "partially_failed",
+        .message = "one or more placements failed",
+        .created_at = 300,
+    };
+
+    const local = appStatusFromReport(apply_release.reportFromDeployment(dep));
+    const remote = parseAppStatusResponse(
+        \\{"app_name":"demo-app","trigger":"apply","release_id":"dep-3","status":"partially_failed","manifest_hash":"sha256:333","created_at":300,"service_count":2,"source_release_id":null,"message":"one or more placements failed"}
+    );
+
+    try std.testing.expectEqualStrings(local.app_name, remote.app_name);
+    try std.testing.expectEqualStrings(local.trigger, remote.trigger);
+    try std.testing.expectEqualStrings(local.release_id, remote.release_id);
+    try std.testing.expectEqualStrings(local.status, remote.status);
+    try std.testing.expectEqualStrings(local.manifest_hash, remote.manifest_hash);
+    try std.testing.expectEqual(local.created_at, remote.created_at);
+    try std.testing.expectEqual(local.service_count, remote.service_count);
+    try std.testing.expect(local.source_release_id == null);
+    try std.testing.expectEqualStrings(local.message.?, remote.message.?);
+}

From 72a82d5f669f0564ae43288fe181c4ad0adfd428 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 03:31:53 +0000
Subject: [PATCH 05/13] Cover local app history for partial failures

---
 src/manifest/cli/ops.zig | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/src/manifest/cli/ops.zig b/src/manifest/cli/ops.zig
index 87210907..3940f486 100644
--- a/src/manifest/cli/ops.zig
+++ b/src/manifest/cli/ops.zig
@@ -411,6 +411,34 @@ test "writeHistoryJsonObject round-trips through remote parser" {
     try std.testing.expectEqualStrings(entry.message.?, parsed.message.?);
 }
 
+test "historyEntryFromDeployment preserves partially failed local release state" {
+    const dep = store.DeploymentRecord{
+        .id = "dep-3",
+        .app_name = "demo-app",
+        .service_name = "demo-app",
+        .manifest_hash = "sha256:333",
+        .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"},{\"name\":\"db\"}]}",
+        .status = "partially_failed",
+        .message = "one or more placements failed",
+        .created_at = 300,
+    };
+
+    const local = historyEntryFromDeployment(dep);
+    const remote = parseHistoryObject(
+        \\{"id":"dep-3","app":"demo-app","service":"demo-app","trigger":"apply","status":"partially_failed","manifest_hash":"sha256:333","created_at":300,"source_release_id":null,"message":"one or more placements failed"}
+    );
+
+    try std.testing.expectEqualStrings(local.id, remote.id);
+    try std.testing.expectEqualStrings(local.app.?, remote.app.?);
+    try std.testing.expectEqualStrings(local.service, remote.service);
+    try std.testing.expectEqualStrings(local.trigger, remote.trigger);
+    try std.testing.expectEqualStrings(local.status, remote.status);
+    try std.testing.expectEqualStrings(local.manifest_hash, remote.manifest_hash);
+    try std.testing.expectEqual(local.created_at, remote.created_at);
+    try std.testing.expect(local.source_release_id == null);
+    try std.testing.expectEqualStrings(local.message.?, remote.message.?);
+}
+
 pub fn runWorker(args: *std.process.ArgIterator, alloc: std.mem.Allocator) !void {
     var manifest_path: []const u8 = manifest_loader.default_filename;
     var worker_name: ?[]const u8 = null;

From 7c8db098a600ef2b854ccf15be27ff16d0bbb638 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 03:35:12 +0000
Subject: [PATCH 06/13] Detect local replacement apply candidates

---
 src/manifest/local_apply_backend.zig | 88 ++++++++++++++++++++++++++++
 1 file changed, 88 insertions(+)

diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index f36650c8..a8dd1213 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -5,6 +5,7 @@ const release_history = @import("release_history.zig");
 const release_plan = @import("release_plan.zig");
 const orchestrator = @import("orchestrator.zig");
 const startup_runtime = @import("orchestrator/startup_runtime.zig");
+const store = @import("../state/store.zig");
 const watcher_mod = @import("../dev/watcher.zig");
 const spec = @import("spec.zig");
 const proxy_control_plane = @import("../network/proxy/control_plane.zig");
@@ -14,10 +15,22 @@ const listener_runtime = @import("../network/proxy/listener_runtime.zig");
 
 const writeErr = cli.writeErr;
 
+pub const LocalApplyMode = enum {
+    fresh,
+    replacement_candidate,
+};
+
+pub const LocalApplyScope = struct {
+    mode: LocalApplyMode,
+    existing_target_count: usize,
+    new_target_count: usize,
+};
+
 pub const PreparedLocalApply = struct {
     alloc: std.mem.Allocator,
     manifest: *spec.Manifest,
     release: *const release_plan.ReleasePlan,
+    scope: LocalApplyScope,
     orch: orchestrator.Orchestrator,
     runtime_started: bool = false,
 
@@ -27,6 +40,7 @@ pub fn init(
         release: *const release_plan.ReleasePlan,
         dev_mode: bool,
     ) !PreparedLocalApply {
+        const scope = detectApplyScope(alloc, release);
         var orch = try orchestrator.Orchestrator.init(alloc, manifest, release.app.app_name);
         errdefer orch.deinit();
@@ -42,6 +56,7 @@
             .alloc = alloc,
             .manifest = manifest,
             .release = release,
+            .scope = scope,
             .orch = orch,
         };
     }
@@ -125,6 +140,35 @@
     }
 };
 
+fn detectApplyScope(alloc: std.mem.Allocator, release: *const release_plan.ReleasePlan) LocalApplyScope {
+    var existing_target_count: usize = 0;
+    var new_target_count: usize = 0;
+
+    for (release.app.services) |svc| {
+        const record = store.findAppContainer(alloc, release.app.app_name, svc.name) catch {
+            new_target_count += 1;
+            continue;
+        };
+
+        if (record) |container| {
+            defer container.deinit(alloc);
+            if (!std.mem.eql(u8, container.status, "stopped")) {
+                existing_target_count += 1;
+            } else {
+                new_target_count += 1;
+            }
+        } else {
+            new_target_count += 1;
+        }
+    }
+
+    return .{
+        .mode = if (existing_target_count > 0) .replacement_candidate else .fresh,
+        .existing_target_count = existing_target_count,
+        .new_target_count = new_target_count,
+    };
+}
+
 pub const DevWatcherRuntime = struct {
     watcher: ?watcher_mod.Watcher = null,
     thread: ?std.Thread = null,
@@ -202,3 +246,47 @@
     try std.testing.expect(start_set.contains("db"));
     try std.testing.expect(start_set.contains("web"));
 }
+
+test "PreparedLocalApply detects replacement candidates from existing app containers" {
+    const alloc = std.testing.allocator;
+    try store.initTestDb();
+    defer store.deinitTestDb();
+
+    const loader = @import("loader.zig");
+    const app_spec = @import("app_spec.zig");
+
+    var manifest = try loader.loadFromString(alloc,
+        \\[service.db]
+        \\image = "postgres:latest"
+        \\
+        \\[service.web]
+        \\image = "nginx:latest"
+        \\depends_on = ["db"]
+    );
+    defer manifest.deinit();
+
+    var app = try app_spec.fromManifest(alloc, "demo-app", &manifest);
+    defer app.deinit();
+
+    var release = try release_plan.ReleasePlan.fromAppSpec(alloc, &app, &.{"web"});
+    defer release.deinit();
+
+    try store.save(.{
+        .id = "abcdef123456",
+        .rootfs = "/tmp/rootfs",
+        .command = "/bin/sh",
+        .hostname = "web",
+        .status = "running",
+        .pid = null,
+        .exit_code = null,
+        .app_name = "demo-app",
+        .created_at = 100,
+    });
+
+    var prepared = try PreparedLocalApply.init(alloc, &manifest, &release, false);
+    defer prepared.deinit();
+
+    try std.testing.expectEqual(LocalApplyMode.replacement_candidate, prepared.scope.mode);
+    try std.testing.expectEqual(@as(usize, 1), prepared.scope.existing_target_count);
+    try std.testing.expectEqual(@as(usize, 1), prepared.scope.new_target_count);
+}

From 62a619d22b92f4879502b114b21893aec8758212 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 11:32:01 +0000
Subject: [PATCH 07/13] Extract single-service orchestrator startup

---
 src/manifest/orchestrator.zig                 |  45 +++++++
 .../orchestrator/lifecycle_support.zig        | 119 ++++++++++--------
 2 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/src/manifest/orchestrator.zig b/src/manifest/orchestrator.zig
index fb67ae0b..0f0bcc44 100644
--- a/src/manifest/orchestrator.zig
+++ b/src/manifest/orchestrator.zig
@@ -587,3 +587,48 @@ test "computeStartSet: no filter starts everything" {
     try std.testing.expect(orch.shouldStart("db"));
     try std.testing.expect(orch.shouldStart("anything"));
 }
+
+fn fakeStartServiceThread(orch: *Orchestrator, idx: usize) void {
+    orch.states[idx].status = .running;
+}
+
+test "startServiceByIndex launches a single service thread" {
+    const alloc = std.testing.allocator;
+
+    var services = [_]spec.Service{
+        testSvc("web", &.{}),
+    };
+    var manifest = spec.Manifest{
+        .services = &services,
+        .workers = &.{},
+        .crons = &.{},
+        .training_jobs = &.{},
+        .volumes = &.{},
+        .alloc = alloc,
+    };
+
+    const states = try alloc.alloc(ServiceState, 1);
+    defer alloc.free(states);
+    for (states) |*s| s.* = .{ .container_id = undefined, .thread = null, .status = .pending };
+
+    const flags = try alloc.alloc(std.atomic.Value(bool), 1);
+    defer alloc.free(flags);
+    for (flags) |*f| f.* = std.atomic.Value(bool).init(false);
+
+    var orch = Orchestrator{
+        .alloc = alloc,
+        .manifest = &manifest,
+        .app_name = "test",
+        .states = states,
+        .restart_requested = flags,
+    };
+
+    var completed_workers: std.StringHashMapUnmanaged(void) = .empty;
+    defer completed_workers.deinit(alloc);
+
+    try lifecycle_support.startServiceByIndex(&orch, OrchestratorError, 0, &completed_workers, fakeStartServiceThread);
+    defer if (orch.states[0].thread) |thread| thread.join();
+
+    try std.testing.expectEqual(ServiceState.Status.running, orch.states[0].status);
+    try std.testing.expect(orch.states[0].thread != null);
+}
diff --git a/src/manifest/orchestrator/lifecycle_support.zig b/src/manifest/orchestrator/lifecycle_support.zig
index 939d280b..fbf46633 100644
--- a/src/manifest/orchestrator/lifecycle_support.zig
+++ b/src/manifest/orchestrator/lifecycle_support.zig
@@ -76,63 +76,10 @@ pub fn startAll(self: anytype, comptime OrchestratorError: type, serviceThreadFn
     for (services, 0..) |svc, i| {
         if (!shouldStart(self, svc.name)) continue;
-
-        for (svc.depends_on) |dep_name| {
-            if (self.manifest.workerByName(dep_name)) |worker| {
-                if (!completed_workers.contains(dep_name)) {
-                    writeErr("running worker {s}...\n", .{dep_name});
-                    if (!service_runtime.runOneShot(
-                        self.alloc,
-                        worker.image,
-                        worker.command,
-                        worker.env,
-                        worker.volumes,
-                        worker.working_dir,
-                        dep_name,
-                        self.manifest.volumes,
-                        self.app_name,
-                    )) {
-                        writeErr("worker '{s}' failed\n", .{dep_name});
-                        self.stopAll();
-                        return OrchestratorError.StartFailed;
-                    }
-                    completed_workers.put(self.alloc, dep_name, {}) catch {};
-                    writeErr("  worker {s} completed\n", .{dep_name});
-                }
-            } else {
-                const dep_idx = serviceIndex(self, dep_name) orelse continue;
-                if (!waitForRunning(self, dep_idx)) {
-                    writeErr("dependency '{s}' failed to start\n", .{dep_name});
-                    self.stopAll();
-                    return OrchestratorError.StartFailed;
-                }
-            }
-        }
-
-        self.states[i].status = .starting;
-        container.generateId(&self.states[i].container_id) catch {
-            writeErr("failed to generate container ID for {s}\n", .{svc.name});
-            self.states[i].status = .failed;
+        startServiceByIndex(self, OrchestratorError, i, &completed_workers, serviceThreadFn) catch |err| {
             self.stopAll();
-            return OrchestratorError.StartFailed;
+            return err;
         };
-
-        const thread = std.Thread.spawn(.{}, serviceThreadFn, .{ self, i }) catch {
-            writeErr("failed to spawn thread for {s}\n", .{svc.name});
-            self.states[i].status = .failed;
-            self.stopAll();
-            return OrchestratorError.StartFailed;
-        };
-        self.states[i].thread = thread;
-
-        if (!waitForRunning(self, i)) {
-            writeErr("service '{s}' failed to start\n", .{svc.name});
-            self.stopAll();
-            return OrchestratorError.StartFailed;
-        }
-
-        const id = self.states[i].container_id;
-        writeErr("started {s} ({s})\n", .{ svc.name, id[0..] });
     }
 
     self.registerHealthChecks();
@@ -154,6 +101,68 @@
     }
 }
 
+pub fn startServiceByIndex(
+    self: anytype,
+    comptime OrchestratorError: type,
+    idx: usize,
+    completed_workers: *std.StringHashMapUnmanaged(void),
+    serviceThreadFn: anytype,
+) OrchestratorError!void {
+    const svc = self.manifest.services[idx];
+
+    for (svc.depends_on) |dep_name| {
+        if (self.manifest.workerByName(dep_name)) |worker| {
+            if (!completed_workers.contains(dep_name)) {
+                writeErr("running worker {s}...\n", .{dep_name});
+                if (!service_runtime.runOneShot(
+                    self.alloc,
+                    worker.image,
+                    worker.command,
+                    worker.env,
+                    worker.volumes,
+                    worker.working_dir,
+                    dep_name,
+                    self.manifest.volumes,
+                    self.app_name,
+                )) {
+                    writeErr("worker '{s}' failed\n", .{dep_name});
+                    return OrchestratorError.StartFailed;
+                }
+                completed_workers.put(self.alloc, dep_name, {}) catch {};
+                writeErr("  worker {s} completed\n", .{dep_name});
+            }
+        } else {
+            const dep_idx = serviceIndex(self, dep_name) orelse continue;
+            if (!waitForRunning(self, dep_idx)) {
+                writeErr("dependency '{s}' failed to start\n", .{dep_name});
+                return OrchestratorError.StartFailed;
+            }
+        }
+    }
+
+    self.states[idx].status = .starting;
+    container.generateId(&self.states[idx].container_id) catch {
+        writeErr("failed to generate container ID for {s}\n", .{svc.name});
+        self.states[idx].status = .failed;
+        return OrchestratorError.StartFailed;
+    };
+
+    const thread = std.Thread.spawn(.{}, serviceThreadFn, .{ self, idx }) catch {
+        writeErr("failed to spawn thread for {s}\n", .{svc.name});
+        self.states[idx].status = .failed;
+        return OrchestratorError.StartFailed;
+    };
+    self.states[idx].thread = thread;
+
+    if (!waitForRunning(self, idx)) {
+        writeErr("service '{s}' failed to start\n", .{svc.name});
+        return OrchestratorError.StartFailed;
+    }
+
+    const id = self.states[idx].container_id;
+    writeErr("started {s} ({s})\n", .{ svc.name, id[0..] });
+}
+
 pub fn stopAll(self: anytype) void {
     if (self.cron_sched) |cs| {
         cs.stop();

From a24932bfff5086696bb6b9d26c00225015b5224f Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 11:47:53 +0000
Subject: [PATCH 08/13] Extract single-service orchestrator shutdown

---
 src/manifest/orchestrator.zig                 | 54 +++++++++++++++++++
 .../orchestrator/lifecycle_support.zig        | 45 +++++++++-------
 2 files changed, 81 insertions(+), 18 deletions(-)

diff --git a/src/manifest/orchestrator.zig b/src/manifest/orchestrator.zig
index 0f0bcc44..0cfb3b36 100644
--- a/src/manifest/orchestrator.zig
+++ b/src/manifest/orchestrator.zig
@@ -592,6 +592,8 @@ fn fakeStartServiceThread(orch: *Orchestrator, idx: usize) void {
     orch.states[idx].status = .running;
 }
 
+fn fakeJoinableThread(_: *Orchestrator, _: usize) void {}
+
 test "startServiceByIndex launches a single service thread" {
     const alloc = std.testing.allocator;
 
@@ -632,3 +634,55 @@
     try std.testing.expectEqual(ServiceState.Status.running, orch.states[0].status);
     try std.testing.expect(orch.states[0].thread != null);
 }
+
+test "stopServiceByIndex marks running service stopped without pid" {
+    const alloc = std.testing.allocator;
+    try @import("../state/store.zig").initTestDb();
+    defer @import("../state/store.zig").deinitTestDb();
+
+    var services = [_]spec.Service{
+        testSvc("web", &.{}),
+    };
+    var manifest = spec.Manifest{
+        .services = &services,
+        .workers = &.{},
+        .crons = &.{},
+        .training_jobs = &.{},
+        .volumes = &.{},
+        .alloc = alloc,
+    };
+
+    const states = try alloc.alloc(ServiceState, 1);
+    defer alloc.free(states);
+    for (states) |*s| s.* = .{ .container_id = "abcdef123456".*, .thread = null, .status = .running };
+
+    const flags = try alloc.alloc(std.atomic.Value(bool), 1);
+    defer alloc.free(flags);
+    for (flags) |*f| f.* = std.atomic.Value(bool).init(false);
+
+    var orch = Orchestrator{
+        .alloc = alloc,
+        .manifest = &manifest,
+        .app_name = "test",
+        .states = states,
+        .restart_requested = flags,
+    };
+
+    try @import("../state/store.zig").save(.{
+        .id = "abcdef123456",
+        .rootfs = "/tmp/rootfs",
+        .command = "/bin/sh",
+        .hostname = "web",
+        .status = "running",
+        .pid = null,
+        .exit_code = null,
+        .app_name = "test",
+        .created_at = 100,
+    });
+
+    orch.states[0].thread = try std.Thread.spawn(.{}, fakeJoinableThread, .{ &orch, 0 });
+    lifecycle_support.stopServiceByIndex(&orch, 0);
+
+    try std.testing.expectEqual(ServiceState.Status.stopped, orch.states[0].status);
+    try std.testing.expect(orch.states[0].thread == null);
+}
diff --git a/src/manifest/orchestrator/lifecycle_support.zig b/src/manifest/orchestrator/lifecycle_support.zig
index fbf46633..3007af9f 100644
--- a/src/manifest/orchestrator/lifecycle_support.zig
+++ b/src/manifest/orchestrator/lifecycle_support.zig
@@ -184,31 +184,40 @@ pub fn stopAll(self: anytype) void {
     var i: usize = services.len;
     while (i > 0) {
         i -= 1;
-        if (self.states[i].status != .running and self.states[i].status != .starting) continue;
+        stopServiceByIndex(self, i);
+    }
+}
 
-        const id = self.states[i].container_id;
-        writeErr("stopping {s}...\n", .{services[i].name});
+pub fn stopServiceByIndex(self: anytype, idx: usize) void {
+    if (self.states[idx].status != .running and self.states[idx].status != .starting) return;
+
+    const svc = self.manifest.services[idx];
+    health.unregisterService(svc.name);
+
+    const id = self.states[idx].container_id;
+    writeErr("stopping {s}...\n", .{svc.name});
 
-        const record = store.load(self.alloc, id[0..]) catch {
-            log.warn("orchestrator: failed to load container for shutdown: {s}", .{services[i].name});
-            continue;
-        };
-        defer record.deinit(self.alloc);
+    const record = store.load(self.alloc, id[0..]) catch {
+        log.warn("orchestrator: failed to load container for shutdown: {s}", .{svc.name});
+        self.states[idx].status = .stopped;
+        if (self.states[idx].thread) |thread| {
+            thread.join();
+            self.states[idx].thread = null;
         }
+        return;
+    };
+    defer record.deinit(self.alloc);
 
-        if (record.pid) |pid| {
-            process.terminate(pid) catch {
-                process.kill(pid) catch {};
-            };
+    if (record.pid) |pid| {
+        process.terminate(pid) catch {
+            process.kill(pid) catch {};
+        };
     }
 
-        self.states[i].status = .stopped;
+    self.states[idx].status = .stopped;
+    if (self.states[idx].thread) |thread| {
+        thread.join();
+        self.states[idx].thread = null;
     }
 
-    for (self.states) |*state| {
-        if (state.thread) |thread| {
-            thread.join();
-            state.thread = null;
-        }
     }
 }

From c64fee9bdc592a28fdcaf41b1cda8f32debb1b16 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 11:49:47 +0000
Subject: [PATCH 09/13] Classify local replacement service sets

---
 src/manifest/local_apply_backend.zig | 93 ++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index a8dd1213..bdf1fcb3 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -95,6 +95,14 @@
         return apply_result.toReport(self.release.app.app_name, self.release.resolvedServiceCount(), context);
     }
 
+    pub fn replacementServiceIndexes(self: *const PreparedLocalApply, alloc: std.mem.Allocator) !std.ArrayList(usize) {
+        return classifyServiceIndexes(alloc, self.manifest, self.release, self.scope, true);
+    }
+
+    pub fn newServiceIndexes(self: *const PreparedLocalApply, alloc: std.mem.Allocator) !std.ArrayList(usize) {
+        return classifyServiceIndexes(alloc, self.manifest, self.release, self.scope, false);
+    }
+
     pub fn startDevWatcher(self: *PreparedLocalApply) DevWatcherRuntime {
         var runtime = DevWatcherRuntime{};
         runtime.watcher = watcher_mod.Watcher.init(self.alloc) catch |e| blk: {
@@ -169,6 +177,41 @@
     };
 }
 
+fn classifyServiceIndexes(
+    alloc: std.mem.Allocator,
+    manifest: *const spec.Manifest,
+    release: *const release_plan.ReleasePlan,
+    scope: LocalApplyScope,
+    want_existing: bool,
+) !std.ArrayList(usize) {
+    var indexes: std.ArrayList(usize) = .empty;
+
+    for (manifest.services, 0..) |svc, idx| {
+        if (!release.includesService(svc.name)) continue;
+        if (scope.mode == .fresh) {
+            if (!want_existing) try indexes.append(alloc, idx);
+            continue;
+        }
+
+        const record = store.findAppContainer(alloc, release.app.app_name, svc.name) catch {
+            if (!want_existing) try indexes.append(alloc, idx);
+            continue;
+        };
+
+        if (record) |container| {
+            defer container.deinit(alloc);
+            const is_existing = !std.mem.eql(u8, container.status, "stopped");
+            if (is_existing == want_existing) {
+                try indexes.append(alloc, idx);
+            }
+        } else if (!want_existing) {
+            try indexes.append(alloc, idx);
+        }
+    }
+
+    return indexes;
+}
+
 pub const DevWatcherRuntime = struct {
     watcher: ?watcher_mod.Watcher = null,
     thread: ?std.Thread = null,
@@ -290,3 +333,53 @@
     try std.testing.expectEqual(@as(usize, 1), prepared.scope.existing_target_count);
     try std.testing.expectEqual(@as(usize, 1), prepared.scope.new_target_count);
 }
+
+test "PreparedLocalApply classifies replacement and new service indexes" {
+    const alloc = std.testing.allocator;
+    try store.initTestDb();
+    defer store.deinitTestDb();
+
+    const loader = @import("loader.zig");
+    const app_spec = @import("app_spec.zig");
+
+    var manifest = try loader.loadFromString(alloc,
+        \\[service.db]
+        \\image = "postgres:latest"
+        \\
+        \\[service.web]
+        \\image = "nginx:latest"
+        \\depends_on = ["db"]
+    );
+    defer manifest.deinit();
+
+    var app = try app_spec.fromManifest(alloc, "demo-app", &manifest);
+    defer app.deinit();
+
+    var release = try release_plan.ReleasePlan.fromAppSpec(alloc, &app, &.{"web"});
+    defer release.deinit();
+
+    try store.save(.{
+        .id = "abcdef123456",
+        .rootfs = "/tmp/rootfs",
+        .command = "/bin/sh",
+        .hostname = "web",
+        .status = "running",
+        .pid = null,
+        .exit_code = null,
+        .app_name = "demo-app",
+        .created_at = 100,
+    });
+
+    var prepared = try PreparedLocalApply.init(alloc, &manifest, &release, false);
+    defer prepared.deinit();
+
+    var replacement_indexes = try prepared.replacementServiceIndexes(alloc);
+    defer replacement_indexes.deinit(alloc);
+    var new_indexes = try prepared.newServiceIndexes(alloc);
+    defer new_indexes.deinit(alloc);
+
+    try std.testing.expectEqual(@as(usize, 1), replacement_indexes.items.len);
+    try std.testing.expectEqual(@as(usize, 1), new_indexes.items.len);
+    try std.testing.expectEqual(@as(usize, 1), replacement_indexes.items[0]);
+    try std.testing.expectEqual(@as(usize, 0), new_indexes.items[0]);
+}

From 037bc89558fe521df4fe6a7e8a2564797bddc76f Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 12:36:00 +0000
Subject: [PATCH 10/13] Branch local applies into replacement flow

---
 src/manifest/local_apply_backend.zig | 136 +++++++++++++++++++++++++++
 src/manifest/orchestrator.zig        |  12 +++
 2 files changed, 148 insertions(+)

diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index bdf1fcb3..4258c3e0 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -90,6 +90,7 @@
         var apply_backend = LocalApplyBackend{
             .orch = &self.orch,
             .release = self.release,
+            .scope = self.scope,
         };
         const apply_result = try apply_release.execute(&release_tracker, &apply_backend);
         return apply_result.toReport(self.release.app.app_name, self.release.resolvedServiceCount(), context);
@@ -212,6 +213,20 @@
     return indexes;
 }
 
+fn syncExistingServiceStates(orch: *orchestrator.Orchestrator, release: *const release_plan.ReleasePlan) void {
+    for (orch.manifest.services, 0..) |svc, idx| {
+        if (!release.includesService(svc.name)) continue;
+        const record = store.findAppContainer(orch.alloc, release.app.app_name, svc.name) catch continue;
+        if (record) |container| {
+            defer container.deinit(orch.alloc);
+            if (std.mem.eql(u8, container.status, "stopped")) continue;
+            if (container.id.len != orch.states[idx].container_id.len) continue;
+            @memcpy(&orch.states[idx].container_id, container.id);
+            orch.states[idx].status = .running;
+        }
+    }
+}
+
 pub const DevWatcherRuntime = struct {
     watcher: ?watcher_mod.Watcher = null,
     thread: ?std.Thread = null,
@@ -245,8 +260,13 @@
 const LocalApplyBackend = struct {
     orch: *orchestrator.Orchestrator,
     release: *const release_plan.ReleasePlan,
+    scope: LocalApplyScope,
 
     pub fn apply(self: *const LocalApplyBackend) !apply_release.ApplyOutcome {
+        if (self.scope.mode == .replacement_candidate) {
+            return self.applyReplacementCandidate();
+        }
+
         try self.orch.startAll();
         return .{
             .status = .completed,
@@ -255,6 +275,76 @@
         };
     }
 
+    fn applyReplacementCandidate(self: *const LocalApplyBackend) !apply_release.ApplyOutcome {
+        syncExistingServiceStates(self.orch, self.release);
+
+        var completed_workers: std.StringHashMapUnmanaged(void) = .empty;
+        defer completed_workers.deinit(self.orch.alloc);
+
+        var new_indexes = try classifyServiceIndexes(
+            self.orch.alloc,
+            self.orch.manifest,
+            self.release,
+            self.scope,
+            false,
+        );
+        defer new_indexes.deinit(self.orch.alloc);
+
+        var replacement_indexes = try classifyServiceIndexes(
+            self.orch.alloc,
+            self.orch.manifest,
+            self.release,
+            self.scope,
+            true,
+        );
+        defer replacement_indexes.deinit(self.orch.alloc);
+
+        var placed: usize = 0;
+        var failed: usize = 0;
+        var mutated = false;
+
+        for (new_indexes.items) |idx| {
+            self.orch.startServiceByIndex(idx, &completed_workers) catch {
+                failed += 1;
+                if (!mutated) return error.StartFailed;
+                continue;
+            };
+            placed += 1;
+            mutated = true;
+        }
+
+        for (replacement_indexes.items) |idx| {
+            self.orch.stopServiceByIndex(idx);
+            mutated = true;
+            self.orch.startServiceByIndex(idx, &completed_workers) catch {
+                failed += 1;
+                continue;
+            };
+            placed += 1;
+        }
+
+        self.orch.startTlsProxy();
+
+        if (failed > 0) {
+            return .{
+                .status = .partially_failed,
+                .message = "one or more local service replacements failed",
+                .placed = placed,
+                .failed = failed,
+            };
+        }
+
+        return .{
+            .status = .completed,
+            .message = if (replacement_indexes.items.len > 0)
+                "all requested services replaced"
+            else
+                "all requested services started",
+            .placed = placed,
+            .failed = 0,
+        };
+    }
+
     pub fn failureMessage(_: *const LocalApplyBackend, _: anytype) ?[]const u8 {
         return "service startup failed";
     }
@@ -383,3 +473,49 @@
     try std.testing.expectEqual(@as(usize, 1), replacement_indexes.items[0]);
     try std.testing.expectEqual(@as(usize, 0), new_indexes.items[0]);
 }
+
+test "syncExistingServiceStates marks selected running services" {
+    const alloc = std.testing.allocator;
+    try store.initTestDb();
+    defer store.deinitTestDb();
+
+    const loader = @import("loader.zig");
+    const app_spec = @import("app_spec.zig");
+
+    var manifest = try loader.loadFromString(alloc,
+        \\[service.db]
+        \\image = "postgres:latest"
+        \\
+        \\[service.web]
+        \\image = "nginx:latest"
+        \\depends_on = ["db"]
+    );
+    defer manifest.deinit();
+
+    var app = try app_spec.fromManifest(alloc, "demo-app", &manifest);
+    defer app.deinit();
+
+    var release = try release_plan.ReleasePlan.fromAppSpec(alloc, &app, &.{"web"});
+    defer release.deinit();
+
+    try store.save(.{
+        .id = "abcdef123456",
+        .rootfs = "/tmp/rootfs",
+        .command = "/bin/sh",
+        .hostname = "web",
+        .status = "running",
+        .pid = null,
+        .exit_code = null,
+        .app_name = "demo-app",
+        .created_at = 100,
+    });
+
+    var prepared = try PreparedLocalApply.init(alloc, &manifest, &release, false);
+    defer prepared.deinit();
+
+    syncExistingServiceStates(&prepared.orch, &release);
+
+    try std.testing.expectEqual(orchestrator.ServiceState.Status.pending, prepared.orch.states[0].status);
+    try std.testing.expectEqual(orchestrator.ServiceState.Status.running, prepared.orch.states[1].status);
+    try std.testing.expectEqualStrings("abcdef123456", prepared.orch.states[1].container_id[0..]);
+}
diff --git a/src/manifest/orchestrator.zig b/src/manifest/orchestrator.zig
index 0cfb3b36..425916ba 100644
--- a/src/manifest/orchestrator.zig
+++ b/src/manifest/orchestrator.zig
@@ -154,6 +154,14 @@
         return lifecycle_support.startAll(self, OrchestratorError, serviceThread);
     }
 
+    pub fn startServiceByIndex(
+        self: *Orchestrator,
+        idx: usize,
+        completed_workers: *std.StringHashMapUnmanaged(void),
+    ) OrchestratorError!void {
+        return lifecycle_support.startServiceByIndex(self, OrchestratorError, idx, completed_workers, serviceThread);
+    }
+
     /// register services for health checking and start the checker thread.
     pub fn registerHealthChecks(self: *Orchestrator) void {
         startup_runtime.registerHealthChecks(
@@ -185,6 +193,10 @@
         lifecycle_support.stopAll(self);
     }
 
+    pub fn stopServiceByIndex(self: *Orchestrator, idx: usize) void {
+        lifecycle_support.stopServiceByIndex(self, idx);
+    }
+
     /// block until shutdown is requested (SIGINT/SIGTERM) or all services exit.
     pub fn waitForShutdown(self: *Orchestrator) void {
         lifecycle_support.waitForShutdown(self, &shutdown_requested);

From 9cbe5344ffdb9eae2a93070141b98fb0b0b8d072 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 12:55:31 +0000
Subject: [PATCH 11/13] Cover local replacement apply outcomes

---
 src/manifest/local_apply_backend.zig | 200 +++++++++++++++++++++------
 1 file changed, 157 insertions(+), 43 deletions(-)

diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index 4258c3e0..cf37bca7 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -227,6 +227,61 @@ fn syncExistingServiceStates(orch: *orchestrator.Orchestrator, release: *const r
     }
 }
 
+fn runReplacementPlan(
+    runner: anytype,
+    alloc: std.mem.Allocator,
+    new_indexes: []const usize,
+    replacement_indexes: []const usize,
+) !apply_release.ApplyOutcome {
+    var completed_workers: std.StringHashMapUnmanaged(void) = .empty;
+    defer completed_workers.deinit(alloc);
+
+    var placed: usize = 0;
+    var failed: usize = 0;
+    var mutated = false;
+
+    for (new_indexes) |idx| {
+        runner.start(idx, &completed_workers) catch {
+            failed += 1;
+            if (!mutated) return error.StartFailed;
+            continue;
+        };
+        placed += 1;
+        mutated = true;
+    }
+
+    for (replacement_indexes) |idx| {
+        runner.stop(idx);
+        mutated = true;
+        runner.start(idx, &completed_workers) catch {
+            failed += 1;
+            continue;
+        };
+        placed += 1;
+    }
+
+    runner.finish();
+
+    if (failed > 0) {
+        return .{
+            .status = .partially_failed,
+            .message = "one or more local service replacements failed",
+            .placed = placed,
+            .failed = failed,
+        };
+    }
+
+    return .{
+        .status = .completed,
+        .message = if (replacement_indexes.len > 0)
+            "all requested services replaced"
+        else
+            "all requested services started",
+        .placed = placed,
+        .failed = 0,
+    };
+}
+
 pub const DevWatcherRuntime = struct {
     watcher: ?watcher_mod.Watcher = null,
     thread: ?std.Thread = null,
@@ -278,9 +333,6 @@ const LocalApplyBackend = struct {
     fn applyReplacementCandidate(self: *const LocalApplyBackend) !apply_release.ApplyOutcome {
         syncExistingServiceStates(self.orch, self.release);
 
-        var completed_workers: std.StringHashMapUnmanaged(void) = .empty;
-        defer completed_workers.deinit(self.orch.alloc);
-
         var new_indexes = try classifyServiceIndexes(
             self.orch.alloc,
             self.orch.manifest,
@@ -299,50 +351,28 @@
             self.scope,
             true,
         );
         defer replacement_indexes.deinit(self.orch.alloc);
 
-        var placed: usize = 0;
-        var failed: usize = 0;
-        var mutated = false;
+        var runner = struct {
+            orch: *orchestrator.Orchestrator,
 
-        for (new_indexes.items) |idx| {
-            self.orch.startServiceByIndex(idx, &completed_workers) catch {
-                failed += 1;
-                if (!mutated) return error.StartFailed;
-                continue;
-            };
-            placed += 1;
-            mutated = true;
-        }
+            fn start(runner_self: *@This(), idx: usize, completed_workers: *std.StringHashMapUnmanaged(void)) !void {
+                try runner_self.orch.startServiceByIndex(idx, completed_workers);
+            }
 
-        for (replacement_indexes.items) |idx| {
-            self.orch.stopServiceByIndex(idx);
-            mutated = true;
-            self.orch.startServiceByIndex(idx, &completed_workers) catch {
-                failed += 1;
-                continue;
-            };
-            placed += 1;
-        }
+            fn stop(runner_self: *@This(), idx: usize) void {
+                runner_self.orch.stopServiceByIndex(idx);
+            }
 
-        self.orch.startTlsProxy();
+            fn finish(runner_self: *@This()) void {
+                runner_self.orch.startTlsProxy();
+            }
+        }{ .orch = self.orch };
 
-        if (failed > 0) {
-            return .{
-                .status = .partially_failed,
-                .message = "one or more local service replacements failed",
-                .placed = placed,
-                .failed = failed,
-            };
-        }
-
-        return .{
-            .status = .completed,
-            .message = if (replacement_indexes.items.len > 0)
-                "all requested services replaced"
-            else
-                "all requested services started",
-            .placed = placed,
-            .failed = 0,
-        };
+        return runReplacementPlan(
+            &runner,
+            self.orch.alloc,
+            new_indexes.items,
+            replacement_indexes.items,
+        );
     }
 
     pub fn failureMessage(_: *const LocalApplyBackend, _: anytype) ?[]const u8 {
@@ -519,3 +549,87 @@
     try std.testing.expectEqual(orchestrator.ServiceState.Status.running, prepared.orch.states[1].status);
     try std.testing.expectEqualStrings("abcdef123456", prepared.orch.states[1].container_id[0..]);
 }
+
+test "runReplacementPlan counts started and replaced services" {
+    const alloc = std.testing.allocator;
+
+    const Runner = struct {
+        started: std.ArrayList(usize),
+        stopped: std.ArrayList(usize),
+        tls_started: bool = false,
+
+        fn start(self: *@This(), idx: usize, _: *std.StringHashMapUnmanaged(void)) !void {
+            try self.started.append(alloc, idx);
+        }
+
+        fn stop(self: *@This(), idx: usize) void {
+            self.stopped.append(alloc, idx) catch unreachable;
+        }
+
+        fn finish(self: *@This()) void {
+            self.tls_started = true;
+        }
+    };
+
+    var runner = Runner{
+        .started = .empty,
+        .stopped = .empty,
+    };
+    defer runner.started.deinit(alloc);
+    defer runner.stopped.deinit(alloc);
+
+    const outcome = try runReplacementPlan(&runner, alloc, &.{0}, &.{1});
+
+    try std.testing.expectEqual(@as(usize, 2), outcome.placed);
+    try std.testing.expectEqual(@as(usize, 0), outcome.failed);
+    try std.testing.expectEqual(@import("update/common.zig").DeploymentStatus.completed, outcome.status);
+    try std.testing.expectEqualStrings("all requested services replaced", outcome.message.?);
+    try std.testing.expect(runner.tls_started);
+    try std.testing.expectEqual(@as(usize, 2), runner.started.items.len);
+    try std.testing.expectEqual(@as(usize, 1), runner.stopped.items.len);
+    try std.testing.expectEqual(@as(usize, 0), runner.started.items[0]);
+    try std.testing.expectEqual(@as(usize, 1), runner.started.items[1]);
+    try std.testing.expectEqual(@as(usize, 1), runner.stopped.items[0]);
+}
+
+test "runReplacementPlan reports partial failure after mutation" {
+    const alloc = std.testing.allocator;
+
+    const Runner = struct {
+        fail_index: usize,
+        started: std.ArrayList(usize),
+        stopped: std.ArrayList(usize),
+        tls_started: bool = false,
+
+        fn start(self: *@This(), idx: usize, _: *std.StringHashMapUnmanaged(void)) !void {
+            if (idx == self.fail_index) return error.StartFailed;
+            try self.started.append(alloc, idx);
+        }
+
+        fn stop(self: *@This(), idx: usize) void {
+            self.stopped.append(alloc, idx) catch unreachable;
+        }
+
+        fn finish(self: *@This()) void {
+            self.tls_started = true;
+        }
+    };
+
+    var runner = Runner{
+        .fail_index = 1,
+        .started = .empty,
+        .stopped = .empty,
+    };
+    defer runner.started.deinit(alloc);
+    defer runner.stopped.deinit(alloc);
+
+    const outcome = try runReplacementPlan(&runner, alloc, &.{0}, &.{1});
+
+    try std.testing.expectEqual(@as(usize, 1), outcome.placed);
+    try std.testing.expectEqual(@as(usize, 1), outcome.failed);
+    try std.testing.expectEqual(@import("update/common.zig").DeploymentStatus.partially_failed, outcome.status);
+    try std.testing.expectEqualStrings("one or more local service replacements failed", outcome.message.?);
+    try std.testing.expect(runner.tls_started);
+    try std.testing.expectEqual(@as(usize, 1), runner.started.items.len);
+    try std.testing.expectEqual(@as(usize, 1), runner.stopped.items.len);
+}

From 1fbafb74a6801ca7c1ef389c71539fabcabdc258 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 13:23:23 +0000
Subject: [PATCH 12/13] Pin local replacement branch selection

---
 src/manifest/local_apply_backend.zig | 58 ++++++++++++++++++++++------
 1 file changed, 50 insertions(+), 8 deletions(-)

diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index cf37bca7..e0f7a777 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -282,6 +282,13 @@
     };
 }
 
+fn runScopedApply(scope: LocalApplyScope, runner: anytype) !apply_release.ApplyOutcome {
+    return switch (scope.mode) {
+        .fresh => runner.runFresh(),
+        .replacement_candidate => runner.runReplacement(),
+    };
+}
+
 pub const DevWatcherRuntime = struct {
     watcher: ?watcher_mod.Watcher = null,
     thread: ?std.Thread = null,
@@ -318,16 +325,23 @@ const LocalApplyBackend = struct {
     orch: *orchestrator.Orchestrator,
     release: *const release_plan.ReleasePlan,
     scope: LocalApplyScope,
 
     pub fn apply(self: *const LocalApplyBackend) !apply_release.ApplyOutcome {
-        if (self.scope.mode == .replacement_candidate) {
-            return self.applyReplacementCandidate();
-        }
-
+        var runner = struct {
+            backend: *const LocalApplyBackend,
+
+            fn runFresh(runner_self: *@This()) !apply_release.ApplyOutcome {
+                try runner_self.backend.orch.startAll();
+                return .{
+                    .status = .completed,
+                    .message = "all requested services started",
+                    .placed = runner_self.backend.release.resolvedServiceCount(),
+                };
+            }
-        try self.orch.startAll();
-        return .{
-            .status = .completed,
-            .message = "all requested services started",
-            .placed = self.release.resolvedServiceCount(),
+
+            fn runReplacement(runner_self: *@This()) !apply_release.ApplyOutcome {
+                return runner_self.backend.applyReplacementCandidate();
+            }
         };
+        return runScopedApply(self.scope, &runner);
     }
@@ -633,3 +647,31 @@
     try std.testing.expectEqual(@as(usize, 1), runner.started.items.len);
     try std.testing.expectEqual(@as(usize, 1), runner.stopped.items.len);
 }
+
+test "runScopedApply chooses replacement branch for replacement candidates" {
+    const Runner = struct {
+        fresh_calls: usize = 0,
+        replacement_calls: usize = 0,
+
+        fn runFresh(self: *@This()) !apply_release.ApplyOutcome {
+            self.fresh_calls += 1;
+            return .{ .status = .completed, .placed = 1 };
+        }
+
+        fn runReplacement(self: *@This()) !apply_release.ApplyOutcome {
+            self.replacement_calls += 1;
+            return .{ .status = .completed, .placed = 2 };
+        }
+    };
+
+    var runner = Runner{};
+    const outcome = try runScopedApply(.{
+        .mode = .replacement_candidate,
+        .existing_target_count = 1,
+        .new_target_count = 0,
+    }, &runner);
+
+    try std.testing.expectEqual(@as(usize, 0), runner.fresh_calls);
+    try std.testing.expectEqual(@as(usize, 1), runner.replacement_calls);
+    try std.testing.expectEqual(@as(usize, 2), outcome.placed);
+}

From b63615767a2e92feed0f398c418caa4b56fcbe32 Mon Sep 17 00:00:00 2001
From: Kacy Fortner
Date: Wed, 8 Apr 2026 13:54:01 +0000
Subject: [PATCH 13/13] Simplify local apply replacement logic

---
 src/manifest/local_apply_backend.zig | 79 +++++++++++++---------------
 1 file changed, 37 insertions(+), 42 deletions(-)

diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig
index e0f7a777..c93f3415 100644
--- a/src/manifest/local_apply_backend.zig
+++ b/src/manifest/local_apply_backend.zig
@@ -26,6 +26,11 @@ pub const LocalApplyScope = struct {
     mode: LocalApplyMode,
     existing_target_count: usize,
     new_target_count: usize,
 };
 
+const ExistingServiceState = enum {
+    active,
+    inactive,
+};
+
 pub const PreparedLocalApply = struct {
@@ -154,20 +159,9 @@ fn detectApplyScope(alloc: std.mem.Allocator, release: *const release_plan.Relea
     var existing_target_count: usize = 0;
     var new_target_count: usize = 0;
 
     for (release.app.services) |svc| {
-        const record = store.findAppContainer(alloc, release.app.app_name, svc.name) catch {
-            new_target_count += 1;
-            continue;
-        };
-
-        if (record) |container| {
-            defer container.deinit(alloc);
-            if (!std.mem.eql(u8, container.status, "stopped")) {
-                existing_target_count += 1;
-            } else {
-                new_target_count += 1;
-            }
-        } else {
-            new_target_count += 1;
+        switch (existingServiceState(alloc, release.app.app_name, svc.name)) {
+            .active => existing_target_count += 1,
+            .inactive => new_target_count += 1,
         }
     }
@@ -194,18 +188,8 @@
             continue;
         }
 
-        const record = store.findAppContainer(alloc, release.app.app_name, svc.name) catch {
-            if (!want_existing) try indexes.append(alloc, idx);
-            continue;
-        };
-
-        if (record) |container| {
-            defer container.deinit(alloc);
-            const is_existing = !std.mem.eql(u8, container.status, "stopped");
-            if (is_existing == want_existing) {
-                try indexes.append(alloc, idx);
-            }
-        } else if (!want_existing) {
+        const is_existing = existingServiceState(alloc, release.app.app_name, svc.name) == .active;
+        if (is_existing == want_existing) {
             try indexes.append(alloc, idx);
         }
     }
@@ -213,6 +197,15 @@
     return indexes;
 }
 
+fn existingServiceState(alloc: std.mem.Allocator, app_name: []const u8, service_name: []const u8) ExistingServiceState {
+    const record = store.findAppContainer(alloc, app_name, service_name) catch return .inactive;
+    if (record) |container| {
+        defer container.deinit(alloc);
+        return if (std.mem.eql(u8, container.status, "stopped")) .inactive else .active;
+    }
+    return .inactive;
+}
+
 fn syncExistingServiceStates(orch: *orchestrator.Orchestrator, release: *const release_plan.ReleasePlan) void {
     for (orch.manifest.services, 0..) |svc, idx| {
         if (!release.includesService(svc.name)) continue;
@@ -325,23 +318,8 @@ const LocalApplyBackend = struct {
     orch: *orchestrator.Orchestrator,
     release: *const release_plan.ReleasePlan,
     scope: LocalApplyScope,
 
     pub fn apply(self: *const LocalApplyBackend) !apply_release.ApplyOutcome {
-        var runner = struct {
-            backend: *const LocalApplyBackend,
-
-            fn runFresh(runner_self: *@This()) !apply_release.ApplyOutcome {
-                try runner_self.backend.orch.startAll();
-                return .{
-                    .status = .completed,
-                    .message = "all requested services started",
-                    .placed = runner_self.backend.release.resolvedServiceCount(),
-                };
-            }
-
-            fn runReplacement(runner_self: *@This()) !apply_release.ApplyOutcome {
-                return runner_self.backend.applyReplacementCandidate();
-            }
-        };
+        var runner = ScopedApplyRunner{ .backend = self };
         return runScopedApply(self.scope, &runner);
     }
@@ -376,6 +354,23 @@
     }
 };
 
+const ScopedApplyRunner = struct {
+    backend: *const LocalApplyBackend,
+
+    fn runFresh(self: *@This()) !apply_release.ApplyOutcome {
+        try self.backend.orch.startAll();
+        return .{
+            .status = .completed,
+            .message = "all requested services started",
+            .placed = self.backend.release.resolvedServiceCount(),
+        };
+    }
+
+    fn runReplacement(self: *@This()) !apply_release.ApplyOutcome {
+        return self.backend.applyReplacementCandidate();
+    }
+};
+
 test "PreparedLocalApply init resolves filtered start set" {
     const alloc = std.testing.allocator;
     const loader = @import("loader.zig");