From 0d50351c9278421947abbaef820643f3959f361b Mon Sep 17 00:00:00 2001 From: MerlinTheWhiz Date: Wed, 27 May 2026 16:12:13 +0100 Subject: [PATCH] feat: index usage_events into revenue_ledger --- .env.example | 2 + README.md | 1 + src/__tests__/persistentStores.test.ts | 27 ++ src/config/env.test.ts | 20 ++ src/config/env.ts | 2 + src/config/index.ts | 4 + src/index.ts | 16 + .../usageEventsRepository.pg.test.ts | 127 ++++++++ src/repositories/usageEventsRepository.pg.ts | 106 ++++++ src/routes/index.ts | 11 +- src/services/revenueLedgerIndexer.test.ts | 304 ++++++++++++++++++ src/services/revenueLedgerIndexer.ts | 159 +++++++++ src/services/usageStore.ts | 10 +- src/types/developer.ts | 20 ++ 14 files changed, 802 insertions(+), 7 deletions(-) create mode 100644 src/services/revenueLedgerIndexer.test.ts create mode 100644 src/services/revenueLedgerIndexer.ts diff --git a/.env.example b/.env.example index 26be877..5ef4150 100644 --- a/.env.example +++ b/.env.example @@ -83,6 +83,8 @@ HORIZON_URL=https://horizon-testnet.stellar.org HORIZON_TIMEOUT=2000 SETTLEMENT_STATUS_SYNC_INTERVAL_MS=60000 SETTLEMENT_STATUS_SYNC_TIMEOUT_MS=5000 +REVENUE_LEDGER_INDEXER_INTERVAL_MS=30000 +REVENUE_LEDGER_INDEXER_BATCH_SIZE=500 # ----------------------------------------------------------------------------- # Stellar / Soroban network selection diff --git a/README.md b/README.md index fb90945..e06e284 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ The request requires developer auth via `Authorization: Bearer ...` or `x-user-i - The runtime now uses PostgreSQL-backed `SettlementStore` and `UsageStore` implementations so `/api/developers/revenue` survives process restarts. - Unsettled usage is persisted through `revenue_ledger`, and settlement batches are persisted through `settlements`. +- A background revenue ledger indexer backfills `revenue_ledger` from `usage_events`, keyed by `usage_event_id` and resolving API ownership from `apis`. - The in-memory store factories are still available for unit tests and isolated local scenarios. - Apply `migrations/001_create_usage_events.sql`, `migrations/002_create_settlements.sql`, `migrations/003_create_revenue_ledger.sql`, and `migrations/005_add_persistent_store_columns.sql` before starting the API against PostgreSQL. diff --git a/src/__tests__/persistentStores.test.ts b/src/__tests__/persistentStores.test.ts index 040ef7c..06fca8d 100644 --- a/src/__tests__/persistentStores.test.ts +++ b/src/__tests__/persistentStores.test.ts @@ -3,6 +3,7 @@ import request from 'supertest'; import { DataType, newDb } from 'pg-mem'; import { createDeveloperRouter } from '../routes/developerRoutes.js'; import { errorHandler } from '../middleware/errorHandler.js'; +import type { DeveloperRepository } from '../repositories/developerRepository.js'; import { createPostgresSettlementStore } from '../services/settlementStore.js'; import { createPostgresUsageStore } from '../services/usageStore.js'; @@ -30,6 +31,11 @@ function createPersistentStoreHarness() { created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + CREATE TABLE apis ( + id VARCHAR(255) PRIMARY KEY, + developer_id VARCHAR(255) NOT NULL + ); + CREATE TABLE settlements ( id BIGSERIAL PRIMARY KEY, external_id VARCHAR(255) NOT NULL UNIQUE, @@ -102,6 +108,11 @@ test('PostgresUsageStore records idempotently and marks events as settled', asyn const { pool, settlementStore, usageStore } = createPersistentStoreHarness(); try { + await pool.query( + 'INSERT INTO apis (id, developer_id) VALUES ($1, $2)', + ['api-1', 'api-owner-1'], + ); + const firstInsert = await usageStore.record({ id: 'ignored-in-pg-store', requestId: 'req-1', @@ -137,6 +148,7 @@ test('PostgresUsageStore records idempotently and marks events as settled', asyn amountUsdc: 4.25, statusCode: 200, apiKey: 'key-1', + userId: 'api-owner-1', settlementId: undefined, }); @@ -165,6 +177,11 @@ test('persistent stores survive new instances and keep developer revenue availab const harness = createPersistentStoreHarness(); try { + await harness.pool.query( + 'INSERT INTO apis (id, developer_id) VALUES ($1, $2)', + ['api-restart', 'dev_restart'], + ); + await harness.settlementStore.create({ id: 'stl_completed', developerId: 'dev_restart', @@ -196,9 +213,19 @@ test('persistent stores survive new instances and keep developer revenue availab const app = express(); app.use(express.json()); + const developerRepository: DeveloperRepository = { + findByUserId: async () => undefined, + getOrCreateByUserId: async () => { + throw new Error('not used in this test'); + }, + upsertProfile: async () => { + throw new Error('not used in this test'); + }, + }; app.use('/api/developers', createDeveloperRouter({ settlementStore: createPostgresSettlementStore(harness.pool), usageStore: createPostgresUsageStore(harness.pool), + developerRepository, })); app.use(errorHandler); diff --git a/src/config/env.test.ts b/src/config/env.test.ts index bd85745..42a579e 100644 --- a/src/config/env.test.ts +++ b/src/config/env.test.ts @@ -153,3 +153,23 @@ describe('env schema — REST rate limit config', () => { expect(result.success).toBe(false); }); }); + +describe('env schema — revenue ledger indexer config', () => { + it('defaults revenue ledger indexer values when omitted', () => { + const result = envSchema.safeParse({ ...baseEnv }); + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.REVENUE_LEDGER_INDEXER_INTERVAL_MS).toBe(30_000); + expect(result.data.REVENUE_LEDGER_INDEXER_BATCH_SIZE).toBe(500); + } + }); + + it('rejects non-positive revenue ledger indexer values', () => { + const result = envSchema.safeParse({ + ...baseEnv, + REVENUE_LEDGER_INDEXER_INTERVAL_MS: '0', + REVENUE_LEDGER_INDEXER_BATCH_SIZE: '-10', + }); + expect(result.success).toBe(false); + }); +}); diff --git a/src/config/env.ts b/src/config/env.ts index cdd0fe6..ab7b673 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -61,6 +61,8 @@ export const envSchema = z HORIZON_TIMEOUT: z.coerce.number().default(2_000), SETTLEMENT_STATUS_SYNC_INTERVAL_MS: z.coerce.number().int().positive().default(60_000), SETTLEMENT_STATUS_SYNC_TIMEOUT_MS: z.coerce.number().int().positive().default(5_000), + REVENUE_LEDGER_INDEXER_INTERVAL_MS: z.coerce.number().int().positive().default(30_000), + REVENUE_LEDGER_INDEXER_BATCH_SIZE: z.coerce.number().int().positive().default(500), // Stellar network configuration STELLAR_NETWORK: stellarNetworkSchema.optional(), diff --git a/src/config/index.ts b/src/config/index.ts index bff79a2..f032cdf 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -153,6 +153,10 @@ export const config = { intervalMs: env.SETTLEMENT_STATUS_SYNC_INTERVAL_MS, timeoutMs: env.SETTLEMENT_STATUS_SYNC_TIMEOUT_MS, }, + revenueLedgerIndexer: { + intervalMs: env.REVENUE_LEDGER_INDEXER_INTERVAL_MS, + batchSize: env.REVENUE_LEDGER_INDEXER_BATCH_SIZE, + }, stellar: { network: selectedNetwork, diff --git a/src/index.ts b/src/index.ts index 08309f1..e0ebce9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,6 +18,10 @@ import { createProxyRouter } from './routes/proxyRoutes.js'; import { defaultDeveloperRepository } from './repositories/developerRepository.js'; import { createBillingService } from './services/billingService.js'; import { createRateLimiter } from './services/rateLimiter.js'; +import { PgUsageEventsRepository } from './repositories/usageEventsRepository.pg.js'; +import { createRevenueLedgerIndexerJob } from './services/revenueLedgerIndexer.js'; +import { RevenueSettlementService } from './services/revenueSettlementService.js'; +import { createSettlementStatusSyncJob } from './services/settlementStatusSyncJob.js'; import { createPostgresUsageStore } from './services/usageStore.js'; import { createPostgresSettlementStore } from './services/settlementStore.js'; import { createApiRegistry } from './data/apiRegistry.js'; @@ -237,6 +241,11 @@ if (isDirectExecution) { const rateLimiter = createRateLimiter(5, 60_000); // 5 reqs per minute const usageStore = createPostgresUsageStore(pool); const settlementStore = createPostgresSettlementStore(pool); + const usageEventsRepository = new PgUsageEventsRepository(pool); + const revenueLedgerIndexerJob = createRevenueLedgerIndexerJob(usageEventsRepository, { + intervalMs: config.revenueLedgerIndexer.intervalMs, + batchSize: config.revenueLedgerIndexer.batchSize, + }); const registry = createApiRegistry(); const revenueSettlementService = new RevenueSettlementService( usageStore, @@ -294,6 +303,11 @@ if (isDirectExecution) { const proxyDrainTracker = createInFlightDrainTracker('gateway-proxy'); const shutdownSubsystems: DrainableSubsystem[] = [ proxyDrainTracker.subsystem, + { + name: 'revenue-ledger-indexer', + beginShutdown: () => revenueLedgerIndexerJob.beginShutdown(), + awaitIdle: () => revenueLedgerIndexerJob.awaitIdle(), + }, { name: 'webhook-dispatcher', beginShutdown: stopWebhookDispatching, @@ -312,6 +326,7 @@ if (isDirectExecution) { const PORT = config.port; const closeAllDataResources = async () => { + revenueLedgerIndexerJob.stop(); settlementStatusSyncJob.stop(); await closeDb(); await Promise.allSettled([ @@ -325,6 +340,7 @@ if (isDirectExecution) { async function startServer() { try { await initializeDb(); + revenueLedgerIndexerJob.start(); settlementStatusSyncJob.start(); const server = app.listen(PORT, () => { diff --git a/src/repositories/usageEventsRepository.pg.test.ts b/src/repositories/usageEventsRepository.pg.test.ts index 8c18c2f..96114f1 100644 --- a/src/repositories/usageEventsRepository.pg.test.ts +++ b/src/repositories/usageEventsRepository.pg.test.ts @@ -28,6 +28,20 @@ function createUsageEventsRepository() { created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + CREATE TABLE apis ( + id VARCHAR(255) PRIMARY KEY, + developer_id VARCHAR(255) NOT NULL + ); + + CREATE TABLE revenue_ledger ( + id BIGSERIAL PRIMARY KEY, + api_id VARCHAR(255) NOT NULL, + developer_id VARCHAR(255) NOT NULL, + amount_usdc NUMERIC(20, 0) NOT NULL, + usage_event_id BIGINT UNIQUE REFERENCES usage_events(id), + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + CREATE INDEX idx_usage_events_user_created ON usage_events(user_id, created_at); CREATE INDEX idx_usage_events_api_created ON usage_events(api_id, created_at); `); @@ -344,6 +358,11 @@ test('repository validates blank identifiers, invalid ranges, negative amounts, repository.findByApiId('api-weather', undefined, new Date('nope')), /to must be a valid date\./, ); + + await assert.rejects( + repository.findUnindexedRevenueLedgerEvents('bad-cursor'), + /cursor must be a non-negative integer string\./, + ); } finally { await pool.end(); } @@ -461,3 +480,111 @@ test('repository accepts bigint values returned directly from the database drive assert.equal(events[0]?.id, '7'); assert.equal(events[0]?.amount, 450n); }); + +test('findUnindexedRevenueLedgerEvents resolves developer ownership from apis and skips indexed rows', async () => { + const { repository, pool } = createUsageEventsRepository(); + + try { + await pool.query( + 'INSERT INTO apis (id, developer_id) VALUES ($1, $2), ($3, $4)', + ['api-weather', 'dev-weather', 'api-chat', 'dev-chat'], + ); + + await repository.create({ + userId: 'consumer-1', + apiId: 'api-weather', + endpointId: 'endpoint-1', + apiKeyId: 'key-1', + amount: 100n, + requestId: 'req-ledger-1', + createdAt: new Date('2026-02-01T10:00:00.000Z'), + }); + await repository.create({ + userId: 'consumer-2', + apiId: 'api-chat', + endpointId: 'endpoint-2', + apiKeyId: 'key-2', + amount: 250n, + requestId: 'req-ledger-2', + createdAt: new Date('2026-02-02T10:00:00.000Z'), + }); + await repository.create({ + userId: 'consumer-3', + apiId: 'api-missing', + endpointId: 'endpoint-3', + apiKeyId: 'key-3', + amount: 999n, + requestId: 'req-ledger-3', + createdAt: new Date('2026-02-03T10:00:00.000Z'), + }); + + await pool.query( + ` + INSERT INTO revenue_ledger ( + api_id, + developer_id, + amount_usdc, + usage_event_id, + created_at + ) + VALUES ($1, $2, $3, $4, $5) + `, + ['api-weather', 'dev-weather', '100', '1', new Date('2026-02-01T10:00:00.000Z')], + ); + + const events = await repository.findUnindexedRevenueLedgerEvents(); + + assert.deepEqual(events, [ + { + usageEventId: '2', + apiId: 'api-chat', + developerId: 'dev-chat', + amount: 250n, + createdAt: new Date('2026-02-02T10:00:00.000Z'), + }, + ]); + } finally { + await pool.end(); + } +}); + +test('indexRevenueLedgerEvent inserts idempotently by usageEventId', async () => { + const { repository, pool } = createUsageEventsRepository(); + + try { + await repository.create({ + userId: 'consumer-1', + apiId: 'api-weather', + endpointId: 'endpoint-1', + apiKeyId: 'key-1', + amount: 1500n, + requestId: 'req-ledger-insert', + createdAt: new Date('2026-02-05T10:00:00.000Z'), + }); + + const insertedFirst = await repository.indexRevenueLedgerEvent({ + usageEventId: '1', + apiId: 'api-weather', + developerId: 'dev-weather', + amount: 1500n, + createdAt: new Date('2026-02-05T10:00:00.000Z'), + }); + const insertedDuplicate = await repository.indexRevenueLedgerEvent({ + usageEventId: '1', + apiId: 'api-weather', + developerId: 'dev-weather', + amount: 1500n, + createdAt: new Date('2026-02-05T10:00:00.000Z'), + }); + const count = await pool.query( + 'SELECT COUNT(*)::text AS count FROM revenue_ledger WHERE usage_event_id = $1', + ['1'], + ); + + assert.equal(insertedFirst, true); + assert.equal(insertedDuplicate, false); + assert.equal(count.rows[0]?.count, '1'); + } finally { + await pool.end(); + } +}); diff --git a/src/repositories/usageEventsRepository.pg.ts b/src/repositories/usageEventsRepository.pg.ts index a882224..35c9476 100644 --- a/src/repositories/usageEventsRepository.pg.ts +++ b/src/repositories/usageEventsRepository.pg.ts @@ -21,12 +21,22 @@ export interface BillingUsageEvent { createdAt: Date; } +export interface RevenueLedgerUsageEvent { + usageEventId: string; + apiId: string; + developerId: string; + amount: bigint; + createdAt: Date; +} + export interface UsageEventsPgRepository { create(event: CreateUsageEventInput): Promise; findByUserId(userId: string, from?: Date, to?: Date, limit?: number, offset?: number): Promise; findByApiId(apiId: string, from?: Date, to?: Date, limit?: number, offset?: number): Promise; getTotalSpentByUser(userId: string, from?: Date, to?: Date): Promise; getTotalRevenueByApi(apiId: string, from?: Date, to?: Date): Promise; + findUnindexedRevenueLedgerEvents(cursor?: string, limit?: number): Promise; + indexRevenueLedgerEvent(event: RevenueLedgerUsageEvent): Promise; } export interface UsageEventsRepositoryQueryable { @@ -49,6 +59,14 @@ interface TotalRow { total: string | number | bigint | null; } +interface RevenueLedgerUsageEventRow { + usage_event_id: string | number | bigint; + api_id: string; + developer_id: string; + amount_usdc: string | number | bigint; + created_at: Date | string; +} + const assertNonEmpty = (value: string, fieldName: string): string => { const trimmed = value.trim(); if (!trimmed) { @@ -92,6 +110,19 @@ const normalizeLimit = (limit?: number): number | undefined => { return limit; }; +const normalizeCursor = (cursor?: string): string | undefined => { + if (cursor === undefined) { + return undefined; + } + + const trimmed = cursor.trim(); + if (!/^\d+$/.test(trimmed)) { + throw new Error('cursor must be a non-negative integer string.'); + } + + return trimmed; +}; + const toBigInt = (value: string | number | bigint | null, fieldName: string): bigint => { if (value === null) { return 0n; @@ -129,6 +160,16 @@ const mapUsageEventRow = (row: UsageEventRow): BillingUsageEvent => ({ createdAt: row.created_at instanceof Date ? row.created_at : new Date(row.created_at), }); +const mapRevenueLedgerUsageEventRow = ( + row: RevenueLedgerUsageEventRow, +): RevenueLedgerUsageEvent => ({ + usageEventId: String(row.usage_event_id), + apiId: row.api_id, + developerId: row.developer_id, + amount: toBigInt(row.amount_usdc, 'amount_usdc'), + createdAt: row.created_at instanceof Date ? row.created_at : new Date(row.created_at), +}); + const appendDateFilters = (params: unknown[], clauses: string[], from?: Date, to?: Date): void => { if (from) { params.push(from); @@ -227,6 +268,71 @@ export class PgUsageEventsRepository implements UsageEventsPgRepository { return this.sumByColumn('api_id', assertNonEmpty(apiId, 'apiId'), from, to); } + async findUnindexedRevenueLedgerEvents( + cursor?: string, + limit = 100, + ): Promise { + const normalizedCursor = normalizeCursor(cursor) ?? '0'; + const normalizedLimit = normalizeLimit(limit); + if (normalizedLimit === 0) { + return []; + } + + const result = await this.db.query( + ` + SELECT + ue.id AS usage_event_id, + ue.api_id, + a.developer_id, + ue.amount_usdc, + ue.created_at + FROM usage_events ue + INNER JOIN apis a + ON a.id = ue.api_id + LEFT JOIN revenue_ledger rl + ON rl.usage_event_id = ue.id + WHERE ue.id > $1 + AND rl.usage_event_id IS NULL + ORDER BY ue.id ASC + LIMIT $2 + `, + [normalizedCursor, normalizedLimit], + ); + + return result.rows.map(mapRevenueLedgerUsageEventRow); + } + + async indexRevenueLedgerEvent(event: RevenueLedgerUsageEvent): Promise { + const result = await this.db.query<{ inserted: number }>( + ` + INSERT INTO revenue_ledger ( + api_id, + developer_id, + amount_usdc, + usage_event_id, + created_at + ) + SELECT $1, $2, $3::numeric, $4::bigint, $5::timestamp + WHERE NOT EXISTS ( + SELECT 1 + FROM revenue_ledger + WHERE usage_event_id = $4::bigint + ) + ON CONFLICT (usage_event_id) DO NOTHING + RETURNING 1 AS inserted + `, + [ + assertNonEmpty(event.apiId, 'apiId'), + assertNonEmpty(event.developerId, 'developerId'), + assertAmount(event.amount).toString(), + normalizeCursor(event.usageEventId) ?? '0', + event.createdAt, + ], + ); + + return Boolean(result.rows[0]); + } + private async findByColumn( column: 'user_id' | 'api_id', value: string, diff --git a/src/routes/index.ts b/src/routes/index.ts index 0be483b..efa188a 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -1,16 +1,17 @@ import { Router } from 'express'; +import type { RequestHandler } from 'express'; + +import apisRouter from './apis.js'; +import billingRouter from './billing.js'; import healthRouter from './health.js'; import usageRouter from './usage.js'; -import billingRouter from './billing.js'; -import type { RequestHandler } from 'express'; export interface ApiRouterDeps { restRateLimit?: RequestHandler; } -router.use('/health', healthRouter); -router.use('/usage', usageRouter); -router.use('/billing', billingRouter); +export function createApiRouter(deps: ApiRouterDeps = {}): Router { + const router = Router(); router.use('/health', healthRouter); router.use('/apis', apisRouter); diff --git a/src/services/revenueLedgerIndexer.test.ts b/src/services/revenueLedgerIndexer.test.ts new file mode 100644 index 0000000..ce22374 --- /dev/null +++ b/src/services/revenueLedgerIndexer.test.ts @@ -0,0 +1,304 @@ +import assert from 'node:assert/strict'; +import { DataType, newDb } from 'pg-mem'; + +import { + PgUsageEventsRepository, + type UsageEventsRepositoryQueryable, +} from '../repositories/usageEventsRepository.pg.js'; +import { + RevenueLedgerIndexer, + createRevenueLedgerIndexerJob, +} from './revenueLedgerIndexer.js'; + +function createIndexerHarness() { + const db = newDb(); + + db.public.registerFunction({ + name: 'now', + returns: DataType.timestamp, + implementation: () => new Date('2026-03-01T00:00:00.000Z'), + }); + + db.public.none(` + CREATE TABLE usage_events ( + id BIGSERIAL PRIMARY KEY, + user_id VARCHAR(255) NOT NULL, + api_id VARCHAR(255) NOT NULL, + endpoint_id VARCHAR(255) NOT NULL, + api_key_id VARCHAR(255) NOT NULL, + amount_usdc NUMERIC(20, 0) NOT NULL, + request_id VARCHAR(255) NOT NULL UNIQUE, + stellar_tx_hash VARCHAR(64), + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + + CREATE TABLE apis ( + id VARCHAR(255) PRIMARY KEY, + developer_id VARCHAR(255) NOT NULL + ); + + CREATE TABLE revenue_ledger ( + id BIGSERIAL PRIMARY KEY, + api_id VARCHAR(255) NOT NULL, + developer_id VARCHAR(255) NOT NULL, + amount_usdc NUMERIC(20, 0) NOT NULL, + usage_event_id BIGINT UNIQUE REFERENCES usage_events(id), + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + `); + + const { Pool } = db.adapters.createPg(); + const pool = new Pool(); + const repository = new PgUsageEventsRepository(pool as UsageEventsRepositoryQueryable); + + return { pool, repository }; +} + +test('RevenueLedgerIndexer backfills unindexed usage events in cursor-ordered batches', async () => { + const { pool, repository } = createIndexerHarness(); + + try { + await pool.query( + 'INSERT INTO apis (id, developer_id) VALUES ($1, $2), ($3, $4)', + ['api-1', 'dev-1', 'api-2', 'dev-2'], + ); + + await repository.create({ + userId: 'consumer-1', + apiId: 'api-1', + endpointId: 'endpoint-1', + apiKeyId: 'key-1', + amount: 100n, + requestId: 'req-1', + createdAt: new Date('2026-02-01T10:00:00.000Z'), + }); + await repository.create({ + userId: 'consumer-2', + apiId: 'api-2', + endpointId: 'endpoint-2', + apiKeyId: 'key-2', + amount: 200n, + requestId: 'req-2', + createdAt: new Date('2026-02-02T10:00:00.000Z'), + }); + await repository.create({ + userId: 'consumer-3', + apiId: 'api-1', + endpointId: 'endpoint-3', + apiKeyId: 'key-3', + amount: 300n, + requestId: 'req-3', + createdAt: new Date('2026-02-03T10:00:00.000Z'), + }); + + const indexer = new RevenueLedgerIndexer(repository, { batchSize: 2 }); + const firstRun = await indexer.runOnce(); + const secondRun = await indexer.runOnce(); + const rows = await pool.query( + ` + SELECT usage_event_id::text, api_id, developer_id, amount_usdc::text + FROM revenue_ledger + ORDER BY usage_event_id ASC + `, + ); + + assert.deepEqual(firstRun, { scanned: 3, inserted: 3 }); + assert.deepEqual(secondRun, { scanned: 0, inserted: 0 }); + assert.deepEqual(rows.rows, [ + { usage_event_id: '1', api_id: 'api-1', developer_id: 'dev-1', amount_usdc: '100' }, + { usage_event_id: '2', api_id: 'api-2', developer_id: 'dev-2', amount_usdc: '200' }, + { usage_event_id: '3', api_id: 'api-1', developer_id: 'dev-1', amount_usdc: '300' }, + ]); + } finally { + await pool.end(); + } +}); + +test('RevenueLedgerIndexerJob drains in-flight work during shutdown', async () => { + jest.useFakeTimers(); + + try { + let releaseFirstQuery!: () => void; + const repository = { + findUnindexedRevenueLedgerEvents: jest + .fn() + .mockImplementationOnce( + () => + new Promise((resolve) => { + releaseFirstQuery = () => { + resolve([ + { + usageEventId: '1', + apiId: 'api-1', + developerId: 'dev-1', + amount: 100n, + createdAt: new Date('2026-02-01T10:00:00.000Z'), + }, + ]); + }; + }), + ) + .mockResolvedValueOnce([]), + indexRevenueLedgerEvent: jest.fn().mockResolvedValue(true), + } as unknown as PgUsageEventsRepository; + + const job = createRevenueLedgerIndexerJob(repository, { + intervalMs: 1_000, + batchSize: 10, + logger: { error: jest.fn() }, + }); + + job.start(); + await Promise.resolve(); + + const idlePromise = job.awaitIdle(); + let settled = false; + void idlePromise.then(() => { + settled = true; + }); + + job.beginShutdown(); + jest.advanceTimersByTime(5_000); + await Promise.resolve(); + + assert.equal(settled, false); + releaseFirstQuery(); + await idlePromise; + + expect(repository.indexRevenueLedgerEvent).toHaveBeenCalledTimes(1); + assert.equal(settled, true); + } finally { + jest.useRealTimers(); + } +}); + +test('RevenueLedgerIndexer validates configuration and logs insert failures', async () => { + assert.throws( + () => + new RevenueLedgerIndexer( + { + findUnindexedRevenueLedgerEvents: async () => [], + indexRevenueLedgerEvent: async () => true, + } as unknown as PgUsageEventsRepository, + { batchSize: 0 }, + ), + /batchSize must be a positive integer\./, + ); + + const logger = { error: jest.fn() }; + const repository = { + findUnindexedRevenueLedgerEvents: jest + .fn() + .mockResolvedValueOnce([ + { + usageEventId: '9', + apiId: 'api-9', + developerId: 'dev-9', + amount: 999n, + createdAt: new Date('2026-02-09T10:00:00.000Z'), + }, + ]), + indexRevenueLedgerEvent: jest.fn().mockRejectedValue(new Error('boom')), + } as unknown as PgUsageEventsRepository; + + const indexer = new RevenueLedgerIndexer(repository, { logger }); + + await assert.rejects(indexer.runOnce(), /boom/); + await indexer.awaitIdle(); + expect(logger.error).toHaveBeenCalledWith( + 'Revenue ledger indexing failed for usage event', + expect.objectContaining({ usageEventId: '9' }), + ); +}); + +test('RevenueLedgerIndexerJob validates interval and skips overlapping ticks', async () => { + assert.throws( + () => + createRevenueLedgerIndexerJob( + { + findUnindexedRevenueLedgerEvents: async () => [], + indexRevenueLedgerEvent: async () => true, + } as unknown as PgUsageEventsRepository, + { intervalMs: 0 }, + ), + /intervalMs must be a positive integer\./, + ); + + jest.useFakeTimers(); + + try { + let release!: () => void; + const repository = { + findUnindexedRevenueLedgerEvents: jest + .fn() + .mockImplementationOnce( + () => + new Promise((resolve) => { + release = () => resolve([]); + }), + ), + indexRevenueLedgerEvent: jest.fn(), + } as unknown as PgUsageEventsRepository; + + const job = createRevenueLedgerIndexerJob(repository, { + intervalMs: 100, + batchSize: 10, + logger: { error: jest.fn() }, + }); + + job.stop(); + job.start(); + job.start(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + jest.advanceTimersByTime(500); + await Promise.resolve(); + + expect(repository.findUnindexedRevenueLedgerEvents).toHaveBeenCalledTimes(1); + + release(); + await job.awaitIdle(); + + job.beginShutdown(); + job.start(); + jest.advanceTimersByTime(500); + await Promise.resolve(); + + expect(repository.findUnindexedRevenueLedgerEvents).toHaveBeenCalledTimes(1); + } finally { + jest.useRealTimers(); + } +}); + +test('RevenueLedgerIndexerJob logs job failures and stop is safe when idle', async () => { + const logger = { error: jest.fn() }; + const repository = { + findUnindexedRevenueLedgerEvents: jest.fn().mockResolvedValueOnce([ + { + usageEventId: '11', + apiId: 'api-11', + developerId: 'dev-11', + amount: 11n, + createdAt: new Date('2026-02-11T10:00:00.000Z'), + }, + ]), + indexRevenueLedgerEvent: jest.fn().mockRejectedValue(new Error('job-failure')), + } as unknown as PgUsageEventsRepository; + + const job = createRevenueLedgerIndexerJob(repository, { + intervalMs: 1_000, + batchSize: 10, + logger, + }); + + job.stop(); + job.start(); + await expect(job.awaitIdle()).rejects.toThrow('job-failure'); + job.stop(); + + expect(logger.error).toHaveBeenCalledWith( + 'Revenue ledger indexer job failed:', + expect.any(Error), + ); +}); diff --git a/src/services/revenueLedgerIndexer.ts b/src/services/revenueLedgerIndexer.ts new file mode 100644 index 0000000..6ddbcfa --- /dev/null +++ b/src/services/revenueLedgerIndexer.ts @@ -0,0 +1,159 @@ +import type { + RevenueLedgerUsageEvent, + UsageEventsPgRepository, +} from '../repositories/usageEventsRepository.pg.js'; + +export interface RevenueLedgerIndexerOptions { + batchSize?: number; + logger?: Pick; +} + +export interface RevenueLedgerIndexerRunResult { + scanned: number; + inserted: number; +} + +export class RevenueLedgerIndexer { + private readonly batchSize: number; + private readonly logger: Pick; + private runTail: Promise = Promise.resolve(); + + constructor( + private readonly usageEventsRepository: UsageEventsPgRepository, + options: RevenueLedgerIndexerOptions = {}, + ) { + this.batchSize = options.batchSize ?? 100; + if (!Number.isInteger(this.batchSize) || this.batchSize <= 0) { + throw new Error('batchSize must be a positive integer.'); + } + + this.logger = options.logger ?? console; + } + + async runOnce(): Promise { + const previousRun = this.runTail.catch(() => undefined); + let releaseRun!: () => void; + this.runTail = new Promise((resolve) => { + releaseRun = resolve; + }); + + await previousRun; + + try { + return await this.runOnceInternal(); + } finally { + releaseRun(); + } + } + + async awaitIdle(): Promise { + await this.runTail.catch(() => undefined); + } + + private async runOnceInternal(): Promise { + let cursor: string | undefined; + let scanned = 0; + let inserted = 0; + + while (true) { + const events = await this.usageEventsRepository.findUnindexedRevenueLedgerEvents( + cursor, + this.batchSize, + ); + if (events.length === 0) { + return { scanned, inserted }; + } + + scanned += events.length; + for (const event of events) { + inserted += await this.insertEvent(event); + } + + cursor = events[events.length - 1]?.usageEventId; + } + } + + private async insertEvent(event: RevenueLedgerUsageEvent): Promise { + try { + return (await this.usageEventsRepository.indexRevenueLedgerEvent(event)) ? 1 : 0; + } catch (error) { + this.logger.error('Revenue ledger indexing failed for usage event', { + usageEventId: event.usageEventId, + error, + }); + throw error; + } + } +} + +export interface RevenueLedgerIndexerJobOptions extends RevenueLedgerIndexerOptions { + intervalMs: number; +} + +export interface RevenueLedgerIndexerJob { + start(): void; + stop(): void; + beginShutdown(): void; + awaitIdle(): Promise; +} + +export function createRevenueLedgerIndexerJob( + usageEventsRepository: UsageEventsPgRepository, + options: RevenueLedgerIndexerJobOptions, +): RevenueLedgerIndexerJob { + const logger = options.logger ?? console; + if (!Number.isInteger(options.intervalMs) || options.intervalMs <= 0) { + throw new Error('intervalMs must be a positive integer.'); + } + + const indexer = new RevenueLedgerIndexer(usageEventsRepository, options); + let timer: NodeJS.Timeout | null = null; + let accepting = true; + let running: Promise | null = null; + + const tick = async (): Promise => { + if (!accepting || running) { + return; + } + + running = indexer.runOnce(); + try { + await running; + } catch (error) { + logger.error('Revenue ledger indexer job failed:', error); + } finally { + running = null; + } + }; + + return { + start() { + if (timer || !accepting) { + return; + } + + void tick(); + timer = setInterval(() => { + void tick(); + }, options.intervalMs); + }, + stop() { + if (!timer) { + return; + } + + clearInterval(timer); + timer = null; + }, + beginShutdown() { + accepting = false; + if (timer) { + clearInterval(timer); + timer = null; + } + }, + async awaitIdle() { + await (running ?? indexer.awaitIdle()); + }, + }; +} diff --git a/src/services/usageStore.ts b/src/services/usageStore.ts index be81e4b..2d6794a 100644 --- a/src/services/usageStore.ts +++ b/src/services/usageStore.ts @@ -178,12 +178,18 @@ export class PostgresUsageStore implements UsageStore { usage_event_id, created_at ) - VALUES ($1, $2, $3, $4, $5) + SELECT + $1, + a.developer_id, + $2::numeric, + $3::bigint, + $4::timestamp + FROM apis a + WHERE a.id = $1 ON CONFLICT (usage_event_id) DO NOTHING `, [ event.apiId, - event.userId, event.amountUsdc, inserted.id, event.timestamp, diff --git a/src/types/developer.ts b/src/types/developer.ts index 275c9ac..eb05e57 100644 --- a/src/types/developer.ts +++ b/src/types/developer.ts @@ -1,5 +1,18 @@ import type { Awaitable } from './awaitable.js'; +export const developerCategoryEnum = [ + 'analytics', + 'developer-tools', + 'finance', + 'payments', + 'security', + 'ai', + 'data', + 'productivity', +] as const; + +export type DeveloperCategory = (typeof developerCategoryEnum)[number]; + export interface Settlement { id: string; developerId: string; // the dev receiving the payout @@ -26,6 +39,13 @@ export interface DeveloperRevenueResponse { }; } +export interface UpdateDeveloperProfileInput { + name?: string | null; + website?: string | null; + description?: string | null; + category?: DeveloperCategory | null; +} + export interface SettlementStore { create(settlement: Settlement): Awaitable; updateStatus(id: string, status: Settlement['status'], txHash?: string | null): Awaitable;