-
Notifications
You must be signed in to change notification settings - Fork 787
Fix stuck running jobs by detecting dead review/task PIDs #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
39b560f
5ff5fc5
2291d1a
b2dd44a
b155dab
6132e48
5c0d7cd
f0b9379
52e4f07
10f8528
f9e80a5
05a55a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ import { | |
| } from "./lib/codex.mjs"; | ||
| import { readStdinIfPiped } from "./lib/fs.mjs"; | ||
| import { collectReviewContext, ensureGitRepository, resolveReviewTarget } from "./lib/git.mjs"; | ||
| import { binaryAvailable, terminateProcessTree } from "./lib/process.mjs"; | ||
| import { binaryAvailable, isProcessAlive, terminateProcessTree } from "./lib/process.mjs"; | ||
| import { loadPromptTemplate, interpolateTemplate } from "./lib/prompts.mjs"; | ||
| import { | ||
| generateJobId, | ||
|
|
@@ -35,6 +35,7 @@ import { | |
| import { | ||
| buildSingleJobSnapshot, | ||
| buildStatusSnapshot, | ||
| markDeadPidJobFailed, | ||
| readStoredJob, | ||
| resolveCancelableJob, | ||
| resolveResultJob, | ||
|
|
@@ -301,26 +302,50 @@ function filterJobsForCurrentClaudeSession(jobs) { | |
| } | ||
|
|
||
| function findLatestResumableTaskJob(jobs) { | ||
| return ( | ||
| jobs.find( | ||
| (job) => | ||
| job.jobClass === "task" && | ||
| job.threadId && | ||
| job.status !== "queued" && | ||
| job.status !== "running" | ||
| ) ?? null | ||
| ); | ||
| return jobs | ||
| .filter((job) => job.jobClass === "task" && !isActiveJobStatus(job.status)) | ||
| .find((job) => job.threadId != null); | ||
| } | ||
|
|
||
| function normalizeTrackedPid(pid) { | ||
| const numeric = Number(pid); | ||
| return Number.isFinite(numeric) && numeric > 0 ? numeric : null; | ||
| } | ||
|
Comment on lines
+310
to
313
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This hunk replaces the block that previously defined Useful? React with 👍 / 👎. |
||
|
|
||
| function reconcileDeadPidDuringWait(cwd, reference, snapshot) { | ||
| const trackedPid = normalizeTrackedPid(snapshot.job.pid); | ||
| if (!isActiveJobStatus(snapshot.job.status) || trackedPid == null || isProcessAlive(trackedPid)) { | ||
| return snapshot; | ||
Abdooo2235 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| try { | ||
| const didFail = markDeadPidJobFailed(snapshot.workspaceRoot, snapshot.job.id, trackedPid); | ||
| if (!didFail) { | ||
| return buildSingleJobSnapshot(cwd, reference); | ||
| } | ||
| } catch (error) { | ||
| // Never let reconciliation errors crash the poll loop. | ||
| appendLogLine( | ||
| snapshot.job.logFile ?? null, | ||
| `Dead-PID reconciliation skipped due to unexpected error: ${error instanceof Error ? error.message : String(error)}` | ||
| ); | ||
| return buildSingleJobSnapshot(cwd, reference); | ||
| } | ||
| return buildSingleJobSnapshot(cwd, reference); | ||
| } | ||
|
|
||
| async function waitForSingleJobSnapshot(cwd, reference, options = {}) { | ||
| const timeoutMs = Math.max(0, Number(options.timeoutMs) || DEFAULT_STATUS_WAIT_TIMEOUT_MS); | ||
| const pollIntervalMs = Math.max(100, Number(options.pollIntervalMs) || DEFAULT_STATUS_POLL_INTERVAL_MS); | ||
| const deadline = Date.now() + timeoutMs; | ||
| let snapshot = buildSingleJobSnapshot(cwd, reference); | ||
| let snapshot = reconcileDeadPidDuringWait(cwd, reference, buildSingleJobSnapshot(cwd, reference)); | ||
|
|
||
| while (isActiveJobStatus(snapshot.job.status) && Date.now() < deadline) { | ||
| await sleep(Math.min(pollIntervalMs, Math.max(0, deadline - Date.now()))); | ||
| snapshot = buildSingleJobSnapshot(cwd, reference); | ||
| snapshot = reconcileDeadPidDuringWait(cwd, reference, buildSingleJobSnapshot(cwd, reference)); | ||
| } | ||
|
|
||
| if (isActiveJobStatus(snapshot.job.status)) { | ||
| snapshot = reconcileDeadPidDuringWait(cwd, reference, snapshot); | ||
| } | ||
|
|
||
| return { | ||
|
|
@@ -852,7 +877,14 @@ async function handleStatus(argv) { | |
| pollIntervalMs: options["poll-interval-ms"] | ||
| }) | ||
| : buildSingleJobSnapshot(cwd, reference); | ||
| outputCommandResult(snapshot, renderJobStatusReport(snapshot.job), options.json); | ||
| outputCommandResult( | ||
| snapshot, | ||
| renderJobStatusReport(snapshot.job, { | ||
| waitTimedOut: Boolean(snapshot.waitTimedOut), | ||
| timeoutMs: snapshot.timeoutMs ?? null | ||
| }), | ||
| options.json | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,159 @@ | ||
| import fs from "node:fs"; | ||
|
|
||
| import { getSessionRuntimeStatus } from "./codex.mjs"; | ||
| import { getConfig, listJobs, readJobFile, resolveJobFile } from "./state.mjs"; | ||
| import { SESSION_ID_ENV } from "./tracked-jobs.mjs"; | ||
| import { isProcessAlive } from "./process.mjs"; | ||
| import { getConfig, listJobs, readJobFile, resolveJobFile, upsertJob, writeJobFile } from "./state.mjs"; | ||
| import { appendLogLine, SESSION_ID_ENV } from "./tracked-jobs.mjs"; | ||
| import { resolveWorkspaceRoot } from "./workspace.mjs"; | ||
|
|
||
| export const DEFAULT_MAX_STATUS_JOBS = 8; | ||
| export const DEFAULT_MAX_PROGRESS_LINES = 4; | ||
|
|
||
| function isActiveJob(job) { | ||
| return job.status === "queued" || job.status === "running"; | ||
| } | ||
|
|
||
| function normalizeTrackedPid(pid) { | ||
| const numeric = Number(pid); | ||
| return Number.isFinite(numeric) && numeric > 0 ? numeric : null; | ||
| } | ||
|
|
||
| function normalizeNullable(value) { | ||
| return value ?? null; | ||
| } | ||
|
|
||
| function buildIndexSyncPatch(indexJob, sourceJob) { | ||
| return { | ||
| id: indexJob.id, | ||
| status: sourceJob.status ?? indexJob.status ?? null, | ||
| phase: sourceJob.phase ?? null, | ||
| pid: Number.isFinite(sourceJob.pid) ? sourceJob.pid : null, | ||
| completedAt: sourceJob.completedAt ?? null, | ||
| errorMessage: sourceJob.errorMessage ?? null, | ||
| threadId: sourceJob.threadId ?? null, | ||
| turnId: sourceJob.turnId ?? null, | ||
| summary: sourceJob.summary ?? indexJob.summary ?? null | ||
| }; | ||
| } | ||
|
|
||
| function indexNeedsSync(indexJob, patch) { | ||
| return ( | ||
| normalizeNullable(indexJob.status) !== normalizeNullable(patch.status) || | ||
| normalizeNullable(indexJob.phase) !== normalizeNullable(patch.phase) || | ||
| normalizeNullable(Number.isFinite(indexJob.pid) ? indexJob.pid : null) !== normalizeNullable(patch.pid) || | ||
| normalizeNullable(indexJob.completedAt) !== normalizeNullable(patch.completedAt) || | ||
| normalizeNullable(indexJob.errorMessage) !== normalizeNullable(patch.errorMessage) || | ||
| normalizeNullable(indexJob.threadId) !== normalizeNullable(patch.threadId) || | ||
| normalizeNullable(indexJob.turnId) !== normalizeNullable(patch.turnId) || | ||
| normalizeNullable(indexJob.summary) !== normalizeNullable(patch.summary) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Marks a job as failed when its tracked process has died unexpectedly. | ||
| * Re-reads the latest persisted state from disk before writing to guard | ||
| * against races where the job completes legitimately at the same time. | ||
| * @param {string} workspaceRoot | ||
| * @param {string} jobId | ||
| * @param {number} pid - The PID we observed as dead | ||
| * @returns {boolean} true if the job was marked failed, false if skipped | ||
| */ | ||
| export function markDeadPidJobFailed(workspaceRoot, jobId, pid) { | ||
| const observedPid = normalizeTrackedPid(pid); | ||
| if (observedPid == null) { | ||
| return false; | ||
| } | ||
| const jobFile = resolveJobFile(workspaceRoot, jobId); | ||
|
|
||
| // Re-read the latest persisted state from disk (not from memory) | ||
| let latestJob; | ||
| try { | ||
| latestJob = readJobFile(jobFile); | ||
| } catch { | ||
| return false; | ||
| } | ||
|
|
||
| // Guard 1: only overwrite active states - never downgrade completed/failed | ||
| if (latestJob.status !== "queued" && latestJob.status !== "running") { | ||
| return false; | ||
| } | ||
|
|
||
| // Guard 2: only overwrite if the PID still matches what we observed as dead | ||
| // This prevents overwriting a job that restarted with a new PID | ||
| if (normalizeTrackedPid(latestJob.pid) !== observedPid) { | ||
| return false; | ||
| } | ||
|
|
||
| const completedAt = new Date().toISOString(); | ||
| const errorMessage = `Process PID ${observedPid} exited unexpectedly`; | ||
|
|
||
| const failedPatch = { | ||
| status: "failed", | ||
| phase: "failed", | ||
| pid: null, | ||
| errorMessage, | ||
| completedAt | ||
| }; | ||
|
|
||
| // Persist to per-job file | ||
| writeJobFile(workspaceRoot, jobId, { | ||
| ...latestJob, | ||
| ...failedPatch | ||
| }); | ||
| appendLogLine(latestJob.logFile ?? null, `Failed: ${errorMessage}`); | ||
|
|
||
| // Persist to state index | ||
| upsertJob(workspaceRoot, { | ||
| id: jobId, | ||
| ...failedPatch | ||
| }); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| function reconcileDeadPidJob(workspaceRoot, job) { | ||
| const trackedPid = normalizeTrackedPid(job.pid); | ||
| if (!isActiveJob(job) || trackedPid == null) { | ||
| return job; | ||
| } | ||
|
|
||
| if (isProcessAlive(trackedPid)) { | ||
| return job; | ||
| } | ||
|
|
||
| const didFail = markDeadPidJobFailed(workspaceRoot, job.id, trackedPid); | ||
| try { | ||
| const storedJob = readJobFile(resolveJobFile(workspaceRoot, job.id)); | ||
| if (didFail) { | ||
| return storedJob; | ||
Abdooo2235 marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+127
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
After marking a dead PID as failed, this branch returns Useful? React with 👍 / 👎. |
||
| } | ||
| if (!isActiveJob(storedJob)) { | ||
| upsertJob(workspaceRoot, { | ||
| id: job.id, | ||
| status: storedJob.status ?? null, | ||
| phase: storedJob.phase ?? null, | ||
| pid: Number.isFinite(storedJob.pid) ? storedJob.pid : null, | ||
| completedAt: storedJob.completedAt ?? null, | ||
| errorMessage: storedJob.errorMessage ?? null, | ||
| threadId: storedJob.threadId ?? null, | ||
| turnId: storedJob.turnId ?? null, | ||
| summary: storedJob.summary ?? job.summary ?? null | ||
| }); | ||
| return { | ||
| ...job, | ||
| ...storedJob | ||
| }; | ||
| } | ||
| return job; | ||
| } catch { | ||
| return job; | ||
| } | ||
| } | ||
|
|
||
| function reconcileDeadPidJobs(workspaceRoot, jobs) { | ||
| return jobs.map((job) => reconcileDeadPidJob(workspaceRoot, job)); | ||
| } | ||
|
|
||
| export function sortJobsNewestFirst(jobs) { | ||
| return [...jobs].sort((left, right) => String(right.updatedAt ?? "").localeCompare(String(left.updatedAt ?? ""))); | ||
| } | ||
|
|
@@ -213,19 +359,19 @@ function matchJobReference(jobs, reference, predicate = () => true) { | |
| export function buildStatusSnapshot(cwd, options = {}) { | ||
| const workspaceRoot = resolveWorkspaceRoot(cwd); | ||
| const config = getConfig(workspaceRoot); | ||
| const jobs = sortJobsNewestFirst(filterJobsForCurrentSession(listJobs(workspaceRoot), options)); | ||
| const jobs = sortJobsNewestFirst(reconcileDeadPidJobs(workspaceRoot, filterJobsForCurrentSession(listJobs(workspaceRoot), options))); | ||
| const maxJobs = options.maxJobs ?? DEFAULT_MAX_STATUS_JOBS; | ||
| const maxProgressLines = options.maxProgressLines ?? DEFAULT_MAX_PROGRESS_LINES; | ||
|
|
||
| const running = jobs | ||
| .filter((job) => job.status === "queued" || job.status === "running") | ||
| .filter((job) => isActiveJob(job)) | ||
| .map((job) => enrichJob(job, { maxProgressLines })); | ||
|
|
||
| const latestFinishedRaw = jobs.find((job) => job.status !== "queued" && job.status !== "running") ?? null; | ||
| const latestFinishedRaw = jobs.find((job) => !isActiveJob(job)) ?? null; | ||
| const latestFinished = latestFinishedRaw ? enrichJob(latestFinishedRaw, { maxProgressLines }) : null; | ||
|
|
||
| const recent = (options.all ? jobs : jobs.slice(0, maxJobs)) | ||
| .filter((job) => job.status !== "queued" && job.status !== "running" && job.id !== latestFinished?.id) | ||
| .filter((job) => !isActiveJob(job) && job.id !== latestFinished?.id) | ||
| .map((job) => enrichJob(job, { maxProgressLines })); | ||
|
|
||
| return { | ||
|
|
@@ -241,32 +387,49 @@ export function buildStatusSnapshot(cwd, options = {}) { | |
|
|
||
| export function buildSingleJobSnapshot(cwd, reference, options = {}) { | ||
| const workspaceRoot = resolveWorkspaceRoot(cwd); | ||
| const jobs = sortJobsNewestFirst(listJobs(workspaceRoot)); | ||
| const jobs = sortJobsNewestFirst(reconcileDeadPidJobs(workspaceRoot, listJobs(workspaceRoot))); | ||
| const selected = matchJobReference(jobs, reference); | ||
| if (!selected) { | ||
| throw new Error(`No job found for "${reference}". Run /codex:status to inspect known jobs.`); | ||
| } | ||
|
|
||
| let storedJob = null; | ||
| try { | ||
| storedJob = readStoredJob(workspaceRoot, selected.id); | ||
| } catch { | ||
| storedJob = null; | ||
| } | ||
|
|
||
| const latest = storedJob ? { ...selected, ...storedJob } : selected; | ||
| if (storedJob) { | ||
| const indexPatch = buildIndexSyncPatch(selected, latest); | ||
| if (indexNeedsSync(selected, indexPatch)) { | ||
| upsertJob(workspaceRoot, indexPatch); | ||
| } | ||
| } | ||
|
|
||
| return { | ||
| workspaceRoot, | ||
| job: enrichJob(selected, { maxProgressLines: options.maxProgressLines }) | ||
| job: enrichJob(latest, { maxProgressLines: options.maxProgressLines }) | ||
| }; | ||
| } | ||
|
|
||
| export function resolveResultJob(cwd, reference) { | ||
| const workspaceRoot = resolveWorkspaceRoot(cwd); | ||
| const jobs = sortJobsNewestFirst(reference ? listJobs(workspaceRoot) : filterJobsForCurrentSession(listJobs(workspaceRoot))); | ||
| const jobs = sortJobsNewestFirst( | ||
| reconcileDeadPidJobs(workspaceRoot, reference ? listJobs(workspaceRoot) : filterJobsForCurrentSession(listJobs(workspaceRoot))) | ||
| ); | ||
| const selected = matchJobReference( | ||
| jobs, | ||
| reference, | ||
| (job) => job.status === "completed" || job.status === "failed" || job.status === "cancelled" | ||
| (job) => !isActiveJob(job) | ||
| ); | ||
|
|
||
| if (selected) { | ||
| return { workspaceRoot, job: selected }; | ||
| } | ||
|
|
||
| const active = matchJobReference(jobs, reference, (job) => job.status === "queued" || job.status === "running"); | ||
| const active = matchJobReference(jobs, reference, (job) => isActiveJob(job)); | ||
| if (active) { | ||
| throw new Error(`Job ${active.id} is still ${active.status}. Check /codex:status and try again once it finishes.`); | ||
| } | ||
|
|
@@ -280,8 +443,8 @@ export function resolveResultJob(cwd, reference) { | |
|
|
||
| export function resolveCancelableJob(cwd, reference, options = {}) { | ||
| const workspaceRoot = resolveWorkspaceRoot(cwd); | ||
| const jobs = sortJobsNewestFirst(listJobs(workspaceRoot)); | ||
| const activeJobs = jobs.filter((job) => job.status === "queued" || job.status === "running"); | ||
| const jobs = sortJobsNewestFirst(reconcileDeadPidJobs(workspaceRoot, listJobs(workspaceRoot))); | ||
| const activeJobs = jobs.filter((job) => isActiveJob(job)); | ||
|
|
||
| if (reference) { | ||
| const selected = matchJobReference(activeJobs, reference); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
findLatestResumableTaskJobnow only considerstaskjobs withstatus === "completed", but bothtask-resume-candidateandtask --resume-lastdepend on this helper. That means a most-recent task that ended asfailedorcancelled(including the new dead-PID failure path) is treated as non-resumable even when it has a validthreadId, causing users to get “No previous Codex task thread…” or resume an older thread instead of the latest one.Useful? React with 👍 / 👎.