From 7ac37e7fd78798eac423680d577c5e568b9d7f67 Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Wed, 1 Jul 2026 15:41:18 +0530 Subject: [PATCH 1/2] Add divide-by-zero error check --- .../metrics/KafkaConsumerMetrics.java | 3 ++- .../metrics/KafkaShareConsumerMetrics.java | 3 ++- .../clients/consumer/KafkaConsumerTest.java | 20 +++++++++++++++++++ .../KafkaShareConsumerMetricsTest.java | 19 ++++++++++++++++++ 4 files changed, 43 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java index 0a2cf694d4944..9e079c07782ec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java @@ -102,7 +102,8 @@ public void recordPollStart(long pollStartMs) { public void recordPollEnd(long pollEndMs) { long pollTimeMs = pollEndMs - pollStartMs; - double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); + long pollCycleTimeMs = pollTimeMs + timeSinceLastPollMs; + double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : pollTimeMs * 1.0 / pollCycleTimeMs; this.pollIdleSensor.record(pollIdleRatio); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java index 8903f046c5ecf..96f9ea6f1bd34 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java @@ -79,7 +79,8 @@ public void recordPollStart(long pollStartMs) { public void recordPollEnd(long pollEndMs) { long pollTimeMs = pollEndMs - pollStartMs; - double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); + long pollCycleTimeMs = pollTimeMs + timeSinceLastPollMs; + double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : pollTimeMs * 1.0 / pollCycleTimeMs; this.pollIdleSensor.record(pollIdleRatio); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index b0358baebfdfc..638868a29d399 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -3775,6 +3775,26 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) { assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); } + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testPollIdleRatioZero(GroupProtocol groupProtocol) { + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + initMetadata(client, Map.of(topic, 1)); + + KafkaConsumer consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId); + // MetricName object to check + Metrics metrics = consumer.metricsRegistry(); + MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", "consumer-metrics"); + // Test default value + assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue()); + + // Poll starts and ends within the same millisecond, so the metric should be 0. + consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); + consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); + assertEquals(0.0d, consumer.metrics().get(pollIdleRatio).metricValue()); + } + private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) { MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap()); return consumer.metricsRegistry().metrics().containsKey(metricName); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java index a9dc21d3d185a..84044d5fafce7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java @@ -165,6 +165,25 @@ public void testPollIdleRatio() { assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); } + @Test + public void testPollIdleRatioZero() { + ShareConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + initMetadata(client, Map.of(topic, 1)); + + KafkaShareConsumer consumer = newShareConsumer(time, client, subscription, metadata); + // MetricName object to check + Metrics metrics = consumer.metricsRegistry(); + MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics"); + // Test default value + assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue()); + + // Poll starts and ends within the same millisecond, so the metric should be 0. + consumer.kafkaShareConsumerMetrics().recordPollStart(time.milliseconds()); + consumer.kafkaShareConsumerMetrics().recordPollEnd(time.milliseconds()); + assertEquals(0.0d, consumer.metrics().get(pollIdleRatio).metricValue()); + } + private static boolean consumerMetricPresent(KafkaShareConsumer consumer, String name) { MetricName metricName = new MetricName(name, CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics", "", Map.of()); return consumer.metricsRegistry().metrics().containsKey(metricName); From fbe3dd3f7f6d25fa92d4a755f8a9e83ac697a4b7 Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Wed, 1 Jul 2026 16:42:47 +0530 Subject: [PATCH 2/2] Nit comments --- .../consumer/internals/metrics/KafkaConsumerMetrics.java | 2 +- .../consumer/internals/metrics/KafkaShareConsumerMetrics.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java index 9e079c07782ec..1989390953089 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java @@ -103,7 +103,7 @@ public void recordPollStart(long pollStartMs) { public void recordPollEnd(long pollEndMs) { long pollTimeMs = pollEndMs - pollStartMs; long pollCycleTimeMs = pollTimeMs + timeSinceLastPollMs; - double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : pollTimeMs * 1.0 / pollCycleTimeMs; + double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : (pollTimeMs * 1.0 / pollCycleTimeMs); this.pollIdleSensor.record(pollIdleRatio); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java index 96f9ea6f1bd34..5f2c94c4eb183 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java @@ -80,7 +80,7 @@ public void recordPollStart(long pollStartMs) { public void recordPollEnd(long pollEndMs) { long pollTimeMs = pollEndMs - pollStartMs; long pollCycleTimeMs = pollTimeMs + timeSinceLastPollMs; - double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : pollTimeMs * 1.0 / pollCycleTimeMs; + double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : (pollTimeMs * 1.0 / pollCycleTimeMs); this.pollIdleSensor.record(pollIdleRatio); } }