基于 FastAPI WebSocket 的实时数字人直播/语音交互服务,支持文本转语音(TTS)并驱动数字人视频/音频流推送。
Algo_RetalkingStreaming_Service-master/
├── server/ # 服务端
│ ├── app.py # 服务入口(FastAPI 应用)
│ ├── config.yaml # 服务端配置
│ ├── common/ # 公共模块
│ │ ├── __init__.py # 信号常量、CODE、工具函数
│ │ ├── config.py # 配置加载
│ │ ├── logger.py # 结构化日志
│ │ └── download.py # 资源下载/视频切片
│ ├── handler/
│ │ └── stream.py # WebSocket 推流处理器
│ └── src/ # 核心算法
│ ├── tts/ # TTS 客户端(Azure/CosyVoice)
│ └── gen_talking_head/ # 数字人生成
├── sdk/ # 客户端 SDK
│ ├── retalking_streaming/ # SDK 包
│ │ ├── client.py # 异步客户端
│ │ ├── sync_client.py # 同步客户端
│ │ ├── cli.py # 命令行工具
│ │ ├── models.py # 数据模型
│ │ ├── constants.py # 常量定义
│ │ ├── rtmp.py # RTMP 签名 URL
│ │ ├── audio.py # 音频处理
│ │ └── automation.py # 高阶自动化(直播/讲课)
│ ├── examples/ # 使用示例
│ ├── tests/ # 单元测试
│ └── pyproject.toml # SDK 打包配置
├── Dockerfile # 服务端 Docker 构建
├── requirements.txt # 服务端依赖
└── README.md
- WebSocket 实时通信:低延迟流式交互
- 双推流模式:
- 数字人视频推流(带动作匹配)
- 纯音频推流
- 多 TTS 后端支持:
- Azure TTS
- CosyVoice 自建 TTS 服务
- 智能文本切分:超过 50 字自动按句切分
- 完整日志追踪:性能指标实时记录
- 帧循环类型:
- 往复(Reciprocating):0→N→0→N 循环
- 顺序(Sequential):0→N 循环重复
- 单次(Once):0→N 播放一次后结束
pip install -r requirements.txt编辑 server/config.yaml:
azure:
key: "your-azure-key"
region: "eastus"
cosyvoice:
url: "http://your-cosyvoice-server:8888"
source_image_folder: "/mnt/algo/source_image"
log_dir: "/mnt/algo/logs"
server:
s1_port: 9910
max_connections:
avatar_streaming: 1
speech_streaming: 10
audio_driven_streaming: 1# 从项目根目录启动
uvicorn server.app:app --host 0.0.0.0 --port 9910
# 或直接运行
python -m server.appWebSocket: /api/algo/streaming/avatar_streaming
参数:
| 参数 | 说明 |
|---|---|
| push_url | 推流地址(RTMP) |
| avatar_id | 数字人 ID |
| voice_id | 音色 ID |
| room_id | 房间 ID(可选) |
请求示例:
ws://localhost:9910/api/algo/streaming/avatar_streaming?push_url=rtmp://xxx/live/test&avatar_id=001&voice_id=zh-CN-XiaoxiaoNeural
WebSocket: /api/algo/streaming/speech_streaming
参数: 同上
WebSocket: /api/algo/streaming/avatar_audio_driven
参数:
| 参数 | 说明 |
|---|---|
| push_url | 推流地址(RTMP) |
| avatar_id | 数字人 ID |
| room_id | 房间 ID(可选) |
SDK 位于 sdk/ 目录,可独立安装使用。
# 基础安装(仅 websockets 依赖)
cd sdk
pip install -e .
# 带音频处理支持
pip install -e ".[audio]"
# 开发模式(含测试)
pip install -e ".[dev]"安装后提供 retalking-streaming 命令:
# 数字人视频推流
retalking-streaming -m avatar -p rtmp://xxx/live/test -a 001 -v zh-CN-XiaoxiaoNeural
# 自动生成签名URL
retalking-streaming -m avatar -r room001 -a 001 -v zh-CN-XiaoxiaoNeural
# 语音推流
retalking-streaming -m speech -r room001 -a 001 -v zh-CN-XiaoxiaoNeural
# 音频直驱推流
retalking-streaming -m avatar_audio -r room001 -a 001# 异步用法
from retalking_streaming import StreamingClient
async with StreamingClient(
mode='avatar',
host='localhost',
port=9910,
push_url='rtmp://xxx/live/test',
avatar_id='001',
voice_id='zh-CN-XiaoxiaoNeural',
) as client:
await client.speak("你好,世界")
await client.pause()
await client.resume()
# 同步用法
from retalking_streaming import SyncStreamingClient
with SyncStreamingClient(mode='avatar', ...) as client:
client.speak("你好,世界")更多示例见 sdk/examples/。
所有 WebSocket 端点支持以下控制信号(以文本帧发送):
| 信号 | 值 | 说明 |
|---|---|---|
| SIGNAL_STOP_TALKING | _stop_talking |
暂停说话,保留队列,可恢复 |
| SIGNAL_RESTART_TALKING | _restart_talking |
恢复说话,从暂停处继续 |
| SIGNAL_GET_REMAIN | _get_remain |
获取当前及后续待处理内容 |
| SIGNAL_CLEAR | _clear |
清空所有待处理内容并中断当前 |
| SIGNAL_CHECK_STATUS | _check_status |
查询推流状态 |
| SIGNAL_CACHE_DURATION | _cache_duration |
查询缓存时长 |
| SIGNAL_INSERT_TALKING | _insert_talking:<text> |
插入文本到队首,下一句播放,原内容推后 |
每次发送文本或音频数据后,服务端立即返回 ACK 消息:
{
"code": 200,
"msg": "Received",
"data": {
"item_id": "a1b2c3d4"
}
}item_id:8位唯一标识,用于后续追踪和区分
音频模式额外返回 duration 字段:
{
"code": 200,
"msg": "Received",
"data": {
"item_id": "a1b2c3d4",
"duration": 1.52
}
}-
SIGNAL_STOP_TALKING(暂停)
- 停止当前推流输出
- 保留消息队列中所有待处理项
- 如果当前文本/音频正在处理中,记录进度位置
- 恢复时从当前话的当前段落重新开始说,并继续后续队列
-
SIGNAL_RESTART_TALKING(恢复)
- 从暂停位置恢复推流
- 当前中断的文本从头开始重新说(当前段落),然后继续后续文本
- 未暂停时忽略此信号
-
SIGNAL_GET_REMAIN(查询剩余)
- TTS 模式返回待处理文本列表:
{ "code": 200, "msg": "Successfully returned.", "data": { "items": [ {"item_id": "a1b2c3d4", "text": "你好世界...", "status": "paused_current"}, {"item_id": "e5f6g7h8", "text": "下一段话...", "status": "queued"} ], "total": 2 } }- 音频模式返回待处理音频列表:
{ "code": 200, "msg": "Successfully returned.", "data": { "items": [ {"item_id": "a1b2c3d4", "duration": 1.52, "status": "paused_current"}, {"item_id": "e5f6g7h8", "duration": 0.85, "status": "queued"} ], "total": 2 } } -
SIGNAL_CLEAR(清空)
- 清空所有待处理队列
- 中断当前正在处理的内容
- 重置暂停状态
- 停止推流输出
音频直驱模式(avatar_audio_driven)通过二进制帧发送音频数据:
- 普通音频:直接发送 WAV 字节流
- 插入音频:以
INSERT:为前缀发送 WAV 字节流(即INSERT:<wav_bytes>)
所有音频将先入队再由后台任务推入推流器,支持暂停/恢复/查询/清空。
docker build -t avatar-streaming .
docker run -d -p 9910:9910 \
-v /mnt/algo/source_image:/mnt/algo/source_image \
-v /mnt/algo/logs:/mnt/algo/logs \
avatar-streaming