Skip to content

feat(a2a): expose CUGA over A2A v0.3 + working two-CUGA demo#323

Open
laredo wants to merge 3 commits into
mainfrom
feat/a2a-server
Open

feat(a2a): expose CUGA over A2A v0.3 + working two-CUGA demo#323
laredo wants to merge 3 commits into
mainfrom
feat/a2a-server

Conversation

@laredo

@laredo laredo commented Jun 9, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • Adds an opt-in A2A server module (src/cuga/backend/server/a2a/) that exposes CUGA over A2A v0.3 (JSON-RPC) — /.well-known/agent.json, /.well-known/agent-card.json, and a /a2a endpoint supporting message/send and SSE message/stream. Mounted only when settings.a2a.enabled = true, so disabled deployments are byte-identical to before.
  • Wires the inbound endpoint to a lazily-instantiated CugaSupervisor configured via settings.a2a.supervisor_config_path. First request constructs the supervisor, caches it on app_state.a2a_supervisor, and emits the answer as a single final_answer event.
  • Rewrites the outbound delegate_task_via_a2a_sdk to speak v0.3 JSON-RPC directly via httpx. The previous implementation imported names removed in a2a-sdk 1.x (A2AClient, MessageSendParams, TextPart, JSONRPCErrorResponse, a2a.utils.message), which made HAS_A2A_SDK = False and silently disabled HTTP-A2A delegation in production.
  • Handles both AgentCard shapes the SDK can return: the v0.3 pydantic model (top-level .url) and the v1.0 protobuf model that A2ACardResolver.get_agent_card() actually returns (URL lives at supported_interfaces[0].url with a protocol_binding). Without this, every cross-process delegation failed with "Agent card carries no URL".
  • Adds a working two-CUGA demo at docs/examples/a2a_two_cuga/: provider YAML (digital_sales toolset), consumer YAML (no local tools, one external A2A agent), launch scripts, and a preflight that loads .env and works around Saved-config startup wipes MODEL_NAME env var when llm.model is empty (force_env mode) #322.
  • Test coverage: 40 passed, 1 xfailed (auth-token forwarding, intentionally deferred). New test_outbound_protocol.py (8 tests) covers _extract_text_from_task, the proto-shape AgentCard URL extraction, JSONRPC-vs-GRPC interface preference, end-to-end roundtrip via httpx.ASGITransport, and JSON-RPC error envelope handling. Existing test_cuga_to_cuga.py "delegate" test now actually exercises the rewritten outbound client end-to-end.

End-to-end verification

Smoke-tested live in this branch:

# Terminal 1: cuga start registry
# Terminal 2: docs/examples/a2a_two_cuga/run_provider.sh   → :8002
# Terminal 3: docs/examples/a2a_two_cuga/run_consumer.sh   → :7860
# Browser:    http://localhost:7860 → "list my territory accounts"

Result: consumer chat UI → consumer supervisor → A2A JSON-RPC → provider on :8002 → digital_sales tools → 50 real territory accounts → final answer back in the chat UI. Provider log shows POST /a2a → 200, consumer log shows Delegating to provider: List my territory accounts....

Related

Out of scope

  • Auth-token forwarding across A2A (xfail test stays xfail).
  • Per-step streaming through message/stream — currently folds the whole supervisor run into one terminal frame.
  • The non-A2A working-tree changes (cuga_mode, memory_provider, github MCP, drawio MCP) are deliberately not in this PR.

Test plan

  • uv run pytest tests/unit/a2a/ tests/integration/a2a/ → 40 passed, 1 xfailed
  • uv run ruff check src/cuga/backend/server/a2a/ src/cuga/backend/server/main.py src/cuga/backend/cuga_graph/nodes/cuga_supervisor/a2a_protocol.py tests/integration/a2a/ → clean
  • Live cross-process round-trip with the digital_sales demo on macOS, Azure/gpt-4.1 via ete-litellm gateway
  • Reviewer: confirm the opt-in mount truly preserves the disabled-deployment surface (no new imports under settings.a2a.enabled = false)
  • Reviewer: spot-check the proto-shape _agent_card_rpc_url against any internal A2A peers that aren't CUGA-fronted

Summary by CodeRabbit

  • New Features

    • A2A (agent-to-agent) protocol support enabling task delegation between CUGA instances over HTTP
    • Configuration options to enable/disable A2A protocol and specify supervisor settings
  • Documentation

    • Complete example guide with setup scripts for running two CUGA instances with A2A delegation
    • Includes troubleshooting guidance and architectural overview
  • Tests

    • Added integration tests for A2A protocol roundtrip communication
    • Added unit tests for A2A components

Jim Laredo added 3 commits June 9, 2026 15:34
Adds a self-contained A2A server module mounted only when settings.a2a.enabled
is true, so disabled deployments are byte-identical to before. Exposes
/.well-known/agent[-card].json and a JSON-RPC /a2a endpoint supporting
message/send and SSE message/stream, with a lifecycle adapter that maps CUGA
graph events onto A2A TaskStatusUpdateEvents (HITL -> input_required, final ->
completed, always emits a terminal). Production graph wiring is left as a
follow-up; main.py mounts a placeholder runner that returns a clean
working->completed lifecycle.
Wires CUGA's outbound and inbound A2A halves end-to-end against the
installed a2a-sdk 1.0.2. Replaces the placeholder A2A runner with a
lazily-instantiated CugaSupervisor that routes inbound message/send
through the real graph; rewrites delegate_task_via_a2a_sdk to speak
v0.3 JSON-RPC directly (the SDK 1.x removed A2AClient and
MessageSendParams that the prior implementation imported, so HTTP-A2A
delegation was silently no-op).

