-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulti_doa.py
More file actions
52 lines (44 loc) · 1.63 KB
/
Copy pathmulti_doa.py
File metadata and controls
52 lines (44 loc) · 1.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import usb.core
import usb.util
import time
import sys
from tuning import Tuning
def monitor_all_doa():
# 1. 自动寻找所有连接的 ReSpeaker Mic Array v2.0
# find_all=True 会返回所有符合条件的设备迭代器
devices = list(usb.core.find(idVendor=0x2886, idProduct=0x0018, find_all=True))
if not devices:
print("未找到 ReSpeaker 设备,请检查连接。")
return
# 2. 为所有找到的设备初始化 Tuning 实例
mic_list = []
print(f"检测到 {len(devices)} 台设备:")
for i, dev in enumerate(devices):
# 获取 USB 路径(总线号和地址)用于物理区分
dev_id = f"Bus {dev.bus} Addr {dev.address}"
print(f" [{i}] {dev_id}")
mic_list.append({
"id": i,
"path": dev_id,
"tuning": Tuning(dev)
})
print("\n开始监控 DOA (按 Ctrl+C 停止)...")
try:
while True:
display_parts = []
for mic in mic_list:
try:
# 获取该设备的实时角度
angle = mic["tuning"].direction
display_parts.append(f"Dev{mic['id']}({angle:3d}°)")
except Exception:
display_parts.append(f"Dev{mic['id']}(Lost)")
# 3. 动态拼接所有设备的状态并整行刷新显示
line = " | ".join(display_parts)
sys.stdout.write(f"\r{line}")
sys.stdout.flush()
time.sleep(0.1)
except KeyboardInterrupt:
print("\n\n已停止监控。")
if __name__ == "__main__":
monitor_all_doa()