diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java index 0a2cf694d4944..1989390953089 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java @@ -102,7 +102,8 @@ public void recordPollStart(long pollStartMs) { public void recordPollEnd(long pollEndMs) { long pollTimeMs = pollEndMs - pollStartMs; - double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); + long pollCycleTimeMs = pollTimeMs + timeSinceLastPollMs; + double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : (pollTimeMs * 1.0 / pollCycleTimeMs); this.pollIdleSensor.record(pollIdleRatio); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java index 8903f046c5ecf..5f2c94c4eb183 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java @@ -79,7 +79,8 @@ public void recordPollStart(long pollStartMs) { public void recordPollEnd(long pollEndMs) { long pollTimeMs = pollEndMs - pollStartMs; - double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); + long pollCycleTimeMs = pollTimeMs + timeSinceLastPollMs; + double pollIdleRatio = pollCycleTimeMs == 0 ? 0.0 : (pollTimeMs * 1.0 / pollCycleTimeMs); this.pollIdleSensor.record(pollIdleRatio); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index b0358baebfdfc..638868a29d399 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -3775,6 +3775,26 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) { assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); } + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testPollIdleRatioZero(GroupProtocol groupProtocol) { + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + initMetadata(client, Map.of(topic, 1)); + + KafkaConsumer consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId); + // MetricName object to check + Metrics metrics = consumer.metricsRegistry(); + MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", "consumer-metrics"); + // Test default value + assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue()); + + // Poll starts and ends within the same millisecond, so the metric should be 0. + consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); + consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); + assertEquals(0.0d, consumer.metrics().get(pollIdleRatio).metricValue()); + } + private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) { MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap()); return consumer.metricsRegistry().metrics().containsKey(metricName); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java index a9dc21d3d185a..84044d5fafce7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java @@ -165,6 +165,25 @@ public void testPollIdleRatio() { assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); } + @Test + public void testPollIdleRatioZero() { + ShareConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + initMetadata(client, Map.of(topic, 1)); + + KafkaShareConsumer consumer = newShareConsumer(time, client, subscription, metadata); + // MetricName object to check + Metrics metrics = consumer.metricsRegistry(); + MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics"); + // Test default value + assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue()); + + // Poll starts and ends within the same millisecond, so the metric should be 0. + consumer.kafkaShareConsumerMetrics().recordPollStart(time.milliseconds()); + consumer.kafkaShareConsumerMetrics().recordPollEnd(time.milliseconds()); + assertEquals(0.0d, consumer.metrics().get(pollIdleRatio).metricValue()); + } + private static boolean consumerMetricPresent(KafkaShareConsumer consumer, String name) { MetricName metricName = new MetricName(name, CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics", "", Map.of()); return consumer.metricsRegistry().metrics().containsKey(metricName);