Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Decision docs:

- [deployment shape](docs/deployment.md)
- [ACP alias bridge](docs/acp-alias-bridge.md)
- [agent control MCP](docs/agent-control-mcp.md)

Working rules:

Expand Down
6 changes: 5 additions & 1 deletion apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
},
"dependencies": {
"@amesh/protocol": "workspace:*",
"@cfworker/json-schema": "^4.1.1",
"@fastify/cookie": "^11.0.2",
"@modelcontextprotocol/node": "2.0.0-alpha.2",
"@modelcontextprotocol/server": "2.0.0-alpha.2",
"better-sqlite3": "^12.2.0",
"dotenv": "^17.4.2",
"drizzle-orm": "^0.44.5",
"fastify": "^5.6.0",
"nanoid": "^5.1.6"
"nanoid": "^5.1.6",
"zod": "^4.4.3"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
Expand Down
180 changes: 180 additions & 0 deletions apps/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ import {
upsertTriggerRuleRequestSchema
} from "@amesh/protocol";
import cookie from "@fastify/cookie";
import { NodeStreamableHTTPServerTransport } from "@modelcontextprotocol/node";
import Fastify, { type FastifyReply, type FastifyRequest } from "fastify";
import { nanoid } from "nanoid";
import { randomUUID } from "node:crypto";
import { existsSync } from "node:fs";
import { readFile } from "node:fs/promises";
import { dirname, extname, join, normalize, resolve, sep } from "node:path";
Expand All @@ -61,6 +63,7 @@ import {
verifySession
} from "./auth.js";
import { createDatabase } from "./db/client.js";
import { buildAmeshMcpServer, type McpScope } from "./mcp.js";
import { Repository } from "./repository.js";

type Role = "browser" | "node";
Expand All @@ -80,6 +83,11 @@ type NodeSocket = {
send: (message: ProtocolEnvelope) => void;
};

type McpSessionState = {
scope: McpScope;
transport: NodeStreamableHTTPServerTransport;
};

class NodeLogStore {
private readonly entries = new Map<string, NodeLogEntry[]>();

Expand All @@ -104,6 +112,7 @@ type AppRouteDeps = {
registrationToken: string;
repository: Repository;
nodeSockets: Map<string, NodeSocket>;
mcpSessions: Map<string, McpSessionState>;
pendingDirectoryBrowses: Map<
string,
{
Expand Down Expand Up @@ -146,6 +155,7 @@ export function buildApp(options: AppOptions = {}) {
const nodeSockets = new Map<string, NodeSocket>();
const nodeVersions = new Map<string, string | null>();
const nodeLogs = new NodeLogStore();
const mcpSessions = new Map<string, McpSessionState>();
const pendingDirectoryBrowses = new Map<
string,
{
Expand Down Expand Up @@ -593,6 +603,7 @@ export function buildApp(options: AppOptions = {}) {
registrationToken,
repository,
nodeSockets,
mcpSessions,
pendingDirectoryBrowses,
isAuthenticated,
requireBrowserAuth,
Expand Down Expand Up @@ -624,6 +635,10 @@ export function buildApp(options: AppOptions = {}) {
});

app.addHook("onClose", async () => {
for (const state of mcpSessions.values()) {
await state.transport.close();
}
mcpSessions.clear();
for (const socket of websocketServer.clients) {
socket.close();
}
Expand Down Expand Up @@ -665,6 +680,7 @@ function registerApiRoutes({
registrationToken,
repository,
nodeSockets,
mcpSessions,
pendingDirectoryBrowses,
isAuthenticated,
requireBrowserAuth,
Expand All @@ -674,6 +690,164 @@ function registerApiRoutes({
sendToNode,
nodeLogs
}: AppRouteDeps) {
async function authenticateMcpRequest(request: FastifyRequest, reply: FastifyReply) {
const origin = request.headers.origin;
if (origin) {
let parsedOrigin: URL;
try {
parsedOrigin = new URL(origin);
} catch {
reply.code(400).send({ message: "invalid origin header" });
return null;
}
if (parsedOrigin.host !== request.headers.host) {
reply.code(403).send({ message: "origin not allowed" });
return null;
}
}

const cookieSession = verifySession(authConfig, request.cookies[authConfig.cookieName]);
if (cookieSession) {
return {
authMode: "browser_session" as const
};
}

const authorization = request.headers.authorization ?? "";
const [scheme, token] = authorization.split(" ", 2);
if (scheme === "Bearer" && typeof token === "string") {
if (constantTimeStringEqual(token, authConfig.password)) {
return { authMode: "admin_password" as const };
}
if (registrationToken && constantTimeStringEqual(token, registrationToken)) {
return { authMode: "registration_token" as const };
}
}

reply
.code(401)
.header("WWW-Authenticate", 'Bearer realm="amesh-mcp"')
.send({ message: "authentication required" });
return null;
}

function scopedMcpAgent(request: FastifyRequest, reply: FastifyReply, auth: { authMode: McpScope["authMode"] }) {
const scopedAgentId = typeof request.headers["x-amesh-agent-id"] === "string"
? request.headers["x-amesh-agent-id"].trim()
: "";
const scopedNodeId = typeof request.headers["x-amesh-node-id"] === "string"
? request.headers["x-amesh-node-id"].trim()
: "";

if (!scopedAgentId) {
return {
authMode: auth.authMode,
scopedAgentId: null,
scopedNodeId: null
} satisfies McpScope;
}

const agent = repository.findAgent(scopedAgentId);
if (!agent) {
reply.code(404).send({ message: `scoped agent not found: ${scopedAgentId}` });
return null;
}
if (scopedNodeId && scopedNodeId !== agent.nodeId) {
reply.code(400).send({ message: "scoped node does not match scoped agent" });
return null;
}

return {
authMode: auth.authMode,
scopedAgentId: agent.id,
scopedNodeId: agent.nodeId
} satisfies McpScope;
}

function isInitializeRequest(body: unknown) {
if (!body || typeof body !== "object") {
return false;
}
const candidate = body as { method?: unknown };
return candidate.method === "initialize";
}

async function handleMcpRequest(request: FastifyRequest, reply: FastifyReply) {
const auth = await authenticateMcpRequest(request, reply);
if (!auth) {
return;
}

const requestedSessionId = typeof request.headers["mcp-session-id"] === "string"
? request.headers["mcp-session-id"]
: null;

let sessionState = requestedSessionId ? mcpSessions.get(requestedSessionId) ?? null : null;
if (requestedSessionId && !sessionState) {
reply.code(404).send({ message: "unknown MCP session" });
return;
}

if (!sessionState && request.method === "GET") {
reply.code(405).send({ message: "GET SSE is not enabled for this MCP endpoint" });
return;
}

if (!sessionState) {
if (request.method === "DELETE") {
reply.code(404).send({ message: "unknown MCP session" });
return;
}
if (!isInitializeRequest(request.body)) {
reply.code(400).send({ message: "MCP session must start with initialize" });
return;
}

const scope = scopedMcpAgent(request, reply, auth);
if (!scope) {
return;
}

const transport = new NodeStreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: true
});
transport.onclose = () => {
if (transport.sessionId) {
mcpSessions.delete(transport.sessionId);
}
};
transport.onerror = (error) => {
app.log.error({ error }, "mcp transport error");
};

const server = buildAmeshMcpServer(scope, {
repository,
nodeSockets,
sendToNode
});
await server.connect(transport);
sessionState = {
scope,
transport
};
}

reply.hijack();
try {
await sessionState.transport.handleRequest(request.raw, reply.raw, request.body);
const createdSessionId = sessionState.transport.sessionId;
if (createdSessionId && !mcpSessions.has(createdSessionId)) {
mcpSessions.set(createdSessionId, sessionState);
}
} catch (error) {
if (sessionState.transport.sessionId) {
mcpSessions.delete(sessionState.transport.sessionId);
}
throw error;
}
}

app.get("/api/auth/session", async (request: FastifyRequest) => ({
authenticated: isAuthenticated(request.cookies[authConfig.cookieName]),
username: authConfig.username
Expand Down Expand Up @@ -701,6 +875,12 @@ function registerApiRoutes({
return { authenticated: false };
});

app.route({
method: ["GET", "POST", "DELETE"],
url: "/mcp",
handler: handleMcpRequest
});

app.get("/api/nodes", { preHandler: requireBrowserAuth }, async () => (await topologySnapshot()).nodes);
app.get("/api/agents", { preHandler: requireBrowserAuth }, async () => repository.listTopology().agents);
app.get("/api/bootstrap", { preHandler: requireBrowserAuth }, async () => ({ registrationToken }));
Expand Down
Loading
Loading