From 50eb98a44318e40f26db31463266c40147325212 Mon Sep 17 00:00:00 2001
From: Kai Huang
Date: Mon, 11 May 2026 10:18:09 -0700
Subject: [PATCH 1/2] Land analytics-engine PPL integration into main
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Single squashed delivery of the long-running feature/mustang-ppl-integration
branch into main, consolidating 22 feature-branch PRs plus the
conflict-resolved merge of current main.

Squashed because the feature branch's history includes commits with missing
or mismatched Signed-off-by trailers that block DCO at this scope — the
equivalent issue documented for the catch-up squashes (#5397). The feature
branch f006e29cd is retained for individual-commit lineage.

### What this delivers

Analytics-engine PPL integration — a new execution path that routes
Parquet-backed (non-Lucene) indices through an analytics engine while keeping
Lucene-backed indices on the existing v2 / Calcite paths.

Headline pieces:

- Query routing (#5267) — PPL queries against Parquet-backed indices hand off
  to the analytics-engine execution path; Lucene-backed indices continue
  through the legacy path
- Explain support (#5275) — EXPLAIN covers the analytics-engine path
- Profiling + UnifiedQueryParser (#5285) — migrates PPL parsing to the
  unified parser and wires profiling metrics through the analytics path
- extendedPlugins wiring (#5302) — analytics-engine attaches as an OpenSearch
  extension via SPI
- SQL REST endpoint integration (#5317) — same analytics-route fork applied
  to the SQL transport, plus delegateToV2Engine extraction in RestSqlAction
- Async QueryPlanExecutor (#5396) — async execution for analytics-engine
  plans + version bump to OpenSearch 3.7
- Optional dependency (#5403) — analytics-engine becomes an optional runtime
  dep so the SQL bundle is shippable without it
- Index-setting-based routing (#5429, #5432) — replaces the earlier
  table-name-prefix heuristic with an authoritative index-setting check
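The routing fork in the last bullet can be sketched as follows. This is an illustrative standalone sketch, not the landed implementation: the class name, method name, and the `index.engine` setting key are all hypothetical stand-ins for the authoritative index-setting check from #5429/#5432.

```java
import java.util.Map;

/**
 * Hypothetical sketch of index-setting-based engine routing.
 * The "index.engine" key and all names here are illustrative only.
 */
public class EngineRoutingSketch {

  /** True when the index's settings mark it as Parquet-backed. */
  public static boolean routesToAnalyticsEngine(Map<String, String> indexSettings) {
    // #5429 replaced a table-name-prefix heuristic with an index-setting
    // check; anything not marked Parquet-backed stays on the legacy path.
    return "parquet".equals(indexSettings.get("index.engine"));
  }

  public static void main(String[] args) {
    System.out.println(routesToAnalyticsEngine(Map.of("index.engine", "parquet"))); // true
    System.out.println(routesToAnalyticsEngine(Map.of("index.engine", "lucene")));  // false
    System.out.println(routesToAnalyticsEngine(Map.of()));                          // false
  }
}
```

The key property this models is that the decision is per-index and derived from authoritative index metadata, not from naming conventions.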
Supporting infrastructure:

- Gradle wrapper bump to 9.4.1 (#5406)
- Jar-hell exclusions for arrow-flight-rpc / httpcore5-h2 /
  httpcore5-reactive / httpclient5 (#5400, #5409)
- IT plumbing: CalciteEvalCommandIT / CalciteFieldFormatCommandIT carried
  through the helper-managed index path (#5407, #5417);
  CalciteReplaceCommandIT column-order-agnostic (#5415); @Ignore'd Calcite
  ITs dropped from CalciteNoPushdownIT (#5416)
- plugins.calcite.enabled=true defaulted on the unified query path (#5413)
- PPL_REX_MAX_MATCH_LIMIT bridged into UnifiedQueryContext (#5418)
- Calcite tolerance fixes: array() default type (#5421),
  containsNestedAggregator flat-leaf schemas (#5423)
- Sandbox deps switched to analytics-api JDK 21 surface (#5426)

### Feature-branch commits squashed (22)

#5432, #5429, #5426, #5423, #5421, #5418, #5403, #5417, #5415, #5416, #5413,
#5407, #5409, #5406, #5400, #5396, #5317, #5302, #5285, #5275, #5267, #5397,
#5286

### Main commits absorbed via the merge (54)

Brings the branch up to current upstream/main (54 commits since the last
catch-up at #5397, divergence point 513e1b220). Highlights: #5419, #5408,
#5414, #5399, #5394, #5361, #5360, #5240, #5266, #5278, plus 44 others
(bugfixes, doc updates, infra).

### Conflict resolutions (7)

Resolved during the merge of main into the feature branch. Resolution kept
the feature branch's analytics-engine-path semantics where main's changes
would have regressed them.

- api/.../UnifiedQueryContext.java
  Blank-line-only conflict; took main's tighter formatting.
- core/.../executor/QueryService.java
  Kept feature's CalciteClassLoaderHelper.withCalciteClassLoader(...)
  wrapping (required for analytics-engine classloader isolation) and the
  matching import.
- integ-test/build.gradle
  Kept feature's detailed root-cause comment on the Gradle 9.4.1
  TestEventReporterAsListener workaround; kept ASCII ordering of
  JSONRequestIT / JoinIT and SQLFunctionsIT / ShowIT / SourceFieldIT entries.
- integ-test/.../CalciteEvalCommandIT.java
  Kept feature's if (!TestUtils.isIndexExist(...)) idempotency guards on
  test_eval and test_eval_agent setup (needed for the helper-managed index
  analytics-engine compatibility run).
- legacy/.../RestSqlAction.java
  Kept feature's delegateToV2Engine(...) (extracted from the
  analytics-engine routing path). Both sides added handleException /
  getRestStatus / getRawErrorCode; removed the duplicate set that git
  produced.
- plugin/.../SQLPlugin.java
  Took the union of imports: ExecutionEngine +
  ExecutionEngine.ExplainResponse + QueryType.
- plugin/.../transport/TransportPPLQueryAction.java
  Combined main's OpenSearchPluginModule(extensionsHolder.engines()) and
  feature's local pluginSettings / pluginSettingsRef wiring.

EngineExtensionsHolder.java is a new file from main (#5298), preserved as-is.

### Compatibility / opt-in

The analytics-engine path is gated by the extendedPlugins extension being
installed (#5403 makes the dep optional). Clusters without analytics-engine
installed see no behavior change. Clusters with analytics-engine installed
route only Parquet-backed indices through the new path (#5429 — by index
setting).
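The classloader isolation preserved in the QueryService resolution follows the standard thread-context-classloader swap, the same shape as the CalciteClassLoaderHelper introduced by this patch. A minimal self-contained sketch of that pattern (class and method names here are illustrative, not the plugin's API):

```java
import java.util.concurrent.Callable;

/**
 * Minimal sketch of the context-classloader swap pattern (cf. CALCITE-3745):
 * run an action with the thread-context classloader pointed at the caller's
 * classloader, then restore the previous one unconditionally.
 */
public class ClassLoaderSwapSketch {

  public static <T> T withContextClassLoader(Callable<T> action, Class<?> callerClass) {
    Thread current = Thread.currentThread();
    ClassLoader original = current.getContextClassLoader();
    // Code that consults the context classloader (e.g. Janino under patched
    // Calcite) now resolves against the child classloader, which can see
    // both parent and child classes.
    current.setContextClassLoader(callerClass.getClassLoader());
    try {
      return action.call();
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      // Restore even on failure so the worker thread is not left polluted.
      current.setContextClassLoader(original);
    }
  }

  public static void main(String[] args) {
    ClassLoader before = Thread.currentThread().getContextClassLoader();
    String result =
        withContextClassLoader(() -> "ran-with-swapped-classloader", ClassLoaderSwapSketch.class);
    System.out.println(result);
    // The previous context classloader is back in place afterwards.
    System.out.println(Thread.currentThread().getContextClassLoader() == before);
  }
}
```

The try/finally restore is the load-bearing detail: worker threads are pooled, so leaking a swapped classloader past the call would affect unrelated queries.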
### Verification

- ./gradlew :api:compileJava :core:compileJava :legacy:compileJava
  :opensearch-sql-plugin:compileJava :integ-test:compileTestJava passes
  locally

Signed-off-by: Kai Huang
Co-authored-by: bowenlan-amzn
---
 .github/workflows/analytics-engine-compat.yml |  44 ++
 .../sql/api/UnifiedQueryContext.java          |  22 +-
 .../sql/api/UnifiedQueryContextTest.java      |   9 +
 core/build.gradle                             |   7 +
 .../sql/ast/statement/ExplainMode.java        |  10 +
 .../sql/calcite/CalciteRelNodeVisitor.java    |  19 +-
 .../utils/CalciteClassLoaderHelper.java       |  60 +++
 .../opensearch/sql/executor/QueryService.java |  71 ++--
 .../analytics/AnalyticsExecutionEngine.java   | 151 +++++++
 .../CollectionUDF/ArrayFunctionImpl.java      |  15 +
 .../AnalyticsExecutionEngineTest.java         | 396 ++++++++++++++++++
 .../CollectionUDF/ArrayFunctionImplTest.java  |  81 ++++
 doctest/build.gradle                          |   1 +
 integ-test/build.gradle                       |  67 ++-
 .../calcite/remote/CalciteEvalCommandIT.java  |  70 +++-
 .../remote/CalciteFieldFormatCommandIT.java   |  45 +-
 .../calcite/remote/CalcitePPLRenameIT.java    | 142 +++++--
 .../remote/CalciteReplaceCommandIT.java       | 228 ++++++++--
 .../sql/plugin/AnalyticsEngineCompatIT.java   |  21 +
 .../sql/legacy/plugin/RestSqlAction.java      |  82 +++-
 .../storage/script/CalciteScriptEngine.java   |   5 +-
 plugin/build.gradle                           |   6 +-
 .../org/opensearch/sql/plugin/SQLPlugin.java  |  98 ++++-
 .../plugin/rest/AnalyticsExecutorHolder.java  |  36 ++
 .../plugin/rest/RestUnifiedQueryAction.java   | 290 +++++++++++++
 .../transport/TransportPPLQueryAction.java    |  53 ++-
 .../rest/RestUnifiedQueryActionTest.java      | 103 +++++
 27 files changed, 1979 insertions(+), 153 deletions(-)
 create mode 100644 .github/workflows/analytics-engine-compat.yml
 create mode 100644 core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java
 create mode 100644 core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
 create mode 100644 core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java
 create mode 100644 integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java
 create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java
 create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
 create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java

diff --git a/.github/workflows/analytics-engine-compat.yml b/.github/workflows/analytics-engine-compat.yml
new file mode 100644
index 00000000000..9c3bd9c9f99
--- /dev/null
+++ b/.github/workflows/analytics-engine-compat.yml
@@ -0,0 +1,44 @@
+name: Analytics Engine Compatibility
+
+on:
+  pull_request:
+  push:
+    branches-ignore:
+      - 'backport/**'
+      - 'dependabot/**'
+    paths:
+      - '**/*.java'
+      - '**gradle*'
+      - 'integ-test/**'
+      - '.github/workflows/analytics-engine-compat.yml'
+  merge_group:
+
+jobs:
+  Get-CI-Image-Tag:
+    uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
+    with:
+      product: opensearch
+
+  analytics-engine-compat:
+    needs: Get-CI-Image-Tag
+    runs-on: ubuntu-latest
+    container:
+      image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
+      options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}
+
+    steps:
+      - name: Run start commands
+        run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
+
+      - uses: actions/checkout@v4
+
+      - name: Set up JDK 25
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'temurin'
+          java-version: 25
+
+      - name: Run analytics-engine compatibility smoke test
+        run: |
+          chown -R 1000:1000 `pwd`
+          su `id -un 1000` -c "./gradlew :integ-test:analyticsEngineCompatIT"
diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
index 4169963da54..4372d5e05ba 100644
--- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
+++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
@@ -5,7 +5,9 @@
 package org.opensearch.sql.api;
 
+import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
 import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
+import static org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT;
 import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
 import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;
@@ -119,13 +121,31 @@ public static class Builder {
   /**
    * Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
    * to avoid coupling with OpenSearchSettings.
+   *
+   * <p>{@link Settings.Key#CALCITE_ENGINE_ENABLED} defaults to {@code true} here because the
+   * unified query path is by definition Calcite-based — every query reaching this context flows
+   * through Calcite's planner, never the v2 engine. The PPL {@link
+   * org.opensearch.sql.api.parser.PPLQueryParser} reuses the v2 {@code AstBuilder}, which gates
+   * Calcite-only commands (e.g. {@code visitTableCommand}) on this setting; without the default,
+   * those commands fail at parse time even when the cluster setting is true.
+   *
+   * <p>{@link Settings.Key#PPL_REX_MAX_MATCH_LIMIT} defaults to {@code 10} here because {@code
+   * AstBuilder.visitRexCommand} reads it unconditionally and unboxes to {@code int} — a {@code
+   * null} return from {@code getSettingValue} NPEs the planner before any operator-level
+   * capability check runs. The value mirrors the cluster-side default of {@code 10} registered by
+   * {@code OpenSearchSettings.PPL_REX_MAX_MATCH_LIMIT_SETTING}. Cluster-side overrides reach this
+   * map via {@link #setting(String, Object)} — the REST handler reads the live value from {@code
+   * OpenSearchSettings} and routes it through that existing API, keeping {@link
+   * UnifiedQueryContext} decoupled from any specific {@link Settings} implementation.
    */
   private final Map<Settings.Key, Object> settings =
       new HashMap<>(
           Map.of(
               QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
               PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
-              PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit()));
+              PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit(),
+              CALCITE_ENGINE_ENABLED, true,
+              PPL_REX_MAX_MATCH_LIMIT, 10));
 
   /**
    * Sets the query language frontend to be used.
diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java index ad2eba0fea5..f0111d06363 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java @@ -33,6 +33,10 @@ public void testContextCreationWithDefaults() { "Settings should have default system limits", SysLimit.DEFAULT, SysLimit.fromSettings(context.getSettings())); + assertEquals( + "PPL_REX_MAX_MATCH_LIMIT default should be 10", + Integer.valueOf(10), + context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT)); } @Test @@ -43,10 +47,15 @@ public void testContextCreationWithCustomConfig() { .catalog("opensearch", testSchema) .cacheMetadata(true) .setting("plugins.query.size_limit", 200) + .setting("plugins.ppl.rex.max_match.limit", 5) .build(); Integer querySizeLimit = context.getSettings().getSettingValue(QUERY_SIZE_LIMIT); assertEquals("Custom setting should be applied", Integer.valueOf(200), querySizeLimit); + assertEquals( + "Cluster-side override for PPL_REX_MAX_MATCH_LIMIT should reach the unified path", + Integer.valueOf(5), + context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT)); } @Test(expected = IllegalArgumentException.class) diff --git a/core/build.gradle b/core/build.gradle index 23f9b37e317..f567fb85653 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -33,6 +33,7 @@ plugins { } repositories { + mavenLocal() mavenCentral() } @@ -63,6 +64,12 @@ dependencies { } api 'org.apache.calcite:calcite-linq4j:1.41.0' api project(':common') + compileOnly 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT' + // Needed because analytics-api's QueryPlanExecutor signature uses + // org.opensearch.core.action.ActionListener; AnalyticsExecutionEngine references that type. 
+ compileOnly group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}" + testImplementation 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT' + testImplementation group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}" implementation "com.github.seancfoley:ipaddress:5.4.2" implementation "com.jayway.jsonpath:json-path:2.9.0" diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java b/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java index 9043f05929b..b52d64f4867 100644 --- a/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java +++ b/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java @@ -8,6 +8,7 @@ import java.util.Locale; import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.apache.calcite.sql.SqlExplainLevel; @RequiredArgsConstructor public enum ExplainMode { @@ -26,4 +27,13 @@ public static ExplainMode of(String mode) { return ExplainMode.STANDARD; } } + + /** Convert to Calcite SqlExplainLevel for RelOptUtil.toString(). */ + public SqlExplainLevel toExplainLevel() { + return switch (this) { + case SIMPLE -> SqlExplainLevel.NO_ATTRIBUTES; + case COST -> SqlExplainLevel.ALL_ATTRIBUTES; + default -> SqlExplainLevel.EXPPLAN_ATTRIBUTES; + }; + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 15bfece5f46..1251f51b131 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1538,10 +1538,25 @@ private Pair, List> aggregateWithTrimming( * count(a.b)] returns true. */ private boolean containsNestedAggregator(RelBuilder relBuilder, List aggCallRefs) { + // For each aggregator argument, take the part of its column name before the first dot + // (e.g. 
"city" from "city.location.latitude") and check whether that's a top-level + // ARRAY column — the marker for an OpenSearch `nested` field. + // + // The classic path always exposes a top-level column for object/nested parents. The + // analytics-engine path emits only the flat leaves ("city.name", "city.location.latitude") + // because parent placeholder types (MAP) can't round-trip through Substrait. + // RelDataType.getField returns null when the column doesn't exist — for analytics-engine, + // that null just means "not nested," which is the right answer. + RelDataType rowType = relBuilder.peek().getRowType(); return aggCallRefs.stream() - .map(r -> relBuilder.peek().getRowType().getFieldNames().get(r.getIndex())) + .map(r -> rowType.getFieldNames().get(r.getIndex())) .map(name -> org.apache.commons.lang3.StringUtils.substringBefore(name, ".")) - .anyMatch(root -> relBuilder.field(root).getType().getSqlTypeName() == SqlTypeName.ARRAY); + .anyMatch( + root -> { + RelDataTypeField field = + rowType.getField(root, /* caseSensitive= */ true, /* elideRecord= */ false); + return field != null && field.getType().getSqlTypeName() == SqlTypeName.ARRAY; + }); } /** diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java new file mode 100644 index 00000000000..b2367f653c3 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.util.concurrent.Callable; + +/** + * Helper for setting the thread context classloader before Calcite operations. This is needed for + * patched Calcite (CALCITE-3745): when analytics-engine is the parent classloader, Janino uses the + * parent's classloader which can't see SQL plugin classes. 
The patched Calcite checks {@code + * Thread.currentThread().getContextClassLoader()} first. This helper sets it to the SQL plugin's + * classloader (child) which can see both parent and child classes. + * + * @see CALCITE-3745 + * @see sql#5306 + */ +public final class CalciteClassLoaderHelper { + + private CalciteClassLoaderHelper() {} + + /** + * Run an action with the thread context classloader set to the caller's classloader. + * + * @param action the action to run + * @param callerClass the class whose classloader should be used (pass {@code MyClass.class}) + * @param the return type + * @return the result of the action + */ + public static T withCalciteClassLoader(Callable action, Class callerClass) { + Thread currentThread = Thread.currentThread(); + ClassLoader originalCl = currentThread.getContextClassLoader(); + currentThread.setContextClassLoader(callerClass.getClassLoader()); + try { + return action.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + currentThread.setContextClassLoader(originalCl); + } + } + + /** + * Run a void action with the thread context classloader set to the caller's classloader. 
+ * + * @see #withCalciteClassLoader(Callable, Class) + */ + public static void withCalciteClassLoader(Runnable action, Class callerClass) { + withCalciteClassLoader( + () -> { + action.run(); + return null; + }, + callerClass); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index b294a168cd8..fe9d3e55dc1 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -35,6 +35,7 @@ import org.opensearch.sql.calcite.SysLimit; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType; +import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper; import org.opensearch.sql.common.error.ErrorReport; import org.opensearch.sql.common.error.QueryProcessingStage; import org.opensearch.sql.common.error.StageErrorHandler; @@ -142,33 +143,37 @@ public void executeWithCalcite( QueryProfiling.activate(QueryContext.isProfileEnabled()); ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE); long analyzeStart = System.nanoTime(); - CalcitePlanContext context = - CalcitePlanContext.create( - buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); + CalciteClassLoaderHelper.withCalciteClassLoader( + () -> { + CalcitePlanContext context = + CalcitePlanContext.create( + buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); - context.setHighlightConfig(highlightConfig); + context.setHighlightConfig(highlightConfig); - // Wrap analyze with ANALYZING stage tracking - RelNode relNode = - StageErrorHandler.executeStage( - QueryProcessingStage.ANALYZING, - () -> analyze(plan, context), - "while preparing and validating the query plan"); + // Wrap analyze with ANALYZING stage tracking + RelNode relNode = + StageErrorHandler.executeStage( + 
QueryProcessingStage.ANALYZING, + () -> analyze(plan, context), + "while preparing and validating the query plan"); - // Wrap plan conversion with PLAN_CONVERSION stage tracking - RelNode calcitePlan = - StageErrorHandler.executeStage( - QueryProcessingStage.PLAN_CONVERSION, - () -> convertToCalcitePlan(relNode, context), - "while converting the query to an executable plan"); + // Wrap plan conversion with PLAN_CONVERSION stage tracking + RelNode calcitePlan = + StageErrorHandler.executeStage( + QueryProcessingStage.PLAN_CONVERSION, + () -> convertToCalcitePlan(relNode, context), + "while converting the query to an executable plan"); - analyzeMetric.set(System.nanoTime() - analyzeStart); + analyzeMetric.set(System.nanoTime() - analyzeStart); - // Wrap execution with EXECUTING stage tracking - StageErrorHandler.executeStageVoid( - QueryProcessingStage.EXECUTING, - () -> executionEngine.execute(calcitePlan, context, listener), - "while running the query"); + // Wrap execution with EXECUTING stage tracking + StageErrorHandler.executeStageVoid( + QueryProcessingStage.EXECUTING, + () -> executionEngine.execute(calcitePlan, context, listener), + "while running the query"); + }, + QueryService.class); } catch (Throwable t) { if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) { log.warn("Fallback to V2 query engine since got exception", t); @@ -191,17 +196,21 @@ public void explainWithCalcite( () -> { try { QueryProfiling.noop(); - CalcitePlanContext context = - CalcitePlanContext.create( - buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); - context.setHighlightConfig(highlightConfig); - context.run( + CalciteClassLoaderHelper.withCalciteClassLoader( () -> { - RelNode relNode = analyze(plan, context); - RelNode calcitePlan = convertToCalcitePlan(relNode, context); - executionEngine.explain(calcitePlan, mode, context, listener); + CalcitePlanContext context = + CalcitePlanContext.create( + buildFrameworkConfig(), 
SysLimit.fromSettings(settings), queryType); + context.setHighlightConfig(highlightConfig); + context.run( + () -> { + RelNode relNode = analyze(plan, context); + RelNode calcitePlan = convertToCalcitePlan(relNode, context); + executionEngine.explain(calcitePlan, mode, context, listener); + }, + settings); }, - settings); + QueryService.class); } catch (Throwable t) { if (isCalciteFallbackAllowed(t)) { log.warn("Fallback to V2 query engine since got exception", t); diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java new file mode 100644 index 00000000000..ddfe5fd3556 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -0,0 +1,151 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.ast.statement.ExplainMode; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.ExecutionContext; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.pagination.Cursor; +import 
org.opensearch.sql.monitor.profile.MetricName; +import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.QueryProfiling; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * Execution engine adapter for the analytics engine (Project Mustang). + * + *

Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link + * ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the + * analytics engine, and converts the raw results into {@link QueryResponse}. + */ +public class AnalyticsExecutionEngine implements ExecutionEngine { + + private final QueryPlanExecutor> planExecutor; + + public AnalyticsExecutionEngine(QueryPlanExecutor> planExecutor) { + this.planExecutor = planExecutor; + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void execute(PhysicalPlan plan, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void execute( + PhysicalPlan plan, ExecutionContext context, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void explain(PhysicalPlan plan, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + @Override + public void execute( + RelNode plan, CalcitePlanContext context, ResponseListener listener) { + // QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched + // to a worker pool and results arrive on the listener. Record the execute metric in the + // listener callback, before delegating to the user-supplied listener, so the metric snapshot + // taken by SimpleJsonResponseFormatter sees the correct value. 
+ ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); + long execStart = System.nanoTime(); + + planExecutor.execute( + plan, + null, + new ActionListener<>() { + @Override + public void onResponse(Iterable rows) { + try { + List fields = plan.getRowType().getFieldList(); + List results = convertRows(rows, fields); + Schema schema = buildSchema(fields); + execMetric.set(System.nanoTime() - execStart); + listener.onResponse(new QueryResponse(schema, results, Cursor.None)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + + @Override + public void explain( + RelNode plan, + ExplainMode mode, + CalcitePlanContext context, + ResponseListener listener) { + try { + String logical = RelOptUtil.toString(plan, mode.toExplainLevel()); + ExplainResponse response = + new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)); + listener.onResponse(ExplainResponse.normalizeLf(response)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private List convertRows(Iterable rows, List fields) { + List results = new ArrayList<>(); + for (Object[] row : rows) { + Map valueMap = new LinkedHashMap<>(); + for (int i = 0; i < fields.size(); i++) { + String columnName = fields.get(i).getName(); + Object value = (i < row.length) ? 
row[i] : null; + valueMap.put(columnName, ExprValueUtils.fromObjectValue(value)); + } + results.add(ExprTupleValue.fromExprValueMap(valueMap)); + } + return results; + } + + private Schema buildSchema(List fields) { + List columns = new ArrayList<>(); + for (RelDataTypeField field : fields) { + ExprType exprType = convertType(field.getType()); + columns.add(new Schema.Column(field.getName(), null, exprType)); + } + return new Schema(columns); + } + + private ExprType convertType(RelDataType type) { + try { + return OpenSearchTypeFactory.convertRelDataTypeToExprType(type); + } catch (IllegalArgumentException e) { + return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java index 9a77a0d5a7c..318f32a41be 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java @@ -50,6 +50,10 @@ public SqlReturnTypeInference getReturnTypeInference() { RelDataType originalType = SqlLibraryOperators.ARRAY.getReturnTypeInference().inferReturnType(sqlOperatorBinding); RelDataType innerType = originalType.getComponentType(); + // Default empty/unknown element type to VARCHAR — see PR description for why. 
+ if (innerType == null || isUnknownLikeType(innerType.getSqlTypeName())) { + innerType = typeFactory.createSqlType(SqlTypeName.VARCHAR); + } return createArrayType( typeFactory, typeFactory.createTypeWithNullability(innerType, true), true); } catch (Exception e) { @@ -63,6 +67,17 @@ public UDFOperandMetadata getOperandMetadata() { return null; } + /** + * Calcite's {@link SqlLibraryOperators#ARRAY} infers a {@code NULL}-element array for an empty + * call list and an {@code UNKNOWN}-element array when type inference can't pick one (e.g. all + * operands are typeless nulls). Either of those bubbles up to the analytics-engine route's + * substrait converter as "Unable to convert the type UNKNOWN" — substrait has no encoding for + * either marker. Treat both as needing a concrete fallback. + */ + private static boolean isUnknownLikeType(SqlTypeName sqlTypeName) { + return sqlTypeName == SqlTypeName.NULL || sqlTypeName == SqlTypeName.UNKNOWN; + } + public static class ArrayImplementor implements NotNullImplementor { @Override public Expression implement( diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java new file mode 100644 index 00000000000..4de596fb375 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -0,0 +1,396 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + 
+import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.SysLimit; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +class AnalyticsExecutionEngineTest { + + private AnalyticsExecutionEngine engine; + + @SuppressWarnings("unchecked") + private QueryPlanExecutor> mockExecutor; + + private CalcitePlanContext mockContext; + + @BeforeEach + void setUp() throws Exception { + mockExecutor = (QueryPlanExecutor>) mock(QueryPlanExecutor.class); + engine = new AnalyticsExecutionEngine(mockExecutor); + mockContext = mock(CalcitePlanContext.class); + setSysLimit(mockContext, SysLimit.DEFAULT); + } + + /** Sets the public final sysLimit field on a mocked CalcitePlanContext. */ + private static void setSysLimit(CalcitePlanContext context, SysLimit sysLimit) throws Exception { + Field field = CalcitePlanContext.class.getDeclaredField("sysLimit"); + field.setAccessible(true); + field.set(context, sysLimit); + } + + /** QueryPlanExecutor became async in analytics-framework 3.7 — stub the listener callback. 
*/ + @SuppressWarnings("unchecked") + private void stubExecutorWith(RelNode relNode, Iterable<Object[]> rows) { + doAnswer( + inv -> { + ((ActionListener<Iterable<Object[]>>) inv.getArgument(2)).onResponse(rows); + return null; + }) + .when(mockExecutor) + .execute(eq(relNode), any(), any(ActionListener.class)); + } + + @SuppressWarnings("unchecked") + private void stubExecutorWithError(RelNode relNode, Exception error) { + doAnswer( + inv -> { + ((ActionListener<Iterable<Object[]>>) inv.getArgument(2)).onFailure(error); + return null; + }) + .when(mockExecutor) + .execute(eq(relNode), any(), any(ActionListener.class)); + } + + @Test + void executeRelNode_basicTypesAndRows() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + Iterable<Object[]> rows = Arrays.asList(new Object[] {"Alice", 30}, new Object[] {"Bob", 25}); + stubExecutorWith(relNode, rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + // Schema: 2 columns [name:STRING, age:INTEGER] + assertEquals(2, response.getSchema().getColumns().size(), "Column count. " + dump); + assertEquals("name", response.getSchema().getColumns().get(0).getName(), dump); + assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals("age", response.getSchema().getColumns().get(1).getName(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump); + + // Rows: [{name=Alice, age=30}, {name=Bob, age=25}] + assertEquals(2, response.getResults().size(), "Row count. " + dump); + assertEquals( + "Alice", response.getResults().get(0).tupleValue().get("name").value(), "Row 0. " + dump); + assertEquals( + 30, response.getResults().get(0).tupleValue().get("age").value(), "Row 0. " + dump); + assertEquals( + "Bob", response.getResults().get(1).tupleValue().get("name").value(), "Row 1. " + dump); + assertEquals( + 25, response.getResults().get(1).tupleValue().get("age").value(), "Row 1.
" + dump); + + // Cursor: None + assertEquals(org.opensearch.sql.executor.pagination.Cursor.None, response.getCursor(), dump); + } + + @Test + void executeRelNode_numericTypes() { + RelNode relNode = + mockRelNode( + "b", SqlTypeName.TINYINT, + "s", SqlTypeName.SMALLINT, + "i", SqlTypeName.INTEGER, + "l", SqlTypeName.BIGINT, + "f", SqlTypeName.FLOAT, + "d", SqlTypeName.DOUBLE); + Iterable rows = + Collections.singletonList(new Object[] {(byte) 1, (short) 2, 3, 4L, 5.0f, 6.0}); + stubExecutorWith(relNode, rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.BYTE, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.SHORT, response.getSchema().getColumns().get(1).getExprType(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(2).getExprType(), dump); + assertEquals(ExprCoreType.LONG, response.getSchema().getColumns().get(3).getExprType(), dump); + assertEquals(ExprCoreType.FLOAT, response.getSchema().getColumns().get(4).getExprType(), dump); + assertEquals(ExprCoreType.DOUBLE, response.getSchema().getColumns().get(5).getExprType(), dump); + + // Verify actual values + assertEquals( + (byte) 1, + response.getResults().get(0).tupleValue().get("b").value(), + "byte value. " + dump); + assertEquals( + (short) 2, + response.getResults().get(0).tupleValue().get("s").value(), + "short value. " + dump); + assertEquals( + 3, response.getResults().get(0).tupleValue().get("i").value(), "int value. " + dump); + assertEquals( + 4L, response.getResults().get(0).tupleValue().get("l").value(), "long value. " + dump); + assertEquals( + 5.0f, response.getResults().get(0).tupleValue().get("f").value(), "float value. " + dump); + assertEquals( + 6.0, response.getResults().get(0).tupleValue().get("d").value(), "double value. 
" + dump); + } + + @Test + void executeRelNode_temporalTypes() { + RelNode relNode = + mockRelNode("dt", SqlTypeName.DATE, "tm", SqlTypeName.TIME, "ts", SqlTypeName.TIMESTAMP); + Iterable emptyRows = Collections.emptyList(); + stubExecutorWith(relNode, emptyRows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(1).getExprType(), dump); + assertEquals( + ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(2).getExprType(), dump); + assertEquals(0, response.getResults().size(), "Should have 0 rows. " + dump); + } + + // Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches + // AnalyticsExecutionEngine. The engine trusts the executor to honor the limit. + + @Test + void executeRelNode_emptyResults() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR); + Iterable emptyRows = Collections.emptyList(); + stubExecutorWith(relNode, emptyRows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(1, response.getSchema().getColumns().size(), "Schema column count. " + dump); + assertEquals(0, response.getResults().size(), "Row count should be 0. " + dump); + } + + @Test + void executeRelNode_nullValues() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + Iterable rows = Collections.singletonList(new Object[] {null, null}); + stubExecutorWith(relNode, rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(1, response.getResults().size(), "Row count. " + dump); + assertTrue( + response.getResults().get(0).tupleValue().get("name").isNull(), + "name should be null. 
" + dump); + assertTrue( + response.getResults().get(0).tupleValue().get("age").isNull(), + "age should be null. " + dump); + } + + @Test + void executeRelNode_errorPropagation() { + RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER); + stubExecutorWithError(relNode, new RuntimeException("Engine failure")); + + Exception error = executeAndCaptureError(relNode); + System.out.println(dumpError("executeRelNode_errorPropagation", error)); + + assertEquals( + "Engine failure", + error.getMessage(), + "Exception type: " + error.getClass().getSimpleName() + ", message: " + error.getMessage()); + } + + @Test + void physicalPlanExecute_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + engine.execute(physicalPlan, failureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExecute_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + @Test + void physicalPlanExecuteWithContext_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + engine.execute( + physicalPlan, + org.opensearch.sql.executor.ExecutionContext.emptyExecutionContext(), + failureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExecuteWithContext_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + @Test + void physicalPlanExplain_callsOnFailure() { + PhysicalPlan physicalPlan = 
mock(PhysicalPlan.class); + AtomicReference<Exception> errorRef = new AtomicReference<>(); + engine.explain(physicalPlan, explainFailureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExplain_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + // --- helpers --- + + private QueryResponse executeAndCapture(RelNode relNode) { + AtomicReference<QueryResponse> ref = new AtomicReference<>(); + engine.execute(relNode, mockContext, captureListener(ref)); + assertNotNull(ref.get(), "QueryResponse should not be null"); + // Always print the full response so test output shows exact results + System.out.println(dumpResponse(ref.get())); + return ref.get(); + } + + private Exception executeAndCaptureError(RelNode relNode) { + AtomicReference<Exception> ref = new AtomicReference<>(); + engine.execute( + relNode, + mockContext, + new ResponseListener<QueryResponse>() { + @Override + public void onResponse(QueryResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }); + assertNotNull(ref.get(), "onFailure should have been called"); + return ref.get(); + } + + private ResponseListener<QueryResponse> failureListener(AtomicReference<Exception> ref) { + return new ResponseListener<QueryResponse>() { + @Override + public void onResponse(QueryResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }; + } + + private ResponseListener<ExplainResponse> explainFailureListener( + AtomicReference<Exception> ref) { + return new ResponseListener<ExplainResponse>() { + @Override + public void onResponse(ExplainResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }; + } + + private String dumpError(String testName, Exception e) { + return "\n--- " + + testName + + " ---\n" + + "Exception: " + + e.getClass().getSimpleName() + +
"\n" + + "Message: " + + e.getMessage() + + "\n--- End ---"; + } + + /** Dumps the full QueryResponse into a readable string for test output and assertion messages. */ + private String dumpResponse(QueryResponse response) { + StringBuilder sb = new StringBuilder(); + sb.append("\n--- QueryResponse ---\n"); + + sb.append("Schema: ["); + sb.append( + response.getSchema().getColumns().stream() + .map(c -> c.getName() + ":" + c.getExprType().typeName()) + .collect(Collectors.joining(", "))); + sb.append("]\n"); + + sb.append("Rows (").append(response.getResults().size()).append("):\n"); + for (int i = 0; i < response.getResults().size(); i++) { + sb.append(" [").append(i).append("] "); + sb.append(response.getResults().get(i).tupleValue()); + sb.append("\n"); + } + + sb.append("Cursor: ").append(response.getCursor()).append("\n"); + sb.append("--- End ---"); + return sb.toString(); + } + + private RelNode mockRelNode(Object... nameTypePairs) { + SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (int i = 0; i < nameTypePairs.length; i += 2) { + String name = (String) nameTypePairs[i]; + SqlTypeName typeName = (SqlTypeName) nameTypePairs[i + 1]; + builder.add(name, typeName); + } + RelDataType rowType = builder.build(); + + RelNode relNode = mock(RelNode.class); + when(relNode.getRowType()).thenReturn(rowType); + return relNode; + } + + private ResponseListener captureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + ref.set(response); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected failure", e); + } + }; + } +} diff --git a/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java b/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java index 
6dbc1901fa7..600a802615a 100644 --- a/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java +++ b/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java @@ -14,6 +14,12 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.ExplicitOperatorBinding; +import org.apache.calcite.sql.fun.SqlLibraryOperators; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.Test; @@ -302,4 +308,79 @@ public void testArrayWithCharTypePreservesNulls() { assertNull(list.get(1), "Null should be preserved during CHAR type conversion"); assertEquals("y", list.get(2)); } + + // ==================== RETURN-TYPE INFERENCE TESTS ==================== + // These tests cover the return-type fallback the analytics-engine route depends on: + // when Calcite can't infer a concrete element type (no operands, or all-null operands), + // we substitute VARCHAR so the call's return type is substrait-serializable. Without the + // fallback Calcite emits ARRAY<NULL> / ARRAY<UNKNOWN>, which fails substrait conversion + // with "Unable to convert the type UNKNOWN" downstream. + + /** array() — empty operand list — returns ARRAY<VARCHAR>. */ + @Test + public void testReturnTypeForEmptyCallIsVarcharArray() { + RelDataType returnType = inferReturnType(); + assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName()); + RelDataType element = returnType.getComponentType(); + assertNotNull(element); + assertEquals(SqlTypeName.VARCHAR, element.getSqlTypeName()); + assertTrue(element.isNullable(), "Element type should be nullable per existing semantics"); + } + + /** array(NULL) — single typeless-null operand — also falls back to ARRAY<VARCHAR>.
*/ + @Test + public void testReturnTypeForAllNullOperandsIsVarcharArray() { + RelDataTypeFactory typeFactory = newTypeFactory(); + RelDataType nullType = typeFactory.createSqlType(SqlTypeName.NULL); + RelDataType returnType = inferReturnType(nullType); + assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName()); + RelDataType element = returnType.getComponentType(); + assertNotNull(element); + assertEquals(SqlTypeName.VARCHAR, element.getSqlTypeName()); + } + + /** array(1) — INTEGER operand — preserves the inferred element type (no fallback). */ + @Test + public void testReturnTypeForIntegerOperandPreservesType() { + RelDataTypeFactory typeFactory = newTypeFactory(); + RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER); + RelDataType returnType = inferReturnType(intType); + assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName()); + RelDataType element = returnType.getComponentType(); + assertNotNull(element); + assertEquals( + SqlTypeName.INTEGER, + element.getSqlTypeName(), + "Concrete element types must not be affected by the VARCHAR fallback"); + } + + /** array('a', 'b') — VARCHAR operands — already VARCHAR, fallback path doesn't fire. */ + @Test + public void testReturnTypeForVarcharOperandPreservesType() { + RelDataTypeFactory typeFactory = newTypeFactory(); + RelDataType varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR); + RelDataType returnType = inferReturnType(varcharType, varcharType); + assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName()); + assertEquals(SqlTypeName.VARCHAR, returnType.getComponentType().getSqlTypeName()); + } + + /** + * Helper — invokes {@code new ArrayFunctionImpl().getReturnTypeInference().inferReturnType(...)} + * via Calcite's {@link ExplicitOperatorBinding}, which is the public test harness for exercising + * a return-type inference against a specific operand-type list. 
We bind it to {@link + * SqlLibraryOperators#ARRAY} so the inference's internal call to {@code + * SqlLibraryOperators.ARRAY.getReturnTypeInference().inferReturnType(...)} resolves the same + * operator the lambda delegates to. + */ + private static RelDataType inferReturnType(RelDataType... operandTypes) { + RelDataTypeFactory typeFactory = newTypeFactory(); + ExplicitOperatorBinding binding = + new ExplicitOperatorBinding( + typeFactory, SqlLibraryOperators.ARRAY, Arrays.asList(operandTypes)); + return new ArrayFunctionImpl().getReturnTypeInference().inferReturnType(binding); + } + + private static RelDataTypeFactory newTypeFactory() { + return new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + } } diff --git a/doctest/build.gradle b/doctest/build.gradle index d758a6de4b8..d9dca3fd5c2 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -15,6 +15,7 @@ plugins { apply plugin: 'opensearch.testclusters' + def path = project(':').projectDir // temporary fix, because currently we are under migration to new architecture. Need to run ./gradlew run from // plugin module, and will only build ppl in it. diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 4bd9a9847ca..06e6478f5fd 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -270,6 +270,36 @@ def getGeoSpatialPlugin() { } } +// fetch from the feature-build artifact for now (linux/x64 only; for local dev pass -PanalyticsEngineZip=/path instead). 
+ext.pluginVersion = opensearch_version.tokenize('-')[0] +ext.featureBuildBase = "https://ci.opensearch.org/ci/dbc/feature-build-opensearch/feature-datafusion/latest/linux/x64/tar/builds/opensearch/plugins" +ext.analyticsEngineZipDest = "${buildDir}/distributions/analytics-engine-${pluginVersion}-SNAPSHOT.zip" +ext.arrowFlightRpcZipDest = "${buildDir}/distributions/arrow-flight-rpc-${pluginVersion}-SNAPSHOT.zip" + +task downloadAnalyticsEngineZip(type: Download) { + src "${featureBuildBase}/1-analytics-engine-${pluginVersion}.zip" + dest analyticsEngineZipDest + overwrite false + onlyIfModified true + onlyIf { !project.findProperty('analyticsEngineZip') } +} + +task downloadArrowFlightRpcZip(type: Download) { + src "${featureBuildBase}/0-arrow-flight-rpc-${pluginVersion}.zip" + dest arrowFlightRpcZipDest + overwrite false + onlyIfModified true + onlyIf { !project.findProperty('arrowFlightRpcZip') } +} + +def getAnalyticsEnginePlugin() { + provider { (RegularFile) (() -> file(project.findProperty('analyticsEngineZip') ?: analyticsEngineZipDest)) } +} + +def getArrowFlightRpcPlugin() { + provider { (RegularFile) (() -> file(project.findProperty('arrowFlightRpcZip') ?: arrowFlightRpcZipDest)) } +} + testClusters { integTest { testDistribution = 'archive' @@ -301,6 +331,14 @@ testClusters { plugin(getJobSchedulerPlugin()) plugin ":opensearch-sql-plugin" } + // Smoke test: verify sql loads cleanly alongside analytics-engine. 
+ analyticsEngineCompat { + testDistribution = 'archive' + plugin(getJobSchedulerPlugin()) + plugin(getArrowFlightRpcPlugin()) + plugin(getAnalyticsEnginePlugin()) + plugin ":opensearch-sql-plugin" + } } def isPrometheusRunning() { @@ -351,9 +389,19 @@ task stopPrometheus(type: KillProcessTask) { stopPrometheus.mustRunAfter startPrometheus +task analyticsEngineCompatIT(type: RestIntegTestTask) { + useCluster testClusters.analyticsEngineCompat + dependsOn downloadAnalyticsEngineZip, downloadArrowFlightRpcZip + systemProperty 'tests.security.manager', 'false' + filter { + includeTestsMatching 'org.opensearch.sql.plugin.AnalyticsEngineCompatIT' + } +} + task integJdbcTest(type: RestIntegTestTask) { - testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first(). + testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().with { plugin ":opensearch-sql-plugin" + } useJUnitPlatform() dependsOn ':opensearch-sql-plugin:bundlePlugin' @@ -546,10 +594,15 @@ integTest { // Exclude this IT, because they executed in another task (:integTestWithSecurity) exclude 'org/opensearch/sql/security/**' - // Workaround for Gradle 9.4.1 bug: TestEventReporterAsListener crashes with ClassCastException - // when encountering class-level @Ignore annotations. These classes were already skipped by JUnit; - // this moves the skip to the Gradle layer to avoid the buggy bridge. - // Remove once Gradle ships a fix (not fixed as of 9.5.0). + // Workaround for Gradle 9.4.1 ClassCastException in TestEventReporterAsListener.started + // (line 58) — the bridge casts a parent test descriptor's reporter to + // GroupTestEventReporterInternal but a class-level @Ignore produces a non-composite parent + // descriptor with a leaf reporter, so the cast fails and aborts the entire integTest task + // even though the tests would have been skipped anyway. 
The bridge is registered by Gradle's + // own AbstractTestTask (we can't bypass it from user code), so the only available remedy is + // to keep these classes off the test runner's input set. Net behaviour for CI: still + // skipped, just at the build layer instead of inside JUnit. Remove once Gradle ships a fix + // (not fixed as of 9.5.0). OrderIT is already excluded above. exclude 'org/opensearch/sql/calcite/remote/CalciteInformationSchemaCommandIT.class' exclude 'org/opensearch/sql/calcite/remote/CalciteJsonFunctionsIT.class' exclude 'org/opensearch/sql/calcite/remote/CalcitePrometheusDataSourceCommandsIT.class' @@ -559,8 +612,8 @@ integTest { exclude 'org/opensearch/sql/legacy/DateFunctionsIT.class' exclude 'org/opensearch/sql/legacy/HashJoinIT.class' exclude 'org/opensearch/sql/legacy/HavingIT.class' - exclude 'org/opensearch/sql/legacy/JoinIT.class' exclude 'org/opensearch/sql/legacy/JSONRequestIT.class' + exclude 'org/opensearch/sql/legacy/JoinIT.class' exclude 'org/opensearch/sql/legacy/MathFunctionsIT.class' exclude 'org/opensearch/sql/legacy/MetricsIT.class' exclude 'org/opensearch/sql/legacy/MultiQueryIT.class' @@ -568,9 +621,9 @@ integTest { exclude 'org/opensearch/sql/legacy/PreparedStatementIT.class' exclude 'org/opensearch/sql/legacy/QueryFunctionsIT.class' exclude 'org/opensearch/sql/legacy/QueryIT.class' + exclude 'org/opensearch/sql/legacy/SQLFunctionsIT.class' exclude 'org/opensearch/sql/legacy/ShowIT.class' exclude 'org/opensearch/sql/legacy/SourceFieldIT.class' - exclude 'org/opensearch/sql/legacy/SQLFunctionsIT.class' exclude 'org/opensearch/sql/legacy/SubqueryIT.class' exclude 'org/opensearch/sql/ppl/JsonFunctionsIT.class' exclude 'org/opensearch/sql/sql/ExpressionIT.class' diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java index 219020b1650..87bd412907d 100644 --- 
a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java @@ -17,6 +17,7 @@ import org.json.JSONObject; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; +import org.opensearch.sql.legacy.TestUtils; import org.opensearch.sql.ppl.PPLIntegTestCase; public class CalciteEvalCommandIT extends PPLIntegTestCase { @@ -29,38 +30,63 @@ public void init() throws Exception { loadIndex(Index.BANK); loadIndex(Index.TELEMETRY); - // Create test data for string concatenation - Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true"); - request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}"); - client().performRequest(request1); - - Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true"); - request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}"); - client().performRequest(request2); - - Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true"); - request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}"); - client().performRequest(request3); + // Pre-create test_eval through the helper so the analytics-engine compatibility run + // (tests.analytics.parquet_indices=true) provisions it as a parquet-backed composite + // index. Plain auto-mapping via the doc PUTs would create a Lucene-backed index, which + // the analytics-engine planner cannot scan ("No backend can scan all requested fields"). + // Explicit mapping pins types so both v2 (verifySchema "string"/"bigint") and analytics + // paths see the same shape regardless of dynamic-mapping behavior on the parquet engine. + // Guarded by isIndexExist for idempotency — init() runs before each @Test method. 
+ if (!TestUtils.isIndexExist(client(), "test_eval")) { + String testEvalMapping = + "{\"mappings\":{\"properties\":{" + + "\"name\":{\"type\":\"keyword\"}," + + "\"age\":{\"type\":\"long\"}," + + "\"title\":{\"type\":\"keyword\"}}}}"; + TestUtils.createIndexByRestClient(client(), "test_eval", testEvalMapping); + + // Create test data for string concatenation + Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true"); + request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}"); + client().performRequest(request1); + + Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true"); + request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}"); + client().performRequest(request2); + + Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true"); + request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}"); + client().performRequest(request3); + } // Index with a struct field `agent` to reproduce the reviewer's case from PR #5351: // source= | fields agent | eval agent.name = "test" // Rely on dynamic mapping — OpenSearch infers `agent` as an object with string children // from the document contents. Using dynamic mapping keeps the init idempotent across // repeated `@Before` invocations in the preserved cluster. 
- Request agentDoc1 = new Request("PUT", "/test_eval_agent/_doc/1?refresh=true"); - agentDoc1.setJsonEntity( - "{\"agent\": {\"name\": \"winlogbeat\", \"version\": \"7.0\"}, \"message\": \"hello\"}"); - client().performRequest(agentDoc1); - - Request agentDoc2 = new Request("PUT", "/test_eval_agent/_doc/2?refresh=true"); - agentDoc2.setJsonEntity( - "{\"agent\": {\"name\": \"filebeat\", \"version\": \"8.1\"}, \"message\": \"world\"}"); - client().performRequest(agentDoc2); + if (!TestUtils.isIndexExist(client(), "test_eval_agent")) { + Request agentDoc1 = new Request("PUT", "/test_eval_agent/_doc/1?refresh=true"); + agentDoc1.setJsonEntity( + "{\"agent\": {\"name\": \"winlogbeat\", \"version\": \"7.0\"}, \"message\": \"hello\"}"); + client().performRequest(agentDoc1); + + Request agentDoc2 = new Request("PUT", "/test_eval_agent/_doc/2?refresh=true"); + agentDoc2.setJsonEntity( + "{\"agent\": {\"name\": \"filebeat\", \"version\": \"8.1\"}, \"message\": \"world\"}"); + client().performRequest(agentDoc2); + } } @Test public void testEvalStringConcatenation() throws IOException { - JSONObject result = executeQuery("source=test_eval | eval greeting = 'Hello ' + name"); + // Pin the projection so column order is deterministic across execution paths — the + // analytics-engine route reads parquet schema in storage order, which can differ from the + // v2 / Lucene path's _source-iteration order. Adding an explicit | fields makes the test + // a strict assertion on the eval expression rather than a coincidence of projection order. 
+ JSONObject result = + executeQuery( + "source=test_eval | eval greeting = 'Hello ' + name | fields name, title, age," + + " greeting"); verifySchema( result, schema("name", "string"), diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java index 86f87c90c81..24c7e504224 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java @@ -12,6 +12,7 @@ import org.json.JSONObject; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; +import org.opensearch.sql.legacy.TestUtils; import org.opensearch.sql.ppl.PPLIntegTestCase; public class CalciteFieldFormatCommandIT extends PPLIntegTestCase { @@ -23,26 +24,48 @@ public void init() throws Exception { loadIndex(Index.BANK); - // Create test data for string concatenation - Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true"); - request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}"); - client().performRequest(request1); + // Pre-create test_eval through the helper so the analytics-engine compatibility run + // (tests.analytics.parquet_indices=true) provisions it as a parquet-backed composite + // index. Plain auto-mapping via the doc PUTs would create a Lucene-backed index, which + // the analytics-engine planner cannot scan ("No backend can scan all requested fields"). + // Explicit mapping pins types so both v2 (verifySchema "string"/"bigint") and analytics + // paths see the same shape regardless of dynamic-mapping behavior on the parquet engine. + // Guarded by isIndexExist for idempotency — init() runs before each @Test method. 
+ if (!TestUtils.isIndexExist(client(), "test_eval")) { + String testEvalMapping = + "{\"mappings\":{\"properties\":{" + + "\"name\":{\"type\":\"keyword\"}," + + "\"age\":{\"type\":\"long\"}," + + "\"title\":{\"type\":\"keyword\"}}}}"; + TestUtils.createIndexByRestClient(client(), "test_eval", testEvalMapping); - Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true"); - request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}"); - client().performRequest(request2); + // Create test data for string concatenation + Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true"); + request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}"); + client().performRequest(request1); - Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true"); - request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}"); - client().performRequest(request3); + Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true"); + request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}"); + client().performRequest(request2); + + Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true"); + request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}"); + client().performRequest(request3); + } } @Test public void testFieldFormatStringConcatenation() throws IOException { + // Pin the projection so column order is deterministic across execution paths — the + // analytics-engine route reads parquet schema in storage order, which can differ from the + // v2 / Lucene path's _source-iteration order. Adding an explicit | fields makes the test + // a strict assertion on the fieldformat expression rather than a coincidence of projection + // order. 
JSONObject result = executeQuery( StringEscapeUtils.escapeJson( - "source=test_eval | fieldformat greeting = 'Hello ' + name")); + "source=test_eval | fieldformat greeting = 'Hello ' + name | fields name, title," + + " age, greeting")); verifySchema( result, schema("name", "string"), diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java index 24401444457..3503d7c533c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java @@ -14,6 +14,10 @@ import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import org.hamcrest.Matcher; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -40,7 +44,7 @@ public void testRename() throws IOException { schema("country", "string"), schema("year", "int"), schema("month", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "name", "country", "state", "month", "year", "renamed_age"); } @Test @@ -77,7 +81,7 @@ public void testRenameToMetaField() throws IOException { schema("country", "string"), schema("year", "int"), schema("month", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "name", "country", "state", "month", "year", "_ID"); } @Test @@ -156,7 +160,7 @@ public void testRenameWildcardFields() throws IOException { schema("country", "string"), schema("year", "int"), schema("month", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "nAME", "country", "state", "month", "year", "age"); } @Test @@ -171,7 +175,7 @@ public void testRenameMultipleWildcardFields() throws IOException { schema("couNTry", "string"), schema("year", "int"), 
schema("moNTh", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "name", "couNTry", "state", "moNTh", "year", "age"); } @Test @@ -186,7 +190,7 @@ public void testRenameWildcardPrefix() throws IOException { schema("country", "string"), schema("year", "int"), schema("month", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "new_na", "country", "state", "month", "year", "age"); } @Test @@ -247,7 +251,7 @@ public void testRenameMultipleWildcards() throws IOException { schema("country", "string"), schema("year", "int"), schema("MoNtH", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "name", "country", "state", "MoNtH", "year", "age"); } @Test @@ -296,12 +300,14 @@ public void testRenamingToExistingField() throws IOException { schema("country", "string"), schema("year", "int"), schema("month", "int")); - verifyDataRows( + // After `rename name as age`, the original name column overwrites the original age column; + // the (number) age values are gone and only the (string) name values remain under "age". 
+ verifyDataRowsByColumn( result, - rows("Jake", "USA", "California", 4, 2023), - rows("Hello", "USA", "New York", 4, 2023), - rows("John", "Canada", "Ontario", 4, 2023), - rows("Jane", "Canada", "Quebec", 4, 2023)); + rowOf("age", "Jake", "country", "USA", "state", "California", "month", 4, "year", 2023), + rowOf("age", "Hello", "country", "USA", "state", "New York", "month", 4, "year", 2023), + rowOf("age", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023), + rowOf("age", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023)); } @Test @@ -331,12 +337,12 @@ public void testRenamingNonExistentFieldToExistingField() throws IOException { schema("country", "string"), schema("year", "int"), schema("month", "int")); - verifyDataRows( + verifyDataRowsByColumn( result, - rows("Jake", "USA", "California", 4, 2023), - rows("Hello", "USA", "New York", 4, 2023), - rows("John", "Canada", "Ontario", 4, 2023), - rows("Jane", "Canada", "Quebec", 4, 2023)); + rowOf("name", "Jake", "country", "USA", "state", "California", "month", 4, "year", 2023), + rowOf("name", "Hello", "country", "USA", "state", "New York", "month", 4, "year", 2023), + rowOf("name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023), + rowOf("name", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023)); } @Test @@ -380,7 +386,7 @@ public void testMultipleRenameWithoutComma() throws IOException { schema("location", "string"), schema("year", "int"), schema("month", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "user_name", "location", "state", "month", "year", "user_age"); } @Test @@ -398,15 +404,103 @@ public void testRenameMixedCommaAndSpace() throws IOException { schema("location", "string"), schema("year", "int"), schema("month", "int")); - verifyStandardDataRows(result); + verifyStandardDataRows(result, "user_name", "location", "state", "month", "year", "user_age"); + } + + /** + * Build a 
{@code column -> value} map from interleaved varargs ({@code key1, val1, key2, val2, + * ...}). Preserves insertion order so the expected-row mapping reads naturally at the call site. + */ + private static Map<String, Object> rowOf(Object... pairs) { + if (pairs.length % 2 != 0) { + throw new IllegalArgumentException("rowOf expects an even number of args (key, value, ...)"); + } + Map<String, Object> row = new LinkedHashMap<>(); + for (int i = 0; i < pairs.length; i += 2) { + row.put((String) pairs[i], pairs[i + 1]); + } + return row; } private void verifyStandardDataRows(JSONObject result) { - verifyDataRows( - result, - rows("Jake", "USA", "California", 4, 2023, 70), - rows("Hello", "USA", "New York", 4, 2023, 30), - rows("John", "Canada", "Ontario", 4, 2023, 25), - rows("Jane", "Canada", "Quebec", 4, 2023, 20)); + verifyStandardDataRows(result, "name", "country", "state", "month", "year", "age"); + } + + /** + * Verify the four canonical state_country rows independently of column order. + * + *
<p>
The schema check above ({@code verifySchema}) is set-equality on column names; the data row + * check {@code verifyDataRows} is positional. The two paths the analytics-engine route can take + * return columns in different orders (parquet preserves storage order, the v2 / Lucene path + * preserves {@code _source} iteration order), and either is valid given the contract {@code + * verifySchema} declares. To avoid baking either order into the test, this helper takes the + * canonical-position column names as varargs and reorders the canonical row values to match + * whatever column order the response actually returned. + * + * @param result the response JSON + * @param canonicalColumns the column names of the four canonical rows in {@code (name-or-renamed, + * country-or-renamed, state, month, year, age-or-renamed)} order. Pass the rename target + * where applicable. + */ + private void verifyStandardDataRows(JSONObject result, String... canonicalColumns) { + if (canonicalColumns.length != 6) { + throw new IllegalArgumentException( + "verifyStandardDataRows expects 6 canonical column names; got " + + canonicalColumns.length); + } + Object[][] canonicalValues = + new Object[][] { + {"Jake", "USA", "California", 4, 2023, 70}, + {"Hello", "USA", "New York", 4, 2023, 30}, + {"John", "Canada", "Ontario", 4, 2023, 25}, + {"Jane", "Canada", "Quebec", 4, 2023, 20} + }; + Map[] expectedRows = new LinkedHashMap[canonicalValues.length]; + for (int i = 0; i < canonicalValues.length; i++) { + Map row = new LinkedHashMap<>(); + for (int c = 0; c < canonicalColumns.length; c++) { + row.put(canonicalColumns[c], canonicalValues[i][c]); + } + expectedRows[i] = row; + } + verifyDataRowsByColumn(result, expectedRows); + } + + /** + * Match expected rows against the response by column name, ignoring the response's column + * emission order. For each expected row (a {@code column-name -> value} map), the value at each + * schema position is looked up by name. 
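The reorder-by-schema technique the helper javadoc describes can be shown standalone: an expected row is a column-to-value map, and its values are re-emitted in whatever order the response's schema lists the columns. The class and method names below are illustrative, not from the patch:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of the order-agnostic row check: expected rows are
// column->value maps, and values are rebuilt positionally in whatever order the
// response schema actually returned. Names are illustrative, not from the patch.
public class ReorderByColumnDemo {

  // Reorder one expected row (column -> value) into the response's column order.
  static Object[] reorderToSchema(Map<String, Object> expectedRow, String[] schemaOrder) {
    Object[] reordered = new Object[schemaOrder.length];
    for (int c = 0; c < schemaOrder.length; c++) {
      if (!expectedRow.containsKey(schemaOrder[c])) {
        throw new IllegalArgumentException("missing value for column [" + schemaOrder[c] + "]");
      }
      reordered[c] = expectedRow.get(schemaOrder[c]);
    }
    return reordered;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("name", "Jake");
    row.put("country", "USA");
    row.put("age", 70);

    // Lucene-path order vs a hypothetical parquet storage order: same map, two layouts.
    Object[] lucene = reorderToSchema(row, new String[] {"name", "country", "age"});
    Object[] parquet = reorderToSchema(row, new String[] {"age", "name", "country"});
    System.out.println(Arrays.toString(lucene));  // [Jake, USA, 70]
    System.out.println(Arrays.toString(parquet)); // [70, Jake, USA]
  }
}
```

Either schema order yields the same pass/fail outcome, which is exactly the property the test helper relies on.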
Tests using this helper become engine-order agnostic: a + * parquet-backed response and a Lucene-backed response yield the same assertion outcome as long + * as the column-name-to-value mapping agrees. + */ + @SafeVarargs + @SuppressWarnings("varargs") + private final void verifyDataRowsByColumn( + JSONObject result, Map... expectedRows) { + JSONArray schema = result.getJSONArray("schema"); + int n = schema.length(); + String[] columnOrder = new String[n]; + for (int i = 0; i < n; i++) { + columnOrder[i] = schema.getJSONObject(i).getString("name"); + } + @SuppressWarnings({"unchecked", "rawtypes"}) + Matcher[] rowMatchers = new Matcher[expectedRows.length]; + for (int r = 0; r < expectedRows.length; r++) { + Object[] reordered = new Object[n]; + for (int c = 0; c < n; c++) { + if (!expectedRows[r].containsKey(columnOrder[c])) { + throw new IllegalArgumentException( + "Expected row at index " + + r + + " is missing canonical value for response column [" + + columnOrder[c] + + "]; provided keys: " + + expectedRows[r].keySet()); + } + reordered[c] = expectedRows[r].get(columnOrder[c]); + } + rowMatchers[r] = rows(reordered); + } + verifyDataRows(result, rowMatchers); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java index 44cc4a3aaf0..5943a3c5d30 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java @@ -9,6 +9,10 @@ import static org.opensearch.sql.util.MatcherUtils.*; import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import org.hamcrest.Matcher; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; import org.opensearch.sql.common.antlr.SyntaxCheckException; @@ -61,12 +65,41 @@ public void testMultipleReplace() throws 
IOException { schema("year", "int"), schema("age", "int")); - verifyDataRows( + // Match by column name — analytics-engine and v2 paths return columns in different orders. + verifyDataRowsByColumn( result, - rows("Jake", "United States", "California", 4, 2023, 70), - rows("Hello", "United States", "New York", 4, 2023, 30), - rows("John", "Canada", "Ontario", 4, 2023, 25), - rows("Joseph", "Canada", "Quebec", 4, 2023, 20)); + rowOf( + "name", + "Jake", + "country", + "United States", + "state", + "California", + "month", + 4, + "year", + 2023, + "age", + 70), + rowOf( + "name", + "Hello", + "country", + "United States", + "state", + "New York", + "month", + 4, + "year", + 2023, + "age", + 30), + rowOf( + "name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023, + "age", 25), + rowOf( + "name", "Joseph", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023, + "age", 20)); } @Test @@ -121,12 +154,40 @@ public void testEmptyStringReplacement() throws IOException { schema("year", "int"), schema("age", "int")); - verifyDataRows( + verifyDataRowsByColumn( result, - rows("Jake", "", "California", 4, 2023, 70), - rows("Hello", "", "New York", 4, 2023, 30), - rows("John", "Canada", "Ontario", 4, 2023, 25), - rows("Jane", "Canada", "Quebec", 4, 2023, 20)); + rowOf( + "name", + "Jake", + "country", + "", + "state", + "California", + "month", + 4, + "year", + 2023, + "age", + 70), + rowOf( + "name", + "Hello", + "country", + "", + "state", + "New York", + "month", + 4, + "year", + 2023, + "age", + 30), + rowOf( + "name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023, + "age", 25), + rowOf( + "name", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023, "age", + 20)); } @Test @@ -146,12 +207,40 @@ public void testMultipleFieldsInClause() throws IOException { schema("year", "int"), schema("age", "int")); - verifyDataRows( + verifyDataRowsByColumn( result, - rows("Jake", "United States", 
"California", 4, 2023, 70), - rows("Hello", "United States", "New York", 4, 2023, 30), - rows("John", "Canada", "Ontario", 4, 2023, 25), - rows("Jane", "Canada", "Quebec", 4, 2023, 20)); + rowOf( + "name", + "Jake", + "country", + "United States", + "state", + "California", + "month", + 4, + "year", + 2023, + "age", + 70), + rowOf( + "name", + "Hello", + "country", + "United States", + "state", + "New York", + "month", + 4, + "year", + 2023, + "age", + 30), + rowOf( + "name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023, + "age", 25), + rowOf( + "name", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023, "age", + 20)); } @Test @@ -164,10 +253,16 @@ public void testReplaceNonExistentField() { String.format( "source = %s | replace 'USA' WITH 'United States' IN non_existent_field", TEST_INDEX_STATE_COUNTRY))); - verifyErrorMessageContains( - e, - "field [non_existent_field] not found; input fields are: [name, country, state, month," - + " year, age, _id, _index, _score, _maxscore, _sort, _routing]"); + // Order-agnostic — analytics-engine and v2 paths emit the input-field list in different + // orders (parquet preserves storage order, Lucene preserves _source iteration order). + // Assert that the prefix and every expected field name appear somewhere in the message. 
+ verifyErrorMessageContains(e, "field [non_existent_field] not found; input fields are:"); + verifyErrorMessageContains(e, "name"); + verifyErrorMessageContains(e, "country"); + verifyErrorMessageContains(e, "state"); + verifyErrorMessageContains(e, "month"); + verifyErrorMessageContains(e, "year"); + verifyErrorMessageContains(e, "age"); } @Test @@ -259,12 +354,40 @@ public void testMultiplePairsInSingleCommand() throws IOException { schema("year", "int"), schema("age", "int")); - verifyDataRows( + verifyDataRowsByColumn( result, - rows("Jake", "United States", "California", 4, 2023, 70), - rows("Hello", "United States", "New York", 4, 2023, 30), - rows("John", "CA", "Ontario", 4, 2023, 25), - rows("Jane", "CA", "Quebec", 4, 2023, 20)); + rowOf( + "name", + "Jake", + "country", + "United States", + "state", + "California", + "month", + 4, + "year", + 2023, + "age", + 70), + rowOf( + "name", + "Hello", + "country", + "United States", + "state", + "New York", + "month", + 4, + "year", + 2023, + "age", + 30), + rowOf( + "name", "John", "country", "CA", "state", "Ontario", "month", 4, "year", 2023, "age", + 25), + rowOf( + "name", "Jane", "country", "CA", "state", "Quebec", "month", 4, "year", 2023, "age", + 20)); } @Test @@ -402,4 +525,61 @@ public void testEscapeSequence_noMatchLiteral() throws IOException { // Pattern "foo\*bar" matches literal "foo*bar", not "fooXbar", so original value returned verifyDataRows(result, rows("fooXbar")); } + + /** + * Build a {@code column -> value} map from interleaved varargs ({@code key1, val1, key2, val2, + * ...}). Preserves insertion order so the expected-row mapping reads naturally at the call site. + */ + private static Map rowOf(Object... 
pairs) { + if (pairs.length % 2 != 0) { + throw new IllegalArgumentException("rowOf expects an even number of args (key, value, ...)"); + } + Map row = new LinkedHashMap<>(); + for (int i = 0; i < pairs.length; i += 2) { + row.put((String) pairs[i], pairs[i + 1]); + } + return row; + } + + /** + * Match expected rows against the response by column name, ignoring the response's column + * emission order. The two paths the analytics-engine route can take return columns in different + * orders (parquet preserves storage order, the v2 / Lucene path preserves {@code _source} + * iteration order), and either is valid given the contract {@code verifySchema} declares (set + * equality on column names). To avoid baking either order into the test, this helper reorders + * each expected row to match whatever column order the response actually returned. + * + *
<p>
Mirrors the helper in {@code CalcitePPLRenameIT} (commit 59c728b) — same pattern applied to + * PPL {@code replace} command tests. + */ + @SafeVarargs + @SuppressWarnings("varargs") + private final void verifyDataRowsByColumn( + JSONObject result, Map... expectedRows) { + JSONArray schema = result.getJSONArray("schema"); + int n = schema.length(); + String[] columnOrder = new String[n]; + for (int i = 0; i < n; i++) { + columnOrder[i] = schema.getJSONObject(i).getString("name"); + } + @SuppressWarnings({"unchecked", "rawtypes"}) + Matcher[] rowMatchers = new Matcher[expectedRows.length]; + for (int r = 0; r < expectedRows.length; r++) { + Object[] reordered = new Object[n]; + for (int c = 0; c < n; c++) { + if (!expectedRows[r].containsKey(columnOrder[c])) { + throw new IllegalArgumentException( + "Expected row at index " + + r + + " is missing canonical value for response column [" + + columnOrder[c] + + "]; provided keys: " + + expectedRows[r].keySet()); + } + reordered[c] = expectedRows[r].get(columnOrder[c]); + } + rowMatchers[r] = rows(reordered); + } + verifyDataRows(result, rowMatchers); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java b/integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java new file mode 100644 index 00000000000..5cd89fa7cd9 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin; + +import org.junit.Test; +import org.opensearch.test.rest.OpenSearchRestTestCase; + +/** + * Smoke test: verifies that opensearch-sql loads cleanly alongside arrow-flight-rpc and + * analytics-engine. A successful cluster start is the only assertion — no sql-specific logic runs. 
+ */ +public class AnalyticsEngineCompatIT extends OpenSearchRestTestCase { + + @Test + public void testClusterStarted() { + // If the cluster booted, all three plugins loaded without classloader errors. + } +} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 676dee1751a..2950f6c9b85 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.regex.Pattern; import org.apache.logging.log4j.LogManager; @@ -82,10 +83,21 @@ public class RestSqlAction extends BaseRestHandler { /** New SQL query request handler. */ private final RestSQLQueryAction newSqlQueryHandler; - public RestSqlAction(Settings settings, Injector injector) { + /** + * Analytics router. Called before the normal SQL engine. Accepts the request and channel, returns + * {@code true} if it handled the request (analytics index), {@code false} to fall through to + * normal SQL engine. 
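The router contract described in that javadoc — try the analytics path first, fall back to the normal engine if it declines — can be sketched with plain types. Strings and a `StringBuilder` stand in for `SQLQueryRequest` and `RestChannel`; the "pq_" prefix heuristic and all names here are illustrative, not from the patch:

```java
import java.util.function.BiFunction;

// Minimal sketch of the try-first/fall-through router contract: the router
// returns true only when it fully handled (and responded to) the request;
// false means the caller must run the legacy path. Plain strings stand in for
// SQLQueryRequest / RestChannel; names are illustrative, not from the patch.
public class RouterContractDemo {

  static final StringBuilder RESPONSES = new StringBuilder();

  // Pretends analytics-backed indices are the ones named with a "pq_" prefix.
  static final BiFunction<String, StringBuilder, Boolean> ANALYTICS_ROUTER =
      (query, channel) -> {
        if (!query.contains("pq_")) {
          return false; // not ours: fall through, and do NOT touch the channel
        }
        channel.append("analytics:").append(query).append('\n');
        return true; // handled: response already sent
      };

  static void handle(String query) {
    if (!ANALYTICS_ROUTER.apply(query, RESPONSES)) {
      RESPONSES.append("v2:").append(query).append('\n'); // legacy path
    }
  }

  public static void main(String[] args) {
    handle("SELECT * FROM pq_logs");
    handle("SELECT * FROM accounts");
    System.out.print(RESPONSES);
  }
}
```

The invariant worth noting: a router that returns `false` must not have written to the channel, otherwise the fall-through path would respond twice.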
+ */ + private final BiFunction analyticsRouter; + + public RestSqlAction( + Settings settings, + Injector injector, + BiFunction analyticsRouter) { super(); this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.newSqlQueryHandler = new RestSQLQueryAction(injector); + this.analyticsRouter = analyticsRouter; } @Override @@ -133,7 +145,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Format format = SqlRequestParam.getFormat(request.params()); - // Route request to new query engine if it's supported already SQLQueryRequest newSqlRequest = new SQLQueryRequest( sqlRequest.getJsonContent(), @@ -141,31 +152,58 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli request.path(), request.params(), sqlRequest.cursor()); - return newSqlQueryHandler.prepareRequest( - newSqlRequest, - (restChannel, exception) -> { - try { - if (newSqlRequest.isExplainRequest()) { - LOG.info( - "Request is falling back to old SQL engine due to: " + exception.getMessage()); - } - LOG.info( - "[{}] Request {} is not supported and falling back to old SQL engine", - QueryContext.getRequestId(), - newSqlRequest); - LOG.info("Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); - QueryAction queryAction = explainRequest(client, sqlRequest, format); - executeSqlRequest(request, queryAction, client, restChannel); - } catch (Exception e) { - handleException(restChannel, e); - } - }, - this::handleException); + + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices. + // The router returns true and sends the response directly if it handled the request. 
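The delegation that follows relies on `prepareRequest` returning a `RestChannelConsumer` (a consumer of the channel) rather than executing immediately, so the caller decides when to run it via `accept(channel)`. A minimal sketch of that deferred-execution shape, with `StringBuilder` standing in for the channel and all names illustrative:

```java
import java.util.function.Consumer;

// Sketch of the deferred-execution pattern behind delegateToV2Engine: a
// prepareRequest-style factory returns a Consumer of the channel instead of
// executing directly, so routing decisions can happen before any work runs.
// StringBuilder stands in for RestChannel; names are illustrative.
public class ConsumerDelegationDemo {

  static Consumer<StringBuilder> prepareRequest(String query) {
    // Build the work lazily; nothing runs until accept(...) is called.
    return channel -> channel.append("executed:").append(query).append('\n');
  }

  public static void main(String[] args) {
    StringBuilder channel = new StringBuilder();
    Consumer<StringBuilder> consumer = prepareRequest("SELECT 1");
    // ... routing / fallback decisions can happen here ...
    consumer.accept(channel); // only now does the query execute
    System.out.print(channel);
  }
}
```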
+ final SQLQueryRequest finalRequest = newSqlRequest; + return channel -> { + if (!analyticsRouter.apply(finalRequest, channel)) { + delegateToV2Engine(request, client, sqlRequest, finalRequest, format, channel); + } + }; } catch (Exception e) { return channel -> handleException(channel, e); } } + /** Delegate a SQL query to the V2 engine with legacy fallback. */ + private void delegateToV2Engine( + RestRequest request, + NodeClient client, + SqlRequest sqlRequest, + SQLQueryRequest sqlQueryRequest, + Format format, + RestChannel channel) { + try { + newSqlQueryHandler + .prepareRequest( + sqlQueryRequest, + (restChannel, exception) -> { + try { + if (sqlQueryRequest.isExplainRequest()) { + LOG.info( + "Request is falling back to old SQL engine due to: " + + exception.getMessage()); + } + LOG.info( + "[{}] Request {} is not supported and falling back to old SQL engine", + QueryContext.getRequestId(), + sqlQueryRequest); + LOG.info( + "Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); + QueryAction queryAction = explainRequest(client, sqlRequest, format); + executeSqlRequest(request, queryAction, client, restChannel); + } catch (Exception e) { + handleException(restChannel, e); + } + }, + this::handleException) + .accept(channel); + } catch (Exception e) { + handleException(channel, e); + } + } + private void handleException(RestChannel restChannel, Exception exception) { RestStatus status = getRestStatus(exception); logAndPublishMetrics(status, exception); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java index 224d7019ec2..6e2240909b0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java @@ -77,6 +77,7 @@ import 
org.opensearch.script.ScriptEngine; import org.opensearch.script.StringSortScript; import org.opensearch.search.lookup.SourceLookup; +import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper; import org.opensearch.sql.data.model.ExprTimestampValue; import org.opensearch.sql.opensearch.storage.script.aggregation.CalciteAggregationScriptFactory; import org.opensearch.sql.opensearch.storage.script.field.CalciteFieldScriptFactory; @@ -138,7 +139,9 @@ public T compile( new RelRecordType(List.of())); Function1 function = - new RexExecutable(code, "generated Rex code").getFunction(); + CalciteClassLoaderHelper.withCalciteClassLoader( + () -> new RexExecutable(code, "generated Rex code").getFunction(), + CalciteScriptEngine.class); if (CONTEXTS.containsKey(context)) { return context.factoryClazz.cast(CONTEXTS.get(context).apply(function, rexNode.getType())); diff --git a/plugin/build.gradle b/plugin/build.gradle index 340787fa01f..708c4b18b35 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -1,5 +1,4 @@ import java.util.concurrent.Callable -import org.opensearch.gradle.dependencies.CompileOnlyResolvePlugin /* * Copyright OpenSearch Contributors @@ -55,7 +54,7 @@ opensearchplugin { name 'opensearch-sql' description 'OpenSearch SQL' classname 'org.opensearch.sql.plugin.SQLPlugin' - extendedPlugins = ['opensearch-job-scheduler'] + extendedPlugins = ['opensearch-job-scheduler', 'analytics-engine;optional=true'] licenseFile rootProject.file("LICENSE.txt") noticeFile rootProject.file("NOTICE") } @@ -160,6 +159,8 @@ dependencies { api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}" api project(":ppl") + api project(':api') + implementation("org.opensearch.sandbox:analytics-api:${opensearch_version}") api project(':legacy') api project(':opensearch') api project(':prometheus') @@ -320,4 +321,3 @@ testClusters.integTest { run { useCluster testClusters.integTest } - diff --git 
a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index d92788ac43b..3692f49688d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -36,8 +37,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; @@ -51,11 +54,15 @@ import org.opensearch.plugins.ScriptPlugin; import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.sql.ast.statement.ExplainMode; +import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; @@ -87,6 +94,8 @@ import 
org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse; import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; @@ -97,14 +106,18 @@ import org.opensearch.sql.opensearch.storage.script.CompoundedScriptEngine; import org.opensearch.sql.plugin.config.EngineExtensionsHolder; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; +import org.opensearch.sql.plugin.rest.AnalyticsExecutorHolder; import org.opensearch.sql.plugin.rest.RestPPLGrammarAction; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; +import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction; import org.opensearch.sql.plugin.transport.PPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; @@ -120,6 +133,7 @@ import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse; import 
org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse; +import org.opensearch.sql.sql.domain.SQLQueryRequest; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -185,7 +199,7 @@ public List getRestHandlers( return Arrays.asList( new RestPPLQueryAction(), new RestPPLGrammarAction(), - new RestSqlAction(settings, injector), + new RestSqlAction(settings, injector, createSqlAnalyticsRouter()), new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), new RestQuerySettingsAction(settings, restController), @@ -195,6 +209,88 @@ public List getRestHandlers( new RestDirectQueryResourcesManagementAction((OpenSearchSettings) pluginSettings)); } + /** + * Creates a routing function for SQL queries targeting analytics engine indices. Returns {@code + * true} if the query was handled (analytics index), {@code false} to fall through to normal SQL. + * + *
<p>
The {@link RestUnifiedQueryAction} is built lazily on the first request because the + * analytics-engine {@code QueryPlanExecutor} is published into {@link AnalyticsExecutorHolder} by + * {@code TransportPPLQueryAction}'s {@code @Inject} constructor — which fires after the Node + * Guice injector is built, i.e. after {@code getRestHandlers}. If the executor is still + * unavailable when a SQL request arrives, the router falls through to the legacy SQL path. + */ + private BiFunction createSqlAnalyticsRouter() { + final RestUnifiedQueryAction[] cached = new RestUnifiedQueryAction[1]; + java.util.function.Supplier handlerSupplier = + () -> { + if (cached[0] == null) { + var executor = AnalyticsExecutorHolder.get(); + if (executor == null) { + return null; + } + cached[0] = + new RestUnifiedQueryAction(client, clusterService, executor, pluginSettings); + } + return cached[0]; + }; + return (sqlRequest, channel) -> { + RestUnifiedQueryAction unifiedQueryHandler = handlerSupplier.get(); + if (unifiedQueryHandler == null + || !unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) { + return false; + } + if (sqlRequest.isExplainRequest()) { + unifiedQueryHandler.explain( + sqlRequest.getQuery(), + QueryType.SQL, + ExplainMode.STANDARD, + new ResponseListener<>() { + @Override + public void onResponse(ExplainResponse response) { + JsonResponseFormatter formatter = + new JsonResponseFormatter<>(Style.PRETTY) { + @Override + protected Object buildJsonObject(ExplainResponse resp) { + return resp; + } + }; + channel.sendResponse( + new BytesRestResponse( + RestStatus.OK, + "application/json; charset=UTF-8", + formatter.format(response))); + } + + @Override + public void onFailure(Exception e) { + channel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + } else { + unifiedQueryHandler.execute( + sqlRequest.getQuery(), + QueryType.SQL, + false, + new ActionListener<>() { + @Override + public void 
onResponse(TransportPPLQueryResponse response) { + channel.sendResponse( + new BytesRestResponse( + RestStatus.OK, "application/json; charset=UTF-8", response.getResult())); + } + + @Override + public void onFailure(Exception e) { + channel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + } + return true; + }; + } + /** Register action and handler so that transportClient can find proxy for action. */ @Override public List> getActions() { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java new file mode 100644 index 00000000000..fa3e7d1d1fa --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import org.apache.calcite.rel.RelNode; +import org.opensearch.analytics.exec.QueryPlanExecutor; + +/** + * Bridge for sharing the analytics-engine {@link QueryPlanExecutor} between the PPL transport + * action (where Guice resolves the binding via {@code @Inject}) and the REST-only SQL router (where + * Guice cannot, because {@code SQLPlugin#getRestHandlers} runs before the Node-level injector + * satisfies {@code @Inject} parameters). + * + *
<p>
Why a static holder: cross-plugin Guice injection needs a class registered in the Node + * injector, and {@link org.opensearch.sql.plugin.SQLPlugin}'s SQL routing path is built in {@code + * getRestHandlers} — outside any Guice-managed lifecycle. Persisting the executor in this holder + * once {@link org.opensearch.sql.plugin.transport.TransportPPLQueryAction} is constructed lets the + * SQL router read the same instance without going back through the injector. + */ +public final class AnalyticsExecutorHolder { + + private static volatile QueryPlanExecutor> executor; + + private AnalyticsExecutorHolder() {} + + public static void set(QueryPlanExecutor> instance) { + executor = instance; + } + + public static QueryPlanExecutor> get() { + return executor; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java new file mode 100644 index 00000000000..e1bb84778dd --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -0,0 +1,290 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import java.util.Map; +import java.util.Optional; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.util.SqlBasicVisitor; +import org.apache.logging.log4j.LogManager; +import 
org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.analytics.schema.OpenSearchSchemaBuilder; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.IndexSettings; +import org.opensearch.sql.api.UnifiedQueryContext; +import org.opensearch.sql.api.UnifiedQueryPlanner; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.statement.ExplainMode; +import org.opensearch.sql.ast.tree.Relation; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; +import org.opensearch.sql.lang.LangSpec; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; +import org.opensearch.sql.protocol.response.QueryResult; +import org.opensearch.sql.protocol.response.format.ResponseFormatter; +import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.opensearch.transport.client.node.NodeClient; + +/** + * Handles queries routed to the Analytics engine via the unified query pipeline. Parses PPL/SQL + * queries using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates + * to {@link AnalyticsExecutionEngine} for execution. 
+ */ +public class RestUnifiedQueryAction { + + private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class); + private static final String SCHEMA_NAME = "opensearch"; + + private final AnalyticsExecutionEngine analyticsEngine; + private final NodeClient client; + private final ClusterService clusterService; + private final org.opensearch.sql.common.setting.Settings pluginSettings; + + public RestUnifiedQueryAction( + NodeClient client, + ClusterService clusterService, + QueryPlanExecutor> planExecutor, + org.opensearch.sql.common.setting.Settings pluginSettings) { + this.client = client; + this.clusterService = clusterService; + this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor); + this.pluginSettings = pluginSettings; + } + + /** + * Returns true iff the target index has {@link + * IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set and {@link + * IndexSettings#PLUGGABLE_DATAFORMAT_VALUE_SETTING} is {@code "composite"}, routing it to + * DataFusion instead of the Calcite→DSL path. + * + *
<p>
Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared + * with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails + * when used across threads (transport thread -> sql-worker thread). When real catalog metadata + * makes this expensive, consider moving the routing check to the sql-worker thread. + */ + public boolean isAnalyticsIndex(String query, QueryType queryType) { + if (query == null || query.isEmpty()) { + return false; + } + try (UnifiedQueryContext context = buildParsingContext(queryType)) { + return extractIndexName(query, queryType, context) + .map(this::stripSchemaPrefix) + .map(this::isPluggableDataformatIndex) + .orElse(false); + } catch (Exception e) { + return false; + } + } + + private String stripSchemaPrefix(String indexName) { + int lastDot = indexName.lastIndexOf('.'); + return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + } + + private boolean isPluggableDataformatIndex(String indexName) { + var indexMetadata = clusterService.state().metadata().index(indexName); + if (indexMetadata == null) { + return false; + } + var settings = indexMetadata.getSettings(); + return IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(settings) + && "composite".equals(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(settings)); + } + + /** Execute a query through the unified query pipeline on the sql-worker thread pool. 
*/ + public void execute( + String query, + QueryType queryType, + boolean profiling, + ActionListener listener) { + client + .threadPool() + .schedule( + withCurrentContext( + () -> { + try (UnifiedQueryContext context = buildContext(queryType, profiling)) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + analyticsEngine.execute( + plan, planContext, createQueryListener(queryType, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + }), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + /** + * Explain a query through the unified query pipeline on the sql-worker thread pool. Returns + * ExplainResponse via ResponseListener so the caller can format it. + */ + public void explain( + String query, + QueryType queryType, + ExplainMode mode, + ResponseListener listener) { + client + .threadPool() + .schedule( + withCurrentContext( + () -> { + try (UnifiedQueryContext context = buildContext(queryType, false)) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + analyticsEngine.explain(plan, mode, planContext, listener); + } catch (Exception e) { + listener.onFailure(e); + } + }), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + /** + * Build a lightweight context for parsing only (index name extraction). Does not require cluster + * state or catalog schema. 
+ */ + private UnifiedQueryContext buildParsingContext(QueryType queryType) { + return applyClusterOverrides(UnifiedQueryContext.builder().language(queryType)).build(); + } + + private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) { + return applyClusterOverrides( + UnifiedQueryContext.builder() + .language(queryType) + .catalog(SCHEMA_NAME, OpenSearchSchemaBuilder.buildSchema(clusterService.state())) + .defaultNamespace(SCHEMA_NAME) + .profiling(profiling)) + .build(); + } + + /** + * Routes operator-configured cluster overrides into the builder via the existing {@code + * setting(String, Object)} API, keeping {@link UnifiedQueryContext} decoupled from any specific + * {@link org.opensearch.sql.common.setting.Settings} implementation. + * + *
<p>
Currently scoped to {@code plugins.ppl.rex.max_match.limit} — required so the unified path + * honors {@code _cluster/settings} updates for {@code rex max_match} (CalciteRexCommandIT's + * testRexMaxMatchConfigurableLimit). Add keys here if a future PR / IT depends on cluster-side + * fidelity for one of the other planning settings. + */ + private UnifiedQueryContext.Builder applyClusterOverrides(UnifiedQueryContext.Builder builder) { + Object rexLimit = + pluginSettings.getSettingValue( + org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT); + if (rexLimit != null) { + builder.setting( + org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT.getKeyValue(), + rexLimit); + } + return builder; + } + + /** + * Extract the source index name by parsing the query and visiting the AST to find the Relation + * node. Uses the context's parser which supports both PPL and SQL. + */ + private static Optional extractIndexName( + String query, QueryType queryType, UnifiedQueryContext context) { + if (queryType == QueryType.PPL) { + UnresolvedPlan unresolvedPlan = (UnresolvedPlan) context.getParser().parse(query); + return Optional.ofNullable(unresolvedPlan.accept(new IndexNameExtractor(), null)); + } + SqlNode sqlNode = (SqlNode) context.getParser().parse(query); + return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode)); + } + + /** AST visitor that extracts the source index name from a Relation node (PPL path). */ + private static class IndexNameExtractor extends AbstractNodeVisitor { + @Override + public String visitRelation(Relation node, Void context) { + return node.getTableQualifiedName().toString(); + } + } + + /** SqlNode visitor that extracts the source table name from a SQL parse tree. 
*/ + private static class SqlTableNameExtractor extends SqlBasicVisitor { + @Override + public String visit(SqlCall call) { + if (call instanceof SqlSelect select) { + return select.getFrom().accept(this); + } + if (call instanceof SqlJoin join) { + return join.getLeft().accept(this); + } + return null; + } + + @Override + public String visit(SqlIdentifier id) { + return id.toString(); + } + } + + private static String extractTableNameFromSqlNode(SqlNode sqlNode) { + return sqlNode.accept(new SqlTableNameExtractor()); + } + + private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) { + return LogicalSystemLimit.create( + LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT, + plan, + context.relBuilder.literal(context.sysLimit.querySizeLimit())); + } + + private ResponseListener createQueryListener( + QueryType queryType, ActionListener transportListener) { + ResponseFormatter formatter = new SimpleJsonResponseFormatter(PRETTY); + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + LangSpec langSpec = queryType == QueryType.PPL ? 
PPL_SPEC : LangSpec.SQL_SPEC; + String result = + formatter.format( + new QueryResult( + response.getSchema(), response.getResults(), response.getCursor(), langSpec)); + transportListener.onResponse(new TransportPPLQueryResponse(result)); + } + + @Override + public void onFailure(Exception e) { + transportListener.onFailure(e); + } + }; + } + + private static Runnable withCurrentContext(final Runnable task) { + final Map currentContext = ThreadContext.getImmutableContext(); + return () -> { + ThreadContext.putAll(currentContext); + task.run(); + }; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index ef10aaca451..95f5b014ef0 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -13,9 +13,11 @@ import java.util.Locale; import java.util.Optional; import java.util.function.Supplier; +import org.apache.calcite.rel.RelNode; import org.opensearch.action.ActionRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.analytics.exec.QueryPlanExecutor; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Guice; import org.opensearch.common.inject.Inject; @@ -28,6 +30,7 @@ import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.monitor.profile.QueryProfiling; @@ -35,6 +38,8 @@ import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import 
org.opensearch.sql.plugin.config.EngineExtensionsHolder; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; +import org.opensearch.sql.plugin.rest.AnalyticsExecutorHolder; +import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; @@ -58,7 +63,13 @@ public class TransportPPLQueryAction private final Supplier pplEnabled; - /** Constructor of TransportPPLQueryAction. */ + /** Null when analytics-engine plugin is absent; set via {@link #setQueryPlanExecutor}. */ + private volatile RestUnifiedQueryAction unifiedQueryHandler; + + private final NodeClient clientRef; + private final ClusterService clusterServiceRef; + private final org.opensearch.sql.common.setting.Settings pluginSettingsRef; + @Inject public TransportPPLQueryAction( TransportService transportService, @@ -69,14 +80,18 @@ public TransportPPLQueryAction( org.opensearch.common.settings.Settings clusterSettings, EngineExtensionsHolder extensionsHolder) { super(PPLQueryAction.NAME, transportService, actionFilters, TransportPPLQueryRequest::new); + this.clientRef = client; + this.clusterServiceRef = clusterService; ModulesBuilder modules = new ModulesBuilder(); modules.add(new OpenSearchPluginModule(extensionsHolder.engines())); + org.opensearch.sql.common.setting.Settings pluginSettings = + new OpenSearchSettings(clusterService.getClusterSettings()); + this.pluginSettingsRef = pluginSettings; modules.add( b -> { b.bind(NodeClient.class).toInstance(client); - b.bind(org.opensearch.sql.common.setting.Settings.class) - .toInstance(new OpenSearchSettings(clusterService.getClusterSettings())); + b.bind(org.opensearch.sql.common.setting.Settings.class).toInstance(pluginSettings); b.bind(DataSourceService.class).toInstance(dataSourceService); }); this.injector = Guice.createInjector(modules); @@ -89,6 +104,16 @@ public TransportPPLQueryAction( 
.getSettingValue(Settings.Key.PPL_ENABLED); } + /** Invoked by Guice iff analytics-engine bound {@code QueryPlanExecutor}. */ + @Inject(optional = true) + public void setQueryPlanExecutor( + QueryPlanExecutor> queryPlanExecutor) { + AnalyticsExecutorHolder.set(queryPlanExecutor); + this.unifiedQueryHandler = + new RestUnifiedQueryAction( + clientRef, clusterServiceRef, queryPlanExecutor, pluginSettingsRef); + } + /** * {@inheritDoc} Transform the request and call super.doExecute() to support call from other * plugins. @@ -120,12 +145,32 @@ protected void doExecute( QueryContext.addRequestId(); - PPLService pplService = injector.getInstance(PPLService.class); // in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest(); QueryContext.setProfile(transformedRequest.profile()); ActionListener clearingListener = wrapWithProfilingClear(listener); + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices. 
+ if (unifiedQueryHandler != null + && unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest(), QueryType.PPL)) { + if (transformedRequest.isExplainRequest()) { + unifiedQueryHandler.explain( + transformedRequest.getRequest(), + QueryType.PPL, + transformedRequest.mode(), + createExplainResponseListener(transformedRequest, clearingListener)); + } else { + unifiedQueryHandler.execute( + transformedRequest.getRequest(), + QueryType.PPL, + transformedRequest.profile(), + clearingListener); + } + return; + } + + PPLService pplService = injector.getInstance(PPLService.class); + if (transformedRequest.isExplainRequest()) { pplService.explain( transformedRequest, createExplainResponseListener(transformedRequest, clearingListener)); diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java new file mode 100644 index 00000000000..c8012660369 --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.calcite.rel.RelNode; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.transport.client.node.NodeClient; + +/** + * Tests for analytics index 
routing in RestUnifiedQueryAction. Routing requires both {@code + * index.pluggable.dataformat.enabled=true} and {@code index.pluggable.dataformat=composite}. + */ +public class RestUnifiedQueryActionTest { + + private ClusterService clusterService; + private Metadata metadata; + private RestUnifiedQueryAction action; + + @Before + public void setUp() { + clusterService = mock(ClusterService.class); + ClusterState clusterState = mock(ClusterState.class); + metadata = mock(Metadata.class); + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.metadata()).thenReturn(metadata); + + @SuppressWarnings("unchecked") + QueryPlanExecutor> executor = mock(QueryPlanExecutor.class); + action = + new RestUnifiedQueryAction( + mock(NodeClient.class), + clusterService, + executor, + mock(org.opensearch.sql.common.setting.Settings.class)); + } + + @Test + public void pluggableDataformatIndexRoutesToAnalytics() { + registerIndex( + "parquet_logs", + Settings.builder() + .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "composite") + .build()); + + assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL)); + assertTrue( + action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL)); + } + + @Test + public void pluggableEnabledButLuceneFormatRoutesToLucene() { + registerIndex( + "lucene_logs", + Settings.builder() + .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true) + .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "lucene") + .build()); + + assertFalse(action.isAnalyticsIndex("source = lucene_logs | fields ts", QueryType.PPL)); + } + + @Test + public void indexWithoutSettingRoutesToLucene() { + registerIndex("plain_logs", Settings.EMPTY); + + assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL)); + } + + @Test + public void 
missingIndexRoutesToLucene() { + assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL)); + } + + @Test + public void nullAndEmptyQueriesRouteToLucene() { + assertFalse(action.isAnalyticsIndex(null, QueryType.PPL)); + assertFalse(action.isAnalyticsIndex("", QueryType.PPL)); + } + + private void registerIndex(String name, Settings settings) { + IndexMetadata indexMetadata = mock(IndexMetadata.class); + when(indexMetadata.getSettings()).thenReturn(settings); + when(metadata.index(name)).thenReturn(indexMetadata); + } +} From 940798c1ffe0e7c79ddd937b6dcb559623ebe98c Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Mon, 11 May 2026 11:33:11 -0700 Subject: [PATCH 2/2] Address @penghuo: revert stray blank line in doctest/build.gradle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After 'apply plugin: opensearch.testclusters', one blank line is enough — restoring the single-blank spacing to match upstream/main. Signed-off-by: Kai Huang --- doctest/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/doctest/build.gradle b/doctest/build.gradle index d9dca3fd5c2..d758a6de4b8 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -15,7 +15,6 @@ plugins { apply plugin: 'opensearch.testclusters' - def path = project(':').projectDir // temporary fix, because currently we are under migration to new architecture. Need to run ./gradlew run from // plugin module, and will only build ppl in it.
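For reviewers who want to see the routing predicate in isolation: the sketch below mirrors the `stripSchemaPrefix` + `isPluggableDataformatIndex` logic from `RestUnifiedQueryAction` using a plain `Map` as a stand-in for the index's `Settings` (the setting keys `index.pluggable.dataformat.enabled` and `index.pluggable.dataformat` come from the test javadoc above; the `Map`-based stand-in and class/method names here are illustrative, not the plugin's real types).

```java
import java.util.Map;

// Minimal, self-contained sketch of the analytics-routing predicate.
// A Map<String, String> stands in for the index's Settings object.
public class RoutingSketch {
    static final String ENABLED_KEY = "index.pluggable.dataformat.enabled";
    static final String FORMAT_KEY = "index.pluggable.dataformat";

    // Drop a leading schema qualifier: "opensearch.parquet_logs" -> "parquet_logs".
    static String stripSchemaPrefix(String indexName) {
        int lastDot = indexName.lastIndexOf('.');
        return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
    }

    // Route to the analytics engine only when the pluggable-dataformat setting is
    // enabled AND the declared format is "composite"; a missing index (null map)
    // or any other format stays on the Lucene path.
    static boolean routesToAnalytics(Map<String, String> indexSettings) {
        if (indexSettings == null) {
            return false; // unknown index -> legacy Lucene path
        }
        return Boolean.parseBoolean(indexSettings.getOrDefault(ENABLED_KEY, "false"))
            && "composite".equals(indexSettings.get(FORMAT_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> parquet = Map.of(ENABLED_KEY, "true", FORMAT_KEY, "composite");
        Map<String, String> lucene = Map.of(ENABLED_KEY, "true", FORMAT_KEY, "lucene");

        System.out.println(stripSchemaPrefix("opensearch.parquet_logs")); // parquet_logs
        System.out.println(routesToAnalytics(parquet)); // true
        System.out.println(routesToAnalytics(lucene));  // false
        System.out.println(routesToAnalytics(null));    // false
    }
}
```

This matches the routing matrix exercised by `RestUnifiedQueryActionTest`: enabled + composite routes to analytics; enabled + lucene, no setting, and a missing index all fall back to the Lucene path.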