Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/runners/event-process/event-process.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { EventProcessService } from './services/event-process.service';
import { EventsReceivedConsumer } from './services/events-received.consumer';
import { SignatureValidatorRegistry } from './services/signature-validator.registry';
import { EventProcessMetrics } from './metrics/event-process-metrics';
import { EnricherService } from './services/enricher.service';
import { RecipientSourceExtractor } from './services/recipient-source.extractor';
import { GeoLocationService } from '../../modules/click-tracking/services/geo-location.service';

/**
* Runner module for RUN_MODE=event-process (story 3.3 / EVO-1208).
Expand All @@ -13,13 +16,20 @@ import { EventProcessMetrics } from './metrics/event-process-metrics';
* is global (ConfigModule.forRoot isGlobal), so this module only declares the
* consumer, handler, signature-validator registry and metrics. Imported
* conditionally from AppModule.forRoot() when shouldStartEventProcess() is true.
*
* `GeoLocationService` is reused from click-tracking by declaring it as a
* provider here (it has no injected dependencies), avoiding a dependency on the
* whole `ClickTrackingModule` (story 3.6 / EVO-1212).
*/
@Module({
providers: [
// Core event-process pipeline: handler, broker consumer, signature
// validator registry and metrics.
EventProcessService,
EventsReceivedConsumer,
SignatureValidatorRegistry,
EventProcessMetrics,
// Enrichment providers (story 3.6 / EVO-1212).
EnricherService,
RecipientSourceExtractor,
// Declared directly as a provider so this runner does not have to import
// the whole ClickTrackingModule just to resolve geolocation.
GeoLocationService,
],
})
// No `imports`, `controllers` or `exports` are declared: every provider above
// is registered for use inside this module only.
export class EventProcessModule {}
167 changes: 167 additions & 0 deletions src/runners/event-process/services/enricher.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { EnricherService } from './enricher.service';
import { RecipientSourceExtractor } from './recipient-source.extractor';
import { GeoLocationService } from '../../../modules/click-tracking/services/geo-location.service';
import type { EventsReceivedContract } from '../../../shared/broker/contracts/events-received.contract';

/**
 * Test fixture factory: returns a baseline received-event envelope with any
 * `overrides` applied on top of the defaults (overrides win key by key).
 */
const envelope = (
  overrides: Partial<EventsReceivedContract> = {},
): EventsReceivedContract => {
  const defaults = {
    platform: 'evolution-api',
    rawPayload: '{}',
    headers: {},
    receivedAt: '2026-06-09T12:00:00.000Z',
    sourceIp: '203.0.113.10',
    ingestionId: '00000000-0000-4000-8000-000000000000',
    correlationId: '11111111-1111-4111-8111-111111111111',
  };

  return { ...defaults, ...overrides } as EventsReceivedContract;
};

