diff --git a/src/batch_queue/mod.rs b/src/batch_queue/mod.rs index 2cd5942..3f60527 100644 --- a/src/batch_queue/mod.rs +++ b/src/batch_queue/mod.rs @@ -727,6 +727,7 @@ impl BatchQueue { &self, pool: &sqlx::PgPool, replay_storage: Option<&crate::replay_storage::ReplayStorage>, + replay_coalescer: Option<&crate::replay_coalescer::ReplayCoalescer>, ) { match self.backup_store.cleanup_stale_requests().await { Ok(count) if count > 0 => { @@ -765,8 +766,14 @@ impl BatchQueue { } for (id, request) in requests { - let result = - super::handler::process_failed_request(self, pool, replay_storage, &request).await; + let result = super::handler::process_failed_request( + self, + pool, + replay_storage, + replay_coalescer, + &request, + ) + .await; match result { Ok(()) => { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 344b363..d5467ed 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -145,6 +145,8 @@ pub struct IpRule { #[derive(Clone)] pub struct ProjectContext { pub project_id: Uuid, + pub replay_storage_generation: i32, + pub replay_storage_active: bool, /// The user ID to bill — either the owner_id directly (if it's a user) /// or the org owner's user_id (if owner_id is an organization). pub billing_customer_id: String, @@ -167,6 +169,7 @@ pub async fn load_project_context( let rows = sqlx::query( r#" SELECT p.id, p.owner_id, p.allowed_hostnames, p.error_tracking_enabled, p.cookieless_mode, + p.replay_storage_generation, p.replay_storage_state::text AS replay_storage_state, o.id AS organization_id, m.user_id AS org_owner_user_id, d.reference_id, d.name, d.data_type::text, d.regex, d.allow_negative, @@ -228,6 +231,8 @@ pub async fn load_project_context( let ctx = Arc::new(ProjectContext { project_id: first.get("id"), + replay_storage_generation: first.get("replay_storage_generation"), + replay_storage_active: first.get::("replay_storage_state") == "active", billing_customer_id, organization_id, allowed_hostnames: first @@ -565,6 +570,7 @@ pub async fn process_failed_request( batch_queue: &BatchQueue, pool: &sqlx::PgPool, replay_storage: Option<&crate::replay_storage::ReplayStorage>, + replay_coalescer: Option<&crate::replay_coalescer::ReplayCoalescer>, request: &FailedRequest, ) -> Result<(), String> { match request.request_type { @@ -574,7 +580,7 @@ pub async fn process_failed_request( process_vitals_request(batch_queue, pool, replay_storage, request).await } RequestType::Replay => { - process_replay_request(batch_queue, pool, replay_storage, request).await + process_replay_request(batch_queue, pool, replay_coalescer, request).await } } } @@ -784,13 +790,15 @@ async fn process_web_request( }) }); - if let Some(session_id) = parsed.session_id.as_deref() + if ctx.replay_storage_active + && let Some(session_id) = parsed.session_id.as_deref() && let Some(replay_storage) = replay_storage && let Err(error) = replay_storage .record_filter_event( pool, crate::replay_storage::ReplayFilterEventInput { project_id: ctx.project_id, + storage_generation: ctx.replay_storage_generation, session_id, window_id: parsed.window_id.as_deref().unwrap_or(session_id), identifier: Some(fallback_identity.as_str()), @@ -948,7 +956,7 @@ async fn process_vitals_request( async fn process_replay_request( batch_queue: &BatchQueue, pool: &sqlx::PgPool, - replay_storage: Option<&crate::replay_storage::ReplayStorage>, + replay_coalescer: Option<&crate::replay_coalescer::ReplayCoalescer>, request: &FailedRequest, ) -> Result<(), String> { use crate::handler::replay::ReplayRequest; @@ -962,6 +970,7 @@ async fn process_replay_request( view_id, session_start, is_final, + flush_reason, batch_id, sequence, url, @@ -975,8 +984,11 @@ async fn process_replay_request( .await .map_err(|_| "Unauthorized or database error")?; - let replay_storage = - replay_storage.ok_or_else(|| "Replay storage is not configured".to_string())?; + let replay_coalescer = + replay_coalescer.ok_or_else(|| "Replay storage is not configured".to_string())?; + if !ctx.replay_storage_active { + return Err("Replay storage is resetting".to_string()); + } let server_id = match ctx.cookieless_mode { Some(true) => { let ip = request.client_ip.as_deref().unwrap_or(""); @@ -1008,23 +1020,26 @@ async fn process_replay_request( return Err("No valid events".to_string()); } - replay_storage - .store_replay_chunk( - pool, - crate::replay_storage::ReplayChunkInput { - project_id: ctx.project_id, - session_id: session_id.clone(), - window_id, - view_id, - session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()), - is_final, - batch_id, - sequence, - identifier: Some(server_id.to_string()), - url: Some(url), - events, - }, - ) + replay_coalescer + .ingest(crate::replay_storage::ReplayChunkInput { + project_id: ctx.project_id, + storage_generation: ctx.replay_storage_generation, + session_id: session_id.clone(), + window_id, + view_id, + session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()), + is_final, + flush_reason, + batch_id, + sequence, + first_sequence: None, + last_sequence: None, + client_batch_count: 1, + approx_events_bytes: request.body.len(), + identifier: Some(server_id.to_string()), + url: Some(url), + events, + }) .await .map_err(|error| error.to_string())?; diff --git a/src/handler/replay.rs b/src/handler/replay.rs index fc1f652..3d5446b 100644 --- a/src/handler/replay.rs +++ b/src/handler/replay.rs @@ -70,6 +70,8 @@ pub(crate) struct ReplayRequest { #[serde(default)] pub(crate) is_final: bool, #[serde(default)] + pub(crate) flush_reason: Option, + #[serde(default)] pub(crate) batch_id: Option, pub(crate) sequence: u64, pub(crate) url: String, @@ -88,6 +90,7 @@ pub async fn replay( Ok(b) => b, Err(e) => return error_response(StatusCode::BAD_REQUEST, &e), }; + let replay_payload_bytes = body.len(); let parsed: ReplayRequest = match serde_json::from_slice(&body) { Ok(p) => p, @@ -107,6 +110,7 @@ pub async fn replay( view_id, session_start, is_final, + flush_reason, batch_id, sequence, url, @@ -188,7 +192,14 @@ pub async fn replay( return error_response(StatusCode::BAD_REQUEST, "No valid events"); } - let Some(replay_storage) = state.replay_storage.as_deref() else { + if !context.replay_storage_active { + return error_response( + StatusCode::SERVICE_UNAVAILABLE, + "Replay storage is resetting", + ); + } + + let Some(replay_coalescer) = state.replay_coalescer.as_deref() else { return error_response( StatusCode::SERVICE_UNAVAILABLE, "Replay storage is not configured", @@ -201,23 +212,26 @@ pub async fn replay( organization_id: context.organization_id.as_deref().map(Into::into), }; - match replay_storage - .store_replay_chunk( - &state.pool, - crate::replay_storage::ReplayChunkInput { - project_id: context.project_id, - session_id: session_id.clone(), - window_id, - view_id, - session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()), - is_final, - batch_id, - sequence, - identifier: Some(server_id.to_string()), - url: Some(url), - events, - }, - ) + match replay_coalescer + .ingest(crate::replay_storage::ReplayChunkInput { + project_id: context.project_id, + storage_generation: context.replay_storage_generation, + session_id: session_id.clone(), + window_id, + view_id, + session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()), + is_final, + flush_reason, + batch_id, + sequence, + first_sequence: None, + last_sequence: None, + client_batch_count: 1, + approx_events_bytes: replay_payload_bytes, + identifier: Some(server_id.to_string()), + url: Some(url), + events, + }) .await { Ok(()) => { diff --git a/src/handler/web.rs b/src/handler/web.rs index c7a0ac8..cd18be4 100644 --- a/src/handler/web.rs +++ b/src/handler/web.rs @@ -221,13 +221,15 @@ pub async fn web( let error_v3_context = should_process_errors .then(|| request_context(context, || web_context(&event_row, &properties))); - if let Some(session_id) = session_id.as_deref() + if ctx.replay_storage_active + && let Some(session_id) = session_id.as_deref() && let Some(replay_storage) = state.replay_storage.as_deref() && let Err(error) = replay_storage .record_filter_event( &state.pool, crate::replay_storage::ReplayFilterEventInput { project_id: ctx.project_id, + storage_generation: ctx.replay_storage_generation, session_id, window_id: window_id.as_deref().unwrap_or(session_id), identifier: Some(fallback_identity.as_str()), diff --git a/src/main.rs b/src/main.rs index 72e9672..2388929 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,6 +26,7 @@ mod handler; mod identity; mod models; mod polar; +mod replay_coalescer; mod replay_storage; mod tinybird; pub mod ua_parser; @@ -136,6 +137,12 @@ async fn main() { panic!("Invalid replay storage configuration: {}", error); } }; + let replay_coalescer = replay_storage + .as_ref() + .map(|storage| replay_coalescer::ReplayCoalescer::new(Arc::clone(storage), pool.clone())); + if replay_coalescer.is_some() { + info!("Replay coalescer enabled"); + } let recorder_handle = setup_metrics_recorder(); @@ -157,9 +164,15 @@ async fn main() { pool: pool.clone(), batch_queue: Arc::clone(&batch_queue), replay_storage: replay_storage.clone(), + replay_coalescer: replay_coalescer.clone(), }; - start_failed_request_replayer(pool, Arc::clone(&batch_queue), replay_storage); + start_failed_request_replayer( + pool.clone(), + Arc::clone(&batch_queue), + replay_storage, + replay_coalescer.clone(), + ); let cors = CorsLayer::new() .allow_origin(AllowOrigin::mirror_request()) @@ -222,6 +235,11 @@ async fn main() { .expect("Server error"); info!("Shutting down, flushing in-memory batch..."); + if let Some(coalescer) = replay_coalescer.as_ref() + && let Err(error) = coalescer.flush_all().await + { + warn!("Failed to flush replay coalescer on shutdown: {}", error); + } batch_queue.flush_in_memory_batch().await; info!("Shutdown complete"); } @@ -230,6 +248,7 @@ fn start_failed_request_replayer( pool: sqlx::PgPool, batch_queue: Arc, replay_storage: Option>, + replay_coalescer: Option>, ) { tokio::spawn(async move { let replay_interval = std::time::Duration::from_secs(60); @@ -237,7 +256,11 @@ fn start_failed_request_replayer( loop { tokio::time::sleep(replay_interval).await; batch_queue - .replay_failed_requests(&pool, replay_storage.as_deref()) + .replay_failed_requests( + &pool, + replay_storage.as_deref(), + replay_coalescer.as_deref(), + ) .await; } }); diff --git a/src/models.rs b/src/models.rs index 36ce28e..5032b7e 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,4 +1,5 @@ use crate::batch_queue::BatchQueue; +use crate::replay_coalescer::ReplayCoalescer; use crate::replay_storage::ReplayStorage; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -11,6 +12,7 @@ pub struct AppState { pub pool: PgPool, pub batch_queue: Arc, pub replay_storage: Option>, + pub replay_coalescer: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/replay_coalescer.rs b/src/replay_coalescer.rs new file mode 100644 index 0000000..49cab8b --- /dev/null +++ b/src/replay_coalescer.rs @@ -0,0 +1,377 @@ +use crate::replay_storage::{ReplayChunkInput, ReplayStorage, ReplayStorageError}; +use sqlx::types::Uuid; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{error, warn}; + +const DEFAULT_CHANNEL_CAPACITY: usize = 2_000; +const DEFAULT_MAX_AGE: Duration = Duration::from_secs(15); +const DEFAULT_MAX_BYTES: usize = 1024 * 1024; +const DEFAULT_MAX_CHUNKS_PER_WINDOW: usize = 64; +const DEFAULT_MAX_WINDOWS: usize = 10_000; + +#[derive(Clone)] +pub struct ReplayCoalescer { + tx: mpsc::Sender, +} + +enum ReplayCommand { + Ingest(Box), + FlushAll(oneshot::Sender>), +} + +struct ReplayCoalescerActor { + storage: Arc, + pool: sqlx::PgPool, + buffers: HashMap, + max_age: Duration, + max_bytes: usize, + max_chunks_per_window: usize, + max_windows: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct ReplayCoalescerKey { + project_id: Uuid, + storage_generation: i32, + session_id: String, + window_id: String, +} + +struct ReplayBuffer { + chunks: Vec, + first_seen: Instant, + approx_bytes: usize, + batch_ids: HashSet, + sequences: HashSet, +} + +impl ReplayCoalescer { + pub fn new(storage: Arc, pool: sqlx::PgPool) -> Arc { + let (tx, rx) = mpsc::channel(usize_env( + "REPLAY_COALESCE_CHANNEL_CAPACITY", + DEFAULT_CHANNEL_CAPACITY, + )); + + ReplayCoalescerActor { + storage, + pool, + buffers: HashMap::new(), + max_age: duration_env("REPLAY_COALESCE_MAX_AGE_MS", DEFAULT_MAX_AGE), + max_bytes: usize_env("REPLAY_COALESCE_MAX_BYTES", DEFAULT_MAX_BYTES), + max_chunks_per_window: usize_env( + "REPLAY_COALESCE_MAX_CHUNKS_PER_WINDOW", + DEFAULT_MAX_CHUNKS_PER_WINDOW, + ), + max_windows: usize_env("REPLAY_COALESCE_MAX_WINDOWS", DEFAULT_MAX_WINDOWS), + } + .start(rx); + + Arc::new(Self { tx }) + } + + pub async fn ingest(&self, input: ReplayChunkInput) -> Result<(), ReplayStorageError> { + self.tx + .try_send(ReplayCommand::Ingest(Box::new(input))) + .map_err(|error| { + metrics::counter!("replay_coalescer_backpressure_total").increment(1); + ReplayStorageError::Backpressure(error.to_string()) + }) + } + + pub async fn flush_all(&self) -> Result<(), ReplayStorageError> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(ReplayCommand::FlushAll(tx)) + .await + .map_err(|error| ReplayStorageError::Backpressure(error.to_string()))?; + rx.await + .map_err(|error| ReplayStorageError::Backpressure(error.to_string()))? + .map_err(ReplayStorageError::Backpressure) + } +} + +impl ReplayCoalescerActor { + fn start(mut self, mut rx: mpsc::Receiver) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + tokio::select! { + command = rx.recv() => { + match command { + Some(ReplayCommand::Ingest(input)) => { + if let Err(error) = self.ingest(*input).await { + error!("Failed to coalesce replay chunk: {}", error); + } + } + Some(ReplayCommand::FlushAll(reply)) => { + let result = self.flush_all().await.map_err(|error| error.to_string()); + let _ = reply.send(result); + } + None => { + if let Err(error) = self.flush_all().await { + error!("Failed to flush replay coalescer before shutdown: {}", error); + } + break; + } + } + } + _ = interval.tick() => { + if let Err(error) = self.flush_expired().await { + error!("Failed to flush expired replay buffers: {}", error); + } + } + } + } + }); + } + + async fn ingest(&mut self, input: ReplayChunkInput) -> Result<(), ReplayStorageError> { + let mut flush_buffers = Vec::new(); + let input_key = key(&input); + + if self.buffers.len() >= self.max_windows + && !self.buffers.contains_key(&input_key) + && let Some(oldest_key) = self.oldest_key() + && let Some(buffer) = self.buffers.remove(&oldest_key) + { + warn!("Replay coalescer window limit reached; flushing oldest buffer"); + flush_buffers.push(buffer); + } + + let buffer = self + .buffers + .entry(input_key.clone()) + .or_insert_with(ReplayBuffer::new); + + if !buffer.push(input) { + return Ok(()); + } + + if buffer.should_flush(self.max_age, self.max_bytes, self.max_chunks_per_window) + && let Some(buffer) = self.buffers.remove(&input_key) + { + flush_buffers.push(buffer); + } + + self.flush_buffers(flush_buffers).await + } + + async fn flush_expired(&mut self) -> Result<(), ReplayStorageError> { + let expired_keys = self + .buffers + .iter() + .filter(|(_, buffer)| { + buffer.should_flush(self.max_age, self.max_bytes, self.max_chunks_per_window) + }) + .map(|(key, _)| key.clone()) + .collect::>(); + + let buffers = expired_keys + .into_iter() + .filter_map(|key| self.buffers.remove(&key)) + .collect::>(); + + self.record_buffer_metrics(); + self.flush_buffers(buffers).await + } + + async fn flush_all(&mut self) -> Result<(), ReplayStorageError> { + let buffers = self.buffers.drain().map(|(_, buffer)| buffer).collect(); + self.record_buffer_metrics(); + self.flush_buffers(buffers).await + } + + async fn flush_buffers( + &mut self, + buffers: Vec, + ) -> Result<(), ReplayStorageError> { + for mut buffer in buffers { + if buffer.chunks.is_empty() { + continue; + } + + if !chunks_are_sequence_ordered(&buffer.chunks) { + buffer.chunks.sort_by_key(|chunk| chunk.sequence); + } + let buffer_key = buffer.key(); + let mut bundle = bundle_chunks(buffer.chunks); + + if let Err(error) = self + .storage + .store_replay_chunk(&self.pool, &mut bundle) + .await + { + self.buffers + .entry(buffer_key) + .or_insert_with(|| ReplayBuffer::from_input(bundle)); + return Err(error); + } + + metrics::counter!("replay_coalescer_bundles_flushed_total").increment(1); + metrics::histogram!("replay_coalescer_bundle_client_batches") + .record(bundle.client_batch_count as f64); + metrics::histogram!("replay_coalescer_bundle_approx_bytes") + .record(bundle.approx_events_bytes as f64); + } + + Ok(()) + } + + fn oldest_key(&self) -> Option { + self.buffers + .iter() + .min_by_key(|(_, buffer)| buffer.first_seen) + .map(|(key, _)| key.clone()) + } + + fn record_buffer_metrics(&self) { + metrics::gauge!("replay_coalescer_active_buffers").set(self.buffers.len() as f64); + metrics::gauge!("replay_coalescer_buffered_approx_bytes").set( + self.buffers + .values() + .map(|buffer| buffer.approx_bytes) + .sum::() as f64, + ); + } +} + +impl ReplayBuffer { + fn new() -> Self { + Self { + chunks: Vec::new(), + first_seen: Instant::now(), + approx_bytes: 0, + batch_ids: HashSet::new(), + sequences: HashSet::new(), + } + } + + fn push(&mut self, input: ReplayChunkInput) -> bool { + if let Some(batch_id) = input.batch_id.as_ref() + && !self.batch_ids.insert(batch_id.clone()) + { + return false; + } + if !self.sequences.insert(input.sequence) { + return false; + } + self.approx_bytes = self.approx_bytes.saturating_add(input.approx_events_bytes); + self.chunks.push(input); + true + } + + fn from_input(input: ReplayChunkInput) -> Self { + let mut buffer = Self::new(); + buffer.push(input); + buffer + } + + fn should_flush(&self, max_age: Duration, max_bytes: usize, max_chunks: usize) -> bool { + self.chunks.iter().any(|chunk| chunk.is_final) + || self.first_seen.elapsed() >= max_age + || self.approx_bytes >= max_bytes + || self.chunks.len() >= max_chunks + } + + fn key(&self) -> ReplayCoalescerKey { + key(self.chunks.first().expect("non-empty replay buffer")) + } +} + +fn key(input: &ReplayChunkInput) -> ReplayCoalescerKey { + ReplayCoalescerKey { + project_id: input.project_id, + storage_generation: input.storage_generation, + session_id: input.session_id.clone(), + window_id: input.window_id.clone(), + } +} + +fn bundle_chunks(chunks: Vec) -> ReplayChunkInput { + let first = chunks.first().expect("non-empty replay buffer"); + let project_id = first.project_id; + let storage_generation = first.storage_generation; + let session_id = first.session_id.clone(); + let window_id = first.window_id.clone(); + let view_id = first.view_id.clone(); + let first_sequence = first.sequence; + let last_sequence = chunks.last().map_or(first_sequence, |chunk| chunk.sequence); + let client_batch_count = chunks + .iter() + .filter(|chunk| chunk.batch_id.is_some()) + .count() + .max(1); + + let mut session_start_ms = first.session_start_ms; + let mut events = Vec::new(); + let mut is_final = false; + let mut identifier = None; + let mut url = None; + let mut flush_reasons = Vec::new(); + let mut events_size_bytes = 0usize; + + for mut chunk in chunks { + is_final |= chunk.is_final; + events_size_bytes = events_size_bytes.saturating_add(chunk.approx_events_bytes); + identifier = chunk.identifier.or(identifier); + url = chunk.url.or(url); + session_start_ms = match (session_start_ms, chunk.session_start_ms) { + (Some(left), Some(right)) => Some(left.min(right)), + (None, value) | (value, None) => value, + }; + if let Some(reason) = chunk.flush_reason.take() { + flush_reasons.push(reason); + } + events.append(&mut chunk.events); + } + + ReplayChunkInput { + project_id, + storage_generation, + batch_id: Some(format!( + "bundle:{session_id}:{first_sequence}:{last_sequence}" + )), + session_id, + window_id, + view_id, + session_start_ms, + is_final, + flush_reason: Some(if flush_reasons.is_empty() { + "coalesced".to_string() + } else { + format!("coalesced:{}", flush_reasons.join(",")) + }), + sequence: first_sequence, + first_sequence: Some(first_sequence), + last_sequence: Some(last_sequence), + client_batch_count: i32::try_from(client_batch_count).unwrap_or(i32::MAX), + approx_events_bytes: events_size_bytes, + identifier, + url, + events, + } +} + +fn chunks_are_sequence_ordered(chunks: &[ReplayChunkInput]) -> bool { + chunks + .windows(2) + .all(|pair| pair[0].sequence <= pair[1].sequence) +} + +fn usize_env(name: &str, default: usize) -> usize { + std::env::var(name) + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(default) +} + +fn duration_env(name: &str, default: Duration) -> Duration { + std::env::var(name) + .ok() + .and_then(|value| value.parse::().ok()) + .map(Duration::from_millis) + .unwrap_or(default) +} diff --git a/src/replay_storage.rs b/src/replay_storage.rs index 5efeead..6c12b0e 100644 --- a/src/replay_storage.rs +++ b/src/replay_storage.rs @@ -15,19 +15,25 @@ const REPLAY_COMPRESSION_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Clone)] pub struct ReplayStorage { client: Client, - bucket: String, - prefix: String, + bucket_prefix: String, } +#[derive(Clone)] pub struct ReplayChunkInput { pub project_id: Uuid, + pub storage_generation: i32, pub session_id: String, pub window_id: String, pub view_id: Option, pub session_start_ms: Option, pub is_final: bool, + pub flush_reason: Option, pub batch_id: Option, pub sequence: i64, + pub first_sequence: Option, + pub last_sequence: Option, + pub client_batch_count: i32, + pub approx_events_bytes: usize, pub identifier: Option, pub url: Option, pub events: Vec, @@ -35,6 +41,7 @@ pub struct ReplayChunkInput { pub struct ReplayFilterEventInput<'a> { pub project_id: Uuid, + pub storage_generation: i32, pub session_id: &'a str, pub window_id: &'a str, pub identifier: Option<&'a str>, @@ -51,6 +58,7 @@ pub enum ReplayStorageError { Compression(std::io::Error), CompressionTimeout, CompressionTask(String), + Backpressure(String), Upload(String), Database(sqlx::Error), } @@ -70,6 +78,9 @@ impl std::fmt::Display for ReplayStorageError { ReplayStorageError::CompressionTask(error) => { write!(f, "Replay compression task failed: {}", error) } + ReplayStorageError::Backpressure(error) => { + write!(f, "Replay coalescer is overloaded: {}", error) + } ReplayStorageError::Upload(error) => { write!(f, "Failed to upload replay chunk: {}", error) } @@ -123,22 +134,28 @@ struct ReplayRouteMetadata { impl ReplayStorage { pub fn from_env() -> Result, String> { - let bucket = std::env::var("REPLAY_S3_BUCKET").ok(); + let bucket_prefix = std::env::var("REPLAY_S3_BUCKET_PREFIX") + .ok() + .or_else(|| std::env::var("REPLAY_S3_BUCKET").ok()); let endpoint = std::env::var("REPLAY_S3_ENDPOINT").ok(); let access_key = std::env::var("REPLAY_S3_ACCESS_KEY_ID").ok(); let secret_key = std::env::var("REPLAY_S3_SECRET_ACCESS_KEY").ok(); - if bucket.is_none() && endpoint.is_none() && access_key.is_none() && secret_key.is_none() { + if bucket_prefix.is_none() + && endpoint.is_none() + && access_key.is_none() + && secret_key.is_none() + { return Ok(None); } - let bucket = bucket.ok_or_else(|| "REPLAY_S3_BUCKET must be set".to_string())?; + let bucket_prefix = + bucket_prefix.ok_or_else(|| "REPLAY_S3_BUCKET_PREFIX must be set".to_string())?; let access_key = access_key.ok_or_else(|| "REPLAY_S3_ACCESS_KEY must be set".to_string())?; let secret_key = secret_key.ok_or_else(|| "REPLAY_S3_SECRET_KEY must be set".to_string())?; let region = std::env::var("REPLAY_S3_REGION").unwrap_or_else(|_| "us-east-1".to_string()); - let prefix = std::env::var("REPLAY_S3_PREFIX").unwrap_or_else(|_| "replays".to_string()); let mut config = S3ConfigBuilder::new() .region(Region::new(region)) @@ -157,43 +174,59 @@ impl ReplayStorage { Ok(Some(Self { client: Client::from_conf(config.build()), - bucket, - prefix: prefix.trim_matches('/').to_string(), + bucket_prefix: normalize_bucket_prefix(&bucket_prefix)?, })) } pub async fn store_replay_chunk( &self, pool: &sqlx::PgPool, - mut input: ReplayChunkInput, + input: &mut ReplayChunkInput, ) -> Result<(), ReplayStorageError> { - if replay_chunk_exists(pool, &input).await? { + if replay_chunk_exists(pool, input).await? { return Ok(()); } - input.events.sort_by_key(replay_timestamp_ms); + if !replay_events_are_timestamp_ordered(&input.events) { + input.events.sort_by_key(replay_timestamp_ms); + } let snapshot_id = Uuid::new_v4(); let first_event_timestamp_ms = replay_first_event_timestamp_ms(&input.events); let last_event_timestamp_ms = replay_last_event_timestamp_ms(&input.events); let has_full_snapshot = replay_has_full_snapshot(&input.events); let event_count = i32::try_from(input.events.len()).unwrap_or(i32::MAX); + let first_sequence = input.first_sequence.unwrap_or(input.sequence); + let last_sequence = input.last_sequence.unwrap_or(input.sequence); + let client_batch_count = input.client_batch_count.max(1); let route_metadata = replay_route_metadata(&input.events, input.url.as_deref()); - let object_key = self.object_key( - input.project_id, + let object_key = replay_object_key( + input.storage_generation, &input.session_id, &input.window_id, input.batch_id.as_deref(), input.sequence, first_event_timestamp_ms.unwrap_or(0), ); - let (compressed, uncompressed_bytes) = compress_replay_events(input.events).await?; + let (compressed, uncompressed_bytes) = compress_replay_events(input.events.clone()).await?; let compressed_bytes = i64::try_from(compressed.len()).unwrap_or(i64::MAX); - self.put_object(&object_key, compressed).await?; + let bucket = self.bucket_for_project(input.project_id); + self.put_object(&bucket, &object_key, compressed).await?; let result = async { let mut tx = pool.begin().await?; + if !replay_storage_generation_is_active( + &mut *tx, + input.project_id, + input.storage_generation, + ) + .await? + { + tx.commit().await?; + return Ok::<(bool, bool), sqlx::Error>((false, false)); + } + let insert_result = sqlx::query( r#" INSERT INTO replay_snapshots ( @@ -206,9 +239,12 @@ impl ReplayStorage { is_final, batch_id, sequence, + first_sequence, + last_sequence, + client_batch_count, identifier, - s3_bucket, s3_key, + storage_generation, content_encoding, compressed_bytes, uncompressed_bytes, @@ -221,8 +257,11 @@ impl ReplayStorage { routes, route_count, route_spans - ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24 + ) + VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, + $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, + $25, $26, $27 ) ON CONFLICT DO NOTHING "#, @@ -236,9 +275,12 @@ impl ReplayStorage { .bind(input.is_final) .bind(&input.batch_id) .bind(input.sequence) + .bind(first_sequence) + .bind(last_sequence) + .bind(client_batch_count) .bind(&input.identifier) - .bind(&self.bucket) .bind(&object_key) + .bind(input.storage_generation) .bind(REPLAY_CONTENT_ENCODING) .bind(compressed_bytes) .bind(uncompressed_bytes) @@ -255,8 +297,32 @@ impl ReplayStorage { .await?; if insert_result.rows_affected() == 0 { + let already_exists: bool = sqlx::query_scalar( + r#" + SELECT EXISTS ( + SELECT 1 FROM replay_snapshots + WHERE project_id = $1 + AND session_id = $2 + AND window_id = $3 + AND storage_generation = $6 + AND ( + ($4::text IS NOT NULL AND batch_id = $4) + OR ($4::text IS NULL AND sequence = $5) + OR ($5 BETWEEN first_sequence AND last_sequence) + ) + ) + "#, + ) + .bind(input.project_id) + .bind(&input.session_id) + .bind(&input.window_id) + .bind(&input.batch_id) + .bind(input.sequence) + .bind(input.storage_generation) + .fetch_one(&mut *tx) + .await?; tx.commit().await?; - return Ok::(false); + return Ok::<(bool, bool), sqlx::Error>((false, already_exists)); } let latest_filter_metadata = sqlx::query_as::<_, ReplayFilterMetadata>( @@ -426,17 +492,29 @@ impl ReplayStorage { .await?; tx.commit().await?; - Ok::(true) + Ok::<(bool, bool), sqlx::Error>((true, false)) } .await; match result { - Ok(true) => {} - Ok(false) => { + Ok((true, _)) => { + record_replay_chunk_metrics( + input.flush_reason.as_deref(), + input.is_final, + event_count, + compressed_bytes, + uncompressed_bytes, + ); + } + Ok((false, true)) => { // Idempotent retries use the same deterministic object key. Deleting it here // would remove the object referenced by the transaction that won the race. return Ok(()); } + Ok((false, false)) => { + self.delete_object(&bucket, &object_key).await?; + return Ok(()); + } Err(error) => { // Keep deterministic objects on database failure. A concurrent transaction may // already reference the same key, and the backed-up retry can safely reuse it. @@ -454,6 +532,17 @@ impl ReplayStorage { ) -> Result<(), ReplayStorageError> { let mut tx = pool.begin().await?; + if !replay_storage_generation_is_active( + &mut *tx, + input.project_id, + input.storage_generation, + ) + .await? + { + tx.commit().await?; + return Ok(()); + } + sqlx::query( r#" INSERT INTO replay_filter_events ( @@ -466,7 +555,8 @@ impl ReplayStorage { os, normalized_route, custom - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) "#, ) .bind(Uuid::new_v4()) @@ -553,30 +643,19 @@ impl ReplayStorage { Ok(()) } - fn object_key( - &self, - project_id: Uuid, - session_id: &str, - window_id: &str, - batch_id: Option<&str>, - sequence: i64, - first_event_timestamp_ms: i64, - ) -> String { - let identity = batch_id - .map(|value| crate::utils::sha256_hex(&[value.as_bytes()])) - .unwrap_or_else(|| { - crate::utils::sha256_hex(&[window_id.as_bytes(), &sequence.to_be_bytes()]) - }); - format!( - "{}/{}/{}/{}-{}.json.zst", - self.prefix, project_id, session_id, first_event_timestamp_ms, identity - ) + fn bucket_for_project(&self, project_id: Uuid) -> String { + format!("{}-{}", self.bucket_prefix, project_id) } - async fn put_object(&self, key: &str, body: Vec) -> Result<(), ReplayStorageError> { + async fn put_object( + &self, + bucket: &str, + key: &str, + body: Vec, + ) -> Result<(), ReplayStorageError> { self.client .put_object() - .bucket(&self.bucket) + .bucket(bucket) .key(key) .content_type("application/json") .content_encoding(REPLAY_CONTENT_ENCODING) @@ -587,6 +666,137 @@ impl ReplayStorage { Ok(()) } + + async fn delete_object(&self, bucket: &str, key: &str) -> Result<(), ReplayStorageError> { + self.client + .delete_object() + .bucket(bucket) + .key(key) + .send() + .await + .map_err(|error| ReplayStorageError::Upload(error.to_string()))?; + Ok(()) + } +} + +/// Confirms the project's replay storage is active at `generation`, taking a +/// `FOR SHARE` lock so the generation cannot change before the caller's +/// transaction commits. This makes per-statement generation guards unnecessary. +async fn replay_storage_generation_is_active<'e, E>( + executor: E, + project_id: Uuid, + generation: i32, +) -> Result +where + E: sqlx::Executor<'e, Database = sqlx::Postgres>, +{ + let row = sqlx::query_scalar::<_, i32>( + r#" + SELECT replay_storage_generation + FROM project + WHERE id = $1 + AND replay_storage_generation = $2 + AND replay_storage_state = 'active' + FOR SHARE + "#, + ) + .bind(project_id) + .bind(generation) + .fetch_optional(executor) + .await?; + Ok(row.is_some()) +} + +fn replay_object_key( + storage_generation: i32, + session_id: &str, + window_id: &str, + batch_id: Option<&str>, + sequence: i64, + first_event_timestamp_ms: i64, +) -> String { + let identity = batch_id + .map(|value| crate::utils::sha256_hex(&[value.as_bytes()])) + .unwrap_or_else(|| { + crate::utils::sha256_hex(&[window_id.as_bytes(), &sequence.to_be_bytes()]) + }); + format!( + "{}/{}/{}/{}-{}.json.zst", + storage_generation, session_id, window_id, first_event_timestamp_ms, identity + ) +} + +/// Allowed `flush_reason` metric labels. Anything else is bucketed into +/// `unknown` to keep metric cardinality bounded. +const KNOWN_FLUSH_REASONS: &[&str] = &[ + "interval", + "maxEvents", + "maxBytes", + "checkout", + "fullSnapshot", + "minLength", + "pageHidden", + "pageShow", + "unload", + "stop", + "sessionRotate", + "coalesced", + "manual", +]; + +fn normalize_flush_reason(value: Option<&str>) -> &str { + let value = value.unwrap_or("unknown"); + if value.starts_with("coalesced:") { + "coalesced" + } else if KNOWN_FLUSH_REASONS.contains(&value) { + value + } else { + "unknown" + } +} + +fn record_replay_chunk_metrics( + flush_reason: Option<&str>, + is_final: bool, + event_count: i32, + compressed_bytes: i64, + uncompressed_bytes: i64, +) { + let labels = [ + ( + "flush_reason", + normalize_flush_reason(flush_reason).to_string(), + ), + ("is_final", is_final.to_string()), + ]; + metrics::counter!("replay_chunks_committed_total", &labels).increment(1); + metrics::histogram!("replay_chunk_events", &labels).record(event_count as f64); + metrics::histogram!("replay_chunk_compressed_bytes", &labels).record(compressed_bytes as f64); + metrics::histogram!("replay_chunk_uncompressed_bytes", &labels) + .record(uncompressed_bytes as f64); +} + +fn normalize_bucket_prefix(value: &str) -> Result { + let normalized = value + .trim() + .to_ascii_lowercase() + .chars() + .map(|character| { + if character.is_ascii_lowercase() || character.is_ascii_digit() || character == '-' { + character + } else { + '-' + } + }) + .collect::() + .trim_matches('-') + .chars() + .take(26) + .collect::(); + if normalized.len() < 3 { + return Err("REPLAY_S3_BUCKET_PREFIX must contain at least 3 valid characters".to_string()); + } + Ok(normalized) } struct CountingWriter { @@ -665,6 +875,19 @@ fn replay_last_event_timestamp_ms(events: &[Value]) -> Option { events.iter().filter_map(replay_timestamp_ms).max() } +fn replay_events_are_timestamp_ordered(events: &[Value]) -> bool { + let mut previous = None; + for timestamp in events.iter().filter_map(replay_timestamp_ms) { + if let Some(previous) = previous + && timestamp < previous + { + return false; + } + previous = Some(timestamp); + } + true +} + fn replay_has_full_snapshot(events: &[Value]) -> bool { events.iter().any(|event| { event @@ -766,6 +989,7 @@ async fn replay_chunk_exists( AND session_id = $2 AND batch_id = $3 AND window_id = $4 + AND storage_generation = $5 ) "#, ) @@ -773,6 +997,7 @@ async fn replay_chunk_exists( .bind(&input.session_id) .bind(batch_id) .bind(&input.window_id) + .bind(input.storage_generation) .fetch_one(pool) .await?; if exists { @@ -785,7 +1010,11 @@ async fn replay_chunk_exists( SELECT EXISTS ( SELECT 1 FROM replay_snapshots - WHERE project_id = $1 AND session_id = $2 AND window_id = $3 AND sequence = $4 + WHERE project_id = $1 + AND session_id = $2 + AND window_id = $3 + AND storage_generation = $5 + AND (sequence = $4 OR $4 BETWEEN first_sequence AND last_sequence) ) "#, ) @@ -793,6 +1022,7 @@ async fn replay_chunk_exists( .bind(&input.session_id) .bind(&input.window_id) .bind(input.sequence) + .bind(input.storage_generation) .fetch_one(pool) .await } @@ -834,6 +1064,26 @@ mod tests { use super::*; use serde_json::json; + #[test] + fn normalizes_bucket_prefix_for_project_bucket_names() { + assert_eq!( + normalize_bucket_prefix(" FastStats_Replays ").unwrap(), + "faststats-replays" + ); + assert_eq!( + normalize_bucket_prefix("abcdefghijklmnopqrstuvwxyz-more").unwrap(), + "abcdefghijklmnopqrstuvwxyz" + ); + assert!(normalize_bucket_prefix("__").is_err()); + } + + #[test] + fn replay_object_keys_are_generation_scoped() { + let key = replay_object_key(7, "session-1", "window-1", Some("batch-1"), 3, 1234); + assert!(key.starts_with("7/session-1/window-1/1234-")); + assert!(key.ends_with(".json.zst")); + } + #[test] fn replay_route_metadata_uses_fallback_route() { let events = vec![