Adds docs/examples/a2a_two_cuga/ — provider.yaml, consumer.yaml, two
launch scripts, README, and a preflight that loads .env and patches
the saved cuga.db agent_configs row so MODEL_NAME survives
_apply_published_config (see issue tracking the underlying bug).

Tests: 38 passed, 1 xfailed (auth-token forwarding, intentionally
deferred). New test_outbound_protocol.py covers _extract_text_from_task,
_agent_card_rpc_url, end-to-end roundtrip via httpx.ASGITransport, and
JSON-RPC error envelope handling.
The a2a-sdk 1.x A2ACardResolver.get_agent_card() returns a protobuf
AgentCard whose URL lives at supported_interfaces[0].url with a
protocol_binding ("JSONRPC"/"GRPC"/...) — not at the top-level .url
attribute that the v0.3 pydantic shape uses. The previous
_agent_card_rpc_url() only looked at .url, so every cross-process
delegation through delegate_task_via_a2a_sdk() failed with
"Agent card carries no URL" once the supervisor flow handed in the
real (proto) card from the resolver.

Now we try .url first (pydantic / direct callers), then walk
.supported_interfaces preferring an entry whose protocol_binding is
JSONRPC, falling back to fallback_base. Verified end-to-end against
the two-CUGA demo: provider on :8002 receives POST /a2a → 200 with
real digital_sales output round-tripping back to the consumer.

Tests: +2 cases covering the proto shape and JSONRPC-vs-GRPC
preference. 40 passed, 1 xfailed.
@coderabbitai

coderabbitai Bot commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

This PR implements bidirectional A2A v0.3 protocol support for CUGA. It introduces an inbound FastAPI router advertising agents via well-known endpoints and JSON-RPC, rewrites the outbound client for direct HTTP JSON-RPC, adds configuration toggles, and provides a complete two-CUGA example with provider and consumer setup.

Changes

A2A v0.3 Protocol Support

Layer / File(s) Summary
Module structure and type exports
src/cuga/backend/server/a2a/_a2a_types.py, src/cuga/backend/server/a2a/__init__.py
A2A SDK types are centralized in _a2a_types.py via __all__ list; package-level __init__.py re-exports build_agent_card, build_router, and stream_events_to_a2a for consistent import paths.
Agent card construction and validation
src/cuga/backend/server/a2a/agent_card.py, tests/unit/a2a/test_agent_card.py
build_agent_card assembles metadata from settings (name, description, version, URL) and coerced skills; sets streaming capability and default text modes; adds JWT security schemes when auth is required. Unit tests validate minimal fields, streaming flag, skill ID stability, round-trip Pydantic validation, and auth-dependent security schemes.
A2A router, JSON-RPC endpoints, and event adapter
src/cuga/backend/server/a2a/router.py, src/cuga/backend/server/a2a/task_adapter.py, tests/integration/a2a/conftest.py, tests/unit/a2a/test_task_adapter.py, tests/integration/a2a/test_a2a_router.py
FastAPI router exposes /.well-known/agent.json and /a2a JSON-RPC endpoints; supports message/send (returning Task) and message/stream (SSE stream). Task adapter converts graph events into A2A TaskStatusUpdateEvent with HITL detection, finality detection, text extraction, and terminal completion guarantee. Conftest provides test doubles (FakeStreamEvent, ScriptedGraphRunner, minimal settings), app fixtures, and ASGI client. Router tests validate well-known contract, JSON-RPC encoding, error codes, SSE streaming, and context propagation. Task adapter tests validate state transitions (working/completed), HITL mapping, context preservation, and empty-stream handling.
Configuration and FastAPI app integration
src/cuga/settings.toml, src/cuga/backend/server/main.py
New [a2a] section enables/disables A2A, sets agent metadata and advertised auth, declares skill IDs, and references supervisor config path. Main.py conditionally mounts A2A router when enabled; implements lazy supervisor initialization with asyncio.Lock to avoid concurrent loads, forwards messages to supervisor.invoke, and streams terminal events back as properly formatted A2A updates.
Outbound A2A delegation protocol
src/cuga/backend/cuga_graph/nodes/cuga_supervisor/a2a_protocol.py, tests/integration/a2a/test_a2a_client_roundtrip.py, tests/integration/a2a/test_outbound_protocol.py
Rewrites delegate_task_via_a2a_sdk to send raw HTTP JSON-RPC to peer /a2a endpoint instead of using SDK client; adds _extract_text_from_task() to extract readable text from v0.3 Task response and _agent_card_rpc_url() to derive JSON-RPC URL from varying AgentCard shapes (v0.3 url vs v1.0 interfaces). New variables and rpc_url parameters support variable forwarding and explicit endpoint override. Client roundtrip test validates SDK card resolver and message/send response shape. Outbound protocol tests validate URL derivation across card variants, text extraction fallback logic, end-to-end delegation via ASGI transport, and JSON-RPC error propagation.
Example setup and documentation
docs/examples/a2a_two_cuga/README.md, docs/examples/a2a_two_cuga/_preflight.py, docs/examples/a2a_two_cuga/provider.supervisor.yaml, docs/examples/a2a_two_cuga/consumer.supervisor.yaml, docs/examples/a2a_two_cuga/run_provider.sh, docs/examples/a2a_two_cuga/run_consumer.sh
Complete two-CUGA example with provider (exposes digital_sales agent) and consumer (delegates to provider). Provider supervisor routes requests to digital_sales; consumer supervisor delegates all tasks to provider via A2A. Run scripts set ports, load .env, run preflight, configure Dynaconf, and start Uvicorn servers. Preflight script loads environment, resolves model name, and patches SQLite agent configs to preserve MODEL_NAME. README documents architecture, prerequisites (OpenAI key, uv), multi-terminal run flow, key URLs, configuration explanation, auth notes (both run with auth_required=false), and troubleshooting guidance.
End-to-end CUGA-to-CUGA integration
tests/integration/a2a/test_cuga_to_cuga.py
Integration tests run client and server CUGA instances in-process via ASGI transport. Tests validate agent card discovery with skill advertisement, simple task delegation with response verification and runner state inspection, streaming event propagation end-to-end, context ID isolation across concurrent delegations, and expected-failure auth token forwarding (marked xfail, not yet implemented).

