Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/control-plane/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
import { registerWorkerWebSocket } from './routes/worker-ws.js';
import { healthRoute } from './routes/health.js';
import { createMcpRoutes } from './routes/mcp.js';
import { createExposeRoutes } from './routes/expose.js';
import { createExposeRoutes, generateExposedUrls } from './routes/expose.js';
import {
createEnrollmentRoutes,
createEnrollmentStore,
Expand Down Expand Up @@ -1357,7 +1357,7 @@
TRIGGER_TYPE: event.type,
GITHUB_REPO: event.repo,
GITHUB_SENDER: event.sender,
};

Check warning on line 1360 in apps/control-plane/src/app.ts

View workflow job for this annotation

GitHub Actions / check

eslint-plugin-unicorn(no-useless-fallback-in-spread)

Empty fallbacks in spreads are unnecessary

if (event.type === 'mention') {
envVars['GITHUB_COMMAND'] = event.command;
Expand Down Expand Up @@ -1930,6 +1930,7 @@

const exposeRoutes = createExposeRoutes({
sessionStore,
...(deps.upgradeWebSocket ? { upgradeWebSocket: deps.upgradeWebSocket } : {}),
});
app.route('/', exposeRoutes);

Expand Down Expand Up @@ -2043,8 +2044,13 @@
sessionId: string,
request: Parameters<WorkerClient['createSession']>[1],
): void {
// Freeze exposed port URLs at dispatch time — URLs are available immediately
const expose = request.network?.expose ?? [];
const exposedPorts = expose.length > 0 ? generateExposedUrls(sessionId, expose) : undefined;

sessionStore.updateStatus(sessionId, 'running', {
startedAt: new Date().toISOString(),
...(exposedPorts ? { exposedPorts } : {}),
Comment on lines +2047 to +2053
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

These frozen exposed URLs still default to localhost.

Lines 2047-2049 call generateExposedUrls(sessionId, expose) without a fleetDomain. In apps/control-plane/src/routes/expose.ts, that falls back to http://s-<id>.localhost:3000/..., so every persisted exposedPorts[].url will be wrong outside local dev. Please thread the actual fleet/public domain into dispatchSession() before storing these URLs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/control-plane/src/app.ts` around lines 2047 - 2053, The persisted
exposedPorts URLs are built without the fleet/public domain; update
dispatchSession to pass the real fleet/public domain through and use it when
calling generateExposedUrls, so generateExposedUrls(sessionId, expose,
fleetDomain) (or equivalent) is invoked instead of the two-arg form; ensure the
same fleetDomain value is threaded into the code path that computes exposedPorts
before calling sessionStore.updateStatus(sessionId, 'running', { startedAt: ...,
exposedPorts }), and update any call-sites that construct exposed URLs (e.g.,
generateExposedUrls and the expose route logic) to accept and use the
fleet/public domain rather than defaulting to localhost.

});

void (async () => {
Expand Down
162 changes: 155 additions & 7 deletions apps/control-plane/src/routes/expose.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createRoute, z } from '@hono/zod-openapi';
import { OpenAPIHono } from '@hono/zod-openapi';
import type { UpgradeWebSocket } from 'hono/ws';
import { createLogger } from '@paws/logger';
import type { SessionStore } from '@paws/domain-session';

Expand All @@ -13,6 +14,8 @@ export interface ExposeDeps {
sessionStore: SessionStore;
/** Base domain for session URLs (e.g. "fleet.tpops.dev") */
fleetDomain?: string;
/** WebSocket upgrade function (needed for WS proxying to VMs) */
upgradeWebSocket?: UpgradeWebSocket;
}

// ---------------------------------------------------------------------------
Expand All @@ -22,7 +25,12 @@ export interface ExposeDeps {
/** Generate exposed port URLs for a session */
export function generateExposedUrls(
sessionId: string,
expose: Array<{ port: number; label?: string; access?: string; pathPrefix?: string }>,
expose: Array<{
port: number;
label?: string | undefined;
access?: string | undefined;
pathPrefix?: string | undefined;
}>,
fleetDomain?: string,
): Array<{ port: number; url: string; label?: string; access?: string; pin?: string }> {
const baseUrl = fleetDomain ?? 'localhost:3000';
Expand Down Expand Up @@ -76,6 +84,28 @@ const exposeHealthRoute = createRoute({
},
});

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Resolve which exposed port to proxy to based on the request path */
function resolvePort(
path: string,
expose: Array<{ port: number; pathPrefix?: string }>,
): number | undefined {
if (expose.length === 0) return undefined;

// Try path-prefix matching first (longest match wins)
const withPrefix = expose
.filter((e) => e.pathPrefix && e.pathPrefix !== '/' && path.startsWith(e.pathPrefix))
.sort((a, b) => (b.pathPrefix?.length ?? 0) - (a.pathPrefix?.length ?? 0));

if (withPrefix.length > 0) return withPrefix[0]!.port;

// Default to the first exposed port
return expose[0]!.port;
Comment on lines +92 to +106
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't fall back to the first port when no prefix matches.

When every exposure is path-scoped, resolvePort() can never return undefined because it always falls back to expose[0]. That makes the new 403 branch unreachable and can send /s/:sessionId/<unknown> or /s/:sessionId/ws/<unknown> to the wrong service instead of rejecting it.

Suggested fix
 function resolvePort(
   path: string,
   expose: Array<{ port: number; pathPrefix?: string }>,
 ): number | undefined {
   if (expose.length === 0) return undefined;
 
   // Try path-prefix matching first (longest match wins)
   const withPrefix = expose
-    .filter((e) => e.pathPrefix && e.pathPrefix !== '/' && path.startsWith(e.pathPrefix))
+    .filter((e) => {
+      const prefix = e.pathPrefix;
+      return (
+        prefix !== undefined &&
+        prefix !== '/' &&
+        (path === prefix || path.startsWith(`${prefix}/`))
+      );
+    })
     .sort((a, b) => (b.pathPrefix?.length ?? 0) - (a.pathPrefix?.length ?? 0));
 
   if (withPrefix.length > 0) return withPrefix[0]!.port;
 
-  // Default to the first exposed port
-  return expose[0]!.port;
+  // Only fall back to an explicit default exposure
+  return expose.find((e) => !e.pathPrefix || e.pathPrefix === '/')?.port;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function resolvePort(
path: string,
expose: Array<{ port: number; pathPrefix?: string }>,
): number | undefined {
if (expose.length === 0) return undefined;
// Try path-prefix matching first (longest match wins)
const withPrefix = expose
.filter((e) => e.pathPrefix && e.pathPrefix !== '/' && path.startsWith(e.pathPrefix))
.sort((a, b) => (b.pathPrefix?.length ?? 0) - (a.pathPrefix?.length ?? 0));
if (withPrefix.length > 0) return withPrefix[0]!.port;
// Default to the first exposed port
return expose[0]!.port;
function resolvePort(
path: string,
expose: Array<{ port: number; pathPrefix?: string }>,
): number | undefined {
if (expose.length === 0) return undefined;
// Try path-prefix matching first (longest match wins)
const withPrefix = expose
.filter((e) => {
const prefix = e.pathPrefix;
return (
prefix !== undefined &&
prefix !== '/' &&
(path === prefix || path.startsWith(`${prefix}/`))
);
})
.sort((a, b) => (b.pathPrefix?.length ?? 0) - (a.pathPrefix?.length ?? 0));
if (withPrefix.length > 0) return withPrefix[0]!.port;
// Only fall back to an explicit default exposure
return expose.find((e) => !e.pathPrefix || e.pathPrefix === '/')?.port;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/control-plane/src/routes/expose.ts` around lines 92 - 106, resolvePort
currently always falls back to expose[0] which masks the case where all
exposures are path-scoped and prevents returning undefined; update resolvePort
so that after trying longest pathPrefix matches it only returns a default port
if there exists an exposure that is not path-scoped (e.g. has no pathPrefix or
pathPrefix === '/'); otherwise return undefined. Locate the resolvePort function
and adjust the fallback logic (the withPrefix computation and final return) to
search expose for a non-scoped entry and return its port, or return undefined
when none exists.

}

// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -109,6 +139,124 @@ export function createExposeRoutes(deps: ExposeDeps) {
}
});

// WebSocket proxy: /s/:sessionId/ws/* → worker WS proxy → VM
if (deps.upgradeWebSocket) {
const upgradeWs = deps.upgradeWebSocket;

app.get(
'/s/:sessionId/ws/*',
upgradeWs((c) => {
const sessionId = c.req.param('sessionId')!;

const session = sessionStore.get(sessionId);
if (!session || session.status !== 'running') {
return {
onOpen(_evt, ws) {
ws.close(4004, 'Session not found or not running');
},
};
}

const expose = session.request.network?.expose ?? [];
if (expose.length === 0) {
return {
onOpen(_evt, ws) {
ws.close(4003, 'No ports exposed');
},
};
}

const workerUrl = session.worker;
if (!workerUrl) {
return {
onOpen(_evt, ws) {
ws.close(4502, 'No worker assigned');
},
};
}

const prefix = `/s/${sessionId}/ws`;
const remainingPath = c.req.path.slice(prefix.length) || '/';
const queryString = new URL(c.req.url).search;
const targetPort = resolvePort(remainingPath, expose);

if (!targetPort) {
return {
onOpen(_evt, ws) {
ws.close(4003, 'Port not exposed');
},
};
}

// Convert worker HTTP URL to WS URL for the worker's WS proxy endpoint
const workerWsUrl = workerUrl.replace(/^http:/, 'ws:').replace(/^https:/, 'wss:');
const backendUrl = `${workerWsUrl}/v1/sessions/${sessionId}/proxy/${targetPort}/ws${remainingPath}${queryString}`;

let backendWs: WebSocket | null = null;

return {
onOpen(_evt, clientWs) {
log.debug('WebSocket proxy opening', {
sessionId,
port: targetPort,
backend: backendUrl,
});

backendWs = new WebSocket(backendUrl);

backendWs.addEventListener('open', () => {
log.debug('Backend WebSocket connected', { sessionId, port: targetPort });
});

backendWs.addEventListener('message', (evt) => {
try {
if (typeof evt.data === 'string') {
clientWs.send(evt.data);
} else if (evt.data instanceof ArrayBuffer) {
clientWs.send(new Uint8Array(evt.data));
}
} catch {
// Client disconnected
}
});

backendWs.addEventListener('close', (evt) => {
clientWs.close(evt.code, evt.reason);
});

backendWs.addEventListener('error', () => {
clientWs.close(4502, 'Backend connection failed');
});
},

onMessage(evt, _ws) {
if (backendWs?.readyState === WebSocket.OPEN) {
if (typeof evt.data === 'string') {
backendWs.send(evt.data);
} else if (evt.data instanceof ArrayBuffer) {
backendWs.send(evt.data);
}
}
Comment on lines +197 to +239
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Buffer client frames until the worker WebSocket is open.

The client connection is accepted before backendWs reaches OPEN, but onMessage() drops anything sent while it is still CONNECTING. Clients that send an init/auth frame immediately after open will intermittently fail on this hop.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/control-plane/src/routes/expose.ts` around lines 197 - 239, The
onMessage handler drops client frames if backendWs is not yet OPEN; fix by
adding a small send-queue: create a messages array scoped alongside backendWs
and, in onOpen (backendWs 'open' listener), flush the queued frames to backendWs
(preserving string vs ArrayBuffer/Uint8Array handling) and clear the queue;
update onMessage to push frames onto the queue when backendWs is CONNECTING (or
undefined) instead of dropping them, and ensure that backendWs 'close' and
'error' handlers clear the queue and optionally reject/close clientWs with an
error so buffered frames are not leaked; reference backendWs, onOpen/onMessage,
and the backendWs 'open'/'close'/'error' listeners.

},

onClose(_evt, _ws) {
if (backendWs && backendWs.readyState !== WebSocket.CLOSED) {
backendWs.close();
}
backendWs = null;
},

onError(_evt, _ws) {
if (backendWs && backendWs.readyState !== WebSocket.CLOSED) {
backendWs.close();
}
backendWs = null;
},
};
}),
);
}

// Catch-all reverse proxy: /s/:sessionId/* → worker → VM
app.all('/s/:sessionId/*', async (c) => {
const sessionId = c.req.param('sessionId');
Expand All @@ -128,17 +276,20 @@ export function createExposeRoutes(deps: ExposeDeps) {
// Strip /s/:sessionId prefix to get the remaining path
const prefix = `/s/${sessionId}`;
const remainingPath = c.req.path.slice(prefix.length) || '/';
const queryString = new URL(c.req.url).search;

// Default to first exposed port (path-based routing is a future enhancement)
const targetPort = expose[0]!.port;
const targetPort = resolvePort(remainingPath, expose);
if (!targetPort) {
return c.text('Port not exposed', 403);
}

// Forward to worker
const workerUrl = session.worker;
if (!workerUrl) {
return c.text('No worker assigned to session', 502);
}

const targetUrl = `${workerUrl}/v1/sessions/${sessionId}/proxy/${targetPort}${remainingPath}`;
const targetUrl = `${workerUrl}/v1/sessions/${sessionId}/proxy/${targetPort}${remainingPath}${queryString}`;

log.debug('Proxying to worker', {
sessionId,
Expand Down Expand Up @@ -170,9 +321,6 @@ export function createExposeRoutes(deps: ExposeDeps) {
}
});

// Subdomain-based routing middleware (optional, applied at app level)
// Extracts session ID from Host header: s-abc123.fleet.tpops.dev → sessionId=abc123

return app;
}

Expand Down
5 changes: 4 additions & 1 deletion apps/worker/src/routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { randomUUID } from 'node:crypto';

import { Hono } from 'hono';
import type { UpgradeWebSocket } from 'hono/ws';
import { createLogger } from '@paws/logger';
import { tracingMiddleware } from '@paws/telemetry';
import { BrowserActionSchema } from '@paws/domain-browser';
Expand All @@ -21,6 +22,8 @@ export interface AppDeps {
syncLoop?: SyncLoop | undefined;
/** Config for snapshot builder (optional — needed for build endpoint) */
snapshotBuilderConfig?: SnapshotBuilderConfig | undefined;
/** WebSocket upgrade function (needed for WS proxy to VMs) */
upgradeWebSocket?: UpgradeWebSocket | undefined;
}

const log = createLogger('routes');
Expand Down Expand Up @@ -551,7 +554,7 @@ const ws = require('ws');
});

// --- Inbound proxy (VM port exposure) ---
registerProxyRoutes(app, executor);
registerProxyRoutes(app, executor, deps.upgradeWebSocket);

return app;
}
Loading
Loading