Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/runners/event-process/metrics/event-process-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ import { Injectable } from '@nestjs/common';
import { Counter, register } from 'prom-client';

const SIGNATURE_INVALID_METRIC = 'evo_webhook_signature_invalid_total';
const EVENT_DUPLICATES_DROPPED_METRIC =
'evo_webhook_event_duplicates_dropped_total';

/**
* Prometheus counters for the event-process webhook pipeline (story 3.4).
* Prometheus counters for the event-process webhook pipeline (stories 3.4, 3.5).
*
* The counter is fetched from the global registry if it already exists so that
* Each counter is fetched from the global registry if it already exists so that
* re-instantiating this provider (e.g. across test modules) does not throw the
* "metric already registered" error prom-client raises on duplicate names.
*/
@Injectable()
export class EventProcessMetrics {
readonly signatureInvalid: Counter<string>;
readonly eventDuplicatesDropped: Counter<string>;

constructor() {
this.signatureInvalid =
Expand All @@ -24,5 +27,15 @@ export class EventProcessMetrics {
help: 'Webhook envelopes dropped because the signature was missing, invalid or unverifiable',
labelNames: ['platform', 'reason'],
});

this.eventDuplicatesDropped =
(register.getSingleMetric(EVENT_DUPLICATES_DROPPED_METRIC) as
| Counter<string>
| undefined) ??
new Counter({
name: EVENT_DUPLICATES_DROPPED_METRIC,
help: 'Webhook envelopes dropped because an identical payload was already processed within the idempotency TTL',
labelNames: ['platform'],
});
}
}
46 changes: 45 additions & 1 deletion src/runners/event-process/services/event-process.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { EventProcessService } from './event-process.service';
import { SignatureValidatorRegistry } from './signature-validator.registry';
import { EventProcessMetrics } from '../metrics/event-process-metrics';
import { IdempotencyService } from 'src/shared/idempotency/idempotency.service';

describe('EventProcessService', () => {
let service: EventProcessService;
let validate: jest.Mock;
let forPlatform: jest.Mock;
let inc: jest.Mock;
let duplicatesInc: jest.Mock;
let computeHash: jest.Mock;
let checkAndMark: jest.Mock;

const validEnvelope = {
platform: 'evolution-api',
Expand All @@ -24,9 +28,16 @@ describe('EventProcessService', () => {
.fn()
.mockReturnValue({ platform: 'evolution-api', validate });
inc = jest.fn();
duplicatesInc = jest.fn();
computeHash = jest.fn((payload: string) => `hash:${payload}`);
checkAndMark = jest.fn().mockResolvedValue(true);
service = new EventProcessService(
{ for: forPlatform } as unknown as SignatureValidatorRegistry,
{ signatureInvalid: { inc } } as unknown as EventProcessMetrics,
{
signatureInvalid: { inc },
eventDuplicatesDropped: { inc: duplicatesInc },
} as unknown as EventProcessMetrics,
{ computeHash, checkAndMark } as unknown as IdempotencyService,
);
});

Expand Down Expand Up @@ -83,4 +94,37 @@ describe('EventProcessService', () => {
reason: 'invalid_signature',
});
});

describe('idempotency (story 3.5)', () => {
it('drops the second identical message and counts the metric (AC1)', async () => {
checkAndMark.mockResolvedValueOnce(true).mockResolvedValueOnce(false);

await expect(service.handle(validEnvelope)).resolves.toBeUndefined();
await expect(service.handle(validEnvelope)).resolves.toBeUndefined();

expect(checkAndMark).toHaveBeenCalledTimes(2);
expect(duplicatesInc).toHaveBeenCalledTimes(1);
expect(duplicatesInc).toHaveBeenCalledWith({ platform: 'evolution-api' });
});

it('hashes only the rawPayload, so a bit-different payload is not a duplicate (AC2)', async () => {
await service.handle(validEnvelope);
await service.handle({
...validEnvelope,
rawPayload: 'raw-body-bytes-X',
});

expect(computeHash).toHaveBeenNthCalledWith(1, 'raw-body-bytes');
expect(computeHash).toHaveBeenNthCalledWith(2, 'raw-body-bytes-X');
expect(duplicatesInc).not.toHaveBeenCalled();
});

it('runs the idempotency check only after signature validation passes', async () => {
validate.mockReturnValue(false);

await service.handle(validEnvelope);

expect(checkAndMark).not.toHaveBeenCalled();
});
});
});
49 changes: 39 additions & 10 deletions src/runners/event-process/services/event-process.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
isEventsReceivedContract,
} from 'src/shared/broker/contracts/events-received.contract';
import { TerminalError } from 'src/shared/errors/terminal-error';
import { IdempotencyService } from 'src/shared/idempotency/idempotency.service';
import { SignatureValidatorRegistry } from './signature-validator.registry';
import { EventProcessMetrics } from '../metrics/event-process-metrics';