Sequence Diagram

sequenceDiagram
  participant ClientCUGA as Client CUGA<br/>(Delegation)
  participant ServerRouter as Server Router<br/>(/a2a endpoint)
  participant SupervisorRunner as Supervisor<br/>(via runner)
  participant ResponseAdapter as Task Adapter<br/>(event → A2A)

  ClientCUGA->>ServerRouter: POST /a2a<br/>message/send JSON-RPC
  ServerRouter->>SupervisorRunner: runner.run(message)
  SupervisorRunner->>ResponseAdapter: stream events
  ResponseAdapter->>ResponseAdapter: convert to TaskStatusUpdateEvent
  ResponseAdapter->>ServerRouter: buffer Task with history
  ServerRouter->>ClientCUGA: JSON-RPC result{Task}
  ClientCUGA->>ClientCUGA: extract_text_from_task()
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

priority: critical

Suggested reviewers

  • sami-marreed
  • offerakrabi
  • gjt-prog

Poem

🐰 A2A protocol hops along the wire,
Agent speaks to agent, building higher,
JSON-RPC echoes through the night,
Two CUGAs delegate with delight,
Metadata and tasks in perfect flight! 🚀

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 45.16% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely summarizes the main changes: exposing CUGA over A2A v0.3 protocol and providing a working two-CUGA demo example.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/a2a-server

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

assert card.name == "cuga"
assert card.description == "CUGA exposed over A2A."
assert card.version == "1.2.3"
assert str(card.url).startswith("https://cuga.example")

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cuga/backend/server/a2a/router.py (1)

1-183: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Ruff format check is failing for this file.

CI indicates this file would be reformatted; run formatter before merge.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/server/a2a/router.py` around lines 1 - 183, Ruff formatting
errors were detected in this file; run the repo's formatter and commit the
changes so the CI passes. Fix by running the project's formatter (e.g. ruff
format or the configured pre-commit hooks / black + ruff) on this file
(affecting functions like build_router, _run_and_collect, _sse_stream, and
jsonrpc_endpoint), review the modified whitespace/ordering changes, and
add/commit the reformatted file before merging.

Source: Pipeline failures

🧹 Nitpick comments (1)
tests/unit/a2a/test_agent_card.py (1)

39-39: ⚡ Quick win

Use exact URL assertion for stronger contract coverage.

startswith("https://cuga.example") can pass unintended values (e.g., extra path/host variants). Since fixture input is fixed, assert the exact URL string to tighten the test contract.

Suggested change
-    assert str(card.url).startswith("https://cuga.example")
+    assert str(card.url) == "https://cuga.example/a2a"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/a2a/test_agent_card.py` at line 39, Replace the loose startswith
assertion with an exact equality check: change the assertion that currently uses
str(card.url).startswith("https://cuga.example") to assert the full expected URL
string (e.g., assert str(card.url) == "https://cuga.example") in the test where
the card fixture is used (reference variable card and its url property in
tests/unit/a2a/test_agent_card.py) so the test validates the exact URL value.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/examples/a2a_two_cuga/_preflight.py`:
- Around line 63-64: Replace the broad except in the JSON parsing block with a
specific except json.JSONDecodeError that logs the offending row id (rowid) and
continues; locate the try/except around the JSON load operation in _preflight.py
(the loop that processes saved configs) and change it to catch
json.JSONDecodeError, call the existing logger or print with a clear message
including rowid, and then continue so malformed rows are visible for debugging.

In `@docs/examples/a2a_two_cuga/README.md`:
- Around line 11-21: The fenced ASCII-art block in README.md is missing a
language tag which triggers MD040; update the opening fence for the block
starting at the diagram (the triple backticks before the diagram) to include a
language identifier (e.g., text) so the block becomes ```text, leaving the
content unchanged and keeping the closing triple backticks as-is.

