From 325a13f57a04d7f9ac632b86e835b0338b5cd076 Mon Sep 17 00:00:00 2001 From: Aangbaeck <3142272+Aangbaeck@users.noreply.github.com> Date: Wed, 1 Jul 2026 12:28:45 +0200 Subject: [PATCH] KAFKA-10397: do not expose statistics-based RocksDB metrics when the user provides a Statistics object When a user supplies their own Statistics object to a RocksDB state store through RocksDBConfigSetter, RocksDBStore passes null to the metrics recorder, because Kafka Streams must not read the user's Statistics -- doing so via getAndResetTickerCount() would reset the user's counters. As a result record() short-circuits and the statistics-based metrics (bytes-written, bytes-read, memtable-*, block-cache-* hit ratios, compaction-*, write-stall-duration, number-open-files, number-file-errors) are never recorded. They were, however, still registered in init() and therefore exposed as perpetually-empty metrics. Fix: register the statistics-based sensors lazily in addValueProviders(), only once a value provider with a non-null Statistics is added, instead of unconditionally in init(). The property / block-cache gauges read the RocksDB instance directly and remain valid without a Statistics object, so they are still registered in init() as before. record() additionally guards on whether the sensors were registered. This changes only which metrics are registered in the user-provided-Statistics case; the default (Streams-owned Statistics at RecordingLevel.DEBUG) is unchanged, and the property-based gauges are unchanged in all cases. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../metrics/RocksDBMetricsRecorder.java | 35 ++++++++- .../state/internals/RocksDBStoreTest.java | 48 ++++++++++++ .../metrics/RocksDBMetricsRecorderTest.java | 73 ++++++++++++++----- 3 files changed, 135 insertions(+), 21 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 43fe5181537db..4e335663f340e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -112,6 +112,11 @@ public DbAndCacheAndStatistics(final RocksDB db, final Cache cache, final Statis private TaskId taskId; private StreamsMetricsImpl streamsMetrics; private boolean singleCache = true; + // The statistics-based sensors are registered lazily (see maybeInitStatisticsBasedSensors) + // the first time value providers with a non-null Statistics are added. This guards against + // registering them more than once when segments are added and removed over the lifetime of + // the recorder. See KAFKA-10397. + private boolean statisticsBasedSensorsInitialized = false; public RocksDBMetricsRecorder(final String metricsScope, final String storeName) { @@ -147,7 +152,11 @@ public void init(final StreamsMetricsImpl streamsMetrics, "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); } final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName); - initSensors(streamsMetrics, metricContext); + // The property / block-cache gauges read the RocksDB instance directly (getAggregatedLongProperty / + // getLongProperty) and are meaningful regardless of whether a Statistics object exists, so they are + // always registered here. 
The statistics-based sensors, in contrast, are only registered later in + // addValueProviders() once we know the store actually has a (non-user-provided) Statistics to read + // from -- see maybeInitStatisticsBasedSensors() and KAFKA-10397. initGauges(streamsMetrics, metricContext); this.taskId = taskId; this.streamsMetrics = streamsMetrics; @@ -168,6 +177,24 @@ public void addValueProviders(final String segmentName, verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics); logger.debug("Adding value providers for store {} of task {}", segmentName, taskId); storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics)); + maybeInitStatisticsBasedSensors(statistics); + } + + /** + * Registers the statistics-based sensors the first time value providers with a non-null + * {@link Statistics} are added. + * + *

<p>If the user supplies their own {@code Statistics} object through the {@code RocksDBConfigSetter}, + * {@link org.apache.kafka.streams.state.internals.RocksDBStore} passes {@code null} here, because + * Streams must not read (and thereby reset) the user's statistics. In that case the statistics-based + * metrics can never be recorded, so we do not register them at all rather than exposing metrics that + * are perpetually empty. See KAFKA-10397. + */ + private void maybeInitStatisticsBasedSensors(final Statistics statistics) { + if (statistics != null && !statisticsBasedSensorsInitialized) { + initSensors(streamsMetrics, new RocksDBMetricContext(taskId.toString(), metricsScope, storeName)); + statisticsBasedSensorsInitialized = true; + } + } private void verifyDbAndCacheAndStatistics(final String segmentName, @@ -475,7 +502,11 @@ public void record(final long now) { compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin()); compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax()); } - if (shouldRecord) { + // statisticsBasedSensorsInitialized is false when no Statistics is available (e.g. the user + // provided their own via RocksDBConfigSetter, see KAFKA-10397), in which case the sensors were + // never registered and there is nothing to record to. shouldRecord already covers the + // per-provider null-Statistics case; this also guards the corner case of an empty value-provider map. 
+ if (shouldRecord && statisticsBasedSensorsInitialized) { bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase, now); bytesReadFromDatabaseSensor.record(bytesReadFromDatabase, now); memtableBytesFlushedSensor.record(memtableBytesFlushed, now); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index fd001efa4ee17..da45e939e066e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -1007,6 +1007,54 @@ public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRock assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d)); } + @Test + public void shouldNotRegisterStatisticsBasedMetricsWhenUserProvidesStatistics() { + // KAFKA-10397: when the user supplies their own Statistics object via RocksDBConfigSetter, + // Streams must not read it (that would reset the user's counters), so the statistics-based + // metrics can never be recorded. They must therefore not be exposed at all, even at + // metrics.recording.level=DEBUG. The property-based gauges, which read the RocksDB instance + // directly, must still be exposed. 
+ final TaskId taskId = new TaskId(0, 0); + + final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG)); + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, "test-application", time); + + final Properties props = StreamsTestUtils.getStreamsConfig(); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetterWithUserProvidedStatistics.class); + + context = mock(InternalMockProcessorContext.class); + when(context.metrics()).thenReturn(streamsMetrics); + when(context.taskId()).thenReturn(taskId); + when(context.appConfigs()).thenReturn(new StreamsConfig(props).originals()); + when(context.stateDir()).thenReturn(dir); + final MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0); + when(context.recordMetadata()).thenReturn(Optional.of(processorRecordContext)); + + rocksDBStore.init(context, rocksDBStore); + rocksDBStore.put(Bytes.wrap("hello".getBytes()), "world".getBytes()); + + streamsMetrics.rocksDBMetricsRecordingTrigger().run(); + + // A statistics-based metric must NOT be registered ... + final Metric bytesWrittenTotal = metrics.metric(new MetricName( + "bytes-written-total", + StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP, + "description is not verified", + streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME) + )); + assertNull(bytesWrittenTotal); + + // ... but a property-based gauge must still be registered. 
+ final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName( + "num-entries-active-mem-table", + StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP, + "description is not verified", + streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME) + )); + assertThat(numberOfEntriesActiveMemTable, notNullValue()); + } + @Test public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() { final TaskId taskId = new TaskId(0, 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index 0436811db79ca..06180cfcf2704 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -33,6 +33,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import org.mockito.verification.VerificationMode; import org.rocksdb.Cache; import org.rocksdb.HistogramData; import org.rocksdb.HistogramType; @@ -121,25 +122,7 @@ public static void cleanUpMockito() { } @Test - public void shouldInitMetricsRecorder() { - dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.memtableBytesFlushedSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.memtableHitRatioSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.memtableAvgFlushTimeSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.memtableMinFlushTimeSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.memtableMaxFlushTimeSensor(any(), any())); - dbMetrics.verify(() -> 
RocksDBMetrics.writeStallDurationSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.compactionTimeAvgSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMinSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMaxSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.numberOfOpenFilesSensor(any(), any())); - dbMetrics.verify(() -> RocksDBMetrics.numberOfFileErrorsSensor(any(), any())); + public void shouldInitGaugesWhenRecorderIsInitialised() { dbMetrics.verify(() -> RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), any())); dbMetrics.verify(() -> RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), any())); dbMetrics.verify(() -> RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), any())); @@ -165,6 +148,37 @@ public void shouldInitMetricsRecorder() { assertThat(recorder.taskId(), is(TASK_ID1)); } + @Test + public void shouldNotInitStatisticsBasedSensorsWhenRecorderIsInitialised() { + // The recorder was init()'ed in setUp() but no value providers were added yet, so the + // statistics-based sensors must not have been registered. See KAFKA-10397. 
+ verifyStatisticsBasedSensorsRegistered(never()); + } + + @Test + public void shouldInitStatisticsBasedSensorsWhenValueProvidersWithStatisticsAreAdded() { + recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); + + verifyStatisticsBasedSensorsRegistered(times(1)); + } + + @Test + public void shouldNotInitStatisticsBasedSensorsWhenValueProvidersWithoutStatisticsAreAdded() { + // When the user supplies their own Statistics object, RocksDBStore passes null here and the + // statistics-based metrics can never be recorded, so they must not be registered. See KAFKA-10397. + recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); + + verifyStatisticsBasedSensorsRegistered(never()); + } + + @Test + public void shouldInitStatisticsBasedSensorsOnlyOnceWhenMultipleValueProvidersWithStatisticsAreAdded() { + recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); + recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); + + verifyStatisticsBasedSensorsRegistered(times(1)); + } + @Test public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentTask() { assertThrows( @@ -562,6 +576,27 @@ public void shouldCorrectlyHandleAvgRecordingsWithZeroSumAndCount() { verify(memtableAvgFlushTimeSensor).record(0d, now); } + private void verifyStatisticsBasedSensorsRegistered(final VerificationMode mode) { + dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.memtableBytesFlushedSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.memtableHitRatioSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.memtableAvgFlushTimeSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.memtableMinFlushTimeSensor(any(), any()), mode); + dbMetrics.verify(() -> 
RocksDBMetrics.memtableMaxFlushTimeSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.writeStallDurationSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.compactionTimeAvgSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMinSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMaxSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.numberOfOpenFilesSensor(any(), any()), mode); + dbMetrics.verify(() -> RocksDBMetrics.numberOfFileErrorsSensor(any(), any()), mode); + } + private void setUpMetricsMock() { dbMetrics = mockStatic(RocksDBMetrics.class); dbMetrics.when(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))