Multi-strategy agent orchestration. Define pipelines with decorators, control flow with exceptions, run locally or distributed.
Backed by EggAI SDK for distributed execution.
pip install eggai-clutchimport asyncio
from pydantic import BaseModel
from eggai_clutch import Clutch, Terminate
class Document(BaseModel):
content: str
chunks: list[str] = []
summary: str = ""
clutch = Clutch("rag-pipeline")
@clutch.agent()
async def chunker(doc: Document) -> Document:
doc.chunks = [doc.content[i:i+500] for i in range(0, len(doc.content), 500)]
return doc
@clutch.agent()
async def summarizer(doc: Document) -> Document:
doc.summary = f"Summary of {len(doc.chunks)} chunks"
raise Terminate(doc)
async def main():
result = await clutch.run(Document(content="..." * 1000))
print(result["summary"])
asyncio.run(main())| Strategy | Description | Use Case |
|---|---|---|
SEQUENTIAL |
Execute agents in order | Pipelines, ETL |
ROUND_ROBIN |
Cycle through agents | Iterative refinement |
GRAPH |
Follow explicit edges | Complex workflows |
SELECTOR |
Dynamic routing via function | Intent classification, triage |
Agents execute in registration order. Data flows from one to the next.
clutch = Clutch("pipeline")
@clutch.agent()
async def step_a(data): ...
@clutch.agent()
async def step_b(data): ...
@clutch.agent()
async def step_c(data):
raise Terminate(data)A selector function dynamically picks the next agent.
from eggai_clutch import Clutch, Strategy, Terminate
clutch = Clutch("router", Strategy.SELECTOR)
@clutch.selector
async def route(ticket: Ticket) -> str:
if "billing" in ticket.query:
return "billing"
return "general"
@clutch.agent()
async def billing(ticket: Ticket):
ticket.response = "Billing handled"
raise Terminate(ticket)
@clutch.agent()
async def general(ticket: Ticket):
ticket.response = "General handled"
raise Terminate(ticket)Define explicit edges between agents.
clutch = Clutch("workflow", Strategy.GRAPH)
@clutch.agent(edges=["validate"])
async def parse(data): ...
@clutch.agent(edges=["store"])
async def validate(data): ...
@clutch.agent() # No edges = terminal
async def store(data):
raise Terminate(data)Stop execution and return a result:
from eggai_clutch import Terminate
@clutch.agent()
async def final_step(data):
raise Terminate({"status": "done", "result": data})Transfer control to a specific agent:
from eggai_clutch import Handover
@clutch.agent()
async def router(data):
if needs_review:
raise Handover("reviewer", data)
return data# Submit and wait
result = await clutch.run(input_data, timeout=30.0)
# Submit and get handle
task = await clutch.submit(input_data)
print(task.done) # Non-blocking check
result = await task # Await directly
result = await task.result(timeout=10.0)
task.cancel()
# Stream step events
async for event in clutch.stream(input_data):
print(f"Step: {event.step}, Final: {event.final}")Add a transport to run across multiple processes/machines:
from eggai import RedisTransport
transport = RedisTransport(url="redis://localhost:6379")
clutch = Clutch("pipeline", transport=transport)
# Worker process
await clutch.submit(data) # Routes to distributed workers
# Cleanup
await clutch.stop()Supports all EggAI transports: InMemory, Redis Streams, Kafka.
clutch = Clutch(
"pipeline",
on_request=async_fn, # Before processing
on_response=async_fn, # After completion
on_step=async_fn, # After each step
)See examples/ for complete examples:
- rag_pipeline.py - Document processing pipeline
- support_triage.py - Intent-based routing with Selector
- code_review.py - Multi-stage code analysis
Full RAG application with web UI: eggai-clutch-demo
MIT