Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion internal/attractor/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,14 @@ func (e *Engine) run(ctx context.Context) (res *Result, err error) {
runCtx, cancelRun := context.WithCancelCause(ctx)
defer cancelRun(nil)

var releaseOwnership func()
defer func() {
if err != nil {
if err != nil && !isRunOwnershipConflict(err) {
e.persistFatalOutcome(ctx, err)
}
if releaseOwnership != nil {
releaseOwnership()
}
}()

if e.Options.RepoPath == "" {
Expand All @@ -403,6 +407,10 @@ func (e *Engine) run(ctx context.Context) (res *Result, err error) {
if err := os.MkdirAll(e.LogsRoot, 0o755); err != nil {
return nil, err
}
releaseOwnership, err = acquireRunOwnership(e.LogsRoot, e.Options.RunID)
if err != nil {
return nil, err
}
// Record PID so attractor status can detect a running process.
_ = os.WriteFile(filepath.Join(e.LogsRoot, "run.pid"), []byte(strconv.Itoa(os.Getpid())), 0o644)
// Snapshot the run config for repeatability and resume.
Expand Down
1 change: 1 addition & 0 deletions internal/attractor/engine/input_manifest_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ digraph G {
func TestInputManifestContract_DisabledSuppressesEnvPreambleAndEvents(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()
t.Setenv(inputsManifestEnvKey, "/tmp/ambient-inputs-manifest.json")
mustWriteInputFile(t, filepath.Join(repo, ".ai", "definition_of_done.md"), "line by line")

cli := writeInputManifestProbeCLI(t, false)
Expand Down
27 changes: 27 additions & 0 deletions internal/attractor/engine/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package engine

import (
"os"
"path/filepath"
"strings"
"testing"
)

// TestMain isolates on-disk state for the engine test binary. It creates a
// temporary state root, points XDG_STATE_HOME and KILROY_CODEX_STATE_BASE at
// it when those variables are unset or blank, runs the tests, removes the
// temporary directory, and exits with the test run's status code.
func TestMain(m *testing.M) {
	stateRoot, err := os.MkdirTemp("", "kilroy-engine-test-state-*")
	if err != nil {
		_, _ = os.Stderr.WriteString("failed to create temp state root: " + err.Error() + "\n")
		os.Exit(2)
	}

	// Defaults are applied in order and never override a value the caller
	// already exported (whitespace-only values count as unset).
	defaults := []struct{ key, value string }{
		{"XDG_STATE_HOME", stateRoot},
		{"KILROY_CODEX_STATE_BASE", filepath.Join(stateRoot, "kilroy", "attractor", "codex-state")},
	}
	for _, d := range defaults {
		if strings.TrimSpace(os.Getenv(d.key)) != "" {
			continue
		}
		_ = os.Setenv(d.key, d.value)
	}

	exitCode := m.Run()
	// os.Exit skips deferred calls, so clean up explicitly before exiting.
	_ = os.RemoveAll(stateRoot)
	os.Exit(exitCode)
}
17 changes: 16 additions & 1 deletion internal/attractor/engine/node_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,27 @@ const (
inputsManifestEnvKey = "KILROY_INPUTS_MANIFEST_PATH"
)

// baseNodeEnvStripKeys lists the environment variable names that
// buildBaseNodeEnv removes from the inherited process environment before
// applying artifact-policy overrides: CLAUDECODE plus the Kilroy run, node,
// logs, worktree, inputs-manifest, and stage-status variables.
var baseNodeEnvStripKeys = []string{
	"CLAUDECODE",
	runIDEnvKey,
	nodeIDEnvKey,
	logsRootEnvKey,
	stageLogsDirEnvKey,
	worktreeDirEnvKey,
	inputsManifestEnvKey,
	stageStatusPathEnvKey,
	stageStatusFallbackPathEnvKey,
}

// buildBaseNodeEnv constructs the base environment for any node execution.
// It starts from os.Environ(), strips every key listed in
// baseNodeEnvStripKeys (CLAUDECODE plus inherited Kilroy stage-contract
// variables), then applies resolved artifact policy environment variables.
func buildBaseNodeEnv(rp ResolvedArtifactPolicy) []string {
base := os.Environ()
base = stripEnvKey(base, "CLAUDECODE")
// Scrub inherited process state that can corrupt stage-local contracts.
for _, key := range baseNodeEnvStripKeys {
base = stripEnvKey(base, key)
}

overrides := make(map[string]string, len(rp.Env.Vars))
for k, v := range rp.Env.Vars {
Expand Down
23 changes: 23 additions & 0 deletions internal/attractor/engine/node_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,29 @@ func TestBuildBaseNodeEnv_StripsClaudeCode(t *testing.T) {
}
}

// TestBuildBaseNodeEnv_StripsKilroyContractEnvKeys sets every Kilroy
// stage-contract variable in the test process environment, as if inherited
// from an outer run, and verifies that buildBaseNodeEnv drops each of them
// from the base environment it returns.
func TestBuildBaseNodeEnv_StripsKilroyContractEnvKeys(t *testing.T) {
	inherited := []struct{ key, value string }{
		{runIDEnvKey, "outer-run"},
		{nodeIDEnvKey, "outer-node"},
		{logsRootEnvKey, "/tmp/outer-logs"},
		{stageLogsDirEnvKey, "/tmp/outer-logs/outer-node"},
		{worktreeDirEnvKey, "/tmp/outer-worktree"},
		{inputsManifestEnvKey, "/tmp/outer-inputs-manifest.json"},
		{stageStatusPathEnvKey, "/tmp/outer-status.json"},
		{stageStatusFallbackPathEnvKey, "/tmp/outer-fallback-status.json"},
	}
	for _, kv := range inherited {
		t.Setenv(kv.key, kv.value)
	}

	got := buildBaseNodeEnv(ResolvedArtifactPolicy{})
	for _, kv := range inherited {
		if !envHasKey(got, kv.key) {
			continue
		}
		t.Fatalf("%s should be stripped from base env", kv.key)
	}
}

func TestBuildBaseNodeEnv_PreservesExplicitToolchainPaths(t *testing.T) {
home := t.TempDir()
cargoHome := filepath.Join(home, ".cargo")
Expand Down
36 changes: 20 additions & 16 deletions internal/attractor/engine/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,37 @@ func resumeFromLogsRoot(ctx context.Context, logsRoot string, ov ResumeOverrides
runID string
checkpointSHA string
eng *Engine
releaseLock func()
)
defer func() {
if err == nil {
return
}
if eng != nil {
eng.persistFatalOutcome(ctx, err)
return
}
if strings.TrimSpace(logsRoot) == "" || strings.TrimSpace(runID) == "" {
return
if err != nil && !isRunOwnershipConflict(err) {
if eng != nil {
eng.persistFatalOutcome(ctx, err)
} else if strings.TrimSpace(logsRoot) != "" && strings.TrimSpace(runID) != "" {
final := runtime.FinalOutcome{
Timestamp: time.Now().UTC(),
Status: runtime.FinalFail,
RunID: runID,
FinalGitCommitSHA: strings.TrimSpace(checkpointSHA),
FailureReason: strings.TrimSpace(err.Error()),
}
_ = final.Save(filepath.Join(logsRoot, "final.json"))
}
}
final := runtime.FinalOutcome{
Timestamp: time.Now().UTC(),
Status: runtime.FinalFail,
RunID: runID,
FinalGitCommitSHA: strings.TrimSpace(checkpointSHA),
FailureReason: strings.TrimSpace(err.Error()),
if releaseLock != nil {
releaseLock()
}
_ = final.Save(filepath.Join(logsRoot, "final.json"))
}()

m, err := loadManifest(filepath.Join(logsRoot, "manifest.json"))
if err != nil {
return nil, err
}
runID = strings.TrimSpace(m.RunID)
releaseLock, err = acquireRunOwnership(logsRoot, runID)
if err != nil {
return nil, err
}
cp, err := runtime.LoadCheckpoint(filepath.Join(logsRoot, "checkpoint.json"))
if err != nil {
return nil, err
Expand Down
185 changes: 185 additions & 0 deletions internal/attractor/engine/run_ownership_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package engine

import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/danshapiro/kilroy/internal/attractor/procutil"
)

const (
	// runOwnershipLockFile is the name of the lock file created directly
	// inside the logs root.
	runOwnershipLockFile = "run.lock.json"
	// runOwnershipAcquireAttempts bounds how many times acquireRunOwnership
	// tries to create the lock file before giving up.
	runOwnershipAcquireAttempts = 20
	// runOwnershipRetryDelay is the pause before retrying after an existing
	// lock file could not be read or parsed.
	runOwnershipRetryDelay = 25 * time.Millisecond
)

// runOwnershipRecord is the JSON payload stored in the run ownership lock
// file; it identifies the process that currently owns a logs root.
type runOwnershipRecord struct {
	// PID is the lock holder's process ID; readRunOwnership rejects values <= 0.
	PID int `json:"pid"`
	// PIDStartTime is the holder's start time as reported by
	// procutil.ReadPIDStartTime; it stays zero (and is omitted) when unavailable.
	PIDStartTime uint64 `json:"pid_start_time,omitempty"`
	// RunID is the whitespace-trimmed run identifier supplied at acquisition.
	RunID string `json:"run_id,omitempty"`
	// AcquiredAt is the UTC acquisition time formatted with time.RFC3339Nano.
	AcquiredAt string `json:"acquired_at,omitempty"`
}

// runOwnershipConflictError reports that a logs root could not be claimed
// because its ownership lock is held (or could not be read) by someone else.
type runOwnershipConflictError struct {
	// LogsRoot is the logs root whose ownership was contested.
	LogsRoot string
	// LockPath is the path of the lock file that caused the conflict.
	LockPath string
	// ExistingPID is the PID recorded in the existing lock; zero when unknown.
	ExistingPID int
	// ExistingRun is the run ID recorded in the existing lock, if any.
	ExistingRun string
	// Reason carries optional extra detail appended in parentheses.
	Reason string
}

// Error renders the conflict as a single line. Optional parts (run_id, lock
// path, reason) are appended only when non-blank; a nil receiver yields a
// generic message instead of panicking.
func (e *runOwnershipConflictError) Error() string {
	if e == nil {
		return "run ownership conflict"
	}

	var b strings.Builder
	fmt.Fprintf(&b, "logs_root %q is already owned", e.LogsRoot)

	if e.ExistingPID > 0 {
		fmt.Fprintf(&b, " by a live process (pid=%d)", e.ExistingPID)
	} else {
		b.WriteString(" by another process")
	}

	// Blankness is judged on the trimmed value, but run_id and lock are
	// printed untrimmed; only the reason is printed trimmed.
	if strings.TrimSpace(e.ExistingRun) != "" {
		fmt.Fprintf(&b, " run_id=%q", e.ExistingRun)
	}
	if strings.TrimSpace(e.LockPath) != "" {
		fmt.Fprintf(&b, " lock=%q", e.LockPath)
	}
	if reason := strings.TrimSpace(e.Reason); reason != "" {
		fmt.Fprintf(&b, " (%s)", reason)
	}
	return b.String()
}

// isRunOwnershipConflict reports whether err, or any error it wraps, is a
// *runOwnershipConflictError.
func isRunOwnershipConflict(err error) bool {
	target := (*runOwnershipConflictError)(nil)
	return errors.As(err, &target)
}

// acquireRunOwnership claims ownership of logsRoot for the current process by
// exclusively creating the lock file inside it and writing a JSON ownership
// record (PID, PID start time when available, run ID, acquisition time).
//
// On success it returns a release function that removes the lock if it is
// still held by this process. It returns a *runOwnershipConflictError when
// the existing lock matches a live process, or when the lock stayed
// unreadable across the retries; other failures are returned as plain
// wrapped errors. A blank logsRoot is rejected.
//
// NOTE(review): reading a stale lock and then removing it is not atomic, so
// two contenders could both judge the same lock stale; confirm whether
// callers can race on takeover.
func acquireRunOwnership(logsRoot, runID string) (func(), error) {
	root := strings.TrimSpace(logsRoot)
	if root == "" {
		return nil, fmt.Errorf("logs_root is required")
	}
	lockPath := filepath.Join(root, runOwnershipLockFile)

	self := runOwnershipRecord{
		PID:        os.Getpid(),
		RunID:      strings.TrimSpace(runID),
		AcquiredAt: time.Now().UTC().Format(time.RFC3339Nano),
	}
	// The start time is optional; leave it zero when it cannot be determined.
	if started, startErr := procutil.ReadPIDStartTime(self.PID); startErr == nil && started > 0 {
		self.PIDStartTime = started
	}

	encoded, err := json.MarshalIndent(self, "", " ")
	if err != nil {
		return nil, fmt.Errorf("encode run ownership: %w", err)
	}
	// Trailing newline keeps the file friendly to manual inspection.
	encoded = append(encoded, '\n')

	var lastReadErr error
	for remaining := runOwnershipAcquireAttempts; remaining > 0; remaining-- {
		// O_EXCL makes creation fail when the lock file already exists.
		lockFile, createErr := os.OpenFile(lockPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
		switch {
		case createErr == nil:
			// We created the file; a failed write or close must not leave a
			// half-written lock behind.
			if _, wErr := lockFile.Write(encoded); wErr != nil {
				_ = lockFile.Close()
				_ = os.Remove(lockPath)
				return nil, fmt.Errorf("write run ownership lock %q: %w", lockPath, wErr)
			}
			if cErr := lockFile.Close(); cErr != nil {
				_ = os.Remove(lockPath)
				return nil, fmt.Errorf("close run ownership lock %q: %w", lockPath, cErr)
			}
			ownerPID, ownerStart := self.PID, self.PIDStartTime
			release := func() {
				releaseRunOwnership(lockPath, ownerPID, ownerStart)
			}
			return release, nil
		case !errors.Is(createErr, os.ErrExist):
			return nil, fmt.Errorf("create run ownership lock %q: %w", lockPath, createErr)
		}

		// The lock file exists: inspect who holds it.
		holder, readErr := readRunOwnership(lockPath)
		if readErr != nil {
			// Possibly mid-write by another process; remember why and retry.
			lastReadErr = readErr
			time.Sleep(runOwnershipRetryDelay)
			continue
		}

		if runOwnershipMatchesLiveProcess(holder) {
			return nil, &runOwnershipConflictError{
				LogsRoot:    root,
				LockPath:    lockPath,
				ExistingPID: holder.PID,
				ExistingRun: strings.TrimSpace(holder.RunID),
			}
		}

		// Stale ownership record: best-effort cleanup, then try again.
		if rmErr := os.Remove(lockPath); rmErr != nil && !errors.Is(rmErr, os.ErrNotExist) {
			return nil, fmt.Errorf("remove stale run ownership lock %q: %w", lockPath, rmErr)
		}
	}

	if lastReadErr == nil {
		return nil, fmt.Errorf("acquire run ownership lock %q: exhausted retries", lockPath)
	}
	return nil, &runOwnershipConflictError{
		LogsRoot: root,
		LockPath: lockPath,
		Reason:   fmt.Sprintf("ownership lock unreadable after retries: %v", lastReadErr),
	}
}

// readRunOwnership loads and validates the ownership record stored at
// lockPath. It returns an error when the file cannot be read, is empty or
// whitespace-only, is not valid JSON for a runOwnershipRecord, or records a
// non-positive PID. On error the returned record is the zero value.
func readRunOwnership(lockPath string) (runOwnershipRecord, error) {
	var none runOwnershipRecord

	raw, err := os.ReadFile(lockPath)
	if err != nil {
		return none, err
	}
	if len(strings.TrimSpace(string(raw))) == 0 {
		return none, fmt.Errorf("empty lock payload")
	}

	var parsed runOwnershipRecord
	if decodeErr := json.Unmarshal(raw, &parsed); decodeErr != nil {
		return none, decodeErr
	}
	if parsed.PID < 1 {
		return none, fmt.Errorf("invalid pid %d", parsed.PID)
	}
	return parsed, nil
}

// runOwnershipMatchesLiveProcess reports whether rec should be treated as
// held by a currently running process. A non-positive PID, or a PID that
// procutil.PIDAlive reports as not alive, yields false. When the record has no
// start time, or the current start time cannot be read, the answer is true;
// otherwise the recorded and current start times must be equal.
func runOwnershipMatchesLiveProcess(rec runOwnershipRecord) bool {
	switch {
	case rec.PID <= 0:
		return false
	case !procutil.PIDAlive(rec.PID):
		return false
	case rec.PIDStartTime == 0:
		// Nothing to compare against; the live PID alone decides.
		return true
	}

	observed, err := procutil.ReadPIDStartTime(rec.PID)
	if err == nil && observed != 0 {
		return observed == rec.PIDStartTime
	}
	// Cannot disambiguate; keep conservative conflict behavior.
	return true
}

// releaseRunOwnership removes the lock file at lockPath, but only when the
// record it currently holds belongs to the given owner: the PID must match
// and, when both the recorded and the owner's start times are known, those
// must match too. It is a silent no-op for a blank path, a non-positive
// ownerPID, or an unreadable lock; the removal error itself is ignored.
func releaseRunOwnership(lockPath string, ownerPID int, ownerStartTime uint64) {
	if ownerPID <= 0 || strings.TrimSpace(lockPath) == "" {
		return
	}

	current, err := readRunOwnership(lockPath)
	if err != nil || current.PID != ownerPID {
		return
	}

	// Start times are compared only when both sides actually have one.
	comparable := ownerStartTime > 0 && current.PIDStartTime > 0
	if comparable && current.PIDStartTime != ownerStartTime {
		return
	}

	_ = os.Remove(lockPath)
}
Loading