📝 Description
Context: apps/aggregator already has: normalization, aggregation strategies, metrics, health checks, and an in-memory debug store exposed at GET /debug/prices. It also has DataReceptionService which connects to the ingestor WebSocket and emits price.received events — but there’s currently no “glue” that listens to those events, normalizes the payload, runs aggregation, and keeps a continuously-updated latest snapshot per symbol.
Goal: Add a small “stream processor” inside the aggregator that:
- listens for incoming price events
- converts/validates them
- normalizes them into the canonical format
- maintains a short rolling buffer of recent normalized prices per symbol
- calls AggregationService.aggregate(...) when enough sources are present
- updates the in-memory latest aggregated snapshot (already handled via DebugService.setLastAggregated inside AggregationService)
This makes apps/aggregator actually produce real-time aggregated results that downstream services (API/Frontend) can consume.
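The flow listed above can be sketched without any framework code. This is only an illustration under stated assumptions: the event shape, the `RawPrice`/`NormalizedPrice` fields, and the `Normalizer`/`Aggregator` interfaces are hypothetical stand-ins for the repo's real types and services, and the class name `PriceStreamProcessor` is made up for this example. In the actual service, `handlePriceReceived` would presumably carry the `@OnEvent('price.received')` decorator.

```typescript
// Hypothetical shapes: the real event, RawPrice and NormalizedPrice types may differ.
interface PriceReceivedEvent {
  symbol: string;
  source: string;
  price: number;
  timestamp: string; // assumed to be an ISO 8601 string
}

interface RawPrice {
  symbol: string;
  source: string;
  price: number;
  timestamp: number; // unix ms
}

type NormalizedPrice = RawPrice; // stand-in for the canonical format

// Minimal stand-ins for NormalizationService and AggregationService.
interface Normalizer {
  normalize(raw: RawPrice): NormalizedPrice;
}

interface Aggregator {
  aggregate(symbol: string, prices: NormalizedPrice[], options: { minSources: number }): unknown;
}

class PriceStreamProcessor {
  // symbol -> (source -> latest normalized price from that source)
  private readonly buffers = new Map<string, Map<string, NormalizedPrice>>();
  private readonly normalizer: Normalizer;
  private readonly aggregator: Aggregator;
  private readonly timeWindowMs: number;
  private readonly minSources: number;

  constructor(normalizer: Normalizer, aggregator: Aggregator, timeWindowMs = 30000, minSources = 3) {
    this.normalizer = normalizer;
    this.aggregator = aggregator;
    this.timeWindowMs = timeWindowMs;
    this.minSources = minSources;
  }

  // Returns true when the event triggered an aggregation run.
  handlePriceReceived(event: PriceReceivedEvent, now: number = Date.now()): boolean {
    // Convert/validate: ISO timestamp -> unix ms, reject malformed payloads.
    const timestamp = Date.parse(event.timestamp);
    if (Number.isNaN(timestamp) || !Number.isFinite(event.price)) {
      return false;
    }
    const normalized = this.normalizer.normalize({ ...event, timestamp });

    // Buffer: keep only the latest price per source for this symbol.
    const bySource = this.buffers.get(normalized.symbol) || new Map<string, NormalizedPrice>();
    bySource.set(normalized.source, normalized);
    this.buffers.set(normalized.symbol, bySource);

    // Evict entries that fell out of the time window.
    bySource.forEach((price, source) => {
      if (now - price.timestamp > this.timeWindowMs) {
        bySource.delete(source);
      }
    });

    // Aggregate only when enough distinct recent sources are present.
    if (bySource.size < this.minSources) {
      return false;
    }
    this.aggregator.aggregate(normalized.symbol, Array.from(bySource.values()), {
      minSources: this.minSources,
    });
    return true;
  }
}
```

Keying the buffer by source is one possible design choice: it makes the "enough sources" check a simple size comparison. A plain array of recent entries per symbol would work too, at the cost of deduplicating sources before counting.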
Tech stack / conventions: NestJS (existing), TypeScript, @nestjs/event-emitter for the @OnEvent(...) handler. Reuse the existing NormalizationService, AggregationService, and DebugService. Keep it in-memory (no Redis persistence in this issue).
Complexity: ~6/10 — event-driven wiring + buffering + tests; nothing on-chain.
✅ Requirements
- Add an event listener for price.received (emitted by DataReceptionService) using @OnEvent('price.received').
- Convert the event payload into a RawPrice-compatible object (notably: parse ISO timestamp into unix ms) and run it through NormalizationService.
- Maintain an in-memory buffer per symbol (and optionally per source) for recent NormalizedPrice entries:
  - configurable time window (e.g. AGG_TIME_WINDOW_MS, default 30s)
  - configurable minSources (default 3, matching AggregationService)
- When a symbol has enough recent sources, call AggregationService.aggregate(symbol, prices, options) and let it update DebugService’s lastAggregated and lastNormalized.
- Fix/complete any missing Nest module wiring needed for this flow (e.g. ensure EventEmitterModule, HttpModule and providers referenced in AppModule are properly imported/registered).
- Add tests proving: an incoming event updates the rolling buffer; aggregation runs when thresholds are met; debug store updates.
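The two configurable values in the requirements could be read from the environment roughly as follows. This is a sketch under assumptions: `readProcessorConfig`, `positiveIntOr`, and the `ProcessorConfig` shape are hypothetical names, and the env variable names follow the ones suggested in this issue (AGG_TIME_WINDOW_MS, AGG_MIN_SOURCES). The helper takes a plain record instead of touching `process.env` directly, so it stays easy to unit test.

```typescript
// Hypothetical config shape for the stream processor.
interface ProcessorConfig {
  timeWindowMs: number;
  minSources: number;
}

// Parses a positive integer from an env-style string;
// falls back when the value is unset, blank, or not a positive integer.
function positiveIntOr(value: string | undefined, fallback: number): number {
  if (value === undefined || value.trim() === '') {
    return fallback;
  }
  const parsed = Number(value);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Defaults mirror the issue text: a 30s window and 3 sources.
function readProcessorConfig(env: Record<string, string | undefined>): ProcessorConfig {
  return {
    timeWindowMs: positiveIntOr(env['AGG_TIME_WINDOW_MS'], 30000),
    minSources: positiveIntOr(env['AGG_MIN_SOURCES'], 3),
  };
}
```

In the service itself this would likely be called once with `process.env` (or replaced by Nest's ConfigService if the app already uses it); falling back silently on invalid values is a choice made for this sketch, and failing fast at startup is an equally reasonable alternative.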
🎯 Acceptance Criteria
- The aggregator handles price.received events and continuously updates the latest aggregated snapshot for each symbol.
- GET /debug/prices shows non-empty aggregated output after receiving valid events (in tests and/or local run).
- Aggregation runs only once a symbol has at least minSources recent sources.
- apps/aggregator builds and tests pass.
📁 Expected files to change/structure
- apps/aggregator/src/services/price-stream-processor.service.ts (new): @OnEvent handler + buffering + trigger aggregation.
- apps/aggregator/src/app.module.ts: ensure all imports/providers are correctly wired for event emitter + reception + processor.
- apps/aggregator/src/services/*.spec.ts (new or update): unit tests for processor behavior.
- apps/aggregator/.env.example (optional): document AGG_TIME_WINDOW_MS, AGG_MIN_SOURCES, and ingestor connection vars if needed.
Note: don't worry about the CI workflow for now; it's an issue on our end that we will fix after we merge all issues!
Thank you for taking this issue! You are helping us make RWAs consumer friendly on Stellar.