From cad7aba9707f3c7d4be3c208ef9f7325963d314a Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Mon, 15 Dec 2025 11:43:35 +0100 Subject: [PATCH 1/9] windowed-sum --- .../streams/processor/internals/StreamThread.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f208567c32db7..c9dde2fd56bcf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -325,6 +325,12 @@ public boolean isStartingRunningOrPartitionAssigned() { private final Sensor punctuateRatioSensor; private final Sensor commitRatioSensor; private final Sensor failedStreamThreadSensor; + private final Sensor windowedPollLatencySensor; + private final Sensor windowedTotalCommitLatencySensor; + private final Sensor windowedTotalProcessLatencySensor; + private final Sensor windowedTotalPunctuateLatencySensor; + private final Sensor windowedRunOnceLatencySensor; + private final long logSummaryIntervalMs; // the count summary log output time interval private long lastLogSummaryMs = -1L; @@ -802,6 +808,11 @@ public StreamThread(final Time time, this.punctuateRatioSensor = ThreadMetrics.punctuateRatioSensor(threadId, streamsMetrics); this.commitRatioSensor = ThreadMetrics.commitRatioSensor(threadId, streamsMetrics); this.failedStreamThreadSensor = ClientMetrics.failedStreamThreadSensor(streamsMetrics); + this.windowedPollLatencySensor = null; + this.windowedTotalCommitLatencySensor = null; + this.windowedTotalProcessLatencySensor = null; + this.windowedTotalPunctuateLatencySensor = null; + this.windowedRunOnceLatencySensor = null; this.assignmentErrorCode = assignmentErrorCode; this.shutdownErrorHook = shutdownErrorHook; this.streamsUncaughtExceptionHandler = 
streamsUncaughtExceptionHandler; From 8f92e78b99fd0e726794bd7a417984b7df7fe039 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Tue, 16 Dec 2025 00:27:37 +0100 Subject: [PATCH 2/9] record ratio metrics during window --- .../processor/internals/StreamThread.java | 108 +++++++++++++++--- 1 file changed, 91 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c9dde2fd56bcf..652eb18faac7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -42,7 +42,9 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.WindowedSum; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.LogContext; @@ -325,12 +327,6 @@ public boolean isStartingRunningOrPartitionAssigned() { private final Sensor punctuateRatioSensor; private final Sensor commitRatioSensor; private final Sensor failedStreamThreadSensor; - private final Sensor windowedPollLatencySensor; - private final Sensor windowedTotalCommitLatencySensor; - private final Sensor windowedTotalProcessLatencySensor; - private final Sensor windowedTotalPunctuateLatencySensor; - private final Sensor windowedRunOnceLatencySensor; - private final long logSummaryIntervalMs; // the count summary log output time interval private long lastLogSummaryMs = -1L; @@ -381,6 +377,15 @@ public boolean isStartingRunningOrPartitionAssigned() { 
private final boolean stateUpdaterEnabled; private final boolean processingThreadsEnabled; + private final WindowedSum pollLatencyWindowedSum = new WindowedSum(); + private final WindowedSum totalCommitLatencyWindowedSum = new WindowedSum(); + private final WindowedSum processLatencyWindowedSum = new WindowedSum(); + private final WindowedSum punctuateLatencyWindowedSum = new WindowedSum(); + private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum(); + private final MetricConfig metricsConfig; + + private boolean latencyWindowsInitialized = false; + private volatile long fetchDeadlineClientInstanceId = -1; private volatile KafkaFutureImpl mainConsumerInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>(); @@ -808,15 +813,11 @@ public StreamThread(final Time time, this.punctuateRatioSensor = ThreadMetrics.punctuateRatioSensor(threadId, streamsMetrics); this.commitRatioSensor = ThreadMetrics.commitRatioSensor(threadId, streamsMetrics); this.failedStreamThreadSensor = ClientMetrics.failedStreamThreadSensor(streamsMetrics); - this.windowedPollLatencySensor = null; - this.windowedTotalCommitLatencySensor = null; - this.windowedTotalProcessLatencySensor = null; - this.windowedTotalPunctuateLatencySensor = null; - this.windowedRunOnceLatencySensor = null; this.assignmentErrorCode = assignmentErrorCode; this.shutdownErrorHook = shutdownErrorHook; this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; this.cacheResizer = cacheResizer; + this.metricsConfig = streamsMetrics.metricsRegistry().config(); // The following sensors are created here but their references are not stored in this object, since within // this object they are not recorded. 
The sensors are created here so that the stream threads starts with all @@ -919,6 +920,7 @@ public void run() { if (stateUpdaterEnabled) { taskManager.init(); } + initLatencyWindowsIfNeeded(System.currentTimeMillis()); cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); @@ -1330,11 +1332,9 @@ void runOnceWithoutProcessingThreads() { now = time.milliseconds(); final long runOnceLatency = now - startMs; + recordWindowedSum(now, pollLatency, totalCommitLatency, totalProcessLatency, totalPunctuateLatency, runOnceLatency); + recordAllRatios(now); processRecordsSensor.record(totalProcessed, now); - processRatioSensor.record((double) totalProcessLatency / runOnceLatency, now); - punctuateRatioSensor.record((double) totalPunctuateLatency / runOnceLatency, now); - pollRatioSensor.record((double) pollLatency / runOnceLatency, now); - commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); final long timeSinceLastLog = now - lastLogSummaryMs; if (logSummaryIntervalMs > 0 && timeSinceLastLog > logSummaryIntervalMs) { @@ -1409,8 +1409,9 @@ void runOnceWithProcessingThreads() { now = time.milliseconds(); final long runOnceLatency = now - startMs; - pollRatioSensor.record((double) pollLatency / runOnceLatency, now); - commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); + recordWindowedSum(now, pollLatency, totalCommitLatency, 0, 0, runOnceLatency); + recordPollRatio(now); + recordCommitRatio(now); if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) { log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary); @@ -2153,4 +2154,77 @@ Admin adminClient() { Optional streamsRebalanceData() { return streamsRebalanceData; } + + /** + * Initialize both WindowedSum instances at exactly the same timestamp so + * their windows are aligned from the very beginning. 
+ */ + private void initLatencyWindowsIfNeeded(final long now) { + if (!latencyWindowsInitialized) { + // Start both windows at the same instant with a zero record + pollLatencyWindowedSum.record(metricsConfig, 0.0, now); + runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now); + latencyWindowsInitialized = true; + } + } + + private void recordWindowedSum(final long now, + final double pollLatency, + final double totalCommitLatency, + final double processLatency, + final double punctuateLatency, + final double runOnceLatency) { + this.pollLatencyWindowedSum.record(metricsConfig, pollLatency, now); + this.totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now); + this.processLatencyWindowedSum.record(metricsConfig, processLatency, now); + this.punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now); + this.runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now); + } + + private void recordPollRatio(final long now) { + final double pollLatencyWindow = + pollLatencyWindowedSum.measure(metricsConfig, now); + final double runOnceLatencyWindow = + runOnceLatencyWindowedSum.measure(metricsConfig, now); + + if (runOnceLatencyWindow > 0.0) { + final double ratio = pollLatencyWindow / runOnceLatencyWindow; + pollRatioSensor.record(ratio, now); + } else { + pollRatioSensor.record(0.0, now); + } + } + + private void recordCommitRatio(final long now) { + final double commitLatencyWindow = + totalCommitLatencyWindowedSum.measure(metricsConfig, now); + final double runOnceLatencyWindow = + runOnceLatencyWindowedSum.measure(metricsConfig, now); + + if (runOnceLatencyWindow > 0.0) { + final double ratio = commitLatencyWindow / runOnceLatencyWindow; + commitRatioSensor.record(ratio, now); + } else { + commitRatioSensor.record(0.0, now); + } + } + + private void recordAllRatios(final long now) { + recordCommitRatio(now); + recordPollRatio(now); + final double runOnceLatencyWindow = + 
runOnceLatencyWindowedSum.measure(metricsConfig, now); + + if (runOnceLatencyWindow > 0.0) { + final double totalProcessLatency = + processLatencyWindowedSum.measure(metricsConfig, now); + final double totalPunctuateLatency = + punctuateLatencyWindowedSum.measure(metricsConfig, now); + processRatioSensor.record(totalProcessLatency / runOnceLatencyWindow, now); + punctuateRatioSensor.record(totalPunctuateLatency / runOnceLatencyWindow, now); + } else { + processRatioSensor.record(0.0, now); + punctuateRatioSensor.record(0.0, now); + } + } } From 067cced7e294a82f23878c96c5365c2d1a4218ed Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Tue, 16 Dec 2025 01:07:00 +0100 Subject: [PATCH 3/9] fix initialization --- .../apache/kafka/streams/processor/internals/StreamThread.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 652eb18faac7e..1f1866dc3324b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -2163,6 +2163,9 @@ private void initLatencyWindowsIfNeeded(final long now) { if (!latencyWindowsInitialized) { // Start both windows at the same instant with a zero record pollLatencyWindowedSum.record(metricsConfig, 0.0, now); + this.totalCommitLatencyWindowedSum.record(metricsConfig, 0, now); + this.processLatencyWindowedSum.record(metricsConfig, 0, now); + this.punctuateLatencyWindowedSum.record(metricsConfig, 0, now); runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now); latencyWindowsInitialized = true; } From 2a2234b42229dd4e716b2c8a9f414aece97750f4 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Tue, 16 Dec 2025 10:03:13 +0100 Subject: [PATCH 4/9] address reviews --- .../processor/internals/StreamThread.java | 54 +++++-------------- 
.../internals/metrics/ThreadMetrics.java | 12 +++-- 2 files changed, 20 insertions(+), 46 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1f1866dc3324b..4cecf60b10818 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1333,7 +1333,10 @@ void runOnceWithoutProcessingThreads() { now = time.milliseconds(); final long runOnceLatency = now - startMs; recordWindowedSum(now, pollLatency, totalCommitLatency, totalProcessLatency, totalPunctuateLatency, runOnceLatency); - recordAllRatios(now); + recordRatio(now, pollLatencyWindowedSum, pollRatioSensor); + recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor); + recordRatio(now, processLatencyWindowedSum, processRatioSensor); + recordRatio(now, punctuateLatencyWindowedSum, punctuateRatioSensor); processRecordsSensor.record(totalProcessed, now); final long timeSinceLastLog = now - lastLogSummaryMs; @@ -1410,8 +1413,8 @@ void runOnceWithProcessingThreads() { now = time.milliseconds(); final long runOnceLatency = now - startMs; recordWindowedSum(now, pollLatency, totalCommitLatency, 0, 0, runOnceLatency); - recordPollRatio(now); - recordCommitRatio(now); + recordRatio(now, pollLatencyWindowedSum, pollRatioSensor); + recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor); if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) { log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary); @@ -2184,50 +2187,17 @@ private void recordWindowedSum(final long now, this.runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now); } - private void recordPollRatio(final long now) { - final double pollLatencyWindow = - 
pollLatencyWindowedSum.measure(metricsConfig, now); + private void recordRatio(final long now, final WindowedSum windowedSum, final Sensor ratioSensor) { + final double latencyWindow = + windowedSum.measure(metricsConfig, now); final double runOnceLatencyWindow = runOnceLatencyWindowedSum.measure(metricsConfig, now); if (runOnceLatencyWindow > 0.0) { - final double ratio = pollLatencyWindow / runOnceLatencyWindow; - pollRatioSensor.record(ratio, now); + final double ratio = latencyWindow / runOnceLatencyWindow; + ratioSensor.record(ratio, now); } else { - pollRatioSensor.record(0.0, now); - } - } - - private void recordCommitRatio(final long now) { - final double commitLatencyWindow = - totalCommitLatencyWindowedSum.measure(metricsConfig, now); - final double runOnceLatencyWindow = - runOnceLatencyWindowedSum.measure(metricsConfig, now); - - if (runOnceLatencyWindow > 0.0) { - final double ratio = commitLatencyWindow / runOnceLatencyWindow; - commitRatioSensor.record(ratio, now); - } else { - commitRatioSensor.record(0.0, now); - } - } - - private void recordAllRatios(final long now) { - recordCommitRatio(now); - recordPollRatio(now); - final double runOnceLatencyWindow = - runOnceLatencyWindowedSum.measure(metricsConfig, now); - - if (runOnceLatencyWindow > 0.0) { - final double totalProcessLatency = - processLatencyWindowedSum.measure(metricsConfig, now); - final double totalPunctuateLatency = - punctuateLatencyWindowedSum.measure(metricsConfig, now); - processRatioSensor.record(totalProcessLatency / runOnceLatencyWindow, now); - punctuateRatioSensor.record(totalPunctuateLatency / runOnceLatencyWindow, now); - } else { - processRatioSensor.record(0.0, now); - punctuateRatioSensor.record(0.0, now); + ratioSensor.record(0.0, now); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index 
b45bde5ddd366..73fa53ebcdd04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -82,13 +82,17 @@ private ThreadMetrics() {} private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency"; private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency"; private static final String PROCESS_RATIO_DESCRIPTION = - "The fraction of time the thread spent on processing active tasks"; + "The ratio, over a rolling measurement window, of the time this thread spent " + + "processing active tasks to the total elapsed time in that window."; private static final String PUNCTUATE_RATIO_DESCRIPTION = - "The fraction of time the thread spent on punctuating active tasks"; + "The ratio, over a rolling measurement window, of the time this thread spent " + + "punctuating active tasks to the total elapsed time in that window."; private static final String POLL_RATIO_DESCRIPTION = - "The fraction of time the thread spent on polling records from consumer"; + "The ratio, over a rolling measurement window, of the time this thread spent " + + "polling records from the consumer to the total elapsed time in that window."; private static final String COMMIT_RATIO_DESCRIPTION = - "The fraction of time the thread spent on committing all tasks"; + "The ratio, over a rolling measurement window, of the time this thread spent " + + "committing all tasks to the total elapsed time in that window."; private static final String BLOCKED_TIME_DESCRIPTION = "The total time the thread spent blocked on kafka in nanoseconds"; private static final String THREAD_START_TIME_DESCRIPTION = From f22c1760b13bfc105ae694dd1cba46af2a8ed7f9 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Tue, 16 Dec 2025 18:04:24 +0100 Subject: [PATCH 5/9] minor optimization --- 
.../kafka/streams/processor/internals/StreamThread.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 6f56db701f18a..63d8175e0271d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -2151,14 +2151,13 @@ private void recordWindowedSum(final long now, } private void recordRatio(final long now, final WindowedSum windowedSum, final Sensor ratioSensor) { - final double latencyWindow = - windowedSum.measure(metricsConfig, now); final double runOnceLatencyWindow = runOnceLatencyWindowedSum.measure(metricsConfig, now); if (runOnceLatencyWindow > 0.0) { - final double ratio = latencyWindow / runOnceLatencyWindow; - ratioSensor.record(ratio, now); + final double latencyWindow = + windowedSum.measure(metricsConfig, now); + ratioSensor.record(latencyWindow / runOnceLatencyWindow); } else { ratioSensor.record(0.0, now); } From 7b5d1b03ef3d9f834b027fbad01351f759049d72 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Wed, 17 Dec 2025 02:40:26 +0100 Subject: [PATCH 6/9] fix utests + upgrade doc --- docs/upgrade.html | 6 ++++++ .../internals/metrics/ThreadMetricsTest.java | 12 ++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 1c0aea9e53a9b..1242270f87360 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -66,6 +66,12 @@
Notable changes in 4
  • The behavior of org.apache.kafka.streams.KafkaStreams#removeStreamThread has been changed. The consumer has no longer remove once removeStreamThread finished. Instead, consumer would be kicked off from the group after org.apache.kafka.streams.processor.internals.StreamThread completes its run function.
  • +
  • TThe streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. + They now report, over a rolling measurement window, + the ratio of time this thread spends performing the given action ({action}) to the total elapsed time in that window. + The effective window duration is determined by your metrics configuration: metrics.sample.window.ms (per-sample window length) + and metrics.num.samples (number of rolling windows). +
  • The support for MX4J library, enabled through kafka_mx4jenable system property, was deprecated and will be removed in Kafka 5.0.
  • diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 87891ab389730..63a3512049f12 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -53,7 +53,8 @@ public class ThreadMetricsTest { @Test public void shouldGetProcessRatioSensor() { final String operation = "process-ratio"; - final String ratioDescription = "The fraction of time the thread spent on processing active tasks"; + final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " + + "processing active tasks to the total elapsed time in that window."; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); @@ -148,7 +149,8 @@ public void shouldGetProcessRateSensor() { @Test public void shouldGetPollRatioSensor() { final String operation = "poll-ratio"; - final String ratioDescription = "The fraction of time the thread spent on polling records from consumer"; + final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread " + + "spent polling records from the consumer to the total elapsed time in that window."; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); @@ -268,7 +270,8 @@ public void shouldGetCommitSensor() { @Test public void shouldGetCommitRatioSensor() { final String operation = "commit-ratio"; - final String ratioDescription = "The fraction of time the thread spent on committing all tasks"; + final String ratioDescription = "The 
ratio, over a rolling measurement window, of the time this thread spent " + + "committing all tasks to the total elapsed time in that window."; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); @@ -327,7 +330,8 @@ public void shouldGetPunctuateSensor() { @Test public void shouldGetPunctuateRatioSensor() { final String operation = "punctuate-ratio"; - final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks"; + final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " + + "punctuating active tasks to the total elapsed time in that window."; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); From ae241dc390f66851233af3df16590ab8971ce6b2 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Wed, 17 Dec 2025 16:01:40 +0100 Subject: [PATCH 7/9] address nits --- docs/upgrade.html | 2 +- .../processor/internals/StreamThread.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 1242270f87360..65ece301920a4 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -66,7 +66,7 @@
    Notable changes in 4
  • The behavior of org.apache.kafka.streams.KafkaStreams#removeStreamThread has been changed. The consumer has no longer remove once removeStreamThread finished. Instead, consumer would be kicked off from the group after org.apache.kafka.streams.processor.internals.StreamThread completes its run function.
  • -
  • TThe streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. +
  • The streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. They now report, over a rolling measurement window, the ratio of time this thread spends performing the given action ({action}) to the total elapsed time in that window. The effective window duration is determined by your metrics configuration: metrics.sample.window.ms (per-sample window length) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 63d8175e0271d..ba87bcd463840 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -899,7 +899,7 @@ public void run() { boolean cleanRun = false; try { taskManager.init(); - initLatencyWindowsIfNeeded(System.currentTimeMillis()); + initLatencyWindowsIfNeeded(time.milliseconds()); cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); @@ -2129,9 +2129,9 @@ private void initLatencyWindowsIfNeeded(final long now) { if (!latencyWindowsInitialized) { // Start both windows at the same instant with a zero record pollLatencyWindowedSum.record(metricsConfig, 0.0, now); - this.totalCommitLatencyWindowedSum.record(metricsConfig, 0, now); - this.processLatencyWindowedSum.record(metricsConfig, 0, now); - this.punctuateLatencyWindowedSum.record(metricsConfig, 0, now); + totalCommitLatencyWindowedSum.record(metricsConfig, 0, now); + processLatencyWindowedSum.record(metricsConfig, 0, now); + punctuateLatencyWindowedSum.record(metricsConfig, 0, now); runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now); latencyWindowsInitialized = true; } @@ -2143,11 +2143,11 @@ private void recordWindowedSum(final long now, final double processLatency, final double punctuateLatency, final double runOnceLatency) { - 
this.pollLatencyWindowedSum.record(metricsConfig, pollLatency, now); - this.totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now); - this.processLatencyWindowedSum.record(metricsConfig, processLatency, now); - this.punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now); - this.runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now); + pollLatencyWindowedSum.record(metricsConfig, pollLatency, now); + totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now); + processLatencyWindowedSum.record(metricsConfig, processLatency, now); + punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now); + runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now); } private void recordRatio(final long now, final WindowedSum windowedSum, final Sensor ratioSensor) { From 31a20931c57c05917f0e919f612761515b9b5c99 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Fri, 19 Dec 2025 10:34:23 +0100 Subject: [PATCH 8/9] move explanations to docs/streams/upgrade-guide.html --- docs/streams/upgrade-guide.html | 7 +++++++ docs/upgrade.html | 6 ------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 5037410f3d61e..c72344ad99536 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -151,6 +151,13 @@

    Streams API More details can be found in KIP-1221.

    +

    The streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. + They now report, over a rolling measurement window, + the ratio of time this thread spends performing the given action ({action}) to the total elapsed time in that window. + The effective window duration is determined by the metrics configuration: metrics.sample.window.ms (per-sample window length) + and metrics.num.samples (number of rolling windows). +

    +

    Streams API changes in 4.1.0

    Note: Kafka Streams 4.1.0 contains a critical memory leak bug (KAFKA-19748) that affects users of range scans and certain DSL operators (session windows, sliding windows, stream-stream joins, foreign-key joins). Users running Kafka Streams should consider upgrading directly to 4.1.1, which includes the fix for it.

    diff --git a/docs/upgrade.html b/docs/upgrade.html index 65ece301920a4..1c0aea9e53a9b 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -66,12 +66,6 @@
    Notable changes in 4
  • The behavior of org.apache.kafka.streams.KafkaStreams#removeStreamThread has been changed. The consumer has no longer remove once removeStreamThread finished. Instead, consumer would be kicked off from the group after org.apache.kafka.streams.processor.internals.StreamThread completes its run function.
  • -
  • The streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. - They now report, over a rolling measurement window, - the ratio of time this thread spends performing the given action ({action}) to the total elapsed time in that window. - The effective window duration is determined by your metrics configuration: metrics.sample.window.ms (per-sample window length) - and metrics.num.samples (number of rolling windows). -
  • The support for MX4J library, enabled through kafka_mx4jenable system property, was deprecated and will be removed in Kafka 5.0.
  • From 8781105c7ee7a14b7e560e5ad9dab4c43f56ac23 Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Fri, 19 Dec 2025 19:51:37 +0100 Subject: [PATCH 9/9] doc moved to 4.3 --- docs/streams/upgrade-guide.html | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index c72344ad99536..bb6ea53864a0b 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -108,6 +108,15 @@

    <

    Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.

    +

    Streams API changes in 4.3.0

    + +

    The streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. + They now report, over a rolling measurement window, + the ratio of the time this thread spends on the respective action (committing, processing, punctuating, or polling) to the total elapsed time in that window. + The effective window duration is determined by the metrics configuration: metrics.sample.window.ms (length of each sample window) + and metrics.num.samples (number of sample windows retained). +

    +

    Streams API changes in 4.2.0

    @@ -151,13 +160,6 @@

    Streams API More details can be found in KIP-1221.

    -

    The streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated. - They now report, over a rolling measurement window, - the ratio of time this thread spends performing the given action ({action}) to the total elapsed time in that window. - The effective window duration is determined by the metrics configuration: metrics.sample.window.ms (per-sample window length) - and metrics.num.samples (number of rolling windows). -

    -

    Streams API changes in 4.1.0

    Note: Kafka Streams 4.1.0 contains a critical memory leak bug (KAFKA-19748) that affects users of range scans and certain DSL operators (session windows, sliding windows, stream-stream joins, foreign-key joins). Users running Kafka Streams should consider upgrading directly to 4.1.1, which includes the fix for it.