From d777738555e1773d7e501ae8d65e02fa0942fd27 Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Tue, 9 Jun 2026 22:46:21 -0300 Subject: [PATCH] feat(event-process): integrate IdempotencyService dedup into handle() (EVO-1211) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements story 3.5: after signature validation, the pipeline computes a sha256 of rawPayload and calls IdempotencyService.checkAndMark — a duplicate within the TTL is dropped (ack, no throw) and counted, while a first-seen event proceeds. Runs after signature validation so forged payloads never reach Redis; the hash covers only rawPayload so a different header can't mask a legitimate duplicate. - EventProcessService: inject IdempotencyService (@Global), isDuplicate() gate, shared rawPayloadString() helper reused by the signature path - EventProcessMetrics: evo_webhook_event_duplicates_dropped_total{platform} - specs: AC1 (second identical dropped + metric), AC2 (bit-different payload is not a duplicate), and idempotency-runs-after-signature ordering The safety lock (acquireLock/releaseLock) is wired by 3.7 (EVO-1213) when persist exists; checkAndMark-only is the at-most-once gate for 3.5. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../metrics/event-process-metrics.ts | 17 ++++++- .../services/event-process.service.spec.ts | 46 ++++++++++++++++- .../services/event-process.service.ts | 49 +++++++++++++++---- 3 files changed, 99 insertions(+), 13 deletions(-) diff --git a/src/runners/event-process/metrics/event-process-metrics.ts b/src/runners/event-process/metrics/event-process-metrics.ts index aa996f4..200d408 100644 --- a/src/runners/event-process/metrics/event-process-metrics.ts +++ b/src/runners/event-process/metrics/event-process-metrics.ts @@ -2,17 +2,20 @@ import { Injectable } from '@nestjs/common'; import { Counter, register } from 'prom-client'; const SIGNATURE_INVALID_METRIC = 'evo_webhook_signature_invalid_total'; +const EVENT_DUPLICATES_DROPPED_METRIC = + 'evo_webhook_event_duplicates_dropped_total'; /** - * Prometheus counters for the event-process webhook pipeline (story 3.4). + * Prometheus counters for the event-process webhook pipeline (stories 3.4, 3.5). * - * The counter is fetched from the global registry if it already exists so that + * Each counter is fetched from the global registry if it already exists so that * re-instantiating this provider (e.g. across test modules) does not throw the * "metric already registered" error prom-client raises on duplicate names. */ @Injectable() export class EventProcessMetrics { readonly signatureInvalid: Counter; + readonly eventDuplicatesDropped: Counter; constructor() { this.signatureInvalid = @@ -24,5 +27,15 @@ export class EventProcessMetrics { help: 'Webhook envelopes dropped because the signature was missing, invalid or unverifiable', labelNames: ['platform', 'reason'], }); + + this.eventDuplicatesDropped = + (register.getSingleMetric(EVENT_DUPLICATES_DROPPED_METRIC) as + | Counter + | undefined) ?? + new Counter({ + name: EVENT_DUPLICATES_DROPPED_METRIC, + help: 'Webhook envelopes dropped because an identical payload was already processed within the idempotency TTL', + labelNames: ['platform'], + }); } } diff --git a/src/runners/event-process/services/event-process.service.spec.ts b/src/runners/event-process/services/event-process.service.spec.ts index d53c8c3..3b9854b 100644 --- a/src/runners/event-process/services/event-process.service.spec.ts +++ b/src/runners/event-process/services/event-process.service.spec.ts @@ -1,12 +1,16 @@ import { EventProcessService } from './event-process.service'; import { SignatureValidatorRegistry } from './signature-validator.registry'; import { EventProcessMetrics } from '../metrics/event-process-metrics'; +import { IdempotencyService } from 'src/shared/idempotency/idempotency.service'; describe('EventProcessService', () => { let service: EventProcessService; let validate: jest.Mock; let forPlatform: jest.Mock; let inc: jest.Mock; + let duplicatesInc: jest.Mock; + let computeHash: jest.Mock; + let checkAndMark: jest.Mock; const validEnvelope = { platform: 'evolution-api', @@ -24,9 +28,16 @@ describe('EventProcessService', () => { .fn() .mockReturnValue({ platform: 'evolution-api', validate }); inc = jest.fn(); + duplicatesInc = jest.fn(); + computeHash = jest.fn((payload: string) => `hash:${payload}`); + checkAndMark = jest.fn().mockResolvedValue(true); service = new EventProcessService( { for: forPlatform } as unknown as SignatureValidatorRegistry, - { signatureInvalid: { inc } } as unknown as EventProcessMetrics, + { + signatureInvalid: { inc }, + eventDuplicatesDropped: { inc: duplicatesInc }, + } as unknown as EventProcessMetrics, + { computeHash, checkAndMark } as unknown as IdempotencyService, ); }); @@ -83,4 +94,37 @@ describe('EventProcessService', () => { reason: 'invalid_signature', }); }); + + describe('idempotency (story 3.5)', () => { + it('drops the second identical message and counts the metric (AC1)', async () => { + checkAndMark.mockResolvedValueOnce(true).mockResolvedValueOnce(false); + + await expect(service.handle(validEnvelope)).resolves.toBeUndefined(); + await expect(service.handle(validEnvelope)).resolves.toBeUndefined(); + + expect(checkAndMark).toHaveBeenCalledTimes(2); + expect(duplicatesInc).toHaveBeenCalledTimes(1); + expect(duplicatesInc).toHaveBeenCalledWith({ platform: 'evolution-api' }); + }); + + it('hashes only the rawPayload, so a bit-different payload is not a duplicate (AC2)', async () => { + await service.handle(validEnvelope); + await service.handle({ + ...validEnvelope, + rawPayload: 'raw-body-bytes-X', + }); + + expect(computeHash).toHaveBeenNthCalledWith(1, 'raw-body-bytes'); + expect(computeHash).toHaveBeenNthCalledWith(2, 'raw-body-bytes-X'); + expect(duplicatesInc).not.toHaveBeenCalled(); + }); + + it('runs the idempotency check only after signature validation passes', async () => { + validate.mockReturnValue(false); + + await service.handle(validEnvelope); + + expect(checkAndMark).not.toHaveBeenCalled(); + }); + }); }); diff --git a/src/runners/event-process/services/event-process.service.ts b/src/runners/event-process/services/event-process.service.ts index 9779a69..36864a3 100644 --- a/src/runners/event-process/services/event-process.service.ts +++ b/src/runners/event-process/services/event-process.service.ts @@ -5,6 +5,7 @@ import { isEventsReceivedContract, } from 'src/shared/broker/contracts/events-received.contract'; import { TerminalError } from 'src/shared/errors/terminal-error'; +import { IdempotencyService } from 'src/shared/idempotency/idempotency.service'; import { SignatureValidatorRegistry } from './signature-validator.registry'; import { EventProcessMetrics } from '../metrics/event-process-metrics'; @@ -18,11 +19,11 @@ export class InvalidEnvelopeError extends TerminalError {} /** * Stub handler for the webhook event pipeline (story 3.3 / EVO-1208). * - * Validates the inbound `events.received.` envelope and logs it. The - * real pipeline — signature validation (3.4), idempotency (3.5), enrichment - * (3.6) and ClickHouse persist (3.7) — replaces this body in later stories. - * Throws on a malformed envelope so the consumer can nack/redeliver rather than - * silently dropping a message. + * Validates the inbound `events.received.` envelope, verifies the + * provider signature (3.4) and drops duplicates via the shared idempotency + * guard (3.5). Enrichment (3.6) and ClickHouse persist (3.7) plug in after the + * idempotency gate in later stories. Throws on a malformed envelope so the + * consumer can nack/redeliver rather than silently dropping a message. */ @Injectable() export class EventProcessService { @@ -31,6 +32,7 @@ export class EventProcessService { constructor( private readonly validators: SignatureValidatorRegistry, private readonly metrics: EventProcessMetrics, + private readonly idempotency: IdempotencyService, ) {} async handle(envelope: unknown): Promise { @@ -43,6 +45,8 @@ export class EventProcessService { if (!(await this.hasValidSignature(valid))) return; + if (await this.isDuplicate(valid)) return; + this.logger.log('event-process.handle', { action: 'event-process.handle', platform: valid.platform, @@ -93,12 +97,10 @@ export class EventProcessService { ingestionId: valid.ingestionId, }); } - const rawPayload = - typeof valid.rawPayload === 'string' - ? valid.rawPayload - : JSON.stringify(valid.rawPayload ?? ''); - if (await validator.validate(rawPayload, valid.headers)) return true; + if (await validator.validate(this.rawPayloadString(valid), valid.headers)) { + return true; + } this.logger.warn('event-process.signature.invalid', { action: 'event-process.signature.invalid', @@ -112,4 +114,31 @@ export class EventProcessService { }); return false; } + + /** + * Drops a webhook the pipeline has already processed within the idempotency + * TTL. Runs AFTER signature validation so forged payloads never reach Redis. + * The hash covers only `rawPayload` (per story 3.5) — folding in headers or + * metadata would mask legitimate duplicates that arrive with a different + * header. A duplicate is expected behaviour, so it logs at info, not warn. + */ + private async isDuplicate(valid: EventsReceivedContract): Promise { + const hash = this.idempotency.computeHash(this.rawPayloadString(valid)); + if (await this.idempotency.checkAndMark(hash)) return false; + + this.logger.log('event-process.duplicate', { + action: 'event-process.duplicate', + platform: valid.platform, + correlationId: valid.correlationId, + ingestionId: valid.ingestionId, + }); + this.metrics.eventDuplicatesDropped.inc({ platform: valid.platform }); + return true; + } + + private rawPayloadString(valid: EventsReceivedContract): string { + return typeof valid.rawPayload === 'string' + ? valid.rawPayload + : JSON.stringify(valid.rawPayload ?? ''); + } }