From 735c00da1bc9f0305f0685e763c830bb2147cd98 Mon Sep 17 00:00:00 2001
From: Aangbaeck <3142272+Aangbaeck@users.noreply.github.com>
Date: Wed, 1 Jul 2026 10:19:00 +0200
Subject: [PATCH] KAFKA-10025: guard RocksDBMetricsRecorder value provider reads against store close

RocksDBMetricsRecorder reads native RocksDB value providers (RocksDB and
Statistics) in record() (getAndResetTickerCount / getHistogramData), in
the property gauges (RocksDB.getAggregatedLongProperty) and in the
block-cache gauges (RocksDB.getLongProperty). These reads had no mutual
exclusion against removeValueProviders().

RocksDBStore.close() calls removeValueProviders() and then closes (frees)
the native RocksDB and Statistics. Because the reads and the removal were
not mutually exclusive, a metrics read that is in flight when a store is
closed (e.g. during a rebalance / task migration) can dereference a native
handle that close() is concurrently freeing, causing a native
use-after-free / SIGSEGV. storeToValueProviders being a ConcurrentHashMap
only makes the map operations safe; it does not prevent a reader that
already holds a DbAndCacheAndStatistics from calling into a db/statistics
that close() frees.

Two observed crash frames, same root cause:
- record() path -> Statistics::getAndResetTickerCount (this ticket)
- gauge path -> rocksdb::DBImpl::GetAggregatedIntProperty (property
  gauges, registered at RecordingLevel.INFO, so reachable via a metrics
  reporter/JMX scrape even at metrics.recording.level=INFO)

