From 943d3e838d78974caae5d47d8164835394ae6bdd Mon Sep 17 00:00:00 2001 From: YJack0000 Date: Fri, 3 Jul 2026 02:45:51 +0800 Subject: [PATCH] [refactor] centralize mic ownership in a single MicCoordinator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The meeting, Settings mic test, and voice typing each kept their own AtomicBool with hand-written pairwise exclusion checks — and the matrix had holes: voice typing checked nothing, so push-to-talk during a live meeting opened a second CoreAudio input stream that could silently kill the meeting's capture. The two stop paths also disagreed: meetings used a per-session gate + bounded join while voice typing joined unbounded on a shared flag (hang risk + the exact thread-revival race the meeting code was built to prevent). New capture.rs owns the whole story: MicCoordinator (managed state) guarantees at most one live capture session, with priority-based preemption (Meeting > VoiceTyping > MicTest) and idempotent restarts; CaptureSession unifies per-session gates + bounded-join teardown; the duplicated metered-session/usage-billing plumbing is now one function. Fixes along the way: - voice typing during a meeting is now refused (surfaced in the overlay) instead of corrupting the meeting's capture - stop_voice_typing can no longer hang on a wedged device teardown - a failed mic-test start no longer leaks its flag (which permanently refused later tests) - meeting start now joins the preempted mic test before opening the device (was a brief double-stream window) Also dedupes the app-config file path/read/write helpers and clears all outstanding clippy warnings (type alias in diarize, redundant casts in permissions, manual split_once, redundant closure). --- src-tauri/src/capture.rs | 257 ++++++++++++++++++++++++ src-tauri/src/commands.rs | 367 +++++++++++++--------------------- src-tauri/src/diarize.rs | 10 +- src-tauri/src/lib.rs | 9 +- src-tauri/src/voice_typing.rs | 130 ++++-------- 5 files changed, 448 insertions(+), 325 deletions(-) create mode 100644 src-tauri/src/capture.rs diff --git a/src-tauri/src/capture.rs b/src-tauri/src/capture.rs new file mode 100644 index 0000000..870454f --- /dev/null +++ b/src-tauri/src/capture.rs @@ -0,0 +1,257 @@ +//! Shared microphone-capture plumbing for every audio consumer: the live +//! meeting, the Settings mic test, and voice typing. +//! +//! [`MicCoordinator`] is the single source of truth for "who owns the mic". +//! On macOS a second concurrent input stream can make CoreAudio renegotiate +//! the device and silently kill the first capture, so at most ONE pipeline may +//! record at a time. Every start command claims the mic here (a higher-priority +//! user preempts a lower one; a repeated start is an idempotent no-op) and +//! every stop releases it — replacing the pairwise flag checks that used to be +//! scattered across the start/stop commands. +//! +//! A claim owns a [`CaptureSession`]: a fresh per-session gate its capture +//! threads watch, plus their join handles. The gate is never shared across +//! sessions, so a detached/wedged thread from an old session can't be revived +//! by a later start flipping a shared flag back to true. Stop clears the gate +//! and joins the threads with a bounded grace so a stuck CoreAudio teardown +//! can't hang the stop command itself. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +use tauri::{AppHandle, Emitter}; +use tokio::sync::mpsc::UnboundedReceiver; + +use crate::audio::AudioSource; +use crate::transcription::{self, SttProvider, TranscribeConfig}; + +/// Grace given to capture threads to release their device on stop. Threads +/// self-exit within ~100 ms of the gate clearing; anything slower is treated +/// as wedged and detached (harmless — it holds this session's now-dead gate). +const STOP_GRACE: Duration = Duration::from_millis(1500); + +/// The pipelines that can own the microphone. Declaration order is priority +/// order (via `PartialOrd`): a later variant preempts an earlier one's capture, +/// an earlier variant's start yields to a later owner. +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub enum MicUser { + /// Settings mic-level preview. Yields to everything. + MicTest, + /// Push-to-talk dictation. Preempts the mic test; yields to a meeting. + VoiceTyping, + /// A live meeting. Preempts everything. + Meeting, +} + +/// One running capture: the per-session gate plus the capture threads +/// watching it. +struct CaptureSession { + gate: Arc, + threads: Vec>, +} + +impl CaptureSession { + /// Clear the gate and join every capture thread, bounded by [`STOP_GRACE`]. + /// Joining lets each thread release its device (dropping its PCM sender, + /// which ends the STT session via channel close). A thread that overstays + /// is detached rather than hanging the caller. + fn stop(&mut self) { + self.gate.store(false, Ordering::SeqCst); + let deadline = Instant::now() + STOP_GRACE; + for h in self.threads.drain(..) { + while !h.is_finished() && Instant::now() < deadline { + std::thread::sleep(Duration::from_millis(10)); + } + if h.is_finished() { + let _ = h.join(); + } else { + log::warn!("mic: capture thread didn't exit within grace; detaching"); + } + } + } +} + +struct Active { + user: MicUser, + session: CaptureSession, +} + +/// Managed-state arbiter guaranteeing at most one live capture session. +#[derive(Default)] +pub struct MicCoordinator(Mutex>); + +/// Outcome of [`MicCoordinator::begin`]. +pub enum Begin { + /// The mic is claimed; hand this gate to every capture thread of the new + /// session (via [`spawn_capture`]). + Started(Arc), + /// `user` already owns the mic — treat the start as an idempotent no-op. + AlreadyActive, + /// A higher-priority pipeline owns the mic; the start must not open a + /// competing stream. + Busy(MicUser), +} + +impl MicCoordinator { + /// Claim the mic for `user` and arm a fresh per-session gate. A + /// lower-priority owner is stopped first (gate cleared + threads joined) so + /// its device is released before the new session opens it. + pub fn begin(&self, user: MicUser) -> Begin { + let mut active = self.0.lock().unwrap(); + match active.as_mut() { + Some(a) if a.user == user => return Begin::AlreadyActive, + Some(a) if a.user > user => return Begin::Busy(a.user), + Some(a) => { + log::info!("mic: {:?} preempts {:?}", user, a.user); + a.session.stop(); + } + None => {} + } + let gate = Arc::new(AtomicBool::new(true)); + *active = Some(Active { + user, + session: CaptureSession { + gate: gate.clone(), + threads: Vec::new(), + }, + }); + Begin::Started(gate) + } + + /// Register a capture thread with `user`'s active session. If `user` lost + /// the mic between starting the thread and registering it, the thread is + /// detached — its gate is already cleared, so it exits on its own. + fn add_thread(&self, user: MicUser, handle: JoinHandle<()>) { + let mut active = self.0.lock().unwrap(); + match active.as_mut() { + Some(a) if a.user == user => a.session.threads.push(handle), + _ => log::warn!("mic: {user:?} lost the mic before its capture registered; detaching"), + } + } + + /// Stop `user`'s session (clear its gate, join its threads with a bounded + /// grace) and free the mic. No-op when `user` doesn't own the mic, so a + /// preempted session's late stop can't clobber the new owner. + pub fn stop(&self, user: MicUser) { + let mut active = self.0.lock().unwrap(); + if matches!(active.as_ref(), Some(a) if a.user == user) { + if let Some(mut a) = active.take() { + a.session.stop(); + } + } + } + + /// Who currently owns the mic (`None` when idle). + pub fn owner(&self) -> Option { + self.0.lock().unwrap().as_ref().map(|a| a.user) + } +} + +/// Start one capture backend on its own thread, registering the thread with +/// `user`'s active session so stop/preemption joins it. Returns the PCM +/// receiver, or the error message if the device failed to start. +pub fn spawn_capture( + coord: &MicCoordinator, + user: MicUser, + source: S, + gate: Arc, + label: &'static str, +) -> Result>, String> { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); + match source.start(tx, gate) { + Ok(handle) => { + coord.add_thread(user, handle); + Ok(rx) + } + Err(e) => { + log::error!("[{label}] capture failed to start: {e}"); + Err(e.to_string()) + } + } +} + +/// Accumulates the live meeting's recorded PCM (16 kHz mono i16) so it can be +/// encoded to Ogg/Opus on stop and saved into the local history. `None` between +/// meetings; armed (`Some(empty)`) by `start_meeting` and drained by +/// `stop_meeting`. +pub type RecorderBuf = Arc>>>; + +/// Run a transcription session over `rx`, counting the audio streamed so the +/// frontend can bill it. Emits a `usage://stt` event when the session ends. +/// When `recorder` is `Some`, every chunk is also appended to it so the meeting +/// can be saved to history (only the designated session passes a recorder). +/// `emit_error_event` controls whether a failed session raises `meeting://error` +/// (the meeting UI tears down on it); voice typing passes `false` and surfaces +/// errors through its own overlay instead. +pub fn run_metered_session( + app: &AppHandle, + provider: SttProvider, + config: TranscribeConfig, + label: &'static str, + rx: UnboundedReceiver>, + recorder: Option, + emit_error_event: bool, +) -> tauri::async_runtime::JoinHandle<()> { + let app = app.clone(); + tauri::async_runtime::spawn(async move { + // Interpose a sample counter between capture and the STT adapter: it + // forwards every chunk untouched, then yields the total once the input + // closes so we can bill the audio duration actually streamed. + let (count_tx, count_rx) = tokio::sync::mpsc::unbounded_channel::>(); + let counter = tauri::async_runtime::spawn(async move { + let mut rx = rx; + let mut samples: u64 = 0; + while let Some(chunk) = rx.recv().await { + samples += chunk.len() as u64; + // Tee into the recording buffer (kept while the meeting is armed). + if let Some(rec) = &recorder { + if let Some(buf) = rec.lock().unwrap().as_mut() { + buf.extend_from_slice(&chunk); + } + } + if count_tx.send(chunk).is_err() { + break; + } + } + samples + }); + + if let Err(e) = + transcription::run_session(provider, app.clone(), config, label, count_rx).await + { + let msg = e.to_string(); + log::warn!("[stt:{label}] session ended: {msg}"); + if emit_error_event { + // Surface the failure to the UI instead of silently leaving the + // meeting in "recording" with no transcript. Hosted mode hits + // this routinely (402 out of credits / 401 expired session at + // connect); BYOK hits it on a bad key. Classify so the frontend + // can show an actionable message. + let code = if msg.contains("402") { + "quota" + } else if msg.contains("401") { + "auth" + } else { + "connect" + }; + let _ = app.emit( + "meeting://error", + serde_json::json!({ "source": label, "code": code, "message": msg }), + ); + } + } + + let samples = counter.await.unwrap_or(0); + let seconds = samples as f64 / crate::audio::TARGET_SAMPLE_RATE as f64; + let _ = app.emit( + "usage://stt", + serde_json::json!({ + "provider": provider.id(), + "source": label, + "seconds": seconds, + }), + ); + }) +} diff --git a/src-tauri/src/commands.rs b/src-tauri/src/commands.rs index 740c820..9be2c95 100644 --- a/src-tauri/src/commands.rs +++ b/src-tauri/src/commands.rs @@ -1,69 +1,70 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::JoinHandle; +use std::sync::Mutex; use tauri::{AppHandle, Emitter, Manager, State}; use tokio::sync::mpsc::UnboundedReceiver; -use crate::audio::{microphone::Microphone, AudioSource}; +use crate::audio::microphone::Microphone; +use crate::capture::{ + run_metered_session, spawn_capture, Begin, MicCoordinator, MicUser, RecorderBuf, +}; use crate::transcription::common::{LevelMeter, LEVEL_EVENT}; -use crate::transcription::{self, SttProvider, TranscribeConfig}; +use crate::transcription::{SttProvider, TranscribeConfig}; + +/// Path to `name` inside the app config dir, where all of the app's small +/// config/state files (templates, session snapshot, command queue, voice +/// history) live. +pub(crate) fn app_config_file(app: &AppHandle, name: &str) -> Result { + let dir = app.path().app_config_dir().map_err(|e| e.to_string())?; + Ok(dir.join(name)) +} + +/// Read an app-config file as a string, treating a missing file as empty (the +/// convention for every optional JSON/JSONL state file). +pub(crate) fn read_config_file(app: &AppHandle, name: &str) -> Result { + match std::fs::read_to_string(app_config_file(app, name)?) { + Ok(s) => Ok(s), + Err(_) => Ok(String::new()), + } +} + +/// Write an app-config file, creating the config dir on first use. +pub(crate) fn write_config_file(app: &AppHandle, name: &str, contents: &str) -> Result<(), String> { + let path = app_config_file(app, name)?; + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; + } + std::fs::write(&path, contents).map_err(|e| e.to_string()) +} /// Path to the shared templates file (app config dir). The local MCP server /// reads/writes the same file so templates can be managed outside the app. pub(crate) fn templates_path(app: &AppHandle) -> Result { - let dir = app.path().app_config_dir().map_err(|e| e.to_string())?; - Ok(dir.join("templates.json")) + app_config_file(app, "templates.json") } /// Read the shared templates JSON (empty string if the file doesn't exist yet). #[tauri::command] pub fn read_templates(app: AppHandle) -> Result { - let path = templates_path(&app)?; - match std::fs::read_to_string(&path) { - Ok(s) => Ok(s), - Err(_) => Ok(String::new()), - } + read_config_file(&app, "templates.json") } /// Write the shared templates JSON, creating the config dir if needed. #[tauri::command] pub fn write_templates(app: AppHandle, json: String) -> Result<(), String> { - let path = templates_path(&app)?; - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; - } - std::fs::write(&path, json).map_err(|e| e.to_string()) + write_config_file(&app, "templates.json", &json) } -/// Accumulates the live meeting's recorded PCM (16 kHz mono i16) so it can be -/// encoded to Ogg/Opus on stop and saved into the local history. `None` between -/// meetings; armed (`Some(empty)`) by `start_meeting` and drained by `stop_meeting`. -pub(crate) type RecorderBuf = Arc>>>; - -/// Shared meeting state held in Tauri's managed state. `running` is the global -/// "a meeting is active" status (queried by the mic-test + UI). Capture threads, -/// however, watch a PER-SESSION `capture_gate` (not `running`) so a thread from a -/// previous meeting can never be revived by the next `start_meeting` flipping a -/// shared flag back to true — each meeting gets a fresh gate, and stop clears it. +/// Meeting-specific state held in Tauri's managed state. Who owns the mic (and +/// the capture threads/gate of the live session) lives in [`MicCoordinator`]; +/// only what a meeting needs beyond its capture stays here. #[derive(Default)] pub struct MeetingState { - running: Arc, - /// Per-meeting capture flag handed to every capture thread of the CURRENT - /// session. `start_meeting` installs a fresh one; `stop_meeting` clears it to - /// tell exactly this session's threads to exit. A detached/wedged thread from - /// an old session holds its own (already-false) gate, so a later meeting can't - /// adopt or revive it. `None` between meetings. - capture_gate: Mutex>>, - threads: Mutex>>, /// Live transcription session tasks (the per-source Soniox/etc. WebSocket /// loops). Held so `stop_meeting` can `abort()` them — a direct cancel that /// closes the socket even if the capture→channel-close cascade stalls, instead /// of letting the session linger and keep emitting transcript after stop. tasks: Mutex>>, - /// Separate flag for the Settings "test mic" preview (no Soniox). - test_running: Arc, /// Buffers the recorded audio of the in-progress live meeting (see RecorderBuf). recorder: RecorderBuf, } @@ -80,26 +81,29 @@ pub fn list_input_devices() -> Vec { #[tauri::command] pub fn start_mic_test( app: AppHandle, - state: State, + coord: State, input_device: Option, ) -> Result<(), String> { - // The meeting owns the mic until it ends — never open a competing input stream - // while recording. On macOS a second stream can make CoreAudio renegotiate the - // device and silently kill the meeting's capture (transcription stops). The UI - // also disables the test while recording; this is the reliable backstop. - if state.running.load(Ordering::SeqCst) { - return Ok(()); - } - if state.test_running.swap(true, Ordering::SeqCst) { - return Ok(()); - } - let running = state.test_running.clone(); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::>(); - Microphone { + // A meeting (or dictation) owns the mic until it ends — never open a + // competing input stream while recording. On macOS a second stream can make + // CoreAudio renegotiate the device and silently kill the live capture + // (transcription stops). The UI also disables the test while recording; the + // coordinator is the reliable backstop. + let gate = match coord.begin(MicUser::MicTest) { + Begin::Started(gate) => gate, + Begin::AlreadyActive | Begin::Busy(_) => return Ok(()), + }; + let mic = Microphone { device_name: input_device, - } - .start(tx, running) - .map_err(|e| e.to_string())?; + }; + let mut rx = match spawn_capture(&coord, MicUser::MicTest, mic, gate, "test") { + Ok(rx) => rx, + Err(e) => { + // Free the claim so the next test isn't refused forever. + coord.stop(MicUser::MicTest); + return Err(e); + } + }; tauri::async_runtime::spawn(async move { // Reuse the shared metering primitive so peak/window logic lives in one place. @@ -117,16 +121,16 @@ pub fn start_mic_test( } #[tauri::command] -pub fn stop_mic_test(state: State) { - state.test_running.store(false, Ordering::SeqCst); +pub fn stop_mic_test(coord: State) { + coord.stop(MicUser::MicTest); } /// Whether a live meeting is currently recording. The Settings UI uses this to /// disable the mic test + device picker while recording (switching the input /// mid-meeting can disrupt the meeting's capture). #[tauri::command] -pub fn meeting_active(state: State) -> bool { - state.running.load(Ordering::SeqCst) +pub fn meeting_active(coord: State) -> bool { + coord.owner() == Some(MicUser::Meeting) } #[tauri::command] @@ -134,6 +138,7 @@ pub fn meeting_active(state: State) -> bool { pub fn start_meeting( app: AppHandle, state: State, + coord: State, provider: String, api_key: String, model: Option, @@ -150,20 +155,18 @@ pub fn start_meeting( return Err("missing transcription API key".into()); } let relay_endpoint = relay_url.filter(|u| !u.trim().is_empty()); - // Ignore if already running. - if state.running.swap(true, Ordering::SeqCst) { - return Ok(()); - } - // Tear down any Settings "test mic" stream so it can't contend with the - // meeting's capture (its thread exits when this flag clears). - state.test_running.store(false, Ordering::SeqCst); - // Per-session capture gate: a FRESH flag owned by THIS meeting's capture threads - // (not the global `state.running`). Installed in state so stop_meeting can clear - // exactly this session's threads; a later start_meeting installs a new one, so a - // detached/wedged thread from a prior session can never be revived. The var is - // named `running` so the capture call sites below read naturally. - let running = Arc::new(AtomicBool::new(true)); - *state.capture_gate.lock().unwrap() = Some(running.clone()); + // Claim the mic. A meeting outranks the Settings mic test and voice typing, + // so the coordinator stops either one first (device released before our + // capture opens); a second start while a meeting runs is an idempotent no-op. + // The gate is per-session: stop_meeting clears exactly this session's + // threads, and a detached/wedged thread from a prior session (holding its + // own dead gate) can never be revived by a later start. + let gate = match coord.begin(MicUser::Meeting) { + Begin::Started(gate) => gate, + Begin::AlreadyActive => return Ok(()), + // Unreachable today (Meeting outranks everything), kept for safety. + Begin::Busy(owner) => return Err(format!("microphone is in use by {owner:?}")), + }; // Arm a fresh recording buffer for this meeting (the designated session below // tees its PCM into it; stop_meeting encodes + clears it). *state.recorder.lock().unwrap() = Some(Vec::new()); @@ -196,9 +199,10 @@ pub fn start_meeting( if diarization { // Tap the PRE-MIX mic for delivery coaching (issue #22): the prosody // analyzer must see raw mic, never the mixed/diarized stream. - let rx_me = spawn_capture(&state.threads, mic, running.clone(), "me") + let rx_me = spawn_capture(&coord, MicUser::Meeting, mic, gate.clone(), "me") + .ok() .map(|rx| spawn_mic_prosody_tap(&app, rx)); - let rx_them = spawn_capture(&state.threads, sys, running.clone(), "them"); + let rx_them = spawn_capture(&coord, MicUser::Meeting, sys, gate.clone(), "them").ok(); let mut tasks = state.tasks.lock().unwrap(); log::info!( "meeting: capture started (mic: {}, system: {})", @@ -222,6 +226,7 @@ pub fn start_meeting( "mix", rx_mix, Some(recorder), + true, )); } // If one capture failed, transcribe + record whichever started. @@ -233,6 +238,7 @@ pub fn start_meeting( "me", a, Some(recorder), + true, )); } (None, Some(b)) => { @@ -243,11 +249,16 @@ pub fn start_meeting( "them", b, Some(recorder), + true, )); } (None, None) => { // No capture at all — without this the meeting would sit in - // "recording" forever with an empty transcript. + // "recording" forever with an empty transcript. Release the + // mic claim too: nothing is capturing, so voice typing / the + // mic test shouldn't stay locked out until the frontend + // reacts to the error (its stop_meeting is a no-op by then). + coord.stop(MicUser::Meeting); log::error!("meeting: no audio source could be started"); let _ = app.emit( "meeting://error", @@ -262,15 +273,29 @@ pub fn start_meeting( } else { // No diarization → two sessions. Record the mic only (mixing two // un-aligned streams into one file would garble it); see plan note. - if let Some(rx) = spawn_capture(&state.threads, mic, running.clone(), "me") { + let mut tasks = state.tasks.lock().unwrap(); + if let Ok(rx) = spawn_capture(&coord, MicUser::Meeting, mic, gate.clone(), "me") { let rx = spawn_mic_prosody_tap(&app, rx); - let task = - run_metered_session(&app, provider, make_config(), "me", rx, Some(recorder)); - state.tasks.lock().unwrap().push(task); + tasks.push(run_metered_session( + &app, + provider, + make_config(), + "me", + rx, + Some(recorder), + true, + )); } - if let Some(rx) = spawn_capture(&state.threads, sys, running.clone(), "them") { - let task = run_metered_session(&app, provider, make_config(), "them", rx, None); - state.tasks.lock().unwrap().push(task); + if let Ok(rx) = spawn_capture(&coord, MicUser::Meeting, sys, gate.clone(), "them") { + tasks.push(run_metered_session( + &app, + provider, + make_config(), + "them", + rx, + None, + true, + )); } } } @@ -280,9 +305,17 @@ pub fn start_meeting( let mic = Microphone { device_name: input_device, }; - if let Some(rx) = spawn_capture(&state.threads, mic, running.clone(), "me") { + if let Ok(rx) = spawn_capture(&coord, MicUser::Meeting, mic, gate.clone(), "me") { let rx = spawn_mic_prosody_tap(&app, rx); - let task = run_metered_session(&app, provider, make_config(), "me", rx, Some(recorder)); + let task = run_metered_session( + &app, + provider, + make_config(), + "me", + rx, + Some(recorder), + true, + ); state.tasks.lock().unwrap().push(task); } } @@ -292,22 +325,19 @@ pub fn start_meeting( } #[tauri::command] -pub fn stop_meeting(app: AppHandle, state: State) -> Result<(), String> { - state.running.store(false, Ordering::SeqCst); - // Tell THIS session's capture threads to exit. They watch the per-session gate, - // not the global `running`; `take()` so the next meeting installs a fresh one and - // can never reuse/revive this session's gate. - if let Some(gate) = state.capture_gate.lock().unwrap().take() { - gate.store(false, Ordering::SeqCst); - } - - // Direct-cancel safety net for the transcription sessions. Clearing the gate - // above starts the graceful capture→channel-close cascade, which closes the - // socket and emits final usage. The abort is ONLY a backstop for a socket that - // never closes (a stalled provider/network): wait a generous grace so even a slow - // finalize completes first — aborting an already-finished task is a no-op, and - // after stop no new audio flows, so the extra idle wait produces no new transcript. - // (A short grace would cut a slow finalize and lose the last segment + usage event.) +pub fn stop_meeting( + app: AppHandle, + state: State, + coord: State, +) -> Result<(), String> { + // Direct-cancel safety net for the transcription sessions. Stopping the + // capture below starts the graceful capture→channel-close cascade, which + // closes the socket and emits final usage. The abort is ONLY a backstop for a + // socket that never closes (a stalled provider/network): wait a generous grace + // so even a slow finalize completes first — aborting an already-finished task + // is a no-op, and after stop no new audio flows, so the extra idle wait + // produces no new transcript. (A short grace would cut a slow finalize and + // lose the last segment + usage event.) let tasks: Vec> = state.tasks.lock().unwrap().drain(..).collect(); if !tasks.is_empty() { @@ -319,25 +349,10 @@ pub fn stop_meeting(app: AppHandle, state: State) -> Result<(), St }); } - // Joining lets each capture thread release its device (drops its PCM sender, - // which in turn ends the Soniox session via channel close). Capture threads - // self-exit within ~100 ms of the gate clearing, but bound the wait so a wedged - // thread (e.g. a stuck CoreAudio teardown) can't hang the stop command itself; - // the session WebSocket is force-closed separately by the watchdog above. A - // detached wedged thread is harmless: it holds a now-dead per-session gate, so a - // later meeting (fresh gate) never adopts or revives it. - let handles: Vec> = state.threads.lock().unwrap().drain(..).collect(); - let deadline = std::time::Instant::now() + std::time::Duration::from_millis(1500); - for h in handles { - while !h.is_finished() && std::time::Instant::now() < deadline { - std::thread::sleep(std::time::Duration::from_millis(10)); - } - if h.is_finished() { - let _ = h.join(); - } else { - log::warn!("stop_meeting: capture thread didn't exit within grace; detaching"); - } - } + // Clear this session's gate and join its capture threads (bounded grace) so + // each releases its device — dropping its PCM sender, which ends the STT + // session via channel close. See MicCoordinator for the gate/detach story. + coord.stop(MicUser::Meeting); let _ = app.emit("meeting://status", "stopped"); // Encode the captured audio off-thread and tell the frontend where it landed @@ -486,7 +501,7 @@ pub fn start_oauth_loopback(app: AppHandle) -> Result { .next() .and_then(|l| l.split_whitespace().nth(1)) .unwrap_or(""); - let query = path.splitn(2, '?').nth(1).unwrap_or(""); + let query = path.split_once('?').map_or("", |(_, q)| q); let (mut token, mut error) = (None::, None::); for pair in query.split('&') { let mut kv = pair.splitn(2, '='); @@ -529,27 +544,6 @@ pub fn start_oauth_loopback(app: AppHandle) -> Result { Ok(port) } -/// Start one capture backend on its own thread, returning the PCM receiver. -/// Returns `None` if the device failed to start. -fn spawn_capture( - threads: &Mutex>>, - source: S, - running: Arc, - label: &'static str, -) -> Option>> { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); - match source.start(tx, running) { - Ok(handle) => { - threads.lock().unwrap().push(handle); - Some(rx) - } - Err(e) => { - log::error!("[{label}] capture failed to start: {e}"); - None - } - } -} - /// Tee the mic ("me") PCM through a [`ProsodyAnalyzer`](crate::audio::prosody::ProsodyAnalyzer) /// for live delivery coaching, forwarding every chunk untouched downstream. /// Mirrors the sample `counter` in [`run_metered_session`]: one extra `Vec` move @@ -576,78 +570,6 @@ fn spawn_mic_prosody_tap( out_rx } -/// Run a transcription session over `rx`, counting the audio streamed so the -/// frontend can bill it. Emits a `usage://stt` event when the session ends. -/// When `recorder` is `Some`, every chunk is also appended to it so the meeting -/// can be saved to history (only the designated session passes a recorder). -fn run_metered_session( - app: &AppHandle, - provider: SttProvider, - config: TranscribeConfig, - label: &'static str, - rx: UnboundedReceiver>, - recorder: Option, -) -> tauri::async_runtime::JoinHandle<()> { - let app = app.clone(); - tauri::async_runtime::spawn(async move { - // Interpose a sample counter between capture and the STT adapter: it - // forwards every chunk untouched, then yields the total once the input - // closes so we can bill the audio duration actually streamed. - let (count_tx, count_rx) = tokio::sync::mpsc::unbounded_channel::>(); - let counter = tauri::async_runtime::spawn(async move { - let mut rx = rx; - let mut samples: u64 = 0; - while let Some(chunk) = rx.recv().await { - samples += chunk.len() as u64; - // Tee into the recording buffer (kept while the meeting is armed). - if let Some(rec) = &recorder { - if let Some(buf) = rec.lock().unwrap().as_mut() { - buf.extend_from_slice(&chunk); - } - } - if count_tx.send(chunk).is_err() { - break; - } - } - samples - }); - - if let Err(e) = - transcription::run_session(provider, app.clone(), config, label, count_rx).await - { - let msg = e.to_string(); - log::warn!("[stt:{label}] session ended: {e}"); - // Surface the failure to the UI instead of silently leaving the - // meeting in "recording" with no transcript. Hosted mode hits this - // routinely (402 out of credits / 401 expired session at connect); - // BYOK hits it on a bad key. Classify so the frontend can show an - // actionable message. - let code = if msg.contains("402") { - "quota" - } else if msg.contains("401") { - "auth" - } else { - "connect" - }; - let _ = app.emit( - "meeting://error", - serde_json::json!({ "source": label, "code": code, "message": msg }), - ); - } - - let samples = counter.await.unwrap_or(0); - let seconds = samples as f64 / crate::audio::TARGET_SAMPLE_RATE as f64; - let _ = app.emit( - "usage://stt", - serde_json::json!({ - "provider": provider.id(), - "source": label, - "seconds": seconds, - }), - ); - }) -} - /// Get the absolute path to the templates.json file. #[tauri::command] pub fn get_templates_path(app: AppHandle) -> Result { @@ -690,36 +612,25 @@ pub fn read_log_tail(app: AppHandle, max_bytes: u64) -> Result { /// Path to the live session snapshot the built-in MCP server reads. pub(crate) fn session_path(app: &AppHandle) -> Result { - let dir = app.path().app_config_dir().map_err(|e| e.to_string())?; - Ok(dir.join("session.json")) + app_config_file(app, "session.json") } /// Write the live session snapshot (meeting status, transcript, todos, /// evaluations) so MCP clients can read the current meeting state. #[tauri::command] pub fn write_session(app: AppHandle, json: String) -> Result<(), String> { - let path = session_path(&app)?; - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; - } - std::fs::write(&path, json).map_err(|e| e.to_string())?; - Ok(()) + write_config_file(&app, "session.json", &json) } /// Append-only queue of mutation commands the MCP server writes and the /// frontend applies (add/check/remove todo, add/remove evaluation). pub(crate) fn session_commands_path(app: &AppHandle) -> Result { - let dir = app.path().app_config_dir().map_err(|e| e.to_string())?; - Ok(dir.join("session_commands.jsonl")) + app_config_file(app, "session_commands.jsonl") } /// Read the pending session-command queue (one JSON object per line). The /// frontend polls this and applies commands it hasn't seen yet. #[tauri::command] pub fn read_session_commands(app: AppHandle) -> Result { - let path = session_commands_path(&app)?; - match std::fs::read_to_string(&path) { - Ok(s) => Ok(s), - Err(_) => Ok(String::new()), - } + read_config_file(&app, "session_commands.jsonl") } diff --git a/src-tauri/src/diarize.rs b/src-tauri/src/diarize.rs index ee0f47b..3a26797 100644 --- a/src-tauri/src/diarize.rs +++ b/src-tauri/src/diarize.rs @@ -244,6 +244,10 @@ fn diarize_cache_path(app: &AppHandle, key: &str) -> Option { ) } +/// One embedded slice: the segment's index paired with its speaker-embedding +/// vector (workers return these out of order; the index restores placement). +type IndexedEmbedding = (usize, Vec); + /// Decode → embed every long-enough slice → cluster → assign one speaker per /// segment. Synchronous; intended for `spawn_blocking`. fn run_pipeline( @@ -297,14 +301,14 @@ fn run_pipeline( let processed = AtomicUsize::new(0); // completed count, for progress only let mut embeds: Vec>> = vec![None; n]; - let worker_results: Vec)>>> = std::thread::scope(|scope| { + let worker_results: Vec>> = std::thread::scope(|scope| { let handles: Vec<_> = (0..workers) .map(|_| { let (spans, audio, model, next, processed, app) = (&spans, &audio, &model, &next, &processed, &app); - scope.spawn(move || -> Result)>> { + scope.spawn(move || -> Result> { let mut session = build_session(model)?; - let mut out: Vec<(usize, Vec)> = Vec::new(); + let mut out: Vec = Vec::new(); loop { // Claim the next segment; stop when the queue is drained. let i = next.fetch_add(1, Ordering::Relaxed); diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 8d3c2bb..f2a2c72 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -1,4 +1,5 @@ mod audio; +mod capture; mod commands; mod diarize; mod history; @@ -16,8 +17,8 @@ use tauri::{Emitter, Manager}; use tauri_plugin_global_shortcut::{Code, Modifiers, Shortcut, ShortcutState}; use tauri_plugin_log::{RotationStrategy, Target, TargetKind, TimezoneStrategy}; +use capture::MicCoordinator; use commands::MeetingState; -use voice_typing::VoiceTypingState; #[cfg_attr(mobile, tauri::mobile_entry_point)] pub fn run() { @@ -67,10 +68,12 @@ pub fn run() { .plugin(tauri_plugin_dialog::init()) .plugin(tauri_plugin_process::init()) .plugin(tauri_plugin_updater::Builder::new().build()) + // Single source of truth for "who owns the mic" (meeting / mic test / + // voice typing) — guarantees at most one live capture session. + .manage(MicCoordinator::default()) .manage(MeetingState::default()) - .manage(VoiceTypingState::default()) // Native menu-bar "Diagnostics" submenu (View Logs + Clear Cache). - .menu(|handle| menu::build(handle)) + .menu(menu::build) .on_menu_event(|app, event| menu::on_event(app, event.id().as_ref())) .setup(|app| { app.manage(mcp::start(app.handle().clone())); diff --git a/src-tauri/src/voice_typing.rs b/src-tauri/src/voice_typing.rs index b010418..6fbf609 100644 --- a/src-tauri/src/voice_typing.rs +++ b/src-tauri/src/voice_typing.rs @@ -12,28 +12,25 @@ // The `objc` 0.2 macros emit `cfg(cargo-clippy)` checks newer compilers warn on. #![allow(unexpected_cfgs)] -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::JoinHandle; - -use tauri::{AppHandle, Emitter, Manager, State}; - -use crate::audio::{microphone::Microphone, AudioSource}; -use crate::transcription::{self, SttProvider, TranscribeConfig}; - -/// Shared state for the single in-flight voice-typing session. `running` gates -/// the mic capture thread; clearing it ends capture, which closes the STT WS. -#[derive(Default)] -pub struct VoiceTypingState { - running: Arc, - threads: Mutex>>, -} - -/// Start a mic-only streaming transcription. Idempotent while already running. +use tauri::{AppHandle, State}; + +use crate::audio::microphone::Microphone; +use crate::capture::{run_metered_session, spawn_capture, Begin, MicCoordinator, MicUser}; +use crate::commands::{read_config_file, write_config_file}; +use crate::transcription::{SttProvider, TranscribeConfig}; + +/// Start a mic-only streaming transcription. Idempotent while already running; +/// refused while a meeting owns the mic (a second input stream could kill the +/// meeting's capture — see [`MicCoordinator`]). +/// +/// Streams mic -> provider, emitting `transcript://segment` + `audio://level` +/// with source "voice-typing"; the overlay window listens for both. The shared +/// metered session bills the streamed audio under the distinct +/// `source: "voice-typing"` label (kept separate from meeting usage). #[tauri::command] pub fn start_voice_typing( app: AppHandle, - state: State, + coord: State, provider: String, api_key: String, model: Option, @@ -44,10 +41,11 @@ pub fn start_voice_typing( if api_key.trim().is_empty() { return Err("missing transcription API key".into()); } - if state.running.swap(true, Ordering::SeqCst) { - return Ok(()); - } - let running = state.running.clone(); + let gate = match coord.begin(MicUser::VoiceTyping) { + Begin::Started(gate) => gate, + Begin::AlreadyActive => return Ok(()), + Begin::Busy(_) => return Err("microphone is in use by the meeting".into()), + }; let model = model .filter(|m| !m.trim().is_empty()) .unwrap_or_else(|| provider.default_model().to_string()); @@ -60,69 +58,29 @@ pub fn start_voice_typing( relay_endpoint: None, }; - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); - match (Microphone { + let mic = Microphone { device_name: input_device, - }) - .start(tx, running.clone()) - { - Ok(handle) => state.threads.lock().unwrap().push(handle), + }; + let rx = match spawn_capture(&coord, MicUser::VoiceTyping, mic, gate, "voice-typing") { + Ok(rx) => rx, Err(e) => { - state.running.store(false, Ordering::SeqCst); + coord.stop(MicUser::VoiceTyping); return Err(format!("microphone failed to start: {e}")); } - } - - // Stream mic -> provider. Emits `transcript://segment` + `audio://level` - // with source "voice-typing"; the overlay window listens for both. A sample - // counter is interposed so we can bill the streamed audio under a distinct - // `source: "voice-typing"` label (kept separate from meeting usage). - tauri::async_runtime::spawn(async move { - let (count_tx, count_rx) = tokio::sync::mpsc::unbounded_channel::>(); - let counter = tauri::async_runtime::spawn(async move { - let mut rx = rx; - let mut samples: u64 = 0; - while let Some(chunk) = rx.recv().await { - samples += chunk.len() as u64; - if count_tx.send(chunk).is_err() { - break; - } - } - samples - }); - - if let Err(e) = - transcription::run_session(provider, app.clone(), config, "voice-typing", count_rx) - .await - { - log::warn!("voice-typing: session ended: {e}"); - } - - let samples = counter.await.unwrap_or(0); - let seconds = samples as f64 / crate::audio::TARGET_SAMPLE_RATE as f64; - let _ = app.emit( - "usage://stt", - serde_json::json!({ - "provider": provider.id(), - "source": "voice-typing", - "seconds": seconds, - }), - ); - }); + }; + run_metered_session(&app, provider, config, "voice-typing", rx, None, false); Ok(()) } -/// Path to the voice-typing history file (one JSON object per line). -fn voice_history_path(app: &AppHandle) -> Result { - let dir = app.path().app_config_dir().map_err(|e| e.to_string())?; - Ok(dir.join("voice_typing_history.jsonl")) -} +/// Name of the voice-typing history file (one JSON object per line) in the app +/// config dir. +const HISTORY_FILE: &str = "voice_typing_history.jsonl"; /// Append one history entry (a JSON line: `{ id, text, ts }`). #[tauri::command] pub fn append_voice_history(app: AppHandle, line: String) -> Result<(), String> { use std::io::Write; - let path = voice_history_path(&app)?; + let path = crate::commands::app_config_file(&app, HISTORY_FILE)?; if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; } @@ -138,31 +96,21 @@ pub fn append_voice_history(app: AppHandle, line: String) -> Result<(), String> /// the JSONL and owns filtering for delete/clear (which write the file back). #[tauri::command] pub fn read_voice_history(app: AppHandle) -> Result { - match std::fs::read_to_string(voice_history_path(&app)?) { - Ok(s) => Ok(s), - Err(_) => Ok(String::new()), - } + read_config_file(&app, HISTORY_FILE) } /// Overwrite the history file (used by delete-one / clear-all). #[tauri::command] pub fn write_voice_history(app: AppHandle, content: String) -> Result<(), String> { - let path = voice_history_path(&app)?; - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).map_err(|e| e.to_string())?; - } - std::fs::write(&path, content).map_err(|e| e.to_string()) + write_config_file(&app, HISTORY_FILE, &content) } -/// Stop the session: clear `running` and join the mic thread (which drops its -/// PCM sender, closing the STT session cleanly). +/// Stop the session: clear its gate and join the mic thread with a bounded +/// grace (which drops its PCM sender, closing the STT session cleanly). No-op +/// if voice typing doesn't own the mic. #[tauri::command] -pub fn stop_voice_typing(state: State) { - state.running.store(false, Ordering::SeqCst); - let handles: Vec> = state.threads.lock().unwrap().drain(..).collect(); - for h in handles { - let _ = h.join(); - } +pub fn stop_voice_typing(coord: State) { + coord.stop(MicUser::VoiceTyping); } /// Copy text to the system clipboard via the native pasteboard. Needed because