Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public DbAndCacheAndStatistics(final RocksDB db, final Cache cache, final Statis
private Sensor numberOfFileErrorsSensor;

private final Map<String, DbAndCacheAndStatistics> storeToValueProviders = new ConcurrentHashMap<>();
// Guards reads of the value providers (in record() and in the property / block-cache
// gauges) against their removal in removeValueProviders(). RocksDBStore.close() calls
// removeValueProviders() and then closes (frees) the native RocksDB and Statistics.
// Without this lock a metrics read - triggered by the recording trigger, a metrics
// reporter, or a JMX scrape evaluating a gauge - can dereference a value provider that
// close() is concurrently freeing, causing a native use-after-free (SIGSEGV in e.g.
// rocksdb::DBImpl::GetAggregatedIntProperty or Statistics::getAndResetTickerCount).
// See KAFKA-10025.
private final Object valueProvidersLock = new Object();
private final String metricsScope;
private final String storeName;
private TaskId taskId;
Expand Down Expand Up @@ -157,17 +166,19 @@ public void addValueProviders(final String segmentName,
final RocksDB db,
final Cache cache,
final Statistics statistics) {
if (storeToValueProviders.isEmpty()) {
logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId);
streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
} else if (storeToValueProviders.containsKey(segmentName)) {
throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId +
" has been already added. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
synchronized (valueProvidersLock) {
if (storeToValueProviders.isEmpty()) {
logger.debug("Adding metrics recorder of task {} to metrics recording trigger", taskId);
streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
} else if (storeToValueProviders.containsKey(segmentName)) {
throw new IllegalStateException("Value providers for store " + segmentName + " of task " + taskId +
" has been already added. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics);
logger.debug("Adding value providers for store {} of task {}", segmentName, taskId);
storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics));
}
verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics);
logger.debug("Adding value providers for store {} of task {}", segmentName, taskId);
storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics));
}

private void verifyDbAndCacheAndStatistics(final String segmentName,
Expand Down Expand Up @@ -350,15 +361,19 @@ private void initGauges(final StreamsMetricsImpl streamsMetrics,
private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyName) {
return (metricsConfig, now) -> {
BigInteger result = BigInteger.valueOf(0);
for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
try {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = result.add(new BigInteger(1, longToBytes(
valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
)));
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
// Read the value providers under the lock so a store cannot be closed (and its
// native RocksDB freed) while getAggregatedLongProperty() is executing on it.
synchronized (valueProvidersLock) {
for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
try {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = result.add(new BigInteger(1, longToBytes(
valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
)));
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
}
}
}
return result;
Expand All @@ -368,24 +383,28 @@ private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyNam
private Gauge<BigInteger> gaugeToComputeBlockCacheMetrics(final String propertyName) {
return (metricsConfig, now) -> {
BigInteger result = BigInteger.valueOf(0);
for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
try {
if (singleCache) {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = new BigInteger(1, longToBytes(
valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
));
break;
} else {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = result.add(new BigInteger(1, longToBytes(
valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
)));
// Read the value providers under the lock so a store cannot be closed (and its
// native RocksDB freed) while getLongProperty() is executing on it.
synchronized (valueProvidersLock) {
for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
try {
if (singleCache) {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = new BigInteger(1, longToBytes(
valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
));
break;
} else {
// values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
// BigInteger and construct the object from the byte representation of the value
result = result.add(new BigInteger(1, longToBytes(
valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
)));
}
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
}
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error recording RocksDB metric " + propertyName, e);
}
}
return result;
Expand All @@ -399,20 +418,26 @@ private static byte[] longToBytes(final long data) {
}

public void removeValueProviders(final String segmentName) {
logger.debug("Removing value providers for store {} of task {}", segmentName, taskId);
final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName);
if (removedValueProviders == null) {
throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + taskId +
" could be found. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
if (storeToValueProviders.isEmpty()) {
logger.debug(
"Removing metrics recorder for store {} of task {} from metrics recording trigger",
storeName,
taskId
);
streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this);
// Acquiring the lock here waits for any in-flight record()/gauge read of the value
// providers to finish before the caller (RocksDBStore.close()) frees the native
// RocksDB / Statistics. After this returns the segment is no longer in the map, so
// no subsequent read can dereference it. This is what prevents the use-after-free.
synchronized (valueProvidersLock) {
logger.debug("Removing value providers for store {} of task {}", segmentName, taskId);
final DbAndCacheAndStatistics removedValueProviders = storeToValueProviders.remove(segmentName);
if (removedValueProviders == null) {
throw new IllegalStateException("No value providers for store \"" + segmentName + "\" of task " + taskId +
" could be found. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
if (storeToValueProviders.isEmpty()) {
logger.debug(
"Removing metrics recorder for store {} of task {} from metrics recording trigger",
storeName,
taskId
);
streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this);
}
}
}

Expand Down Expand Up @@ -443,37 +468,42 @@ public void record(final long now) {
double compactionTimeMin = Double.MAX_VALUE;
double compactionTimeMax = 0.0;
boolean shouldRecord = true;
for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
if (valueProviders.statistics == null) {
shouldRecord = false;
break;
// Read the value providers under the lock so a store cannot be closed (and its
// native Statistics freed) while these getAndResetTickerCount()/getHistogramData()
// reads are executing on it. See KAFKA-10025.
synchronized (valueProvidersLock) {
for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
if (valueProviders.statistics == null) {
shouldRecord = false;
break;
}
bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN);
bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ);
memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES);
memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT);
memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS);
blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT);
blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS);
blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT);
blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS);
blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT);
blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS);
writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles = -1;
numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME);
memtableFlushTimeSum += memtableFlushTimeData.getSum();
memtableFlushTimeCount += memtableFlushTimeData.getCount();
memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin());
memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax());
final HistogramData compactionTimeData = valueProviders.statistics.getHistogramData(HistogramType.COMPACTION_TIME);
compactionTimeSum += compactionTimeData.getSum();
compactionTimeCount += compactionTimeData.getCount();
compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin());
compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax());
}
bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN);
bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ);
memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES);
memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT);
memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS);
blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT);
blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS);
blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT);
blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS);
blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT);
blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS);
writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles = -1;
numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME);
memtableFlushTimeSum += memtableFlushTimeData.getSum();
memtableFlushTimeCount += memtableFlushTimeData.getCount();
memtableFlushTimeMin = Double.min(memtableFlushTimeMin, memtableFlushTimeData.getMin());
memtableFlushTimeMax = Double.max(memtableFlushTimeMax, memtableFlushTimeData.getMax());
final HistogramData compactionTimeData = valueProviders.statistics.getHistogramData(HistogramType.COMPACTION_TIME);
compactionTimeSum += compactionTimeData.getSum();
compactionTimeCount += compactionTimeData.getCount();
compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin());
compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax());
}
if (shouldRecord) {
bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase, now);
Expand Down
Loading