From 339845b7547868469ff34b47ca38f64d2abe1f81 Mon Sep 17 00:00:00 2001 From: Vadim Comanescu Date: Sun, 1 Mar 2026 11:37:13 +0100 Subject: [PATCH 1/3] engine/runtime: scrub inherited stage contract env (cherry picked from commit a66b4f39f10f6c2ac53d44598c2544148356c19b) --- .../engine/input_manifest_contract_test.go | 1 + internal/attractor/engine/main_test.go | 27 +++++++++++++++++++ internal/attractor/engine/node_env.go | 17 +++++++++++- internal/attractor/engine/node_env_test.go | 23 ++++++++++++++++ 4 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 internal/attractor/engine/main_test.go diff --git a/internal/attractor/engine/input_manifest_contract_test.go b/internal/attractor/engine/input_manifest_contract_test.go index cdbf9d9a..5fc04dcc 100644 --- a/internal/attractor/engine/input_manifest_contract_test.go +++ b/internal/attractor/engine/input_manifest_contract_test.go @@ -73,6 +73,7 @@ digraph G { func TestInputManifestContract_DisabledSuppressesEnvPreambleAndEvents(t *testing.T) { repo := initTestRepo(t) logsRoot := t.TempDir() + t.Setenv(inputsManifestEnvKey, "/tmp/ambient-inputs-manifest.json") mustWriteInputFile(t, filepath.Join(repo, ".ai", "definition_of_done.md"), "line by line") cli := writeInputManifestProbeCLI(t, false) diff --git a/internal/attractor/engine/main_test.go b/internal/attractor/engine/main_test.go new file mode 100644 index 00000000..18227096 --- /dev/null +++ b/internal/attractor/engine/main_test.go @@ -0,0 +1,27 @@ +package engine + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestMain(m *testing.M) { + stateRoot, err := os.MkdirTemp("", "kilroy-engine-test-state-*") + if err != nil { + _, _ = os.Stderr.WriteString("failed to create temp state root: " + err.Error() + "\n") + os.Exit(2) + } + + if strings.TrimSpace(os.Getenv("XDG_STATE_HOME")) == "" { + _ = os.Setenv("XDG_STATE_HOME", stateRoot) + } + if strings.TrimSpace(os.Getenv("KILROY_CODEX_STATE_BASE")) == "" { + _ = os.Setenv("KILROY_CODEX_STATE_BASE", filepath.Join(stateRoot, "kilroy", "attractor", "codex-state")) + } + + code := m.Run() + _ = os.RemoveAll(stateRoot) + os.Exit(code) +} diff --git a/internal/attractor/engine/node_env.go b/internal/attractor/engine/node_env.go index 317bf6a2..b21479b2 100644 --- a/internal/attractor/engine/node_env.go +++ b/internal/attractor/engine/node_env.go @@ -15,12 +15,27 @@ const ( inputsManifestEnvKey = "KILROY_INPUTS_MANIFEST_PATH" ) +var baseNodeEnvStripKeys = []string{ + "CLAUDECODE", + runIDEnvKey, + nodeIDEnvKey, + logsRootEnvKey, + stageLogsDirEnvKey, + worktreeDirEnvKey, + inputsManifestEnvKey, + stageStatusPathEnvKey, + stageStatusFallbackPathEnvKey, +} + // buildBaseNodeEnv constructs the base environment for any node execution. // It starts from os.Environ(), strips CLAUDECODE, then applies resolved // artifact policy environment variables. func buildBaseNodeEnv(rp ResolvedArtifactPolicy) []string { base := os.Environ() - base = stripEnvKey(base, "CLAUDECODE") + // Scrub inherited process state that can corrupt stage-local contracts. + for _, key := range baseNodeEnvStripKeys { + base = stripEnvKey(base, key) + } overrides := make(map[string]string, len(rp.Env.Vars)) for k, v := range rp.Env.Vars { diff --git a/internal/attractor/engine/node_env_test.go b/internal/attractor/engine/node_env_test.go index 94f770bf..01fe8304 100644 --- a/internal/attractor/engine/node_env_test.go +++ b/internal/attractor/engine/node_env_test.go @@ -59,6 +59,29 @@ func TestBuildBaseNodeEnv_StripsClaudeCode(t *testing.T) { } } +func TestBuildBaseNodeEnv_StripsKilroyContractEnvKeys(t *testing.T) { + outerValues := map[string]string{ + runIDEnvKey: "outer-run", + nodeIDEnvKey: "outer-node", + logsRootEnvKey: "/tmp/outer-logs", + stageLogsDirEnvKey: "/tmp/outer-logs/outer-node", + worktreeDirEnvKey: "/tmp/outer-worktree", + inputsManifestEnvKey: "/tmp/outer-inputs-manifest.json", + stageStatusPathEnvKey: "/tmp/outer-status.json", + stageStatusFallbackPathEnvKey: "/tmp/outer-fallback-status.json", + } + for key, value := range outerValues { + t.Setenv(key, value) + } + + env := buildBaseNodeEnv(ResolvedArtifactPolicy{}) + for key := range outerValues { + if envHasKey(env, key) { + t.Fatalf("%s should be stripped from base env", key) + } + } +} + func TestBuildBaseNodeEnv_PreservesExplicitToolchainPaths(t *testing.T) { home := t.TempDir() cargoHome := filepath.Join(home, ".cargo") From 8bd275275fb80b956ddaa35aa89efe57980af7c4 Mon Sep 17 00:00:00 2001 From: Vadim Comanescu Date: Sun, 1 Mar 2026 15:32:02 +0100 Subject: [PATCH 2/3] engine/runtime: harden run ownership locking (cherry picked from commit 05947fb09f9f187f0d3f080cd132c7d510b3cc83) --- internal/attractor/engine/engine.go | 10 +- internal/attractor/engine/resume.go | 36 ++- .../attractor/engine/run_ownership_lock.go | 204 ++++++++++++ .../engine/run_ownership_lock_test.go | 299 ++++++++++++++++++ 4 files changed, 532 insertions(+), 17 deletions(-) create mode 100644 internal/attractor/engine/run_ownership_lock.go create mode 100644 internal/attractor/engine/run_ownership_lock_test.go diff --git a/internal/attractor/engine/engine.go b/internal/attractor/engine/engine.go index 43aa0a2d..960f2c6c 100644 --- a/internal/attractor/engine/engine.go +++ b/internal/attractor/engine/engine.go @@ -373,10 +373,14 @@ func (e *Engine) run(ctx context.Context) (res *Result, err error) { runCtx, cancelRun := context.WithCancelCause(ctx) defer cancelRun(nil) + var releaseOwnership func() defer func() { - if err != nil { + if err != nil && !isRunOwnershipConflict(err) { e.persistFatalOutcome(ctx, err) } + if releaseOwnership != nil { + releaseOwnership() + } }() if e.Options.RepoPath == "" { @@ -403,6 +407,10 @@ func (e *Engine) run(ctx context.Context) (res *Result, err error) { if err := os.MkdirAll(e.LogsRoot, 0o755); err != nil { return nil, err } + releaseOwnership, err = acquireRunOwnership(e.LogsRoot, e.Options.RunID) + if err != nil { + return nil, err + } // Record PID so attractor status can detect a running process. _ = os.WriteFile(filepath.Join(e.LogsRoot, "run.pid"), []byte(strconv.Itoa(os.Getpid())), 0o644) // Snapshot the run config for repeatability and resume. diff --git a/internal/attractor/engine/resume.go b/internal/attractor/engine/resume.go index bb71bcdc..eed7708f 100644 --- a/internal/attractor/engine/resume.go +++ b/internal/attractor/engine/resume.go @@ -70,26 +70,26 @@ func resumeFromLogsRoot(ctx context.Context, logsRoot string, ov ResumeOverrides runID string checkpointSHA string eng *Engine + releaseLock func() ) defer func() { - if err == nil { - return - } - if eng != nil { - eng.persistFatalOutcome(ctx, err) - return - } - if strings.TrimSpace(logsRoot) == "" || strings.TrimSpace(runID) == "" { - return + if err != nil && !isRunOwnershipConflict(err) { + if eng != nil { + eng.persistFatalOutcome(ctx, err) + } else if strings.TrimSpace(logsRoot) != "" && strings.TrimSpace(runID) != "" { + final := runtime.FinalOutcome{ + Timestamp: time.Now().UTC(), + Status: runtime.FinalFail, + RunID: runID, + FinalGitCommitSHA: strings.TrimSpace(checkpointSHA), + FailureReason: strings.TrimSpace(err.Error()), + } + _ = final.Save(filepath.Join(logsRoot, "final.json")) + } } - final := runtime.FinalOutcome{ - Timestamp: time.Now().UTC(), - Status: runtime.FinalFail, - RunID: runID, - FinalGitCommitSHA: strings.TrimSpace(checkpointSHA), - FailureReason: strings.TrimSpace(err.Error()), + if releaseLock != nil { + releaseLock() } - _ = final.Save(filepath.Join(logsRoot, "final.json")) }() m, err := loadManifest(filepath.Join(logsRoot, "manifest.json")) @@ -97,6 +97,10 @@ func resumeFromLogsRoot(ctx context.Context, logsRoot string, ov ResumeOverrides return nil, err } runID = strings.TrimSpace(m.RunID) + releaseLock, err = acquireRunOwnership(logsRoot, runID) + if err != nil { + return nil, err + } cp, err := runtime.LoadCheckpoint(filepath.Join(logsRoot, "checkpoint.json")) if err != nil { return nil, err diff --git a/internal/attractor/engine/run_ownership_lock.go b/internal/attractor/engine/run_ownership_lock.go new file mode 100644 index 00000000..f9b2c558 --- /dev/null +++ b/internal/attractor/engine/run_ownership_lock.go @@ -0,0 +1,204 @@ +package engine + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/danshapiro/kilroy/internal/attractor/procutil" +) + +const ( + runOwnershipLockFile = "run.lock.json" + runOwnershipAcquireAttempts = 20 + runOwnershipRetryDelay = 25 * time.Millisecond + runOwnershipUnreadableGrace = 250 * time.Millisecond +) + +type runOwnershipRecord struct { + PID int `json:"pid"` + PIDStartTime uint64 `json:"pid_start_time,omitempty"` + RunID string `json:"run_id,omitempty"` + AcquiredAt string `json:"acquired_at,omitempty"` +} + +type runOwnershipConflictError struct { + LogsRoot string + LockPath string + ExistingPID int + ExistingRun string + Reason string +} + +func (e *runOwnershipConflictError) Error() string { + if e == nil { + return "run ownership conflict" + } + msg := fmt.Sprintf("logs_root %q is already owned", e.LogsRoot) + if e.ExistingPID > 0 { + msg += fmt.Sprintf(" by a live process (pid=%d)", e.ExistingPID) + } else { + msg += " by another process" + } + if strings.TrimSpace(e.ExistingRun) != "" { + msg += fmt.Sprintf(" run_id=%q", e.ExistingRun) + } + if strings.TrimSpace(e.LockPath) != "" { + msg += fmt.Sprintf(" lock=%q", e.LockPath) + } + if strings.TrimSpace(e.Reason) != "" { + msg += fmt.Sprintf(" (%s)", strings.TrimSpace(e.Reason)) + } + return msg +} + +func isRunOwnershipConflict(err error) bool { + var ownershipErr *runOwnershipConflictError + return errors.As(err, &ownershipErr) +} + +func acquireRunOwnership(logsRoot, runID string) (func(), error) { + root := strings.TrimSpace(logsRoot) + if root == "" { + return nil, fmt.Errorf("logs_root is required") + } + lockPath := filepath.Join(root, runOwnershipLockFile) + record := runOwnershipRecord{ + PID: os.Getpid(), + RunID: strings.TrimSpace(runID), + AcquiredAt: time.Now().UTC().Format(time.RFC3339Nano), + } + if startTime, err := procutil.ReadPIDStartTime(record.PID); err == nil && startTime > 0 { + record.PIDStartTime = startTime + } + payload, err := json.MarshalIndent(record, "", " ") + if err != nil { + return nil, fmt.Errorf("encode run ownership: %w", err) + } + // Keep payload newline-friendly for manual inspection. + payload = append(payload, '\n') + + var lastReadErr error + for attempts := 0; attempts < runOwnershipAcquireAttempts; attempts++ { + f, openErr := os.OpenFile(lockPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644) + if openErr == nil { + if _, writeErr := f.Write(payload); writeErr != nil { + _ = f.Close() + _ = os.Remove(lockPath) + return nil, fmt.Errorf("write run ownership lock %q: %w", lockPath, writeErr) + } + if closeErr := f.Close(); closeErr != nil { + _ = os.Remove(lockPath) + return nil, fmt.Errorf("close run ownership lock %q: %w", lockPath, closeErr) + } + pid := record.PID + start := record.PIDStartTime + return func() { + releaseRunOwnership(lockPath, pid, start) + }, nil + } + if !errors.Is(openErr, os.ErrExist) { + return nil, fmt.Errorf("create run ownership lock %q: %w", lockPath, openErr) + } + + existing, readErr := readRunOwnership(lockPath) + if readErr != nil { + lastReadErr = readErr + stale, staleErr := unreadableLockIsStale(lockPath, time.Now().UTC()) + if staleErr != nil && !errors.Is(staleErr, os.ErrNotExist) { + return nil, fmt.Errorf("stat unreadable run ownership lock %q: %w", lockPath, staleErr) + } + if stale { + if removeErr := os.Remove(lockPath); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) { + return nil, fmt.Errorf("remove unreadable stale run ownership lock %q: %w", lockPath, removeErr) + } + continue + } + time.Sleep(runOwnershipRetryDelay) + continue + } + + if runOwnershipMatchesLiveProcess(existing) { + return nil, &runOwnershipConflictError{ + LogsRoot: root, + LockPath: lockPath, + ExistingPID: existing.PID, + ExistingRun: strings.TrimSpace(existing.RunID), + } + } + // Stale ownership record, best-effort cleanup and retry. + if removeErr := os.Remove(lockPath); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) { + return nil, fmt.Errorf("remove stale run ownership lock %q: %w", lockPath, removeErr) + } + } + if lastReadErr != nil { + return nil, &runOwnershipConflictError{ + LogsRoot: root, + LockPath: lockPath, + Reason: fmt.Sprintf("ownership lock unreadable after retries: %v", lastReadErr), + } + } + return nil, fmt.Errorf("acquire run ownership lock %q: exhausted retries", lockPath) +} + +func readRunOwnership(lockPath string) (runOwnershipRecord, error) { + b, err := os.ReadFile(lockPath) + if err != nil { + return runOwnershipRecord{}, err + } + if strings.TrimSpace(string(b)) == "" { + return runOwnershipRecord{}, fmt.Errorf("empty lock payload") + } + var rec runOwnershipRecord + if err := json.Unmarshal(b, &rec); err != nil { + return runOwnershipRecord{}, err + } + if rec.PID <= 0 { + return runOwnershipRecord{}, fmt.Errorf("invalid pid %d", rec.PID) + } + return rec, nil +} + +func runOwnershipMatchesLiveProcess(rec runOwnershipRecord) bool { + if rec.PID <= 0 || !procutil.PIDAlive(rec.PID) { + return false + } + if rec.PIDStartTime == 0 { + return true + } + start, err := procutil.ReadPIDStartTime(rec.PID) + if err != nil || start == 0 { + // Cannot disambiguate; keep conservative conflict behavior. + return true + } + return start == rec.PIDStartTime +} + +func unreadableLockIsStale(lockPath string, now time.Time) (bool, error) { + info, err := os.Stat(lockPath) + if err != nil { + return false, err + } + return now.Sub(info.ModTime()) >= runOwnershipUnreadableGrace, nil +} + +func releaseRunOwnership(lockPath string, ownerPID int, ownerStartTime uint64) { + if strings.TrimSpace(lockPath) == "" || ownerPID <= 0 { + return + } + rec, err := readRunOwnership(lockPath) + if err != nil { + return + } + if rec.PID != ownerPID { + return + } + if ownerStartTime > 0 && rec.PIDStartTime > 0 && rec.PIDStartTime != ownerStartTime { + return + } + _ = os.Remove(lockPath) +} diff --git a/internal/attractor/engine/run_ownership_lock_test.go b/internal/attractor/engine/run_ownership_lock_test.go new file mode 100644 index 00000000..1e65d37b --- /dev/null +++ b/internal/attractor/engine/run_ownership_lock_test.go @@ -0,0 +1,299 @@ +package engine + +import ( + "context" + "encoding/json" + "errors" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/danshapiro/kilroy/internal/attractor/procutil" +) + +func TestAcquireRunOwnership_RejectsLiveOwner(t *testing.T) { + t.Parallel() + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + ownerPID, ownerStart := startSleepingProcess(t) + owner := runOwnershipRecord{ + PID: ownerPID, + PIDStartTime: ownerStart, + RunID: "existing-run", + } + if err := writeOwnershipRecord(lockPath, owner); err != nil { + t.Fatalf("writeOwnershipRecord: %v", err) + } + + _, err := acquireRunOwnership(logsRoot, "new-run") + if err == nil { + t.Fatalf("expected ownership conflict error, got nil") + } + var ownershipErr *runOwnershipConflictError + if !errors.As(err, &ownershipErr) { + t.Fatalf("expected runOwnershipConflictError, got %T (%v)", err, err) + } + if ownershipErr.ExistingPID != ownerPID { + t.Fatalf("existing pid: got %d want %d", ownershipErr.ExistingPID, ownerPID) + } +} + +func TestAcquireRunOwnership_ReclaimsStaleOwner(t *testing.T) { + t.Parallel() + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + owner := runOwnershipRecord{ + PID: 999999, // best-effort stale PID + PIDStartTime: 123456, + RunID: "stale-run", + } + if err := writeOwnershipRecord(lockPath, owner); err != nil { + t.Fatalf("writeOwnershipRecord: %v", err) + } + + release, err := acquireRunOwnership(logsRoot, "new-run") + if err != nil { + t.Fatalf("acquireRunOwnership: %v", err) + } + defer release() + + got, err := readRunOwnership(lockPath) + if err != nil { + t.Fatalf("readRunOwnership: %v", err) + } + if got.PID != os.Getpid() { + t.Fatalf("owner pid: got %d want %d", got.PID, os.Getpid()) + } + if got.RunID != "new-run" { + t.Fatalf("owner run_id: got %q want %q", got.RunID, "new-run") + } +} + +func TestAcquireRunOwnership_ReclaimsUnreadableStaleLock(t *testing.T) { + t.Parallel() + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + if err := os.WriteFile(lockPath, []byte("{"), 0o644); err != nil { + t.Fatalf("write unreadable lock: %v", err) + } + old := time.Now().Add(-2 * time.Second) + if err := os.Chtimes(lockPath, old, old); err != nil { + t.Fatalf("os.Chtimes: %v", err) + } + + release, err := acquireRunOwnership(logsRoot, "new-run") + if err != nil { + t.Fatalf("acquireRunOwnership: %v", err) + } + defer release() + + got, err := readRunOwnership(lockPath) + if err != nil { + t.Fatalf("readRunOwnership: %v", err) + } + if got.RunID != "new-run" { + t.Fatalf("owner run_id: got %q want %q", got.RunID, "new-run") + } +} + +func TestAcquireRunOwnership_RetriesUnreadableLockUntilConflict(t *testing.T) { + t.Parallel() + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + owner := ownerRecordForCurrentPID("existing-run") + + created := make(chan struct{}) + done := make(chan error, 1) + go func() { + f, err := os.OpenFile(lockPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644) + if err != nil { + close(created) + done <- err + return + } + close(created) + defer func() { + _ = f.Close() + }() + time.Sleep(120 * time.Millisecond) + b, err := json.Marshal(owner) + if err != nil { + done <- err + return + } + _, err = f.Write(b) + done <- err + }() + <-created + + _, err := acquireRunOwnership(logsRoot, "new-run") + if err == nil { + t.Fatalf("expected ownership conflict error, got nil") + } + if !isRunOwnershipConflict(err) { + t.Fatalf("expected run ownership conflict, got: %v", err) + } + if writeErr := <-done; writeErr != nil { + t.Fatalf("slow owner write failed: %v", writeErr) + } +} + +func TestAcquireRunOwnership_ReclaimsPIDStartTimeMismatch(t *testing.T) { + t.Parallel() + + currentStart, err := procutil.ReadPIDStartTime(os.Getpid()) + if err != nil || currentStart == 0 { + t.Skip("pid start time unavailable on this platform") + } + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + owner := runOwnershipRecord{ + PID: os.Getpid(), + PIDStartTime: currentStart + 1, // force mismatch with current process identity + RunID: "stale-owner", + } + if err := writeOwnershipRecord(lockPath, owner); err != nil { + t.Fatalf("writeOwnershipRecord: %v", err) + } + + release, err := acquireRunOwnership(logsRoot, "new-run") + if err != nil { + t.Fatalf("acquireRunOwnership: %v", err) + } + defer release() + + got, err := readRunOwnership(lockPath) + if err != nil { + t.Fatalf("readRunOwnership: %v", err) + } + if got.RunID != "new-run" { + t.Fatalf("owner run_id: got %q want %q", got.RunID, "new-run") + } +} + +func TestReleaseRunOwnership_DoesNotRemoveForeignOwner(t *testing.T) { + t.Parallel() + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + owner := ownerRecordForCurrentPID("owner-run") + if err := writeOwnershipRecord(lockPath, owner); err != nil { + t.Fatalf("writeOwnershipRecord: %v", err) + } + + releaseRunOwnership(lockPath, os.Getpid()+1, 0) + if _, err := os.Stat(lockPath); err != nil { + t.Fatalf("expected lock to remain for foreign owner, stat err: %v", err) + } +} + +func TestReleaseRunOwnership_DoesNotRemoveMismatchedStartTime(t *testing.T) { + t.Parallel() + + currentStart, err := procutil.ReadPIDStartTime(os.Getpid()) + if err != nil || currentStart == 0 { + t.Skip("pid start time unavailable on this platform") + } + + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + owner := runOwnershipRecord{ + PID: os.Getpid(), + PIDStartTime: currentStart + 1, + RunID: "owner-run", + } + if err := writeOwnershipRecord(lockPath, owner); err != nil { + t.Fatalf("writeOwnershipRecord: %v", err) + } + + releaseRunOwnership(lockPath, os.Getpid(), currentStart) + if _, err := os.Stat(lockPath); err != nil { + t.Fatalf("expected lock to remain when start time mismatches, stat err: %v", err) + } +} + +func TestRun_OwnershipConflict_DoesNotWriteFinalJSON(t *testing.T) { + t.Parallel() + + repo := initTestRepo(t) + logsRoot := t.TempDir() + lockPath := filepath.Join(logsRoot, runOwnershipLockFile) + ownerPID, ownerStart := startSleepingProcess(t) + owner := runOwnershipRecord{ + PID: ownerPID, + PIDStartTime: ownerStart, + RunID: "existing-run", + } + if err := writeOwnershipRecord(lockPath, owner); err != nil { + t.Fatalf("writeOwnershipRecord: %v", err) + } + + dot := []byte(` +digraph G { + start [shape=Mdiamond] + exit [shape=Msquare] + start -> exit +} +`) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err := Run(ctx, dot, RunOptions{ + RepoPath: repo, + RunID: "ownership-conflict", + LogsRoot: logsRoot, + }) + if err == nil { + t.Fatalf("expected ownership conflict error, got nil") + } + if !isRunOwnershipConflict(err) { + t.Fatalf("expected ownership conflict, got: %v", err) + } + if _, statErr := os.Stat(filepath.Join(logsRoot, "final.json")); !errors.Is(statErr, os.ErrNotExist) { + t.Fatalf("expected no final.json for ownership conflict, stat err=%v", statErr) + } +} + +func writeOwnershipRecord(path string, rec runOwnershipRecord) error { + b, err := json.Marshal(rec) + if err != nil { + return err + } + return os.WriteFile(path, b, 0o644) +} + +func ownerRecordForCurrentPID(runID string) runOwnershipRecord { + rec := runOwnershipRecord{ + PID: os.Getpid(), + RunID: runID, + } + if start, err := procutil.ReadPIDStartTime(rec.PID); err == nil && start > 0 { + rec.PIDStartTime = start + } + return rec +} + +func startSleepingProcess(t *testing.T) (int, uint64) { + t.Helper() + cmd := exec.Command("sleep", "60") + if err := cmd.Start(); err != nil { + t.Fatalf("start sleep process: %v", err) + } + pid := cmd.Process.Pid + var start uint64 + if s, err := procutil.ReadPIDStartTime(pid); err == nil { + start = s + } + t.Cleanup(func() { + _ = cmd.Process.Kill() + _, _ = cmd.Process.Wait() + }) + return pid, start +} From c194cfded411064c304313cc75b3eccd57eafbee Mon Sep 17 00:00:00 2001 From: Vadim Comanescu Date: Wed, 4 Mar 2026 19:52:13 +0100 Subject: [PATCH 3/3] fix(engine): preserve unreadable run ownership locks --- .../attractor/engine/run_ownership_lock.go | 19 ------------- .../engine/run_ownership_lock_test.go | 28 +++++++++---------- 2 files changed, 13 insertions(+), 34 deletions(-) diff --git a/internal/attractor/engine/run_ownership_lock.go b/internal/attractor/engine/run_ownership_lock.go index f9b2c558..cdddfbd3 100644 --- a/internal/attractor/engine/run_ownership_lock.go +++ b/internal/attractor/engine/run_ownership_lock.go @@ -16,7 +16,6 @@ const ( runOwnershipLockFile = "run.lock.json" runOwnershipAcquireAttempts = 20 runOwnershipRetryDelay = 25 * time.Millisecond - runOwnershipUnreadableGrace = 250 * time.Millisecond ) type runOwnershipRecord struct { @@ -108,16 +107,6 @@ func acquireRunOwnership(logsRoot, runID string) (func(), error) { existing, readErr := readRunOwnership(lockPath) if readErr != nil { lastReadErr = readErr - stale, staleErr := unreadableLockIsStale(lockPath, time.Now().UTC()) - if staleErr != nil && !errors.Is(staleErr, os.ErrNotExist) { - return nil, fmt.Errorf("stat unreadable run ownership lock %q: %w", lockPath, staleErr) - } - if stale { - if removeErr := os.Remove(lockPath); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) { - return nil, fmt.Errorf("remove unreadable stale run ownership lock %q: %w", lockPath, removeErr) - } - continue - } time.Sleep(runOwnershipRetryDelay) continue } @@ -178,14 +167,6 @@ func runOwnershipMatchesLiveProcess(rec runOwnershipRecord) bool { return start == rec.PIDStartTime } -func unreadableLockIsStale(lockPath string, now time.Time) (bool, error) { - info, err := os.Stat(lockPath) - if err != nil { - return false, err - } - return now.Sub(info.ModTime()) >= runOwnershipUnreadableGrace, nil -} - func releaseRunOwnership(lockPath string, ownerPID int, ownerStartTime uint64) { if strings.TrimSpace(lockPath) == "" || ownerPID <= 0 { return diff --git a/internal/attractor/engine/run_ownership_lock_test.go b/internal/attractor/engine/run_ownership_lock_test.go index 1e65d37b..e6c286a6 100644 --- a/internal/attractor/engine/run_ownership_lock_test.go +++ b/internal/attractor/engine/run_ownership_lock_test.go @@ -73,7 +73,7 @@ func TestAcquireRunOwnership_ReclaimsStaleOwner(t *testing.T) { } } -func TestAcquireRunOwnership_ReclaimsUnreadableStaleLock(t *testing.T) { +func TestAcquireRunOwnership_UnreadableLockConflicts(t *testing.T) { t.Parallel() logsRoot := t.TempDir() @@ -81,23 +81,21 @@ func TestAcquireRunOwnership_ReclaimsUnreadableStaleLock(t *testing.T) { if err := os.WriteFile(lockPath, []byte("{"), 0o644); err != nil { t.Fatalf("write unreadable lock: %v", err) } - old := time.Now().Add(-2 * time.Second) - if err := os.Chtimes(lockPath, old, old); err != nil { - t.Fatalf("os.Chtimes: %v", err) - } - - release, err := acquireRunOwnership(logsRoot, "new-run") - if err != nil { - t.Fatalf("acquireRunOwnership: %v", err) - } - defer release() - got, err := readRunOwnership(lockPath) + _, err := acquireRunOwnership(logsRoot, "new-run") if err != nil { - t.Fatalf("readRunOwnership: %v", err) + var ownershipErr *runOwnershipConflictError + if !errors.As(err, &ownershipErr) { + t.Fatalf("expected runOwnershipConflictError, got %T (%v)", err, err) + } + if ownershipErr.Reason == "" { + t.Fatalf("expected conflict reason for unreadable lock, got empty reason") + } + } else { + t.Fatalf("expected ownership conflict for unreadable lock, got nil") } - if got.RunID != "new-run" { - t.Fatalf("owner run_id: got %q want %q", got.RunID, "new-run") + if _, statErr := os.Stat(lockPath); statErr != nil { + t.Fatalf("expected unreadable lock to be preserved, stat err: %v", statErr) } }