Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { AudienceModule } from './shared/audience/audience.module';
import { MessagingChannelsModule } from './shared/messaging-channels/messaging-channels.module';
import { EventReceiverModule } from './runners/event-receiver/event-receiver.module';
import { CampaignPackerModule } from './runners/campaign-packer/campaign-packer.module';
import { EventProcessModule } from './runners/event-process/event-process.module';
import { AppFactory } from './app-factory';
import {
EvoExtensionPoints,
Expand Down Expand Up @@ -103,6 +104,9 @@ export class AppModule {
if (AppFactory.shouldStartCampaignPacker()) {
conditionalImports.push(CampaignPackerModule);
}
if (AppFactory.shouldStartEventProcess()) {
conditionalImports.push(EventProcessModule);
}

// Extension point (story 0.15): external consumers — e.g. an enterprise
// overlay — register NestJS modules through the plugin_loader seam. The
Expand Down
2 changes: 1 addition & 1 deletion src/bootstrap/bootstrap.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class BootstrapService {
break;
case RunMode.EVENT_PROCESS:
this.logger.log(
'⚙️ EVENT PROCESS MODE: Broker-driven event processor (stub — module pending)',
'⚙️ EVENT PROCESS MODE: Broker-driven event processor (events.received.* consumer)',
);
break;
}
Expand Down
1 change: 0 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ parseRunMode(process.env.RUN_MODE);
// matching entry from this Set.
const STUB_RUN_MODES = new Set<string>([
RunMode.CAMPAIGN_SENDER, // wired by downstream campaign-sender story (epic 4)
RunMode.EVENT_PROCESS, // wired by downstream event-process story (epic 3)
]);
if (STUB_RUN_MODES.has(process.env.RUN_MODE ?? '')) {
// Structured JSON to stderr so log collectors (Loki / Datadog) ingest the
Expand Down
17 changes: 17 additions & 0 deletions src/runners/event-process/event-process.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Module } from '@nestjs/common';
import { EventProcessService } from './services/event-process.service';
import { EventsReceivedConsumer } from './services/events-received.consumer';

/**
* Runner module for RUN_MODE=event-process (story 3.3 / EVO-1208).
*
* Boots the `events.received.<platform>` consumer end of the webhook pipeline.
* IMESSAGE_BROKER (BrokerModule, @Global) and CorrelationContext
* (CorrelationModule, @Global) come from their own modules, so this module only
* declares the consumer + stub handler. Imported conditionally from
* AppModule.forRoot() when AppFactory.shouldStartEventProcess() is true.
*/
@Module({
providers: [EventProcessService, EventsReceivedConsumer],
})
export class EventProcessModule {}
31 changes: 31 additions & 0 deletions src/runners/event-process/services/event-process.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { EventProcessService } from './event-process.service';

describe('EventProcessService', () => {
const service = new EventProcessService();

const validEnvelope = {
platform: 'evolution-api',
rawPayload: { hello: 'world' },
headers: { 'x-test': '1' },
receivedAt: '2026-06-08T12:00:00.000Z',
sourceIp: '203.0.113.10',
ingestionId: '00000000-0000-4000-8000-000000000000',
correlationId: '11111111-1111-4111-8111-111111111111',
};

it('resolves for a valid events.received envelope', async () => {
await expect(service.handle(validEnvelope)).resolves.toBeUndefined();
});

it('throws for a payload that is not a valid envelope', async () => {
await expect(service.handle({ not: 'an-envelope' })).rejects.toThrow(
/not a valid events.received envelope/,
);
});

it('throws for a known-shape envelope with an invalid platform', async () => {
await expect(
service.handle({ ...validEnvelope, platform: 'not-a-platform' }),
).rejects.toThrow();
});
});
48 changes: 48 additions & 0 deletions src/runners/event-process/services/event-process.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Injectable } from '@nestjs/common';
import { CustomLoggerService } from 'src/common/services/custom-logger.service';
import {
EventsReceivedContract,
isEventsReceivedContract,
} from 'src/shared/broker/contracts/events-received.contract';

/**
* Thrown when a consumed message is not a valid `events.received` envelope.
* It is a permanent (non-retriable) failure — the consumer must drop it
* (terminal nack) rather than requeue, or it would redeliver forever.
*/
export class InvalidEnvelopeError extends Error {}

/**
* Stub handler for the webhook event pipeline (story 3.3 / EVO-1208).
*
* Validates the inbound `events.received.<platform>` envelope and logs it. The
* real pipeline — signature validation (3.4), idempotency (3.5), enrichment
* (3.6) and ClickHouse persist (3.7) — replaces this body in later stories.
* Throws on a malformed envelope so the consumer can nack/redeliver rather than
* silently dropping a message.
*/
@Injectable()
export class EventProcessService {
private readonly logger = new CustomLoggerService(EventProcessService.name);

async handle(envelope: unknown): Promise<void> {
if (!isEventsReceivedContract(envelope)) {
throw new InvalidEnvelopeError(
'event-process received a payload that is not a valid events.received envelope',
);
}
const valid: EventsReceivedContract = envelope;

this.logger.log('event-process.handle', {
action: 'event-process.handle',
platform: valid.platform,
correlationId: valid.correlationId,
ingestionId: valid.ingestionId,
rawPayloadBytes: Buffer.byteLength(
JSON.stringify(valid.rawPayload ?? null),
),
});

return Promise.resolve();
}
}
100 changes: 100 additions & 0 deletions src/runners/event-process/services/events-received.consumer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { EVENTS_RECEIVED_TOPIC_PREFIX } from 'src/shared/broker/contracts/events-received.contract';
import { BrokerMessage } from 'src/shared/broker/interfaces/message-broker.interface';
import { EventsReceivedConsumer } from './events-received.consumer';
import { InvalidEnvelopeError } from './event-process.service';

type Handler = (msg: BrokerMessage) => Promise<void>;

function buildMsg(overrides: Partial<BrokerMessage> = {}): BrokerMessage {
return {
id: 'events.received/1',
payload: { platform: 'evolution-api' },
headers: { correlationId: 'cid-123' },
raw: {},
...overrides,
};
}

describe('EventsReceivedConsumer', () => {
function setup() {
let captured: Handler | undefined;
const broker = {
subscribePattern: jest.fn((_prefix: string, handler: Handler) => {
captured = handler;
return Promise.resolve();
}),
ack: jest.fn(() => Promise.resolve()),
nack: jest.fn(() => Promise.resolve()),
publish: jest.fn(),
subscribe: jest.fn(),
};
const correlation = {
// Invoke fn synchronously so the handler body runs under the test.
runWithCorrelationId: jest.fn((_id: string, fn: () => unknown) => fn()),
resolveIncoming: jest.fn((incoming?: string) => incoming ?? 'minted-id'),
};
const service = { handle: jest.fn(() => Promise.resolve()) };
const consumer = new EventsReceivedConsumer(
broker as never,
correlation as never,
service as never,
);
return {
consumer,
broker,
correlation,
service,
getHandler: () => captured,
};
}

it('subscribes to the events.received prefix on init', async () => {
const { consumer, broker } = setup();
await consumer.onApplicationBootstrap();
expect(broker.subscribePattern).toHaveBeenCalledWith(
EVENTS_RECEIVED_TOPIC_PREFIX,
expect.any(Function),
);
});

it('runs the handler under the message correlationId and acks on success', async () => {
const { consumer, broker, correlation, service, getHandler } = setup();
await consumer.onApplicationBootstrap();
const msg = buildMsg();

await getHandler()!(msg);

expect(correlation.resolveIncoming).toHaveBeenCalledWith('cid-123');
expect(correlation.runWithCorrelationId).toHaveBeenCalledWith(
'cid-123',
expect.any(Function),
);
expect(service.handle).toHaveBeenCalledWith(msg.payload);
expect(broker.ack).toHaveBeenCalledWith(msg);
expect(broker.nack).not.toHaveBeenCalled();
});

it('nacks with requeue on a transient (non-validation) failure', async () => {
const { consumer, broker, service, getHandler } = setup();
service.handle.mockRejectedValueOnce(new Error('boom'));
await consumer.onApplicationBootstrap();
const msg = buildMsg();

await getHandler()!(msg);

expect(broker.ack).not.toHaveBeenCalled();
expect(broker.nack).toHaveBeenCalledWith(msg, true);
});

it('drops (terminal nack) an invalid envelope instead of requeuing forever', async () => {
const { consumer, broker, service, getHandler } = setup();
service.handle.mockRejectedValueOnce(new InvalidEnvelopeError('bad'));
await consumer.onApplicationBootstrap();
const msg = buildMsg();

await getHandler()!(msg);

expect(broker.ack).not.toHaveBeenCalled();
expect(broker.nack).toHaveBeenCalledWith(msg, false);
});
});
71 changes: 71 additions & 0 deletions src/runners/event-process/services/events-received.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { Inject, Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { CustomLoggerService } from 'src/common/services/custom-logger.service';
import {
BrokerMessage,
IMessageBroker,
IMESSAGE_BROKER,
} from 'src/shared/broker/interfaces/message-broker.interface';
import { EVENTS_RECEIVED_TOPIC_PREFIX } from 'src/shared/broker/contracts/events-received.contract';
import { CorrelationContext } from 'src/shared/correlation/correlation.context';
import {
EventProcessService,
InvalidEnvelopeError,
} from './event-process.service';

/**
* Subscribes to the whole `events.received.<platform>` topic family via the
* broker's wildcard `subscribePattern` and runs each envelope through the
* (stubbed) `EventProcessService` inside the correlation context, then acks.
*
* Subscription happens in `onApplicationBootstrap` (not `onModuleInit`) so the
* broker adapter has finished its own `onModuleInit` (active=true) first —
* otherwise the hook order between this consumer and the adapter is undefined.
*
* Failure handling: a permanent `InvalidEnvelopeError` is dropped (terminal
* nack) so a poison message cannot redeliver forever; any other (transient)
* error requeues for redelivery.
*/
@Injectable()
export class EventsReceivedConsumer implements OnApplicationBootstrap {
private readonly logger = new CustomLoggerService(
EventsReceivedConsumer.name,
);

constructor(
@Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker,
private readonly correlation: CorrelationContext,
private readonly service: EventProcessService,
) {}

async onApplicationBootstrap(): Promise<void> {
await this.broker.subscribePattern(EVENTS_RECEIVED_TOPIC_PREFIX, (msg) =>
this.dispatch(msg),
);
this.logger.log('event-process.subscribed', {
action: 'event-process.subscribed',
prefix: EVENTS_RECEIVED_TOPIC_PREFIX,
});
}

private async dispatch(msg: BrokerMessage): Promise<void> {
const correlationId = this.correlation.resolveIncoming(
msg.headers.correlationId,
);
await this.correlation.runWithCorrelationId(correlationId, async () => {
try {
await this.service.handle(msg.payload);
await this.broker.ack(msg);
} catch (err) {
const terminal = err instanceof InvalidEnvelopeError;
this.logger.error('event-process.consume.error', {
action: 'event-process.consume.error',
correlationId,
terminal,
error: (err as Error).message,
});
// Permanent failure → drop (terminal); transient → requeue.
await this.broker.nack(msg, !terminal);
}
});
}
}
33 changes: 33 additions & 0 deletions src/shared/broker/adapters/kafka-broker.adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -669,4 +669,37 @@ describe('KafkaBrokerAdapter', () => {
await close();
});
});

describe('subscribePattern', () => {
it('subscribes with a RegExp under groupId `${RUN_MODE}-${prefix}` and creates no topic', async () => {
const { adapter, close } = await buildAdapter({
BROKER_TYPE: 'kafka',
KAFKA_BROKERS: 'localhost:9092',
RUN_MODE: 'event-process',
});
await (
adapter as unknown as { onModuleInit: () => Promise<void> }
).onModuleInit();

await adapter.subscribePattern('events.received', () =>
Promise.resolve(),
);

const k = lastKafka();
expect(k.consumerGroupIds).toContain('event-process-events.received');

const consumer = k.consumers[k.consumers.length - 1];
const firstCall = consumer.subscribe.mock.calls[0] as unknown[];
const subscribeArg = firstCall[0] as {
topics: RegExp[];
fromBeginning: boolean;
};
expect(subscribeArg.fromBeginning).toBe(false);
expect(subscribeArg.topics[0]).toBeInstanceOf(RegExp);
expect(subscribeArg.topics[0].source).toBe('^events\\.received\\.[^.]+$');
// A pattern subscription must not create a topic (cannot create a regex).
expect(k.admin.createTopics).not.toHaveBeenCalled();
await close();
});
});
});
Loading
Loading