diff --git a/src/runners/event-process/event-process.module.ts b/src/runners/event-process/event-process.module.ts
index c524469..20afeab 100644
--- a/src/runners/event-process/event-process.module.ts
+++ b/src/runners/event-process/event-process.module.ts
@@ -3,6 +3,9 @@ import { EventProcessService } from './services/event-process.service';
 import { EventsReceivedConsumer } from './services/events-received.consumer';
 import { SignatureValidatorRegistry } from './services/signature-validator.registry';
 import { EventProcessMetrics } from './metrics/event-process-metrics';
+import { EnricherService } from './services/enricher.service';
+import { RecipientSourceExtractor } from './services/recipient-source.extractor';
+import { GeoLocationService } from '../../modules/click-tracking/services/geo-location.service';
 
 /**
  * Runner module for RUN_MODE=event-process (story 3.3 / EVO-1208).
@@ -13,6 +16,10 @@ import { EventProcessMetrics } from './metrics/event-process-metrics';
  * is global (ConfigModule.forRoot isGlobal), so this module only declares the
  * consumer, handler, signature-validator registry and metrics. Imported
  * conditionally from AppModule.forRoot() when shouldStartEventProcess() is true.
+ *
+ * `GeoLocationService` is reused from click-tracking by declaring it as a
+ * provider here (it has no injected dependencies), avoiding a dependency on the
+ * whole `ClickTrackingModule` (story 3.6 / EVO-1212).
  */
 @Module({
   providers: [
@@ -20,6 +27,9 @@ import { EventProcessMetrics } from './metrics/event-process-metrics';
     EventsReceivedConsumer,
     SignatureValidatorRegistry,
     EventProcessMetrics,
+    EnricherService,
+    RecipientSourceExtractor,
+    GeoLocationService,
   ],
 })
 export class EventProcessModule {}
