Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/runners/campaign-packer/campaign-packer.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';
import { CampaignPackerService } from './services/campaign-packer.service';
import { PaginationService } from './services/pagination.service';
import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer';

/**
Expand All @@ -13,6 +14,6 @@ import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer';
* AppModule.forRoot() when AppFactory.shouldStartCampaignPacker() is true.
*/
@Module({
providers: [CampaignPackerService, CampaignsPackConsumer],
providers: [CampaignPackerService, PaginationService, CampaignsPackConsumer],
})
export class CampaignPackerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { TerminalError } from '../../../shared/errors/terminal-error';

/**
* Terminal error for a campaign whose audience resolved but which is missing
* the channel type or a message template required to build a `campaigns.send`
* message. The misconfiguration is permanent, so the consumer drops it
* (nack requeue=false) instead of looping.
*/
export class CampaignNotConfiguredError extends TerminalError {
readonly campaignId: string;

constructor(campaignId: string, reason: string) {
super(`Campaign ${campaignId} is not dispatchable: ${reason}`);
this.campaignId = campaignId;
}
}
189 changes: 180 additions & 9 deletions src/runners/campaign-packer/services/campaign-packer.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import { CampaignPackerService } from './campaign-packer.service';
import { PaginationService } from './pagination.service';
import { CampaignNotFoundError } from '../errors/campaign-not-found.error';
import { CampaignNotConfiguredError } from '../errors/campaign-not-configured.error';
import {
AudienceConfigError,
DeterministicAudienceError,
} from '../../../shared/audience/errors/audience.errors';
import { TerminalError } from '../../../shared/errors/terminal-error';
import { CampaignContact } from '../../../modules/campaigns/entities/campaign-contact.entity';
import type { CampaignsPackContract } from '../../../shared/broker/contracts/campaigns-pack.contract';
import {
CAMPAIGNS_SEND_TOPIC,
isCampaignsSendContract,
} from '../../../shared/broker/contracts/campaigns-send.contract';
import { CAMPAIGNS_TRACKED_TOPIC } from '../../../shared/broker/contracts/campaigns-tracked.contract';

