Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"p-queue": "^9.2.0",
"pino": "^10.3.1",
"pino-http": "^11.0.0",
"prom-client": "^15.1.3",
"unzipper": "^0.12.3"
},
"devDependencies": {
Expand Down
50 changes: 48 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,59 @@
import express from "express";
import pinoHttp from "pino-http";
import { pinoHttp } from "pino-http";
import crypto from "crypto";
import { deployRouter } from "./routes/deploy.js";
import { invokeRouter } from "./routes/invoke.js";
import { httpLoggerOptions } from "./utils/logger.js";
import { register, httpRequestDuration, httpRequestsTotal } from "./utils/metrics.js";
import { deployQueue } from "./deploy/queue.js";

export const app = express();

// @ts-expect-error
// Give every request an id before the HTTP logger is installed: reuse the
// caller-supplied X-Request-Id header when it is non-empty, otherwise mint a
// fresh UUID.
// NOTE(review): presumably pino-http picks up `req.id` as the log correlation
// id — confirm against the logger options.
app.use((req, _res, next) => {
  const suppliedId = req.headers["x-request-id"];
  req.id = suppliedId || crypto.randomUUID();
  next();
});

// HTTP request logging, configured by httpLoggerOptions.
app.use(pinoHttp(httpLoggerOptions));

// Record Prometheus HTTP metrics (duration histogram + request counter) for
// every request, once its response has been completely sent.
app.use((req, res, next) => {
  const start = process.hrtime.bigint();

  res.on("finish", () => {
    // hrtime.bigint() is in nanoseconds; the histogram is fed seconds.
    const durationSec =
      Number(process.hrtime.bigint() - start) / 1_000_000_000;

    // Label with the matched route *pattern*, never the raw URL path. Raw
    // paths (404 probes, per-function sub-paths) are unbounded and would
    // create one time series per distinct URL. `req.route.path` is relative
    // to the router that owns it, so prefix the mount point to keep e.g.
    // "/" under "/deploy" distinguishable from other routers.
    // Requests that never reached a Route (router.use handlers, 404s) share
    // the single "unknown" bucket.
    const route = req.route
      ? `${req.baseUrl}${String(req.route.path)}`
      : "unknown";

    const labels = {
      method: req.method,
      route,
      status_code: String(res.statusCode),
    };
    httpRequestDuration.observe(labels, durationSec);
    httpRequestsTotal.inc(labels);
  });

  next();
});

// Parse JSON request bodies, then mount deployRouter under /deploy and
// invokeRouter under /f. Registered after the logging and metrics
// middleware above, so requests to both routers pass through them first.
app.use(express.json());
app.use("/deploy", deployRouter);
app.use("/f", invokeRouter);

// Prometheus scrape endpoint: serialises every metric in `register`.
// Collection is awaited inside try/catch so a failing collector produces a
// 500 response instead of an unhandled promise rejection and a hanging scrape.
app.get("/metrics", async (_req, res) => {
  try {
    const body = await register.metrics();
    // Set the exposition content type only once collection has succeeded, so
    // the error path below is not mislabelled as metrics output.
    res.set("Content-Type", register.contentType);
    res.end(body);
  } catch (err) {
    res.status(500).end(err instanceof Error ? err.message : String(err));
  }
});

// Liveness probe: replies with a static "ok" status plus the value of
// process.uptime() (seconds the Node process has been running).
app.get("/health", (_req, res) => {
  const uptime = process.uptime();
  res.json({ status: "ok", uptime });
});

// Readiness probe: reports 200/"ready" only while the deploy queue size and
// the V8 heap usage are both under their thresholds; otherwise 503/"not_ready".
// The individual check results are always included in the response body.
app.get("/ready", (_req, res) => {
  const maxQueuedDeploys = 50;
  const maxHeapUsedBytes = 500 * 1024 * 1024;
  const { heapUsed } = process.memoryUsage();

  const checks = {
    queueAvailable: deployQueue.size < maxQueuedDeploys,
    memoryOk: heapUsed < maxHeapUsedBytes,
  };
  const healthy = checks.queueAvailable && checks.memoryOk;

  if (healthy) {
    res.status(200).json({ status: "ready", checks });
  } else {
    res.status(503).json({ status: "not_ready", checks });
  }
});
23 changes: 15 additions & 8 deletions src/deploy/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import fs from "fs";
import crypto from "crypto";
import { getPaths } from "../utils/path.js";
import { pipelineLogger } from "../utils/logger.js";
import { deployTotal, deployStageDuration } from "../utils/metrics.js";

export async function deployFunction(zipPath: string) {
const functionId = crypto.randomBytes(8).toString("hex");
Expand All @@ -23,65 +24,68 @@ export async function deployFunction(zipPath: string) {
);

try {
// ── Stage 1: Extract zip ──────────────────────────────────────
const t0 = performance.now();
await extractZip(zipPath, paths.outputDir);
const extractDuration = performance.now() - t0;
deployStageDuration.observe({ stage: "extract" }, extractDuration / 1000);
pipelineLogger.info(
{ functionId, stage: "extract", durationMs: extractDuration },
"zip extraction completed",
);
await fs.promises.unlink(zipPath);

// ── Stage 2: Prepare rootfs ───────────────────────────────────
const t1 = performance.now();
const image = await prepareRootfs(functionId);
const rootfsDuration = performance.now() - t1;
deployStageDuration.observe({ stage: "rootfs" }, rootfsDuration / 1000);
pipelineLogger.info(
{ functionId, stage: "rootfs", durationMs: rootfsDuration, image },
"rootfs preparation completed",
);

// ── Stage 3: Spawn Firecracker ────────────────────────────────
const t2 = performance.now();
fc = await startFirecrackerProcess(paths.apiSock);
const spawnDuration = performance.now() - t2;
deployStageDuration.observe({ stage: "fc-spawn" }, spawnDuration / 1000);
pipelineLogger.info(
{ functionId, stage: "fc-spawn", durationMs: spawnDuration },
"firecracker process spawned",
);

// ── Stage 4: Configure VM ─────────────────────────────────────
const t3 = performance.now();
const readyPromise = waitForVMReady(fc);
const client = createFcCient(paths.apiSock);

const t4 = performance.now();
await configureVM(client, functionId, image);
const configureDuration = performance.now() - t4;
deployStageDuration.observe({ stage: "configure-vm" }, configureDuration / 1000);
pipelineLogger.info(
{ functionId, stage: "configure-vm", durationMs: configureDuration },
"VM configured",
);

// ── Stage 5: Wait for VM ready ────────────────────────────────
await readyPromise;
const readyDuration = performance.now() - t3;
deployStageDuration.observe({ stage: "vm-ready" }, readyDuration / 1000);
pipelineLogger.info(
{ functionId, stage: "vm-ready", durationMs: readyDuration },
"VM reported READY",
);

// ── Stage 6: Snapshot ─────────────────────────────────────────
const t5 = performance.now();
await snapshotVM(client, functionId);
const snapshotDuration = performance.now() - t5;
deployStageDuration.observe({ stage: "snapshot" }, snapshotDuration / 1000);
pipelineLogger.info(
{ functionId, stage: "snapshot", durationMs: snapshotDuration },
"VM snapshot created",
);

const totalDuration = performance.now() - t0;
deployStageDuration.observe({ stage: "complete" }, totalDuration / 1000);
deployTotal.inc({ status: "success" });

pipelineLogger.info(
{
functionId,
Expand All @@ -104,19 +108,22 @@ export async function deployFunction(zipPath: string) {
url: `http://localhost:3000/f/${functionId}`,
};
} catch (err) {
deployTotal.inc({ status: "error" });

pipelineLogger.error(
{ functionId, err },
"deployment pipeline failed",
);
throw err;
} finally {
// Always kill the FC process — whether deploy succeeded or failed
try { fc!?.kill("SIGKILL"); } catch { }

const t6 = performance.now();
await cleanupResources(paths);
const cleanupDuration = performance.now() - t6;
deployStageDuration.observe({ stage: "cleanup" }, cleanupDuration / 1000);
pipelineLogger.debug(
{ functionId, stage: "cleanup", durationMs: performance.now() - t6 },
{ functionId, stage: "cleanup", durationMs: cleanupDuration },
"post-deploy cleanup completed",
);
}
Expand Down
17 changes: 16 additions & 1 deletion src/routes/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { upload } from "../deploy/upload.js";
import { jobs, deployQueue } from "../deploy/queue.js";
import { deployFunction } from "../deploy/pipeline.js";
import { deployLogger } from "../utils/logger.js";
import {
deployQueueDepth,
deployQueueWaitTime,
} from "../utils/metrics.js";

export const deployRouter = Router();

Expand All @@ -24,6 +28,9 @@ deployRouter.post("/", upload.single("code"), async (req, res) => {
const jobId = crypto.randomBytes(8).toString("hex");
jobs.set(jobId, { state: "pending" });

deployQueueDepth.inc();
const enqueueTime = performance.now();

deployLogger.info(
{
jobId,
Expand All @@ -35,8 +42,14 @@ deployRouter.post("/", upload.single("code"), async (req, res) => {
);

deployQueue.add(async () => {
const waitDurationSec = (performance.now() - enqueueTime) / 1000;
deployQueueWaitTime.observe(waitDurationSec);

jobs.set(jobId, { state: "running" });
deployLogger.info({ jobId }, "deployment job started");
deployLogger.info(
{ jobId, queueWaitMs: waitDurationSec * 1000 },
"deployment job started",
);

try {
const result = await deployFunction(req.file!.path);
Expand All @@ -55,6 +68,8 @@ deployRouter.post("/", upload.single("code"), async (req, res) => {
{ jobId, err },
"deployment job failed",
);
} finally {
deployQueueDepth.dec();
}
});

Expand Down
1 change: 0 additions & 1 deletion src/routes/invoke.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { describe, it, expect, vi } from "vitest";

// Replace the scheduler with a stub whose enqueueRequest rejects the task
// straight away with a "no snapshot" error, so the route's failure path is
// exercised.
vi.mock("../runtime/scheduler.js", () => {
  const enqueueRequest = vi.fn((_functionId, task) => {
    task.reject(new Error("no snapshot"));
  });
  return { enqueueRequest };
});
Expand Down
14 changes: 12 additions & 2 deletions src/routes/invoke.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Router } from "express";
import { enqueueRequest } from "../runtime/scheduler.js";
import { runtimeLogger } from "../utils/logger.js";
import { invocationTotal, invocationTime } from "../utils/metrics.js";

export const invokeRouter = Router();

invokeRouter.use("/:functionId", async (req, res) => {
const { functionId } = req.params;
const subPath = req.path || "/";
const start = performance.now();

runtimeLogger.info(
{ functionId, method: req.method, subPath },
Expand All @@ -24,13 +26,21 @@ invokeRouter.use("/:functionId", async (req, res) => {
});
});

const durationSec = (performance.now() - start) / 1000;
invocationTime.observe({ function_id: functionId }, durationSec);
invocationTotal.inc({ function_id: functionId, status: "success" });

runtimeLogger.info(
{ functionId, method: req.method, subPath, statusCode: res.statusCode },
{ functionId, method: req.method, subPath, statusCode: res.statusCode, durationMs: durationSec * 1000 },
"function invocation completed",
);
} catch (e: any) {
const durationSec = (performance.now() - start) / 1000;
invocationTime.observe({ function_id: functionId }, durationSec);
invocationTotal.inc({ function_id: functionId, status: "error" });

runtimeLogger.error(
{ functionId, method: req.method, subPath, err: e },
{ functionId, method: req.method, subPath, err: e, durationMs: durationSec * 1000 },
"function invocation failed",
);

Expand Down
4 changes: 4 additions & 0 deletions src/runtime/cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import fs from "fs";
import { cleanupLogger } from "../utils/logger.js";
import { vmCleanupTotal, vmCount } from "../utils/metrics.js";

import type { RuntimeFunction, Vm } from "../types/types.js";

Expand Down Expand Up @@ -27,6 +28,9 @@ export async function cleanupVm(fn: RuntimeFunction, vm: Vm) {
} catch {}

fn.vms = fn.vms.filter((v) => v !== vm);
vmCleanupTotal.inc();
vmCount.dec({ function_id: fn.functionId, state: "ready" });

cleanupLogger.info(
{ functionId: fn.functionId, vmId: vm.id, remainingVms: fn.vms.length },
"VM cleanup completed",
Expand Down
5 changes: 5 additions & 0 deletions src/runtime/protocol.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Socket } from "net";
import { protocolLogger } from "../utils/logger.js";
import { vsockErrors } from "../utils/metrics.js";

export function buildPayload(req: any, subPath: string): string {
return (
Expand Down Expand Up @@ -32,6 +33,7 @@ export function readVsockResponse(

const timer = setTimeout(() => {
protocolLogger.error({ timeoutMs: timeout }, "function execution timeout");
vsockErrors.inc({ error_type: "timeout" });
socket.destroy();
reject(new Error("Function timeout"));
}, timeout);
Expand Down Expand Up @@ -64,6 +66,7 @@ export function readVsockResponse(
return;
}
} catch {
vsockErrors.inc({ error_type: "parse_error" });
protocolLogger.error({ rawLine: line }, "invalid JSON received from VM");
}
}
Expand All @@ -72,13 +75,15 @@ export function readVsockResponse(
onError = (err) => {
clearTimeout(timer);
cleanup();
vsockErrors.inc({ error_type: "connection_error" });
protocolLogger.error({ err }, "vsock read error");
reject(err);
};

onEnd = () => {
clearTimeout(timer);
cleanup();
vsockErrors.inc({ error_type: "connection_closed" });
protocolLogger.error("vsock connection closed before response received");
reject(new Error("Connection closed before response"));
};
Expand Down
Loading
Loading