Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public DbAndCacheAndStatistics(final RocksDB db, final Cache cache, final Statis
private TaskId taskId;
private StreamsMetricsImpl streamsMetrics;
private boolean singleCache = true;
// The statistics-based sensors are registered lazily (see maybeInitStatisticsBasedSensors)
// the first time value providers with a non-null Statistics are added. This guards against
// registering them more than once when segments are added and removed over the lifetime of
// the recorder. See KAFKA-10397.
private boolean statisticsBasedSensorsInitialized = false;

public RocksDBMetricsRecorder(final String metricsScope,
final String storeName) {
Expand Down Expand Up @@ -147,7 +152,11 @@ public void init(final StreamsMetricsImpl streamsMetrics,
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
initSensors(streamsMetrics, metricContext);
// The property / block-cache gauges read the RocksDB instance directly (getAggregatedLongProperty /
// getLongProperty) and are meaningful regardless of whether a Statistics object exists, so they are
// always registered here. The statistics-based sensors, in contrast, are only registered later in
// addValueProviders() once we know the store actually has a (non-user-provided) Statistics to read
// from -- see maybeInitStatisticsBasedSensors() and KAFKA-10397.
initGauges(streamsMetrics, metricContext);
this.taskId = taskId;
this.streamsMetrics = streamsMetrics;
Expand All @@ -168,6 +177,24 @@ public void addValueProviders(final String segmentName,
verifyDbAndCacheAndStatistics(segmentName, db, cache, statistics);
logger.debug("Adding value providers for store {} of task {}", segmentName, taskId);
storeToValueProviders.put(segmentName, new DbAndCacheAndStatistics(db, cache, statistics));
maybeInitStatisticsBasedSensors(statistics);
}

/**
* Registers the statistics-based sensors the first time value providers with a non-null
* {@link Statistics} are added.
*
* <p>If the user supplies their own {@code Statistics} object through the {@code RocksDBConfigSetter},
* {@link org.apache.kafka.streams.state.internals.RocksDBStore} passes {@code null} here, because
* Streams must not read (and thereby reset) the user's statistics. In that case the statistics-based
* metrics can never be recorded, so we do not register them at all rather than exposing metrics that
* are perpetually empty. See KAFKA-10397.
*/
private void maybeInitStatisticsBasedSensors(final Statistics statistics) {
if (statistics != null && !statisticsBasedSensorsInitialized) {
initSensors(streamsMetrics, new RocksDBMetricContext(taskId.toString(), metricsScope, storeName));
statisticsBasedSensorsInitialized = true;
}
}

private void verifyDbAndCacheAndStatistics(final String segmentName,
Expand Down Expand Up @@ -475,7 +502,11 @@ public void record(final long now) {
compactionTimeMin = Double.min(compactionTimeMin, compactionTimeData.getMin());
compactionTimeMax = Double.max(compactionTimeMax, compactionTimeData.getMax());
}
if (shouldRecord) {
// statisticsBasedSensorsInitialized is false when no Statistics is available (e.g. the user
// provided their own via RocksDBConfigSetter, see KAFKA-10397), in which case the sensors were
// never registered and there is nothing to record to. shouldRecord already covers the
// per-provider null-Statistics case; this also guards the corner case of an empty value-provider map.
if (shouldRecord && statisticsBasedSensorsInitialized) {
bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase, now);
bytesReadFromDatabaseSensor.record(bytesReadFromDatabase, now);
memtableBytesFlushedSensor.record(memtableBytesFlushed, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,54 @@ public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRock
assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
}

@Test
public void shouldNotRegisterStatisticsBasedMetricsWhenUserProvidesStatistics() {
// KAFKA-10397: when the user supplies their own Statistics object via RocksDBConfigSetter,
// Streams must not read it (that would reset the user's counters), so the statistics-based
// metrics can never be recorded. They must therefore not be exposed at all, even at
// metrics.recording.level=DEBUG. The property-based gauges, which read the RocksDB instance
// directly, must still be exposed.
final TaskId taskId = new TaskId(0, 0);

final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-application", time);

final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetterWithUserProvidedStatistics.class);

context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
when(context.taskId()).thenReturn(taskId);
when(context.appConfigs()).thenReturn(new StreamsConfig(props).originals());
when(context.stateDir()).thenReturn(dir);
final MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0);
when(context.recordMetadata()).thenReturn(Optional.of(processorRecordContext));

rocksDBStore.init(context, rocksDBStore);
rocksDBStore.put(Bytes.wrap("hello".getBytes()), "world".getBytes());

streamsMetrics.rocksDBMetricsRecordingTrigger().run();

// A statistics-based metric must NOT be registered ...
final Metric bytesWrittenTotal = metrics.metric(new MetricName(
"bytes-written-total",
StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
"description is not verified",
streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
));
assertNull(bytesWrittenTotal);

// ... but a property-based gauge must still be registered.
final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
"num-entries-active-mem-table",
StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
"description is not verified",
streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
));
assertThat(numberOfEntriesActiveMemTable, notNullValue());
}

@Test
public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
final TaskId taskId = new TaskId(0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;
import org.rocksdb.Cache;
import org.rocksdb.HistogramData;
import org.rocksdb.HistogramType;
Expand Down Expand Up @@ -121,25 +122,7 @@ public static void cleanUpMockito() {
}

@Test
public void shouldInitMetricsRecorder() {
dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.memtableBytesFlushedSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.memtableHitRatioSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.memtableAvgFlushTimeSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.memtableMinFlushTimeSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.memtableMaxFlushTimeSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.writeStallDurationSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.compactionTimeAvgSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMinSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMaxSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.numberOfOpenFilesSensor(any(), any()));
dbMetrics.verify(() -> RocksDBMetrics.numberOfFileErrorsSensor(any(), any()));
public void shouldInitGaugesWhenRecorderIsInitialised() {
dbMetrics.verify(() -> RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()));
dbMetrics.verify(() -> RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), any()));
dbMetrics.verify(() -> RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), any()));
Expand All @@ -165,6 +148,37 @@ public void shouldInitMetricsRecorder() {
assertThat(recorder.taskId(), is(TASK_ID1));
}

