11 changes: 8 additions & 3 deletions docs/memory/architecture.md
@@ -426,6 +426,8 @@ Backend orchestration in `services/subscription_auto_switch.py`: `_hot_reload_su

Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`.

**Failure policy (#1167):** per-loop `on_failure` — `abort` (default; fail-fast, first failed iteration ends the loop `failed`/`stop_reason=error`) or `continue` (tolerate a failed iteration and proceed). Both failure surfaces are gated: a raised exception from `execute_task` and a non-success `TaskExecutionResult`. Continue mode is bounded by `max_consecutive_failures` (default 3) — once that many iterations fail in a row the loop aborts `failed`/`stop_reason=max_consecutive_failures`; a success resets the streak. A continue-mode loop that reaches `max_runs` (or matches its stop-signal) with ≥1 tolerated failure finalizes as `completed_with_errors`, with the `failed_runs` count surfaced. `{{previous_response}}` always carries the last *successful* response (a failed iteration never overwrites it).
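
The failure policy above can be sketched as a small synchronous stand-in for the runner. This is a minimal illustration, not the code in `loop_service.py`: `run_loop`, `LoopOutcome`, and the `run_once` callback are hypothetical names, only the raised-exception surface is modeled (a non-success `TaskExecutionResult` would take the same branch), and stop-signal handling is omitted.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class LoopOutcome:
    status: str
    stop_reason: str
    failed_runs: int
    last_response: Optional[str]


def run_loop(
    run_once: Callable[[int, Optional[str]], str],
    *,
    max_runs: int,
    on_failure: str = "abort",
    max_consecutive_failures: int = 3,
) -> LoopOutcome:
    """Hypothetical sketch of the abort/continue policy (assumed names)."""
    last_response: Optional[str] = None
    failed_runs = 0  # tolerated failures (continue mode only)
    streak = 0       # failures in a row; reset by any success
    for run in range(1, max_runs + 1):
        try:
            response = run_once(run, last_response)
        except Exception:
            if on_failure == "abort":
                # Fail-fast: the first failed iteration ends the loop.
                return LoopOutcome("failed", "error", failed_runs, last_response)
            failed_runs += 1
            streak += 1
            if streak >= max_consecutive_failures:
                return LoopOutcome(
                    "failed", "max_consecutive_failures", failed_runs, last_response
                )
            continue  # tolerated: last_response is left untouched
        streak = 0
        last_response = response
    status = "completed_with_errors" if failed_runs else "completed"
    return LoopOutcome(status, "max_runs_reached", failed_runs, last_response)


def flaky(run: int, previous: Optional[str]) -> str:
    """Example iteration that fails on runs 2 and 3."""
    if run in (2, 3):
        raise RuntimeError("simulated failure")
    return f"ok-{run}"


tolerant = run_loop(flaky, max_runs=4, on_failure="continue")
strict = run_loop(flaky, max_runs=4)
```

With the default cutoff of 3, the two consecutive failures in `flaky` are tolerated and the streak resets on run 4, so `tolerant` finishes as `completed_with_errors`, while `strict` stops at run 2.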

**Web UI (#1106):** a **Loops** tab on Agent Detail (`components/LoopsPanel.vue` + agent-scoped `stores/loops.js`; `setAgent(name)` on mount, `clear()` on unmount). The global WS handler routes the fleet-wide loop events to the store, which filters by mounted agent and targeted-refreshes only the affected loop; a 12s backstop poll runs while any loop is `queued`/`running` to recover a missed terminal event. Last full response rendered via `utils/markdown.js` (DOMPurify).

### Session Tab
@@ -741,7 +743,7 @@ Coverage: agent lifecycle, auth, sharing, credentials, settings, rename; request
### Sequential Agent Loops (#740)
| Method | Path | Auth | Description |
|--------|------|------|-------------|
| POST | `/api/agents/{name}/loops` | JWT/MCP | Start loop; 202 with `{loop_id, status, agent_name, max_runs}`. Body: `message` (template), `max_runs` (1–100, required), `stop_signal`, `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools` |
| POST | `/api/agents/{name}/loops` | JWT/MCP | Start loop; 202 with `{loop_id, status, agent_name, max_runs}`. Body: `message` (template), `max_runs` (1–100, required), `stop_signal`, `delay_seconds`, `timeout_per_run`, `on_failure` (`abort` default \| `continue`, #1167), `max_consecutive_failures` (continue-mode cutoff, default 3), `model`, `allowed_tools` |
| GET | `/api/agents/{name}/loops` | JWT/MCP | List loops (`?status=`, `?limit=` 1–200 default 50) |
| GET | `/api/loops/{loop_id}` | JWT/MCP | Status + per-run summaries + last full response; 404 unknown, 403 if caller neither initiator nor agent-accessor |
| POST | `/api/loops/{loop_id}/stop` | JWT/MCP | Graceful stop → `{status: "stopping" \| "already_done"}` |
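
For reviewers who want to exercise the new body fields, a start-loop request might be assembled as below. This is a sketch under assumptions: `BASE_URL`, `TOKEN`, the agent name, and the bearer-style `Authorization` header are placeholders not confirmed by this PR, and `build_start_loop_request` is a hypothetical helper that builds the request without sending it.

```python
import json
import urllib.parse
import urllib.request

# Hypothetical deployment values; replace with your own.
BASE_URL = "http://localhost:8000"
TOKEN = "example-jwt"


def build_start_loop_request(agent_name: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to /api/agents/{name}/loops."""
    safe_name = urllib.parse.quote(agent_name, safe="")
    return urllib.request.Request(
        url=f"{BASE_URL}/api/agents/{safe_name}/loops",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumed header format for the JWT auth listed in the table.
            "Authorization": f"Bearer {TOKEN}",
        },
        method="POST",
    )


payload = {
    "message": "Run {{run}}: refine this draft.\n\n{{previous_response}}",
    "max_runs": 5,
    "on_failure": "continue",       # omit to keep the default "abort"
    "max_consecutive_failures": 2,  # continue-mode cutoff, 1-100
}
request = build_start_loop_request("my-agent", payload)
# urllib.request.urlopen(request) would send it; per the table, expect a 202.
```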
@@ -999,11 +1001,14 @@ CREATE TABLE agent_loops (
stop_signal TEXT, -- NULL = fixed mode; set = until mode
delay_seconds INTEGER NOT NULL DEFAULT 0,
timeout_per_run INTEGER, -- NULL = agent's execution_timeout_seconds
on_failure TEXT NOT NULL DEFAULT 'abort', -- #1167: abort (fail-fast) | continue (tolerate failed iterations)
max_consecutive_failures INTEGER NOT NULL DEFAULT 3, -- #1167: continue-mode cutoff (1–100)
model TEXT,
allowed_tools TEXT, -- JSON array
status TEXT NOT NULL, -- queued | running | completed | stopped | failed | interrupted
status TEXT NOT NULL, -- queued | running | completed | completed_with_errors | stopped | failed | interrupted
runs_completed INTEGER NOT NULL DEFAULT 0,
stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | interrupted
failed_runs INTEGER NOT NULL DEFAULT 0, -- #1167: tolerated-failure count (continue mode)
stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | max_consecutive_failures | interrupted
last_response TEXT,
error TEXT,
started_by_user_id INTEGER,
25 changes: 25 additions & 0 deletions docs/memory/requirements.md
@@ -2747,6 +2747,31 @@ Standalone mobile-friendly admin page for managing agents on the go. Designed as
auto-resume after restart; cross-agent loops (`agent` parameter
is `"self"` only for v1, matching `fan_out`).

### 38.2 Configurable Loop Failure Policy (#1167)

**Description**: A per-loop policy controls what happens when an iteration
fails. Default is fail-fast (backward compatible); `continue` mode tolerates a
failed iteration and proceeds, bounded so a fully-broken agent still terminates.

- **FR-1 — `on_failure`**: `abort` (default — first failed iteration ends the
loop as `failed`/`stop_reason=error`, current behavior) or `continue`.
- **FR-2 — `max_consecutive_failures`** (default 3, range 1–100): in `continue`
mode the loop aborts as `failed` with `stop_reason=max_consecutive_failures`
once this many iterations fail in a row; a success resets the streak.
- **FR-3 — Both failure surfaces** honored: a raised exception from
`execute_task` AND a non-success `TaskExecutionResult` (TIMEOUT / AGENT_ERROR
/ CIRCUIT_OPEN / AUTH). Each failed iteration finalizes its `agent_loop_runs`
row as `failed`, then (continue mode) the loop proceeds to the next run.
- **FR-4 — Terminal status**: a continue-mode loop that reaches `max_runs` (or
matches its stop-signal) with ≥1 tolerated failure finalizes as
`completed_with_errors`; the `failed_runs` count is surfaced on the loop row
and API/UI.
- **FR-5 — `{{previous_response}}`**: carries the last *successful* response — a
failed iteration does not overwrite it.
- **FR-6 — Plumbed through all surfaces** (Invariant #13): `agent_loops` schema
+ migration, `POST /api/agents/{name}/loops`, MCP `run_agent_loop`, and the
Loops panel UI. Unset = `abort`, a strict no-op for existing callers.
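
FR-5 can be illustrated with a small template-rendering sketch. The helper `render_message` and the choice to render an empty string before the first success are assumptions made for illustration; the PR does not show the actual substitution code.

```python
from typing import List, Optional


def render_message(template: str, run: int, previous_response: Optional[str]) -> str:
    """Substitute the two placeholders the loop message template supports."""
    return (
        template
        .replace("{{run}}", str(run))
        # Assumption: no successful response yet renders as an empty string.
        .replace("{{previous_response}}", previous_response or "")
    )


template = "Iteration {{run}}. Previous answer: {{previous_response}}"
outcomes = ["draft A", None, "draft B"]  # None marks a failed iteration

last_successful: Optional[str] = None
rendered: List[str] = []
for run, outcome in enumerate(outcomes, start=1):
    rendered.append(render_message(template, run, last_successful))
    if outcome is not None:
        # Only a success replaces the carried-forward response.
        last_successful = outcome
```

Run 3 still sees `draft A` here, because the failed run 2 never overwrote the carried response.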

---

## 39. VoIP Telephony (VOIP-001)
6 changes: 4 additions & 2 deletions src/backend/database.py
@@ -2283,14 +2283,16 @@ def get_loop(self, loop_id: str):
def mark_loop_running(self, loop_id: str):
return self._loop_ops.mark_loop_running(loop_id)

def update_loop_progress(self, loop_id: str, *, runs_completed: int, last_response):
def update_loop_progress(self, loop_id: str, *, runs_completed: int, last_response, failed_runs=None):
return self._loop_ops.update_loop_progress(
loop_id, runs_completed=runs_completed, last_response=last_response,
failed_runs=failed_runs,
)

def finalize_loop(self, loop_id: str, *, status: str, stop_reason: str, error=None):
def finalize_loop(self, loop_id: str, *, status: str, stop_reason: str, error=None, failed_runs=None):
return self._loop_ops.finalize_loop(
loop_id, status=status, stop_reason=stop_reason, error=error,
failed_runs=failed_runs,
)

def list_loops_for_agent(self, agent_name: str, *, status=None, limit: int = 50):
53 changes: 43 additions & 10 deletions src/backend/db/loops.py
@@ -18,7 +18,11 @@


# Terminal statuses for restart-recovery and stop_loop logic.
TERMINAL_STATUSES = {"completed", "stopped", "failed", "interrupted"}
# `completed_with_errors` (#1167): continue-mode loop that reached max_runs (or
# matched its stop signal) with at least one tolerated failed iteration.
TERMINAL_STATUSES = {
"completed", "completed_with_errors", "stopped", "failed", "interrupted",
}


def _loop_row_to_dict(row) -> dict:
@@ -30,10 +34,13 @@ def _loop_row_to_dict(row) -> dict:
"stop_signal": row["stop_signal"],
"delay_seconds": row["delay_seconds"],
"timeout_per_run": row["timeout_per_run"],
"on_failure": row["on_failure"],
"max_consecutive_failures": row["max_consecutive_failures"],
"model": row["model"],
"allowed_tools": json.loads(row["allowed_tools"]) if row["allowed_tools"] else None,
"status": row["status"],
"runs_completed": row["runs_completed"],
"failed_runs": row["failed_runs"],
"stop_reason": row["stop_reason"],
"last_response": row["last_response"],
"error": row["error"],
@@ -78,6 +85,8 @@ def create_loop(
stop_signal: Optional[str] = None,
delay_seconds: int = 0,
timeout_per_run: Optional[int] = None,
on_failure: str = "abort",
max_consecutive_failures: int = 3,
model: Optional[str] = None,
allowed_tools: Optional[List[str]] = None,
started_by_user_id: Optional[int] = None,
@@ -99,10 +108,13 @@ def create_loop(
stop_signal=stop_signal,
delay_seconds=delay_seconds,
timeout_per_run=timeout_per_run,
on_failure=on_failure,
max_consecutive_failures=max_consecutive_failures,
model=model,
allowed_tools=allowed_tools_json,
status="queued",
runs_completed=0,
failed_runs=0,
stop_reason=None,
last_response=None,
error=None,
@@ -126,10 +138,13 @@ def create_loop(
"stop_signal": stop_signal,
"delay_seconds": delay_seconds,
"timeout_per_run": timeout_per_run,
"on_failure": on_failure,
"max_consecutive_failures": max_consecutive_failures,
"model": model,
"allowed_tools": allowed_tools,
"status": "queued",
"runs_completed": 0,
"failed_runs": 0,
"stop_reason": None,
"last_response": None,
"error": None,
@@ -165,12 +180,22 @@ def update_loop_progress(
*,
runs_completed: int,
last_response: Optional[str],
failed_runs: Optional[int] = None,
) -> None:
"""Bump runs_completed + last_response after each iteration."""
"""Bump runs_completed + last_response after each iteration.

`failed_runs` (#1167) is written only when provided, so the success
path can omit it. `last_response` carries the last *successful* response
even on a tolerated-failure iteration (continue mode), preserving
`{{previous_response}}` semantics.
"""
values: dict = {"runs_completed": runs_completed, "last_response": last_response}
if failed_runs is not None:
values["failed_runs"] = failed_runs
stmt = (
update(agent_loops)
.where(agent_loops.c.id == loop_id)
.values(runs_completed=runs_completed, last_response=last_response)
.values(**values)
)
with get_engine().begin() as conn:
conn.execute(stmt)
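
The "write `failed_runs` only when provided" pattern in this hunk can be tried in isolation. The sketch below swaps SQLAlchemy Core for the standard-library `sqlite3` module so it runs without dependencies; the trimmed-down table and this standalone `update_loop_progress` are illustrative stand-ins, not the project's code.

```python
import sqlite3
from typing import Optional


def update_loop_progress(
    conn: sqlite3.Connection,
    loop_id: str,
    *,
    runs_completed: int,
    last_response: Optional[str],
    failed_runs: Optional[int] = None,
) -> None:
    """Update progress columns; leave failed_runs untouched unless provided."""
    values = {"runs_completed": runs_completed, "last_response": last_response}
    if failed_runs is not None:
        values["failed_runs"] = failed_runs
    # Column names come from the fixed dict keys above, never from user input.
    assignments = ", ".join(f"{column} = ?" for column in values)
    conn.execute(
        f"UPDATE agent_loops SET {assignments} WHERE id = ?",
        (*values.values(), loop_id),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_loops ("
    "id TEXT PRIMARY KEY, "
    "runs_completed INTEGER NOT NULL DEFAULT 0, "
    "failed_runs INTEGER NOT NULL DEFAULT 0, "
    "last_response TEXT)"
)
conn.execute("INSERT INTO agent_loops (id) VALUES ('loop-1')")

# A tolerated failure records the count; the later success omits it.
update_loop_progress(conn, "loop-1", runs_completed=1, last_response="ok", failed_runs=1)
update_loop_progress(conn, "loop-1", runs_completed=2, last_response="ok again")

row = conn.execute(
    "SELECT runs_completed, failed_runs, last_response "
    "FROM agent_loops WHERE id = 'loop-1'"
).fetchone()
```

The second call leaves `failed_runs` at 1, which is the behavior the docstring describes for the success path.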
@@ -182,19 +207,27 @@ def finalize_loop(
status: str,
stop_reason: str,
error: Optional[str] = None,
failed_runs: Optional[int] = None,
) -> None:
"""Set terminal status + stop_reason + completed_at."""
"""Set terminal status + stop_reason + completed_at.

`failed_runs` (#1167) writes the authoritative tolerated-failure count
when provided.
"""
if status not in TERMINAL_STATUSES:
raise ValueError(f"finalize_loop requires terminal status, got '{status}'")
values: dict = {
"status": status,
"stop_reason": stop_reason,
"error": error,
"completed_at": utc_now_iso(),
}
if failed_runs is not None:
values["failed_runs"] = failed_runs
stmt = (
update(agent_loops)
.where(agent_loops.c.id == loop_id)
.values(
status=status,
stop_reason=stop_reason,
error=error,
completed_at=utc_now_iso(),
)
.values(**values)
)
with get_engine().begin() as conn:
conn.execute(stmt)
23 changes: 23 additions & 0 deletions src/backend/db/migrations.py
@@ -2407,6 +2407,28 @@ def _migrate_agent_loops_tables(cursor, conn):
conn.commit()


def _migrate_agent_loops_failure_policy(cursor, conn):
"""Add per-loop failure-policy columns to agent_loops (#1167).

`on_failure` ('abort'|'continue', default 'abort' = current fail-fast
behavior), `max_consecutive_failures` (bounds continue mode), and a
`failed_runs` counter for the terminal summary.
"""
_safe_add_column(
cursor, "agent_loops", "on_failure",
"ALTER TABLE agent_loops ADD COLUMN on_failure TEXT NOT NULL DEFAULT 'abort'",
)
_safe_add_column(
cursor, "agent_loops", "max_consecutive_failures",
"ALTER TABLE agent_loops ADD COLUMN max_consecutive_failures INTEGER NOT NULL DEFAULT 3",
)
_safe_add_column(
cursor, "agent_loops", "failed_runs",
"ALTER TABLE agent_loops ADD COLUMN failed_runs INTEGER NOT NULL DEFAULT 0",
)
conn.commit()
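
The body of `_safe_add_column` is not part of this diff. One plausible shape for such a helper, assuming a SQLite backend, is sketched below; `safe_add_column` is a hypothetical name and the check via `PRAGMA table_info` is an assumption about how idempotency could be achieved, not the project's implementation.

```python
import sqlite3


def safe_add_column(cursor: sqlite3.Cursor, table: str, column: str, ddl: str) -> bool:
    """Run `ddl` only if `column` is missing from `table`; return True if it ran."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing = {row[1] for row in cursor.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    cursor.execute(ddl)
    return True


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE agent_loops (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
cur.execute("INSERT INTO agent_loops (id, status) VALUES ('old-loop', 'completed')")

ddl = "ALTER TABLE agent_loops ADD COLUMN on_failure TEXT NOT NULL DEFAULT 'abort'"
first = safe_add_column(cur, "agent_loops", "on_failure", ddl)   # column added
second = safe_add_column(cur, "agent_loops", "on_failure", ddl)  # no-op on re-run
conn.commit()

backfilled = cur.execute(
    "SELECT on_failure FROM agent_loops WHERE id = 'old-loop'"
).fetchone()[0]
```

Because the new column is `NOT NULL DEFAULT 'abort'`, rows created before the migration read back as `abort`, which matches the backward-compatibility claim in the docstring.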


def _migrate_users_suspended_at(cursor, conn):
"""#995 — user deactivation primitive.

@@ -2564,4 +2586,5 @@ def _migrate_agent_compatibility_results_table(cursor, conn):
("operator_queue_cleared_at", _migrate_operator_queue_cleared_at),
("activities_created_index", _migrate_activities_created_index),
("agent_compatibility_results_table", _migrate_agent_compatibility_results_table),
("agent_loops_failure_policy", _migrate_agent_loops_failure_policy),
]
3 changes: 3 additions & 0 deletions src/backend/db/schema.py
@@ -247,10 +247,13 @@
stop_signal TEXT,
delay_seconds INTEGER NOT NULL DEFAULT 0,
timeout_per_run INTEGER,
on_failure TEXT NOT NULL DEFAULT 'abort',
max_consecutive_failures INTEGER NOT NULL DEFAULT 3,
model TEXT,
allowed_tools TEXT,
status TEXT NOT NULL,
runs_completed INTEGER NOT NULL DEFAULT 0,
failed_runs INTEGER NOT NULL DEFAULT 0,
stop_reason TEXT,
last_response TEXT,
error TEXT,
3 changes: 3 additions & 0 deletions src/backend/db/tables.py
@@ -232,10 +232,13 @@ def process_bind_param(self, value, dialect):
Column("stop_signal", Text),
Column("delay_seconds", Integer),
Column("timeout_per_run", Integer),
Column("on_failure", Text),
Column("max_consecutive_failures", Integer),
Column("model", Text),
Column("allowed_tools", Text),
Column("status", Text),
Column("runs_completed", Integer),
Column("failed_runs", Integer),
Column("stop_reason", Text),
Column("last_response", Text),
Column("error", Text),
19 changes: 18 additions & 1 deletion src/backend/routers/loops.py
@@ -8,7 +8,7 @@
"""

import logging
from typing import List, Optional
from typing import List, Literal, Optional

from fastapi import APIRouter, Depends, HTTPException, Header
from pydantic import BaseModel, Field, field_validator
@@ -36,6 +36,7 @@
MAX_DELAY_SECONDS = 3600
MAX_TIMEOUT_PER_RUN = 7200
MAX_STOP_SIGNAL_LEN = 200
MAX_CONSECUTIVE_FAILURES_LIMIT = 100


class StartLoopRequest(BaseModel):
@@ -44,6 +45,12 @@ class StartLoopRequest(BaseModel):
stop_signal: Optional[str] = Field(default=None, max_length=MAX_STOP_SIGNAL_LEN)
delay_seconds: int = Field(default=0, ge=0, le=MAX_DELAY_SECONDS)
timeout_per_run: Optional[int] = Field(default=None, ge=10, le=MAX_TIMEOUT_PER_RUN)
# #1167: failure policy. 'abort' (default) = fail-fast, backward compatible;
# 'continue' tolerates failed iterations up to max_consecutive_failures.
on_failure: Literal["abort", "continue"] = "abort"
max_consecutive_failures: int = Field(
default=3, ge=1, le=MAX_CONSECUTIVE_FAILURES_LIMIT
)
model: Optional[str] = None
allowed_tools: Optional[List[str]] = None

@@ -61,6 +68,7 @@ class StartLoopResponse(BaseModel):
status: str
agent_name: str
max_runs: int
on_failure: str = "abort"


class LoopRunResponse(BaseModel):
@@ -81,6 +89,9 @@ class LoopStatusResponse(BaseModel):
status: str
max_runs: int
runs_completed: int
failed_runs: int = 0
on_failure: str = "abort"
max_consecutive_failures: int = 3
stop_reason: Optional[str] = None
last_response: Optional[str] = None
error: Optional[str] = None
@@ -126,6 +137,9 @@ def _build_status_response(loop: dict) -> LoopStatusResponse:
status=loop["status"],
max_runs=loop["max_runs"],
runs_completed=loop["runs_completed"],
failed_runs=loop.get("failed_runs", 0) or 0,
on_failure=loop.get("on_failure") or "abort",
max_consecutive_failures=loop.get("max_consecutive_failures") or 3,
stop_reason=loop["stop_reason"],
last_response=loop["last_response"],
error=loop["error"],
@@ -170,6 +184,8 @@ async def start_loop(
stop_signal=payload.stop_signal,
delay_seconds=payload.delay_seconds,
timeout_per_run=payload.timeout_per_run,
on_failure=payload.on_failure,
max_consecutive_failures=payload.max_consecutive_failures,
model=payload.model,
allowed_tools=payload.allowed_tools,
started_by_user_id=current_user.id,
@@ -183,6 +199,7 @@ async def start_loop(
status=loop_row["status"],
agent_name=name,
max_runs=payload.max_runs,
on_failure=payload.on_failure,
)

