diff --git a/.github/workflows/pull_request_ci.yml b/.github/workflows/pull_request_ci.yml index 26360d94..7e51d48a 100644 --- a/.github/workflows/pull_request_ci.yml +++ b/.github/workflows/pull_request_ci.yml @@ -134,7 +134,10 @@ jobs: echo "No Python files changed; skipping typecheck." exit 0 fi - echo "$CHANGED_PY" | tr '\n' '\0' | xargs -0 mypy --ignore-missing-imports --follow-imports=silent + while IFS= read -r py_file; do + [ -z "$py_file" ] && continue + mypy --ignore-missing-imports --follow-imports=silent --explicit-package-bases "$py_file" + done <<< "$CHANGED_PY" validate-architecture: runs-on: ubuntu-latest diff --git a/README.md b/README.md index f51b86e9..528a7c01 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,7 @@ innovation-lab-examples/ | [Crewai-agents](Crewai-agents/) | CrewAI agents β€” trip planner, code analyzer, meeting prep, blood report | Python, CrewAI, uAgents | πŸŸ‘β€“πŸ”΄ Collection | | [ag2-agents](ag2-agents/) | AG2 framework β€” research synthesis, payment approval | Python, AG2, uAgents | πŸ”΄ Advanced | | [community_agent](community_agent/) | AI community growth agent for events and hackathons | Python, uAgents, ASI:One, Tavily | 🟑 Intermediate | +| [pdf-podcast-agent](pdf-podcast-agent/) | PDF-to-podcast with live debate, Q&A, and Stripe payments | Python, uAgents, ASI:One, OpenAI TTS, Stripe | πŸ”΄ Advanced | ### 🌐 Web3 & Blockchain diff --git a/pdf-podcast-agent/.env.example b/pdf-podcast-agent/.env.example new file mode 100644 index 00000000..2e39c866 --- /dev/null +++ b/pdf-podcast-agent/.env.example @@ -0,0 +1,71 @@ +# ───────────────────────────────────────────────────────────────────────────── +# PDF-to-Podcast Agent – environment variables +# Copy this file to .env and fill in your real values. +# Never commit .env to version control. +# ───────────────────────────────────────────────────────────────────────────── + +# ── Required ────────────────────────────────────────────────────────────────── + +# ASI:One API key β€” used for all text generation (debate, DOCX, host Q&A) +# Get yours at https://asi1.ai/ +ASI1_API_KEY= + +# OpenAI API key β€” used for TTS audio generation only +OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# Agentverse / ASI:One API key (needed for agent registration & mailbox) +AGENTVERSE_API_KEY=AV-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# ── Sub-agent addresses (run `python get_addresses.py` to get these) ───────── +EXTRACTOR_ADDRESS= +SCRIPTWRITER_ADDRESS= +VOICE_STUDIO_ADDRESS= + +# Post-show Q&A host agents (start host_a_agent.py + host_b_agent.py first) +HOST_A_ADDRESS= +HOST_B_ADDRESS= + +# ── Optional: override agent seed phrases ───────────────────────────────────── +# Changing a seed changes the agent's on-chain address. +# Leave blank to use the built-in defaults. +ORCHESTRATOR_SEED= +EXTRACTOR_SEED= +SCRIPTWRITER_SEED= +VOICE_STUDIO_SEED= +HOST_A_SEED= +HOST_B_SEED= + +# ── Optional: LLM / TTS model overrides ────────────────────────────────────── +# ASI:One text generation model (default: asi1-mini) +ASI1_MODEL=asi1-mini + +# Extraction + scripting model (sub-agents) +EXTRACTION_MODEL=gpt-4o-mini +SCRIPTWRITER_MODEL=gpt-4o-mini + +# TTS model: "tts-1" (fast) or "tts-1-hd" (higher quality) +TTS_MODEL=tts-1 + +# Host voices (OpenAI: alloy | echo | fable | onyx | nova | shimmer) +VOICE_HOST_A=alloy +VOICE_HOST_B=echo + +# ── Optional: audio ─────────────────────────────────────────────────────────── +# Milliseconds of silence injected between each dialogue line (default: 400) +SILENCE_MS=400 + +# Directory where MP3 files are saved (default: ./output) +OUTPUT_DIR=output + +# ── Stripe Payments (Live Show Pass) ────────────────────────────────────────── +# Get your API keys at https://dashboard.stripe.com/apikeys +# Leave STRIPE_SECRET_KEY blank to disable payment gating (free/dev mode). +STRIPE_SECRET_KEY=sk_test_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +STRIPE_PUBLISHABLE_KEY=pk_test_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# Price of the Live Show Pass in USD cents (default: 1000 = $10.00) +STRIPE_LIVE_SHOW_PRICE_CENTS=1000 + +# URL shown after a successful payment +STRIPE_SUCCESS_URL=https://asi1.ai +STRIPE_CANCEL_URL=https://asi1.ai diff --git a/pdf-podcast-agent/.gitignore b/pdf-podcast-agent/.gitignore new file mode 100644 index 00000000..e25466fe --- /dev/null +++ b/pdf-podcast-agent/.gitignore @@ -0,0 +1,28 @@ +# Environment & secrets +.env + +# Python +venv/ +.venv/ +__pycache__/ +*.py[cod] +*.egg-info/ + +# Agent runtime state (deterministic β€” regenerated on startup) +agent1q*_data.json + +# Generated output +output/*.mp3 +output/*.docx +output/*.png + +# OS files +.DS_Store +Thumbs.db +desktop.ini + +# IDE +.vscode/ +.idea/ +*.swp +*.swo diff --git a/pdf-podcast-agent/README.md b/pdf-podcast-agent/README.md new file mode 100644 index 00000000..9b0f26ba --- /dev/null +++ b/pdf-podcast-agent/README.md @@ -0,0 +1,203 @@ +# PDF-to-Podcast Agent + +Turn any research PDF into a 2-host debate podcast with live Q&A, interactive debates, and Stripe-gated premium features β€” all inside ASI:One. + +- **Category:** `Multi-Agent`, `LLM`, `Payments`, `RAG` +- **Difficulty:** Advanced +- **Tech stack:** Python, uAgents, ASI:One, OpenAI TTS, Stripe, pydub, pdfplumber + +## Overview + +Reading dense research papers causes serious fatigue. This agent replicates the "NotebookLM" podcast experience: upload a PDF, get back an MP3 debate between two AI hosts β€” a Skeptic and an Expert. After the podcast, the hosts stay in chat for live Q&A and can run a turn-by-turn live debate, gated by a $10 Stripe payment via the AgentPaymentProtocol. + +## Features + +- βœ… PDF text extraction with pdfplumber +- βœ… RAG-style key insight extraction (core thesis, metrics, controversy) +- βœ… AI-generated debate script (10–14 lines) between a Skeptic and an Expert +- βœ… Parallel TTS voice synthesis with pydub audio stitching β†’ MP3 +- βœ… DOCX transcript with extended director's cut +- βœ… Free live Q&A β€” tag @skeptic-agent or @expert-agent in chat +- βœ… Paid live debate β€” 8-turn agent-to-agent debate streamed in chat +- βœ… Host personality customization β€” 16 combos across 4 presets +- βœ… Stripe payment gate ($10) via AgentPaymentProtocol (embedded checkout) + +## Prerequisites + +- Python 3.10+ +- ffmpeg on PATH (`winget install ffmpeg` Β· `brew install ffmpeg` Β· `apt install ffmpeg`) +- ASI:One API key β€” [asi1.ai](https://asi1.ai) +- OpenAI API key (TTS only) β€” [platform.openai.com](https://platform.openai.com) +- Agentverse API key β€” [agentverse.ai](https://agentverse.ai) +- Stripe API keys (optional) β€” [dashboard.stripe.com](https://dashboard.stripe.com/apikeys) + +## Installation + +```bash +cd innovation-lab-examples/pdf-podcast-agent + +python -m venv .venv +source .venv/bin/activate # macOS / Linux +# .\.venv\Scripts\Activate.ps1 # Windows + +pip install -r requirements.txt +``` + +## Environment Variables + +Create a `.env` file using `.env.example`: + +```bash +cp .env.example .env +``` + +Then run `python get_addresses.py` and paste the printed addresses into `.env`. + +### Variables + +| Variable | Required | Description | +|----------|----------|-------------| +| `ASI1_API_KEY` | Yes | ASI:One LLM β€” all text generation | +| `OPENAI_API_KEY` | Yes | OpenAI TTS β€” audio synthesis only | +| `AGENTVERSE_API_KEY` | Yes | Agent registration and mailbox | +| `EXTRACTOR_ADDRESS` | Yes | From `get_addresses.py` | +| `SCRIPTWRITER_ADDRESS` | Yes | From `get_addresses.py` | +| `VOICE_STUDIO_ADDRESS` | Yes | From `get_addresses.py` | +| `HOST_A_ADDRESS` | Yes | From `get_addresses.py` | +| `HOST_B_ADDRESS` | Yes | From `get_addresses.py` | +| `STRIPE_SECRET_KEY` | No | Enables Stripe payment gate | +| `STRIPE_PUBLISHABLE_KEY` | No | For embedded Stripe checkout | + +## Run the Agent + +Start all 6 agents (sub-agents first, orchestrator last): + +```bash +python extractor_agent.py # Terminal 1 +python scriptwriter_agent.py # Terminal 2 +python voice_studio_agent.py # Terminal 3 +python host_a_agent.py # Terminal 4 +python host_b_agent.py # Terminal 5 +python orchestrator.py # Terminal 6 (start last) +``` + +Or use the convenience launcher: + +```bash +python run.py +``` + +## Expected Output + +After starting, the orchestrator prints: + +``` +INFO: [pdf_podcast_orchestrator]: Starting agent with address: agent1q... +INFO: [pdf_podcast_orchestrator]: Agent inspector available at https://agentverse.ai/inspect/?uri=... +INFO: [pdf_podcast_orchestrator]: [Orchestrator] sub-agents wired: + Extractor agent1q... + Scriptwriter agent1q... + Voice Studio agent1q... + Host A agent1q... + Host B agent1q... +``` + +When a user uploads a PDF via ASI:One, the agent replies with: + +- Episode title and duration +- MP3 download link +- DOCX transcript link +- Script preview (core argument, metrics, controversy) +- Live Q&A prompt (free) and Live Show Pass ($10) + +## Demo + +PDF upload and podcast generation: + +![PDF upload and generated podcast](./assets/demo-podcast-ready.png) + +Live Q&A and paid live show entry point: + +![Live Q&A and paid pass prompt](./assets/demo-live-qa-pass.png) + +Stripe payment confirmation for Live Show Pass: + +![Stripe payment flow](./assets/demo-stripe-pass.png) + +Host personality customization menu: + +![Host personality customization](./assets/demo-customize-hosts.png) + +## Architecture + +```text +User uploads PDF to ASI:One + β”‚ + β–Ό + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” ExtractRequest β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Orchestrator │──────────────────────▢│ RAG Extractorβ”‚ + β”‚ (port 8000) │◀─ ResearchInsights ──│ (port 8001) β”‚ + β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ PodcastScript β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ │──────────────────────▢│ Scriptwriter β”‚ + β”‚ │◀─ PodcastScript ─────│ (port 8002) β”‚ + β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ PodcastScript β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ │──────────────────────▢│ Voice Studio β”‚ + β”‚ │◀─ AudioResponse ─────│ (port 8003) β”‚ + β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ ContextInjection β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ │─────────────────────▢│ Host A β”‚ + β”‚ β”‚ DebateTurn β”‚ "Skeptic" β”‚ + β”‚ │─────────────────────▢│ (port 8004) β”‚ + β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ ContextInjection β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ │─────────────────────▢│ Host B β”‚ + β”‚ β”‚ DebateTurn β”‚ "Expert" β”‚ + β”‚ │─────────────────────▢│ (port 8005) β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Payment flow + +1. User types **"debate"** or **"customize"** in chat +2. Orchestrator sends `RequestPayment` with embedded Stripe Checkout (`ui_mode="embedded"`) +3. User completes payment in the Stripe overlay +4. ASI:One sends `CommitPayment` β†’ Orchestrator verifies β†’ sends `CompletePayment` +5. User types **"continue debate"** to start the live show + +## Project Structure + +```text +pdf-podcast-agent/ +β”œβ”€β”€ orchestrator.py # Main agent β€” chat protocol, payment, debate relay +β”œβ”€β”€ extractor_agent.py # Agent 1: RAG extraction (port 8001) +β”œβ”€β”€ scriptwriter_agent.py # Agent 2: Debate script generation (port 8002) +β”œβ”€β”€ voice_studio_agent.py # Agent 3: TTS + audio stitching (port 8003) +β”œβ”€β”€ host_a_agent.py # Host A: The Skeptic β€” Q&A + debate (port 8004) +β”œβ”€β”€ host_b_agent.py # Host B: The Expert β€” Q&A + debate (port 8005) +β”œβ”€β”€ schemas.py # Pydantic models (typed contracts between agents) +β”œβ”€β”€ get_addresses.py # Prints agent addresses (run first) +β”œβ”€β”€ run.py # Convenience launcher (all 6 agents) +β”œβ”€β”€ test_pipeline.py # Local smoke test +β”œβ”€β”€ requirements.txt +β”œβ”€β”€ .env.example +β”œβ”€β”€ render.yaml # Render.com deployment config +└── output/ # Generated MP3/DOCX files (gitignored) +``` + +## Troubleshooting + +- **"Missing ASI1_API_KEY / OPENAI_API_KEY"** β€” Check `.env` is created from `.env.example` with real values. +- **"I do not have enough funds to register on Almanac contract"** β€” Safe to ignore for local dev. +- **Port already in use** β€” Kill stale process on that port and restart. +- **Debate hosts repeat arguments** β€” Full debate history is passed via `debate_history` field with anti-repetition prompts. + +## Resources + +- [Innovation Lab Docs](https://innovationlab.fetch.ai/resources/docs/intro) +- [Agentverse](https://agentverse.ai/) +- [ASI:One API](https://asi1.ai/) +- [uAgents Framework](https://github.com/fetchai/uAgents) +- [Agent Payment Protocol](https://innovationlab.fetch.ai/resources/docs/payments) +- [Stripe Sandboxes](https://docs.stripe.com/sandboxes) diff --git a/pdf-podcast-agent/assets/.gitkeep b/pdf-podcast-agent/assets/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/pdf-podcast-agent/assets/demo-customize-hosts.png b/pdf-podcast-agent/assets/demo-customize-hosts.png new file mode 100644 index 00000000..30c1bea5 Binary files /dev/null and b/pdf-podcast-agent/assets/demo-customize-hosts.png differ diff --git a/pdf-podcast-agent/assets/demo-live-qa-pass.png b/pdf-podcast-agent/assets/demo-live-qa-pass.png new file mode 100644 index 00000000..c1d49636 Binary files /dev/null and b/pdf-podcast-agent/assets/demo-live-qa-pass.png differ diff --git a/pdf-podcast-agent/assets/demo-podcast-ready.png b/pdf-podcast-agent/assets/demo-podcast-ready.png new file mode 100644 index 00000000..a9c0fe6c Binary files /dev/null and b/pdf-podcast-agent/assets/demo-podcast-ready.png differ diff --git a/pdf-podcast-agent/assets/demo-stripe-pass.png b/pdf-podcast-agent/assets/demo-stripe-pass.png new file mode 100644 index 00000000..f1fafd60 Binary files /dev/null and b/pdf-podcast-agent/assets/demo-stripe-pass.png differ diff --git a/pdf-podcast-agent/extractor_agent.py b/pdf-podcast-agent/extractor_agent.py new file mode 100644 index 00000000..7101919e --- /dev/null +++ b/pdf-podcast-agent/extractor_agent.py @@ -0,0 +1,120 @@ +""" +Agent 1 – The RAG Extractor +=========================== +Run in its own terminal: + + python extractor_agent.py + +Receives raw document text from the Orchestrator, fires a single structured +OpenAI call with JSON mode, and returns a tight ResearchInsights payload. + +This agent deliberately drops the 40-page document after extracting three +facts so that downstream agents never have to handle large payloads. +""" + +import json +import os + +from dotenv import load_dotenv +from openai import AsyncOpenAI +from uagents import Agent, Context + +from schemas import ExtractRequest, ResearchInsights + +load_dotenv() + +# ── Configuration ───────────────────────────────────────────────────────────── + +EXTRACTION_MODEL = os.getenv("EXTRACTION_MODEL", "gpt-4o-mini") +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") + +_SYSTEM_PROMPT = """\ +You are an elite technical researcher. Read the provided document carefully. +Extract exactly three things and return ONLY valid JSON: + +{ + "core_thesis": "<2-3 sentences – the central argument or finding>", + "key_metrics": [ + "", + "", + "", + "" + ], + "controversial_point": "" +} + +Rules: +- Be SPECIFIC. Use exact numbers, percentages, or quotes from the text. +- Never be generic ("the paper discusses …"). Pull the hard facts. +- key_metrics must contain 3–6 items. +- Return ONLY the JSON object – no markdown fences, no extra prose. +""" + +# ── Agent ───────────────────────────────────────────────────────────────────── + +extractor = Agent( + name="rag_extractor", + seed=os.getenv("EXTRACTOR_SEED", "rag_extractor_podcast_seed_v1"), + port=8001, + endpoint=["http://localhost:8001/submit"], + network="testnet", +) + +client = AsyncOpenAI(api_key=OPENAI_API_KEY) + +# ── Handlers ────────────────────────────────────────────────────────────────── + + +@extractor.on_event("startup") +async def on_startup(ctx: Context) -> None: + ctx.logger.info("[RAG Extractor] ready") + ctx.logger.info(f"[RAG Extractor] address: {ctx.agent.address}") + + +@extractor.on_message(model=ExtractRequest) +async def handle_extract(ctx: Context, sender: str, msg: ExtractRequest) -> None: + sid = msg.session_id[:8] + ctx.logger.info(f"[{sid}] Extracting from {len(msg.document_text):,} chars …") + + try: + text = msg.document_text[:50_000] + + resp = await client.chat.completions.create( + model=EXTRACTION_MODEL, + response_format={"type": "json_object"}, + messages=[ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": f"Document:\n\n{text}"}, + ], + temperature=0.2, + max_tokens=1_200, + ) + + raw = resp.choices[0].message.content + data = json.loads(raw) + + insights = ResearchInsights( + core_thesis=data.get("core_thesis", "Could not extract thesis."), + key_metrics=data.get("key_metrics", []), + controversial_point=data.get("controversial_point", ""), + session_id=msg.session_id, + ) + + ctx.logger.info(f"[{sid}] Done β€” {insights.core_thesis[:70]}…") + await ctx.send(sender, insights) + + except Exception as exc: + ctx.logger.error(f"[{sid}] Error: {exc}") + await ctx.send( + sender, + ResearchInsights( + core_thesis=f"Extraction failed: {str(exc)[:200]}", + key_metrics=["Error – check logs"], + controversial_point="Unable to extract insights.", + session_id=msg.session_id, + ), + ) + + +if __name__ == "__main__": + extractor.run() diff --git a/pdf-podcast-agent/get_addresses.py b/pdf-podcast-agent/get_addresses.py new file mode 100644 index 00000000..49e2b5a1 --- /dev/null +++ b/pdf-podcast-agent/get_addresses.py @@ -0,0 +1,59 @@ +""" +get_addresses.py – Address sheet printer +========================================= +Prints the deterministic on-chain addresses for all four agents WITHOUT +starting any servers. Because addresses are derived purely from seed phrases, +this is always accurate. + +Usage: + python get_addresses.py + +Copy the output into your .env (or paste the export block into your terminal) +before starting the orchestrator. +""" + +import os +from uagents import Agent + +seeds = { + "ORCHESTRATOR": os.getenv("ORCHESTRATOR_SEED", "pdf_podcast_orchestrator_seed_v1"), + "EXTRACTOR": os.getenv("EXTRACTOR_SEED", "rag_extractor_podcast_seed_v1"), + "SCRIPTWRITER": os.getenv("SCRIPTWRITER_SEED", "podcast_scriptwriter_seed_v1"), + "VOICE_STUDIO": os.getenv("VOICE_STUDIO_SEED", "voice_studio_podcast_seed_v1"), + "HOST_A": os.getenv("HOST_A_SEED", "pdf_podcast_host_a_seed_v1"), + "HOST_B": os.getenv("HOST_B_SEED", "pdf_podcast_host_b_seed_v1"), +} + +# Build agents just to read their addresses (no .run() called) +agents = {name: Agent(name=name.lower(), seed=seed) for name, seed in seeds.items()} + +print() +print("=" * 65) +print(" PDF-to-Podcast Agent Addresses") +print("=" * 65) +for name, agent in agents.items(): + print(f" {name:<16} {agent.address}") +print("=" * 65) + +print() +print("-- Paste into .env ------------------------------------------") +print(f"EXTRACTOR_ADDRESS={agents['EXTRACTOR'].address}") +print(f"SCRIPTWRITER_ADDRESS={agents['SCRIPTWRITER'].address}") +print(f"VOICE_STUDIO_ADDRESS={agents['VOICE_STUDIO'].address}") +print(f"HOST_A_ADDRESS={agents['HOST_A'].address}") +print(f"HOST_B_ADDRESS={agents['HOST_B'].address}") +print() +print("-- Or export in PowerShell ----------------------------------") +print(f'$env:EXTRACTOR_ADDRESS="{agents["EXTRACTOR"].address}"') +print(f'$env:SCRIPTWRITER_ADDRESS="{agents["SCRIPTWRITER"].address}"') +print(f'$env:VOICE_STUDIO_ADDRESS="{agents["VOICE_STUDIO"].address}"') +print(f'$env:HOST_A_ADDRESS="{agents["HOST_A"].address}"') +print(f'$env:HOST_B_ADDRESS="{agents["HOST_B"].address}"') +print() +print("-- Or export in bash ----------------------------------------") +print(f'export EXTRACTOR_ADDRESS="{agents["EXTRACTOR"].address}"') +print(f'export SCRIPTWRITER_ADDRESS="{agents["SCRIPTWRITER"].address}"') +print(f'export VOICE_STUDIO_ADDRESS="{agents["VOICE_STUDIO"].address}"') +print(f'export HOST_A_ADDRESS="{agents["HOST_A"].address}"') +print(f'export HOST_B_ADDRESS="{agents["HOST_B"].address}"') +print() diff --git a/pdf-podcast-agent/host_a_agent.py b/pdf-podcast-agent/host_a_agent.py new file mode 100644 index 00000000..25de3256 --- /dev/null +++ b/pdf-podcast-agent/host_a_agent.py @@ -0,0 +1,297 @@ +""" +Host A – The Skeptic (Post-Show Q&A Agent) +============================================ +Run in its own terminal BEFORE the Orchestrator starts: + + python host_a_agent.py + +This agent does two things: + 1. Receives a ContextInjection from the Orchestrator after each podcast is + generated and stores the paper context in persistent local storage. + 2. Responds to ChatMessages from users in ASI:One, answering in-character as + "The Skeptic" β€” the host who pushes back and demands hard evidence. + +Users interact by tagging @pdf_podcast_host_a in ASI:One chat: + "@pdf_podcast_host_a At 01:12 you said the baseline was crushed. + What was the actual sample size?" +""" + +import json +import os +from datetime import datetime, timezone +from uuid import uuid4 + +from dotenv import load_dotenv +from openai import AsyncOpenAI +from uagents import Agent, Context, Protocol + +from uagents_core.contrib.protocols.chat import ( + ChatAcknowledgement, + ChatMessage, + EndSessionContent, + TextContent, + chat_protocol_spec, +) + +from schemas import ContextInjection, DebateTurn, DebateResponse + +load_dotenv() + +# ── Config ───────────────────────────────────────────────────────────────────── + +_MODEL = os.getenv("ASI1_MODEL", "asi1-mini") +_client = AsyncOpenAI( + api_key=os.getenv("ASI1_API_KEY", ""), + base_url="https://api.asi1.ai/v1", +) + +_DEFAULT_PERSONALITY = ( + "You demand hard empirical evidence. You always ask 'what's the sample size?' " + "and 'was this peer-reviewed?'. You challenge methodology relentlessly but fairly. " + "You are NOT dismissive β€” you are intellectually rigorous." +) + +_LATEST_SESSION_KEY = "__latest_session__" +_PERSONALITY_KEY = "__personality__" + +# ── Agent ────────────────────────────────────────────────────────────────────── + +_agentverse_key = os.getenv("AGENTVERSE_API_KEY", "") + +if _agentverse_key: + host_a = Agent( + name="pdf_podcast_host_a", + seed=os.getenv("HOST_A_SEED", "pdf_podcast_host_a_seed_v1"), + port=8004, + mailbox=True, + agentverse="https://agentverse.ai", + handle_messages_concurrently=True, + network="testnet", + ) +else: + host_a = Agent( + name="pdf_podcast_host_a", + seed=os.getenv("HOST_A_SEED", "pdf_podcast_host_a_seed_v1"), + port=8004, + endpoint=["http://localhost:8004/submit"], + handle_messages_concurrently=True, + network="testnet", + ) + +chat_proto = Protocol(spec=chat_protocol_spec) + +# ── Context injection (from Orchestrator) ────────────────────────────────────── + + +@host_a.on_message(ContextInjection) +async def receive_context(ctx: Context, sender: str, msg: ContextInjection) -> None: + """Store paper context keyed by session_id and track the latest session.""" + payload = { + "topic_title": msg.topic_title, + "core_thesis": msg.core_thesis, + "key_metrics": msg.key_metrics, + "controversial_point": msg.controversial_point, + "document_snippet": msg.document_snippet, + } + ctx.storage.set(msg.session_id, json.dumps(payload)) + ctx.storage.set(_LATEST_SESSION_KEY, msg.session_id) + if msg.host_a_personality: + ctx.storage.set(_PERSONALITY_KEY, msg.host_a_personality) + ctx.logger.info( + f"[HostA] Context stored for session {msg.session_id[:8]} β€” '{msg.topic_title}'" + ) + + +# ── Debate turn (Orchestrator β†’ HostA β†’ Orchestrator) ──────────────────────── + + +@host_a.on_message(DebateTurn) +async def handle_debate_turn(ctx: Context, sender: str, msg: DebateTurn) -> None: + ctx.logger.info( + f"[HostA] Debate turn {msg.turn}/{msg.max_turns} β€” session {msg.session_id[:8]}" + ) + + history_block = "" + if msg.debate_history: + history_block = f"--- DEBATE SO FAR ---\n{msg.debate_history}\n--- END OF DEBATE HISTORY ---\n\n" + + if msg.previous_statement: + user_content = ( + f"Topic: {msg.topic_title}\n" + f"Core thesis: {msg.core_thesis}\n" + f"Key metrics: {', '.join(msg.key_metrics)}\n" + f"Controversy: {msg.controversial_point}\n" + f"Paper excerpt: {msg.document_snippet[:1500]}\n\n" + f"{history_block}" + f'The Expert just said:\n"{msg.previous_statement}"\n\n' + f"This is turn {msg.turn + 1} of {msg.max_turns}.\n" + "IMPORTANT: Do NOT repeat any argument already made in the debate history above. " + "Introduce a NEW angle, a different data point from the paper, or a fresh implication. " + "Push the conversation forward. 2-3 punchy sentences." + ) + else: + user_content = ( + f"Topic: {msg.topic_title}\n" + f"Core thesis: {msg.core_thesis}\n" + f"Key metrics: {', '.join(msg.key_metrics)}\n" + f"Controversy: {msg.controversial_point}\n" + f"Paper excerpt: {msg.document_snippet[:1500]}\n\n" + "Open the debate with a sharp, skeptical challenge. " + "2-3 provocative but grounded sentences." + ) + + personality = ( + msg.speaker_personality + or ctx.storage.get(_PERSONALITY_KEY) + or _DEFAULT_PERSONALITY + ) + system_prompt = ( + f"You are Host A β€” 'The Skeptic' β€” in a LIVE podcast debate.\n" + f"Personality: {personality}\n" + "Rules:\n" + "- Respond ONLY with your spoken line. No labels, no stage directions.\n" + "- NEVER repeat an argument you or the Expert already made.\n" + "- Each turn MUST raise a genuinely new point, angle, or data from the paper.\n" + "- Build on what was said, don't circle back." + ) + + try: + resp = await _client.chat.completions.create( + model=_MODEL, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_content}, + ], + temperature=0.82, + max_tokens=200, + ) + reply_text = resp.choices[0].message.content.strip() or "…" + except Exception as e: + ctx.logger.error(f"[HostA] Debate LLM error: {e}") + reply_text = "*(The Skeptic is gathering thoughts…)*" + + try: + await ctx.send( + sender, + DebateResponse( + session_id=msg.session_id, + speaker="skeptic", + reply_text=reply_text, + turn=msg.turn, + max_turns=msg.max_turns, + user_address=msg.user_address, + topic_title=msg.topic_title, + core_thesis=msg.core_thesis, + key_metrics=msg.key_metrics, + controversial_point=msg.controversial_point, + document_snippet=msg.document_snippet, + ), + ) + ctx.logger.info(f"[HostA] DebateResponse sent for turn {msg.turn}.") + except Exception as e: + ctx.logger.error(f"[HostA] Failed to send DebateResponse: {e}") + + +# ── User Q&A (from ASI:One chat) ────────────────────────────────────────────── + + +@chat_proto.on_message(ChatMessage) +async def handle_chat(ctx: Context, sender: str, msg: ChatMessage) -> None: + ctx.logger.info( + f"[HostA] ChatMessage from {sender[:20]}… content types: {[type(c).__name__ for c in msg.content]}" + ) + await ctx.send( + sender, + ChatAcknowledgement( + timestamp=datetime.now(timezone.utc), + acknowledged_msg_id=msg.msg_id, + ), + ) + + user_text = " ".join( + item.text.strip() + for item in msg.content + if isinstance(item, TextContent) and item.text.strip() + ) + if not user_text: + ctx.logger.warning("[HostA] No TextContent found in message β€” skipping.") + return + + # Load latest session context + session_id = ctx.storage.get(_LATEST_SESSION_KEY) + context_str = "" + if session_id: + raw = ctx.storage.get(session_id) + if raw: + try: + data = json.loads(raw) + context_str = ( + f"Episode topic: {data['topic_title']}\n" + f"Core thesis: {data['core_thesis']}\n" + f"Key metrics: {', '.join(data['key_metrics'])}\n" + f"Controversy: {data['controversial_point']}\n\n" + f"Paper excerpt:\n{data['document_snippet'][:2000]}" + ) + except Exception: + pass + + personality = ctx.storage.get(_PERSONALITY_KEY) or _DEFAULT_PERSONALITY + qa_system = ( + "You are Host A from a research podcast debate β€” 'The Skeptic'.\n" + f"Personality: {personality}\n\n" + "The user is asking a follow-up question about a paper you just debated. " + "Answer in character. Be concise (2–4 sentences max). Cite exact numbers " + "from the context when you challenge or probe a claim." + ) + + user_msg = {"role": "user", "content": user_text} + if context_str: + user_msg["content"] = ( + f"[Paper context]\n{context_str}\n\n[User question]\n{user_text}" + ) + + try: + resp = await _client.chat.completions.create( + model=_MODEL, + messages=[ + {"role": "system", "content": qa_system}, + user_msg, + ], + temperature=0.75, + max_tokens=300, + ) + reply_text = resp.choices[0].message.content or "…" + except Exception as e: + ctx.logger.error(f"[HostA] LLM error: {e}") + reply_text = "*(Host A is thinking… try again in a moment)*" + + ctx.logger.info(f"[HostA] Sending reply to {sender[:16]}…") + try: + await ctx.send( + sender, + ChatMessage( + timestamp=datetime.now(timezone.utc), + msg_id=uuid4(), + content=[ + TextContent( + type="text", text=f"**[Host A β€” The Skeptic]**\n\n{reply_text}" + ), + EndSessionContent(type="end-session"), + ], + ), + ) + ctx.logger.info("[HostA] Reply sent successfully.") + except Exception as e: + ctx.logger.error(f"[HostA] Failed to send reply: {e}") + + +@chat_proto.on_message(ChatAcknowledgement) +async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement) -> None: + pass + + +host_a.include(chat_proto, publish_manifest=True) + + +if __name__ == "__main__": + host_a.run() diff --git a/pdf-podcast-agent/host_b_agent.py b/pdf-podcast-agent/host_b_agent.py new file mode 100644 index 00000000..a68a2da0 --- /dev/null +++ b/pdf-podcast-agent/host_b_agent.py @@ -0,0 +1,286 @@ +""" +Host B – The Expert (Post-Show Q&A Agent) +========================================== +Run in its own terminal BEFORE the Orchestrator starts: + + python host_b_agent.py + +This agent does two things: + 1. Receives a ContextInjection from the Orchestrator after each podcast is + generated and stores the paper context in persistent local storage. + 2. Responds to ChatMessages from users in ASI:One, answering in-character as + "The Expert" β€” the host who defends findings with exact numbers. + +Users interact by tagging @pdf_podcast_host_b in ASI:One chat: + "@pdf_podcast_host_b Can you explain the union-based execution result?" +""" + +import json +import os +from datetime import datetime, timezone +from uuid import uuid4 + +from dotenv import load_dotenv +from openai import AsyncOpenAI +from uagents import Agent, Context, Protocol + +from uagents_core.contrib.protocols.chat import ( + ChatAcknowledgement, + ChatMessage, + EndSessionContent, + TextContent, + chat_protocol_spec, +) + +from schemas import ContextInjection, DebateTurn, DebateResponse + +load_dotenv() + +# ── Config ───────────────────────────────────────────────────────────────────── + +_MODEL = os.getenv("ASI1_MODEL", "asi1-mini") +_client = AsyncOpenAI( + api_key=os.getenv("ASI1_API_KEY", ""), + base_url="https://api.asi1.ai/v1", +) + +_DEFAULT_PERSONALITY = ( + "You cite exact data points from the study. You defend findings with measured " + "confidence, acknowledge limitations honestly, and never over-claim. You often " + "say 'the data shows' and back it up with a specific number." +) + +_LATEST_SESSION_KEY = "__latest_session__" +_PERSONALITY_KEY = "__personality__" + +# ── Agent ────────────────────────────────────────────────────────────────────── + +_agentverse_key = os.getenv("AGENTVERSE_API_KEY", "") + +if _agentverse_key: + host_b = Agent( + name="pdf_podcast_host_b", + seed=os.getenv("HOST_B_SEED", "pdf_podcast_host_b_seed_v1"), + port=8005, + mailbox=True, + agentverse="https://agentverse.ai", + handle_messages_concurrently=True, + network="testnet", + ) +else: + host_b = Agent( + name="pdf_podcast_host_b", + seed=os.getenv("HOST_B_SEED", "pdf_podcast_host_b_seed_v1"), + port=8005, + endpoint=["http://localhost:8005/submit"], + handle_messages_concurrently=True, + network="testnet", + ) + +chat_proto = Protocol(spec=chat_protocol_spec) + +# ── Context injection (from Orchestrator) ────────────────────────────────────── + + +@host_b.on_message(ContextInjection) +async def receive_context(ctx: Context, sender: str, msg: ContextInjection) -> None: + """Store paper context keyed by session_id and track the latest session.""" + payload = { + "topic_title": msg.topic_title, + "core_thesis": msg.core_thesis, + "key_metrics": msg.key_metrics, + "controversial_point": msg.controversial_point, + "document_snippet": msg.document_snippet, + } + ctx.storage.set(msg.session_id, json.dumps(payload)) + ctx.storage.set(_LATEST_SESSION_KEY, msg.session_id) + if msg.host_b_personality: + ctx.storage.set(_PERSONALITY_KEY, msg.host_b_personality) + ctx.logger.info( + f"[HostB] Context stored for session {msg.session_id[:8]} β€” '{msg.topic_title}'" + ) + + +# ── Debate turn (Orchestrator β†’ HostB β†’ Orchestrator) ──────────────────────── + + +@host_b.on_message(DebateTurn) +async def handle_debate_turn(ctx: Context, sender: str, msg: DebateTurn) -> None: + ctx.logger.info( + f"[HostB] Debate turn {msg.turn}/{msg.max_turns} β€” session {msg.session_id[:8]}" + ) + + history_block = "" + if msg.debate_history: + history_block = f"--- DEBATE SO FAR ---\n{msg.debate_history}\n--- END OF DEBATE HISTORY ---\n\n" + + user_content = ( + f"Topic: {msg.topic_title}\n" + f"Core thesis: {msg.core_thesis}\n" + f"Key metrics: {', '.join(msg.key_metrics)}\n" + f"Controversy: {msg.controversial_point}\n" + f"Paper excerpt: {msg.document_snippet[:1500]}\n\n" + f"{history_block}" + f'The Skeptic just said:\n"{msg.previous_statement}"\n\n' + f"This is turn {msg.turn + 1} of {msg.max_turns}.\n" + "IMPORTANT: Do NOT repeat any argument already made in the debate history above. " + "Respond to the Skeptic's NEW point directly. Introduce fresh evidence, a different " + "data point from the paper, or a new implication. Acknowledge a limitation honestly. " + "2-3 confident sentences." + ) + + personality = ( + msg.speaker_personality + or ctx.storage.get(_PERSONALITY_KEY) + or _DEFAULT_PERSONALITY + ) + system_prompt = ( + f"You are Host B β€” 'The Expert' β€” in a LIVE podcast debate.\n" + f"Personality: {personality}\n" + "Rules:\n" + "- Respond ONLY with your spoken line. No labels, no stage directions.\n" + "- NEVER repeat an argument you or the Skeptic already made.\n" + "- Each turn MUST raise a genuinely new point, angle, or data from the paper.\n" + "- Build on what was said, don't circle back." + ) + + try: + resp = await _client.chat.completions.create( + model=_MODEL, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_content}, + ], + temperature=0.82, + max_tokens=200, + ) + reply_text = resp.choices[0].message.content.strip() or "…" + except Exception as e: + ctx.logger.error(f"[HostB] Debate LLM error: {e}") + reply_text = "*(The Expert is consulting the data…)*" + + try: + await ctx.send( + sender, + DebateResponse( + session_id=msg.session_id, + speaker="expert", + reply_text=reply_text, + turn=msg.turn, + max_turns=msg.max_turns, + user_address=msg.user_address, + topic_title=msg.topic_title, + core_thesis=msg.core_thesis, + key_metrics=msg.key_metrics, + controversial_point=msg.controversial_point, + document_snippet=msg.document_snippet, + ), + ) + ctx.logger.info(f"[HostB] DebateResponse sent for turn {msg.turn}.") + except Exception as e: + ctx.logger.error(f"[HostB] Failed to send DebateResponse: {e}") + + +# ── User Q&A (from ASI:One chat) ────────────────────────────────────────────── + + +@chat_proto.on_message(ChatMessage) +async def handle_chat(ctx: Context, sender: str, msg: ChatMessage) -> None: + ctx.logger.info( + f"[HostB] ChatMessage from {sender[:20]}… content types: {[type(c).__name__ for c in msg.content]}" + ) + await ctx.send( + sender, + ChatAcknowledgement( + timestamp=datetime.now(timezone.utc), + acknowledged_msg_id=msg.msg_id, + ), + ) + + user_text = " ".join( + item.text.strip() + for item in msg.content + if isinstance(item, TextContent) and item.text.strip() + ) + if not user_text: + ctx.logger.warning("[HostB] No TextContent found in message β€” skipping.") + return + + # Load latest session context + session_id = ctx.storage.get(_LATEST_SESSION_KEY) + context_str = "" + if session_id: + raw = ctx.storage.get(session_id) + if raw: + try: + data = json.loads(raw) + context_str = ( + f"Episode topic: {data['topic_title']}\n" + f"Core thesis: {data['core_thesis']}\n" + f"Key metrics: {', '.join(data['key_metrics'])}\n" + f"Controversy: {data['controversial_point']}\n\n" + f"Paper excerpt:\n{data['document_snippet'][:2000]}" + ) + except Exception: + pass + + personality = ctx.storage.get(_PERSONALITY_KEY) or _DEFAULT_PERSONALITY + qa_system = ( + "You are Host B from a research podcast debate β€” 'The Expert'.\n" + f"Personality: {personality}\n\n" + "The user is asking a follow-up question about a paper you just debated. " + "Answer in character. Be concise (2–4 sentences max). Always cite at least " + "one specific metric or finding from the context in your answer." + ) + + user_msg = {"role": "user", "content": user_text} + if context_str: + user_msg["content"] = ( + f"[Paper context]\n{context_str}\n\n[User question]\n{user_text}" + ) + + try: + resp = await _client.chat.completions.create( + model=_MODEL, + messages=[ + {"role": "system", "content": qa_system}, + user_msg, + ], + temperature=0.7, + max_tokens=300, + ) + reply_text = resp.choices[0].message.content or "…" + except Exception as e: + ctx.logger.error(f"[HostB] LLM error: {e}") + reply_text = "*(Host B is looking up the data… try again in a moment)*" + + ctx.logger.info(f"[HostB] Sending reply to {sender[:16]}…") + try: + await ctx.send( + sender, + ChatMessage( + timestamp=datetime.now(timezone.utc), + msg_id=uuid4(), + content=[ + TextContent( + type="text", text=f"**[Host B β€” The Expert]**\n\n{reply_text}" + ), + EndSessionContent(type="end-session"), + ], + ), + ) + ctx.logger.info("[HostB] Reply sent successfully.") + except Exception as e: + ctx.logger.error(f"[HostB] Failed to send reply: {e}") + + +@chat_proto.on_message(ChatAcknowledgement) +async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement) -> None: + pass + + +host_b.include(chat_proto, publish_manifest=True) + + +if __name__ == "__main__": + host_b.run() diff --git a/pdf-podcast-agent/orchestrator.py b/pdf-podcast-agent/orchestrator.py new file mode 100644 index 00000000..35759edc --- /dev/null +++ b/pdf-podcast-agent/orchestrator.py @@ -0,0 +1,1606 @@ +""" +The Orchestrator – State Manager & ASI:One Gateway +==================================================== +Run in its own terminal (start AFTER the three sub-agents): + + python orchestrator.py + +This is the ONLY agent that faces the outside world. It does four things: + 1. Catches incoming messages from ASI:One via the Agent Chat Protocol. + 2. Strips PDF text using pdfplumber. + 3. Chains Extractor β†’ Scriptwriter β†’ VoiceStudio with send_and_receive. + 4. Delivers the final MP3 back to the user. + +It also exposes a REST POST /process endpoint for local testing. + +Sub-agent addresses are read from environment variables. Run +`python get_addresses.py` first to print the addresses you need to set. +""" + +import asyncio +import base64 +import io +import json +import os +import re +import sys +import threading +from datetime import datetime, timezone +from functools import partial +from http.server import HTTPServer, SimpleHTTPRequestHandler +from pathlib import Path +from typing import Optional +from uuid import UUID, uuid4 + +import requests # type: ignore[import-untyped] +import stripe as _stripe_lib +from docx import Document +from docx.enum.text import WD_ALIGN_PARAGRAPH +from docx.shared import Pt, RGBColor +from openai import AsyncOpenAI +import pdfplumber +from uagents import Agent, Context, Protocol + +from dotenv import load_dotenv + +from uagents_core.contrib.protocols.chat import ( + ChatAcknowledgement, + ChatMessage, + EndSessionContent, + Resource, + ResourceContent, + TextContent, + chat_protocol_spec, +) + +from uagents_core.contrib.protocols.payment import ( + CommitPayment, + CompletePayment, + Funds, + RejectPayment, + RequestPayment, + payment_protocol_spec, +) + +from schemas import ( + AudioResponse, + ContextInjection, + DebateResponse, + DebateTurn, + ExtractRequest, + PipelineRequest, + PipelineResponse, + PodcastScript, + ResearchInsights, +) + +load_dotenv() + +# Windows PowerShell defaults to cp1252 which can't encode arrows/checkmarks. +# Force UTF-8 on stdout/stderr so logger never crashes on Unicode characters. +if sys.stdout and hasattr(sys.stdout, "buffer"): + sys.stdout = io.TextIOWrapper( + sys.stdout.buffer, encoding="utf-8", errors="replace", line_buffering=True + ) +if sys.stderr and hasattr(sys.stderr, "buffer"): + sys.stderr = io.TextIOWrapper( + sys.stderr.buffer, encoding="utf-8", errors="replace", line_buffering=True + ) + +# ── Sub-agent addresses (set via env vars β€” run get_addresses.py first) ─────── + +EXTRACTOR_ADDRESS = os.getenv("EXTRACTOR_ADDRESS", "") +SCRIPTWRITER_ADDRESS = os.getenv("SCRIPTWRITER_ADDRESS", "") +VOICE_STUDIO_ADDRESS = os.getenv("VOICE_STUDIO_ADDRESS", "") +HOST_A_ADDRESS = os.getenv("HOST_A_ADDRESS", "") +HOST_B_ADDRESS = os.getenv("HOST_B_ADDRESS", "") + +OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "output")) +OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + +AUDIO_SERVER_PORT = int(os.getenv("AUDIO_SERVER_PORT", "8080")) +AUDIO_SERVER_HOST = os.getenv("AUDIO_SERVER_HOST", "localhost") + +# ── Live debate trigger keywords & storage key ──────────────────────────────── +_LIVE_DEBATE_TRIGGERS = { + "live debate", + "start debate", + "watch debate", + "debate now", + "watch them", + "let them talk", + "replay", + "show debate", + "debate", + "continue debate", + "continue", +} +_LAST_SESSION_KEY = "last_session_context" +_ACTIVE_DEBATE_KEY = "active_debate_session" +_DEBATE_ACCUM_KEY = "debate_transcript_accum" +_DEBATE_COOLDOWN_KEY = ( + "debate_last_started_ts" # Unix timestamp β€” debounces duplicate triggers +) +_DEBATE_MSG_ID_KEY = ( + "debate_bubble_msg_id" # stable UUID so all updates hit the same bubble +) +_PERSONALITIES_KEY = "host_personalities" +_PAID_SESSIONS_KEY = "paid_podcast_sessions" # JSON list of paid session IDs +_PENDING_PAYMENTS_KEY = ( + "pending_stripe_payments" # JSON dict keyed by checkout_session_id +) + +_DEBATE_COOLDOWN_SECS = 30 # ignore duplicate "debate" triggers within this window +_ACTIVE_DEBATE_TTL = 300 # auto-clear stale active debate after 5 minutes +_ACTIVE_DEBATE_TS_KEY = "active_debate_started_ts" # Unix timestamp of debate start +_SEEN_MSG_IDS_KEY = "seen_msg_ids" # JSON list of recently processed msg_ids + +# ── Personality presets ─────────────────────────────────────────────────────── +# Each entry: (display_name, system_prompt_hint) +_PERSONALITY_PRESETS: dict[str, dict[str, tuple[str, str]]] = { + "host_a": { + "1": ( + "Classic Skeptic", + "You demand hard empirical evidence. You always ask 'what's the sample size?' " + "and 'was this peer-reviewed?'. You challenge methodology relentlessly but fairly. " + "You are NOT dismissive β€” you are intellectually rigorous.", + ), + "2": ( + "Investigative Journalist", + "You follow the money. You ask 'who funded this research?', probe conflicts of " + "interest, and surface what the paper conveniently omits. You are the audience's " + "advocate β€” sharp, tenacious, and never satisfied with PR-speak.", + ), + "3": ( + "Academic Critic", + "You are obsessed with methodological rigor and the replication crisis. You demand " + "statistical significance thresholds, effect sizes, confidence intervals, and " + "independent replication before accepting any finding as credible.", + ), + "4": ( + "Industry Veteran", + "You have seen every hype cycle β€” dot-com, blockchain, metaverse. You compare new " + "claims to past failed promises, demand real-world deployment numbers over lab " + "results, and are deeply sceptical of trend reports and analyst projections.", + ), + }, + "host_b": { + "1": ( + "Researcher", + "You cite exact data points from the study. You defend findings with measured " + "confidence, acknowledge limitations honestly, and never over-claim. You often " + "say 'the data shows' and back it up with a specific number.", + ), + "2": ( + "Industry Insider", + "You speak in terms of ROI, market share, and enterprise adoption curves. " + "You name companies already shipping these solutions, focus on practical value " + "delivered, and translate academic findings into business impact.", + ), + "3": ( + "Futurist", + "You connect the paper's findings to sweeping technological and societal shifts. " + "You extrapolate decades ahead, frame every result as part of a larger paradigm " + "change, and are genuinely excited about what comes next.", + ), + "4": ( + "Enthusiastic Teacher", + "You use vivid analogies to make complex concepts click for anyone. You celebrate " + "the 'aha moment', make the audience feel smart, and genuinely love helping people " + "understand difficult ideas through relatable storytelling.", + ), + }, +} + +_PERSONALITY_SET_RE = re.compile( + r"\bA\s*:\s*([1-4])\b.*?\bB\s*:\s*([1-4])\b", re.IGNORECASE +) +_PERSONALITY_TRIGGERS = { + "customize", + "set personality", + "change personality", + "set hosts", + "host style", + "personalities", + "change hosts", +} + +_PERSONALITY_MENU = ( + "## 🎭 Customize Host Personalities\n\n" + "Reply with **A:[1-4] B:[1-4]** to set host styles " + "*(e.g. `A:2 B:3`)*.\n\n" + "**🎀 @skeptic-agent β€” the challenger**\n" + "1. **Classic Skeptic** β€” demands evidence, asks for sample sizes\n" + "2. **Investigative Journalist** β€” follows the money, probes conflicts of interest\n" + "3. **Academic Critic** β€” obsessed with methodology, replication, and stats rigor\n" + "4. **Industry Veteran** β€” has seen every hype cycle, compares to past failures\n\n" + "**πŸŽ“ @expert-agent β€” the defender**\n" + "1. **Researcher** β€” cites exact data points, acknowledges limitations\n" + "2. **Industry Insider** β€” ROI focus, names companies, practical value\n" + "3. **Futurist** β€” connects findings to paradigm shifts, extrapolates ahead\n" + "4. **Enthusiastic Teacher** β€” vivid analogies, makes complex ideas accessible\n\n" + "*Current default: A:1 (Classic Skeptic) Β· B:1 (Researcher)*" +) + + +def _chat(text: str) -> ChatMessage: + """Create a ChatMessage that closes the current ASI:One bubble immediately. + + Without EndSessionContent, ASI:One keeps the response bubble 'open' and + every subsequent send from this agent simply updates (overwrites) that same + bubble. Appending EndSessionContent seals the bubble so the next send + opens a fresh one β€” giving each pipeline step and each debate turn its own + separate chat bubble. + """ + return ChatMessage( + timestamp=datetime.now(timezone.utc), + msg_id=uuid4(), + content=[ + TextContent(type="text", text=text), + EndSessionContent(type="end-session"), + ], + ) + + +def _chat_open(text: str, msg_id: UUID) -> ChatMessage: + """Create a ChatMessage that keeps the bubble OPEN for future updates. + + By reusing the same ``msg_id`` and omitting EndSessionContent, ASI:One + will overwrite the existing bubble in-place rather than creating a new one. + """ + return ChatMessage( + timestamp=datetime.now(timezone.utc), + msg_id=msg_id, + content=[TextContent(type="text", text=text)], + ) + + +def _chat_close(text: str, msg_id: UUID) -> ChatMessage: + """Create a ChatMessage that updates the bubble one last time and seals it.""" + return ChatMessage( + timestamp=datetime.now(timezone.utc), + msg_id=msg_id, + content=[ + TextContent(type="text", text=text), + EndSessionContent(type="end-session"), + ], + ) + + +def _start_audio_server() -> None: + """Serve OUTPUT_DIR over HTTP so MP3 links are clickable and browser-playable.""" + handler = partial(SimpleHTTPRequestHandler, directory=str(OUTPUT_DIR)) + server = HTTPServer(("0.0.0.0", AUDIO_SERVER_PORT), handler) + server.serve_forever() + + +threading.Thread(target=_start_audio_server, daemon=True, name="audio-server").start() + + +def _audio_url(audio_path: str) -> str: + filename = Path(audio_path).name + return f"http://{AUDIO_SERVER_HOST}:{AUDIO_SERVER_PORT}/{filename}" + + +# ── PDF helpers ─────────────────────────────────────────────────────────────── + + +def _pdf_bytes_to_text(pdf_bytes: bytes) -> str: + lines = [] + with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: + for page in pdf.pages: + t = page.extract_text() + if t: + lines.append(t) + return "\n".join(lines) + + +def _pdf_path_to_text(path: str) -> str: + lines = [] + with pdfplumber.open(path) as pdf: + for page in pdf.pages: + t = page.extract_text() + if t: + lines.append(t) + return "\n".join(lines) + + +def _resource_to_text(resource: Resource) -> str: + """Fetch PDF bytes from a ResourceContent URI and extract text. + + Handles three URI schemes emitted by ASI:One: + - https://... β†’ download with requests + - data:...;base64,... β†’ decode inline + - everything else β†’ treat as a local file path + """ + uri = resource.uri + if uri.startswith("http://") or uri.startswith("https://"): + resp = requests.get(uri, timeout=30) + resp.raise_for_status() + pdf_bytes = resp.content + elif uri.startswith("data:"): + _, encoded = uri.split(",", 1) + pdf_bytes = base64.b64decode(encoded) + else: + pdf_bytes = Path(uri).read_bytes() + return _pdf_bytes_to_text(pdf_bytes) + + +# ── LLM clients ─────────────────────────────────────────────────────────────── +# All text generation β†’ ASI:One (stays within the Fetch.ai ecosystem) + +_asi = AsyncOpenAI( + api_key=os.getenv("ASI1_API_KEY", ""), + base_url="https://api.asi1.ai/v1", +) + +_ASI_MODEL = os.getenv("ASI1_MODEL", "asi1-mini") +_DOCX_MODEL = _ASI_MODEL + + +def _docx_slug(title: str) -> str: + return re.sub(r"[^a-zA-Z0-9]+", "_", title)[:50] + + +def _set_heading(doc: Document, text: str, level: int = 1) -> None: + p = doc.add_heading(text, level=level) + for run in p.runs: + run.font.color.rgb = RGBColor(0x1A, 0x1A, 0x2E) + + +def _add_speaker_block( + doc: Document, speaker_label: str, text: str, color: RGBColor +) -> None: + p = doc.add_paragraph() + label = p.add_run(f"{speaker_label} ") + label.bold = True + label.font.color.rgb = color + label.font.size = Pt(10) + body = p.add_run(text) + body.font.size = Pt(10.5) + p.paragraph_format.space_after = Pt(6) + + +async def _build_docx( + audio_path: str, + script: "PodcastScript", + insights: "ResearchInsights", +) -> str: + """Generate an extended director's-cut script via LLM and write a DOCX. + + The document contains: + 1. Cover metadata (title, date, stats) + 2. Episode Brief (thesis Β· key metrics Β· controversy) + 3. Podcast Script β€” the lines that were voiced + 4. Extended Script β€” ~25 additional exchanges, director's cut + Returns the saved .docx path. + """ + COLOR_A = RGBColor(0x0D, 0x47, 0xA1) # deep blue – Host A (Skeptic) + COLOR_B = RGBColor(0x1B, 0x5E, 0x20) # deep green – Host B (Expert) + COLOR_HDR = RGBColor(0x1A, 0x1A, 0x2E) # near-black + + # ── Generate extended dialogue via LLM ───────────────────────────────── + extend_prompt = ( + "You are a podcast showrunner writing a director's-cut script.\n" + "Below is a short voiced podcast debate. Extend it into a longer written version " + "(25–30 more exchanges) that goes deeper: explore edge cases, counter-arguments, " + "analogies, and open questions the paper didn't answer. " + "Keep the same two hosts: HostA (skeptic, demands proof) and HostB (expert, defends with numbers).\n\n" + f"CONTEXT\n" + f"Topic: {script.topic_title}\n" + f"Thesis: {insights.core_thesis}\n" + f"Key metrics: {', '.join(insights.key_metrics)}\n" + f"Controversy: {insights.controversial_point}\n\n" + "VOICED SCRIPT (do not repeat verbatim, but continue naturally from it):\n" + + "\n".join( + f"{'HostA' if line.speaker == 'HostA' else 'HostB'}: {line.text}" + for line in script.lines + ) + + "\n\n" + "Return ONLY a JSON array of objects with keys 'speaker' ('HostA'|'HostB') and 'text'. " + "No markdown, no extra keys." + ) + + resp = await _asi.chat.completions.create( + model=_ASI_MODEL, + messages=[ + { + "role": "system", + "content": "You are a podcast showrunner. Return only valid JSON.", + }, + {"role": "user", "content": extend_prompt}, + ], + temperature=0.85, + ) + raw = resp.choices[0].message.content or "{}" + try: + parsed = json.loads(raw) + # model may wrap in a key + if isinstance(parsed, dict): + for v in parsed.values(): + if isinstance(v, list): + parsed = v + break + extended_lines = parsed if isinstance(parsed, list) else [] + except Exception: + extended_lines = [] + + # ── Build DOCX ───────────────────────────────────────────────────────── + doc = Document() + + # β€” narrow margins β€” + for section in doc.sections: + section.top_margin = Pt(72) + section.bottom_margin = Pt(72) + section.left_margin = Pt(80) + section.right_margin = Pt(80) + + # Cover + title_p = doc.add_paragraph() + title_p.alignment = WD_ALIGN_PARAGRAPH.CENTER + tr = title_p.add_run(script.topic_title) + tr.bold = True + tr.font.size = Pt(20) + tr.font.color.rgb = COLOR_HDR + + sub_p = doc.add_paragraph() + sub_p.alignment = WD_ALIGN_PARAGRAPH.CENTER + sr = sub_p.add_run( + f"PDF Podcast Agent β€’ {datetime.now(timezone.utc).strftime('%d %B %Y')}\n" + f"{len(script.lines)} voiced lines β€’ {len(extended_lines)} extended lines" + ) + sr.font.size = Pt(10) + sr.font.color.rgb = RGBColor(0x66, 0x66, 0x66) + doc.add_paragraph() + + # Episode Brief + _set_heading(doc, "Episode Brief", level=1) + + _set_heading(doc, "Core Argument", level=2) + doc.add_paragraph(insights.core_thesis).paragraph_format.space_after = Pt(8) + + _set_heading(doc, "Key Metrics", level=2) + for m in insights.key_metrics: + p = doc.add_paragraph(style="List Bullet") + p.add_run(m).font.size = Pt(10.5) + + _set_heading(doc, "The Controversy", level=2) + doc.add_paragraph(insights.controversial_point).paragraph_format.space_after = Pt( + 12 + ) + doc.add_page_break() + + # Voiced Script + _set_heading(doc, "Podcast Script (Voiced Version)", level=1) + sub = doc.add_paragraph() + sub.add_run( + "These are the exact lines synthesised to audio by the Voice Studio agent." + ).font.color.rgb = RGBColor(0x55, 0x55, 0x55) + doc.add_paragraph() + + ts = 0 # running timestamp in seconds (~12 s per TTS line) + for line in script.lines: + is_a = line.speaker == "HostA" + stamp = f"[{ts // 60}:{ts % 60:02d}]" + _add_speaker_block( + doc, + f"{stamp} HOST A (The Skeptic)" + if is_a + else f"{stamp} HOST B (The Expert)", + line.text, + COLOR_A if is_a else COLOR_B, + ) + ts += 12 + doc.add_page_break() + + # Extended Script + _set_heading(doc, "Extended Script (Director's Cut)", level=1) + sub2 = doc.add_paragraph() + sub2.add_run( + "Continues from the voiced script β€” deeper exchanges generated for the written version." + ).font.color.rgb = RGBColor(0x55, 0x55, 0x55) + doc.add_paragraph() + + for line in extended_lines: + spk = line.get("speaker", "HostA") + txt = line.get("text", "") + if not txt: + continue + is_a = spk == "HostA" + _add_speaker_block( + doc, + "HOST A (The Skeptic)" if is_a else "HOST B (The Expert)", + txt, + COLOR_A if is_a else COLOR_B, + ) + + # Save alongside the MP3 + stem = Path(audio_path).stem + docx_path = OUTPUT_DIR / f"{stem}.docx" + doc.save(str(docx_path)) + return str(docx_path) + + +# ── Personality helpers ─────────────────────────────────────────────────────── + + +def _load_personalities(ctx: Context) -> tuple[str, str, str, str]: + """Return (a_name, a_hint, b_name, b_hint) from storage, falling back to defaults.""" + raw = ctx.storage.get(_PERSONALITIES_KEY) + if raw: + try: + p = json.loads(raw) + return p["a_name"], p["a_hint"], p["b_name"], p["b_hint"] + except Exception: + pass + a_name, a_hint = _PERSONALITY_PRESETS["host_a"]["1"] + b_name, b_hint = _PERSONALITY_PRESETS["host_b"]["1"] + return a_name, a_hint, b_name, b_hint + + +# ── Payment helpers ─────────────────────────────────────────────────────────── + + +def _stripe_enabled() -> bool: + return bool(_stripe_lib and os.getenv("STRIPE_SECRET_KEY", "")) + + +def _is_session_paid(ctx: Context, session_id: str) -> bool: + """Return True if this podcast session has an active Live Show Pass.""" + raw = ctx.storage.get(_PAID_SESSIONS_KEY) or "[]" + try: + return session_id in json.loads(raw) + except Exception: + return False + + +def _create_embedded_checkout( + *, user_address: str, podcast_session_id: str, description: str +) -> dict: + """Create a Stripe embedded Checkout Session and return the payload + that goes into ``RequestPayment.metadata["stripe"]``. + + Uses ``ui_mode="embedded"`` so ASI:One renders the payment form + inline as an overlay β€” no redirect required. + """ + _stripe_lib.api_key = os.getenv("STRIPE_SECRET_KEY", "") # type: ignore[union-attr] + price_cents = int(os.getenv("STRIPE_LIVE_SHOW_PRICE_CENTS", "1000")) + pub_key = os.getenv("STRIPE_PUBLISHABLE_KEY", "") + success_url = os.getenv("STRIPE_SUCCESS_URL", "https://asi1.ai") + + session = _stripe_lib.checkout.Session.create( # type: ignore[union-attr] + ui_mode="embedded", + redirect_on_completion="if_required", + mode="payment", + payment_method_types=["card"], + return_url=success_url + "?session_id={CHECKOUT_SESSION_ID}", + line_items=[ + { + "price_data": { + "currency": "usd", + "unit_amount": price_cents, + "product_data": { + "name": "PDF Podcast - Live Show Pass", + "description": description, + }, + }, + "quantity": 1, + } + ], + metadata={ + "podcast_session_id": podcast_session_id, + "user_address": user_address, + }, + ) + + return { + "client_secret": session.client_secret, + "checkout_session_id": session.id, + "publishable_key": pub_key, + "currency": "usd", + "amount_cents": price_cents, + "ui_mode": "embedded_page", + } + + +def _verify_checkout_session_paid(checkout_session_id: str) -> bool: + """Verify with Stripe that a Checkout Session has been paid.""" + _stripe_lib.api_key = os.getenv("STRIPE_SECRET_KEY", "") # type: ignore[union-attr] + session = _stripe_lib.checkout.Session.retrieve(checkout_session_id) # type: ignore[union-attr] + return getattr(session, "payment_status", None) == "paid" + + +async def _gate_with_payment(ctx: Context, sender: str, feature_name: str) -> bool: + """Check payment gate. Returns True (proceed) or False (RequestPayment sent). + + Uses the Agent Payment Protocol with Stripe embedded Checkout so that + ASI:One renders the payment form as an inline overlay in the chat. + + If STRIPE_SECRET_KEY is not set the gate is disabled and always returns True. + """ + if not _stripe_enabled(): + ctx.logger.info("[Payment] Stripe not configured -- gate disabled (dev mode).") + return True + + raw = ctx.storage.get(_LAST_SESSION_KEY) + if not raw: + await ctx.send( + sender, + _chat( + "No podcast session found.\n\n" + "Send me a PDF first to generate a podcast, then unlock the Live Show Pass." + ), + ) + return False + + data = json.loads(raw) + podcast_session_id = data["session_id"] + + if _is_session_paid(ctx, podcast_session_id): + return True + + price_cents = int(os.getenv("STRIPE_LIVE_SHOW_PRICE_CENTS", "1000")) + price_dollars = price_cents / 100 + + try: + checkout_payload = await asyncio.to_thread( + _create_embedded_checkout, + user_address=sender, + podcast_session_id=podcast_session_id, + description=( + "Unlock live debate, host personality customization " + "& extended Q&A for this episode." + ), + ) + + checkout_session_id = checkout_payload["checkout_session_id"] + raw_pending = ctx.storage.get(_PENDING_PAYMENTS_KEY) or "{}" + try: + pending_by_checkout = json.loads(raw_pending) + if not isinstance(pending_by_checkout, dict): + pending_by_checkout = {} + except Exception: + pending_by_checkout = {} + + pending_by_checkout[checkout_session_id] = { + "podcast_session_id": podcast_session_id, + "user_address": sender, + "feature": feature_name, + } + ctx.storage.set(_PENDING_PAYMENTS_KEY, json.dumps(pending_by_checkout)) + + req = RequestPayment( + accepted_funds=[ + Funds( + currency="USD", + amount=f"{price_dollars:.2f}", + payment_method="stripe", + ) + ], + recipient=str(ctx.agent.address), + deadline_seconds=600, + description=f"Pay ${price_dollars:.0f} to unlock the Live Show Pass for this episode.", + metadata={"stripe": checkout_payload, "service": "live_show_pass"}, + ) + await ctx.send(sender, req) + ctx.logger.info( + f"[Payment] RequestPayment sent (embedded checkout " + f"{checkout_payload['checkout_session_id'][:20]}...) " + f"for session {podcast_session_id[:8]}" + ) + except Exception as exc: + ctx.logger.error(f"[Payment] Stripe error: {exc}") + await ctx.send( + sender, + _chat( + f"Payment service temporarily unavailable.\n\n" + f"Please try again in a moment. *(Error: {exc})*" + ), + ) + return False + + +# ── Cover art ───────────────────────────────────────────────────────────────── + +# ── Host-agent context injection ────────────────────────────────────────────── + + +async def _inject_context_to_hosts( + ctx: Context, + session_id: str, + document_text: str, + script: "PodcastScript", + insights: "ResearchInsights", +) -> None: + """Fire-and-forget context push to @HostA and @HostB so they can answer + follow-up questions from ASI:One users after the podcast is ready.""" + if not HOST_A_ADDRESS and not HOST_B_ADDRESS: + ctx.logger.warning( + "[Orchestrator] HOST_A_ADDRESS / HOST_B_ADDRESS not set β€” skipping context injection" + ) + return + + _, a_hint, _, b_hint = _load_personalities(ctx) + + payload = ContextInjection( + session_id=session_id, + topic_title=script.topic_title, + core_thesis=insights.core_thesis, + key_metrics=insights.key_metrics, + controversial_point=insights.controversial_point, + document_snippet=document_text[:4000], + host_a_personality=a_hint, + host_b_personality=b_hint, + ) + + for name, address in [("HostA", HOST_A_ADDRESS), ("HostB", HOST_B_ADDRESS)]: + if address: + await ctx.send(address, payload) + ctx.logger.info(f"[Orchestrator] Context injected β†’ {name}") + + +# ── Core pipeline ───────────────────────────────────────────────────────────── + + +async def _run_pipeline( + ctx: Context, document_text: str, session_id: str +) -> tuple[AudioResponse, PodcastScript, ResearchInsights]: + """ + Chain: ExtractRequest β†’ ResearchInsights β†’ PodcastScript β†’ AudioResponse + Uses ctx.send_and_receive at each hop for clean request-response flow. + """ + sid = session_id[:8] + + _PIPELINE_TIMEOUT = 120 # seconds β€” generous buffer for Agentverse relay + + # Step 1 β€” RAG Extraction + ctx.logger.info(f"[{sid}] ⟢ Extractor …") + insights, st1 = await ctx.send_and_receive( + EXTRACTOR_ADDRESS, + ExtractRequest(document_text=document_text, session_id=session_id), + response_type=ResearchInsights, + timeout=_PIPELINE_TIMEOUT, + ) + if not isinstance(insights, ResearchInsights): + raise RuntimeError(f"Extractor failed (status={st1})") + ctx.logger.info(f"[{sid}] βœ“ Insights β€” {insights.core_thesis[:60]}…") + + # Step 2 β€” Script generation + ctx.logger.info(f"[{sid}] ⟢ Scriptwriter …") + script, st2 = await ctx.send_and_receive( + SCRIPTWRITER_ADDRESS, + insights, + response_type=PodcastScript, + timeout=_PIPELINE_TIMEOUT, + ) + if not isinstance(script, PodcastScript): + raise RuntimeError(f"Scriptwriter failed (status={st2})") + ctx.logger.info( + f"[{sid}] βœ“ Script β€” {len(script.lines)} lines β€’ '{script.topic_title}'" + ) + + # Step 3 β€” Audio generation + stitching + ctx.logger.info(f"[{sid}] ⟢ Voice Studio …") + audio, st3 = await ctx.send_and_receive( + VOICE_STUDIO_ADDRESS, + script, + response_type=AudioResponse, + timeout=_PIPELINE_TIMEOUT, + ) + if not isinstance(audio, AudioResponse): + raise RuntimeError(f"Voice Studio failed (status={st3})") + ctx.logger.info(f"[{sid}] βœ“ Audio β†’ {audio.audio_path}") + + return audio, script, insights + + +# ── Live debate (relay: HostA β†’ Orchestrator β†’ HostB β†’ …, one growing bubble) ─ + +_MAX_DEBATE_TURNS = 8 # total lines (4 from each host) +_DEBATE_TURN_DELAY_SECS = int(os.getenv("DEBATE_TURN_DELAY_SECS", "8")) + + +async def _run_live_debate(ctx: Context, sender: str) -> None: + """Kick off a turn-based debate. + + The orchestrator sends a DebateTurn to Host A, Host A replies with + DebateResponse, the orchestrator appends it to the growing transcript and + then (after a short pacing pause) sends the next DebateTurn to the other host. + Every DebateResponse updates the *same* chat bubble with the full accumulated + transcript so far β€” giving a "live-streaming" effect. + """ + raw = ctx.storage.get(_LAST_SESSION_KEY) + if not raw: + await ctx.send( + sender, + _chat( + "⚠️ No podcast session found yet.\n\n" + "Send me a PDF first to generate a podcast, " + "then say **debate** to watch the hosts go head-to-head." + ), + ) + return + + if not HOST_A_ADDRESS or not HOST_B_ADDRESS: + await ctx.send( + sender, + _chat( + "⚠️ Host agent addresses not configured.\n" + "Set HOST_A_ADDRESS and HOST_B_ADDRESS in your .env file." + ), + ) + return + + data = json.loads(raw) + topic = data["topic_title"] + sid = str(uuid4())[:8] + + a_name, a_hint, b_name, b_hint = _load_personalities(ctx) + + # Stamp cooldown so duplicate trigger messages are ignored + import time as _time + + now = str(_time.time()) + ctx.storage.set(_DEBATE_COOLDOWN_KEY, now) + + # Generate a stable msg_id for this debate β€” every bubble update reuses + # it so ASI:One overwrites the same bubble instead of creating new ones. + debate_msg_id = uuid4() + ctx.storage.set(_DEBATE_MSG_ID_KEY, str(debate_msg_id)) + + # Seed storage with empty transcript + active session + TTL timestamp + ctx.storage.set(_ACTIVE_DEBATE_KEY, sid) + ctx.storage.set(_ACTIVE_DEBATE_TS_KEY, now) + ctx.storage.set( + _DEBATE_ACCUM_KEY, + json.dumps( + { + "session_id": sid, + "lines": [], + "user": sender, + "a_hint": a_hint, + "b_hint": b_hint, + } + ), + ) + + ctx.logger.info( + f"[LiveDebate] Starting session {sid} for '{topic}' β€” A: {a_name}, B: {b_name}" + ) + + # Opening message β€” keep bubble OPEN so subsequent turns update it in-place + header = ( + f"## 🎭 Live Debate β€” *{topic}*\n\n" + f"*🎀 @skeptic-agent as **{a_name}** Β· πŸŽ“ @expert-agent as **{b_name}***\n\n" + f"*⏳ Warming up the hosts…*" + ) + await ctx.send(sender, _chat_open(header, debate_msg_id)) + + await ctx.send( + HOST_A_ADDRESS, + DebateTurn( + session_id=sid, + topic_title=data["topic_title"], + core_thesis=data["core_thesis"], + key_metrics=data.get("key_metrics", []), + controversial_point=data.get("controversial_point", ""), + document_snippet=data.get("document_snippet", "")[:2000], + user_address=sender, + turn=0, + max_turns=_MAX_DEBATE_TURNS, + previous_statement="", + speaker_personality=a_hint, + ), + ) + ctx.logger.info(f"[LiveDebate] DebateTurn 0 β†’ HostA ({a_name})") + + +# ── Agent ───────────────────────────────────────────────────────────────────── + +_agentverse_key = os.getenv("AGENTVERSE_API_KEY", "") + +orchestrator = Agent( + name="pdf_podcast_orchestrator", + seed=os.getenv("ORCHESTRATOR_SEED", "pdf_podcast_orchestrator_seed_v1"), + port=8000, + # Use mailbox ONLY so ASI:One can reach us via the Agentverse relay. + # Do NOT set endpoint β€” it overrides mailbox in the Almanac, causing + # ASI:One to try localhost:8000 (unreachable from the cloud). + # Sub-agent replies route through Agentverse relay (slightly slower), + # compensated by the 120 s timeout on send_and_receive calls. + **( + { + "mailbox": _agentverse_key, + } + if _agentverse_key + else { + "endpoint": ["http://localhost:8000/submit"], + } + ), + network="testnet", +) + +chat_proto = Protocol(spec=chat_protocol_spec) + +# ── Startup ─────────────────────────────────────────────────────────────────── + + +@orchestrator.on_event("startup") +async def on_startup(ctx: Context) -> None: + ctx.logger.info(f"[Orchestrator] address: {ctx.agent.address}") + + missing = [ + name + for name, val in [ + ("EXTRACTOR_ADDRESS", EXTRACTOR_ADDRESS), + ("SCRIPTWRITER_ADDRESS", SCRIPTWRITER_ADDRESS), + ("VOICE_STUDIO_ADDRESS", VOICE_STUDIO_ADDRESS), + ] + if not val + ] + if missing: + ctx.logger.warning( + f"[Orchestrator] Missing env vars: {', '.join(missing)}\n" + f" Run `python get_addresses.py` to get the correct values,\n" + f" then set them before starting this agent." + ) + else: + ctx.logger.info( + f"[Orchestrator] sub-agents wired:\n" + f" Extractor {EXTRACTOR_ADDRESS}\n" + f" Scriptwriter {SCRIPTWRITER_ADDRESS}\n" + f" Voice Studio {VOICE_STUDIO_ADDRESS}\n" + f" Host A {HOST_A_ADDRESS or '(not set)'}\n" + f" Host B {HOST_B_ADDRESS or '(not set)'}" + ) + ctx.storage.set(_PENDING_PAYMENTS_KEY, "{}") + ctx.logger.info("[Payment] Pending payment state cleared on startup.") + + # Clear stale debate state β€” if the orchestrator restarted mid-debate the + # active session key would block all future debate triggers indefinitely. + ctx.storage.set(_ACTIVE_DEBATE_KEY, "") + ctx.storage.set(_DEBATE_ACCUM_KEY, "") + ctx.storage.set(_DEBATE_COOLDOWN_KEY, "0") + ctx.storage.set(_ACTIVE_DEBATE_TS_KEY, "0") + ctx.storage.set(_DEBATE_MSG_ID_KEY, "") + ctx.storage.set(_SEEN_MSG_IDS_KEY, "[]") + ctx.logger.info("[Debate] Stale debate state cleared on startup.") + + ctx.logger.info("[Orchestrator] REST: POST http://localhost:8000/process") + + +# ── Agent Payment Protocol (Stripe embedded checkout) ───────────────────────── + +payment_proto = Protocol(spec=payment_protocol_spec, role="seller") + + +@payment_proto.on_message(CommitPayment) +async def on_commit_payment(ctx: Context, sender: str, msg: CommitPayment) -> None: + """User completed the Stripe embedded checkout. + + ASI:One sends CommitPayment with ``transaction_id`` set to the Stripe + Checkout Session ID. We verify with Stripe, send CompletePayment, + mark the session as paid, and notify the user. + """ + if msg.funds.payment_method != "stripe" or not msg.transaction_id: + await ctx.send( + sender, + RejectPayment(reason="Unsupported payment method (expected stripe)."), + ) + return + + checkout_session_id = msg.transaction_id + + try: + paid = await asyncio.to_thread( + _verify_checkout_session_paid, checkout_session_id + ) + except Exception as exc: + ctx.logger.error(f"[Payment] Stripe verify error: {exc}") + await ctx.send( + sender, + RejectPayment( + reason="Could not verify payment with Stripe. Please try again." + ), + ) + return + + if not paid: + await ctx.send( + sender, + RejectPayment( + reason="Stripe payment not completed yet. Please finish checkout." + ), + ) + return + + await ctx.send(sender, CompletePayment(transaction_id=checkout_session_id)) + ctx.logger.info(f"[Payment] CompletePayment sent for {checkout_session_id[:20]}...") + + # Determine which podcast session this checkout belongs to. + # Pending payments are keyed by checkout_session_id to avoid races between + # concurrent users paying at the same time. + raw_pending = ctx.storage.get(_PENDING_PAYMENTS_KEY) or "{}" + try: + pending_data = json.loads(raw_pending) + if not isinstance(pending_data, dict): + pending_data = {} + except Exception: + pending_data = {} + + pending_info = pending_data.get(checkout_session_id) + if not pending_info and {"checkout_session_id", "podcast_session_id"}.issubset( + pending_data.keys() + ): + # Backward compatibility for older single-pending-payment shape. + legacy_checkout = pending_data.get("checkout_session_id") + if legacy_checkout == checkout_session_id: + pending_info = pending_data + + podcast_session_id = "" + if isinstance(pending_info, dict): + podcast_session_id = pending_info.get("podcast_session_id", "") + if not isinstance(podcast_session_id, str): + podcast_session_id = "" + + if not podcast_session_id: + # Fallback: retrieve from Stripe metadata + try: + _stripe_lib.api_key = os.getenv("STRIPE_SECRET_KEY", "") # type: ignore[union-attr] + stripe_session = await asyncio.to_thread( + _stripe_lib.checkout.Session.retrieve, + checkout_session_id, # type: ignore[union-attr] + ) + podcast_session_id = (stripe_session.metadata or {}).get( + "podcast_session_id", "" + ) + except Exception: + pass + + if podcast_session_id: + raw_paid = ctx.storage.get(_PAID_SESSIONS_KEY) or "[]" + paid_ids: list = json.loads(raw_paid) + if podcast_session_id not in paid_ids: + paid_ids.append(podcast_session_id) + ctx.storage.set(_PAID_SESSIONS_KEY, json.dumps(paid_ids)) + ctx.logger.info(f"[Payment] Session {podcast_session_id[:8]} marked as paid.") + + if checkout_session_id in pending_data: + pending_data.pop(checkout_session_id, None) + ctx.storage.set(_PENDING_PAYMENTS_KEY, json.dumps(pending_data)) + + await ctx.send( + sender, + _chat( + "## Payment confirmed -- Live Show Pass is active!\n\n" + "The hosts are loaded and ready. Here's what you can do now:\n\n" + "* **Start the live debate** -- type `continue debate`\n" + "* **Customize host personalities first** -- type `A:[1-4] B:[1-4]` " + "*(e.g. `A:2 B:3`)* or type `customize` to see the full menu\n" + "* **Ask the hosts anything** -- tag @skeptic-agent or @expert-agent\n\n" + "*Tip: set personalities before the debate for the best experience!*" + ), + ) + + +@payment_proto.on_message(RejectPayment) +async def on_reject_payment(ctx: Context, sender: str, msg: RejectPayment) -> None: + """User cancelled or the UI rejected the payment.""" + ctx.logger.info(f"[Payment] Payment rejected by {sender[:20]}...: {msg.reason}") + + +# ── Debate response relay (Host β†’ Orchestrator β†’ User bubble) ───────────────── + + +@orchestrator.on_message(DebateResponse) +async def handle_debate_response( + ctx: Context, sender: str, msg: DebateResponse +) -> None: + """Receive one debate line, accumulate internally, show progress only. + + Intermediate updates send a short progress indicator (no debate content) + so the chat stays clean. The complete transcript is delivered in a single + message only after the final turn. + """ + active_sid = ctx.storage.get(_ACTIVE_DEBATE_KEY) + if active_sid != msg.session_id: + ctx.logger.warning( + f"[DebateResponse] Stale session {msg.session_id[:8]} (active={active_sid}). Dropped." + ) + return + + raw_accum = ctx.storage.get(_DEBATE_ACCUM_KEY) + if not raw_accum: + ctx.logger.error("[DebateResponse] No accumulator found. Abort.") + return + accum = json.loads(raw_accum) + + debate_msg_id = UUID(ctx.storage.get(_DEBATE_MSG_ID_KEY) or str(uuid4())) + + label = ( + "🎀 **@skeptic-agent**" if msg.speaker == "skeptic" else "πŸŽ“ **@expert-agent**" + ) + accum["lines"].append(f"{label}\n{msg.reply_text}") + ctx.storage.set(_DEBATE_ACCUM_KEY, json.dumps(accum)) + + user_address = accum["user"] + a_hint = accum.get("a_hint", "") + b_hint = accum.get("b_hint", "") + topic = msg.topic_title + total = len(accum["lines"]) + next_turn = msg.turn + 1 + is_final = next_turn >= msg.max_turns + + if is_final: + # Deliver the COMPLETE debate transcript in one message + body = "\n\n".join(accum["lines"]) + full_text = ( + f"## 🎭 Live Debate β€” *{topic}*\n\n" + f"{body}\n\n" + "---\n" + "🎬 *Debate over. Tag @skeptic-agent or @expert-agent to keep the conversation going.*" + ) + await ctx.send(user_address, _chat(full_text)) + ctx.storage.set(_ACTIVE_DEBATE_KEY, "") + ctx.logger.info( + f"[DebateResponse] Debate finished β€” {msg.max_turns} turns delivered." + ) + else: + next_speaker = "expert" if msg.speaker == "skeptic" else "skeptic" + next_label = ( + "πŸŽ“ @expert-agent" if next_speaker == "expert" else "🎀 @skeptic-agent" + ) + next_personality = b_hint if next_speaker == "expert" else a_hint + + # Progress-only update β€” no debate content, keeps the chat clean + speaker_done = ( + "🎀 @skeptic-agent" if msg.speaker == "skeptic" else "πŸŽ“ @expert-agent" + ) + progress_bar = "".join("β–ˆ" if i < total else "β–‘" for i in range(msg.max_turns)) + progress_text = ( + f"## 🎭 Live Debate β€” *{topic}*\n\n" + f"`{progress_bar}` {total}/{msg.max_turns}\n\n" + f"βœ“ {speaker_done} β€” *delivered*\n\n" + f"⏳ *{next_label} is responding…*" + ) + await ctx.send(user_address, _chat_open(progress_text, debate_msg_id)) + ctx.logger.info( + f"[DebateResponse] Turn {msg.turn} delivered. Waiting {_DEBATE_TURN_DELAY_SECS} s for turn {next_turn}…" + ) + + # Keep a human-paced cadence in the "live" stream while preserving turn order. + await asyncio.sleep(_DEBATE_TURN_DELAY_SECS) + + # Build a plain-text debate history from all accumulated lines + history_lines = [] + for line in accum["lines"]: + clean = line.replace("🎀 **@skeptic-agent**\n", "Skeptic: ") + clean = clean.replace("πŸŽ“ **@expert-agent**\n", "Expert: ") + history_lines.append(clean) + debate_history = "\n".join(history_lines) + + next_host = HOST_B_ADDRESS if next_speaker == "expert" else HOST_A_ADDRESS + await ctx.send( + next_host, + DebateTurn( + session_id=msg.session_id, + topic_title=msg.topic_title, + core_thesis=msg.core_thesis, + key_metrics=msg.key_metrics, + controversial_point=msg.controversial_point, + document_snippet=msg.document_snippet, + user_address=user_address, + turn=next_turn, + max_turns=msg.max_turns, + previous_statement=msg.reply_text, + speaker_personality=next_personality, + debate_history=debate_history, + ), + ) + ctx.logger.info( + f"[DebateResponse] DebateTurn {next_turn} β†’ " + f"{'HostB' if next_speaker == 'expert' else 'HostA'}" + ) + + +# ── ASI:One Chat Protocol ───────────────────────────────────────────────────── + + +@chat_proto.on_message(ChatMessage) +async def handle_chat_message(ctx: Context, sender: str, msg: ChatMessage) -> None: + await ctx.send( + sender, + ChatAcknowledgement( + timestamp=datetime.now(timezone.utc), acknowledged_msg_id=msg.msg_id + ), + ) + + # ── Message deduplication (Agentverse retries the same msg_id) ──────────── + import time as _time + + msg_key = str(msg.msg_id) + try: + seen_raw = ctx.storage.get(_SEEN_MSG_IDS_KEY) or "[]" + seen: list = json.loads(seen_raw) + except Exception: + seen = [] + if msg_key in seen: + ctx.logger.info(f"[Chat] Duplicate msg_id {msg_key[:12]}… β€” skipped.") + return + seen.append(msg_key) + seen = seen[-50:] # keep last 50 to avoid unbounded growth + ctx.storage.set(_SEEN_MSG_IDS_KEY, json.dumps(seen)) + + # ── Auto-expire stale active debates (TTL guard) ───────────────────────── + active = ctx.storage.get(_ACTIVE_DEBATE_KEY) + if active: + debate_ts = float(ctx.storage.get(_ACTIVE_DEBATE_TS_KEY) or "0") + if _time.time() - debate_ts > _ACTIVE_DEBATE_TTL: + ctx.logger.info( + f"[Chat] Stale debate {active[:8]} expired after " + f"{_ACTIVE_DEBATE_TTL}s β€” clearing." + ) + ctx.storage.set(_ACTIVE_DEBATE_KEY, "") + ctx.storage.set(_DEBATE_ACCUM_KEY, "") + + # ── Text-command block (debate, personality) ────────────────────────────── + has_resource = any(isinstance(item, ResourceContent) for item in msg.content) + if not has_resource: + combined_text = " ".join( + item.text.strip() + for item in msg.content + if isinstance(item, TextContent) and item.text.strip() + ) + combined_lower = combined_text.lower() + + # Personality selection: "A:2 B:3" + personality_match = _PERSONALITY_SET_RE.search(combined_text) + if personality_match: + if not await _gate_with_payment( + ctx, sender, "Host Personality Customization" + ): + return + a_key, b_key = personality_match.group(1), personality_match.group(2) + a_name, a_hint = _PERSONALITY_PRESETS["host_a"][a_key] + b_name, b_hint = _PERSONALITY_PRESETS["host_b"][b_key] + ctx.storage.set( + _PERSONALITIES_KEY, + json.dumps( + { + "a": a_key, + "b": b_key, + "a_name": a_name, + "a_hint": a_hint, + "b_name": b_name, + "b_hint": b_hint, + } + ), + ) + ctx.logger.info(f"[Personalities] Set A={a_name}, B={b_name}") + await ctx.send( + sender, + _chat( + f"βœ… **Host personalities updated!**\n\n" + f"🎀 **@skeptic-agent** β†’ *{a_name}*\n" + f"πŸŽ“ **@expert-agent** β†’ *{b_name}*\n\n" + f"These apply to all Q&A responses and the next debate.\n" + f"Type `debate` to watch them go head-to-head with the new styles." + ), + ) + return + + # Personality picker menu + if any(t in combined_lower for t in _PERSONALITY_TRIGGERS): + if not await _gate_with_payment( + ctx, sender, "Host Personality Customization" + ): + return + await ctx.send(sender, _chat(_PERSONALITY_MENU)) + return + + # Live debate trigger + if any(trigger in combined_lower for trigger in _LIVE_DEBATE_TRIGGERS): + # Cooldown guard β€” ASI:One retries cause duplicate deliveries + last_ts = float(ctx.storage.get(_DEBATE_COOLDOWN_KEY) or "0") + if _time.time() - last_ts < _DEBATE_COOLDOWN_SECS: + ctx.logger.info( + f"[Chat] Debate trigger debounced (cooldown {_DEBATE_COOLDOWN_SECS}s active)." + ) + return + # Active-debate guard β€” don't spawn a second debate while one is running + active = ctx.storage.get(_ACTIVE_DEBATE_KEY) + if active: + ctx.logger.info( + f"[Chat] Debate trigger ignored β€” session {active[:8]} already active." + ) + return + if not await _gate_with_payment(ctx, sender, "Live Debate"): + return + await _run_live_debate(ctx, sender) + return + + document_text: Optional[str] = None + + for item in msg.content: + # PDF sent as an attachment from ASI:One + if isinstance(item, ResourceContent): + if isinstance(item.resource, list) and not item.resource: + ctx.logger.warning( + "[Chat] ResourceContent had empty resource list; skipping item." + ) + continue + resource = ( + item.resource[0] if isinstance(item.resource, list) else item.resource + ) + try: + document_text = _resource_to_text(resource) + ctx.logger.info( + f"[Chat] PDF attachment received via ResourceContent ({len(document_text):,} chars)" + ) + except Exception as e: + ctx.logger.error(f"ResourceContent PDF read error: {e}") + break + + if isinstance(item, TextContent): + text = item.text.strip() + if len(text) > 300 and not text.lower().endswith(".pdf"): + document_text = text + break + if text.lower().endswith(".pdf") and os.path.exists(text): + try: + document_text = _pdf_path_to_text(text) + except Exception as e: + ctx.logger.error(f"PDF read error: {e}") + break + + if not document_text: + await ctx.send( + sender, + _chat( + "Hi! I'm the PDF Podcast Agent.\n\n" + "Send me either:\n" + " β€’ The full pasted text of your PDF, or\n" + " β€’ An absolute server-side path to a .pdf file\n\n" + "I'll turn it into a 2-host debate podcast!" + ), + ) + return + + session_id = str(uuid4()) + + await ctx.send( + sender, + _chat( + f"Starting pipeline β€” session `{session_id[:8]}`\n" + f"Steps: Extractor β†’ Scriptwriter β†’ Voice Studio\n" + f"Usually 30–90 s. Hang tight!" + ), + ) + + try: + audio, script, insights = await _run_pipeline(ctx, document_text, session_id) + + # Persist session context so the live-debate trigger can use it later + ctx.storage.set( + _LAST_SESSION_KEY, + json.dumps( + { + "session_id": session_id, + "topic_title": script.topic_title, + "core_thesis": insights.core_thesis, + "key_metrics": insights.key_metrics, + "controversial_point": insights.controversial_point, + "document_snippet": document_text[:3000], + "script_lines": [ + {"speaker": line.speaker, "text": line.text} + for line in script.lines + ], + } + ), + ) + + audio_url = _audio_url(audio.audio_path) + line_count = len(script.lines) + duration_s = line_count * 12 + duration = f"{duration_s // 60}:{duration_s % 60:02d}" + + # Run post-pipeline tasks concurrently + docx_path, _ = await asyncio.gather( + _build_docx(audio.audio_path, script, insights), + _inject_context_to_hosts(ctx, session_id, document_text, script, insights), + ) + + docx_url = _audio_url(docx_path) + + # ── Consolidated episode snapshot ─────────────────────────────────── + snapshot_lines = [ + f"> πŸ“– **{insights.core_thesis[:120]}{'…' if len(insights.core_thesis) > 120 else ''}**", + ">", + ] + for m in insights.key_metrics: + snapshot_lines.append(f"> β€’ {m}") + snapshot_lines += [ + ">", + f"> πŸ”₯ *{insights.controversial_point[:120]}{'…' if len(insights.controversial_point) > 120 else ''}*", + ">", + ] + for line in script.lines[:3]: + tag = "[A]" if line.speaker == "HostA" else "[B]" + snapshot_lines.append( + f"> **{tag}** {line.text[:90]}{'…' if len(line.text) > 90 else ''}" + ) + if line_count > 3: + snapshot_lines.append("> …") + snapshot = "\n".join(snapshot_lines) + + # ── Live Q&A section ───────────────────────────────────────────────── + hosts_ready = bool(HOST_A_ADDRESS and HOST_B_ADDRESS) + session_paid = _is_session_paid(ctx, session_id) + stripe_active = _stripe_enabled() + a_name, _, b_name, _ = _load_personalities(ctx) + price_dollars = int(os.getenv("STRIPE_LIVE_SHOW_PRICE_CENTS", "1000")) / 100 + + if not hosts_ready: + qa_section = ( + "## πŸ’¬ Live Q&A β€” Ask the Hosts\n" + "*(Start `host_a_agent.py` and `host_b_agent.py` and add their addresses " + "to `.env` as `HOST_A_ADDRESS` / `HOST_B_ADDRESS` to enable live Q&A and debate.)*" + ) + elif stripe_active and not session_paid: + # Payment required β€” show free tier + locked premium section + qa_section = ( + "## πŸ’¬ Live Q&A β€” Ask the Hosts *(FREE)*\n" + "I've called **@skeptic-agent** and **@expert-agent** into this chat " + "and they have memorised the full paper.\n\n" + "Use **@skeptic-agent** or **@expert-agent** here to ask anything about the episode.\n\n" + "---\n\n" + f"## πŸ”’ Live Show Pass β€” ${price_dollars:.0f}\n" + "Unlock the full interactive experience for this episode:\n\n" + "β€’ 🎭 **Live Debate** β€” @skeptic-agent vs @expert-agent, streamed turn-by-turn\n" + "β€’ 🎨 **Host Personality Customization** β€” 16 personality combos\n" + "β€’ πŸ’¬ **Extended Q&A** β€” deeper, richer host responses\n\n" + "Type **`debate`** or **`customize`** to unlock via Stripe." + ) + else: + # No payment gate (Stripe not configured) OR already paid β€” show everything + pass_badge = " *(🎟️ Live Show Pass active)*" if session_paid else "" + qa_section = ( + f"## πŸ’¬ Live Q&A β€” Ask the Hosts{pass_badge}\n" + "I've called **@skeptic-agent** and **@expert-agent** into this chat " + "and they have memorised the full paper.\n\n" + "Use **@skeptic-agent** or **@expert-agent** here to ask anything about the episode.\n\n" + "---\n\n" + "## 🎭 Watch Them Debate Live\n" + "Want to see @skeptic-agent and @expert-agent go head-to-head on this paper?\n\n" + "Just type any of these:\n" + "> `debate`  Β·  `live debate`  Β·  `start debate` " + " Β·  `watch them`  Β·  `replay`\n\n" + "---\n\n" + "## 🎨 Customize Host Personalities\n" + f"*Currently: 🎀 **{a_name}** Β· πŸŽ“ **{b_name}***\n\n" + "Want a different vibe? Reply **A:[1-4] B:[1-4]**:\n\n" + "🎀 **@skeptic-agent**: 1. Classic Skeptic  Β·  2. Investigative Journalist " + " Β·  3. Academic Critic  Β·  4. Industry Veteran\n\n" + "πŸŽ“ **@expert-agent**: 1. Researcher  Β·  2. Industry Insider " + " Β·  3. Futurist  Β·  4. Enthusiastic Teacher\n\n" + "*Example: type* `A:2 B:3` *for Investigative Journalist vs Futurist.*" + ) + + reply = "\n".join( + [ + "## βœ… Podcast Ready!", + "", + f"## πŸŽ™οΈ {script.topic_title}", + f"⏱️ ~{duration}  β€’  {line_count} voiced lines  β€’  2 hosts", + "", + "---", + "## πŸ”— Downloads", + f"πŸ”Š **Listen (MP3):** {audio_url}", + f"πŸ“„ **Full Script (DOCX):** {docx_url}", + "", + "---", + "## πŸ“‹ Episode Snapshot", + "*Preview only β€” open the DOCX for the full analysis, " + "timestamped transcript, and extended director's cut.*", + "", + snapshot, + "", + "---", + # ── Change 3: single-line background note ────────────────────── + "🧠 *Paper context loaded into host agents.*", + "", + "---", + qa_section, + ] + ) + + await ctx.send(sender, _chat(reply)) + + except Exception as exc: + ctx.logger.error(f"Pipeline error: {exc}") + await ctx.send(sender, _chat(f"❌ Pipeline failed: {exc}")) + + +@chat_proto.on_message(ChatAcknowledgement) +async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement) -> None: + ctx.logger.info(f"[Orchestrator] ACK received from {sender[:20]}…") + + +# ── REST endpoint (local testing) ───────────────────────────────────────────── + + +@orchestrator.on_rest_post("/process", PipelineRequest, PipelineResponse) +async def handle_rest_process(ctx: Context, req: PipelineRequest) -> PipelineResponse: + session_id = str(uuid4()) + ctx.logger.info(f"[REST] session {session_id[:8]}") + + try: + if req.pdf_base64: + document_text = _pdf_bytes_to_text(base64.b64decode(req.pdf_base64)) + elif req.pdf_path: + document_text = _pdf_path_to_text(req.pdf_path) + else: + return PipelineResponse( + audio_base64="", + audio_path="", + script_json="[]", + topic_title="", + status="error", + error_message="Provide pdf_path or pdf_base64.", + ) + + if not document_text.strip(): + return PipelineResponse( + audio_base64="", + audio_path="", + script_json="[]", + topic_title="", + status="error", + error_message="Could not extract text from PDF.", + ) + + ctx.logger.info( + f"[REST] {len(document_text):,} chars extracted β€” running pipeline …" + ) + audio, script, insights = await _run_pipeline(ctx, document_text, session_id) + + # Voice Studio no longer sends base64 over the wire to stay under the + # Agentverse mailbox size limit β€” read the file from disk instead. + audio_b64 = audio.audio_base64 + if not audio_b64 and audio.audio_path: + try: + audio_b64 = base64.b64encode( + Path(audio.audio_path).read_bytes() + ).decode() + except Exception: + audio_b64 = "" + + return PipelineResponse( + audio_base64=audio_b64, + audio_path=audio.audio_path, + script_json=json.dumps( + [{"speaker": line.speaker, "text": line.text} for line in script.lines], + indent=2, + ), + topic_title=script.topic_title, + status="success", + ) + + except Exception as exc: + ctx.logger.error(f"[REST] error: {exc}") + return PipelineResponse( + audio_base64="", + audio_path="", + script_json="[]", + topic_title="", + status="error", + error_message=str(exc), + ) + + +orchestrator.include(chat_proto, publish_manifest=True) +orchestrator.include(payment_proto, publish_manifest=True) + + +if __name__ == "__main__": + orchestrator.run() diff --git a/pdf-podcast-agent/render.yaml b/pdf-podcast-agent/render.yaml new file mode 100644 index 00000000..bbea200d --- /dev/null +++ b/pdf-podcast-agent/render.yaml @@ -0,0 +1,34 @@ +services: + - type: web + name: pdf-podcast-agent + runtime: python + plan: standard + buildCommand: | + pip install -r requirements.txt + apt-get install -y ffmpeg || true + startCommand: python run.py + envVars: + - key: PYTHON_VERSION + value: "3.11.9" + - key: OPENAI_API_KEY + sync: false + - key: AGENTVERSE_API_KEY + sync: false + - key: ORCHESTRATOR_SEED + sync: false + - key: EXTRACTOR_SEED + sync: false + - key: SCRIPTWRITER_SEED + sync: false + - key: VOICE_STUDIO_SEED + sync: false + - key: TTS_MODEL + value: "tts-1" + - key: EXTRACTION_MODEL + value: "gpt-4o-mini" + - key: SCRIPTWRITER_MODEL + value: "gpt-4o-mini" + - key: SILENCE_MS + value: "400" + - key: OUTPUT_DIR + value: "/tmp/podcast_output" diff --git a/pdf-podcast-agent/requirements.txt b/pdf-podcast-agent/requirements.txt new file mode 100644 index 00000000..b351e695 --- /dev/null +++ b/pdf-podcast-agent/requirements.txt @@ -0,0 +1,37 @@ +# PDF-to-Podcast Agent – Python dependencies +# Install with: pip install -r requirements.txt + +# ── Fetch.ai uAgents framework ──────────────────────────────────────────────── +uagents>=0.24.1 +uagents-core>=0.4.4 + +# ── OpenAI (LLM + TTS) ──────────────────────────────────────────────────────── +openai>=1.30.0 + +# ── PDF text extraction ─────────────────────────────────────────────────────── +pdfplumber>=0.11.0 + +# ── Audio stitching ─────────────────────────────────────────────────────────── +# pydub requires ffmpeg on your PATH for MP3 encoding. +# Install ffmpeg: https://ffmpeg.org/download.html +# Windows: choco install ffmpeg | winget install ffmpeg +# macOS: brew install ffmpeg +# Linux: sudo apt install ffmpeg +pydub>=0.25.1 +audioop-lts>=0.2.1; python_version >= "3.13" + +# ── HTTP (test script only) ─────────────────────────────────────────────────── +requests>=2.31.0 +types-requests>=2.32.0.20250328 + +# ── DOCX script generation ─────────────────────────────────────────────────── +python-docx>=1.1.0 + +# ── MP3 ID3 metadata ───────────────────────────────────────────────────────── +mutagen>=1.47.0 + +# ── Environment variable loading ────────────────────────────────────────────── +python-dotenv>=1.0.0 + +# ── Stripe payments ──────────────────────────────────────────────────────────── +stripe>=8.0.0 diff --git a/pdf-podcast-agent/run.py b/pdf-podcast-agent/run.py new file mode 100644 index 00000000..99209683 --- /dev/null +++ b/pdf-podcast-agent/run.py @@ -0,0 +1,112 @@ +""" +run.py – Convenience launcher (optional) +========================================= +Starts all six agents in SEPARATE subprocesses (one per agent), so you +get clean, isolated logs for each one β€” just like running 6 terminals by hand. + +Usage: + python run.py + +Press Ctrl+C to stop all agents at once. + +Alternatively, start each agent manually in its own terminal: + Terminal 1: python extractor_agent.py + Terminal 2: python scriptwriter_agent.py + Terminal 3: python voice_studio_agent.py + Terminal 4: python host_a_agent.py + Terminal 5: python host_b_agent.py + Terminal 6: python orchestrator.py (start LAST) +""" + +import os +import subprocess +import sys +import threading +import time + +# ── Pre-flight checks ───────────────────────────────────────────────────────── + +if not os.getenv("OPENAI_API_KEY"): + print("[ERROR] OPENAI_API_KEY is not set.") + print(" Set it first: $env:OPENAI_API_KEY='sk-...' (PowerShell)") + sys.exit(1) + +# Compute and inject addresses so the orchestrator has them from the start +print("Computing agent addresses …") + +result = subprocess.run( + [sys.executable, "get_addresses.py"], + capture_output=True, + text=True, + check=True, +) +output = result.stdout +print(output) + +# Parse the env-var lines from get_addresses output +env_extra = {} +for line in output.splitlines(): + if line.startswith("EXTRACTOR_ADDRESS="): + env_extra["EXTRACTOR_ADDRESS"] = line.split("=", 1)[1] + elif line.startswith("SCRIPTWRITER_ADDRESS="): + env_extra["SCRIPTWRITER_ADDRESS"] = line.split("=", 1)[1] + elif line.startswith("VOICE_STUDIO_ADDRESS="): + env_extra["VOICE_STUDIO_ADDRESS"] = line.split("=", 1)[1] + elif line.startswith("HOST_A_ADDRESS="): + env_extra["HOST_A_ADDRESS"] = line.split("=", 1)[1] + elif line.startswith("HOST_B_ADDRESS="): + env_extra["HOST_B_ADDRESS"] = line.split("=", 1)[1] + +child_env = {**os.environ, **env_extra} + +# ── Launch subprocesses ─────────────────────────────────────────────────────── + +agents = [ + ("Extractor", [sys.executable, "extractor_agent.py"]), + ("Scriptwriter", [sys.executable, "scriptwriter_agent.py"]), + ("VoiceStudio", [sys.executable, "voice_studio_agent.py"]), + ("HostA", [sys.executable, "host_a_agent.py"]), + ("HostB", [sys.executable, "host_b_agent.py"]), + ("Orchestrator", [sys.executable, "orchestrator.py"]), +] + +procs = [] +for label, cmd in agents: + p = subprocess.Popen( + cmd, + env=child_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + procs.append((label, p)) + print(f"[run.py] Started {label} (pid {p.pid})") + time.sleep(0.5) # stagger startup slightly + +print("\n[run.py] All agents running. Press Ctrl+C to stop.\n") + + +def stream(label, proc): + for line in proc.stdout: + print(f"[{label}] {line}", end="") + + +threads = [ + threading.Thread(target=stream, args=(label, proc), daemon=True) + for label, proc in procs +] +for t in threads: + t.start() + +# ── Shutdown on Ctrl+C ──────────────────────────────────────────────────────── +try: + while all(p.poll() is None for _, p in procs): + time.sleep(1) +except KeyboardInterrupt: + print("\n[run.py] Shutting down …") + for label, p in procs: + p.terminate() + for label, p in procs: + p.wait() + print("[run.py] All agents stopped.") diff --git a/pdf-podcast-agent/schemas.py b/pdf-podcast-agent/schemas.py new file mode 100644 index 00000000..8282909d --- /dev/null +++ b/pdf-podcast-agent/schemas.py @@ -0,0 +1,137 @@ +""" +Shared Pydantic data models for the PDF-to-Podcast agent pipeline. +All four agents exchange these schemas so the LLM can never hallucinate formats. +""" + +from uagents import Model +from typing import List + + +# ── 1. Orchestrator β†’ Extractor ────────────────────────────────────────────── + + +class ExtractRequest(Model): + """Raw document text sent by the Orchestrator to the RAG Extractor.""" + + document_text: str + session_id: str + + +# ── 2. Extractor β†’ Scriptwriter ────────────────────────────────────────────── + + +class ResearchInsights(Model): + """Distilled key facts returned by the RAG Extractor.""" + + core_thesis: str + key_metrics: List[str] + controversial_point: str + session_id: str + + +# ── 3. Scriptwriter β†’ Voice Studio ─────────────────────────────────────────── + + +class DialogueLine(Model): + """A single line of spoken dialogue (speaker + text).""" + + speaker: str # "HostA" (skeptic) or "HostB" (expert) + text: str + + +class PodcastScript(Model): + """Full back-and-forth dialogue script returned by the Scriptwriter.""" + + lines: List[DialogueLine] + topic_title: str + session_id: str + + +# ── 4. Voice Studio β†’ Orchestrator ─────────────────────────────────────────── + + +class AudioResponse(Model): + """Stitched MP3 returned by the Voice Studio.""" + + audio_base64: str # base64-encoded final MP3 bytes + audio_path: str # local path where the MP3 was saved + session_id: str + line_count: int + + +# ── 5. Debate chain (Orchestrator ↔ Host Agents) ───────────────────────────── + + +class DebateTurn(Model): + """Orchestrator β†’ Host: request one debate turn.""" + + session_id: str + topic_title: str + core_thesis: str + key_metrics: List[str] + controversial_point: str + document_snippet: str + user_address: str # ASI:One user to stream the debate to + turn: int # 0-indexed + max_turns: int + previous_statement: str # opponent's last line (empty on turn 0) + speaker_personality: str = "" # system-prompt personality hint for this host + debate_history: str = "" # full conversation so far (all prior turns) + + +class DebateResponse(Model): + """Host β†’ Orchestrator: the generated debate line for this turn.""" + + session_id: str + speaker: str # "skeptic" | "expert" + reply_text: str + turn: int + max_turns: int + user_address: str + topic_title: str + core_thesis: str + key_metrics: List[str] + controversial_point: str + document_snippet: str + + +# ── 6. Orchestrator β†’ Host Agents (context injection) ──────────────────────── + + +class ContextInjection(Model): + """Sent by the Orchestrator to @HostA and @HostB after the pipeline runs. + + The two host agents persist this in ctx.storage so they can answer + follow-up questions from users in ASI:One's chat interface. + """ + + session_id: str # UUID; agents store context keyed by this + topic_title: str + core_thesis: str + key_metrics: List[str] + controversial_point: str + document_snippet: str # first 4 000 chars of the source doc for Q&A depth + host_a_personality: str = "" # system-prompt personality hint for Host A + host_b_personality: str = "" # system-prompt personality hint for Host B + + +# ── REST / Local-testing helpers ────────────────────────────────────────────── + + +class PipelineRequest(Model): + """Payload accepted by the Orchestrator's REST POST /process endpoint.""" + + pdf_path: str = "" # absolute or relative path to a PDF on disk + pdf_base64: str = "" # base64-encoded PDF bytes (alternative to path) + user_prompt: str = "Turn this into a 3-minute debate" + + +class PipelineResponse(Model): + """Response returned by the Orchestrator's REST POST /process endpoint.""" + + audio_base64: str + audio_path: str + script_json: str # JSON-serialised list of DialogueLine dicts + topic_title: str + status: str # "success" | "error" + error_message: str = "" diff --git a/pdf-podcast-agent/scriptwriter_agent.py b/pdf-podcast-agent/scriptwriter_agent.py new file mode 100644 index 00000000..61737fa8 --- /dev/null +++ b/pdf-podcast-agent/scriptwriter_agent.py @@ -0,0 +1,146 @@ +""" +Agent 2 – The Scriptwriter +========================== +Run in its own terminal: + + python scriptwriter_agent.py + +Receives the distilled ResearchInsights and uses a carefully engineered +prompt to generate a lively, two-host debate script. + +By separating this from extraction we prevent the LLM from losing hard +metrics while trying to be creative at the same time. +""" + +import json +import os + +from dotenv import load_dotenv +from openai import AsyncOpenAI +from uagents import Agent, Context + +from schemas import DialogueLine, PodcastScript, ResearchInsights + +load_dotenv() + +# ── Configuration ───────────────────────────────────────────────────────────── + +SCRIPTWRITER_MODEL = os.getenv("SCRIPTWRITER_MODEL", "gpt-4o-mini") +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") + +_SYSTEM_PROMPT = """\ +You are the head writer for a popular tech podcast called "Debug Mode". +Your job is to turn dry research facts into a gripping, natural-sounding +debate between two hosts: + + β€’ HostA – The Skeptic. Pushes back, asks "but how?", demands proof. + β€’ HostB – The Expert. Defends the work with specific numbers and insight. + +Given the research insights below, write a 10–14 line back-and-forth script. + +Rules: +1. Make it sound HUMAN. Use natural speech: "Wait, really?", "Right, butβ€”", + "Okay hold on…", "That's the thing though,", "Exactly!" +2. HostB must cite at least 3 of the provided key_metrics by exact value. +3. The controversial_point must be debated directly. +4. Each line should be 1–3 sentences β€” podcast pacing, not a lecture. +5. End on an unresolved tension or a provocative open question. +6. Do NOT add stage directions, music cues, or section headers. +7. Return ONLY valid JSON β€” no markdown, no extra prose: + +{ + "topic_title": "", + "lines": [ + {"speaker": "HostA", "text": "…"}, + {"speaker": "HostB", "text": "…"} + ] +} +""" + +# ── Agent ───────────────────────────────────────────────────────────────────── + +scriptwriter = Agent( + name="podcast_scriptwriter", + seed=os.getenv("SCRIPTWRITER_SEED", "podcast_scriptwriter_seed_v1"), + port=8002, + endpoint=["http://localhost:8002/submit"], + network="testnet", +) + +client = AsyncOpenAI(api_key=OPENAI_API_KEY) + +# ── Handlers ────────────────────────────────────────────────────────────────── + + +@scriptwriter.on_event("startup") +async def on_startup(ctx: Context) -> None: + ctx.logger.info("[Scriptwriter] ready") + ctx.logger.info(f"[Scriptwriter] address: {ctx.agent.address}") + + +@scriptwriter.on_message(model=ResearchInsights) +async def handle_script(ctx: Context, sender: str, msg: ResearchInsights) -> None: + sid = msg.session_id[:8] + ctx.logger.info(f"[{sid}] Scripting β€” thesis: {msg.core_thesis[:60]}…") + + user_content = ( + f"Core thesis:\n{msg.core_thesis}\n\n" + f"Key metrics:\n" + "\n".join(f" β€’ {m}" for m in msg.key_metrics) + "\n\n" + f"Most controversial point:\n{msg.controversial_point}" + ) + + try: + resp = await client.chat.completions.create( + model=SCRIPTWRITER_MODEL, + response_format={"type": "json_object"}, + messages=[ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": user_content}, + ], + temperature=0.75, + max_tokens=2_000, + ) + + raw = resp.choices[0].message.content + data = json.loads(raw) + + lines = [ + DialogueLine(speaker=line["speaker"], text=line["text"]) + for line in data.get("lines", []) + ] + + if not lines: + raise ValueError("LLM returned empty lines array.") + + script = PodcastScript( + lines=lines, + topic_title=data.get("topic_title", "Tech Debate"), + session_id=msg.session_id, + ) + + ctx.logger.info( + f"[{sid}] Script done β€” {len(lines)} lines β€’ '{script.topic_title}'" + ) + await ctx.send(sender, script) + + except Exception as exc: + ctx.logger.error(f"[{sid}] Error: {exc}") + fallback = PodcastScript( + lines=[ + DialogueLine( + speaker="HostA", + text=f"Today we're discussing: {msg.core_thesis[:120]}", + ), + DialogueLine( + speaker="HostB", + text=f"The key finding: {msg.controversial_point[:120]}", + ), + ], + topic_title="Research Breakdown", + session_id=msg.session_id, + ) + await ctx.send(sender, fallback) + + +if __name__ == "__main__": + scriptwriter.run() diff --git a/pdf-podcast-agent/test_pipeline.py b/pdf-podcast-agent/test_pipeline.py new file mode 100644 index 00000000..0092e465 --- /dev/null +++ b/pdf-podcast-agent/test_pipeline.py @@ -0,0 +1,93 @@ +""" +test_pipeline.py – Local smoke test +===================================== +Calls the Orchestrator's REST endpoint with a PDF path and saves the +resulting MP3 to disk. + +Usage: + python test_pipeline.py path/to/your/document.pdf + +The script will print the episode title, the full script, and save the +audio to ./output/ +""" + +import argparse +import base64 +import json +import sys +import time +from pathlib import Path + +import requests # type: ignore[import-untyped] + +ORCHESTRATOR_URL = "http://localhost:8000/process" + + +def run_test(pdf_path: str) -> None: + path = Path(pdf_path).resolve() + if not path.exists(): + print(f"[ERROR] File not found: {path}") + sys.exit(1) + + print(f"\nπŸ“„ PDF: {path}") + print(f"🌐 Sending to: {ORCHESTRATOR_URL}") + print("⏳ Running pipeline (30–120 s) …\n") + + start = time.time() + + try: + resp = requests.post( + ORCHESTRATOR_URL, + json={"pdf_path": str(path)}, + timeout=180, + ) + resp.raise_for_status() + except requests.exceptions.ConnectionError: + print("[ERROR] Could not reach the Orchestrator. Is `python run.py` running?") + sys.exit(1) + except requests.exceptions.Timeout: + print("[ERROR] Request timed out after 180 s.") + sys.exit(1) + + data = resp.json() + elapsed = time.time() - start + + if data.get("status") != "success": + print(f"[ERROR] Pipeline failed: {data.get('error_message', 'unknown')}") + sys.exit(1) + + # ── Print results ───────────────────────────────────────────────────────── + title = data.get("topic_title", "Untitled") + script = json.loads(data.get("script_json", "[]")) + audio_path = data.get("audio_path", "") + + print(f"βœ… Done in {elapsed:.1f}s\n") + print(f"πŸŽ™οΈ Episode: {title}") + print(f"πŸ“ Script ({len(script)} lines):") + print("-" * 50) + for line in script: + label = " [A]" if line["speaker"] == "HostA" else " [B]" + print(f"{label} {line['text']}") + print("-" * 50) + + # ── Save audio ──────────────────────────────────────────────────────────── + if data.get("audio_base64"): + audio_bytes = base64.b64decode(data["audio_base64"]) + out_dir = Path("output") + out_dir.mkdir(exist_ok=True) + out_path = ( + out_dir / Path(audio_path).name if audio_path else out_dir / "podcast.mp3" + ) + with open(out_path, "wb") as f: + f.write(audio_bytes) + print(f"\n🎡 Audio saved β†’ {out_path.resolve()}") + print(f" Size: {len(audio_bytes) / 1024:.1f} KB") + else: + print("\n⚠️ No audio in response (check Voice Studio logs).") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="PDF-to-Podcast local smoke test") + parser.add_argument("pdf", help="Path to the PDF file to process") + args = parser.parse_args() + run_test(args.pdf) diff --git a/pdf-podcast-agent/voice_studio_agent.py b/pdf-podcast-agent/voice_studio_agent.py new file mode 100644 index 00000000..fe2baec1 --- /dev/null +++ b/pdf-podcast-agent/voice_studio_agent.py @@ -0,0 +1,162 @@ +""" +Agent 3 – The Voice Studio +========================== +Run in its own terminal: + + python voice_studio_agent.py + +Receives the PodcastScript, fires parallel OpenAI TTS calls for every +dialogue line (HostA β†’ "alloy", HostB β†’ "echo"), stitches the audio chunks +into a single MP3 using pydub, saves it to disk, and returns the base64- +encoded final MP3 to the Orchestrator. + +Requires: openai, pydub +ffmpeg on PATH for MP3 export β†’ winget install ffmpeg | brew install ffmpeg +Fallback: raw byte concatenation if pydub/ffmpeg are unavailable. +""" + +import asyncio +import io +import os +from datetime import datetime +from pathlib import Path + +from dotenv import load_dotenv +from openai import AsyncOpenAI +from uagents import Agent, Context + +from schemas import AudioResponse, PodcastScript + +load_dotenv() + +# ── Configuration ───────────────────────────────────────────────────────────── + +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +TTS_MODEL = os.getenv("TTS_MODEL", "tts-1") +OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "output")) + +VOICE_MAP = { + "HostA": os.getenv("VOICE_HOST_A", "alloy"), + "HostB": os.getenv("VOICE_HOST_B", "echo"), +} + +SILENCE_BETWEEN_LINES_MS = int(os.getenv("SILENCE_MS", "400")) + +# ── Agent ───────────────────────────────────────────────────────────────────── + +voice_studio = Agent( + name="voice_studio", + seed=os.getenv("VOICE_STUDIO_SEED", "voice_studio_podcast_seed_v1"), + port=8003, + endpoint=["http://localhost:8003/submit"], + network="testnet", +) + +client = AsyncOpenAI(api_key=OPENAI_API_KEY) +OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +async def _tts_call(text: str, voice: str) -> bytes: + response = await client.audio.speech.create( + model=TTS_MODEL, + voice=voice, + input=text, + response_format="mp3", + ) + return response.content + + +def _stitch_audio(chunks: list[bytes], silence_ms: int) -> bytes: + """Stitch MP3 chunks with pydub (best quality) or raw bytes (fallback).""" + try: + from pydub import AudioSegment # noqa: PLC0415 + + segments = [AudioSegment.from_mp3(io.BytesIO(c)) for c in chunks] + gap = AudioSegment.silent(duration=silence_ms) if silence_ms > 0 else None + + combined = segments[0] + for seg in segments[1:]: + if gap: + combined = combined + gap + seg + else: + combined = combined + seg + + buf = io.BytesIO() + combined.export(buf, format="mp3") + return buf.getvalue() + + except Exception: + # ffmpeg not available β€” raw concatenation still plays in most players + return b"".join(chunks) + + +# ── Handlers ────────────────────────────────────────────────────────────────── + + +@voice_studio.on_event("startup") +async def on_startup(ctx: Context) -> None: + ctx.logger.info("[Voice Studio] ready") + ctx.logger.info(f"[Voice Studio] address: {ctx.agent.address}") + ctx.logger.info( + f"[Voice Studio] HostA={VOICE_MAP['HostA']} HostB={VOICE_MAP['HostB']} model={TTS_MODEL}" + ) + + +@voice_studio.on_message(model=PodcastScript) +async def handle_voice(ctx: Context, sender: str, msg: PodcastScript) -> None: + sid = msg.session_id[:8] + ctx.logger.info(f"[{sid}] Generating {len(msg.lines)} TTS chunks in parallel …") + + try: + tasks = [ + _tts_call(line.text, VOICE_MAP.get(line.speaker, "alloy")) + for line in msg.lines + ] + chunks = await asyncio.gather(*tasks) + + ctx.logger.info(f"[{sid}] Stitching audio …") + final_bytes = _stitch_audio(list(chunks), silence_ms=SILENCE_BETWEEN_LINES_MS) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + safe_title = "".join( + c if c.isalnum() or c in "-_" else "_" for c in msg.topic_title + )[:40] + filename = f"podcast_{safe_title}_{timestamp}.mp3" + audio_path = str(OUTPUT_DIR / filename) + + with open(audio_path, "wb") as f: + f.write(final_bytes) + + ctx.logger.info( + f"[{sid}] Saved β†’ {audio_path} ({len(final_bytes) / 1024:.1f} KB)" + ) + + # Send path only β€” base64 blob (~2 MB+) exceeds Agentverse mailbox size limit. + # The orchestrator constructs the download URL from audio_path directly. + await ctx.send( + sender, + AudioResponse( + audio_base64="", + audio_path=audio_path, + session_id=msg.session_id, + line_count=len(msg.lines), + ), + ) + + except Exception as exc: + ctx.logger.error(f"[{sid}] Error: {exc}") + await ctx.send( + sender, + AudioResponse( + audio_base64="", + audio_path="", + session_id=msg.session_id, + line_count=0, + ), + ) + + +if __name__ == "__main__": + voice_studio.run()