From 1e295881c9cc4f006c62a5bfae22835d11eb0178 Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Wed, 24 Jun 2026 15:14:18 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(loops):=20configurable=20failure=20pol?= =?UTF-8?q?icy=20=E2=80=94=20fail-fast=20vs=20continue-on-error=20(#1167)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sequential agent loops (#740) were fail-fast: the first failed iteration aborted the whole loop. Add a per-loop policy to tolerate failures and keep going, bounded so a fully-broken agent still terminates. - config: `on_failure` ('abort' default = current fail-fast, backward compatible; 'continue' tolerates a failed iteration) + `max_consecutive_failures` (default 3, range 1–100). Both plumbed end to end (Invariant #13). - runner (loop_service.py): both failure surfaces gated — raised exception AND non-success TaskExecutionResult. Continue mode finalizes the failed agent_loop_runs row, increments failed_runs/consecutive_failures, and proceeds; a success resets the streak. Reaching max_runs (or stop-signal) with tolerated failures finalizes as `completed_with_errors`; hitting the consecutive cap finalizes `failed`/`stop_reason=max_consecutive_failures`. {{previous_response}} keeps the last *successful* response (a failed iteration never overwrites it). - schema/migration: agent_loops gains on_failure, max_consecutive_failures, failed_runs (schema.py + tables.py Core + versioned migration). New terminal status `completed_with_errors`. - api: POST /loops accepts on_failure/max_consecutive_failures; LoopStatusResponse surfaces failed_runs/on_failure/max_consecutive_failures. - mcp: run_agent_loop gains the two params (tools/loops.ts + client.ts). - ui: LoopsPanel failure-policy controls + failed_runs/completed_with_errors surfacing. - tests: abort unchanged, continue past a failed run (both surfaces), consecutive cutoff, streak-reset; schema-parity + migrations green. 42 passed. 
Related to #1167 Co-Authored-By: Claude Opus 4.8 --- docs/memory/architecture.md | 11 +- docs/memory/requirements.md | 25 ++++ src/backend/database.py | 6 +- src/backend/db/loops.py | 53 +++++++-- src/backend/db/migrations.py | 23 ++++ src/backend/db/schema.py | 3 + src/backend/db/tables.py | 3 + src/backend/routers/loops.py | 19 ++- src/backend/services/loop_service.py | 84 +++++++++++-- src/frontend/src/components/LoopsPanel.vue | 39 ++++++- src/mcp-server/src/client.ts | 3 + src/mcp-server/src/tools/loops.ts | 20 ++++ tests/unit/test_loop_service.py | 130 ++++++++++++++++++++- 13 files changed, 389 insertions(+), 30 deletions(-) diff --git a/docs/memory/architecture.md b/docs/memory/architecture.md index 97ae86c2..fd0ea8a2 100644 --- a/docs/memory/architecture.md +++ b/docs/memory/architecture.md @@ -426,6 +426,8 @@ Backend orchestration in `services/subscription_auto_switch.py`: `_hot_reload_su Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`. 
+**Failure policy (#1167):** per-loop `on_failure` — `abort` (default; fail-fast, first failed iteration ends the loop as `failed`/`stop_reason=error`) or `continue` (tolerate a failed iteration and proceed). Both failure surfaces are gated: a raised exception from `execute_task` and a non-success `TaskExecutionResult`. Continue mode is bounded by `max_consecutive_failures` (default 3, range 1–100) — once that many iterations fail in a row the loop aborts as `failed`/`stop_reason=max_consecutive_failures`; a success resets the streak. A continue-mode loop that reaches `max_runs` (or matches its stop-signal) with ≥1 tolerated failure finalizes as `completed_with_errors`, with the `failed_runs` count surfaced. `{{previous_response}}` always carries the last *successful* response (a failed iteration never overwrites it). + **Web UI (#1106):** a **Loops** tab on Agent Detail (`components/LoopsPanel.vue` + agent-scoped `stores/loops.js`; `setAgent(name)` on mount, `clear()` on unmount). The global WS handler routes the fleet-wide loop events to the store, which filters by mounted agent and targeted-refreshes only the affected loop; a 12s backstop poll runs while any loop is `queued`/`running` to recover a missed terminal event. Last full response rendered via `utils/markdown.js` (DOMPurify). ### Session Tab @@ -741,7 +743,7 @@ Coverage: agent lifecycle, auth, sharing, credentials, settings, rename; request ### Sequential Agent Loops (#740) | Method | Path | Auth | Description | |--------|------|------|-------------| -| POST | `/api/agents/{name}/loops` | JWT/MCP | Start loop; 202 with `{loop_id, status, agent_name, max_runs}`. Body: `message` (template), `max_runs` (1–100, required), `stop_signal`, `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools` | +| POST | `/api/agents/{name}/loops` | JWT/MCP | Start loop; 202 with `{loop_id, status, agent_name, max_runs, on_failure}`. 
Body: `message` (template), `max_runs` (1–100, required), `stop_signal`, `delay_seconds`, `timeout_per_run`, `on_failure` (`abort` default \| `continue`, #1167), `max_consecutive_failures` (continue-mode cutoff, default 3), `model`, `allowed_tools` | | GET | `/api/agents/{name}/loops` | JWT/MCP | List loops (`?status=`, `?limit=` 1–200 default 50) | | GET | `/api/loops/{loop_id}` | JWT/MCP | Status + per-run summaries + last full response; 404 unknown, 403 if caller neither initiator nor agent-accessor | | POST | `/api/loops/{loop_id}/stop` | JWT/MCP | Graceful stop → `{status: "stopping" \| "already_done"}` | @@ -999,11 +1001,14 @@ CREATE TABLE agent_loops ( stop_signal TEXT, -- NULL = fixed mode; set = until mode delay_seconds INTEGER NOT NULL DEFAULT 0, timeout_per_run INTEGER, -- NULL = agent's execution_timeout_seconds + on_failure TEXT NOT NULL DEFAULT 'abort', -- #1167: abort (fail-fast) | continue (tolerate failed iterations) + max_consecutive_failures INTEGER NOT NULL DEFAULT 3, -- #1167: continue-mode cutoff (1–100) model TEXT, allowed_tools TEXT, -- JSON array - status TEXT NOT NULL, -- queued | running | completed | stopped | failed | interrupted + status TEXT NOT NULL, -- queued | running | completed | completed_with_errors | stopped | failed | interrupted runs_completed INTEGER NOT NULL DEFAULT 0, - stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | interrupted + failed_runs INTEGER NOT NULL DEFAULT 0, -- #1167: tolerated-failure count (continue mode) + stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | max_consecutive_failures | interrupted last_response TEXT, error TEXT, started_by_user_id INTEGER, diff --git a/docs/memory/requirements.md b/docs/memory/requirements.md index a12cbabe..79e986b4 100644 --- a/docs/memory/requirements.md +++ b/docs/memory/requirements.md @@ -2747,6 +2747,31 @@ Standalone mobile-friendly admin page for managing agents on the go. 
Designed as auto-resume after restart; cross-agent loops (`agent` parameter is `"self"` only for v1, matching `fan_out`). +### 38.2 Configurable Loop Failure Policy (#1167) + +**Description**: A per-loop policy controls what happens when an iteration +fails. Default is fail-fast (backward compatible); `continue` mode tolerates a +failed iteration and proceeds, bounded so a fully-broken agent still terminates. + +- **FR-1 — `on_failure`**: `abort` (default — first failed iteration ends the + loop as `failed`/`stop_reason=error`, current behavior) or `continue`. +- **FR-2 — `max_consecutive_failures`** (default 3, range 1–100): in `continue` + mode the loop aborts as `failed` with `stop_reason=max_consecutive_failures` + once this many iterations fail in a row; a success resets the streak. +- **FR-3 — Both failure surfaces** honored: a raised exception from + `execute_task` AND a non-success `TaskExecutionResult` (TIMEOUT / AGENT_ERROR + / CIRCUIT_OPEN / AUTH). Each failed iteration finalizes its `agent_loop_runs` + row as `failed`, then (continue mode) the loop proceeds to the next run. +- **FR-4 — Terminal status**: a continue-mode loop that reaches `max_runs` (or + matches its stop-signal) with ≥1 tolerated failure finalizes as + `completed_with_errors`; the `failed_runs` count is surfaced on the loop row + and API/UI. +- **FR-5 — `{{previous_response}}`**: carries the last *successful* response — a + failed iteration does not overwrite it. +- **FR-6 — Plumbed through all surfaces** (Invariant #13): `agent_loops` schema + + migration, `POST /api/agents/{name}/loops`, MCP `run_agent_loop`, and the + Loops panel UI. Unset = `abort`, a strict no-op for existing callers. + --- ## 39. 
VoIP Telephony (VOIP-001) diff --git a/src/backend/database.py b/src/backend/database.py index 018444ba..57c7422d 100644 --- a/src/backend/database.py +++ b/src/backend/database.py @@ -2283,14 +2283,16 @@ def get_loop(self, loop_id: str): def mark_loop_running(self, loop_id: str): return self._loop_ops.mark_loop_running(loop_id) - def update_loop_progress(self, loop_id: str, *, runs_completed: int, last_response): + def update_loop_progress(self, loop_id: str, *, runs_completed: int, last_response, failed_runs=None): return self._loop_ops.update_loop_progress( loop_id, runs_completed=runs_completed, last_response=last_response, + failed_runs=failed_runs, ) - def finalize_loop(self, loop_id: str, *, status: str, stop_reason: str, error=None): + def finalize_loop(self, loop_id: str, *, status: str, stop_reason: str, error=None, failed_runs=None): return self._loop_ops.finalize_loop( loop_id, status=status, stop_reason=stop_reason, error=error, + failed_runs=failed_runs, ) def list_loops_for_agent(self, agent_name: str, *, status=None, limit: int = 50): diff --git a/src/backend/db/loops.py b/src/backend/db/loops.py index b4aa5d88..96d09b3e 100644 --- a/src/backend/db/loops.py +++ b/src/backend/db/loops.py @@ -18,7 +18,11 @@ # Terminal statuses for restart-recovery and stop_loop logic. -TERMINAL_STATUSES = {"completed", "stopped", "failed", "interrupted"} +# `completed_with_errors` (#1167): continue-mode loop that ran to max_runs with +# at least one tolerated failed iteration. 
+TERMINAL_STATUSES = { + "completed", "completed_with_errors", "stopped", "failed", "interrupted", +} def _loop_row_to_dict(row) -> dict: @@ -30,10 +34,13 @@ def _loop_row_to_dict(row) -> dict: "stop_signal": row["stop_signal"], "delay_seconds": row["delay_seconds"], "timeout_per_run": row["timeout_per_run"], + "on_failure": row["on_failure"], + "max_consecutive_failures": row["max_consecutive_failures"], "model": row["model"], "allowed_tools": json.loads(row["allowed_tools"]) if row["allowed_tools"] else None, "status": row["status"], "runs_completed": row["runs_completed"], + "failed_runs": row["failed_runs"], "stop_reason": row["stop_reason"], "last_response": row["last_response"], "error": row["error"], @@ -78,6 +85,8 @@ def create_loop( stop_signal: Optional[str] = None, delay_seconds: int = 0, timeout_per_run: Optional[int] = None, + on_failure: str = "abort", + max_consecutive_failures: int = 3, model: Optional[str] = None, allowed_tools: Optional[List[str]] = None, started_by_user_id: Optional[int] = None, @@ -99,10 +108,13 @@ def create_loop( stop_signal=stop_signal, delay_seconds=delay_seconds, timeout_per_run=timeout_per_run, + on_failure=on_failure, + max_consecutive_failures=max_consecutive_failures, model=model, allowed_tools=allowed_tools_json, status="queued", runs_completed=0, + failed_runs=0, stop_reason=None, last_response=None, error=None, @@ -126,10 +138,13 @@ def create_loop( "stop_signal": stop_signal, "delay_seconds": delay_seconds, "timeout_per_run": timeout_per_run, + "on_failure": on_failure, + "max_consecutive_failures": max_consecutive_failures, "model": model, "allowed_tools": allowed_tools, "status": "queued", "runs_completed": 0, + "failed_runs": 0, "stop_reason": None, "last_response": None, "error": None, @@ -165,12 +180,22 @@ def update_loop_progress( *, runs_completed: int, last_response: Optional[str], + failed_runs: Optional[int] = None, ) -> None: - """Bump runs_completed + last_response after each iteration.""" + """Bump 
runs_completed + last_response after each iteration. + + `failed_runs` (#1167) is written only when provided, so the success + path can omit it. `last_response` carries the last *successful* response + even on a tolerated-failure iteration (continue mode), preserving + `{{previous_response}}` semantics. + """ + values: dict = {"runs_completed": runs_completed, "last_response": last_response} + if failed_runs is not None: + values["failed_runs"] = failed_runs stmt = ( update(agent_loops) .where(agent_loops.c.id == loop_id) - .values(runs_completed=runs_completed, last_response=last_response) + .values(**values) ) with get_engine().begin() as conn: conn.execute(stmt) @@ -182,19 +207,27 @@ def finalize_loop( status: str, stop_reason: str, error: Optional[str] = None, + failed_runs: Optional[int] = None, ) -> None: - """Set terminal status + stop_reason + completed_at.""" + """Set terminal status + stop_reason + completed_at. + + `failed_runs` (#1167) writes the authoritative tolerated-failure count + when provided. + """ if status not in TERMINAL_STATUSES: raise ValueError(f"finalize_loop requires terminal status, got '{status}'") + values: dict = { + "status": status, + "stop_reason": stop_reason, + "error": error, + "completed_at": utc_now_iso(), + } + if failed_runs is not None: + values["failed_runs"] = failed_runs stmt = ( update(agent_loops) .where(agent_loops.c.id == loop_id) - .values( - status=status, - stop_reason=stop_reason, - error=error, - completed_at=utc_now_iso(), - ) + .values(**values) ) with get_engine().begin() as conn: conn.execute(stmt) diff --git a/src/backend/db/migrations.py b/src/backend/db/migrations.py index ec0b3e98..a4d1aeb5 100644 --- a/src/backend/db/migrations.py +++ b/src/backend/db/migrations.py @@ -2407,6 +2407,28 @@ def _migrate_agent_loops_tables(cursor, conn): conn.commit() +def _migrate_agent_loops_failure_policy(cursor, conn): + """Add per-loop failure-policy columns to agent_loops (#1167). 
+ + `on_failure` ('abort'|'continue', default 'abort' = current fail-fast + behavior), `max_consecutive_failures` (bounds continue mode), and a + `failed_runs` counter for the terminal summary. + """ + _safe_add_column( + cursor, "agent_loops", "on_failure", + "ALTER TABLE agent_loops ADD COLUMN on_failure TEXT NOT NULL DEFAULT 'abort'", + ) + _safe_add_column( + cursor, "agent_loops", "max_consecutive_failures", + "ALTER TABLE agent_loops ADD COLUMN max_consecutive_failures INTEGER NOT NULL DEFAULT 3", + ) + _safe_add_column( + cursor, "agent_loops", "failed_runs", + "ALTER TABLE agent_loops ADD COLUMN failed_runs INTEGER NOT NULL DEFAULT 0", + ) + conn.commit() + + def _migrate_users_suspended_at(cursor, conn): """#995 — user deactivation primitive. @@ -2564,4 +2586,5 @@ def _migrate_agent_compatibility_results_table(cursor, conn): ("operator_queue_cleared_at", _migrate_operator_queue_cleared_at), ("activities_created_index", _migrate_activities_created_index), ("agent_compatibility_results_table", _migrate_agent_compatibility_results_table), + ("agent_loops_failure_policy", _migrate_agent_loops_failure_policy), ] diff --git a/src/backend/db/schema.py b/src/backend/db/schema.py index 79257d60..ef8e4620 100644 --- a/src/backend/db/schema.py +++ b/src/backend/db/schema.py @@ -247,10 +247,13 @@ stop_signal TEXT, delay_seconds INTEGER NOT NULL DEFAULT 0, timeout_per_run INTEGER, + on_failure TEXT NOT NULL DEFAULT 'abort', + max_consecutive_failures INTEGER NOT NULL DEFAULT 3, model TEXT, allowed_tools TEXT, status TEXT NOT NULL, runs_completed INTEGER NOT NULL DEFAULT 0, + failed_runs INTEGER NOT NULL DEFAULT 0, stop_reason TEXT, last_response TEXT, error TEXT, diff --git a/src/backend/db/tables.py b/src/backend/db/tables.py index 2648473a..99e08c9c 100644 --- a/src/backend/db/tables.py +++ b/src/backend/db/tables.py @@ -232,10 +232,13 @@ def process_bind_param(self, value, dialect): Column("stop_signal", Text), Column("delay_seconds", Integer), 
Column("timeout_per_run", Integer), + Column("on_failure", Text), + Column("max_consecutive_failures", Integer), Column("model", Text), Column("allowed_tools", Text), Column("status", Text), Column("runs_completed", Integer), + Column("failed_runs", Integer), Column("stop_reason", Text), Column("last_response", Text), Column("error", Text), diff --git a/src/backend/routers/loops.py b/src/backend/routers/loops.py index 86f14cf4..cb365d34 100644 --- a/src/backend/routers/loops.py +++ b/src/backend/routers/loops.py @@ -8,7 +8,7 @@ """ import logging -from typing import List, Optional +from typing import List, Literal, Optional from fastapi import APIRouter, Depends, HTTPException, Header from pydantic import BaseModel, Field, field_validator @@ -36,6 +36,7 @@ MAX_DELAY_SECONDS = 3600 MAX_TIMEOUT_PER_RUN = 7200 MAX_STOP_SIGNAL_LEN = 200 +MAX_CONSECUTIVE_FAILURES_LIMIT = 100 class StartLoopRequest(BaseModel): @@ -44,6 +45,12 @@ class StartLoopRequest(BaseModel): stop_signal: Optional[str] = Field(default=None, max_length=MAX_STOP_SIGNAL_LEN) delay_seconds: int = Field(default=0, ge=0, le=MAX_DELAY_SECONDS) timeout_per_run: Optional[int] = Field(default=None, ge=10, le=MAX_TIMEOUT_PER_RUN) + # #1167: failure policy. 'abort' (default) = fail-fast, backward compatible; + # 'continue' tolerates failed iterations up to max_consecutive_failures. 
+ on_failure: Literal["abort", "continue"] = "abort" + max_consecutive_failures: int = Field( + default=3, ge=1, le=MAX_CONSECUTIVE_FAILURES_LIMIT + ) model: Optional[str] = None allowed_tools: Optional[List[str]] = None @@ -61,6 +68,7 @@ class StartLoopResponse(BaseModel): status: str agent_name: str max_runs: int + on_failure: str = "abort" class LoopRunResponse(BaseModel): @@ -81,6 +89,9 @@ class LoopStatusResponse(BaseModel): status: str max_runs: int runs_completed: int + failed_runs: int = 0 + on_failure: str = "abort" + max_consecutive_failures: int = 3 stop_reason: Optional[str] = None last_response: Optional[str] = None error: Optional[str] = None @@ -126,6 +137,9 @@ def _build_status_response(loop: dict) -> LoopStatusResponse: status=loop["status"], max_runs=loop["max_runs"], runs_completed=loop["runs_completed"], + failed_runs=loop.get("failed_runs", 0) or 0, + on_failure=loop.get("on_failure") or "abort", + max_consecutive_failures=loop.get("max_consecutive_failures") or 3, stop_reason=loop["stop_reason"], last_response=loop["last_response"], error=loop["error"], @@ -170,6 +184,8 @@ async def start_loop( stop_signal=payload.stop_signal, delay_seconds=payload.delay_seconds, timeout_per_run=payload.timeout_per_run, + on_failure=payload.on_failure, + max_consecutive_failures=payload.max_consecutive_failures, model=payload.model, allowed_tools=payload.allowed_tools, started_by_user_id=current_user.id, @@ -183,6 +199,7 @@ async def start_loop( status=loop_row["status"], agent_name=name, max_runs=payload.max_runs, + on_failure=payload.on_failure, ) diff --git a/src/backend/services/loop_service.py b/src/backend/services/loop_service.py index 871f6afb..a29b3089 100644 --- a/src/backend/services/loop_service.py +++ b/src/backend/services/loop_service.py @@ -97,6 +97,8 @@ async def start_loop( stop_signal: Optional[str] = None, delay_seconds: int = 0, timeout_per_run: Optional[int] = None, + on_failure: str = "abort", + max_consecutive_failures: int = 3, model: 
Optional[str] = None, allowed_tools: Optional[list] = None, started_by_user_id: Optional[int] = None, @@ -116,6 +118,8 @@ async def start_loop( stop_signal=stop_signal, delay_seconds=delay_seconds, timeout_per_run=timeout_per_run, + on_failure=on_failure, + max_consecutive_failures=max_consecutive_failures, model=model, allowed_tools=allowed_tools, started_by_user_id=started_by_user_id, @@ -186,6 +190,29 @@ async def _run(self, loop_id: str) -> None: stop_reason = "max_runs_reached" terminal_error: Optional[str] = None + # #1167: per-loop failure policy. `abort` (default) keeps the original + # fail-fast behavior; `continue` tolerates a failed iteration and moves + # on, bounded by max_consecutive_failures so a fully-broken agent still + # terminates. `previous_response` only advances on success, so + # {{previous_response}} always carries the last *successful* response. + on_failure = loop.get("on_failure") or "abort" + max_consecutive_failures = loop.get("max_consecutive_failures") or 3 + failed_runs = 0 + consecutive_failures = 0 + + def _abort_after_failure(err_msg: str): + """Decide whether a failed iteration ends the loop. Returns + (terminal_status, stop_reason, error) to abort, or None to continue.""" + if on_failure != "continue": + return ("failed", "error", err_msg) + if consecutive_failures >= max_consecutive_failures: + return ( + "failed", + "max_consecutive_failures", + f"{err_msg} (reached {max_consecutive_failures} consecutive failures)", + ) + return None + try: for run_number in range(1, loop["max_runs"] + 1): # Cooperative stop check BEFORE starting the next iteration. 
@@ -230,13 +257,29 @@ async def _run(self, loop_id: str) -> None: cost=None, duration_ms=elapsed_ms, ) - terminal_status = "failed" - stop_reason = "error" - terminal_error = f"Iteration {run_number}: {exc}" - logger.exception( - f"[Loop] {loop_id} iteration {run_number} raised; aborting loop" + failed_runs += 1 + consecutive_failures += 1 + runs_completed = run_number + db.update_loop_progress( + loop_id, + runs_completed=runs_completed, + last_response=previous_response, # keep last successful + failed_runs=failed_runs, ) - break + err_msg = f"Iteration {run_number}: {exc}" + decision = _abort_after_failure(err_msg) + if decision is not None: + terminal_status, stop_reason, terminal_error = decision + logger.exception( + f"[Loop] {loop_id} iteration {run_number} raised; aborting loop" + ) + break + logger.warning( + f"[Loop] {loop_id} iteration {run_number} raised; " + f"continuing (on_failure=continue, " + f"{consecutive_failures}/{max_consecutive_failures} consecutive)" + ) + continue elapsed_ms = int( (datetime.utcnow() - run_start).total_seconds() * 1000 @@ -253,6 +296,7 @@ async def _run(self, loop_id: str) -> None: execution_id=result.execution_id, ) previous_response = result.response + consecutive_failures = 0 # #1167: success breaks the streak runs_completed = run_number db.update_loop_progress( loop_id, @@ -285,18 +329,28 @@ async def _run(self, loop_id: str) -> None: duration_ms=elapsed_ms, execution_id=result.execution_id, ) + failed_runs += 1 + consecutive_failures += 1 runs_completed = run_number db.update_loop_progress( loop_id, runs_completed=runs_completed, - last_response=result.response, + last_response=previous_response, # keep last successful + failed_runs=failed_runs, ) - terminal_status = "failed" - stop_reason = "error" - terminal_error = ( + err_msg = ( f"Iteration {run_number}: {result.error or 'task failed'}" ) - break + decision = _abort_after_failure(err_msg) + if decision is not None: + terminal_status, stop_reason, terminal_error = 
decision + break + logger.warning( + f"[Loop] {loop_id} iteration {run_number} failed " + f"({result.error_code or 'AGENT_ERROR'}); continuing " + f"({consecutive_failures}/{max_consecutive_failures} consecutive)" + ) + # fall through to the inter-run delay, then next iteration # Inter-run delay — also a stop point. if loop["delay_seconds"] and run_number < loop["max_runs"]: @@ -306,12 +360,19 @@ async def _run(self, loop_id: str) -> None: terminal_status = "stopped" stop_reason = "user_stopped" break + + # #1167: ran to max_runs (or stop-signal matched) in continue mode + # with tolerated failures → surface partial success. Only promotes + # the natural-completion path; stop/abort already set their status. + if terminal_status == "completed" and failed_runs > 0: + terminal_status = "completed_with_errors" finally: db.finalize_loop( loop_id, status=terminal_status, stop_reason=stop_reason, error=terminal_error, + failed_runs=failed_runs, ) await _broadcast({ "type": "loop_completed", @@ -320,6 +381,7 @@ async def _run(self, loop_id: str) -> None: "status": terminal_status, "stop_reason": stop_reason, "runs_completed": runs_completed, + "failed_runs": failed_runs, "timestamp": datetime.utcnow().isoformat() + "Z", }) async with self._lock: diff --git a/src/frontend/src/components/LoopsPanel.vue b/src/frontend/src/components/LoopsPanel.vue index 29ff9801..963113b9 100644 --- a/src/frontend/src/components/LoopsPanel.vue +++ b/src/frontend/src/components/LoopsPanel.vue @@ -89,6 +89,31 @@ class="w-full px-3 py-2 border border-gray-300 dark:border-gray-600 dark:bg-gray-700 dark:text-white rounded-md focus:outline-none focus:ring-2 focus:ring-action-primary-500" /> + + +
+ + +
+ + +
+ + +

Abort the loop as failed after this many failures in a row.

+
@@ -188,11 +213,14 @@ :class="statusBadgeClass(loop.status)" > - {{ loop.status }} + {{ (loop.status || '').replace(/_/g, ' ') }} Run {{ loop.runs_completed }} / {{ loop.max_runs }} + + · {{ loop.failed_runs }} failed + · {{ formatStopReason(loop.stop_reason) }} @@ -301,6 +329,8 @@ const defaultForm = () => ({ stop_signal: '', delay_seconds: 0, timeout_per_run: null, + on_failure: 'abort', + max_consecutive_failures: 3, model: '', allowed_tools: null, }) @@ -354,6 +384,10 @@ async function submit() { if (form.stop_signal && form.stop_signal.trim()) payload.stop_signal = form.stop_signal.trim() if (form.delay_seconds) payload.delay_seconds = form.delay_seconds if (form.timeout_per_run) payload.timeout_per_run = form.timeout_per_run + if (form.on_failure === 'continue') { + payload.on_failure = 'continue' + payload.max_consecutive_failures = form.max_consecutive_failures + } if (form.model) payload.model = form.model if (form.allowed_tools !== null) payload.allowed_tools = form.allowed_tools @@ -376,6 +410,8 @@ function statusBadgeClass(status) { return 'bg-status-warning-100 dark:bg-status-warning-900/30 text-status-warning-800 dark:text-status-warning-300' case 'completed': return 'bg-status-success-100 dark:bg-status-success-900/30 text-status-success-800 dark:text-status-success-300' + case 'completed_with_errors': + return 'bg-status-warning-100 dark:bg-status-warning-900/30 text-status-warning-800 dark:text-status-warning-300' case 'failed': return 'bg-status-danger-100 dark:bg-status-danger-900/30 text-status-danger-800 dark:text-status-danger-300' case 'stopped': @@ -398,6 +434,7 @@ function formatStopReason(reason) { stop_signal_matched: 'stop signal matched', user_stopped: 'stopped by user', error: 'error', + max_consecutive_failures: 'too many consecutive failures', interrupted: 'interrupted', } return map[reason] || reason diff --git a/src/mcp-server/src/client.ts b/src/mcp-server/src/client.ts index 9c8b7b22..081bcd41 100644 --- 
a/src/mcp-server/src/client.ts +++ b/src/mcp-server/src/client.ts @@ -1815,6 +1815,8 @@ export class TrinityClient { stop_signal?: string; delay_seconds?: number; timeout_per_run?: number; + on_failure?: "abort" | "continue"; + max_consecutive_failures?: number; model?: string; allowed_tools?: string[]; } @@ -1823,6 +1825,7 @@ export class TrinityClient { status: string; agent_name: string; max_runs: number; + on_failure?: string; }> { return this.request( "POST", diff --git a/src/mcp-server/src/tools/loops.ts b/src/mcp-server/src/tools/loops.ts index 8b5c8cae..388e99db 100644 --- a/src/mcp-server/src/tools/loops.ts +++ b/src/mcp-server/src/tools/loops.ts @@ -108,6 +108,22 @@ export function createLoopTools( .describe( "Per-iteration timeout in seconds (defaults to agent's configured execution_timeout_seconds)." ), + on_failure: z + .enum(["abort", "continue"]) + .optional() + .describe( + "Failure policy (default 'abort' = stop the whole loop on the first failed iteration). " + + "'continue' tolerates a failed iteration and proceeds to the next, bounded by max_consecutive_failures." + ), + max_consecutive_failures: z + .number() + .int() + .min(1) + .max(100) + .optional() + .describe( + "In 'continue' mode, abort the loop as failed after this many consecutive failed iterations (default 3)." 
+ ), model: z .string() .optional() @@ -125,6 +141,8 @@ export function createLoopTools( stop_signal?: string; delay_seconds?: number; timeout_per_run?: number; + on_failure?: "abort" | "continue"; + max_consecutive_failures?: number; model?: string; allowed_tools?: string[]; }, @@ -146,6 +164,8 @@ export function createLoopTools( stop_signal: params.stop_signal, delay_seconds: params.delay_seconds, timeout_per_run: params.timeout_per_run, + on_failure: params.on_failure, + max_consecutive_failures: params.max_consecutive_failures, model: params.model, allowed_tools: params.allowed_tools, }); diff --git a/tests/unit/test_loop_service.py b/tests/unit/test_loop_service.py index 76afda4a..bc5b9714 100644 --- a/tests/unit/test_loop_service.py +++ b/tests/unit/test_loop_service.py @@ -122,15 +122,19 @@ def mark_loop_running(self, loop_id: str): self.loops[loop_id]["status"] = "running" self.loops[loop_id]["started_at"] = "now" - def update_loop_progress(self, loop_id: str, *, runs_completed: int, last_response): + def update_loop_progress(self, loop_id: str, *, runs_completed: int, last_response, failed_runs=None): self.loops[loop_id]["runs_completed"] = runs_completed self.loops[loop_id]["last_response"] = last_response + if failed_runs is not None: + self.loops[loop_id]["failed_runs"] = failed_runs - def finalize_loop(self, loop_id: str, *, status: str, stop_reason: str, error=None): + def finalize_loop(self, loop_id: str, *, status: str, stop_reason: str, error=None, failed_runs=None): self.loops[loop_id]["status"] = status self.loops[loop_id]["stop_reason"] = stop_reason self.loops[loop_id]["error"] = error self.loops[loop_id]["completed_at"] = "now" + if failed_runs is not None: + self.loops[loop_id]["failed_runs"] = failed_runs def list_non_terminal_loops(self): return [ @@ -194,6 +198,9 @@ async def execute_task(self, **kwargs): self.calls.append(kwargs) result = self.results[self._idx] if self._idx < len(self.results) else _Result() self._idx += 1 + # A scripted 
Exception models the raised-exception failure surface. + if isinstance(result, BaseException): + raise result return result @@ -282,6 +289,125 @@ async def go(): assert ts.calls[0]["loop_id"] == loop_id +# --------------------------------------------------------------------------- +# Runner — failure policy (#1167) +# --------------------------------------------------------------------------- + +def _drive(ls, **start_kwargs): + """Start a loop, await its background task, return its id.""" + async def go(): + service = ls.LoopService() + row = await service.start_loop(**start_kwargs) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + return _run(go()) + + +class TestFailurePolicy: + def test_abort_mode_default_stops_on_first_failure(self, loop_module): + """Default on_failure='abort' preserves fail-fast behavior.""" + ls, db, ts = loop_module + ts.results = [ + _Result(response="r1"), + _Result(status="failed", error="boom", error_code="AGENT_ERROR"), + _Result(response="r3"), + ] + loop_id = _drive( + ls, agent_name="a1", message_template="step {{run}}", max_runs=3, + ) + loop = db.get_loop(loop_id) + assert loop["status"] == "failed" + assert loop["stop_reason"] == "error" + assert len(ts.calls) == 2 # stopped after the failed run, never ran #3 + assert loop["failed_runs"] == 1 + + def test_continue_mode_proceeds_past_failure(self, loop_module): + """on_failure='continue' tolerates a failed run and finishes max_runs.""" + ls, db, ts = loop_module + ts.results = [ + _Result(response="r1"), + _Result(status="failed", error="boom", error_code="TIMEOUT"), + _Result(response="r3"), + ] + loop_id = _drive( + ls, + agent_name="a1", + message_template="step {{run}} prev={{previous_response}}", + max_runs=3, + on_failure="continue", + max_consecutive_failures=3, + ) + loop = db.get_loop(loop_id) + assert len(ts.calls) == 3 # all three ran + assert loop["status"] == "completed_with_errors" + assert 
loop["stop_reason"] == "max_runs_reached" + assert loop["failed_runs"] == 1 + assert loop["runs_completed"] == 3 + # {{previous_response}} carries the last *successful* response (r1), + # NOT the failed run-2 response. + assert ts.calls[2]["message"] == "step 3 prev=r1" + + def test_continue_mode_consecutive_cutoff_aborts(self, loop_module): + """Continue mode still terminates once consecutive failures hit the cap.""" + ls, db, ts = loop_module + ts.results = [ + _Result(status="failed", error=f"boom{i}", error_code="AUTH") + for i in range(5) + ] + loop_id = _drive( + ls, + agent_name="a1", + message_template="step {{run}}", + max_runs=5, + on_failure="continue", + max_consecutive_failures=2, + ) + loop = db.get_loop(loop_id) + assert len(ts.calls) == 2 # aborted at the 2nd consecutive failure + assert loop["status"] == "failed" + assert loop["stop_reason"] == "max_consecutive_failures" + assert loop["failed_runs"] == 2 + + def test_continue_mode_tolerates_raised_exception(self, loop_module): + """The raised-exception surface is honored by continue mode too.""" + ls, db, ts = loop_module + ts.results = [ + _Result(response="r1"), + RuntimeError("kaboom"), # raised inside execute_task + _Result(response="r3"), + ] + loop_id = _drive( + ls, agent_name="a1", message_template="step {{run}}", max_runs=3, + on_failure="continue", max_consecutive_failures=3, + ) + loop = db.get_loop(loop_id) + assert len(ts.calls) == 3 + assert loop["status"] == "completed_with_errors" + assert loop["failed_runs"] == 1 + + def test_continue_mode_resets_streak_on_success(self, loop_module): + """A success resets the consecutive-failure counter (alternating runs).""" + ls, db, ts = loop_module + ts.results = [ + _Result(status="failed", error="f1", error_code="AGENT_ERROR"), + _Result(response="ok2"), + _Result(status="failed", error="f3", error_code="AGENT_ERROR"), + _Result(response="ok4"), + _Result(status="failed", error="f5", error_code="AGENT_ERROR"), + ] + loop_id = _drive( + ls, 
agent_name="a1", message_template="step {{run}}", max_runs=5, + on_failure="continue", max_consecutive_failures=2, + ) + loop = db.get_loop(loop_id) + # Never 2 in a row → runs all 5, completes with errors. + assert len(ts.calls) == 5 + assert loop["status"] == "completed_with_errors" + assert loop["failed_runs"] == 3 + + # --------------------------------------------------------------------------- # Runner — until mode # --------------------------------------------------------------------------- From ed9dce275683c5a804423356621e95e989119a11 Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Wed, 24 Jun 2026 15:30:39 +0300 Subject: [PATCH 2/2] fix(loops): honor inter-run delay on the exception failure surface (#1167 review) Continue mode skipped delay_seconds when an iteration *raised* (the exception path `continue`d past the delay block), while a non-success result honored it. Extract the inter-run delay (with its cooperative-stop check) into a helper and call it on both surfaces so continue-mode pacing is consistent. Add a test asserting the delay fires after a raised iteration. --- src/backend/services/loop_service.py | 30 ++++++++++++++++++++-------- tests/unit/test_loop_service.py | 23 +++++++++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/backend/services/loop_service.py b/src/backend/services/loop_service.py index a29b3089..87086685 100644 --- a/src/backend/services/loop_service.py +++ b/src/backend/services/loop_service.py @@ -213,6 +213,18 @@ def _abort_after_failure(err_msg: str): ) return None + async def _inter_run_delay(run_number: int) -> bool: + """Apply the inter-run pause. Returns True if the loop was stopped + (sleep cancelled) so the caller can break. 
Honored on every path + that proceeds to the next iteration — success AND tolerated failure + (#1167), so continue-mode pacing is consistent across surfaces.""" + if loop["delay_seconds"] and run_number < loop["max_runs"]: + try: + await asyncio.sleep(loop["delay_seconds"]) + except asyncio.CancelledError: + return True + return False + try: for run_number in range(1, loop["max_runs"] + 1): # Cooperative stop check BEFORE starting the next iteration. @@ -279,6 +291,10 @@ def _abort_after_failure(err_msg: str): f"continuing (on_failure=continue, " f"{consecutive_failures}/{max_consecutive_failures} consecutive)" ) + if await _inter_run_delay(run_number): + terminal_status = "stopped" + stop_reason = "user_stopped" + break continue elapsed_ms = int( @@ -352,14 +368,12 @@ def _abort_after_failure(err_msg: str): ) # fall through to the inter-run delay, then next iteration - # Inter-run delay — also a stop point. - if loop["delay_seconds"] and run_number < loop["max_runs"]: - try: - await asyncio.sleep(loop["delay_seconds"]) - except asyncio.CancelledError: - terminal_status = "stopped" - stop_reason = "user_stopped" - break + # Inter-run delay — also a stop point (success + tolerated-failure + # paths; the exception path applies it inline before `continue`). + if await _inter_run_delay(run_number): + terminal_status = "stopped" + stop_reason = "user_stopped" + break # #1167: ran to max_runs (or stop-signal matched) in continue mode # with tolerated failures → surface partial success. 
Only promotes diff --git a/tests/unit/test_loop_service.py b/tests/unit/test_loop_service.py index bc5b9714..7228f610 100644 --- a/tests/unit/test_loop_service.py +++ b/tests/unit/test_loop_service.py @@ -387,6 +387,29 @@ def test_continue_mode_tolerates_raised_exception(self, loop_module): assert loop["status"] == "completed_with_errors" assert loop["failed_runs"] == 1 + def test_continue_mode_applies_delay_after_raised_exception(self, loop_module, monkeypatch): + """The exception surface honors delay_seconds too (surface parity).""" + ls, db, ts = loop_module + sleeps: list = [] + + async def _fake_sleep(secs): + sleeps.append(secs) + + monkeypatch.setattr(ls.asyncio, "sleep", _fake_sleep) + ts.results = [ + RuntimeError("boom"), # run 1 raises → delay should still apply + _Result(response="ok2"), # run 2 (last) → no trailing delay + ] + loop_id = _drive( + ls, agent_name="a1", message_template="step {{run}}", max_runs=2, + on_failure="continue", max_consecutive_failures=3, delay_seconds=5, + ) + loop = db.get_loop(loop_id) + assert len(ts.calls) == 2 + assert loop["status"] == "completed_with_errors" + # Delay applied once — after the raised run 1 (run 2 is last, no delay). + assert sleeps == [5] + def test_continue_mode_resets_streak_on_success(self, loop_module): """A success resets the consecutive-failure counter (alternating runs).""" ls, db, ts = loop_module