From b6667b7c406f13f74e72955b0889818e7bf6d990 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 19 Mar 2026 12:00:26 -0700 Subject: [PATCH] Console: Report workers active with any interesting activity. Currently workers are reported as active in the web console if they report nonzero rows for any channel. However, because segment input rows are typically reported when the segment is done processing, workers that just started can be active even when they have not yet reported rows. This patch adjusts the logic such that any nonzero rows, files, bytes, frames, or wall time is enough to consider a worker active. --- .../src/druid-models/stages/stages.spec.ts | 57 +++++++++++++++++++ web-console/src/druid-models/stages/stages.ts | 26 ++++++--- .../execution-stages-pane.tsx | 2 +- 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/web-console/src/druid-models/stages/stages.spec.ts b/web-console/src/druid-models/stages/stages.spec.ts index b0564e67b5ee..d0054d7fe0d7 100644 --- a/web-console/src/druid-models/stages/stages.spec.ts +++ b/web-console/src/druid-models/stages/stages.spec.ts @@ -422,6 +422,63 @@ describe('Stages', () => { // Worker 1 has output/shuffle data, so it's active even though input is zero expect(inactiveCount).toBe(1); }); + + it('counts worker as active if it has wall time but no channel activity yet', () => { + const customStages = new Stages( + [ + { + stageNumber: 0, + definition: { + id: 'test-stage', + input: [ + { + type: 'external', + inputSource: { type: 'http', uris: [] }, + inputFormat: { type: 'json' }, + signature: [], + }, + ], + processor: { type: 'scan' }, + signature: [], + maxWorkerCount: 2, + }, + phase: 'READING_INPUT', + workerCount: 2, + partitionCount: 1, + }, + ], + { + '0': { + '0': { + // Worker 0 has wall time but no channel activity yet + input0: { + type: 'channel', + rows: [0], + }, + cpu: { + type: 'cpus', + main: { + type: 'cpu', + cpu: 500000, + wall: 1000000, + }, + }, + }, + '1': { + // Worker 1 is truly inactive - no wall time, no channel activity + input0: { + type: 'channel', + rows: [0], + }, + }, + }, + }, + ); + + const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]); + // Worker 0 has wall time, so only worker 1 is inactive + expect(inactiveCount).toBe(1); + }); }); describe('#getByPartitionCountersForStage', () => { diff --git a/web-console/src/druid-models/stages/stages.ts b/web-console/src/druid-models/stages/stages.ts index 6831891a8b0c..46c8387b362c 100644 --- a/web-console/src/druid-models/stages/stages.ts +++ b/web-console/src/druid-models/stages/stages.ts @@ -644,18 +644,30 @@ export class Stages { const channelCounters = this.getChannelCounterNamesForStage(stage); - // Calculate and return the number of workers that have zero count across all inputChannelCounters + // Calculate and return the number of workers that have zero interesting counters return sum( - Object.values(forStageCounters).map(stageCounters => - Number( + Object.values(forStageCounters).map(stageCounters => { + // Check if the worker has any wall time recorded + const { cpu } = stageCounters; + if (cpu) { + const totalWall = sum(CPUS_COUNTER_FIELDS, field => cpu[field]?.wall || 0); + if (totalWall > 0) return 0; + } + + // Check if the worker has any channel activity + return Number( channelCounters.every(channel => { const c = stageCounters[channel]; if (!c) return true; - const totalRows = sum(c.rows || []); - return totalRows === 0; + return ( + sum(c.rows || []) === 0 && + sum(c.files || []) === 0 && + sum(c.bytes || []) === 0 && + sum(c.frames || []) === 0 + ); }), - ), - ), + ); + }), ); } diff --git a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx index 74bff4380daf..197b0aba8a22 100644 --- a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx +++ b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx @@ -985,7 +985,7 @@ ${title} uncompressed size: ${formatBytesCompact(
{formatInteger(value)}
{`${formatInteger(inactiveWorkers)} inactive`}
);