diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 0531cbde51..7d39deb68d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -27,6 +27,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; +import org.opensearch.index.IndexSettings; import org.opensearch.sql.api.UnifiedQueryContext; import org.opensearch.sql.api.UnifiedQueryPlanner; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -73,8 +74,10 @@ public RestUnifiedQueryAction( } /** - * Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses the context's - * parser for index name extraction, supporting both PPL and SQL. + * Returns true iff the target index has {@link + * IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set and {@link + * IndexSettings#PLUGGABLE_DATAFORMAT_VALUE_SETTING} is {@code "parquet"}, routing it to + * DataFusion instead of the Calcite→DSL path. * *

Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared * with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails @@ -87,18 +90,29 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { } try (UnifiedQueryContext context = buildParsingContext(queryType)) { return extractIndexName(query, queryType, context) - .map( - indexName -> { - int lastDot = indexName.lastIndexOf('.'); - return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; - }) - .map(tableName -> tableName.startsWith("parquet_")) + .map(this::stripSchemaPrefix) + .map(this::isPluggableDataformatIndex) .orElse(false); } catch (Exception e) { return false; } } + private String stripSchemaPrefix(String indexName) { + int lastDot = indexName.lastIndexOf('.'); + return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + } + + private boolean isPluggableDataformatIndex(String indexName) { + var indexMetadata = clusterService.state().metadata().index(indexName); + if (indexMetadata == null) { + return false; + } + var settings = indexMetadata.getSettings(); + return IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(settings) + && "parquet".equals(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(settings)); + } + /** Execute a query through the unified query pipeline on the sql-worker thread pool. */ public void execute( String query, diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java index 60e0b0bf76..25f157f0d9 100644 --- a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java +++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java @@ -8,44 +8,96 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.apache.calcite.rel.RelNode; import org.junit.Before; import org.junit.Test; import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.sql.common.setting.Settings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; import org.opensearch.sql.executor.QueryType; import org.opensearch.transport.client.node.NodeClient; /** - * Tests for analytics index routing in RestUnifiedQueryAction. Uses context parser for AST-based - * index name extraction. + * Tests for analytics index routing in RestUnifiedQueryAction. Routing requires both {@code + * index.pluggable.dataformat.enabled=true} and {@code index.pluggable.dataformat=parquet}. */ public class RestUnifiedQueryActionTest { + private ClusterService clusterService; + private Metadata metadata; private RestUnifiedQueryAction action; @Before public void setUp() { + clusterService = mock(ClusterService.class); + ClusterState clusterState = mock(ClusterState.class); + metadata = mock(Metadata.class); + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.metadata()).thenReturn(metadata); + @SuppressWarnings("unchecked") QueryPlanExecutor> executor = mock(QueryPlanExecutor.class); action = new RestUnifiedQueryAction( - mock(NodeClient.class), mock(ClusterService.class), executor, mock(Settings.class)); + mock(NodeClient.class), + clusterService, + executor, + mock(org.opensearch.sql.common.setting.Settings.class)); } @Test - public void parquetIndexRoutesToAnalytics() { + public void pluggableDataformatIndexRoutesToAnalytics() { + registerIndex( + "parquet_logs", + Settings.builder() + .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "parquet") + .build()); + assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL)); assertTrue( action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL)); } @Test - public void nonParquetIndexRoutesToLucene() { - assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts", QueryType.PPL)); + public void pluggableEnabledButLuceneFormatRoutesToLucene() { + registerIndex( + "lucene_logs", + Settings.builder() + .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "lucene") + .build()); + + assertFalse(action.isAnalyticsIndex("source = lucene_logs | fields ts", QueryType.PPL)); + } + + @Test + public void indexWithoutSettingRoutesToLucene() { + registerIndex("plain_logs", Settings.EMPTY); + + assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL)); + } + + @Test + public void missingIndexRoutesToLucene() { + assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL)); + } + + @Test + public void nullAndEmptyQueriesRouteToLucene() { assertFalse(action.isAnalyticsIndex(null, QueryType.PPL)); assertFalse(action.isAnalyticsIndex("", QueryType.PPL)); } + + private void registerIndex(String name, Settings settings) { + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(indexMetadata.getSettings()).thenReturn(settings); + when(metadata.index(name)).thenReturn(indexMetadata); + } }