In `@src/cuga/backend/cuga_graph/nodes/cuga_supervisor/a2a_protocol.py`:
- Around line 228-233: The code currently treats any non-dict or failed peer
result as success; modify the logic around body/result_obj (the variables `body`
and `result_obj`) in the a2a response handler so that if `body` indicates a peer
task failure or `result_obj` is not a dict you return a failure-shaped response
(e.g., {"result": "", "variables": {}, "status": "failure", "error": <message>})
instead of status "success"; when `result_obj` is present but contains an error
state propagate that error (include its message/details) and only call
`_extract_text_from_task(result_obj)` and return status "success" when the peer
state and result payload shape are valid.
- Around line 158-168: The loop that picks an interface URL for
agent_card.supported_interfaces currently remembers the first interface
regardless of protocol_binding, which can select non-HTTP bindings (e.g., GRPC)
and break the HTTP JSON-RPC sender; change the logic in that loop so you only
set the fallback url when the iface_url has an HTTP(S) scheme (e.g.,
iface_url.lower().startswith("http")) or the protocol_binding is
empty/explicitly HTTP, and continue to prefer protocol_binding == "JSONRPC" as
the primary selector (i.e., keep the existing JSONRPC break path, but replace
the unconditional fallback assignment to url with a guarded assignment that only
accepts HTTP(S)-compatible interfaces).

In `@src/cuga/backend/server/a2a/router.py`:
- Around line 127-134: The SSE generator _sse_stream must catch exceptions from
runner.run(...) and stream_events_to_a2a(...) and emit a JSON-RPC error frame
instead of just closing the stream; wrap the run/iteration logic in try/except,
build an error response via _rpc_result(rpc_id, {"error": {...}}) including the
exception message/type (or a sanitized message) and yield it as a JSON string,
then return/stop the generator; apply the same pattern to the other SSE
generator block (the one also using runner.run and stream_events_to_a2a) so any
runtime error produces a single JSON-RPC error frame to clients rather than a
silent broken stream.
- Around line 164-165: The handler currently returns raw exception text via
JSONResponse(_rpc_error(rpc_id, _INTERNAL_ERROR, "Internal error", str(exc))),
which leaks internal details; instead log the full exception server-side (e.g.,
logging.exception or the existing logger) and change the JSON-RPC response to
return a generic message only (omit str(exc)), keeping the same rpc_id and
_INTERNAL_ERROR constants and using _rpc_error to build the response; ensure the
logged entry includes the exception and stack for debugging while the wire
response contains no internal exception data.

In `@src/cuga/backend/server/a2a/task_adapter.py`:
- Around line 98-107: The terminal-event branch in _is_final(...) currently
always emits TaskState.completed even for error terminal events; update the
branch that yields TaskStatusUpdateEvent so it inspects the event (e.g., ev.name
or ev.type via _event_text/ev.name) and chooses TaskState.failed (or the
project’s error state enum) when the final event represents an error, and only
uses TaskState.completed for successful terminals; preserve the existing message
construction via _message(_event_text(ev) ..., f"{task_id}-final", context_id)
and set the TaskStatus.state accordingly in the TaskStatus passed to
TaskStatusUpdateEvent.

In `@src/cuga/backend/server/main.py`:
- Around line 1632-1719: Ruff reported that src/cuga/backend/server/main.py
needs reformatting; run the project's formatter (ruff/black as configured) to
reformat this file (or the repo) and fix style issues around the A2A block
(e.g., the classes _A2AStreamEvent, _SupervisorA2ARunner, _PlaceholderA2ARunner
and the _a2a_settings_dict/_a2a_supervisor_cfg logic), then re-run the linter
and commit the formatted changes so CI passes.
- Around line 1685-1687: The A2A error event is leaking raw exception text via
f"A2A handler error: {exc}"; in the except block that catches Exception (where
logger.exception(...) and yield _A2AStreamEvent(...) are called) change the
yielded event to include a generic error message (e.g. "A2A handler error")
without including exc or any internal details, keep the logger.exception call
as-is so full exception details remain in server logs, and ensure the
_A2AStreamEvent still sets final=True.

In `@tests/integration/a2a/test_outbound_protocol.py`:
- Around line 87-90: Change mutable list class attributes used in the test
doubles to immutable tuples: replace occurrences of supported_interfaces =
[_Iface()] in the _ProtoCard test double (and the other test double at lines
~107-110) with supported_interfaces = (_Iface(),) so the class-level attribute
is immutable and avoids shared mutable state and the RUF012 warning; update any
other similar class-level list literals in those test classes to tuple
equivalents.

---

Outside diff comments:
In `@src/cuga/backend/server/a2a/router.py`:
- Around line 1-183: Ruff formatting errors were detected in this file; run the
repo's formatter and commit the changes so the CI passes. Fix by running the
project's formatter (e.g. ruff format or the configured pre-commit hooks / black
+ ruff) on this file (affecting functions like build_router, _run_and_collect,
_sse_stream, and jsonrpc_endpoint), review the modified whitespace/ordering
changes, and add/commit the reformatted file before merging.

---

Nitpick comments:
In `@tests/unit/a2a/test_agent_card.py`:
- Line 39: Replace the loose startswith assertion with an exact equality check:
change the assertion that currently uses
str(card.url).startswith("https://cuga.example") to assert the full expected URL
string (e.g., assert str(card.url) == "https://cuga.example") in the test where
the card fixture is used (reference variable card and its url property in
tests/unit/a2a/test_agent_card.py) so the test validates the exact URL value.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 48ff2f51-8714-49e3-ace5-73767ee1bcd8

📥 Commits

Reviewing files that changed from the base of the PR and between 5bd8644 and 4183c63.