Expand All @@ -18,11 +19,11 @@ export class InvalidEnvelopeError extends TerminalError {}
/**
* Stub handler for the webhook event pipeline (story 3.3 / EVO-1208).
*
* Validates the inbound `events.received.<platform>` envelope and logs it. The
* real pipeline — signature validation (3.4), idempotency (3.5), enrichment
* (3.6) and ClickHouse persist (3.7) — replaces this body in later stories.
* Throws on a malformed envelope so the consumer can nack/redeliver rather than
* silently dropping a message.
* Validates the inbound `events.received.<platform>` envelope, verifies the
* provider signature (3.4) and drops duplicates via the shared idempotency
* guard (3.5). Enrichment (3.6) and ClickHouse persist (3.7) plug in after the
* idempotency gate in later stories. Throws on a malformed envelope so the
* consumer can nack/redeliver rather than silently dropping a message.
*/
@Injectable()
export class EventProcessService {
Expand All @@ -31,6 +32,7 @@ export class EventProcessService {
constructor(
private readonly validators: SignatureValidatorRegistry,
private readonly metrics: EventProcessMetrics,
private readonly idempotency: IdempotencyService,
) {}

async handle(envelope: unknown): Promise<void> {
Expand All @@ -43,6 +45,8 @@ export class EventProcessService {

if (!(await this.hasValidSignature(valid))) return;

if (await this.isDuplicate(valid)) return;

this.logger.log('event-process.handle', {
action: 'event-process.handle',
platform: valid.platform,
Expand Down Expand Up @@ -93,12 +97,10 @@ export class EventProcessService {
ingestionId: valid.ingestionId,
});
}
const rawPayload =
typeof valid.rawPayload === 'string'
? valid.rawPayload
: JSON.stringify(valid.rawPayload ?? '');

if (await validator.validate(rawPayload, valid.headers)) return true;
if (await validator.validate(this.rawPayloadString(valid), valid.headers)) {
return true;
}

this.logger.warn('event-process.signature.invalid', {
action: 'event-process.signature.invalid',
Expand All @@ -112,4 +114,31 @@ export class EventProcessService {
});
return false;
}

/**
* Drops a webhook the pipeline has already processed within the idempotency
* TTL. Runs AFTER signature validation so forged payloads never reach Redis.
* The hash covers only `rawPayload` (per story 3.5) — folding in headers or
* metadata would mask legitimate duplicates that arrive with a different
* header. A duplicate is expected behaviour, so it logs at info, not warn.
*/
private async isDuplicate(valid: EventsReceivedContract): Promise<boolean> {
const hash = this.idempotency.computeHash(this.rawPayloadString(valid));
if (await this.idempotency.checkAndMark(hash)) return false;

this.logger.log('event-process.duplicate', {
action: 'event-process.duplicate',
platform: valid.platform,
correlationId: valid.correlationId,
ingestionId: valid.ingestionId,
});
this.metrics.eventDuplicatesDropped.inc({ platform: valid.platform });
return true;
}

private rawPayloadString(valid: EventsReceivedContract): string {
return typeof valid.rawPayload === 'string'
? valid.rawPayload
: JSON.stringify(valid.rawPayload ?? '');
}
}
Loading