diff --git a/db/migrations/20260607120000_add_event_key_to_processed_events.cjs b/db/migrations/20260607120000_add_event_key_to_processed_events.cjs new file mode 100644 index 00000000..72a29709 --- /dev/null +++ b/db/migrations/20260607120000_add_event_key_to_processed_events.cjs @@ -0,0 +1,31 @@ +/** + * Adds a canonical event_key to processed_events for robust Soroban idempotency. + */ +exports.up = async function up(knex) { + await knex.schema.alterTable('processed_events', (table) => { + table.text('event_key') + }) + + await knex.raw( + `UPDATE processed_events + SET event_key = transaction_hash || ':' || ledger_number || ':' || event_index` + ) + + await knex.schema.alterTable('processed_events', (table) => { + table.text('event_key').notNullable().unique().alter() + }) + + await knex.schema.alterTable('processed_events', (table) => { + table.index(['event_key'], 'idx_processed_events_event_key') + }) +} + +exports.down = async function down(knex) { + await knex.schema.alterTable('processed_events', (table) => { + table.dropIndex(['event_key'], 'idx_processed_events_event_key') + }) + + await knex.schema.alterTable('processed_events', (table) => { + table.dropColumn('event_key') + }) +} diff --git a/docs/event-processing.md b/docs/event-processing.md index 89e79891..3f27d3a5 100644 --- a/docs/event-processing.md +++ b/docs/event-processing.md @@ -33,12 +33,12 @@ The processor will retry the operation with exponential backoff. If the parent e ## Idempotency -Every event has a unique `eventId` formatted as `{transaction_hash}:{event_index}`. +Every event has a unique `eventId` formatted as `{transaction_hash}:{event_index}`. The processor also derives a canonical `event_key` from `{transaction_hash}:{ledger_number}:{event_index}`, which is persisted to `processed_events` and enforced with a unique database index. 1. Before processing, the system checks the `processed_events` table. -2. If the `eventId` exists, the event is skipped as "already processed." +2. If the `eventId` or canonical `event_key` exists, the event is skipped as "already processed." 3. If not, the event is processed within a database transaction. -4. Upon success, the `eventId` is recorded in `processed_events` before committing. +4. Upon success, both `eventId` and the canonical `event_key` are recorded in `processed_events` before committing. ## Error Handling & Dead Letter Queue diff --git a/src/services/eventParser.ts b/src/services/eventParser.ts index 2d944a86..25cde950 100644 --- a/src/services/eventParser.ts +++ b/src/services/eventParser.ts @@ -95,6 +95,10 @@ function readDateField(record: DecodedPayload, key: string): Date | undefined { return undefined } +export function createEventKey(txHash: string, ledgerNumber: number, eventIndex: number): string { + return `${txHash}:${ledgerNumber}:${eventIndex}` +} + /** * Validates vault_created event payload * @@ -605,6 +609,7 @@ export function parseHorizonEvent(rawEvent: HorizonEvent): ParseResult { transactionHash: rawEvent.txHash, eventIndex, ledgerNumber: rawEvent.ledger, + eventKey: createEventKey(rawEvent.txHash, rawEvent.ledger, eventIndex), eventType, payload } diff --git a/src/services/eventProcessor.ts b/src/services/eventProcessor.ts index 589e5c4c..ab296f41 100644 --- a/src/services/eventProcessor.ts +++ b/src/services/eventProcessor.ts @@ -138,7 +138,7 @@ export class EventProcessor { const trx = await this.db.transaction() try { - const alreadyProcessed = await this.idempotency.isEventProcessed(event.eventId, trx) + const alreadyProcessed = await this.idempotency.isEventProcessed(event, trx) if (alreadyProcessed) { await trx.commit() return diff --git a/src/services/idempotency.ts b/src/services/idempotency.ts index f5f4a988..d791bbf5 100644 --- a/src/services/idempotency.ts +++ b/src/services/idempotency.ts @@ -2,6 +2,10 @@ import { Knex } from 'knex' import { ParsedEvent } from '../types/horizonSync.js' import { createHash } from 'node:crypto' +function getCanonicalEventKey(event: ParsedEvent): string { + return event.eventKey ?? `${event.transactionHash}:${event.ledgerNumber}:${event.eventIndex}` +} + export class IdempotencyConflictError extends Error { constructor(message = 'Idempotency key conflict') { super(message) @@ -54,11 +58,14 @@ export class IdempotencyService { * @param trx - Optional transaction to use for the check * @returns Promise - True if already processed */ - async isEventProcessed(eventId: string, trx?: Knex.Transaction): Promise { + async isEventProcessed(event: ParsedEvent, trx?: Knex.Transaction): Promise { + const eventKey = getCanonicalEventKey(event) const query = (trx || this.db)('processed_events') - .where({ event_id: eventId }) + .where(function () { + this.where({ event_id: event.eventId }).orWhere({ event_key: eventKey }) + }) .first() - + const result = await query return !!result } @@ -73,6 +80,7 @@ export class IdempotencyService { async markEventProcessed(event: ParsedEvent, trx: Knex.Transaction): Promise { await trx('processed_events').insert({ event_id: event.eventId, + event_key: getCanonicalEventKey(event), transaction_hash: event.transactionHash, event_index: event.eventIndex, ledger_number: event.ledgerNumber, diff --git a/src/tests/eventProcessor.idempotency.test.ts b/src/tests/eventProcessor.idempotency.test.ts new file mode 100644 index 00000000..7e2d90b9 --- /dev/null +++ b/src/tests/eventProcessor.idempotency.test.ts @@ -0,0 +1,67 @@ +import type { Knex } from 'knex' +import { EventProcessor } from '../services/eventProcessor.js' +import { + setupTestDatabase, + teardownTestDatabase, + cleanAllTables, + insertTestVault, +} from './helpers/testDatabase.js' +import { mockVaultCompletedEvent } from './fixtures/horizonEvents.js' + +describe('EventProcessor idempotency key handling', () => { + let db: Knex + let processor: EventProcessor + + beforeAll(async () => { + db = await setupTestDatabase() + processor = new EventProcessor(db, { maxRetries: 3, retryBackoffMs: 50 }) + }) + + afterAll(async () => { + await teardownTestDatabase(db) + }) + + beforeEach(async () => { + await cleanAllTables(db) + }) + + it('persists the canonical txHash:ledger:eventIndex key on processed events', async () => { + await insertTestVault(db, 'vault-test-001', { status: 'active' }) + + const result = await processor.processEvent(mockVaultCompletedEvent) + expect(result.success).toBe(true) + + const processedEvent = await db('processed_events') + .where({ event_id: mockVaultCompletedEvent.eventId }) + .first() + + expect(processedEvent).toBeDefined() + expect(processedEvent.event_key).toBe( + `${mockVaultCompletedEvent.transactionHash}:${mockVaultCompletedEvent.ledgerNumber}:${mockVaultCompletedEvent.eventIndex}` + ) + }) + + it('enforces event_key uniqueness even across distinct event_id values', async () => { + await db('processed_events').insert({ + event_id: 'original-event', + event_key: 'txhash-dup:100:1', + transaction_hash: 'txhash-dup', + event_index: 1, + ledger_number: 100, + processed_at: new Date(), + created_at: new Date(), + }) + + await expect( + db('processed_events').insert({ + event_id: 'duplicate-event', + event_key: 'txhash-dup:100:1', + transaction_hash: 'txhash-dup', + event_index: 1, + ledger_number: 100, + processed_at: new Date(), + created_at: new Date(), + }) + ).rejects.toThrow() + }) +}) diff --git a/src/types/horizonSync.ts b/src/types/horizonSync.ts index ff82ccf9..dd906546 100644 --- a/src/types/horizonSync.ts +++ b/src/types/horizonSync.ts @@ -16,6 +16,7 @@ export interface ParsedEvent { transactionHash: string eventIndex: number ledgerNumber: number + eventKey?: string eventType: EventType payload: VaultEventPayload | MilestoneEventPayload | ValidationEventPayload }