📒 Files selected for processing (21)
  • docs/examples/a2a_two_cuga/README.md
  • docs/examples/a2a_two_cuga/_preflight.py
  • docs/examples/a2a_two_cuga/consumer.supervisor.yaml
  • docs/examples/a2a_two_cuga/provider.supervisor.yaml
  • docs/examples/a2a_two_cuga/run_consumer.sh
  • docs/examples/a2a_two_cuga/run_provider.sh
  • src/cuga/backend/cuga_graph/nodes/cuga_supervisor/a2a_protocol.py
  • src/cuga/backend/server/a2a/__init__.py
  • src/cuga/backend/server/a2a/_a2a_types.py
  • src/cuga/backend/server/a2a/agent_card.py
  • src/cuga/backend/server/a2a/router.py
  • src/cuga/backend/server/a2a/task_adapter.py
  • src/cuga/backend/server/main.py
  • src/cuga/settings.toml
  • tests/integration/a2a/conftest.py
  • tests/integration/a2a/test_a2a_client_roundtrip.py
  • tests/integration/a2a/test_a2a_router.py
  • tests/integration/a2a/test_cuga_to_cuga.py
  • tests/integration/a2a/test_outbound_protocol.py
  • tests/unit/a2a/test_agent_card.py
  • tests/unit/a2a/test_task_adapter.py

Comment on lines +63 to +64
except Exception:
continue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don’t silently swallow JSON parse failures.

At Line 63, except Exception: continue hides malformed saved configs and makes preflight/debugging unnecessarily opaque. Catch json.JSONDecodeError and log rowid so users can fix corrupted rows.

Suggested fix
-            except Exception:
-                continue
+            except json.JSONDecodeError as exc:
+                print(f"[preflight] skipped rowid={rowid}: invalid JSON ({exc})", file=sys.stderr)
+                continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception:
continue
except json.JSONDecodeError as exc:
print(f"[preflight] skipped rowid={rowid}: invalid JSON ({exc})", file=sys.stderr)
continue
🧰 Tools
🪛 Ruff (0.15.15)

[error] 63-64: try-except-continue detected, consider logging the exception

(S112)


[warning] 63-63: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/examples/a2a_two_cuga/_preflight.py` around lines 63 - 64, Replace the
broad except in the JSON parsing block with a specific except
json.JSONDecodeError that logs the offending row id (rowid) and continues;
locate the try/except around the JSON load operation in _preflight.py (the loop
that processes saved configs) and change it to catch json.JSONDecodeError, call
the existing logger or print with a clear message including rowid, and then
continue so malformed rows are visible for debugging.

Source: Linters/SAST tools

Comment on lines +11 to +21
```
┌──────────────────────────┐ ┌──────────────────────────┐
user │ CUGA-1 (consumer) │ A2A v0.3 │ CUGA-2 (provider) │
──→ │ http://localhost:7860/ │ ─JSON-RPC──→ │ http://localhost:8002/ │
chat │ no local tools │ /a2a │ digital_sales toolset │
│ supervisor: 1 ext agent │ │ A2A inbound enabled │
└──────────────────────────┘ └──────────────────────────┘
▲ ▲
│ shared │
└─── http://localhost:8001 (registry) ─────┘
```

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add a language tag to the fenced code block.

At Line 11, the fenced block is missing a language identifier, which triggers MD040 and can fail markdown linting.

Suggested fix
-```
+```text
        ┌──────────────────────────┐                  ┌──────────────────────────┐
 user  │  CUGA-1 (consumer)       │   A2A v0.3       │  CUGA-2 (provider)       │
 ──→   │  http://localhost:7860/  │  ─JSON-RPC──→    │  http://localhost:8002/  │
@@
                        │ shared                                   │
                        └─── http://localhost:8001 (registry) ─────┘
</details>

<!-- suggestion_start -->

<details>
<summary>📝 Committable suggestion</summary>

> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion

🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 11-11: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/examples/a2a_two_cuga/README.md` around lines 11 - 21, The fenced
ASCII-art block in README.md is missing a language tag which triggers MD040;
update the opening fence for the block starting at the diagram (the triple
backticks before the diagram) to include a language identifier (e.g., text) so
the block becomes ```text, leaving the content unchanged and keeping the closing
triple backticks as-is.

Source: Linters/SAST tools

Comment on lines +158 to +168
for iface in getattr(agent_card, "supported_interfaces", None) or []:
iface_url = (getattr(iface, "url", None) or "").strip()
binding = (getattr(iface, "protocol_binding", None) or "").upper()
if not iface_url:
continue
if binding == "JSONRPC":
url = iface_url
break
if not url:
url = iface_url # remember first interface; keep looking for JSONRPC

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid selecting non-JSONRPC interface URLs for an HTTP JSON-RPC sender.

At Line 166-167, the fallback keeps the first interface URL even when protocol_binding is not JSONRPC (e.g., GRPC). This can produce an invalid POST target for httpx in Line 220.

Suggested fix
-    if not url:
-        for iface in getattr(agent_card, "supported_interfaces", None) or []:
-            iface_url = (getattr(iface, "url", None) or "").strip()
-            binding = (getattr(iface, "protocol_binding", None) or "").upper()
-            if not iface_url:
-                continue
-            if binding == "JSONRPC":
-                url = iface_url
-                break
-            if not url:
-                url = iface_url  # remember first interface; keep looking for JSONRPC
+    if not url:
+        first_http_candidate = ""
+        for iface in getattr(agent_card, "supported_interfaces", None) or []:
+            iface_url = (getattr(iface, "url", None) or "").strip()
+            binding = (getattr(iface, "protocol_binding", None) or "").upper()
+            if not iface_url:
+                continue
+            if binding == "JSONRPC":
+                url = iface_url
+                break
+            if not binding and iface_url.startswith(("http://", "https://")) and not first_http_candidate:
+                first_http_candidate = iface_url
+        if not url:
+            url = first_http_candidate
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/cuga_graph/nodes/cuga_supervisor/a2a_protocol.py` around
lines 158 - 168, The loop that picks an interface URL for
agent_card.supported_interfaces currently remembers the first interface
regardless of protocol_binding, which can select non-HTTP bindings (e.g., GRPC)
and break the HTTP JSON-RPC sender; change the logic in that loop so you only
set the fallback url when the iface_url has an HTTP(S) scheme (e.g.,
iface_url.lower().startswith("http")) or the protocol_binding is
empty/explicitly HTTP, and continue to prefer protocol_binding == "JSONRPC" as
the primary selector (i.e., keep the existing JSONRPC break path, but replace
the unconditional fallback assignment to url with a guarded assignment that only
accepts HTTP(S)-compatible interfaces).

