diff --git a/README.md b/README.md index 9fb7bc3..fbb3eac 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,6 @@ A Node.js backend service for the L1Beat, providing API endpoints for Avalanche - **Chain Data**: Fetch and store information about Avalanche chains - **Validator Data**: Track validators for each chain -- **TVL Tracking**: Historical and current TVL data for Avalanche - **TPS Metrics**: Track transactions per second for each chain and the entire network - **Caching**: In-memory caching for improved performance - **Structured Logging**: Comprehensive logging system @@ -15,7 +14,7 @@ A Node.js backend service for the L1Beat, providing API endpoints for Avalanche ## Tech Stack - **Node.js** and **Express**: Backend framework -- **MongoDB**: Database for storing chain, validator, TVL, and TPS data +- **MongoDB**: Database for storing chain, validator, and TPS data - **Mongoose**: MongoDB object modeling - **Winston**: Structured logging - **Helmet**: Security headers @@ -31,11 +30,6 @@ A Node.js backend service for the L1Beat, providing API endpoints for Avalanche - `GET /api/chains/:chainId`: Get a specific chain by ID - `GET /api/chains/:chainId/validators`: Get validators for a specific chain -### TVL Endpoints - -- `GET /api/tvl/history`: Get historical TVL data -- `GET /api/tvl/health`: Check TVL data health - ### TPS Endpoints - `GET /api/chains/:chainId/tps/history`: Get TPS history for a specific chain @@ -89,18 +83,12 @@ A Node.js backend service for the L1Beat, providing API endpoints for Avalanche ### Production Deployment -For production deployment, set `NODE_ENV=production` and ensure all environment variables are properly configured. - -#### Deploying to Vercel - -This application is configured for deployment on Vercel. To deploy: +The application runs as a long-lived Node.js process on DigitalOcean (it relies +on in-process `node-cron` jobs and long-running background updates, so it must +run as a persistent process — not a serverless function). -1. Install the Vercel CLI: - ``` - npm install -g vercel - ``` - -2. Create a `.env.production` file with your production environment variables: +1. Set the production environment variables (e.g. via a `.env` file or the + process environment): ``` NODE_ENV=production PROD_MONGODB_URI=your_production_mongodb_uri @@ -108,27 +96,28 @@ This application is configured for deployment on Vercel. To deploy: UPDATE_API_KEY=your_production_update_key ``` -3. Run the deployment script: +2. Install dependencies and start the server: ``` - ./deploy.sh + npm ci + npm start ``` -Alternatively, you can deploy directly from the Vercel dashboard by connecting your GitHub repository. +Run it under a process manager (e.g. PM2 or a systemd unit) so it restarts on +crash, and place it behind a reverse proxy / load balancer (the app sets +`trust proxy` in production). On `SIGTERM`/`SIGINT` the server shuts down +gracefully, draining in-flight requests and closing the MongoDB connection. ## Scheduled Tasks The application runs several scheduled tasks: -- TVL updates: Every 30 minutes - Chain and TPS updates: Every hour -- TPS verification: Every 15 minutes ## Caching The application implements in-memory caching for frequently accessed data: - Chain data: 5 minutes -- TVL history: 15 minutes - TPS data: 5 minutes ## Security @@ -166,9 +155,6 @@ The following environment variables are required for the application to function - `GLACIER_VALIDATORS_ENDPOINT` - Endpoint for validators (default: /networks/mainnet/validators) - `GLACIER_L1VALIDATORS_ENDPOINT` - Endpoint for L1Validators (default: /networks/mainnet/l1Validators) -- `DEFILLAMA_API_BASE` - Base URL for the DefiLlama API -- `DEFILLAMA_API_TIMEOUT` - Timeout for DefiLlama API requests in milliseconds (default: 30000) - - `METRICS_API_BASE` - Base URL for the Metrics API - `METRICS_API_TIMEOUT` - Timeout for Metrics API requests in milliseconds (default: 30000) - `METRICS_RATE_LIMIT` - Rate limit for Metrics API requests per minute (default: 20) diff --git a/package.json b/package.json index 03993ff..0f44df7 100644 --- a/package.json +++ b/package.json @@ -8,10 +8,12 @@ "dev": "NODE_ENV=development nodemon src/app.js", "seed-authors": "node src/scripts/seedAuthors.js", "populate-tokens": "node scripts/populate-native-tokens.js", - "vercel-build": "echo hello", "test": "cross-env NODE_ENV=test jest --verbose --detectOpenHandles", "test:watch": "cross-env NODE_ENV=test jest --watch" }, + "nodemonConfig": { + "ignore": ["logs/", "data/", "*.log"] + }, "keywords": [], "author": "", "license": "ISC", diff --git a/src/app.js b/src/app.js index 1dd886b..b921163 100644 --- a/src/app.js +++ b/src/app.js @@ -5,11 +5,11 @@ const cron = require('node-cron'); const helmet = require('helmet'); const rateLimit = require('express-rate-limit'); const pLimit = require('p-limit'); +const mongoose = require('mongoose'); +const axios = require('axios'); const config = require('./config/config'); const connectDB = require('./config/db'); const chainRoutes = require('./routes/chainRoutes'); -const fetchAndUpdateData = require('./utils/fetchGlacierData'); -const chainDataService = require('./services/chainDataService'); const Chain = require('./models/chain'); const chainService = require('./services/chainService'); const tpsRoutes = require('./routes/tpsRoutes'); @@ -54,11 +54,9 @@ process.on('uncaughtException', (error) => { const app = express(); -// Check if we're running on Vercel -const isVercel = process.env.VERCEL === '1'; - -// Trust proxy when running on Vercel or other cloud platforms -if (isVercel || config.isProduction) { +// Trust proxy in production (the app runs behind a reverse proxy / load +// balancer on DigitalOcean). +if (config.isProduction) { logger.info('Running behind a proxy, setting trust proxy to true'); app.set('trust proxy', 1); } @@ -130,8 +128,89 @@ app.use(cors({ app.use(express.json({ limit: '1mb' })); // Health check endpoint - MUST be before DB connection for deployment health checks +// Liveness: is the process up and serving? Always 200 while running. MongoDB +// state is reported for visibility but does not affect liveness. Kept cheap +// (no external I/O) so uptime checks can hit it frequently. app.get('/health', (req, res) => { - res.status(200).json({ status: 'ok' }); + res.status(200).json({ + status: 'ok', + uptime: process.uptime(), + timestamp: new Date().toISOString(), + dependencies: { + mongodb: mongoose.connection.readyState === 1 ? 'connected' : 'disconnected' + } + }); +}); + +// Readiness: should this instance receive traffic right now? Returns 503 when +// MongoDB is not connected so a load balancer can drain it. Still cheap — only +// inspects the local connection state, no external I/O. +app.get('/health/ready', (req, res) => { + const mongoConnected = mongoose.connection.readyState === 1; + res.status(mongoConnected ? 200 : 503).json({ + status: mongoConnected ? 'ok' : 'not_ready', + timestamp: new Date().toISOString(), + dependencies: { + mongodb: mongoConnected ? 'connected' : 'disconnected' + } + }); +}); + +// Deep dependency probe. Actively pings external APIs, so it is intentionally +// kept off the main /health path to avoid latency and burning rate limits on +// every uptime check. Call this on demand or from a low-frequency monitor. +// Cache the deep probe briefly so this endpoint can't be used to hammer the +// upstream APIs (and burn the rate budget the cron jobs depend on): the outbound +// probes run at most once per TTL regardless of request volume. +const DEP_HEALTH_TTL_MS = 30 * 1000; +let depHealthCache = null; // { expiresAt, statusCode, body } + +app.get('/health/dependencies', async (req, res) => { + if (depHealthCache && depHealthCache.expiresAt > Date.now()) { + return res.status(depHealthCache.statusCode).json({ ...depHealthCache.body, cached: true }); + } + + const probe = async (name, url) => { + if (!url) return { name, status: 'unconfigured' }; + const startedAt = Date.now(); + try { + await axios.get(url, { + timeout: 5000, + // Any HTTP response means the host is reachable; we only care about + // connectivity here, not the specific status code. + validateStatus: () => true + }); + return { name, status: 'reachable', latencyMs: Date.now() - startedAt }; + } catch (error) { + return { name, status: 'unreachable', error: error.message }; + } + }; + + const mongoConnected = mongoose.connection.readyState === 1; + const [glacier, metrics] = await Promise.all([ + probe('glacier', config.api && config.api.glacier && config.api.glacier.baseUrl), + probe('metrics', config.api && config.api.metrics && config.api.metrics.baseUrl) + ]); + + // 'unconfigured' counts as unhealthy: GLACIER_API_BASE / METRICS_API_BASE are + // required, so a missing base URL is a real misconfiguration, not "fine". + const healthy = mongoConnected && + glacier.status === 'reachable' && + metrics.status === 'reachable'; + + const body = { + status: healthy ? 'ok' : 'degraded', + timestamp: new Date().toISOString(), + dependencies: { + mongodb: mongoConnected ? 'connected' : 'disconnected', + glacier, + metrics + } + }; + const statusCode = healthy ? 200 : 503; + + depHealthCache = { expiresAt: Date.now() + DEP_HEALTH_TTL_MS, statusCode, body }; + res.status(statusCode).json(body); }); // Single initialization point for data updates @@ -637,12 +716,12 @@ if (missingEnvVars.length > 0) { // Still allow the server to start (for development convenience) } -// For Vercel, we need to export the app +// Export the app so tests (supertest) can mount it without binding a port. module.exports = app; -// Only listen if not running on Vercel or in test mode +// Only bind a port outside of tests (in tests supertest drives the app directly). const isTest = process.env.NODE_ENV === 'test'; -if (!isVercel && !isTest) { +if (!isTest) { const server = app.listen(PORT, () => { logger.info(`Server running on port ${PORT}`, { environment: config.env, @@ -656,4 +735,36 @@ if (!isVercel && !isTest) { server.on('error', (error) => { logger.error('Server error:', { error: error.message, stack: error.stack }); }); + + // Graceful shutdown: stop accepting new connections, then close the DB + // connection so in-flight work can drain before the process exits. + let shuttingDown = false; + const gracefulShutdown = (signal) => { + if (shuttingDown) return; + shuttingDown = true; + logger.info(`Received ${signal}, shutting down gracefully...`); + + // Hard limit so a hung connection can't block shutdown indefinitely. + const forceExit = setTimeout(() => { + logger.error('Graceful shutdown timed out, forcing exit'); + process.exit(1); + }, 10000); + forceExit.unref(); + + server.close(async () => { + logger.info('HTTP server closed, no longer accepting connections'); + try { + await mongoose.connection.close(false); + logger.info('MongoDB connection closed'); + } catch (error) { + logger.error('Error closing MongoDB connection:', { error: error.message }); + } finally { + clearTimeout(forceExit); + process.exit(0); + } + }); + }; + + process.on('SIGTERM', () => gracefulShutdown('SIGTERM')); + process.on('SIGINT', () => gracefulShutdown('SIGINT')); } diff --git a/src/models/teleporterMessage.js b/src/models/teleporterMessage.js index 5dca1e6..ec7e403 100644 --- a/src/models/teleporterMessage.js +++ b/src/models/teleporterMessage.js @@ -79,6 +79,14 @@ const teleporterUpdateStateSchema = new mongoose.Schema({ required: true, default: Date.now }, + // Anchor for resumable multi-day fetches: the fixed "end of window" the + // per-day windows are measured back from. Captured when a weekly run first + // starts so that resuming hours later still produces a consistent 7-day + // dataset (rather than a window that slides with wall-clock time). + referenceEndTime: { + type: Date, + default: null + }, // Progress information progress: { currentDay: { diff --git a/src/services/activeAddressesService.js b/src/services/activeAddressesService.js index 02f5be7..5196b5e 100644 --- a/src/services/activeAddressesService.js +++ b/src/services/activeAddressesService.js @@ -1,307 +1,20 @@ const ActiveAddresses = require('../models/activeAddresses'); -const axios = require('axios'); -const Chain = require('../models/chain'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -// Rate limiter implementation (shared with TPS service) -class RateLimiter { - constructor(maxRequestsPerMinute = 30) { - this.queue = []; - this.processing = false; - this.maxRequestsPerMinute = maxRequestsPerMinute; - this.requestTimestamps = []; - } - - async enqueue(fn) { - return new Promise((resolve, reject) => { - this.queue.push({ fn, resolve, reject }); - this.processQueue(); - }); - } - - async processQueue() { - if (this.processing || this.queue.length === 0) return; - - this.processing = true; - - try { - const now = Date.now(); - this.requestTimestamps = this.requestTimestamps.filter( - timestamp => now - timestamp < 60000 - ); - - if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { - const oldestTimestamp = this.requestTimestamps[0]; - const timeToWait = 60000 - (now - oldestTimestamp); - - logger.info(`Rate limit reached, waiting ${Math.round(timeToWait/1000)}s before next request`); - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, timeToWait + 100); - - return; - } - - const item = this.queue.shift(); - this.requestTimestamps.push(now); - - try { - const result = await item.fn(); - item.resolve(result); - } catch (error) { - item.reject(error); - } - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, 300); - } catch (error) { - logger.error('Error in rate limiter:', error); - this.processing = false; - } - } -} - -const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); - -class ActiveAddressesService { - async updateActiveAddressesData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { - return metricsApiRateLimiter.enqueue(async () => { - for (let attempt = 1; attempt <= retryCount; attempt++) { - try { - logger.info(`[Active Addresses Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'Cache-Control': 'no-cache' - }; - - if (process.env.GLACIER_API_KEY) { - headers['x-api-key'] = process.env.GLACIER_API_KEY; - } - - const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/activeAddresses`, { - params: { - timeInterval: 'day', - pageSize: 100 // Maximum allowed by API - }, - timeout: config.api.metrics.timeout, - headers - }); - - if (!response.data || !Array.isArray(response.data.results)) { - logger.warn(`[Active Addresses Update] Invalid response format for chain ${chainId}`); - continue; - } - - const currentTime = Math.floor(Date.now() / 1000); - const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); - - const validData = response.data.results.filter(item => { - const timestamp = Number(item.timestamp); - const value = parseFloat(item.value); - - if (isNaN(timestamp) || isNaN(value)) { - return false; - } - - return timestamp >= thirtyDaysAgo && timestamp <= currentTime; - }); - - if (validData.length > 0) { - const result = await ActiveAddresses.bulkWrite( - validData.map(item => ({ - updateOne: { - filter: { - chainId: chainId, - timestamp: Number(item.timestamp) - }, - update: { - $set: { - value: parseFloat(item.value), - lastUpdated: new Date() - } - }, - upsert: true - } - })), - { ordered: false } - ); - - logger.info(`[Active Addresses Update] Updated chain ${chainId}:`, { - upserted: result.upsertedCount, - modified: result.modifiedCount, - total: validData.length - }); - - return { - success: true, - chainId, - recordsProcessed: validData.length, - upserted: result.upsertedCount, - modified: result.modifiedCount - }; - } - - logger.info(`[Active Addresses Update] No valid data for chain ${chainId}`); - return { - success: true, - chainId, - recordsProcessed: 0, - message: 'No valid data points' - }; - - } catch (error) { - logger.error(`[Active Addresses Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); - - if (attempt === retryCount) { - return { - success: false, - chainId, - error: error.message - }; - } - - await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); - } - } - }); - } - - async updateAllChainsActiveAddresses() { - try { - logger.info('[Active Addresses] Starting update for all chains'); - - const chains = await Chain.find({}); - const results = []; - - for (const chain of chains) { - const chainId = chain.evmChainId || chain.chainId; - - if (!chainId || !/^\d+$/.test(String(chainId))) { - logger.warn(`[Active Addresses] Skipping chain with invalid ID:`, chain.name); - continue; - } - - const result = await this.updateActiveAddressesData(String(chainId)); - results.push(result); - } - - const successful = results.filter(r => r.success).length; - const failed = results.filter(r => !r.success).length; - - logger.info('[Active Addresses] Update completed:', { - total: chains.length, - successful, - failed - }); - - return { success: true, results }; - } catch (error) { - logger.error('[Active Addresses] Error updating all chains:', error); - return { success: false, error: error.message }; - } - } - - async getActiveAddressesHistory(chainId, days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await ActiveAddresses.find({ - chainId: String(chainId), - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - return data; - } catch (error) { - logger.error(`[Active Addresses] Error fetching history for chain ${chainId}:`, error); - throw error; - } - } - - async getLatestActiveAddresses(chainId) { - try { - const latestRecord = await ActiveAddresses.findOne({ - chainId: String(chainId) - }) - .sort({ timestamp: -1 }) - .lean(); - - return latestRecord; - } catch (error) { - logger.error(`[Active Addresses] Error fetching latest for chain ${chainId}:`, error); - throw error; - } - } - - async getNetworkActiveAddressesHistory(days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await ActiveAddresses.find({ - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - // Group by timestamp and sum active addresses - const groupedData = {}; - data.forEach(record => { - if (!groupedData[record.timestamp]) { - groupedData[record.timestamp] = { - timestamp: record.timestamp, - value: 0 - }; - } - groupedData[record.timestamp].value += record.value; - }); - - return Object.values(groupedData).sort((a, b) => a.timestamp - b.timestamp); - } catch (error) { - logger.error('[Active Addresses] Error fetching network history:', error); - throw error; - } - } - - async getNetworkLatestActiveAddresses() { - try { - // Get the most recent timestamp - const latestRecord = await ActiveAddresses.findOne() - .sort({ timestamp: -1 }) - .lean(); - - if (!latestRecord) { - return null; - } - - const latestTimestamp = latestRecord.timestamp; - - // Get all records for that timestamp and sum them - const records = await ActiveAddresses.find({ - timestamp: latestTimestamp - }).lean(); - - const totalActiveAddresses = records.reduce((sum, record) => sum + record.value, 0); - - return { - timestamp: latestTimestamp, - value: totalActiveAddresses, - chainCount: records.length - }; - } catch (error) { - logger.error('[Active Addresses] Error fetching network latest:', error); - throw error; - } - } -} - -module.exports = new ActiveAddressesService(); +const createMetricService = require('./metricService'); + +const service = createMetricService({ + model: ActiveAddresses, + metricPath: 'activeAddresses', + label: 'Active Addresses', + aggregation: 'sum' +}); + +// Preserve the original public method names so existing routes/cron callers +// continue to work unchanged. +module.exports = { + updateActiveAddressesData: (chainId, retryCount) => service.updateData(chainId, retryCount), + updateAllChainsActiveAddresses: () => service.updateAllChains(), + getActiveAddressesHistory: (chainId, days) => service.getHistory(chainId, days), + getLatestActiveAddresses: (chainId) => service.getLatest(chainId), + getNetworkActiveAddressesHistory: (days) => service.getNetworkHistory(days), + getNetworkLatestActiveAddresses: () => service.getNetworkLatest() +}; diff --git a/src/services/avgGasPriceService.js b/src/services/avgGasPriceService.js index 212c88d..38aba7f 100644 --- a/src/services/avgGasPriceService.js +++ b/src/services/avgGasPriceService.js @@ -1,313 +1,21 @@ const AvgGasPrice = require('../models/avgGasPrice'); -const axios = require('axios'); -const Chain = require('../models/chain'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -// Rate limiter implementation -class RateLimiter { - constructor(maxRequestsPerMinute = 30) { - this.queue = []; - this.processing = false; - this.maxRequestsPerMinute = maxRequestsPerMinute; - this.requestTimestamps = []; - } - - async enqueue(fn) { - return new Promise((resolve, reject) => { - this.queue.push({ fn, resolve, reject }); - this.processQueue(); - }); - } - - async processQueue() { - if (this.processing || this.queue.length === 0) return; - - this.processing = true; - - try { - const now = Date.now(); - this.requestTimestamps = this.requestTimestamps.filter( - timestamp => now - timestamp < 60000 - ); - - if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { - const oldestTimestamp = this.requestTimestamps[0]; - const timeToWait = 60000 - (now - oldestTimestamp); - - logger.info(`Rate limit reached, waiting ${Math.round(timeToWait/1000)}s before next request`); - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, timeToWait + 100); - - return; - } - - const item = this.queue.shift(); - this.requestTimestamps.push(now); - - try { - const result = await item.fn(); - item.resolve(result); - } catch (error) { - item.reject(error); - } - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, 300); - } catch (error) { - logger.error('Error in rate limiter:', error); - this.processing = false; - } - } -} - -const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); - -class AvgGasPriceService { - async updateAvgGasPriceData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { - return metricsApiRateLimiter.enqueue(async () => { - for (let attempt = 1; attempt <= retryCount; attempt++) { - try { - logger.info(`[AvgGasPrice Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'Cache-Control': 'no-cache' - }; - - if (process.env.GLACIER_API_KEY) { - headers['x-api-key'] = process.env.GLACIER_API_KEY; - } - - const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/avgGasPrice`, { - params: { - timeInterval: 'day', - pageSize: 100 // Maximum allowed by API - }, - timeout: config.api.metrics.timeout, - headers - }); - - if (!response.data || !Array.isArray(response.data.results)) { - logger.warn(`[AvgGasPrice Update] Invalid response format for chain ${chainId}`); - continue; - } - - const currentTime = Math.floor(Date.now() / 1000); - const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); - - const validData = response.data.results.filter(item => { - const timestamp = Number(item.timestamp); - const value = parseFloat(item.value); - - if (isNaN(timestamp) || isNaN(value)) { - return false; - } - - return timestamp >= thirtyDaysAgo && timestamp <= currentTime; - }); - - if (validData.length > 0) { - const result = await AvgGasPrice.bulkWrite( - validData.map(item => ({ - updateOne: { - filter: { - chainId: chainId, - timestamp: Number(item.timestamp) - }, - update: { - $set: { - value: parseFloat(item.value), - lastUpdated: new Date() - } - }, - upsert: true - } - })), - { ordered: false } - ); - - logger.info(`[AvgGasPrice Update] Updated chain ${chainId}:`, { - upserted: result.upsertedCount, - modified: result.modifiedCount, - total: validData.length - }); - - return { - success: true, - chainId, - recordsProcessed: validData.length, - upserted: result.upsertedCount, - modified: result.modifiedCount - }; - } - - logger.info(`[AvgGasPrice Update] No valid data for chain ${chainId}`); - return { - success: true, - chainId, - recordsProcessed: 0, - message: 'No valid data points' - }; - - } catch (error) { - logger.error(`[AvgGasPrice Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); - - if (attempt === retryCount) { - return { - success: false, - chainId, - error: error.message - }; - } - - await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); - } - } - }); - } - - async updateAllChainsAvgGasPrice() { - try { - logger.info('[AvgGasPrice] Starting update for all chains'); - - const chains = await Chain.find({}); - const results = []; - - for (const chain of chains) { - const chainId = chain.evmChainId || chain.chainId; - - if (!chainId || !/^\d+$/.test(String(chainId))) { - logger.warn(`[AvgGasPrice] Skipping chain with invalid ID:`, chain.name); - continue; - } - - const result = await this.updateAvgGasPriceData(String(chainId)); - results.push(result); - } - - const successful = results.filter(r => r.success).length; - const failed = results.filter(r => !r.success).length; - - logger.info('[AvgGasPrice] Update completed:', { - total: chains.length, - successful, - failed - }); - - return { success: true, results }; - } catch (error) { - logger.error('[AvgGasPrice] Error updating all chains:', error); - return { success: false, error: error.message }; - } - } - - async getAvgGasPriceHistory(chainId, days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await AvgGasPrice.find({ - chainId: String(chainId), - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - return data; - } catch (error) { - logger.error(`[AvgGasPrice] Error fetching history for chain ${chainId}:`, error); - throw error; - } - } - - async getLatestAvgGasPrice(chainId) { - try { - const latestRecord = await AvgGasPrice.findOne({ - chainId: String(chainId) - }) - .sort({ timestamp: -1 }) - .lean(); - - return latestRecord; - } catch (error) { - logger.error(`[AvgGasPrice] Error fetching latest for chain ${chainId}:`, error); - throw error; - } - } - - async getNetworkAvgGasPriceHistory(days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await AvgGasPrice.find({ - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - // Group by timestamp and calculate weighted average - const groupedData = {}; - data.forEach(record => { - if (!groupedData[record.timestamp]) { - groupedData[record.timestamp] = { - timestamp: record.timestamp, - sum: 0, - count: 0 - }; - } - groupedData[record.timestamp].sum += record.value; - groupedData[record.timestamp].count += 1; - }); - - return Object.values(groupedData).map(item => ({ - timestamp: item.timestamp, - value: item.sum / item.count // Average across all chains - })).sort((a, b) => a.timestamp - b.timestamp); - } catch (error) { - logger.error('[AvgGasPrice] Error fetching network history:', error); - throw error; - } - } - - async getNetworkLatestAvgGasPrice() { - try { - // Get the most recent timestamp - const latestRecord = await AvgGasPrice.findOne() - .sort({ timestamp: -1 }) - .lean(); - - if (!latestRecord) { - return null; - } - - const latestTimestamp = latestRecord.timestamp; - - // Get all records for that timestamp and calculate average - const records = await AvgGasPrice.find({ - timestamp: latestTimestamp - }).lean(); - - const totalAvgGasPrice = records.reduce((sum, record) => sum + record.value, 0); - const avgGasPrice = totalAvgGasPrice / records.length; - - return { - timestamp: latestTimestamp, - value: avgGasPrice, - chainCount: records.length - }; - } catch (error) { - logger.error('[AvgGasPrice] Error fetching network latest:', error); - throw error; - } - } -} - -module.exports = new AvgGasPriceService(); +const createMetricService = require('./metricService'); + +const service = createMetricService({ + model: AvgGasPrice, + metricPath: 'avgGasPrice', + label: 'AvgGasPrice', + // Gas price is averaged across chains for network-wide figures, not summed. + aggregation: 'avg' +}); + +// Preserve the original public method names so existing routes/cron callers +// continue to work unchanged. +module.exports = { + updateAvgGasPriceData: (chainId, retryCount) => service.updateData(chainId, retryCount), + updateAllChainsAvgGasPrice: () => service.updateAllChains(), + getAvgGasPriceHistory: (chainId, days) => service.getHistory(chainId, days), + getLatestAvgGasPrice: (chainId) => service.getLatest(chainId), + getNetworkAvgGasPriceHistory: (days) => service.getNetworkHistory(days), + getNetworkLatestAvgGasPrice: () => service.getNetworkLatest() +}; diff --git a/src/services/chainDataService.js b/src/services/chainDataService.js deleted file mode 100644 index 3494107..0000000 --- a/src/services/chainDataService.js +++ /dev/null @@ -1,77 +0,0 @@ -const axios = require('axios'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -class ChainDataService { - constructor() { - this.GLACIER_API_BASE = config.api.glacier.baseUrl; - this.GLACIER_API_KEY = process.env.GLACIER_API_KEY; - - // Log API key status (without revealing the actual key) - logger.info('ChainDataService - Glacier API Key status:', { - hasApiKey: !!this.GLACIER_API_KEY, - apiKeyLength: this.GLACIER_API_KEY ? this.GLACIER_API_KEY.length : 0 - }); - } - - async fetchChainData() { - try { - logger.info('Fetching chains from Glacier API...'); - - // Prepare headers with API key - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend' - }; - - // Add API key header if available - first check env var, then config - if (this.GLACIER_API_KEY) { - headers['x-glacier-api-key'] = this.GLACIER_API_KEY; - logger.debug('Using Glacier API key from environment variables'); - } else if (config.api.glacier.apiKey) { - headers['x-glacier-api-key'] = config.api.glacier.apiKey; - logger.debug('Using Glacier API key from config'); - } - - // Log request details (without exposing full API key) - logger.info('Making Glacier chains API request:', { - endpoint: '/chains', - hasApiKey: !!headers['x-glacier-api-key'], - apiKeyPrefix: headers['x-glacier-api-key'] ? `${headers['x-glacier-api-key'].substring(0, 4)}...` : 'none' - }); - - const response = await axios.get(`${this.GLACIER_API_BASE}/chains`, { - timeout: config.api.glacier.timeout, - headers: { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'x-glacier-api-key': this.GLACIER_API_KEY - } - }); - - logger.info('Glacier API Response:', { - status: response.status, - chainCount: response.data?.chains?.length || 0 - }); - - if (!response.data || !response.data.chains) { - throw new Error('Invalid response from Glacier API'); - } - - const chains = response.data.chains.filter(chain => !chain.isTestnet); - logger.info(`Filtered ${chains.length} non-testnet chains`); - - return chains; - - } catch (error) { - logger.error('Error fetching chain data:', { - message: error.message, - status: error.response?.status, - data: error.response?.data - }); - throw error; - } - } -} - -module.exports = new ChainDataService(); \ No newline at end of file diff --git a/src/services/feesPaidService.js b/src/services/feesPaidService.js index a8af5c7..ba45b7e 100644 --- a/src/services/feesPaidService.js +++ b/src/services/feesPaidService.js @@ -1,307 +1,20 @@ const FeesPaid = require('../models/feesPaid'); -const axios = require('axios'); -const Chain = require('../models/chain'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -// Rate limiter implementation -class RateLimiter { - constructor(maxRequestsPerMinute = 30) { - this.queue = []; - this.processing = false; - this.maxRequestsPerMinute = maxRequestsPerMinute; - this.requestTimestamps = []; - } - - async enqueue(fn) { - return new Promise((resolve, reject) => { - this.queue.push({ fn, resolve, reject }); - this.processQueue(); - }); - } - - async processQueue() { - if (this.processing || this.queue.length === 0) return; - - this.processing = true; - - try { - const now = Date.now(); - this.requestTimestamps = this.requestTimestamps.filter( - timestamp => now - timestamp < 60000 - ); - - if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { - const oldestTimestamp = this.requestTimestamps[0]; - const timeToWait = 60000 - (now - oldestTimestamp); - - logger.info(`Rate limit reached, waiting ${Math.round(timeToWait/1000)}s before next request`); - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, timeToWait + 100); - - return; - } - - const item = this.queue.shift(); - this.requestTimestamps.push(now); - - try { - const result = await item.fn(); - item.resolve(result); - } catch (error) { - item.reject(error); - } - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, 300); - } catch (error) { - logger.error('Error in rate limiter:', error); - this.processing = false; - } - } -} - -const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); - -class FeesPaidService { - async updateFeesPaidData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { - return metricsApiRateLimiter.enqueue(async () => { - for (let attempt = 1; attempt <= retryCount; attempt++) { - try { - logger.info(`[FeesPaid Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'Cache-Control': 'no-cache' - }; - - if (process.env.GLACIER_API_KEY) { - headers['x-api-key'] = process.env.GLACIER_API_KEY; - } - - const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/feesPaid`, { - params: { - timeInterval: 'day', - pageSize: 100 // Maximum allowed by API - }, - timeout: config.api.metrics.timeout, - headers - }); - - if (!response.data || !Array.isArray(response.data.results)) { - logger.warn(`[FeesPaid Update] Invalid response format for chain ${chainId}`); - continue; - } - - const currentTime = Math.floor(Date.now() / 1000); - const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); - - const validData = response.data.results.filter(item => { - const timestamp = Number(item.timestamp); - const value = parseFloat(item.value); - - if (isNaN(timestamp) || isNaN(value)) { - return false; - } - - return timestamp >= thirtyDaysAgo && timestamp <= currentTime; - }); - - if (validData.length > 0) { - const result = await FeesPaid.bulkWrite( - validData.map(item => ({ - updateOne: { - filter: { - chainId: chainId, - timestamp: Number(item.timestamp) - }, - update: { - $set: { - value: parseFloat(item.value), - lastUpdated: new Date() - } - }, - upsert: true - } - })), - { ordered: false } - ); - - logger.info(`[FeesPaid Update] Updated chain ${chainId}:`, { - upserted: result.upsertedCount, - modified: result.modifiedCount, - total: validData.length - }); - - return { - success: true, - chainId, - recordsProcessed: validData.length, - upserted: result.upsertedCount, - modified: result.modifiedCount - }; - } - - logger.info(`[FeesPaid Update] No valid data for chain ${chainId}`); - return { - success: true, - chainId, - recordsProcessed: 0, - message: 'No valid data points' - }; - - } catch (error) { - logger.error(`[FeesPaid Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); - - if (attempt === retryCount) { - return { - success: false, - chainId, - error: error.message - }; - } - - await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); - } - } - }); - } - - async updateAllChainsFeesPaid() { - try { - logger.info('[FeesPaid] Starting update for all chains'); - - const chains = await Chain.find({}); - const results = []; - - for (const chain of chains) { - const chainId = chain.evmChainId || chain.chainId; - - if (!chainId || !/^\d+$/.test(String(chainId))) { - logger.warn(`[FeesPaid] Skipping chain with invalid ID:`, chain.name); - continue; - } - - const result = await this.updateFeesPaidData(String(chainId)); - results.push(result); - } - - const successful = results.filter(r => r.success).length; - const failed = results.filter(r => !r.success).length; - - logger.info('[FeesPaid] Update completed:', { - total: chains.length, - successful, - failed - }); - - return { success: true, results }; - } catch (error) { - logger.error('[FeesPaid] Error updating all chains:', error); - return { success: false, error: error.message }; - } - } - - async getFeesPaidHistory(chainId, days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await FeesPaid.find({ - chainId: String(chainId), - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - return data; - } catch (error) { - logger.error(`[FeesPaid] Error fetching history for chain ${chainId}:`, error); - throw error; - } - } - - async getLatestFeesPaid(chainId) { - try { - const latestRecord = await FeesPaid.findOne({ - chainId: String(chainId) - }) - .sort({ timestamp: -1 }) - .lean(); - - return latestRecord; - } catch (error) { - logger.error(`[FeesPaid] Error fetching latest for chain ${chainId}:`, error); - throw error; - } - } - - async getNetworkFeesPaidHistory(days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await FeesPaid.find({ - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - // Group by timestamp and sum fees paid - const groupedData = {}; - data.forEach(record => { - if (!groupedData[record.timestamp]) { - groupedData[record.timestamp] = { - timestamp: record.timestamp, - value: 0 - }; - } - groupedData[record.timestamp].value += record.value; - }); - - return Object.values(groupedData).sort((a, b) => a.timestamp - b.timestamp); - } catch (error) { - logger.error('[FeesPaid] Error fetching network history:', error); - throw error; - } - } - - async getNetworkLatestFeesPaid() { - try { - // Get the most recent timestamp - const latestRecord = await FeesPaid.findOne() - .sort({ timestamp: -1 }) - .lean(); - - if (!latestRecord) { - return null; - } - - const latestTimestamp = latestRecord.timestamp; - - // Get all records for that timestamp and sum them - const records = await FeesPaid.find({ - timestamp: latestTimestamp - }).lean(); - - const totalFeesPaid = records.reduce((sum, record) => sum + record.value, 0); - - return { - timestamp: latestTimestamp, - value: totalFeesPaid, - chainCount: records.length - }; - } catch (error) { - logger.error('[FeesPaid] Error fetching network latest:', error); - throw error; - } - } -} - -module.exports = new FeesPaidService(); +const createMetricService = require('./metricService'); + +const service = createMetricService({ + model: FeesPaid, + metricPath: 'feesPaid', + label: 'FeesPaid', + aggregation: 'sum' +}); + +// Preserve the original public method names so existing routes/cron callers +// continue to work unchanged. +module.exports = { + updateFeesPaidData: (chainId, retryCount) => service.updateData(chainId, retryCount), + updateAllChainsFeesPaid: () => service.updateAllChains(), + getFeesPaidHistory: (chainId, days) => service.getHistory(chainId, days), + getLatestFeesPaid: (chainId) => service.getLatest(chainId), + getNetworkFeesPaidHistory: (days) => service.getNetworkHistory(days), + getNetworkLatestFeesPaid: () => service.getNetworkLatest() +}; diff --git a/src/services/gasUsedService.js b/src/services/gasUsedService.js index 282a3d2..6681661 100644 --- a/src/services/gasUsedService.js +++ b/src/services/gasUsedService.js @@ -1,307 +1,20 @@ const GasUsed = require('../models/gasUsed'); -const axios = require('axios'); -const Chain = require('../models/chain'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -// Rate limiter implementation -class RateLimiter { - constructor(maxRequestsPerMinute = 30) { - this.queue = []; - this.processing = false; - this.maxRequestsPerMinute = maxRequestsPerMinute; - this.requestTimestamps = []; - } - - async enqueue(fn) { - return new Promise((resolve, reject) => { - this.queue.push({ fn, resolve, reject }); - this.processQueue(); - }); - } - - async processQueue() { - if (this.processing || this.queue.length === 0) return; - - this.processing = true; - - try { - const now = Date.now(); - this.requestTimestamps = this.requestTimestamps.filter( - timestamp => now - timestamp < 60000 - ); - - if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { - const oldestTimestamp = this.requestTimestamps[0]; - const timeToWait = 60000 - (now - oldestTimestamp); - - logger.info(`Rate limit reached, waiting ${Math.round(timeToWait/1000)}s before next request`); - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, timeToWait + 100); - - return; - } - - const item = this.queue.shift(); - this.requestTimestamps.push(now); - - try { - const result = await item.fn(); - item.resolve(result); - } catch (error) { - item.reject(error); - } - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, 300); - } catch (error) { - logger.error('Error in rate limiter:', error); - this.processing = false; - } - } -} - -const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); - -class GasUsedService { - async updateGasUsedData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { - return metricsApiRateLimiter.enqueue(async () => { - for (let attempt = 1; attempt <= retryCount; attempt++) { - try { - logger.info(`[GasUsed Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'Cache-Control': 'no-cache' - }; - - if (process.env.GLACIER_API_KEY) { - headers['x-api-key'] = process.env.GLACIER_API_KEY; - } - - const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/gasUsed`, { - params: { - timeInterval: 'day', - pageSize: 100 // Maximum allowed by API - }, - timeout: config.api.metrics.timeout, - headers - }); - - if (!response.data || !Array.isArray(response.data.results)) { - logger.warn(`[GasUsed Update] Invalid response format for chain ${chainId}`); - continue; - } - - const currentTime = Math.floor(Date.now() / 1000); - const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); - - const validData = response.data.results.filter(item => { - const timestamp = Number(item.timestamp); - const value = parseFloat(item.value); - - if (isNaN(timestamp) || isNaN(value)) { - return false; - } - - return timestamp >= thirtyDaysAgo && timestamp <= currentTime; - }); - - if (validData.length > 0) { - const result = await GasUsed.bulkWrite( - validData.map(item => ({ - updateOne: { - filter: { - chainId: chainId, - timestamp: Number(item.timestamp) - }, - update: { - $set: { - value: parseFloat(item.value), - lastUpdated: new Date() - } - }, - upsert: true - } - })), - { ordered: false } - ); - - logger.info(`[GasUsed Update] Updated chain ${chainId}:`, { - upserted: result.upsertedCount, - modified: result.modifiedCount, - total: validData.length - }); - - return { - success: true, - chainId, - recordsProcessed: validData.length, - upserted: result.upsertedCount, - modified: result.modifiedCount - }; - } - - logger.info(`[GasUsed Update] No valid data for chain ${chainId}`); - return { - success: true, - chainId, - recordsProcessed: 0, - message: 'No valid data points' - }; - - } catch (error) { - logger.error(`[GasUsed Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); - - if (attempt === retryCount) { - return { - success: false, - chainId, - error: error.message - }; - } - - await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); - } - } - }); - } - - async updateAllChainsGasUsed() { - try { - logger.info('[GasUsed] Starting update for all chains'); - - const chains = await Chain.find({}); - const results = []; - - for (const chain of chains) { - const chainId = chain.evmChainId || chain.chainId; - - if (!chainId || !/^\d+$/.test(String(chainId))) { - logger.warn(`[GasUsed] Skipping chain with invalid ID:`, chain.name); - continue; - } - - const result = await this.updateGasUsedData(String(chainId)); - results.push(result); - } - - const successful = results.filter(r => r.success).length; - const failed = results.filter(r => !r.success).length; - - logger.info('[GasUsed] Update completed:', { - total: chains.length, - successful, - failed - }); - - return { success: true, results }; - } catch (error) { - logger.error('[GasUsed] Error updating all chains:', error); - return { success: false, error: error.message }; - } - } - - async getGasUsedHistory(chainId, days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await GasUsed.find({ - chainId: String(chainId), - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - return data; - } catch (error) { - logger.error(`[GasUsed] Error fetching history for chain ${chainId}:`, error); - throw error; - } - } - - async getLatestGasUsed(chainId) { - try { - const latestRecord = await GasUsed.findOne({ - chainId: String(chainId) - }) - .sort({ timestamp: -1 }) - .lean(); - - return latestRecord; - } catch (error) { - logger.error(`[GasUsed] Error fetching latest for chain ${chainId}:`, error); - throw error; - } - } - - async getNetworkGasUsedHistory(days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await GasUsed.find({ - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - // Group by timestamp and sum gas used - const groupedData = {}; - data.forEach(record => { - if (!groupedData[record.timestamp]) { - groupedData[record.timestamp] = { - timestamp: record.timestamp, - value: 0 - }; - } - groupedData[record.timestamp].value += record.value; - }); - - return Object.values(groupedData).sort((a, b) => a.timestamp - b.timestamp); - } catch (error) { - logger.error('[GasUsed] Error fetching network history:', error); - throw error; - } - } - - async getNetworkLatestGasUsed() { - try { - // Get the most recent timestamp - const latestRecord = await GasUsed.findOne() - .sort({ timestamp: -1 }) - .lean(); - - if (!latestRecord) { - return null; - } - - const latestTimestamp = latestRecord.timestamp; - - // Get all records for that timestamp and sum them - const records = await GasUsed.find({ - timestamp: latestTimestamp - }).lean(); - - const totalGasUsed = records.reduce((sum, record) => sum + record.value, 0); - - return { - timestamp: latestTimestamp, - value: totalGasUsed, - chainCount: records.length - }; - } catch (error) { - logger.error('[GasUsed] Error fetching network latest:', error); - throw error; - } - } -} - -module.exports = new GasUsedService(); +const createMetricService = require('./metricService'); + +const service = createMetricService({ + model: GasUsed, + metricPath: 'gasUsed', + label: 'GasUsed', + aggregation: 'sum' +}); + +// Preserve the original public method names so existing routes/cron callers +// continue to work unchanged. +module.exports = { + updateGasUsedData: (chainId, retryCount) => service.updateData(chainId, retryCount), + updateAllChainsGasUsed: () => service.updateAllChains(), + getGasUsedHistory: (chainId, days) => service.getHistory(chainId, days), + getLatestGasUsed: (chainId) => service.getLatest(chainId), + getNetworkGasUsedHistory: (days) => service.getNetworkHistory(days), + getNetworkLatestGasUsed: () => service.getNetworkLatest() +}; diff --git a/src/services/maxTpsService.js b/src/services/maxTpsService.js index 0066678..053a8a6 100644 --- a/src/services/maxTpsService.js +++ b/src/services/maxTpsService.js @@ -1,307 +1,20 @@ const MaxTps = require('../models/maxTps'); -const axios = require('axios'); -const Chain = require('../models/chain'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -// Rate limiter implementation -class RateLimiter { - constructor(maxRequestsPerMinute = 30) { - this.queue = []; - this.processing = false; - this.maxRequestsPerMinute = maxRequestsPerMinute; - this.requestTimestamps = []; - } - - async enqueue(fn) { - return new Promise((resolve, reject) => { - this.queue.push({ fn, resolve, reject }); - this.processQueue(); - }); - } - - async processQueue() { - if (this.processing || this.queue.length === 0) return; - - this.processing = true; - - try { - const now = Date.now(); - this.requestTimestamps = this.requestTimestamps.filter( - timestamp => now - timestamp < 60000 - ); - - if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { - const oldestTimestamp = this.requestTimestamps[0]; - const timeToWait = 60000 - (now - oldestTimestamp); - - logger.info(`Rate limit reached, waiting ${Math.round(timeToWait/1000)}s before next request`); - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, timeToWait + 100); - - return; - } - - const item = this.queue.shift(); - this.requestTimestamps.push(now); - - try { - const result = await item.fn(); - item.resolve(result); - } catch (error) { - item.reject(error); - } - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, 300); - } catch (error) { - logger.error('Error in rate limiter:', error); - this.processing = false; - } - } -} - -const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); - -class MaxTpsService { - async updateMaxTpsData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { - return metricsApiRateLimiter.enqueue(async () => { - for (let attempt = 1; attempt <= retryCount; attempt++) { - try { - logger.info(`[MaxTps Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'Cache-Control': 'no-cache' - }; - - if (process.env.GLACIER_API_KEY) { - headers['x-api-key'] = process.env.GLACIER_API_KEY; - } - - const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/maxTps`, { - params: { - timeInterval: 'day', - pageSize: 100 // Maximum allowed by API - }, - timeout: config.api.metrics.timeout, - headers - }); - - if (!response.data || !Array.isArray(response.data.results)) { - logger.warn(`[MaxTps Update] Invalid response format for chain ${chainId}`); - continue; - } - - const currentTime = Math.floor(Date.now() / 1000); - const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); - - const validData = response.data.results.filter(item => { - const timestamp = Number(item.timestamp); - const value = parseFloat(item.value); - - if (isNaN(timestamp) || isNaN(value)) { - return false; - } - - return timestamp >= thirtyDaysAgo && timestamp <= currentTime; - }); - - if (validData.length > 0) { - const result = await MaxTps.bulkWrite( - validData.map(item => ({ - updateOne: { - filter: { - chainId: chainId, - timestamp: Number(item.timestamp) - }, - update: { - $set: { - value: parseFloat(item.value), - lastUpdated: new Date() - } - }, - upsert: true - } - })), - { ordered: false } - ); - - logger.info(`[MaxTps Update] Updated chain ${chainId}:`, { - upserted: result.upsertedCount, - modified: result.modifiedCount, - total: validData.length - }); - - return { - success: true, - chainId, - recordsProcessed: validData.length, - upserted: result.upsertedCount, - modified: result.modifiedCount - }; - } - - logger.info(`[MaxTps Update] No valid data for chain ${chainId}`); - return { - success: true, - chainId, - recordsProcessed: 0, - message: 'No valid data points' - }; - - } catch (error) { - logger.error(`[MaxTps Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); - - if (attempt === retryCount) { - return { - success: false, - chainId, - error: error.message - }; - } - - await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); - } - } - }); - } - - async updateAllChainsMaxTps() { - try { - logger.info('[MaxTps] Starting update for all chains'); - - const chains = await Chain.find({}); - const results = []; - - for (const chain of chains) { - const chainId = chain.evmChainId || chain.chainId; - - if (!chainId || !/^\d+$/.test(String(chainId))) { - logger.warn(`[MaxTps] Skipping chain with invalid ID:`, chain.name); - continue; - } - - const result = await this.updateMaxTpsData(String(chainId)); - results.push(result); - } - - const successful = results.filter(r => r.success).length; - const failed = results.filter(r => !r.success).length; - - logger.info('[MaxTps] Update completed:', { - total: chains.length, - successful, - failed - }); - - return { success: true, results }; - } catch (error) { - logger.error('[MaxTps] Error updating all chains:', error); - return { success: false, error: error.message }; - } - } - - async getMaxTpsHistory(chainId, days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await MaxTps.find({ - chainId: String(chainId), - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - return data; - } catch (error) { - logger.error(`[MaxTps] Error fetching history for chain ${chainId}:`, error); - throw error; - } - } - - async getLatestMaxTps(chainId) { - try { - const latestRecord = await MaxTps.findOne({ - chainId: String(chainId) - }) - .sort({ timestamp: -1 }) - .lean(); - - return latestRecord; - } catch (error) { - logger.error(`[MaxTps] Error fetching latest for chain ${chainId}:`, error); - throw error; - } - } - - async getNetworkMaxTpsHistory(days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await MaxTps.find({ - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - // Group by timestamp and sum max TPS - const groupedData = {}; - data.forEach(record => { - if (!groupedData[record.timestamp]) { - groupedData[record.timestamp] = { - timestamp: record.timestamp, - value: 0 - }; - } - groupedData[record.timestamp].value += record.value; - }); - - return Object.values(groupedData).sort((a, b) => a.timestamp - b.timestamp); - } catch (error) { - logger.error('[MaxTps] Error fetching network history:', error); - throw error; - } - } - - async getNetworkLatestMaxTps() { - try { - // Get the most recent timestamp - const latestRecord = await MaxTps.findOne() - .sort({ timestamp: -1 }) - .lean(); - - if (!latestRecord) { - return null; - } - - const latestTimestamp = latestRecord.timestamp; - - // Get all records for that timestamp and sum them - const records = await MaxTps.find({ - timestamp: latestTimestamp - }).lean(); - - const totalMaxTps = records.reduce((sum, record) => sum + record.value, 0); - - return { - timestamp: latestTimestamp, - value: totalMaxTps, - chainCount: records.length - }; - } catch (error) { - logger.error('[MaxTps] Error fetching network latest:', error); - throw error; - } - } -} - -module.exports = new MaxTpsService(); +const createMetricService = require('./metricService'); + +const service = createMetricService({ + model: MaxTps, + metricPath: 'maxTps', + label: 'MaxTps', + aggregation: 'sum' +}); + +// Preserve the original public method names so existing routes/cron callers +// continue to work unchanged. +module.exports = { + updateMaxTpsData: (chainId, retryCount) => service.updateData(chainId, retryCount), + updateAllChainsMaxTps: () => service.updateAllChains(), + getMaxTpsHistory: (chainId, days) => service.getHistory(chainId, days), + getLatestMaxTps: (chainId) => service.getLatest(chainId), + getNetworkMaxTpsHistory: (days) => service.getNetworkHistory(days), + getNetworkLatestMaxTps: () => service.getNetworkLatest() +}; diff --git a/src/services/metricService.js b/src/services/metricService.js new file mode 100644 index 0000000..ceeecbb --- /dev/null +++ b/src/services/metricService.js @@ -0,0 +1,368 @@ +const axios = require('axios'); +const Chain = require('../models/chain'); +const config = require('../config/config'); +const logger = require('../utils/logger'); + +/** + * Generic per-chain time-series metric service. + * + * The Metrics API exposes several day-bucketed metrics (activeAddresses, + * txCount, gasUsed, avgGasPrice, maxTps, feesPaid) that are fetched, stored, + * and queried identically. This factory captures that shared behavior so each + * concrete service is just a thin configuration wrapper. + * + * @param {Object} options + * @param {import('mongoose').Model} options.model Mongoose model storing { chainId, timestamp, value, lastUpdated }. + * @param {string} options.metricPath Metrics API path segment, e.g. 'activeAddresses'. + * @param {string} options.label Human label used in log lines, e.g. 'Active Addresses'. + * @param {'sum'|'avg'} [options.aggregation='sum'] How to combine per-chain values into a network value. + */ +function createMetricService({ model, metricPath, label, aggregation = 'sum' }) { + if (!model) throw new Error('createMetricService: model is required'); + if (!metricPath) throw new Error('createMetricService: metricPath is required'); + if (!label) throw new Error('createMetricService: label is required'); + + // Rate limiter implementation (one instance per metric service, matching the + // original per-service behavior: each metric gets its own request budget). + class RateLimiter { + constructor(maxRequestsPerMinute = 30) { + this.queue = []; + this.processing = false; + this.maxRequestsPerMinute = maxRequestsPerMinute; + this.requestTimestamps = []; + } + + async enqueue(fn) { + return new Promise((resolve, reject) => { + this.queue.push({ fn, resolve, reject }); + this.processQueue(); + }); + } + + async processQueue() { + if (this.processing || this.queue.length === 0) return; + + this.processing = true; + + try { + const now = Date.now(); + this.requestTimestamps = this.requestTimestamps.filter( + timestamp => now - timestamp < 60000 + ); + + if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { + const oldestTimestamp = this.requestTimestamps[0]; + const timeToWait = 60000 - (now - oldestTimestamp); + + logger.info(`Rate limit reached, waiting ${Math.round(timeToWait / 1000)}s before next request`); + + setTimeout(() => { + this.processing = false; + this.processQueue(); + }, timeToWait + 100); + + return; + } + + const item = this.queue.shift(); + this.requestTimestamps.push(now); + + try { + const result = await item.fn(); + item.resolve(result); + } catch (error) { + item.reject(error); + } + + setTimeout(() => { + this.processing = false; + this.processQueue(); + }, 300); + } catch (error) { + logger.error('Error in rate limiter:', error); + this.processing = false; + } + } + } + + const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); + + async function updateData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { + return metricsApiRateLimiter.enqueue(async () => { + for (let attempt = 1; attempt <= retryCount; attempt++) { + try { + logger.info(`[${label} Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); + + const headers = { + 'Accept': 'application/json', + 'User-Agent': 'l1beat-backend', + 'Cache-Control': 'no-cache' + }; + + if (process.env.GLACIER_API_KEY) { + headers['x-api-key'] = process.env.GLACIER_API_KEY; + } + + const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/${metricPath}`, { + params: { + timeInterval: 'day', + pageSize: 100 // Maximum allowed by API + }, + timeout: config.api.metrics.timeout, + headers + }); + + if (!response.data || !Array.isArray(response.data.results)) { + logger.warn(`[${label} Update] Invalid response format for chain ${chainId}`); + continue; + } + + const currentTime = Math.floor(Date.now() / 1000); + const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); + + const validData = response.data.results.filter(item => { + const timestamp = Number(item.timestamp); + const value = parseFloat(item.value); + + if (isNaN(timestamp) || isNaN(value)) { + return false; + } + + return timestamp >= thirtyDaysAgo && timestamp <= currentTime; + }); + + if (validData.length > 0) { + const result = await model.bulkWrite( + validData.map(item => ({ + updateOne: { + filter: { + chainId: chainId, + timestamp: Number(item.timestamp) + }, + update: { + $set: { + value: parseFloat(item.value), + lastUpdated: new Date() + } + }, + upsert: true + } + })), + { ordered: false } + ); + + logger.info(`[${label} Update] Updated chain ${chainId}:`, { + upserted: result.upsertedCount, + modified: result.modifiedCount, + total: validData.length + }); + + return { + success: true, + chainId, + recordsProcessed: validData.length, + upserted: result.upsertedCount, + modified: result.modifiedCount + }; + } + + logger.info(`[${label} Update] No valid data for chain ${chainId}`); + return { + success: true, + chainId, + recordsProcessed: 0, + message: 'No valid data points' + }; + + } catch (error) { + logger.error(`[${label} Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); + + if (attempt === retryCount) { + return { + success: false, + chainId, + error: error.message + }; + } + + await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); + } + } + + // All retries exhausted without ever getting a valid response (e.g. the + // API kept returning a 200 with a non-array `results` payload, hitting the + // `continue` above each attempt). Return an explicit failure result so + // updateAllChains can count it instead of crashing on `undefined.success`. + return { + success: false, + chainId, + error: 'No valid response after all retries' + }; + }); + } + + async function updateAllChains() { + try { + logger.info(`[${label}] Starting update for all chains`); + + const chains = await Chain.find({}); + const results = []; + + for (const chain of chains) { + const chainId = chain.evmChainId || chain.chainId; + + if (!chainId || !/^\d+$/.test(String(chainId))) { + logger.warn(`[${label}] Skipping chain with invalid ID:`, chain.name); + continue; + } + + const result = await updateData(String(chainId)); + results.push(result); + } + + const successful = results.filter(r => r.success).length; + const failed = results.filter(r => !r.success).length; + + logger.info(`[${label}] Update completed:`, { + total: chains.length, + successful, + failed + }); + + return { success: true, results }; + } catch (error) { + logger.error(`[${label}] Error updating all chains:`, error); + return { success: false, error: error.message }; + } + } + + async function getHistory(chainId, days = 30) { + try { + const endTime = Math.floor(Date.now() / 1000); + const startTime = endTime - (days * 24 * 60 * 60); + + const data = await model.find({ + chainId: String(chainId), + timestamp: { $gte: startTime, $lte: endTime } + }) + .sort({ timestamp: 1 }) + .lean(); + + return data; + } catch (error) { + logger.error(`[${label}] Error fetching history for chain ${chainId}:`, error); + throw error; + } + } + + async function getLatest(chainId) { + try { + const latestRecord = await model.findOne({ + chainId: String(chainId) + }) + .sort({ timestamp: -1 }) + .lean(); + + return latestRecord; + } catch (error) { + logger.error(`[${label}] Error fetching latest for chain ${chainId}:`, error); + throw error; + } + } + + async function getNetworkHistory(days = 30) { + try { + const endTime = Math.floor(Date.now() / 1000); + const startTime = endTime - (days * 24 * 60 * 60); + + const data = await model.find({ + timestamp: { $gte: startTime, $lte: endTime } + }) + .sort({ timestamp: 1 }) + .lean(); + + if (aggregation === 'avg') { + // Group by timestamp and calculate the average across all chains. + const groupedData = {}; + data.forEach(record => { + if (!groupedData[record.timestamp]) { + groupedData[record.timestamp] = { + timestamp: record.timestamp, + sum: 0, + count: 0 + }; + } + groupedData[record.timestamp].sum += record.value; + groupedData[record.timestamp].count += 1; + }); + + return Object.values(groupedData).map(item => ({ + timestamp: item.timestamp, + value: item.sum / item.count // Average across all chains + })).sort((a, b) => a.timestamp - b.timestamp); + } + + // Group by timestamp and sum across all chains. + const groupedData = {}; + data.forEach(record => { + if (!groupedData[record.timestamp]) { + groupedData[record.timestamp] = { + timestamp: record.timestamp, + value: 0 + }; + } + groupedData[record.timestamp].value += record.value; + }); + + return Object.values(groupedData).sort((a, b) => a.timestamp - b.timestamp); + } catch (error) { + logger.error(`[${label}] Error fetching network history:`, error); + throw error; + } + } + + async function getNetworkLatest() { + try { + // Get the most recent timestamp + const latestRecord = await model.findOne() + .sort({ timestamp: -1 }) + .lean(); + + if (!latestRecord) { + return null; + } + + const latestTimestamp = latestRecord.timestamp; + + // Get all records for that timestamp and combine them + const records = await model.find({ + timestamp: latestTimestamp + }).lean(); + + const total = records.reduce((sum, record) => sum + record.value, 0); + const value = aggregation === 'avg' + ? (records.length ? total / records.length : 0) // Average across all chains + : total; // Sum across all chains + + return { + timestamp: latestTimestamp, + value, + chainCount: records.length + }; + } catch (error) { + logger.error(`[${label}] Error fetching network latest:`, error); + throw error; + } + } + + return { + updateData, + updateAllChains, + getHistory, + getLatest, + getNetworkHistory, + getNetworkLatest + }; +} + +module.exports = createMetricService; diff --git a/src/services/teleporterService 2.js b/src/services/teleporterService 2.js deleted file mode 100644 index 9767b58..0000000 --- a/src/services/teleporterService 2.js +++ /dev/null @@ -1,703 +0,0 @@ -const axios = require('axios'); -const config = require('../config/config'); -const logger = require('../utils/logger'); -const { TeleporterMessage, TeleporterUpdateState } = require('../models/teleporterMessage'); - -class TeleporterService { - constructor() { - this.GLACIER_API_BASE = process.env.GLACIER_API_BASE || config.api.glacier.baseUrl; - this.GLACIER_API_KEY = process.env.GLACIER_API_KEY; - this.UPDATE_INTERVAL = 60 * 60 * 1000; // 1 hour in milliseconds - this.TIMEOUT = 30000; // 30 seconds - - if (!this.GLACIER_API_KEY) { - logger.warn('GLACIER_API_KEY not found in environment variables'); - } - - if (!this.GLACIER_API_BASE) { - logger.error('GLACIER_API_BASE not configured'); - } - - logger.info('TeleporterService initialized', { - hasApiKey: !!this.GLACIER_API_KEY, - apiBase: this.GLACIER_API_BASE - }); - } - - /** - * Fetch ICM messages from Glacier API - * @param {number} hoursAgo - How many hours ago to start fetching from - * @returns {Promise} Array of messages - */ - async fetchICMMessages(hoursAgo = 24) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (hoursAgo * 60 * 60); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend' - }; - - if (this.GLACIER_API_KEY) { - headers['x-glacier-api-key'] = this.GLACIER_API_KEY; - } - - const params = { - startTime, - endTime, - network: 'mainnet', - pageSize: 100 - }; - - logger.info(`Fetching ICM messages from ${hoursAgo} hours ago`, { - startTime, - endTime, - startTimeISO: new Date(startTime * 1000).toISOString(), - endTimeISO: new Date(endTime * 1000).toISOString() - }); - - let allMessages = []; - let nextPageToken = null; - let pageCount = 0; - const maxPages = 1000; // Higher safety limit - let reachedTimeLimit = false; - - do { - pageCount++; - - if (nextPageToken) { - params.pageToken = nextPageToken; - } - - const response = await axios.get(`${this.GLACIER_API_BASE}/icm/messages`, { - headers, - params, - timeout: this.TIMEOUT - }); - - const messages = response.data?.messages || []; - let validMessages = []; - - // Check each message timestamp to see if it's within our time window - for (const message of messages) { - let messageTimestamp = null; - - // Try to get timestamp from sourceTransaction first, then fallback to message timestamp - if (message.sourceTransaction && message.sourceTransaction.timestamp) { - messageTimestamp = message.sourceTransaction.timestamp; - } else if (message.timestamp) { - messageTimestamp = message.timestamp; - } - - if (!messageTimestamp) { - // If no timestamp found, include the message (we can't determine its age) - validMessages.push(message); - continue; - } - - // Convert timestamp to seconds if it's in milliseconds - const timestampInSeconds = messageTimestamp > 1000000000000 - ? Math.floor(messageTimestamp / 1000) - : messageTimestamp; - - // Check if the message is within our time range - if (timestampInSeconds >= startTime) { - validMessages.push(message); - } else { - // Found a message older than our time window, stop pagination - reachedTimeLimit = true; - logger.info(`Found message older than ${hoursAgo} hours, stopping pagination`, { - messageTimestamp: new Date(timestampInSeconds * 1000).toISOString(), - startTime: new Date(startTime * 1000).toISOString(), - page: pageCount, - messageId: message.messageId || 'unknown' - }); - break; - } - } - - // Add valid messages from this page to our collection - allMessages = allMessages.concat(validMessages); - nextPageToken = response.data?.nextPageToken; - - logger.info(`Fetched page ${pageCount}, got ${messages.length} messages (${validMessages.length} valid)`, { - totalMessages: allMessages.length, - hasNextPage: !!nextPageToken, - reachedTimeLimit - }); - - // If we reached the time limit, stop pagination - if (reachedTimeLimit) { - logger.info(`Reached time limit (${hoursAgo} hours), stopping pagination`); - break; - } - - // Safety check to prevent infinite loops (should rarely be hit now) - if (pageCount >= maxPages) { - logger.warn(`Reached maximum page limit (${maxPages}), stopping pagination`); - break; - } - - // Small delay between requests to be respectful to the API - if (nextPageToken) { - await new Promise(resolve => setTimeout(resolve, 1000)); - } - - } while (nextPageToken); - - logger.info(`Completed fetching ICM messages: ${allMessages.length} total messages from ${pageCount} pages`, { - reachedTimeLimit, - hitPageLimit: pageCount >= maxPages - }); - return allMessages; - - } catch (error) { - logger.error('Error fetching ICM messages:', { - message: error.message, - status: error.response?.status, - statusText: error.response?.statusText - }); - throw error; - } - } - - /** - * Fetch chain data and create chainId to chainName mapping - * @returns {Promise} Mapping of chainId to chainName - */ - async getChainMapping() { - try { - // Check if we have cached mapping (cache for 1 hour) - if (this.chainMapping && this.chainMappingLastUpdate && - (Date.now() - this.chainMappingLastUpdate) < (60 * 60 * 1000)) { - return this.chainMapping; - } - - logger.info('Fetching chain data for name mapping...'); - - // Fetch chain data from our own API - const response = await axios.get(`http://localhost:${process.env.PORT || 5001}/api/chains`, { - timeout: this.TIMEOUT, - headers: { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend-internal' - } - }); - - const chains = response.data || []; - const mapping = {}; - - for (const chain of chains) { - if (chain.evmChainId && chain.chainName) { - mapping[chain.evmChainId] = chain.chainName; - } - } - - // Cache the mapping - this.chainMapping = mapping; - this.chainMappingLastUpdate = Date.now(); - - logger.info(`Created chain mapping for ${Object.keys(mapping).length} chains`); - return mapping; - - } catch (error) { - logger.error('Error fetching chain mapping:', error); - // Return empty mapping as fallback - return this.chainMapping || {}; - } - } - - /** - * Process messages to count by chain pairs - * @param {Array} messages - Array of ICM messages - * @returns {Array} Processed message counts - */ - async processMessages(messages) { - const counts = {}; - - logger.info(`Processing ${messages.length} ICM messages`); - - // Get chain mapping - const chainMapping = await this.getChainMapping(); - - for (const message of messages) { - if (!message.sourceEvmChainId || !message.destinationEvmChainId) { - continue; // Skip messages without chain IDs - } - - // Use chain names if available, fallback to "Chain {id}" format - const sourceChain = chainMapping[message.sourceEvmChainId] || `Chain ${message.sourceEvmChainId}`; - const destinationChain = chainMapping[message.destinationEvmChainId] || `Chain ${message.destinationEvmChainId}`; - const key = `${sourceChain}|${destinationChain}`; - - if (!counts[key]) { - counts[key] = { - sourceChain, - destinationChain, - messageCount: 0 - }; - } - counts[key].messageCount++; - } - - const result = Object.values(counts).sort((a, b) => b.messageCount - a.messageCount); - - logger.info(`Processed messages into ${result.length} chain pairs with actual chain names`); - return result; - } - - /** - * Update daily teleporter data - */ - async updateDailyData() { - try { - logger.info('[TELEPORTER DAILY] Starting daily teleporter data update (last 24 hours)'); - - // Check if update is already in progress - const existingUpdate = await TeleporterUpdateState.findOne({ - updateType: 'daily', - state: 'in_progress' - }); - - if (existingUpdate) { - const timeSinceUpdate = Date.now() - new Date(existingUpdate.lastUpdatedAt).getTime(); - if (timeSinceUpdate < 10 * 60 * 1000) { // 10 minutes - logger.info('[TELEPORTER DAILY] Update already in progress, skipping'); - return { success: true, status: 'in_progress' }; - } - - // Mark stale update as failed - existingUpdate.state = 'failed'; - existingUpdate.error = { message: 'Update timed out' }; - await existingUpdate.save(); - logger.warn('[TELEPORTER DAILY] Marked stale update as failed, proceeding with new update'); - } - - // Create new update state - const updateState = new TeleporterUpdateState({ - updateType: 'daily', - state: 'in_progress', - startedAt: new Date(), - lastUpdatedAt: new Date() - }); - await updateState.save(); - - logger.info('[TELEPORTER DAILY] Fetching ICM messages for last 24 hours...'); - // Fetch and process messages - const messages = await this.fetchICMMessages(24); - logger.info(`[TELEPORTER DAILY] Fetched ${messages.length} raw messages from Glacier API`); - - const processedData = await this.processMessages(messages); - logger.info(`[TELEPORTER DAILY] Processed into ${processedData.length} unique chain pairs`); - - // Clean up old daily data (older than 90 days) to prevent database bloat - const ninetyDaysAgo = new Date(); - ninetyDaysAgo.setDate(ninetyDaysAgo.getDate() - 90); - - const deletedCount = await TeleporterMessage.deleteMany({ - dataType: 'daily', - updatedAt: { $lt: ninetyDaysAgo } - }); - - if (deletedCount.deletedCount > 0) { - logger.info(`[TELEPORTER DAILY] Cleaned up ${deletedCount.deletedCount} old daily records (>90 days)`); - } - - // Check if we already have data for today (to avoid duplicates) - const today = new Date(); - const todayStart = new Date(today.getFullYear(), today.getMonth(), today.getDate()); - const todayEnd = new Date(todayStart.getTime() + 24 * 60 * 60 * 1000); - - const existingTodayData = await TeleporterMessage.findOne({ - dataType: 'daily', - updatedAt: { $gte: todayStart, $lt: todayEnd } - }); - - if (existingTodayData) { - // Update existing data for today - existingTodayData.updatedAt = new Date(); - existingTodayData.messageCounts = processedData; - existingTodayData.totalMessages = messages.length; - existingTodayData.timeWindow = 24; - await existingTodayData.save(); - logger.info(`[TELEPORTER DAILY] Updated existing daily snapshot for today`); - } else { - // Create new daily snapshot - const teleporterData = new TeleporterMessage({ - updatedAt: new Date(), - messageCounts: processedData, - totalMessages: messages.length, - timeWindow: 24, - dataType: 'daily' - }); - await teleporterData.save(); - logger.info(`[TELEPORTER DAILY] Created new daily snapshot`); - } - - // Update state to completed - updateState.state = 'completed'; - updateState.lastUpdatedAt = new Date(); - await updateState.save(); - - logger.info(`[TELEPORTER DAILY] ✅ Successfully completed daily update: ${messages.length} messages, ${processedData.length} chain pairs`); - - return { - success: true, - messageCount: processedData.length, - totalMessages: messages.length - }; - - } catch (error) { - logger.error('[TELEPORTER DAILY] ❌ Error updating daily data:', error); - - // Update state to failed - const updateState = await TeleporterUpdateState.findOne({ updateType: 'daily' }); - if (updateState) { - updateState.state = 'failed'; - updateState.error = { message: error.message }; - updateState.lastUpdatedAt = new Date(); - await updateState.save(); - } - - throw error; - } - } - - /** - * Update weekly teleporter data (last 7 days) - */ - async updateWeeklyData() { - try { - logger.info('[TELEPORTER WEEKLY] Starting weekly teleporter data update (last 7 days)'); - - // Check if update is already in progress - const existingUpdate = await TeleporterUpdateState.findOne({ - updateType: 'weekly', - state: 'in_progress' - }); - - if (existingUpdate) { - const timeSinceUpdate = Date.now() - new Date(existingUpdate.lastUpdatedAt).getTime(); - if (timeSinceUpdate < 30 * 60 * 1000) { // 30 minutes for weekly - logger.info('[TELEPORTER WEEKLY] Weekly update already in progress, skipping'); - return { success: true, status: 'in_progress' }; - } - - // Mark stale update as failed - existingUpdate.state = 'failed'; - existingUpdate.error = { message: 'Update timed out' }; - await existingUpdate.save(); - logger.warn('[TELEPORTER WEEKLY] Marked stale update as failed, proceeding with new update'); - } - - // Create new update state - const updateState = new TeleporterUpdateState({ - updateType: 'weekly', - state: 'in_progress', - startedAt: new Date(), - lastUpdatedAt: new Date() - }); - await updateState.save(); - - // Fetch and process messages for the last 7 days (168 hours) - logger.info('[TELEPORTER WEEKLY] Fetching ICM messages for last 7 days (168 hours)...'); - const messages = await this.fetchICMMessages(168); // 7 * 24 = 168 hours - logger.info(`[TELEPORTER WEEKLY] Fetched ${messages.length} raw messages from Glacier API`); - - const processedData = await this.processMessages(messages); - logger.info(`[TELEPORTER WEEKLY] Processed into ${processedData.length} unique chain pairs`); - - // Save to database (replace existing weekly data) - await TeleporterMessage.deleteMany({ dataType: 'weekly' }); - - const teleporterData = new TeleporterMessage({ - updatedAt: new Date(), - messageCounts: processedData, - totalMessages: messages.length, - timeWindow: 168, - dataType: 'weekly' - }); - await teleporterData.save(); - - // Update state to completed - updateState.state = 'completed'; - updateState.lastUpdatedAt = new Date(); - await updateState.save(); - - logger.info(`[TELEPORTER WEEKLY] ✅ Successfully completed weekly update: ${messages.length} messages, ${processedData.length} chain pairs`); - - return { - success: true, - messageCount: processedData.length, - totalMessages: messages.length - }; - - } catch (error) { - logger.error('[TELEPORTER WEEKLY] ❌ Error updating weekly data:', error); - - // Update state to failed - const updateState = await TeleporterUpdateState.findOne({ updateType: 'weekly' }); - if (updateState) { - updateState.state = 'failed'; - updateState.error = { message: error.message }; - updateState.lastUpdatedAt = new Date(); - await updateState.save(); - } - - throw error; - } - } - - /** - * Get daily message counts - * @returns {Promise} Daily message count data - */ - async getDailyMessageCounts() { - try { - // Get from database first - const data = await TeleporterMessage.findOne({ dataType: 'daily' }) - .sort({ updatedAt: -1 }); - - if (data) { - const age = Date.now() - new Date(data.updatedAt).getTime(); - - // If data is older than 1 hour, trigger background update - if (age > this.UPDATE_INTERVAL) { - logger.info('[TELEPORTER DAILY] Data is older than 1 hour, triggering background update'); - this.updateDailyData().catch(err => { - logger.error('[TELEPORTER DAILY] Background update failed:', err); - }); - } - - return { - data: data.messageCounts.map(item => ({ - sourceChain: item.sourceChain, - destinationChain: item.destinationChain, - messageCount: item.messageCount - })), - metadata: { - totalMessages: data.totalMessages, - timeWindow: data.timeWindow, - timeWindowUnit: 'hours', - updatedAt: data.updatedAt - } - }; - } - - // If no data exists, trigger update and return empty result - logger.info('[TELEPORTER DAILY] No daily data found, triggering initial update'); - this.updateDailyData().catch(err => { - logger.error('[TELEPORTER DAILY] Initial update failed:', err); - }); - - return { - data: [], - metadata: { - totalMessages: 0, - timeWindow: 24, - timeWindowUnit: 'hours', - updatedAt: new Date() - } - }; - - } catch (error) { - logger.error('Error getting daily message counts:', error); - throw error; - } - } - - /** - * Get weekly message counts (last 7 days) - * @returns {Promise} Weekly message count data - */ - async getWeeklyMessageCounts() { - try { - // Get from database first - const data = await TeleporterMessage.findOne({ dataType: 'weekly' }) - .sort({ updatedAt: -1 }); - - if (data) { - const age = Date.now() - new Date(data.updatedAt).getTime(); - - // If data is older than 6 hours, trigger background update (weekly data doesn't need to be as fresh) - if (age > 6 * 60 * 60 * 1000) { - logger.info('[TELEPORTER WEEKLY] Weekly data is older than 6 hours, triggering background update'); - this.updateWeeklyData().catch(err => { - logger.error('[TELEPORTER WEEKLY] Background weekly update failed:', err); - }); - } - - return { - data: data.messageCounts.map(item => ({ - sourceChain: item.sourceChain, - destinationChain: item.destinationChain, - messageCount: item.messageCount - })), - metadata: { - totalMessages: data.totalMessages, - timeWindow: data.timeWindow, - timeWindowUnit: 'hours', - updatedAt: data.updatedAt - } - }; - } - - // If no data exists, trigger update and return empty result - logger.info('[TELEPORTER WEEKLY] No weekly data found, triggering initial update'); - this.updateWeeklyData().catch(err => { - logger.error('[TELEPORTER WEEKLY] Initial weekly update failed:', err); - }); - - return { - data: [], - metadata: { - totalMessages: 0, - timeWindow: 168, - timeWindowUnit: 'hours', - updatedAt: new Date() - } - }; - - } catch (error) { - logger.error('Error getting weekly message counts:', error); - throw error; - } - } - - /** - * Legacy method to maintain compatibility with existing controller - * @param {string} requestId - Optional request ID for tracking - * @returns {Promise} Array of message counts - */ - async getDailyCrossChainMessageCount(requestId = 'unknown') { - try { - const result = await this.getDailyMessageCounts(); - return result.data; - } catch (error) { - logger.error('Error in legacy getDailyCrossChainMessageCount:', error); - return []; - } - } - - /** - * Legacy method to maintain compatibility with existing controller - * @returns {Promise} Array of weekly message counts - */ - async getWeeklyCrossChainMessageCount() { - try { - const result = await this.getWeeklyMessageCounts(); - return result.data; - } catch (error) { - logger.error('Error in legacy getWeeklyCrossChainMessageCount:', error); - return []; - } - } - - /** - * Legacy method to maintain compatibility - * @returns {Promise} Message count data or null - */ - async getAnyMessageCountFromDB(dataType = 'daily') { - try { - const data = await TeleporterMessage.findOne({ dataType }) - .sort({ updatedAt: -1 }); - return data; - } catch (error) { - logger.error('Error getting message count from database:', error); - return null; - } - } - - /** - * Start periodic updates - */ - startPeriodicUpdates() { - // Initial updates - this.updateDailyData().catch(err => { - logger.error('Initial daily update failed:', err); - }); - - this.updateWeeklyData().catch(err => { - logger.error('Initial weekly update failed:', err); - }); - - // Set up hourly updates for daily data - setInterval(() => { - this.updateDailyData().catch(err => { - logger.error('Periodic daily update failed:', err); - }); - }, this.UPDATE_INTERVAL); - - // Set up daily updates for weekly data (every 24 hours) - setInterval(() => { - this.updateWeeklyData().catch(err => { - logger.error('Periodic weekly update failed:', err); - }); - }, 24 * 60 * 60 * 1000); // 24 hours - - logger.info('Started periodic updates (daily: every hour, weekly: every 24 hours)'); - } - - /** - * Legacy method for backward compatibility with existing cron jobs - * @param {string} requestId - Optional request ID for tracking - * @returns {Promise} Update result - */ - async updateTeleporterData(requestId = 'unknown') { - return await this.updateDailyData(); - } - - /** - * Get historical daily cross-chain message counts for the past N days - * @param {number} days - Number of days to retrieve (default: 30) - * @returns {Promise} Array of historical daily data - */ - async getHistoricalDailyData(days = 30) { - try { - logger.info(`[TELEPORTER HISTORICAL] Fetching historical daily data for last ${days} days`); - - // Calculate the date threshold - const dateThreshold = new Date(); - dateThreshold.setDate(dateThreshold.getDate() - days); - - // Query for historical daily snapshots - const historicalData = await TeleporterMessage.find({ - dataType: 'daily', - updatedAt: { $gte: dateThreshold } - }) - .sort({ updatedAt: -1 }) - .lean(); // Use lean() for better performance since we're not modifying the docs - - logger.info(`[TELEPORTER HISTORICAL] Found ${historicalData.length} daily snapshots in the last ${days} days`); - - // Group by date to handle potential duplicate entries on the same day - const groupedByDate = new Map(); - - historicalData.forEach(entry => { - const entryDate = new Date(entry.updatedAt); - const dateKey = `${entryDate.getFullYear()}-${(entryDate.getMonth() + 1).toString().padStart(2, '0')}-${entryDate.getDate().toString().padStart(2, '0')}`; - - // Keep only the most recent entry for each day - if (!groupedByDate.has(dateKey) || - new Date(entry.updatedAt) > new Date(groupedByDate.get(dateKey).updatedAt)) { - groupedByDate.set(dateKey, entry); - } - }); - - // Convert back to array and sort by date (most recent first) - const uniqueDailyData = Array.from(groupedByDate.values()) - .sort((a, b) => new Date(b.updatedAt) - new Date(a.updatedAt)); - - logger.info(`[TELEPORTER HISTORICAL] Returning ${uniqueDailyData.length} unique daily snapshots`); - - return uniqueDailyData; - - } catch (error) { - logger.error('[TELEPORTER HISTORICAL] Error fetching historical daily data:', error); - throw error; - } - } -} - -module.exports = new TeleporterService(); \ No newline at end of file diff --git a/src/services/teleporterService.js b/src/services/teleporterService.js index b0f7905..da45619 100644 --- a/src/services/teleporterService.js +++ b/src/services/teleporterService.js @@ -38,10 +38,25 @@ class TeleporterService { * @returns {Promise} Array of messages */ async fetchICMMessages(hoursAgo = 24, updateType = 'daily', ownershipCheck = null) { + const endTime = Math.floor(Date.now() / 1000); + const startTime = endTime - (hoursAgo * 60 * 60); + return this.fetchICMMessagesInWindow(startTime, endTime, updateType, ownershipCheck); + } + + /** + * Fetch ICM messages for an explicit [startTime, endTime] window (unix seconds). + * This is the pagination core shared by daily fetches and resumable weekly + * day-by-day fetches. + * @param {number} startTime - Window start (unix seconds, inclusive) + * @param {number} endTime - Window end (unix seconds, inclusive) + * @param {string} updateType - Type of update ('daily' or 'weekly') for logging/timeout + * @param {Function} ownershipCheck - Optional async function that returns false if we should stop + * @returns {Promise} Array of messages within the window + */ + async fetchICMMessagesInWindow(startTime, endTime, updateType = 'weekly', ownershipCheck = null) { try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (hoursAgo * 60 * 60); const updateLabel = updateType.toUpperCase(); + const hoursAgo = Math.max(1, Math.round((endTime - startTime) / (60 * 60))); const headers = { 'Accept': 'application/json', @@ -175,8 +190,14 @@ class TeleporterService { ? Math.floor(messageTimestamp / 1000) : messageTimestamp; - // Check if the message is within our time range - if (timestampInSeconds >= startTime) { + // Half-open window [startTime, endTime): the upper bound is + // exclusive so adjacent weekly day-windows (where one day's + // startTime equals the next day's endTime) never both claim + // a message landing exactly on the shared boundary — which + // would double-count it after merge. For daily fetches + // endTime is "now", so at most a message at the current + // instant is deferred to the next run. + if (timestampInSeconds >= startTime && timestampInSeconds < endTime) { validMessages.push(message); } // Don't stop on individual old messages - they may not be chronological @@ -544,6 +565,52 @@ class TeleporterService { /** * Update weekly teleporter data (last 7 days) */ + /** + * Compute the [startTime, endTime] window (unix seconds) for a given day of + * the weekly fetch, anchored to a fixed reference end time. + * Day 1 is the most recent 24h; day 7 is the oldest. + * @param {number} anchorEndTime - Reference end of the 7-day window (unix seconds) + * @param {number} day - 1-based day index (1..7) + * @returns {{startTime: number, endTime: number}} + */ + getWeeklyDayWindow(anchorEndTime, day) { + const DAY_SECONDS = 24 * 60 * 60; + return { + startTime: anchorEndTime - (day * DAY_SECONDS), + endTime: anchorEndTime - ((day - 1) * DAY_SECONDS) + }; + } + + /** + * Merge per-day partial results into a single aggregated weekly result. + * Sums message counts per chain pair across all days and totals the raw + * message count. Pure function (no I/O) so it is easy to unit test. + * @param {Array} partialResults - Array of { messageCount: [{sourceChain, destinationChain, messageCount}], totalMessages } + * @returns {{messageCounts: Array, totalMessages: number}} + */ + mergePartialResults(partialResults) { + const counts = {}; + let totalMessages = 0; + + for (const dayResult of partialResults || []) { + totalMessages += dayResult.totalMessages || 0; + for (const pair of dayResult.messageCount || []) { + const key = `${pair.sourceChain}|${pair.destinationChain}`; + if (!counts[key]) { + counts[key] = { + sourceChain: pair.sourceChain, + destinationChain: pair.destinationChain, + messageCount: 0 + }; + } + counts[key].messageCount += pair.messageCount; + } + } + + const messageCounts = Object.values(counts).sort((a, b) => b.messageCount - a.messageCount); + return { messageCounts, totalMessages }; + } + async updateWeeklyData() { try { logger.info('[TELEPORTER WEEKLY] Starting weekly teleporter data update (last 7 days)'); @@ -635,37 +702,124 @@ class TeleporterService { } }; - // Fetch and process messages for the last 7 days (168 hours) - logger.info('[TELEPORTER WEEKLY] Fetching ICM messages for last 7 days (168 hours)...'); - const messages = await this.fetchICMMessages(168, 'weekly', ownershipCheck); // 7 * 24 = 168 hours - logger.info(`[TELEPORTER WEEKLY] Fetched ${messages.length} raw messages from Glacier API`); - - const processedData = await this.processMessages(messages); - logger.info(`[TELEPORTER WEEKLY] Processed into ${processedData.length} unique chain pairs`); + // The 7-day fetch is split into 7 independent day-windows. Each + // completed day is persisted to partialResults so that if the + // process restarts mid-update (e.g. a redeploy), the next run + // resumes from the next unfetched day instead of starting over. + const WEEKLY_DAYS = 7; + const MAX_RESUME_AGE_MS = 24 * 60 * 60 * 1000; // Resume only if the run is still recent. + + if (!updateState.progress) { + updateState.progress = { + currentDay: 1, totalDays: WEEKLY_DAYS, daysCompleted: 0, + currentChunk: 0, totalChunks: 6, messagesCollected: 0 + }; + } + + const priorDaysCompleted = updateState.progress.daysCompleted || 0; + const priorPartial = Array.isArray(updateState.partialResults) ? updateState.partialResults : []; + const refEnd = updateState.referenceEndTime; + const canResume = priorDaysCompleted >= 1 && + priorDaysCompleted < WEEKLY_DAYS && + refEnd && + priorPartial.length === priorDaysCompleted && + (Date.now() - new Date(refEnd).getTime()) < MAX_RESUME_AGE_MS; + + let anchorEndTime; + let daysCompleted; + let partial; + + if (canResume) { + anchorEndTime = Math.floor(new Date(refEnd).getTime() / 1000); + daysCompleted = priorDaysCompleted; + // Keep prior days as plain objects so merge/save are independent of Mongoose subdocs. + partial = priorPartial.map(p => ({ + day: p.day, + messageCount: (p.messageCount || []).map(m => ({ + sourceChain: m.sourceChain, + destinationChain: m.destinationChain, + messageCount: m.messageCount + })), + totalMessages: p.totalMessages, + startHoursAgo: p.startHoursAgo, + endHoursAgo: p.endHoursAgo, + processedAt: p.processedAt + })); + logger.info(`[TELEPORTER WEEKLY] Resuming update: ${daysCompleted}/${WEEKLY_DAYS} days already collected, continuing from day ${daysCompleted + 1}`); + } else { + anchorEndTime = Math.floor(Date.now() / 1000); + daysCompleted = 0; + partial = []; + updateState.referenceEndTime = new Date(anchorEndTime * 1000); + updateState.partialResults = []; + updateState.progress.daysCompleted = 0; + updateState.progress.currentDay = 1; + updateState.progress.messagesCollected = 0; + await updateState.save(); + logger.info(`[TELEPORTER WEEKLY] Starting fresh 7-day fetch (anchor ${new Date(anchorEndTime * 1000).toISOString()})`); + } + + for (let day = daysCompleted + 1; day <= WEEKLY_DAYS; day++) { + // Stop immediately if another process took over the lock. + if (!(await ownershipCheck())) { + throw new Error('Lost ownership of update lock'); + } + + const { startTime, endTime } = this.getWeeklyDayWindow(anchorEndTime, day); + logger.info(`[TELEPORTER WEEKLY] Fetching day ${day}/${WEEKLY_DAYS} (${new Date(startTime * 1000).toISOString()} → ${new Date(endTime * 1000).toISOString()})`); + + const dayMessages = await this.fetchICMMessagesInWindow(startTime, endTime, 'weekly', ownershipCheck); + const dayProcessed = await this.processMessages(dayMessages); + + partial.push({ + day, + messageCount: dayProcessed, + totalMessages: dayMessages.length, + startHoursAgo: day * 24, + endHoursAgo: (day - 1) * 24, + processedAt: new Date() + }); + + // Checkpoint this day so a restart resumes from the next one. + updateState.partialResults = partial; + updateState.progress.daysCompleted = day; + updateState.progress.currentDay = day + 1; + updateState.progress.messagesCollected = partial.reduce((sum, p) => sum + (p.totalMessages || 0), 0); + updateState.lastUpdatedAt = new Date(); + await updateState.save(); + + logger.info(`[TELEPORTER WEEKLY] ✅ Day ${day}/${WEEKLY_DAYS} complete: ${dayMessages.length} messages, ${dayProcessed.length} chain pairs (${partial.reduce((s, p) => s + (p.totalMessages || 0), 0)} messages so far)`); + } + + // All days collected — merge into the final weekly aggregate. + const merged = this.mergePartialResults(partial); + logger.info(`[TELEPORTER WEEKLY] Merged ${WEEKLY_DAYS} days into ${merged.messageCounts.length} unique chain pairs (${merged.totalMessages} total messages)`); // Save to database (replace existing weekly data) await TeleporterMessage.deleteMany({ dataType: 'weekly' }); - + const teleporterData = new TeleporterMessage({ updatedAt: new Date(), - messageCounts: processedData, - totalMessages: messages.length, + messageCounts: merged.messageCounts, + totalMessages: merged.totalMessages, timeWindow: 168, dataType: 'weekly' }); await teleporterData.save(); - // Update state to completed + // Update state to completed and clear the resume checkpoint. updateState.state = 'completed'; + updateState.partialResults = []; + updateState.referenceEndTime = null; updateState.lastUpdatedAt = new Date(); await updateState.save(); - logger.info(`[TELEPORTER WEEKLY] ✅ Successfully completed weekly update: ${messages.length} messages, ${processedData.length} chain pairs`); + logger.info(`[TELEPORTER WEEKLY] ✅ Successfully completed weekly update: ${merged.totalMessages} messages, ${merged.messageCounts.length} chain pairs`); return { success: true, - messageCount: processedData.length, - totalMessages: messages.length + messageCount: merged.messageCounts.length, + totalMessages: merged.totalMessages }; } catch (error) { diff --git a/src/services/txCountService.js b/src/services/txCountService.js index 969514f..d849f9d 100644 --- a/src/services/txCountService.js +++ b/src/services/txCountService.js @@ -1,307 +1,20 @@ const TxCount = require('../models/txCount'); -const axios = require('axios'); -const Chain = require('../models/chain'); -const config = require('../config/config'); -const logger = require('../utils/logger'); - -// Rate limiter implementation -class RateLimiter { - constructor(maxRequestsPerMinute = 30) { - this.queue = []; - this.processing = false; - this.maxRequestsPerMinute = maxRequestsPerMinute; - this.requestTimestamps = []; - } - - async enqueue(fn) { - return new Promise((resolve, reject) => { - this.queue.push({ fn, resolve, reject }); - this.processQueue(); - }); - } - - async processQueue() { - if (this.processing || this.queue.length === 0) return; - - this.processing = true; - - try { - const now = Date.now(); - this.requestTimestamps = this.requestTimestamps.filter( - timestamp => now - timestamp < 60000 - ); - - if (this.requestTimestamps.length >= this.maxRequestsPerMinute) { - const oldestTimestamp = this.requestTimestamps[0]; - const timeToWait = 60000 - (now - oldestTimestamp); - - logger.info(`Rate limit reached, waiting ${Math.round(timeToWait/1000)}s before next request`); - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, timeToWait + 100); - - return; - } - - const item = this.queue.shift(); - this.requestTimestamps.push(now); - - try { - const result = await item.fn(); - item.resolve(result); - } catch (error) { - item.reject(error); - } - - setTimeout(() => { - this.processing = false; - this.processQueue(); - }, 300); - } catch (error) { - logger.error('Error in rate limiter:', error); - this.processing = false; - } - } -} - -const metricsApiRateLimiter = new RateLimiter(config.api.metrics.rateLimit.requestsPerMinute || 20); - -class TxCountService { - async updateTxCountData(chainId, retryCount = config.api.metrics.rateLimit.maxRetries || 3) { - return metricsApiRateLimiter.enqueue(async () => { - for (let attempt = 1; attempt <= retryCount; attempt++) { - try { - logger.info(`[TxCount Update] Starting update for chain ${chainId} (Attempt ${attempt}/${retryCount})`); - - const headers = { - 'Accept': 'application/json', - 'User-Agent': 'l1beat-backend', - 'Cache-Control': 'no-cache' - }; - - if (process.env.GLACIER_API_KEY) { - headers['x-api-key'] = process.env.GLACIER_API_KEY; - } - - const response = await axios.get(`${config.api.metrics.baseUrl}/chains/${chainId}/metrics/txCount`, { - params: { - timeInterval: 'day', - pageSize: 100 // Maximum allowed by API - }, - timeout: config.api.metrics.timeout, - headers - }); - - if (!response.data || !Array.isArray(response.data.results)) { - logger.warn(`[TxCount Update] Invalid response format for chain ${chainId}`); - continue; - } - - const currentTime = Math.floor(Date.now() / 1000); - const thirtyDaysAgo = currentTime - (30 * 24 * 60 * 60); - - const validData = response.data.results.filter(item => { - const timestamp = Number(item.timestamp); - const value = parseFloat(item.value); - - if (isNaN(timestamp) || isNaN(value)) { - return false; - } - - return timestamp >= thirtyDaysAgo && timestamp <= currentTime; - }); - - if (validData.length > 0) { - const result = await TxCount.bulkWrite( - validData.map(item => ({ - updateOne: { - filter: { - chainId: chainId, - timestamp: Number(item.timestamp) - }, - update: { - $set: { - value: parseFloat(item.value), - lastUpdated: new Date() - } - }, - upsert: true - } - })), - { ordered: false } - ); - - logger.info(`[TxCount Update] Updated chain ${chainId}:`, { - upserted: result.upsertedCount, - modified: result.modifiedCount, - total: validData.length - }); - - return { - success: true, - chainId, - recordsProcessed: validData.length, - upserted: result.upsertedCount, - modified: result.modifiedCount - }; - } - - logger.info(`[TxCount Update] No valid data for chain ${chainId}`); - return { - success: true, - chainId, - recordsProcessed: 0, - message: 'No valid data points' - }; - - } catch (error) { - logger.error(`[TxCount Update] Error for chain ${chainId} (Attempt ${attempt}/${retryCount}):`, error.message); - - if (attempt === retryCount) { - return { - success: false, - chainId, - error: error.message - }; - } - - await new Promise(resolve => setTimeout(resolve, 2000 * attempt)); - } - } - }); - } - - async updateAllChainsTxCount() { - try { - logger.info('[TxCount] Starting update for all chains'); - - const chains = await Chain.find({}); - const results = []; - - for (const chain of chains) { - const chainId = chain.evmChainId || chain.chainId; - - if (!chainId || !/^\d+$/.test(String(chainId))) { - logger.warn(`[TxCount] Skipping chain with invalid ID:`, chain.name); - continue; - } - - const result = await this.updateTxCountData(String(chainId)); - results.push(result); - } - - const successful = results.filter(r => r.success).length; - const failed = results.filter(r => !r.success).length; - - logger.info('[TxCount] Update completed:', { - total: chains.length, - successful, - failed - }); - - return { success: true, results }; - } catch (error) { - logger.error('[TxCount] Error updating all chains:', error); - return { success: false, error: error.message }; - } - } - - async getTxCountHistory(chainId, days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await TxCount.find({ - chainId: String(chainId), - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - return data; - } catch (error) { - logger.error(`[TxCount] Error fetching history for chain ${chainId}:`, error); - throw error; - } - } - - async getLatestTxCount(chainId) { - try { - const latestRecord = await TxCount.findOne({ - chainId: String(chainId) - }) - .sort({ timestamp: -1 }) - .lean(); - - return latestRecord; - } catch (error) { - logger.error(`[TxCount] Error fetching latest for chain ${chainId}:`, error); - throw error; - } - } - - async getNetworkTxCountHistory(days = 30) { - try { - const endTime = Math.floor(Date.now() / 1000); - const startTime = endTime - (days * 24 * 60 * 60); - - const data = await TxCount.find({ - timestamp: { $gte: startTime, $lte: endTime } - }) - .sort({ timestamp: 1 }) - .lean(); - - // Group by timestamp and sum tx counts - const groupedData = {}; - data.forEach(record => { - if (!groupedData[record.timestamp]) { - groupedData[record.timestamp] = { - timestamp: record.timestamp, - value: 0 - }; - } - groupedData[record.timestamp].value += record.value; - }); - - return Object.values(groupedData).sort((a, b) => a.timestamp - b.timestamp); - } catch (error) { - logger.error('[TxCount] Error fetching network history:', error); - throw error; - } - } - - async getNetworkLatestTxCount() { - try { - // Get the most recent timestamp - const latestRecord = await TxCount.findOne() - .sort({ timestamp: -1 }) - .lean(); - - if (!latestRecord) { - return null; - } - - const latestTimestamp = latestRecord.timestamp; - - // Get all records for that timestamp and sum them - const records = await TxCount.find({ - timestamp: latestTimestamp - }).lean(); - - const totalTxCount = records.reduce((sum, record) => sum + record.value, 0); - - return { - timestamp: latestTimestamp, - value: totalTxCount, - chainCount: records.length - }; - } catch (error) { - logger.error('[TxCount] Error fetching network latest:', error); - throw error; - } - } -} - -module.exports = new TxCountService(); +const createMetricService = require('./metricService'); + +const service = createMetricService({ + model: TxCount, + metricPath: 'txCount', + label: 'TxCount', + aggregation: 'sum' +}); + +// Preserve the original public method names so existing routes/cron callers +// continue to work unchanged. +module.exports = { + updateTxCountData: (chainId, retryCount) => service.updateData(chainId, retryCount), + updateAllChainsTxCount: () => service.updateAllChains(), + getTxCountHistory: (chainId, days) => service.getHistory(chainId, days), + getLatestTxCount: (chainId) => service.getLatest(chainId), + getNetworkTxCountHistory: (days) => service.getNetworkHistory(days), + getNetworkLatestTxCount: () => service.getNetworkLatest() +}; diff --git a/src/utils/fetchGlacierData.js b/src/utils/fetchGlacierData.js deleted file mode 100644 index d80208d..0000000 --- a/src/utils/fetchGlacierData.js +++ /dev/null @@ -1,87 +0,0 @@ -const chainService = require('../services/chainService'); -const chainDataService = require('../services/chainDataService'); -const Chain = require('../models/chain'); -const tpsService = require('../services/tpsService'); - -const fetchAndUpdateData = async () => { - try { - console.log(`[${process.env.NODE_ENV}] Starting scheduled data update...`); - - // Fetch chain data from Glacier API first - const chains = await chainDataService.fetchChainData(); - console.log(`Fetched ${chains.length} chains from Glacier API`); - - // Only proceed if we successfully fetched new data - if (chains && chains.length > 0) { - try { - // Check if we're using a replica set (production) or standalone (development) - const isReplicaSet = process.env.NODE_ENV === 'production'; - - if (isReplicaSet) { - // Production: Use transactions - const session = await Chain.startSession(); - try { - await session.withTransaction(async () => { - await updateChains(chains, session); - }); - } finally { - await session.endSession(); - } - } else { - // Development: Direct updates without transaction - await updateChains(chains); - } - - console.log(`Processed ${chains.length} unique chains successfully`); - } catch (error) { - console.error('Error updating chains:', error); - throw error; - } - } else { - console.warn('No chains fetched from API, skipping database update'); - } - - // Update TPS data for each chain - for (const chain of chains) { - try { - await tpsService.updateTpsData(chain.chainId); - console.log(`Updated TPS data for chain ${chain.chainId}`); - } catch (error) { - console.error(`Failed to update TPS data for chain ${chain.chainId}:`, error); - } - } - - console.log(`[${process.env.NODE_ENV}] Data update completed successfully`); - - } catch (error) { - console.error(`[${process.env.NODE_ENV}] Error in scheduled data update:`, { - message: error.message, - stack: error.stack, - timestamp: new Date().toISOString() - }); - throw error; - } -}; - -// Helper function to update chains -async function updateChains(chains, session = null) { - const options = session ? { session } : {}; - - // Delete existing data - await Chain.deleteMany({}, options); - - const processedInThisCycle = new Set(); - - // Update chains - for (const chain of chains) { - if (processedInThisCycle.has(chain.chainId)) { - console.log(`Skipping duplicate chain ${chain.chainId} in current cycle`); - continue; - } - - await chainService.updateChain(chain); - processedInThisCycle.add(chain.chainId); - } -} - -module.exports = fetchAndUpdateData; diff --git a/tests/health.test.js b/tests/health.test.js index b3c336c..9b5684b 100644 --- a/tests/health.test.js +++ b/tests/health.test.js @@ -20,5 +20,30 @@ describe('Health Check Endpoints', () => { expect(response.headers['content-type']).toMatch(/json/); }); + + it('should report uptime and mongodb dependency state', async () => { + const response = await get('/health'); + + expect(typeof response.body.uptime).toBe('number'); + expect(response.body).toHaveProperty('dependencies.mongodb'); + expect(['connected', 'disconnected']).toContain(response.body.dependencies.mongodb); + }); + }); + + describe('GET /health/ready', () => { + it('should return a readiness status and reflect mongodb state in the code', async () => { + const response = await get('/health/ready'); + + // 200 when Mongo is connected, 503 when it is draining/disconnected. + expect([200, 503]).toContain(response.status); + expect(response.body).toHaveProperty('status'); + if (response.body.dependencies.mongodb === 'connected') { + expect(response.status).toBe(200); + expect(response.body.status).toBe('ok'); + } else { + expect(response.status).toBe(503); + expect(response.body.status).toBe('not_ready'); + } + }); }); }); diff --git a/tests/metricService.test.js b/tests/metricService.test.js new file mode 100644 index 0000000..320feeb --- /dev/null +++ b/tests/metricService.test.js @@ -0,0 +1,120 @@ +/** + * Unit tests for the shared metric service factory. + * Focuses on the one behavioral branch that differs between metrics: + * network-wide aggregation (sum vs. average). + */ + +// Chain model is required by the factory but unused in these query paths. +jest.mock('../src/models/chain', () => ({ find: jest.fn() })); +jest.mock('axios'); + +const axios = require('axios'); +const createMetricService = require('../src/services/metricService'); + +/** + * Build a fake Mongoose-like model backed by an in-memory row set. + * Supports the chainable find().sort().lean() / findOne().sort().lean() shapes + * the factory uses. + */ +function fakeModel(rows) { + const chainable = (result) => ({ + sort: () => ({ lean: async () => result }), + lean: async () => result + }); + return { + find: (query = {}) => { + let result = rows; + // Exact-timestamp lookup (used by network-latest). Range queries return + // everything here so the test exercises aggregation, not date filtering. + if (query.timestamp && typeof query.timestamp === 'number') { + result = rows.filter(r => r.timestamp === query.timestamp); + } + return chainable(result); + }, + findOne: () => { + const latest = rows.length + ? rows.reduce((a, b) => (b.timestamp > a.timestamp ? b : a)) + : null; + return chainable(latest); + } + }; +} + +describe('createMetricService aggregation', () => { + const rows = [ + { chainId: '1', timestamp: 100, value: 10 }, + { chainId: '2', timestamp: 100, value: 20 }, + { chainId: '1', timestamp: 200, value: 30 } + ]; + + describe("sum aggregation", () => { + const svc = createMetricService({ + model: fakeModel(rows), metricPath: 'x', label: 'X', aggregation: 'sum' + }); + + it('sums values across chains per timestamp in network history', async () => { + const history = await svc.getNetworkHistory(30); + expect(history).toEqual([ + { timestamp: 100, value: 30 }, + { timestamp: 200, value: 30 } + ]); + }); + + it('sums values for the latest timestamp in network latest', async () => { + const latest = await svc.getNetworkLatest(); + // Latest timestamp is 200, which has a single chain with value 30. + expect(latest).toEqual({ timestamp: 200, value: 30, chainCount: 1 }); + }); + }); + + describe("avg aggregation", () => { + const svc = createMetricService({ + model: fakeModel(rows), metricPath: 'x', label: 'X', aggregation: 'avg' + }); + + it('averages values across chains per timestamp in network history', async () => { + const history = await svc.getNetworkHistory(30); + expect(history).toEqual([ + { timestamp: 100, value: 15 }, // (10 + 20) / 2 + { timestamp: 200, value: 30 } // 30 / 1 + ]); + }); + + it('averages values for the latest timestamp in network latest', async () => { + const latest = await svc.getNetworkLatest(); + expect(latest).toEqual({ timestamp: 200, value: 30, chainCount: 1 }); + }); + }); + + it('returns null from network latest when there is no data', async () => { + const svc = createMetricService({ + model: fakeModel([]), metricPath: 'x', label: 'X' + }); + expect(await svc.getNetworkLatest()).toBeNull(); + }); + + it('validates required options', () => { + expect(() => createMetricService({ metricPath: 'x', label: 'X' })).toThrow(/model/); + expect(() => createMetricService({ model: {}, label: 'X' })).toThrow(/metricPath/); + expect(() => createMetricService({ model: {}, metricPath: 'x' })).toThrow(/label/); + }); +}); + +describe('createMetricService updateData failure handling', () => { + afterEach(() => jest.restoreAllMocks()); + + it('returns a failure result (not undefined) when every attempt gets a non-array response', async () => { + // API responds 200 with a body that has no `results` array, on every retry. + axios.get.mockResolvedValue({ data: {} }); + + const svc = createMetricService({ model: {}, metricPath: 'x', label: 'X' }); + const result = await svc.updateData('123', 2); // retryCount=2 + + // The bug this guards: falling off the retry loop returned undefined, which + // crashed updateAllChains on `undefined.success`. + expect(result).toBeDefined(); + expect(result.success).toBe(false); + expect(result.chainId).toBe('123'); + expect(result.error).toMatch(/no valid response/i); + }); +}); diff --git a/tests/teleporterWeekly.test.js b/tests/teleporterWeekly.test.js new file mode 100644 index 0000000..0fe2b73 --- /dev/null +++ b/tests/teleporterWeekly.test.js @@ -0,0 +1,160 @@ +/** + * Tests for the resumable weekly Teleporter update. + * + * Mocks the model layer so updateWeeklyData can run without a database, then: + * - exercises the pure helpers (mergePartialResults, getWeeklyDayWindow) + * - simulates a mid-run crash and verifies the next run RESUMES from the + * next unfetched day rather than starting over. + */ + +// In-memory model mock. `store` lives inside the factory (jest.mock hoisting), +// and is exposed via __store / __reset for the test to seed and inspect. +jest.mock('../src/models/teleporterMessage', () => { + const store = { weekly: null, saved: [] }; + + const TeleporterUpdateState = { + find: () => ({ sort: async () => (store.weekly ? [store.weekly] : []) }), + deleteMany: async () => ({ deletedCount: 0 }), + updateMany: async () => ({ modifiedCount: 0 }), + updateOne: async () => ({ upsertedCount: 0 }), + findOneAndUpdate: async () => { + const doc = store.weekly; + if (!doc || doc.state === 'in_progress') return null; // lock held + doc.state = 'in_progress'; + doc.startedAt = new Date(); + doc.lastUpdatedAt = new Date(); + doc.error = null; + return doc; + }, + findOne: async () => store.weekly + }; + + class TeleporterMessage { + constructor(data) { Object.assign(this, data); } + async save() { store.saved.push(this); } + static async deleteMany() { return { deletedCount: 0 }; } + } + + return { + TeleporterMessage, + TeleporterUpdateState, + __store: store, + __reset: () => { + store.saved = []; + store.weekly = { + updateType: 'weekly', + state: 'idle', + startedAt: null, + lastUpdatedAt: new Date(), + error: null, + referenceEndTime: null, + partialResults: [], + progress: { + currentDay: 1, totalDays: 7, daysCompleted: 0, + currentChunk: 0, totalChunks: 6, messagesCollected: 0 + }, + async save() { /* in-place; we hold the reference */ } + }; + } + }; +}); + +const teleporterService = require('../src/services/teleporterService'); +const { __store, __reset } = require('../src/models/teleporterMessage'); + +describe('TeleporterService pure helpers', () => { + it('getWeeklyDayWindow returns contiguous, non-overlapping 24h windows', () => { + const anchor = 7 * 24 * 3600; // arbitrary anchor in seconds + const day1 = teleporterService.getWeeklyDayWindow(anchor, 1); + const day2 = teleporterService.getWeeklyDayWindow(anchor, 2); + + expect(day1.endTime).toBe(anchor); // day 1 ends at the anchor + expect(day1.endTime - day1.startTime).toBe(86400); // exactly 24h + expect(day2.endTime).toBe(day1.startTime); // contiguous, no gap/overlap + }); + + it('mergePartialResults sums counts per chain pair and totals messages', () => { + const merged = teleporterService.mergePartialResults([ + { totalMessages: 3, messageCount: [ + { sourceChain: 'A', destinationChain: 'B', messageCount: 2 }, + { sourceChain: 'C', destinationChain: 'D', messageCount: 1 } + ] }, + { totalMessages: 5, messageCount: [ + { sourceChain: 'A', destinationChain: 'B', messageCount: 5 } + ] } + ]); + + expect(merged.totalMessages).toBe(8); + // A|B summed across days (2 + 5 = 7) and sorted first by count desc. + expect(merged.messageCounts).toEqual([ + { sourceChain: 'A', destinationChain: 'B', messageCount: 7 }, + { sourceChain: 'C', destinationChain: 'D', messageCount: 1 } + ]); + }); +}); + +describe('TeleporterService resumable weekly update', () => { + let fetchSpy; + let crashArmed; + + beforeEach(() => { + __reset(); + crashArmed = true; + + // processMessages → one chain pair whose count is the day's message volume. + jest.spyOn(teleporterService, 'processMessages').mockImplementation(async (msgs) => ( + [{ sourceChain: 'A', destinationChain: 'B', messageCount: msgs.length }] + )); + + // Each day returns (day * 10) messages. Day is inferred from how many days + // are already completed. Crash once, when about to fetch day 4. + fetchSpy = jest.spyOn(teleporterService, 'fetchICMMessagesInWindow').mockImplementation(async () => { + const dayBeingFetched = __store.weekly.progress.daysCompleted + 1; + if (crashArmed && dayBeingFetched === 4) { + throw new Error('simulated crash mid-update'); + } + return new Array(dayBeingFetched * 10).fill({}); + }); + }); + + afterEach(() => jest.restoreAllMocks()); + + it('checkpoints each day and resumes from the crash point without refetching', async () => { + // First run crashes while fetching day 4. + await expect(teleporterService.updateWeeklyData()).rejects.toThrow('simulated crash'); + + // Days 1-3 are checkpointed; state is failed but progress is preserved. + expect(__store.weekly.state).toBe('failed'); + expect(__store.weekly.progress.daysCompleted).toBe(3); + expect(__store.weekly.partialResults).toHaveLength(3); + expect(__store.weekly.referenceEndTime).toBeTruthy(); + // 4 calls: days 1-3 succeeded, the day-4 call threw. + expect(fetchSpy).toHaveBeenCalledTimes(4); + + // Second run resumes (no crash this time). + crashArmed = false; + const result = await teleporterService.updateWeeklyData(); + + expect(result.success).toBe(true); + // Resume re-fetches only days 4-7 (4 more) → 8 total. A fresh restart would + // have re-fetched days 1-7 (7 more) for 11 total, so 8 proves days 1-3 were + // not refetched. + expect(fetchSpy).toHaveBeenCalledTimes(8); + + // Final merged total = 10+20+...+70 = 280, all into the single A|B pair. + expect(result.totalMessages).toBe(280); + expect(result.messageCount).toBe(1); + + const savedWeekly = __store.saved.find(d => d.dataType === 'weekly'); + expect(savedWeekly).toBeTruthy(); + expect(savedWeekly.totalMessages).toBe(280); + expect(savedWeekly.messageCounts).toEqual([ + { sourceChain: 'A', destinationChain: 'B', messageCount: 280 } + ]); + + // Completion clears the resume checkpoint. + expect(__store.weekly.state).toBe('completed'); + expect(__store.weekly.partialResults).toHaveLength(0); + expect(__store.weekly.referenceEndTime).toBeNull(); + }); +}); diff --git a/tests/teleporterWindow.test.js b/tests/teleporterWindow.test.js new file mode 100644 index 0000000..2b6bdd7 --- /dev/null +++ b/tests/teleporterWindow.test.js @@ -0,0 +1,54 @@ +/** + * Regression test for the weekly day-window boundary fix. + * + * Adjacent day-windows share a boundary timestamp (day N's startTime equals + * day N+1's endTime). The per-message filter uses a half-open interval + * [startTime, endTime), so a message landing exactly on the shared boundary + * must be counted in exactly ONE window — never both (which would double-count + * it after mergePartialResults). + */ + +jest.mock('axios'); +const axios = require('axios'); +const teleporterService = require('../src/services/teleporterService'); + +const ANCHOR = 1_000_000_000; // seconds (< 1e12 so no ms conversion) +const DAY = 24 * 60 * 60; + +const messages = [ + { id: 'a', sourceTransaction: { timestamp: ANCHOR - DAY / 2 } }, // inside day 1 + { id: 'boundary', sourceTransaction: { timestamp: ANCHOR - DAY } }, // the shared boundary + { id: 'b', sourceTransaction: { timestamp: ANCHOR - DAY - DAY / 2 } } // inside day 2 +]; + +describe('fetchICMMessagesInWindow boundary handling', () => { + beforeEach(() => { + teleporterService.GLACIER_API_BASE = 'http://glacier.test'; + // Same full page returned for every call; the client-side filter decides + // which messages belong to the requested window. + axios.get.mockResolvedValue({ data: { messages, nextPageToken: null } }); + }); + + afterEach(() => jest.restoreAllMocks()); + + it('assigns a boundary-timestamp message to exactly one of two adjacent windows', async () => { + const day1 = teleporterService.getWeeklyDayWindow(ANCHOR, 1); // [ANCHOR-DAY, ANCHOR) + const day2 = teleporterService.getWeeklyDayWindow(ANCHOR, 2); // [ANCHOR-2DAY, ANCHOR-DAY) + + // Sanity: the windows share a boundary. + expect(day1.startTime).toBe(day2.endTime); + + const day1Msgs = await teleporterService.fetchICMMessagesInWindow(day1.startTime, day1.endTime, 'weekly'); + const day2Msgs = await teleporterService.fetchICMMessagesInWindow(day2.startTime, day2.endTime, 'weekly'); + + const ids1 = day1Msgs.map(m => m.id).sort(); + const ids2 = day2Msgs.map(m => m.id).sort(); + + // Boundary message belongs to day 1 (startTime is inclusive), not day 2 + // (endTime is exclusive) — counted exactly once across the two windows. + expect(ids1).toEqual(['a', 'boundary']); + expect(ids2).toEqual(['b']); + expect(ids1).toContain('boundary'); + expect(ids2).not.toContain('boundary'); + }); +}); diff --git a/vercel.json b/vercel.json deleted file mode 100644 index f908c66..0000000 --- a/vercel.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "version": 2, - "builds": [ - { - "src": "src/app.js", - "use": "@vercel/node" - } - ], - "routes": [ - { - "src": "/(.*)", - "dest": "src/app.js" - } - ], - "env": { - "VERCEL": "1" - } -} \ No newline at end of file