describe('EnricherService', () => {
  // Fixtures reused by several cases below.
  const IPHONE_UA = 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)';
  const GOOGLEBOT_UA = 'Googlebot/2.1';
  const PUBLIC_US_IP = '8.8.8.8';
  const PRIVATE_IP = '10.0.0.1';

  let service: EnricherService;

  /** Enriches the baseline envelope with the given overrides applied. */
  const enrichWith = (overrides: Partial<EventsReceivedContract> = {}) =>
    service.enrich(envelope(overrides));

  beforeEach(() => {
    // The real GeoLocationService is used on purpose: geoip-lite is local and
    // deterministic, so AC2 is checked end-to-end instead of against a mock.
    const geoLocation = new GeoLocationService();
    const recipientSource = new RecipientSourceExtractor();
    service = new EnricherService(geoLocation, recipientSource);
  });

  it('parses an iPhone user-agent to a mobile device (AC1)', async () => {
    const { enrichment } = await enrichWith({
      headers: { 'user-agent': IPHONE_UA },
    });

    expect(enrichment.ua.device.type).toBe('mobile');
  });

  it('resolves geo country US for 8.8.8.8 (AC2)', async () => {
    const { enrichment } = await enrichWith({ sourceIp: PUBLIC_US_IP });

    expect(enrichment.geo.country).toBe('US');
  });

  it('flags a Googlebot user-agent as a bot (AC3)', async () => {
    const { enrichment } = await enrichWith({
      headers: {
        'user-agent': 'Googlebot/2.1 (+http://www.google.com/bot.html)',
      },
    });

    expect(enrichment.botMarkers.isBot).toBe(true);
  });

  it('does not flag a normal browser user-agent as a bot', async () => {
    const { enrichment } = await enrichWith({
      headers: {
        'user-agent':
          'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0',
      },
    });

    expect(enrichment.botMarkers.isBot).toBe(false);
  });

  it('reads the User-Agent header case-insensitively', async () => {
    const { enrichment } = await enrichWith({
      headers: { 'User-Agent': GOOGLEBOT_UA },
    });

    expect(enrichment.botMarkers.isBot).toBe(true);
  });

  it('defaults enrichment fields to empty strings for a private IP and missing UA', async () => {
    const { enrichment } = await enrichWith({
      sourceIp: PRIVATE_IP,
      headers: {},
    });

    expect(enrichment.geo).toEqual({
      country: '',
      region: '',
      city: '',
    });
    expect(enrichment.ua.browser.name).toBe('');
    expect(enrichment.botMarkers.isDatacenter).toBe(false);
  });

  it('preserves the original envelope fields', async () => {
    const correlationId = '22222222-2222-4222-8222-222222222222';

    const enriched = await enrichWith({ correlationId });

    expect(enriched.platform).toBe('evolution-api');
    expect(enriched.correlationId).toBe(correlationId);
  });

  describe('recipient context from rawPayload (M1)', () => {
    it('prefers the SendGrid body UA/IP over the HTTP envelope', async () => {
      // The body describes the recipient (mobile, US); the HTTP envelope
      // describes provider infrastructure (desktop UA, private IP). The body
      // must take precedence.
      const sendgridBody = JSON.stringify([
        {
          event: 'open',
          useragent: IPHONE_UA,
          ip: PUBLIC_US_IP,
        },
      ]);

      const { enrichment } = await enrichWith({
        platform: 'sendgrid',
        rawPayload: sendgridBody,
        headers: {
          'user-agent':
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        },
        sourceIp: PRIVATE_IP,
      });

      expect(enrichment.ua.device.type).toBe('mobile');
      expect(enrichment.geo.country).toBe('US');
    });

    it('uses the Resend body UA for bot detection', async () => {
      const resendBody = JSON.stringify({
        type: 'email.opened',
        data: {
          open: { userAgent: GOOGLEBOT_UA, ipAddress: PUBLIC_US_IP },
        },
      });

      const { enrichment } = await enrichWith({
        platform: 'resend',
        rawPayload: resendBody,
        headers: { 'user-agent': 'Mozilla/5.0 normal browser' },
        sourceIp: PRIVATE_IP,
      });

      expect(enrichment.botMarkers.isBot).toBe(true);
      expect(enrichment.geo.country).toBe('US');
    });

    it('falls back to the envelope when the body carries no recipient source', async () => {
      const bodyWithoutRecipient = JSON.stringify([{ event: 'delivered' }]);

      const { enrichment } = await enrichWith({
        platform: 'sendgrid',
        rawPayload: bodyWithoutRecipient,
        headers: { 'user-agent': GOOGLEBOT_UA },
        sourceIp: PUBLIC_US_IP,
      });

      expect(enrichment.botMarkers.isBot).toBe(true);
      expect(enrichment.geo.country).toBe('US');
    });
  });
});
110 changes: 110 additions & 0 deletions src/runners/event-process/services/enricher.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { Injectable } from '@nestjs/common';
import { UAParser } from 'ua-parser-js';
import { EventsReceivedContract } from '../../../shared/broker/contracts/events-received.contract';
import { EventsEnrichedContract } from '../../../shared/broker/contracts/events-enriched.contract';
import { GeoLocationService } from '../../../modules/click-tracking/services/geo-location.service';
import { RecipientSourceExtractor } from './recipient-source.extractor';

