Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"better-sqlite3": "^11.5.0",
"commander": "^14.0.3",
"dotenv": "^16.4.5",
"hono": "^4.12.15",
"ink": "^7.0.1",
"ink-big-text": "^2.0.0",
"ink-gradient": "^4.0.0",
Expand Down
17 changes: 10 additions & 7 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions src/cockpit/web/auth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { randomBytes, timingSafeEqual } from "node:crypto";

export const COCKPIT_TOKEN_HEADER = "X-Cockpit-Token";
export const COCKPIT_TOKEN_BYTES = 32;

export function generateCockpitToken(): string {
return randomBytes(COCKPIT_TOKEN_BYTES).toString("hex");
}

export function isLoopbackAddress(addr: string | undefined): boolean {
if (!addr) return false;
return (
addr === "127.0.0.1" || addr === "::1" || addr === "::ffff:127.0.0.1" || addr === "localhost"
);
}

export function verifyToken(provided: string | null | undefined, expected: string): boolean {
if (!provided || !expected) return false;
if (provided.length !== expected.length) return false;
try {
return timingSafeEqual(Buffer.from(provided, "utf8"), Buffer.from(expected, "utf8"));
} catch {
return false;
}
}
26 changes: 26 additions & 0 deletions src/cockpit/web/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,29 @@ export const COCKPIT_COMMAND_PATH_PREFIX = "/commands";
export const COCKPIT_SSE_HEADERS = {
[COCKPIT_PROTOCOL_HEADER]: String(COCKPIT_PROTOCOL_VERSION),
} as const;

export {
COCKPIT_TOKEN_HEADER,
generateCockpitToken,
isLoopbackAddress,
verifyToken,
} from "./auth";
export { createCockpitApp } from "./server";
export type { CockpitServerOptions } from "./server";
export { connectSSE } from "./sse-client";
export type { SSEClient, SSEClientOptions } from "./sse-client";
export {
initialTranscriptState,
reduceTranscriptEvent,
replayEvents,
} from "./transcript-reducer";
export type {
BudgetWarningEntry,
ErrorEntry,
PolicyGateEntry,
SkillProposalEntry,
SubagentEntry,
ToolCallEntry,
TranscriptMessage,
TranscriptState,
} from "./transcript-reducer";
92 changes: 92 additions & 0 deletions src/cockpit/web/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import type { ChatController } from "../core/controller";
import type { EventBus } from "../core/events";
import { COCKPIT_PROTOCOL_HEADER, COCKPIT_PROTOCOL_VERSION } from "../core/events";
import { COCKPIT_TOKEN_HEADER, verifyToken } from "./auth";

// ─── Server factory ─────────────────────────────────────────────────────────

export interface CockpitServerOptions {
readonly eventBus: EventBus;
readonly controller: ChatController;
readonly token: string;
}

export function createCockpitApp(opts: CockpitServerOptions): Hono {
const app = new Hono();

// Token auth middleware — every request must carry the cockpit token.
app.use("*", async (c, next) => {
const provided = c.req.header(COCKPIT_TOKEN_HEADER);
if (verifyToken(provided, opts.token)) {
await next();
return;
}
return c.json({ error: "unauthorized" }, 401);
});

// GET /events — SSE stream of CockpitEvents.
app.get("/events", (c) => {
return streamSSE(c, async (stream) => {
c.header(COCKPIT_PROTOCOL_HEADER, String(COCKPIT_PROTOCOL_VERSION));

const ac = new AbortController();
stream.onAbort(() => ac.abort());

const unsubscribe = opts.eventBus.subscribe((event) => {
if (!ac.signal.aborted) {
void stream.writeSSE({ data: JSON.stringify(event) });
}
});

await new Promise<void>((resolve) => {
if (ac.signal.aborted) {
resolve();
return;
}
ac.signal.addEventListener("abort", () => resolve());
});

unsubscribe();
});
});

// POST /input — submit user input to the ChatController.
app.post("/input", async (c) => {
const body = (await c.req.json()) as {
sessionId?: string;
text?: string;
metadata?: Record<string, unknown>;
};
if (!body.sessionId || !body.text) {
return c.json({ error: "sessionId and text required" }, 400);
}
await opts.controller.submit({
sessionId: body.sessionId,
text: body.text,
...(body.metadata !== undefined ? { metadata: body.metadata } : {}),
});
return c.json({ ok: true });
});

// POST /commands/:slash — dispatch a slash command.
app.post("/commands/:slash", async (c) => {
const slash = c.req.param("slash");
const body = (await c.req.json()) as {
sessionId?: string;
args?: readonly string[];
};
if (!body.sessionId) {
return c.json({ error: "sessionId required" }, 400);
}
await opts.controller.slash({
sessionId: body.sessionId,
command: slash,
args: body.args ?? [],
});
return c.json({ ok: true });
});

return app;
}
82 changes: 82 additions & 0 deletions src/cockpit/web/sse-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Fetch-based SSE client for the cockpit web renderer.
*
* Uses fetch + ReadableStream instead of native EventSource so we can pass the
* cockpit token via a custom header (EventSource only supports query params).
*
* This module is browser-compatible — no Node imports.
*/

import type { CockpitEvent } from "../core/events";
import { COCKPIT_TOKEN_HEADER } from "./auth";

export interface SSEClientOptions {
readonly url: string;
readonly token: string;
readonly onEvent: (event: CockpitEvent) => void;
readonly onError?: (error: Error) => void;
readonly onOpen?: () => void;
}

export interface SSEClient {
close(): void;
}

export function connectSSE(opts: SSEClientOptions): SSEClient {
const ac = new AbortController();

const run = async (): Promise<void> => {
const response = await fetch(opts.url, {
headers: {
[COCKPIT_TOKEN_HEADER]: opts.token,
Accept: "text/event-stream",
},
signal: ac.signal,
});

if (!response.ok) {
throw new Error(`SSE connection failed: ${String(response.status)}`);
}

opts.onOpen?.();

const reader = response.body?.getReader();
if (!reader) throw new Error("No response body");

const decoder = new TextDecoder();
let buffer = "";

for (;;) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";

for (const line of lines) {
if (line.startsWith("data:")) {
const data = line.slice(5).trim();
if (!data) continue;
try {
opts.onEvent(JSON.parse(data) as CockpitEvent);
} catch {
// skip malformed events
}
}
}
}
};

run().catch((err: unknown) => {
if (!ac.signal.aborted) {
opts.onError?.(err instanceof Error ? err : new Error(String(err)));
}
});

return {
close(): void {
ac.abort();
},
};
}
Loading
Loading