diff --git a/src/application/services/SubscriptionOrchestrator.ts b/src/application/services/SubscriptionOrchestrator.ts new file mode 100644 index 0000000..7ae1d31 --- /dev/null +++ b/src/application/services/SubscriptionOrchestrator.ts @@ -0,0 +1,223 @@ +import { Chain } from '../../shared/types'; +import type { + ISubscriptionService, + ChainSubscriptionEvent, + PortfolioUpdateEvent, + LiveSubscriptionHandle, + SubscriptionStatus, +} from '../../contracts/subscriptions/ISubscriptionService'; + +type UpdateCallback = (event: PortfolioUpdateEvent) => void; + +/** + * Debounce windows per chain (ms). + * Ethereum/L2s have 12s+ block times so no debounce needed. + * Solana has fast block times — debounce 2s. + * Polygon has 2s blocks — debounce 5s. + */ +const DEBOUNCE_MS: Partial> = { + [Chain.SOLANA]: 2000, + [Chain.POLYGON]: 5000, +}; + +const DEDUP_WINDOW_MS = 500; + +interface Subscription { + id: number; + addresses: Map; + callback: UpdateCallback; + chainUnsubscribers: Array<() => void>; + active: boolean; +} + +export class SubscriptionOrchestrator { + private services: Map; + private subscriptions: Map = new Map(); + private nextId = 1; + private disposed = false; + + // Dedup state: key = `${chain}:${address}`, value = pending event timer + private dedupTimers: Map> = new Map(); + // Debounce state: key = `${chain}:${address}`, value = debounce timer + private debounceTimers: Map> = new Map(); + // Pending debounced events: key = `${chain}:${address}` + private debouncedEvents: Map = new Map(); + + constructor(services: Map) { + this.services = services; + } + + subscribeToAddresses( + addresses: Map, + callback: UpdateCallback + ): LiveSubscriptionHandle { + const id = this.nextId++; + const chainUnsubscribers: Array<() => void> = []; + + const subscription: Subscription = { + id, + addresses, + callback, + chainUnsubscribers, + active: true, + }; + + this.subscriptions.set(id, subscription); + + // Create chain-level subscriptions + for (const [chain, addrs] of addresses) { + const service = this.services.get(chain); + if (!service) continue; + + const unsub = service.subscribe(addrs, (rawEvent: ChainSubscriptionEvent) => { + if (this.disposed) return; + this.handleRawEvent(rawEvent); + }); + + chainUnsubscribers.push(unsub); + } + + return { + unsubscribe: () => { + subscription.active = false; + for (const unsub of subscription.chainUnsubscribers) { + unsub(); + } + subscription.chainUnsubscribers.length = 0; + this.subscriptions.delete(id); + }, + isActive: () => subscription.active, + }; + } + + getConnectionStatus(): Map { + const result = new Map(); + for (const [chain, service] of this.services) { + result.set(chain, service.getStatus()); + } + return result; + } + + dispose(): void { + this.disposed = true; + + // Unsubscribe all + for (const sub of this.subscriptions.values()) { + sub.active = false; + for (const unsub of sub.chainUnsubscribers) { + unsub(); + } + sub.chainUnsubscribers.length = 0; + } + this.subscriptions.clear(); + + // Clear all timers + for (const timer of this.dedupTimers.values()) { + clearTimeout(timer); + } + this.dedupTimers.clear(); + + for (const timer of this.debounceTimers.values()) { + clearTimeout(timer); + } + this.debounceTimers.clear(); + this.debouncedEvents.clear(); + } + + // -- Internal -- + + private handleRawEvent(raw: ChainSubscriptionEvent): void { + const normalized = this.normalizeEvent(raw); + const dedupKey = `${normalized.chain}:${normalized.address}`; + const debounceMs = DEBOUNCE_MS[normalized.chain as Chain]; + + if (debounceMs) { + this.debounceEvent(dedupKey, normalized, debounceMs); + } else { + this.deduplicateAndEmit(dedupKey, normalized); + } + } + + private normalizeEvent(raw: ChainSubscriptionEvent): PortfolioUpdateEvent { + return { + chain: raw.chain, + address: raw.address, + updateType: this.mapEventType(raw.eventType), + timestamp: raw.timestamp, + }; + } + + private mapEventType(eventType: string): PortfolioUpdateEvent['updateType'] { + const lower = eventType.toLowerCase(); + if (lower.includes('mint') || lower.includes('new')) return 'new_asset'; + if (lower.includes('burn') || lower.includes('remove')) return 'removed_asset'; + if (lower.includes('price')) return 'price_change'; + return 'balance_change'; + } + + /** + * Debounce: wait until `debounceMs` of silence before emitting. + * Each new event resets the timer. + */ + private debounceEvent( + key: string, + event: PortfolioUpdateEvent, + debounceMs: number + ): void { + // Store latest event + this.debouncedEvents.set(key, event); + + // Clear existing debounce timer + const existing = this.debounceTimers.get(key); + if (existing) clearTimeout(existing); + + // Set new debounce timer + const timer = setTimeout(() => { + this.debounceTimers.delete(key); + const pending = this.debouncedEvents.get(key); + this.debouncedEvents.delete(key); + if (pending) { + this.emitToSubscribers(pending); + } + }, debounceMs); + + this.debounceTimers.set(key, timer); + } + + /** + * Dedup: if we already have a pending event for this key within DEDUP_WINDOW_MS, + * swallow the new event. Otherwise schedule emission after the dedup window. + */ + private deduplicateAndEmit(key: string, event: PortfolioUpdateEvent): void { + if (this.dedupTimers.has(key)) { + // Already have a pending event — deduplicate (swallow) + return; + } + + // Schedule emission after dedup window + const timer = setTimeout(() => { + this.dedupTimers.delete(key); + this.emitToSubscribers(event); + }, DEDUP_WINDOW_MS); + + this.dedupTimers.set(key, timer); + } + + private emitToSubscribers(event: PortfolioUpdateEvent): void { + if (this.disposed) return; + + for (const sub of this.subscriptions.values()) { + if (!sub.active) continue; + + // Only emit to subscribers interested in this chain/address + const chainAddresses = sub.addresses.get(event.chain as Chain); + if (chainAddresses && chainAddresses.includes(event.address)) { + try { + sub.callback(event); + } catch { + // Swallow subscriber errors + } + } + } + } +} diff --git a/src/contracts/subscriptions/ISubscriptionService.ts b/src/contracts/subscriptions/ISubscriptionService.ts new file mode 100644 index 0000000..7040fd3 --- /dev/null +++ b/src/contracts/subscriptions/ISubscriptionService.ts @@ -0,0 +1,60 @@ +import type { Chain } from '../../shared/types'; + +/** + * Status of a subscription connection + */ +export const SubscriptionStatus = { + CONNECTED: 'connected', + CONNECTING: 'connecting', + DISCONNECTED: 'disconnected', + ERROR: 'error' +} as const; + +export type SubscriptionStatus = typeof SubscriptionStatus[keyof typeof SubscriptionStatus]; + +/** + * Chain-specific raw subscription event from integration services. + * Each integration (evm, sol) emits events in its own format; + * the SubscriptionOrchestrator normalizes them. + */ +export interface ChainSubscriptionEvent { + chain: Chain; + address: string; + eventType: string; + data: unknown; + timestamp: Date; +} + +/** + * Callback for chain subscription events + */ +export type ChainEventCallback = (event: ChainSubscriptionEvent) => void; + +/** + * Interface expected from evm-integration and sol-integration SubscriptionServices. + * These are being built in parallel — implement against this contract and mock in tests. + */ +export interface ISubscriptionService { + subscribe(addresses: string[], callback: ChainEventCallback): () => void; + getStatus(): SubscriptionStatus; + getChain(): Chain; +} + +/** + * Normalized portfolio update event emitted by the SubscriptionOrchestrator + */ +export interface PortfolioUpdateEvent { + chain: Chain; + address: string; + assetId?: string; + updateType: 'balance_change' | 'price_change' | 'new_asset' | 'removed_asset'; + timestamp: Date; +} + +/** + * Handle returned by subscribeToLiveUpdates() for managing the subscription + */ +export interface LiveSubscriptionHandle { + unsubscribe: () => void; + isActive: () => boolean; +} diff --git a/src/domain/events/DomainEvent.ts b/src/domain/events/DomainEvent.ts index c5439f3..0a89e50 100644 --- a/src/domain/events/DomainEvent.ts +++ b/src/domain/events/DomainEvent.ts @@ -67,5 +67,11 @@ export enum DomainEventType { // Circuit Breaker Events CIRCUIT_BREAKER_OPENED = 'CircuitBreakerOpened', CIRCUIT_BREAKER_CLOSED = 'CircuitBreakerClosed', - CIRCUIT_BREAKER_HALF_OPEN = 'CircuitBreakerHalfOpen' + CIRCUIT_BREAKER_HALF_OPEN = 'CircuitBreakerHalfOpen', + + // Subscription Events + SUBSCRIPTION_STARTED = 'SubscriptionStarted', + SUBSCRIPTION_STOPPED = 'SubscriptionStopped', + SUBSCRIPTION_ERROR = 'SubscriptionError', + PORTFOLIO_LIVE_UPDATE = 'PortfolioLiveUpdate' } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 19420bd..a2579f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -72,7 +72,7 @@ export { type WalletConnection } from './application/services/AddressRegistryService'; -export { +export { SyncOrchestratorService, SyncCycleStartedEvent, SyncCycleCompletedEvent, @@ -81,6 +81,8 @@ export { type SourceMetrics } from './application/services/SyncOrchestratorService'; +export { SubscriptionOrchestrator } from './application/services/SubscriptionOrchestrator'; + // Commands export { type ICommand, @@ -162,11 +164,21 @@ export { } from './contracts/patterns/IRateLimiter'; // Event Interfaces -export { +export { type IEventEmitter, type EventHandler as IEventHandler } from './contracts/events/IEventEmitter'; +// Subscription Interfaces +export { + SubscriptionStatus, + type ISubscriptionService, + type ChainSubscriptionEvent, + type ChainEventCallback, + type PortfolioUpdateEvent, + type LiveSubscriptionHandle +} from './contracts/subscriptions/ISubscriptionService'; + // ============================================================================ // Shared Types Exports // ============================================================================ @@ -204,11 +216,14 @@ import type { IAddressRepository } from './contracts/repositories/IAddressReposi import type { ICircuitBreaker } from './contracts/patterns/ICircuitBreaker'; import type { IRateLimiter } from './contracts/patterns/IRateLimiter'; import type { IEventEmitter } from './contracts/events/IEventEmitter'; +import type { ISubscriptionService } from './contracts/subscriptions/ISubscriptionService'; import { IntegrationSource, Environment } from './shared/types'; +import type { Chain } from './shared/types'; import type { EnvironmentValidatorConfig } from './infrastructure/validation/EnvironmentValidator'; import { PortfolioAggregationService } from './application/services/PortfolioAggregationService'; import { AddressRegistryService } from './application/services/AddressRegistryService'; import { SyncOrchestratorService } from './application/services/SyncOrchestratorService'; +import { SubscriptionOrchestrator } from './application/services/SubscriptionOrchestrator'; /** * Factory for creating portfolio aggregation service with default configuration @@ -253,6 +268,12 @@ export class PortfolioServiceFactory { config.environmentValidatorConfig ); } + + static createSubscriptionOrchestrator(config: { + subscriptionServices: Map; + }): SubscriptionOrchestrator { + return new SubscriptionOrchestrator(config.subscriptionServices); + } } // ============================================================================ @@ -279,6 +300,7 @@ export default { PortfolioAggregationService, AddressRegistryService, SyncOrchestratorService, + SubscriptionOrchestrator, PortfolioServiceFactory, VERSION, LIBRARY_NAME diff --git a/src/tests/unit/application/services/SubscriptionOrchestrator.test.ts b/src/tests/unit/application/services/SubscriptionOrchestrator.test.ts new file mode 100644 index 0000000..0b30497 --- /dev/null +++ b/src/tests/unit/application/services/SubscriptionOrchestrator.test.ts @@ -0,0 +1,407 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { SubscriptionOrchestrator } from '../../../../application/services/SubscriptionOrchestrator'; +import type { + ISubscriptionService, + ChainSubscriptionEvent, + ChainEventCallback, + PortfolioUpdateEvent +} from '../../../../contracts/subscriptions/ISubscriptionService'; +import { SubscriptionStatus } from '../../../../contracts/subscriptions/ISubscriptionService'; +import { Chain } from '../../../../shared/types'; + +// -- Mock helpers -- + +function createMockSubscriptionService(chain: Chain): ISubscriptionService & { + simulateEvent: (event: Partial) => void; + _callbacks: Set; +} { + const callbacks = new Set(); + + const service: ISubscriptionService & { + simulateEvent: (event: Partial) => void; + _callbacks: Set; + } = { + _callbacks: callbacks, + subscribe: vi.fn((_addresses: string[], callback: ChainEventCallback) => { + callbacks.add(callback); + return () => { callbacks.delete(callback); }; + }), + getStatus: vi.fn(() => SubscriptionStatus.CONNECTED), + getChain: vi.fn(() => chain), + simulateEvent(partial: Partial) { + const event: ChainSubscriptionEvent = { + chain, + address: partial.address || '0xabc', + eventType: partial.eventType || 'Transfer', + data: partial.data || {}, + timestamp: partial.timestamp || new Date(), + }; + for (const cb of callbacks) { + cb(event); + } + } + }; + + return service; +} + +describe('SubscriptionOrchestrator', () => { + let orchestrator: SubscriptionOrchestrator; + let ethService: ReturnType; + let polygonService: ReturnType; + let solService: ReturnType; + + beforeEach(() => { + vi.useFakeTimers(); + ethService = createMockSubscriptionService(Chain.ETHEREUM); + polygonService = createMockSubscriptionService(Chain.POLYGON); + solService = createMockSubscriptionService(Chain.SOLANA); + + orchestrator = new SubscriptionOrchestrator( + new Map([ + [Chain.ETHEREUM, ethService], + [Chain.POLYGON, polygonService], + [Chain.SOLANA, solService], + ]) + ); + }); + + afterEach(() => { + orchestrator.dispose(); + vi.useRealTimers(); + }); + + // ===================================================================== + // 1. Lifecycle management + // ===================================================================== + describe('lifecycle management', () => { + it('should create subscriptions for tracked addresses', () => { + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc', '0xdef']], + [Chain.SOLANA, ['5UtaX']], + ]); + + orchestrator.subscribeToAddresses(addresses, () => {}); + + expect(ethService.subscribe).toHaveBeenCalledWith( + ['0xabc', '0xdef'], + expect.any(Function) + ); + expect(solService.subscribe).toHaveBeenCalledWith( + ['5UtaX'], + expect.any(Function) + ); + // Polygon not called because no polygon addresses given + expect(polygonService.subscribe).not.toHaveBeenCalled(); + }); + + it('should return a handle that can unsubscribe', () => { + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + const handle = orchestrator.subscribeToAddresses(addresses, () => {}); + + expect(handle.isActive()).toBe(true); + handle.unsubscribe(); + expect(handle.isActive()).toBe(false); + }); + + it('should skip chains with no subscription service', () => { + const addresses = new Map([ + [Chain.BITCOIN, ['bc1q']], + ]); + + // Should not throw + const handle = orchestrator.subscribeToAddresses(addresses, () => {}); + expect(handle.isActive()).toBe(true); + }); + + it('should report connection status per chain', () => { + const statuses = orchestrator.getConnectionStatus(); + + expect(statuses.get(Chain.ETHEREUM)).toBe(SubscriptionStatus.CONNECTED); + expect(statuses.get(Chain.POLYGON)).toBe(SubscriptionStatus.CONNECTED); + expect(statuses.get(Chain.SOLANA)).toBe(SubscriptionStatus.CONNECTED); + }); + }); + + // ===================================================================== + // 2. Event normalization + // ===================================================================== + describe('event normalization', () => { + it('should normalize chain-specific events to PortfolioUpdateEvent', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + ethService.simulateEvent({ + address: '0xabc', + eventType: 'Transfer', + data: { value: '1000' }, + }); + + // No debounce on ethereum, but dedup window applies — flush timers + vi.advanceTimersByTime(600); + + expect(received.length).toBe(1); + expect(received[0].chain).toBe(Chain.ETHEREUM); + expect(received[0].address).toBe('0xabc'); + expect(received[0].updateType).toBe('balance_change'); + }); + + it('should map different chain event types to normalized update types', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + ethService.simulateEvent({ + address: '0xabc', + eventType: 'TokenMint', + }); + + vi.advanceTimersByTime(600); + + expect(received.length).toBe(1); + expect(received[0].updateType).toBe('new_asset'); + }); + }); + + // ===================================================================== + // 3. Deduplication (500ms window per address per chain) + // ===================================================================== + describe('deduplication', () => { + it('should deduplicate events within 500ms for same address+chain', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + // Fire two events for same address within 500ms + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(100); + ethService.simulateEvent({ address: '0xabc', eventType: 'BalanceChange' }); + + // At 500ms, the dedup window closes and emits once + vi.advanceTimersByTime(500); + + expect(received.length).toBe(1); + }); + + it('should not deduplicate events for different addresses', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc', '0xdef']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + ethService.simulateEvent({ address: '0xdef', eventType: 'Transfer' }); + + vi.advanceTimersByTime(600); + + expect(received.length).toBe(2); + }); + + it('should not deduplicate events outside the 500ms window', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); // First event fires + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); // Second event fires + + expect(received.length).toBe(2); + }); + }); + + // ===================================================================== + // 4. Per-chain debouncing + // ===================================================================== + describe('debouncing', () => { + it('should debounce Solana events with 2s window', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.SOLANA, ['5UtaX']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + // Fire rapid events within 2s + solService.simulateEvent({ address: '5UtaX', eventType: 'AccountChange' }); + vi.advanceTimersByTime(500); + solService.simulateEvent({ address: '5UtaX', eventType: 'AccountChange' }); + vi.advanceTimersByTime(500); + solService.simulateEvent({ address: '5UtaX', eventType: 'AccountChange' }); + + // Still within debounce window, nothing emitted + expect(received.length).toBe(0); + + // After 2s debounce from last event + vi.advanceTimersByTime(2500); + + expect(received.length).toBe(1); + }); + + it('should debounce Polygon events with 5s window', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.POLYGON, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + polygonService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(2000); + polygonService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + + // Only 2s after second event — still within 5s debounce + vi.advanceTimersByTime(3000); + expect(received.length).toBe(0); + + // Now 5s after last event + vi.advanceTimersByTime(2500); + expect(received.length).toBe(1); + }); + + it('should NOT debounce Ethereum events (12s+ block times)', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (event) => { + received.push(event); + }); + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); // Past dedup window + + // Event fires immediately (after dedup window only) + expect(received.length).toBe(1); + }); + + it('should NOT debounce Arbitrum/Optimism events', () => { + const arbService = createMockSubscriptionService(Chain.ARBITRUM); + const orch = new SubscriptionOrchestrator( + new Map([ + [Chain.ARBITRUM, arbService], + ]) + ); + + const received: PortfolioUpdateEvent[] = []; + orch.subscribeToAddresses( + new Map([[Chain.ARBITRUM, ['0xabc']]]), + (event) => { received.push(event); } + ); + + arbService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); + + expect(received.length).toBe(1); + orch.dispose(); + }); + }); + + // ===================================================================== + // 5. Multiple subscribers + // ===================================================================== + describe('multiple subscribers', () => { + it('should notify all active subscribers', () => { + const received1: PortfolioUpdateEvent[] = []; + const received2: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (e) => { received1.push(e); }); + orchestrator.subscribeToAddresses(addresses, (e) => { received2.push(e); }); + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); + + expect(received1.length).toBe(1); + expect(received2.length).toBe(1); + }); + + it('should stop notifying unsubscribed listeners', () => { + const received1: PortfolioUpdateEvent[] = []; + const received2: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + const handle1 = orchestrator.subscribeToAddresses(addresses, (e) => { received1.push(e); }); + orchestrator.subscribeToAddresses(addresses, (e) => { received2.push(e); }); + + handle1.unsubscribe(); + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); + + expect(received1.length).toBe(0); + expect(received2.length).toBe(1); + }); + }); + + // ===================================================================== + // 6. Dispose + // ===================================================================== + describe('dispose', () => { + it('should clean up all subscriptions on dispose', () => { + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + [Chain.SOLANA, ['5UtaX']], + ]); + + const handle = orchestrator.subscribeToAddresses(addresses, () => {}); + orchestrator.dispose(); + + expect(handle.isActive()).toBe(false); + }); + + it('should not emit events after dispose', () => { + const received: PortfolioUpdateEvent[] = []; + const addresses = new Map([ + [Chain.ETHEREUM, ['0xabc']], + ]); + + orchestrator.subscribeToAddresses(addresses, (e) => { received.push(e); }); + orchestrator.dispose(); + + ethService.simulateEvent({ address: '0xabc', eventType: 'Transfer' }); + vi.advanceTimersByTime(600); + + expect(received.length).toBe(0); + }); + }); +});