Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ async def main() -> None:
ai.user_message("What's the weather in Tokyo?"),
]

async for msg in agent.run(model, messages):
if msg.text_delta:
print(msg.text_delta, end="", flush=True)
async for event in agent.run(model, messages):
if isinstance(event, ai.TextDelta):
print(event.chunk, end="", flush=True)
print()


Expand Down Expand Up @@ -106,18 +106,18 @@ Override the default loop when you need approval gates, routing, or custom orche
@agent.loop
async def custom(context: ai.Context):
while True:
s = await ai.stream(
context.model, context.messages, tools=context.tools
)
async for msg in s:
yield msg
s = ai.stream(context.model, context.messages, tools=context.tools)
async for event in s:
yield event

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
return

results = [await tc() for tc in tool_calls]
yield ai.tool_message(*results)
tool_msg = ai.tool_message(*results)
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)
```

## Examples
Expand Down
35 changes: 35 additions & 0 deletions examples/coding-agent/1_raw_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import ai
import asyncio

import inspect
import pydantic
import json

from typing import get_type_hints


def get_schema(fn) -> dict:
sig = inspect.signature(fn)
hints = get_type_hints(fn)

fields = {}
for name, p in sig.parameters.items():
t = hints.get(name, str)
default = ... if p.default is inspect.Parameter.empty else p.default
fields[name] = (t, default)


async def main() -> None:
model = ai.ai_gateway("anthropic/claude-opus-4.7")

messages = [
ai.system_message("you are a coding assistant"),
ai.user_message("actually i don't need assistance thanks"),
]

async for e in ai.stream(model, messages):
print(e)


if __name__ == "__main__":
asyncio.run(main())
13 changes: 6 additions & 7 deletions examples/fastapi-vite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ to suspend execution whenever the LLM wants to call a tool. The flow is:

1. LLM emits a tool call
2. Backend calls `await ai.hook(...)` with `payload=ai.ToolApproval`
3. The runtime emits a `role="internal"` message containing a pending `HookPart`
3. The runtime emits a `MessageEnd` event containing an internal `HookPart`
4. The frontend renders Approve / Reject buttons via the
`<Confirmation>` component (from AI Elements)
5. When the user clicks a button, `addToolApprovalResponse()` patches
the message and sends a new request with the decision
6. The backend resumes from the checkpoint, calls `ai.resolve_hook(...)`,
and either executes the tool or returns an error tool-result message
6. The backend pre-registers the approval via `ai.resolve_hook(...)` on the
next request, then either executes the tool or returns an error tool-result
message

Tool results are appended as separate `role="tool"` messages. The
assistant tool-call message remains immutable.
Expand Down Expand Up @@ -60,7 +61,5 @@ The frontend dev server proxies `/api` requests to the backend at `localhost:800

## Storage

Checkpoints are persisted to `./data/` as JSON files via `FileStorage`.
The storage backend implements a simple `Storage` protocol — swap in
Redis, Postgres, or any async key-value store by providing a different
implementation.
The demo backend is stateless. The frontend sends the conversation history
and approval responses on each request.
12 changes: 7 additions & 5 deletions examples/fastapi-vite/backend/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def talk_to_mothership(question: str) -> str:


@chat_agent.loop
async def graph(context: ai.Context) -> AsyncGenerator[ai.Message]:
async def graph(context: ai.Context) -> AsyncGenerator[ai.Event]:
"""Agent graph with human-in-the-loop tool approval.

Loops: stream LLM -> request approval -> execute tools -> repeat.
Expand All @@ -34,9 +34,9 @@ async def graph(context: ai.Context) -> AsyncGenerator[ai.Message]:
Reject buttons and sends the decision back on the next request.
"""
while True:
s = await ai.models.stream(context.model, context.messages, tools=context.tools)
async for msg in s:
yield msg
s = ai.models.stream(context.model, context.messages, tools=context.tools)
async for event in s:
yield event

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
Expand All @@ -45,7 +45,9 @@ async def graph(context: ai.Context) -> AsyncGenerator[ai.Message]:
results = await asyncio.gather(
*(_execute_with_approval(tc) for tc in tool_calls)
)
yield ai.tool_message(*results)
tool_msg = ai.tool_message(*results)
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)


async def _execute_with_approval(tc: ai.ToolCall) -> ai.Message:
Expand Down
28 changes: 2 additions & 26 deletions examples/fastapi-vite/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import fastapi.middleware.cors
import fastapi.responses
import pydantic
import storage

import ai

Expand All @@ -33,9 +32,6 @@ async def health() -> dict[str, str]:
return {"status": "ok"}


file_storage = storage.FileStorage()


class ChatRequest(pydantic.BaseModel):
"""Request body for the chat endpoint."""

