diff --git a/.gitignore b/.gitignore index a5cd8b1a..8b910e99 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,8 @@ typings/ ecosystem.config.js package-lock.json connections.json +api_access.log +api_error.log +explorer +api/explorer +.ecosystem.config.js.swp diff --git a/api/handlers/health.js b/api/handlers/health.js index 7fbefbcf..a561591e 100644 --- a/api/handlers/health.js +++ b/api/handlers/health.js @@ -8,10 +8,23 @@ const system_domain = process.env.SYSTEM_DOMAIN; const ecosystem = require('../../ecosystem.config'); function checkFeat(name) { - return currentENV[name] === 'true'; + if (currentENV) { + return currentENV[name] === 'true'; + } else { + return null; + } } -const currentENV = ecosystem.apps.find(app => app.env.CHAIN === process.env.CHAIN)['env']; +let currentENV; +if (ecosystem.apps.length > 1) { + const indexerApp = ecosystem.apps.find(app => { + return app.env.CHAIN === process.env.CHAIN && app.script === "./launcher.js"; + }); + + if (indexerApp) { + currentENV = indexerApp['env']; + } +} // get current github version let last_commit_hash; @@ -108,49 +121,58 @@ async function checkElastic(elastic) { } module.exports = function (fastify, opts, next) { - fastify.get('/health', {}, async (request, reply) => { - const t0 = Date.now(); - const {redis, elastic} = fastify; + fastify.route({ + url: '/health', + method: 'GET', + schema: { + tags: ['status'], + summary: "API Service Health Report" + }, + handler: async (request, reply) => { + const t0 = Date.now(); + const {redis, elastic} = fastify; - let cachedResponse, hash; - [cachedResponse, hash] = await getCacheByHash(redis, 'health'); - if (cachedResponse) { - cachedResponse = JSON.parse(cachedResponse); - cachedResponse['query_time'] = Date.now() - t0; - cachedResponse['cached'] = true; - reply.send(cachedResponse); - return; - } + let cachedResponse, hash; + [cachedResponse, hash] = await getCacheByHash(redis, 'health'); + if (cachedResponse) { + cachedResponse = JSON.parse(cachedResponse); + cachedResponse['query_time'] = Date.now() - t0; + cachedResponse['cached'] = true; + reply.send(cachedResponse); + return; + } - let response = { - version_hash: last_commit_hash, - features: { - indices: { - all_deltas: checkFeat('INDEX_ALL_DELTAS'), - transfer_memo: checkFeat('INDEX_TRANSFER_MEMO'), - }, - tables: { - proposals: checkFeat('PROPOSAL_STATE'), - accounts: checkFeat('ACCOUNT_STATE'), - voters: checkFeat('VOTERS_STATE') + let response = { + version_hash: last_commit_hash, + host: process.env.SERVER_NAME, + features: { + indices: { + all_deltas: checkFeat('INDEX_ALL_DELTAS'), + transfer_memo: checkFeat('INDEX_TRANSFER_MEMO'), + }, + tables: { + proposals: checkFeat('PROPOSAL_STATE'), + accounts: checkFeat('ACCOUNT_STATE'), + voters: checkFeat('VOTERS_STATE') + }, + stream: { + traces: checkFeat('STREAM_TRACES'), + deltas: checkFeat('STREAM_DELTAS') + } }, - stream: { - traces: checkFeat('STREAM_TRACES'), - deltas: checkFeat('STREAM_DELTAS') - } - }, - health: [] - }; + health: [] + }; - response.health.push(await checkRabbit()); - response.health.push(await checkNodeos()); - response.health.push(await checkRedis(redis)); - response.health.push(await checkElastic(elastic)); - response['query_time'] = Date.now() - t0; + response.health.push(await checkRabbit()); + response.health.push(await checkNodeos()); + response.health.push(await checkRedis(redis)); + response.health.push(await checkElastic(elastic)); + response['query_time'] = Date.now() - t0; - // prevent abuse of the health endpoint - redis.set(hash, JSON.stringify(response), 'EX', 10); - reply.send(response); + // prevent abuse of the health endpoint + redis.set(hash, JSON.stringify(response), 'EX', 10); + reply.send(response); + } }); next(); }; diff --git a/api/handlers/history/getActions.js b/api/handlers/history/getActions.js index 5a29c042..89f81d37 100644 --- a/api/handlers/history/getActions.js +++ b/api/handlers/history/getActions.js @@ -1,11 +1,33 @@ const {getActionsSchema} = require("../../schemas"); const {getCacheByHash, mergeActionMeta} = require("../../helpers/functions"); -const _ = require('lodash'); const maxActions = 1000; const route = '/get_actions'; -const terms = ["notified.keyword", "act.authorization.actor"]; -const extendedActions = new Set(["transfer", "newaccount", "updateauth"]); + +const terms = [ + "notified.keyword", + "act.authorization.actor" +]; + +const extendedActions = new Set([ + "transfer", + "newaccount", + "updateauth", + "buyram", + "buyrambytes" +]); + +const primaryTerms = [ + "notified", + "block_num", + "global_sequence", + "producer", + "@timestamp", + "creator_action_ordinal", + "action_ordinal", + "cpu_usage_us", + "net_usage_words" +]; const enable_caching = process.env.ENABLE_CACHING === 'true'; let cache_life = 30; @@ -13,113 +35,103 @@ if (process.env.CACHE_LIFE) { cache_life = parseInt(process.env.CACHE_LIFE); } -async function getActions(fastify, request) { - const t0 = Date.now(); - const {redis, elastic, eosjs} = fastify; - let cachedResponse, hash; - if (enable_caching) { - [cachedResponse, hash] = await getCacheByHash(redis, route + JSON.stringify(request.query)); - if (cachedResponse) { - cachedResponse = JSON.parse(cachedResponse); - cachedResponse['query_time'] = Date.now() - t0; - cachedResponse['cached'] = true; - return cachedResponse; - } - } - - const should_array = []; - for (const entry of terms) { - const tObj = {term: {}}; - tObj.term[entry] = request.query.account; - should_array.push(tObj); - } - let code, method, skip, limit, parent; - let sort_direction = 'desc'; - let filterObj = []; - if (request.query.filter) { - const filters = request.query.filter.split(','); - for (const filter of filters) { - const obj = {bool: {must: []}}; - const parts = filter.split(':'); - if (parts.length === 2) { - [code, method] = parts; - if (code && code !== "*") { - obj.bool.must.push({'term': {'act.account': code}}); - } - if (method && method !== "*") { - obj.bool.must.push({'term': {'act.name': method}}); - } +function getTrackTotalHits(query) { + let trackTotalHits = 15000; + if (query.track) { + if (query.track === 'true') { + trackTotalHits = true; + } else if (query.track === 'false') { + trackTotalHits = false; + } else { + trackTotalHits = parseInt(query.track, 10); + if (trackTotalHits !== trackTotalHits) { + throw new Error('failed to parse track param'); } - filterObj.push(obj); } } - skip = parseInt(request.query.skip, 10); - if (skip < 0) { - return 'invalid skip parameter'; - } - limit = parseInt(request.query.limit, 10); - if (limit < 1) { - return 'invalid limit parameter'; - } + return trackTotalHits; +} - if (request.query.sort) { - if (request.query.sort === 'asc' || request.query.sort === '1') { - sort_direction = 'asc'; - } else if (request.query.sort === 'desc' || request.query.sort === '-1') { - sort_direction = 'desc' - } else { - return 'invalid sort direction'; - } +function addSortedBy(query, queryBody, sort_direction) { + if (query['sortedBy']) { + const opts = query['sortedBy'].split(":"); + const sortedByObj = {}; + sortedByObj[opts[0]] = opts[1]; + queryBody['sort'] = sortedByObj; + } else { + queryBody['sort'] = { + "global_sequence": sort_direction + }; } +} - const queryStruct = { - "bool": { - must: [], - boost: 1.0 +function processMultiVars(queryStruct, parts, field) { + const must = []; + const mustNot = []; + + parts.forEach(part => { + if (part.startsWith("!")) { + mustNot.push(part.replace("!", "")); + } else { + must.push(part); } - }; + }); - if (request.query.parent !== undefined) { - queryStruct.bool['filter'] = []; - queryStruct.bool['filter'].push({ - "term": { - "parent": parseInt(request.query.parent, 10) + if (must.length > 1) { + queryStruct.bool.must.push({ + bool: { + should: must.map(elem => { + const _q = {}; + _q[field] = elem; + return {term: _q} + }) } }); + } else if (must.length === 1) { + const mustQuery = {}; + mustQuery[field] = must[0]; + queryStruct.bool.must.push({term: mustQuery}); } - if (request.query.account) { - queryStruct.bool.must.push({"bool": {should: should_array}}); - } - - for (const prop in request.query) { - if (Object.prototype.hasOwnProperty.call(request.query, prop)) { - const actionName = prop.split(".")[0]; - if (prop.split(".").length > 1) { - if (extendedActions.has(actionName)) { - const _termQuery = {}; - _termQuery["@" + prop] = request.query[prop]; - queryStruct.bool.must.push({term: _termQuery}); - } else { - const _termQuery = {}; - _termQuery[prop] = request.query[prop]; - queryStruct.bool.must.push({term: _termQuery}); - } + if (mustNot.length > 1) { + queryStruct.bool.must_not.push({ + bool: { + should: mustNot.map(elem => { + const _q = {}; + _q[field] = elem; + return {term: _q} + }) } - } + }); + } else if (mustNot.length === 1) { + const mustNotQuery = {}; + mustNotQuery[field] = mustNot[0].replace("!", ""); + queryStruct.bool.must_not.push({term: mustNotQuery}); } +} - if (request.query['after'] || request.query['before']) { +function addRangeQuery(queryStruct, prop, pkey, query) { + const _termQuery = {}; + const parts = query[prop].split("-"); + _termQuery[pkey] = { + "gte": parts[0], + "lte": parts[1] + }; + queryStruct.bool.must.push({range: _termQuery}); +} + +function applyTimeFilter(query, queryStruct) { + if (query['after'] || query['before']) { let _lte = "now"; let _gte = 0; - if (request.query['before']) { - _lte = request.query['before']; + if (query['before']) { + _lte = query['before']; if (!_lte.endsWith("Z")) { _lte += "Z"; } } - if (request.query['after']) { - _gte = request.query['after']; + if (query['after']) { + _gte = query['after']; if (!_gte.endsWith("Z")) { _gte += "Z"; } @@ -136,60 +148,218 @@ async function getActions(fastify, request) { } }); } +} + +function applyGenericFilters(query, queryStruct) { + for (const prop in query) { + if (Object.prototype.hasOwnProperty.call(query, prop)) { + const pair = prop.split("."); + if (pair.length > 1 || primaryTerms.includes(pair[0])) { + let pkey; + if (pair.length > 1) { + pkey = extendedActions.has(pair[0]) ? "@" + prop : prop; + } else { + pkey = prop; + } + if (query[prop].indexOf("-") !== -1) { + addRangeQuery(queryStruct, prop, pkey, query); + } else { + const _termQuery = {}; + const parts = query[prop].split(","); + if (parts.length > 1) { + processMultiVars(queryStruct, parts, prop); + } else if (parts.length === 1) { + const andParts = parts[0].split(" "); + if (andParts.length > 1) { + andParts.forEach(value => { + const _q = {}; + console.log(value); + _q[pkey] = value; + queryStruct.bool.must.push({term: _q}); + }); + } else { + if (parts[0].startsWith("!")) { + _termQuery[pkey] = parts[0].replace("!", ""); + queryStruct.bool.must_not.push({term: _termQuery}); + } else { + _termQuery[pkey] = parts[0]; + queryStruct.bool.must.push({term: _termQuery}); + } + } + } + } + } + } + } +} + +function makeShouldArray(query) { + const should_array = []; + for (const entry of terms) { + const tObj = {term: {}}; + tObj.term[entry] = query.account; + should_array.push(tObj); + } + return should_array; +} - if (request.query.filter) { +function applyCodeActionFilters(query, queryStruct) { + let filterObj = []; + if (query.filter) { + for (const filter of query.filter.split(',')) { + const _arr = []; + const parts = filter.split(':'); + if (parts.length === 2) { + [code, method] = parts; + if (code && code !== "*") { + _arr.push({'term': {'act.account': code}}); + } + if (method && method !== "*") { + _arr.push({'term': {'act.name': method}}); + } + } + if (_arr.length > 0) { + filterObj.push({bool: {must: _arr}}); + } + } queryStruct.bool['should'] = filterObj; queryStruct.bool['minimum_should_match'] = 1; } +} - let trackTotalHits = 10000; - if (request.query.track) { - if (request.query.track === 'true') { - trackTotalHits = true; - } else if (request.query.track === 'false') { - trackTotalHits = false; +function getSkipLimit(query) { + let skip, limit; + skip = parseInt(query.skip, 10); + if (skip < 0) { + throw new Error('invalid skip parameter'); + } + limit = parseInt(query.limit, 10); + if (limit < 1) { + throw new Error('invalid limit parameter'); + } + return {skip, limit}; +} + +function getSortDir(query) { + let sort_direction = 'desc'; + if (query.sort) { + if (query.sort === 'asc' || query.sort === '1') { + sort_direction = 'asc'; + } else if (query.sort === 'desc' || query.sort === '-1') { + sort_direction = 'desc' } else { - trackTotalHits = parseInt(request.query.track, 10); - if (trackTotalHits !== trackTotalHits) { - throw new Error('failed to parse track param'); - } + throw new Error('invalid sort direction'); } } + return sort_direction; +} +function applyAccountFilters(query, queryStruct) { + if (query.account) { + queryStruct.bool.must.push({"bool": {should: makeShouldArray(query)}}); + } +} + +async function getActions(fastify, request) { + const t0 = Date.now(); + const {redis, elastic, eosjs} = fastify; + const query = request.query; + let cachedResponse, hash; + if (enable_caching) { + [cachedResponse, hash] = await getCacheByHash(redis, route + JSON.stringify(query)); + if (cachedResponse) { + cachedResponse = JSON.parse(cachedResponse); + cachedResponse['query_time'] = Date.now() - t0; + cachedResponse['cached'] = true; + return cachedResponse; + } + } + + const queryStruct = { + "bool": { + must: [], + must_not: [], + boost: 1.0 + } + }; + + const {skip, limit} = getSkipLimit(query); + + const sort_direction = getSortDir(query); + + applyAccountFilters(query, queryStruct); + + applyGenericFilters(query, queryStruct); + + applyTimeFilter(query, queryStruct); + + applyCodeActionFilters(query, queryStruct); + + // allow precise counting of total hits + const trackTotalHits = getTrackTotalHits(query); + + // Prepare query body const query_body = { "track_total_hits": trackTotalHits, - "query": queryStruct, - "sort": { - "global_sequence": sort_direction - } + "query": queryStruct }; + // Include sorting + addSortedBy(query, query_body, sort_direction); + + // console.log(prettyjson.render(queryStruct)); + + // Perform search const pResults = await Promise.all([eosjs.rpc.get_info(), elastic['search']({ "index": process.env.CHAIN + '-action-*', "from": skip || 0, "size": (limit > maxActions ? maxActions : limit) || 10, "body": query_body })]); - const results = pResults[1]; + + const results = pResults[1]['body']['hits']; const response = { query_time: null, cached: false, lib: pResults[0].last_irreversible_block_num, - total: results['body']['hits']['total'], - actions: [] + total: results['total'] }; - if (results['body']['hits']['hits'].length > 0) { - const actions = results['body']['hits']['hits']; + + if (query.simple) { + response['simple_actions'] = []; + } else { + response['actions'] = []; + } + + if (results['hits'].length > 0) { + const actions = results['hits']; for (let action of actions) { action = action._source; mergeActionMeta(action); - response.actions.push(action); + if (query.simple) { + response.simple_actions.push({ + block: action['block_num'], + irreversible: action['block_num'] < pResults[0].last_irreversible_block_num, + timestamp: action['@timestamp'], + transaction_id: action['trx_id'], + actors: action['act']['authorization'].map(a => `${a.actor}@${a.permission}`).join(","), + notified: action['notified'].join(','), + contract: action['act']['account'], + action: action['act']['name'], + data: action['act']['data'] + }); + } else { + response.actions.push(action); + } } } + response['query_time'] = Date.now() - t0; + if (enable_caching) { redis.set(hash, JSON.stringify(response), 'EX', cache_life); } + return response; } diff --git a/api/handlers/v1-history/get_actions.js b/api/handlers/v1-history/get_actions.js index 1ea281d6..78290296 100644 --- a/api/handlers/v1-history/get_actions.js +++ b/api/handlers/v1-history/get_actions.js @@ -10,7 +10,7 @@ const extendedActions = new Set(["transfer", "newaccount", "updateauth"]); const schema = { description: 'legacy get actions query', summary: 'get actions', - tags: ['history'], + tags: ['actions','history'], body: { type: ['object', 'string'], properties: { @@ -302,6 +302,7 @@ async function get_actions(fastify, request) { const receipt = action.receipts[0]; act.action_trace.receipt = receipt; act.action_trace.receiver = receipt.receiver; + act.account_action_seq = receipt['recv_sequence']; response.actions.push(act); }); } diff --git a/api/handlers/v1-history/get_transaction.js b/api/handlers/v1-history/get_transaction.js index 7599ca83..ee078e59 100644 --- a/api/handlers/v1-history/get_transaction.js +++ b/api/handlers/v1-history/get_transaction.js @@ -7,7 +7,7 @@ const crypto = require('crypto'); const schema = { description: 'get all actions belonging to the same transaction', summary: 'get transaction by id', - tags: ['history'], + tags: ['transactions','history'], body: { type: ['object', 'string'], properties: { diff --git a/api/helpers/functions.js b/api/helpers/functions.js index 688c3352..02affa7b 100644 --- a/api/helpers/functions.js +++ b/api/helpers/functions.js @@ -11,11 +11,6 @@ function mergeActionMeta(action) { const name = action.act.name; if (action['@' + name]) { action['act']['data'] = _.merge(action['@' + name], action['act']['data']); - if (name === 'transfer') { - action.act.data.quantity = String(action.act.data.amount) + ' ' + action.act.data.symbol; - delete action.act.data.amount; - delete action.act.data.symbol; - } delete action['@' + name]; } } diff --git a/api/schemas/get_actions.js b/api/schemas/get_actions.js index 964b6a5b..6cdf94d6 100644 --- a/api/schemas/get_actions.js +++ b/api/schemas/get_actions.js @@ -2,7 +2,7 @@ exports.GET = { description: 'get actions based on notified account. this endpoint also accepts generic filters based on indexed fields' + ' (e.g. act.authorization.actor=eosio or act.name=delegatebw), if included they will be combined with a AND operator', summary: 'get root actions', - tags: ['history'], + tags: ['actions', 'history'], querystring: { type: 'object', properties: { @@ -44,10 +44,9 @@ exports.GET = { description: 'filter before specified date (ISO8601)', type: 'string' }, - "parent": { - description: 'filter by parent global sequence', - type: 'integer', - minimum: 0 + "simple": { + description: 'simplified output mode', + type: 'boolean' } } }, @@ -69,6 +68,25 @@ exports.GET = { "relation": {type: "string"} } }, + "simple_actions": { + type: "array", + items: { + type: "object", + properties: { + "block": {type: "number"}, + "timestamp": {type: "string"}, + "irreversible": {type: "boolean"}, + "contract": {type: "string"}, + "action": {type: "string"}, + "actors": {type: "string"}, + "notified": {type: "string"}, + "transaction_id": {type: "string"}, + "data": { + additionalProperties: true + } + } + } + }, "actions": { type: "array", items: { @@ -82,12 +100,28 @@ exports.GET = { }, additionalProperties: true }, + "cpu_usage_us": {type: "number"}, + "net_usage_words": {type: "number"}, + "account_ram_deltas": { + type: "array", + items: { + type: "object", + properties: { + "account": {type: "string"}, + "delta": {type: "number"} + }, + additionalProperties: true + } + }, + "global_sequence": {type: "number"}, + "receiver": {type: 'string'}, "@timestamp": {type: "string"}, "block_num": {type: "number"}, "producer": {type: "string"}, "trx_id": {type: "string"}, "parent": {type: "number"}, - "global_sequence": {type: "number"}, + "action_ordinal": {type: 'number'}, + "creator_action_ordinal": {type: 'number'}, "notified": {type: "array", items: {type: "string"}} } } diff --git a/api/schemas/get_created_accounts.js b/api/schemas/get_created_accounts.js index c8d718c1..b5afe43d 100644 --- a/api/schemas/get_created_accounts.js +++ b/api/schemas/get_created_accounts.js @@ -1,7 +1,7 @@ exports.GET = { description: 'get all accounts created by one creator', summary: 'get created accounts', - tags: ['history'], + tags: ['accounts', 'history'], querystring: { type: 'object', properties: { @@ -32,9 +32,9 @@ exports.GET = { items: { type: 'object', properties: { - 'name': {type:'string'}, - 'timestamp': {type:'string'}, - 'trx_id': {type:'string'} + 'name': {type: 'string'}, + 'timestamp': {type: 'string'}, + 'trx_id': {type: 'string'} } } } diff --git a/api/schemas/get_creator.js b/api/schemas/get_creator.js index 15dc70d3..caa73532 100644 --- a/api/schemas/get_creator.js +++ b/api/schemas/get_creator.js @@ -1,7 +1,7 @@ exports.GET = { description: 'get account creator', summary: 'get account creator', - tags: ['history'], + tags: ['accounts', 'history'], querystring: { type: 'object', properties: { diff --git a/api/schemas/get_key_accounts.js b/api/schemas/get_key_accounts.js index 45deee63..25706a04 100644 --- a/api/schemas/get_key_accounts.js +++ b/api/schemas/get_key_accounts.js @@ -2,7 +2,7 @@ module.exports = { GET: { description: 'get accounts by public key', summary: 'get accounts by public key', - tags: ['state'], + tags: ['accounts','state'], querystring: { type: 'object', properties: { @@ -30,7 +30,7 @@ module.exports = { POST: { description: 'get accounts by public key', summary: 'get accounts by public key', - tags: ['state'], + tags: ['accounts','state'], body: { type: 'object', properties: { diff --git a/api/schemas/get_tokens.js b/api/schemas/get_tokens.js index 0ec1cad4..de270cf6 100644 --- a/api/schemas/get_tokens.js +++ b/api/schemas/get_tokens.js @@ -1,7 +1,7 @@ exports.GET = { description: 'get tokens from account', summary: 'get tokens from account', - tags: ['state'], + tags: ['accounts', 'state'], querystring: { type: 'object', properties: { diff --git a/api/schemas/get_transaction.js b/api/schemas/get_transaction.js index 965ebcab..ad51ab80 100644 --- a/api/schemas/get_transaction.js +++ b/api/schemas/get_transaction.js @@ -1,7 +1,7 @@ exports.GET = { description: 'get all actions belonging to the same transaction', summary: 'get transaction by id', - tags: ['history'], + tags: ['transactions','history'], querystring: { type: 'object', properties: { diff --git a/api/schemas/get_voters.js b/api/schemas/get_voters.js index 4acd58ff..7756334b 100644 --- a/api/schemas/get_voters.js +++ b/api/schemas/get_voters.js @@ -1,7 +1,7 @@ exports.GET = { description: 'get voters', summary: 'get voters', - tags: ['state'], + tags: ['accounts','state'], querystring: { type: 'object', properties: { diff --git a/connections/manager.js b/connections/manager.js index f877e57b..81912bc4 100644 --- a/connections/manager.js +++ b/connections/manager.js @@ -48,7 +48,7 @@ class ConnectionManager { return redis.createClient(this.redisOptions); } - get elasticsearchClient() { + getESClient() { let es_url; if (conf.elasticsearch.user !== '') { es_url = `http://${conf.elasticsearch.user}:${conf.elasticsearch.pass}@${conf.elasticsearch.host}`; @@ -58,6 +58,33 @@ class ConnectionManager { return new elasticsearch.Client({node: es_url}); } + get elasticsearchClient() { + return this.getESClient(); + } + + get ingestClients() { + if (conf['elasticsearch']['ingest_nodes']) { + const clients = []; + const nodes = conf['elasticsearch']['ingest_nodes']; + if (nodes.length > 0) { + for (const node of nodes) { + let es_url; + const _user = conf['elasticsearch']['user']; + const _pass = conf['elasticsearch']['pass']; + if (conf['elasticsearch']['user'] !== '') { + es_url = `http://${_user}:${_pass}@${node}`; + } else { + es_url = `http://${node}` + } + clients.push(new elasticsearch.Client({node: es_url, pingTimeout: 100})); + } + } + return clients; + } else { + return [this.getESClient()]; + } + } + get nodeosJsonRPC() { return new JsonRpc(conf.chains[process.env.CHAIN]['http'], {fetch}); } diff --git a/definitions/lifecycle_policies.js b/definitions/lifecycle_policies.js new file mode 100644 index 00000000..2ec0fb9e --- /dev/null +++ b/definitions/lifecycle_policies.js @@ -0,0 +1,25 @@ +module.exports = { + ILPs: [ + { + policy: "50G30D", + body: { + "policy": { + "phases": { + "hot": { + "min_age": "0ms", + "actions": { + "rollover": { + "max_age": "30d", + "max_size": "50gb" + }, + "set_priority": { + "priority": 100 + } + } + } + } + } + } + } + ] +}; diff --git a/definitions/mappings.js b/definitions/mappings.js index fb2b32db..2ebbbea1 100644 --- a/definitions/mappings.js +++ b/definitions/mappings.js @@ -1,30 +1,43 @@ +const shards = 2; +const replicas = 0; +const refresh = "1s"; +const chain = process.env.CHAIN; +const defaultLifecyclePolicy = "50G30D"; + +const ILPs = require('./lifecycle_policies').ILPs; + +// LZ4 Compression +const compression = 'default'; +// DEFLATE +// const compression = "best_compression"; + const action = { - "order": 0, - "index_patterns": [ - process.env.CHAIN + "-action-*" + order: 0, + index_patterns: [ + chain + "-action-*" ], - "settings": { - "index": { - "lifecycle": { - "name": "50G30D", - "rollover_alias": process.env.CHAIN + "-action" + settings: { + index: { + lifecycle: { + "name": defaultLifecyclePolicy, + "rollover_alias": chain + "-action" }, - "codec": "best_compression", - "refresh_interval": "10s", - "number_of_shards": "4", - "number_of_replicas": "0", - "sort": { - "field": "global_sequence", - "order": "desc" + codec: compression, + refresh_interval: refresh, + number_of_shards: shards * 2, + number_of_replicas: replicas, + sort: { + field: "global_sequence", + order: "desc" } } }, - "mappings": { - "properties": { + mappings: { + properties: { "@timestamp": {"type": "date"}, "global_sequence": {"type": "long"}, - "account_ram_deltas.delta": {"enabled": false}, - "account_ram_deltas.account": {"enabled": false}, + "account_ram_deltas.delta": {"type": "integer"}, + "account_ram_deltas.account": {"type": "keyword"}, "act.authorization.permission": {"enabled": false}, "act.authorization.actor": {"type": "keyword"}, "act.account": {"type": "keyword"}, @@ -47,7 +60,7 @@ const action = { "auth_sequence": { "properties": { "account": {"type": "keyword"}, - "sequence": {"type": "integer"} + "sequence": {"type": "long"} } } } @@ -126,17 +139,18 @@ const action = { }; const abi = { - "index_patterns": [process.env.CHAIN + "-abi-*"], + "index_patterns": [chain + "-abi-*"], "settings": { "index": { - "number_of_shards": 1, - "refresh_interval": "10s", - "number_of_replicas": 0 - }, - "index.codec": "best_compression" + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, + "codec": compression + } }, "mappings": { "properties": { + "@timestamp": {"type": "date"}, "block": {"type": "long"}, "account": {"type": "keyword"}, "abi": {"enabled": false} @@ -145,25 +159,25 @@ const abi = { }; const block = { - "index_patterns": [process.env.CHAIN + "-block-*"], + "index_patterns": [chain + "-block-*"], "settings": { "index": { - "number_of_shards": 2, - "refresh_interval": "5s", - "number_of_replicas": 0, + "codec": compression, + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, "sort.field": "block_num", "sort.order": "desc" - }, - "index.codec": "best_compression" + } }, "mappings": { "properties": { + "@timestamp": {"type": "date"}, "block_num": {"type": "long"}, "producer": {"type": "keyword"}, "new_producers.producers.block_signing_key": {"enabled": false}, "new_producers.producers.producer_name": {"type": "keyword"}, "new_producers.version": {"type": "long"}, - "@timestamp": {"type": "date"}, "schedule_version": {"type": "double"}, "cpu_usage": {"type": "integer"}, "net_usage": {"type": "integer"} @@ -172,12 +186,13 @@ const block = { }; const tableProposals = { - "index_patterns": [process.env.CHAIN + "-table-proposals-*"], + "index_patterns": [chain + "-table-proposals-*"], "settings": { "index": { - "number_of_shards": 3, - "refresh_interval": "5s", - "number_of_replicas": 0, + "codec": compression, + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, "sort.field": "block_num", "sort.order": "desc" } @@ -194,12 +209,13 @@ const tableProposals = { }; const tableAccounts = { - "index_patterns": [process.env.CHAIN + "-table-accounts-*"], + "index_patterns": [chain + "-table-accounts-*"], "settings": { "index": { - "number_of_shards": 3, - "refresh_interval": "5s", - "number_of_replicas": 0, + "codec": compression, + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, "sort.field": "amount", "sort.order": "desc" } @@ -217,12 +233,13 @@ const tableAccounts = { }; const tableUserRes = { - "index_patterns": [process.env.CHAIN + "-table-userres-*"], + "index_patterns": [chain + "-table-userres-*"], "settings": { "index": { - "number_of_shards": 3, - "refresh_interval": "5s", - "number_of_replicas": 0, + "codec": compression, + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, "sort.field": "total_weight", "sort.order": "desc" } @@ -241,12 +258,13 @@ const tableUserRes = { }; const tableDelBand = { - "index_patterns": [process.env.CHAIN + "-table-delband-*"], + "index_patterns": [chain + "-table-delband-*"], "settings": { "index": { - "number_of_shards": 3, - "refresh_interval": "5s", - "number_of_replicas": 0, + "codec": compression, + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, "sort.field": "total_weight", "sort.order": "desc" } @@ -265,12 +283,13 @@ const tableDelBand = { }; const tableVoters = { - "index_patterns": [process.env.CHAIN + "-table-voters-*"], + "index_patterns": [chain + "-table-voters-*"], "settings": { "index": { - "number_of_shards": 3, - "refresh_interval": "5s", - "number_of_replicas": 0, + "codec": compression, + "number_of_shards": shards, + "refresh_interval": refresh, + "number_of_replicas": replicas, "sort.field": "last_vote_weight", "sort.order": "desc" } @@ -291,30 +310,30 @@ const tableVoters = { }; const delta = { - "index_patterns": [process.env.CHAIN + "-delta-*"], + "index_patterns": [chain + "-delta-*"], "settings": { "index": { "lifecycle": { - "name": "50G30D", - "rollover_alias": process.env.CHAIN + "-delta" + "name": defaultLifecyclePolicy, + "rollover_alias": chain + "-delta" }, - "number_of_shards": 2, - "refresh_interval": "5s", - "number_of_replicas": 0, - "sort.field": "block_num", - "sort.order": "desc" - }, - "index.codec": "best_compression" + "codec": compression, + "number_of_shards": shards * 2, + "refresh_interval": refresh, + "number_of_replicas": replicas + } }, "mappings": { "properties": { + // "global_sequence": {"type": "long"}, + // "@timestamp": {"type": "date"}, "block_num": {"type": "long"}, - "present": {"type": "boolean"}, + "data": {"enabled": false}, "code": {"type": "keyword"}, + "present": {"type": "boolean"}, "scope": {"type": "keyword"}, "table": {"type": "keyword"}, "payer": {"type": "keyword"}, - "data": {"enabled": false}, "primary_key": {"type": "keyword"}, "@approvals.proposal_name": {"type": "keyword"}, "@approvals.provided_approvals": {"type": "object"}, @@ -349,7 +368,7 @@ const delta = { }; module.exports = { - action, block, abi, delta, + action, block, abi, delta, ILPs, "table-proposals": tableProposals, "table-accounts": tableAccounts, "table-delband": tableDelBand, diff --git a/example-connections.json b/example-connections.json index 2cc04c3b..364098bc 100644 --- a/example-connections.json +++ b/example-connections.json @@ -8,6 +8,7 @@ }, "elasticsearch": { "host": "127.0.0.1:9200", + "ingest_nodes": ["127.0.0.1:9200"], "user": "elastic", "pass": "password" }, @@ -16,7 +17,7 @@ "port": "6379" }, "chains": { - "eos": { + "rem": { "http": "http://127.0.0.1:8888", "ship": "ws://127.0.0.1:8080" } diff --git a/example-ecosystem.config.js b/example-ecosystem.config.js index fd0c8c4b..587ffaa1 100644 --- a/example-ecosystem.config.js +++ b/example-ecosystem.config.js @@ -24,12 +24,12 @@ module.exports = { LIVE_ONLY: 'false', FETCH_BLOCK: 'true', FETCH_TRACES: 'true', - CHAIN: 'eos', - SYSTEM_DOMAIN: 'eosio', + CHAIN: 'rem', + SYSTEM_DOMAIN: 'rem', CREATE_INDICES: 'v1', PREVIEW: 'false', - READERS: 1, - DESERIALIZERS: 1, + READERS: 2, + DESERIALIZERS: 2, DS_MULT: 1, ES_IDX_QUEUES: 1, ES_AD_IDX_QUEUES: 1, @@ -62,7 +62,7 @@ module.exports = { env: { PROVIDER_NAME: 'Provider Name', PROVIDER_URL: 'https://yourproviderwebsite', - CHAIN: 'eos', + CHAIN: 'rem', CHAIN_NAME: 'EOS Mainnet', CHAIN_LOGO_URL: 'https://bloks.io/img/chains/eos.png', SERVER_PORT: '7000', diff --git a/helpers/elastic-routes.js b/helpers/elastic-routes.js index 4cfb2085..750db150 100644 --- a/helpers/elastic-routes.js +++ b/helpers/elastic-routes.js @@ -13,7 +13,41 @@ const prettyjson = require('prettyjson'); const {ConnectionManager} = require('../connections/manager'); const manager = new ConnectionManager(); -const client = manager.elasticsearchClient; +const ingestClients = manager.ingestClients; +const ingestNodeCounters = {}; + +function resetCounters() { + ingestClients.forEach((val, idx) => { + ingestNodeCounters[idx] = { + status: true, + docs: 0 + }; + }); +} + +resetCounters(); + +function bulkAction(bulkData) { + let minIdx = 0; + if (ingestClients.length > 1) { + let min; + ingestClients.forEach((val, idx) => { + if (!min) { + min = ingestNodeCounters[idx].docs; + } else { + if (ingestNodeCounters[idx].docs < min) { + min = ingestNodeCounters[idx].docs; + minIdx = idx; + } + } + }); + } + ingestNodeCounters[minIdx].docs += bulkData.body.length; + if (ingestNodeCounters[minIdx].docs > 1000) { + resetCounters(); + } + return ingestClients[minIdx]['bulk'](bulkData); +} function ackOrNack(resp, messageMap, channel) { for (const item of resp.items) { @@ -67,41 +101,46 @@ function onError(err, channel, callback) { } } + // Define index routes const routes = {}; routes['action'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ + const t0 = Date.now(); + bulkAction({ index: queue_prefix + '-action', type: '_doc', body: buildActionBulk(payloads, messageMap) }).then(resp => { + // console.log('Bulk index actions - ' + (Date.now() - t0) + "ms - " + payloads.length + ' actions'); onResponse(resp, messageMap, cb, payloads, channel); }).catch(err => { onError(err, channel, cb); }); }; -routes['block'] = async (payloads, channel, cb) => { +routes['delta'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ - index: queue_prefix + '-block', + const t0 = Date.now(); + bulkAction({ + index: queue_prefix + '-delta', type: '_doc', - body: buildBlockBulk(payloads, messageMap) + body: buildDeltaBulk(payloads, messageMap) }).then(resp => { + // console.log('Bulk index deltas - ' + (Date.now() - t0) + "ms"); onResponse(resp, messageMap, cb, payloads, channel); }).catch(err => { onError(err, channel, cb); }); }; -routes['delta'] = async (payloads, channel, cb) => { +routes['block'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ - index: queue_prefix + '-delta', + bulkAction({ + index: queue_prefix + '-block', type: '_doc', - body: buildDeltaBulk(payloads, messageMap) + body: buildBlockBulk(payloads, messageMap) }).then(resp => { onResponse(resp, messageMap, cb, payloads, channel); }).catch(err => { @@ -111,7 +150,7 @@ routes['delta'] = async (payloads, channel, cb) => { routes['table-proposals'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ + bulkAction({ index: queue_prefix + '-table-proposals', type: '_doc', body: buildTableProposalsBulk(payloads, messageMap) @@ -124,7 +163,7 @@ routes['table-proposals'] = async (payloads, channel, cb) => { routes['table-accounts'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ + bulkAction({ index: queue_prefix + '-table-accounts', type: '_doc', body: buildTableAccountsBulk(payloads, messageMap) @@ -137,7 +176,7 @@ routes['table-accounts'] = async (payloads, channel, cb) => { routes['table-voters'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ + bulkAction({ index: queue_prefix + '-table-voters', type: '_doc', body: buildTableVotersBulk(payloads, messageMap) @@ -150,7 +189,7 @@ routes['table-voters'] = async (payloads, channel, cb) => { routes['abi'] = async (payloads, channel, cb) => { const messageMap = new Map(); - client['bulk']({ + bulkAction({ index: queue_prefix + '-abi', type: '_doc', body: buildAbiBulk(payloads, messageMap) diff --git a/helpers/functions.js b/helpers/functions.js index 1cf41d7a..9cc6548a 100644 --- a/helpers/functions.js +++ b/helpers/functions.js @@ -1,6 +1,7 @@ const {Serialize} = require('eosjs'); const zlib = require('zlib'); const prettyjson = require("prettyjson"); +const _ = require('lodash'); function onError(err) { console.log(process.env['worker_role']); @@ -175,12 +176,52 @@ function messageAllWorkers(cl, payload) { } function printWorkerMap(wmp) { - console.log('--------------------------------------------------'); - console.log(prettyjson.render({ - 'workers': wmp - }, { - numberColor: 'grey' - })); + console.log('---------------- PROPOSED WORKER LIST ----------------------'); + for (const w of wmp) { + const str = []; + for (const key in w) { + if (w.hasOwnProperty(key) && key !== 'worker_id') { + switch (key) { + case 'worker_role': { + str.push(`Role: ${w[key]}`); + break; + } + case 'worker_queue': { + str.push(`Queue Name: ${w[key]}`); + break; + } + case 'first_block': { + str.push(`First Block: ${w[key]}`); + break; + } + case 'last_block': { + str.push(`Last Block: ${w[key]}`); + break; + } + case 'live_mode': { + str.push(`Live Mode: ${w[key]}`); + break; + } + case 'type': { + str.push(`Index Type: ${w[key]}`); + break; + } + case 'worker_last_processed_block':{ + str.push(`Last Processed Block: ${w[key]}`); + break; + } + case 'queue': { + str.push(`Indexing Queue: ${w[key]}`); + break; + } + default: { + str.push(`${key}: ${w[key]}`); + } + } + } + } + console.log(`Worker ID: ${w.worker_id} \t ${str.join(" | ")}`) + } console.log('--------------------------------------------------'); } diff --git a/master.js b/master.js index a96f1753..2464b9be 100644 --- a/master.js +++ b/master.js @@ -1,5 +1,6 @@ const cluster = require('cluster'); const fs = require('fs'); +const path = require('path'); const pm2io = require('@pm2/io'); const {promisify} = require('util'); const doctor = require('./modules/doctor'); @@ -19,47 +20,148 @@ const { onSaveAbi } = require("./helpers/functions"); +// Master proc globals let client, rClient, rpc; let cachedInitABI = null; - const missingRanges = []; +let currentSchedule; +let lastProducer = null; +const producedBlocks = {}; +let handoffCounter = 0; +let lastProducedBlockNum = 0; +const missedRounds = {}; +let dsErrorStream; +let abiCacheMap; + +async function getCurrentSchedule() { + currentSchedule = await rpc.get_producer_schedule(); +} -async function main() { - // Preview mode - prints only the proposed worker map - let preview = process.env.PREVIEW === 'true'; - const queue_prefix = process.env.CHAIN; - if (process.env.PURGE_QUEUES === 'true') { - if (process.env.DISABLE_READING === 'true') { - console.log('Conflict between PURGE_QUEUES and DISABLE_READING'); - process.exit(1); - } else { - await manager.purgeQueues(queue_prefix); +async function reportMissedBlocks(producer, last_block, size) { + console.log(`${producer} missed ${size} ${size === 1 ? "block" : "blocks"} after ${last_block}`); + await client.index({ + index: process.env.CHAIN + '-logs', + body: { + type: 'missed_blocks', + '@timestamp': new Date().toISOString(), + 'missed_blocks': { + 'producer': producer, + 'last_block': last_block, + 'size': size, + 'schedule_version': currentSchedule.schedule_version + } } - } + }); +} - rpc = manager.nodeosJsonRPC; - rClient = manager.redisClient; - client = manager.elasticsearchClient; +let blockMsgQueue = []; - const getAsync = promisify(rClient.get).bind(rClient); +function onLiveBlock(msg) { - const n_deserializers = parseInt(process.env.DESERIALIZERS, 10); - const n_ingestors_per_queue = parseInt(process.env.ES_IDX_QUEUES, 10); - const action_indexing_ratio = parseInt(process.env.ES_AD_IDX_QUEUES, 10); + if (msg.block_num === lastProducedBlockNum + 1 || lastProducedBlockNum === 0) { + const prod = msg.producer; - let max_readers = parseInt(process.env.READERS, 10); - if (process.env.DISABLE_READING === 'true') { - // Create a single reader to read the abi struct and quit. - max_readers = 1; + if (process.env.BP_LOGS === 'true') { + console.log(`Received block ${msg.block_num} from ${prod}`); + } + if (producedBlocks[prod]) { + producedBlocks[prod]++; + } else { + producedBlocks[prod] = 1; + } + if (lastProducer !== prod) { + handoffCounter++; + if (lastProducer && handoffCounter > 2) { + const activeProds = currentSchedule.active.producers; + const newIdx = activeProds.findIndex(p => p['producer_name'] === prod) + 1; + const oldIdx = activeProds.findIndex(p => p['producer_name'] === lastProducer) + 1; + if ((newIdx === oldIdx + 1) || (newIdx === 1 && oldIdx === activeProds.length)) { + // Normal operation + if (process.env.BP_LOGS === 'true') { + console.log(`[${msg.block_num}] producer handoff: ${lastProducer} [${oldIdx}] -> ${prod} [${newIdx}]`); + } + } else { + let cIdx = oldIdx + 1; + while (cIdx !== newIdx) { + try { + if (activeProds[cIdx - 1]) { + const missingProd = activeProds[cIdx - 1]['producer_name']; + // report + reportMissedBlocks(missingProd, lastProducedBlockNum, 12) + .catch(console.log); + // count missed + if (missedRounds[missingProd]) { + missedRounds[missingProd]++; + } else { + missedRounds[missingProd] = 1; + } + console.log(`${missingProd} missed a round [${missedRounds[missingProd]}]`); + } + } catch (e) { + console.log(activeProds); + console.log(e); + } + cIdx++; + if (cIdx === activeProds.length) { + cIdx = 0; + } + } + } + if (producedBlocks[lastProducer]) { + if (producedBlocks[lastProducer] < 12) { + const _size = 12 - producedBlocks[lastProducer]; + reportMissedBlocks(lastProducer, lastProducedBlockNum, _size) + .catch(console.log) + } + } + producedBlocks[lastProducer] = 0; + } + lastProducer = prod; + } + lastProducedBlockNum = msg.block_num; + } else { + blockMsgQueue.push(msg); + blockMsgQueue.sort((a, b) => a.block_num - b.block_num); + while (blockMsgQueue.length > 0) { + if (blockMsgQueue[0].block_num === lastProducedBlockNum + 1) { + onLiveBlock(blockMsgQueue.shift()); + } else { + break; + } + } } +} - const {index_queues} = require('./definitions/index-queues'); +function setupDSElogs(starting_block, head) { + const logPath = './logs/' + process.env.CHAIN; + if (!fs.existsSync(logPath)) fs.mkdirSync(logPath, {recursive: true}); + const dsLogFileName = (new Date().toISOString()) + "_ds_err_" + starting_block + "_" + head + ".log"; + const dsErrorsLog = logPath + '/' + dsLogFileName; + if (fs.existsSync(dsErrorsLog)) fs.unlinkSync(dsErrorsLog); + const symbolicLink = logPath + '/deserialization_errors.log'; + if (fs.existsSync(symbolicLink)) fs.unlinkSync(symbolicLink); + fs.symlinkSync(dsLogFileName, symbolicLink); + dsErrorStream = fs.createWriteStream(dsErrorsLog, {flags: 'a'}); + console.log(`Deserialization errors are being logged in: ${path.join(__dirname, symbolicLink)}`); +} - const indicesList = ["action", "block", "abi", "delta"]; +async function initAbiCacheMap(getAsync) { + const cachedMap = await getAsync(process.env.CHAIN + ":" + 'abi_cache'); + if (cachedMap) { + abiCacheMap = JSON.parse(cachedMap); + console.log(`Found ${Object.keys(abiCacheMap).length} entries in the local ABI cache`) + } else { + abiCacheMap = {}; + } - const index_queue_prefix = queue_prefix + ':index'; + // Periodically save the current map + setInterval(() => { + rClient.set(process.env.CHAIN + ":" + 'abi_cache', JSON.stringify(abiCacheMap)); + }, 10000); +} - const script_status = await client.putScript({ +async function applyUpdateScript(esClient) { + const script_status = await esClient.putScript({ id: "updateByBlock", body: { script: { @@ -89,14 +191,17 @@ async function main() { } } }); - if (!script_status['body']['acknowledged']) { console.log('Failed to load script updateByBlock. Aborting!'); process.exit(1); } else { - console.log('Script loaded!'); + console.log('Painless Update Script loaded!'); } +} +function addStateTables(indicesList, index_queues) { + const queue_prefix = process.env.CHAIN; + const index_queue_prefix = queue_prefix + ':index'; // Optional state tables if (process.env.PROPOSAL_STATE === 'true') { indicesList.push("table-proposals"); @@ -122,9 +227,138 @@ async function main() { indicesList.push("table-userres"); index_queues.push({type: 'table-userres', name: index_queue_prefix + "_table_userres"}); } +} + +async function waitForLaunch() { + return new Promise(resolve => { + console.log(`Use "pm2 trigger ${pm2io.getConfig().module_name} start" to start the indexer now or restart without preview mode.`); + const idleTimeout = setTimeout(() => { + console.log('No command received after 10 minutes.'); + console.log('Exiting now! Disable the PREVIEW mode to continue.'); + process.exit(1); + }, 60000 * 10); + pm2io.action('start', (reply) => { + resolve(); + reply({ack: true}); + clearTimeout(idleTimeout); + }); + }); +} + +async function main() { + + console.log(`--------- Hyperion Indexer ${require('./package').version} ---------`); + + console.log(`Using parser version ${process.env.PARSER}`); + console.log(`Chain: ${process.env.CHAIN}`); + + if (process.env.ABI_CACHE_MODE === 'true') { + console.log('--------\n ABI CACHING MODE \n ---------'); + } + + // Preview mode - prints only the proposed worker map + let preview = process.env.PREVIEW === 'true'; + const queue_prefix = process.env.CHAIN; + + // Purge queues + if (process.env.PURGE_QUEUES === 'true') { + if (process.env.DISABLE_READING === 'true') { + console.log('Conflict between PURGE_QUEUES and DISABLE_READING'); + process.exit(1); + } else { + await manager.purgeQueues(queue_prefix); + } + } + + // Chain API + rpc = manager.nodeosJsonRPC; + await getCurrentSchedule(); + + // Redis + rClient = manager.redisClient; + const getAsync = promisify(rClient.get).bind(rClient); + + // ELasticsearch + client = manager.elasticsearchClient; + let ingestClients = manager.ingestClients; + + // Check for ingestion nodes + for (const ingestClient of ingestClients) { + try { + const ping_response = await ingestClient.ping(); + if (ping_response.body) { + console.log(`Ingest client ready at ${ping_response.meta.connection.id}`); + } + } catch (e) { + console.log(e); + console.log('Failed to connect to one of the ingestion nodes. Please verify the connections.json file'); + process.exit(1); + } + } + ingestClients = null; + + const n_deserializers = parseInt(process.env.DESERIALIZERS, 10); + const n_ingestors_per_queue = parseInt(process.env.ES_IDX_QUEUES, 10); + const action_indexing_ratio = parseInt(process.env.ES_AD_IDX_QUEUES, 10); + + let max_readers = parseInt(process.env.READERS, 10); + if (process.env.DISABLE_READING === 'true') { + // Create a single reader to read the abi struct and quit. + max_readers = 1; + } + + const {index_queues} = require('./definitions/index-queues'); + + const indicesList = ["action", "block", "abi", "delta"]; + + addStateTables(indicesList, index_queues); + + await applyUpdateScript(client); const indexConfig = require('./definitions/mappings'); + // Add lifecycle policy + if (indexConfig.ILPs) { + // check for existing policy + for (const ILP of indexConfig.ILPs) { + try { + await client.ilm.getLifecycle({ + policy: ILP.policy + }); + } catch (e) { + console.log(e); + try { + const ilm_status = await client.ilm.putLifecycle(ILP); + if (!ilm_status['body']['acknowledged']) { + console.log(`Failed to create ILM Policy`); + } + } catch (e) { + console.log(`[FATAL] :: Failed to create ILM Policy`); + console.log(e); + process.exit(1); + } + } + } + } + + // Check for extra mappings + // Load Modules + const HyperionModuleLoader = require('./modules/index').HyperionModuleLoader; + const mLoader = new HyperionModuleLoader(process.env.PARSER); + + // Modify mappings + for (const exM of mLoader.extraMappings) { + if (exM['action']) { + for (const key in exM['action']) { + if (exM['action'].hasOwnProperty(key)) { + indexConfig['action']['mappings']['properties'][key] = exM['action'][key]; + console.log(`Mapping added for ${key}`); + } + } + } + } + + // Update index templates for (const index of indicesList) { try { @@ -140,9 +374,9 @@ async function main() { process.exit(1); } } - console.log('Index templates updated'); + // Create indices if (process.env.CREATE_INDICES !== 'false' && process.env.CREATE_INDICES) { // Create indices let version; @@ -166,8 +400,6 @@ async function main() { index: new_index, name: `${queue_prefix}-${index}` }); - } else { - console.log(`Index ${new_index} already created!`); } } } @@ -185,20 +417,9 @@ async function main() { const workerMap = []; let worker_index = 0; - let pushedBlocks = 0; - let livePushedBlocks = 0; - let consumedBlocks = 0; - let liveConsumedBlocks = 0; - let indexedObjects = 0; - let deserializedActions = 0; - let lastProcessedBlockNum = 0; - let total_read = 0; - let total_blocks = 0; - let total_indexed_blocks = 0; - let total_actions = 0; - let total_range = 0; let allowShutdown = false; let allowMoreReaders = true; + let total_range = 0; let maxBatchSize = parseInt(process.env.BATCH_SIZE, 10); // Auto-stop @@ -208,96 +429,6 @@ async function main() { auto_stop = parseInt(process.env.AUTO_STOP, 10); } - // Monitoring - let log_interval = 5000; - let shutdownTimer; - const consume_rates = []; - setInterval(() => { - const _workers = Object.keys(cluster.workers).length; - const tScale = (log_interval / 1000); - total_read += pushedBlocks; - total_blocks += consumedBlocks; - total_actions += deserializedActions; - total_indexed_blocks += indexedObjects; - const consume_rate = consumedBlocks / tScale; - consume_rates.push(consume_rate); - if (consume_rates.length > 20) { - consume_rates.splice(0, 1); - } - let avg_consume_rate = 0; - if (consume_rates.length > 0) { - for (const r of consume_rates) { - avg_consume_rate += r; - } - avg_consume_rate = avg_consume_rate / consume_rates.length; - } else { - avg_consume_rate = consume_rate; - } - const log_msg = []; - - log_msg.push(`W:${_workers}`); - log_msg.push(`R:${(pushedBlocks + livePushedBlocks) / tScale} b/s`); - log_msg.push(`C:${(liveConsumedBlocks + consumedBlocks) / tScale} b/s`); - log_msg.push(`D:${deserializedActions / tScale} a/s`); - log_msg.push(`I:${indexedObjects / tScale} d/s`); - - if (total_blocks < total_range) { - const remaining = total_range - total_blocks; - const estimated_time = Math.round(remaining / avg_consume_rate); - const time_string = moment() - .add(estimated_time, 'seconds') - .fromNow(false); - const pct_parsed = ((total_blocks / total_range) * 100).toFixed(1); - const pct_read = ((total_read / total_range) * 100).toFixed(1); - log_msg.push(`${total_blocks}/${total_read}/${total_range}`); - log_msg.push(`syncs ${time_string} (${pct_parsed}% ${pct_read}%)`); - } - - console.log(log_msg.join(', ')); - - if (indexedObjects === 0 && deserializedActions === 0 && consumedBlocks === 0) { - - shutdownTimer = setTimeout(() => { - allowShutdown = true; - }, 10000); - - // Auto-Stop - if (pushedBlocks === 0) { - idle_count++; - if (auto_stop > 0 && (tScale * idle_count) >= auto_stop) { - console.log("Reached limit for no blocks processed, stopping now..."); - rClient.set('abi_cache', JSON.stringify(abiCacheMap)); - process.exit(1); - } else { - console.log(`No blocks processed! Indexer will stop in ${auto_stop - (tScale * idle_count)} seconds!`); - } - } - } else { - if (idle_count > 1) { - console.log('Processing resumed!'); - } - idle_count = 0; - if (shutdownTimer) { - clearTimeout(shutdownTimer); - shutdownTimer = null; - } - } - - // reset counters - pushedBlocks = 0; - livePushedBlocks = 0; - consumedBlocks = 0; - liveConsumedBlocks = 0; - deserializedActions = 0; - indexedObjects = 0; - - if (_workers === 0) { - console.log('FATAL ERROR - All Workers have stopped!'); - process.exit(1); - } - - }, log_interval); - let lastIndexedBlock; if (process.env.INDEX_DELTAS === 'true') { lastIndexedBlock = await getLastIndexedBlockByDelta(client); @@ -328,6 +459,7 @@ async function main() { starting_block = lastIndexedABI; } + // Define block range if (process.env.START_ON !== "0") { starting_block = parseInt(process.env.START_ON, 10); // Check last indexed block again @@ -344,15 +476,14 @@ async function main() { starting_block = lastIndexedBlockOnRange; } } - console.log('FIRST BLOCK: ' + starting_block); - console.log('LAST BLOCK: ' + head); + console.log('First Block: ' + starting_block); + console.log('Last Block: ' + head); } + // Setup Readers total_range = head - starting_block; - // Create first batch of parallel readers let lastAssignedBlock = starting_block; let activeReadersCount = 0; - if (process.env.REPAIR_MODE === 'false') { if (process.env.LIVE_ONLY === 'false') { while (activeReadersCount < max_readers && lastAssignedBlock < head) { @@ -372,14 +503,14 @@ async function main() { // activeReaders.push(def); activeReadersCount++; workerMap.push(def); - console.log(`Launching new reader from ${start} to ${end}`); + console.log(`Setting parallel reader [${worker_index}] from block ${start} to ${end}`); } } // Setup Serial reader worker if (process.env.LIVE_READER === 'true') { const _head = chain_data['head_block_num']; - console.log(`Starting live reader at head = ${_head}`); + console.log(`Setting live reader at head = ${_head}`); // live block reader worker_index++; @@ -391,15 +522,13 @@ async function main() { }); // live deserializer - for (let j = 0; j < process.env.DS_MULT; j++) { - worker_index++; - workerMap.push({ - worker_queue: queue_prefix + ':live_blocks', - worker_id: worker_index, - worker_role: 'deserializer', - live_mode: 'true' - }); - } + worker_index++; + workerMap.push({ + worker_id: worker_index, + worker_role: 'deserializer', + worker_queue: queue_prefix + ':live_blocks', + live_mode: 'true' + }); } } @@ -408,9 +537,9 @@ async function main() { for (let j = 0; j < process.env.DS_MULT; j++) { worker_index++; workerMap.push({ - worker_queue: queue_prefix + ':blocks' + ":" + (i + 1), worker_id: worker_index, worker_role: 'deserializer', + worker_queue: queue_prefix + ':blocks' + ":" + (i + 1), live_mode: 'false' }); } @@ -434,8 +563,8 @@ async function main() { workerMap.push({ worker_id: worker_index, worker_role: 'ingestor', - type: q.type, - queue: q.name + ":" + (qIdx + 1) + queue: q.name + ":" + (qIdx + 1), + type: q.type }); qIdx++; } @@ -464,36 +593,123 @@ async function main() { // Quit App if on preview mode if (preview) { printWorkerMap(workerMap); - process.exit(1); + await waitForLaunch(); } + // Setup Error Logging + setupDSElogs(starting_block, head); + await initAbiCacheMap(getAsync); + + // Start Monitoring + let log_interval = 5000; + let shutdownTimer; + const consume_rates = []; + let pushedBlocks = 0; + let livePushedBlocks = 0; + let consumedBlocks = 0; + let liveConsumedBlocks = 0; + let indexedObjects = 0; + let deserializedActions = 0; + let lastProcessedBlockNum = 0; + let total_read = 0; + let total_blocks = 0; + let total_indexed_blocks = 0; + let total_actions = 0; + setInterval(() => { + const _workers = Object.keys(cluster.workers).length; + const tScale = (log_interval / 1000); + total_read += pushedBlocks; + total_blocks += consumedBlocks; + total_actions += deserializedActions; + total_indexed_blocks += indexedObjects; + const consume_rate = consumedBlocks / tScale; + consume_rates.push(consume_rate); + if (consume_rates.length > 20) { + consume_rates.splice(0, 1); + } + let avg_consume_rate = 0; + if (consume_rates.length > 0) { + for (const r of consume_rates) { + avg_consume_rate += r; + } + avg_consume_rate = avg_consume_rate / consume_rates.length; + } else { + avg_consume_rate = consume_rate; + } + const log_msg = []; + + log_msg.push(`W:${_workers}`); + log_msg.push(`R:${(pushedBlocks + livePushedBlocks) / tScale} b/s`); + log_msg.push(`C:${(liveConsumedBlocks + consumedBlocks) / tScale} b/s`); + log_msg.push(`D:${deserializedActions / tScale} a/s`); + log_msg.push(`I:${indexedObjects / tScale} d/s`); + + if (total_blocks < total_range && process.env.LIVE_ONLY !== 'true') { + const remaining = total_range - total_blocks; + const estimated_time = Math.round(remaining / avg_consume_rate); + const time_string = moment() + .add(estimated_time, 'seconds') + .fromNow(false); + const pct_parsed = ((total_blocks / total_range) * 100).toFixed(1); + const pct_read = ((total_read / total_range) * 100).toFixed(1); + log_msg.push(`${total_blocks}/${total_read}/${total_range}`); + log_msg.push(`syncs ${time_string} (${pct_parsed}% ${pct_read}%)`); + } + + // print monitoring log + if (process.env.NOLOGS !== 'true') { + console.log(log_msg.join(', ')); + } + + if (indexedObjects === 0 && deserializedActions === 0 && consumedBlocks === 0) { + + // Allow 10s threshold before shutting down the process + shutdownTimer = setTimeout(() => { + allowShutdown = true; + }, 10000); + + // Auto-Stop + if (pushedBlocks === 0) { + idle_count++; + if (auto_stop > 0 && (tScale * idle_count) >= auto_stop) { + console.log("Reached limit for no blocks processed, stopping now..."); + rClient.set('abi_cache', JSON.stringify(abiCacheMap)); + process.exit(1); + } else { + console.log(`No blocks processed! Indexer will stop in ${auto_stop - (tScale * idle_count)} seconds!`); + } + } + } else { + if (idle_count > 1) { + console.log('Processing resumed!'); + } + idle_count = 0; + if (shutdownTimer) { + clearTimeout(shutdownTimer); + shutdownTimer = null; + } + } + + // reset counters + pushedBlocks = 0; + livePushedBlocks = 0; + consumedBlocks = 0; + liveConsumedBlocks = 0; + deserializedActions = 0; + indexedObjects = 0; + + if (_workers === 0) { + console.log('FATAL ERROR - All Workers have stopped!'); + process.exit(1); + } + + }, log_interval); + // Launch all workers workerMap.forEach((conf) => { cluster.fork(conf); }); - if (!fs.existsSync('./logs')) { - fs.mkdirSync('./logs'); - } - - const dsErrorsLog = './logs/' + process.env.CHAIN + "_ds_err_" + starting_block + "_" + head + "_" + Date.now() + ".txt"; - if (fs.existsSync(dsErrorsLog)) { - fs.unlinkSync(dsErrorsLog); - } - const ds_errors = fs.createWriteStream(dsErrorsLog, {flags: 'a'}); - const cachedMap = await getAsync(process.env.CHAIN + ":" + 'abi_cache'); - let abiCacheMap; - if (cachedMap) { - abiCacheMap = JSON.parse(cachedMap); - console.log(`Found ${Object.keys(abiCacheMap).length} entries in the local ABI cache`) - } else { - abiCacheMap = {}; - } - - setInterval(() => { - rClient.set(process.env.CHAIN + ":" + 'abi_cache', JSON.stringify(abiCacheMap)); - }, 10000); - // Worker event listener const workerHandler = (msg) => { switch (msg.event) { @@ -575,7 +791,7 @@ async function main() { // console.log(msg.data); const str = JSON.stringify(msg.data); // console.log(str); - ds_errors.write(str + '\n'); + dsErrorStream.write(str + '\n'); break; } case 'read_block': { @@ -594,6 +810,7 @@ async function main() { } } else { liveConsumedBlocks++; + onLiveBlock(msg); } break; } @@ -651,6 +868,7 @@ async function main() { }, 1000); } + // Attach stop handler pm2io.action('stop', (reply) => { allowMoreReaders = false; console.info('Stop signal received. Shutting down readers immediately!'); diff --git a/modules/action_data/daobet-eosio-delegatebw.js b/modules/action_data/daobet-eosio-delegatebw.js new file mode 100644 index 00000000..b8a30e39 --- /dev/null +++ b/modules/action_data/daobet-eosio-delegatebw.js @@ -0,0 +1,45 @@ +const hyperionModule = { + chain: "74d023a9293d9b68c3c52e2f738ee681c1671cc3dc0f263cf2c533cd5523ff95", + contract: 'eosio', + action: 'delegatebw', + parser_version: ['1.7'], + mappings: { + action: { + "@delegatebw": { + "properties": { + "from": {"type": "keyword"}, + "receiver": {"type": "keyword"}, + "stake_cpu_quantity": {"type": "float"}, + "stake_net_quantity": {"type": "float"}, + "stake_vote_quantity": {"type": "float"}, + "transfer": {"type": "boolean"}, + "amount": {"type": "float"} + } + } + } + }, + handler: (action) => { + // attach action extras here + const data = action['act']['data']; + let cpu_qtd = null; + let net_qtd = null; + let vote_qtd = null; + if (data['stake_net_quantity'] && data['stake_cpu_quantity'] && data['stake_vote_quantity']) { + cpu_qtd = parseFloat(data['stake_cpu_quantity'].split(' ')[0]); + net_qtd = parseFloat(data['stake_net_quantity'].split(' ')[0]); + vote_qtd = parseFloat(data['stake_vote_quantity'].split(' ')[0]); + } + action['@delegatebw'] = { + amount: cpu_qtd + net_qtd + vote_qtd, + stake_cpu_quantity: cpu_qtd, + stake_net_quantity: net_qtd, + stake_vote_quantity: vote_qtd, + from: data['from'], + receiver: data['receiver'], + transfer: data['transfer'] + }; + delete action['act']['data']; + } +}; + +module.exports = {hyperionModule}; diff --git a/modules/action_data/daobet-eosio-undelegatebw.js b/modules/action_data/daobet-eosio-undelegatebw.js new file mode 100644 index 00000000..44f2c09b --- /dev/null +++ b/modules/action_data/daobet-eosio-undelegatebw.js @@ -0,0 +1,43 @@ +const hyperionModule = { + chain: "74d023a9293d9b68c3c52e2f738ee681c1671cc3dc0f263cf2c533cd5523ff95", + contract: 'eosio', + action: 'undelegatebw', + parser_version: ['1.7'], + mappings: { + action: { + "@undelegatebw": { + "properties": { + "from": {"type": "keyword"}, + "receiver": {"type": "keyword"}, + "unstake_cpu_quantity": {"type": "float"}, + "unstake_net_quantity": {"type": "float"}, + "unstake_vote_quantity": {"type": "float"}, + "amount": {"type": "float"} + } + } + } + }, + handler: (action) => { + // attach action extras here + const data = action['act']['data']; + let cpu_qtd = null; + let net_qtd = null; + let vote_qtd = null; + if (data['unstake_net_quantity'] && data['unstake_cpu_quantity'] && data['unstake_vote_quantity']) { + cpu_qtd = parseFloat(data['unstake_cpu_quantity'].split(' ')[0]); + net_qtd = parseFloat(data['unstake_net_quantity'].split(' ')[0]); + vote_qtd = parseFloat(data['unstake_vote_quantity'].split(' ')[0]); + } + action['@undelegatebw'] = { + amount: cpu_qtd + net_qtd + vote_qtd, + unstake_cpu_quantity: cpu_qtd, + unstake_net_quantity: net_qtd, + unstake_vote_quantity: vote_qtd, + from: data['from'], + receiver: data['receiver'] + }; + delete action['act']['data']; + } +}; + +module.exports = {hyperionModule}; diff --git a/modules/action_data/eosio-buyram.js b/modules/action_data/eosio-buyram.js index 1eafe27a..ffd26ece 100644 --- a/modules/action_data/eosio-buyram.js +++ b/modules/action_data/eosio-buyram.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'buyram', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-buyrambytes.js b/modules/action_data/eosio-buyrambytes.js index f40539b9..3029a1f1 100644 --- a/modules/action_data/eosio-buyrambytes.js +++ b/modules/action_data/eosio-buyrambytes.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'buyrambytes', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-buyrex.js b/modules/action_data/eosio-buyrex.js index f4d334eb..dbccaf23 100644 --- a/modules/action_data/eosio-buyrex.js +++ b/modules/action_data/eosio-buyrex.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'buyrex', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-delegatebw.js b/modules/action_data/eosio-delegatebw.js index 3ced7127..e748b56d 100644 --- a/modules/action_data/eosio-delegatebw.js +++ b/modules/action_data/eosio-delegatebw.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'delegatebw', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-newaccount.js b/modules/action_data/eosio-newaccount.js index f3ec3d1b..88ad8812 100644 --- a/modules/action_data/eosio-newaccount.js +++ b/modules/action_data/eosio-newaccount.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'newaccount', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here let name = null; diff --git a/modules/action_data/eosio-undelegatebw.js b/modules/action_data/eosio-undelegatebw.js index 268c1af1..03dd6b4b 100644 --- a/modules/action_data/eosio-undelegatebw.js +++ b/modules/action_data/eosio-undelegatebw.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'undelegatebw', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-unstaketorex.js b/modules/action_data/eosio-unstaketorex.js index c7619b19..d7d04f3a 100644 --- a/modules/action_data/eosio-unstaketorex.js +++ b/modules/action_data/eosio-unstaketorex.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'unstaketorex', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-updateauth.js b/modules/action_data/eosio-updateauth.js index 4037e285..0923183f 100644 --- a/modules/action_data/eosio-updateauth.js +++ b/modules/action_data/eosio-updateauth.js @@ -2,7 +2,7 @@ const hyperionModule = { chain: "*", contract: process.env.SYSTEM_DOMAIN, action: 'updateauth', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here const data = action['act']['data']; diff --git a/modules/action_data/eosio-voteproducer.js b/modules/action_data/eosio-voteproducer.js new file mode 100644 index 00000000..2cdc43f7 --- /dev/null +++ b/modules/action_data/eosio-voteproducer.js @@ -0,0 +1,26 @@ +const hyperionModule = { + chain: "*", + contract: 'eosio', + action: 'voteproducer', + parser_version: ['1.8', '1.7'], + mappings: { + action: { + "@voteproducer": { + "properties": { + "proxy": {"type": "keyword"}, + "producers": {"type": "keyword"} + } + } + } + }, + handler: (action) => { + // attach action extras here + const data = action['act']['data']; + action['@voteproducer'] = { + proxy: data['proxy'], + producers: data['producers'] + }; + } +}; + +module.exports = {hyperionModule}; diff --git a/modules/action_data/transfer.js b/modules/action_data/transfer.js index b976a8e8..b3c44795 100644 --- a/modules/action_data/transfer.js +++ b/modules/action_data/transfer.js @@ -2,17 +2,15 @@ const hyperionModule = { chain: "*", contract: '*', action: 'transfer', - parser_version: '1.8', + parser_version: ['1.8','1.7'], handler: (action) => { // attach action extras here let qtd = null; const data = action['act']['data']; if (data['quantity']) { qtd = data['quantity'].split(' '); - delete data['quantity']; } else if (data['value']) { qtd = data['value'].split(' '); - delete data['value']; } if (qtd) { diff --git a/modules/index.js b/modules/index.js index 2b5a20f5..b46f7bd6 100644 --- a/modules/index.js +++ b/modules/index.js @@ -1,10 +1,13 @@ const fs = require('fs'); const path = require('path'); +const chainID = require('../connections.json').chains[process.env.CHAIN]['chain_id']; class HyperionModuleLoader { #handledActions = new Map(); actionParser; + chainMappings = new Map(); + extraMappings = []; constructor() { this.loadActionHandlers(); @@ -14,34 +17,50 @@ class HyperionModuleLoader { } processActionData(action) { - const wildcard = this.#handledActions.get('*'); - if (wildcard.has(action.act.name)) { wildcard.get(action.act.name)(action); } - if (this.#handledActions.has(action.act.account)) { const _c = this.#handledActions.get(action.act.account); if (_c.has(action.act.name)) { _c.get(action.act.name)(action); } } + } + includeModule(_module) { + if (this.#handledActions.has(_module.contract)) { + const existing = this.#handledActions.get(_module.contract); + existing.set(_module.action, _module.handler); + } else { + const _map = new Map(); + _map.set(_module.action, _module.handler); + this.#handledActions.set(_module.contract, _map); + } + if (_module.mappings) { + this.extraMappings.push(_module.mappings); + } } loadActionHandlers() { const files = fs.readdirSync('modules/action_data/'); for (const plugin of files) { const _module = require(path.join(__dirname, 'action_data', plugin)).hyperionModule; - if (_module.parser_version === process.env.PARSER) { - if (this.#handledActions.has(_module.contract)) { - const existing = this.#handledActions.get(_module.contract); - existing.set(_module.action, _module.handler); - } else { - const _map = new Map(); - _map.set(_module.action, _module.handler); - this.#handledActions.set(_module.contract, _map); + if (_module.parser_version.includes(process.env.PARSER)) { + if (_module.chain === chainID || _module.chain === '*') { + const key = `${_module.contract}::${_module.action}`; + if (this.chainMappings.has(key)) { + if (this.chainMappings.get(key) === '*' && _module.chain === chainID) { + // console.log('Overwriting module ' + key + ' for ' + _module.chain); + this.includeModule(_module); + this.chainMappings.set(key, _module.chain); + } + } else { + // console.log('Including module ' + key + ' for ' + _module.chain); + this.includeModule(_module); + this.chainMappings.set(key, _module.chain); + } } } } diff --git a/modules/parsers/1.7-parser.js b/modules/parsers/1.7-parser.js new file mode 100644 index 00000000..236cfbc9 --- /dev/null +++ b/modules/parsers/1.7-parser.js @@ -0,0 +1,186 @@ +const prettyjson = require("prettyjson"); +const {action_blacklist} = require('../../definitions/blacklists'); +const {action_whitelist} = require('../../definitions/whitelists'); +const {deserialize, debugLog, unzipAsync} = require('../../helpers/functions'); +const {TextEncoder, TextDecoder} = require('util'); +const txDec = new TextDecoder(); +const txEnc = new TextEncoder(); +const chain = process.env.CHAIN; + +function checkBlacklist(act) { + if (action_blacklist.has(`${chain}::${act['account']}::*`)) { + return true; + } else return action_blacklist.has(`${chain}::${act['account']}::${act['name']}`); +} + +function checkWhitelist(act) { + if (action_whitelist.has(`${chain}::${act['account']}::*`)) { + return true; + } else return action_whitelist.has(`${chain}::${act['account']}::${act['name']}`); +} + +const reading_mode = process.env.live_mode; + +async function actionParser(common, ts, action, trx_data, _actDataArray, + _processedTraces, full_trace, parent, current_ord) { + const {trx_id, block_num, producer, cpu_usage_us, net_usage_words} = trx_data; + let act = action['act']; + + // Include ordinals + if (parent === null) { + action['creator_action_ordinal'] = 0; + action['action_ordinal'] = 1; + } else { + action['creator_action_ordinal'] = parent; + if (current_ord !== null) { + action['action_ordinal'] = current_ord; + } + } + + // abort if blacklisted + if (checkBlacklist(act)) { + return false; + } + + if (action_whitelist.size > 0) { + if (!checkWhitelist(act)) { + return false; + } + } + + const original_act = Object.assign({}, act); + const actions = []; + actions.push(act); + let ds_act; + try { + ds_act = await common.deserializeActionsAtBlock(actions, block_num); + action['act'] = ds_act[0]; + common.attachActionExtras(action); + // report deserialization event + process.send({event: 'ds_action'}); + } catch (e) { + // write error to CSV + console.log(e); + process.send({ + event: 'ds_error', + data: { + type: 'action_ds_error', + block: block_num, + account: act.account, + action: act.name, + gs: parseInt(action['receipt'][1]['global_sequence'], 10), + message: e.message + } + }); + action['act'] = original_act; + action['act']['data'] = Buffer.from(action['act']['data']).toString('hex'); + } + + action['@timestamp'] = ts; + action['block_num'] = block_num; + action['producer'] = producer; + action['trx_id'] = trx_id; + + if (action['account_ram_deltas'].length === 0) { + delete action['account_ram_deltas']; + } + if (action['console'] === '') { + delete action['console']; + } + if (action['except'] === null) { + if (!action['receipt']) { + console.log(full_trace.status); + console.log(action); + } + action['receipt'] = action['receipt'][1]; + action['global_sequence'] = parseInt(action['receipt']['global_sequence'], 10); + delete action['except']; + delete action['error_code']; + + // add usage data to the first action on the transaction + if (action['action_ordinal'] === 1 && action['creator_action_ordinal'] === 0) { + action['cpu_usage_us'] = cpu_usage_us; + action['net_usage_words'] = net_usage_words; + } + + if (action['inline_traces']) { + let newOrds = action['action_ordinal'] + 1; + for (const inline of action['inline_traces']) { + await actionParser(common, ts, inline[1], trx_data, + _actDataArray, _processedTraces, full_trace, + action['action_ordinal'], newOrds); + newOrds++; + } + } + delete action['inline_traces']; + _processedTraces.push(action); + + } else { + console.log(action); + } + return true; +} + +async function messageParser(common, messages, types, ch, ch_ready) { + for (const message of messages) { + const ds_msg = deserialize('result', message.content, txEnc, txDec, types); + const res = ds_msg[1]; + let block, traces = [], deltas = []; + if (res.block && res.block.length) { + block = deserialize('signed_block', res.block, txEnc, txDec, types); + if (block === null) { + console.log(res); + } + } + if (res['traces'] && res['traces'].length) { + traces = deserialize( + 'transaction_trace[]', + await unzipAsync(res['traces']), + txEnc, + txDec, + types + ); + } + if (res['deltas'] && res['deltas'].length) { + deltas = deserialize( + 'table_delta[]', + await unzipAsync(res['deltas']), + txEnc, + txDec, + types + ); + } + try { + const t0 = Date.now(); + const result = await common.processBlock(res, block, traces, deltas); + const elapsedTime = Date.now() - t0; + if (elapsedTime > 10) { + debugLog(`[WARNING] Deserialization time for block ${result['block_num']} was too high, time elapsed ${elapsedTime}ms`); + } + if (result) { + const evPayload = { + event: 'consumed_block', + block_num: result['block_num'], + live: reading_mode + }; + if (block) { + evPayload["producer"] = block['producer']; + } + process.send(evPayload); + } else { + console.log('Empty message. No block'); + console.log(_.omit(res, ['block', 'traces', 'deltas'])); + } + if (ch_ready) { + ch.ack(message); + } + } catch (e) { + console.log(e); + if (ch_ready) { + ch.nack(message); + } + } + } +} + +module.exports = {actionParser, messageParser}; diff --git a/modules/parsers/1.8-parser.js b/modules/parsers/1.8-parser.js index 72c1b062..308794a0 100644 --- a/modules/parsers/1.8-parser.js +++ b/modules/parsers/1.8-parser.js @@ -76,15 +76,14 @@ module.exports = { if (!action['receipt']) { console.log(full_trace.status); console.log(action); - m } action['receipt'] = action['receipt'][1]; action['global_sequence'] = parseInt(action['receipt']['global_sequence'], 10); delete action['except']; delete action['error_code']; - // add usage data to the first action on the transaction - if (action['action_ordinal'] === 1 && action['creator_action_ordinal'] === 0) { + // add usage data to all 0 ordinal actions + if (action['creator_action_ordinal'] === 0) { action['cpu_usage_us'] = cpu_usage_us; action['net_usage_words'] = net_usage_words; } @@ -121,11 +120,15 @@ module.exports = { debugLog(`[WARNING] Deserialization time for block ${result['block_num']} was too high, time elapsed ${elapsedTime}ms`); } if (result) { - process.send({ + const evPayload = { event: 'consumed_block', block_num: result['block_num'], live: reading_mode - }); + }; + if (block) { + evPayload["producer"] = block['producer']; + } + process.send(evPayload); } else { console.log('Empty message. No block'); console.log(_.omit(res, ['block', 'traces', 'deltas'])); diff --git a/package.json b/package.json index af506712..795e90bd 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,12 @@ { "name": "hyperion-history", - "version": "2.0.0", + "version": "2.5.0", "description": "Scalable Full History API Solution for EOSIO based blockchains", "main": "launcher.js", "scripts": { - "start:indexer": "pm2 start --only Indexer --update-env", + "logs": "pm2 logs Indexer", "start:api": "pm2 start --only API --update-env", - "logs": "pm2 logs Indexer" + "start:indexer": "pm2 start --only Indexer --update-env" }, "author": { "name": "EOS Rio", @@ -18,8 +18,8 @@ }, "license": "MIT", "dependencies": { - "@elastic/elasticsearch": "^7.4.0", - "@pm2/io": "^4.3.2", + "@elastic/elasticsearch": "^7.5.0", + "@pm2/io": "4.3.3", "amqplib": "^0.5.5", "async": "^3.1.0", "async-redis": "^1.1.7", @@ -27,15 +27,16 @@ "eosjs-ecc": "^4.0.7", "fastify": "^2.10.0", "fastify-autoload": "^1.0.0", - "fastify-cors": "^2.2.0", + "fastify-cors": "^3.0.0", "fastify-elasticsearch": "^1.1.0", "fastify-formbody": "^3.1.0", "fastify-oas": "^2.5.0", "fastify-plugin": "latest", - "fastify-rate-limit": "^2.4.0", + "fastify-rate-limit": "^3.0.0", "fastify-redis": "^3.1.1", - "fastify-websocket": "^0.6.0", - "got": "^9.6.0", + "fastify-static": "latest", + "fastify-websocket": "^1.1.0", + "got": "^10.0.1", "ioredis": "^4.14.1", "lodash": "^4.17.15", "moment": "^2.24.0", @@ -44,6 +45,7 @@ "socket.io": "^2.3.0", "socket.io-client": "latest", "socket.io-redis": "^5.2.0", - "ws": "^7.2.0" + "ws": "^7.2.0", + "fastify-static": "latest" } } diff --git a/workers/deserializer.worker.js b/workers/deserializer.worker.js index 46918b63..66b3a72e 100644 --- a/workers/deserializer.worker.js +++ b/workers/deserializer.worker.js @@ -89,6 +89,7 @@ async function processBlock(res, block, traces, deltas) { let producer = ''; let ts = ''; const block_num = res['this_block']['block_num']; + const block_ts = res['this_time']; if (process.env.FETCH_BLOCK === 'true') { if (!block) { console.log(res); @@ -135,7 +136,7 @@ async function processBlock(res, block, traces, deltas) { if (deltas && process.env.PROC_DELTAS === 'true') { const t1 = Date.now(); - await processDeltas(deltas, block_num); + await processDeltas(deltas, block_num, block_ts); const elapsed_time = Date.now() - t1; if (elapsed_time > 10) { debugLog(`[WARNING] Delta processing took ${elapsed_time}ms on block ${block_num}`); @@ -153,23 +154,21 @@ async function processBlock(res, block, traces, deltas) { const _actDataArray = []; const _processedTraces = []; const action_traces = transaction_trace['action_traces']; - // console.log(transaction_trace['partial']); const t3 = Date.now(); for (const action_trace of action_traces) { if (action_trace[0] === 'action_trace_v0') { const action = action_trace[1]; const trx_data = {trx_id, block_num, producer, cpu_usage_us, net_usage_words}; - const status = await mLoader.actionParser(common, ts, action, trx_data, _actDataArray, _processedTraces, transaction_trace); - if (status) { + if (await mLoader.actionParser(common, ts, action, trx_data, _actDataArray, _processedTraces, transaction_trace, null, 0)) { action_count++; } } } - const _finalTraces = []; + const _finalTraces = []; if (_processedTraces.length > 0) { + const digestMap = new Map(); - // console.log(`----------- TRX ${trx_id} ------------------`); for (let i = 0; i < _processedTraces.length; i++) { const receipt = _processedTraces[i].receipt; const act_digest = receipt['act_digest']; @@ -181,6 +180,7 @@ async function processBlock(res, block, traces, deltas) { digestMap.set(act_digest, _arr); } } + _processedTraces.forEach(data => { const digest = data['receipt']['act_digest']; if (digestMap.has(digest)) { @@ -205,8 +205,6 @@ async function processBlock(res, block, traces, deltas) { digestMap.delete(digest); } }); - // console.log(prettyjson.render(_finalTraces)); - // console.log(`---------------------------------------------`); } // Submit Actions after deduplication @@ -338,7 +336,19 @@ function extractDeltaStruct(deltas) { return deltaStruct; } -async function processDeltas(deltas, block_num) { +function pushToDeltaQueue(bufferdata) { + const q = index_queue_prefix + "_deltas:" + (delta_emit_idx); + preIndexingQueue.push({ + queue: q, + content: bufferdata + }); + delta_emit_idx++; + if (delta_emit_idx > (n_ingestors_per_queue * action_indexing_ratio)) { + delta_emit_idx = 1; + } +} + +async function processDeltas(deltas, block_num, block_ts) { const deltaStruct = extractDeltaStruct(deltas); // if (Object.keys(deltaStruct).length > 4) { @@ -362,6 +372,9 @@ async function processDeltas(deltas, block_num) { block: block_num, abi: jsonABIString }; + if (process.env.PATCHED_SHIP === 'true') { + new_abi_object['@timestamp'] = block_ts; + } debugLog(`[Worker ${process.env.worker_id}] read ${account['name']} ABI at block ${block_num}`); const q = index_queue_prefix + "_abis:1"; preIndexingQueue.push({ @@ -397,44 +410,86 @@ async function processDeltas(deltas, block_num) { // Contract Rows if (deltaStruct['contract_row']) { const rows = deltaStruct['contract_row']; + const actionDeltaMap = {}; for (const row of rows) { const sb = createSerialBuffer(row.data); try { - const payload = { - present: row.present, - version: sb.get(), - code: sb.getName(), - scope: sb.getName(), - table: sb.getName(), - primary_key: sb.getUint64AsNumber(), - payer: sb.getName(), - value: sb.getBytes() - }; - if (process.env.INDEX_ALL_DELTAS === 'true' || (payload.code === system_domain || payload.table === 'accounts')) { + const payload = {}; + payload['present'] = row.present; + payload['block_num'] = block_num; + + // deserialize action data + sb.get(); + payload['code'] = sb.getName(); + payload['scope'] = sb.getName(); + payload['table'] = sb.getName(); + payload['primary_key'] = sb.getUint64AsNumber(); + payload['payer'] = sb.getName(); + if (process.env.PATCHED_SHIP === 'true') { + payload['global_sequence'] = sb.getUint64AsNumber(); + payload["@timestamp"] = block_ts; + } + payload['value'] = sb.getBytes(); + + // if (!contractRowObject.code) { + // contractRowObject.code = payload['code']; + // } else { + // if (contractRowObject.code !== payload['code']) { + // console.log(contractRowObject.code, payload['code']); + // } + // } + + if (process.env.INDEX_ALL_DELTAS === 'true' || (payload.code === 'eosio' || payload.table === 'accounts')) { + // const gs = payload['global_sequence']; const jsonRow = await processContractRow(payload, block_num); if (jsonRow['data']) { - await processTableDelta(jsonRow, block_num); - } - // if (!payload.present && payload.code === 'eosio.msig') { - // console.log(block_num, jsonRow); - // } + // check for specific deltas to be indexed + const indexableData = await processTableDelta(jsonRow, block_num); + if (indexableData) { + if (process.env.ENABLE_INDEXING === 'true' && process.env.INDEX_DELTAS === 'true') { + const payload = Buffer.from(JSON.stringify(jsonRow)); + pushToDeltaQueue(payload); + if (allowStreaming && process.env.STREAM_DELTAS === 'true') { + ch.publish('', queue_prefix + ':stream', payload, { + headers: { + event: 'delta', + code: jsonRow.code, + table: jsonRow.table + } + }); + } + // if (process.env.PATCHED_SHIP === 'true') { + // if (!actionDeltaMap[gs]) { + // actionDeltaMap[gs] = { + // "@timestamp": block_ts, + // global_sequence: gs, + // block_num: block_num, + // code: payload['code'], + // delta: [_.omit(jsonRow, ['code'])] + // }; + // } else { + // actionDeltaMap[gs].delta.push(_.omit(jsonRow, ['code'])); + // } + // } else { + // pushToDeltaQueue(payload); + // } - if (allowStreaming && process.env.STREAM_DELTAS === 'true') { - const payload = Buffer.from(JSON.stringify(jsonRow)); - ch.publish('', queue_prefix + ':stream', payload, { - headers: { - event: 'delta', - code: jsonRow.code, - table: jsonRow.table } - }); + } } } } catch (e) { console.log(block_num, e); } } + + // for (const key of Object.keys(actionDeltaMap)) { + // + // console.log(actionDeltaMap[key]); + // + // } + } // TODO: store permission links on a dedicated index @@ -823,11 +878,10 @@ async function storeAccount(data) { } } -async function processTableDelta(data, block_num) { +async function processTableDelta(data) { if (data['table']) { - data['block_num'] = block_num; data['primary_key'] = String(data['primary_key']); - let allowIndex = true; + let allowIndex; let handled = false; const key = `${data.code}:${data.table}`; if (tableHandlers[key]) { @@ -844,20 +898,8 @@ async function processTableDelta(data, block_num) { } if (!handled && process.env.INDEX_ALL_DELTAS === 'true') { allowIndex = true; - } else if (handled) { - allowIndex = true; - } - if (process.env.ENABLE_INDEXING === 'true' && allowIndex && process.env.INDEX_DELTAS === 'true') { - const q = index_queue_prefix + "_deltas:" + (delta_emit_idx); - preIndexingQueue.push({ - queue: q, - content: Buffer.from(JSON.stringify(data)) - }); - delta_emit_idx++; - if (delta_emit_idx > (n_ingestors_per_queue * action_indexing_ratio)) { - delta_emit_idx = 1; - } - } + } else allowIndex = handled; + return allowIndex; } } diff --git a/workers/indexer.worker.js b/workers/indexer.worker.js index 623f060d..61d4118b 100644 --- a/workers/indexer.worker.js +++ b/workers/indexer.worker.js @@ -31,7 +31,7 @@ function assertQueues() { ch.assertQueue(process.env['queue'], {durable: true}); ch.prefetch(indexingPrefecthCount); ch.consume(process.env['queue'], indexQueue.push); - console.log(`setting up indexer on queue ${process.env['queue']}`); + console.log(`indexer listening on ${process.env['queue']}`); } } catch (e) { console.error('rabbitmq error!'); diff --git a/workers/state-reader.worker.js b/workers/state-reader.worker.js index 2820a731..38bae3f3 100644 --- a/workers/state-reader.worker.js +++ b/workers/state-reader.worker.js @@ -8,6 +8,11 @@ const txDec = new TextDecoder(); const txEnc = new TextEncoder(); let ch, api, abi, ship, types, cch, rpc; + +// elasticsearch client access for fork handling operations +let client; +const index_version = process.env.CREATE_INDICES; + let cch_ready = false; let tables = new Map(); let chainID = null; @@ -141,6 +146,7 @@ function processFirstABI(data) { abi = JSON.parse(data); types = Serialize.getTypesFromAbi(Serialize.createInitialTypes(), abi); abi.tables.map(table => tables.set(table.name, table.type)); + // console.log(prettyjson.render(abi)); process.send({ event: 'init_abi', data: data @@ -162,6 +168,43 @@ function processFirstABI(data) { } } +async function logForkEvent(starting_block, ending_block, new_id) { + await client.index({ + index: queue_prefix + '-logs', + body: { + type: 'fork', + '@timestamp': new Date().toISOString(), + 'fork.from_block': starting_block, + 'fork.to_block': ending_block, + 'fork.size': ending_block - starting_block + 1, + 'fork.new_block_id': new_id + } + }); +} + +async function handleFork(data) { + const this_block = data['this_block']; + await logForkEvent(this_block['block_num'], local_block_num, this_block['block_id']); + console.log(`Handling fork event: new block ${this_block['block_num']} has id ${this_block['block_id']}`); + console.log(`Removing indexed data from ${this_block['block_num']} to ${local_block_num}`); + const searchBody = { + query: { + bool: { + must: [{range: {block_num: {gte: this_block['block_num'], lte: local_block_num}}}] + } + } + }; + const indexName = queue_prefix + '-delta-' + index_version + '-*'; + const {body} = await client.deleteByQuery({ + index: indexName, + refresh: true, + body: searchBody + }); + console.log(body); + console.log(`Live reading resumed!`); +} + +// Entrypoint for incoming blocks async function onMessage(data) { if (abi) { if (recovery) { @@ -197,35 +240,39 @@ async function onMessage(data) { const res = deserialize('result', data, txEnc, txDec, types)[1]; if (res['this_block']) { const blk_num = res['this_block']['block_num']; - if (isLiveReader) debugLog(`${process.env.CHAIN.toUpperCase()} reader at block ${blk_num}`); - if (future_block !== 0 && future_block === blk_num) { - console.log('Missing block ' + blk_num + ' received!'); + if (isLiveReader) { + // console.log(`${new Date().toISOString()} :: ${blk_num} :: ${res['this_block']['block_id'].toLowerCase()}`); + if (blk_num !== local_block_num + 1) { + await handleFork(res); + } else { + local_block_num = blk_num; + } + stageOneDistQueue.push({num: blk_num, content: data}); + return 1; } else { - future_block = 0; - } - if (blk_num === local_block_num + 1) { - local_block_num = blk_num; - if (res['block'] || res['traces'] || res['deltas']) { - stageOneDistQueue.push({ - num: blk_num, - content: data - }); - return 1; + if (future_block !== 0 && future_block === blk_num) { + console.log('Missing block ' + blk_num + ' received!'); } else { - if (blk_num === 1) { - stageOneDistQueue.push({ - num: blk_num, - content: data - }); + future_block = 0; + } + if (blk_num === local_block_num + 1) { + local_block_num = blk_num; + if (res['block'] || res['traces'] || res['deltas']) { + stageOneDistQueue.push({num: blk_num, content: data}); return 1; } else { - return 0; + if (blk_num === 1) { + stageOneDistQueue.push({num: blk_num, content: data}); + return 1; + } else { + return 0; + } } + } else { + console.log(`[${role}] missing block: ${(local_block_num + 1)} current block: ${blk_num}`); + future_block = blk_num + 1; + return 0; } - } else { - console.log(`[${role}] missing block: ${(local_block_num + 1)} current block: ${blk_num}`); - future_block = blk_num + 1; - return 0; } } else { return 0; @@ -419,6 +466,7 @@ function createNulledApi(chainID) { async function run() { rpc = manager.nodeosJsonRPC; + client = manager.elasticsearchClient; const chain_data = await rpc.get_info(); chainID = chain_data.chain_id; api = createNulledApi(chainID);