Fix: hold a single lock around every read of the value providers
(record() and both gauge lambdas) and around the map mutations
(addValueProviders / removeValueProviders). Since RocksDBStore.close()
calls removeValueProviders() before freeing the native handles, acquiring
the lock there waits for any in-flight read to finish and prevents any
later read from seeing the segment, so no read can dereference a freed
handle.
The recording trigger holds no lock while calling record(), and the guarded reads never call back into RocksDBStore, so no lock-ordering cycle is introduced. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../metrics/RocksDBMetricsRecorder.java | 190 ++++++++++-------- .../RocksDBMetricsRecorderGaugesTest.java | 78 ++++++- 2 files changed, 184 insertions(+), 84 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 43fe5181537db..56333f9497bf4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -107,6 +107,15 @@ public DbAndCacheAndStatistics(final RocksDB db, final Cache cache, final Statis private Sensor numberOfFileErrorsSensor; private final Map storeToValueProviders = new ConcurrentHashMap<>(); + // Guards reads of the value providers (in record() and in the property / block-cache + // gauges) against their removal in removeValueProviders(). RocksDBStore.close() calls + // removeValueProviders() and then closes (frees) the native RocksDB and Statistics. + // Without this lock a metrics read - triggered by the recording trigger, a metrics + // reporter, or a JMX scrape evaluating a gauge - can dereference a value provider that + // close() is concurrently freeing, causing a native use-after-free (SIGSEGV in e.g. + // rocksdb::DBImpl::GetAggregatedIntProperty or Statistics::getAndResetTickerCount). + // See KAFKA-10025. 
+ private final Object valueProvidersLock = new Object(); private final String metricsScope; private final String storeName; private TaskId taskId; @@ -157,17 +166,19 @@ public void addValueProviders(final String segmentName, final RocksDB db, final Cache cache, final Statistics statistics) { - if (storeToValueProviders.isEmpty()) { - logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId); - streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this); - } else if (storeToValueProviders.containsKey(segmentName)) { - throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId + - " has been already added. This is a bug in Kafka Streams. " + - "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); + synchronized (valueProvidersLock) { + if (storeToValueProviders.isEmpty()) { + logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId); + streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this); + } else if (storeToValueProviders.containsKey(segmentName)) { + throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId + + " has been already added. This is a bug in Kafka Streams. 
" + + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); + } + verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics); + logger.debug("Adding value providers for store {} of task {}", segmentName, taskId); + storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics)); } - verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics); - logger.debug("Adding value providers for store {} of task {}", segmentName, taskId); - storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics)); } private void verifyDbAndCacheAndStatistics(final String segmentName, @@ -350,15 +361,19 @@ private void initGauges(final StreamsMetricsImpl streamsMetrics, private Gauge gaugeToComputeSumOfProperties(final String propertyName) { return (metricsConfig, now) -> { BigInteger result = BigInteger.valueOf(0); - for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { - try { - // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use - // BigInteger and construct the object from the byte representation of the value - result = result.add(new BigInteger(1, longToBytes( - valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) - ))); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); + // Read the value providers under the lock so a store cannot be closed (and its + // native RocksDB freed) while getAggregatedLongProperty() is executing on it. 
+ synchronized (valueProvidersLock) { + for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { + try { + // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use + // BigInteger and construct the object from the byte representation of the value + result = result.add(new BigInteger(1, longToBytes( + valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) + ))); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); + } } } return result; @@ -368,24 +383,28 @@ private Gauge gaugeToComputeSumOfProperties(final String propertyNam private Gauge gaugeToComputeBlockCacheMetrics(final String propertyName) { return (metricsConfig, now) -> { BigInteger result = BigInteger.valueOf(0); - for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { - try { - if (singleCache) { - // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use - // BigInteger and construct the object from the byte representation of the value - result = new BigInteger(1, longToBytes( - valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) - )); - break; - } else { - // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use - // BigInteger and construct the object from the byte representation of the value - result = result.add(new BigInteger(1, longToBytes( - valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) - ))); + // Read the value providers under the lock so a store cannot be closed (and its + // native RocksDB freed) while getLongProperty() is executing on it. 
+ synchronized (valueProvidersLock) { + for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { + try { + if (singleCache) { + // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use + // BigInteger and construct the object from the byte representation of the value + result = new BigInteger(1, longToBytes( + valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) + )); + break; + } else { + // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use + // BigInteger and construct the object from the byte representation of the value + result = result.add(new BigInteger(1, longToBytes( + valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) + ))); + } + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); } - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); } } return result; @@ -399,20 +418,26 @@ private static byte[] longToBytes(final long data) { } public void removeValueProviders(final String segmentName) { - logger.debug("Removing value providers for store {} of task {}", segmentName, taskId); - final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName); - if (removedValueProviders == null) { - throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + taskId + - " could be found. This is a bug in Kafka Streams. 
" + - "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); - } - if (storeToValueProviders.isEmpty()) { - logger.debug( - "Removing metrics recorder for store {} of task {} from metrics recording trigger", - storeName, - taskId - ); - streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this); + // Acquiring the lock here waits for any in-flight record()/gauge read of the value + // providers to finish before the caller (RocksDBStore.close()) frees the native + // RocksDB / Statistics. After this returns the segment is no longer in the map, so + // no subsequent read can dereference it. This is what prevents the use-after-free. + synchronized (valueProvidersLock) { + logger.debug("Removing value providers for store {} of task {}", segmentName, taskId); + final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName); + if (removedValueProviders == null) { + throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + taskId + + " could be found. This is a bug in Kafka Streams. " + + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); + } + if (storeToValueProviders.isEmpty()) { + logger.debug( + "Removing metrics recorder for store {} of task {} from metrics recording trigger", + storeName, + taskId + ); + streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this); + } } } @@ -443,37 +468,42 @@ public void record(final long now) { double compactionTimeMin = Double.MAX_VALUE; double compactionTimeMax = 0.0; boolean shouldRecord = true; - for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) { - if (valueProviders.statistics == null) { - shouldRecord = false; - break; + // Read the value providers under the lock so a store cannot be closed (and its + // native Statistics freed) while these getAndResetTickerCount()/getHistogramData() + // reads are executing on it. 
See KAFKA-10025. + synchronized (valueProvidersLock) { + for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) { + if (valueProviders.statistics == null) { + shouldRecord = false; + break; + } + bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN); + bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ); + memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES); + memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT); + memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS); + blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT); + blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS); + blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT); + blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS); + blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT); + blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS); + writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); + bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); + bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); + numberOfOpenFiles = -1; + numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); + final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); + memtableFlushTimeSum += 
memtableFlushTimeData.getSum(); + memtableFlushTimeCount += memtableFlushTimeData.getCount(); + memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin()); + memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax()); + final HistogramData compactionTimeData = valueProviders.statistics.getHistogramData(HistogramType.COMPACTION_TIME); + compactionTimeSum += compactionTimeData.getSum(); + compactionTimeCount += compactionTimeData.getCount(); + compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin()); + compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax()); } - bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN); - bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ); - memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES); - memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT); - memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS); - blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT); - blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS); - blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT); - blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS); - blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT); - blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS); - writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); - bytesWrittenDuringCompaction += 
valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); - bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); - numberOfOpenFiles = -1; - numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); - final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); - memtableFlushTimeSum += memtableFlushTimeData.getSum(); - memtableFlushTimeCount += memtableFlushTimeData.getCount(); - memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin()); - memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax()); - final HistogramData compactionTimeData = valueProviders.statistics.getHistogramData(HistogramType.COMPACTION_TIME); - compactionTimeSum += compactionTimeData.getSum(); - compactionTimeCount += compactionTimeData.getCount(); - compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin()); - compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax()); } if (shouldRecord) { bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase, now); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java index 113de5959a4ed..9f970f1878402 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals.metrics; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -25,12 
+24,15 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.rocksdb.Cache; import org.rocksdb.RocksDB; import org.rocksdb.Statistics; import java.math.BigInteger; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -254,20 +256,88 @@ private void verifyMetrics(final StreamsMetricsImpl streamsMetrics, final String propertyName, final long expectedValue) { - final Map metrics = streamsMetrics.metrics(); + final KafkaMetric metric = getMetric(streamsMetrics, propertyName); + + assertThat(metric, notNullValue()); + assertThat(metric.metricValue(), is(BigInteger.valueOf(expectedValue))); + } + + private KafkaMetric getMetric(final StreamsMetricsImpl streamsMetrics, final String propertyName) { final Map tagMap = mkMap( mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()), mkEntry(TASK_ID_TAG, TASK_ID.toString()), mkEntry(METRICS_SCOPE + "-" + STORE_ID_TAG, STORE_NAME) ); - final KafkaMetric metric = (KafkaMetric) metrics.get(new MetricName( + return (KafkaMetric) streamsMetrics.metrics().get(new MetricName( propertyName, STATE_STORE_LEVEL_GROUP, "description is ignored", tagMap )); + } + + /** + * A metrics read (e.g. a JMX scrape or metrics reporter) evaluates a RocksDB property + * gauge, which calls {@code db.getAggregatedLongProperty(...)}, at the same time a store + * is being closed. {@code RocksDBStore.close()} calls {@code removeValueProviders(...)} + * and then closes (frees) the native RocksDB. 
If {@code removeValueProviders(...)} could + * return while the gauge is mid-read, the caller would then free the native handle the + * gauge is still dereferencing, causing a native use-after-free (SIGSEGV in + * {@code rocksdb::DBImpl::GetAggregatedIntProperty}; the sibling + * {@code Statistics::getAndResetTickerCount} variant is KAFKA-10025). This test asserts + * the gauge read and value-provider removal are mutually exclusive. + */ + @Test + @Timeout(30) + public void shouldNotRemoveValueProvidersWhileGaugeIsReadingThem() throws Exception { + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); + final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); + recorder.init(streamsMetrics, TASK_ID); + recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); + final CountDownLatch gaugeReadInProgress = new CountDownLatch(1); + final CountDownLatch allowGaugeReadToComplete = new CountDownLatch(1); + when(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + ESTIMATED_NUMBER_OF_KEYS)) + .thenAnswer(invocation -> { + gaugeReadInProgress.countDown(); + allowGaugeReadToComplete.await(10, TimeUnit.SECONDS); + return 5L; + }); + + final KafkaMetric metric = getMetric(streamsMetrics, ESTIMATED_NUMBER_OF_KEYS); assertThat(metric, notNullValue()); - assertThat(metric.metricValue(), is(BigInteger.valueOf(expectedValue))); + + // Evaluate the gauge on a separate thread; it blocks inside getAggregatedLongProperty + // while holding the recorder's value-providers lock. + final Thread gaugeReader = new Thread(metric::metricValue, "gauge-reader"); + gaugeReader.start(); + assertThat("gauge read did not start", gaugeReadInProgress.await(10, TimeUnit.SECONDS), is(true)); + + // While the gauge read is in progress, removeValueProviders() (called by + // RocksDBStore.close() before it frees the native db) must not be able to proceed. 
+ final CountDownLatch removeReturned = new CountDownLatch(1); + final Thread storeCloser = new Thread(() -> { + recorder.removeValueProviders(SEGMENT_STORE_NAME_1); + removeReturned.countDown(); + }, "store-closer"); + storeCloser.start(); + + assertThat( + "removeValueProviders() returned while a gauge read was in progress - the use-after-free window is open", + removeReturned.await(500, TimeUnit.MILLISECONDS), + is(false) + ); + + // Once the gauge read completes, removeValueProviders() must be able to proceed. + allowGaugeReadToComplete.countDown(); + assertThat( + "removeValueProviders() did not proceed after the gauge read finished", + removeReturned.await(10, TimeUnit.SECONDS), + is(true) + ); + + gaugeReader.join(TimeUnit.SECONDS.toMillis(10)); + storeCloser.join(TimeUnit.SECONDS.toMillis(10)); } }