Expand All @@ -47,32 +43,12 @@ class ChatRequest(pydantic.BaseModel):
async def chat(request: ChatRequest) -> fastapi.responses.StreamingResponse:
"""Handle chat requests and stream responses."""
messages = ai.ai_sdk_ui.to_messages(request.messages)
session_id = request.session_id or "default"
checkpoint_key = f"checkpoint:{session_id}"

checkpoint = None
saved = await file_storage.get(checkpoint_key)
if saved:
checkpoint = ai.Checkpoint.model_validate(saved)

durability = ai.EventLogProvider(checkpoint)
result = agent_.chat_agent.run(agent_.MODEL, messages, durability=durability)
result = agent_.chat_agent.run(agent_.MODEL, messages)

async def stream_response() -> AsyncGenerator[str]:
async for chunk in ai.ai_sdk_ui.to_sse_stream(result):
async for chunk in ai.ai_sdk_ui.to_sse(result):
yield chunk

# Persist checkpoint so interrupted runs (approval hooks with
# interrupt_loop=True) can resume on re-entry. Clean up when
# the run completes without pending hooks.
cp = durability.checkpoint()
if cp.steps and not cp.hooks:
# Steps recorded but no hooks resolved — the run was likely
# interrupted by an approval hook. Save for replay.
await file_storage.put(checkpoint_key, cp.model_dump())
else:
await file_storage.delete(checkpoint_key)

return fastapi.responses.StreamingResponse(
stream_response(),
headers=ai.ai_sdk_ui.UI_MESSAGE_STREAM_HEADERS,
Expand Down
68 changes: 40 additions & 28 deletions examples/multiagent-textual/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import json

import rich.text
import pydantic
import textual
import textual.app
import textual.containers
Expand Down Expand Up @@ -111,6 +112,8 @@ def __init__(self) -> None:
self._hook_queue: asyncio.Queue[ai.HookPart] = asyncio.Queue()
self._current_hook: ai.HookPart | None = None
self._ws: websockets.ClientConnection | None = None
self._event_adapter = pydantic.TypeAdapter(ai.Event)
self._current_label = "unknown"

def compose(self) -> textual.app.ComposeResult:
yield AgentPanel("mothership", "mothership")
Expand Down Expand Up @@ -147,20 +150,45 @@ async def run_websocket(self) -> None:
self._on_run_complete()
break

msg = ai.Message.model_validate(data)
self._handle_message(msg)
event = self._event_adapter.validate_python(data)
self._handle_event(event)

except (ConnectionRefusedError, OSError) as exc:
self._set_input_placeholder(f"connection failed: {exc}")

# ------------------------------------------------------------------
# Message routing
# Event routing
# ------------------------------------------------------------------

def _handle_message(self, msg: ai.Message) -> None:
label = msg.source_label or "unknown"
def _handle_event(self, event: ai.Event) -> None:
if isinstance(event, ai.MessageStart) and event.message is not None:
self._current_label = event.message.source_label or "unknown"
panel = self._get_panel(self._current_label)
if panel is not None and panel.status == "idle":
panel.status = "streaming..."
return

if isinstance(event, ai.TextDelta):
panel = self._get_panel(self._current_label)
if panel is not None:
panel.append_text(event.chunk)
return

if isinstance(event, ai.ReasoningDelta | ai.ToolDelta):
panel = self._get_panel(self._current_label)
if panel is not None:
panel.append_text(event.chunk, style="dim")
return

if (hook_part := msg.get_hook_part()) is not None:
if not isinstance(event, ai.MessageEnd):
return

msg = event.message
label = msg.source_label or self._current_label

hook_parts = [p for p in msg.parts if isinstance(p, ai.HookPart)]
if hook_parts:
hook_part = hook_parts[0]
if hook_part.status == "pending":
self._on_hook_pending(hook_part)
return
Expand All @@ -172,28 +200,12 @@ def _handle_message(self, msg: ai.Message) -> None:
if panel is None:
return

# Mark panel as actively streaming
if panel.status == "idle":
panel.status = "streaming..."

# Text / reasoning / tool-arg deltas
for ev in msg.deltas:
match ev.part:
case ai.TextPart():
panel.append_text(ev.chunk)
case ai.ReasoningPart():
panel.append_text(ev.chunk, style="dim")
case ai.ToolCallPart():
panel.append_text(ev.chunk, style="dim")

# Completed message — show tool calls and results
if msg.is_done:
for part in msg.parts:
match part:
case ai.ToolCallPart(tool_name=name, tool_args=args):
panel.append_line(f"> {name}({args})")
case ai.ToolResultPart(tool_name=name, result=result):
panel.append_line(f"< {name} = {result}")
for part in msg.parts:
match part:
case ai.ToolCallPart(tool_name=name, tool_args=args):
panel.append_line(f"> {name}({args})")
case ai.ToolResultPart(tool_name=name, result=result):
panel.append_line(f"< {name} = {result}")

# ------------------------------------------------------------------
# Hook lifecycle
Expand Down
Loading
Loading