Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/batch_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ impl BatchQueue {
&self,
pool: &sqlx::PgPool,
replay_storage: Option<&crate::replay_storage::ReplayStorage>,
replay_coalescer: Option<&crate::replay_coalescer::ReplayCoalescer>,
) {
match self.backup_store.cleanup_stale_requests().await {
Ok(count) if count > 0 => {
Expand Down Expand Up @@ -765,8 +766,14 @@ impl BatchQueue {
}

for (id, request) in requests {
let result =
super::handler::process_failed_request(self, pool, replay_storage, &request).await;
let result = super::handler::process_failed_request(
self,
pool,
replay_storage,
replay_coalescer,
&request,
)
.await;

match result {
Ok(()) => {
Expand Down
59 changes: 37 additions & 22 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ pub struct IpRule {
#[derive(Clone)]
pub struct ProjectContext {
pub project_id: Uuid,
pub replay_storage_generation: i32,
pub replay_storage_active: bool,
/// The user ID to bill — either the owner_id directly (if it's a user)
/// or the org owner's user_id (if owner_id is an organization).
pub billing_customer_id: String,
Expand All @@ -167,6 +169,7 @@ pub async fn load_project_context(
let rows = sqlx::query(
r#"
SELECT p.id, p.owner_id, p.allowed_hostnames, p.error_tracking_enabled, p.cookieless_mode,
p.replay_storage_generation, p.replay_storage_state::text AS replay_storage_state,
o.id AS organization_id,
m.user_id AS org_owner_user_id,
d.reference_id, d.name, d.data_type::text, d.regex, d.allow_negative,
Expand Down Expand Up @@ -228,6 +231,8 @@ pub async fn load_project_context(

let ctx = Arc::new(ProjectContext {
project_id: first.get("id"),
replay_storage_generation: first.get("replay_storage_generation"),
replay_storage_active: first.get::<String, _>("replay_storage_state") == "active",
billing_customer_id,
organization_id,
allowed_hostnames: first
Expand Down Expand Up @@ -565,6 +570,7 @@ pub async fn process_failed_request(
batch_queue: &BatchQueue,
pool: &sqlx::PgPool,
replay_storage: Option<&crate::replay_storage::ReplayStorage>,
replay_coalescer: Option<&crate::replay_coalescer::ReplayCoalescer>,
request: &FailedRequest,
) -> Result<(), String> {
match request.request_type {
Expand All @@ -574,7 +580,7 @@ pub async fn process_failed_request(
process_vitals_request(batch_queue, pool, replay_storage, request).await
}
RequestType::Replay => {
process_replay_request(batch_queue, pool, replay_storage, request).await
process_replay_request(batch_queue, pool, replay_coalescer, request).await
}
}
}
Expand Down Expand Up @@ -784,13 +790,15 @@ async fn process_web_request(
})
});

if let Some(session_id) = parsed.session_id.as_deref()
if ctx.replay_storage_active
&& let Some(session_id) = parsed.session_id.as_deref()
&& let Some(replay_storage) = replay_storage
&& let Err(error) = replay_storage
.record_filter_event(
pool,
crate::replay_storage::ReplayFilterEventInput {
project_id: ctx.project_id,
storage_generation: ctx.replay_storage_generation,
session_id,
window_id: parsed.window_id.as_deref().unwrap_or(session_id),
identifier: Some(fallback_identity.as_str()),
Expand Down Expand Up @@ -948,7 +956,7 @@ async fn process_vitals_request(
async fn process_replay_request(
batch_queue: &BatchQueue,
pool: &sqlx::PgPool,
replay_storage: Option<&crate::replay_storage::ReplayStorage>,
replay_coalescer: Option<&crate::replay_coalescer::ReplayCoalescer>,
request: &FailedRequest,
) -> Result<(), String> {
use crate::handler::replay::ReplayRequest;
Expand All @@ -962,6 +970,7 @@ async fn process_replay_request(
view_id,
session_start,
is_final,
flush_reason,
batch_id,
sequence,
url,
Expand All @@ -975,8 +984,11 @@ async fn process_replay_request(
.await
.map_err(|_| "Unauthorized or database error")?;

let replay_storage =
replay_storage.ok_or_else(|| "Replay storage is not configured".to_string())?;
let replay_coalescer =
replay_coalescer.ok_or_else(|| "Replay storage is not configured".to_string())?;
if !ctx.replay_storage_active {
return Err("Replay storage is resetting".to_string());
}
let server_id = match ctx.cookieless_mode {
Some(true) => {
let ip = request.client_ip.as_deref().unwrap_or("");
Expand Down Expand Up @@ -1008,23 +1020,26 @@ async fn process_replay_request(
return Err("No valid events".to_string());
}

replay_storage
.store_replay_chunk(
pool,
crate::replay_storage::ReplayChunkInput {
project_id: ctx.project_id,
session_id: session_id.clone(),
window_id,
view_id,
session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()),
is_final,
batch_id,
sequence,
identifier: Some(server_id.to_string()),
url: Some(url),
events,
},
)
replay_coalescer
.ingest(crate::replay_storage::ReplayChunkInput {
project_id: ctx.project_id,
storage_generation: ctx.replay_storage_generation,
session_id: session_id.clone(),
window_id,
view_id,
session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()),
is_final,
flush_reason,
batch_id,
sequence,
first_sequence: None,
last_sequence: None,
client_batch_count: 1,
approx_events_bytes: request.body.len(),
identifier: Some(server_id.to_string()),
url: Some(url),
events,
})
.await
.map_err(|error| error.to_string())?;

Expand Down
50 changes: 32 additions & 18 deletions src/handler/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub(crate) struct ReplayRequest {
#[serde(default)]
pub(crate) is_final: bool,
#[serde(default)]
pub(crate) flush_reason: Option<String>,
#[serde(default)]
pub(crate) batch_id: Option<String>,
pub(crate) sequence: u64,
pub(crate) url: String,
Expand All @@ -88,6 +90,7 @@ pub async fn replay(
Ok(b) => b,
Err(e) => return error_response(StatusCode::BAD_REQUEST, &e),
};
let replay_payload_bytes = body.len();

let parsed: ReplayRequest = match serde_json::from_slice(&body) {
Ok(p) => p,
Expand All @@ -107,6 +110,7 @@ pub async fn replay(
view_id,
session_start,
is_final,
flush_reason,
batch_id,
sequence,
url,
Expand Down Expand Up @@ -188,7 +192,14 @@ pub async fn replay(
return error_response(StatusCode::BAD_REQUEST, "No valid events");
}

let Some(replay_storage) = state.replay_storage.as_deref() else {
if !context.replay_storage_active {
return error_response(
StatusCode::SERVICE_UNAVAILABLE,
"Replay storage is resetting",
);
}

let Some(replay_coalescer) = state.replay_coalescer.as_deref() else {
return error_response(
StatusCode::SERVICE_UNAVAILABLE,
"Replay storage is not configured",
Expand All @@ -201,23 +212,26 @@ pub async fn replay(
organization_id: context.organization_id.as_deref().map(Into::into),
};

match replay_storage
.store_replay_chunk(
&state.pool,
crate::replay_storage::ReplayChunkInput {
project_id: context.project_id,
session_id: session_id.clone(),
window_id,
view_id,
session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()),
is_final,
batch_id,
sequence,
identifier: Some(server_id.to_string()),
url: Some(url),
events,
},
)
match replay_coalescer
.ingest(crate::replay_storage::ReplayChunkInput {
project_id: context.project_id,
storage_generation: context.replay_storage_generation,
session_id: session_id.clone(),
window_id,
view_id,
session_start_ms: session_start.and_then(|value| i64::try_from(value).ok()),
is_final,
flush_reason,
batch_id,
sequence,
first_sequence: None,
last_sequence: None,
client_batch_count: 1,
approx_events_bytes: replay_payload_bytes,
identifier: Some(server_id.to_string()),
url: Some(url),
events,
})
.await
{
Ok(()) => {
Expand Down
4 changes: 3 additions & 1 deletion src/handler/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,15 @@ pub async fn web(
let error_v3_context = should_process_errors
.then(|| request_context(context, || web_context(&event_row, &properties)));

if let Some(session_id) = session_id.as_deref()
if ctx.replay_storage_active
&& let Some(session_id) = session_id.as_deref()
&& let Some(replay_storage) = state.replay_storage.as_deref()
&& let Err(error) = replay_storage
.record_filter_event(
&state.pool,
crate::replay_storage::ReplayFilterEventInput {
project_id: ctx.project_id,
storage_generation: ctx.replay_storage_generation,
session_id,
window_id: window_id.as_deref().unwrap_or(session_id),
identifier: Some(fallback_identity.as_str()),
Expand Down
27 changes: 25 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod handler;
mod identity;
mod models;
mod polar;
mod replay_coalescer;
mod replay_storage;
mod tinybird;
pub mod ua_parser;
Expand Down Expand Up @@ -136,6 +137,12 @@ async fn main() {
panic!("Invalid replay storage configuration: {}", error);
}
};
let replay_coalescer = replay_storage
.as_ref()
.map(|storage| replay_coalescer::ReplayCoalescer::new(Arc::clone(storage), pool.clone()));
if replay_coalescer.is_some() {
info!("Replay coalescer enabled");
}

let recorder_handle = setup_metrics_recorder();

Expand All @@ -157,9 +164,15 @@ async fn main() {
pool: pool.clone(),
batch_queue: Arc::clone(&batch_queue),
replay_storage: replay_storage.clone(),
replay_coalescer: replay_coalescer.clone(),
};

start_failed_request_replayer(pool, Arc::clone(&batch_queue), replay_storage);
start_failed_request_replayer(
pool.clone(),
Arc::clone(&batch_queue),
replay_storage,
replay_coalescer.clone(),
);

let cors = CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
Expand Down Expand Up @@ -222,6 +235,11 @@ async fn main() {
.expect("Server error");

info!("Shutting down, flushing in-memory batch...");
if let Some(coalescer) = replay_coalescer.as_ref()
&& let Err(error) = coalescer.flush_all().await
{
warn!("Failed to flush replay coalescer on shutdown: {}", error);
}
batch_queue.flush_in_memory_batch().await;
info!("Shutdown complete");
}
Expand All @@ -230,14 +248,19 @@ fn start_failed_request_replayer(
pool: sqlx::PgPool,
batch_queue: Arc<batch_queue::BatchQueue>,
replay_storage: Option<Arc<replay_storage::ReplayStorage>>,
replay_coalescer: Option<Arc<replay_coalescer::ReplayCoalescer>>,
) {
tokio::spawn(async move {
let replay_interval = std::time::Duration::from_secs(60);

loop {
tokio::time::sleep(replay_interval).await;
batch_queue
.replay_failed_requests(&pool, replay_storage.as_deref())
.replay_failed_requests(
&pool,
replay_storage.as_deref(),
replay_coalescer.as_deref(),
)
.await;
}
});
Expand Down
2 changes: 2 additions & 0 deletions src/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::batch_queue::BatchQueue;
use crate::replay_coalescer::ReplayCoalescer;
use crate::replay_storage::ReplayStorage;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -11,6 +12,7 @@ pub struct AppState {
pub pool: PgPool,
pub batch_queue: Arc<BatchQueue>,
pub replay_storage: Option<Arc<ReplayStorage>>,
pub replay_coalescer: Option<Arc<ReplayCoalescer>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Loading