
Real-time Data Flow

dev-mondoshawan edited this page Apr 15, 2026 · 1 revision


**Referenced Files in This Document**

  • [server.js](file://backend/server.js)
  • [criticalPoller.js](file://backend/src/jobs/criticalPoller.js)
  • [routinePoller.js](file://backend/src/jobs/routinePoller.js)
  • [solanaRpc.js](file://backend/src/services/solanaRpc.js)
  • [helius.js](file://backend/src/services/helius.js)
  • [rpcProber.js](file://backend/src/services/rpcProber.js)
  • [validatorsApp.js](file://backend/src/services/validatorsApp.js)
  • [queries.js](file://backend/src/models/queries.js)
  • [redis.js](file://backend/src/models/redis.js)
  • [cacheKeys.js](file://backend/src/models/cacheKeys.js)
  • [index.js](file://backend/src/websocket/index.js)
  • [index.js](file://backend/src/config/index.js)
  • [useWebSocket.js](file://frontend/src/hooks/useWebSocket.js)
  • [networkStore.js](file://frontend/src/stores/networkStore.js)
  • [rpcStore.js](file://frontend/src/stores/rpcStore.js)
  • [validatorStore.js](file://frontend/src/stores/validatorStore.js)

Table of Contents

  1. Introduction
  2. Project Structure
  3. Core Components
  4. Architecture Overview
  5. Detailed Component Analysis
  6. Dependency Analysis
  7. Performance Considerations
  8. Troubleshooting Guide
  9. Conclusion

Introduction

This document explains InfraWatch’s real-time data flow architecture. It covers the data collection pipeline from critical pollers and external APIs, data processing and persistence, caching strategies, WebSocket broadcasting for live updates, client-side state management, background job scheduling, synchronization patterns, and performance optimizations for real-time delivery.

Project Structure

The backend is organized around a modular architecture:

  • Entry point initializes the HTTP server, Socket.io, middleware, routes, and the data layer.
  • Jobs orchestrate periodic data collection and broadcasting.
  • Services encapsulate external API integrations and internal computations.
  • Models provide database access and Redis caching.
  • Frontend integrates WebSocket events with Zustand stores for reactive UI updates.

```mermaid
graph TB
    subgraph "Backend"
        S["server.js"]
        WS["websocket/index.js"]
        CP["jobs/criticalPoller.js"]
        RP["jobs/routinePoller.js"]
        SR["services/solanaRpc.js"]
        HJ["services/helius.js"]
        RPP["services/rpcProber.js"]
        VAPP["services/validatorsApp.js"]
        Q["models/queries.js"]
        RD["models/redis.js"]
        CK["models/cacheKeys.js"]
        CFG["config/index.js"]
    end
    subgraph "Frontend"
        UWS["hooks/useWebSocket.js"]
        NS["stores/networkStore.js"]
        RS["stores/rpcStore.js"]
        VS["stores/validatorStore.js"]
    end
    S --> WS
    S --> CP
    S --> RP
    CP --> SR
    CP --> HJ
    CP --> RPP
    CP --> Q
    CP --> RD
    RP --> VAPP
    RP --> SR
    RP --> Q
    RP --> RD
    WS --> UWS
    UWS --> NS
    UWS --> RS
    UWS --> VS
    S --> CFG
    CP --> CFG
    RP --> CFG
```

Diagram sources

  • server.js:1-128
  • criticalPoller.js:1-108
  • routinePoller.js:1-116
  • solanaRpc.js:1-340
  • helius.js:1-188
  • rpcProber.js:1-342
  • validatorsApp.js:1-388
  • queries.js:1-459
  • redis.js:1-161
  • cacheKeys.js:1-50
  • index.js
  • index.js
  • useWebSocket.js:1-30
  • networkStore.js:1-25
  • rpcStore.js:1-16
  • validatorStore.js:1-28

Section sources

  • server.js:1-128
  • index.js

Core Components

  • Background jobs: Critical and routine pollers schedule periodic data collection, processing, persistence, caching, and broadcasting.
  • External integrations: Solana RPC, Helius API, RPC provider probing, and Validators.app API.
  • Data access and caching: PostgreSQL via parameterized queries and Redis for fast reads/writes.
  • Real-time messaging: Socket.io server with connection lifecycle and event broadcasting.
  • Frontend state: React hooks and Zustand stores subscribe to WebSocket events and manage UI state.

Section sources

  • criticalPoller.js:1-108
  • routinePoller.js:1-116
  • solanaRpc.js:1-340
  • helius.js:1-188
  • rpcProber.js:1-342
  • validatorsApp.js:1-388
  • queries.js:1-459
  • redis.js:1-161
  • index.js
  • useWebSocket.js:1-30
  • networkStore.js:1-25

Architecture Overview

The system follows a publish-subscribe pattern:

  • Pollers collect data, compute metrics, persist to PostgreSQL, cache in Redis, and emit events via Socket.io.
  • Clients connect via WebSocket, receive live updates, and update local state through stores.
  • Configuration drives external endpoints, intervals, and feature toggles.

```mermaid
sequenceDiagram
    participant Cron as "Scheduler"
    participant CP as "Critical Poller"
    participant SR as "Solana RPC"
    participant HJ as "Helius API"
    participant RPP as "RPC Prober"
    participant DB as "PostgreSQL"
    participant RC as "Redis"
    participant IO as "Socket.io"
    participant FE as "Frontend"
    Cron->>CP : "Tick every 30s"
    CP->>SR : "collectNetworkSnapshot()"
    CP->>HJ : "getPriorityFeeEstimate()"
    CP->>RPP : "probeAllProviders()"
    CP->>DB : "insertNetworkSnapshot(), insertRpcHealthCheck()"
    CP->>RC : "setCache(network:current, rpc:latest)"
    CP->>IO : "emit('network:update'), emit('rpc:update')"
    IO-->>FE : "Receive live updates"
```

Diagram sources

  • criticalPoller.js:23-100
  • solanaRpc.js:275-328
  • helius.js:13-70
  • rpcProber.js:140-180
  • queries.js:27-118
  • redis.js:99-112
  • index.js
  • useWebSocket.js:21-23

Detailed Component Analysis

Data Collection Pipeline

  • Critical Poller (every 30 seconds):

    • Gathers network snapshot (TPS, slot, epoch, delinquency, congestion).
    • Enriches with Helius priority fees to compute congestion score.
    • Probes RPC providers for health and latency.
    • Writes to PostgreSQL and caches in Redis.
    • Broadcasts updates via Socket.io.
  • Routine Poller (every 5 minutes):

    • Fetches top validators from Validators.app with rate limiting.
    • Detects commission changes and persists snapshots.
    • Updates validator cache and epoch info cache.
    • Emits alerts for significant changes.
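
The critical-poller stages above (collect, enrich, probe, persist, cache, broadcast) can be sketched as one tick with injected collaborators. All names and shapes here are assumptions; the real poller is async with error handling around each stage, while synchronous stubs keep this sketch self-contained:

```javascript
// Hypothetical one-tick orchestration of the critical poller.
function runCriticalTick({ rpc, helius, prober, db, cache, io }) {
  const snapshot = rpc.collectNetworkSnapshot();
  if (helius && helius.isConfigured()) {
    // Optional enrichment: skipped gracefully when no API key is set.
    snapshot.priorityFee = helius.getPriorityFeeEstimate();
  }
  const probes = prober.probeAllProviders();
  db.insertNetworkSnapshot(snapshot);      // persist first...
  cache.set('network:current', snapshot);  // ...then refresh the cache...
  io.emit('network:update', snapshot);     // ...then broadcast.
  return { snapshot, probes };
}

// Demonstration with stub collaborators.
const events = [];
const result = runCriticalTick({
  rpc: { collectNetworkSnapshot: () => ({ tps: 2900, slot: 1 }) },
  helius: { isConfigured: () => false }, // no API key: enrichment skipped
  prober: { probeAllProviders: () => [{ name: 'public-1', latencyMs: 120 }] },
  db: { insertNetworkSnapshot: () => {} },
  cache: { set: () => {} },
  io: { emit: (name, payload) => events.push(name) },
});
```

Passing the collaborators in makes each stage replaceable in tests, which mirrors how the jobs depend on services and models rather than constructing them.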

```mermaid
flowchart TD
    Start(["Critical Poller Tick"]) --> Snap["Collect Snapshot<br/>Solana RPC"]
    Snap --> Fees{"Helius Available?"}
    Fees --> |Yes| Cong["Compute Congestion Score"]
    Fees --> |No| Next["Skip Enhancement"]
    Cong --> Probe["Probe RPC Providers"]
    Next --> Probe
    Probe --> Persist["Insert into DB"]
    Persist --> Cache["Update Redis Caches"]
    Cache --> Broadcast["Broadcast via Socket.io"]
    Broadcast --> End(["Complete"])
```

Diagram sources

  • criticalPoller.js:32-92
  • solanaRpc.js:275-328
  • helius.js:13-70
  • rpcProber.js:140-180
  • queries.js:27-118
  • redis.js:99-112
  • index.js

Section sources

  • criticalPoller.js:17-100
  • routinePoller.js:16-107

External API Integrations

  • Solana RPC:

    • Health, TPS, slot progression, epoch info, delinquent validators, and confirmation time.
    • Calculates congestion score combining TPS, priority fees, and slot latency.
  • Helius:

    • Priority fee estimates for congestion modeling.
    • Optional enhanced TPS and account info retrieval.
  • RPC Prober:

    • Concurrently probes multiple providers (public/premium).
    • Tracks latency percentiles, uptime, and incidents.
  • Validators.app:

    • Rate-limited fetch of validators with normalization and caching.
    • Detects commission changes and supports historical snapshots.
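
Commission-change detection, as named in `detectCommissionChanges(current, cached)` below, amounts to diffing a fresh fetch against the cached list keyed by vote pubkey. This is a sketch under assumed field names (`votePubkey`, `commission`), not the project's exact implementation:

```javascript
// Diff two validator lists and report commission changes.
function detectCommissionChanges(current, cached) {
  const previous = new Map(cached.map((v) => [v.votePubkey, v.commission]));
  const changes = [];
  for (const v of current) {
    const before = previous.get(v.votePubkey);
    // Only report validators we have seen before whose commission moved.
    if (before !== undefined && before !== v.commission) {
      changes.push({ votePubkey: v.votePubkey, from: before, to: v.commission });
    }
  }
  return changes;
}
```

The routine poller can then emit an alert per entry in the returned list.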

```mermaid
classDiagram
    class SolanaRpc {
        +getNetworkHealth()
        +getCurrentTps()
        +getSlotInfo()
        +getEpochInfo()
        +getDelinquentValidators()
        +getConfirmationTime()
        +calculateCongestionScore(tps, fee, latency)
        +collectNetworkSnapshot()
    }
    class Helius {
        +getPriorityFeeEstimate()
        +getEnhancedTps()
        +getAccountInfo(pubkey)
        +isConfigured()
    }
    class RpcProber {
        +probeProvider(provider)
        +probeAllProviders()
        +getAllProviderStats()
        +getBestProvider()
    }
    class ValidatorsApp {
        +getValidators(limit)
        +getValidatorDetail(votePubkey)
        +getCachedValidators()
        +detectCommissionChanges(current, cached)
        +getRateLimitStatus()
    }
    SolanaRpc --> Helius : "uses for priority fees"
    CriticalPoller ..> SolanaRpc : "collectNetworkSnapshot"
    CriticalPoller ..> RpcProber : "probeAllProviders"
    RoutinePoller ..> ValidatorsApp : "getValidators"
```

Diagram sources

  • solanaRpc.js:16-328
  • helius.js:13-187
  • rpcProber.js:75-307
  • validatorsApp.js:186-387
  • criticalPoller.js:32-46
  • routinePoller.js:30-31

Section sources

  • solanaRpc.js:16-328
  • helius.js:13-187
  • rpcProber.js:75-307
  • validatorsApp.js:186-387

Data Processing and Persistence

  • PostgreSQL:

    • Parameterized inserts for network snapshots, RPC health checks, validators, and validator snapshots.
    • Queries for latest snapshots, provider histories, top validators, and alerts.
  • Redis:

    • Lazy-initialized client with retry strategy and connection lifecycle logging.
    • JSON serialization for cache values with TTL management.
    • Cache keys centralized for network, RPC, validators, and history.
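
The caching pattern (JSON-serialized values, per-key TTLs, centralized key builders) can be sketched with an in-memory store. Key names and TTL handling here are illustrative stand-ins for the Redis client and `cacheKeys.js`:

```javascript
// Hypothetical centralized key builders, as cacheKeys.js centralizes them.
const CacheKeys = {
  networkCurrent: 'network:current',
  rpcLatest: 'rpc:latest',
  validators: (limit) => `validators:top:${limit}`,
};

// Map-backed stand-in for the Redis setCache/getCache pair.
// The clock is injectable so TTL expiry can be tested deterministically.
function createCache(now = Date.now) {
  const store = new Map(); // key -> { json, expiresAt }
  return {
    setCache(key, value, ttlSeconds) {
      store.set(key, {
        json: JSON.stringify(value), // values are serialized, as with Redis
        expiresAt: now() + ttlSeconds * 1000,
      });
    },
    getCache(key) {
      const entry = store.get(key);
      if (!entry || entry.expiresAt <= now()) return null; // missing or expired
      return JSON.parse(entry.json);
    },
  };
}

const cache = createCache();
cache.setCache(CacheKeys.networkCurrent, { tps: 3000 }, 10);
```

With Redis itself, the TTL is delegated to the server (`SET key value EX ttl`), but the read/write contract the pollers rely on is the same.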

```mermaid
flowchart TD
    CP["Critical Poller"] --> DBWrite["PostgreSQL Inserts"]
    RP["Routine Poller"] --> DBWrite
    CP --> RCWrite["Redis setCache()"]
    RP --> RCWrite
    DBRead["Queries"] --> RCRead["Redis getCache()"]
    RCRead --> CP
    RCRead --> RP
```

Diagram sources

  • queries.js:27-458
  • redis.js:99-131
  • cacheKeys.js:6-48
  • criticalPoller.js:49-86
  • routinePoller.js:37-70

Section sources

  • queries.js:27-458
  • redis.js:16-161
  • cacheKeys.js:6-48

WebSocket Broadcasting and Client Management

  • Server:

    • Socket.io initialized with CORS and middleware-friendly configuration.
    • Connection tracking, error handling, and global broadcast utilities.
    • Exposes IO instance for jobs to emit events.
  • Client:

    • React hook connects to Socket.io with fallback transports.
    • Subscribes to network and RPC update events.
    • Updates Zustand stores for reactive UI rendering.
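
The bullet "exposes IO instance for jobs to emit events" suggests a small holder module. Here is a sketch of that pattern with a plain stub standing in for the Socket.io server; the function names are assumptions:

```javascript
// Module-level handle so jobs can broadcast without importing server.js.
let io = null;

function initWebSocket(server) {
  // In the real module this would wrap `new Server(httpServer, { cors })`.
  io = server;
  return io;
}

function broadcast(event, payload) {
  if (!io) return false; // a job may tick before the socket layer is ready
  io.emit(event, payload);
  return true;
}

// Demonstration with a stub "server".
const sent = [];
const ok1 = broadcast('network:update', {}); // not initialized yet: dropped
initWebSocket({ emit: (event, payload) => sent.push(event) });
const ok2 = broadcast('network:update', { tps: 3000 });
```

Returning `false` instead of throwing keeps the pollers resilient to startup ordering, matching the graceful-degradation theme elsewhere in this document.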

```mermaid
sequenceDiagram
    participant IO as "Socket.io Server"
    participant CP as "Critical Poller"
    participant RP as "Routine Poller"
    participant FE as "Frontend Hook"
    participant Store as "Zustand Store"
    CP->>IO : "emit('network:update', snapshot)"
    IO-->>FE : "on('network:update')"
    FE->>Store : "setCurrent(snapshot)"
    RP->>IO : "emit('alert:new', alert)"
    IO-->>FE : "on('alert:new')"
    FE->>Store : "setAlerts(alert)"
```

Diagram sources

  • index.js
  • criticalPoller.js:88-92
  • routinePoller.js:96-100
  • useWebSocket.js:8-28
  • networkStore.js:17

Section sources

  • index.js
  • useWebSocket.js:8-28
  • networkStore.js:3-22

Real-time State Management

  • Network store tracks current snapshot, history, epoch info, connection state, and last update.
  • RPC store manages provider list, recommendation, and loading/error states.
  • Validator store handles list, selection, sorting, and pagination.
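
The store pattern these Zustand stores follow can be sketched without React: a closure holding state, a setter that merges partial updates, and subscribers notified on every change. Field names loosely mirror the network store but are assumptions:

```javascript
// Minimal Zustand-style store: closure state + merge setter + subscribers.
function createStore(initialState) {
  let state = initialState;
  const listeners = new Set();
  return {
    getState: () => state,
    setState(partial) {
      state = { ...state, ...partial }; // shallow merge, as Zustand does
      listeners.forEach((fn) => fn(state));
    },
    subscribe(fn) {
      listeners.add(fn);
      return () => listeners.delete(fn); // unsubscribe handle
    },
  };
}

const networkStore = createStore({ current: null, connected: false });
let renders = 0;
networkStore.subscribe(() => renders++); // a component re-render, conceptually
networkStore.setState({ current: { tps: 3100 }, connected: true });
```

In the app, the WebSocket hook calls the setter when an event arrives, and subscribed components re-render from the merged state.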

```mermaid
stateDiagram-v2
    [*] --> Idle
    Idle --> Loading : "fetch data"
    Loading --> Ready : "success"
    Loading --> Error : "failure"
    Ready --> Idle : "reset"
    Error --> Loading : "retry"
```

Diagram sources

  • networkStore.js:3-22
  • rpcStore.js:3-13
  • validatorStore.js:3-25

Section sources

  • networkStore.js:3-22
  • rpcStore.js:3-13
  • validatorStore.js:3-25

Dependency Analysis

  • Configuration-driven:
    • Environment variables drive Solana endpoints, API keys, database and Redis URLs, polling intervals, and CORS.
  • Coupling and cohesion:
    • Jobs depend on services and models; services depend on configuration and external APIs.
    • WebSocket module is decoupled from business logic and only emits events.
  • External dependencies:
    • @solana/web3.js, axios, node-cron, ioredis, socket.io.
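
A configuration module of this kind typically reads environment variables once and supplies defaults. This is a sketch; `REDIS_URL` and `DATABASE_URL` appear in this document, while the other variable names and defaults are assumptions:

```javascript
// Hypothetical config/index.js shape: env-driven with fallbacks.
const config = {
  solanaRpcUrl: process.env.SOLANA_RPC_URL || 'https://api.mainnet-beta.solana.com',
  heliusApiKey: process.env.HELIUS_API_KEY || null, // null disables the feature
  databaseUrl: process.env.DATABASE_URL || 'postgres://localhost:5432/infrawatch',
  redisUrl: process.env.REDIS_URL || 'redis://localhost:6379',
  criticalPollMs: Number(process.env.CRITICAL_POLL_MS || 30_000),  // 30s
  routinePollMs: Number(process.env.ROUTINE_POLL_MS || 300_000),   // 5min
};

module.exports = config;
```

Centralizing this keeps the services and jobs free of `process.env` lookups, which is what makes the modules testable with injected configuration.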

```mermaid
graph LR
    CFG["config/index.js"] --> SR["solanaRpc.js"]
    CFG --> HJ["helius.js"]
    CFG --> VAPP["validatorsApp.js"]
    CFG --> RD["redis.js"]
    CP["criticalPoller.js"] --> SR
    CP --> HJ
    CP --> RPP["rpcProber.js"]
    CP --> Q["queries.js"]
    CP --> RD
    RP["routinePoller.js"] --> VAPP
    RP --> SR
    RP --> Q
    RP --> RD
    WS["websocket/index.js"] --> FE["frontend stores/hooks"]
```

Diagram sources

  • index.js
  • solanaRpc.js:10
  • helius.js:6-7
  • validatorsApp.js:6-7
  • redis.js:6-7
  • criticalPoller.js:8-13
  • routinePoller.js:8-12
  • index.js

Section sources

  • index.js
  • criticalPoller.js:8-13
  • routinePoller.js:8-12

Performance Considerations

  • Concurrency and batching:
    • Parallel RPC calls for network metrics and provider probing reduce latency.
    • Batch writes to PostgreSQL and Redis minimize round trips.
  • Caching strategy:
    • Short TTLs for frequently changing metrics (network and RPC).
    • Longer TTLs for stable lists (validators) and epoch info.
  • Graceful degradation:
    • Try/catch around DB and Redis operations prevents cascading failures.
    • Optional Helius integration avoids blocking when API key is missing.
  • Transport optimization:
    • Socket.io configured with WebSocket and polling fallbacks for reliability.
  • Rate limiting:
    • Validators.app requests are rate-limited to respect upstream quotas.
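
The graceful-degradation point can be captured in a tiny wrapper: run a fallible stage, log on failure, and fall back so one outage degrades a tick instead of failing it. The helper name is hypothetical:

```javascript
// Run a stage; on failure, log and return a fallback instead of throwing.
function safely(label, fn, fallback = null) {
  try {
    return fn();
  } catch (err) {
    console.warn(`${label} failed, continuing:`, err.message);
    return fallback;
  }
}

// A failed cache read degrades to a DB read path rather than aborting.
const cached = safely('redis read', () => { throw new Error('ECONNREFUSED'); }, null);
const score = safely('congestion', () => 42);
```

Wrapping each stage independently is what lets a poller still persist and broadcast when only one dependency (Redis, a single RPC provider) is down.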

[No sources needed since this section provides general guidance]

Troubleshooting Guide

  • Redis connectivity:

    • Lazy initialization logs connection events; check initial connect and ready callbacks.
    • Verify REDIS_URL environment variable and network access.
  • Database availability:

    • Jobs wrap DB operations in try/catch and continue if unavailable.
    • Confirm DATABASE_URL and Postgres service status.
  • External API issues:

    • Missing API keys disable Helius and Validators.app features.
    • Timeout and error handling in HTTP clients surface meaningful logs.
  • WebSocket events:

    • Verify Socket.io server is initialized and emitting events.
    • Ensure frontend connects to the correct path and handles connection/disconnection events.
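
For the Redis connectivity case, ioredis accepts a `retryStrategy` callback that maps the attempt count to a backoff delay in milliseconds, or `null` to stop retrying. The cap and step below are illustrative, not the project's values:

```javascript
// ioredis-style retry strategy: attempt number in, delay (ms) or null out.
function retryStrategy(times) {
  if (times > 20) return null;       // give up after ~20 attempts
  return Math.min(times * 50, 2000); // linear backoff, capped at 2 seconds
}
```

Watching the logged `connect`/`ready` events alongside these retries usually pinpoints whether the problem is the URL, the network, or the Redis service itself.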

Section sources

  • redis.js:16-68
  • redis.js:59-61
  • index.js
  • helius.js:14-18
  • validatorsApp.js:116-149
  • index.js
  • useWebSocket.js:11-19

Conclusion

InfraWatch’s real-time architecture combines scheduled data collection, robust external API integrations, resilient persistence, and efficient caching with a reliable WebSocket broadcast layer. The frontend consumes live events through a simple hook and Zustand stores, enabling responsive dashboards. The design emphasizes resilience, scalability, and maintainability through modular services, centralized configuration, and graceful error handling.