Comment on lines +228 to +233
result_obj = body.get("result") if isinstance(body, dict) else None
if not isinstance(result_obj, dict):
return {"result": "", "variables": {}, "status": "success"}
if isinstance(result_obj, Message):
text = get_message_text(result_obj)
elif isinstance(result_obj, Task) and result_obj.history:
texts = [get_message_text(m) for m in result_obj.history if isinstance(m, Message)]
text = "\n".join(texts) if texts else ""
else:
text = str(result_obj) if result_obj else ""
return {"result": text or "", "variables": {}, "status": "success"}

text = _extract_text_from_task(result_obj)
return {"result": text, "variables": {}, "status": "success"}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Remote task failures are flattened into status="success".

Line 229-233 returns success unconditionally, even when the peer task state indicates failure or when the result payload shape is invalid. This hides upstream errors and breaks caller-side contract handling.

Suggested fix
     result_obj = body.get("result") if isinstance(body, dict) else None
     if not isinstance(result_obj, dict):
-        return {"result": "", "variables": {}, "status": "success"}
+        raise RuntimeError("A2A response is missing a valid `result` task envelope")
 
     text = _extract_text_from_task(result_obj)
-    return {"result": text, "variables": {}, "status": "success"}
+    state = str(((result_obj.get("status") or {}).get("state") or "")).lower()
+    normalized_status = "failed" if ("fail" in state or "error" in state) else "success"
+    out_vars = ((result_obj.get("metadata") or {}).get("variables") or {})
+    return {"result": text, "variables": out_vars, "status": normalized_status}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/cuga_graph/nodes/cuga_supervisor/a2a_protocol.py` around
lines 228 - 233, The code currently treats any non-dict or failed peer result as
success; modify the logic around body/result_obj (the variables `body` and
`result_obj`) in the a2a response handler so that if `body` indicates a peer
task failure or `result_obj` is not a dict you return a failure-shaped response
(e.g., {"result": "", "variables": {}, "status": "failure", "error": <message>})
instead of status "success"; when `result_obj` is present but contains an error
state propagate that error (include its message/details) and only call
`_extract_text_from_task(result_obj)` and return status "success" when the peer
state and result payload shape are valid.

Comment on lines +127 to +134
async def _sse_stream(message_text: str, context_id: str, task_id: str, rpc_id: Any) -> AsyncIterator[dict]:
agen = runner.run(message_text, context_id)
events = []
async for ev in agen:
events.append(ev)
for upd in stream_events_to_a2a(events, task_id=task_id, context_id=context_id):
frame = _rpc_result(rpc_id, upd.model_dump(mode="json", exclude_none=True, by_alias=True))
yield {"data": json.dumps(frame)}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle stream-time exceptions inside the SSE generator.

If runner.run(...) or stream_events_to_a2a(...) raises, the stream can terminate without a JSON-RPC error frame, leaving clients with an ambiguous broken stream.

Suggested fix
 async def _sse_stream(message_text: str, context_id: str, task_id: str, rpc_id: Any) -> AsyncIterator[dict]:
-    agen = runner.run(message_text, context_id)
-    events = []
-    async for ev in agen:
-        events.append(ev)
-    for upd in stream_events_to_a2a(events, task_id=task_id, context_id=context_id):
-        frame = _rpc_result(rpc_id, upd.model_dump(mode="json", exclude_none=True, by_alias=True))
-        yield {"data": json.dumps(frame)}
+    try:
+        agen = runner.run(message_text, context_id)
+        events = []
+        async for ev in agen:
+            events.append(ev)
+        for upd in stream_events_to_a2a(events, task_id=task_id, context_id=context_id):
+            frame = _rpc_result(rpc_id, upd.model_dump(mode="json", exclude_none=True, by_alias=True))
+            yield {"data": json.dumps(frame)}
+    except Exception:
+        yield {"data": json.dumps(_rpc_error(rpc_id, _INTERNAL_ERROR, "Internal error"))}

Also applies to: 170-178

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/server/a2a/router.py` around lines 127 - 134, The SSE
generator _sse_stream must catch exceptions from runner.run(...) and
stream_events_to_a2a(...) and emit a JSON-RPC error frame instead of just
closing the stream; wrap the run/iteration logic in try/except, build an error
response via _rpc_result(rpc_id, {"error": {...}}) including the exception
message/type (or a sanitized message) and yield it as a JSON string, then
return/stop the generator; apply the same pattern to the other SSE generator
block (the one also using runner.run and stream_events_to_a2a) so any runtime
error produces a single JSON-RPC error frame to clients rather than a silent
broken stream.