/** Shape of the `enrichment` section, derived from the enriched-event contract. */
type Enrichment = EventsEnrichedContract['enrichment'];

/** A received envelope with its computed `enrichment` section attached. */
export type EnrichedEvent = EventsReceivedContract & {
enrichment: Enrichment;
};

// Substring tokens that mark a request as a known crawler. Tokens MUST be
// lower-case: they are matched with `includes` against the lower-cased
// user-agent. MVP list (story 3.6); broader bot/datacenter detection is
// deferred to Growth. Typed `readonly` so the shared module-level list cannot
// be mutated by accident.
const BOT_UA_DENYLIST: readonly string[] = [
  'googlebot',
  'bingbot',
  'ahrefsbot',
  'semrushbot',
  'dotbot',
  'mj12bot',
  'yandexbot',
];

/**
 * Enriches a received webhook envelope (story 3.6 / EVO-1212) with parsed
 * user-agent, IP geolocation and basic bot markers before the event-process
 * pipeline persists it. Reuses the existing `GeoLocationService` (geoip-lite)
 * and `ua-parser-js`. The enriched event is consumed by the ClickHouse writer
 * wired into `EventProcessService.handle()` in story 3.7.
 *
 * Missing enrichment values are reported as empty strings rather than
 * `undefined`/`null`, so the `enrichment` section always has the same shape.
 */
@Injectable()
export class EnricherService {
  constructor(
    private readonly geoLocation: GeoLocationService,
    private readonly recipientSource: RecipientSourceExtractor,
  ) {}

  /**
   * Returns a copy of `envelope` with an `enrichment` section attached.
   *
   * @param envelope Received webhook envelope; it is spread into the result
   *   and not mutated.
   * @returns The envelope fields plus `enrichment.ua`, `enrichment.geo` and
   *   `enrichment.botMarkers`.
   */
  async enrich(envelope: EventsReceivedContract): Promise<EnrichedEvent> {
    // Prefer the recipient's UA/IP carried in the provider payload; fall back
    // to the HTTP envelope (provider infra) when the body doesn't carry it.
    const recipient = this.recipientSource.extract(
      envelope.platform,
      this.parsePayload(envelope.rawPayload),
    );
    const userAgent =
      recipient.userAgent ?? this.extractUserAgent(envelope.headers);
    const ip = recipient.ip ?? envelope.sourceIp;

    return {
      ...envelope,
      enrichment: {
        ua: this.parseUserAgent(userAgent),
        geo: await this.resolveGeo(ip),
        botMarkers: {
          isBot: this.isBot(userAgent),
          // Datacenter detection is not implemented in the MVP.
          isDatacenter: false,
        },
      },
    };
  }

  /**
   * Decodes the raw provider payload. Non-string payloads are passed through
   * untouched; a string that is not valid JSON yields `undefined` so that
   * enrichment stays best-effort instead of failing the whole event.
   */
  private parsePayload(rawPayload: unknown): unknown {
    if (typeof rawPayload !== 'string') return rawPayload;
    try {
      return JSON.parse(rawPayload);
    } catch {
      return undefined;
    }
  }

  /**
   * Looks up the `User-Agent` header case-insensitively (first matching key
   * wins).
   *
   * @returns The header value, or `''` when the headers object is missing, the
   *   header is absent, or its value is not a string.
   */
  private extractUserAgent(
    headers: Readonly<Record<string, unknown>> | null | undefined,
  ): string {
    // Envelopes are deserialized broker messages, so do not trust the declared
    // shape: a missing `headers` object must not throw.
    if (headers == null) return '';

    for (const [name, value] of Object.entries(headers)) {
      if (name.toLowerCase() === 'user-agent') {
        // A non-string value would crash `isBot` at `toLowerCase()`.
        return typeof value === 'string' ? value : '';
      }
    }
    return '';
  }

