Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 164 additions & 39 deletions backend/src/handlers/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,43 +221,87 @@ pub async fn add_marker(
///
/// Stop the current recording session and upload raw recording + markers to R2.
/// Also stops the stream from tee-ing audio chunks.
///
/// Recording is normally finalized automatically by the stream lifecycle (see
/// [`finalize_and_upload`]); this endpoint remains for explicit/manual stops.
pub async fn stop_recording(
State(state): State<Arc<AppState>>,
State(recording_manager): State<SharedRecordingManager>,
State(stream_state): State<SharedStreamState>,
State(_recording_manager): State<SharedRecordingManager>,
State(_stream_state): State<SharedStreamState>,
headers: HeaderMap,
) -> Result<impl IntoResponse> {
// Require admin authentication
let _user = require_admin(&state, &headers).await?;

// Stop the stream recording first (flushes and closes the file)
{
let mut stream = stream_state.lock().await;
match finalize_and_upload(&state).await? {
Some(result) => Ok((
StatusCode::OK,
Json(StopRecordingResponse {
success: true,
message: format!("Recording stopped and uploaded for show {}", result.show_id),
show_id: result.show_id,
version: result.version,
marker_count: result.marker_count,
raw_key: Some(result.raw_key),
markers_key: Some(result.markers_key),
}),
)),
None => Err(AppError::BadRequest(
"No recording session was active".to_string(),
)),
}
}

/// Outcome of finalizing a recording session.
pub struct FinalizedRecording {
pub show_id: i64,
pub version: String,
pub marker_count: usize,
pub raw_key: String,
pub markers_key: String,
/// True if a write failure occurred mid-recording, so the raw archive is
/// incomplete (the DB version is marked `failed`).
pub incomplete: bool,
}

/// Stop the active recording (if any), upload the raw recording + markers to R2,
/// and record a `recording_versions` row.
///
/// This is the single source of truth for ending a recording. It is invoked both
/// by the explicit stop endpoint and by the stream lifecycle (auto-record on
/// go-live, grace-period finalize on disconnect). Returns `Ok(None)` when no
/// session was active, so callers can treat a no-op as success.
pub async fn finalize_and_upload(state: &Arc<AppState>) -> Result<Option<FinalizedRecording>> {
// Stop the stream tee first (flushes + closes the file) and capture whether
// a write failure had already abandoned the recording.
let write_failure = {
let mut stream = state.stream_state.lock().await;
let failure = stream.recording_failure().map(|s| s.to_string());
if let Err(e) = stream.stop_recording().await {
tracing::warn!("Error stopping stream recording: {}", e);
// Continue anyway - we still want to process the session
// Continue anyway - we still want to process the session.
}
}
failure
};

// Stop the recording session
let mut manager = recording_manager.lock().await;
let session = manager
.stop()
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
// Stop the recording session.
let session = {
let mut manager = state.recording_manager.lock().await;
manager
.stop()
.await
.map_err(|e| AppError::Internal(e.to_string()))?
};

let session = match session {
Some(s) => s,
None => {
return Err(AppError::BadRequest(
"No recording session was active".to_string(),
));
}
None => return Ok(None),
};

let show_id = session.show_id;
let version = session.version_timestamp.clone();
let marker_count = session.markers.len();
let incomplete = write_failure.is_some();

// Upload raw recording to R2
let raw_key = format!("recordings/{}/{}/raw.webm", show_id, version);
Expand Down Expand Up @@ -300,7 +344,7 @@ pub async fn stop_recording(
tracing::info!("Uploaded markers to {}", markers_key);

// Create recording version entry in database
if let Err(e) = crate::db::create_recording_version(
match crate::db::create_recording_version(
&state.db,
show_id,
&version,
Expand All @@ -310,33 +354,114 @@ pub async fn stop_recording(
)
.await
{
tracing::error!("Failed to create recording version in database: {}", e);
// Don't fail the request - the recording was uploaded successfully
} else {
tracing::info!(
"Created recording version in database: show_id={}, version={}",
show_id,
version
);
Ok(recording) => {
tracing::info!(
"Created recording version in database: show_id={}, version={}",
show_id,
version
);
// Mark incomplete recordings as failed so the operator sees the gap.
if let Some(ref err) = write_failure {
let msg = format!("Recording write failed mid-stream: {}", err);
tracing::error!(
"Marking recording version {} as failed: {}",
recording.id,
msg
);
if let Err(e) = crate::db::update_recording_version_status(
&state.db,
recording.id,
"failed",
Some(&msg),
)
.await
{
tracing::error!("Failed to mark recording version as failed: {}", e);
}
}
}
Err(e) => {
tracing::error!("Failed to create recording version in database: {}", e);
// Don't fail the finalize - the recording was uploaded successfully.
}
}

// Clean up temp file
if let Err(e) = tokio::fs::remove_file(&session.temp_file_path).await {
tracing::warn!("Failed to clean up temp file: {}", e);
}

Ok((
StatusCode::OK,
Json(StopRecordingResponse {
success: true,
message: format!("Recording stopped and uploaded for show {}", show_id),
show_id,
version,
marker_count,
raw_key: Some(raw_key),
markers_key: Some(markers_key),
}),
))
Ok(Some(FinalizedRecording {
show_id,
version,
marker_count,
raw_key,
markers_key,
incomplete,
}))
}

/// Ensure a recording session is running for `show_id`, idempotently.
///
/// Called when a live stream connects (auto-record on go-live). Safe to call on
/// every (re)connect:
/// - If already recording this show, it's a no-op (a transient WS reconnect must
/// NOT restart the archive or truncate the tee file).
/// - If recording a *different* show, the existing session is left untouched and
/// a warning is logged (the caller decides whether to take over).
/// - Otherwise a new session is started and the stream tee is pointed at it.
pub async fn ensure_recording_started(state: &Arc<AppState>, show_id: i64) -> Result<()> {
// Validate the show exists before recording for it.
let show: Option<models::Show> = sqlx::query_as("SELECT * FROM shows WHERE id = ?")
.bind(show_id)
.fetch_optional(&state.db)
.await?;
if show.is_none() {
tracing::warn!(
"Auto-record skipped: show {} not found (stream will continue unrecorded)",
show_id
);
return Ok(());
}

let temp_path = {
let mut manager = state.recording_manager.lock().await;
match manager.current_show_id() {
Some(current) if current == show_id => {
// Already recording this show (reconnect) — keep the existing tee.
return Ok(());
}
Some(current) => {
tracing::warn!(
"Stream for show {} connected while recording show {}; leaving existing recording untouched",
show_id,
current
);
return Ok(());
}
None => {
manager
.start(show_id)
.await
.map_err(|e| AppError::Internal(e.to_string()))?;
manager.get_temp_file_path().ok_or_else(|| {
AppError::Internal(
"Recording session started but no temp file path available".to_string(),
)
})?
}
}
};

// Point the stream tee at the new recording file.
let mut stream = state.stream_state.lock().await;
stream
.start_recording(temp_path)
.await
.map_err(|e| AppError::Internal(format!("Failed to start stream recording: {}", e)))?;

tracing::info!("Auto-started recording for show {}", show_id);
Ok(())
}

/// GET /api/shows/:id/recordings
Expand Down
Loading
Loading