From b0666942ea921fecfd01ed04d86fd30eb19d6e9a Mon Sep 17 00:00:00 2001 From: emiliosp Date: Wed, 28 Jan 2026 16:08:59 +0100 Subject: [PATCH 1/4] feat: add retry logic --- migrations/20260128113327_add_retry_logic.ts | 23 +++++++++ src/consumer.ts | 49 ++++++++++++++++++-- src/types.ts | 10 ++-- 3 files changed, 74 insertions(+), 8 deletions(-) create mode 100644 migrations/20260128113327_add_retry_logic.ts diff --git a/migrations/20260128113327_add_retry_logic.ts b/migrations/20260128113327_add_retry_logic.ts new file mode 100644 index 0000000..5fe537b --- /dev/null +++ b/migrations/20260128113327_add_retry_logic.ts @@ -0,0 +1,23 @@ +import type { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + // Add retry columns + await knex.raw( + `ALTER TABLE tasks ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0;`, + ); + await knex.raw( + `ALTER TABLE tasks ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 3;`, + ); + await knex.raw(`ALTER TABLE tasks ADD COLUMN error_message TEXT;`); + + await knex.raw(`ALTER TYPE t_status ADD VALUE 'failed';`); +} + +export async function down(knex: Knex): Promise { + await knex.raw(`ALTER TABLE tasks DROP COLUMN retry_count;`); + await knex.raw(`ALTER TABLE tasks DROP COLUMN max_retries;`); + await knex.raw(`ALTER TABLE tasks DROP COLUMN error_message;`); + + await knex.raw(`ALTER TYPE t_status REMOVE VALUE 'failed';`); + // Note: PostgreSQL doesn't support removing values from enums easily +} diff --git a/src/consumer.ts b/src/consumer.ts index c84d310..d5f92ec 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -4,6 +4,10 @@ import type { Task } from './types.ts'; const executeTask = async (task: Task): Promise => { // simulate task execution, just sleep for the specified time + // randomly fail 30% of the time to test retry logic + if (Math.random() < 0.3) { + throw new Error('Random failure for testing'); + } await new Promise((resolve) => setTimeout(resolve, task.payload.sleep)); }; @@ -16,10 +20,45 @@ while (true) { continue; } - await executeTask(task); + try { + await executeTask(task); + + await db('tasks') + .update({ status: 'done', executed_at: db.fn.now() }) + .where({ id: task.id }); + console.log('✅ Task done:', task.id); + } catch (error) { + const newRetryCount = task.retry_count + 1; + + if (newRetryCount >= task.max_retries) { + // Max retries exceeded, mark as failed + await db('tasks') + .update({ + status: 'failed', + retry_count: newRetryCount, + error_message: error.message, + executed_at: db.fn.now(), + }) + .where({ id: task.id }); - await db('tasks') - .update({ status: 'done', executed_at: db.fn.now() }) - .where({ id: task.id }); - console.log('Task done:', task.id); + console.log( + `❌ Task failed permanently after ${newRetryCount} retries:`, + task.id, + ); + } else { + await db('tasks') + .update({ + status: 'pending', + retry_count: newRetryCount, + error_message: error.message, + picked_at: null, + }) + .where({ id: task.id }); + + console.log( + `🔄 Task failed, retry ${newRetryCount}/${task.max_retries}:`, + task.id, + ); + } + } } diff --git a/src/types.ts b/src/types.ts index fc7c200..56e4646 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,9 +3,13 @@ export type Payload = { }; export type Task = { - id: number; + id: string; payload: Payload; - status: 'pending' | 'in_progress' | 'done'; + status: 'pending' | 'in_progress' | 'done' | 'failed'; + retry_count: number; + max_retries: number; + error_message: string | null; created_at: string; - updated_at: string; + picked_at: string | null; + executed_at: string | null; }; From 3b9a6af210b448e9e08768df37655945ca77801f Mon Sep 17 00:00:00 2001 From: Emilio Spatola Date: Wed, 28 Jan 2026 16:22:02 +0100 Subject: [PATCH 2/4] Update migrations/20260128113327_add_retry_logic.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- migrations/20260128113327_add_retry_logic.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/migrations/20260128113327_add_retry_logic.ts b/migrations/20260128113327_add_retry_logic.ts index 5fe537b..80a3770 100644 --- a/migrations/20260128113327_add_retry_logic.ts +++ b/migrations/20260128113327_add_retry_logic.ts @@ -18,6 +18,6 @@ export async function down(knex: Knex): Promise { await knex.raw(`ALTER TABLE tasks DROP COLUMN max_retries;`); await knex.raw(`ALTER TABLE tasks DROP COLUMN error_message;`); - await knex.raw(`ALTER TYPE t_status REMOVE VALUE 'failed';`); - // Note: PostgreSQL doesn't support removing values from enums easily + // Note: PostgreSQL does not support simply removing values from enums. + // The 'failed' value will remain in t_status; only the added columns are rolled back. } From e2fd92277f8fa6824360ff7cad8000e3a9388667 Mon Sep 17 00:00:00 2001 From: Emilio Spatola Date: Wed, 28 Jan 2026 16:23:00 +0100 Subject: [PATCH 3/4] Update migrations/20260128113327_add_retry_logic.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- migrations/20260128113327_add_retry_logic.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/20260128113327_add_retry_logic.ts b/migrations/20260128113327_add_retry_logic.ts index 80a3770..698b9b4 100644 --- a/migrations/20260128113327_add_retry_logic.ts +++ b/migrations/20260128113327_add_retry_logic.ts @@ -10,7 +10,7 @@ export async function up(knex: Knex): Promise { ); await knex.raw(`ALTER TABLE tasks ADD COLUMN error_message TEXT;`); - await knex.raw(`ALTER TYPE t_status ADD VALUE 'failed';`); + await knex.raw(`ALTER TYPE t_status ADD VALUE IF NOT EXISTS 'failed';`); } export async function down(knex: Knex): Promise { From 934d18127da2d777a52322742a0c0fdd46b3bb1b Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 16:30:37 +0100 Subject: [PATCH 4/4] Rename max_retries to max_attempts for semantic clarity (#2) * Initial plan * Rename max_retries to max_attempts for clarity Co-authored-by: emilioSp <22614105+emilioSp@users.noreply.github.com> * Clarify log messages and fix migration down function Co-authored-by: emilioSp <22614105+emilioSp@users.noreply.github.com> * Improve migration down function comment Co-authored-by: emilioSp <22614105+emilioSp@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: emilioSp <22614105+emilioSp@users.noreply.github.com> Co-authored-by: Emilio Spatola --- migrations/20260128113327_add_retry_logic.ts | 4 ++-- src/consumer.ts | 8 ++++---- src/types.ts | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/migrations/20260128113327_add_retry_logic.ts b/migrations/20260128113327_add_retry_logic.ts index 698b9b4..5543919 100644 --- a/migrations/20260128113327_add_retry_logic.ts +++ b/migrations/20260128113327_add_retry_logic.ts @@ -6,7 +6,7 @@ export async function up(knex: Knex): Promise { `ALTER TABLE tasks ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0;`, ); await knex.raw( - `ALTER TABLE tasks ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 3;`, + `ALTER TABLE tasks ADD COLUMN max_attempts INTEGER NOT NULL DEFAULT 3;`, ); await knex.raw(`ALTER TABLE tasks ADD COLUMN error_message TEXT;`); @@ -15,7 +15,7 @@ export async function up(knex: Knex): Promise { export async function down(knex: Knex): Promise { await knex.raw(`ALTER TABLE tasks DROP COLUMN retry_count;`); - await knex.raw(`ALTER TABLE tasks DROP COLUMN max_retries;`); + await knex.raw(`ALTER TABLE tasks DROP COLUMN max_attempts;`); await knex.raw(`ALTER TABLE tasks DROP COLUMN error_message;`); // Note: PostgreSQL does not support simply removing values from enums. diff --git a/src/consumer.ts b/src/consumer.ts index d5f92ec..64740ed 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -30,8 +30,8 @@ while (true) { } catch (error) { const newRetryCount = task.retry_count + 1; - if (newRetryCount >= task.max_retries) { - // Max retries exceeded, mark as failed + if (newRetryCount >= task.max_attempts) { + // Max attempts exceeded, mark as failed await db('tasks') .update({ status: 'failed', @@ -42,7 +42,7 @@ while (true) { .where({ id: task.id }); console.log( - `❌ Task failed permanently after ${newRetryCount} retries:`, + `❌ Task failed permanently (${newRetryCount} failures):`, task.id, ); } else { @@ -56,7 +56,7 @@ while (true) { .where({ id: task.id }); console.log( - `🔄 Task failed, retry ${newRetryCount}/${task.max_retries}:`, + `🔄 Task failed (failure ${newRetryCount}/${task.max_attempts}):`, task.id, ); } diff --git a/src/types.ts b/src/types.ts index 56e4646..85fa452 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,7 +7,7 @@ export type Task = { payload: Payload; status: 'pending' | 'in_progress' | 'done' | 'failed'; retry_count: number; - max_retries: number; + max_attempts: number; error_message: string | null; created_at: string; picked_at: string | null;