Comment on lines +164 to +165
except Exception as exc: # graph blew up
return JSONResponse(_rpc_error(rpc_id, _INTERNAL_ERROR, "Internal error", str(exc)))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not expose raw exception details in JSON-RPC errors.

Returning str(exc) to remote callers leaks internal implementation details. Keep the wire error generic and log details server-side only.

Suggested fix
-            except Exception as exc:  # graph blew up
-                return JSONResponse(_rpc_error(rpc_id, _INTERNAL_ERROR, "Internal error", str(exc)))
+            except Exception:  # graph blew up
+                return JSONResponse(_rpc_error(rpc_id, _INTERNAL_ERROR, "Internal error"))
🧰 Tools
🪛 Ruff (0.15.15)

[warning] 164-164: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/server/a2a/router.py` around lines 164 - 165, The handler
currently returns raw exception text via JSONResponse(_rpc_error(rpc_id,
_INTERNAL_ERROR, "Internal error", str(exc))), which leaks internal details;
instead log the full exception server-side (e.g., logging.exception or the
existing logger) and change the JSON-RPC response to return a generic message
only (omit str(exc)), keeping the same rpc_id and _INTERNAL_ERROR constants and
using _rpc_error to build the response; ensure the logged entry includes the
exception and stack for debugging while the wire response contains no internal
exception data.

Comment on lines +98 to +107
if _is_final(ev):
saw_terminal = True
yield TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
final=True,
status=TaskStatus(
state=TaskState.completed,
message=_message(_event_text(ev) or "", f"{task_id}-final", context_id),
),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Terminal error events are being marked as completed.

When a final event represents an error (e.g., name "error"), this path still emits TaskState.completed, so failures are reported as success.

Suggested fix
+def _is_error(event: Any) -> bool:
+    name = str(getattr(event, "name", "") or "").lower()
+    return name in {"error", "failed", "failure", "exception"}
+
 ...
         if _is_final(ev):
             saw_terminal = True
+            terminal_state = TaskState.failed if _is_error(ev) else TaskState.completed
             yield TaskStatusUpdateEvent(
                 task_id=task_id,
                 context_id=context_id,
                 final=True,
                 status=TaskStatus(
-                    state=TaskState.completed,
+                    state=terminal_state,
                     message=_message(_event_text(ev) or "", f"{task_id}-final", context_id),
                 ),
             )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/server/a2a/task_adapter.py` around lines 98 - 107, The
terminal-event branch in _is_final(...) currently always emits
TaskState.completed even for error terminal events; update the branch that
yields TaskStatusUpdateEvent so it inspects the event (e.g., ev.name or ev.type
via _event_text/ev.name) and chooses TaskState.failed (or the project’s error
state enum) when the final event represents an error, and only uses
TaskState.completed for successful terminals; preserve the existing message
construction via _message(_event_text(ev) ..., f"{task_id}-final", context_id)
and set the TaskStatus.state accordingly in the TaskStatus passed to
TaskStatusUpdateEvent.

Comment on lines +1632 to +1719
if getattr(settings, "a2a", None) and getattr(settings.a2a, "enabled", False):
# The A2A package is only imported when explicitly enabled in settings,
# so disabled deployments pay no import-time cost for it.
from cuga.backend.server.a2a import build_router as _build_a2a_router # noqa: E402

class _A2AStreamEvent:
"""Duck-typed event the A2A task adapter consumes."""

__slots__ = ("name", "data", "final")

def __init__(self, name, data=None, final=False):
self.name = name
self.data = data
self.final = final

class _SupervisorA2ARunner:
"""Run inbound A2A messages through a lazily-created CugaSupervisor.

We instantiate the supervisor on first use rather than during
lifespan startup so deployments that enable A2A but never receive
a request pay no init cost. The supervisor is cached on
``app_state.a2a_supervisor`` afterward.
"""

def __init__(self, app_state_ref, supervisor_config_path: str):
self._app_state = app_state_ref
self._yaml_path = supervisor_config_path
self._lock = asyncio.Lock()

async def _ensure_supervisor(self):
existing = getattr(self._app_state, "a2a_supervisor", None)
if existing is not None:
return existing
async with self._lock:
existing = getattr(self._app_state, "a2a_supervisor", None)
if existing is not None:
return existing
from cuga.sdk import CugaSupervisor

supervisor = await CugaSupervisor.from_yaml(self._yaml_path)
self._app_state.a2a_supervisor = supervisor
return supervisor

