From 94233e51914246dbec6a4a18440108e51d492806 Mon Sep 17 00:00:00 2001 From: Alexander Eklund Date: Sat, 4 Apr 2026 12:41:29 +0200 Subject: [PATCH] feat: wire port exposure WebSocket proxy + MCP gateway lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sprint 2 wiring — connects existing but disconnected components: - Worker proxy: use getSessionConnection() instead of type assertion, add WebSocket upgrade handling via Hono upgradeWebSocket - Control plane expose: add WebSocket proxy route (/s/:sessionId/ws/*), path-prefix port resolution, pass upgradeWebSocket from deps - Session creation: freeze exposed port URLs at dispatch time via generateExposedUrls() — URLs available immediately to clients - Worker server: instantiate createMcpGateway() with env config (MCP_GATEWAY_ENABLED, MCP_GATEWAY_PORT, etc.), pass to executor - Worker server: add createBunWebSocket() + export websocket handler Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/control-plane/src/app.ts | 8 +- apps/control-plane/src/routes/expose.ts | 162 +++++++++++++++++++++++- apps/worker/src/routes.ts | 5 +- apps/worker/src/routes/proxy.ts | 146 ++++++++++++++++----- apps/worker/src/server.ts | 25 ++++ 5 files changed, 308 insertions(+), 38 deletions(-) diff --git a/apps/control-plane/src/app.ts b/apps/control-plane/src/app.ts index b9e1bf3..d9fed82 100644 --- a/apps/control-plane/src/app.ts +++ b/apps/control-plane/src/app.ts @@ -74,7 +74,7 @@ import { createAuthRoutes } from './routes/auth.js'; import { registerWorkerWebSocket } from './routes/worker-ws.js'; import { healthRoute } from './routes/health.js'; import { createMcpRoutes } from './routes/mcp.js'; -import { createExposeRoutes } from './routes/expose.js'; +import { createExposeRoutes, generateExposedUrls } from './routes/expose.js'; import { createEnrollmentRoutes, createEnrollmentStore, @@ -1930,6 +1930,7 @@ export async function createControlPlaneApp(deps: ControlPlaneDeps) { const exposeRoutes = createExposeRoutes({ sessionStore, + ...(deps.upgradeWebSocket ? { upgradeWebSocket: deps.upgradeWebSocket } : {}), }); app.route('/', exposeRoutes); @@ -2043,8 +2044,13 @@ function dispatchSession( sessionId: string, request: Parameters[1], ): void { + // Freeze exposed port URLs at dispatch time — URLs are available immediately + const expose = request.network?.expose ?? []; + const exposedPorts = expose.length > 0 ? generateExposedUrls(sessionId, expose) : undefined; + sessionStore.updateStatus(sessionId, 'running', { startedAt: new Date().toISOString(), + ...(exposedPorts ? { exposedPorts } : {}), }); void (async () => { diff --git a/apps/control-plane/src/routes/expose.ts b/apps/control-plane/src/routes/expose.ts index 6cf73cc..ad4152b 100644 --- a/apps/control-plane/src/routes/expose.ts +++ b/apps/control-plane/src/routes/expose.ts @@ -1,5 +1,6 @@ import { createRoute, z } from '@hono/zod-openapi'; import { OpenAPIHono } from '@hono/zod-openapi'; +import type { UpgradeWebSocket } from 'hono/ws'; import { createLogger } from '@paws/logger'; import type { SessionStore } from '@paws/domain-session'; @@ -13,6 +14,8 @@ export interface ExposeDeps { sessionStore: SessionStore; /** Base domain for session URLs (e.g. "fleet.tpops.dev") */ fleetDomain?: string; + /** WebSocket upgrade function (needed for WS proxying to VMs) */ + upgradeWebSocket?: UpgradeWebSocket; } // --------------------------------------------------------------------------- @@ -22,7 +25,12 @@ export interface ExposeDeps { /** Generate exposed port URLs for a session */ export function generateExposedUrls( sessionId: string, - expose: Array<{ port: number; label?: string; access?: string; pathPrefix?: string }>, + expose: Array<{ + port: number; + label?: string | undefined; + access?: string | undefined; + pathPrefix?: string | undefined; + }>, fleetDomain?: string, ): Array<{ port: number; url: string; label?: string; access?: string; pin?: string }> { const baseUrl = fleetDomain ?? 'localhost:3000'; @@ -76,6 +84,28 @@ const exposeHealthRoute = createRoute({ }, }); +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Resolve which exposed port to proxy to based on the request path */ +function resolvePort( + path: string, + expose: Array<{ port: number; pathPrefix?: string }>, +): number | undefined { + if (expose.length === 0) return undefined; + + // Try path-prefix matching first (longest match wins) + const withPrefix = expose + .filter((e) => e.pathPrefix && e.pathPrefix !== '/' && path.startsWith(e.pathPrefix)) + .sort((a, b) => (b.pathPrefix?.length ?? 0) - (a.pathPrefix?.length ?? 0)); + + if (withPrefix.length > 0) return withPrefix[0]!.port; + + // Default to the first exposed port + return expose[0]!.port; +} + // --------------------------------------------------------------------------- // Factory // --------------------------------------------------------------------------- @@ -109,6 +139,124 @@ export function createExposeRoutes(deps: ExposeDeps) { } }); + // WebSocket proxy: /s/:sessionId/ws/* → worker WS proxy → VM + if (deps.upgradeWebSocket) { + const upgradeWs = deps.upgradeWebSocket; + + app.get( + '/s/:sessionId/ws/*', + upgradeWs((c) => { + const sessionId = c.req.param('sessionId')!; + + const session = sessionStore.get(sessionId); + if (!session || session.status !== 'running') { + return { + onOpen(_evt, ws) { + ws.close(4004, 'Session not found or not running'); + }, + }; + } + + const expose = session.request.network?.expose ?? []; + if (expose.length === 0) { + return { + onOpen(_evt, ws) { + ws.close(4003, 'No ports exposed'); + }, + }; + } + + const workerUrl = session.worker; + if (!workerUrl) { + return { + onOpen(_evt, ws) { + ws.close(4502, 'No worker assigned'); + }, + }; + } + + const prefix = `/s/${sessionId}/ws`; + const remainingPath = c.req.path.slice(prefix.length) || '/'; + const queryString = new URL(c.req.url).search; + const targetPort = resolvePort(remainingPath, expose); + + if (!targetPort) { + return { + onOpen(_evt, ws) { + ws.close(4003, 'Port not exposed'); + }, + }; + } + + // Convert worker HTTP URL to WS URL for the worker's WS proxy endpoint + const workerWsUrl = workerUrl.replace(/^http:/, 'ws:').replace(/^https:/, 'wss:'); + const backendUrl = `${workerWsUrl}/v1/sessions/${sessionId}/proxy/${targetPort}/ws${remainingPath}${queryString}`; + + let backendWs: WebSocket | null = null; + + return { + onOpen(_evt, clientWs) { + log.debug('WebSocket proxy opening', { + sessionId, + port: targetPort, + backend: backendUrl, + }); + + backendWs = new WebSocket(backendUrl); + + backendWs.addEventListener('open', () => { + log.debug('Backend WebSocket connected', { sessionId, port: targetPort }); + }); + + backendWs.addEventListener('message', (evt) => { + try { + if (typeof evt.data === 'string') { + clientWs.send(evt.data); + } else if (evt.data instanceof ArrayBuffer) { + clientWs.send(new Uint8Array(evt.data)); + } + } catch { + // Client disconnected + } + }); + + backendWs.addEventListener('close', (evt) => { + clientWs.close(evt.code, evt.reason); + }); + + backendWs.addEventListener('error', () => { + clientWs.close(4502, 'Backend connection failed'); + }); + }, + + onMessage(evt, _ws) { + if (backendWs?.readyState === WebSocket.OPEN) { + if (typeof evt.data === 'string') { + backendWs.send(evt.data); + } else if (evt.data instanceof ArrayBuffer) { + backendWs.send(evt.data); + } + } + }, + + onClose(_evt, _ws) { + if (backendWs && backendWs.readyState !== WebSocket.CLOSED) { + backendWs.close(); + } + backendWs = null; + }, + + onError(_evt, _ws) { + if (backendWs && backendWs.readyState !== WebSocket.CLOSED) { + backendWs.close(); + } + backendWs = null; + }, + }; + }), + ); + } + // Catch-all reverse proxy: /s/:sessionId/* → worker → VM app.all('/s/:sessionId/*', async (c) => { const sessionId = c.req.param('sessionId'); @@ -128,9 +276,12 @@ export function createExposeRoutes(deps: ExposeDeps) { // Strip /s/:sessionId prefix to get the remaining path const prefix = `/s/${sessionId}`; const remainingPath = c.req.path.slice(prefix.length) || '/'; + const queryString = new URL(c.req.url).search; - // Default to first exposed port (path-based routing is a future enhancement) - const targetPort = expose[0]!.port; + const targetPort = resolvePort(remainingPath, expose); + if (!targetPort) { + return c.text('Port not exposed', 403); + } // Forward to worker const workerUrl = session.worker; @@ -138,7 +289,7 @@ export function createExposeRoutes(deps: ExposeDeps) { return c.text('No worker assigned to session', 502); } - const targetUrl = `${workerUrl}/v1/sessions/${sessionId}/proxy/${targetPort}${remainingPath}`; + const targetUrl = `${workerUrl}/v1/sessions/${sessionId}/proxy/${targetPort}${remainingPath}${queryString}`; log.debug('Proxying to worker', { sessionId, @@ -170,9 +321,6 @@ export function createExposeRoutes(deps: ExposeDeps) { } }); - // Subdomain-based routing middleware (optional, applied at app level) - // Extracts session ID from Host header: s-abc123.fleet.tpops.dev → sessionId=abc123 - return app; } diff --git a/apps/worker/src/routes.ts b/apps/worker/src/routes.ts index 9682d8f..c0dfd8d 100644 --- a/apps/worker/src/routes.ts +++ b/apps/worker/src/routes.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto'; import { Hono } from 'hono'; +import type { UpgradeWebSocket } from 'hono/ws'; import { createLogger } from '@paws/logger'; import { tracingMiddleware } from '@paws/telemetry'; import { BrowserActionSchema } from '@paws/domain-browser'; @@ -21,6 +22,8 @@ export interface AppDeps { syncLoop?: SyncLoop | undefined; /** Config for snapshot builder (optional — needed for build endpoint) */ snapshotBuilderConfig?: SnapshotBuilderConfig | undefined; + /** WebSocket upgrade function (needed for WS proxy to VMs) */ + upgradeWebSocket?: UpgradeWebSocket | undefined; } const log = createLogger('routes'); @@ -551,7 +554,7 @@ const ws = require('ws'); }); // --- Inbound proxy (VM port exposure) --- - registerProxyRoutes(app, executor); + registerProxyRoutes(app, executor, deps.upgradeWebSocket); return app; } diff --git a/apps/worker/src/routes/proxy.ts b/apps/worker/src/routes/proxy.ts index 0e6b3da..1ece19a 100644 --- a/apps/worker/src/routes/proxy.ts +++ b/apps/worker/src/routes/proxy.ts @@ -1,4 +1,5 @@ import type { Hono } from 'hono'; +import type { UpgradeWebSocket } from 'hono/ws'; import { createLogger } from '@paws/logger'; import type { Executor } from '../session/executor.js'; @@ -12,11 +13,98 @@ const log = createLogger('worker-proxy'); * ports via the TAP device. The worker host has a direct route to the guest IP * (172.16.x.2) — no iptables changes needed for inbound traffic. * - * WebSocket upgrade is handled transparently: if the incoming request has - * `Connection: Upgrade`, the response is streamed back as-is. + * WebSocket upgrades are detected and proxied via Bun's native WebSocket. */ -export function registerProxyRoutes(app: Hono, executor: Executor) { - // ALL /v1/sessions/:id/proxy/:port/* — forward to VM guest +export function registerProxyRoutes( + app: Hono, + executor: Executor, + upgradeWebSocket?: UpgradeWebSocket, +) { + // WebSocket proxy route (must be registered before the catch-all) + if (upgradeWebSocket) { + app.get( + '/v1/sessions/:id/proxy/:port{[0-9]+}/ws/*', + upgradeWebSocket((c) => { + const sessionId = c.req.param('id')!; + const portStr = c.req.param('port')!; + const port = parseInt(portStr, 10); + + const conn = executor.getSessionConnection(sessionId); + if (!conn) { + return { + onOpen(_evt, ws) { + ws.close(4004, 'Session not found'); + }, + }; + } + + const proxyPrefix = `/v1/sessions/${sessionId}/proxy/${portStr}/ws`; + const remainingPath = c.req.path.slice(proxyPrefix.length) || '/'; + const queryString = new URL(c.req.url).search; + const targetWsUrl = `ws://${conn.guestIp}:${port}${remainingPath}${queryString}`; + + let backendWs: WebSocket | null = null; + + return { + onOpen(_evt, clientWs) { + log.debug('WebSocket proxy opening', { sessionId, port, target: targetWsUrl }); + + backendWs = new WebSocket(targetWsUrl); + + backendWs.addEventListener('open', () => { + log.debug('Backend WebSocket connected', { sessionId, port }); + }); + + backendWs.addEventListener('message', (evt) => { + try { + if (typeof evt.data === 'string') { + clientWs.send(evt.data); + } else if (evt.data instanceof ArrayBuffer) { + clientWs.send(new Uint8Array(evt.data)); + } + } catch { + // Client disconnected + } + }); + + backendWs.addEventListener('close', (evt) => { + clientWs.close(evt.code, evt.reason); + }); + + backendWs.addEventListener('error', () => { + clientWs.close(4502, 'Backend connection failed'); + }); + }, + + onMessage(evt, _ws) { + if (backendWs?.readyState === WebSocket.OPEN) { + if (typeof evt.data === 'string') { + backendWs.send(evt.data); + } else if (evt.data instanceof ArrayBuffer) { + backendWs.send(evt.data); + } + } + }, + + onClose(_evt, _ws) { + if (backendWs && backendWs.readyState !== WebSocket.CLOSED) { + backendWs.close(); + } + backendWs = null; + }, + + onError(_evt, _ws) { + if (backendWs && backendWs.readyState !== WebSocket.CLOSED) { + backendWs.close(); + } + backendWs = null; + }, + }; + }), + ); + } + + // ALL /v1/sessions/:id/proxy/:port/* — forward HTTP to VM guest app.all('/v1/sessions/:id/proxy/:port{[0-9]+}/*', async (c) => { const sessionId = c.req.param('id'); const portStr = c.req.param('port'); @@ -26,9 +114,9 @@ export function registerProxyRoutes(app: Hono, executor: Executor) { return c.json({ error: { code: 'VALIDATION_ERROR', message: 'Invalid port' } }, 400); } - // Look up active session - const session = executor.activeSessions.get(sessionId); - if (!session || session.status !== 'running') { + // Look up session connection via runtime adapter + const conn = executor.getSessionConnection(sessionId); + if (!conn) { return c.json( { error: { @@ -40,28 +128,34 @@ export function registerProxyRoutes(app: Hono, executor: Executor) { ); } - // The session must have a network allocation with a guest IP - const allocation = (session as { allocation?: { guestIp: string } }).allocation; - if (!allocation) { - return c.json( - { error: { code: 'INTERNAL_ERROR', message: 'Session has no network allocation' } }, - 500, - ); - } - // Build target URL — strip the proxy prefix to get the remaining path const proxyPrefix = `/v1/sessions/${sessionId}/proxy/${portStr}`; const remainingPath = c.req.path.slice(proxyPrefix.length) || '/'; - const targetUrl = `http://${allocation.guestIp}:${port}${remainingPath}`; + const queryString = new URL(c.req.url).search; + const targetUrl = `http://${conn.guestIp}:${port}${remainingPath}${queryString}`; + + // If client sends Upgrade: websocket on the catch-all route (no upgradeWebSocket available), + // redirect them to the /ws/ sub-path + if (c.req.header('upgrade')?.toLowerCase() === 'websocket') { + if (!upgradeWebSocket) { + return c.json( + { error: { code: 'NOT_AVAILABLE', message: 'WebSocket proxy not configured' } }, + 501, + ); + } + // Shouldn't reach here — the WS route above should match first + return c.json( + { error: { code: 'PROXY_ERROR', message: 'Use /ws/ path for WebSocket connections' } }, + 400, + ); + } log.debug('Proxying to VM', { sessionId, port, targetUrl }); try { - // Forward the request to the VM const headers = new Headers(c.req.raw.headers); - // Remove hop-by-hop headers that shouldn't be forwarded headers.delete('host'); - headers.set('host', `${allocation.guestIp}:${port}`); + headers.set('host', `${conn.guestIp}:${port}`); const response = await fetch(targetUrl, { method: c.req.method, @@ -70,7 +164,6 @@ export function registerProxyRoutes(app: Hono, executor: Executor) { redirect: 'manual', }); - // Stream the response back return new Response(response.body, { status: response.status, statusText: response.statusText, @@ -96,18 +189,13 @@ export function registerProxyRoutes(app: Hono, executor: Executor) { const sessionId = c.req.param('id'); const port = parseInt(c.req.param('port'), 10); - const session = executor.activeSessions.get(sessionId); - if (!session || session.status !== 'running') { + const conn = executor.getSessionConnection(sessionId); + if (!conn) { return c.json({ healthy: false, reason: 'session not found' }, 404); } - const allocation = (session as { allocation?: { guestIp: string } }).allocation; - if (!allocation) { - return c.json({ healthy: false, reason: 'no allocation' }, 500); - } - try { - const res = await fetch(`http://${allocation.guestIp}:${port}/`, { + const res = await fetch(`http://${conn.guestIp}:${port}/`, { method: 'HEAD', signal: AbortSignal.timeout(2000), }); diff --git a/apps/worker/src/server.ts b/apps/worker/src/server.ts index ce1a316..56ab3f0 100644 --- a/apps/worker/src/server.ts +++ b/apps/worker/src/server.ts @@ -1,3 +1,4 @@ +import { createBunWebSocket } from 'hono/bun'; import { createLogger, setGlobalLogEnricher } from '@paws/logger'; import { initTracing, activeTraceId, activeSpanId } from '@paws/telemetry'; import { createPortPool } from '@paws/firecracker'; @@ -15,6 +16,7 @@ import { createRuntimeRegistry } from '@paws/runtime'; import { createCallHome } from './call-home.js'; import { createSessionApp } from './routes.js'; import { createExecutor } from './session/executor.js'; +import { createMcpGateway } from './mcp/gateway.js'; import { createSemaphore } from './semaphore.js'; import { createSyncLoop } from './sync/sync-loop.js'; import type { SyncLoop } from './sync/sync-loop.js'; @@ -88,6 +90,22 @@ const llmGateway = } : undefined; +// MCP gateway (optional — agentgateway for secure MCP tool access) +const MCP_GATEWAY_ENABLED = process.env['MCP_GATEWAY_ENABLED'] === 'true'; +const MCP_GATEWAY_CONFIG_PATH = + process.env['MCP_GATEWAY_CONFIG_PATH'] ?? '/etc/agentgateway/config.yaml'; +const MCP_GATEWAY_PORT = parseInt(process.env['MCP_GATEWAY_PORT'] ?? '4317', 10); +const MCP_GATEWAY_READINESS_URL = + process.env['MCP_GATEWAY_READINESS_URL'] ?? 'http://localhost:15020/healthz/ready'; + +const mcpGateway = MCP_GATEWAY_ENABLED + ? createMcpGateway({ + configPath: MCP_GATEWAY_CONFIG_PATH, + port: MCP_GATEWAY_PORT, + readinessUrl: MCP_GATEWAY_READINESS_URL, + }) + : undefined; + // --- Runtime setup --- // Create the Firecracker runtime adapter (owns all VM lifecycle) const firecrackerRuntime = createFirecrackerRuntime({ @@ -110,6 +128,7 @@ const executor = createExecutor({ semaphore, workerName: WORKER_NAME, ...(llmGateway ? { llmGateway } : {}), + ...(mcpGateway ? { mcpGateway } : {}), }); let syncLoop: SyncLoop | undefined; @@ -136,11 +155,15 @@ if (SNAPSHOT_SYNC_ENABLED) { log.info('Snapshot sync enabled', { pollIntervalMs: SNAPSHOT_SYNC_INTERVAL_MS }); } +// WebSocket support for proxy routes (HMR, dev servers, terminals) +const { upgradeWebSocket, websocket } = createBunWebSocket(); + const app = createSessionApp({ executor, semaphore, workerName: WORKER_NAME, syncLoop, + upgradeWebSocket, snapshotBuilderConfig: { snapshotBaseDir: SNAPSHOT_BASE_DIR, outputDir: SNAPSHOT_BASE_DIR, @@ -198,10 +221,12 @@ Worker: ${WORKER_NAME} Snapshot sync: ${SNAPSHOT_SYNC_ENABLED ? 'enabled' : 'disabled'} Port exposure: ${portExposureStatus} LLM gateway: ${llmGateway ? `${llmGateway.name} (${llmGateway.url})` : 'direct (set LLM_GATEWAY + LLM_GATEWAY_KEY)'} +MCP gateway: ${mcpGateway ? `enabled (port ${MCP_GATEWAY_PORT})` : 'disabled (set MCP_GATEWAY_ENABLED=true)'} Call-home: ${GATEWAY_URL ? `${GATEWAY_URL}` : 'disabled (set GATEWAY_URL + API_KEY)'} `); export default { port: PORT, fetch: app.fetch, + websocket, };