Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ checkpoint = ai.get_checkpoint()
```python
# After a run completes or suspends
checkpoint = result.checkpoint
data = checkpoint.serialize() # dict, JSON-safe
data = checkpoint.model_dump() # dict, JSON-safe

# Later: restore and resume
checkpoint = ai.Checkpoint.deserialize(data)
checkpoint = ai.Checkpoint.model_validate(data)
result = ai.run(my_agent, llm, query, checkpoint=checkpoint)
```

Expand Down Expand Up @@ -277,7 +277,7 @@ return StreamingResponse(stream_response(), headers=UI_MESSAGE_STREAM_HEADERS)
| `RunResult` | Return type of `run()`. Async-iterable for messages, then `.checkpoint` and `.pending_hooks` |
| `HookInfo` | Pending hook info: `label`, `hook_type`, `metadata` |
| `Hook` | Generic hook base with `.create()`, `.resolve()`, `.cancel()` class methods |
| `Checkpoint` | Serializable snapshot of completed work: `steps[]`, `tools[]`, `hooks[]`. Has `.serialize()` / `.deserialize()` |
| `Checkpoint` | Pydantic model — serializable snapshot of completed work: `steps[]`, `tools[]`, `hooks[]`. Use `.model_dump()` / `.model_validate()` |
| `LanguageModel` | Abstract base class for LLM providers |

## Examples
Expand Down
8 changes: 5 additions & 3 deletions examples/fastapi-vite/backend/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os

from typing import Any

import vercel_ai_sdk as ai


Expand All @@ -20,14 +22,14 @@ def get_llm() -> ai.LanguageModel:
)


TOOLS: list[ai.Tool] = [talk_to_mothership]
TOOLS: list[ai.Tool[..., Any]] = [talk_to_mothership]