async def run(self, message, context_id=None):
try:
supervisor = await self._ensure_supervisor()
result = await supervisor.invoke(message, thread_id=context_id)
answer = getattr(result, "answer", None) or str(result)
error = getattr(result, "error", None)
if error:
yield _A2AStreamEvent("error", {"text": f"Supervisor error: {error}"}, final=True)
return
yield _A2AStreamEvent("final_answer", {"text": answer}, final=True)
except Exception as exc: # surface failures across the wire
logger.exception("A2A inbound delegation failed")
yield _A2AStreamEvent("error", {"text": f"A2A handler error: {exc}"}, final=True)

class _PlaceholderA2ARunner:
"""Used when ``a2a.supervisor_config_path`` is unset.

Returns a clear "endpoint reached but unconfigured" terminal event
so callers get a well-formed Task envelope instead of a 5xx.
"""

async def run(self, message, context_id=None):
yield _A2AStreamEvent(
"final_answer",
{"text": "A2A inbound endpoint reached, but settings.a2a.supervisor_config_path is not set."},
final=True,
)

_a2a_settings_dict = {
"agent_name": getattr(settings.a2a, "agent_name", "cuga"),
"agent_description": getattr(settings.a2a, "agent_description", "CUGA agent exposed over A2A."),
"agent_version": getattr(settings.a2a, "agent_version", "0.0.0"),
"agent_url": getattr(settings.a2a, "agent_url", "http://localhost:8000"),
"auth_required": getattr(settings.a2a, "auth_required", False),
"skill_ids": list(getattr(settings.a2a, "skill_ids", []) or []),
}
_a2a_supervisor_cfg = getattr(settings.a2a, "supervisor_config_path", "") or ""
if _a2a_supervisor_cfg:
_a2a_runner: Any = _SupervisorA2ARunner(app_state, _a2a_supervisor_cfg)
logger.info(f"A2A inbound: routing requests through supervisor at {_a2a_supervisor_cfg}")
else:
_a2a_runner = _PlaceholderA2ARunner()
logger.warning("A2A inbound enabled but settings.a2a.supervisor_config_path is empty; using placeholder runner.")
app.include_router(_build_a2a_router(runner=_a2a_runner, settings=_a2a_settings_dict))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Ruff format check is failing for this file.

CI reports this file would be reformatted; run formatter before merge.

🧰 Tools
🪛 Ruff (0.15.15)

[warning] 1640-1640: _A2AStreamEvent.__slots__ is not sorted

Apply a natural sort to _A2AStreamEvent.__slots__

(RUF023)


[warning] 1685-1685: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/server/main.py` around lines 1632 - 1719, Ruff reported that
src/cuga/backend/server/main.py needs reformatting; run the project's formatter
(ruff/black as configured) to reformat this file (or the repo) and fix style
issues around the A2A block (e.g., the classes _A2AStreamEvent,
_SupervisorA2ARunner, _PlaceholderA2ARunner and the
_a2a_settings_dict/_a2a_supervisor_cfg logic), then re-run the linter and commit
the formatted changes so CI passes.

Source: Pipeline failures

Comment on lines +1685 to +1687
except Exception as exc: # surface failures across the wire
logger.exception("A2A inbound delegation failed")
yield _A2AStreamEvent("error", {"text": f"A2A handler error: {exc}"}, final=True)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not send raw exception text over A2A error events.

f"A2A handler error: {exc}" can disclose internal paths/config/runtime details to external callers. Send a generic message and keep details in server logs.

Suggested fix
             except Exception as exc:  # surface failures across the wire
                 logger.exception("A2A inbound delegation failed")
-                yield _A2AStreamEvent("error", {"text": f"A2A handler error: {exc}"}, final=True)
+                yield _A2AStreamEvent("error", {"text": "A2A handler error"}, final=True)
🧰 Tools
🪛 Ruff (0.15.15)

[warning] 1685-1685: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cuga/backend/server/main.py` around lines 1685 - 1687, The A2A error
event is leaking raw exception text via f"A2A handler error: {exc}"; in the
except block that catches Exception (where logger.exception(...) and yield
_A2AStreamEvent(...) are called) change the yielded event to include a generic
error message (e.g. "A2A handler error") without including exc or any internal
details, keep the logger.exception call as-is so full exception details remain
in server logs, and ensure the _A2AStreamEvent still sets final=True.

Comment on lines +87 to +90
class _ProtoCard:
url = "" # absent on protobuf
supported_interfaces = [_Iface()]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use immutable class attributes in test doubles (RUF012).

Line 89 and Line 109 define mutable list class attributes for supported_interfaces. Switching to tuples avoids shared mutable state and clears the Ruff warning.

Suggested fix
     class _ProtoCard:
         url = ""  # absent on protobuf
-        supported_interfaces = [_Iface()]
+        supported_interfaces = (_Iface(),)
@@
     class _Card:
         url = ""
-        supported_interfaces = [_Grpc(), _Json()]
+        supported_interfaces = (_Grpc(), _Json())

Also applies to: 107-110

🧰 Tools
🪛 Ruff (0.15.15)

[warning] 89-89: Mutable default value for class attribute

(RUF012)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/a2a/test_outbound_protocol.py` around lines 87 - 90, Change
mutable list class attributes used in the test doubles to immutable tuples:
replace occurrences of supported_interfaces = [_Iface()] in the _ProtoCard test
double (and the other test double at lines ~107-110) with supported_interfaces =
(_Iface(),) so the class-level attribute is immutable and avoids shared mutable
state and the RUF012 warning; update any other similar class-level list literals
in those test classes to tuple equivalents.

Source: Linters/SAST tools

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants