From 2a5a8470646cff094700cdeb40e939885beaffb2 Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Tue, 9 Jun 2026 11:58:37 -0300 Subject: [PATCH] fix(campaign-packer): terminal-classify deterministic audience failures to stop redelivery loops (EVO-1676) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A campaigns.pack message whose audience computation fails deterministically (malformed segment SQL, invalid campaign config) was requeued forever: only CampaignNotFoundError was treated as terminal, every other error fell through to nack(requeue=true), and the config never changes between redeliveries — so the poison message hot-looped and blocked the partition without ever showing up in terminal_failures. Introduce a shared, layered terminal-error taxonomy and ack/nack policy so the "this can never succeed on retry" decision lives at the boundary where the knowledge is, not in copy-pasted try/catch blocks (the campaign-sender / 4.2 consumer will reuse it): - shared/errors/TerminalError: neutral marker base for permanent failures. - shared/broker/consumer/processWithAckPolicy: success -> ack, TerminalError -> nack(false), anything else -> nack(true). Optional meta merged into the failure log. - shared/persistence/isDeterministicDbError: SQLSTATE classifier (42/22 deterministic; 08/53/57/40 transient; unknown -> transient/retry). - shared/audience/errors: AudienceConfigError (segment/SQL validation) and DeterministicAudienceError (deterministic DB failure, wraps cause). - CampaignNotFoundError / InvalidEnvelopeError now extend TerminalError; segment-query-builder validation throws AudienceConfigError; pack() wraps computeAudience and classifies; both consumers delegate to the shared policy. Deterministic failures now drop terminally (and surface in terminal_failures). The residual risk (an unclassified deterministic error) is covered by the broker-level redelivery backstop tracked in EVO-1677. Behavior change: terminal drops (incl. campaign-not-found) now log at error level with a `terminal` flag, unifying the two consumers. Tests: SQLSTATE classifier, ack/nack policy, and pack() classification (deterministic -> terminal, config -> terminal, transient -> requeue). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../consumers/campaigns-pack.consumer.ts | 47 +++++------- .../errors/campaign-not-found.error.ts | 5 +- .../services/campaign-packer.service.spec.ts | 41 ++++++++++ .../services/campaign-packer.service.ts | 29 ++++++- .../services/event-process.service.ts | 3 +- .../services/events-received.consumer.ts | 30 +++----- src/shared/audience/errors/audience.errors.ts | 28 +++++++ .../audience/segment-query-builder.service.ts | 21 ++--- .../consumer/process-with-ack-policy.spec.ts | 76 +++++++++++++++++++ .../consumer/process-with-ack-policy.ts | 49 ++++++++++++ src/shared/errors/terminal-error.ts | 17 +++++ .../deterministic-db-error.spec.ts | 47 ++++++++++++ .../persistence/deterministic-db-error.ts | 40 ++++++++++ 13 files changed, 369 insertions(+), 64 deletions(-) create mode 100644 src/shared/audience/errors/audience.errors.ts create mode 100644 src/shared/broker/consumer/process-with-ack-policy.spec.ts create mode 100644 src/shared/broker/consumer/process-with-ack-policy.ts create mode 100644 src/shared/errors/terminal-error.ts create mode 100644 src/shared/persistence/deterministic-db-error.spec.ts create mode 100644 src/shared/persistence/deterministic-db-error.ts diff --git a/src/runners/campaign-packer/consumers/campaigns-pack.consumer.ts b/src/runners/campaign-packer/consumers/campaigns-pack.consumer.ts index 9f490a0..d9bede6 100644 --- a/src/runners/campaign-packer/consumers/campaigns-pack.consumer.ts +++ b/src/runners/campaign-packer/consumers/campaigns-pack.consumer.ts @@ -12,7 +12,7 @@ import { import { CorrelationContext } from '../../../shared/correlation/correlation.context'; import { CustomLoggerService } from '../../../common/services/custom-logger.service'; import { CampaignPackerService } from '../services/campaign-packer.service'; -import { CampaignNotFoundError } from '../errors/campaign-not-found.error'; +import { processWithAckPolicy } from '../../../shared/broker/consumer/process-with-ack-policy'; const LOG_CONTEXT = 'CampaignsPackConsumer'; @@ -21,10 +21,10 @@ const LOG_CONTEXT = 'CampaignsPackConsumer'; * boot and routes each message to `CampaignPackerService`, wrapping processing * in the request's `correlationId` so every downstream log carries it. * - * Ack/nack policy: - * - success → ack - * - campaign not found / malformed payload → nack(requeue=false) (terminal) - * - any other (transient) error → nack(requeue=true) (retry) + * Ack/nack is delegated to the shared `processWithAckPolicy`: success → ack, + * `TerminalError` (campaign not found, invalid audience config, deterministic + * DB error) → nack(requeue=false), any other error → nack(requeue=true). A + * structurally invalid payload is dropped up-front (no correlationId to bind). */ @Injectable() export class CampaignsPackConsumer implements OnModuleInit { @@ -57,32 +57,19 @@ export class CampaignsPackConsumer implements OnModuleInit { const payload = msg.payload; - await this.correlation.runWithCorrelationId( - payload.correlationId, - async () => { - try { + await this.correlation.runWithCorrelationId(payload.correlationId, () => + processWithAckPolicy( + msg, + this.broker, + { + logger: this.logger, + context: LOG_CONTEXT, + meta: { campaignId: payload.campaignId }, + }, + async () => { await this.packer.pack(payload); - await this.broker.ack(msg); - } catch (err) { - if (err instanceof CampaignNotFoundError) { - this.logger.warn( - `campaign not found (campaignId=${err.campaignId}) — nack(requeue=false)`, - LOG_CONTEXT, - ); - await this.broker.nack(msg, false); - return; - } - - this.logger.error( - `campaigns.pack processing failed (campaignId=${payload.campaignId}): ${ - err instanceof Error ? err.message : String(err) - } — nack(requeue=true)`, - err instanceof Error ? err.stack : undefined, - LOG_CONTEXT, - ); - await this.broker.nack(msg, true); - } - }, + }, + ), ); } } diff --git a/src/runners/campaign-packer/errors/campaign-not-found.error.ts b/src/runners/campaign-packer/errors/campaign-not-found.error.ts index fd72209..cf91df9 100644 --- a/src/runners/campaign-packer/errors/campaign-not-found.error.ts +++ b/src/runners/campaign-packer/errors/campaign-not-found.error.ts @@ -1,14 +1,15 @@ +import { TerminalError } from '../../../shared/errors/terminal-error'; + /** * Terminal error for a `campaigns.pack` message whose `campaignId` does not * resolve to a Campaign row. The consumer maps it to `nack(requeue=false)` — * requeueing would loop forever since the campaign will never appear. */ -export class CampaignNotFoundError extends Error { +export class CampaignNotFoundError extends TerminalError { readonly campaignId: string; constructor(campaignId: string) { super(`Campaign ${campaignId} not found`); - this.name = 'CampaignNotFoundError'; this.campaignId = campaignId; } } diff --git a/src/runners/campaign-packer/services/campaign-packer.service.spec.ts b/src/runners/campaign-packer/services/campaign-packer.service.spec.ts index 9e35bd5..2bc1a92 100644 --- a/src/runners/campaign-packer/services/campaign-packer.service.spec.ts +++ b/src/runners/campaign-packer/services/campaign-packer.service.spec.ts @@ -1,5 +1,10 @@ import { CampaignPackerService } from './campaign-packer.service'; import { CampaignNotFoundError } from '../errors/campaign-not-found.error'; +import { + AudienceConfigError, + DeterministicAudienceError, +} from '../../../shared/audience/errors/audience.errors'; +import { TerminalError } from '../../../shared/errors/terminal-error'; import type { CampaignsPackContract } from '../../../shared/broker/contracts/campaigns-pack.contract'; const payload: CampaignsPackContract = { @@ -54,4 +59,40 @@ describe('CampaignPackerService', () => { ); expect(computeAudience).not.toHaveBeenCalled(); }); + + it('wraps a deterministic DB error (malformed SQL) as a terminal DeterministicAudienceError', async () => { + findOne.mockResolvedValueOnce({ id: 'camp-1' }); + const pgError = Object.assign(new Error('syntax error at or near "FROM"'), { + code: '42601', + }); + computeAudience.mockRejectedValueOnce(pgError); + + const err = await service.pack(payload).catch((e) => e); + expect(err).toBeInstanceOf(DeterministicAudienceError); + expect(err).toBeInstanceOf(TerminalError); + expect(err.campaignId).toBe('camp-1'); + }); + + it('propagates an AudienceConfigError unchanged (already terminal)', async () => { + findOne.mockResolvedValueOnce({ id: 'camp-1' }); + computeAudience.mockRejectedValueOnce( + new AudienceConfigError('SQL query is empty'), + ); + + const err = await service.pack(payload).catch((e) => e); + expect(err).toBeInstanceOf(AudienceConfigError); + expect(err).not.toBeInstanceOf(DeterministicAudienceError); + }); + + it('rethrows a transient error (connection drop) so the consumer requeues', async () => { + findOne.mockResolvedValueOnce({ id: 'camp-1' }); + const transient = Object.assign(new Error('connection terminated'), { + code: '08006', + }); + computeAudience.mockRejectedValueOnce(transient); + + const err = await service.pack(payload).catch((e) => e); + expect(err).toBe(transient); + expect(err).not.toBeInstanceOf(TerminalError); + }); }); diff --git a/src/runners/campaign-packer/services/campaign-packer.service.ts b/src/runners/campaign-packer/services/campaign-packer.service.ts index b6b944b..15e32dd 100644 --- a/src/runners/campaign-packer/services/campaign-packer.service.ts +++ b/src/runners/campaign-packer/services/campaign-packer.service.ts @@ -2,10 +2,15 @@ import { Injectable } from '@nestjs/common'; import { Repository } from 'typeorm'; import { Campaign } from '../../../modules/campaigns/entities/campaign.entity'; import { TenantDbContext } from '../../../evo-extension-points'; -import { AudienceComputationService } from '../../../shared/audience/audience-computation.service'; +import { + AudienceComputationService, + AudienceComputationResult, +} from '../../../shared/audience/audience-computation.service'; import { CustomLoggerService } from '../../../common/services/custom-logger.service'; import type { CampaignsPackContract } from '../../../shared/broker/contracts/campaigns-pack.contract'; import { CampaignNotFoundError } from '../errors/campaign-not-found.error'; +import { isDeterministicDbError } from '../../../shared/persistence/deterministic-db-error'; +import { DeterministicAudienceError } from '../../../shared/audience/errors/audience.errors'; export interface PackResult { audienceSize: number; @@ -43,7 +48,7 @@ export class CampaignPackerService { throw new CampaignNotFoundError(campaignId); } - const result = await this.audience.computeAudience(campaignId); + const result = await this.computeAudienceOrClassify(campaignId); const audienceSize = result.totalContacts; this.logger.log('campaign.packed', { @@ -56,4 +61,24 @@ export class CampaignPackerService { return { audienceSize }; } + + /** + * Run audience computation, classifying its failures for the consumer's + * ack/nack policy. Invalid segment config already surfaces as a + * `TerminalError` (rethrown as-is); a malformed segment SQL rejected by + * Postgres is a deterministic DB error and is wrapped terminally so it drops + * instead of looping. Everything else is transient and propagates to requeue. + */ + private async computeAudienceOrClassify( + campaignId: string, + ): Promise { + try { + return await this.audience.computeAudience(campaignId); + } catch (err) { + if (isDeterministicDbError(err)) { + throw new DeterministicAudienceError(campaignId, err); + } + throw err; + } + } } diff --git a/src/runners/event-process/services/event-process.service.ts b/src/runners/event-process/services/event-process.service.ts index b4901bb..69ab28f 100644 --- a/src/runners/event-process/services/event-process.service.ts +++ b/src/runners/event-process/services/event-process.service.ts @@ -4,13 +4,14 @@ import { EventsReceivedContract, isEventsReceivedContract, } from 'src/shared/broker/contracts/events-received.contract'; +import { TerminalError } from 'src/shared/errors/terminal-error'; /** * Thrown when a consumed message is not a valid `events.received` envelope. * It is a permanent (non-retriable) failure — the consumer must drop it * (terminal nack) rather than requeue, or it would redeliver forever. */ -export class InvalidEnvelopeError extends Error {} +export class InvalidEnvelopeError extends TerminalError {} /** * Stub handler for the webhook event pipeline (story 3.3 / EVO-1208). diff --git a/src/runners/event-process/services/events-received.consumer.ts b/src/runners/event-process/services/events-received.consumer.ts index d8219b7..daaa7f3 100644 --- a/src/runners/event-process/services/events-received.consumer.ts +++ b/src/runners/event-process/services/events-received.consumer.ts @@ -7,10 +7,8 @@ import { } from 'src/shared/broker/interfaces/message-broker.interface'; import { EVENTS_RECEIVED_TOPIC_PREFIX } from 'src/shared/broker/contracts/events-received.contract'; import { CorrelationContext } from 'src/shared/correlation/correlation.context'; -import { - EventProcessService, - InvalidEnvelopeError, -} from './event-process.service'; +import { processWithAckPolicy } from 'src/shared/broker/consumer/process-with-ack-policy'; +import { EventProcessService } from './event-process.service'; /** * Subscribes to the whole `events.received.` topic family via the @@ -51,21 +49,13 @@ export class EventsReceivedConsumer implements OnApplicationBootstrap { const correlationId = this.correlation.resolveIncoming( msg.headers.correlationId, ); - await this.correlation.runWithCorrelationId(correlationId, async () => { - try { - await this.service.handle(msg.payload); - await this.broker.ack(msg); - } catch (err) { - const terminal = err instanceof InvalidEnvelopeError; - this.logger.error('event-process.consume.error', { - action: 'event-process.consume.error', - correlationId, - terminal, - error: (err as Error).message, - }); - // Permanent failure → drop (terminal); transient → requeue. - await this.broker.nack(msg, !terminal); - } - }); + await this.correlation.runWithCorrelationId(correlationId, () => + processWithAckPolicy( + msg, + this.broker, + { logger: this.logger, context: 'EventsReceivedConsumer' }, + () => this.service.handle(msg.payload), + ), + ); } } diff --git a/src/shared/audience/errors/audience.errors.ts b/src/shared/audience/errors/audience.errors.ts new file mode 100644 index 0000000..aacfb9c --- /dev/null +++ b/src/shared/audience/errors/audience.errors.ts @@ -0,0 +1,28 @@ +import { TerminalError } from '../../errors/terminal-error'; + +/** + * Invalid audience/segment configuration: empty or malformed segment SQL, + * forbidden keyword, missing `id` column, unknown query type, or a missing + * segment. Deterministic by nature — the same campaign config fails identically + * on every retry — so it is terminal. + */ +export class AudienceConfigError extends TerminalError {} + +/** + * A deterministic database failure raised while computing a campaign audience + * (e.g. malformed segment SQL rejected by Postgres). Wraps the originating + * driver error so the consumer drops the message terminally instead of looping. + */ +export class DeterministicAudienceError extends TerminalError { + readonly campaignId: string; + + constructor(campaignId: string, cause: unknown) { + super( + `Deterministic audience computation failure for campaign ${campaignId}: ${ + cause instanceof Error ? cause.message : String(cause) + }`, + { cause }, + ); + this.campaignId = campaignId; + } +} diff --git a/src/shared/audience/segment-query-builder.service.ts b/src/shared/audience/segment-query-builder.service.ts index 32b63af..e43828a 100644 --- a/src/shared/audience/segment-query-builder.service.ts +++ b/src/shared/audience/segment-query-builder.service.ts @@ -6,6 +6,7 @@ import { Tagging, TaggableType } from '../../modules/labels/entities/tagging.ent import { TenantDbContext } from '../../evo-extension-points'; import { ContactsClientService } from '../crm-client/contacts-client.service'; import type { HydratedContact } from '../crm-client/types/contact'; +import { AudienceConfigError } from './errors/audience.errors'; export interface SegmentQuery { type: 'segment' | 'sql' | 'tags' | 'all'; @@ -125,12 +126,12 @@ export class SegmentQueryBuilderService { case 'segment': // This will be handled by AudienceComputationService // which integrates with SegmentComputationService - throw new Error( + throw new AudienceConfigError( 'Segment-based queries should be handled by AudienceComputationService', ); default: - throw new Error(`Unknown query type: ${query.type}`); + throw new AudienceConfigError(`Unknown query type: ${query.type}`); } } @@ -239,14 +240,14 @@ export class SegmentQueryBuilderService { const upperQuery = normalized.toUpperCase(); if (normalized.length === 0) { - throw new Error('SQL query is empty'); + throw new AudienceConfigError('SQL query is empty'); } if (normalized.includes(';')) { - throw new Error('SQL query cannot contain semicolons'); + throw new AudienceConfigError('SQL query cannot contain semicolons'); } if (upperQuery.includes('--') || upperQuery.includes('/*')) { - throw new Error('SQL query cannot contain SQL comments'); + throw new AudienceConfigError('SQL query cannot contain SQL comments'); } // Check for dangerous keywords @@ -264,18 +265,20 @@ export class SegmentQueryBuilderService { for (const keyword of dangerousKeywords) { if (upperQuery.includes(keyword)) { - throw new Error(`SQL query contains forbidden keyword: ${keyword}`); + throw new AudienceConfigError( + `SQL query contains forbidden keyword: ${keyword}`, + ); } } // Query must be a SELECT statement if (!upperQuery.trim().startsWith('SELECT')) { - throw new Error('SQL query must be a SELECT statement'); + throw new AudienceConfigError('SQL query must be a SELECT statement'); } // Query must expose an ID column to join with contacts. if (!/\bSELECT\b[\s\S]*\bID\b/i.test(normalized)) { - throw new Error('SQL query must select an "id" column'); + throw new AudienceConfigError('SQL query must select an "id" column'); } } @@ -341,7 +344,7 @@ export class SegmentQueryBuilderService { }); if (!segment) { - throw new Error(`Segment ${segmentId} not found`); + throw new AudienceConfigError(`Segment ${segmentId} not found`); } return segment; diff --git a/src/shared/broker/consumer/process-with-ack-policy.spec.ts b/src/shared/broker/consumer/process-with-ack-policy.spec.ts new file mode 100644 index 0000000..bdbdf67 --- /dev/null +++ b/src/shared/broker/consumer/process-with-ack-policy.spec.ts @@ -0,0 +1,76 @@ +import { processWithAckPolicy } from './process-with-ack-policy'; +import { TerminalError } from '../../errors/terminal-error'; +import { + BrokerMessage, + IMessageBroker, +} from '../interfaces/message-broker.interface'; + +class SampleTerminalError extends TerminalError {} + +describe('processWithAckPolicy', () => { + let ack: jest.Mock; + let nack: jest.Mock; + let broker: IMessageBroker; + let logger: any; + const msg = { + id: 'topic/1', + payload: {}, + headers: {}, + raw: {}, + } as BrokerMessage; + + beforeEach(() => { + ack = jest.fn().mockResolvedValue(undefined); + nack = jest.fn().mockResolvedValue(undefined); + broker = { ack, nack } as unknown as IMessageBroker; + logger = { error: jest.fn() }; + }); + + it('acks on success', async () => { + await processWithAckPolicy(msg, broker, { logger, context: 'T' }, () => + Promise.resolve(), + ); + + expect(ack).toHaveBeenCalledWith(msg); + expect(nack).not.toHaveBeenCalled(); + }); + + it('nacks WITHOUT requeue on a TerminalError', async () => { + await processWithAckPolicy(msg, broker, { logger, context: 'T' }, () => + Promise.reject(new SampleTerminalError('permanent')), + ); + + expect(ack).not.toHaveBeenCalled(); + expect(nack).toHaveBeenCalledWith(msg, false); + expect(logger.error).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ terminal: true }), + ); + }); + + it('nacks WITH requeue on any other (transient) error', async () => { + await processWithAckPolicy(msg, broker, { logger, context: 'T' }, () => + Promise.reject(new Error('transient')), + ); + + expect(nack).toHaveBeenCalledWith(msg, true); + expect(logger.error).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ terminal: false }), + ); + }); + + it('merges `meta` fields into the failure log', async () => { + await processWithAckPolicy( + msg, + broker, + { logger, context: 'T', meta: { campaignId: 'camp-9' } }, + () => Promise.reject(new Error('boom')), + ); + + expect(logger.error).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ campaignId: 'camp-9' }), + ); + }); +}); diff --git a/src/shared/broker/consumer/process-with-ack-policy.ts b/src/shared/broker/consumer/process-with-ack-policy.ts new file mode 100644 index 0000000..df4a407 --- /dev/null +++ b/src/shared/broker/consumer/process-with-ack-policy.ts @@ -0,0 +1,49 @@ +import { CustomLoggerService } from '../../../common/services/custom-logger.service'; +import { TerminalError } from '../../errors/terminal-error'; +import { + BrokerMessage, + IMessageBroker, +} from '../interfaces/message-broker.interface'; + +interface AckPolicyContext { + logger: CustomLoggerService; + context: string; + /** Extra structured fields merged into the failure log (e.g. campaignId). */ + meta?: Record; +} + +/** + * Shared ack/nack policy for every broker consumer in the pipeline + * (event-process, campaign-packer, campaign-sender, …). Runs `work`, then: + * - success → `ack` + * - `TerminalError` → `nack(requeue=false)` (permanent drop) + * - any other error → `nack(requeue=true)` (transient — redeliver) + * + * Centralizing this keeps the "what is terminal" decision in the error taxonomy + * (`TerminalError`) instead of copy-pasted try/catch blocks, so a new consumer + * gets the correct redelivery behavior for free. The caller is responsible for + * running this inside its own correlation context. + */ +export async function processWithAckPolicy( + msg: BrokerMessage, + broker: IMessageBroker, + { logger, context, meta }: AckPolicyContext, + work: () => Promise, +): Promise { + try { + await work(); + await broker.ack(msg); + } catch (err) { + const terminal = err instanceof TerminalError; + // Terminal and transient failures both log at `error`; the `terminal` flag + // distinguishes a permanent drop from a will-retry requeue for alerting. + logger.error(`${context} processing failed — nack(requeue=${!terminal})`, { + ...meta, + terminal, + messageId: msg.id, + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined, + }); + await broker.nack(msg, !terminal); + } +} diff --git a/src/shared/errors/terminal-error.ts b/src/shared/errors/terminal-error.ts new file mode 100644 index 0000000..37e0937 --- /dev/null +++ b/src/shared/errors/terminal-error.ts @@ -0,0 +1,17 @@ +/** + * Marker base class for errors that represent a PERMANENT, non-retriable + * failure. A broker consumer using `processWithAckPolicy` maps any + * `TerminalError` (and its subclasses) to `nack(requeue=false)` — a terminal + * drop — while every other error requeues for redelivery. + * + * Subclass it at the boundary where the "this can never succeed on retry" + * knowledge lives (malformed payload, invalid audience config, deterministic DB + * error). New terminal types extend the taxonomy without touching any + * consumer's ack/nack policy. + */ +export class TerminalError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = new.target.name; + } +} diff --git a/src/shared/persistence/deterministic-db-error.spec.ts b/src/shared/persistence/deterministic-db-error.spec.ts new file mode 100644 index 0000000..a9b7dab --- /dev/null +++ b/src/shared/persistence/deterministic-db-error.spec.ts @@ -0,0 +1,47 @@ +import { + extractSqlState, + isDeterministicDbError, +} from './deterministic-db-error'; + +describe('extractSqlState', () => { + it('reads a top-level driver `code`', () => { + expect(extractSqlState({ code: '42601' })).toBe('42601'); + }); + + it('reads a TypeORM QueryFailedError nested `driverError.code`', () => { + expect(extractSqlState({ driverError: { code: '22P02' } })).toBe('22P02'); + }); + + it('returns undefined for non-objects, missing or malformed codes', () => { + expect(extractSqlState(null)).toBeUndefined(); + expect(extractSqlState('boom')).toBeUndefined(); + expect(extractSqlState(new Error('no code'))).toBeUndefined(); + expect(extractSqlState({ code: 'nope' })).toBeUndefined(); + }); +}); + +describe('isDeterministicDbError', () => { + it.each([ + ['42601', 'syntax error'], + ['42P01', 'undefined table'], + ['42703', 'undefined column'], + ['22P02', 'invalid text representation'], + ])('classifies SQLSTATE %s (%s) as deterministic', (code) => { + expect(isDeterministicDbError({ code })).toBe(true); + }); + + it.each([ + ['08006', 'connection failure'], + ['53300', 'too many connections'], + ['57P03', 'cannot connect now'], + ['40P01', 'deadlock detected'], + ])('classifies SQLSTATE %s (%s) as transient', (code) => { + expect(isDeterministicDbError({ code })).toBe(false); + }); + + it('treats unknown / missing codes as NOT deterministic (safer to retry)', () => { + expect(isDeterministicDbError({ code: '99999' })).toBe(false); + expect(isDeterministicDbError(new Error('plain'))).toBe(false); + expect(isDeterministicDbError(undefined)).toBe(false); + }); +}); diff --git a/src/shared/persistence/deterministic-db-error.ts b/src/shared/persistence/deterministic-db-error.ts new file mode 100644 index 0000000..be6ffbe --- /dev/null +++ b/src/shared/persistence/deterministic-db-error.ts @@ -0,0 +1,40 @@ +/** + * Classifies a database error as deterministic (fails identically on every + * redelivery — e.g. malformed SQL, bad data) vs transient (may succeed on + * retry — e.g. a dropped connection). A broker consumer uses this at its + * ack/nack boundary to avoid requeueing a poison message forever. + * + * Keyed off the PostgreSQL SQLSTATE class (first two chars of the 5-char code): + * - deterministic: 42 (syntax error / access rule violation), + * 22 (data exception) + * - transient: 08 (connection exception), 53 (insufficient resources), + * 57 (operator intervention), 40 (transaction rollback / deadlock) + * + * Unknown codes default to NOT deterministic: requeueing a recoverable failure + * is safer than dropping it. The residual risk (an unclassified deterministic + * error that loops) is covered by the broker-level redelivery backstop + * (EVO-1677). + */ +const DETERMINISTIC_SQLSTATE_CLASSES = new Set(['42', '22']); +const TRANSIENT_SQLSTATE_CLASSES = new Set(['08', '53', '57', '40']); + +/** + * Extract a PostgreSQL SQLSTATE code from a raw driver error or a TypeORM + * `QueryFailedError` (which nests the driver error under `driverError`). + */ +export function extractSqlState(err: unknown): string | undefined { + if (err == null || typeof err !== 'object') return undefined; + const candidate = err as { code?: unknown; driverError?: { code?: unknown } }; + const code = candidate.driverError?.code ?? candidate.code; + return typeof code === 'string' && /^[0-9A-Z]{5}$/.test(code) + ? code + : undefined; +} + +export function isDeterministicDbError(err: unknown): boolean { + const sqlState = extractSqlState(err); + if (!sqlState) return false; + const sqlStateClass = sqlState.slice(0, 2); + if (TRANSIENT_SQLSTATE_CLASSES.has(sqlStateClass)) return false; + return DETERMINISTIC_SQLSTATE_CLASSES.has(sqlStateClass); +}