From bde754d7c028f4a8b9de0df57d6efaff2664d793 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 08:50:04 -0700 Subject: [PATCH] perf: skip transaction for retry requeue, tune slow benchmarks Add `requeue_for_retry` that executes a bare UPDATE on the pool instead of BEGIN IMMEDIATE + UPDATE + COMMIT. With max_connections(1) this cuts connection hold time by 2/3 for retries, letting dispatch interleave sooner. Measured 2.5x throughput improvement on retryable_dead_letter. Reduce sample_size/warm_up/measurement_time on query_by_tags and retryable_dead_letter benchmark groups to cut wall-clock time ~50%. --- benches/retry.rs | 3 ++ benches/tags.rs | 3 ++ src/scheduler/spawn/failure.rs | 50 ++++++++++++++++++++---------- src/store/lifecycle/transitions.rs | 42 +++++++++++++++++++++++++ 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/benches/retry.rs b/benches/retry.rs index 84ae52a..8934ad4 100644 --- a/benches/retry.rs +++ b/benches/retry.rs @@ -173,6 +173,9 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("retryable_dead_letter"); group.throughput(Throughput::Elements(100)); + group.sample_size(10); + group.warm_up_time(Duration::from_secs(1)); + group.measurement_time(Duration::from_secs(3)); let strategies: &[(&str, BackoffStrategy)] = &[ ( diff --git a/benches/tags.rs b/benches/tags.rs index ffad797..e1ee8a3 100644 --- a/benches/tags.rs +++ b/benches/tags.rs @@ -102,6 +102,9 @@ fn bench_query_by_tags(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("query_by_tags"); group.throughput(Throughput::Elements(1)); + group.sample_size(10); + group.warm_up_time(Duration::from_secs(1)); + group.measurement_time(Duration::from_secs(3)); for queue_depth in [100usize, 1000, 5000] { let store = rt.block_on(store_with_tagged_tasks(queue_depth)); diff --git a/src/scheduler/spawn/failure.rs b/src/scheduler/spawn/failure.rs index 4a54b88..6663709 100644 --- a/src/scheduler/spawn/failure.rs +++ b/src/scheduler/spawn/failure.rs @@ -68,23 +68,39 @@ pub(crate) async fn handle_failure( "task failed" ); - let fail_backoff = crate::store::FailBackoff { - strategy: backoff_strategy, - executor_retry_after_ms: error.retry_after_ms, - }; - if let Err(e) = deps - .store - .fail_with_record( - task, - &error.message, - error.retryable, - effective_max_retries, - metrics, - &fail_backoff, - ) - .await - { - tracing::error!(task_id, error = %e, "failed to record task failure"); + if will_retry { + // Fast path: single UPDATE without a transaction wrapper. + // Avoids BEGIN IMMEDIATE + COMMIT round-trips, reducing connection + // hold time from 3 SQL executions to 1. + let delay = retry_delay.unwrap_or(std::time::Duration::ZERO); + if let Err(e) = deps + .store + .requeue_for_retry(task.id, &error.message, delay) + .await + { + tracing::error!(task_id, error = %e, "failed to requeue task for retry"); + } + } else { + // Terminal failure (permanent or retries exhausted): needs a + // transaction for the multi-statement INSERT history + DELETE. + let fail_backoff = crate::store::FailBackoff { + strategy: backoff_strategy, + executor_retry_after_ms: error.retry_after_ms, + }; + if let Err(e) = deps + .store + .fail_with_record( + task, + &error.message, + error.retryable, + effective_max_retries, + metrics, + &fail_backoff, + ) + .await + { + tracing::error!(task_id, error = %e, "failed to record task failure"); + } } // Remove from active tracking AFTER the store write completes. diff --git a/src/store/lifecycle/transitions.rs b/src/store/lifecycle/transitions.rs index fb51b0c..25f16f6 100644 --- a/src/store/lifecycle/transitions.rs +++ b/src/store/lifecycle/transitions.rs @@ -724,6 +724,48 @@ impl TaskStore { Ok(()) } + /// Requeue a task for retry without a transaction. + /// + /// The single UPDATE is atomically safe and avoids `BEGIN IMMEDIATE` + + /// `COMMIT` round-trips, reducing connection hold time from 3 SQL + /// executions to 1. This matters under concurrency because + /// `open_memory()` uses a single-connection pool — shorter hold time + /// lets dispatch (`pop_next_batch`) interleave sooner. + pub async fn requeue_for_retry( + &self, + task_id: i64, + error: &str, + delay: std::time::Duration, + ) -> Result<(), StoreError> { + if delay.is_zero() { + sqlx::query( + "UPDATE tasks SET status = 'pending', started_at = NULL, + retry_count = retry_count + 1, last_error = ? + WHERE id = ?", + ) + .bind(error) + .bind(task_id) + .execute(&self.pool) + .await?; + } else { + let run_after_ms = (chrono::Utc::now() + + chrono::Duration::milliseconds(delay.as_millis() as i64)) + .timestamp_millis(); + sqlx::query( + "UPDATE tasks SET status = 'pending', started_at = NULL, + retry_count = retry_count + 1, last_error = ?, + run_after = ? + WHERE id = ?", + ) + .bind(error) + .bind(run_after_ms) + .bind(task_id) + .execute(&self.pool) + .await?; + } + Ok(()) + } + /// Shared failure logic: retry or move to history. /// /// When retrying, computes the backoff delay from (in priority order):