async def graph(
llm: ai.LanguageModel,
messages: list[ai.Message],
tools: list[ai.Tool],
):
tools: list[ai.Tool[..., Any]],
) -> ai.StreamResult:
"""
Agent graph: stream LLM, execute tools, repeat until done.

Expand Down
2 changes: 1 addition & 1 deletion examples/fastapi-vite/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@


@app.get("/api/health")
async def health():
async def health() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "ok"}
10 changes: 6 additions & 4 deletions examples/fastapi-vite/backend/routes/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

from collections.abc import AsyncGenerator

import fastapi
import fastapi.responses
import pydantic
Expand All @@ -24,7 +26,7 @@ class ChatRequest(pydantic.BaseModel):


@router.post("/chat")
async def chat(request: ChatRequest):
async def chat(request: ChatRequest) -> fastapi.responses.StreamingResponse:
"""Handle chat requests and stream responses."""
messages = ai.ai_sdk_ui.to_messages(request.messages)
session_id = request.session_id or "default"
Expand All @@ -37,19 +39,19 @@ async def chat(request: ChatRequest):
# run — the frontend carries the full message history — so we only
# load a checkpoint when one was saved from a previous incomplete run.
saved = await file_storage.get(checkpoint_key)
checkpoint = ai.Checkpoint.deserialize(saved) if saved else None
checkpoint = ai.Checkpoint.model_validate(saved) if saved else None

result = ai.run(agent.graph, llm, messages, agent.TOOLS, checkpoint=checkpoint)

async def stream_response():
async def stream_response() -> AsyncGenerator[str]:
async for chunk in ai.ai_sdk_ui.to_sse_stream(result):
yield chunk

# If the run completed (no pending hooks), clear the checkpoint
# so the next request starts fresh. If hooks are pending, save
# the checkpoint so the next request can resume from here.
if result.pending_hooks:
await file_storage.put(checkpoint_key, result.checkpoint.serialize())
await file_storage.put(checkpoint_key, result.checkpoint.model_dump())
else:
await file_storage.delete(checkpoint_key)

Expand Down
2 changes: 1 addition & 1 deletion examples/fastapi-vite/backend/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def get(self, key: str) -> dict[str, Any] | None:
path = self._path(key)
if not path.exists():
return None
return json.loads(path.read_text())
return json.loads(path.read_text()) # type: ignore[no-any-return]

async def put(self, key: str, value: dict[str, Any]) -> None:
path = self._path(key)
Expand Down
41 changes: 27 additions & 14 deletions examples/multiagent-textual/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import os
import warnings

from typing import Any

import fastapi
import pydantic

Expand Down Expand Up @@ -58,7 +60,7 @@ class Approval(pydantic.BaseModel):
# ---------------------------------------------------------------------------


async def mothership_branch(llm: ai.LanguageModel, query: str):
async def mothership_branch(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
"""Agent that contacts the mothership, gated by an approval hook."""
messages = ai.make_messages(
system="You are assistant 1. Use contact_mothership when asked about the future.",
Expand All @@ -74,23 +76,27 @@ async def mothership_branch(llm: ai.LanguageModel, query: str):

for tc in result.tool_calls:
if tc.tool_name == "contact_mothership":
approval = await Approval.create(
# TODO: mypy doesn't support class decorators that change the
# class type — @ai.hook returns type[Hook[T]] but mypy still
# sees the original BaseModel.
approval = await Approval.create( # type: ignore[attr-defined]
f"mothership_{tc.tool_call_id}",
metadata={"branch": "mothership", "tool": tc.tool_name},
)
if approval.granted:
await ai.execute_tool(tc, message=result.last_message)
else:
tc.set_result(f"Denied: {approval.reason}")
tc.set_error(f"Denied: {approval.reason}")
else:
await ai.execute_tool(tc, message=result.last_message)

messages.append(result.last_message)
if result.last_message is not None:
messages.append(result.last_message)

return result


async def data_center_branch(llm: ai.LanguageModel, query: str):
async def data_center_branch(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
"""Agent that contacts data centers, gated by an approval hook."""
messages = ai.make_messages(
system="You are assistant 2. Use contact_data_centers when asked about the future.",
Expand All @@ -106,18 +112,22 @@ async def data_center_branch(llm: ai.LanguageModel, query: str):

for tc in result.tool_calls:
if tc.tool_name == "contact_data_centers":
approval = await Approval.create(
# TODO: mypy doesn't support class decorators that change the
# class type — @ai.hook returns type[Hook[T]] but mypy still
# sees the original BaseModel.
approval = await Approval.create( # type: ignore[attr-defined]
f"data_centers_{tc.tool_call_id}",
metadata={"branch": "data_centers", "tool": tc.tool_name},
)
if approval.granted:
await ai.execute_tool(tc, message=result.last_message)
else:
tc.set_result(f"Access denied: {approval.reason}")
tc.set_error(f"Access denied: {approval.reason}")
else:
await ai.execute_tool(tc, message=result.last_message)

messages.append(result.last_message)
if result.last_message is not None:
messages.append(result.last_message)

return result

Expand All @@ -127,7 +137,7 @@ async def data_center_branch(llm: ai.LanguageModel, query: str):
# ---------------------------------------------------------------------------


async def multiagent(llm: ai.LanguageModel, query: str):
async def multiagent(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
"""Run two gated agents in parallel, then summarise their results."""
r1, r2 = await asyncio.gather(
mothership_branch(llm, query),
Expand All @@ -154,7 +164,7 @@ async def multiagent(llm: ai.LanguageModel, query: str):
# ---------------------------------------------------------------------------


def _normalise_message(data: dict) -> dict:
def _normalise_message(data: dict[str, Any]) -> dict[str, Any]:
"""Ensure ToolPart.result is always a dict for safe deserialisation."""
for part in data.get("parts", []):
if part.get("type") == "tool" and isinstance(part.get("result"), str):
Expand All @@ -168,7 +178,7 @@ def _normalise_message(data: dict) -> dict:


@app.websocket("/ws")
async def ws_endpoint(websocket: fastapi.WebSocket):
async def ws_endpoint(websocket: fastapi.WebSocket) -> None:
await websocket.accept()
print("Client connected")

Expand All @@ -181,13 +191,16 @@ async def ws_endpoint(websocket: fastapi.WebSocket):
result = ai.run(multiagent, llm, "When will the robots take over?")

# Background task: read hook resolutions from the client.
async def read_resolutions():
async def read_resolutions() -> None:
try:
while True:
raw = await websocket.receive_text()
data = json.loads(raw)
print(f" Resolution received: {data['hook_id']}")
Approval.resolve(
# TODO: mypy doesn't support class decorators that change the
# class type — @ai.hook returns type[Hook[T]] but mypy still
# sees the original BaseModel.
Approval.resolve( # type: ignore[attr-defined]
data["hook_id"],
{"granted": data["granted"], "reason": data["reason"]},
)
Expand Down Expand Up @@ -220,5 +233,5 @@ async def read_resolutions():


@app.get("/api/health")
async def health():
async def health() -> dict[str, str]:
return {"status": "ok"}
7 changes: 5 additions & 2 deletions examples/samples/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import vercel_ai_sdk.agent as agent


async def main():
async def main() -> None:
llm = ai.openai.OpenAIModel(
model="anthropic/claude-sonnet-4-20250514",
base_url="https://ai-gateway.vercel.sh/v1",
Expand All @@ -25,7 +25,10 @@ async def main():
async for msg in coding_agent.run(messages):
# Auto-approve all tool calls
if (hook := msg.get_hook_part()) and hook.status == "pending":
agent.ToolApproval.resolve(hook.hook_id, {"granted": True})
# TODO: mypy doesn't support class decorators that change the
# class type — @ai.hook returns type[Hook[T]] but mypy still
# sees the original BaseModel.
agent.ToolApproval.resolve(hook.hook_id, {"granted": True}) # type: ignore[attr-defined]

if msg.text_delta:
print(msg.text_delta, end="", flush=True)
Expand Down
11 changes: 7 additions & 4 deletions examples/samples/custom_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
from collections.abc import AsyncGenerator

from typing import Any

import vercel_ai_sdk as ai


Expand All @@ -25,7 +27,7 @@ async def get_population(city: str) -> int:
async def custom_stream_step(
llm: ai.LanguageModel,
messages: list[ai.Message],
tools: list[ai.Tool],
tools: list[ai.Tool[..., Any]],
label: str | None = None,
) -> AsyncGenerator[ai.Message, None]:
"""Wraps llm.stream to inject a label on every message."""
Expand All @@ -34,7 +36,7 @@ async def custom_stream_step(
yield msg


async def agent(llm: ai.LanguageModel, user_query: str):
async def agent(llm: ai.LanguageModel, user_query: str) -> ai.StreamResult:
"""Custom agent loop with manual tool execution.

