From ea806acf5ef737bb8b830c952899e270c1d79199 Mon Sep 17 00:00:00 2001 From: Shivam Gupta Date: Tue, 14 Apr 2026 15:52:14 +0530 Subject: [PATCH] Add query_source_hits metric Signed-off-by: Shivam Gupta --- .../RTFPerformanceAnalyzerSearchListener.java | 48 +++++++++ ...erformanceAnalyzerSearchListenerTests.java | 102 ++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 6d7b0532..173fd545 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -25,6 +25,7 @@ import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.search.internal.SearchContext; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; @@ -51,8 +52,11 @@ public class RTFPerformanceAnalyzerSearchListener private final Histogram cpuUtilizationHistogram; private final Histogram heapUsedHistogram; private final Histogram searchLatencyHistogram; + private final Counter querySourceHitsCounter; private final int numProcessors; + public static final String QUERY_SOURCE_HITS = "query_source_hits"; + public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { this.controller = controller; this.cpuUtilizationHistogram = @@ -61,6 +65,8 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.searchLatencyHistogram = createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.querySourceHitsCounter = + createQuerySourceHitsCounter(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.threadLocal = ThreadLocal.withInitial(HashMap::new); this.numProcessors = Runtime.getRuntime().availableProcessors(); } @@ -103,6 +109,18 @@ private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry) } } + private Counter createQuerySourceHitsCounter(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createCounter( + QUERY_SOURCE_HITS, + "Count of shard fetch phases where _source was requested", + RTFMetrics.MetricUnits.COUNT.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + @Override public String toString() { return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); @@ -219,6 +237,9 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) { searchLatencyHistogram.record( fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + // Emit counter if _source was requested in this fetch phase + emitFetchWithSourceMetric(searchContext); + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, tookInNanos, SHARD_FETCH_PHASE, false); } @@ -231,6 +252,23 @@ public void failedFetchPhase(SearchContext searchContext) { searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true); } + /** + * Emits the query_source_hits counter if _source was requested in this fetch. Used to calculate + * source creation ratio: shard_search_rate / query_source_hits. + */ + private void emitFetchWithSourceMetric(SearchContext searchContext) { + if (querySourceHitsCounter == null) { + return; + } + try { + if (searchContext.sourceRequested()) { + querySourceHitsCounter.add(1, createIndexTags(searchContext)); + } + } catch (Exception e) { + LOG.error("Error emitting query_source_hits metric", e); + } + } + private void addResourceTrackingCompletionListener( SearchContext searchContext, long startTime, @@ -344,4 +382,14 @@ private Tags createTags(SearchContext searchContext, String phase, boolean isFai private Tags createTags(SearchContext searchContext) { return createTags(searchContext, null, false); } + + private Tags createIndexTags(SearchContext searchContext) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 7d5c726c..56a75ca5 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -13,6 +13,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.opensearch.action.search.SearchShardTask; @@ -29,6 +30,7 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; @@ -48,6 +50,7 @@ public class RTFPerformanceAnalyzerSearchListenerTests { @Mock private Histogram searchLatencyHistogram; @Mock private Histogram shardMetricsCpuHistogram; @Mock private Histogram shardMetricsHeapHistogram; + @Mock private Counter querySourceHitsCounter; @Mock private Index index; @Mock private TaskResourceUsage taskResourceUsage; @@ -89,6 +92,10 @@ public void init() { } return null; }); + Mockito.when( + metricsRegistry.createCounter( + Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(querySourceHitsCounter); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), @@ -203,6 +210,101 @@ public void testTaskCompletionListener() { Mockito.verify(shardHeap).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } + @Test + public void testFetchPhaseEmitsQuerySourceHitsWhenSourceRequested() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(searchContext.sourceRequested()).thenReturn(true); + + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0L); + + Mockito.verify(querySourceHitsCounter).add(Mockito.eq(1L), Mockito.any(Tags.class)); + } + + @Test + public void testFetchPhaseDoesNotEmitQuerySourceHitsWhenSourceNotRequested() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(searchContext.sourceRequested()).thenReturn(false); + + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0L); + + Mockito.verify(querySourceHitsCounter, Mockito.never()) + .add(Mockito.anyLong(), Mockito.any(Tags.class)); + } + + @Test + public void testQuerySourceHitsCounterNotCreatedWhenMetricsRegistryNull() { + OpenSearchResources.INSTANCE.setMetricsRegistry(null); + RTFPerformanceAnalyzerSearchListener listener = + new RTFPerformanceAnalyzerSearchListener(controller); + + // Should not throw even with null counter — emitFetchWithSourceMetric guards against it + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + + listener.preFetchPhase(searchContext); + listener.fetchPhase(searchContext, 0L); + + // Restore for other tests + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + } + + @Test + public void testQuerySourceHitsHandlesExceptionGracefully() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(searchContext.sourceRequested()) + .thenThrow(new RuntimeException("simulated failure")); + + // Should not propagate — catch block logs error and swallows + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0L); + + Mockito.verify(querySourceHitsCounter, Mockito.never()) + .add(Mockito.anyLong(), Mockito.any(Tags.class)); + } + + @Test + public void testQuerySourceHitsEmitsCorrectTags() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(searchContext.sourceRequested()).thenReturn(true); + + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0L); + + ArgumentCaptor tagsCaptor = ArgumentCaptor.forClass(Tags.class); + Mockito.verify(querySourceHitsCounter).add(Mockito.eq(1L), tagsCaptor.capture()); + + Tags capturedTags = tagsCaptor.getValue(); + String tagsString = capturedTags.toString(); + assertTrue("Tags should contain index name", tagsString.contains("myTestIndex")); + assertTrue("Tags should contain index UUID", tagsString.contains("abc-def")); + } + private void initializeValidSearchContext(boolean isValid) { if (isValid) { Mockito.when(searchContext.request()).thenReturn(shardSearchRequest);