@Test
public void shouldNotInitStatisticsBasedSensorsWhenRecorderIsInitialised() {
// The recorder was init()'ed in setUp() but no value providers were added yet, so the
// statistics-based sensors must not have been registered. See KAFKA-10397.
verifyStatisticsBasedSensorsRegistered(never());
}

@Test
public void shouldInitStatisticsBasedSensorsWhenValueProvidersWithStatisticsAreAdded() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);

verifyStatisticsBasedSensorsRegistered(times(1));
}

@Test
public void shouldNotInitStatisticsBasedSensorsWhenValueProvidersWithoutStatisticsAreAdded() {
// When the user supplies their own Statistics object, RocksDBStore passes null here and the
// statistics-based metrics can never be recorded, so they must not be registered. See KAFKA-10397.
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);

verifyStatisticsBasedSensorsRegistered(never());
}

@Test
public void shouldInitStatisticsBasedSensorsOnlyOnceWhenMultipleValueProvidersWithStatisticsAreAdded() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);

verifyStatisticsBasedSensorsRegistered(times(1));
}

@Test
public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentTask() {
assertThrows(
Expand Down Expand Up @@ -562,6 +576,27 @@ public void shouldCorrectlyHandleAvgRecordingsWithZeroSumAndCount() {
verify(memtableAvgFlushTimeSensor).record(0d, now);
}

private void verifyStatisticsBasedSensorsRegistered(final VerificationMode mode) {
dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.memtableBytesFlushedSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.memtableHitRatioSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.memtableAvgFlushTimeSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.memtableMinFlushTimeSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.memtableMaxFlushTimeSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.writeStallDurationSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.compactionTimeAvgSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMinSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMaxSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.numberOfOpenFilesSensor(any(), any()), mode);
dbMetrics.verify(() -> RocksDBMetrics.numberOfFileErrorsSensor(any(), any()), mode);
}

private void setUpMetricsMock() {
dbMetrics = mockStatic(RocksDBMetrics.class);
dbMetrics.when(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
Expand Down