diff --git a/src/api/routes/cluster_agents/app_routes.zig b/src/api/routes/cluster_agents/app_routes.zig index 59e19ad..6f0892f 100644 --- a/src/api/routes/cluster_agents/app_routes.zig +++ b/src/api/routes/cluster_agents/app_routes.zig @@ -1,6 +1,10 @@ const std = @import("std"); const http = @import("../../http.zig"); +const sqlite = @import("sqlite"); +const cluster_node = @import("../../../cluster/node.zig"); const json_helpers = @import("../../../lib/json_helpers.zig"); +const apply_release = @import("../../../manifest/apply_release.zig"); +const schema = @import("../../../state/schema.zig"); const store = @import("../../../state/store.zig"); const common = @import("../common.zig"); const deploy_routes = @import("deploy_routes.zig"); @@ -56,7 +60,7 @@ pub fn handleAppStatus(alloc: std.mem.Allocator, app_name: []const u8, ctx: Rout }; defer latest.deinit(alloc); - const body = formatAppStatusResponse(alloc, latest, countServices(latest.config_snapshot)) catch + const body = formatAppStatusResponse(alloc, apply_release.reportFromDeployment(latest)) catch return common.internalError(); return .{ .status = .ok, .body = body, .allocated = true }; } @@ -91,15 +95,7 @@ pub fn handleAppRollback( .body = release.config_snapshot, .content_length = release.config_snapshot.len, }; - return deploy_routes.handleAppApply(alloc, apply_request, ctx); -} - -fn countServices(snapshot: []const u8) usize { - const services = json_helpers.extractJsonArray(snapshot, "services") orelse return 0; - var iter = json_helpers.extractJsonObjects(services); - var count: usize = 0; - while (iter.next() != null) count += 1; - return count; + return deploy_routes.handleAppRollbackApply(alloc, apply_request, ctx, release_id); } fn formatAppHistoryResponse(alloc: std.mem.Allocator, deployments: []const store.DeploymentRecord) ![]u8 { @@ -109,30 +105,25 @@ fn formatAppHistoryResponse(alloc: std.mem.Allocator, deployments: []const store try writer.writeByte('['); for (deployments, 0..) |dep, i| { + const report = apply_release.reportFromDeployment(dep); if (i > 0) try writer.writeByte(','); - try writer.writeAll("{\"id\":\""); - try json_helpers.writeJsonEscaped(writer, dep.id); - if (dep.app_name) |app_name| { - try writer.writeAll("\",\"app\":\""); - try json_helpers.writeJsonEscaped(writer, app_name); - try writer.writeByte('"'); - } else { - try writer.writeAll("\",\"app\":null"); - } - try writer.writeAll(",\"service\":\""); - try json_helpers.writeJsonEscaped(writer, dep.service_name); - try writer.writeAll("\",\"status\":\""); - try json_helpers.writeJsonEscaped(writer, dep.status); - try writer.writeAll("\",\"manifest_hash\":\""); - try json_helpers.writeJsonEscaped(writer, dep.manifest_hash); - try writer.print("\",\"created_at\":{d}", .{dep.created_at}); - if (dep.message) |message| { - try writer.writeAll(",\"message\":\""); - try json_helpers.writeJsonEscaped(writer, message); - try writer.writeByte('"'); - } else { - try writer.writeAll(",\"message\":null"); - } + try writer.writeByte('{'); + try json_helpers.writeJsonStringField(writer, "id", report.release_id orelse ""); + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "app", dep.app_name); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "service", dep.service_name); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "trigger", report.trigger.toString()); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "status", report.status.toString()); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "manifest_hash", report.manifest_hash); + try writer.print(",\"created_at\":{d}", .{report.created_at}); + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "source_release_id", report.source_release_id); + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "message", report.message); try writer.writeByte('}'); } try writer.writeByte(']'); @@ -141,36 +132,128 @@ fn formatAppHistoryResponse(alloc: std.mem.Allocator, deployments: []const store fn formatAppStatusResponse( alloc: std.mem.Allocator, - latest: store.DeploymentRecord, - service_count: usize, + report: apply_release.ApplyReport, ) ![]u8 { var json_buf: std.ArrayList(u8) = .empty; errdefer json_buf.deinit(alloc); const writer = json_buf.writer(alloc); - try writer.writeAll("{\"app_name\":\""); - try json_helpers.writeJsonEscaped(writer, latest.app_name orelse latest.service_name); - try writer.writeAll("\",\"release_id\":\""); - try json_helpers.writeJsonEscaped(writer, latest.id); - try writer.writeAll("\",\"status\":\""); - try json_helpers.writeJsonEscaped(writer, latest.status); - try writer.writeAll("\",\"manifest_hash\":\""); - try json_helpers.writeJsonEscaped(writer, latest.manifest_hash); - try writer.print("\",\"created_at\":{d},\"service_count\":{d}", .{ - latest.created_at, - service_count, + try writer.writeByte('{'); + try json_helpers.writeJsonStringField(writer, "app_name", report.app_name); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "trigger", report.trigger.toString()); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "release_id", report.release_id orelse ""); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "status", report.status.toString()); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "manifest_hash", report.manifest_hash); + try writer.print(",\"created_at\":{d},\"service_count\":{d}", .{ + report.created_at, + report.service_count, }); - if (latest.message) |message| { - try writer.writeAll(",\"message\":\""); - try json_helpers.writeJsonEscaped(writer, message); - try writer.writeByte('"'); - } else { - try writer.writeAll(",\"message\":null"); - } + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "source_release_id", report.source_release_id); + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "message", report.message); try writer.writeByte('}'); return json_buf.toOwnedSlice(alloc); } +const RouteFlowHarness = struct { + alloc: std.mem.Allocator, + tmp: std.testing.TmpDir, + node: cluster_node.Node, + + fn init(alloc: std.mem.Allocator) !RouteFlowHarness { + var tmp = std.testing.tmpDir(.{}); + errdefer tmp.cleanup(); + + var path_buf: [512]u8 = undefined; + const tmp_path = tmp.dir.realpath(".", &path_buf) catch return error.SkipZigTest; + + var node = cluster_node.Node.init(alloc, .{ + .id = 1, + .port = 0, + .peers = &.{}, + .data_dir = tmp_path, + }) catch return error.SkipZigTest; + errdefer node.deinit(); + + node.raft.role = .leader; + node.leader_id = node.config.id; + + var harness = RouteFlowHarness{ + .alloc = alloc, + .tmp = tmp, + .node = node, + }; + try harness.seedActiveAgent(); + return harness; + } + + fn deinit(self: *RouteFlowHarness) void { + self.node.deinit(); + self.tmp.cleanup(); + } + + fn ctx(self: *RouteFlowHarness) RouteContext { + return .{ .cluster = &self.node, .join_token = null }; + } + + fn seedActiveAgent(self: *RouteFlowHarness) !void { + self.node.stateMachineDb().exec( + "INSERT INTO agents (id, address, status, cpu_cores, memory_mb, cpu_used, memory_used_mb, containers, last_heartbeat, registered_at, role, labels) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + .{}, + .{ "abc123def456", "10.0.0.2:7701", "active", @as(i64, 4), @as(i64, 8192), @as(i64, 0), @as(i64, 0), @as(i64, 0), @as(i64, 100), @as(i64, 100), "agent", "" }, + ) catch return error.SkipZigTest; + } + + fn appApply(self: *RouteFlowHarness, body: []const u8) Response { + return deploy_routes.handleAppApply(self.alloc, makeRequest(.POST, "/apps/apply", body), self.ctx()); + } + + fn rollback(self: *RouteFlowHarness, app_name: []const u8, release_id: []const u8) !Response { + const body = try std.fmt.allocPrint(self.alloc, "{{\"release_id\":\"{s}\"}}", .{release_id}); + defer self.alloc.free(body); + const path = try std.fmt.allocPrint(self.alloc, "/apps/{s}/rollback", .{app_name}); + defer self.alloc.free(path); + return handleAppRollback(self.alloc, app_name, makeRequest(.POST, path, body), self.ctx()); + } + + fn status(self: *RouteFlowHarness, app_name: []const u8) Response { + return handleAppStatus(self.alloc, app_name, self.ctx()); + } + + fn history(self: *RouteFlowHarness, app_name: []const u8) Response { + return handleAppHistory(self.alloc, app_name, self.ctx()); + } +}; + +fn makeRequest(method: http.Method, path: []const u8, body: []const u8) http.Request { + return .{ + .method = method, + .path = path, + .path_only = path, + .query = "", + .headers_raw = "", + .body = body, + .content_length = body.len, + }; +} + +fn freeResponse(alloc: std.mem.Allocator, response: Response) void { + if (response.allocated) alloc.free(response.body); +} + +fn expectJsonContains(json: []const u8, needle: []const u8) !void { + try std.testing.expect(std.mem.indexOf(u8, json, needle) != null); +} + +fn expectResponseOk(response: Response) !void { + try std.testing.expectEqual(http.StatusCode.ok, response.status); +} + test "formatAppHistoryResponse emits release records" { const alloc = std.testing.allocator; const deployments = [_]store.DeploymentRecord{ @@ -201,6 +284,8 @@ test "formatAppHistoryResponse emits release records" { try std.testing.expect(std.mem.indexOf(u8, json, "\"id\":\"dep-2\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"app\":\"demo-app\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"trigger\":\"apply\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"source_release_id\":null") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"message\":\"placement failed\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"message\":null") != null); } @@ -218,12 +303,231 @@ test "formatAppStatusResponse summarizes latest release" { .created_at = 200, }; - const json = try formatAppStatusResponse(alloc, latest, countServices(latest.config_snapshot)); + const json = try formatAppStatusResponse(alloc, apply_release.reportFromDeployment(latest)); defer alloc.free(json); try std.testing.expect(std.mem.indexOf(u8, json, "\"app_name\":\"demo-app\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"trigger\":\"apply\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"release_id\":\"dep-2\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"service_count\":2") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"source_release_id\":null") != null); +} + +test "formatAppStatusResponse includes rollback metadata inferred from stored release message" { + const alloc = std.testing.allocator; + const latest = store.DeploymentRecord{ + .id = "dep-3", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:333", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"}]}", + .status = "completed", + .message = "rollback to dep-1 completed: all placements succeeded", + .created_at = 300, + }; + + const json = try formatAppStatusResponse(alloc, apply_release.reportFromDeployment(latest)); + defer alloc.free(json); + + try std.testing.expect(std.mem.indexOf(u8, json, "\"trigger\":\"rollback\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"source_release_id\":\"dep-1\"") != null); +} + +test "app status and history surface rollback release metadata from persisted rows" { + const alloc = std.testing.allocator; + + var db = try sqlite.Db.init(.{ .mode = .Memory, .open_flags = .{ .write = true } }); + defer db.deinit(); + try schema.init(&db); + + try store.saveDeploymentInDb(&db, .{ + .id = "dep-1", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:111", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"}]}", + .status = "completed", + .message = "apply completed", + .created_at = 100, + }); + try store.saveDeploymentInDb(&db, .{ + .id = "dep-2", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:222", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"}]}", + .status = "completed", + .message = "rollback to dep-1 completed: all placements succeeded", + .created_at = 200, + }); + + var deployments = try store.listDeploymentsByAppInDb(&db, alloc, "demo-app"); + defer { + for (deployments.items) |dep| dep.deinit(alloc); + deployments.deinit(alloc); + } + + const history_json = try formatAppHistoryResponse(alloc, deployments.items); + defer alloc.free(history_json); + + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"id\":\"dep-2\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"trigger\":\"rollback\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"source_release_id\":\"dep-1\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"id\":\"dep-1\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"trigger\":\"apply\"") != null); + + const latest = try store.getLatestDeploymentByAppInDb(&db, alloc, "demo-app"); + defer latest.deinit(alloc); + + const status_json = try formatAppStatusResponse(alloc, apply_release.reportFromDeployment(latest)); + defer alloc.free(status_json); + + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"release_id\":\"dep-2\"") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"trigger\":\"rollback\"") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"source_release_id\":\"dep-1\"") != null); +} + +test "app status and history surface failed apply metadata from persisted rows" { + const alloc = std.testing.allocator; + + var db = try sqlite.Db.init(.{ .mode = .Memory, .open_flags = .{ .write = true } }); + defer db.deinit(); + try schema.init(&db); + + try store.saveDeploymentInDb(&db, .{ + .id = "dep-1", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:111", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"}]}", + .status = "completed", + .message = "apply completed", + .created_at = 100, + }); + try store.saveDeploymentInDb(&db, .{ + .id = "dep-2", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:222", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"},{\"name\":\"db\"}]}", + .status = "failed", + .message = "scheduler error during apply", + .created_at = 200, + }); + + var deployments = try store.listDeploymentsByAppInDb(&db, alloc, "demo-app"); + defer { + for (deployments.items) |dep| dep.deinit(alloc); + deployments.deinit(alloc); + } + + const history_json = try formatAppHistoryResponse(alloc, deployments.items); + defer alloc.free(history_json); + + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"id\":\"dep-2\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"trigger\":\"apply\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"status\":\"failed\"") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"source_release_id\":null") != null); + try std.testing.expect(std.mem.indexOf(u8, history_json, "\"message\":\"scheduler error during apply\"") != null); + + const latest = try store.getLatestDeploymentByAppInDb(&db, alloc, "demo-app"); + defer latest.deinit(alloc); + + const status_json = try formatAppStatusResponse(alloc, apply_release.reportFromDeployment(latest)); + defer alloc.free(status_json); + + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"release_id\":\"dep-2\"") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"trigger\":\"apply\"") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"status\":\"failed\"") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"service_count\":2") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"source_release_id\":null") != null); + try std.testing.expect(std.mem.indexOf(u8, status_json, "\"message\":\"scheduler error during apply\"") != null); +} + +test "app apply then rollback routes preserve release transition metadata" { + const alloc = std.testing.allocator; + const apply_body = + \\{"app_name":"demo-app","services":[{"name":"web","image":"alpine","command":["echo","hello"]}]} + ; + + var harness = try RouteFlowHarness.init(alloc); + defer harness.deinit(); + + const apply_response = harness.appApply(apply_body); + defer freeResponse(alloc, apply_response); + + try expectResponseOk(apply_response); + try expectJsonContains(apply_response.body, "\"trigger\":\"apply\""); + + const source_release_id = json_helpers.extractJsonString(apply_response.body, "release_id").?; + + const rollback_response = try harness.rollback("demo-app", source_release_id); + defer freeResponse(alloc, rollback_response); + + try expectResponseOk(rollback_response); + try expectJsonContains(rollback_response.body, "\"trigger\":\"rollback\""); + try expectJsonContains(rollback_response.body, "\"source_release_id\":\""); + try expectJsonContains(rollback_response.body, source_release_id); + + const status_response = harness.status("demo-app"); + defer freeResponse(alloc, status_response); + + try expectResponseOk(status_response); + try expectJsonContains(status_response.body, "\"trigger\":\"rollback\""); + try expectJsonContains(status_response.body, "\"source_release_id\":\""); + try expectJsonContains(status_response.body, source_release_id); + + const history_response = harness.history("demo-app"); + defer freeResponse(alloc, history_response); + + try expectResponseOk(history_response); + try expectJsonContains(history_response.body, "\"trigger\":\"rollback\""); + try expectJsonContains(history_response.body, "\"source_release_id\":\""); + try expectJsonContains(history_response.body, source_release_id); +} + +test "app apply route preserves failed release metadata across reads" { + const alloc = std.testing.allocator; + const apply_body = + \\{"app_name":"demo-app","services":[{"name":"web","image":"alpine","command":["echo","hello"],"cpu_limit":999999,"memory_limit_mb":999999}]} + ; + + var harness = try RouteFlowHarness.init(alloc); + defer harness.deinit(); + + const apply_response = harness.appApply(apply_body); + defer freeResponse(alloc, apply_response); + + try expectResponseOk(apply_response); + try expectJsonContains(apply_response.body, "\"trigger\":\"apply\""); + try expectJsonContains(apply_response.body, "\"status\":\"failed\""); + try expectJsonContains(apply_response.body, "\"failed\":1"); + try expectJsonContains(apply_response.body, "\"source_release_id\":null"); + try expectJsonContains(apply_response.body, "\"message\":\"one or more placements failed\""); + + const release_id = json_helpers.extractJsonString(apply_response.body, "release_id").?; + + const status_response = harness.status("demo-app"); + defer freeResponse(alloc, status_response); + + try expectResponseOk(status_response); + try expectJsonContains(status_response.body, "\"release_id\":\""); + try expectJsonContains(status_response.body, release_id); + try expectJsonContains(status_response.body, "\"trigger\":\"apply\""); + try expectJsonContains(status_response.body, "\"status\":\"failed\""); + try expectJsonContains(status_response.body, "\"source_release_id\":null"); + try expectJsonContains(status_response.body, "\"message\":\"one or more placements failed\""); + + const history_response = harness.history("demo-app"); + defer freeResponse(alloc, history_response); + + try expectResponseOk(history_response); + try expectJsonContains(history_response.body, "\"id\":\""); + try expectJsonContains(history_response.body, release_id); + try expectJsonContains(history_response.body, "\"trigger\":\"apply\""); + try expectJsonContains(history_response.body, "\"status\":\"failed\""); + try expectJsonContains(history_response.body, "\"source_release_id\":null"); + try expectJsonContains(history_response.body, "\"message\":\"one or more placements failed\""); } test "route rejects app rollback without cluster" { diff --git a/src/api/routes/cluster_agents/apply_request.zig b/src/api/routes/cluster_agents/apply_request.zig new file mode 100644 index 0000000..bda0730 --- /dev/null +++ b/src/api/routes/cluster_agents/apply_request.zig @@ -0,0 +1,156 @@ +const std = @import("std"); +const scheduler = @import("../../../cluster/scheduler.zig"); +const volumes_mod = @import("../../../state/volumes.zig"); +const json_helpers = @import("../../../lib/json_helpers.zig"); +const common = @import("../common.zig"); + +const extractJsonString = json_helpers.extractJsonString; +const extractJsonInt = json_helpers.extractJsonInt; +const extractJsonArray = json_helpers.extractJsonArray; + +pub const ApplyRequest = struct { + app_name: ?[]const u8, + requests: std.ArrayListUnmanaged(scheduler.PlacementRequest) = .empty, + + pub fn deinit(self: *ApplyRequest, alloc: std.mem.Allocator) void { + for (self.requests.items) |req| alloc.free(req.command); + self.requests.deinit(alloc); + } + + pub fn setVolumeConstraints(self: *ApplyRequest, constraints: []const volumes_mod.VolumeConstraint) void { + if (constraints.len == 0) return; + for (self.requests.items) |*req| { + req.volume_constraints = constraints; + } + } +}; + +pub const ParseError = error{ + MissingAppName, + MissingServicesArray, + NoServices, + OutOfMemory, + InvalidRequest, +}; + +pub fn parse(alloc: std.mem.Allocator, body: []const u8, require_app_name: bool) ParseError!ApplyRequest { + var parsed: ApplyRequest = .{ + .app_name = extractJsonString(body, "app_name") orelse extractJsonString(body, "volume_app"), + }; + errdefer parsed.deinit(alloc); + + if (require_app_name and parsed.app_name == null) { + return ParseError.MissingAppName; + } + + const services_json = extractJsonArray(body, "services") orelse return ParseError.MissingServicesArray; + + var iter = json_helpers.extractJsonObjects(services_json); + while (iter.next()) |block| { + const image = extractJsonString(block, "image") orelse continue; + const command = extractCommandString(alloc, block) catch return ParseError.OutOfMemory; + errdefer alloc.free(command); + + if (!common.validateClusterInput(image)) { + alloc.free(command); + continue; + } + if (command.len > 0 and !common.validateClusterInput(command)) { + alloc.free(command); + continue; + } + + parsed.requests.append(alloc, .{ + .image = image, + .command = command, + .cpu_limit = extractJsonInt(block, "cpu_limit") orelse 1000, + .memory_limit_mb = extractJsonInt(block, "memory_limit_mb") orelse 256, + .gpu_limit = extractJsonInt(block, "gpu_limit") orelse 0, + .gpu_model = extractJsonString(block, "gpu_model"), + .gpu_vram_min_mb = if (extractJsonInt(block, "gpu_vram_min_mb")) |v| @as(u64, @intCast(@max(0, v))) else null, + .required_labels = extractJsonString(block, "required_labels") orelse "", + .gang_world_size = if (extractJsonInt(block, "gang_world_size")) |v| @intCast(@max(0, v)) else 0, + .gpus_per_rank = if (extractJsonInt(block, "gpus_per_rank")) |v| @intCast(@max(1, v)) else 1, + }) catch { + alloc.free(command); + return ParseError.OutOfMemory; + }; + } + + if (parsed.requests.items.len == 0) return ParseError.NoServices; + return parsed; +} + +fn extractJsonStringArray(alloc: std.mem.Allocator, json: []const u8, key: []const u8) !?[]u8 { + const array_json = extractJsonArray(json, key) orelse return null; + if (array_json.len < 2) return null; + + var out: std.ArrayList(u8) = .empty; + errdefer out.deinit(alloc); + + var pos: usize = 1; + var first = true; + while (pos < array_json.len - 1) { + while (pos < array_json.len - 1 and (array_json[pos] == ' ' or array_json[pos] == '\n' or array_json[pos] == '\r' or array_json[pos] == '\t' or array_json[pos] == ',')) : (pos += 1) {} + if (pos >= array_json.len - 1) break; + if (array_json[pos] != '"') return ParseError.InvalidRequest; + pos += 1; + const start = pos; + + while (pos < array_json.len - 1) : (pos += 1) { + if (array_json[pos] == '\\') { + pos += 1; + if (pos >= array_json.len - 1) return ParseError.InvalidRequest; + continue; + } + if (array_json[pos] == '"') break; + } + if (pos >= array_json.len - 1) return ParseError.InvalidRequest; + + if (!first) try out.append(alloc, ' '); + first = false; + try out.appendSlice(alloc, array_json[start..pos]); + pos += 1; + } + + return try out.toOwnedSlice(alloc); +} + +fn extractCommandString(alloc: std.mem.Allocator, block: []const u8) ![]const u8 { + if (extractJsonString(block, "command")) |command| { + return alloc.dupe(u8, command); + } + if (try extractJsonStringArray(alloc, block, "command")) |joined| { + defer alloc.free(joined); + return alloc.dupe(u8, joined); + } + return alloc.dupe(u8, ""); +} + +test "parse finds services array regardless of field order" { + const alloc = std.testing.allocator; + const json = + \\{"services":[{"name":"svc-a","image":"alpine","gpu":{"devices":["../../dev/sda"]}},{"image":"busybox","name":"svc-b"}]} + ; + + var parsed = try parse(alloc, json, false); + defer parsed.deinit(alloc); + + try std.testing.expectEqual(@as(usize, 2), parsed.requests.items.len); + try std.testing.expectEqualStrings("alpine", parsed.requests.items[0].image); + try std.testing.expectEqualStrings("busybox", parsed.requests.items[1].image); +} + +test "parse joins structured command arrays" { + const alloc = std.testing.allocator; + const json = + \\{"app_name":"demo-app","services":[{"name":"web","image":"nginx","command":["nginx","-g","daemon off"]}]} + ; + + var parsed = try parse(alloc, json, true); + defer parsed.deinit(alloc); + + try std.testing.expectEqualStrings("demo-app", parsed.app_name.?); + try std.testing.expectEqual(@as(usize, 1), parsed.requests.items.len); + try std.testing.expectEqualStrings("nginx -g daemon off", parsed.requests.items[0].command); +} diff --git a/src/api/routes/cluster_agents/deploy_routes.zig b/src/api/routes/cluster_agents/deploy_routes.zig index 74c4168..ef5cf76 100644 --- a/src/api/routes/cluster_agents/deploy_routes.zig +++ b/src/api/routes/cluster_agents/deploy_routes.zig @@ -1,6 +1,10 @@ const std = @import("std"); +const sqlite = @import("sqlite"); const scheduler = @import("../../../cluster/scheduler.zig"); +const cluster_node = @import("../../../cluster/node.zig"); const json_helpers = @import("../../../lib/json_helpers.zig"); +const apply_release = @import("../../../manifest/apply_release.zig"); +const apply_request = @import("apply_request.zig"); const volumes_mod = @import("../../../state/volumes.zig"); const agent_registry = @import("../../../cluster/registry.zig"); const deployment_store = @import("../../../manifest/update/deployment_store.zig"); @@ -8,393 +12,329 @@ const common = @import("../common.zig"); const Response = common.Response; const RouteContext = common.RouteContext; -const extractJsonString = json_helpers.extractJsonString; -const extractJsonInt = json_helpers.extractJsonInt; -const extractJsonArray = json_helpers.extractJsonArray; const ResponseMode = enum { legacy, app, }; -fn extractJsonStringArray(alloc: std.mem.Allocator, json: []const u8, key: []const u8) !?[]u8 { - const array_json = extractJsonArray(json, key) orelse return null; - if (array_json.len < 2) return null; - - var out: std.ArrayList(u8) = .empty; - errdefer out.deinit(alloc); - - var pos: usize = 1; - var first = true; - while (pos < array_json.len - 1) { - while (pos < array_json.len - 1 and (array_json[pos] == ' ' or array_json[pos] == '\n' or array_json[pos] == '\r' or array_json[pos] == '\t' or array_json[pos] == ',')) : (pos += 1) {} - if (pos >= array_json.len - 1) break; - if (array_json[pos] != '"') return null; - pos += 1; - const start = pos; - - while (pos < array_json.len - 1) : (pos += 1) { - if (array_json[pos] == '\\') { - pos += 1; - if (pos >= array_json.len - 1) return null; - continue; - } - if (array_json[pos] == '"') break; - } - if (pos >= array_json.len - 1) return null; +const ClusterApplyError = error{ + NotLeader, + InternalError, +}; - if (!first) try out.append(alloc, ' '); - first = false; - try out.appendSlice(alloc, array_json[start..pos]); - pos += 1; - } +const ClusterReleaseTracker = struct { + alloc: std.mem.Allocator, + db: *sqlite.Db, + app_name: ?[]const u8, + config_snapshot: []const u8, + context: apply_release.ApplyContext = .{}, - return try out.toOwnedSlice(alloc); -} + pub fn begin(self: *const ClusterReleaseTracker) !?[]const u8 { + const name = self.app_name orelse return null; + const manifest_hash = deployment_store.computeManifestHash(self.alloc, self.config_snapshot) catch return ClusterApplyError.InternalError; + defer self.alloc.free(manifest_hash); -fn extractCommandString(alloc: std.mem.Allocator, block: []const u8) ![]const u8 { - if (extractJsonString(block, "command")) |command| { - return alloc.dupe(u8, command); - } - if (try extractJsonStringArray(alloc, block, "command")) |joined| { - defer alloc.free(joined); - return alloc.dupe(u8, joined); - } - return alloc.dupe(u8, ""); -} - -fn handleApply( - alloc: std.mem.Allocator, - request: @import("../../http.zig").Request, - ctx: RouteContext, - response_mode: ResponseMode, -) Response { - const node = ctx.cluster orelse return common.badRequest("not running in cluster mode"); - if (request.body.len == 0) return common.badRequest("missing request body"); + const id = deployment_store.generateDeploymentId(self.alloc) catch return ClusterApplyError.InternalError; + errdefer self.alloc.free(id); - var requests: std.ArrayListUnmanaged(scheduler.PlacementRequest) = .empty; - defer { - for (requests.items) |req| alloc.free(req.command); - requests.deinit(alloc); + deployment_store.recordDeploymentInDb( + self.db, + id, + name, + name, + manifest_hash, + self.config_snapshot, + .in_progress, + null, + ) catch return ClusterApplyError.InternalError; + + return id; } - const app_name = extractJsonString(request.body, "app_name") orelse extractJsonString(request.body, "volume_app"); - if (response_mode == .app and app_name == null) { - return common.badRequest("missing app_name"); + pub fn mark(self: *const ClusterReleaseTracker, id: []const u8, status: @import("../../../manifest/update/common.zig").DeploymentStatus, message: ?[]const u8) !void { + const resolved_message = apply_release.materializeMessage(self.alloc, self.context, status, message) catch return ClusterApplyError.InternalError; + defer if (resolved_message) |msg| self.alloc.free(msg); + deployment_store.updateDeploymentStatusInDb(self.db, id, status, resolved_message) catch return ClusterApplyError.InternalError; } - const services_json = extractJsonArray(request.body, "services") orelse - return common.badRequest("missing services array"); - var iter = json_helpers.extractJsonObjects(services_json); - while (iter.next()) |block| { - const image = extractJsonString(block, "image") orelse { - continue; - }; - const command = extractCommandString(alloc, block) catch return common.internalError(); - errdefer alloc.free(command); - const cpu_limit = extractJsonInt(block, "cpu_limit") orelse 1000; - const memory_limit_mb = extractJsonInt(block, "memory_limit_mb") orelse 256; - const gpu_limit = extractJsonInt(block, "gpu_limit") orelse 0; - const gpu_model = json_helpers.extractJsonString(block, "gpu_model"); - const gpu_vram_min = extractJsonInt(block, "gpu_vram_min_mb"); - const required_labels = extractJsonString(block, "required_labels") orelse ""; - const gang_world_size_val = extractJsonInt(block, "gang_world_size"); - const gpus_per_rank_val = extractJsonInt(block, "gpus_per_rank"); - - if (!common.validateClusterInput(image)) { - alloc.free(command); - continue; - } - if (command.len > 0 and !common.validateClusterInput(command)) { - alloc.free(command); - continue; - } - - requests.append(alloc, .{ - .image = image, - .command = command, - .cpu_limit = cpu_limit, - .memory_limit_mb = memory_limit_mb, - .gpu_limit = gpu_limit, - .gpu_model = gpu_model, - .gpu_vram_min_mb = if (gpu_vram_min) |v| @as(u64, @intCast(@max(0, v))) else null, - .required_labels = required_labels, - .gang_world_size = if (gang_world_size_val) |v| @intCast(@max(0, v)) else 0, - .gpus_per_rank = if (gpus_per_rank_val) |v| @intCast(@max(1, v)) else 1, - }) catch { - alloc.free(command); - return common.internalError(); - }; + pub fn freeReleaseId(self: *const ClusterReleaseTracker, id: []const u8) void { + self.alloc.free(id); } +}; - if (requests.items.len == 0) return common.badRequest("no services to deploy"); - - const db = node.stateMachineDb(); - - const vol_constraints = if (app_name) |name| - volumes_mod.getVolumesByApp(alloc, db, name) catch &[_]volumes_mod.VolumeConstraint{} - else - &[_]volumes_mod.VolumeConstraint{}; - defer if (app_name != null) alloc.free(vol_constraints); - - if (vol_constraints.len > 0) { - for (requests.items) |*req| { - req.volume_constraints = vol_constraints; - } - } +const ClusterApplyBackend = struct { + alloc: std.mem.Allocator, + node: *cluster_node.Node, + requests: []scheduler.PlacementRequest, + agents: []agent_registry.AgentRecord, - const agents = agent_registry.listAgents(alloc, db) catch return common.internalError(); - defer { - for (agents) |a| a.deinit(alloc); - alloc.free(agents); - } + pub fn apply(self: *const ClusterApplyBackend) ClusterApplyError!apply_release.ApplyOutcome { + var placed: usize = 0; + var failed: usize = 0; - if (agents.len == 0) { - return .{ .status = .bad_request, .body = "{\"error\":\"no agents available\"}", .allocated = false }; - } + for (self.requests) |req| { + if (req.gang_world_size > 0) { + const gang_placements = scheduler.scheduleGang(self.alloc, req, self.agents) catch { + failed += 1; + continue; + }; - var release_id: ?[]const u8 = null; - defer if (release_id) |id| alloc.free(id); - if (app_name) |name| { - const manifest_hash = deployment_store.computeManifestHash(alloc, request.body) catch return common.internalError(); - defer alloc.free(manifest_hash); + if (gang_placements) |gps| { + defer self.alloc.free(gps); + + var gang_ok = true; + for (gps) |gp| { + var id_buf: [12]u8 = undefined; + scheduler.generateAssignmentId(&id_buf); + + var sql_buf: [2048]u8 = undefined; + const sql = scheduler.assignmentSqlGang( + &sql_buf, + &id_buf, + gp.agent_id, + req, + std.time.timestamp(), + gp, + ) catch { + gang_ok = false; + break; + }; + + _ = self.node.propose(sql) catch return ClusterApplyError.NotLeader; + } - release_id = recordClusterReleaseStart(alloc, db, name, manifest_hash, request.body) catch return common.internalError(); - } + if (gang_ok) { + placed += gps.len; + } else { + failed += req.gang_world_size; + } + } else { + failed += req.gang_world_size; + } + } + } - var placed: usize = 0; - var failed: usize = 0; + var normal_requests: std.ArrayListUnmanaged(scheduler.PlacementRequest) = .empty; + defer normal_requests.deinit(self.alloc); + for (self.requests) |req| { + if (req.gang_world_size == 0) { + normal_requests.append(self.alloc, req) catch { + failed += 1; + continue; + }; + } + } - for (requests.items) |req| { - if (req.gang_world_size > 0) { - const gang_placements = scheduler.scheduleGang(alloc, req, agents) catch { - failed += 1; - continue; + if (normal_requests.items.len > 0) { + const placements = scheduler.schedule(self.alloc, normal_requests.items, self.agents) catch { + return ClusterApplyError.InternalError; }; + defer self.alloc.free(placements); - if (gang_placements) |gps| { - defer alloc.free(gps); - - var gang_ok = true; - for (gps) |gp| { + for (placements) |maybe_placement| { + if (maybe_placement) |placement| { var id_buf: [12]u8 = undefined; scheduler.generateAssignmentId(&id_buf); - var sql_buf: [2048]u8 = undefined; - const sql = scheduler.assignmentSqlGang( + var sql_buf: [1024]u8 = undefined; + const sql = scheduler.assignmentSql( &sql_buf, &id_buf, - gp.agent_id, - req, + placement.agent_id, + normal_requests.items[placement.request_idx], std.time.timestamp(), - gp, ) catch { - gang_ok = false; - break; - }; - - _ = node.propose(sql) catch { - if (release_id) |id| { - deployment_store.updateDeploymentStatusInDb(db, id, .failed, "leadership changed during apply") catch {}; - } - return common.notLeader(alloc, node); + failed += 1; + continue; }; - } - if (gang_ok) { - placed += gps.len; + _ = self.node.propose(sql) catch return ClusterApplyError.NotLeader; + placed += 1; } else { - failed += req.gang_world_size; + failed += 1; } - } else { - failed += req.gang_world_size; } } - } - var normal_requests: std.ArrayListUnmanaged(scheduler.PlacementRequest) = .empty; - defer normal_requests.deinit(alloc); - for (requests.items) |req| { - if (req.gang_world_size == 0) { - normal_requests.append(alloc, req) catch { - failed += 1; - continue; - }; - } + return .{ + .status = if (failed == 0) .completed else .failed, + .message = if (failed == 0) "all placements succeeded" else "one or more placements failed", + .placed = placed, + .failed = failed, + }; } - if (normal_requests.items.len > 0) { - const placements = scheduler.schedule(alloc, normal_requests.items, agents) catch { - if (release_id) |id| { - deployment_store.updateDeploymentStatusInDb(db, id, .failed, "scheduler error during apply") catch {}; - } - return common.internalError(); + pub fn failureMessage(_: *const ClusterApplyBackend, err: ClusterApplyError) ?[]const u8 { + return switch (err) { + error.NotLeader => "leadership changed during apply", + error.InternalError => "scheduler error during apply", }; - defer alloc.free(placements); - - for (placements) |maybe_placement| { - if (maybe_placement) |placement| { - var id_buf: [12]u8 = undefined; - scheduler.generateAssignmentId(&id_buf); - - var sql_buf: [1024]u8 = undefined; - const sql = scheduler.assignmentSql( - &sql_buf, - &id_buf, - placement.agent_id, - normal_requests.items[placement.request_idx], - std.time.timestamp(), - ) catch { - failed += 1; - continue; - }; + } +}; - _ = node.propose(sql) catch { - if (release_id) |id| { - deployment_store.updateDeploymentStatusInDb(db, id, .failed, "leadership changed during apply") catch {}; - } - return common.notLeader(alloc, node); - }; - placed += 1; - } else { - failed += 1; - } - } +fn handleApply( + alloc: std.mem.Allocator, + request: @import("../../http.zig").Request, + ctx: RouteContext, + response_mode: ResponseMode, + apply_context: apply_release.ApplyContext, +) Response { + const node = ctx.cluster orelse return common.badRequest("not running in cluster mode"); + if (request.body.len == 0) return common.badRequest("missing request body"); + + var parsed = apply_request.parse(alloc, request.body, response_mode == .app) catch |err| return switch (err) { + apply_request.ParseError.MissingAppName => common.badRequest("missing app_name"), + apply_request.ParseError.MissingServicesArray => common.badRequest("missing services array"), + apply_request.ParseError.NoServices => common.badRequest("no services to deploy"), + apply_request.ParseError.OutOfMemory => common.internalError(), + apply_request.ParseError.InvalidRequest => common.badRequest("invalid request body"), + }; + defer parsed.deinit(alloc); + + const db = node.stateMachineDb(); + + const vol_constraints = if (parsed.app_name) |name| + volumes_mod.getVolumesByApp(alloc, db, name) catch &[_]volumes_mod.VolumeConstraint{} + else + &[_]volumes_mod.VolumeConstraint{}; + defer if (parsed.app_name != null) alloc.free(vol_constraints); + + parsed.setVolumeConstraints(vol_constraints); + + const agents = agent_registry.listAgents(alloc, db) catch return common.internalError(); + defer { + for (agents) |a| a.deinit(alloc); + alloc.free(agents); } - const status = if (failed == 0) "completed" else "failed"; - if (release_id) |id| { - const message: ?[]const u8 = if (failed == 0) null else "one or more placements failed"; - deployment_store.updateDeploymentStatusInDb( - db, - id, - if (failed == 0) .completed else .failed, - message, - ) catch return common.internalError(); + if (agents.len == 0) { + return .{ .status = .bad_request, .body = "{\"error\":\"no agents available\"}", .allocated = false }; } + var tracker = ClusterReleaseTracker{ + .alloc = alloc, + .db = db, + .app_name = parsed.app_name, + .config_snapshot = request.body, + .context = apply_context, + }; + var backend = ClusterApplyBackend{ + .alloc = alloc, + .node = node, + .requests = parsed.requests.items, + .agents = agents, + }; + const apply_result = apply_release.execute(&tracker, &backend) catch |err| switch (err) { + ClusterApplyError.NotLeader => return common.notLeader(alloc, node), + ClusterApplyError.InternalError => return common.internalError(), + }; + const apply_report = apply_result.toReport(parsed.app_name orelse "", parsed.requests.items.len, apply_context); + defer apply_report.deinit(alloc); + const body = switch (response_mode) { - .legacy => formatLegacyApplyResponse(alloc, placed, failed) catch return common.internalError(), - .app => formatAppApplyResponse( - alloc, - app_name.?, - release_id orelse "", - status, - requests.items.len, - placed, - failed, - ) catch return common.internalError(), + .legacy => formatLegacyApplyResponse(alloc, apply_report.placed, apply_report.failed) catch return common.internalError(), + .app => formatAppApplyResponse(alloc, apply_report) catch return common.internalError(), }; return .{ .status = .ok, .body = body, .allocated = true }; } pub fn handleAppApply(alloc: std.mem.Allocator, request: @import("../../http.zig").Request, ctx: RouteContext) Response { - return handleApply(alloc, request, ctx, .app); + return handleApply(alloc, request, ctx, .app, .{}); } pub fn handleDeploy(alloc: std.mem.Allocator, request: @import("../../http.zig").Request, ctx: RouteContext) Response { - return handleApply(alloc, request, ctx, .legacy); + return handleApply(alloc, request, ctx, .legacy, .{}); } -fn recordClusterReleaseStart( +pub fn handleAppRollbackApply( alloc: std.mem.Allocator, - db: anytype, - app_name: []const u8, - manifest_hash: []const u8, - config_snapshot: []const u8, -) ![]const u8 { - const id = try deployment_store.generateDeploymentId(alloc); - errdefer alloc.free(id); - - try deployment_store.recordDeploymentInDb( - db, - id, - app_name, - app_name, - manifest_hash, - config_snapshot, - .in_progress, - null, - ); - - return id; + request: @import("../../http.zig").Request, + ctx: RouteContext, + source_release_id: []const u8, +) Response { + return handleApply(alloc, request, ctx, .app, .{ + .trigger = .rollback, + .source_release_id = source_release_id, + }); } fn formatLegacyApplyResponse(alloc: std.mem.Allocator, placed: usize, failed: usize) ![]u8 { return std.fmt.allocPrint(alloc, "{{\"placed\":{d},\"failed\":{d}}}", .{ placed, failed }); } -fn formatAppApplyResponse( - alloc: std.mem.Allocator, - app_name: []const u8, - release_id: []const u8, - status: []const u8, - service_count: usize, - placed: usize, - failed: usize, -) ![]u8 { +fn formatAppApplyResponse(alloc: std.mem.Allocator, report: apply_release.ApplyReport) ![]u8 { var json_buf: std.ArrayList(u8) = .empty; errdefer json_buf.deinit(alloc); const writer = json_buf.writer(alloc); - try writer.writeAll("{\"app_name\":\""); - try json_helpers.writeJsonEscaped(writer, app_name); - try writer.writeAll("\",\"release_id\":\""); - try json_helpers.writeJsonEscaped(writer, release_id); - try writer.writeAll("\",\"status\":\""); - try json_helpers.writeJsonEscaped(writer, status); - try writer.print("\",\"service_count\":{d},\"placed\":{d},\"failed\":{d}", .{ - service_count, - placed, - failed, + try writer.writeByte('{'); + try json_helpers.writeJsonStringField(writer, "app_name", report.app_name); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "trigger", report.trigger.toString()); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "release_id", report.release_id orelse ""); + try writer.writeByte(','); + try json_helpers.writeJsonStringField(writer, "status", report.status.toString()); + try writer.print(",\"service_count\":{d},\"placed\":{d},\"failed\":{d}", .{ + report.service_count, + report.placed, + report.failed, }); - try writer.writeByte('}'); - - return json_buf.toOwnedSlice(alloc); -} - -test "extractJsonArray finds services array regardless of field order" { - const json = - \\{"services":[{"name":"svc-a","image":"alpine","gpu":{"devices":["../../dev/sda"]}},{"image":"busybox","name":"svc-b"}]} - ; - - const services = extractJsonArray(json, "services").?; - var iter = json_helpers.extractJsonObjects(services); + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "source_release_id", report.source_release_id); - const first = iter.next().?; - try std.testing.expectEqualStrings("svc-a", extractJsonString(first, "name").?); - try std.testing.expectEqualStrings("alpine", extractJsonString(first, "image").?); + const resolved_message = try report.resolvedMessage(alloc); + defer if (resolved_message) |message| alloc.free(message); - const second = iter.next().?; - try std.testing.expectEqualStrings("svc-b", extractJsonString(second, "name").?); - try std.testing.expectEqualStrings("busybox", extractJsonString(second, "image").?); - - try std.testing.expect(iter.next() == null); -} - -test "extractCommandString joins structured command arrays" { - const alloc = std.testing.allocator; - const block = - \\{"name":"web","image":"nginx","command":["nginx","-g","daemon off;"]} - ; - - const command = try extractCommandString(alloc, block); - defer alloc.free(command); + try writer.writeByte(','); + try json_helpers.writeNullableJsonStringField(writer, "message", resolved_message); + try writer.writeByte('}'); - try std.testing.expectEqualStrings("nginx -g daemon off;", command); + return json_buf.toOwnedSlice(alloc); } test "formatAppApplyResponse includes app release metadata" { const alloc = std.testing.allocator; - const json = try formatAppApplyResponse(alloc, "demo-app", "abc123def456", "completed", 2, 2, 0); + const json = try formatAppApplyResponse(alloc, .{ + .app_name = "demo-app", + .release_id = "abc123def456", + .status = .completed, + .service_count = 2, + .placed = 2, + .failed = 0, + }); defer alloc.free(json); try std.testing.expect(std.mem.indexOf(u8, json, "\"app_name\":\"demo-app\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"trigger\":\"apply\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"release_id\":\"abc123def456\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"status\":\"completed\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"service_count\":2") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"placed\":2") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"failed\":0") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"source_release_id\":null") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"message\":\"apply completed\"") != null); +} + +test "formatAppApplyResponse includes rollback trigger metadata" { + const alloc = std.testing.allocator; + const json = try formatAppApplyResponse(alloc, .{ + .app_name = "demo-app", + .release_id = "dep-2", + .status = .completed, + .service_count = 2, + .placed = 2, + .failed = 0, + .message = "all placements succeeded", + .trigger = .rollback, + .source_release_id = "dep-1", + }); + defer alloc.free(json); + + try std.testing.expect(std.mem.indexOf(u8, json, "\"trigger\":\"rollback\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"source_release_id\":\"dep-1\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"message\":\"rollback to dep-1 completed: all placements succeeded\"") != null); } test "formatLegacyApplyResponse preserves compact deploy shape" { diff --git a/src/lib/json_helpers.zig b/src/lib/json_helpers.zig index 07d11d6..cc45747 100644 --- a/src/lib/json_helpers.zig +++ b/src/lib/json_helpers.zig @@ -27,6 +27,28 @@ pub fn writeJsonEscaped(writer: anytype, s: []const u8) !void { } } +/// write a single JSON string field without a leading comma. +pub fn writeJsonStringField(writer: anytype, key: []const u8, value: []const u8) !void { + try writer.writeByte('"'); + try writer.writeAll(key); + try writer.writeAll("\":\""); + try writeJsonEscaped(writer, value); + try writer.writeByte('"'); +} + +/// write a single JSON string-or-null field without a leading comma. +pub fn writeNullableJsonStringField(writer: anytype, key: []const u8, value: ?[]const u8) !void { + try writer.writeByte('"'); + try writer.writeAll(key); + if (value) |text| { + try writer.writeAll("\":\""); + try writeJsonEscaped(writer, text); + try writer.writeByte('"'); + } else { + try writer.writeAll("\":null"); + } +} + // -- JSON field extraction -- // minimal JSON field extraction for known request shapes. // avoids pulling in a full parser for simple key-value lookups. @@ -243,6 +265,24 @@ test "plain ascii passthrough" { try std.testing.expectEqualStrings("abc123", stream.getWritten()); } +test "writeJsonStringField emits quoted field" { + var buf: [256]u8 = undefined; + var stream = std.io.fixedBufferStream(&buf); + const writer = stream.writer(); + + try writeJsonStringField(writer, "app_name", "demo-app"); + try std.testing.expectEqualStrings("\"app_name\":\"demo-app\"", stream.getWritten()); +} + +test "writeNullableJsonStringField emits null field" { + var buf: [256]u8 = undefined; + var stream = std.io.fixedBufferStream(&buf); + const writer = stream.writer(); + + try writeNullableJsonStringField(writer, "source_release_id", null); + try std.testing.expectEqualStrings("\"source_release_id\":null", stream.getWritten()); +} + test "extractJsonString basic" { const json = "{\"token\":\"my-secret\",\"address\":\"10.0.0.5:7701\"}"; try std.testing.expectEqualStrings("my-secret", extractJsonString(json, "token").?); diff --git a/src/manifest/apply_release.zig b/src/manifest/apply_release.zig new file mode 100644 index 0000000..6aa687e --- /dev/null +++ b/src/manifest/apply_release.zig @@ -0,0 +1,384 @@ +const std = @import("std"); +const json_helpers = @import("../lib/json_helpers.zig"); +const store = @import("../state/store.zig"); +const update_common = @import("update/common.zig"); + +pub const ApplyTrigger = enum { + apply, + rollback, + + pub fn toString(self: ApplyTrigger) []const u8 { + return switch (self) { + .apply => "apply", + .rollback => "rollback", + }; + } +}; + +pub const ApplyContext = struct { + trigger: ApplyTrigger = .apply, + source_release_id: ?[]const u8 = null, +}; + +pub const ApplyOutcome = struct { + status: update_common.DeploymentStatus, + message: ?[]const u8 = null, + placed: usize = 0, + failed: usize = 0, +}; + +pub const ApplyResult = struct { + release_id: ?[]const u8, + outcome: ApplyOutcome, + + pub fn toReport(self: ApplyResult, app_name: []const u8, service_count: usize, context: ApplyContext) ApplyReport { + return .{ + .app_name = app_name, + .release_id = self.release_id, + .status = self.outcome.status, + .service_count = service_count, + .placed = self.outcome.placed, + .failed = self.outcome.failed, + .message = self.outcome.message, + .manifest_hash = "", + .created_at = 0, + .trigger = context.trigger, + .source_release_id = context.source_release_id, + }; + } +}; + +pub const ApplyReport = struct { + app_name: []const u8, + release_id: ?[]const u8, + status: update_common.DeploymentStatus, + service_count: usize, + placed: usize, + failed: usize, + message: ?[]const u8 = null, + manifest_hash: []const u8 = "", + created_at: i64 = 0, + trigger: ApplyTrigger = .apply, + source_release_id: ?[]const u8 = null, + + pub fn deinit(self: ApplyReport, alloc: std.mem.Allocator) void { + if (self.release_id) |id| alloc.free(id); + } + + pub fn context(self: ApplyReport) ApplyContext { + return .{ + .trigger = self.trigger, + .source_release_id = self.source_release_id, + }; + } + + pub fn resolvedMessage(self: ApplyReport, alloc: std.mem.Allocator) !?[]u8 { + return materializeMessage(alloc, self.context(), self.status, self.message); + } + + pub fn summaryText(self: ApplyReport, alloc: std.mem.Allocator) ![]u8 { + const status_text = self.status.toString(); + const message = try self.resolvedMessage(alloc); + defer if (message) |msg| alloc.free(msg); + + if (self.release_id) |id| { + return std.fmt.allocPrint( + alloc, + "release {s} {s}: {s} ({d} placed, {d} failed, {d} services)", + .{ id, status_text, message.?, self.placed, self.failed, self.service_count }, + ); + } + + return std.fmt.allocPrint( + alloc, + "{s}: {s} ({d} placed, {d} failed, {d} services)", + .{ status_text, message.?, self.placed, self.failed, self.service_count }, + ); + } +}; + +pub fn reportFromDeployment(dep: store.DeploymentRecord) ApplyReport { + const inferred = inferContextFromStoredMessage(dep.message); + return .{ + .app_name = dep.app_name orelse dep.service_name, + .release_id = dep.id, + .status = update_common.DeploymentStatus.fromString(dep.status) orelse .failed, + .service_count = countServices(dep.config_snapshot), + .placed = 0, + .failed = 0, + .message = dep.message, + .manifest_hash = dep.manifest_hash, + .created_at = dep.created_at, + .trigger = inferred.trigger, + .source_release_id = inferred.source_release_id, + }; +} + +fn inferContextFromStoredMessage(message: ?[]const u8) ApplyContext { + const text = message orelse return .{}; + const prefix = "rollback to "; + if (std.mem.startsWith(u8, text, prefix)) { + const remainder = text[prefix.len..]; + const split = std.mem.indexOfScalar(u8, remainder, ' ') orelse return .{ .trigger = .rollback }; + const source_release_id = remainder[0..split]; + if (source_release_id.len > 0) { + return .{ + .trigger = .rollback, + .source_release_id = source_release_id, + }; + } + return .{ .trigger = .rollback }; + } + if (std.mem.startsWith(u8, text, "rollback ")) { + return .{ .trigger = .rollback }; + } + return .{}; +} + +pub fn materializeMessage( + alloc: std.mem.Allocator, + context: ApplyContext, + status: update_common.DeploymentStatus, + explicit: ?[]const u8, +) !?[]u8 { + const status_text = status.toString(); + + return switch (context.trigger) { + .apply => if (explicit) |message| + try alloc.dupe(u8, message) + else switch (status) { + .completed => try alloc.dupe(u8, "apply completed"), + .failed => try alloc.dupe(u8, "apply failed"), + else => try alloc.dupe(u8, status_text), + }, + .rollback => if (context.source_release_id) |source_id| + if (explicit) |message| + try std.fmt.allocPrint(alloc, "rollback to {s} {s}: {s}", .{ source_id, status_text, message }) + else + try std.fmt.allocPrint(alloc, "rollback to {s} {s}", .{ source_id, status_text }) + else if (explicit) |message| + try std.fmt.allocPrint(alloc, "rollback {s}: {s}", .{ status_text, message }) + else + try std.fmt.allocPrint(alloc, "rollback {s}", .{status_text}), + }; +} + +fn countServices(snapshot: []const u8) usize { + const services = json_helpers.extractJsonArray(snapshot, "services") orelse return 0; + var iter = json_helpers.extractJsonObjects(services); + var count: usize = 0; + while (iter.next() != null) count += 1; + return count; +} + +pub fn execute(tracker: anytype, backend: anytype) !ApplyResult { + const release_id = try tracker.begin(); + + const outcome = backend.apply() catch |err| { + if (release_id) |id| { + try tracker.mark(id, .failed, backend.failureMessage(err)); + tracker.freeReleaseId(id); + } + return err; + }; + + if (release_id) |id| { + try tracker.mark(id, outcome.status, outcome.message); + } + + return .{ + .release_id = release_id, + .outcome = outcome, + }; +} + +test "execute marks completed releases on backend success" { + const alloc = std.testing.allocator; + + const Tracker = struct { + alloc: std.mem.Allocator, + last_status: ?update_common.DeploymentStatus = null, + last_message: ?[]const u8 = null, + + fn begin(self: *@This()) !?[]const u8 { + const id = try self.alloc.dupe(u8, "dep123"); + return id; + } + + fn mark(self: *@This(), id: []const u8, status: update_common.DeploymentStatus, message: ?[]const u8) !void { + try std.testing.expectEqualStrings("dep123", id); + self.last_status = status; + self.last_message = message; + } + + fn freeReleaseId(self: *@This(), id: []const u8) void { + self.alloc.free(id); + } + }; + + const Backend = struct { + fn apply(_: *@This()) !ApplyOutcome { + return .{ .status = .completed, .placed = 2 }; + } + + fn failureMessage(_: *@This(), _: anytype) ?[]const u8 { + return "backend failed"; + } + }; + + var tracker = Tracker{ .alloc = alloc }; + var backend = Backend{}; + + const result = try execute(&tracker, &backend); + defer alloc.free(result.release_id.?); + + try std.testing.expectEqualStrings("dep123", result.release_id.?); + try std.testing.expectEqual(update_common.DeploymentStatus.completed, result.outcome.status); + try std.testing.expectEqual(update_common.DeploymentStatus.completed, tracker.last_status.?); + try std.testing.expect(tracker.last_message == null); +} + +test "execute marks failed releases on backend error" { + const alloc = std.testing.allocator; + + const BackendError = error{StartupFailed}; + + const Tracker = struct { + alloc: std.mem.Allocator, + last_status: ?update_common.DeploymentStatus = null, + last_message: ?[]const u8 = null, + + fn begin(self: *@This()) !?[]const u8 { + const id = try self.alloc.dupe(u8, "dep456"); + return id; + } + + fn mark(self: *@This(), id: []const u8, status: update_common.DeploymentStatus, message: ?[]const u8) !void { + try std.testing.expectEqualStrings("dep456", id); + self.last_status = status; + self.last_message = message; + } + + fn freeReleaseId(self: *@This(), id: []const u8) void { + self.alloc.free(id); + } + }; + + const Backend = struct { + fn apply(_: *@This()) BackendError!ApplyOutcome { + return BackendError.StartupFailed; + } + + fn failureMessage(_: *@This(), err: BackendError) ?[]const u8 { + return switch (err) { + BackendError.StartupFailed => "service startup failed", + }; + } + }; + + var tracker = Tracker{ .alloc = alloc }; + var backend = Backend{}; + + try std.testing.expectError(BackendError.StartupFailed, execute(&tracker, &backend)); + try std.testing.expectEqual(update_common.DeploymentStatus.failed, tracker.last_status.?); + try std.testing.expectEqualStrings("service startup failed", tracker.last_message.?); +} + +test "ApplyResult projects to shared apply report" { + const result = ApplyResult{ + .release_id = "dep789", + .outcome = .{ + .status = .completed, + .message = null, + .placed = 3, + .failed = 0, + }, + }; + + const report = result.toReport("demo-app", 3, .{}); + try std.testing.expectEqualStrings("demo-app", report.app_name); + try std.testing.expectEqualStrings("dep789", report.release_id.?); + try std.testing.expectEqual(update_common.DeploymentStatus.completed, report.status); + try std.testing.expectEqual(@as(usize, 3), report.service_count); + try std.testing.expectEqual(@as(usize, 3), report.placed); + try std.testing.expectEqual(@as(usize, 0), report.failed); + try std.testing.expect(report.message == null); + try std.testing.expectEqual(ApplyTrigger.apply, report.trigger); + try std.testing.expect(report.source_release_id == null); +} + +test "ApplyReport summaryText includes release status and counts" { + const alloc = std.testing.allocator; + const report = ApplyReport{ + .app_name = "demo-app", + .release_id = "dep789", + .status = .completed, + .service_count = 3, + .placed = 3, + .failed = 0, + .message = "all requested services started", + }; + + const summary = try report.summaryText(alloc); + defer alloc.free(summary); + + try std.testing.expectEqualStrings( + "release dep789 completed: all requested services started (3 placed, 0 failed, 3 services)", + summary, + ); +} + +test "materializeMessage contextualizes rollback transitions" { + const alloc = std.testing.allocator; + const message = try materializeMessage(alloc, .{ + .trigger = .rollback, + .source_release_id = "dep100", + }, .completed, "all placements succeeded"); + defer alloc.free(message.?); + + try std.testing.expectEqualStrings( + "rollback to dep100 completed: all placements succeeded", + message.?, + ); +} + +test "reportFromDeployment preserves release metadata and counts services" { + const dep = store.DeploymentRecord{ + .id = "dep-22", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:xyz", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"},{\"name\":\"db\"}]}", + .status = "completed", + .message = "all requested services started", + .created_at = 220, + }; + + const report = reportFromDeployment(dep); + try std.testing.expectEqualStrings("demo-app", report.app_name); + try std.testing.expectEqualStrings("dep-22", report.release_id.?); + try std.testing.expectEqual(update_common.DeploymentStatus.completed, report.status); + try std.testing.expectEqual(@as(usize, 2), report.service_count); + try std.testing.expectEqualStrings("sha256:xyz", report.manifest_hash); + try std.testing.expectEqual(@as(i64, 220), report.created_at); + try std.testing.expectEqualStrings("all requested services started", report.message.?); + try std.testing.expectEqual(ApplyTrigger.apply, report.trigger); + try std.testing.expect(report.source_release_id == null); +} + +test "reportFromDeployment infers rollback context from stored message" { + const dep = store.DeploymentRecord{ + .id = "dep-23", + .app_name = "demo-app", + .service_name = "demo-app", + .manifest_hash = "sha256:zzz", + .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"}]}", + .status = "completed", + .message = "rollback to dep-11 completed: all placements succeeded", + .created_at = 230, + }; + + const report = reportFromDeployment(dep); + try std.testing.expectEqual(ApplyTrigger.rollback, report.trigger); + try std.testing.expectEqualStrings("dep-11", report.source_release_id.?); +} diff --git a/src/manifest/cli/deploy.zig b/src/manifest/cli/deploy.zig index 52042eb..71edc28 100644 --- a/src/manifest/cli/deploy.zig +++ b/src/manifest/cli/deploy.zig @@ -1,20 +1,13 @@ const std = @import("std"); const cli = @import("../../lib/cli.zig"); const app_spec = @import("../app_spec.zig"); -const release_history = @import("../release_history.zig"); +const local_apply_backend = @import("../local_apply_backend.zig"); const release_plan = @import("../release_plan.zig"); const manifest_loader = @import("../loader.zig"); -const orchestrator = @import("../orchestrator.zig"); -const startup_runtime = @import("../orchestrator/startup_runtime.zig"); -const watcher_mod = @import("../../dev/watcher.zig"); const store = @import("../../state/store.zig"); const process = @import("../../runtime/process.zig"); const http_client = @import("../../cluster/http_client.zig"); const container_cmds = @import("../../runtime/container_commands.zig"); -const proxy_control_plane = @import("../../network/proxy/control_plane.zig"); -const service_rollout = @import("../../network/service_rollout.zig"); -const service_reconciler = @import("../../network/service_reconciler.zig"); -const listener_runtime = @import("../../network/proxy/listener_runtime.zig"); const write = cli.write; const writeErr = cli.writeErr; @@ -86,9 +79,6 @@ pub fn up(args: *std.process.ArgIterator, alloc: std.mem.Allocator) !void { return; } - const release_id = release_history.recordAppReleaseStart(&release) catch null; - defer if (release_id) |id| alloc.free(id); - if (service_names.items.len > 0) { writeErr("starting", .{}); for (service_names.items, 0..) |name, i| { @@ -102,103 +92,39 @@ pub fn up(args: *std.process.ArgIterator, alloc: std.mem.Allocator) !void { writeErr("starting {s} ({d} services)...\n", .{ release.app.app_name, release.resolvedServiceCount() }); } - var orch = orchestrator.Orchestrator.init(alloc, &manifest, release.app.app_name) catch |err| { + var prepared = local_apply_backend.PreparedLocalApply.init(alloc, &manifest, &release, dev_mode) catch |err| { writeErr("failed to initialize orchestrator: {}\n", .{err}); return DeployError.DeploymentFailed; }; - defer orch.deinit(); - orch.dev_mode = dev_mode; - - if (release.service_filter) |filter| { - orch.service_filter = filter; - } + defer prepared.deinit(); + prepared.beginRuntime(); - orch.computeStartSet() catch |err| { - writeErr("failed to resolve service start set: {}\n", .{err}); - return DeployError.DeploymentFailed; - }; - - startup_runtime.syncServiceDefinitions(alloc, manifest.services, orch.start_set); - - service_rollout.logStartupSummary(); - service_reconciler.ensureDataPlaneReadyIfEnabled(); - service_reconciler.bootstrapIfEnabled(); - service_reconciler.startAuditLoopIfEnabled(); - listener_runtime.setStateChangeHook(proxy_control_plane.refreshIfEnabled); - defer listener_runtime.setStateChangeHook(null); - listener_runtime.startIfEnabled(alloc); - defer listener_runtime.stop(); - proxy_control_plane.startSyncLoopIfEnabled(); - defer proxy_control_plane.stopSyncLoop(); - orchestrator.installSignalHandlers(); - - orch.startAll() catch |err| { - if (release_id) |id| { - release_history.markAppReleaseFailed(id, "service startup failed") catch {}; - } + const apply_report = prepared.startRelease(.{}) catch |err| { writeErr("failed to start services: {}\n", .{err}); return DeployError.DeploymentFailed; }; + defer apply_report.deinit(alloc); - if (release_id) |id| { - release_history.markAppReleaseCompleted(id) catch {}; - } + const apply_summary = apply_report.summaryText(alloc) catch return DeployError.OutOfMemory; + defer alloc.free(apply_summary); + writeErr("{s}\n", .{apply_summary}); - var watcher: ?watcher_mod.Watcher = null; - var watcher_thread: ?std.Thread = null; + var watcher = local_apply_backend.DevWatcherRuntime{}; if (dev_mode) { - watcher = watcher_mod.Watcher.init(alloc) catch |e| blk: { - writeErr("warning: file watcher unavailable: {}\n", .{e}); - break :blk null; - }; - - if (watcher != null) { - var any_watch_failed = false; - for (manifest.services, 0..) |svc, i| { - if (!release.includesService(svc.name)) continue; - for (svc.volumes) |vol| { - if (vol.kind != .bind) continue; - - var resolve_buf: [4096]u8 = undefined; - const abs_source = std.fs.cwd().realpath(vol.source, &resolve_buf) catch |e| { - writeErr("warning: failed to resolve path {s}: {}\n", .{ vol.source, e }); - any_watch_failed = true; - continue; - }; - - watcher.?.addRecursive(abs_source, i) catch |e| { - writeErr("warning: failed to watch {s}: {}\n", .{ vol.source, e }); - any_watch_failed = true; - }; - } - } - - if (!any_watch_failed or watcher.?.watch_count > 0) { - watcher_thread = std.Thread.spawn(.{}, orchestrator.watcherThread, .{ - &orch, &watcher.?, - }) catch |e| blk: { - writeErr("warning: failed to start watcher thread: {}\n", .{e}); - break :blk null; - }; - } else { - writeErr("warning: no directories could be watched, file change detection disabled\n", .{}); - } - } - + watcher = prepared.startDevWatcher(); writeErr("all services running. watching for changes...\n", .{}); } else { writeErr("all services running. press ctrl-c to stop.\n", .{}); } - orch.waitForShutdown(); + prepared.orch.waitForShutdown(); writeErr("\nshutting down...\n", .{}); - if (watcher) |*w| w.deinit(); - if (watcher_thread) |t| t.join(); + watcher.deinit(); - orch.stopAll(); + prepared.orch.stopAll(); writeErr("stopped\n", .{}); } diff --git a/src/manifest/cli/ops.zig b/src/manifest/cli/ops.zig index d9cd98c..8721090 100644 --- a/src/manifest/cli/ops.zig +++ b/src/manifest/cli/ops.zig @@ -2,6 +2,7 @@ const std = @import("std"); const cli = @import("../../lib/cli.zig"); const json_helpers = @import("../../lib/json_helpers.zig"); const json_out = @import("../../lib/json_output.zig"); +const apply_release = @import("../apply_release.zig"); const manifest_loader = @import("../loader.zig"); const orchestrator = @import("../orchestrator.zig"); const release_history = @import("../release_history.zig"); @@ -238,21 +239,26 @@ const HistoryEntryView = struct { id: []const u8, app: ?[]const u8, service: []const u8, + trigger: []const u8, status: []const u8, manifest_hash: []const u8, created_at: i64, + source_release_id: ?[]const u8, message: ?[]const u8, }; fn historyEntryFromDeployment(dep: store.DeploymentRecord) HistoryEntryView { + const report = apply_release.reportFromDeployment(dep); return .{ - .id = dep.id, + .id = report.release_id orelse dep.id, .app = dep.app_name, .service = dep.service_name, - .status = dep.status, - .manifest_hash = dep.manifest_hash, - .created_at = dep.created_at, - .message = dep.message, + .trigger = report.trigger.toString(), + .status = report.status.toString(), + .manifest_hash = report.manifest_hash, + .created_at = report.created_at, + .source_release_id = report.source_release_id, + .message = report.message, }; } @@ -261,9 +267,11 @@ fn parseHistoryObject(obj: []const u8) HistoryEntryView { .id = json_helpers.extractJsonString(obj, "id") orelse "?", .app = json_helpers.extractJsonString(obj, "app"), .service = json_helpers.extractJsonString(obj, "service") orelse "?", + .trigger = json_helpers.extractJsonString(obj, "trigger") orelse "apply", .status = json_helpers.extractJsonString(obj, "status") orelse "?", .manifest_hash = json_helpers.extractJsonString(obj, "manifest_hash") orelse "?", .created_at = json_helpers.extractJsonInt(obj, "created_at") orelse 0, + .source_release_id = json_helpers.extractJsonString(obj, "source_release_id"), .message = json_helpers.extractJsonString(obj, "message"), }; } @@ -292,9 +300,11 @@ fn writeHistoryJsonObject(w: *json_out.JsonWriter, entry: HistoryEntryView) void w.stringField("id", entry.id); if (entry.app) |app_name| w.stringField("app", app_name) else w.nullField("app"); w.stringField("service", entry.service); + w.stringField("trigger", entry.trigger); w.stringField("status", entry.status); w.stringField("manifest_hash", entry.manifest_hash); w.intField("created_at", entry.created_at); + if (entry.source_release_id) |source_release_id| w.stringField("source_release_id", source_release_id) else w.nullField("source_release_id"); if (entry.message) |message| w.stringField("message", message) else w.nullField("message"); w.endObject(); } @@ -331,15 +341,17 @@ fn rollbackRemoteApp(alloc: std.mem.Allocator, addr_str: []const u8, app_name: [ test "parseHistoryObject extracts app release fields" { const entry = parseHistoryObject( - \\{"id":"dep-1","app":"demo-app","service":"demo-app","status":"completed","manifest_hash":"sha256:123","created_at":42,"message":null} + \\{"id":"dep-1","app":"demo-app","service":"demo-app","trigger":"apply","status":"completed","manifest_hash":"sha256:123","created_at":42,"source_release_id":null,"message":null} ); try std.testing.expectEqualStrings("dep-1", entry.id); try std.testing.expectEqualStrings("demo-app", entry.app.?); try std.testing.expectEqualStrings("demo-app", entry.service); + try std.testing.expectEqualStrings("apply", entry.trigger); try std.testing.expectEqualStrings("completed", entry.status); try std.testing.expectEqualStrings("sha256:123", entry.manifest_hash); try std.testing.expectEqual(@as(i64, 42), entry.created_at); + try std.testing.expect(entry.source_release_id == null); try std.testing.expect(entry.message == null); } @@ -349,7 +361,7 @@ test "historyEntryFromDeployment matches remote app history shape" { .app_name = "demo-app", .service_name = "demo-app", .manifest_hash = "sha256:123", - .config_snapshot = "{}", + .config_snapshot = "{\"services\":[{\"name\":\"web\"}]}", .status = "completed", .message = "healthy", .created_at = 42, @@ -357,15 +369,17 @@ test "historyEntryFromDeployment matches remote app history shape" { const local = historyEntryFromDeployment(dep); const remote = parseHistoryObject( - \\{"id":"dep-1","app":"demo-app","service":"demo-app","status":"completed","manifest_hash":"sha256:123","created_at":42,"message":"healthy"} + \\{"id":"dep-1","app":"demo-app","service":"demo-app","trigger":"apply","status":"completed","manifest_hash":"sha256:123","created_at":42,"source_release_id":null,"message":"healthy"} ); try std.testing.expectEqualStrings(local.id, remote.id); try std.testing.expectEqualStrings(local.app.?, remote.app.?); try std.testing.expectEqualStrings(local.service, remote.service); + try std.testing.expectEqualStrings(local.trigger, remote.trigger); try std.testing.expectEqualStrings(local.status, remote.status); try std.testing.expectEqualStrings(local.manifest_hash, remote.manifest_hash); try std.testing.expectEqual(local.created_at, remote.created_at); + try std.testing.expect(local.source_release_id == null); try std.testing.expectEqualStrings(local.message.?, remote.message.?); } @@ -374,9 +388,11 @@ test "writeHistoryJsonObject round-trips through remote parser" { .id = "dep-1", .app = "demo-app", .service = "demo-app", + .trigger = "rollback", .status = "completed", .manifest_hash = "sha256:123", .created_at = 42, + .source_release_id = "dep-0", .message = "healthy", }; @@ -387,9 +403,11 @@ test "writeHistoryJsonObject round-trips through remote parser" { try std.testing.expectEqualStrings(entry.id, parsed.id); try std.testing.expectEqualStrings(entry.app.?, parsed.app.?); try std.testing.expectEqualStrings(entry.service, parsed.service); + try std.testing.expectEqualStrings(entry.trigger, parsed.trigger); try std.testing.expectEqualStrings(entry.status, parsed.status); try std.testing.expectEqualStrings(entry.manifest_hash, parsed.manifest_hash); try std.testing.expectEqual(entry.created_at, parsed.created_at); + try std.testing.expectEqualStrings(entry.source_release_id.?, parsed.source_release_id.?); try std.testing.expectEqualStrings(entry.message.?, parsed.message.?); } diff --git a/src/manifest/local_apply_backend.zig b/src/manifest/local_apply_backend.zig new file mode 100644 index 0000000..2363720 --- /dev/null +++ b/src/manifest/local_apply_backend.zig @@ -0,0 +1,208 @@ +const std = @import("std"); +const cli = @import("../lib/cli.zig"); +const apply_release = @import("apply_release.zig"); +const release_history = @import("release_history.zig"); +const release_plan = @import("release_plan.zig"); +const orchestrator = @import("orchestrator.zig"); +const startup_runtime = @import("orchestrator/startup_runtime.zig"); +const watcher_mod = @import("../dev/watcher.zig"); +const spec = @import("spec.zig"); +const proxy_control_plane = @import("../network/proxy/control_plane.zig"); +const service_rollout = @import("../network/service_rollout.zig"); +const service_reconciler = @import("../network/service_reconciler.zig"); +const listener_runtime = @import("../network/proxy/listener_runtime.zig"); + +const writeErr = cli.writeErr; + +pub const PreparedLocalApply = struct { + alloc: std.mem.Allocator, + manifest: *spec.Manifest, + release: *const release_plan.ReleasePlan, + orch: orchestrator.Orchestrator, + runtime_started: bool = false, + + pub fn init( + alloc: std.mem.Allocator, + manifest: *spec.Manifest, + release: *const release_plan.ReleasePlan, + dev_mode: bool, + ) !PreparedLocalApply { + var orch = try orchestrator.Orchestrator.init(alloc, manifest, release.app.app_name); + errdefer orch.deinit(); + + orch.dev_mode = dev_mode; + if (release.service_filter) |filter| { + orch.service_filter = filter; + } + + try orch.computeStartSet(); + startup_runtime.syncServiceDefinitions(alloc, manifest.services, orch.start_set); + + return .{ + .alloc = alloc, + .manifest = manifest, + .release = release, + .orch = orch, + }; + } + + pub fn deinit(self: *PreparedLocalApply) void { + if (self.runtime_started) { + listener_runtime.stop(); + proxy_control_plane.stopSyncLoop(); + listener_runtime.setStateChangeHook(null); + } + self.orch.deinit(); + } + + pub fn beginRuntime(self: *PreparedLocalApply) void { + service_rollout.logStartupSummary(); + service_reconciler.ensureDataPlaneReadyIfEnabled(); + service_reconciler.bootstrapIfEnabled(); + service_reconciler.startAuditLoopIfEnabled(); + listener_runtime.setStateChangeHook(proxy_control_plane.refreshIfEnabled); + listener_runtime.startIfEnabled(self.alloc); + proxy_control_plane.startSyncLoopIfEnabled(); + orchestrator.installSignalHandlers(); + self.runtime_started = true; + } + + pub fn startRelease(self: *PreparedLocalApply, context: apply_release.ApplyContext) !apply_release.ApplyReport { + var release_tracker = LocalReleaseTracker{ + .plan = self.release, + .context = context, + }; + var apply_backend = LocalApplyBackend{ + .orch = &self.orch, + .release = self.release, + }; + const apply_result = try apply_release.execute(&release_tracker, &apply_backend); + return apply_result.toReport(self.release.app.app_name, self.release.resolvedServiceCount(), context); + } + + pub fn startDevWatcher(self: *PreparedLocalApply) DevWatcherRuntime { + var runtime = DevWatcherRuntime{}; + runtime.watcher = watcher_mod.Watcher.init(self.alloc) catch |e| blk: { + writeErr("warning: file watcher unavailable: {}\n", .{e}); + break :blk null; + }; + + if (runtime.watcher == null) return runtime; + + var any_watch_failed = false; + for (self.manifest.services, 0..) |svc, i| { + if (!self.release.includesService(svc.name)) continue; + for (svc.volumes) |vol| { + if (vol.kind != .bind) continue; + + var resolve_buf: [4096]u8 = undefined; + const abs_source = std.fs.cwd().realpath(vol.source, &resolve_buf) catch |e| { + writeErr("warning: failed to resolve path {s}: {}\n", .{ vol.source, e }); + any_watch_failed = true; + continue; + }; + + runtime.watcher.?.addRecursive(abs_source, i) catch |e| { + writeErr("warning: failed to watch {s}: {}\n", .{ vol.source, e }); + any_watch_failed = true; + }; + } + } + + if (!any_watch_failed or runtime.watcher.?.watch_count > 0) { + runtime.thread = std.Thread.spawn(.{}, orchestrator.watcherThread, .{ + &self.orch, + &runtime.watcher.?, + }) catch |e| blk: { + writeErr("warning: failed to start watcher thread: {}\n", .{e}); + break :blk null; + }; + } else { + writeErr("warning: no directories could be watched, file change detection disabled\n", .{}); + } + + return runtime; + } +}; + +pub const DevWatcherRuntime = struct { + watcher: ?watcher_mod.Watcher = null, + thread: ?std.Thread = null, + + pub fn deinit(self: *DevWatcherRuntime) void { + if (self.watcher) |*w| w.deinit(); + if (self.thread) |t| t.join(); + } +}; + +const LocalReleaseTracker = struct { + plan: *const release_plan.ReleasePlan, + context: apply_release.ApplyContext = .{}, + + pub fn begin(self: *const LocalReleaseTracker) !?[]const u8 { + return release_history.recordAppReleaseStart(self.plan) catch null; + } + + pub fn mark(self: *const LocalReleaseTracker, id: []const u8, status: @import("update/common.zig").DeploymentStatus, message: ?[]const u8) !void { + const resolved_message = try apply_release.materializeMessage(self.plan.alloc, self.context, status, message); + defer if (resolved_message) |msg| self.plan.alloc.free(msg); + + switch (status) { + .completed => release_history.markAppReleaseCompleted(id, resolved_message) catch {}, + .failed => release_history.markAppReleaseFailed(id, resolved_message) catch {}, + else => {}, + } + } + + pub fn freeReleaseId(self: *const LocalReleaseTracker, id: []const u8) void { + self.plan.alloc.free(id); + } +}; + +const LocalApplyBackend = struct { + orch: *orchestrator.Orchestrator, + release: *const release_plan.ReleasePlan, + + pub fn apply(self: *const LocalApplyBackend) !apply_release.ApplyOutcome { + try self.orch.startAll(); + return .{ + .status = .completed, + .message = "all requested services started", + .placed = self.release.resolvedServiceCount(), + }; + } + + pub fn failureMessage(_: *const LocalApplyBackend, _: anytype) ?[]const u8 { + return "service startup failed"; + } +}; + +test "PreparedLocalApply init resolves filtered start set" { + const alloc = std.testing.allocator; + const loader = @import("loader.zig"); + const app_spec = @import("app_spec.zig"); + + var manifest = try loader.loadFromString(alloc, + \\[service.db] + \\image = "postgres:latest" + \\ + \\[service.web] + \\image = "nginx:latest" + \\depends_on = ["db"] + ); + defer manifest.deinit(); + + var app = try app_spec.fromManifest(alloc, "demo-app", &manifest); + defer app.deinit(); + + var release = try release_plan.ReleasePlan.fromAppSpec(alloc, &app, &.{"web"}); + defer release.deinit(); + + var prepared = try PreparedLocalApply.init(alloc, &manifest, &release, false); + defer prepared.deinit(); + + const start_set = prepared.orch.start_set.?; + try std.testing.expectEqual(@as(usize, 2), start_set.count()); + try std.testing.expect(start_set.contains("db")); + try std.testing.expect(start_set.contains("web")); +} diff --git a/src/manifest/release_history.zig b/src/manifest/release_history.zig index 7b0aae7..8561c96 100644 --- a/src/manifest/release_history.zig +++ b/src/manifest/release_history.zig @@ -19,8 +19,8 @@ pub fn recordAppReleaseStart(plan: *const release_plan.ReleasePlan) ![]const u8 return id; } -pub fn markAppReleaseCompleted(id: []const u8) !void { - try deployment_store.updateDeploymentStatus(id, .completed, null); +pub fn markAppReleaseCompleted(id: []const u8, message: ?[]const u8) !void { + try deployment_store.updateDeploymentStatus(id, .completed, message); } pub fn markAppReleaseFailed(id: []const u8, message: ?[]const u8) !void { diff --git a/src/runtime/cli/status_command.zig b/src/runtime/cli/status_command.zig index c4fe14c..ed1bf74 100644 --- a/src/runtime/cli/status_command.zig +++ b/src/runtime/cli/status_command.zig @@ -1,6 +1,7 @@ const std = @import("std"); const cli = @import("../../lib/cli.zig"); const json_out = @import("../../lib/json_output.zig"); +const apply_release = @import("../../manifest/apply_release.zig"); const store = @import("../../state/store.zig"); const monitor = @import("../monitor.zig"); const cgroups = @import("../cgroups.zig"); @@ -99,11 +100,13 @@ fn statusLocal(alloc: std.mem.Allocator, verbose: bool) StatusError!void { const AppStatusSnapshot = struct { app_name: []const u8, + trigger: []const u8, release_id: []const u8, status: []const u8, manifest_hash: []const u8, created_at: i64, service_count: usize, + source_release_id: ?[]const u8, message: ?[]const u8, }; @@ -120,7 +123,7 @@ fn statusLocalApp(alloc: std.mem.Allocator, app_name: []const u8) StatusError!vo }; defer latest.deinit(alloc); - const snapshot = appStatusFromDeployment(latest); + const snapshot = appStatusFromReport(apply_release.reportFromDeployment(latest)); printAppStatus(snapshot); } @@ -259,11 +262,13 @@ fn printAppStatus(snapshot: AppStatusSnapshot) void { fn parseAppStatusResponse(json: []const u8) AppStatusSnapshot { return .{ .app_name = extractJsonString(json, "app_name") orelse "?", + .trigger = extractJsonString(json, "trigger") orelse "apply", .release_id = extractJsonString(json, "release_id") orelse "?", .status = extractJsonString(json, "status") orelse "unknown", .manifest_hash = extractJsonString(json, "manifest_hash") orelse "?", .created_at = extractJsonInt(json, "created_at") orelse 0, .service_count = @intCast(@max(0, extractJsonInt(json, "service_count") orelse 0)), + .source_release_id = extractJsonString(json, "source_release_id"), .message = extractJsonString(json, "message"), }; } @@ -271,34 +276,30 @@ fn parseAppStatusResponse(json: []const u8) AppStatusSnapshot { fn writeAppStatusJsonObject(w: *json_out.JsonWriter, snapshot: AppStatusSnapshot) void { w.beginObject(); w.stringField("app_name", snapshot.app_name); + w.stringField("trigger", snapshot.trigger); w.stringField("release_id", snapshot.release_id); w.stringField("status", snapshot.status); w.stringField("manifest_hash", snapshot.manifest_hash); w.intField("created_at", snapshot.created_at); w.uintField("service_count", snapshot.service_count); + if (snapshot.source_release_id) |source_release_id| w.stringField("source_release_id", source_release_id) else w.nullField("source_release_id"); if (snapshot.message) |message| w.stringField("message", message) else w.nullField("message"); } -fn appStatusFromDeployment(latest: store.DeploymentRecord) AppStatusSnapshot { +fn appStatusFromReport(report: apply_release.ApplyReport) AppStatusSnapshot { return .{ - .app_name = latest.app_name orelse latest.service_name, - .release_id = latest.id, - .status = latest.status, - .manifest_hash = latest.manifest_hash, - .created_at = latest.created_at, - .service_count = countServices(latest.config_snapshot), - .message = latest.message, + .app_name = report.app_name, + .trigger = report.trigger.toString(), + .release_id = report.release_id orelse "?", + .status = report.status.toString(), + .manifest_hash = report.manifest_hash, + .created_at = report.created_at, + .service_count = report.service_count, + .source_release_id = report.source_release_id, + .message = report.message, }; } -fn countServices(snapshot: []const u8) usize { - const services = extractJsonArray(snapshot, "services") orelse return 0; - var iter = json_helpers.extractJsonObjects(services); - var count: usize = 0; - while (iter.next() != null) count += 1; - return count; -} - fn currentAppNameAlloc(alloc: std.mem.Allocator) ![]u8 { var cwd_buf: [4096]u8 = undefined; const cwd = std.fs.cwd().realpath(".", &cwd_buf) catch return StatusError.StoreError; @@ -406,59 +407,59 @@ fn parsePsiFromJson(json: []const u8, some_key: []const u8, full_key: []const u8 test "parseAppStatusResponse extracts app fields" { const snapshot = parseAppStatusResponse( - \\{"app_name":"demo-app","release_id":"abc123def456","status":"completed","manifest_hash":"sha256:123","created_at":42,"service_count":2,"message":null} + \\{"app_name":"demo-app","trigger":"apply","release_id":"abc123def456","status":"completed","manifest_hash":"sha256:123","created_at":42,"service_count":2,"source_release_id":null,"message":null} ); try std.testing.expectEqualStrings("demo-app", snapshot.app_name); + try std.testing.expectEqualStrings("apply", snapshot.trigger); try std.testing.expectEqualStrings("abc123def456", snapshot.release_id); try std.testing.expectEqualStrings("completed", snapshot.status); try std.testing.expectEqualStrings("sha256:123", snapshot.manifest_hash); try std.testing.expectEqual(@as(i64, 42), snapshot.created_at); try std.testing.expectEqual(@as(usize, 2), snapshot.service_count); + try std.testing.expect(snapshot.source_release_id == null); try std.testing.expect(snapshot.message == null); } -test "countServices counts service objects in app snapshot" { - const snapshot = - \\{"app_name":"demo-app","services":[{"name":"web"},{"name":"db"},{"name":"worker"}]} - ; - try std.testing.expectEqual(@as(usize, 3), countServices(snapshot)); -} - -test "appStatusFromDeployment matches remote app status shape" { - const latest = store.DeploymentRecord{ - .id = "dep-2", +test "appStatusFromReport matches remote app status shape" { + const report = apply_release.ApplyReport{ .app_name = "demo-app", - .service_name = "demo-app", - .manifest_hash = "sha256:222", - .config_snapshot = "{\"app_name\":\"demo-app\",\"services\":[{\"name\":\"web\"},{\"name\":\"db\"}]}", - .status = "completed", + .release_id = "dep-2", + .status = .completed, + .service_count = 2, + .placed = 2, + .failed = 0, .message = "all placements healthy", + .manifest_hash = "sha256:222", .created_at = 200, }; - const local = appStatusFromDeployment(latest); + const local = appStatusFromReport(report); const remote = parseAppStatusResponse( - \\{"app_name":"demo-app","release_id":"dep-2","status":"completed","manifest_hash":"sha256:222","created_at":200,"service_count":2,"message":"all placements healthy"} + \\{"app_name":"demo-app","trigger":"apply","release_id":"dep-2","status":"completed","manifest_hash":"sha256:222","created_at":200,"service_count":2,"source_release_id":null,"message":"all placements healthy"} ); try std.testing.expectEqualStrings(local.app_name, remote.app_name); + try std.testing.expectEqualStrings(local.trigger, remote.trigger); try std.testing.expectEqualStrings(local.release_id, remote.release_id); try std.testing.expectEqualStrings(local.status, remote.status); try std.testing.expectEqualStrings(local.manifest_hash, remote.manifest_hash); try std.testing.expectEqual(local.created_at, remote.created_at); try std.testing.expectEqual(local.service_count, remote.service_count); + try std.testing.expect(local.source_release_id == null); try std.testing.expectEqualStrings(local.message.?, remote.message.?); } test "writeAppStatusJsonObject round-trips through remote parser" { const snapshot = AppStatusSnapshot{ .app_name = "demo-app", + .trigger = "rollback", .release_id = "dep-2", .status = "completed", .manifest_hash = "sha256:222", .created_at = 200, .service_count = 2, + .source_release_id = "dep-1", .message = "all placements healthy", }; @@ -467,10 +468,12 @@ test "writeAppStatusJsonObject round-trips through remote parser" { const parsed = parseAppStatusResponse(w.getWritten()); try std.testing.expectEqualStrings(snapshot.app_name, parsed.app_name); + try std.testing.expectEqualStrings(snapshot.trigger, parsed.trigger); try std.testing.expectEqualStrings(snapshot.release_id, parsed.release_id); try std.testing.expectEqualStrings(snapshot.status, parsed.status); try std.testing.expectEqualStrings(snapshot.manifest_hash, parsed.manifest_hash); try std.testing.expectEqual(snapshot.created_at, parsed.created_at); try std.testing.expectEqual(snapshot.service_count, parsed.service_count); + try std.testing.expectEqualStrings(snapshot.source_release_id.?, parsed.source_release_id.?); try std.testing.expectEqualStrings(snapshot.message.?, parsed.message.?); } diff --git a/src/state/store/deployments.zig b/src/state/store/deployments.zig index c7fe2f4..f65cd03 100644 --- a/src/state/store/deployments.zig +++ b/src/state/store/deployments.zig @@ -121,7 +121,7 @@ pub fn listDeployments(alloc: Allocator, service_name: []const u8) StoreError!st return listQueryInDb( db, alloc, - "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC;", + "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC, rowid DESC;", .{service_name}, ); } @@ -139,7 +139,7 @@ pub fn listDeploymentsByAppInDb( return listQueryInDb( db, alloc, - "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? ORDER BY created_at DESC;", + "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? ORDER BY created_at DESC, rowid DESC;", .{app_name}, ); } @@ -165,7 +165,7 @@ pub fn updateDeploymentStatusInDb( pub fn getLatestDeployment(alloc: Allocator, service_name: []const u8) StoreError!DeploymentRecord { return queryOne( alloc, - "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC LIMIT 1;", + "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC, rowid DESC LIMIT 1;", .{service_name}, ); } @@ -183,7 +183,7 @@ pub fn getLatestDeploymentByAppInDb( return queryOneInDb( db, alloc, - "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? ORDER BY created_at DESC LIMIT 1;", + "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? ORDER BY created_at DESC, rowid DESC LIMIT 1;", .{app_name}, ); } @@ -191,7 +191,7 @@ pub fn getLatestDeploymentByAppInDb( pub fn getLastSuccessfulDeployment(alloc: Allocator, service_name: []const u8) StoreError!DeploymentRecord { return queryOne( alloc, - "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? AND status = 'completed' ORDER BY created_at DESC LIMIT 1;", + "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? AND status = 'completed' ORDER BY created_at DESC, rowid DESC LIMIT 1;", .{service_name}, ); } @@ -199,7 +199,7 @@ pub fn getLastSuccessfulDeployment(alloc: Allocator, service_name: []const u8) S pub fn getLastSuccessfulDeploymentByApp(alloc: Allocator, app_name: []const u8) StoreError!DeploymentRecord { return queryOne( alloc, - "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? AND status = 'completed' ORDER BY created_at DESC LIMIT 1;", + "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? AND status = 'completed' ORDER BY created_at DESC, rowid DESC LIMIT 1;", .{app_name}, ); } @@ -259,7 +259,7 @@ test "deployment list ordered by timestamp desc" { db.exec("INSERT INTO deployments (id, app_name, service_name, manifest_hash, config_snapshot, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?);", .{}, .{ "dep-new", "demo-app", "web", "sha256:new", "{}", "completed", @as(i64, 200) }) catch unreachable; const alloc = std.testing.allocator; - var stmt = db.prepare("SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC;") catch unreachable; + var stmt = db.prepare("SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC, rowid DESC;") catch unreachable; defer stmt.deinit(); var results: std.ArrayList(DeploymentRecord) = .empty; @@ -304,7 +304,7 @@ test "deployment latest returns most recent" { db.exec("INSERT INTO deployments (id, app_name, service_name, manifest_hash, config_snapshot, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?);", .{}, .{ "dep-2", "demo-app", "web", "sha256:second", "{}", "in_progress", @as(i64, 200) }) catch unreachable; const alloc = std.testing.allocator; - const row = (db.oneAlloc(DeploymentRow, alloc, "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC LIMIT 1;", .{}, .{"web"}) catch unreachable).?; + const row = (db.oneAlloc(DeploymentRow, alloc, "SELECT " ++ deployment_columns ++ " FROM deployments WHERE service_name = ? ORDER BY created_at DESC, rowid DESC LIMIT 1;", .{}, .{"web"}) catch unreachable).?; const record = rowToRecord(row); defer record.deinit(alloc); @@ -330,10 +330,35 @@ test "deployment app queries return only matching app records" { db.exec("INSERT INTO deployments (id, app_name, service_name, manifest_hash, config_snapshot, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?);", .{}, .{ "dep-b", "app-b", "api", "sha256:b", "{}", "completed", @as(i64, 200) }) catch unreachable; const alloc = std.testing.allocator; - const row = (db.oneAlloc(DeploymentRow, alloc, "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? ORDER BY created_at DESC LIMIT 1;", .{}, .{"app-a"}) catch unreachable).?; + const row = (db.oneAlloc(DeploymentRow, alloc, "SELECT " ++ deployment_columns ++ " FROM deployments WHERE app_name = ? ORDER BY created_at DESC, rowid DESC LIMIT 1;", .{}, .{"app-a"}) catch unreachable).?; const record = rowToRecord(row); defer record.deinit(alloc); try std.testing.expectEqualStrings("dep-a", record.id); try std.testing.expectEqualStrings("app-a", record.app_name.?); } + +test "deployment latest prefers later insert when timestamps tie" { + var db = try sqlite.Db.init(.{ .mode = .Memory, .open_flags = .{ .write = true } }); + defer db.deinit(); + try schema.init(&db); + + db.exec("INSERT INTO deployments (id, app_name, service_name, manifest_hash, config_snapshot, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?);", .{}, .{ "dep-1", "demo-app", "demo-app", "sha256:first", "{}", "completed", @as(i64, 100) }) catch unreachable; + db.exec("INSERT INTO deployments (id, app_name, service_name, manifest_hash, config_snapshot, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?);", .{}, .{ "dep-2", "demo-app", "demo-app", "sha256:second", "{}", "completed", @as(i64, 100) }) catch unreachable; + + const alloc = std.testing.allocator; + + var deployments = try listDeploymentsByAppInDb(&db, alloc, "demo-app"); + defer { + for (deployments.items) |record| record.deinit(alloc); + deployments.deinit(alloc); + } + + try std.testing.expectEqualStrings("dep-2", deployments.items[0].id); + try std.testing.expectEqualStrings("dep-1", deployments.items[1].id); + + const latest = try getLatestDeploymentByAppInDb(&db, alloc, "demo-app"); + defer latest.deinit(alloc); + + try std.testing.expectEqualStrings("dep-2", latest.id); +} diff --git a/src/test_root.zig b/src/test_root.zig index 9ed86d3..0689862 100644 --- a/src/test_root.zig +++ b/src/test_root.zig @@ -76,6 +76,8 @@ comptime { _ = @import("build/commands.zig"); _ = @import("manifest/spec.zig"); _ = @import("manifest/app_spec.zig"); + _ = @import("manifest/apply_release.zig"); + _ = @import("manifest/local_apply_backend.zig"); _ = @import("manifest/release_plan.zig"); _ = @import("manifest/release_history.zig"); _ = @import("manifest/cli/ops.zig"); @@ -95,6 +97,7 @@ comptime { _ = @import("api/http.zig"); _ = @import("api/routes.zig"); _ = @import("api/routes/cluster_agents.zig"); + _ = @import("api/routes/cluster_agents/apply_request.zig"); _ = @import("api/routes/cluster_agents/app_routes.zig"); _ = @import("api/routes/cluster_agents/deploy_routes.zig"); _ = @import("api/routes/status_metrics.zig");