Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 132 additions & 14 deletions internal/attractor/engine/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -235,25 +237,128 @@ type fallbackStatusPath struct {
source statusSource
}

func copyFirstValidFallbackStatus(stageStatusPath string, fallbackPaths []fallbackStatusPath) (statusSource, error) {
type fallbackStatusFailureMode string

const (
fallbackFailureModeNone fallbackStatusFailureMode = ""
fallbackFailureModeMissing fallbackStatusFailureMode = "missing"
fallbackFailureModeUnreadable fallbackStatusFailureMode = "unreadable"
fallbackFailureModeCorrupt fallbackStatusFailureMode = "corrupt"
fallbackFailureModeInvalidPayload fallbackStatusFailureMode = "invalid_payload"
)

const (
fallbackStatusDecodeMaxAttempts = 3
fallbackStatusDecodeBaseDelay = 25 * time.Millisecond
)

func copyFirstValidFallbackStatus(stageStatusPath string, fallbackPaths []fallbackStatusPath) (statusSource, string, error) {
if _, err := os.Stat(stageStatusPath); err == nil {
return statusSourceCanonical, nil
return statusSourceCanonical, "", nil
}
issues := make([]string, 0, len(fallbackPaths))
for _, fallback := range fallbackPaths {
b, err := os.ReadFile(fallback.path)
b, mode, err := readAndDecodeFallbackStatusWithRetry(fallback.path)
if err != nil {
continue
}
if _, err := runtime.DecodeOutcomeJSON(b); err != nil {
issues = append(issues, formatFallbackStatusIssue(fallback, mode, err))
continue
}
if err := runtime.WriteFileAtomic(stageStatusPath, b); err != nil {
return statusSourceNone, err
return statusSourceNone, strings.Join(issues, "; "), err
}
_ = os.Remove(fallback.path)
return fallback.source, nil
return fallback.source, "", nil
}
return statusSourceNone, strings.Join(issues, "; "), nil
}

func readAndDecodeFallbackStatusWithRetry(path string) ([]byte, fallbackStatusFailureMode, error) {
var lastErr error
mode := fallbackFailureModeMissing
for attempt := 1; attempt <= fallbackStatusDecodeMaxAttempts; attempt++ {
b, err := os.ReadFile(path)
if err != nil {
lastErr = err
if os.IsNotExist(err) {
mode = fallbackFailureModeMissing
} else {
mode = fallbackFailureModeUnreadable
}
if attempt < fallbackStatusDecodeMaxAttempts && shouldRetryFallbackRead(mode, err) {
time.Sleep(backoffDelay(attempt))
continue
}
return nil, mode, lastErr
}
if _, err := runtime.DecodeOutcomeJSON(b); err != nil {
lastErr = err
mode = classifyFallbackDecodeError(b, err)
if attempt < fallbackStatusDecodeMaxAttempts && shouldRetryFallbackDecode(mode, b, err) {
time.Sleep(backoffDelay(attempt))
continue
}
return nil, mode, lastErr
}
return b, fallbackFailureModeNone, nil
}
return nil, mode, lastErr
}

func classifyFallbackDecodeError(raw []byte, err error) fallbackStatusFailureMode {
if isCorruptFallbackPayload(raw, err) {
return fallbackFailureModeCorrupt
}
return fallbackFailureModeInvalidPayload
}

func shouldRetryFallbackRead(mode fallbackStatusFailureMode, err error) bool {
if mode == fallbackFailureModeMissing {
// Missing fallback files are expected in normal flows; retrying them
// just adds latency before trying the next candidate path.
return false
}
return errors.Is(err, io.ErrUnexpectedEOF)
}

func shouldRetryFallbackDecode(mode fallbackStatusFailureMode, raw []byte, err error) bool {
return mode == fallbackFailureModeCorrupt && isCorruptFallbackPayload(raw, err)
}

func isCorruptFallbackPayload(raw []byte, err error) bool {
if len(strings.TrimSpace(string(raw))) == 0 {
return true
}
var syntaxErr *json.SyntaxError
if errors.As(err, &syntaxErr) {
return true
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
return strings.Contains(msg, "unexpected end of json input") || strings.Contains(msg, "invalid character")
}

func backoffDelay(attempt int) time.Duration {
if attempt <= 0 {
attempt = 1
}
return time.Duration(attempt) * fallbackStatusDecodeBaseDelay
}

func formatFallbackStatusIssue(fallback fallbackStatusPath, mode fallbackStatusFailureMode, err error) string {
switch mode {
case fallbackFailureModeMissing:
return fmt.Sprintf("fallback[%s] missing status artifact: %s", fallback.source, fallback.path)
case fallbackFailureModeUnreadable:
return fmt.Sprintf("fallback[%s] unreadable status artifact: %s (%v)", fallback.source, fallback.path, err)
case fallbackFailureModeCorrupt:
return fmt.Sprintf("fallback[%s] corrupt status artifact: %s (%v)", fallback.source, fallback.path, err)
case fallbackFailureModeInvalidPayload:
return fmt.Sprintf("fallback[%s] invalid status payload: %s (%v)", fallback.source, fallback.path, err)
default:
return fmt.Sprintf("fallback[%s] status ingestion error: %s (%v)", fallback.source, fallback.path, err)
}
return statusSourceNone, nil
}

func buildManualBoxFanInPromptPreamble(exec *Execution, node *model.Node) string {
Expand Down Expand Up @@ -468,20 +573,29 @@ func (h *CodergenHandler) Execute(ctx context.Context, exec *Execution, node *mo
// If the backend/agent wrote a worktree status.json, surface it to the engine by
// copying it into the authoritative stage directory location.
source := statusSourceNone
ingestionDiagnostic := ""
if len(worktreeStatusPaths) > 0 {
var err error
source, err = copyFirstValidFallbackStatus(stageStatusPath, worktreeStatusPaths)
source, ingestionDiagnostic, err = copyFirstValidFallbackStatus(stageStatusPath, worktreeStatusPaths)
if err != nil {
return runtime.Outcome{Status: runtime.StatusFail, FailureReason: err.Error()}, nil
reason := err.Error()
if strings.TrimSpace(ingestionDiagnostic) != "" {
reason = reason + "; " + strings.TrimSpace(ingestionDiagnostic)
}
return runtime.Outcome{Status: runtime.StatusFail, FailureReason: reason}, nil
}
}
if exec != nil && exec.Engine != nil {
exec.Engine.appendProgress(map[string]any{
progress := map[string]any{
"event": "status_ingestion_decision",
"node_id": node.ID,
"source": string(source),
"copied": source == statusSourceWorktree || source == statusSourceDotAI,
})
}
if strings.TrimSpace(ingestionDiagnostic) != "" {
progress["diagnostic"] = strings.TrimSpace(ingestionDiagnostic)
}
exec.Engine.appendProgress(progress)
}

if out != nil {
Expand Down Expand Up @@ -523,9 +637,13 @@ func (h *CodergenHandler) Execute(ctx context.Context, exec *Execution, node *mo
},
}, nil
}
reason := "missing status.json (auto_status=false)"
if strings.TrimSpace(ingestionDiagnostic) != "" {
reason = reason + "; " + strings.TrimSpace(ingestionDiagnostic)
}
return runtime.Outcome{
Status: runtime.StatusFail,
FailureReason: "missing status.json (auto_status=false)",
FailureReason: reason,
Notes: "codergen completed without an outcome or status.json",
ContextUpdates: map[string]any{
"last_stage": node.ID,
Expand Down
181 changes: 181 additions & 0 deletions internal/attractor/engine/status_ingestion_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package engine

import (
"errors"
"io"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/danshapiro/kilroy/internal/attractor/runtime"
)

func TestCopyFirstValidFallbackStatus_CanonicalStageStatusWins(t *testing.T) {
tmp := t.TempDir()
stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json")
fallbackPath := filepath.Join(tmp, "status.json")

if err := runtime.WriteFileAtomic(stageStatusPath, []byte(`{"status":"success","notes":"canonical"}`)); err != nil {
t.Fatalf("write canonical status: %v", err)
}
if err := os.WriteFile(fallbackPath, []byte(`{"status":"fail","failure_reason":"fallback"}`), 0o644); err != nil {
t.Fatalf("write fallback status: %v", err)
}

source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{
{path: fallbackPath, source: statusSourceWorktree},
})
if err != nil {
t.Fatalf("copyFirstValidFallbackStatus: %v", err)
}
if source != statusSourceCanonical {
t.Fatalf("source=%q want %q", source, statusSourceCanonical)
}
if strings.TrimSpace(diagnostic) != "" {
t.Fatalf("diagnostic=%q want empty", diagnostic)
}

b, err := os.ReadFile(stageStatusPath)
if err != nil {
t.Fatalf("read stage status: %v", err)
}
out, err := runtime.DecodeOutcomeJSON(b)
if err != nil {
t.Fatalf("decode stage status: %v", err)
}
if out.Status != runtime.StatusSuccess {
t.Fatalf("status=%q want %q", out.Status, runtime.StatusSuccess)
}
}

func TestCopyFirstValidFallbackStatus_MissingFallbackIsDiagnosed(t *testing.T) {
tmp := t.TempDir()
stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json")
missingPath := filepath.Join(tmp, "missing-status.json")

source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{
{path: missingPath, source: statusSourceWorktree},
})
if err != nil {
t.Fatalf("copyFirstValidFallbackStatus: %v", err)
}
if source != statusSourceNone {
t.Fatalf("source=%q want empty", source)
}
if !strings.Contains(diagnostic, "missing status artifact") {
t.Fatalf("diagnostic=%q want mention of missing status artifact", diagnostic)
}
}

func TestCopyFirstValidFallbackStatus_PermanentCorruptFallbackIsDiagnosed(t *testing.T) {
tmp := t.TempDir()
stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json")
fallbackPath := filepath.Join(tmp, "status.json")

if err := os.WriteFile(fallbackPath, []byte(`{ this is invalid json }`), 0o644); err != nil {
t.Fatalf("write corrupt fallback: %v", err)
}

source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{
{path: fallbackPath, source: statusSourceWorktree},
})
if err != nil {
t.Fatalf("copyFirstValidFallbackStatus: %v", err)
}
if source != statusSourceNone {
t.Fatalf("source=%q want empty", source)
}
if !strings.Contains(diagnostic, "corrupt status artifact") {
t.Fatalf("diagnostic=%q want mention of corrupt status artifact", diagnostic)
}
}

func TestCopyFirstValidFallbackStatus_RetryDecodeSucceedsAfterTransientCorruption(t *testing.T) {
tmp := t.TempDir()
stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json")
fallbackPath := filepath.Join(tmp, "status.json")

if err := os.WriteFile(fallbackPath, []byte(`{ this is invalid json }`), 0o644); err != nil {
t.Fatalf("seed transient corrupt fallback: %v", err)
}

go func() {
time.Sleep(fallbackStatusDecodeBaseDelay + 10*time.Millisecond)
_ = os.WriteFile(fallbackPath, []byte(`{"status":"fail","failure_reason":"transient decode retry success"}`), 0o644)
}()

source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{
{path: fallbackPath, source: statusSourceWorktree},
})
if err != nil {
t.Fatalf("copyFirstValidFallbackStatus: %v", err)
}
if source != statusSourceWorktree {
t.Fatalf("source=%q want %q", source, statusSourceWorktree)
}
if strings.TrimSpace(diagnostic) != "" {
t.Fatalf("diagnostic=%q want empty", diagnostic)
}

b, err := os.ReadFile(stageStatusPath)
if err != nil {
t.Fatalf("read copied stage status: %v", err)
}
out, err := runtime.DecodeOutcomeJSON(b)
if err != nil {
t.Fatalf("decode copied stage status: %v", err)
}
if out.Status != runtime.StatusFail {
t.Fatalf("status=%q want %q", out.Status, runtime.StatusFail)
}
if out.FailureReason != "transient decode retry success" {
t.Fatalf("failure_reason=%q want %q", out.FailureReason, "transient decode retry success")
}
}

func TestCopyFirstValidFallbackStatus_TypeMismatchIsInvalidPayload(t *testing.T) {
tmp := t.TempDir()
stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json")
fallbackPath := filepath.Join(tmp, "status.json")

// Deterministic schema/type violation: status must be a string.
if err := os.WriteFile(fallbackPath, []byte(`{"status":123}`), 0o644); err != nil {
t.Fatalf("write invalid payload fallback: %v", err)
}

source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{
{path: fallbackPath, source: statusSourceWorktree},
})
if err != nil {
t.Fatalf("copyFirstValidFallbackStatus: %v", err)
}
if source != statusSourceNone {
t.Fatalf("source=%q want empty", source)
}
if !strings.Contains(diagnostic, "invalid status payload") {
t.Fatalf("diagnostic=%q want mention of invalid status payload", diagnostic)
}
if strings.Contains(diagnostic, "corrupt status artifact") {
t.Fatalf("diagnostic=%q should not classify type mismatch as corrupt artifact", diagnostic)
}
}

func TestShouldRetryFallbackRead_DoesNotRetryMissing(t *testing.T) {
if shouldRetryFallbackRead(fallbackFailureModeMissing, os.ErrNotExist) {
t.Fatalf("missing fallback paths should not be retried")
}
}

func TestShouldRetryFallbackRead_RetriesUnexpectedEOF(t *testing.T) {
if !shouldRetryFallbackRead(fallbackFailureModeUnreadable, io.ErrUnexpectedEOF) {
t.Fatalf("unexpected EOF should remain retryable for unreadable fallback artifacts")
}
}

func TestShouldRetryFallbackRead_DoesNotRetryRegularUnreadable(t *testing.T) {
if shouldRetryFallbackRead(fallbackFailureModeUnreadable, errors.New("permission denied")) {
t.Fatalf("regular unreadable fallback artifacts should not be retried")
}
}
Loading