diff --git a/src/runners/event-process/services/enricher.service.spec.ts b/src/runners/event-process/services/enricher.service.spec.ts
new file mode 100644
--- /dev/null
+++ b/src/runners/event-process/services/enricher.service.spec.ts
@@ -0,0 +1,181 @@
+import { EnricherService } from './enricher.service';
+import { RecipientSourceExtractor } from './recipient-source.extractor';
+import { GeoLocationService } from '../../../modules/click-tracking/services/geo-location.service';
+import type { EventsReceivedContract } from '../../../shared/broker/contracts/events-received.contract';
+
+const envelope = (
+  overrides: Partial<EventsReceivedContract> = {},
+): EventsReceivedContract =>
+  ({
+    platform: 'evolution-api',
+    rawPayload: '{}',
+    headers: {},
+    receivedAt: '2026-06-09T12:00:00.000Z',
+    sourceIp: '203.0.113.10',
+    ingestionId: '00000000-0000-4000-8000-000000000000',
+    correlationId: '11111111-1111-4111-8111-111111111111',
+    ...overrides,
+  }) as EventsReceivedContract;
+
+describe('EnricherService', () => {
+  let service: EnricherService;
+
+  beforeEach(() => {
+    // Real GeoLocationService: geoip-lite is local + deterministic, so AC2 is
+    // verified end-to-end rather than against a mocked return.
+    service = new EnricherService(
+      new GeoLocationService(),
+      new RecipientSourceExtractor(),
+    );
+  });
+
+  it('parses an iPhone user-agent to a mobile device (AC1)', async () => {
+    const enriched = await service.enrich(
+      envelope({
+        headers: {
+          'user-agent':
+            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)',
+        },
+      }),
+    );
+
+    expect(enriched.enrichment.ua.device.type).toBe('mobile');
+  });
+
+  it('resolves geo country US for 8.8.8.8 (AC2)', async () => {
+    const enriched = await service.enrich(envelope({ sourceIp: '8.8.8.8' }));
+
+    expect(enriched.enrichment.geo.country).toBe('US');
+  });
+
+  it('flags a Googlebot user-agent as a bot (AC3)', async () => {
+    const enriched = await service.enrich(
+      envelope({
+        headers: {
+          'user-agent': 'Googlebot/2.1 (+http://www.google.com/bot.html)',
+        },
+      }),
+    );
+
+    expect(enriched.enrichment.botMarkers.isBot).toBe(true);
+  });
+
+  it('does not flag a normal browser user-agent as a bot', async () => {
+    const enriched = await service.enrich(
+      envelope({
+        headers: {
+          'user-agent':
+            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0',
+        },
+      }),
+    );
+
+    expect(enriched.enrichment.botMarkers.isBot).toBe(false);
+  });
+
+  it('reads the User-Agent header case-insensitively', async () => {
+    const enriched = await service.enrich(
+      envelope({ headers: { 'User-Agent': 'Googlebot/2.1' } }),
+    );
+
+    expect(enriched.enrichment.botMarkers.isBot).toBe(true);
+  });
+
+  it('defaults enrichment fields to empty strings for a private IP and missing UA', async () => {
+    const enriched = await service.enrich(
+      envelope({ sourceIp: '10.0.0.1', headers: {} }),
+    );
+
+    expect(enriched.enrichment.geo).toEqual({
+      country: '',
+      region: '',
+      city: '',
+    });
+    expect(enriched.enrichment.ua.browser.name).toBe('');
+    expect(enriched.enrichment.botMarkers.isDatacenter).toBe(false);
+  });
+
+  it('preserves the original envelope fields', async () => {
+    const enriched = await service.enrich(
+      envelope({ correlationId: '22222222-2222-4222-8222-222222222222' }),
+    );
+
+    expect(enriched.platform).toBe('evolution-api');
+    expect(enriched.correlationId).toBe('22222222-2222-4222-8222-222222222222');
+  });
+
+  describe('recipient context from rawPayload (M1)', () => {
+    it('prefers the SendGrid body UA/IP over the HTTP envelope', async () => {
+      const enriched = await service.enrich(
+        envelope({
+          platform: 'sendgrid',
+          // Recipient (mobile, US) in the body; provider infra (desktop UA,
+          // private IP) in the HTTP envelope. The body must win.
+          rawPayload: JSON.stringify([
+            {
+              event: 'open',
+              useragent:
+                'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)',
+              ip: '8.8.8.8',
+            },
+          ]),
+          headers: {
+            'user-agent':
+              'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
+          },
+          sourceIp: '10.0.0.1',
+        }),
+      );
+
+      expect(enriched.enrichment.ua.device.type).toBe('mobile');
+      expect(enriched.enrichment.geo.country).toBe('US');
+    });
+
+    it('uses the Resend body UA for bot detection', async () => {
+      const enriched = await service.enrich(
+        envelope({
+          platform: 'resend',
+          rawPayload: JSON.stringify({
+            type: 'email.opened',
+            data: {
+              open: { userAgent: 'Googlebot/2.1', ipAddress: '8.8.8.8' },
+            },
+          }),
+          headers: { 'user-agent': 'Mozilla/5.0 normal browser' },
+          sourceIp: '10.0.0.1',
+        }),
+      );
+
+      expect(enriched.enrichment.botMarkers.isBot).toBe(true);
+      expect(enriched.enrichment.geo.country).toBe('US');
+    });
+
+    it('falls back to the envelope when the body carries no recipient source', async () => {
+      const enriched = await service.enrich(
+        envelope({
+          platform: 'sendgrid',
+          rawPayload: JSON.stringify([{ event: 'delivered' }]),
+          headers: { 'user-agent': 'Googlebot/2.1' },
+          sourceIp: '8.8.8.8',
+        }),
+      );
+
+      expect(enriched.enrichment.botMarkers.isBot).toBe(true);
+      expect(enriched.enrichment.geo.country).toBe('US');
+    });
+
+    it('falls back to the envelope when rawPayload is not valid JSON', async () => {
+      const enriched = await service.enrich(
+        envelope({
+          platform: 'sendgrid',
+          rawPayload: 'not-json',
+          headers: { 'user-agent': 'Googlebot/2.1' },
+          sourceIp: '8.8.8.8',
+        }),
+      );
+
+      expect(enriched.enrichment.botMarkers.isBot).toBe(true);
+      expect(enriched.enrichment.geo.country).toBe('US');
+    });
+  });
+});
diff --git a/src/runners/event-process/services/enricher.service.ts b/src/runners/event-process/services/enricher.service.ts
new file mode 100644
--- /dev/null
+++ b/src/runners/event-process/services/enricher.service.ts
@@ -0,0 +1,124 @@
+import { Injectable } from '@nestjs/common';
+import { UAParser } from 'ua-parser-js';
+import { EventsReceivedContract } from '../../../shared/broker/contracts/events-received.contract';
+import { EventsEnrichedContract } from '../../../shared/broker/contracts/events-enriched.contract';
+import { GeoLocationService } from '../../../modules/click-tracking/services/geo-location.service';
+import { RecipientSourceExtractor } from './recipient-source.extractor';
+
+type Enrichment = EventsEnrichedContract['enrichment'];
+
+export type EnrichedEvent = EventsReceivedContract & {
+  enrichment: Enrichment;
+};
+
+// Prefix/substring tokens that mark a request as a known crawler. MVP list
+// (story 3.6); broader bot/datacenter detection is deferred to Growth.
+const BOT_UA_DENYLIST: readonly string[] = [
+  'googlebot',
+  'bingbot',
+  'ahrefsbot',
+  'semrushbot',
+  'dotbot',
+  'mj12bot',
+  'yandexbot',
+];
+
+/**
+ * Enriches a received webhook envelope (story 3.6 / EVO-1212) with parsed
+ * user-agent, IP geolocation and basic bot markers before the event-process
+ * pipeline persists it. Reuses the existing `GeoLocationService` (geoip-lite)
+ * and `ua-parser-js`. The enriched event is consumed by the ClickHouse writer
+ * wired into `EventProcessService.handle()` in story 3.7.
+ */
+@Injectable()
+export class EnricherService {
+  constructor(
+    private readonly geoLocation: GeoLocationService,
+    private readonly recipientSource: RecipientSourceExtractor,
+  ) {}
+
+  /**
+   * Returns a copy of `envelope` with an added `enrichment` object (parsed
+   * user-agent, geo lookup, bot markers). The input envelope is not mutated.
+   *
+   * `isDatacenter` is always `false` for now; no datacenter detection is
+   * implemented in this service.
+   */
+  async enrich(envelope: EventsReceivedContract): Promise<EnrichedEvent> {
+    // Prefer the recipient's UA/IP carried in the provider payload; fall back
+    // to the HTTP envelope (provider infra) when the body doesn't carry it.
+    const recipient = this.recipientSource.extract(
+      envelope.platform,
+      this.parsePayload(envelope.rawPayload),
+    );
+    const userAgent =
+      recipient.userAgent ?? this.extractUserAgent(envelope.headers);
+    const ip = recipient.ip ?? envelope.sourceIp;
+
+    return {
+      ...envelope,
+      enrichment: {
+        ua: this.parseUserAgent(userAgent),
+        geo: await this.resolveGeo(ip),
+        botMarkers: {
+          isBot: this.isBot(userAgent),
+          isDatacenter: false,
+        },
+      },
+    };
+  }
+
+  // JSON-decodes a string payload (`undefined` if malformed); any other value
+  // is passed through unchanged.
+  private parsePayload(rawPayload: unknown): unknown {
+    if (typeof rawPayload !== 'string') return rawPayload;
+    try {
+      return JSON.parse(rawPayload);
+    } catch {
+      return undefined;
+    }
+  }
+
+  // Looks the header up case-insensitively; '' when no User-Agent is present.
+  private extractUserAgent(headers: Record<string, string>): string {
+    const key = Object.keys(headers).find(
+      (header) => header.toLowerCase() === 'user-agent',
+    );
+    return key ? headers[key] : '';
+  }
+
+  // Maps ua-parser-js output onto the contract shape, defaulting every field
+  // the parser leaves unset to ''.
+  private parseUserAgent(userAgent: string): Enrichment['ua'] {
+    const parsed = new UAParser(userAgent);
+    const browser = parsed.getBrowser();
+    const os = parsed.getOS();
+    const device = parsed.getDevice();
+
+    return {
+      browser: { name: browser.name ?? '', version: browser.version ?? '' },
+      os: { name: os.name ?? '', version: os.version ?? '' },
+      device: {
+        type: device.type ?? '',
+        vendor: device.vendor ?? '',
+        model: device.model ?? '',
+      },
+    };
+  }
+
+  // Resolves the IP via GeoLocationService, defaulting missing fields to ''.
+  private async resolveGeo(sourceIp: string): Promise<Enrichment['geo']> {
+    const geo = await this.geoLocation.getLocationFromIp(sourceIp);
+    return {
+      country: geo.country ?? '',
+      region: geo.region ?? '',
+      city: geo.city ?? '',
+    };
+  }
+
+  // True when the user-agent contains any BOT_UA_DENYLIST token (any casing).
+  private isBot(userAgent: string): boolean {
+    const normalized = userAgent.toLowerCase();
+    return BOT_UA_DENYLIST.some((bot) => normalized.includes(bot));
+  }
+}
diff --git a/src/runners/event-process/services/recipient-source.extractor.spec.ts b/src/runners/event-process/services/recipient-source.extractor.spec.ts
new file mode 100644
--- /dev/null
+++ b/src/runners/event-process/services/recipient-source.extractor.spec.ts
@@ -0,0 +1,102 @@
+import { RecipientSourceExtractor } from './recipient-source.extractor';
+
+describe('RecipientSourceExtractor', () => {
+  const extractor = new RecipientSourceExtractor();
+
+  it('extracts useragent/ip from a SendGrid event array (first event)', () => {
+    const payload = [
+      { event: 'open', useragent: 'Mozilla/5.0', ip: '8.8.8.8' },
+      { event: 'click', useragent: 'other', ip: '1.1.1.1' },
+    ];
+    expect(extractor.extract('sendgrid', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('extracts user_agent/ip from a Mandrill event array', () => {
+    const payload = [
+      { event: 'open', user_agent: 'Mozilla/5.0', ip: '8.8.8.8' },
+    ];
+    expect(extractor.extract('mandrill', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('extracts from a Resend engagement payload (data.open)', () => {
+    const payload = {
+      type: 'email.opened',
+      data: { open: { userAgent: 'Mozilla/5.0', ipAddress: '8.8.8.8' } },
+    };
+    expect(extractor.extract('resend', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('extracts from an SES SNS-wrapped click event (Message is a JSON string)', () => {
+    const payload = {
+      Type: 'Notification',
+      Message: JSON.stringify({
+        eventType: 'Click',
+        click: { userAgent: 'Mozilla/5.0', ipAddress: '8.8.8.8' },
+      }),
+    };
+    expect(extractor.extract('ses', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('extracts from an unwrapped SES event (no SNS Message envelope)', () => {
+    const payload = {
+      eventType: 'Open',
+      open: { userAgent: 'Mozilla/5.0', ipAddress: '8.8.8.8' },
+    };
+    expect(extractor.extract('ses', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('ignores an SES Message that does not decode to an object', () => {
+    const payload = {
+      Message: '"plain text"',
+      open: { userAgent: 'Mozilla/5.0', ipAddress: '8.8.8.8' },
+    };
+    expect(extractor.extract('ses', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('extracts from a SparkPost track_event', () => {
+    const payload = [
+      {
+        msys: {
+          track_event: { user_agent: 'Mozilla/5.0', ip_address: '8.8.8.8' },
+        },
+      },
+    ];
+    expect(extractor.extract('sparkpost', payload)).toEqual({
+      userAgent: 'Mozilla/5.0',
+      ip: '8.8.8.8',
+    });
+  });
+
+  it('returns {} for evolution-api (no recipient UA/IP in body)', () => {
+    expect(extractor.extract('evolution-api', { foo: 'bar' })).toEqual({});
+  });
+
+  it('returns {} when the payload lacks the expected fields', () => {
+    expect(extractor.extract('sendgrid', [{ event: 'delivered' }])).toEqual({});
+    expect(extractor.extract('resend', { data: {} })).toEqual({});
+    expect(extractor.extract('sparkpost', { msys: {} })).toEqual({});
+  });
+
+  it('returns {} for a non-object payload', () => {
+    expect(extractor.extract('sendgrid', null)).toEqual({});
+    expect(extractor.extract('sendgrid', 'not-json')).toEqual({});
+  });
+});
diff --git a/src/runners/event-process/services/recipient-source.extractor.ts b/src/runners/event-process/services/recipient-source.extractor.ts
new file mode 100644
--- /dev/null
+++ b/src/runners/event-process/services/recipient-source.extractor.ts
@@ -0,0 +1,115 @@
+import { Injectable } from '@nestjs/common';
+import { Platform } from '../../../shared/broker/contracts';
+
+export interface RecipientSource {
+  userAgent?: string;
+  ip?: string;
+}
+
+// Narrows to a plain object (non-null, not an array).
+const isRecord = (value: unknown): value is Record<string, unknown> =>
+  typeof value === 'object' && value !== null && !Array.isArray(value);
+
+// Keeps non-empty strings; anything else becomes undefined so `??` falls back.
+const asString = (value: unknown): string | undefined =>
+  typeof value === 'string' && value.length > 0 ? value : undefined;
+
+/**
+ * Extracts the END-RECIPIENT's user-agent and IP from a provider webhook body
+ * (story 3.6 / EVO-1212). For email engagement events (open/click) the real
+ * recipient UA/IP live in the payload — the HTTP envelope only carries the
+ * provider's own server UA/IP. Returns `{}` for providers/payloads that don't
+ * carry it, so the caller can fall back to the envelope.
+ *
+ * Array payloads (SendGrid/Mandrill batch one POST as many events): the MVP
+ * reads the first event; per-event fan-out is a downstream normalization
+ * concern. `mailersend` reuses the engagement heuristic as a best-effort.
+ */
+@Injectable()
+export class RecipientSourceExtractor {
+  /**
+   * @param platform Provider that sent the webhook; selects the field mapping.
+   * @param payload Parsed webhook body: one event object or an array of them.
+   * @returns Recipient UA/IP from the body; fields are unset when not found.
+   */
+  extract(platform: Platform, payload: unknown): RecipientSource {
+    const event: unknown = Array.isArray(payload)
+      ? (payload as unknown[])[0]
+      : payload;
+    if (!isRecord(event)) return {};
+
+    switch (platform) {
+      case 'sendgrid':
+        return { userAgent: asString(event.useragent), ip: asString(event.ip) };
+      case 'mandrill':
+        // NOTE(review): Mandrill posts form-encoded `mandrill_events=<json>`;
+        // this assumes the payload here is the already-decoded array; confirm.
+        return {
+          userAgent: asString(event.user_agent),
+          ip: asString(event.ip),
+        };
+      case 'resend':
+      case 'mailersend':
+        return this.fromEngagement(event.data);
+      case 'ses':
+        return this.fromSes(event);
+      case 'sparkpost':
+        return this.fromSparkpost(event);
+      default:
+        return {};
+    }
+  }
+
+  // Resend / MailerSend: { data: { open | click: { userAgent, ipAddress } } }
+  private fromEngagement(data: unknown): RecipientSource {
+    if (!isRecord(data)) return {};
+    const engagement =
+      (isRecord(data.open) && data.open) ||
+      (isRecord(data.click) && data.click);
+    if (!isRecord(engagement)) return {};
+    return {
+      userAgent:
+        asString(engagement.userAgent) ?? asString(engagement.user_agent),
+      ip: asString(engagement.ipAddress) ?? asString(engagement.ip),
+    };
+  }
+
+  // SES via SNS: { Message: '<JSON string>' } whose decoded body is
+  // { open | click: { userAgent, ipAddress } }. A Message that does not decode
+  // to an object is ignored and the event itself is inspected instead.
+  private fromSes(event: Record<string, unknown>): RecipientSource {
+    const parsed = this.parseMaybeJson(event.Message);
+    const message = isRecord(parsed) ? parsed : event;
+    const engagement =
+      (isRecord(message.open) && message.open) ||
+      (isRecord(message.click) && message.click);
+    if (!isRecord(engagement)) return {};
+    return {
+      userAgent: asString(engagement.userAgent),
+      ip: asString(engagement.ipAddress),
+    };
+  }
+
+  // SparkPost: { msys: { track_event | message_event: { user_agent, ip_address } } }
+  private fromSparkpost(event: Record<string, unknown>): RecipientSource {
+    if (!isRecord(event.msys)) return {};
+    const trackEvent =
+      (isRecord(event.msys.track_event) && event.msys.track_event) ||
+      (isRecord(event.msys.message_event) && event.msys.message_event);
+    if (!isRecord(trackEvent)) return {};
+    return {
+      userAgent: asString(trackEvent.user_agent),
+      ip: asString(trackEvent.ip_address),
+    };
+  }
+
+  // JSON-decodes a string; undefined for non-strings and malformed JSON.
+  private parseMaybeJson(value: unknown): unknown {
+    if (typeof value !== 'string') return undefined;
+    try {
+      return JSON.parse(value);
+    } catch {
+      return undefined;
+    }
+  }
+}