Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benches/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ fn bench_dispatch_retryable_dead_letter(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("retryable_dead_letter");
group.throughput(Throughput::Elements(100));
group.sample_size(10);
group.warm_up_time(Duration::from_secs(1));
group.measurement_time(Duration::from_secs(3));

let strategies: &[(&str, BackoffStrategy)] = &[
(
Expand Down
3 changes: 3 additions & 0 deletions benches/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ fn bench_query_by_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("query_by_tags");
group.throughput(Throughput::Elements(1));
group.sample_size(10);
group.warm_up_time(Duration::from_secs(1));
group.measurement_time(Duration::from_secs(3));

for queue_depth in [100usize, 1000, 5000] {
let store = rt.block_on(store_with_tagged_tasks(queue_depth));
Expand Down
50 changes: 33 additions & 17 deletions src/scheduler/spawn/failure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,39 @@ pub(crate) async fn handle_failure(
"task failed"
);

let fail_backoff = crate::store::FailBackoff {
strategy: backoff_strategy,
executor_retry_after_ms: error.retry_after_ms,
};
if let Err(e) = deps
.store
.fail_with_record(
task,
&error.message,
error.retryable,
effective_max_retries,
metrics,
&fail_backoff,
)
.await
{
tracing::error!(task_id, error = %e, "failed to record task failure");
if will_retry {
// Fast path: single UPDATE without a transaction wrapper.
// Avoids BEGIN IMMEDIATE + COMMIT round-trips, reducing connection
// hold time from 3 SQL executions to 1.
let delay = retry_delay.unwrap_or(std::time::Duration::ZERO);
if let Err(e) = deps
.store
.requeue_for_retry(task.id, &error.message, delay)
.await
{
tracing::error!(task_id, error = %e, "failed to requeue task for retry");
}
} else {
// Terminal failure (permanent or retries exhausted): needs a
// transaction for the multi-statement INSERT history + DELETE.
let fail_backoff = crate::store::FailBackoff {
strategy: backoff_strategy,
executor_retry_after_ms: error.retry_after_ms,
};
if let Err(e) = deps
.store
.fail_with_record(
task,
&error.message,
error.retryable,
effective_max_retries,
metrics,
&fail_backoff,
)
.await
{
tracing::error!(task_id, error = %e, "failed to record task failure");
}
}

// Remove from active tracking AFTER the store write completes.
Expand Down
42 changes: 42 additions & 0 deletions src/store/lifecycle/transitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,48 @@ impl TaskStore {
Ok(())
}

/// Requeue a task for retry without a transaction.
///
/// The single UPDATE is atomically safe and avoids `BEGIN IMMEDIATE` +
/// `COMMIT` round-trips, reducing connection hold time from 3 SQL
/// executions to 1. This matters under concurrency because
/// `open_memory()` uses a single-connection pool — shorter hold time
/// lets dispatch (`pop_next_batch`) interleave sooner.
pub async fn requeue_for_retry(
&self,
task_id: i64,
error: &str,
delay: std::time::Duration,
) -> Result<(), StoreError> {
if delay.is_zero() {
sqlx::query(
"UPDATE tasks SET status = 'pending', started_at = NULL,
retry_count = retry_count + 1, last_error = ?
WHERE id = ?",
)
.bind(error)
.bind(task_id)
.execute(&self.pool)
.await?;
} else {
let run_after_ms = (chrono::Utc::now()
+ chrono::Duration::milliseconds(delay.as_millis() as i64))
.timestamp_millis();
sqlx::query(
"UPDATE tasks SET status = 'pending', started_at = NULL,
retry_count = retry_count + 1, last_error = ?,
run_after = ?
WHERE id = ?",
)
.bind(error)
.bind(run_after_ms)
.bind(task_id)
.execute(&self.pool)
.await?;
}
Ok(())
}

/// Shared failure logic: retry or move to history.
///
/// When retrying, computes the backoff delay from (in priority order):
Expand Down
Loading