Uses @ai.stream for custom streaming, stream_step-style while loop,
Expand All @@ -52,7 +54,8 @@ async def agent(llm: ai.LanguageModel, user_query: str):
if not result.tool_calls:
return result

messages.append(result.last_message)
if result.last_message is not None:
messages.append(result.last_message)
await asyncio.gather(
*(
ai.execute_tool(tc, message=result.last_message)
Expand All @@ -61,7 +64,7 @@ async def agent(llm: ai.LanguageModel, user_query: str):
)


async def main():
async def main() -> None:
llm = ai.anthropic.AnthropicModel(
model="anthropic/claude-sonnet-4",
base_url="https://ai-gateway.vercel.sh",
Expand Down
19 changes: 13 additions & 6 deletions examples/samples/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CommunicationApproval(pydantic.BaseModel):
reason: str


async def graph(llm: ai.LanguageModel, query: str):
async def graph(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
messages = ai.make_messages(
system="Use the contact_mothership tool when asked about the future.",
user=query,
Expand All @@ -36,23 +36,27 @@ async def graph(llm: ai.LanguageModel, query: str):
for tc in result.tool_calls:
if tc.tool_name == "contact_mothership":
# Blocks until resolved (long-running) or cancelled (serverless)
approval = await CommunicationApproval.create(
# TODO: mypy doesn't support class decorators that change the
# class type — @ai.hook returns type[Hook[T]] but mypy still
# sees the original BaseModel.
approval = await CommunicationApproval.create( # type: ignore[attr-defined]
f"approve_{tc.tool_call_id}",
metadata={"tool": tc.tool_name},
)
if approval.granted:
await ai.execute_tool(tc, message=result.last_message)
else:
tc.set_result({"error": f"Rejected: {approval.reason}"})
tc.set_error(f"Rejected: {approval.reason}")
else:
await ai.execute_tool(tc, message=result.last_message)

messages.append(result.last_message)
if result.last_message is not None:
messages.append(result.last_message)

return result


async def main():
async def main() -> None:
llm = ai.openai.OpenAIModel(
model="anthropic/claude-sonnet-4-20250514",
base_url="https://ai-gateway.vercel.sh/v1",
Expand All @@ -63,7 +67,10 @@ async def main():
# Hook parts arrive as pending, waiting for resolution
if (hook := msg.get_hook_part()) and hook.status == "pending":
answer = input(f"Approve {hook.hook_id}? [y/n] ")
CommunicationApproval.resolve(
# TODO: mypy doesn't support class decorators that change the
# class type — @ai.hook returns type[Hook[T]] but mypy still
# sees the original BaseModel.
CommunicationApproval.resolve( # type: ignore[attr-defined]
hook.hook_id,
{
"granted": answer.strip().lower() in ("y", "yes"),
Expand Down
8 changes: 5 additions & 3 deletions examples/samples/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

import rich

from typing import Any

import vercel_ai_sdk as ai


async def context7_agent(llm: ai.LanguageModel, user_query: str):
async def context7_agent(llm: ai.LanguageModel, user_query: str) -> ai.StreamResult:
"""Agent with Context7 MCP tools for up-to-date library documentation."""

context7_tools: list[ai.Tool] = await ai.mcp.get_http_tools(
context7_tools: list[ai.Tool[..., Any]] = await ai.mcp.get_http_tools(
"https://mcp.context7.com/mcp",
headers={"CONTEXT7_API_KEY": os.environ.get("CONTEXT7_API_KEY", "")},
tool_prefix="context7",
Expand All @@ -28,7 +30,7 @@ async def context7_agent(llm: ai.LanguageModel, user_query: str):
)


async def main():
async def main() -> None:
llm = ai.openai.OpenAIModel(
model="openai/gpt-4.1",
base_url="https://ai-gateway.vercel.sh/v1",
Expand Down
Loading