diff --git a/dashboard/public/app.js b/dashboard/public/app.js index dd45d12d..eb17412f 100644 --- a/dashboard/public/app.js +++ b/dashboard/public/app.js @@ -243,6 +243,17 @@ let viewerMode = null; // "conversation" | "status-md" | null let viewerTarget = null; // session name (conversation) or taskId (status-md) let lastBatchId = null; // TP-178: track batchId for stale viewer detection (#487) +// #507: Debounce the no-batch transition. A single missed poll happens +// transiently during batch-state.json writes at batch startup, and was +// causing the dashboard to flash the previous batch's history view before +// switching to the new live batch. Require N consecutive no-batch polls +// before clearing the viewer / showing history. With the server's 2s +// POLL_INTERVAL, a threshold of 3 corresponds to ~6s of confirmed silence — +// well past the typical batch-state.json write window (sub-second) while +// still cleaning up promptly when a batch genuinely ends. +let consecutiveNoBatchPolls = 0; +const NO_BATCH_DEBOUNCE_THRESHOLD = 3; + // ─── Repo Helpers ─────────────────────────────────────────────────────────── /** @@ -1818,6 +1829,16 @@ function render(data) { $lastUpdate.textContent = new Date().toLocaleTimeString(); if (!batch) { + // #507: A single missed poll during batch startup (batch-state.json being + // written) is not a real "batch disappeared" signal. Only act on no-batch + // after N consecutive polls confirm it, so we don't flash the history + // view between two live batches. + consecutiveNoBatchPolls += 1; + if (consecutiveNoBatchPolls < NO_BATCH_DEBOUNCE_THRESHOLD) { + // Hold the previous render in place. Still tick the timestamp so the + // user knows the SSE stream is alive. + return; + } // TP-178: Clear viewer when batch disappears (#487) if (lastBatchId && viewerMode) closeViewer(); lastBatchId = null; @@ -1830,6 +1851,9 @@ function render(data) { return; } + // Batch present — reset the no-batch debounce counter (#507). + consecutiveNoBatchPolls = 0; + // TP-178: Detect batchId change — clear stale viewer state (#487) if (batch.batchId && lastBatchId && batch.batchId !== lastBatchId && viewerMode) { closeViewer(); diff --git a/dashboard/server.cjs b/dashboard/server.cjs index 7c50ccb5..04152efe 100644 --- a/dashboard/server.cjs +++ b/dashboard/server.cjs @@ -467,12 +467,26 @@ function loadRuntimeLaneSnapshots(batchId) { /** * Load Runtime V2 merge agent snapshots for the current batch. * - * Reads all `merge-N.json` files from `.pi/runtime/{batchId}/lanes/`. - * Returns a map of mergeNumber (string) → snapshot data. + * Reads all `merge-*.json` files from `.pi/runtime/{batchId}/lanes/`. + * Returns a map of unique key → snapshot data, where the key is a composite + * of waveIndex and mergeNumber. + * + * The composite key is essential because lane numbers (and therefore + * `mergeNumber`) repeat across waves — keying solely by `mergeNumber` caused + * wave N+1's snapshots to silently overwrite wave N's in the intermediate + * map, which is the root cause of #509 ('merge agent telemetry missing for + * some waves'). * * Follows the same pattern as {@link loadRuntimeLaneSnapshots}. * - * @since TP-164 + * Filename accepted patterns (back-compat-tolerant): + * merge-w{waveIndex}-{mergeNumber}.json (current, post-#509) + * merge-{mergeNumber}.json (legacy, pre-#509) + * + * Both patterns embed waveIndex inside the snapshot JSON itself, so the key + * derivation works for either filename. + * + * @since TP-164 (composite key added in #509 remediation) */ function loadRuntimeMergeSnapshots(batchId) { if (!batchId) return {}; @@ -484,7 +498,14 @@ function loadRuntimeMergeSnapshots(batchId) { for (const file of files) { try { const data = JSON.parse(fs.readFileSync(path.join(lanesDir, file), "utf-8")); - if (data.mergeNumber != null) snapshots[data.mergeNumber] = data; + if (data.mergeNumber == null) continue; + // Composite key keeps cross-wave snapshots from colliding in this map. + // Falls back to mergeNumber-only for legacy snapshots that pre-date + // the waveIndex-in-filename change. + const key = data.waveIndex != null + ? `w${data.waveIndex}-${data.mergeNumber}` + : String(data.mergeNumber); + snapshots[key] = data; } catch { continue; } } } catch { /* dir missing */ } diff --git a/extensions/taskplane/merge.ts b/extensions/taskplane/merge.ts index 0bcf1e67..aa88022a 100644 --- a/extensions/taskplane/merge.ts +++ b/extensions/taskplane/merge.ts @@ -895,7 +895,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap(tel, "running"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap); } catch { /* non-fatal */ } @@ -915,7 +915,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap({}, "running"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, initialSnap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, initialSnap); } catch { /* non-fatal */ } @@ -964,7 +964,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap(result, terminalStatus === "complete" ? "exited" : "crashed"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap); } catch { /* non-fatal */ } @@ -987,7 +987,7 @@ export async function spawnMergeAgentV2( agent: buildAgentSnap({}, "crashed"), updatedAt: Date.now(), }; - writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap); + writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap); } catch { /* non-fatal */ } diff --git a/extensions/taskplane/process-registry.ts b/extensions/taskplane/process-registry.ts index c432d19b..4f7b78e2 100644 --- a/extensions/taskplane/process-registry.ts +++ b/extensions/taskplane/process-registry.ts @@ -400,20 +400,25 @@ export function readLaneSnapshot( * Stored in the `lanes/` directory alongside lane snapshots so the dashboard * server picks it up with the same scan that reads lane-N.json files. * + * Filename includes BOTH waveIndex and mergeNumber so wave-N+1's merges + * cannot overwrite wave-N's snapshots before the dashboard polls them (#509). + * * @param stateRoot - Repository root (where `.pi/` lives) * @param batchId - Current batch identifier + * @param waveIndex - 0-based wave index for the merge * @param mergeNumber - 1-indexed merge agent number * @param snapshot - Snapshot data to persist * - * @since TP-164 + * @since TP-164 (waveIndex parameter added in #509 remediation) */ export function writeMergeSnapshot( stateRoot: string, batchId: string, + waveIndex: number, mergeNumber: number, snapshot: RuntimeMergeSnapshot, ): void { - const path = runtimeMergeSnapshotPath(stateRoot, batchId, mergeNumber); + const path = runtimeMergeSnapshotPath(stateRoot, batchId, waveIndex, mergeNumber); mkdirSync(dirname(path), { recursive: true }); const tmpPath = path + ".tmp"; writeFileSync(tmpPath, JSON.stringify(snapshot, null, 2) + "\n", "utf-8"); @@ -426,17 +431,19 @@ export function writeMergeSnapshot( * * @param stateRoot - Repository root (where `.pi/` lives) * @param batchId - Current batch identifier + * @param waveIndex - 0-based wave index for the merge * @param mergeNumber - 1-indexed merge agent number * - * @since TP-164 + * @since TP-164 (waveIndex parameter added in #509 remediation) */ export function readMergeSnapshot( stateRoot: string, batchId: string, + waveIndex: number, mergeNumber: number, ): RuntimeMergeSnapshot | null { try { - const p = runtimeMergeSnapshotPath(stateRoot, batchId, mergeNumber); + const p = runtimeMergeSnapshotPath(stateRoot, batchId, waveIndex, mergeNumber); if (!existsSync(p)) return null; return JSON.parse(readFileSync(p, "utf-8")) as RuntimeMergeSnapshot; } catch { diff --git a/extensions/taskplane/types.ts b/extensions/taskplane/types.ts index cc92a04b..cb5c9458 100644 --- a/extensions/taskplane/types.ts +++ b/extensions/taskplane/types.ts @@ -4323,12 +4323,26 @@ export interface RuntimeMergeSnapshot { * * @since TP-164 */ +/** + * Path to a merge agent snapshot file. + * + * The filename includes BOTH `waveIndex` and `mergeNumber` because lane + * numbers (and therefore the legacy `mergeNumber`-only filename) repeat + * across waves — a wave-2 lane-1 merge would overwrite the wave-1 lane-1 + * snapshot before the dashboard's next poll could read it. Per-wave + * namespacing keeps each merge's snapshot durable until the runtime + * directory itself is cleaned up at end-of-batch. See #509. + * + * @param waveIndex 0-based wave index for the merge + * @param mergeNumber 1-based merge agent number (derived from lane number) + */ export function runtimeMergeSnapshotPath( stateRoot: string, batchId: string, + waveIndex: number, mergeNumber: number, ): string { - return `${stateRoot}/.pi/runtime/${batchId}/lanes/merge-${mergeNumber}.json`; + return `${stateRoot}/.pi/runtime/${batchId}/lanes/merge-w${waveIndex}-${mergeNumber}.json`; } /** diff --git a/extensions/tests/process-registry.test.ts b/extensions/tests/process-registry.test.ts index 56c10ecb..f777b8ea 100644 --- a/extensions/tests/process-registry.test.ts +++ b/extensions/tests/process-registry.test.ts @@ -38,6 +38,8 @@ import { cleanupBatchRuntime, appendAgentEvent, writeLaneSnapshot, + writeMergeSnapshot, + readMergeSnapshot, } from "../taskplane/process-registry.ts"; import { @@ -45,7 +47,9 @@ import { runtimeRegistryPath, runtimeAgentEventsPath, runtimeLaneSnapshotPath, + runtimeMergeSnapshotPath, type RuntimeAgentManifest, + type RuntimeMergeSnapshot, } from "../taskplane/types.ts"; let tmpDir: string; @@ -417,6 +421,87 @@ describe("7.x: Event and snapshot persistence", () => { const data = JSON.parse(readFileSync(path, "utf-8")); expect(data.laneNumber).toBe(1); }); + + // ── #509 regression: merge snapshots are namespaced per wave ────────── + // Pre-fix, writeMergeSnapshot keyed the on-disk filename by mergeNumber + // alone (which is derived from lane.laneNumber). Because lane numbers + // reset every wave, wave N+1's first merge would overwrite wave N's + // first merge before the dashboard could observe the terminal snapshot, + // causing the dashboard's merge telemetry column to render '—' for any + // wave whose lane numbers were reused by a subsequent wave. + + function sampleSnapshot( + waveIndex: number, + mergeNumber: number, + status: RuntimeMergeSnapshot["status"] = "complete", + ): RuntimeMergeSnapshot { + return { + batchId, + mergeNumber, + sessionName: `orch-${batchId}-merge-w${waveIndex}-${mergeNumber}`, + waveIndex, + status, + agent: { + contextPct: 0, + costUsd: 0, + elapsedMs: 0, + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheWriteTokens: 0, + toolCalls: 0, + lastTool: "", + status: status === "running" ? "running" : "exited", + }, + updatedAt: Date.now(), + } as RuntimeMergeSnapshot; + } + + it("7.4: writeMergeSnapshot namespaces filename by wave (#509 regression)", () => { + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1)); + const expected = runtimeMergeSnapshotPath(tmpDir, batchId, 0, 1); + expect(existsSync(expected)).toBe(true); + // The new filename must include the waveIndex so wave-N+1's lane-1 + // merge cannot reuse the same path as wave-N's lane-1 merge. + expect(expected).toMatch(/merge-w0-1\.json$/); + }); + + it("7.5: same mergeNumber across two waves writes to distinct files (#509)", () => { + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1, "complete")); + writeMergeSnapshot(tmpDir, batchId, 1, 1, sampleSnapshot(1, 1, "running")); + + const wave0Path = runtimeMergeSnapshotPath(tmpDir, batchId, 0, 1); + const wave1Path = runtimeMergeSnapshotPath(tmpDir, batchId, 1, 1); + + expect(wave0Path).not.toBe(wave1Path); + expect(existsSync(wave0Path)).toBe(true); + expect(existsSync(wave1Path)).toBe(true); + }); + + it("7.6: writing wave 1 does not overwrite wave 0 with same mergeNumber (#509)", () => { + // This is the exact failure mode from the issue: wave-2 lane-1 merge + // trampling wave-1 lane-1's terminal snapshot. + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1, "complete")); + writeMergeSnapshot(tmpDir, batchId, 1, 1, sampleSnapshot(1, 1, "running")); + + const wave0Snap = readMergeSnapshot(tmpDir, batchId, 0, 1); + const wave1Snap = readMergeSnapshot(tmpDir, batchId, 1, 1); + + expect(wave0Snap).not.toBeNull(); + expect(wave1Snap).not.toBeNull(); + expect(wave0Snap?.waveIndex).toBe(0); + expect(wave0Snap?.status).toBe("complete"); + expect(wave1Snap?.waveIndex).toBe(1); + expect(wave1Snap?.status).toBe("running"); + }); + + it("7.7: readMergeSnapshot returns null for absent (wave, mergeNumber) tuple (#509)", () => { + writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1)); + // Same wave, different mergeNumber — absent + expect(readMergeSnapshot(tmpDir, batchId, 0, 99)).toBeNull(); + // Different wave, same mergeNumber — also absent (no cross-wave fallback) + expect(readMergeSnapshot(tmpDir, batchId, 5, 1)).toBeNull(); + }); }); // ── 8. Agent-host export contract ───────────────────────────────────