const payload: CampaignsPackContract = {
campaignId: 'camp-1',
Expand All @@ -14,43 +22,206 @@ const payload: CampaignsPackContract = {
correlationId: '11111111-1111-4111-8111-111111111111',
};

const contactRows = (n: number): Array<{ contactId: string }> =>
Array.from({ length: n }, (_, i) => ({ contactId: `contact-${i + 1}` }));

const sendCalls = (publish: jest.Mock): unknown[][] =>
publish.mock.calls.filter(([topic]) => topic === CAMPAIGNS_SEND_TOPIC);

describe('CampaignPackerService', () => {
let service: CampaignPackerService;
let findOne: jest.Mock;
let find: jest.Mock;
let publish: jest.Mock;
let computeAudience: jest.Mock;
let log: jest.Mock;
let warn: jest.Mock;

beforeEach(() => {
findOne = jest.fn();
find = jest.fn().mockResolvedValue([]);
publish = jest.fn().mockResolvedValue(undefined);
computeAudience = jest.fn();
log = jest.fn();
const db = { getRepository: () => ({ findOne }) } as any;
warn = jest.fn();
const db = {
getRepository: (entity: unknown) =>
entity === CampaignContact ? { find } : { findOne },
} as any;
const audience = { computeAudience } as any;
const logger = { log, warn: jest.fn(), error: jest.fn() } as any;
service = new CampaignPackerService(db, audience, logger);
const logger = { log, warn, error: jest.fn() } as any;
const broker = { publish } as any;
service = new CampaignPackerService(
db,
audience,
logger,
broker,
new PaginationService(),
);
});

it('loads the campaign, computes audience and logs audienceSize', async () => {
findOne.mockResolvedValueOnce({ id: 'camp-1' });
findOne.mockResolvedValueOnce({
id: 'camp-1',
channelType: 'Channel::Email',
templates: [{ messageTemplateId: 'tmpl-1', variant: 'A' }],
});
computeAudience.mockResolvedValueOnce({
campaignId: 'camp-1',
totalContacts: 42,
validContacts: 40,
invalidContacts: 2,
totalContacts: 2,
validContacts: 2,
invalidContacts: 0,
processingTimeMs: 10,
strategy: 'segment',
});
find.mockResolvedValueOnce(contactRows(2));

const result = await service.pack(payload);

expect(computeAudience).toHaveBeenCalledWith('camp-1');
expect(result).toEqual({ audienceSize: 42 });
expect(result).toEqual({ audienceSize: 2 });
expect(log).toHaveBeenCalledWith(
'campaign.packed',
expect.objectContaining({ campaignId: 'camp-1', audienceSize: 42 }),
expect.objectContaining({ campaignId: 'camp-1', audienceSize: 2 }),
);
});

it('paginates the audience and publishes one campaigns.send per page (AC3)', async () => {
findOne.mockResolvedValueOnce({
id: 'camp-1',
channelType: 'Channel::Email',
templates: [{ messageTemplateId: 'tmpl-1', variant: 'A' }],
});
computeAudience.mockResolvedValueOnce({
totalContacts: 1500,
validContacts: 1500,
invalidContacts: 0,
strategy: 'segment',
});
find.mockResolvedValueOnce(contactRows(1500));

await service.pack(payload);

const calls = sendCalls(publish);
expect(calls).toHaveLength(2);
expect(calls[0][1]).toMatchObject({
page: 1,
totalPages: 2,
channelType: 'email',
templateId: 'tmpl-1',
correlationId: payload.correlationId,
});
expect((calls[0][1] as { contactIds: string[] }).contactIds).toHaveLength(
1000,
);
expect(calls[1][1]).toMatchObject({ page: 2, totalPages: 2 });
expect((calls[1][1] as { contactIds: string[] }).contactIds).toHaveLength(
500,
);
});

it('publishes campaigns.tracked completed and warns on empty audience (AC2)', async () => {
findOne.mockResolvedValueOnce({
id: 'camp-1',
channelType: 'Channel::Email',
templates: [],
});
computeAudience.mockResolvedValueOnce({
totalContacts: 0,
validContacts: 0,
invalidContacts: 0,
strategy: 'segment',
});
find.mockResolvedValueOnce([]);

await service.pack(payload);

expect(publish).toHaveBeenCalledTimes(1);
expect(publish).toHaveBeenCalledWith(
CAMPAIGNS_TRACKED_TOPIC,
expect.objectContaining({
campaignId: 'camp-1',
page: 0,
sentCount: 0,
failedCount: 0,
completed: true,
correlationId: payload.correlationId,
}),
);
expect(warn).toHaveBeenCalledWith('campaign has no contacts', {
campaignId: 'camp-1',
});
expect(sendCalls(publish)).toHaveLength(0);
});

it('emits a payload that satisfies the campaigns.send contract and maps channelType (AC4)', async () => {
findOne.mockResolvedValueOnce({
id: 'camp-1',
channelType: 'Channel::Whatsapp',
templates: [{ messageTemplateId: 'tmpl-9', variant: 'A' }],
});
computeAudience.mockResolvedValueOnce({
totalContacts: 3,
validContacts: 3,
invalidContacts: 0,
strategy: 'segment',
});
find.mockResolvedValueOnce(contactRows(3));

await service.pack(payload);

const [, message] = sendCalls(publish)[0];
expect(isCampaignsSendContract(message)).toBe(true);
expect(message).toMatchObject({
channelType: 'whatsapp',
templateId: 'tmpl-9',
page: 1,
totalPages: 1,
});
});

it('prefers the A-variant template when multiple templates exist', async () => {
findOne.mockResolvedValueOnce({
id: 'camp-1',
channelType: 'Channel::Email',
templates: [
{ messageTemplateId: 'tmpl-b', variant: 'B' },
{ messageTemplateId: 'tmpl-a', variant: 'A' },
],
});
computeAudience.mockResolvedValueOnce({
totalContacts: 1,
validContacts: 1,
invalidContacts: 0,
strategy: 'segment',
});
find.mockResolvedValueOnce(contactRows(1));

await service.pack(payload);

expect(sendCalls(publish)[0][1]).toMatchObject({ templateId: 'tmpl-a' });
});

it('throws a terminal CampaignNotConfiguredError when no template exists', async () => {
findOne.mockResolvedValueOnce({
id: 'camp-1',
channelType: 'Channel::Email',
templates: [],
});
computeAudience.mockResolvedValueOnce({
totalContacts: 2,
validContacts: 2,
invalidContacts: 0,
strategy: 'segment',
});
find.mockResolvedValueOnce(contactRows(2));

const err = await service.pack(payload).catch((e) => e);
expect(err).toBeInstanceOf(CampaignNotConfiguredError);
expect(err).toBeInstanceOf(TerminalError);
expect(sendCalls(publish)).toHaveLength(0);
});

it('throws CampaignNotFoundError when the campaign does not exist', async () => {
findOne.mockResolvedValueOnce(null);

Expand Down
Loading
Loading