75 changes: 67 additions & 8 deletions README.md
@@ -67,6 +67,8 @@ asyncio.run(main())

- 🎥 **Real-time Audio/Video streaming** - Receive synchronized audio/video frames from the avatar (as PyAV AudioFrame/VideoFrame objects)
- 💬 **Two-way communication** - Send text messages (like transcribed user speech) and receive generated responses
- 📝 **Real-time transcriptions** - Receive incremental message stream events for user and persona text as it's generated
- 📚 **Message history tracking** - Automatic conversation history with incremental updates
- 🎤 **Audio-passthrough** - Send TTS-generated audio input and receive rendered, synchronized audio/video from the avatar
- 🗣️ **Direct text-to-speech** - Send text directly to TTS for immediate speech output (bypasses LLM processing)
- 🎯 **Async iterator API** - Clean, Pythonic async/await patterns for a continuous stream of audio/video frames (see the sketch below)
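
The frame-consumption loop is not shown in the snippets below, so here is a minimal sketch. It assumes the session exposes the `video_frames()` async iterator that appears in `src/anam/_streaming.py` further down; if the iterator lives on another object in your version of the SDK, adjust accordingly.

```python
async def consume_frames(session) -> None:
    """Render or inspect avatar video frames as they arrive (sketch only)."""
    async for frame in session.video_frames():
        # Each frame is a PyAV VideoFrame; hand it to your renderer here.
        print(f"video frame: {frame.width}x{frame.height} pts={frame.pts}")
```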
@@ -130,22 +132,59 @@ async with client.connect() as session:
Register callbacks for connection and message events using the `@client.on()` decorator:

```python
from anam import AnamEvent, Message, MessageRole, MessageStreamEvent

@client.on(AnamEvent.CONNECTION_ESTABLISHED)
async def on_connected():
"""Called when the connection is established."""
print("✅ Connected!")

@client.on(AnamEvent.CONNECTION_CLOSED)
async def on_closed(code: str, reason: str | None):
"""Called when the connection is closed."""
print(f"Connection closed: {code} - {reason or 'No reason'}")

@client.on(AnamEvent.MESSAGE_STREAM_EVENT_RECEIVED)
async def on_message_stream_event(event: MessageStreamEvent):
"""Called for each incremental chunk of transcribed text or persona response.

This event fires for both user transcriptions and persona responses as they stream in.
This can be used for real-time captions or transcriptions.
"""
if event.role == MessageRole.USER:
# User transcription (from their speech)
if event.content_index == 0:
print(f"👤 User: ", end="", flush=True)
print(event.content, end="", flush=True)
if event.end_of_speech:
print() # New line when transcription completes
else:
# Persona response
if event.content_index == 0:
print(f"🤖 Persona: ", end="", flush=True)
print(event.content, end="", flush=True)
if event.end_of_speech:
status = "✓" if not event.interrupted else "✗ INTERRUPTED"
print(f" {status}")

@client.on(AnamEvent.MESSAGE_RECEIVED)
async def on_message(message: Message):
"""Called when a complete message is received (after end_of_speech).

This is fired after MESSAGE_STREAM_EVENT_RECEIVED with end_of_speech=True.
Useful for backward compatibility or when you only need complete messages.
"""
print(f"{message.role}: {message.content}")

@client.on(AnamEvent.MESSAGE_HISTORY_UPDATED)
async def on_message_history_updated(messages: list[Message]):
"""Called when the message history is updated (after a message completes).

The messages list contains the complete conversation history.
"""
print(f"📝 Conversation history: {len(messages)} messages")
for msg in messages:
print(f" {msg.role}: {msg.content[:50]}...")
```
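
The handler above prints chunks as they arrive. If you would rather work with whole utterances, the same stream events can be buffered per role and flushed on `end_of_speech`. The sketch below is illustrative and reuses only the event fields shown above; if the client allows a single handler per event, use it in place of the `on_message_stream_event` handler rather than alongside it.

```python
from anam import AnamEvent, MessageRole, MessageStreamEvent

# Buffer chunks per role so user and persona text never interleave.
transcripts: dict[MessageRole, str] = {}

@client.on(AnamEvent.MESSAGE_STREAM_EVENT_RECEIVED)
async def buffer_transcript(event: MessageStreamEvent) -> None:
    if event.content_index == 0:
        transcripts[event.role] = ""  # content_index 0 starts a new utterance
    transcripts[event.role] = transcripts.get(event.role, "") + event.content
    if event.end_of_speech:
        speaker = "User" if event.role == MessageRole.USER else "Persona"
        suffix = " (interrupted)" if event.interrupted else ""
        print(f"{speaker}: {transcripts[event.role]}{suffix}")
```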

### Session
@@ -157,9 +196,29 @@ async with client.connect() as session:
# Send a text message (simulates user speech)
await session.send_message("Hello, how are you?")

# Send text directly to TTS (bypasses LLM)
await session.talk("This will be spoken immediately")

# Stream text to TTS incrementally (for streaming scenarios)
await session.send_talk_stream(
content="Hello",
start_of_speech=True,
end_of_speech=False,
)
await session.send_talk_stream(
content=" world!",
start_of_speech=False,
end_of_speech=True,
)

# Interrupt the avatar if speaking
await session.interrupt()

# Get message history
history = client.get_message_history()
for msg in history:
print(f"{msg.role}: {msg.content}")

# Wait until the session ends
await session.wait_until_closed()
```
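
For streaming scenarios such as relaying LLM output, `send_talk_stream` can be called once per chunk. The helper below is a minimal sketch built only on the call shown above: it buffers one chunk ahead so the final chunk can be sent with `end_of_speech=True`, and it omits error handling and `correlation_id` management.

```python
from collections.abc import AsyncIterator

async def speak_stream(session, chunks: AsyncIterator[str]) -> None:
    """Forward text chunks (e.g. LLM tokens) to TTS as they arrive (sketch)."""
    first = True
    pending: str | None = None
    async for chunk in chunks:
        if pending is not None:
            await session.send_talk_stream(
                content=pending,
                start_of_speech=first,
                end_of_speech=False,
            )
            first = False
        pending = chunk
    if pending is not None:
        await session.send_talk_stream(
            content=pending,
            start_of_speech=first,
            end_of_speech=True,  # the last chunk closes the utterance
        )
```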
77 changes: 69 additions & 8 deletions examples/persona_interactive_video.py
@@ -27,7 +27,7 @@
from dotenv import load_dotenv

from anam import AnamClient, AnamEvent, ClientOptions
from anam.types import AgentAudioInputConfig, PersonaConfig
from anam.types import AgentAudioInputConfig, MessageRole, PersonaConfig

# Add parent directory to path to allow importing from examples
sys.path.insert(0, str(Path(__file__).parent.parent))
@@ -38,7 +38,7 @@

# Configure logging - reduced verbosity
logging.basicConfig(
level=logging.WARNING,
format="%(levelname)s: %(message)s", # Simplified format
)
logger = logging.getLogger(__name__)
@@ -47,19 +47,30 @@
logging.getLogger("anam").setLevel(logging.WARNING)
logging.getLogger("websockets").setLevel(logging.WARNING)
logging.getLogger("aiohttp").setLevel(logging.WARNING)
logging.getLogger("aiortc").setLevel(logging.WARNING)
logging.getLogger("aioice").setLevel(logging.WARNING)

# Global state for captions and conversation history toggles
show_captions = False
print_conversation_history = False


async def interactive_loop(session, display: VideoDisplay) -> None:
"""Interactive command loop."""
global show_captions
global print_conversation_history
print("\n" + "=" * 60)
print("Interactive Session Started!")
print("=" * 60)
print("Available commands:")
print(" f [filename] - Send audio file (defaults to input.wav)")
print(" m <message> - Send text message (user input for the conversation.)")
print(" t|ts <text> - Send talk command (bypasses LLM and sends text directly to TTS). t: REST API, ts: WebSocket)")
print(" i - Interrupt current audio")
print(" q - Quit and stop session")
print(" f [filename] - Send audio file (defaults to input.wav)")
print(" m <message> - Send text message (user input for the conversation.)")
print(" t <text> - Send talk command (bypasses LLM and sends text to TTS) usingREST API)")
print(" ts <text> - Send talk stream (bypasses LLM and sends text to TTS) using WebSocket)")
print(" i - Interrupt current audio")
print(" c - Toggle live captions. Default: disabled")
print(" h - Toggle conversation history at session end. Default: disabled.")
print(" q - Quit and stop session")
print("=" * 60 + "\n")

while True:
@@ -79,6 +90,10 @@ async def interactive_loop(session, display: VideoDisplay) -> None:
display.stop()
break

elif command == "c":
show_captions = not show_captions
print(f"Captions {'enabled' if show_captions else 'disabled'}")

elif command == "f":
# Default to input.wav if no filename provided
wav_file = parts[1] if len(parts) > 1 else "input.wav"
@@ -92,6 +107,12 @@ async def interactive_loop(session, display: VideoDisplay) -> None:
else:
print(f"❌ File not found: {wav_file}")

elif command == "h":
print_conversation_history = not print_conversation_history
print(
f"Conversation history {'enabled' if print_conversation_history else 'disabled'}"
)

elif command == "m":
# Get the rest of the input as the message text
if len(parts) < 2:
@@ -115,7 +136,10 @@ async def interactive_loop(session, display: VideoDisplay) -> None:
await session.talk(message_text)
elif command == "ts":
await session.send_talk_stream(
message_text,
start_of_speech=True,
end_of_speech=True,
correlation_id=None,
)
print(f"✅ Sent talk command: {message_text}")
except Exception as e:
@@ -145,6 +169,7 @@ async def stream_session(
audio_player: AudioPlayer,
) -> None:
"""Run the streaming session."""
global show_captions

# Register connection event handlers
@client.on(AnamEvent.CONNECTION_ESTABLISHED)
@@ -153,8 +178,44 @@ async def on_connected() -> None:

@client.on(AnamEvent.CONNECTION_CLOSED)
async def on_closed(code: str, reason: str | None) -> None:
global print_conversation_history
if print_conversation_history:
print("Conversation transcript:")
print("=" * 24)
print(
"\n".join(
[
f"{m.role.value.capitalize()}: {m.content}"
for m in client.get_message_history()
]
)
)
print(f"Connection closed: {code} - {reason or 'User initiated'}")

# Register message stream event handlers
@client.on(AnamEvent.MESSAGE_STREAM_EVENT_RECEIVED)
async def on_message_stream_event(event) -> None:
"""Handle incremental message stream events."""
global show_captions
if show_captions:
role_emoji = "👤" if event.role == MessageRole.USER else "🤖"
role_name = "USER" if event.role == MessageRole.USER else "PERSONA"

if event.content_index == 0:
# content_index 0 denotes the start of a new message
print(f"{role_emoji} {role_name}: ", end="", flush=True)
# Show incremental updates (you can customize this)
print(f"{event.content}", end="", flush=True)
if event.end_of_speech:
# end_of_speech is fired when the message is complete
status = "✓" if not event.interrupted else "✗ INTERRUPTED"
print(f"{status}\n")

@client.on(AnamEvent.MESSAGE_HISTORY_UPDATED)
async def on_message_history_updated(messages) -> None:
"""Handle message history updates."""
logger.debug(f"\n📝 Message history updated: {len(messages)} messages total")

async def consume_video_frames(session) -> None:
"""Consume video frames from iterator."""
try:
2 changes: 2 additions & 0 deletions src/anam/__init__.py
@@ -59,6 +59,7 @@ async def consume_audio():
ConnectionClosedCode,
Message,
MessageRole,
MessageStreamEvent,
PersonaConfig,
)

@@ -74,6 +75,7 @@ async def consume_audio():
"ConnectionClosedCode",
"Message",
"MessageRole",
"MessageStreamEvent",
"PersonaConfig",
"VideoFrame",
# Errors
10 changes: 7 additions & 3 deletions src/anam/_streaming.py
@@ -341,6 +341,13 @@ async def _setup_data_channel(self) -> None:
ordered=True,
)

# Initialize to False in case there's a stale value from a previous session
self._data_channel_open = False

# Check if channel is already open
if self._data_channel.readyState == "open":
self._data_channel_open = True

@self._data_channel.on("open")
def on_open() -> None:
logger.info("Data channel opened")
@@ -355,14 +362,11 @@ def on_close() -> None:
async def on_message(message: str) -> None:
try:
data = json.loads(message)
logger.debug("Data channel message: %s", data.get("messageType", "unknown"))
if self._on_message:
await self._on_message(data)
except json.JSONDecodeError as e:
logger.error("Failed to parse data channel message: %s", e)

async def video_frames(self) -> AsyncIterator[VideoFrame]:
"""Get video frames as an async iterator.
