Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,25 @@ client = AxmeClient(
# Check connectivity
print(client.health())

# Send an intent
# Send an intent to a registered agent address
intent = client.create_intent(
{
"intent_type": "order.fulfillment.v1",
"to_agent": "agent://acme-corp/production/fulfillment-service",
"payload": {"order_id": "ord_123", "priority": "high"},
"owner_agent": "agent://fulfillment-service",
},
idempotency_key="fulfill-ord-123-001",
correlation_id="corr-ord-123-001",
)
print(intent["intent_id"], intent["status"])

# List registered agent addresses in your workspace
agents = client.list_agents(org_id="acme-corp-uuid", workspace_id="prod-ws-uuid")
for agent in agents["agents"]:
print(agent["address"], agent["status"])

# Wait for resolution
resolved = client.wait_for(intent["intent_id"], terminal_states={"RESOLVED", "CANCELLED"})
resolved = client.wait_for(intent["intent_id"])
print(resolved["status"])
```

Expand Down
211 changes: 209 additions & 2 deletions axme_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,49 @@ def send_intent(
raise ValueError("create_intent response does not include string intent_id")
return intent_id

def apply_scenario(
self,
bundle: dict[str, Any],
*,
idempotency_key: str | None = None,
trace_id: str | None = None,
) -> dict[str, Any]:
"""Submit a ScenarioBundle to POST /v1/scenarios/apply.

The server provisions missing agents, compiles the workflow, and creates the
intent in one atomic operation. Returns the full bundle response including
``intent_id``, ``compile_id``, ``agents_provisioned``.
"""
payload = dict(bundle)
if idempotency_key is not None:
payload.setdefault("idempotency_key", idempotency_key)
return self._request_json(
"POST",
"/v1/scenarios/apply",
json_body=payload,
idempotency_key=idempotency_key,
trace_id=trace_id,
retryable=idempotency_key is not None,
)

def validate_scenario(
self,
bundle: dict[str, Any],
*,
trace_id: str | None = None,
) -> dict[str, Any]:
"""Dry-run validate a ScenarioBundle without creating any resources.

Returns a list of validation errors (empty list means valid).
"""
return self._request_json(
"POST",
"/v1/scenarios/validate",
json_body=bundle,
trace_id=trace_id,
retryable=True,
)

def list_intent_events(
self,
intent_id: str,
Expand Down Expand Up @@ -744,6 +787,121 @@ def create_service_account_key(
retryable=idempotency_key is not None,
)

def list_agents(
self,
*,
org_id: str,
workspace_id: str,
limit: int | None = None,
trace_id: str | None = None,
) -> dict[str, Any]:
"""List registered agent addresses in a workspace.

Returns a dict with an ``agents`` list, each entry containing
``address``, ``display_name``, ``status``, and ``created_at``.
"""
params: dict[str, str] = {"org_id": org_id, "workspace_id": workspace_id}
if limit is not None:
params["limit"] = str(limit)
return self._request_json(
"GET",
"/v1/agents",
params=params,
trace_id=trace_id,
retryable=True,
)

def get_agent(self, address: str, *, trace_id: str | None = None) -> dict[str, Any]:
"""Get agent address details by full ``agent://org/workspace/name`` address."""
if not isinstance(address, str) or not address.strip():
raise ValueError("address must be a non-empty string")
path_part = address.strip()
if path_part.startswith("agent://"):
path_part = path_part[len("agent://"):]
return self._request_json(
"GET",
f"/v1/agents/{path_part}",
trace_id=trace_id,
retryable=True,
)

def listen(
self,
address: str,
*,
since: int = 0,
wait_seconds: int = 15,
timeout_seconds: float | None = None,
trace_id: str | None = None,
) -> Iterator[dict[str, Any]]:
"""Stream incoming intents for an agent address via SSE.

Connects to ``GET /v1/agents/{address}/intents/stream`` and yields each
intent payload as it arrives. The stream is a long-lived SSE connection;
the server sends a ``stream.timeout`` keepalive event when there are no
new intents within ``wait_seconds``, at which point the client
automatically reconnects until ``timeout_seconds`` elapses (or forever if
``timeout_seconds`` is ``None``).

Args:
address: Full ``agent://org/workspace/name`` or bare ``org/workspace/name``
agent address to listen on.
since: Sequence cursor — only intents with a sequence number greater
than this value are returned. Pass the ``seq`` value from the last
received event to resume without gaps.
wait_seconds: Server-side long-poll window (1–60 s). The server keeps
the connection open for up to this many seconds while waiting for
new intents.
timeout_seconds: Optional wall-clock timeout after which the method
raises ``TimeoutError``. ``None`` means listen indefinitely.
trace_id: Optional trace ID forwarded as ``X-Trace-Id``.

Yields:
Each intent payload dict as it arrives on the stream.

Raises:
ValueError: If ``address`` is empty or arguments are out of range.
TimeoutError: If ``timeout_seconds`` elapses before the caller
stops iterating.
"""
if not isinstance(address, str) or not address.strip():
raise ValueError("address must be a non-empty string")
if since < 0:
raise ValueError("since must be >= 0")
if wait_seconds < 1:
raise ValueError("wait_seconds must be >= 1")
if timeout_seconds is not None and timeout_seconds <= 0:
raise ValueError("timeout_seconds must be > 0 when provided")

path_part = address.strip()
if path_part.startswith("agent://"):
path_part = path_part[len("agent://"):]

deadline = (time.monotonic() + timeout_seconds) if timeout_seconds is not None else None
next_since = since

while True:
if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"timed out while listening on {address}")

stream_wait_seconds = wait_seconds
if deadline is not None:
seconds_left = max(0.0, deadline - time.monotonic())
if seconds_left <= 0:
raise TimeoutError(f"timed out while listening on {address}")
stream_wait_seconds = max(1, min(wait_seconds, int(seconds_left)))

for event in self._iter_agent_intents_stream(
path_part=path_part,
since=next_since,
wait_seconds=stream_wait_seconds,
trace_id=trace_id,
):
seq = event.get("seq")
if isinstance(seq, int) and seq >= 0:
next_since = max(next_since, seq)
yield event

def revoke_service_account_key(
self,
service_account_id: str,
Expand Down Expand Up @@ -1546,6 +1704,53 @@ def _iter_intent_events_stream(
data_lines.append(line.partition(":")[2].lstrip())
continue

def _iter_agent_intents_stream(
self,
*,
path_part: str,
since: int,
wait_seconds: int,
trace_id: str | None,
) -> Iterator[dict[str, Any]]:
headers: dict[str, str] | None = None
normalized_trace_id = self._normalize_trace_id(trace_id)
if normalized_trace_id is not None:
headers = {"X-Trace-Id": normalized_trace_id}

with self._http.stream(
"GET",
f"/v1/agents/{path_part}/intents/stream",
params={"since": str(since), "wait_seconds": str(wait_seconds)},
headers=headers,
) as response:
if response.status_code >= 400:
self._raise_http_error(response)

current_event: str | None = None
data_lines: list[str] = []
for line in response.iter_lines():
if line == "":
if current_event == "stream.timeout":
return
if current_event and data_lines:
try:
payload = json.loads("\n".join(data_lines))
except ValueError:
payload = None
if isinstance(payload, dict) and current_event.startswith("intent."):
yield payload
current_event = None
data_lines = []
continue
if line.startswith(":"):
continue
if line.startswith("event:"):
current_event = line.partition(":")[2].strip()
continue
if line.startswith("data:"):
data_lines.append(line.partition(":")[2].lstrip())
continue

def _mcp_request(
self,
*,
Expand Down Expand Up @@ -1748,7 +1953,9 @@ def _max_seen_seq(*, next_since: int, event: dict[str, Any]) -> int:

def _is_terminal_intent_event(event: dict[str, Any]) -> bool:
status = event.get("status")
if isinstance(status, str) and status in {"COMPLETED", "FAILED", "CANCELED"}:
if isinstance(status, str) and status in {"COMPLETED", "FAILED", "CANCELED", "TIMED_OUT"}:
return True
event_type = event.get("event_type")
return isinstance(event_type, str) and event_type in {"intent.completed", "intent.failed", "intent.canceled"}
return isinstance(event_type, str) and event_type in {
"intent.completed", "intent.failed", "intent.canceled", "intent.timed_out"
}
3 changes: 1 addition & 2 deletions examples/basic_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ def main() -> None:
created = client.create_intent(
{
"intent_type": "intent.demo.v1",
"from_agent": "agent://basic/python/source",
"to_agent": "agent://basic/python/target",
"to_agent": "agent://acme-corp/production/target",
"payload": {"task": "hello-from-python"},
},
correlation_id=correlation_id,
Expand Down
Loading
Loading