From 958eacfa5c3fd90efa6bb46b8cc810552786a2e8 Mon Sep 17 00:00:00 2001 From: hieptl Date: Tue, 9 Jun 2026 23:26:27 +0700 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20mark=20rate-limited=20automation=20?= =?UTF-8?q?runs=20as=20Skipped=20=E2=80=93=20Limit=20Reached?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../detail/run-status-badge.test.tsx | 7 ++ .../automations/detail/run-status-badge.tsx | 4 + frontend/src/i18n/translation.json | 17 +++ frontend/src/types/automation.ts | 1 + openhands/automation/backends/cloud.py | 30 +++++ openhands/automation/dispatcher.py | 14 ++- openhands/automation/exceptions.py | 11 ++ openhands/automation/models.py | 1 + openhands/automation/schemas.py | 1 + openhands/automation/utils/run.py | 1 + tests/test_backends.py | 101 ++++++++++++++++ tests/test_dispatcher.py | 110 ++++++++++++++++++ 12 files changed, 297 insertions(+), 1 deletion(-) diff --git a/frontend/src/__tests__/components/automations/detail/run-status-badge.test.tsx b/frontend/src/__tests__/components/automations/detail/run-status-badge.test.tsx index 63f2a73..fdd62f9 100644 --- a/frontend/src/__tests__/components/automations/detail/run-status-badge.test.tsx +++ b/frontend/src/__tests__/components/automations/detail/run-status-badge.test.tsx @@ -25,4 +25,11 @@ describe("RunStatusBadge", () => { render(); expect(screen.getByText("AUTOMATIONS$DETAIL$RUNNING")).toBeInTheDocument(); }); + + it("renders skipped/limit-reached label for skipped status", () => { + render(); + expect( + screen.getByText("AUTOMATIONS$DETAIL$SKIPPED_LIMIT_REACHED"), + ).toBeInTheDocument(); + }); }); diff --git a/frontend/src/components/automations/detail/run-status-badge.tsx b/frontend/src/components/automations/detail/run-status-badge.tsx index 442ba00..6930090 100644 --- a/frontend/src/components/automations/detail/run-status-badge.tsx +++ b/frontend/src/components/automations/detail/run-status-badge.tsx @@ -30,6 +30,10 @@ const statusConfig: Record< label: I18nKey.AUTOMATIONS$DETAIL$RUNNING, style: "border-border bg-surface-elevated text-content-muted", }, + [AutomationRunStatus.SKIPPED]: { + label: I18nKey.AUTOMATIONS$DETAIL$SKIPPED_LIMIT_REACHED, + style: "border-border bg-surface-elevated text-content-muted", + }, }; function StatusIcon({ status }: { status: AutomationRunStatus }) { diff --git a/frontend/src/i18n/translation.json b/frontend/src/i18n/translation.json index 33f71e2..e54cfba 100644 --- a/frontend/src/i18n/translation.json +++ b/frontend/src/i18n/translation.json @@ -696,6 +696,23 @@ "tr": "Zamanlama", "uk": "Розклад" }, + "AUTOMATIONS$DETAIL$SKIPPED_LIMIT_REACHED": { + "en": "Skipped – Limit Reached", + "ja": "スキップ – 上限に到達", + "zh-CN": "已跳过 – 已达上限", + "zh-TW": "已略過 – 已達上限", + "ko-KR": "건너뜀 – 한도 도달", + "no": "Hoppet over – grense nådd", + "ar": "تم التخطّي – تم بلوغ الحد", + "de": "Übersprungen – Limit erreicht", + "fr": "Ignoré – limite atteinte", + "it": "Saltato – limite raggiunto", + "pt": "Ignorado – limite atingido", + "es": "Omitido – límite alcanzado", + "ca": "Omès – límit assolit", + "tr": "Atlandı – sınıra ulaşıldı", + "uk": "Пропущено – ліміт досягнуто" + }, "AUTOMATIONS$DETAIL$SUCCESSFUL": { "en": "Successful", "ja": "成功", diff --git a/frontend/src/types/automation.ts b/frontend/src/types/automation.ts index f10a57e..583450a 100644 --- a/frontend/src/types/automation.ts +++ b/frontend/src/types/automation.ts @@ -34,6 +34,7 @@ export enum AutomationRunStatus { RUNNING = "RUNNING", COMPLETED = "COMPLETED", FAILED = "FAILED", + SKIPPED = "SKIPPED", } export interface AutomationRun { diff --git a/openhands/automation/backends/cloud.py b/openhands/automation/backends/cloud.py index 5a4665a..9ed3439 100644 --- a/openhands/automation/backends/cloud.py +++ b/openhands/automation/backends/cloud.py @@ -21,6 +21,7 @@ from openhands.automation.backends.base import ExecutionBackend, ExecutionContext from openhands.automation.config import get_config +from openhands.automation.exceptions import ConcurrencyLimitReachedError from openhands.automation.models import AutomationRun from openhands.automation.utils.api_key import get_api_key_for_automation_run from openhands.automation.utils.sandbox import ( @@ -52,6 +53,27 @@ def _is_auth_error(exc: BaseException) -> bool: return False +def _concurrency_limit_detail(resp: httpx.Response) -> dict | None: + """Return the CONCURRENCY_LIMIT_REACHED detail dict if the response is the + organization concurrent-sandbox limit 429, else None. + + The OpenHands API raises this as a FastAPI HTTPException, so the body is + ``{"detail": {"error": "CONCURRENCY_LIMIT_REACHED", ...}}``; we also tolerate + a flat ``{"error": ...}`` shape. This is distinct from a transient + rate-limit 429, which should still be retried (see ``_is_rate_limit_error``). + """ + if resp.status_code != 429: + return None + try: + body = resp.json() + except Exception: + return None + detail = body.get("detail", body) if isinstance(body, dict) else None + if isinstance(detail, dict) and detail.get("error") == "CONCURRENCY_LIMIT_REACHED": + return detail + return None + + class CloudSandboxBackend(ExecutionBackend): """Execution backend that creates Cloud sandboxes per run. @@ -269,6 +291,14 @@ async def _do_create(): resp = await client.post( f"{self.api_url}/api/v1/sandboxes", headers=headers ) + # A concurrency-limit 429 is not transient: retrying won't free a + # slot. Raise a non-HTTPStatusError so the retry predicate skips it + # and the dispatcher can mark the run SKIPPED instead of FAILED. + detail = _concurrency_limit_detail(resp) + if detail is not None: + raise ConcurrencyLimitReachedError( + detail.get("message") or "Concurrency limit reached" + ) resp.raise_for_status() return resp.json()["id"] diff --git a/openhands/automation/dispatcher.py b/openhands/automation/dispatcher.py index 0fd02fe..2fb5881 100644 --- a/openhands/automation/dispatcher.py +++ b/openhands/automation/dispatcher.py @@ -29,7 +29,11 @@ from openhands.automation.backends import get_backend from openhands.automation.config import ServiceSettings, get_config from openhands.automation.db import using_sqlite -from openhands.automation.exceptions import PermanentDispatchError, TarballNotFoundError +from openhands.automation.exceptions import ( + ConcurrencyLimitReachedError, + PermanentDispatchError, + TarballNotFoundError, +) from openhands.automation.execution import execute_in_context from openhands.automation.models import ( Automation, @@ -196,6 +200,14 @@ async def _fail(error: str, disable: bool = False) -> None: # Note: This also initializes backend state (e.g., API key for cloud mode) try: ctx = await backend.get_execution_context(client) + except ConcurrencyLimitReachedError as exc: + logger.warning( + "Run skipped — organization concurrency limit reached: %s", + exc, + extra=_log_ctx(), + ) + await mark_run_terminal(session_factory, run, AutomationRunStatus.SKIPPED) + return except Exception: logger.exception("Failed to get execution context", extra=_log_ctx()) await _fail("Failed to get execution context") diff --git a/openhands/automation/exceptions.py b/openhands/automation/exceptions.py index c0a7e93..e4b7e0b 100644 --- a/openhands/automation/exceptions.py +++ b/openhands/automation/exceptions.py @@ -27,3 +27,14 @@ class TarballNotFoundError(PermanentDispatchError): """ pass + + +class ConcurrencyLimitReachedError(Exception): + """The organization/workspace has reached its concurrent-sandbox limit. + + Unlike PermanentDispatchError this is a *transient*, org-level condition: a + later run may succeed once a concurrent slot frees up. The run is marked + SKIPPED (not FAILED) and the automation is left enabled. + """ + + pass diff --git a/openhands/automation/models.py b/openhands/automation/models.py index d16766c..c9298c4 100644 --- a/openhands/automation/models.py +++ b/openhands/automation/models.py @@ -41,6 +41,7 @@ class AutomationRunStatus(enum.Enum): COMPLETED = "COMPLETED" FAILED = "FAILED" CANCELLED = "CANCELLED" + SKIPPED = "SKIPPED" class Automation(Base): diff --git a/openhands/automation/schemas.py b/openhands/automation/schemas.py index ee9e2fa..aa4f9d1 100644 --- a/openhands/automation/schemas.py +++ b/openhands/automation/schemas.py @@ -211,6 +211,7 @@ class RunStatus(StrEnum): COMPLETED = "COMPLETED" FAILED = "FAILED" CANCELLED = "CANCELLED" + SKIPPED = "SKIPPED" def _validate_command_string( diff --git a/openhands/automation/utils/run.py b/openhands/automation/utils/run.py index cc06ff2..46d873b 100644 --- a/openhands/automation/utils/run.py +++ b/openhands/automation/utils/run.py @@ -149,6 +149,7 @@ async def mark_run_status( AutomationRunStatus.COMPLETED, AutomationRunStatus.FAILED, AutomationRunStatus.CANCELLED, + AutomationRunStatus.SKIPPED, ): values["completed_at"] = now run.completed_at = now diff --git a/tests/test_backends.py b/tests/test_backends.py index 7c3034e..1c4b22c 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -2,6 +2,7 @@ from unittest.mock import AsyncMock, MagicMock, patch +import httpx import pytest from openhands.automation.backends import ( @@ -10,6 +11,8 @@ LocalAgentServerBackend, get_backend, ) +from openhands.automation.backends.cloud import _concurrency_limit_detail +from openhands.automation.exceptions import ConcurrencyLimitReachedError class TestExecutionContext: @@ -469,3 +472,101 @@ def test_cloud_mode(self, monkeypatch, mock_run): backend = get_backend(mock_run) assert isinstance(backend, CloudSandboxBackend) assert backend.api_url == "https://app.all-hands.dev" + + +class TestConcurrencyLimitDetection: + """Tests for `_concurrency_limit_detail`, the discriminator that tells an + organization concurrency-limit 429 (→ mark run SKIPPED) apart from a + transient rate-limit 429 (→ retry as before).""" + + @staticmethod + def _resp(status: int, *, json=None, raw: bytes | None = None) -> httpx.Response: + req = httpx.Request("POST", "https://app.all-hands.dev/api/v1/sandboxes") + if raw is not None: + return httpx.Response(status, request=req, content=raw) + return httpx.Response(status, request=req, json=json) + + def test_detects_nested_fastapi_detail(self): + """The real shape: FastAPI nests the HTTPException detail under "detail".""" + resp = self._resp( + 429, + json={ + "detail": { + "error": "CONCURRENCY_LIMIT_REACHED", + "message": "You have reached your limit of 3 ...", + "limit": 3, + "current": 3, + } + }, + ) + detail = _concurrency_limit_detail(resp) + assert detail is not None + assert detail["limit"] == 3 + + def test_detects_flat_detail(self): + """A non-nested {"error": ...} body is also tolerated.""" + resp = self._resp(429, json={"error": "CONCURRENCY_LIMIT_REACHED"}) + assert _concurrency_limit_detail(resp) is not None + + def test_ignores_transient_rate_limit_429(self): + """A generic 429 with a string detail is a transient rate limit.""" + resp = self._resp(429, json={"detail": "Rate limited, slow down"}) + assert _concurrency_limit_detail(resp) is None + + def test_ignores_429_without_marker(self): + """A 429 whose detail lacks the marker is not a concurrency limit.""" + resp = self._resp(429, json={"detail": {"error": "SOMETHING_ELSE"}}) + assert _concurrency_limit_detail(resp) is None + + def test_ignores_non_json_429(self): + """A non-JSON 429 body never matches (and does not raise).""" + resp = self._resp(429, raw=b"too many requests") + assert _concurrency_limit_detail(resp) is None + + def test_ignores_non_429(self): + """Only 429 responses can be a concurrency limit.""" + resp = self._resp(200, json={"id": "sandbox-abc"}) + assert _concurrency_limit_detail(resp) is None + + +class TestCloudSandboxConcurrencyLimit: + """Tests that `_create_sandbox` surfaces the org concurrency limit as + `ConcurrencyLimitReachedError` without retrying it.""" + + @pytest.fixture + def mock_run(self): + run = MagicMock() + run.sandbox_id = None + run.keep_alive = False + run.bash_command_id = None + return run + + @pytest.mark.asyncio + async def test_create_sandbox_raises_and_does_not_retry(self, mock_run): + """A concurrency-limit 429 raises ConcurrencyLimitReachedError on the + first attempt — retrying cannot free a slot, so it must not be retried.""" + backend = CloudSandboxBackend(api_url="https://app.all-hands.dev", run=mock_run) + + req = httpx.Request("POST", "https://app.all-hands.dev/api/v1/sandboxes") + resp = httpx.Response( + 429, + request=req, + json={ + "detail": { + "error": "CONCURRENCY_LIMIT_REACHED", + "message": "Reached limit of 3 concurrent conversations.", + "limit": 3, + "current": 3, + } + }, + ) + client = MagicMock() + client.post = AsyncMock(return_value=resp) + + with pytest.raises( + ConcurrencyLimitReachedError, match="concurrent conversations" + ): + await backend._create_sandbox(client, {"Authorization": "Bearer x"}) + + # No retry: the sandbox API was hit exactly once. + assert client.post.await_count == 1 diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index fc81824..b7502de 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -11,12 +11,15 @@ import pytest from sqlalchemy import select +from sqlalchemy.orm import selectinload from openhands.automation.dispatcher import ( _build_event_payload, + _execute_run, dispatch_pending_runs, dispatcher_loop, ) +from openhands.automation.exceptions import ConcurrencyLimitReachedError from openhands.automation.models import Automation, AutomationRun, AutomationRunStatus from openhands.automation.utils import utcnow from openhands.automation.utils.run import mark_run_status @@ -689,3 +692,110 @@ def test_empty_dict_trigger(self): assert payload["trigger"] == "unknown" assert payload["trigger_payload"] == {} + + +class TestExecuteRunConcurrencyLimit: + """When the org/workspace is at its concurrent-sandbox limit, the run is + marked SKIPPED (not FAILED) and the automation is left enabled.""" + + async def _make_running_run(self, async_session_factory): + """Create an automation + a RUNNING run (as the dispatcher leaves it + right before calling get_execution_context), with the automation + relationship eagerly loaded for _execute_run.""" + async with async_session_factory() as session: + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Test", + trigger={"type": "cron", "schedule": "* * * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run main.py", + enabled=True, + ) + session.add(automation) + await session.commit() + + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.RUNNING, + started_at=utcnow(), + ) + session.add(run) + await session.commit() + run_id = run.id + automation_id = automation.id + + async with async_session_factory() as session: + run = ( + await session.execute( + select(AutomationRun) + .options(selectinload(AutomationRun.automation)) + .where(AutomationRun.id == run_id) + ) + ).scalars().first() + return run, run_id, automation_id + + async def test_concurrency_limit_marks_skipped_and_keeps_enabled( + self, async_session_factory, mock_settings, mock_client + ): + """A ConcurrencyLimitReachedError from get_execution_context marks the + run SKIPPED (with completed_at, no error_detail) and does NOT disable + the automation.""" + run, run_id, automation_id = await self._make_running_run(async_session_factory) + + backend = MagicMock() + backend.get_execution_context = AsyncMock( + side_effect=ConcurrencyLimitReachedError( + "You have reached your limit of 3 concurrent conversations." + ) + ) + backend.release_context = AsyncMock() + + with patch( + "openhands.automation.dispatcher.get_backend", return_value=backend + ): + await _execute_run(run, mock_settings, async_session_factory, mock_client) + + async with async_session_factory() as session: + updated = ( + await session.execute( + select(AutomationRun).where(AutomationRun.id == run_id) + ) + ).scalars().first() + assert updated.status == AutomationRunStatus.SKIPPED + assert updated.completed_at is not None + assert updated.error_detail is None # SKIPPED is not a failure + + auto = ( + await session.execute( + select(Automation).where(Automation.id == automation_id) + ) + ).scalars().first() + assert auto.enabled is True # transient org-level condition: not disabled + + # No execution context was acquired, so there is nothing to release. + backend.release_context.assert_not_called() + + async def test_generic_context_failure_still_marks_failed( + self, async_session_factory, mock_settings, mock_client + ): + """Regression: a non-concurrency failure in get_execution_context still + marks the run FAILED — the new SKIPPED branch must not swallow it.""" + run, run_id, _ = await self._make_running_run(async_session_factory) + + backend = MagicMock() + backend.get_execution_context = AsyncMock(side_effect=RuntimeError("boom")) + backend.release_context = AsyncMock() + + with patch( + "openhands.automation.dispatcher.get_backend", return_value=backend + ): + await _execute_run(run, mock_settings, async_session_factory, mock_client) + + async with async_session_factory() as session: + updated = ( + await session.execute( + select(AutomationRun).where(AutomationRun.id == run_id) + ) + ).scalars().first() + assert updated.status == AutomationRunStatus.FAILED From 861f28adc4a1b57090bee0529122cb2b90a40d9e Mon Sep 17 00:00:00 2001 From: hieptl Date: Tue, 9 Jun 2026 23:29:58 +0700 Subject: [PATCH 2/2] fix: lint --- tests/test_dispatcher.py | 52 ++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index b7502de..ee16169 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -727,12 +727,16 @@ async def _make_running_run(self, async_session_factory): async with async_session_factory() as session: run = ( - await session.execute( - select(AutomationRun) - .options(selectinload(AutomationRun.automation)) - .where(AutomationRun.id == run_id) + ( + await session.execute( + select(AutomationRun) + .options(selectinload(AutomationRun.automation)) + .where(AutomationRun.id == run_id) + ) ) - ).scalars().first() + .scalars() + .first() + ) return run, run_id, automation_id async def test_concurrency_limit_marks_skipped_and_keeps_enabled( @@ -751,26 +755,32 @@ async def test_concurrency_limit_marks_skipped_and_keeps_enabled( ) backend.release_context = AsyncMock() - with patch( - "openhands.automation.dispatcher.get_backend", return_value=backend - ): + with patch("openhands.automation.dispatcher.get_backend", return_value=backend): await _execute_run(run, mock_settings, async_session_factory, mock_client) async with async_session_factory() as session: updated = ( - await session.execute( - select(AutomationRun).where(AutomationRun.id == run_id) + ( + await session.execute( + select(AutomationRun).where(AutomationRun.id == run_id) + ) ) - ).scalars().first() + .scalars() + .first() + ) assert updated.status == AutomationRunStatus.SKIPPED assert updated.completed_at is not None assert updated.error_detail is None # SKIPPED is not a failure auto = ( - await session.execute( - select(Automation).where(Automation.id == automation_id) + ( + await session.execute( + select(Automation).where(Automation.id == automation_id) + ) ) - ).scalars().first() + .scalars() + .first() + ) assert auto.enabled is True # transient org-level condition: not disabled # No execution context was acquired, so there is nothing to release. @@ -787,15 +797,17 @@ async def test_generic_context_failure_still_marks_failed( backend.get_execution_context = AsyncMock(side_effect=RuntimeError("boom")) backend.release_context = AsyncMock() - with patch( - "openhands.automation.dispatcher.get_backend", return_value=backend - ): + with patch("openhands.automation.dispatcher.get_backend", return_value=backend): await _execute_run(run, mock_settings, async_session_factory, mock_client) async with async_session_factory() as session: updated = ( - await session.execute( - select(AutomationRun).where(AutomationRun.id == run_id) + ( + await session.execute( + select(AutomationRun).where(AutomationRun.id == run_id) + ) ) - ).scalars().first() + .scalars() + .first() + ) assert updated.status == AutomationRunStatus.FAILED