From fdfdcb2c9c33bd186ba66187ee4610592264fdc0 Mon Sep 17 00:00:00 2001 From: Henry Lach Date: Mon, 25 May 2026 14:53:50 -0400 Subject: [PATCH] fix(#507,#509): dashboard merge-snapshot collision + no-batch poll debounce Two paired dashboard fixes, landed together because both are small-surface-area observability issues in the same UI layer. #509 \u2014 merge agent telemetry missing for some waves Root cause: writeMergeSnapshot keyed the on-disk filename by mergeNumber alone (mergeNumber == lane.laneNumber). Lane numbers reset every wave, so wave N+1's lane-1 merge silently overwrote wave N's lane-1 terminal snapshot before the dashboard could observe it. The user-visible symptom was '\u2014' in the merge telemetry column for any wave whose lane numbers were reused by a subsequent wave. The fix is per-wave filename namespacing in runtimeMergeSnapshotPath: Before: .pi/runtime/{batchId}/lanes/merge-{mergeNumber}.json After: .pi/runtime/{batchId}/lanes/merge-w{waveIndex}-{mergeNumber}.json writeMergeSnapshot / readMergeSnapshot signatures gain waveIndex. All four call sites in merge.ts already had waveIndex in scope (it's a parameter of spawnMergeAgentV2) and pass it through as 'waveIndex ?? 0', matching the existing nullish-coalesce pattern used to populate the snapshot's own waveIndex field on lines 894-895. Dashboard server's loadRuntimeMergeSnapshots in dashboard/server.cjs correspondingly switches its intermediate map key from mergeNumber alone to 'w{waveIndex}-{mergeNumber}', because keying by mergeNumber alone reproduced the same collision at read time (multiple wave snapshots read off disk but only the last-iterated kept in memory). Filename filter remains permissive ('merge-*.json') so legacy snapshots from pre-fix batches still load and key as plain '{mergeNumber}' for back-compat. #507 \u2014 dashboard briefly flips to history view at batch startup Root cause: a single missed SSE poll during batch-state.json write at startup triggered the no-batch handler in app.js, which closes the viewer and renders the history panel. The next poll then picked up the new batch and flipped back to live view, but the user perceived a 'flash of history' between two live batches. The fix is a 3-consecutive-miss debounce on the no-batch transition in dashboard/public/app.js. With the server's 2s POLL_INTERVAL, the threshold corresponds to ~6s of confirmed batch-absence \u2014 well past the sub-second batch-state.json write window while still cleaning up promptly when a batch genuinely ends. The miss counter resets the moment a batch reappears, so back-to-back batches no longer flash the history panel. Tests Adds 4 regression tests to process-registry.test.ts (7.4 through 7.7) covering the #509 invariants: 7.4 \u2014 writeMergeSnapshot produces a filename containing the waveIndex 7.5 \u2014 same mergeNumber across two waves writes to distinct files 7.6 \u2014 wave-1 write does not overwrite wave-0 with same mergeNumber (exact failure mode from the issue body) 7.7 \u2014 readMergeSnapshot returns null for absent (wave,mergeNumber) tuples (no accidental cross-wave fallback) #507 is a pure presentation-layer debounce in app.js with no Node-side tests \u2014 the debounce kinetics depend on browser SSE timing and are best validated by running a back-to-back batch sequence locally against the dashboard. Validation npm run typecheck pass npm run lint 286 warnings / 671 infos (identical to main) npm run format:check pass process-registry.test.ts 42/42 pass (4 new) Full test suite 3708/3709 pass, 1 skipped (zero new failures) taskplane help / doctor pass Closes #507 Closes #509 --- dashboard/public/app.js | 24 +++++++ dashboard/server.cjs | 29 ++++++-- extensions/taskplane/merge.ts | 8 +-- extensions/taskplane/process-registry.ts | 15 ++-- extensions/taskplane/types.ts | 16 ++++- extensions/tests/process-registry.test.ts | 85 +++++++++++++++++++++++ 6 files changed, 164 insertions(+), 13 deletions(-) diff --git a/dashboard/public/app.js b/dashboard/public/app.js index dd45d12d..eb17412f 100644 --- a/dashboard/public/app.js +++ b/dashboard/public/app.js @@ -243,6 +243,17 @@ let viewerMode = null; // "conversation" | "status-md" | null let viewerTarget = null; // session name (conversation) or taskId (status-md) let lastBatchId = null; // TP-178: track batchId for stale viewer detection (#487) +// #507: Debounce the no-batch transition. A single missed poll happens +// transiently during batch-state.json writes at batch startup, and was +// causing the dashboard to flash the previous batch's history view before +// switching to the new live batch. Require N consecutive no-batch polls +// before clearing the viewer / showing history. With the server's 2s +// POLL_INTERVAL, a threshold of 3 corresponds to ~6s of confirmed silence — +// well past the typical batch-state.json write window (sub-second) while +// still cleaning up promptly when a batch genuinely ends. +let consecutiveNoBatchPolls = 0; +const NO_BATCH_DEBOUNCE_THRESHOLD = 3; + // ─── Repo Helpers ─────────────────────────────────────────────────────────── /** @@ -1818,6 +1829,16 @@ function render(data) { $lastUpdate.textContent = new Date().toLocaleTimeString(); if (!batch) { + // #507: A single missed poll during batch startup (batch-state.json being + // written) is not a real "batch disappeared" signal. Only act on no-batch + // after N consecutive polls confirm it, so we don't flash the history + // view between two live batches. + consecutiveNoBatchPolls += 1; + if (consecutiveNoBatchPolls < NO_BATCH_DEBOUNCE_THRESHOLD) { + // Hold the previous render in place. Still tick the timestamp so the + // user knows the SSE stream is alive. + return; + } // TP-178: Clear viewer when batch disappears (#487) if (lastBatchId && viewerMode) closeViewer(); lastBatchId = null; @@ -1830,6 +1851,9 @@ function render(data) { return; } + // Batch present — reset the no-batch debounce counter (#507). + consecutiveNoBatchPolls = 0; + // TP-178: Detect batchId change — clear stale viewer state (#487) if (batch.batchId && lastBatchId && batch.batchId !== lastBatchId && viewerMode) { closeViewer(); diff --git a/dashboard/server.cjs b/dashboard/server.cjs index 7c50ccb5..04152efe 100644 --- a/dashboard/server.cjs +++ b/dashboard/server.cjs @@ -467,12 +467,26 @@ function loadRuntimeLaneSnapshots(batchId) { /** * Load Runtime V2 merge agent snapshots for the current batch. * - * Reads all `merge-N.json` files from `.pi/runtime/{batchId}/lanes/`. - * Returns a map of mergeNumber (string) → snapshot data. + * Reads all `merge-*.json` files from `.pi/runtime/{batchId}/lanes/`. + * Returns a map of unique key → snapshot data, where the key is a composite + * of waveIndex and mergeNumber. + * + * The composite key is essential because lane numbers (and therefore + * `mergeNumber`) repeat across waves — keying solely by `mergeNumber` caused + * wave N+1's snapshots to silently overwrite wave N's in the intermediate + * map, which is the root cause of #509 ('merge agent telemetry missing for + * some waves'). * * Follows the same pattern as {@link loadRuntimeLaneSnapshots}. * - * @since TP-164 + * Filename accepted patterns (back-compat-tolerant): + * merge-w{waveIndex}-{mergeNumber}.json (current, post-#509) + * merge-{mergeNumber}.json (legacy, pre-#509) + * + * Both patterns embed waveIndex inside the snapshot JSON itself, so the key + * derivation works for either filename. + * + * @since TP-164 (composite key added in #509 remediation) */ function loadRuntimeMergeSnapshots(batchId) { if (!batchId) return {}; @@ -484,7 +498,14 @@ function loadRuntimeMergeSnapshots(batchId) { for (const file of files) { try { const data = JSON.parse(fs.readFileSync(path.join(lanesDir, file), "utf-8")); - if (data.mergeNumber != null) snapshots[data.mergeNumber] = data; + if (data.mergeNumber == null) continue; + // Composite key keeps cross-wave snapshots from colliding in this map. + // Falls back to mergeNumber-only for legacy snapshots that pre-date + // the waveIndex-in-filename change. + const key = data.waveIndex != null + ? `w${data.waveIndex}-${data.mergeNumber}` + : String(data.mergeNumber); + snapshots[key] = data; } catch { continue; } } } catch { /* dir missing */ } diff --git a/extensions/taskplane/merge.ts b/extensions/taskplane/merge.ts index 0bcf1e67..aa88022a 100644 --- a/extensions/taskplane/merge.ts +++ b/extensions/taskplane/merge.ts @@ -895,7 +895,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap(tel, "running"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap); } catch { /* non-fatal */ } @@ -915,7 +915,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap({}, "running"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, initialSnap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, initialSnap); } catch { /* non-fatal */ } @@ -964,7 +964,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap(result, terminalStatus === "complete" ? "exited" : "crashed"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap); } catch { /* non-fatal */ } @@ -987,7 +987,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap({}, "crashed"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap); } catch { /* non-fatal */ } diff --git a/extensions/taskplane/process-registry.ts b/extensions/taskplane/process-registry.ts index c432d19b..4f7b78e2 100644 --- a/extensions/taskplane/process-registry.ts +++ b/extensions/taskplane/process-registry.ts @@ -400,20 +400,25 @@ export function readLaneSnapshot( * Stored in the `lanes/` directory alongside lane snapshots so the dashboard * server picks it up with the same scan that reads lane-N.json files. * + * Filename includes BOTH waveIndex and mergeNumber so wave-N+1's merges + * cannot overwrite wave-N's snapshots before the dashboard polls them (#509). + * * @param stateRoot - Repository root (where `.pi/` lives) * @param batchId - Current batch identifier + * @param waveIndex - 0-based wave index for the merge * @param mergeNumber - 1-indexed merge agent number * @param snapshot - Snapshot data to persist * - * @since TP-164 + * @since TP-164 (waveIndex parameter added in #509 remediation) */ export function writeMergeSnapshot( stateRoot: string, batchId: string, + waveIndex: number, mergeNumber: number, snapshot: RuntimeMergeSnapshot, ): void { - const path = runtimeMergeSnapshotPath(stateRoot, batchId, mergeNumber); + const path = runtimeMergeSnapshotPath(stateRoot, batchId, waveIndex, mergeNumber); mkdirSync(dirname(path), { recursive: true }); const tmpPath = path + ".tmp"; writeFileSync(tmpPath, JSON.stringify(snapshot, null, 2) + "\n", "utf-8"); @@ -426,17 +431,19 @@ export function writeMergeSnapshot( * * @param stateRoot - Repository root (where `.pi/` lives) * @param batchId - Current batch identifier + * @param waveIndex - 0-based wave index for the merge * @param mergeNumber - 1-indexed merge agent number * - * @since TP-164 + * @since TP-164 (waveIndex parameter added in #509 remediation) */ export function readMergeSnapshot( stateRoot: string, batchId: string, + waveIndex: number, mergeNumber: number, ): RuntimeMergeSnapshot | null { try { - const p = runtimeMergeSnapshotPath(stateRoot, batchId, mergeNumber); + const p = runtimeMergeSnapshotPath(stateRoot, batchId, waveIndex, mergeNumber); if (!existsSync(p)) return null; return JSON.parse(readFileSync(p, "utf-8")) as RuntimeMergeSnapshot; } catch { diff --git a/extensions/taskplane/types.ts b/extensions/taskplane/types.ts index cc92a04b..cb5c9458 100644 --- a/extensions/taskplane/types.ts +++ b/extensions/taskplane/types.ts @@ -4323,12 +4323,26 @@ export interface RuntimeMergeSnapshot { * * @since TP-164 */ +/** + * Path to a merge agent snapshot file. + * + * The filename includes BOTH `waveIndex` and `mergeNumber` because lane + * numbers (and therefore the legacy `mergeNumber`-only filename) repeat + * across waves — a wave-2 lane-1 merge would overwrite the wave-1 lane-1 + * snapshot before the dashboard's next poll could read it. Per-wave + * namespacing keeps each merge's snapshot durable until the runtime + * directory itself is cleaned up at end-of-batch. See #509. + * + * @param waveIndex 0-based wave index for the merge + * @param mergeNumber 1-based merge agent number (derived from lane number) + */ export function runtimeMergeSnapshotPath( stateRoot: string, batchId: string, + waveIndex: number, mergeNumber: number, ): string { - return `${stateRoot}/.pi/runtime/${batchId}/lanes/merge-${mergeNumber}.json`; + return `${stateRoot}/.pi/runtime/${batchId}/lanes/merge-w${waveIndex}-${mergeNumber}.json`; } /** diff --git a/extensions/tests/process-registry.test.ts b/extensions/tests/process-registry.test.ts index 56c10ecb..f777b8ea 100644 --- a/extensions/tests/process-registry.test.ts +++ b/extensions/tests/process-registry.test.ts @@ -38,6 +38,8 @@ import { cleanupBatchRuntime, appendAgentEvent, writeLaneSnapshot, + writeMergeSnapshot, + readMergeSnapshot, } from "../taskplane/process-registry.ts"; import { @@ -45,7 +47,9 @@ import { runtimeRegistryPath, runtimeAgentEventsPath, runtimeLaneSnapshotPath, + runtimeMergeSnapshotPath, type RuntimeAgentManifest, + type RuntimeMergeSnapshot, } from "../taskplane/types.ts"; let tmpDir: string; @@ -417,6 +421,87 @@ describe("7.x: Event and snapshot persistence", () => { const data = JSON.parse(readFileSync(path, "utf-8")); expect(data.laneNumber).toBe(1); }); + + // ── #509 regression: merge snapshots are namespaced per wave ────────── + // Pre-fix, writeMergeSnapshot keyed the on-disk filename by mergeNumber + // alone (which is derived from lane.laneNumber). Because lane numbers + // reset every wave, wave N+1's first merge would overwrite wave N's + // first merge before the dashboard could observe the terminal snapshot, + // causing the dashboard's merge telemetry column to render '—' for any + // wave whose lane numbers were reused by a subsequent wave. + + function sampleSnapshot( + waveIndex: number, + mergeNumber: number, + status: RuntimeMergeSnapshot["status"] = "complete", + ): RuntimeMergeSnapshot { + return { + batchId, + mergeNumber, + sessionName: `orch-${batchId}-merge-w${waveIndex}-${mergeNumber}`, + waveIndex, + status, + agent: { + contextPct: 0, + costUsd: 0, + elapsedMs: 0, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheWriteTokens: 0, + toolCalls: 0, + lastTool: "", + status: status === "running" ? "running" : "exited", + }, + updatedAt: Date.now(), + } as RuntimeMergeSnapshot; + } + + it("7.4: writeMergeSnapshot namespaces filename by wave (#509 regression)", () => { + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1)); + const expected = runtimeMergeSnapshotPath(tmpDir, batchId, 0, 1); + expect(existsSync(expected)).toBe(true); + // The new filename must include the waveIndex so wave-N+1's lane-1 + // merge cannot reuse the same path as wave-N's lane-1 merge. + expect(expected).toMatch(/merge-w0-1\.json$/); + }); + + it("7.5: same mergeNumber across two waves writes to distinct files (#509)", () => { + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1, "complete")); + writeMergeSnapshot(tmpDir, batchId, 1, 1, sampleSnapshot(1, 1, "running")); + + const wave0Path = runtimeMergeSnapshotPath(tmpDir, batchId, 0, 1); + const wave1Path = runtimeMergeSnapshotPath(tmpDir, batchId, 1, 1); + + expect(wave0Path).not.toBe(wave1Path); + expect(existsSync(wave0Path)).toBe(true); + expect(existsSync(wave1Path)).toBe(true); + }); + + it("7.6: writing wave 1 does not overwrite wave 0 with same mergeNumber (#509)", () => { + // This is the exact failure mode from the issue: wave-2 lane-1 merge + // trampling wave-1 lane-1's terminal snapshot. + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1, "complete")); + writeMergeSnapshot(tmpDir, batchId, 1, 1, sampleSnapshot(1, 1, "running")); + + const wave0Snap = readMergeSnapshot(tmpDir, batchId, 0, 1); + const wave1Snap = readMergeSnapshot(tmpDir, batchId, 1, 1); + + expect(wave0Snap).not.toBeNull(); + expect(wave1Snap).not.toBeNull(); + expect(wave0Snap?.waveIndex).toBe(0); + expect(wave0Snap?.status).toBe("complete"); + expect(wave1Snap?.waveIndex).toBe(1); + expect(wave1Snap?.status).toBe("running"); + }); + + it("7.7: readMergeSnapshot returns null for absent (wave, mergeNumber) tuple (#509)", () => { + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1)); + // Same wave, different mergeNumber — absent + expect(readMergeSnapshot(tmpDir, batchId, 0, 99)).toBeNull(); + // Different wave, same mergeNumber — also absent (no cross-wave fallback) + expect(readMergeSnapshot(tmpDir, batchId, 5, 1)).toBeNull(); + }); }); // ── 8. Agent-host export contract ───────────────────────────────────