From 95e6b2ed7621d08ad42d737b4eb7be63c87f7c51 Mon Sep 17 00:00:00 2001 From: Nitay Rabinovich Date: Wed, 13 May 2026 21:06:42 +0200 Subject: [PATCH 1/2] add node logs view --- .github/workflows/build-node-artifacts.yml | 5 + apps/server/src/app.ts | 56 +++++++++- apps/server/test/app.test.ts | 48 ++++++++ apps/web/src/api.ts | 12 +- apps/web/src/components/AppSidebar.tsx | 7 +- apps/web/src/lib/topologyContext.tsx | 6 +- apps/web/src/lib/topologyStore.ts | 11 +- apps/web/src/router.tsx | 9 +- apps/web/src/routes/LogsRoute.tsx | 124 +++++++++++++++++++++ apps/web/src/styles.css | 118 ++++++++++++++++++++ docs/local-dev.md | 1 + docs/past-failures.md | 6 + install-amesh-node.sh | 9 ++ internal/acpx/runner_test.go | 17 ++- internal/app/app.go | 101 +++++++++++++++++ internal/app/app_test.go | 86 ++++++++++++++ packages/protocol/src/index.ts | 27 +++++ scripts/test-install-amesh-node.sh | 7 ++ 18 files changed, 639 insertions(+), 11 deletions(-) create mode 100644 apps/web/src/routes/LogsRoute.tsx diff --git a/.github/workflows/build-node-artifacts.yml b/.github/workflows/build-node-artifacts.yml index ef2be32..7a77362 100644 --- a/.github/workflows/build-node-artifacts.yml +++ b/.github/workflows/build-node-artifacts.yml @@ -50,10 +50,13 @@ jobs: run: | mkdir -p dist bin_name="amesh-node" + cli_name="amesh" if [ "${GOOS}" = "windows" ]; then bin_name="amesh-node.exe" + cli_name="amesh.exe" fi go build -trimpath -ldflags="-s -w" -o "dist/${bin_name}" ./cmd/amesh-node + go build -trimpath -ldflags="-s -w" -o "dist/${cli_name}" ./cmd/amesh - name: Run Go tests if: matrix.goos == 'linux' && matrix.goarch == 'amd64' @@ -67,6 +70,7 @@ jobs: cp README.md package/ cp install-amesh-node.sh package/ cp dist/amesh-node package/ + cp dist/amesh package/ tar -C package -czf "dist/${asset_base}.tar.gz" . - name: Package zip artifact @@ -77,6 +81,7 @@ jobs: cp README.md package/ cp install-amesh-node.sh package/ cp dist/amesh-node.exe package/ + cp dist/amesh.exe package/ cd package zip -r "../dist/${asset_base}.zip" . diff --git a/apps/server/src/app.ts b/apps/server/src/app.ts index 98daf7a..6b2ecd4 100644 --- a/apps/server/src/app.ts +++ b/apps/server/src/app.ts @@ -8,6 +8,8 @@ import type { NodeDirectoryBrowsePayload, NodeDirectoryBrowseResultPayload, NodeHeartbeatPayload, + NodeLogEntry, + NodeLogPayload, NodePathsUpdatePayload, NodeRegistrationPayload, NodeResumePayload, @@ -29,6 +31,8 @@ import { nodeDirectoryBrowsePayloadSchema, nodeDirectoryBrowseResultPayloadSchema, nodeHeartbeatPayloadSchema, + nodeLogPayloadSchema, + nodeLogsResponseSchema, nodePathsUpdatePayloadSchema, nodeRegistrationPayloadSchema, nodeResumePayloadSchema, @@ -76,6 +80,24 @@ type NodeSocket = { send: (message: ProtocolEnvelope) => void; }; +class NodeLogStore { + private readonly entries = new Map(); + + append(input: NodeLogPayload) { + const entry: NodeLogEntry = { + id: nanoid(14), + ...input + }; + const entries = [...(this.entries.get(input.nodeId) ?? []), entry].slice(-300); + this.entries.set(input.nodeId, entries); + return entry; + } + + list(nodeId: string) { + return [...(this.entries.get(nodeId) ?? [])]; + } +} + type AppRouteDeps = { app: ReturnType; authConfig: ReturnType; @@ -96,6 +118,7 @@ type AppRouteDeps = { broadcastTopology: () => Promise; broadcastSession: (sessionId: string) => Promise; sendToNode: (nodeId: string, envelope: ProtocolEnvelope) => void; + nodeLogs: NodeLogStore; }; function websocketSend(socket: WebSocket, payload: unknown) { @@ -122,6 +145,7 @@ export function buildApp(options: AppOptions = {}) { const browserSockets = new Set(); const nodeSockets = new Map(); const nodeVersions = new Map(); + const nodeLogs = new NodeLogStore(); const pendingDirectoryBrowses = new Map< string, { @@ -222,6 +246,19 @@ export function buildApp(options: AppOptions = {}) { } } + function broadcastNodeLogs(nodeId: string) { + const message = browserRealtimeEventSchema.parse({ + type: "node.logs.updated", + payload: nodeLogsResponseSchema.parse({ + nodeId, + entries: nodeLogs.list(nodeId) + }) + }); + for (const socket of browserSockets) { + websocketSend(socket, message); + } + } + function maybePropagateChildCompletion(payload: SessionEventRecord) { const state = repository.getSession(payload.sessionId); if (!state?.session.parentSessionId) { @@ -376,6 +413,12 @@ export function buildApp(options: AppOptions = {}) { void broadcastTopology(); break; } + case "node.log": { + const payload = nodeLogPayloadSchema.parse(envelope.payload) as NodeLogPayload; + nodeLogs.append(payload); + broadcastNodeLogs(payload.nodeId); + break; + } case "node.capabilities.sync": { const payload = capabilitySyncPayloadSchema.parse( envelope.payload @@ -556,7 +599,8 @@ export function buildApp(options: AppOptions = {}) { topologySnapshot, broadcastTopology, broadcastSession, - sendToNode + sendToNode, + nodeLogs }); app.server.on("upgrade", (request, socket, head) => { @@ -627,7 +671,8 @@ function registerApiRoutes({ topologySnapshot, broadcastTopology, broadcastSession, - sendToNode + sendToNode, + nodeLogs }: AppRouteDeps) { app.get("/api/auth/session", async (request: FastifyRequest) => ({ authenticated: isAuthenticated(request.cookies[authConfig.cookieName]), @@ -663,6 +708,13 @@ function registerApiRoutes({ repository.listTopology().triggerRules ); app.get("/api/topology", { preHandler: requireBrowserAuth }, async () => topologySnapshot()); + app.get("/api/nodes/:nodeId/logs", { preHandler: requireBrowserAuth }, async (request: FastifyRequest) => { + const params = request.params as { nodeId: string }; + return nodeLogsResponseSchema.parse({ + nodeId: params.nodeId, + entries: nodeLogs.list(params.nodeId) + }); + }); app.post("/api/nodes/:nodeId/update", { preHandler: requireBrowserAuth }, async (request: FastifyRequest, reply: FastifyReply) => triggerNodeAction(request, reply, repository, nodeSockets, sendToNode, "update") ); diff --git a/apps/server/test/app.test.ts b/apps/server/test/app.test.ts index b997f0f..d34ffb6 100644 --- a/apps/server/test/app.test.ts +++ b/apps/server/test/app.test.ts @@ -109,6 +109,54 @@ describe("server app", () => { socket.close(); }); + it("stores and returns node logs", async () => { + const socket = new WebSocket(`ws://${address}/ws?role=node&nodeId=node-1`); + await waitForOpen(socket); + socket.send(JSON.stringify(registerNode("node-1", "a"))); + await readNodeMessage(socket); + + socket.send( + JSON.stringify({ + type: "node.log", + requestId: "log-1", + sessionId: null, + source: "node-1", + target: "server", + payload: { + nodeId: "node-1", + level: "error", + message: "agent health probe failed", + context: { + agentId: "agent-openclaw", + error: "ACP metadata is missing" + }, + observedAt: "2026-05-13T18:00:00Z" + } + }) + ); + await waitForIdle(); + + const response = await injectAuthed(app, authCookie, { + method: "GET", + url: "/api/nodes/node-1/logs" + }); + expect(response.statusCode).toBe(200); + expect(response.json()).toMatchObject({ + nodeId: "node-1", + entries: [ + { + level: "error", + message: "agent health probe failed", + context: { + agentId: "agent-openclaw", + error: "ACP metadata is missing" + } + } + ] + }); + socket.close(); + }); + it("resolves the default sqlite path independently of process cwd", async () => { const originalCwd = process.cwd(); const tempCwd = await mkdtemp(join(tmpdir(), "amesh-db-cwd-")); diff --git a/apps/web/src/api.ts b/apps/web/src/api.ts index 8389360..2de7798 100644 --- a/apps/web/src/api.ts +++ b/apps/web/src/api.ts @@ -1,10 +1,11 @@ import type { BrowserRealtimeEvent, BrowseNodeDirectoriesResponse, + NodeLogsResponse, TopologySnapshot, TriggerRule } from "@amesh/protocol"; -import { browseNodeDirectoriesResponseSchema } from "@amesh/protocol"; +import { browseNodeDirectoriesResponseSchema, nodeLogsResponseSchema } from "@amesh/protocol"; import type { SessionSummary, SessionView } from "./types.js"; @@ -118,6 +119,15 @@ export async function fetchNodeDirectories(nodeId: string, path?: string): Promi return browseNodeDirectoriesResponseSchema.parse(await response.json()); } +export async function fetchNodeLogs(nodeId: string): Promise { + const response = await apiFetch(`/api/nodes/${nodeId}/logs`); + if (!response.ok) { + const body = (await response.json().catch(() => null)) as { message?: string } | null; + throw new Error(body?.message ?? "Log fetch failed"); + } + return nodeLogsResponseSchema.parse(await response.json()); +} + export async function createTriggerRule(input: { sourceAgentId: string; targetAgentId: string; diff --git a/apps/web/src/components/AppSidebar.tsx b/apps/web/src/components/AppSidebar.tsx index 3cf6541..1c3c73f 100644 --- a/apps/web/src/components/AppSidebar.tsx +++ b/apps/web/src/components/AppSidebar.tsx @@ -1,5 +1,5 @@ import { Link, useRouterState } from "@tanstack/react-router"; -import { GitBranch, MessagesSquare } from "lucide-react"; +import { FileText, GitBranch, MessagesSquare } from "lucide-react"; export function AppSidebar() { const { location } = useRouterState(); @@ -7,6 +7,7 @@ export function AppSidebar() { const path = location.pathname; const isTopology = path === "/" || path.startsWith("/topology"); const isSessions = path.startsWith("/sessions"); + const isLogs = path.startsWith("/logs"); return ( ); diff --git a/apps/web/src/lib/topologyContext.tsx b/apps/web/src/lib/topologyContext.tsx index 38ac012..9f782ca 100644 --- a/apps/web/src/lib/topologyContext.tsx +++ b/apps/web/src/lib/topologyContext.tsx @@ -1,10 +1,12 @@ -import { createContext, useContext, type ReactNode } from "react"; -import type { TopologySnapshot } from "@amesh/protocol"; +import { createContext, useContext, type Dispatch, type ReactNode, type SetStateAction } from "react"; +import type { NodeLogEntry, TopologySnapshot } from "@amesh/protocol"; import { type ConnectionState, useTopologyStore } from "./topologyStore.js"; type TopologyContextValue = { topology: TopologySnapshot; + nodeLogs: Record; + setNodeLogs: Dispatch>>; connection: ConnectionState; refresh: () => void; }; diff --git a/apps/web/src/lib/topologyStore.ts b/apps/web/src/lib/topologyStore.ts index 94e7b9b..c87f419 100644 --- a/apps/web/src/lib/topologyStore.ts +++ b/apps/web/src/lib/topologyStore.ts @@ -1,5 +1,5 @@ import { useEffect, useRef, useState } from "react"; -import type { TopologySnapshot } from "@amesh/protocol"; +import type { NodeLogEntry, TopologySnapshot } from "@amesh/protocol"; import { connectRealtime, fetchTopology } from "../api.js"; @@ -9,6 +9,7 @@ export type ConnectionState = "loading" | "connected" | "disconnected"; export function useTopologyStore() { const [topology, setTopology] = useState(empty); + const [nodeLogs, setNodeLogs] = useState>({}); const [connection, setConnection] = useState("loading"); const refetchRef = useRef<() => void>(() => {}); @@ -37,6 +38,12 @@ export function useTopologyStore() { if (event.type === "topology.snapshot" || event.type === "topology.updated") { setTopology(event.payload); } + if (event.type === "node.logs.updated") { + setNodeLogs((current) => ({ + ...current, + [event.payload.nodeId]: event.payload.entries + })); + } }); socket.onopen = () => active && setConnection("connected"); socket.onerror = () => active && setConnection("disconnected"); @@ -53,6 +60,8 @@ export function useTopologyStore() { return { topology, + nodeLogs, + setNodeLogs, connection, refresh: () => refetchRef.current() }; diff --git a/apps/web/src/router.tsx b/apps/web/src/router.tsx index 06b2a0a..265ca0c 100644 --- a/apps/web/src/router.tsx +++ b/apps/web/src/router.tsx @@ -9,6 +9,7 @@ import { AppSidebar } from "./components/AppSidebar.js"; import { ErrorBanner } from "./components/ErrorBanner.js"; import { TopBar } from "./components/TopBar.js"; import { useTopology } from "./lib/topologyContext.js"; +import { LogsRoute } from "./routes/LogsRoute.js"; import { SessionsRoute } from "./routes/SessionsRoute.js"; import { TopologyRoute } from "./routes/TopologyRoute.js"; @@ -62,7 +63,13 @@ const sessionsRoute = createRoute({ component: SessionsRoute }); -const routeTree = rootRoute.addChildren([topologyRoute, sessionsRoute]); +const logsRoute = createRoute({ + getParentRoute: () => rootRoute, + path: "/logs", + component: LogsRoute +}); + +const routeTree = rootRoute.addChildren([topologyRoute, sessionsRoute, logsRoute]); export const router = createRouter({ routeTree }); diff --git a/apps/web/src/routes/LogsRoute.tsx b/apps/web/src/routes/LogsRoute.tsx new file mode 100644 index 0000000..f92ae47 --- /dev/null +++ b/apps/web/src/routes/LogsRoute.tsx @@ -0,0 +1,124 @@ +import { useEffect, useMemo, useState } from "react"; +import type { NodeLogEntry } from "@amesh/protocol"; + +import { fetchNodeLogs } from "../api.js"; +import { useTopology } from "../lib/topologyContext.js"; + +const timeFormatter = new Intl.DateTimeFormat("en", { + hour: "2-digit", + minute: "2-digit", + second: "2-digit" +}); + +function formatLogTime(value: string) { + const parsed = Date.parse(value); + if (Number.isNaN(parsed)) { + return value; + } + return timeFormatter.format(parsed); +} + +function levelLabel(level: NodeLogEntry["level"]) { + return level.toUpperCase(); +} + +function compactContext(context: Record) { + const entries = Object.entries(context).filter(([, value]) => value !== undefined && value !== null); + if (entries.length === 0) { + return ""; + } + return entries + .map(([key, value]) => `${key}=${typeof value === "string" ? value : JSON.stringify(value)}`) + .join(" "); +} + +export function LogsRoute() { + const { topology, nodeLogs, setNodeLogs } = useTopology(); + const [selectedNodeId, setSelectedNodeId] = useState(""); + const [error, setError] = useState(null); + + const selectedNode = useMemo(() => { + return topology.nodes.find((node) => node.id === selectedNodeId) ?? topology.nodes[0] ?? null; + }, [selectedNodeId, topology.nodes]); + const entries = selectedNode ? nodeLogs[selectedNode.id] ?? [] : []; + + useEffect(() => { + if (!selectedNodeId && topology.nodes[0]) { + setSelectedNodeId(topology.nodes[0].id); + } + }, [selectedNodeId, topology.nodes]); + + useEffect(() => { + if (!selectedNode) { + return; + } + let active = true; + fetchNodeLogs(selectedNode.id) + .then((payload) => { + if (!active) return; + setError(null); + setNodeLogs((current) => ({ + ...current, + [payload.nodeId]: payload.entries + })); + }) + .catch((caught: unknown) => { + if (!active) return; + setError(caught instanceof Error ? caught.message : "Log fetch failed"); + }); + return () => { + active = false; + }; + }, [selectedNode, setNodeLogs]); + + return ( +
+
+
+

Logs

+

Node activity

+
+ +
+ + {error ?
{error}
: null} + +
+ {selectedNode ? ( + entries.length > 0 ? ( + entries.map((entry) => { + const context = compactContext(entry.context); + return ( +
+ + {levelLabel(entry.level)} +
+ {entry.message} + {context ? {context} : null} +
+
+ ); + }) + ) : ( +
No node log entries yet.
+ ) + ) : ( +
No nodes are registered.
+ )} +
+
+ ); +} diff --git a/apps/web/src/styles.css b/apps/web/src/styles.css index 2dcac57..2ffe8c5 100644 --- a/apps/web/src/styles.css +++ b/apps/web/src/styles.css @@ -509,6 +509,124 @@ button:focus { outline: none; } .topology-canvas .react-flow__attribution { display: none; } +/* =============================================================== */ +/* Node logs */ +/* =============================================================== */ + +.logs-route { + height: 100%; + min-height: 0; + display: grid; + grid-template-rows: auto auto 1fr; + background: var(--c-surface); +} + +.logs-route__header { + display: flex; + align-items: end; + justify-content: space-between; + gap: var(--s-4); + padding: var(--s-5); + border-bottom: 1px solid var(--c-line); +} + +.logs-route__eyebrow { + margin: 0 0 var(--s-2); + color: var(--c-mute); + font: var(--t-label); + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.logs-route__header h1 { + margin: 0; + color: var(--c-ink); + font: 600 1.35rem/1.15 var(--font-sans); +} + +.logs-route__selector { + display: grid; + gap: var(--s-2); + min-width: min(280px, 45vw); + color: var(--c-mute); + font: var(--t-label); + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.logs-route__selector select { + min-height: 38px; + border: 1px solid var(--c-line); + border-radius: var(--r-sm); + background: var(--c-surface-raised); + color: var(--c-ink); + font: var(--t-body); + letter-spacing: 0; + text-transform: none; + padding: 0 var(--s-3); +} + +.logs-route__error { + margin: var(--s-3) var(--s-5) 0; + color: var(--c-error); + font: var(--t-body); +} + +.logs-route__stream { + min-height: 0; + overflow: auto; + padding: var(--s-3) var(--s-5) var(--s-5); + font: var(--t-body); +} + +.log-row { + display: grid; + grid-template-columns: 78px 58px minmax(0, 1fr); + gap: var(--s-3); + align-items: start; + padding: 9px 0; + border-bottom: 1px solid var(--c-line); +} + +.log-row time, +.log-row__level { + color: var(--c-mute); + font: var(--t-mono); + white-space: nowrap; +} + +.log-row__level { + color: var(--c-ink); +} + +.log-row[data-level="warn"] .log-row__level { color: var(--c-pending); } +.log-row[data-level="error"] .log-row__level { color: var(--c-error); } +.log-row[data-level="debug"] .log-row__level { color: var(--c-mute); } + +.log-row__body { + min-width: 0; + display: grid; + gap: 3px; +} + +.log-row__body strong { + color: var(--c-ink); + font: var(--t-body); +} + +.log-row__body code { + color: var(--c-mute); + font: var(--t-mono); + white-space: pre-wrap; + overflow-wrap: anywhere; +} + +.logs-route__empty { + padding: var(--s-6) 0; + color: var(--c-mute); + font: var(--t-body); +} + /* Node card (custom React Flow node) */ .node-card { font: var(--t-body); diff --git a/docs/local-dev.md b/docs/local-dev.md index 314ab93..b5f208d 100644 --- a/docs/local-dev.md +++ b/docs/local-dev.md @@ -50,6 +50,7 @@ sh -n scripts/install-amesh-node.sh - `corepack pnpm check:knip` runs unused dependency and export checks against the TypeScript workspaces only; it intentionally ignores repo-local agent skill folders and the Go tree. - `corepack pnpm check:sentrux` installs a pinned user-space `sentrux` binary under `.tools/` on first run and enforces the repo rules from `.sentrux/rules.toml`. - `install-amesh-node.sh` downloads the released `amesh-node` binary for the current platform, installs a managed ACPX sidecar under `~/.local/share/amesh/acpx`, and exports `AMESH_ACPX_PATH` for the service. +- Release artifacts also include the `amesh` CLI. The installer places it beside `amesh-node` on the selected install path so operators can run commands such as `amesh logs` after bootstrap. - The published remote bootstrap path is `curl .../install-amesh-node.sh | ... bash`, so the installer must keep working when Bash reads it from stdin instead of from a file. - The installer now logs whether it is reusing or creating config/state, and on systemd hosts it fails the install if the user service does not remain active after startup. When that happens it prints both `systemctl --user status` and recent `journalctl --user -u amesh-node` output. - `install-amesh-node.sh` also normalizes `~/.acpx/config.json` so ACPX non-interactive health probes start from a valid baseline on first install. diff --git a/docs/past-failures.md b/docs/past-failures.md index 0eefd8b..7a0629f 100644 --- a/docs/past-failures.md +++ b/docs/past-failures.md @@ -18,6 +18,12 @@ - Cause: the installer wrote raw `Environment=PATH=...` lines into the systemd unit. systemd splits unquoted environment assignments on whitespace. - Mitigation: the installer now quotes and escapes systemd `Environment` values, and the stdin installer test covers a PATH entry containing spaces. +## 2026-05-13: Main artifact build failed in ACPX stream test + +- Symptom: the `Build linux-amd64` artifact job on `main` failed in `TestRunnerStreamsStdoutLineByLine` with empty stdout from the helper process. +- Cause: that test depended on re-running the Go test binary as a helper process for a simple stdout/stderr stream case, which made the artifact workflow sensitive to test-binary invocation behavior. +- Mitigation: the stream test now uses a tiny explicit executable fixture for the stdout/stderr path and keeps the Go helper process only for ACPX command-shape tests. + ## 2026-05-12: Installer rejected valid Node 24 runtimes - Symptom: remote bootstrap failed early with `could not determine Node.js major version` even though `node -v` reported `v24.13.1`. diff --git a/install-amesh-node.sh b/install-amesh-node.sh index 261dc2f..5a6e44c 100644 --- a/install-amesh-node.sh +++ b/install-amesh-node.sh @@ -215,6 +215,7 @@ main() { download_url="https://github.com/${REPO}/releases/download/${tag}/${asset}" install_dir="$(pick_install_dir)" binary_path="${BINARY_PATH:-$install_dir/amesh-node}" + cli_binary_path="${AMESH_CLI_PATH:-$install_dir/amesh}" tmp_dir="$(mktemp -d)" trap 'rm -rf "${tmp_dir}"' EXIT @@ -233,12 +234,17 @@ main() { extract_archive "${tmp_dir}/${asset}" "${extract_dir}" binary_name="amesh-node" + cli_binary_name="amesh" if [ "${os}" = "windows" ]; then binary_name="amesh-node.exe" + cli_binary_name="amesh.exe" fi [ -f "${extract_dir}/${binary_name}" ] || fail "archive did not contain ${binary_name}" install -m 0755 "${extract_dir}/${binary_name}" "${binary_path}" + if [ -f "${extract_dir}/${cli_binary_name}" ]; then + install -m 0755 "${extract_dir}/${cli_binary_name}" "${cli_binary_path}" + fi if command -v systemctl >/dev/null 2>&1; then systemctl --user stop "$SERVICE_NAME" >/dev/null 2>&1 || true @@ -319,6 +325,9 @@ EOF fi log "installed ${binary_path}" + if [ -x "${cli_binary_path}" ]; then + log "installed ${cli_binary_path}" + fi log "managed acpx: ${ACPX_BIN}" log "state: ${STATE_PATH}" } diff --git a/internal/acpx/runner_test.go b/internal/acpx/runner_test.go index 1d5f18c..698dfd0 100644 --- a/internal/acpx/runner_test.go +++ b/internal/acpx/runner_test.go @@ -15,11 +15,12 @@ func TestRunnerStreamsStdoutLineByLine(t *testing.T) { t.Parallel() var lines []string - command, args := helperCommand(t, "emit-lines") + command := writeHelperExecutable(t, `#!/bin/sh +printf '%s\n' one two +printf '%s\n' noise >&2 +`) output, err := (Runner{}).Run(context.Background(), RunRequest{ Command: command, - Args: args, - Env: []string{"GO_WANT_HELPER_PROCESS=1"}, }, func(line string) { lines = append(lines, line) }) @@ -268,3 +269,13 @@ func helperCommand(t *testing.T, mode string) (string, []string) { return os.Args[0], []string{"-test.run=TestRunnerHelperProcess", "--", mode} } + +func writeHelperExecutable(t *testing.T, contents string) string { + t.Helper() + + path := filepath.Join(t.TempDir(), "helper") + if err := os.WriteFile(path, []byte(contents), 0o755); err != nil { + t.Fatalf("write helper executable: %v", err) + } + return path +} diff --git a/internal/app/app.go b/internal/app/app.go index 3ae7bf8..123a95b 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -52,6 +52,27 @@ func logf(format string, args ...any) { fmt.Fprintf(os.Stderr, "amesh-node %s %s\n", time.Now().UTC().Format(time.RFC3339), fmt.Sprintf(format, args...)) } +func sendNodeLog(ctx context.Context, client daemonClient, nodeID string, level string, message string, context map[string]any) { + if context == nil { + context = map[string]any{} + } + if err := client.Send(ctx, nodeclient.Envelope{ + Type: "node.log", + RequestID: fmt.Sprintf("log-%d", time.Now().UnixNano()), + Source: nodeID, + Target: "server", + Payload: map[string]any{ + "nodeId": nodeID, + "level": level, + "message": message, + "context": context, + "observedAt": time.Now().UTC().Format(time.RFC3339), + }, + }); err != nil { + logf("node log send failed node=%s error=%v", nodeID, err) + } +} + func (err retryableDaemonError) Error() string { return err.err.Error() } @@ -81,11 +102,37 @@ func run(ctx context.Context, args []string, update updateRunner, detect detectR return update(ctx, os.Stdout, os.Stderr) case "acp": return runACPBridge(ctx, args[1:], os.Stdin, os.Stdout) + case "logs": + return runLogs(ctx, args[1:], os.Stdout, os.Stderr) default: return fmt.Errorf("unknown subcommand %q", args[0]) } } +func runLogs(ctx context.Context, args []string, stdout, stderr io.Writer) error { + flags := flag.NewFlagSet("logs", flag.ContinueOnError) + serviceName := flags.String("service", "amesh-node", "systemd user service name") + lines := flags.Int("n", 200, "number of recent log lines") + follow := flags.Bool("follow", true, "follow new log lines") + if err := flags.Parse(args); err != nil { + return err + } + if _, err := exec.LookPath("journalctl"); err != nil { + return fmt.Errorf("journalctl is required to read installed node logs: %w", err) + } + journalArgs := []string{"--user", "-u", *serviceName, "-n", fmt.Sprintf("%d", *lines), "--no-pager"} + if *follow { + journalArgs = append(journalArgs, "-f") + } + cmd := exec.CommandContext(ctx, "journalctl", journalArgs...) + cmd.Stdout = stdout + cmd.Stderr = stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("read node logs: %w", err) + } + return nil +} + func runACPBridge(ctx context.Context, args []string, stdin io.Reader, stdout io.Writer) error { flags := flag.NewFlagSet("acp", flag.ContinueOnError) configPath := flags.String("config", acpconfig.DefaultPath(), "path to ACP alias config") @@ -979,6 +1026,11 @@ func runDaemonSession( if err := syncHealthyCapabilities(ctx, client, nodeID, configPath, probe); err != nil { return retryableDaemonError{err: err} } + sendNodeLog(ctx, client, nodeID, "info", "node session resumed", map[string]any{ + "serverUrl": serverURL, + "config": configPath, + "version": currentVersion(), + }) sessionCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -1000,17 +1052,37 @@ func runDaemonSession( switch envelope.Type { case "session.start", "session.input": logf("session command node=%s type=%s session=%s", nodeID, envelope.Type, deref(envelope.SessionID)) + sendNodeLog(sessionCtx, client, nodeID, "info", "session command received", map[string]any{ + "type": envelope.Type, + "sessionId": deref(envelope.SessionID), + "agentId": envelope.Payload["agentId"], + }) if err := startSession(sessionCtx, client, runner, sessions, configPath, nodeID, envelope); err != nil { + sendNodeLog(sessionCtx, client, nodeID, "error", "session command failed", map[string]any{ + "type": envelope.Type, + "sessionId": deref(envelope.SessionID), + "error": err.Error(), + }) return retryableDaemonError{err: err} } case "session.cancel": logf("session cancel node=%s session=%s", nodeID, deref(envelope.SessionID)) + sendNodeLog(sessionCtx, client, nodeID, "warn", "session cancel received", map[string]any{ + "sessionId": deref(envelope.SessionID), + }) if err := cancelSession(sessionCtx, client, sessions, nodeID, envelope); err != nil { return retryableDaemonError{err: err} } case "node.detect": logf("detect command node=%s config=%s", nodeID, configPath) + sendNodeLog(sessionCtx, client, nodeID, "info", "agent detection started", map[string]any{ + "config": configPath, + }) if err := detect(sessionCtx, configPath); err != nil { + sendNodeLog(sessionCtx, client, nodeID, "error", "agent detection failed", map[string]any{ + "config": configPath, + "error": err.Error(), + }) return fmt.Errorf("node detect failed: %w", err) } if err := syncHealthyCapabilities(sessionCtx, client, nodeID, configPath, probe); err != nil { @@ -1030,8 +1102,14 @@ func runDaemonSession( nextPaths = append(nextPaths, path) } if err := updateConfigPaths(configPath, nextPaths); err != nil { + sendNodeLog(sessionCtx, client, nodeID, "error", "path update failed", map[string]any{ + "error": err.Error(), + }) return fmt.Errorf("node path update failed: %w", err) } + sendNodeLog(sessionCtx, client, nodeID, "info", "node paths updated", map[string]any{ + "paths": nextPaths, + }) if err := syncHealthyCapabilities(sessionCtx, client, nodeID, configPath, probe); err != nil { return retryableDaemonError{err: err} } @@ -1057,7 +1135,11 @@ func runDaemonSession( } case "node.update": logf("update command node=%s", nodeID) + sendNodeLog(sessionCtx, client, nodeID, "warn", "node update requested", nil) if err := update(sessionCtx, os.Stdout, os.Stderr); err != nil { + sendNodeLog(sessionCtx, client, nodeID, "error", "node update failed", map[string]any{ + "error": err.Error(), + }) return fmt.Errorf("node update failed: %w", err) } return nil @@ -1078,6 +1160,25 @@ func syncHealthyCapabilities( } capabilities := capabilitiesWithStatus(ctx, configuredAgents(config), probe) logf("capability sync node=%s config=%s configured=%d healthy=%d", nodeID, configPath, len(config.Agents), len(capabilities)) + errorCount := 0 + for _, capability := range capabilities { + if capability["status"] != "error" { + continue + } + errorCount++ + sendNodeLog(ctx, client, nodeID, "error", "agent health probe failed", map[string]any{ + "agentId": capability["id"], + "agentName": capability["name"], + "acpxAgent": capability["acpxAgent"], + "error": capability["error"], + }) + } + sendNodeLog(ctx, client, nodeID, "info", "capability sync completed", map[string]any{ + "config": configPath, + "configured": len(config.Agents), + "healthy": len(capabilities) - errorCount, + "errors": errorCount, + }) return client.Send(ctx, nodeclient.Envelope{ Type: "node.capabilities.sync", RequestID: fmt.Sprintf("sync-%d", time.Now().UnixNano()), diff --git a/internal/app/app_test.go b/internal/app/app_test.go index d2161e3..b2e7558 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "context" "errors" "fmt" @@ -160,6 +161,31 @@ func TestRunDispatchesDetectSubcommand(t *testing.T) { } } +func TestRunLogsSubcommandTailsUserServiceJournal(t *testing.T) { + binDir := t.TempDir() + logPath := filepath.Join(t.TempDir(), "journal-args") + writeExecutable(t, filepath.Join(binDir, "journalctl"), fmt.Sprintf(`#!/bin/sh +printf '%%s\n' "$@" > %q +printf 'node log line\n' +`, logPath)) + t.Setenv("PATH", binDir) + + var stdout bytes.Buffer + if err := runLogs(context.Background(), []string{"--service", "amesh-node", "-n", "12", "--follow=false"}, &stdout, io.Discard); err != nil { + t.Fatalf("runLogs() error = %v", err) + } + if got := stdout.String(); !strings.Contains(got, "node log line") { + t.Fatalf("stdout = %q, want journal output", got) + } + bytes, err := os.ReadFile(logPath) + if err != nil { + t.Fatal(err) + } + if got := string(bytes); !strings.Contains(got, "--user\n-u\namesh-node\n-n\n12\n--no-pager\n") { + t.Fatalf("journal args = %q", got) + } +} + func TestRunDaemonSessionHandlesNodeUpdate(t *testing.T) { t.Parallel() @@ -441,6 +467,63 @@ func TestCapabilitiesWithStatus(t *testing.T) { } } +func TestRunDaemonSessionSendsHealthProbeLogs(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := &fakeDaemonClient{ + readResults: []fakeReadResult{ + {envelope: nodeclient.Envelope{Type: "node.resumed"}}, + }, + blockReadsUntilCanceled: true, + } + + err := runDaemonSession( + ctx, + "ws://example.invalid/ws?role=node", + "node-a", + "token-a", + writeConfig(t, nodeconfig.File{ + NodeName: "node-a", + Agents: []nodeconfig.AgentConfig{ + {ID: "agent-openclaw", Name: "OpenClaw", ACPXAgent: "openclaw"}, + }, + }), + acpx.Runner{}, + newSessionStore(), + func(string) daemonClient { return client }, + func(context.Context, nodeconfig.AgentConfig) error { + cancel() + return errors.New("ACP metadata is missing") + }, + func(context.Context, io.Writer, io.Writer) error { return nil }, + func(context.Context, string) error { return nil }, + ) + if err != nil { + t.Fatalf("runDaemonSession() error = %v", err) + } + + for _, envelope := range client.sent { + if envelope.Type != "node.log" { + continue + } + if envelope.Payload["message"] == "agent health probe failed" && + envelope.Payload["level"] == "error" { + contextPayload, ok := envelope.Payload["context"].(map[string]any) + if !ok { + t.Fatalf("node log context = %#v, want map", envelope.Payload["context"]) + } + if contextPayload["agentId"] != "agent-openclaw" { + t.Fatalf("agentId = %#v, want agent-openclaw", contextPayload["agentId"]) + } + return + } + } + t.Fatalf("missing health probe node.log in %#v", client.sent) +} + func TestParseDetectableAgentsFromACPXHelp(t *testing.T) { t.Parallel() @@ -679,6 +762,9 @@ func assertEnvelopeTypes(t *testing.T, envelopes []nodeclient.Envelope, want []s got := make([]string, 0, len(envelopes)) for _, envelope := range envelopes { + if envelope.Type == "node.log" { + continue + } got = append(got, envelope.Type) } if !slices.Equal(got, want) { diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index 067119f..538a6b6 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -190,6 +190,29 @@ export type BrowseNodeDirectoriesResponse = z.infer< typeof browseNodeDirectoriesResponseSchema >; +export const nodeLogLevelSchema = z.enum(["debug", "info", "warn", "error"]); +export type NodeLogLevel = z.infer; + +export const nodeLogPayloadSchema = z.object({ + nodeId: z.string(), + level: nodeLogLevelSchema.default("info"), + message: z.string(), + context: payloadSchema.default({}), + observedAt: z.string() +}); +export type NodeLogPayload = z.infer; + +export const nodeLogEntrySchema = nodeLogPayloadSchema.extend({ + id: z.string() +}); +export type NodeLogEntry = z.infer; + +export const nodeLogsResponseSchema = z.object({ + nodeId: z.string(), + entries: z.array(nodeLogEntrySchema) +}); +export type NodeLogsResponse = z.infer; + export const sessionStartPayloadSchema = z.object({ sessionId: z.string(), agentId: z.string(), @@ -274,6 +297,10 @@ export const browserRealtimeEventSchema = z.discriminatedUnion("type", [ type: z.literal("topology.updated"), payload: topologySnapshotSchema }), + z.object({ + type: z.literal("node.logs.updated"), + payload: nodeLogsResponseSchema + }), z.object({ type: z.literal("session.updated"), payload: z.object({ diff --git a/scripts/test-install-amesh-node.sh b/scripts/test-install-amesh-node.sh index 08bfeed..f9e0654 100644 --- a/scripts/test-install-amesh-node.sh +++ b/scripts/test-install-amesh-node.sh @@ -107,6 +107,12 @@ set -euo pipefail exit 0 BIN chmod +x "$target_dir/amesh-node" +cat <<'BIN' >"$target_dir/amesh" +#!/usr/bin/env bash +set -euo pipefail +exit 0 +BIN +chmod +x "$target_dir/amesh" EOF chmod +x "$stdin_stub_dir/tar" @@ -166,3 +172,4 @@ fi assert_contains 'Environment="AMESH_ACPX_PATH=' "$stdin_env_dir/amesh-node.service" assert_contains 'Environment="AMESH_NODE_VERSION=test-tag"' "$stdin_env_dir/amesh-node.service" assert_contains "$stdin_space_dir" "$stdin_env_dir/amesh-node.service" +test -x "$stdin_env_dir/bin/amesh" From a9b5226cc6c04f35594791e138f7504dc73a472e Mon Sep 17 00:00:00 2001 From: Nitay Rabinovich Date: Wed, 13 May 2026 21:13:12 +0200 Subject: [PATCH 2/2] fix acpx stdout streaming race --- internal/acpx/runner.go | 105 +++++++++++++++++++++-------------- internal/acpx/runner_test.go | 21 ++----- 2 files changed, 67 insertions(+), 59 deletions(-) diff --git a/internal/acpx/runner.go b/internal/acpx/runner.go index 5296247..f2470bc 100644 --- a/internal/acpx/runner.go +++ b/internal/acpx/runner.go @@ -1,7 +1,6 @@ package acpx import ( - "bufio" "bytes" "context" "encoding/json" @@ -204,14 +203,17 @@ func runStreamingCommand( cmd := exec.CommandContext(ctx, command, args...) cmd.Dir = workingDir cmd.Env = append(cmd.Environ(), env...) - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("open stdout pipe: %w", err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - return nil, fmt.Errorf("open stderr pipe: %w", err) + + var ( + stdoutBuf bytes.Buffer + stderrBuf bytes.Buffer + ) + stdoutWriter := &lineStreamingWriter{ + output: &stdoutBuf, + onStdoutLine: onStdoutLine, } + cmd.Stdout = stdoutWriter + cmd.Stderr = &stderrBuf if stdinText != "" { stdin, err := cmd.StdinPipe() @@ -228,41 +230,9 @@ func runStreamingCommand( return nil, fmt.Errorf("start acpx: %w", err) } - var ( - stdoutBuf bytes.Buffer - stderrBuf bytes.Buffer - mu sync.Mutex - wg sync.WaitGroup - ) - - wg.Add(2) - go func() { - defer wg.Done() - scanner := bufio.NewScanner(stdout) - scanner.Buffer(make([]byte, 64*1024), 8*1024*1024) - for scanner.Scan() { - line := scanner.Bytes() - mu.Lock() - stdoutBuf.Write(line) - stdoutBuf.WriteByte('\n') - mu.Unlock() - if onStdoutLine != nil { - onStdoutLine(string(line)) - } - } - }() - go func() { - defer wg.Done() - // Drain stderr so the child process never blocks on a full pipe. - // Keep it out of stdout/chat events, but retain it for errors because - // provider init failures such as OpenClaw's ACP metadata error arrive - // there. - _, _ = io.Copy(&stderrBuf, stderr) - }() - - err = cmd.Wait() - wg.Wait() - output := stdoutBuf.Bytes() + err := cmd.Wait() + stdoutWriter.Flush() + output := append([]byte(nil), stdoutBuf.Bytes()...) if err != nil { if stderrText := strings.TrimSpace(stderrBuf.String()); stderrText != "" { return output, fmt.Errorf("run acpx: %w: %s", err, stderrText) @@ -271,3 +241,52 @@ func runStreamingCommand( } return output, nil } + +type lineStreamingWriter struct { + mu sync.Mutex + output *bytes.Buffer + pending []byte + onStdoutLine func(line string) +} + +func (writer *lineStreamingWriter) Write(chunk []byte) (int, error) { + writer.mu.Lock() + defer writer.mu.Unlock() + + written := len(chunk) + if _, err := writer.output.Write(chunk); err != nil { + return 0, err + } + + for len(chunk) > 0 { + index := bytes.IndexByte(chunk, '\n') + if index == -1 { + writer.pending = append(writer.pending, chunk...) + break + } + line := append(writer.pending, chunk[:index]...) + writer.pending = writer.pending[:0] + writer.emitLocked(line) + chunk = chunk[index+1:] + } + + return written, nil +} + +func (writer *lineStreamingWriter) Flush() { + writer.mu.Lock() + defer writer.mu.Unlock() + + if len(writer.pending) == 0 { + return + } + writer.emitLocked(writer.pending) + writer.pending = nil +} + +func (writer *lineStreamingWriter) emitLocked(line []byte) { + if writer.onStdoutLine == nil { + return + } + writer.onStdoutLine(string(line)) +} diff --git a/internal/acpx/runner_test.go b/internal/acpx/runner_test.go index 698dfd0..a8bc526 100644 --- a/internal/acpx/runner_test.go +++ b/internal/acpx/runner_test.go @@ -12,15 +12,14 @@ import ( ) func TestRunnerStreamsStdoutLineByLine(t *testing.T) { - t.Parallel() - var lines []string - command := writeHelperExecutable(t, `#!/bin/sh -printf '%s\n' one two -printf '%s\n' noise >&2 -`) + command := "/bin/sh" output, err := (Runner{}).Run(context.Background(), RunRequest{ Command: command, + Args: []string{ + "-c", + "printf '%s\n' one two; printf '%s\n' noise >&2", + }, }, func(line string) { lines = append(lines, line) }) @@ -269,13 +268,3 @@ func helperCommand(t *testing.T, mode string) (string, []string) { return os.Args[0], []string{"-test.run=TestRunnerHelperProcess", "--", mode} } - -func writeHelperExecutable(t *testing.T, contents string) string { - t.Helper() - - path := filepath.Join(t.TempDir(), "helper") - if err := os.WriteFile(path, []byte(contents), 0o755); err != nil { - t.Fatalf("write helper executable: %v", err) - } - return path -}