Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
Expand All @@ -51,8 +52,11 @@ public class RTFPerformanceAnalyzerSearchListener
private final Histogram cpuUtilizationHistogram;
private final Histogram heapUsedHistogram;
private final Histogram searchLatencyHistogram;
private final Counter querySourceHitsCounter;
private final int numProcessors;

public static final String QUERY_SOURCE_HITS = "query_source_hits";

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.cpuUtilizationHistogram =
Expand All @@ -61,6 +65,8 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController
createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.searchLatencyHistogram =
createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.querySourceHitsCounter =
createQuerySourceHitsCounter(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.threadLocal = ThreadLocal.withInitial(HashMap::new);
this.numProcessors = Runtime.getRuntime().availableProcessors();
}
Expand Down Expand Up @@ -103,6 +109,18 @@ private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry)
}
}

private Counter createQuerySourceHitsCounter(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createCounter(
QUERY_SOURCE_HITS,
"Count of shard fetch phases where _source was requested",
RTFMetrics.MetricUnits.COUNT.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

@Override
public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
Expand Down Expand Up @@ -219,6 +237,9 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) {
searchLatencyHistogram.record(
fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false));

// Emit counter if _source was requested in this fetch phase
emitFetchWithSourceMetric(searchContext);

addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, tookInNanos, SHARD_FETCH_PHASE, false);
}
Expand All @@ -231,6 +252,23 @@ public void failedFetchPhase(SearchContext searchContext) {
searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true);
}

/**
* Emits the query_source_hits counter if _source was requested in this fetch. Used to calculate
* source creation ratio: shard_search_rate / query_source_hits.
*/
private void emitFetchWithSourceMetric(SearchContext searchContext) {
if (querySourceHitsCounter == null) {
return;
}
try {
if (searchContext.sourceRequested()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is shard level correct so we will send counter as 1 for all shards. ?

querySourceHitsCounter.add(1, createIndexTags(searchContext));
}
} catch (Exception e) {
LOG.error("Error emitting query_source_hits metric", e);
}
}

private void addResourceTrackingCompletionListener(
SearchContext searchContext,
long startTime,
Expand Down Expand Up @@ -344,4 +382,14 @@ private Tags createTags(SearchContext searchContext, String phase, boolean isFai
private Tags createTags(SearchContext searchContext) {
return createTags(searchContext, null, false);
}

private Tags createIndexTags(SearchContext searchContext) {
return Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.opensearch.action.search.SearchShardTask;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
Expand All @@ -48,6 +50,7 @@ public class RTFPerformanceAnalyzerSearchListenerTests {
@Mock private Histogram searchLatencyHistogram;
@Mock private Histogram shardMetricsCpuHistogram;
@Mock private Histogram shardMetricsHeapHistogram;
@Mock private Counter querySourceHitsCounter;
@Mock private Index index;

@Mock private TaskResourceUsage taskResourceUsage;
Expand Down Expand Up @@ -89,6 +92,10 @@ public void init() {
}
return null;
});
Mockito.when(
metricsRegistry.createCounter(
Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
.thenReturn(querySourceHitsCounter);
searchListener = new RTFPerformanceAnalyzerSearchListener(controller);
assertEquals(
RTFPerformanceAnalyzerSearchListener.class.getSimpleName(),
Expand Down Expand Up @@ -203,6 +210,101 @@ public void testTaskCompletionListener() {
Mockito.verify(shardHeap).record(Mockito.anyDouble(), Mockito.any(Tags.class));
}

@Test
public void testFetchPhaseEmitsQuerySourceHitsWhenSourceRequested() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(shardId.getIndex()).thenReturn(index);
Mockito.when(index.getName()).thenReturn("myTestIndex");
Mockito.when(index.getUUID()).thenReturn("abc-def");
Mockito.when(searchContext.sourceRequested()).thenReturn(true);

searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0L);

Mockito.verify(querySourceHitsCounter).add(Mockito.eq(1L), Mockito.any(Tags.class));
}

@Test
public void testFetchPhaseDoesNotEmitQuerySourceHitsWhenSourceNotRequested() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(shardId.getIndex()).thenReturn(index);
Mockito.when(index.getName()).thenReturn("myTestIndex");
Mockito.when(index.getUUID()).thenReturn("abc-def");
Mockito.when(searchContext.sourceRequested()).thenReturn(false);

searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0L);

Mockito.verify(querySourceHitsCounter, Mockito.never())
.add(Mockito.anyLong(), Mockito.any(Tags.class));
}

@Test
public void testQuerySourceHitsCounterNotCreatedWhenMetricsRegistryNull() {
OpenSearchResources.INSTANCE.setMetricsRegistry(null);
RTFPerformanceAnalyzerSearchListener listener =
new RTFPerformanceAnalyzerSearchListener(controller);

// Should not throw even with null counter — emitFetchWithSourceMetric guards against it
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(shardId.getIndex()).thenReturn(index);
Mockito.when(index.getName()).thenReturn("myTestIndex");
Mockito.when(index.getUUID()).thenReturn("abc-def");

listener.preFetchPhase(searchContext);
listener.fetchPhase(searchContext, 0L);

// Restore for other tests
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
}

@Test
public void testQuerySourceHitsHandlesExceptionGracefully() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(shardId.getIndex()).thenReturn(index);
Mockito.when(index.getName()).thenReturn("myTestIndex");
Mockito.when(index.getUUID()).thenReturn("abc-def");
Mockito.when(searchContext.sourceRequested())
.thenThrow(new RuntimeException("simulated failure"));

// Should not propagate — catch block logs error and swallows
searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0L);

Mockito.verify(querySourceHitsCounter, Mockito.never())
.add(Mockito.anyLong(), Mockito.any(Tags.class));
}

@Test
public void testQuerySourceHitsEmitsCorrectTags() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(shardId.getIndex()).thenReturn(index);
Mockito.when(index.getName()).thenReturn("myTestIndex");
Mockito.when(index.getUUID()).thenReturn("abc-def");
Mockito.when(searchContext.sourceRequested()).thenReturn(true);

searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0L);

ArgumentCaptor<Tags> tagsCaptor = ArgumentCaptor.forClass(Tags.class);
Mockito.verify(querySourceHitsCounter).add(Mockito.eq(1L), tagsCaptor.capture());

Tags capturedTags = tagsCaptor.getValue();
String tagsString = capturedTags.toString();
assertTrue("Tags should contain index name", tagsString.contains("myTestIndex"));
assertTrue("Tags should contain index UUID", tagsString.contains("abc-def"));
}

private void initializeValidSearchContext(boolean isValid) {
if (isValid) {
Mockito.when(searchContext.request()).thenReturn(shardSearchRequest);
Expand Down
Loading