From 835806665070a20e5d8e7ff8f71e7bf717803364 Mon Sep 17 00:00:00 2001 From: Alex Osland Date: Wed, 28 Jan 2026 12:26:54 +0000 Subject: [PATCH 01/23] feat: support custom audio track input --- src/anam/__init__.py | 4 ++++ src/anam/_streaming.py | 20 ++++++++++++++++++-- src/anam/client.py | 1 + src/anam/types.py | 7 +++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/anam/__init__.py b/src/anam/__init__.py index ec68043..9f19cee 100644 --- a/src/anam/__init__.py +++ b/src/anam/__init__.py @@ -39,6 +39,8 @@ async def consume_audio(): For more information, see https://docs.anam.ai """ +from aiortc import AudioStreamTrack +from av.audio.frame import AudioFrame from av.video.frame import VideoFrame from ._agent_audio_input_stream import AgentAudioInputStream @@ -70,6 +72,8 @@ async def consume_audio(): "AgentAudioInputConfig", "AgentAudioInputStream", "AnamEvent", + "AudioFrame", + "AudioStreamTrack", "ClientOptions", "ConnectionClosedCode", "Message", diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 0511c3d..e5ee548 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -8,6 +8,7 @@ import aiohttp from aiortc import ( + AudioStreamTrack, MediaStreamTrack, RTCConfiguration, RTCDataChannel, @@ -42,6 +43,7 @@ def __init__( on_connection_closed: Callable[[str, str | None], Awaitable[None]] | None = None, disable_input_audio: bool = False, custom_ice_servers: list[dict[str, Any]] | None = None, + audio_input_track: AudioStreamTrack | None = None, ): """Initialize the streaming client. @@ -52,6 +54,9 @@ def __init__( on_connection_closed: Callback when disconnected. disable_input_audio: If True, don't send microphone audio. custom_ice_servers: Custom ICE servers (optional). + audio_input_track: Custom audio track for microphone input (optional). + If provided and disable_input_audio is False, this track will be + added to the WebRTC connection for sending audio to the server. 
""" self._session_info = session_info self._session_id = session_info.session_id @@ -64,6 +69,7 @@ def __init__( # Configuration self._disable_input_audio = disable_input_audio self._ice_servers = custom_ice_servers or session_info.ice_servers + self._audio_input_track = audio_input_track # State self._peer_connection: RTCPeerConnection | None = None @@ -326,8 +332,13 @@ def on_track(track: MediaStreamTrack) -> None: if self._disable_input_audio: self._peer_connection.addTransceiver("audio", direction="recvonly") else: - self._peer_connection.addTransceiver("audio", direction="sendrecv") - # Note: Audio input track would be added here if needed + # If we have an audio input track, add it to enable sending audio + if self._audio_input_track: + self._peer_connection.addTrack(self._audio_input_track) + logger.info("Added audio input track to peer connection") + else: + # Set up transceiver for potential future audio input + self._peer_connection.addTransceiver("audio", direction="sendrecv") logger.debug("Peer connection initialized") @@ -629,6 +640,11 @@ async def close(self) -> None: """Close the streaming connection and clean up resources.""" logger.debug("Closing streaming client") + # Stop audio input track if present + if self._audio_input_track: + self._audio_input_track.stop() + self._audio_input_track = None + # Close signalling if self._signalling_client: try: diff --git a/src/anam/client.py b/src/anam/client.py index 06ad5bf..bc7cf91 100644 --- a/src/anam/client.py +++ b/src/anam/client.py @@ -245,6 +245,7 @@ async def connect_async(self) -> "Session": on_connection_closed=self._handle_connection_closed, disable_input_audio=self._options.disable_input_audio, custom_ice_servers=self._options.ice_servers, + audio_input_track=self._options.audio_input_track, ) # Connect diff --git a/src/anam/types.py b/src/anam/types.py index af99540..cf2cac8 100644 --- a/src/anam/types.py +++ b/src/anam/types.py @@ -4,6 +4,8 @@ from enum import Enum from typing import Any +from aiortc import AudioStreamTrack + class AnamEvent(str, Enum): """Events emitted by the Anam client.""" @@ -112,6 +114,10 @@ class ClientOptions: ice_servers: Custom ICE servers for WebRTC (optional). client_label: Custom label for session tracking (optional). Defaults to 'python-sdk' if not specified. + audio_input_track: Custom audio track for microphone input (optional). + Provide an aiortc AudioStreamTrack subclass to send audio to the + server for speech-to-text processing. The track should produce + audio frames at 48kHz (WebRTC standard) - aiortc handles encoding. 
""" api_base_url: str = "https://api.anam.ai" @@ -119,6 +125,7 @@ class ClientOptions: disable_input_audio: bool = False ice_servers: list[dict[str, Any]] | None = None client_label: str | None = None + audio_input_track: AudioStreamTrack | None = None @dataclass From 1705430bc52495c38908ede064783de38fccb2fe Mon Sep 17 00:00:00 2001 From: Alex Osland <165405455+ao-anam@users.noreply.github.com> Date: Wed, 28 Jan 2026 12:42:52 +0000 Subject: [PATCH 02/23] chore: fix comment Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/anam/_streaming.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index e5ee548..118ce36 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -337,7 +337,8 @@ def on_track(track: MediaStreamTrack) -> None: self._peer_connection.addTrack(self._audio_input_track) logger.info("Added audio input track to peer connection") else: - # Set up transceiver for potential future audio input + # Set up a bidirectional audio transceiver to receive audio now + # and allow attaching a local audio input track in the future self._peer_connection.addTransceiver("audio", direction="sendrecv") logger.debug("Peer connection initialized") From 36614ff1e10c6b601075516ce734c19e1b70fc3c Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 12:43:45 +0000 Subject: [PATCH 03/23] direct audio samples input --- examples/persona_interactive_video.py | 2 +- src/anam/__init__.py | 2 - src/anam/_streaming.py | 92 +++++++++--- src/anam/_user_audio_input_track.py | 206 ++++++++++++++++++++++++++ src/anam/client.py | 1 - src/anam/types.py | 9 +- 6 files changed, 282 insertions(+), 30 deletions(-) create mode 100644 src/anam/_user_audio_input_track.py diff --git a/examples/persona_interactive_video.py b/examples/persona_interactive_video.py index 89f991c..4d66316 100644 --- a/examples/persona_interactive_video.py +++ b/examples/persona_interactive_video.py @@ -214,7 +214,7 @@ async def on_message_stream_event(event) -> None: @client.on(AnamEvent.MESSAGE_HISTORY_UPDATED) async def on_message_history_updated(messages) -> None: """Handle message history updates.""" - logger.debug(f"\nšŸ“ Message history updated: {len(messages)} messages total") + print(f"\nšŸ“ Message history updated: {len(messages)} messages total") async def consume_video_frames(session) -> None: """Consume video frames from iterator.""" diff --git a/src/anam/__init__.py b/src/anam/__init__.py index 0081c74..8a9d7bb 100644 --- a/src/anam/__init__.py +++ b/src/anam/__init__.py @@ -39,7 +39,6 @@ async def consume_audio(): For more information, see https://docs.anam.ai """ -from aiortc import AudioStreamTrack from av.audio.frame import AudioFrame from av.video.frame import VideoFrame @@ -74,7 +73,6 @@ async def consume_audio(): "AgentAudioInputStream", "AnamEvent", "AudioFrame", - "AudioStreamTrack", "ClientOptions", "ConnectionClosedCode", "Message", diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 816661e..b1a4ae1 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -8,7 +8,6 @@ import aiohttp from aiortc import ( - AudioStreamTrack, MediaStreamTrack, RTCConfiguration, RTCDataChannel, @@ -22,6 +21,7 @@ from ._agent_audio_input_stream import AgentAudioInputStream from ._signalling import SignalAction, SignallingClient +from ._user_audio_input_track import UserAudioInputTrack from .types import AgentAudioInputConfig, SessionInfo logger = logging.getLogger(__name__) @@ -43,7 +43,6 @@ def 
__init__( on_connection_closed: Callable[[str, str | None], Awaitable[None]] | None = None, disable_input_audio: bool = False, custom_ice_servers: list[dict[str, Any]] | None = None, - audio_input_track: AudioStreamTrack | None = None, ): """Initialize the streaming client. @@ -53,10 +52,8 @@ def __init__( on_connection_established: Callback when connected. on_connection_closed: Callback when disconnected. disable_input_audio: If True, don't send microphone audio. + When False, audio can be sent via send_user_audio() method. custom_ice_servers: Custom ICE servers (optional). - audio_input_track: Custom audio track for microphone input (optional). - If provided and disable_input_audio is False, this track will be - added to the WebRTC connection for sending audio to the server. """ self._session_info = session_info self._session_id = session_info.session_id @@ -69,7 +66,6 @@ def __init__( # Configuration self._disable_input_audio = disable_input_audio self._ice_servers = custom_ice_servers or session_info.ice_servers - self._audio_input_track = audio_input_track # State self._peer_connection: RTCPeerConnection | None = None @@ -81,6 +77,8 @@ def __init__( self._audio_track: MediaStreamTrack | None = None self._is_connected = False self._agent_audio_input_stream: AgentAudioInputStream | None = None + self._user_audio_input_track: UserAudioInputTrack | None = None + self._audio_transceiver = None # Store transceiver for lazy track creation async def connect(self, timeout: float = 30.0) -> None: """Start the streaming connection. @@ -332,14 +330,18 @@ def on_track(track: MediaStreamTrack) -> None: if self._disable_input_audio: self._peer_connection.addTransceiver("audio", direction="recvonly") else: - # If we have an audio input track, add it to enable sending audio - if self._audio_input_track: - self._peer_connection.addTrack(self._audio_input_track) - logger.info("Added audio input track to peer connection") - else: - # Set up a bidirectional audio transceiver to receive audio now - # and allow attaching a local audio input track in the future - self._peer_connection.addTransceiver("audio", direction="sendrecv") + # Always add a transceiver first (like JavaScript SDK) + # This ensures proper WebRTC negotiation + # We add transceiver early but create track lazily when audio arrives + self._audio_transceiver = self._peer_connection.addTransceiver("audio", direction="sendrecv") + logger.info( + f"Added audio transceiver: direction={self._audio_transceiver.direction}, " + f"disable_input_audio={self._disable_input_audio}" + ) + # Track will be created lazily when first audio arrives via send_user_audio() + logger.info( + "Audio input enabled - track will be created lazily when first audio arrives via send_user_audio()" + ) logger.debug("Peer connection initialized") @@ -645,11 +647,6 @@ async def close(self) -> None: """Close the streaming connection and clean up resources.""" logger.debug("Closing streaming client") - # Stop audio input track if present - if self._audio_input_track: - self._audio_input_track.stop() - self._audio_input_track = None - # Close signalling if self._signalling_client: try: @@ -671,6 +668,63 @@ async def close(self) -> None: self._is_connected = False logger.info("Streaming client closed") + def send_user_audio( + self, + audio_bytes: bytes, + sample_rate: int, + num_channels: int, + ) -> None: + """Send raw user audio samples to Anam for processing. + + This method accepts raw audio bytes (16-bit PCM) and queues them for transmission via WebRTC. 
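A caller sketch (illustrative; the `streaming` handle and the 20 ms, 16 kHz mono capture format are assumptions, not part of this patch):

    import numpy as np

    # 20 ms of 16 kHz mono int16 PCM -> 320 samples, 640 bytes
    samples = np.zeros(320, dtype=np.int16)  # e.g. filled by a microphone callback
    streaming.send_user_audio(samples.tobytes(), sample_rate=16000, num_channels=1)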
+ The audio track is created lazily when first audio arrives, minimizing + latency and avoiding unnecessary resource allocation. + + Args: + audio_bytes: Raw audio data (16-bit PCM). + sample_rate: Sample rate of the input audio (Hz). + num_channels: Number of channels in the input audio (1=mono, 2=stereo). + + Raises: + RuntimeError: If audio input is disabled or peer connection is not initialized. + """ + if self._disable_input_audio: + raise RuntimeError( + "Audio input is disabled. Set disable_input_audio=False in ClientOptions." + ) + + if not self._peer_connection: + raise RuntimeError("Peer connection not initialized. Call connect() first.") + + # Create track lazily when first audio arrives + if self._user_audio_input_track is None: + logger.info( + f"Creating user audio input track: sample_rate={sample_rate}Hz, " + f"channels={num_channels}" + ) + self._user_audio_input_track = UserAudioInputTrack( + expected_sample_rate=sample_rate, + expected_channels=num_channels, + ) + + # Add track to transceiver (lazy track creation) + if self._audio_transceiver and self._audio_transceiver.sender: + try: + self._audio_transceiver.sender.replaceTrack(self._user_audio_input_track) + logger.info( + "Added user audio track to transceiver - " + "WebRTC will call recv() to get audio frames for encoding to Opus" + ) + except Exception as e: + logger.error(f"Failed to add user audio track to transceiver: {e}") + raise RuntimeError(f"Failed to add user audio track: {e}") from e + else: + logger.error("Audio transceiver or sender not available") + raise RuntimeError("Audio transceiver not properly initialized") + + # Add audio samples to track buffer + self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) + def __del__(self) -> None: """Cleanup on destruction to prevent warnings.""" # Clear peer connection reference if close() wasn't called explicitly. diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py new file mode 100644 index 0000000..50775d1 --- /dev/null +++ b/src/anam/_user_audio_input_track.py @@ -0,0 +1,206 @@ +"""User audio input track for sending raw audio samples to Anam via WebRTC. + +This module provides a mechanism for accepting raw audio samples from Pipecat +and converting them to WebRTC-compatible format (48kHz mono) for transmission. +""" + +import asyncio +import fractions +import logging +import time +from collections import deque +from typing import Optional + +import numpy as np +from aiortc.mediastreams import AudioStreamTrack +from av.audio.frame import AudioFrame +from av.audio.resampler import AudioResampler + +logger = logging.getLogger(__name__) + +# WebRTC standard audio sample rate +WEBRTC_AUDIO_SAMPLE_RATE = 48000 + + +class UserAudioInputTrack(AudioStreamTrack): + """AudioStreamTrack that accepts raw audio samples and converts to WebRTC format. + + This track accepts raw audio bytes (16-bit PCM) with variable sample rates + and channel counts, and converts them to WebRTC-compatible format (48kHz mono). + The track is created lazily when first audio arrives, minimizing latency. + + The track buffers audio in small chunks (10ms) and handles resampling/conversion + only when necessary. If WebRTC/Opus can handle the input format directly, + minimal processing is performed. + """ + + def __init__(self, expected_sample_rate: Optional[int] = None, expected_channels: Optional[int] = None): + """Initialize the user audio input track. + + Args: + expected_sample_rate: Expected input sample rate (Hz). 
If None, will be + determined from first audio chunk. Defaults to None. + expected_channels: Expected number of channels. If None, will be determined + from first audio chunk. Defaults to None. + """ + super().__init__() + self._output_sample_rate = WEBRTC_AUDIO_SAMPLE_RATE + self._samples_per_10ms = self._output_sample_rate * 10 // 1000 + self._bytes_per_10ms = self._samples_per_10ms * 2 # 16-bit (2 bytes per sample) + self._timestamp = 0 + self._start = time.time() + + # Expected input format (can be None initially) + self._expected_sample_rate = expected_sample_rate + self._expected_channels = expected_channels + + # Queue of (audio_bytes, sample_rate, num_channels) tuples + # Audio is queued in 10ms chunks at input sample rate + self._audio_queue: deque[tuple[bytes, int, int]] = deque() + + # Resampler for converting input audio to 48kHz mono + # Created lazily when we know the input format + self._resampler: Optional[AudioResampler] = None + self._resampler_input_rate: Optional[int] = None + + # Lock for thread-safe operations + self._lock = asyncio.Lock() + + def add_audio_samples( + self, + audio_bytes: bytes, + sample_rate: int, + num_channels: int, + ) -> None: + """Add raw audio samples to the track buffer. + + This method accepts raw 16-bit PCM audio bytes and queues them for + transmission. The audio will be resampled/converted to WebRTC format + (48kHz mono) when recv() is called by WebRTC. + + Args: + audio_bytes: Raw audio data (16-bit PCM). + sample_rate: Sample rate of the input audio (Hz). + num_channels: Number of channels in the input audio (1=mono, 2=stereo). + """ + # Validate input format matches expected (if set) + if self._expected_sample_rate is not None and sample_rate != self._expected_sample_rate: + logger.warning( + f"Sample rate mismatch: expected {self._expected_sample_rate}Hz, " + f"got {sample_rate}Hz. Resampling will occur." + ) + if self._expected_channels is not None and num_channels != self._expected_channels: + logger.warning( + f"Channel count mismatch: expected {self._expected_channels}, " + f"got {num_channels}. Conversion will occur." + ) + + # Convert to numpy array for processing + samples = np.frombuffer(audio_bytes, dtype=np.int16) + + # Handle multi-channel audio (convert to mono if needed) + if num_channels > 1: + samples = samples.reshape(-1, num_channels).mean(axis=1).astype(np.int16) + + # Calculate samples per 10ms at input sample rate + samples_per_10ms_input = sample_rate * 10 // 1000 + bytes_per_10ms_input = samples_per_10ms_input * 2 # 16-bit + + # Break into 10ms chunks at input sample rate for minimal buffering + for i in range(0, len(samples), samples_per_10ms_input): + chunk_samples = samples[i:i + samples_per_10ms_input] + + # Pad last chunk if needed to make it exactly 10ms + if len(chunk_samples) < samples_per_10ms_input: + padding_samples = samples_per_10ms_input - len(chunk_samples) + padding = np.zeros(padding_samples, dtype=np.int16) + chunk_samples = np.concatenate([chunk_samples, padding]) + + chunk_bytes = chunk_samples.astype(np.int16).tobytes() + self._audio_queue.append((chunk_bytes, sample_rate, 1)) # Always mono after conversion + + async def recv(self) -> AudioFrame: + """Return the next audio frame for WebRTC transmission. + + This method is called by WebRTC to get audio frames for encoding to Opus. + It returns AudioFrame objects at 48kHz mono, resampling/converting input + audio as necessary. + + Returns: + An AudioFrame containing the next 10ms of audio data at 48kHz mono. 
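The resulting frame geometry can be sanity-checked directly (a sketch; `track` is an instance of this class and a running event loop is assumed):

    frame = await track.recv()
    assert frame.sample_rate == 48000
    assert frame.samples == 480        # 10 ms at 48 kHz
    assert frame.layout.name == "mono"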
+ """ + # Compute required wait time for synchronization + if self._timestamp > 0: + wait = self._start + (self._timestamp / self._output_sample_rate) - time.time() + if wait > 0: + await asyncio.sleep(wait) + + audio_data = None + + async with self._lock: + if self._audio_queue: + audio_bytes, sample_rate, num_channels = self._audio_queue.popleft() + + # Convert bytes to numpy array (already mono, 16-bit PCM) + samples = np.frombuffer(audio_bytes, dtype=np.int16) + + # If sample_rate is already 48kHz, no resampling needed + if sample_rate == self._output_sample_rate: + # Audio is already at target sample rate (should be exactly 10ms) + audio_data = samples[None, :] # Shape: (1, num_samples) + else: + # Need to resample to 48kHz + # Create resampler lazily if needed + if self._resampler is None or self._resampler_input_rate != sample_rate: + self._resampler = AudioResampler("s16", "mono", self._output_sample_rate) + self._resampler_input_rate = sample_rate + logger.debug( + f"Created resampler: {sample_rate}Hz -> {self._output_sample_rate}Hz" + ) + + # Create AudioFrame from input sample rate for resampling + input_frame = AudioFrame.from_ndarray(samples[None, :], layout="mono") + input_frame.sample_rate = sample_rate + input_frame.pts = 0 + input_frame.time_base = fractions.Fraction(1, sample_rate) + + # Resample to 48kHz + resampled_frames = self._resampler.resample(input_frame) + # Collect all resampled frames and concatenate + resampled_arrays = [] + for resampled_frame in resampled_frames: + resampled_arrays.append(resampled_frame.to_ndarray()) + if resampled_arrays: + audio_data = np.concatenate(resampled_arrays, axis=1) + else: + audio_data = None + + if audio_data is None: + # Generate silence if no audio available + audio_data = np.zeros((1, self._samples_per_10ms), dtype=np.int16) + else: + # Ensure we have exactly 10ms worth of samples at 48kHz + if audio_data.shape[1] < self._samples_per_10ms: + # Pad with silence if we have less than 10ms (shouldn't happen often) + padding = np.zeros((1, self._samples_per_10ms - audio_data.shape[1]), dtype=np.int16) + audio_data = np.concatenate([audio_data, padding], axis=1) + elif audio_data.shape[1] > self._samples_per_10ms: + # If resampling produced more than 10ms, take first 10ms and queue the rest + # This can happen due to resampling precision + remaining_samples = audio_data[:, self._samples_per_10ms:] + remaining_bytes = remaining_samples.astype(np.int16).tobytes() + # Put remaining audio back in queue - already at 48kHz, mono + async with self._lock: + self._audio_queue.appendleft((remaining_bytes, self._output_sample_rate, 1)) + audio_data = audio_data[:, :self._samples_per_10ms] + + # Create AudioFrame for WebRTC + # WebRTC will automatically encode AudioFrame to Opus + # We provide mono 48kHz PCM - WebRTC handles encoding/transmission + frame = AudioFrame.from_ndarray(audio_data, layout="mono") + frame.sample_rate = self._output_sample_rate + frame.pts = self._timestamp + frame.time_base = fractions.Fraction(1, self._output_sample_rate) + self._timestamp += self._samples_per_10ms + + return frame diff --git a/src/anam/client.py b/src/anam/client.py index 3b4d1bc..432adf5 100644 --- a/src/anam/client.py +++ b/src/anam/client.py @@ -247,7 +247,6 @@ async def connect_async(self) -> "Session": on_connection_closed=self._handle_connection_closed, disable_input_audio=self._options.disable_input_audio, custom_ice_servers=self._options.ice_servers, - audio_input_track=self._options.audio_input_track, ) # Connect diff --git 
a/src/anam/types.py b/src/anam/types.py index b51eb7c..7bbe54f 100644 --- a/src/anam/types.py +++ b/src/anam/types.py @@ -4,7 +4,6 @@ from enum import Enum from typing import Any -from aiortc import AudioStreamTrack class AnamEvent(str, Enum): @@ -111,14 +110,11 @@ class ClientOptions: Args: api_base_url: Base URL for the Anam API. api_version: API version to use. - disable_input_audio: If True, don't capture/send microphone audio. + disable_input_audio: If True, do not send audio to Anam's service. + When False, audio can be sent via send_user_audio() method. ice_servers: Custom ICE servers for WebRTC (optional). client_label: Custom label for session tracking (optional). Defaults to 'python-sdk' if not specified. - audio_input_track: Custom audio track for microphone input (optional). - Provide an aiortc AudioStreamTrack subclass to send audio to the - server for speech-to-text processing. The track should produce - audio frames at 48kHz (WebRTC standard) - aiortc handles encoding. """ api_base_url: str = "https://api.anam.ai" @@ -126,7 +122,6 @@ class ClientOptions: disable_input_audio: bool = False ice_servers: list[dict[str, Any]] | None = None client_label: str | None = None - audio_input_track: AudioStreamTrack | None = None @dataclass From 0bb97465b8a744a3face75f76cd5419b94eabe4f Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 13:14:54 +0000 Subject: [PATCH 04/23] fix lint --- src/anam/_user_audio_input_track.py | 30 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index 50775d1..3cf6143 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -49,20 +49,20 @@ def __init__(self, expected_sample_rate: Optional[int] = None, expected_channels self._bytes_per_10ms = self._samples_per_10ms * 2 # 16-bit (2 bytes per sample) self._timestamp = 0 self._start = time.time() - + # Expected input format (can be None initially) self._expected_sample_rate = expected_sample_rate self._expected_channels = expected_channels - + # Queue of (audio_bytes, sample_rate, num_channels) tuples # Audio is queued in 10ms chunks at input sample rate self._audio_queue: deque[tuple[bytes, int, int]] = deque() - + # Resampler for converting input audio to 48kHz mono # Created lazily when we know the input format self._resampler: Optional[AudioResampler] = None self._resampler_input_rate: Optional[int] = None - + # Lock for thread-safe operations self._lock = asyncio.Lock() @@ -97,25 +97,25 @@ def add_audio_samples( # Convert to numpy array for processing samples = np.frombuffer(audio_bytes, dtype=np.int16) - + # Handle multi-channel audio (convert to mono if needed) if num_channels > 1: samples = samples.reshape(-1, num_channels).mean(axis=1).astype(np.int16) - + # Calculate samples per 10ms at input sample rate samples_per_10ms_input = sample_rate * 10 // 1000 bytes_per_10ms_input = samples_per_10ms_input * 2 # 16-bit - + # Break into 10ms chunks at input sample rate for minimal buffering for i in range(0, len(samples), samples_per_10ms_input): chunk_samples = samples[i:i + samples_per_10ms_input] - + # Pad last chunk if needed to make it exactly 10ms if len(chunk_samples) < samples_per_10ms_input: padding_samples = samples_per_10ms_input - len(chunk_samples) padding = np.zeros(padding_samples, dtype=np.int16) chunk_samples = np.concatenate([chunk_samples, padding]) - + chunk_bytes = chunk_samples.astype(np.int16).tobytes() 
self._audio_queue.append((chunk_bytes, sample_rate, 1)) # Always mono after conversion @@ -136,20 +136,19 @@ async def recv(self) -> AudioFrame: await asyncio.sleep(wait) audio_data = None - + async with self._lock: if self._audio_queue: audio_bytes, sample_rate, num_channels = self._audio_queue.popleft() - + # Convert bytes to numpy array (already mono, 16-bit PCM) samples = np.frombuffer(audio_bytes, dtype=np.int16) - + # If sample_rate is already 48kHz, no resampling needed if sample_rate == self._output_sample_rate: # Audio is already at target sample rate (should be exactly 10ms) audio_data = samples[None, :] # Shape: (1, num_samples) else: - # Need to resample to 48kHz # Create resampler lazily if needed if self._resampler is None or self._resampler_input_rate != sample_rate: self._resampler = AudioResampler("s16", "mono", self._output_sample_rate) @@ -157,13 +156,13 @@ async def recv(self) -> AudioFrame: logger.debug( f"Created resampler: {sample_rate}Hz -> {self._output_sample_rate}Hz" ) - + # Create AudioFrame from input sample rate for resampling input_frame = AudioFrame.from_ndarray(samples[None, :], layout="mono") input_frame.sample_rate = sample_rate input_frame.pts = 0 input_frame.time_base = fractions.Fraction(1, sample_rate) - + # Resample to 48kHz resampled_frames = self._resampler.resample(input_frame) # Collect all resampled frames and concatenate @@ -202,5 +201,4 @@ async def recv(self) -> AudioFrame: frame.pts = self._timestamp frame.time_base = fractions.Fraction(1, self._output_sample_rate) self._timestamp += self._samples_per_10ms - return frame From 7489cd5c21c43c830338ea47af9f61baf3752422 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 17:24:22 +0000 Subject: [PATCH 05/23] flush audio buffer at first recv() and keep last samples so we can start processing --- src/anam/_streaming.py | 43 +++++- src/anam/_user_audio_input_track.py | 230 +++++++++++++++++----------- 2 files changed, 182 insertions(+), 91 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index b1a4ae1..987b787 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -80,6 +80,20 @@ def __init__( self._user_audio_input_track: UserAudioInputTrack | None = None self._audio_transceiver = None # Store transceiver for lazy track creation + def _on_connection_established_internal(self) -> None: + """Internal callback that flushes audio buffer when connection is established. + + This is called automatically before the user's on_connection_established callback. + """ + # Flush buffered audio when connection is established + # This ensures we start with live audio instead of catching up on buffered audio + logger.debug("_on_connection_established_internal ready for duty, sir.") + if self._user_audio_input_track: + logger.debug("Flushing audio queue on connection established") + self._user_audio_input_track.flush() + else: + logger.debug("Cannot flush audio queue - track not created yet (will flush when track is created)") + async def connect(self, timeout: float = 30.0) -> None: """Start the streaming connection. 
@@ -292,6 +306,8 @@ def on_ice_connection_state_change() -> None: self._is_connected = True if hasattr(self, "_connection_ready"): self._connection_ready.set() + # Call internal flush callback, then user's callback + self._on_connection_established_internal() if self._on_connection_established: asyncio.create_task(self._on_connection_established()) elif state == "failed": @@ -415,6 +431,8 @@ async def video_frames(self) -> AsyncIterator[VideoFrame]: self._is_connected = True if hasattr(self, "_connection_ready"): self._connection_ready.set() + # Call internal flush callback, then user's callback + self._on_connection_established_internal() if self._on_connection_established: asyncio.create_task(self._on_connection_established()) @@ -469,6 +487,8 @@ async def audio_frames(self) -> AsyncIterator[AudioFrame]: self._is_connected = True if hasattr(self, "_connection_ready"): self._connection_ready.set() + # Call internal flush callback, then user's callback + self._on_connection_established_internal() if self._on_connection_established: asyncio.create_task(self._on_connection_established()) @@ -656,6 +676,17 @@ async def close(self) -> None: finally: self._signalling_client = None + # Close user audio input track before closing peer connection + # This clears the audio queue and prevents recv() from generating more frames + if self._user_audio_input_track: + try: + self._user_audio_input_track.close() + logger.debug("Closed user audio input track") + except Exception as e: + logger.warning("Error closing user audio input track: %s", e) + finally: + self._user_audio_input_track = None + # Close peer connection if self._peer_connection: try: @@ -702,10 +733,7 @@ def send_user_audio( f"Creating user audio input track: sample_rate={sample_rate}Hz, " f"channels={num_channels}" ) - self._user_audio_input_track = UserAudioInputTrack( - expected_sample_rate=sample_rate, - expected_channels=num_channels, - ) + self._user_audio_input_track = UserAudioInputTrack() # Add track to transceiver (lazy track creation) if self._audio_transceiver and self._audio_transceiver.sender: @@ -722,6 +750,13 @@ def send_user_audio( logger.error("Audio transceiver or sender not available") raise RuntimeError("Audio transceiver not properly initialized") + # If connection is already established, flush any audio that was queued + # before the track was created (this handles the case where audio arrives + # before connection is established, then track is created after connection) + if self._is_connected: + logger.debug("Connection already established - flushing audio queue on track creation") + self._user_audio_input_track.flush() + # Add audio samples to track buffer self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index 3cf6143..68cf791 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -14,7 +14,6 @@ import numpy as np from aiortc.mediastreams import AudioStreamTrack from av.audio.frame import AudioFrame -from av.audio.resampler import AudioResampler logger = logging.getLogger(__name__) @@ -34,14 +33,12 @@ class UserAudioInputTrack(AudioStreamTrack): minimal processing is performed. """ - def __init__(self, expected_sample_rate: Optional[int] = None, expected_channels: Optional[int] = None): + def __init__(self): """Initialize the user audio input track. - Args: - expected_sample_rate: Expected input sample rate (Hz). 
If None, will be - determined from first audio chunk. Defaults to None. - expected_channels: Expected number of channels. If None, will be determined - from first audio chunk. Defaults to None. + The track determines audio format (sample rate, channels) from the actual + audio data received via add_audio_samples(). No assumptions are made about + the input format. """ super().__init__() self._output_sample_rate = WEBRTC_AUDIO_SAMPLE_RATE @@ -50,22 +47,74 @@ def __init__(self, expected_sample_rate: Optional[int] = None, expected_channels self._timestamp = 0 self._start = time.time() - # Expected input format (can be None initially) - self._expected_sample_rate = expected_sample_rate - self._expected_channels = expected_channels - # Queue of (audio_bytes, sample_rate, num_channels) tuples # Audio is queued in 10ms chunks at input sample rate self._audio_queue: deque[tuple[bytes, int, int]] = deque() - # Resampler for converting input audio to 48kHz mono - # Created lazily when we know the input format - self._resampler: Optional[AudioResampler] = None - self._resampler_input_rate: Optional[int] = None + # Track current sample rate for timing calculations + # Set from actual audio data when first chunk is processed + self._current_sample_rate: Optional[int] = None + + # Flag to indicate if connection is closed - prevents generating frames after disconnect + self._is_closed = False + + # Flag to track if this is the first recv() call - flush buffer on first call + # This handles the case where audio arrives between connection established and WebRTC starting to pull + self._first_recv = True + + # Maximum queue size for backpressure (approximately 1 second at 16kHz = 100 chunks) + # If queue exceeds this, we drop old audio to prevent unbounded growth + self._max_queue_size = 100 # Lock for thread-safe operations self._lock = asyncio.Lock() + def flush(self) -> None: + """Flush the audio queue to discard any buffered audio. + + This should be called when the WebRTC connection is established to ensure + we start with live audio immediately instead of catching up on buffered audio. + + Preserves the last chunk(s) from the queue to maintain format information + (sample rate, channels) so we don't generate silence at wrong format. + """ + queue_size = len(self._audio_queue) + if not self._audio_queue or queue_size == 0: + logger.debug("Audio queue is empty - nothing to flush") + return + + # Preserve the last chunk to maintain format information + # This ensures we know the sample rate/channels before generating any silence + last_chunk = self._audio_queue[-1] + + # Clear the queue + self._audio_queue.clear() + + # Put the last chunk back if we have one + # This preserves format info while discarding old buffered audio + self._audio_queue.append(last_chunk) + # Update current sample rate from the preserved chunk + _, sample_rate, _ = last_chunk + self._current_sample_rate = sample_rate + logger.info( + f"Flushed audio queue: discarded {queue_size - 1} buffered audio chunks, " + f"preserved last chunk at {sample_rate}Hz to maintain format" + ) + + def close(self) -> None: + """Mark track as closed and clear audio queue to prevent further frame generation. + + This method should be called when the connection is closing to stop WebRTC + from continuing to pull audio frames. After this is called, recv() will raise + MediaStreamError to signal WebRTC to stop calling it. 
+ """ + self._is_closed = True + # Clear the queue to prevent processing any remaining queued audio + # Note: We don't use the lock here because this is called during cleanup + # and we want to be sure the flag is set immediately + self._audio_queue.clear() + logger.debug("UserAudioInputTrack closed - cleared audio queue and marked as closed") + def add_audio_samples( self, audio_bytes: bytes, @@ -75,26 +124,14 @@ def add_audio_samples( """Add raw audio samples to the track buffer. This method accepts raw 16-bit PCM audio bytes and queues them for - transmission. The audio will be resampled/converted to WebRTC format - (48kHz mono) when recv() is called by WebRTC. + transmission. The audio format (sample rate, channels) is determined + from the actual audio data provided. Args: audio_bytes: Raw audio data (16-bit PCM). sample_rate: Sample rate of the input audio (Hz). num_channels: Number of channels in the input audio (1=mono, 2=stereo). """ - # Validate input format matches expected (if set) - if self._expected_sample_rate is not None and sample_rate != self._expected_sample_rate: - logger.warning( - f"Sample rate mismatch: expected {self._expected_sample_rate}Hz, " - f"got {sample_rate}Hz. Resampling will occur." - ) - if self._expected_channels is not None and num_channels != self._expected_channels: - logger.warning( - f"Channel count mismatch: expected {self._expected_channels}, " - f"got {num_channels}. Conversion will occur." - ) - # Convert to numpy array for processing samples = np.frombuffer(audio_bytes, dtype=np.int16) @@ -104,7 +141,6 @@ def add_audio_samples( # Calculate samples per 10ms at input sample rate samples_per_10ms_input = sample_rate * 10 // 1000 - bytes_per_10ms_input = samples_per_10ms_input * 2 # 16-bit # Break into 10ms chunks at input sample rate for minimal buffering for i in range(0, len(samples), samples_per_10ms_input): @@ -117,88 +153,108 @@ def add_audio_samples( chunk_samples = np.concatenate([chunk_samples, padding]) chunk_bytes = chunk_samples.astype(np.int16).tobytes() + + # Apply backpressure: if queue is too large, drop oldest audio + # This prevents unbounded memory growth when audio arrives faster than WebRTC can consume + if len(self._audio_queue) >= self._max_queue_size: + self._audio_queue.popleft() + logger.debug( + f"Queue full ({len(self._audio_queue)} items) - dropping oldest chunk " + f"to apply backpressure" + ) + self._audio_queue.append((chunk_bytes, sample_rate, 1)) # Always mono after conversion async def recv(self) -> AudioFrame: """Return the next audio frame for WebRTC transmission. - This method is called by WebRTC to get audio frames for encoding to Opus. - It returns AudioFrame objects at 48kHz mono, resampling/converting input - audio as necessary. + This method sends audio at its original sample rate. The Opus encoder + handles resampling internally. Returns: - An AudioFrame containing the next 10ms of audio data at 48kHz mono. + An AudioFrame containing the next 10ms of audio data at original sample rate. + + Raises: + MediaStreamError: If the track has been closed. 
""" - # Compute required wait time for synchronization - if self._timestamp > 0: - wait = self._start + (self._timestamp / self._output_sample_rate) - time.time() - if wait > 0: - await asyncio.sleep(wait) + # Check if track has been closed - raise error to stop WebRTC from calling recv() + if self._is_closed: + from aiortc.mediastreams import MediaStreamError + raise MediaStreamError("Track has been closed") + + # Flush buffer on first recv() call to catch any audio that arrived between + # connection established and WebRTC starting to pull frames + if self._first_recv: + self._first_recv = False + self.flush() audio_data = None + current_sample_rate = None + samples_per_chunk = None async with self._lock: + # Double-check after acquiring lock (race condition protection) + if self._is_closed: + from aiortc.mediastreams import MediaStreamError + raise MediaStreamError("Track has been closed") + if self._audio_queue: + # Process audio from queue - format is determined from actual audio data audio_bytes, sample_rate, num_channels = self._audio_queue.popleft() + current_sample_rate = sample_rate + samples_per_chunk = sample_rate * 10 // 1000 # Convert bytes to numpy array (already mono, 16-bit PCM) samples = np.frombuffer(audio_bytes, dtype=np.int16) - # If sample_rate is already 48kHz, no resampling needed - if sample_rate == self._output_sample_rate: - # Audio is already at target sample rate (should be exactly 10ms) - audio_data = samples[None, :] # Shape: (1, num_samples) - else: - # Create resampler lazily if needed - if self._resampler is None or self._resampler_input_rate != sample_rate: - self._resampler = AudioResampler("s16", "mono", self._output_sample_rate) - self._resampler_input_rate = sample_rate - logger.debug( - f"Created resampler: {sample_rate}Hz -> {self._output_sample_rate}Hz" - ) - - # Create AudioFrame from input sample rate for resampling - input_frame = AudioFrame.from_ndarray(samples[None, :], layout="mono") - input_frame.sample_rate = sample_rate - input_frame.pts = 0 - input_frame.time_base = fractions.Fraction(1, sample_rate) - - # Resample to 48kHz - resampled_frames = self._resampler.resample(input_frame) - # Collect all resampled frames and concatenate - resampled_arrays = [] - for resampled_frame in resampled_frames: - resampled_arrays.append(resampled_frame.to_ndarray()) - if resampled_arrays: - audio_data = np.concatenate(resampled_arrays, axis=1) - else: - audio_data = None + # Use audio at original sample rate - Opus encoder handles resampling internally + audio_data = samples[None, :] # Shape: (1, num_samples) + + # Update current sample rate tracking + self._current_sample_rate = sample_rate + # Generate silence if no audio available, using known sample rate if audio_data is None: - # Generate silence if no audio available - audio_data = np.zeros((1, self._samples_per_10ms), dtype=np.int16) + if self._current_sample_rate is not None: + # We've seen audio before - generate silence at the same sample rate + current_sample_rate = self._current_sample_rate + samples_per_chunk = current_sample_rate * 10 // 1000 + audio_data = np.zeros((1, samples_per_chunk), dtype=np.int16) + else: + # No audio format known yet - this shouldn't happen since track is created + # lazily when first audio arrives, but use default as fallback + logger.warning( + "No audio format known - generating silence at default 48kHz. " + "This may cause format mismatch if actual audio is different." 
) current_sample_rate = self._output_sample_rate samples_per_chunk = self._samples_per_10ms audio_data = np.zeros((1, samples_per_chunk), dtype=np.int16) else: # Ensure we have exactly 10ms worth of samples at current sample rate if audio_data.shape[1] < samples_per_chunk: # Pad with silence if we have less than 10ms padding = np.zeros((1, samples_per_chunk - audio_data.shape[1]), dtype=np.int16) audio_data = np.concatenate([audio_data, padding], axis=1) elif audio_data.shape[1] > samples_per_chunk: # Queue the rest if we have more than 10ms remaining_samples = audio_data[:, samples_per_chunk:] remaining_bytes = remaining_samples.astype(np.int16).tobytes() async with self._lock: self._audio_queue.appendleft((remaining_bytes, current_sample_rate, 1)) audio_data = audio_data[:, :samples_per_chunk] # Compute required wait time for synchronization using current sample rate if self._timestamp > 0: wait = self._start + (self._timestamp / current_sample_rate) - time.time() if wait > 0: await asyncio.sleep(wait) # Create AudioFrame for WebRTC at original sample rate # Opus encoder handles resampling internally frame = AudioFrame.from_ndarray(audio_data, layout="mono") frame.sample_rate = current_sample_rate frame.pts = self._timestamp frame.time_base = fractions.Fraction(1, current_sample_rate) self._timestamp += samples_per_chunk return frame From 07aff775ec3bfcce35e6db4c8d9dbf06ff916fa3 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 17:29:42 +0000 Subject: [PATCH 06/23] return error when sample rate does not exist --- src/anam/_streaming.py | 11 ++++++++--- src/anam/_user_audio_input_track.py | 11 ++--------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 987b787..9e4bb04 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -735,7 +735,12 @@ def send_user_audio( f"Creating user audio input track: sample_rate={sample_rate}Hz, " f"channels={num_channels}" ) self._user_audio_input_track = UserAudioInputTrack() + # CRITICAL: Queue audio samples BEFORE adding track to transceiver + # This ensures WebRTC can't call recv() before we know the sample rate + self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) + # Add track to transceiver (lazy track creation) + # Now that audio is queued, WebRTC can safely call recv() and will get the correct format if self._audio_transceiver and self._audio_transceiver.sender: try: self._audio_transceiver.sender.replaceTrack(self._user_audio_input_track) @@ -756,9 +761,9 @@
def send_user_audio( if self._is_connected: logger.debug("Connection already established - flushing audio queue on track creation") self._user_audio_input_track.flush() - - # Add audio samples to track buffer - self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) + else: + # Track already exists - just add audio samples + self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index 68cf791..7f86b6d 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -221,15 +221,8 @@ async def recv(self) -> AudioFrame: samples_per_chunk = current_sample_rate * 10 // 1000 audio_data = np.zeros((1, samples_per_chunk), dtype=np.int16) else: - # No audio format known yet - this shouldn't happen since track is created - # lazily when first audio arrives, but use default as fallback - logger.warning( - "No audio format known - generating silence at default 48kHz. " - "This may cause format mismatch if actual audio is different." - ) - current_sample_rate = self._output_sample_rate - samples_per_chunk = self._samples_per_10ms - audio_data = np.zeros((1, samples_per_chunk), dtype=np.int16) + from aiortc.mediastreams import MediaStreamError + raise MediaStreamError("aiortc called recv() but no samples have been queued.") else: # Ensure we have exactly 10ms worth of samples at current sample rate if audio_data.shape[1] < samples_per_chunk: From d72d809a11e2913bc41ee913b2c989a0f5adec83 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 20:45:04 +0000 Subject: [PATCH 07/23] replace queue with buffer, wait until enough data is captured, flush buffer on recv, no flush on connect --- src/anam/_streaming.py | 25 +-- src/anam/_user_audio_input_track.py | 271 +++++++++++----------------- 2 files changed, 105 insertions(+), 191 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 9e4bb04..7c7ad51 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -733,37 +733,20 @@ def send_user_audio( f"Creating user audio input track: sample_rate={sample_rate}Hz, " f"channels={num_channels}" ) - self._user_audio_input_track = UserAudioInputTrack() - - # CRITICAL: Queue audio samples BEFORE adding track to transceiver - # This ensures WebRTC can't call recv() before we know the sample rate - self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) + self._user_audio_input_track = UserAudioInputTrack(sample_rate, num_channels) # Add track to transceiver (lazy track creation) - # Now that audio is queued, WebRTC can safely call recv() and will get the correct format if self._audio_transceiver and self._audio_transceiver.sender: try: self._audio_transceiver.sender.replaceTrack(self._user_audio_input_track) - logger.info( - "Added user audio track to transceiver - " - "WebRTC will call recv() to get audio frames for encoding to Opus" - ) + logger.info("Added user audio track to transceiver") except Exception as e: - logger.error(f"Failed to add user audio track to transceiver: {e}") raise RuntimeError(f"Failed to add user audio track: {e}") from e else: - logger.error("Audio transceiver or sender not available") raise RuntimeError("Audio transceiver not properly initialized") - # If connection is already established, flush any audio that was queued - # before the track was created (this
handles the case where audio arrives - # before connection is established, then track is created after connection) - if self._is_connected: - logger.debug("Connection already established - flushing audio queue on track creation") - self._user_audio_input_track.flush() - else: - # Track already exists - just add audio samples - self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) + # Add audio samples to track buffer + self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) def __del__(self) -> None: """Cleanup on destruction to prevent warnings.""" diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index 7f86b6d..ec36392 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -1,119 +1,83 @@ """User audio input track for sending raw audio samples to Anam via WebRTC. This module provides a mechanism for accepting raw audio samples from Pipecat -and converting them to WebRTC-compatible format (48kHz mono) for transmission. +and converting them to WebRTC-compatible format for transmission. """ import asyncio import fractions import logging -import time -from collections import deque -from typing import Optional import numpy as np -from aiortc.mediastreams import AudioStreamTrack +from aiortc.mediastreams import AudioStreamTrack, MediaStreamError, AUDIO_PTIME from av.audio.frame import AudioFrame -logger = logging.getLogger(__name__) -# WebRTC standard audio sample rate -WEBRTC_AUDIO_SAMPLE_RATE = 48000 +logger = logging.getLogger(__name__) class UserAudioInputTrack(AudioStreamTrack): """AudioStreamTrack that accepts raw audio samples and converts to WebRTC format. - This track accepts raw audio bytes (16-bit PCM) with variable sample rates - and channel counts, and converts them to WebRTC-compatible format (48kHz mono). - The track is created lazily when first audio arrives, minimizing latency. + This track accepts raw audio bytes (16-bit PCM) and converts them to AudioFrames + for WebRTC transmission. Audio is stored in a byte buffer and converted to + AudioFrame only when recv() is called. - The track buffers audio in small chunks (10ms) and handles resampling/conversion - only when necessary. If WebRTC/Opus can handle the input format directly, - minimal processing is performed. + To stay close to the live point, the buffer is flushed on the first recv() call, + keeping only the most recent chunk. This handles the case where audio accumulates + between track connection and WebRTC starting to pull frames. """ - def __init__(self): + def __init__(self, sample_rate: int, num_channels: int): """Initialize the user audio input track. - The track determines audio format (sample rate, channels) from the actual - audio data received via add_audio_samples(). No assumptions are made about - the input format. + Args: + sample_rate: Sample rate of the audio (Hz), e.g., 16000 or 48000. + num_channels: Number of channels (1=mono, 2=stereo). 
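The derived chunk sizes follow from aiortc's 20 ms audio ptime (a worked example assuming 16 kHz mono input):

    from aiortc.mediastreams import AUDIO_PTIME  # 0.020 s

    samples_per_chunk = int(16000 * AUDIO_PTIME)  # 320 samples
    bytes_per_chunk = samples_per_chunk * 2 * 1   # 640 bytes (int16, mono)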
""" super().__init__() - self._output_sample_rate = WEBRTC_AUDIO_SAMPLE_RATE - self._samples_per_10ms = self._output_sample_rate * 10 // 1000 - self._bytes_per_10ms = self._samples_per_10ms * 2 # 16-bit (2 bytes per sample) - self._timestamp = 0 - self._start = time.time() + self._sample_rate = sample_rate + self._num_channels = num_channels - # Queue of (audio_bytes, sample_rate, num_channels) tuples - # Audio is queued in 10ms chunks at input sample rate - self._audio_queue: deque[tuple[bytes, int, int]] = deque() + # Byte buffer for raw 16-bit PCM audio + self._audio_buffer = bytearray() - # Track current sample rate for timing calculations - # Set from actual audio data when first chunk is processed - self._current_sample_rate: Optional[int] = None + # Calculate samples per chunk (20ms frame) + self._samples_per_chunk = int(sample_rate * AUDIO_PTIME) + # 16-bit = 2 bytes per sample, per channel + self._bytes_per_chunk = self._samples_per_chunk * 2 * num_channels - # Flag to indicate if connection is closed - prevents generating frames after disconnect + # Timestamp for frame pts (in samples) + self._timestamp = 0 + + # Flag to indicate if track is closed self._is_closed = False - # Flag to track if this is the first recv() call - flush buffer on first call - # This handles the case where audio arrives between connection established and WebRTC starting to pull + # Flag to flush buffer on first recv() - handles audio that accumulated + # between track connection and WebRTC starting to pull frames self._first_recv = True - # Maximum queue size for backpressure (approximately 1 second at 16kHz = 100 chunks) - # If queue exceeds this, we drop old audio to prevent unbounded growth - self._max_queue_size = 100 - - # Lock for thread-safe operations + # Lock for thread-safe buffer access self._lock = asyncio.Lock() - def flush(self) -> None: - """Flush the audio queue to discard any buffered audio. + # Maximum buffer size for backpressure (~500ms of audio) + # Drop oldest audio if buffer exceeds this to prevent unbounded growth + self._max_buffer_bytes = self._bytes_per_chunk * 50 - This should be called when the WebRTC connection is established to ensure - we start with live audio immediately instead of catching up on buffered audio. - - Preserves the last chunk(s) from the queue to maintain format information - (sample rate, channels) so we don't generate silence at wrong format. - """ - queue_size = len(self._audio_queue) - if not self._audio_queue or queue_size == 0: - logger.debug("Audio queue is empty - nothing to flush") - return - - # Preserve the last chunk to maintain format information - # This ensures we know the sample rate/channels before generating any silence - last_chunk = self._audio_queue[-1] - - # Clear the queue - self._audio_queue.clear() - - # Put the last chunk back if we have one - # This preserves format info while discarding old buffered audio - self._audio_queue.append(last_chunk) - # Update current sample rate from the preserved chunk - _, sample_rate, _ = last_chunk - self._current_sample_rate = sample_rate logger.info( - f"Flushed audio queue: discarded {queue_size - 1} buffered audio chunks, " - f"preserved last chunk at {sample_rate}Hz to maintain format" + f"UserAudioInputTrack initialized: {sample_rate}Hz, {num_channels} channel(s), " + f"{self._bytes_per_chunk} bytes per chunk" ) def close(self) -> None: - """Mark track as closed and clear audio queue to prevent further frame generation. + """Mark track as closed and clear audio buffer. 
- This method should be called when the connection is closing to stop WebRTC - from continuing to pull audio frames. After this is called, recv() will raise - MediaStreamError to signal WebRTC to stop calling it. + After this is called, recv() will raise MediaStreamError to signal + WebRTC to stop calling it. """ self._is_closed = True - # Clear the queue to prevent processing any remaining queued audio - # Note: We don't use the lock here because this is called during cleanup - # and we want to be sure the flag is set immediately - self._audio_queue.clear() - logger.debug("UserAudioInputTrack closed - cleared audio queue and marked as closed") + self._audio_buffer = bytearray() + logger.debug("UserAudioInputTrack closed") def add_audio_samples( self, @@ -123,131 +87,98 @@ def add_audio_samples( ) -> None: """Add raw audio samples to the track buffer. - This method accepts raw 16-bit PCM audio bytes and queues them for - transmission. The audio format (sample rate, channels) is determined - from the actual audio data provided. - Args: audio_bytes: Raw audio data (16-bit PCM). sample_rate: Sample rate of the input audio (Hz). - num_channels: Number of channels in the input audio (1=mono, 2=stereo). + num_channels: Number of channels in the input audio. """ - # Convert to numpy array for processing - samples = np.frombuffer(audio_bytes, dtype=np.int16) - - # Handle multi-channel audio (convert to mono if needed) - if num_channels > 1: - samples = samples.reshape(-1, num_channels).mean(axis=1).astype(np.int16) - - # Calculate samples per 10ms at input sample rate - samples_per_10ms_input = sample_rate * 10 // 1000 - - # Break into 10ms chunks at input sample rate for minimal buffering - for i in range(0, len(samples), samples_per_10ms_input): - chunk_samples = samples[i:i + samples_per_10ms_input] + if self._is_closed: + return - # Pad last chunk if needed to make it exactly 10ms - if len(chunk_samples) < samples_per_10ms_input: - padding_samples = samples_per_10ms_input - len(chunk_samples) - padding = np.zeros(padding_samples, dtype=np.int16) - chunk_samples = np.concatenate([chunk_samples, padding]) + # Validate format matches initialization + if sample_rate != self._sample_rate: + logger.warning( + f"Sample rate mismatch: expected {self._sample_rate}Hz, got {sample_rate}Hz. " + "Discarding audio." + ) + return - chunk_bytes = chunk_samples.astype(np.int16).tobytes() + if num_channels != self._num_channels: + logger.warning( + f"Channel count mismatch: expected {self._num_channels}, got {num_channels}. " + "Discarding audio." 
+ ) + return - # Apply backpressure: if queue is too large, drop oldest audio - # This prevents unbounded memory growth when audio arrives faster than WebRTC can consume - if len(self._audio_queue) >= self._max_queue_size: - self._audio_queue.popleft() - logger.debug( - f"Queue full ({len(self._audio_queue)} items) - dropping oldest chunk " - f"to apply backpressure" - ) + # Append to buffer + self._audio_buffer.extend(audio_bytes) - self._audio_queue.append((chunk_bytes, sample_rate, 1)) # Always mono after conversion + # Backpressure: drop oldest audio if buffer is too large + if len(self._audio_buffer) > self._max_buffer_bytes: + excess = len(self._audio_buffer) - self._max_buffer_bytes + # Align to frame boundary + excess = (excess // self._bytes_per_chunk) * self._bytes_per_chunk + if excess > 0: + self._audio_buffer = self._audio_buffer[excess:] + logger.debug(f"Dropped {excess} bytes of old audio due to buffer overflow") async def recv(self) -> AudioFrame: """Return the next audio frame for WebRTC transmission. - This method sends audio at its original sample rate. The Opus encoder - handles resampling internally. + On the first call, flushes the buffer to stay close to the live point, + keeping only the most recent chunk. Returns: - An AudioFrame containing the next 10ms of audio data at original sample rate. + An AudioFrame containing chunk of audio data. Raises: MediaStreamError: If the track has been closed. """ - # Check if track has been closed - raise error to stop WebRTC from calling recv() if self._is_closed: - from aiortc.mediastreams import MediaStreamError raise MediaStreamError("Track has been closed") - # Flush buffer on first recv() call to catch any audio that arrived between - # connection established and WebRTC starting to pull frames + # On first recv, flush buffer to stay near live point + # This discards audio that accumulated during connection setup if self._first_recv: + async with self._lock: + if len(self._audio_buffer) > self._bytes_per_chunk: + # Keep only the last chunk + self._audio_buffer = self._audio_buffer[-self._bytes_per_chunk:] + logger.debug("Flushed audio buffer on first recv to stay near live point") self._first_recv = False - self.flush() - audio_data = None - current_sample_rate = None - samples_per_chunk = None + # Wait for enough data (chunk) + while len(self._audio_buffer) < self._bytes_per_chunk: + if self._is_closed: + raise MediaStreamError("Track has been closed") + await asyncio.sleep(0.001) # 1ms poll + # Extract one chunk from buffer async with self._lock: - # Double-check after acquiring lock (race condition protection) if self._is_closed: - from aiortc.mediastreams import MediaStreamError raise MediaStreamError("Track has been closed") - if self._audio_queue: - # Process audio from queue - format is determined from actual audio data - audio_bytes, sample_rate, num_channels = self._audio_queue.popleft() - current_sample_rate = sample_rate - samples_per_chunk = sample_rate * 10 // 1000 - - # Convert bytes to numpy array (already mono, 16-bit PCM) - samples = np.frombuffer(audio_bytes, dtype=np.int16) - - # Use audio at original sample rate - Opus encoder handles resampling internally - audio_data = samples[None, :] # Shape: (1, num_samples) - - # Update current sample rate tracking - self._current_sample_rate = sample_rate - - # Generate silence if no audio available, using known sample rate - if audio_data is None: - if self._current_sample_rate is not None: - # We've seen audio before - generate silence at the same sample rate - 
current_sample_rate = self._current_sample_rate - samples_per_chunk = current_sample_rate * 10 // 1000 - audio_data = np.zeros((1, samples_per_chunk), dtype=np.int16) - else: - from aiortc.mediastreams import MediaStreamError - raise MediaStreamError("aiortc called recv() but no samples have been queued.") + chunk_bytes = bytes(self._audio_buffer[: self._bytes_per_chunk]) + self._audio_buffer = self._audio_buffer[self._bytes_per_chunk :] + + # Convert bytes to numpy array (16-bit PCM) + samples = np.frombuffer(chunk_bytes, dtype=np.int16) + + # Shape for AudioFrame + if self._num_channels == 1: + audio_data = samples[None, :] # Shape: (1, num_samples) + layout = "mono" else: - # Ensure we have exactly 10ms worth of samples at current sample rate - if audio_data.shape[1] < samples_per_chunk: - # Pad with silence if we have less than 10ms - padding = np.zeros((1, samples_per_chunk - audio_data.shape[1]), dtype=np.int16) - audio_data = np.concatenate([audio_data, padding], axis=1) - elif audio_data.shape[1] > samples_per_chunk: - # Queue the rest if we have more than 10ms - remaining_samples = audio_data[:, samples_per_chunk:] - remaining_bytes = remaining_samples.astype(np.int16).tobytes() - async with self._lock: - self._audio_queue.appendleft((remaining_bytes, current_sample_rate, 1)) - audio_data = audio_data[:, :samples_per_chunk] - - # Compute required wait time for synchronization using current sample rate - if self._timestamp > 0: - wait = self._start + (self._timestamp / current_sample_rate) - time.time() - if wait > 0: - await asyncio.sleep(wait) - - # Create AudioFrame for WebRTC at original sample rate - # Opus encoder handles resampling internally - frame = AudioFrame.from_ndarray(audio_data, layout="mono") - frame.sample_rate = current_sample_rate + # Reshape interleaved stereo to (2, num_samples) + audio_data = samples.reshape((-1, self._num_channels)).T + layout = "stereo" + + # Create AudioFrame + frame = AudioFrame.from_ndarray(audio_data, layout=layout) + frame.sample_rate = self._sample_rate frame.pts = self._timestamp - frame.time_base = fractions.Fraction(1, current_sample_rate) - self._timestamp += samples_per_chunk + frame.time_base = fractions.Fraction(1, self._sample_rate) + + self._timestamp += self._samples_per_chunk + return frame From c65d1eab9d8d8d63f88fe796c987a863258557e5 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 20:47:44 +0000 Subject: [PATCH 08/23] remove flush on connection state changes --- src/anam/_streaming.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 7c7ad51..a388890 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -80,20 +80,6 @@ def __init__( self._user_audio_input_track: UserAudioInputTrack | None = None self._audio_transceiver = None # Store transceiver for lazy track creation - def _on_connection_established_internal(self) -> None: - """Internal callback that flushes audio buffer when connection is established. - - This is called automatically before the user's on_connection_established callback. 
- """ - # Flush buffered audio when connection is established - # This ensures we start with live audio instead of catching up on buffered audio - logger.debug("_on_connection_established_internal ready for duty, sir.") - if self._user_audio_input_track: - logger.debug("Flushing audio queue on connection established") - self._user_audio_input_track.flush() - else: - logger.debug("Cannot flush audio queue - track not created yet (will flush when track is created)") - async def connect(self, timeout: float = 30.0) -> None: """Start the streaming connection. @@ -306,8 +292,6 @@ def on_ice_connection_state_change() -> None: self._is_connected = True if hasattr(self, "_connection_ready"): self._connection_ready.set() - # Call internal flush callback, then user's callback - self._on_connection_established_internal() if self._on_connection_established: asyncio.create_task(self._on_connection_established()) elif state == "failed": @@ -431,8 +415,6 @@ async def video_frames(self) -> AsyncIterator[VideoFrame]: self._is_connected = True if hasattr(self, "_connection_ready"): self._connection_ready.set() - # Call internal flush callback, then user's callback - self._on_connection_established_internal() if self._on_connection_established: asyncio.create_task(self._on_connection_established()) @@ -487,8 +469,6 @@ async def audio_frames(self) -> AsyncIterator[AudioFrame]: self._is_connected = True if hasattr(self, "_connection_ready"): self._connection_ready.set() - # Call internal flush callback, then user's callback - self._on_connection_established_internal() if self._on_connection_established: asyncio.create_task(self._on_connection_established()) From cae2fb88790769c19a8d8a3ae6fb4e8a157c38b1 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 20:49:41 +0000 Subject: [PATCH 09/23] undo logger->print for example --- examples/persona_interactive_video.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/persona_interactive_video.py b/examples/persona_interactive_video.py index 4d66316..89f991c 100644 --- a/examples/persona_interactive_video.py +++ b/examples/persona_interactive_video.py @@ -214,7 +214,7 @@ async def on_message_stream_event(event) -> None: @client.on(AnamEvent.MESSAGE_HISTORY_UPDATED) async def on_message_history_updated(messages) -> None: """Handle message history updates.""" - print(f"\nšŸ“ Message history updated: {len(messages)} messages total") + logger.debug(f"\nšŸ“ Message history updated: {len(messages)} messages total") async def consume_video_frames(session) -> None: """Consume video frames from iterator.""" From c18f1458450890962d06bad95fc2ca88c92c71f2 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 20:53:07 +0000 Subject: [PATCH 10/23] nit comment --- src/anam/_streaming.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index a388890..1fbdd8b 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -330,9 +330,7 @@ def on_track(track: MediaStreamTrack) -> None: if self._disable_input_audio: self._peer_connection.addTransceiver("audio", direction="recvonly") else: - # Always add a transceiver first (like JavaScript SDK) - # This ensures proper WebRTC negotiation - # We add transceiver early but create track lazily when audio arrives + # Add a transceiver to ensure proper WebRTC negotiation self._audio_transceiver = self._peer_connection.addTransceiver("audio", direction="sendrecv") logger.info( f"Added audio transceiver: 
direction={self._audio_transceiver.direction}, " From 91fd6779c1d77b66355497f1c50e512729616b2f Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Thu, 29 Jan 2026 21:16:26 +0000 Subject: [PATCH 11/23] lintr --- src/anam/_streaming.py | 4 +++- src/anam/_user_audio_input_track.py | 5 ++--- src/anam/types.py | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 1fbdd8b..1a492ba 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -331,7 +331,9 @@ def on_track(track: MediaStreamTrack) -> None: self._peer_connection.addTransceiver("audio", direction="recvonly") else: # Add a transceiver to ensure proper WebRTC negotiation - self._audio_transceiver = self._peer_connection.addTransceiver("audio", direction="sendrecv") + self._audio_transceiver = self._peer_connection.addTransceiver( + "audio", direction="sendrecv" + ) logger.info( f"Added audio transceiver: direction={self._audio_transceiver.direction}, " f"disable_input_audio={self._disable_input_audio}" diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index ec36392..70ce5e7 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -9,10 +9,9 @@ import logging import numpy as np -from aiortc.mediastreams import AudioStreamTrack, MediaStreamError, AUDIO_PTIME +from aiortc.mediastreams import AUDIO_PTIME, AudioStreamTrack, MediaStreamError from av.audio.frame import AudioFrame - logger = logging.getLogger(__name__) @@ -143,7 +142,7 @@ async def recv(self) -> AudioFrame: async with self._lock: if len(self._audio_buffer) > self._bytes_per_chunk: # Keep only the last chunk - self._audio_buffer = self._audio_buffer[-self._bytes_per_chunk:] + self._audio_buffer = self._audio_buffer[-self._bytes_per_chunk :] logger.debug("Flushed audio buffer on first recv to stay near live point") self._first_recv = False diff --git a/src/anam/types.py b/src/anam/types.py index 7bbe54f..5d89d81 100644 --- a/src/anam/types.py +++ b/src/anam/types.py @@ -5,7 +5,6 @@ from typing import Any - class AnamEvent(str, Enum): """Events emitted by the Anam client.""" From 7f704476419af6dee0609d13f6a5f0cfa7836c0f Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 09:16:24 +0000 Subject: [PATCH 12/23] add warning + lint --- src/anam/_streaming.py | 1 + src/anam/_user_audio_input_track.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 1a492ba..e8220ea 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -311,6 +311,7 @@ def on_connection_state_change() -> None: state = self._peer_connection.connectionState logger.debug("Connection state: %s", state) + @self._peer_connection.on("track") def on_track(track: MediaStreamTrack) -> None: logger.info("Received %s track", track.kind) diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index 70ce5e7..38207ae 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -119,7 +119,6 @@ def add_audio_samples( excess = (excess // self._bytes_per_chunk) * self._bytes_per_chunk if excess > 0: self._audio_buffer = self._audio_buffer[excess:] - logger.debug(f"Dropped {excess} bytes of old audio due to buffer overflow") async def recv(self) -> AudioFrame: """Return the next audio frame for WebRTC transmission. 
@@ -139,11 +138,12 @@ async def recv(self) -> AudioFrame: # On first recv, flush buffer to stay near live point # This discards audio that accumulated during connection setup if self._first_recv: + logger.debug("first recv called now.") async with self._lock: if len(self._audio_buffer) > self._bytes_per_chunk: # Keep only the last chunk self._audio_buffer = self._audio_buffer[-self._bytes_per_chunk :] - logger.debug("Flushed audio buffer on first recv to stay near live point") + logger.warning("Flushing audio buffer to keep up. This can hurt latency") self._first_recv = False # Wait for enough data (chunk) From be63a71fca281900b559c2b0f07f079b26c9d1bb Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 09:16:44 +0000 Subject: [PATCH 13/23] ruff format --- src/anam/_streaming.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index e8220ea..1a492ba 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -311,7 +311,6 @@ def on_connection_state_change() -> None: state = self._peer_connection.connectionState logger.debug("Connection state: %s", state) - @self._peer_connection.on("track") def on_track(track: MediaStreamTrack) -> None: logger.info("Received %s track", track.kind) From f1444f5f351d17b3ed7bd691bb951c8271603bd6 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 11:21:18 +0000 Subject: [PATCH 14/23] flush on first recv not required after checking for connection state --- src/anam/_streaming.py | 12 ++++++------ src/anam/_user_audio_input_track.py | 15 +-------------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 1a492ba..6328ef9 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -687,9 +687,9 @@ def send_user_audio( ) -> None: """Send raw user audio samples to Anam for processing. - This method accepts raw audio bytes (16-bit PCM) and queues them for transmission via WebRTC. - The audio track is created lazily when first audio arrives, minimizing - latency and avoiding unnecessary resource allocation. + This method accepts 16-bit PCM samples and adds them to the audio buffer for transmission via WebRTC. + The audio track is created lazily when first audio arrives. + Audio is only added to the buffer after the connection is established, to avoid accumulating stale audio. Args: audio_bytes: Raw audio data (16-bit PCM). @@ -724,9 +724,9 @@ def send_user_audio( raise RuntimeError(f"Failed to add user audio track: {e}") from e else: raise RuntimeError("Audio transceiver not properly initialized") - - # Add audio samples to track buffer - self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) + if self._peer_connection.connectionState == "connected": + # Avoid accumulating stale audio, only queue audio when connection is established. 
+ self._user_audio_input_track.add_audio_samples(audio_bytes, sample_rate, num_channels) def __del__(self) -> None: """Cleanup on destruction to prevent warnings.""" diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py index 38207ae..c207e2e 100644 --- a/src/anam/_user_audio_input_track.py +++ b/src/anam/_user_audio_input_track.py @@ -118,14 +118,12 @@ def add_audio_samples( # Align to frame boundary excess = (excess // self._bytes_per_chunk) * self._bytes_per_chunk if excess > 0: + logger.warning(f"Dropping {excess} bytes of old audio due to buffer overflow") self._audio_buffer = self._audio_buffer[excess:] async def recv(self) -> AudioFrame: """Return the next audio frame for WebRTC transmission. - On the first call, flushes the buffer to stay close to the live point, - keeping only the most recent chunk. - Returns: An AudioFrame containing chunk of audio data. @@ -135,17 +133,6 @@ async def recv(self) -> AudioFrame: if self._is_closed: raise MediaStreamError("Track has been closed") - # On first recv, flush buffer to stay near live point - # This discards audio that accumulated during connection setup - if self._first_recv: - logger.debug("first recv called now.") - async with self._lock: - if len(self._audio_buffer) > self._bytes_per_chunk: - # Keep only the last chunk - self._audio_buffer = self._audio_buffer[-self._bytes_per_chunk :] - logger.warning("Flushing audio buffer to keep up. This can hurt latency") - self._first_recv = False - # Wait for enough data (chunk) while len(self._audio_buffer) < self._bytes_per_chunk: if self._is_closed: From 6acc28a4094f8231df0277266b480b1faf7b0f77 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 11:26:15 +0000 Subject: [PATCH 15/23] add num_channel and sample rate check --- src/anam/_streaming.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 6328ef9..045e34b 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -706,6 +706,8 @@ def send_user_audio( if not self._peer_connection: raise RuntimeError("Peer connection not initialized. Call connect() first.") + if num_channels != 1 and num_channels != 2: + raise RuntimeError("Invalid number of channels. Must be 1 or 2.") # Create track lazily when first audio arrives if self._user_audio_input_track is None: @@ -713,6 +715,9 @@ def send_user_audio( f"Creating user audio input track: sample_rate={sample_rate}Hz, " f"channels={num_channels}" ) + if sample_rate < 16000 or sample_rate > 4800 or sample_rate not in [16000, 24000, 32000, 44100,48000]: + logger.warning(f"Unusual sample rate provided: {sample_rate}Hz. Performance might be detoriated. 
Verify your audio configuration.") + self._user_audio_input_track = UserAudioInputTrack(sample_rate, num_channels) # Add track to transceiver (lazy track creation) From 54d09c4c4f7dcb1d60a2c51f70f834433aba23bd Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 11:26:45 +0000 Subject: [PATCH 16/23] lint --- src/anam/_streaming.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 045e34b..fb1ab68 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -715,8 +715,14 @@ def send_user_audio( f"Creating user audio input track: sample_rate={sample_rate}Hz, " f"channels={num_channels}" ) - if sample_rate < 16000 or sample_rate > 4800 or sample_rate not in [16000, 24000, 32000, 44100,48000]: - logger.warning(f"Unusual sample rate provided: {sample_rate}Hz. Performance might be detoriated. Verify your audio configuration.") + if ( + sample_rate < 16000 + or sample_rate > 4800 + or sample_rate not in [16000, 24000, 32000, 44100, 48000] + ): + logger.warning( + f"Unusual sample rate provided: {sample_rate}Hz. Performance might be detoriated. Verify your audio configuration." + ) self._user_audio_input_track = UserAudioInputTrack(sample_rate, num_channels) From 56a1d5cfb7c22833390fa6fcb2f45230f99f442d Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 14:51:40 +0000 Subject: [PATCH 17/23] remove disable_input_audio and always set to sendrecv --- src/anam/_streaming.py | 37 +++++++++++-------------------------- src/anam/client.py | 1 - src/anam/types.py | 3 --- 3 files changed, 11 insertions(+), 30 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index fb1ab68..47eeb91 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -41,7 +41,6 @@ def __init__( on_message: Callable[[dict[str, Any]], Awaitable[None]] | None = None, on_connection_established: Callable[[], Awaitable[None]] | None = None, on_connection_closed: Callable[[str, str | None], Awaitable[None]] | None = None, - disable_input_audio: bool = False, custom_ice_servers: list[dict[str, Any]] | None = None, ): """Initialize the streaming client. @@ -51,8 +50,6 @@ def __init__( on_message: Callback for data channel messages. on_connection_established: Callback when connected. on_connection_closed: Callback when disconnected. - disable_input_audio: If True, don't send microphone audio. - When False, audio can be sent via send_user_audio() method. custom_ice_servers: Custom ICE servers (optional). 
""" self._session_info = session_info @@ -64,7 +61,6 @@ def __init__( self._on_connection_closed = on_connection_closed # Configuration - self._disable_input_audio = disable_input_audio self._ice_servers = custom_ice_servers or session_info.ice_servers # State @@ -326,22 +322,16 @@ def on_track(track: MediaStreamTrack) -> None: # Video: receive only self._peer_connection.addTransceiver("video", direction="recvonly") - # Audio: send/receive or receive only - if self._disable_input_audio: - self._peer_connection.addTransceiver("audio", direction="recvonly") - else: - # Add a transceiver to ensure proper WebRTC negotiation - self._audio_transceiver = self._peer_connection.addTransceiver( - "audio", direction="sendrecv" - ) - logger.info( - f"Added audio transceiver: direction={self._audio_transceiver.direction}, " - f"disable_input_audio={self._disable_input_audio}" - ) - # Track will be created lazily when first audio arrives via send_user_audio() - logger.info( - "Audio input enabled - track will be created lazily when first audio arrives via send_user_audio()" - ) + # Audio: send/receive (track created lazily when first audio arrives via send_user_audio()) + self._audio_transceiver = self._peer_connection.addTransceiver( + "audio", direction="sendrecv" + ) + logger.info( + f"Added audio transceiver: direction={self._audio_transceiver.direction}" + ) + logger.info( + "Audio input enabled - track will be created lazily when first audio arrives via send_user_audio()" + ) logger.debug("Peer connection initialized") @@ -697,13 +687,8 @@ def send_user_audio( num_channels: Number of channels in the input audio (1=mono, 2=stereo). Raises: - RuntimeError: If audio input is disabled or peer connection is not initialized. + RuntimeError: If peer connection is not initialized. """ - if self._disable_input_audio: - raise RuntimeError( - "Audio input is disabled. Set disable_input_audio=False in ClientOptions." - ) - if not self._peer_connection: raise RuntimeError("Peer connection not initialized. Call connect() first.") if num_channels != 1 and num_channels != 2: diff --git a/src/anam/client.py b/src/anam/client.py index 432adf5..73075d3 100644 --- a/src/anam/client.py +++ b/src/anam/client.py @@ -245,7 +245,6 @@ async def connect_async(self) -> "Session": on_message=self._handle_data_message, on_connection_established=self._handle_connection_established, on_connection_closed=self._handle_connection_closed, - disable_input_audio=self._options.disable_input_audio, custom_ice_servers=self._options.ice_servers, ) diff --git a/src/anam/types.py b/src/anam/types.py index 5d89d81..dcd4018 100644 --- a/src/anam/types.py +++ b/src/anam/types.py @@ -109,8 +109,6 @@ class ClientOptions: Args: api_base_url: Base URL for the Anam API. api_version: API version to use. - disable_input_audio: If True, do not send audio to Anam's service. - When False, audio can be sent via send_user_audio() method. ice_servers: Custom ICE servers for WebRTC (optional). client_label: Custom label for session tracking (optional). Defaults to 'python-sdk' if not specified. 
@@ -118,7 +116,6 @@ class ClientOptions: api_base_url: str = "https://api.anam.ai" api_version: str = "v1" - disable_input_audio: bool = False ice_servers: list[dict[str, Any]] | None = None client_label: str | None = None From 69197dffa320bb0c2544f109c04661d4c1285749 Mon Sep 17 00:00:00 2001 From: sebvanleuven <102162250+sebvanleuven@users.noreply.github.com> Date: Tue, 3 Feb 2026 15:45:30 +0000 Subject: [PATCH 18/23] fix typo Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com> --- src/anam/_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 47eeb91..1ce7d96 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -702,7 +702,7 @@ def send_user_audio( ) if ( sample_rate < 16000 - or sample_rate > 4800 + or sample_rate > 48000 or sample_rate not in [16000, 24000, 32000, 44100, 48000] ): logger.warning( From 410928627593b2e3d77d743f9a14a07f11806218 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 15:51:16 +0000 Subject: [PATCH 19/23] remove elaborate logging --- src/anam/_streaming.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py index 47eeb91..9556940 100644 --- a/src/anam/_streaming.py +++ b/src/anam/_streaming.py @@ -326,12 +326,6 @@ def on_track(track: MediaStreamTrack) -> None: self._audio_transceiver = self._peer_connection.addTransceiver( "audio", direction="sendrecv" ) - logger.info( - f"Added audio transceiver: direction={self._audio_transceiver.direction}" - ) - logger.info( - "Audio input enabled - track will be created lazily when first audio arrives via send_user_audio()" - ) logger.debug("Peer connection initialized") From b34f07ae1ec15ec9977628b99ec4be339ed9887e Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 17:13:21 +0000 Subject: [PATCH 20/23] fix unit test and update dependencies --- pyproject.toml | 2 ++ tests/test_client.py | 2 -- uv.lock | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 91faea0..8f987ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "websockets>=12.0", "numpy>=1.26.0", "python-dotenv>=1.2.1", + "av>=16.0.1", ] [project.optional-dependencies] @@ -84,6 +85,7 @@ disallow_incomplete_defs = true [tool.pytest.ini_options] testpaths = ["tests"] +pythonpath = ["src"] [tool.semantic_release] version_toml = ["pyproject.toml:project.version"] diff --git a/tests/test_client.py b/tests/test_client.py index 0743c82..ea97f89 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -55,7 +55,6 @@ def test_init_with_options(self) -> None: """Test initialization with ClientOptions.""" options = ClientOptions( api_base_url="https://custom.api.com", - disable_input_audio=True, ) client = AnamClient( api_key="test-key", @@ -63,7 +62,6 @@ def test_init_with_options(self) -> None: options=options, ) assert client._options.api_base_url == "https://custom.api.com" - assert client._options.disable_input_audio is True class TestAnamClientEvents: diff --git a/uv.lock b/uv.lock index de09101..9d44fe9 100644 --- a/uv.lock +++ b/uv.lock @@ -193,6 +193,7 @@ source = { editable = "." 
} dependencies = [ { name = "aiohttp" }, { name = "aiortc" }, + { name = "av" }, { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "python-dotenv" }, @@ -215,6 +216,7 @@ display = [ requires-dist = [ { name = "aiohttp", specifier = ">=3.9.0" }, { name = "aiortc", specifier = ">=1.14.0" }, + { name = "av", specifier = ">=16.0.1" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.10.0" }, { name = "numpy", specifier = ">=1.26.0" }, { name = "opencv-python", marker = "extra == 'display'", specifier = ">=4.9.0" }, From 76452d73d00934a26c7579dceeec1884554b7a27 Mon Sep 17 00:00:00 2001 From: sebvanleuven Date: Tue, 3 Feb 2026 17:52:07 +0000 Subject: [PATCH 21/23] remove disable_input_audio from examples --- README.md | 4 ---- examples/avatar_audio_passthrough.py | 2 +- examples/persona_interactive_video.py | 2 +- examples/save_recording.py | 2 +- examples/text_to_video.py | 2 +- 5 files changed, 4 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 7944e69..978a869 100644 --- a/README.md +++ b/README.md @@ -101,9 +101,6 @@ client = AnamClient( voice_id="emma", language_code="en", ), - options=ClientOptions( - disable_input_audio=True, # Don't capture microphone - ), ) ``` @@ -313,7 +310,6 @@ from anam import ClientOptions options = ClientOptions( api_base_url="https://api.anam.ai", # API base URL api_version="v1", # API version - disable_input_audio=False, # Disable microphone input ice_servers=None, # Custom ICE servers ) ``` diff --git a/examples/avatar_audio_passthrough.py b/examples/avatar_audio_passthrough.py index 2b0d652..7655659 100644 --- a/examples/avatar_audio_passthrough.py +++ b/examples/avatar_audio_passthrough.py @@ -198,7 +198,7 @@ def main() -> None: client = AnamClient( api_key=api_key, persona_config=persona_config, - options=ClientOptions(disable_input_audio=True, api_base_url=api_base_url), + options=ClientOptions(api_base_url=api_base_url), ) # Create display and audio player diff --git a/examples/persona_interactive_video.py b/examples/persona_interactive_video.py index 89f991c..e9e78d5 100644 --- a/examples/persona_interactive_video.py +++ b/examples/persona_interactive_video.py @@ -294,7 +294,7 @@ def main() -> None: client = AnamClient( api_key=api_key, persona_config=persona_config, - options=ClientOptions(disable_input_audio=False, api_base_url=api_base_url), + options=ClientOptions(api_base_url=api_base_url), ) # Create display and audio player diff --git a/examples/save_recording.py b/examples/save_recording.py index d245c0e..1ae8d51 100644 --- a/examples/save_recording.py +++ b/examples/save_recording.py @@ -131,7 +131,7 @@ async def main() -> None: client = AnamClient( api_key=api_key, persona_id=persona_id, - options=ClientOptions(disable_input_audio=True, api_base_url=api_base_url), + options=ClientOptions(api_base_url=api_base_url), ) # Register connection event handler diff --git a/examples/text_to_video.py b/examples/text_to_video.py index 0d0d5a9..3527f4e 100644 --- a/examples/text_to_video.py +++ b/examples/text_to_video.py @@ -268,7 +268,7 @@ async def text_to_video( client = AnamClient( api_key=api_key, persona_config=persona_config, - options=ClientOptions(disable_input_audio=True, api_base_url=api_base_url), + options=ClientOptions(api_base_url=api_base_url), ) # Temp files for video and audio (keep 
extensions for format detection)

From 4ff60a15d1a3c89817ec8c1e967aa6cf45deee65 Mon Sep 17 00:00:00 2001
From: sebvanleuven
Date: Tue, 3 Feb 2026 19:18:36 +0000
Subject: [PATCH 22/23] readme updated

---
 README.md                           | 19 ++++++++++++++++---
 src/anam/_streaming.py              |  8 --------
 src/anam/_user_audio_input_track.py |  5 +++--
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/README.md b/README.md
index 978a869..f8cbeb7 100644
--- a/README.md
+++ b/README.md
@@ -69,12 +69,13 @@ asyncio.run(main())
 - šŸ’¬ **Two-way communication** - Send text messages (like transcribed user speech) and receive generated responses
 - šŸ“ **Real-time transcriptions** - Receive incremental message stream events for user and persona text as it's generated
 - šŸ“š **Message history tracking** - Automatic conversation history with incremental updates
-- šŸŽ¤ **Audio-passthrough** - Send TTS generated audio input and receive rendered synchronized audio/video avatar
+- šŸ¤– **Audio-passthrough** - Send TTS-generated audio input and receive rendered, synchronized audio/video avatar
 - šŸ—£ļø **Direct text-to-speech** - Send text directly to TTS for immediate speech output (bypasses LLM processing)
-- šŸŽÆ **Async iterator API** - Clean, Pythonic async/await patterns for continuous stream of audio/video frames
+- šŸŽ¤ **Real-time user audio input** - Send raw audio samples (e.g. from a microphone) to Anam for processing (turnkey solution: STT → LLM → TTS → Face)
+- šŸ“” **Async iterator API** - Clean, Pythonic async/await patterns for a continuous stream of audio/video frames
 - šŸŽÆ **Event-driven API** - Simple decorator-based event handlers for discrete events
 - šŸ“ **Fully typed** - Complete type hints for IDE support
-- šŸ”’ **Server-side ready** - Designed for server-side Python applications (e.g. for use in a web application)
+- šŸ”’ **Server-side ready** - Designed for server-side Python applications (e.g. for backend pipelines)
 
 ## API Reference
 
@@ -123,6 +124,17 @@ async with client.connect() as session:
     # Both streams run concurrently
     await asyncio.gather(process_video(), process_audio())
 ```
+### User Audio Input
+
+User audio input is real-time audio, such as microphone audio.
+User audio is 16-bit PCM samples, mono or stereo, at any sample rate. For the audio to be processed correctly, the sample rate must be provided.
+The audio is forwarded in real time as a WebRTC audio track. To reduce latency, any audio provided before the WebRTC audio track is created is dropped. A minimal usage sketch is shown below.
+
+### TTS audio (Audio Passthrough)
+
+TTS audio is generated by a TTS engine and should be provided in chunks through the `send_audio_chunk` method. The audio can be a byte array or a base64-encoded string (the SDK converts raw bytes to base64). The audio is ingested by the backend at maximum bandwidth. The sample rate and channel count must be provided through the `AgentAudioInputConfig` object, as sketched below.
+
+For best performance, we suggest 24kHz mono audio. The provided audio is returned in sync with the avatar without any resampling. Sample rates below 24kHz will result in poor avatar performance; sample rates above 24kHz may increase latency without any noticeable improvement in audio quality.
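+
+The following is a minimal sketch (not part of the SDK) of pacing user audio into a session. It assumes the connected session exposes the streaming client's `send_user_audio(audio_bytes, sample_rate, num_channels)` method; `read_pcm_chunk` is a hypothetical stand-in for your own capture source:
+
+```python
+import asyncio
+
+SAMPLE_RATE = 48000  # Hz, 16-bit PCM
+NUM_CHANNELS = 1     # must be 1 (mono) or 2 (stereo)
+CHUNK_MS = 20        # matches the input track's internal 20ms framing
+
+
+async def stream_user_audio(session) -> None:
+    # 16-bit PCM: 2 bytes per sample, per channel
+    bytes_per_chunk = SAMPLE_RATE * CHUNK_MS // 1000 * 2 * NUM_CHANNELS
+    while True:
+        pcm = read_pcm_chunk(bytes_per_chunk)  # hypothetical capture source
+        if pcm is None:
+            break
+        session.send_user_audio(pcm, SAMPLE_RATE, NUM_CHANNELS)
+        await asyncio.sleep(CHUNK_MS / 1000)  # pace roughly in real time
+```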
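+
+Likewise, a sketch of the passthrough path. The `AgentAudioInputConfig` field names and the exact `send_audio_chunk` call shape below are assumptions for illustration - check the SDK types for the authoritative signatures:
+
+```python
+from anam import AgentAudioInputConfig
+
+# Assumed field names - verify against AgentAudioInputConfig in your SDK version.
+config = AgentAudioInputConfig(sample_rate=24000, num_channels=1)  # 24kHz mono recommended
+
+
+async def passthrough_tts(session, tts_chunks) -> None:
+    # Each chunk is raw 16-bit PCM bytes from the TTS engine; the SDK
+    # base64-encodes raw bytes before sending. Ingestion is not paced,
+    # so chunks can be pushed as fast as they are produced.
+    for chunk in tts_chunks:
+        session.send_audio_chunk(chunk)
+```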
 ### Events
 
@@ -355,6 +367,7 @@ except AnamError as e:
   - `aiohttp` - HTTP client
   - `websockets` - WebSocket client
   - `numpy` - Array handling
+  - `av` (PyAV) - Video and audio handling
 
 Optional for display utilities:
   - `opencv-python` - Video display
diff --git a/src/anam/_streaming.py b/src/anam/_streaming.py
index acde9a6..7db9990 100644
--- a/src/anam/_streaming.py
+++ b/src/anam/_streaming.py
@@ -694,14 +694,6 @@ def send_user_audio(
                 f"Creating user audio input track: sample_rate={sample_rate}Hz, "
                 f"channels={num_channels}"
             )
-            if (
-                sample_rate < 16000
-                or sample_rate > 48000
-                or sample_rate not in [16000, 24000, 32000, 44100, 48000]
-            ):
-                logger.warning(
-                    f"Unusual sample rate provided: {sample_rate}Hz. Performance might be detoriated. Verify your audio configuration."
-                )
 
             self._user_audio_input_track = UserAudioInputTrack(sample_rate, num_channels)
 
diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py
index c207e2e..b3ba712 100644
--- a/src/anam/_user_audio_input_track.py
+++ b/src/anam/_user_audio_input_track.py
@@ -1,7 +1,8 @@
 """User audio input track for sending raw audio samples to Anam via WebRTC.
 
-This module provides a mechanism for accepting raw audio samples from Pipecat
-and converting them to WebRTC-compatible format for transmission.
+This module provides a mechanism for accepting raw audio samples and
+converting them to WebRTC-compatible format for transmission.
+User audio is real-time audio, such as microphone audio.
 """
 
 import asyncio

From a63666a99db3326942e9dd079a88dd5ff412b7f3 Mon Sep 17 00:00:00 2001
From: sebvanleuven
Date: Tue, 3 Feb 2026 19:21:03 +0000
Subject: [PATCH 23/23] linter

---
 src/anam/_user_audio_input_track.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/anam/_user_audio_input_track.py b/src/anam/_user_audio_input_track.py
index b3ba712..d8e52f0 100644
--- a/src/anam/_user_audio_input_track.py
+++ b/src/anam/_user_audio_input_track.py
@@ -1,6 +1,6 @@
 """User audio input track for sending raw audio samples to Anam via WebRTC.
 
-This module provides a mechanism for accepting raw audio samples and 
+This module provides a mechanism for accepting raw audio samples and
 converting them to WebRTC-compatible format for transmission.
 User audio is real-time audio, such as microphone audio.
 """