diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 43fe5181537db..56333f9497bf4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -107,6 +107,15 @@ public DbAndCacheAndStatistics(final RocksDB db, final Cache cache, final Statis private Sensor numberOfFileErrorsSensor; private final Map storeToValueProviders = new ConcurrentHashMap<>(); + // Guards reads of the value providers (in record() and in the property / block-cache + // gauges) against their removal in removeValueProviders(). RocksDBStore.close() calls + // removeValueProviders() and then closes (frees) the native RocksDB and Statistics. + // Without this lock a metrics read - triggered by the recording trigger, a metrics + // reporter, or a JMX scrape evaluating a gauge - can dereference a value provider that + // close() is concurrently freeing, causing a native use-after-free (SIGSEGV in e.g. + // rocksdb::DBImpl::GetAggregatedIntProperty or Statistics::getAndResetTickerCount). + // See KAFKA-10025. + private final Object valueProvidersLock = new Object(); private final String metricsScope; private final String storeName; private TaskId taskId; @@ -157,17 +166,19 @@ public void addValueProviders(final String segmentName, final RocksDB db, final Cache cache, final Statistics statistics) { - if (storeToValueProviders.isEmpty()) { - logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId); - streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this); - } else if (storeToValueProviders.containsKey(segmentName)) { - throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId + - " has been already added. This is a bug in Kafka Streams. " + - "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); + synchronized (valueProvidersLock) { + if (storeToValueProviders.isEmpty()) { + logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId); + streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this); + } else if (storeToValueProviders.containsKey(segmentName)) { + throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId + + " has been already added. This is a bug in Kafka Streams. " + + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); + } + verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics); + logger.debug("Adding value providers for store {} of task {}", segmentName, taskId); + storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics)); } - verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics); - logger.debug("Adding value providers for store {} of task {}", segmentName, taskId); - storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics)); } private void verifyDbAndCacheAndStatistics(final String segmentName, @@ -350,15 +361,19 @@ private void initGauges(final StreamsMetricsImpl streamsMetrics, private Gauge gaugeToComputeSumOfProperties(final String propertyName) { return (metricsConfig, now) -> { BigInteger result = BigInteger.valueOf(0); - for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { - try { - // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use - // BigInteger and construct the object from the byte representation of the value - result = result.add(new BigInteger(1, longToBytes( - valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) - ))); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); + // Read the value providers under the lock so a store cannot be closed (and its + // native RocksDB freed) while getAggregatedLongProperty() is executing on it. + synchronized (valueProvidersLock) { + for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { + try { + // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use + // BigInteger and construct the object from the byte representation of the value + result = result.add(new BigInteger(1, longToBytes( + valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) + ))); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); + } } } return result; @@ -368,24 +383,28 @@ private Gauge gaugeToComputeSumOfProperties(final String propertyNam private Gauge gaugeToComputeBlockCacheMetrics(final String propertyName) { return (metricsConfig, now) -> { BigInteger result = BigInteger.valueOf(0); - for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { - try { - if (singleCache) { - // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use - // BigInteger and construct the object from the byte representation of the value - result = new BigInteger(1, longToBytes( - valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) - )); - break; - } else { - // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use - // BigInteger and construct the object from the byte representation of the value - result = result.add(new BigInteger(1, longToBytes( - valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) - ))); + // Read the value providers under the lock so a store cannot be closed (and its + // native RocksDB freed) while getLongProperty() is executing on it. + synchronized (valueProvidersLock) { + for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) { + try { + if (singleCache) { + // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use + // BigInteger and construct the object from the byte representation of the value + result = new BigInteger(1, longToBytes( + valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) + )); + break; + } else { + // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use + // BigInteger and construct the object from the byte representation of the value + result = result.add(new BigInteger(1, longToBytes( + valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName) + ))); + } + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); } - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e); } } return result; @@ -399,20 +418,26 @@ private static byte[] longToBytes(final long data) { } public void removeValueProviders(final String segmentName) { - logger.debug("Removing value providers for store {} of task {}", segmentName, taskId); - final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName); - if (removedValueProviders == null) { - throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + taskId + - " could be found. This is a bug in Kafka Streams. " + - "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); - } - if (storeToValueProviders.isEmpty()) { - logger.debug( - "Removing metrics recorder for store {} of task {} from metrics recording trigger", - storeName, - taskId - ); - streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this); + // Acquiring the lock here waits for any in-flight record()/gauge read of the value + // providers to finish before the caller (RocksDBStore.close()) frees the native + // RocksDB / Statistics. After this returns the segment is no longer in the map, so + // no subsequent read can dereference it. This is what prevents the use-after-free. + synchronized (valueProvidersLock) { + logger.debug("Removing value providers for store {} of task {}", segmentName, taskId); + final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName); + if (removedValueProviders == null) { + throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + taskId + + " could be found. This is a bug in Kafka Streams. " + + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); + } + if (storeToValueProviders.isEmpty()) { + logger.debug( + "Removing metrics recorder for store {} of task {} from metrics recording trigger", + storeName, + taskId + ); + streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this); + } } } @@ -443,37 +468,42 @@ public void record(final long now) { double compactionTimeMin = Double.MAX_VALUE; double compactionTimeMax = 0.0; boolean shouldRecord = true; - for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) { - if (valueProviders.statistics == null) { - shouldRecord = false; - break; + // Read the value providers under the lock so a store cannot be closed (and its + // native Statistics freed) while these getAndResetTickerCount()/getHistogramData() + // reads are executing on it. See KAFKA-10025. + synchronized (valueProvidersLock) { + for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) { + if (valueProviders.statistics == null) { + shouldRecord = false; + break; + } + bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN); + bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ); + memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES); + memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT); + memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS); + blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT); + blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS); + blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT); + blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS); + blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT); + blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS); + writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); + bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); + bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); + numberOfOpenFiles = -1; + numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); + final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); + memtableFlushTimeSum += memtableFlushTimeData.getSum(); + memtableFlushTimeCount += memtableFlushTimeData.getCount(); + memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin()); + memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax()); + final HistogramData compactionTimeData = valueProviders.statistics.getHistogramData(HistogramType.COMPACTION_TIME); + compactionTimeSum += compactionTimeData.getSum(); + compactionTimeCount += compactionTimeData.getCount(); + compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin()); + compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax()); } - bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN); - bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ); - memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES); - memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT); - memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS); - blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT); - blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS); - blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT); - blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS); - blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT); - blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS); - writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); - bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); - bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); - numberOfOpenFiles = -1; - numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); - final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); - memtableFlushTimeSum += memtableFlushTimeData.getSum(); - memtableFlushTimeCount += memtableFlushTimeData.getCount(); - memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin()); - memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax()); - final HistogramData compactionTimeData = valueProviders.statistics.getHistogramData(HistogramType.COMPACTION_TIME); - compactionTimeSum += compactionTimeData.getSum(); - compactionTimeCount += compactionTimeData.getCount(); - compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin()); - compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax()); } if (shouldRecord) { bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase, now); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java index 113de5959a4ed..9f970f1878402 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals.metrics; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -25,12 +24,15 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.rocksdb.Cache; import org.rocksdb.RocksDB; import org.rocksdb.Statistics; import java.math.BigInteger; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -254,20 +256,88 @@ private void verifyMetrics(final StreamsMetricsImpl streamsMetrics, final String propertyName, final long expectedValue) { - final Map metrics = streamsMetrics.metrics(); + final KafkaMetric metric = getMetric(streamsMetrics, propertyName); + + assertThat(metric, notNullValue()); + assertThat(metric.metricValue(), is(BigInteger.valueOf(expectedValue))); + } + + private KafkaMetric getMetric(final StreamsMetricsImpl streamsMetrics, final String propertyName) { final Map tagMap = mkMap( mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()), mkEntry(TASK_ID_TAG, TASK_ID.toString()), mkEntry(METRICS_SCOPE + "-" + STORE_ID_TAG, STORE_NAME) ); - final KafkaMetric metric = (KafkaMetric) metrics.get(new MetricName( + return (KafkaMetric) streamsMetrics.metrics().get(new MetricName( propertyName, STATE_STORE_LEVEL_GROUP, "description is ignored", tagMap )); + } + + /** + * A metrics read (e.g. a JMX scrape or metrics reporter) evaluates a RocksDB property + * gauge, which calls {@code db.getAggregatedLongProperty(...)}, at the same time a store + * is being closed. {@code RocksDBStore.close()} calls {@code removeValueProviders(...)} + * and then closes (frees) the native RocksDB. If {@code removeValueProviders(...)} could + * return while the gauge is mid-read, the caller would then free the native handle the + * gauge is still dereferencing, causing a native use-after-free (SIGSEGV in + * {@code rocksdb::DBImpl::GetAggregatedIntProperty}; the sibling + * {@code Statistics::getAndResetTickerCount} variant is KAFKA-10025). This test asserts + * the gauge read and value-provider removal are mutually exclusive. + */ + @Test + @Timeout(30) + public void shouldNotRemoveValueProvidersWhileGaugeIsReadingThem() throws Exception { + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()); + final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME); + recorder.init(streamsMetrics, TASK_ID); + recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); + final CountDownLatch gaugeReadInProgress = new CountDownLatch(1); + final CountDownLatch allowGaugeReadToComplete = new CountDownLatch(1); + when(dbToAdd1.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + ESTIMATED_NUMBER_OF_KEYS)) + .thenAnswer(invocation -> { + gaugeReadInProgress.countDown(); + allowGaugeReadToComplete.await(10, TimeUnit.SECONDS); + return 5L; + }); + + final KafkaMetric metric = getMetric(streamsMetrics, ESTIMATED_NUMBER_OF_KEYS); assertThat(metric, notNullValue()); - assertThat(metric.metricValue(), is(BigInteger.valueOf(expectedValue))); + + // Evaluate the gauge on a separate thread; it blocks inside getAggregatedLongProperty + // while holding the recorder's value-providers lock. + final Thread gaugeReader = new Thread(metric::metricValue, "gauge-reader"); + gaugeReader.start(); + assertThat("gauge read did not start", gaugeReadInProgress.await(10, TimeUnit.SECONDS), is(true)); + + // While the gauge read is in progress, removeValueProviders() (called by + // RocksDBStore.close() before it frees the native db) must not be able to proceed. + final CountDownLatch removeReturned = new CountDownLatch(1); + final Thread storeCloser = new Thread(() -> { + recorder.removeValueProviders(SEGMENT_STORE_NAME_1); + removeReturned.countDown(); + }, "store-closer"); + storeCloser.start(); + + assertThat( + "removeValueProviders() returned while a gauge read was in progress - the use-after-free window is open", + removeReturned.await(500, TimeUnit.MILLISECONDS), + is(false) + ); + + // Once the gauge read completes, removeValueProviders() must be able to proceed. + allowGaugeReadToComplete.countDown(); + assertThat( + "removeValueProviders() did not proceed after the gauge read finished", + removeReturned.await(10, TimeUnit.SECONDS), + is(true) + ); + + gaugeReader.join(TimeUnit.SECONDS.toMillis(10)); + storeCloser.join(TimeUnit.SECONDS.toMillis(10)); } }