KAFKA-10025: guard RocksDBMetricsRecorder value provider reads against store close#22717
Open
Aangbaeck wants to merge 1 commit into
Open
KAFKA-10025: guard RocksDBMetricsRecorder value provider reads against store close#22717Aangbaeck wants to merge 1 commit into
Aangbaeck wants to merge 1 commit into
Conversation
…t store close RocksDBMetricsRecorder reads native RocksDB value providers (RocksDB and Statistics) in record() (getAndResetTickerCount / getHistogramData), in the property gauges (RocksDB.getAggregatedLongProperty) and in the block-cache gauges (RocksDB.getLongProperty). These reads had no mutual exclusion against removeValueProviders(). RocksDBStore.close() calls removeValueProviders() and then closes (frees) the native RocksDB and Statistics. Because the reads and the removal were not mutually exclusive, a metrics read that is in flight when a store is closed (e.g. during a rebalance / task migration) can dereference a native handle that close() is concurrently freeing, causing a native use-after-free / SIGSEGV. storeToValueProviders being a ConcurrentHashMap only makes the map operations safe; it does not prevent a reader that already holds a DbAndCacheAndStatistics from calling into a db/statistics that close() frees. Two observed crash frames, same root cause: - record() path -> Statistics::getAndResetTickerCount (this ticket) - gauge path -> rocksdb::DBImpl::GetAggregatedIntProperty (property gauges, registered at RecordingLevel.INFO, so reachable via a metrics reporter/JMX scrape even at metrics.recording.level=INFO) Fix: hold a single lock around every read of the value providers (record() and both gauge lambdas) and around the map mutations (addValueProviders / removeValueProviders). Since RocksDBStore.close() calls removeValueProviders() before freeing the native handles, acquiring the lock there waits for any in-flight read to finish and prevents any later read from seeing the segment, so no read can dereference a freed handle. The recording trigger holds no lock while calling record(), and the guarded reads never call back into RocksDBStore, so no lock-ordering cycle is introduced. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
RocksDBMetricsRecorderreads native RocksDB value providers (RocksDBandStatistics)in three places:
record()—statistics.getAndResetTickerCount(...)/getHistogramData(...)gaugeToComputeSumOfProperties) —db.getAggregatedLongProperty(...)gaugeToComputeBlockCacheMetrics) —db.getLongProperty(...)These run with no mutual exclusion against
removeValueProviders(...).RocksDBStore.close()calls
removeValueProviders(name)and then closes (frees) the native RocksDB andStatistics. Because the reads and the removal are not mutually exclusive, a metrics read
that is in flight when a store is closed (during a rebalance / task migration) can
dereference a native handle that
close()is concurrently freeing — a nativeuse-after-free / SIGSEGV.
Two observed crash frames, same root cause:
record()path →Statistics::getAndResetTickerCount— this is KAFKA-10025 (open since 2020).rocksdb::DBImpl::GetAggregatedIntProperty— observed in production under afrom-zero state rebuild, where warmup/probing rebalances close stores continuously while a
metrics reporter / JMX scrape evaluates the (INFO-level) RocksDB property gauges.
Note the gauge metrics are registered at
RecordingLevel.INFO, so they are active andscraped even when
metrics.recording.level=INFO; only therecord()(statistics) path isgated to DEBUG. So the crash is reachable at INFO.
Why the current code is unsafe
storeToValueProvidersis aConcurrentHashMap, which makes the map operationsthread-safe, but does not prevent a reader that has already obtained a
DbAndCacheAndStatisticsfrom calling a native method on itsdb/statisticsafter (orwhile)
RocksDBStore.close()frees them. There is no happens-before between "recorder readsthe provider" and "store closes the provider".
Fix
Introduce a single lock (
valueProvidersLock) inRocksDBMetricsRecorderand hold itaround every read of the value providers (
record(), both gauge lambdas) and around themap mutations (
addValueProviders,removeValueProviders). SinceRocksDBStore.close()already calls
removeValueProviders(...)before it frees the native db/statistics,removeValueProviders(...)acquiring the lock guarantees:so no read can ever dereference a freed handle.
No lock-ordering risk:
RocksDBMetricsRecordingTriggerholds no lock while callingrecord(), and the guarded reads never call back intoRocksDBStore.RocksDBStore.close()takes the store monitor then this lock; opens take the same order — consistent, no cycle.
Testing
RocksDBMetricsRecorderGaugesTest#shouldNotRemoveValueProvidersWhileGaugeIsReadingThem:blocks a gauge evaluation inside
getAggregatedLongProperty(holding the lock) and assertsremoveValueProviders(...)cannot return until the read completes — i.e. the use-after-freewindow is closed. Verified it fails without the fix (
AssertionError: …the use-after-free window is open) and passes with it.RocksDBMetricsRecorderTest/RocksDBMetricsRecorderGaugesTest, checkstyle andspotbugs all pass.
End-to-end confirmation (outside this PR, in a standalone Docker harness): a real Kafka
Streams app on the released
kafka-streams 8.2.1-ce, atmetrics.recording.level=INFO, witha JMX-style metrics scrape (reading the INFO-level RocksDB property gauges) plus forced
rebalances, SIGSEGVs in
rocksdb::DBImpl::GetAggregatedIntProperty+0x83on a scrape threadafter ~27M gauge reads. Running the same binary with only
RocksDBMetricsRecorderreplacedby the patched class (classpath shadow, load-verified) survived 600s / ~336M gauge reads / 25
rebalances with zero crashes. The exact native crash was also reproduced in a pure
rocksdbjniharness by racing
getAggregatedLongPropertyagainst a concurrent DB close+reopen.