From 7f221e1b97948a2be3f60dfb9587a074c70f9596 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sat, 9 May 2026 21:29:32 -0700 Subject: [PATCH 1/2] Route analytics queries by index setting, not table-name prefix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Today `RestUnifiedQueryAction.isAnalyticsIndex` dispatches to the analytics engine when the source index name starts with `parquet_`. That's brittle — it conflates naming convention with storage type. An index created without the prefix but with pluggable dataformat enabled is silently sent to the Lucene path; an index named `parquet_foo` without the setting is mis-dispatched to analytics. Use the authoritative signal instead: the `index.pluggable.dataformat.enabled` setting on cluster-state metadata. This is the same setting integration tests (`CoordinatorReduceIT`, `CompositeCommitDeletionIT`, etc.) already use to create analytics-backed indices, and it's what `FieldStorageResolver` reads to decide field-level storage. Behavior: - `index.pluggable.dataformat.enabled=true` → analytics engine (DataFusion) - flag absent / false / index missing → Calcite→OpenSearch DSL path Signed-off-by: bowenlan-amzn --- .../plugin/rest/RestUnifiedQueryAction.java | 25 ++++++--- .../rest/RestUnifiedQueryActionTest.java | 53 ++++++++++++++++--- 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 0531cbde51..1766d47df4 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -27,6 +27,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; +import org.opensearch.index.IndexSettings; import org.opensearch.sql.api.UnifiedQueryContext; import org.opensearch.sql.api.UnifiedQueryPlanner; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -73,8 +74,9 @@ public RestUnifiedQueryAction( } /** - * Check if the query targets an analytics engine index (e.g., Parquet-backed). Uses the context's - * parser for index name extraction, supporting both PPL and SQL. + * Returns true iff the target index has {@link + * IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set, routing it to DataFusion instead of + * the Calcite→DSL path. * *

Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared * with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails @@ -87,18 +89,25 @@ public boolean isAnalyticsIndex(String query, QueryType queryType) { } try (UnifiedQueryContext context = buildParsingContext(queryType)) { return extractIndexName(query, queryType, context) - .map( - indexName -> { - int lastDot = indexName.lastIndexOf('.'); - return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; - }) - .map(tableName -> tableName.startsWith("parquet_")) + .map(this::stripSchemaPrefix) + .map(this::isPluggableDataformatIndex) .orElse(false); } catch (Exception e) { return false; } } + private String stripSchemaPrefix(String indexName) { + int lastDot = indexName.lastIndexOf('.'); + return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + } + + private boolean isPluggableDataformatIndex(String indexName) { + var indexMetadata = clusterService.state().metadata().index(indexName); + return indexMetadata != null + && IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(indexMetadata.getSettings()); + } + /** Execute a query through the unified query pipeline on the sql-worker thread pool. */ public void execute( String query, diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java index 60e0b0bf76..00a3e88be4 100644 --- a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java +++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java @@ -8,44 +8,83 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.apache.calcite.rel.RelNode; import org.junit.Before; import org.junit.Test; import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.sql.common.setting.Settings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; import org.opensearch.sql.executor.QueryType; import org.opensearch.transport.client.node.NodeClient; /** - * Tests for analytics index routing in RestUnifiedQueryAction. Uses context parser for AST-based - * index name extraction. + * Tests for analytics index routing in RestUnifiedQueryAction. Routing is driven by the {@code + * index.pluggable.dataformat.enabled} index setting, read from cluster state. */ public class RestUnifiedQueryActionTest { + private ClusterService clusterService; + private Metadata metadata; private RestUnifiedQueryAction action; @Before public void setUp() { + clusterService = mock(ClusterService.class); + ClusterState clusterState = mock(ClusterState.class); + metadata = mock(Metadata.class); + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.metadata()).thenReturn(metadata); + @SuppressWarnings("unchecked") QueryPlanExecutor> executor = mock(QueryPlanExecutor.class); action = new RestUnifiedQueryAction( - mock(NodeClient.class), mock(ClusterService.class), executor, mock(Settings.class)); + mock(NodeClient.class), + clusterService, + executor, + mock(org.opensearch.sql.common.setting.Settings.class)); } @Test - public void parquetIndexRoutesToAnalytics() { + public void pluggableDataformatIndexRoutesToAnalytics() { + registerIndex( + "parquet_logs", + Settings.builder() + .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .build()); + assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL)); assertTrue( action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL)); } @Test - public void nonParquetIndexRoutesToLucene() { - assertFalse(action.isAnalyticsIndex("source = my_logs | fields ts", QueryType.PPL)); + public void indexWithoutSettingRoutesToLucene() { + registerIndex("plain_logs", Settings.EMPTY); + + assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL)); + } + + @Test + public void missingIndexRoutesToLucene() { + assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL)); + } + + @Test + public void nullAndEmptyQueriesRouteToLucene() { assertFalse(action.isAnalyticsIndex(null, QueryType.PPL)); assertFalse(action.isAnalyticsIndex("", QueryType.PPL)); } + + private void registerIndex(String name, Settings settings) { + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(indexMetadata.getSettings()).thenReturn(settings); + when(metadata.index(name)).thenReturn(indexMetadata); + } } From f2e83ee4c0a716677e6dfea893605aa89a61781c Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 10 May 2026 13:29:17 -0700 Subject: [PATCH 2/2] Check primary_data_format=parquet before routing to analytics engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Only route to the analytics path when both pluggable.dataformat.enabled=true AND pluggable.dataformat=parquet. If the format is lucene (or anything else), fall through to the standard Calcite→DSL path. Signed-off-by: bowenlan-amzn --- .../sql/plugin/rest/RestUnifiedQueryAction.java | 13 +++++++++---- .../plugin/rest/RestUnifiedQueryActionTest.java | 17 +++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 1766d47df4..7d39deb68d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -75,8 +75,9 @@ public RestUnifiedQueryAction( /** * Returns true iff the target index has {@link - * IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set, routing it to DataFusion instead of - * the Calcite→DSL path. + * IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set and {@link + * IndexSettings#PLUGGABLE_DATAFORMAT_VALUE_SETTING} is {@code "parquet"}, routing it to + * DataFusion instead of the Calcite→DSL path. * *

Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared * with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails @@ -104,8 +105,12 @@ private String stripSchemaPrefix(String indexName) { private boolean isPluggableDataformatIndex(String indexName) { var indexMetadata = clusterService.state().metadata().index(indexName); - return indexMetadata != null - && IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(indexMetadata.getSettings()); + if (indexMetadata == null) { + return false; + } + var settings = indexMetadata.getSettings(); + return IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(settings) + && "parquet".equals(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(settings)); } /** Execute a query through the unified query pipeline on the sql-worker thread pool. */ diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java index 00a3e88be4..25f157f0d9 100644 --- a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java +++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java @@ -24,8 +24,8 @@ import org.opensearch.transport.client.node.NodeClient; /** - * Tests for analytics index routing in RestUnifiedQueryAction. Routing is driven by the {@code - * index.pluggable.dataformat.enabled} index setting, read from cluster state. + * Tests for analytics index routing in RestUnifiedQueryAction. Routing requires both {@code + * index.pluggable.dataformat.enabled=true} and {@code index.pluggable.dataformat=parquet}. */ public class RestUnifiedQueryActionTest { @@ -57,6 +57,7 @@ public void pluggableDataformatIndexRoutesToAnalytics() { "parquet_logs", Settings.builder() .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "parquet") .build()); assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL)); @@ -64,6 +65,18 @@ public void pluggableDataformatIndexRoutesToAnalytics() { action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL)); } + @Test + public void pluggableEnabledButLuceneFormatRoutesToLucene() { + registerIndex( + "lucene_logs", + Settings.builder() + .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "lucene") + .build()); + + assertFalse(action.isAnalyticsIndex("source = lucene_logs | fields ts", QueryType.PPL)); + } + @Test public void indexWithoutSettingRoutesToLucene() { registerIndex("plain_logs", Settings.EMPTY);