Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 17 additions & 30 deletions src/runners/campaign-packer/consumers/campaigns-pack.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import { CorrelationContext } from '../../../shared/correlation/correlation.context';
import { CustomLoggerService } from '../../../common/services/custom-logger.service';
import { CampaignPackerService } from '../services/campaign-packer.service';
import { CampaignNotFoundError } from '../errors/campaign-not-found.error';
import { processWithAckPolicy } from '../../../shared/broker/consumer/process-with-ack-policy';

const LOG_CONTEXT = 'CampaignsPackConsumer';

Expand All @@ -21,10 +21,10 @@ const LOG_CONTEXT = 'CampaignsPackConsumer';
* boot and routes each message to `CampaignPackerService`, wrapping processing
* in the request's `correlationId` so every downstream log carries it.
*
* Ack/nack policy:
* - success → ack
* - campaign not found / malformed payload → nack(requeue=false) (terminal)
* - any other (transient) error → nack(requeue=true) (retry)
* Ack/nack is delegated to the shared `processWithAckPolicy`: success → ack,
* `TerminalError` (campaign not found, invalid audience config, deterministic
* DB error) → nack(requeue=false), any other error → nack(requeue=true). A
* structurally invalid payload is dropped up-front (no correlationId to bind).
*/
@Injectable()
export class CampaignsPackConsumer implements OnModuleInit {
Expand Down Expand Up @@ -57,32 +57,19 @@ export class CampaignsPackConsumer implements OnModuleInit {

const payload = msg.payload;

await this.correlation.runWithCorrelationId(
payload.correlationId,
async () => {
try {
await this.correlation.runWithCorrelationId(payload.correlationId, () =>
processWithAckPolicy(
msg,
this.broker,
{
logger: this.logger,
context: LOG_CONTEXT,
meta: { campaignId: payload.campaignId },
},
async () => {
await this.packer.pack(payload);
await this.broker.ack(msg);
} catch (err) {
if (err instanceof CampaignNotFoundError) {
this.logger.warn(
`campaign not found (campaignId=${err.campaignId}) — nack(requeue=false)`,
LOG_CONTEXT,
);
await this.broker.nack(msg, false);
return;
}

this.logger.error(
`campaigns.pack processing failed (campaignId=${payload.campaignId}): ${
err instanceof Error ? err.message : String(err)
} — nack(requeue=true)`,
err instanceof Error ? err.stack : undefined,
LOG_CONTEXT,
);
await this.broker.nack(msg, true);
}
},
},
),
);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { TerminalError } from '../../../shared/errors/terminal-error';

/**
* Terminal error for a `campaigns.pack` message whose `campaignId` does not
* resolve to a Campaign row. The consumer maps it to `nack(requeue=false)` —
* requeueing would loop forever since the campaign will never appear.
*/
export class CampaignNotFoundError extends Error {
export class CampaignNotFoundError extends TerminalError {
readonly campaignId: string;

constructor(campaignId: string) {
super(`Campaign ${campaignId} not found`);
this.name = 'CampaignNotFoundError';
this.campaignId = campaignId;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { CampaignPackerService } from './campaign-packer.service';
import { CampaignNotFoundError } from '../errors/campaign-not-found.error';
import {
AudienceConfigError,
DeterministicAudienceError,
} from '../../../shared/audience/errors/audience.errors';
import { TerminalError } from '../../../shared/errors/terminal-error';
import type { CampaignsPackContract } from '../../../shared/broker/contracts/campaigns-pack.contract';

const payload: CampaignsPackContract = {
Expand Down Expand Up @@ -54,4 +59,40 @@ describe('CampaignPackerService', () => {
);
expect(computeAudience).not.toHaveBeenCalled();
});

it('wraps a deterministic DB error (malformed SQL) as a terminal DeterministicAudienceError', async () => {
findOne.mockResolvedValueOnce({ id: 'camp-1' });
const pgError = Object.assign(new Error('syntax error at or near "FROM"'), {
code: '42601',
});
computeAudience.mockRejectedValueOnce(pgError);

const err = await service.pack(payload).catch((e) => e);
expect(err).toBeInstanceOf(DeterministicAudienceError);
expect(err).toBeInstanceOf(TerminalError);
expect(err.campaignId).toBe('camp-1');
});

it('propagates an AudienceConfigError unchanged (already terminal)', async () => {
findOne.mockResolvedValueOnce({ id: 'camp-1' });
computeAudience.mockRejectedValueOnce(
new AudienceConfigError('SQL query is empty'),
);

const err = await service.pack(payload).catch((e) => e);
expect(err).toBeInstanceOf(AudienceConfigError);
expect(err).not.toBeInstanceOf(DeterministicAudienceError);
});

it('rethrows a transient error (connection drop) so the consumer requeues', async () => {
findOne.mockResolvedValueOnce({ id: 'camp-1' });
const transient = Object.assign(new Error('connection terminated'), {
code: '08006',
});
computeAudience.mockRejectedValueOnce(transient);

const err = await service.pack(payload).catch((e) => e);
expect(err).toBe(transient);
expect(err).not.toBeInstanceOf(TerminalError);
});
});
29 changes: 27 additions & 2 deletions src/runners/campaign-packer/services/campaign-packer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ import { Injectable } from '@nestjs/common';
import { Repository } from 'typeorm';
import { Campaign } from '../../../modules/campaigns/entities/campaign.entity';
import { TenantDbContext } from '../../../evo-extension-points';
import { AudienceComputationService } from '../../../shared/audience/audience-computation.service';
import {
AudienceComputationService,
AudienceComputationResult,
} from '../../../shared/audience/audience-computation.service';
import { CustomLoggerService } from '../../../common/services/custom-logger.service';
import type { CampaignsPackContract } from '../../../shared/broker/contracts/campaigns-pack.contract';
import { CampaignNotFoundError } from '../errors/campaign-not-found.error';
import { isDeterministicDbError } from '../../../shared/persistence/deterministic-db-error';
import { DeterministicAudienceError } from '../../../shared/audience/errors/audience.errors';

export interface PackResult {
audienceSize: number;
Expand Down Expand Up @@ -43,7 +48,7 @@ export class CampaignPackerService {
throw new CampaignNotFoundError(campaignId);
}

const result = await this.audience.computeAudience(campaignId);
const result = await this.computeAudienceOrClassify(campaignId);
const audienceSize = result.totalContacts;

this.logger.log('campaign.packed', {
Expand All @@ -56,4 +61,24 @@ export class CampaignPackerService {

return { audienceSize };
}

/**
* Run audience computation, classifying its failures for the consumer's
* ack/nack policy. Invalid segment config already surfaces as a
* `TerminalError` (rethrown as-is); a malformed segment SQL rejected by
* Postgres is a deterministic DB error and is wrapped terminally so it drops
* instead of looping. Everything else is transient and propagates to requeue.
*/
private async computeAudienceOrClassify(
campaignId: string,
): Promise<AudienceComputationResult> {
try {
return await this.audience.computeAudience(campaignId);
} catch (err) {
if (isDeterministicDbError(err)) {
throw new DeterministicAudienceError(campaignId, err);
}
throw err;
}
}
}
3 changes: 2 additions & 1 deletion src/runners/event-process/services/event-process.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import {
EventsReceivedContract,
isEventsReceivedContract,
} from 'src/shared/broker/contracts/events-received.contract';
import { TerminalError } from 'src/shared/errors/terminal-error';

/**
* Thrown when a consumed message is not a valid `events.received` envelope.
* It is a permanent (non-retriable) failure — the consumer must drop it
* (terminal nack) rather than requeue, or it would redeliver forever.
*/
export class InvalidEnvelopeError extends Error {}
export class InvalidEnvelopeError extends TerminalError {}

/**
* Stub handler for the webhook event pipeline (story 3.3 / EVO-1208).
Expand Down
30 changes: 10 additions & 20 deletions src/runners/event-process/services/events-received.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import {
} from 'src/shared/broker/interfaces/message-broker.interface';
import { EVENTS_RECEIVED_TOPIC_PREFIX } from 'src/shared/broker/contracts/events-received.contract';
import { CorrelationContext } from 'src/shared/correlation/correlation.context';
import {
EventProcessService,
InvalidEnvelopeError,
} from './event-process.service';
import { processWithAckPolicy } from 'src/shared/broker/consumer/process-with-ack-policy';
import { EventProcessService } from './event-process.service';

/**
* Subscribes to the whole `events.received.<platform>` topic family via the
Expand Down Expand Up @@ -51,21 +49,13 @@ export class EventsReceivedConsumer implements OnApplicationBootstrap {
const correlationId = this.correlation.resolveIncoming(
msg.headers.correlationId,
);
await this.correlation.runWithCorrelationId(correlationId, async () => {
try {
await this.service.handle(msg.payload);
await this.broker.ack(msg);
} catch (err) {
const terminal = err instanceof InvalidEnvelopeError;
this.logger.error('event-process.consume.error', {
action: 'event-process.consume.error',
correlationId,
terminal,
error: (err as Error).message,
});
// Permanent failure → drop (terminal); transient → requeue.
await this.broker.nack(msg, !terminal);
}
});
await this.correlation.runWithCorrelationId(correlationId, () =>
processWithAckPolicy(
msg,
this.broker,
{ logger: this.logger, context: 'EventsReceivedConsumer' },
() => this.service.handle(msg.payload),
),
);
}
}
28 changes: 28 additions & 0 deletions src/shared/audience/errors/audience.errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { TerminalError } from '../../errors/terminal-error';

/**
* Invalid audience/segment configuration: empty or malformed segment SQL,
* forbidden keyword, missing `id` column, unknown query type, or a missing
* segment. Deterministic by nature — the same campaign config fails identically
* on every retry — so it is terminal.
*/
export class AudienceConfigError extends TerminalError {}

/**
* A deterministic database failure raised while computing a campaign audience
* (e.g. malformed segment SQL rejected by Postgres). Wraps the originating
* driver error so the consumer drops the message terminally instead of looping.
*/
export class DeterministicAudienceError extends TerminalError {
readonly campaignId: string;

constructor(campaignId: string, cause: unknown) {
super(
`Deterministic audience computation failure for campaign ${campaignId}: ${
cause instanceof Error ? cause.message : String(cause)
}`,
{ cause },
);
this.campaignId = campaignId;
}
}
21 changes: 12 additions & 9 deletions src/shared/audience/segment-query-builder.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Tagging, TaggableType } from '../../modules/labels/entities/tagging.ent
import { TenantDbContext } from '../../evo-extension-points';
import { ContactsClientService } from '../crm-client/contacts-client.service';
import type { HydratedContact } from '../crm-client/types/contact';
import { AudienceConfigError } from './errors/audience.errors';

export interface SegmentQuery {
type: 'segment' | 'sql' | 'tags' | 'all';
Expand Down Expand Up @@ -125,12 +126,12 @@ export class SegmentQueryBuilderService {
case 'segment':
// This will be handled by AudienceComputationService
// which integrates with SegmentComputationService
throw new Error(
throw new AudienceConfigError(
'Segment-based queries should be handled by AudienceComputationService',
);

default:
throw new Error(`Unknown query type: ${query.type}`);
throw new AudienceConfigError(`Unknown query type: ${query.type}`);
}
}

Expand Down Expand Up @@ -239,14 +240,14 @@ export class SegmentQueryBuilderService {
const upperQuery = normalized.toUpperCase();

if (normalized.length === 0) {
throw new Error('SQL query is empty');
throw new AudienceConfigError('SQL query is empty');
}

if (normalized.includes(';')) {
throw new Error('SQL query cannot contain semicolons');
throw new AudienceConfigError('SQL query cannot contain semicolons');
}
if (upperQuery.includes('--') || upperQuery.includes('/*')) {
throw new Error('SQL query cannot contain SQL comments');
throw new AudienceConfigError('SQL query cannot contain SQL comments');
}

// Check for dangerous keywords
Expand All @@ -264,18 +265,20 @@ export class SegmentQueryBuilderService {

for (const keyword of dangerousKeywords) {
if (upperQuery.includes(keyword)) {
throw new Error(`SQL query contains forbidden keyword: ${keyword}`);
throw new AudienceConfigError(
`SQL query contains forbidden keyword: ${keyword}`,
);
}
}

// Query must be a SELECT statement
if (!upperQuery.trim().startsWith('SELECT')) {
throw new Error('SQL query must be a SELECT statement');
throw new AudienceConfigError('SQL query must be a SELECT statement');
}

// Query must expose an ID column to join with contacts.
if (!/\bSELECT\b[\s\S]*\bID\b/i.test(normalized)) {
throw new Error('SQL query must select an "id" column');
throw new AudienceConfigError('SQL query must select an "id" column');
}
}

Expand Down Expand Up @@ -341,7 +344,7 @@ export class SegmentQueryBuilderService {
});

if (!segment) {
throw new Error(`Segment ${segmentId} not found`);
throw new AudienceConfigError(`Segment ${segmentId} not found`);
}

return segment;
Expand Down
Loading
Loading