Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions web-console/src/druid-models/stages/stages.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,63 @@ describe('Stages', () => {
// Worker 1 has output/shuffle data, so it's active even though input is zero
expect(inactiveCount).toBe(1);
});

it('counts worker as active if it has wall time but no channel activity yet', () => {
const customStages = new Stages(
[
{
stageNumber: 0,
definition: {
id: 'test-stage',
input: [
{
type: 'external',
inputSource: { type: 'http', uris: [] },
inputFormat: { type: 'json' },
signature: [],
},
],
processor: { type: 'scan' },
signature: [],
maxWorkerCount: 2,
},
phase: 'READING_INPUT',
workerCount: 2,
partitionCount: 1,
},
],
{
'0': {
'0': {
// Worker 0 has wall time but no channel activity yet
input0: {
type: 'channel',
rows: [0],
},
cpu: {
type: 'cpus',
main: {
type: 'cpu',
cpu: 500000,
wall: 1000000,
},
},
},
'1': {
// Worker 1 is truly inactive - no wall time, no channel activity
input0: {
type: 'channel',
rows: [0],
},
},
},
},
);

const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
// Worker 0 has wall time, so only worker 1 is inactive
expect(inactiveCount).toBe(1);
});
});

describe('#getByPartitionCountersForStage', () => {
Expand Down
26 changes: 19 additions & 7 deletions web-console/src/druid-models/stages/stages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -644,18 +644,30 @@ export class Stages {

const channelCounters = this.getChannelCounterNamesForStage(stage);

// Calculate and return the number of workers that have zero count across all inputChannelCounters
// Calculate and return the number of workers that have zero interesting counters
return sum(
Object.values(forStageCounters).map(stageCounters =>
Number(
Object.values(forStageCounters).map(stageCounters => {
// Check if the worker has any wall time recorded
const { cpu } = stageCounters;
if (cpu) {
const totalWall = sum(CPUS_COUNTER_FIELDS, field => cpu[field]?.wall || 0);
if (totalWall > 0) return 0;
}

// Check if the worker has any channel activity
return Number(
channelCounters.every(channel => {
const c = stageCounters[channel];
if (!c) return true;
const totalRows = sum(c.rows || []);
return totalRows === 0;
return (
sum(c.rows || []) === 0 &&
sum(c.files || []) === 0 &&
sum(c.bytes || []) === 0 &&
sum(c.frames || []) === 0
);
}),
),
),
);
}),
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ ${title} uncompressed size: ${formatBytesCompact(
<div>{formatInteger(value)}</div>
<div
className="detail-line"
data-tooltip="Workers are counted as inactive until they report starting to read rows from their input."
data-tooltip="Workers are counted as active once they report any activity."
>{`${formatInteger(inactiveWorkers)} inactive`}</div>
</div>
);
Expand Down
Loading