-
Notifications
You must be signed in to change notification settings - Fork 97
Expand file tree
/
Copy pathstreaming_tools.py
More file actions
247 lines (204 loc) · 12.2 KB
/
streaming_tools.py
File metadata and controls
247 lines (204 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""
Tool Streaming Example - FIXED
===============================
Real-time tool call streaming: watch tool-call JSON being parsed as it arrives.
(Actual tool execution is covered separately in examples/tool_execution.py.)
Setup:
pip install cascadeflow[all]
export OPENAI_API_KEY="sk-..."
Run:
python examples/streaming_tools.py
What You'll See:
- Tool calls being parsed as JSON arrives
- Streaming events for each stage of a tool call (start, complete)
- Note: this example does NOT execute tools — see examples/tool_execution.py for that
Documentation:
📖 Streaming Guide: docs/guides/streaming.md#tool-streaming
📖 Quick Start: docs/guides/quickstart.md
📚 Examples README: examples/README.md
"""
import asyncio
import os
from cascadeflow import CascadeAgent, ModelConfig
from cascadeflow.streaming import ToolStreamEventType
# ═══════════════════════════════════════════════════════════════════════════
# STEP 1: Define Tools in Universal Format
# ═══════════════════════════════════════════════════════════════════════════
# IMPORTANT: Use "universal format" (not OpenAI format)
# This works with ALL providers (OpenAI, Anthropic, Groq, etc.)
#
# Universal format structure:
# {
# "name": "function_name", ← Direct property
# "description": "what it does", ← Direct property
# "parameters": {JSON Schema} ← Direct property
# }
#
# ❌ WRONG (OpenAI format - don't use this):
# {
# "type": "function", ← Extra wrapper
# "function": {
# "name": "...",
# ...
# }
# }
#
# cascadeflow converts universal format → provider format automatically
# Tool schema in cascadeflow's "universal format" (see comment block above):
# name/description/parameters as direct properties, no OpenAI-style
# {"type": "function", "function": {...}} wrapper. cascadeflow converts this
# to each provider's native format automatically.
WEATHER_TOOL = [
    {
        "name": "get_weather",
        "description": "Get current weather for a location",
        # JSON Schema describing the arguments the model may supply.
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City name (e.g., 'Paris', 'Tokyo')"},
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "Temperature unit",
                },
            },
            "required": ["location"],  # location is required, unit is optional
        },
    }
]
# ═══════════════════════════════════════════════════════════════════════════
# STEP 2: Implement Tool Function (Not used in this example)
# ═══════════════════════════════════════════════════════════════════════════
# Note: This example shows tool call STREAMING only.
# For actual tool EXECUTION, see examples/tool_execution.py
#
# In production, you would:
# 1. Create ToolConfig objects with function=your_function
# 2. Use ToolExecutor to execute tool calls
# 3. Feed results back to the model
#
# This example focuses on the STREAMING aspect (watching tool calls form)
def get_weather(location: str, unit: str = "celsius") -> str:
    """Return a mock weather report for *location*.

    Reference implementation only — this streaming example never calls it
    (see examples/tool_execution.py for real execution).

    Args:
        location: City name, matched case-insensitively against mock data.
        unit: "celsius" (default) or "fahrenheit", case-insensitive.

    Returns:
        A one-line summary such as "Paris: 15°C, Partly cloudy", or a
        not-available message for cities outside the mock data set.
    """
    mock_data = {
        "paris": {"temp_c": 15, "condition": "Partly cloudy"},
        "tokyo": {"temp_c": 22, "condition": "Sunny"},
        "london": {"temp_c": 12, "condition": "Rainy"},
        "new york": {"temp_c": 18, "condition": "Clear"},
    }
    entry = mock_data.get(location.lower())
    if entry is None:
        return f"Weather data not available for {location}"
    temp_c = entry["temp_c"]
    if unit.lower() == "fahrenheit":
        # Truncating conversion (int, not round) — matches the original mock.
        return f"{location}: {int(temp_c * 9 / 5 + 32)}°F, {entry['condition']}"
    return f"{location}: {temp_c}°C, {entry['condition']}"
async def _stream_and_print(agent, prompt: str) -> None:
    """Stream *prompt* through the agent and pretty-print each tool-stream event.

    Shared by both examples below — the original duplicated this event loop
    verbatim. Prints text chunks inline and one status line per tool event.
    """
    async for event in agent.stream_events(prompt, tools=WEATHER_TOOL):
        if event.type == ToolStreamEventType.TEXT_CHUNK:
            # Regular text tokens between/around tool calls.
            print(event.content, end="", flush=True)
        elif event.type == ToolStreamEventType.TOOL_CALL_START:
            # Tool call detected; JSON arguments are still streaming in.
            print("\n🔧 Tool call starting...")
        elif event.type == ToolStreamEventType.TOOL_CALL_COMPLETE:
            # Tool call fully parsed — name and arguments are available.
            tool = event.data.get("tool_call", {})
            print(f"🔧 Tool: {tool.get('name')}({tool.get('arguments')})")
        elif event.type == ToolStreamEventType.COMPLETE:
            print("\n✅ Streaming complete")


async def main():
    """Run the tool-streaming demo: two prompts streamed through a 2-model cascade.

    Requires OPENAI_API_KEY in the environment; exits early with a hint if unset.
    Shows streaming events only — tools are not actually executed here.
    """
    # ═══════════════════════════════════════════════════════════════════════
    # STEP 3: Check API Key (fail fast before building the agent)
    # ═══════════════════════════════════════════════════════════════════════
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ Set OPENAI_API_KEY first: export OPENAI_API_KEY='sk-...'")
        return
    print("🔧 cascadeflow Tool Streaming\n")

    # ═══════════════════════════════════════════════════════════════════════
    # STEP 4: Setup Agent with Cascade (2+ models REQUIRED for streaming)
    # ═══════════════════════════════════════════════════════════════════════
    agent = CascadeAgent(
        models=[
            ModelConfig(name="gpt-4o-mini", provider="openai", cost=0.00015),
            ModelConfig(name="gpt-4o", provider="openai", cost=0.00625),
        ]
    )
    # Streaming is automatically available with 2+ models
    print("✓ Agent ready with 2-model cascade")
    print("✓ Streaming enabled (text and tools)\n")

    # ═══════════════════════════════════════════════════════════════════════
    # EXAMPLE 1: Single Tool Call (Streaming Events)
    # ═══════════════════════════════════════════════════════════════════════
    print("=" * 60)
    print("Example 1: Tool call streaming events\n")
    print("Q: What's the weather in Paris?\n")
    await _stream_and_print(agent, "What's the weather in Paris?")

    # ═══════════════════════════════════════════════════════════════════════
    # EXAMPLE 2: Multiple Tool Calls
    # ═══════════════════════════════════════════════════════════════════════
    print("\n" + "=" * 60)
    print("Example 2: Multiple tool calls\n")
    print("Q: Compare weather in Paris and Tokyo\n")
    await _stream_and_print(agent, "Compare the weather in Paris and Tokyo. Which is warmer?")

    # ═══════════════════════════════════════════════════════════════════════
    # Summary - What You Learned
    # ═══════════════════════════════════════════════════════════════════════
    print("\n" + "=" * 60)
    print("\n✅ Done! Key takeaways:")
    print("\n Tool Definition (Universal Format):")
    print(" ├─ Use direct properties: name, description, parameters")
    print(" ├─ Works with ALL providers (OpenAI, Anthropic, Groq)")
    print(" └─ DON'T wrap in {'type': 'function', 'function': {...}}")
    print("\n Streaming Requirements:")
    print(" ├─ Need 2+ models for cascade")
    print(" └─ Use agent.stream() or agent.stream_events() for streaming")
    print("\n Tool Events:")
    print(" ├─ TOOL_CALL_START: Tool call detected")
    print(" ├─ TOOL_CALL_COMPLETE: Full JSON parsed")
    print(" └─ TEXT_CHUNK: Regular text between tools")
    print("\n IMPORTANT:")
    print(" ├─ This example shows STREAMING only (watching tool calls form)")
    print(" ├─ For actual tool EXECUTION, see examples/tool_execution.py")
    print(" └─ Need ToolConfig + ToolExecutor for real execution")
    print("\n📚 Learn more:")
    print(" • docs/guides/streaming.md - Full streaming guide")
    print(" • examples/tool_execution.py - Real tool execution")
    print(" • tests/test_tool_calling.py - Comprehensive tool tests\n")
if __name__ == "__main__":
    # Script entry point: run the async demo, keep Ctrl-C quiet, and report
    # any other failure with a full traceback plus a setup hint.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
    except Exception as exc:  # broad by design — demo-level error reporting
        print(f"\n\n❌ Error: {exc}")
        import traceback

        traceback.print_exc()
        print("\n💡 Tip: Make sure OPENAI_API_KEY is set correctly")