Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- UNLESS it's `typing` — then `from typing import Foo` (there are too many of them).
- if the module name shadows a local variable in the same file, add a trailing underscore to the import: `from ..types import messages as messages_`. do not add trailing underscores preemptively — only when there is an actual collision.
4. tests directory structure mirrors `src`
5. to run examples that have their own `pyproject.toml`: `uv run --frozen --with-editable ~/src/py-ai/`

## design principles

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ ai.yield_from(...) forward nested agent / streaming tool output

```
ai.system_message ai.user_message ai.assistant_message ai.tool_message
ai.tool_result ai.file_part ai.thinking
ai.tool_result ai.tool_result_part ai.file_part ai.thinking
```

### Middleware
Expand Down Expand Up @@ -109,15 +109,15 @@ async def custom(context: ai.Context):
s = ai.stream(context.model, context.messages, tools=context.tools)
async for event in s:
yield event
if s.message is not None:
yield s.message

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
return

results = [await tc() for tc in tool_calls]
tool_msg = ai.tool_message(*results)
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)
yield ai.tool_result(*results)
```

## Examples
Expand Down
2 changes: 1 addition & 1 deletion examples/fastapi-vite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ to suspend execution whenever the LLM wants to call a tool. The flow is:

1. LLM emits a tool call
2. Backend calls `await ai.hook(...)` with `payload=ai.ToolApproval`
3. The runtime emits a `MessageEnd` event containing an internal `HookPart`
3. The runtime emits a `HookEvent` containing the `HookPart`
4. The frontend renders Approve / Reject buttons via the
`<Confirmation>` component (from AI Elements)
5. When the user clicks a button, `addToolApprovalResponse()` patches
Expand Down
6 changes: 3 additions & 3 deletions examples/fastapi-vite/backend/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ async def graph(context: ai.Context) -> AsyncGenerator[ai.Event]:
s = ai.models.stream(context.model, context.messages, tools=context.tools)
async for event in s:
yield event
if s.message is not None:
yield s.message

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
Expand All @@ -45,9 +47,7 @@ async def graph(context: ai.Context) -> AsyncGenerator[ai.Event]:
results = await asyncio.gather(
*(_execute_with_approval(tc) for tc in tool_calls)
)
tool_msg = ai.tool_message(*results)
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)
yield ai.tool_result(*results)


async def _execute_with_approval(tc: ai.ToolCall) -> ai.Message:
Expand Down
8 changes: 4 additions & 4 deletions examples/fastapi-vite/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ async def health() -> dict[str, str]:
class ChatRequest(pydantic.BaseModel):
"""Request body for the chat endpoint."""

messages: list[ai.ai_sdk_ui.UIMessage]
messages: list[ai.agents.ui.ai_sdk.UIMessage]
session_id: str | None = None


@app.post("/chat")
async def chat(request: ChatRequest) -> fastapi.responses.StreamingResponse:
"""Handle chat requests and stream responses."""
messages = ai.ai_sdk_ui.to_messages(request.messages)
messages = ai.agents.ui.ai_sdk.to_messages(request.messages)
result = agent_.chat_agent.run(agent_.MODEL, messages)

async def stream_response() -> AsyncGenerator[str]:
async for chunk in ai.ai_sdk_ui.to_sse(result):
async for chunk in ai.agents.ui.ai_sdk.to_sse(result):
yield chunk

return fastapi.responses.StreamingResponse(
stream_response(),
headers=ai.ai_sdk_ui.UI_MESSAGE_STREAM_HEADERS,
headers=ai.agents.ui.ai_sdk.UI_MESSAGE_STREAM_HEADERS,
)
1 change: 0 additions & 1 deletion examples/fastapi-vite/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ dependencies = [
"fastapi[standard]>=0.128.1",
"vercel-ai-sdk>=0.0.1.dev5",
]

8 changes: 8 additions & 0 deletions examples/fastapi-vite/frontend/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ export default defineConfig({
'@': path.resolve(__dirname, './src'),
},
},
server: {
proxy: {
'/api': {
target: 'http://localhost:8000',
rewrite: (path) => path.replace(/^\/api/, ''),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't actually do that; the move is to use `vercel dev` and let it handle the routing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do this, though, it will work either way?

},
},
},
})
72 changes: 34 additions & 38 deletions examples/multiagent-textual/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(self) -> None:
self._hook_queue: asyncio.Queue[ai.HookPart] = asyncio.Queue()
self._current_hook: ai.HookPart | None = None
self._ws: websockets.ClientConnection | None = None
self._event_adapter = pydantic.TypeAdapter(ai.Event)
self._event_adapter = pydantic.TypeAdapter(ai.AgentEvent)
self._current_label = "unknown"

def compose(self) -> textual.app.ComposeResult:
Expand Down Expand Up @@ -150,62 +150,58 @@ async def run_websocket(self) -> None:
self._on_run_complete()
break

event = self._event_adapter.validate_python(data)
try:
event = self._event_adapter.validate_python(data)
except pydantic.ValidationError:
continue
self._handle_event(event)

except (ConnectionRefusedError, OSError) as exc:
self._set_input_placeholder(f"connection failed: {exc}")

# ------------------------------------------------------------------
# Event routing
#
# TODO: streaming events (TextDelta, etc.) don't carry a source
# label, so _current_label is only updated when a ToolCallResult
# or HookEvent arrives. With concurrent sub-agents, streaming
# text can route to the wrong panel. The hooks in this demo
# serialize the flow enough that it works in practice, but a
# proper fix needs labels on streaming events.
# ------------------------------------------------------------------

def _handle_event(self, event: ai.Event) -> None:
if isinstance(event, ai.MessageStart) and event.message is not None:
self._current_label = event.message.source_label or "unknown"
panel = self._get_panel(self._current_label)
if panel is not None and panel.status == "idle":
panel.status = "streaming..."
def _handle_event(self, event: ai.AgentEvent) -> None:
if isinstance(event, ai.ToolCallResult):
label = event.message.source_label or self._current_label
self._current_label = label
panel = self._get_panel(label)
if panel is not None:
for part in event.message.parts:
match part:
case ai.ToolCallPart(tool_name=name, tool_args=args):
panel.append_line(f"> {name}({args})")
case ai.ToolResultPart(tool_name=name, result=result):
panel.append_line(f"< {name} = {result}")
return

if isinstance(event, ai.HookEvent):
if event.hook.status == "pending":
self._on_hook_pending(event.hook)
elif event.hook.status == "resolved":
self._on_hook_resolved(event.hook)
return

if isinstance(event, ai.TextDelta):
panel = self._get_panel(self._current_label)
if panel is not None:
panel.append_text(event.chunk)
return
if panel.status == "idle":
panel.status = "streaming..."

if isinstance(event, ai.ReasoningDelta | ai.ToolDelta):
elif isinstance(event, ai.ReasoningDelta | ai.ToolDelta):
panel = self._get_panel(self._current_label)
if panel is not None:
panel.append_text(event.chunk, style="dim")
return

if not isinstance(event, ai.MessageEnd):
return

msg = event.message
label = msg.source_label or self._current_label

hook_parts = [p for p in msg.parts if isinstance(p, ai.HookPart)]
if hook_parts:
hook_part = hook_parts[0]
if hook_part.status == "pending":
self._on_hook_pending(hook_part)
return
if hook_part.status == "resolved":
self._on_hook_resolved(hook_part)
return

panel = self._get_panel(label)
if panel is None:
return

for part in msg.parts:
match part:
case ai.ToolCallPart(tool_name=name, tool_args=args):
panel.append_line(f"> {name}({args})")
case ai.ToolResultPart(tool_name=name, result=result):
panel.append_line(f"< {name} = {result}")

# ------------------------------------------------------------------
# Hook lifecycle
Expand Down
43 changes: 11 additions & 32 deletions examples/multiagent-textual/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ async def gated_loop(context: ai.Context) -> AsyncGenerator[ai.Event]:
s = ai.stream(context.model, context.messages, tools=context.tools)
async for event in s:
yield event
if s.message is not None:
yield s.message

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
Expand Down Expand Up @@ -110,9 +112,7 @@ async def gated_loop(context: ai.Context) -> AsyncGenerator[ai.Event]:
else:
results.append(await tc())

tool_msg = ai.tool_message(*results)
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)
yield ai.tool_result(*results)

return gated

Expand Down Expand Up @@ -176,35 +176,19 @@ async def multiagent_loop(context: ai.Context) -> AsyncGenerator[ai.Event]:

combined = f"Mothership: {r1}\nData centers: {r2}"

# Fan in: summarise.
s = ai.stream(
# Fan in: summarise via a labelled sub-agent.
summary_agent = ai.agent()
async for event in summary_agent.run(
context.model,
[
ai.system_message(
"You are assistant 3. Summarise the results from the other assistants."
),
ai.user_message(combined),
],
)
async for event in s:
if isinstance(event, ai.MessageEnd):
yield event.model_copy(
update={
"message": event.message.model_copy(
update={"source_label": "summary"}
)
}
)
elif isinstance(event, ai.MessageStart) and event.message is not None:
yield event.model_copy(
update={
"message": event.message.model_copy(
update={"source_label": "summary"}
)
}
)
else:
yield event
label="summary",
):
yield event


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -262,13 +246,8 @@ async def read_resolutions() -> None:
data = _normalise_event(event.model_dump())
await websocket.send_json(data)

if isinstance(event, ai.MessageEnd) and event.message.role == "internal":
hook_parts = [
p for p in event.message.parts if isinstance(p, ai.HookPart)
]
if hook_parts:
hook_part = hook_parts[0]
print(f" Hook {hook_part.status}: {hook_part.hook_id}")
if isinstance(event, ai.HookEvent):
print(f" Hook {event.hook.status}: {event.hook.hook_id}")
finally:
reader.cancel()
with contextlib.suppress(asyncio.CancelledError):
Expand Down
9 changes: 5 additions & 4 deletions examples/samples/agent_custom_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ async def custom(context: ai.Context) -> AsyncGenerator[ai.Event]:
async for event in s:
yield event

# Yield the assistant message for silent history collection.
if s.message is not None:
yield s.message

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
return
Expand All @@ -46,10 +50,7 @@ async def custom(context: ai.Context) -> AsyncGenerator[ai.Event]:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(tc()) for tc in tool_calls]

# Yield one merged tool-result message — history auto-collects it.
tool_msg = ai.tool_message(*(t.result() for t in tasks))
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)
yield ai.tool_result(*(t.result() for t in tasks))

async for event in my_agent.run(
model,
Expand Down
32 changes: 15 additions & 17 deletions examples/samples/agent_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Demonstrates the function-based hook API:
- await hook("label", payload=Model) to suspend inside the loop
- resolve_hook("label", data) to unblock from outside
- Hook messages arrive as MessageEnd events with role="internal"
- Hook signals arrive as HookEvent events
"""

import asyncio
Expand Down Expand Up @@ -38,6 +38,8 @@ async def with_approval(context: ai.Context) -> AsyncGenerator[ai.Event]:
)
async for event in s:
yield event
if s.message is not None:
yield s.message

tool_calls = context.resolve(s.tool_calls)
if not tool_calls:
Expand Down Expand Up @@ -66,9 +68,7 @@ async def with_approval(context: ai.Context) -> AsyncGenerator[ai.Event]:
else:
results.append(await tc())

tool_msg = ai.tool_message(*results)
yield ai.MessageStart(message=tool_msg)
yield ai.MessageEnd(message=tool_msg)
yield ai.tool_result(*results)

messages = [
ai.system_message(
Expand All @@ -82,19 +82,17 @@ async def with_approval(context: ai.Context) -> AsyncGenerator[ai.Event]:
print(event.chunk, end="", flush=True)
continue

# Hook signals arrive as internal MessageEnd events.
if isinstance(event, ai.MessageEnd) and event.message.role == "internal":
hook_parts = [p for p in event.message.parts if isinstance(p, ai.HookPart)]
hook_part = hook_parts[0] if hook_parts else None
if hook_part is not None and hook_part.status == "pending":
answer = input(f"Approve {hook_part.hook_id}? [y/n] ")
ai.resolve_hook(
hook_part.hook_id,
Approval(
granted=answer.strip().lower() in ("y", "yes"),
reason="operator decision",
),
)
# Hook signals arrive as HookEvent events.
if isinstance(event, ai.HookEvent) and event.hook.status == "pending":
hook_part = event.hook
answer = input(f"Approve {hook_part.hook_id}? [y/n] ")
ai.resolve_hook(
hook_part.hook_id,
Approval(
granted=answer.strip().lower() in ("y", "yes"),
reason="operator decision",
),
)
print()


Expand Down
Loading
Loading