From 60dcc72ef3151cfad6075788b545e37ce8c852ce Mon Sep 17 00:00:00 2001 From: Vadim Comanescu Date: Wed, 4 Mar 2026 18:41:57 +0100 Subject: [PATCH 1/2] engine: harden fallback status ingestion diagnostics --- internal/attractor/engine/handlers.go | 148 ++++++++++++++++-- .../engine/status_ingestion_retry_test.go | 134 ++++++++++++++++ internal/attractor/engine/status_json_test.go | 15 +- 3 files changed, 282 insertions(+), 15 deletions(-) create mode 100644 internal/attractor/engine/status_ingestion_retry_test.go diff --git a/internal/attractor/engine/handlers.go b/internal/attractor/engine/handlers.go index 8bcd9813..c185a98d 100644 --- a/internal/attractor/engine/handlers.go +++ b/internal/attractor/engine/handlers.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" + "io" "os" "os/exec" "path/filepath" @@ -235,25 +237,130 @@ type fallbackStatusPath struct { source statusSource } -func copyFirstValidFallbackStatus(stageStatusPath string, fallbackPaths []fallbackStatusPath) (statusSource, error) { +type fallbackStatusFailureMode string + +const ( + fallbackFailureModeNone fallbackStatusFailureMode = "" + fallbackFailureModeMissing fallbackStatusFailureMode = "missing" + fallbackFailureModeUnreadable fallbackStatusFailureMode = "unreadable" + fallbackFailureModeCorrupt fallbackStatusFailureMode = "corrupt" + fallbackFailureModeInvalidPayload fallbackStatusFailureMode = "invalid_payload" +) + +const ( + fallbackStatusDecodeMaxAttempts = 3 + fallbackStatusDecodeBaseDelay = 25 * time.Millisecond +) + +func copyFirstValidFallbackStatus(stageStatusPath string, fallbackPaths []fallbackStatusPath) (statusSource, string, error) { if _, err := os.Stat(stageStatusPath); err == nil { - return statusSourceCanonical, nil + return statusSourceCanonical, "", nil } + issues := make([]string, 0, len(fallbackPaths)) for _, fallback := range fallbackPaths { - b, err := os.ReadFile(fallback.path) + b, mode, err := readAndDecodeFallbackStatusWithRetry(fallback.path) if err != nil { - continue - } - if _, err := runtime.DecodeOutcomeJSON(b); err != nil { + issues = append(issues, formatFallbackStatusIssue(fallback, mode, err)) continue } if err := runtime.WriteFileAtomic(stageStatusPath, b); err != nil { - return statusSourceNone, err + return statusSourceNone, strings.Join(issues, "; "), err } _ = os.Remove(fallback.path) - return fallback.source, nil + return fallback.source, "", nil + } + return statusSourceNone, strings.Join(issues, "; "), nil +} + +func readAndDecodeFallbackStatusWithRetry(path string) ([]byte, fallbackStatusFailureMode, error) { + var lastErr error + mode := fallbackFailureModeMissing + for attempt := 1; attempt <= fallbackStatusDecodeMaxAttempts; attempt++ { + b, err := os.ReadFile(path) + if err != nil { + lastErr = err + if os.IsNotExist(err) { + mode = fallbackFailureModeMissing + } else { + mode = fallbackFailureModeUnreadable + } + if attempt < fallbackStatusDecodeMaxAttempts && shouldRetryFallbackRead(mode, err) { + time.Sleep(backoffDelay(attempt)) + continue + } + return nil, mode, lastErr + } + if _, err := runtime.DecodeOutcomeJSON(b); err != nil { + lastErr = err + mode = classifyFallbackDecodeError(b, err) + if attempt < fallbackStatusDecodeMaxAttempts && shouldRetryFallbackDecode(mode, b, err) { + time.Sleep(backoffDelay(attempt)) + continue + } + return nil, mode, lastErr + } + return b, fallbackFailureModeNone, nil + } + return nil, mode, lastErr +} + +func classifyFallbackDecodeError(raw []byte, err error) fallbackStatusFailureMode { + if isCorruptFallbackPayload(raw, err) { + return fallbackFailureModeCorrupt + } + return fallbackFailureModeInvalidPayload +} + +func shouldRetryFallbackRead(mode fallbackStatusFailureMode, err error) bool { + if mode == fallbackFailureModeMissing { + return true + } + return errors.Is(err, io.ErrUnexpectedEOF) +} + +func shouldRetryFallbackDecode(mode fallbackStatusFailureMode, raw []byte, err error) bool { + return mode == fallbackFailureModeCorrupt && isCorruptFallbackPayload(raw, err) +} + +func isCorruptFallbackPayload(raw []byte, err error) bool { + if len(strings.TrimSpace(string(raw))) == 0 { + return true + } + var syntaxErr *json.SyntaxError + if errors.As(err, &syntaxErr) { + return true + } + var typeErr *json.UnmarshalTypeError + if errors.As(err, &typeErr) { + return true + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + msg := strings.ToLower(strings.TrimSpace(err.Error())) + return strings.Contains(msg, "unexpected end of json input") || strings.Contains(msg, "invalid character") +} + +func backoffDelay(attempt int) time.Duration { + if attempt <= 0 { + attempt = 1 + } + return time.Duration(attempt) * fallbackStatusDecodeBaseDelay +} + +func formatFallbackStatusIssue(fallback fallbackStatusPath, mode fallbackStatusFailureMode, err error) string { + switch mode { + case fallbackFailureModeMissing: + return fmt.Sprintf("fallback[%s] missing status artifact: %s", fallback.source, fallback.path) + case fallbackFailureModeUnreadable: + return fmt.Sprintf("fallback[%s] unreadable status artifact: %s (%v)", fallback.source, fallback.path, err) + case fallbackFailureModeCorrupt: + return fmt.Sprintf("fallback[%s] corrupt status artifact: %s (%v)", fallback.source, fallback.path, err) + case fallbackFailureModeInvalidPayload: + return fmt.Sprintf("fallback[%s] invalid status payload: %s (%v)", fallback.source, fallback.path, err) + default: + return fmt.Sprintf("fallback[%s] status ingestion error: %s (%v)", fallback.source, fallback.path, err) } - return statusSourceNone, nil } func buildManualBoxFanInPromptPreamble(exec *Execution, node *model.Node) string { @@ -468,20 +575,29 @@ func (h *CodergenHandler) Execute(ctx context.Context, exec *Execution, node *mo // If the backend/agent wrote a worktree status.json, surface it to the engine by // copying it into the authoritative stage directory location. source := statusSourceNone + ingestionDiagnostic := "" if len(worktreeStatusPaths) > 0 { var err error - source, err = copyFirstValidFallbackStatus(stageStatusPath, worktreeStatusPaths) + source, ingestionDiagnostic, err = copyFirstValidFallbackStatus(stageStatusPath, worktreeStatusPaths) if err != nil { - return runtime.Outcome{Status: runtime.StatusFail, FailureReason: err.Error()}, nil + reason := err.Error() + if strings.TrimSpace(ingestionDiagnostic) != "" { + reason = reason + "; " + strings.TrimSpace(ingestionDiagnostic) + } + return runtime.Outcome{Status: runtime.StatusFail, FailureReason: reason}, nil } } if exec != nil && exec.Engine != nil { - exec.Engine.appendProgress(map[string]any{ + progress := map[string]any{ "event": "status_ingestion_decision", "node_id": node.ID, "source": string(source), "copied": source == statusSourceWorktree || source == statusSourceDotAI, - }) + } + if strings.TrimSpace(ingestionDiagnostic) != "" { + progress["diagnostic"] = strings.TrimSpace(ingestionDiagnostic) + } + exec.Engine.appendProgress(progress) } if out != nil { @@ -523,9 +639,13 @@ func (h *CodergenHandler) Execute(ctx context.Context, exec *Execution, node *mo }, }, nil } + reason := "missing status.json (auto_status=false)" + if strings.TrimSpace(ingestionDiagnostic) != "" { + reason = reason + "; " + strings.TrimSpace(ingestionDiagnostic) + } return runtime.Outcome{ Status: runtime.StatusFail, - FailureReason: "missing status.json (auto_status=false)", + FailureReason: reason, Notes: "codergen completed without an outcome or status.json", ContextUpdates: map[string]any{ "last_stage": node.ID, diff --git a/internal/attractor/engine/status_ingestion_retry_test.go b/internal/attractor/engine/status_ingestion_retry_test.go new file mode 100644 index 00000000..e66c9e42 --- /dev/null +++ b/internal/attractor/engine/status_ingestion_retry_test.go @@ -0,0 +1,134 @@ +package engine + +import ( + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/danshapiro/kilroy/internal/attractor/runtime" +) + +func TestCopyFirstValidFallbackStatus_CanonicalStageStatusWins(t *testing.T) { + tmp := t.TempDir() + stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json") + fallbackPath := filepath.Join(tmp, "status.json") + + if err := runtime.WriteFileAtomic(stageStatusPath, []byte(`{"status":"success","notes":"canonical"}`)); err != nil { + t.Fatalf("write canonical status: %v", err) + } + if err := os.WriteFile(fallbackPath, []byte(`{"status":"fail","failure_reason":"fallback"}`), 0o644); err != nil { + t.Fatalf("write fallback status: %v", err) + } + + source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{ + {path: fallbackPath, source: statusSourceWorktree}, + }) + if err != nil { + t.Fatalf("copyFirstValidFallbackStatus: %v", err) + } + if source != statusSourceCanonical { + t.Fatalf("source=%q want %q", source, statusSourceCanonical) + } + if strings.TrimSpace(diagnostic) != "" { + t.Fatalf("diagnostic=%q want empty", diagnostic) + } + + b, err := os.ReadFile(stageStatusPath) + if err != nil { + t.Fatalf("read stage status: %v", err) + } + out, err := runtime.DecodeOutcomeJSON(b) + if err != nil { + t.Fatalf("decode stage status: %v", err) + } + if out.Status != runtime.StatusSuccess { + t.Fatalf("status=%q want %q", out.Status, runtime.StatusSuccess) + } +} + +func TestCopyFirstValidFallbackStatus_MissingFallbackIsDiagnosed(t *testing.T) { + tmp := t.TempDir() + stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json") + missingPath := filepath.Join(tmp, "missing-status.json") + + source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{ + {path: missingPath, source: statusSourceWorktree}, + }) + if err != nil { + t.Fatalf("copyFirstValidFallbackStatus: %v", err) + } + if source != statusSourceNone { + t.Fatalf("source=%q want empty", source) + } + if !strings.Contains(diagnostic, "missing status artifact") { + t.Fatalf("diagnostic=%q want mention of missing status artifact", diagnostic) + } +} + +func TestCopyFirstValidFallbackStatus_PermanentCorruptFallbackIsDiagnosed(t *testing.T) { + tmp := t.TempDir() + stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json") + fallbackPath := filepath.Join(tmp, "status.json") + + if err := os.WriteFile(fallbackPath, []byte(`{ this is invalid json }`), 0o644); err != nil { + t.Fatalf("write corrupt fallback: %v", err) + } + + source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{ + {path: fallbackPath, source: statusSourceWorktree}, + }) + if err != nil { + t.Fatalf("copyFirstValidFallbackStatus: %v", err) + } + if source != statusSourceNone { + t.Fatalf("source=%q want empty", source) + } + if !strings.Contains(diagnostic, "corrupt status artifact") { + t.Fatalf("diagnostic=%q want mention of corrupt status artifact", diagnostic) + } +} + +func TestCopyFirstValidFallbackStatus_RetryDecodeSucceedsAfterTransientCorruption(t *testing.T) { + tmp := t.TempDir() + stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json") + fallbackPath := filepath.Join(tmp, "status.json") + + if err := os.WriteFile(fallbackPath, []byte(`{ this is invalid json }`), 0o644); err != nil { + t.Fatalf("seed transient corrupt fallback: %v", err) + } + + go func() { + time.Sleep(fallbackStatusDecodeBaseDelay + 10*time.Millisecond) + _ = os.WriteFile(fallbackPath, []byte(`{"status":"fail","failure_reason":"transient decode retry success"}`), 0o644) + }() + + source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{ + {path: fallbackPath, source: statusSourceWorktree}, + }) + if err != nil { + t.Fatalf("copyFirstValidFallbackStatus: %v", err) + } + if source != statusSourceWorktree { + t.Fatalf("source=%q want %q", source, statusSourceWorktree) + } + if strings.TrimSpace(diagnostic) != "" { + t.Fatalf("diagnostic=%q want empty", diagnostic) + } + + b, err := os.ReadFile(stageStatusPath) + if err != nil { + t.Fatalf("read copied stage status: %v", err) + } + out, err := runtime.DecodeOutcomeJSON(b) + if err != nil { + t.Fatalf("decode copied stage status: %v", err) + } + if out.Status != runtime.StatusFail { + t.Fatalf("status=%q want %q", out.Status, runtime.StatusFail) + } + if out.FailureReason != "transient decode retry success" { + t.Fatalf("failure_reason=%q want %q", out.FailureReason, "transient decode retry success") + } +} diff --git a/internal/attractor/engine/status_json_test.go b/internal/attractor/engine/status_json_test.go index df90a1f0..6bdc5579 100644 --- a/internal/attractor/engine/status_json_test.go +++ b/internal/attractor/engine/status_json_test.go @@ -74,8 +74,21 @@ func TestCodergenStatusIngestion_FallbackOnlyWhenCanonicalMissing(t *testing.T) } func TestCodergenStatusIngestion_InvalidFallbackIsRejected(t *testing.T) { - _, source := runStatusIngestionFixture(t, false, false, true) + out, source := runStatusIngestionFixture(t, false, false, true) if source != "" { t.Fatalf("source=%q want empty", source) } + if out.Status != runtime.StatusFail { + t.Fatalf("status=%q want %q", out.Status, runtime.StatusFail) + } +} + +func TestCodergenStatusIngestion_MissingFallbackIsDiagnosed(t *testing.T) { + out, source := runStatusIngestionFixture(t, false, false, false) + if source != "" { + t.Fatalf("source=%q want empty", source) + } + if out.Status != runtime.StatusFail { + t.Fatalf("status=%q want %q", out.Status, runtime.StatusFail) + } } From b671b6ba6e2c4ee77f5bd29b7b8dc389b8028322 Mon Sep 17 00:00:00 2001 From: Vadim Comanescu Date: Wed, 4 Mar 2026 18:41:57 +0100 Subject: [PATCH 2/2] engine: fix fallback retry latency and payload classification --- internal/attractor/engine/handlers.go | 8 ++-- .../engine/status_ingestion_retry_test.go | 47 +++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/internal/attractor/engine/handlers.go b/internal/attractor/engine/handlers.go index c185a98d..9567c2d1 100644 --- a/internal/attractor/engine/handlers.go +++ b/internal/attractor/engine/handlers.go @@ -313,7 +313,9 @@ func classifyFallbackDecodeError(raw []byte, err error) fallbackStatusFailureMod func shouldRetryFallbackRead(mode fallbackStatusFailureMode, err error) bool { if mode == fallbackFailureModeMissing { - return true + // Missing fallback files are expected in normal flows; retrying them + // just adds latency before trying the next candidate path. + return false } return errors.Is(err, io.ErrUnexpectedEOF) } @@ -330,10 +332,6 @@ func isCorruptFallbackPayload(raw []byte, err error) bool { if errors.As(err, &syntaxErr) { return true } - var typeErr *json.UnmarshalTypeError - if errors.As(err, &typeErr) { - return true - } if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { return true } diff --git a/internal/attractor/engine/status_ingestion_retry_test.go b/internal/attractor/engine/status_ingestion_retry_test.go index e66c9e42..ce730c31 100644 --- a/internal/attractor/engine/status_ingestion_retry_test.go +++ b/internal/attractor/engine/status_ingestion_retry_test.go @@ -1,6 +1,8 @@ package engine import ( + "errors" + "io" "os" "path/filepath" "strings" @@ -132,3 +134,48 @@ func TestCopyFirstValidFallbackStatus_RetryDecodeSucceedsAfterTransientCorruptio t.Fatalf("failure_reason=%q want %q", out.FailureReason, "transient decode retry success") } } + +func TestCopyFirstValidFallbackStatus_TypeMismatchIsInvalidPayload(t *testing.T) { + tmp := t.TempDir() + stageStatusPath := filepath.Join(tmp, "logs", "a", "status.json") + fallbackPath := filepath.Join(tmp, "status.json") + + // Deterministic schema/type violation: status must be a string. + if err := os.WriteFile(fallbackPath, []byte(`{"status":123}`), 0o644); err != nil { + t.Fatalf("write invalid payload fallback: %v", err) + } + + source, diagnostic, err := copyFirstValidFallbackStatus(stageStatusPath, []fallbackStatusPath{ + {path: fallbackPath, source: statusSourceWorktree}, + }) + if err != nil { + t.Fatalf("copyFirstValidFallbackStatus: %v", err) + } + if source != statusSourceNone { + t.Fatalf("source=%q want empty", source) + } + if !strings.Contains(diagnostic, "invalid status payload") { + t.Fatalf("diagnostic=%q want mention of invalid status payload", diagnostic) + } + if strings.Contains(diagnostic, "corrupt status artifact") { + t.Fatalf("diagnostic=%q should not classify type mismatch as corrupt artifact", diagnostic) + } +} + +func TestShouldRetryFallbackRead_DoesNotRetryMissing(t *testing.T) { + if shouldRetryFallbackRead(fallbackFailureModeMissing, os.ErrNotExist) { + t.Fatalf("missing fallback paths should not be retried") + } +} + +func TestShouldRetryFallbackRead_RetriesUnexpectedEOF(t *testing.T) { + if !shouldRetryFallbackRead(fallbackFailureModeUnreadable, io.ErrUnexpectedEOF) { + t.Fatalf("unexpected EOF should remain retryable for unreadable fallback artifacts") + } +} + +func TestShouldRetryFallbackRead_DoesNotRetryRegularUnreadable(t *testing.T) { + if shouldRetryFallbackRead(fallbackFailureModeUnreadable, errors.New("permission denied")) { + t.Fatalf("regular unreadable fallback artifacts should not be retried") + } +}