  /** Parses `userAgent` into browser/OS/device fields, defaulting each to `''`. */
  private parseUserAgent(userAgent: string): Enrichment['ua'] {
    const parsed = new UAParser(userAgent);
    const browser = parsed.getBrowser();
    const os = parsed.getOS();
    const device = parsed.getDevice();

    return {
      browser: { name: browser.name ?? '', version: browser.version ?? '' },
      os: { name: os.name ?? '', version: os.version ?? '' },
      device: {
        type: device.type ?? '',
        vendor: device.vendor ?? '',
        model: device.model ?? '',
      },
    };
  }

  /**
   * Resolves `sourceIp` to country/region/city via `GeoLocationService`,
   * defaulting each field to `''`. A nullish lookup result is treated the same
   * as a result with no fields.
   */
  private async resolveGeo(sourceIp: string): Promise<Enrichment['geo']> {
    const geo = await this.geoLocation.getLocationFromIp(sourceIp);
    return {
      country: geo?.country ?? '',
      region: geo?.region ?? '',
      city: geo?.city ?? '',
    };
  }

  /** True when the lower-cased `userAgent` contains any `BOT_UA_DENYLIST` token. */
  private isBot(userAgent: string): boolean {
    const normalized = userAgent.toLowerCase();
    return BOT_UA_DENYLIST.some((bot) => normalized.includes(bot));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { RecipientSourceExtractor } from './recipient-source.extractor';

describe('RecipientSourceExtractor', () => {
  const extractor = new RecipientSourceExtractor();

  // Recipient values carried by the provider fixtures below, and the
  // normalised result every successful extraction is expected to produce.
  const UA = 'Mozilla/5.0';
  const IP = '8.8.8.8';
  const normalized = { userAgent: UA, ip: IP };

  it('extracts useragent/ip from a SendGrid event array (first event)', () => {
    const events = [
      { event: 'open', useragent: UA, ip: IP },
      { event: 'click', useragent: 'other', ip: '1.1.1.1' },
    ];

    const result = extractor.extract('sendgrid', events);

    expect(result).toEqual(normalized);
  });

  it('extracts user_agent/ip from a Mandrill event array', () => {
    const events = [{ event: 'open', user_agent: UA, ip: IP }];

    const result = extractor.extract('mandrill', events);

    expect(result).toEqual(normalized);
  });

  it('extracts from a Resend engagement payload (data.open)', () => {
    const body = {
      type: 'email.opened',
      data: { open: { userAgent: UA, ipAddress: IP } },
    };

    const result = extractor.extract('resend', body);

    expect(result).toEqual(normalized);
  });

  it('extracts from an SES SNS-wrapped click event (Message is a JSON string)', () => {
    const innerMessage = {
      eventType: 'Click',
      click: { userAgent: UA, ipAddress: IP },
    };
    const notification = {
      Type: 'Notification',
      Message: JSON.stringify(innerMessage),
    };

    const result = extractor.extract('ses', notification);

    expect(result).toEqual(normalized);
  });

  it('extracts from a SparkPost track_event', () => {
    const events = [
      {
        msys: {
          track_event: { user_agent: UA, ip_address: IP },
        },
      },
    ];

    const result = extractor.extract('sparkpost', events);

    expect(result).toEqual(normalized);
  });

  it('returns {} for evolution-api (no recipient UA/IP in body)', () => {
    const result = extractor.extract('evolution-api', { foo: 'bar' });

    expect(result).toEqual({});
  });

  it('returns {} when the payload lacks the expected fields', () => {
    const fromSendgrid = extractor.extract('sendgrid', [{ event: 'delivered' }]);
    expect(fromSendgrid).toEqual({});

    const fromResend = extractor.extract('resend', { data: {} });
    expect(fromResend).toEqual({});

    const fromSparkpost = extractor.extract('sparkpost', { msys: {} });
    expect(fromSparkpost).toEqual({});
  });

  it('returns {} for a non-object payload', () => {
    const fromNull = extractor.extract('sendgrid', null);
    expect(fromNull).toEqual({});

    const fromString = extractor.extract('sendgrid', 'not-json');
    expect(fromString).toEqual({});
  });
});
Loading
Loading