[Analytics Engine] Wire window-function support for PPL top / rare #21593

Open
ahkcs wants to merge 1 commit into opensearch-project:main from ahkcs:feature/toprare-analytics-verify

@ahkcs ahkcs commented May 10, 2026

Description

PPL top and rare lower (via CalciteRelNodeVisitor#visitRareTopN in opensearch-project/sql) to a LogicalProject containing ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...). On the analytics-engine route, OpenSearchProjectRule.annotateExpr treated the RexOver as an ordinary RexCall and fell into the scalar-function viability path. That path keys on ScalarFunction.fromSqlOperatorWithFallback, which returns null for window-only aggregates like ROW_NUMBER, so every top / rare query failed with "No backend supports scalar function [ROW_NUMBER] among [datafusion]" before substrait emission.

This PR adds EngineCapability.WINDOW and detects RexOver ahead of the standard RexCall branch in OpenSearchProjectRule.annotateExpr. When the child's viable backends include a WINDOW-capable backend, the call is wrapped in an AnnotatedProjectExpression exactly like other annotated calls — the existing strip path's OperatorAnnotation::unwrap returns the original RexOver unchanged, so isthmus's RexExpressionConverter#visitOver emits an inline substrait WindowFunctionInvocation. The substrait standard catalog already binds ROW_NUMBER, so DataFusion's substrait consumer decodes it natively — no separate substrait Window rel needed.
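
The check order matters because, in Calcite, RexOver is a subclass of RexCall, so a generic RexCall test also matches window calls. The following is a minimal sketch of that dispatch order only. The Node, Call, and Over classes and the returned strings are hypothetical stand-ins for illustration, not the Calcite or analytics-engine types.

```java
public class WindowDispatchSketch {
    // Hypothetical stand-ins for RexNode, RexCall, and RexOver.
    static class Node {}

    static class Call extends Node {
        final String operator;
        Call(String operator) { this.operator = operator; }
    }

    // Mirrors the real hierarchy: a window call is also a call.
    static class Over extends Call {
        Over(String operator) { super(operator); }
    }

    // Tests the more specific type first; swapping the two branches would
    // route every Over into the scalar path.
    static String annotate(Node expr) {
        if (expr instanceof Over) {
            return "window:" + ((Over) expr).operator;
        }
        if (expr instanceof Call) {
            return "scalar:" + ((Call) expr).operator;
        }
        return "passthrough";
    }

    public static void main(String[] args) {
        System.out.println(annotate(new Over("ROW_NUMBER"))); // window:ROW_NUMBER
        System.out.println(annotate(new Call("UPPER")));      // scalar:UPPER
        System.out.println(annotate(new Node()));             // passthrough
    }
}
```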

WINDOW is intentionally coarse (one boolean per backend rather than a per-window-function WindowCapability). The substrait standard catalog already constrains which window aggregates the backend's substrait consumer can decode; a runtime decode failure is a clearer error than a duplicated registry split between SPI and backend. The existing AggregateFunction / AggregateCapability model remains right for ordinary aggregates; once a second backend with different window support lands, or a window-only function carries a non-standard call shape, WINDOW can be promoted to a per-function class without disturbing the planner-side annotation flow.

Pass-rate impact

CalciteTopCommandIT / CalciteRareCommandIT, run with -Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true on the SQL plugin's integTestRemote:

| IT | Before | After (this PR alone) | After (this PR + companion opensearch-project/sql#5433) |
| --- | --- | --- | --- |
| CalciteTopCommandIT | 0 / 6 | 5 / 6 | 6 / 6 |
| CalciteRareCommandIT | 0 / 5 | 3 / 5 | 5 / 5 |
| combined | 0 / 11 | 8 / 11 | 11 / 11 |

The 3-failure delta in the middle column comes entirely from pre-existing analytics-route gaps unrelated to window-function wiring (see below); they are closed by the companion SQL-plugin PR.

Out-of-scope follow-ups (resolved by companion PR)

The 3 failures left after this PR are not specific to top / rare — they are pre-existing analytics-route gaps that already affect other commands. All 3 pass on the in-process Calcite path with the same query strings. They are closed by the companion SQL-plugin PR opensearch-project/sql#5433:

  1. testTopCommandLegacyFalse / testRareCommandLegacyFalse: RestUnifiedQueryAction.applyClusterOverrides (in opensearch-project/sql) only forwarded PPL_REX_MAX_MATCH_LIMIT to UnifiedQueryContext. withSettings(PPL_SYNTAX_LEGACY_PREFERRED, "false") updated the cluster setting, but the analytics-engine route's PPLQueryParser reads it from the unified context, which the override builder never populated. The companion PR refactors the override builder into a forwardClusterSetting helper and forwards PPL_SYNTAX_LEGACY_PREFERRED alongside the rex-limit key.
  2. testRareWithGroup — multiple states tied at the bucket count and ROW_NUMBER picked one based on insertion order, while the test expected a specific tie-broken value (same class as testStatsSortOnMeasure). The companion PR appends the rare/top field columns as secondary ASC keys to the ROW_NUMBER ORDER BY in CalciteRelNodeVisitor#visitRareTopN, so ties resolve deterministically across backends. This matches the OpenSearch terms-aggregation pushdown's existing _key:asc tie-break — wire payload unchanged.
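
The tie-break described in item 2 can be illustrated with plain comparators. This is a sketch under stated assumptions, not the code in CalciteRelNodeVisitor#visitRareTopN: the Bucket class, the RARE and TOP comparators, and firstK are hypothetical names. It only shows that adding the field value as a secondary ascending key makes the selected row independent of insertion order.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TieBreakSketch {
    // Hypothetical (field value, count) pair; not a type from the SQL plugin.
    static final class Bucket {
        final String key;
        final long count;
        Bucket(String key, long count) { this.key = key; this.count = count; }
    }

    // rare: lowest count first, ties broken by key ascending.
    static final Comparator<Bucket> RARE =
        Comparator.comparingLong((Bucket b) -> b.count).thenComparing(b -> b.key);

    // top: highest count first, ties still broken by key ascending.
    static final Comparator<Bucket> TOP =
        Comparator.comparingLong((Bucket b) -> b.count).reversed().thenComparing(b -> b.key);

    // Sorts by the given order and keeps the keys of the first k buckets.
    static List<String> firstK(List<Bucket> buckets, Comparator<Bucket> order, int k) {
        return buckets.stream().sorted(order).limit(k).map(b -> b.key).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same buckets in two insertion orders; TX and CA tie at count 2.
        List<Bucket> a = Arrays.asList(new Bucket("TX", 2), new Bucket("CA", 2), new Bucket("NY", 5));
        List<Bucket> b = Arrays.asList(new Bucket("NY", 5), new Bucket("CA", 2), new Bucket("TX", 2));
        System.out.println(firstK(a, RARE, 1)); // [CA]
        System.out.println(firstK(b, RARE, 1)); // [CA]
        System.out.println(firstK(a, TOP, 2));  // [NY, CA]
    }
}
```

Sorting on the count alone would return TX for the first list and CA for the second under a stable sort, which is the kind of order dependence the failing test exposed.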

Tests

ProjectRuleTests adds three regressions on the new code path:

  • testRowNumberOverPartitionByOrderByMarksAsWindow — happy path
  • testRowNumberWithoutWindowCapabilityErrors — capability-gap error message names the window function
  • testStripAnnotationsPreservesRexOver — strip path returns plain LogicalProject(RexOver) so isthmus can decode it

MockDataFusionBackend declares EngineCapability.WINDOW so existing project-rule tests continue to exercise the same backend gate.

Verification

# 1. publishToMavenLocal in this checkout (sandbox.enabled=true)
# 2. publishToMavenLocal in opensearch-project/sql
# 3. ./gradlew :run -Dsandbox.enabled=true -PinstalledPlugins="['opensearch-job-scheduler:3.7.0.0-SNAPSHOT', 'arrow-flight-rpc', 'analytics-engine', 'parquet-data-format', 'analytics-backend-datafusion', 'analytics-backend-lucene', 'composite-engine', 'opensearch-sql-plugin:3.7.0.0-SNAPSHOT']"
# 4. ./gradlew :integ-test:integTestRemote \
#      -Dtests.rest.cluster=localhost:9200 \
#      -Dtests.cluster=localhost:9300 \
#      -Dtests.clustername=runTask \
#      -Dtests.analytics.force_routing=true \
#      -Dtests.analytics.parquet_indices=true \
#      --tests "org.opensearch.sql.calcite.remote.CalciteTopCommandIT" \
#      --tests "org.opensearch.sql.calcite.remote.CalciteRareCommandIT"

Sandbox-wide ./gradlew check -p sandbox -Dsandbox.enabled=true is green.

Issues Resolved

N/A — partial parity with PPL command coverage on the analytics-engine route. Closes the remaining 3 out-of-scope top/rare gaps together with the companion SQL-plugin PR opensearch-project/sql#5433.

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

PPL `top` and `rare` lower (via `CalciteRelNodeVisitor#visitRareTopN`) to a
`LogicalProject` containing
`ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)`. On the analytics-engine
route, `OpenSearchProjectRule.annotateExpr` treated the `RexOver` as an
ordinary `RexCall` and fell into the scalar-function viability path. That
path keys on `ScalarFunction.fromSqlOperatorWithFallback`, which returns
null for window-only aggregates like `ROW_NUMBER`, so every `top` / `rare`
query died with "No backend supports scalar function [ROW_NUMBER] among
[datafusion]" before substrait emission.

Add `EngineCapability.WINDOW` and detect `RexOver` ahead of the standard
RexCall branch in `OpenSearchProjectRule#annotateExpr`. When the child's
viable backends include a WINDOW-capable backend, wrap the call in an
`AnnotatedProjectExpression` exactly like other annotated calls — the
existing strip path's `OperatorAnnotation::unwrap` returns the original
`RexOver` unchanged, so isthmus's `RexExpressionConverter#visitOver` emits
an inline substrait `WindowFunctionInvocation` (the substrait standard
catalog already binds `ROW_NUMBER`, so DataFusion's substrait consumer
decodes it natively — no separate substrait Window rel needed).

WINDOW is intentionally coarse (one boolean per backend rather than a
per-window-function `WindowCapability`). The substrait standard catalog
already constrains which window aggregates the backend's substrait
consumer can decode; a runtime decode failure is a clearer error than a
duplicated registry split between SPI and backend. The existing
`AggregateFunction` / `AggregateCapability` model remains right for
ordinary aggregates; once a second backend with different window support
lands, or a window-only function carries a non-standard call shape,
WINDOW can be promoted to a per-function class without disturbing the
planner-side annotation flow.

Result on `CalciteTopCommandIT` / `CalciteRareCommandIT` against the
force-routed analytics-engine path:
- `CalciteTopCommandIT`:  0/6 → 5/6 (only legacy-preferred test out-of-scope)
- `CalciteRareCommandIT`: 0/5 → 3/5 (legacy-preferred + sort-tie out-of-scope)
- combined: 0/11 → 8/11

Tests added in `ProjectRuleTests`:
- `testRowNumberOverPartitionByOrderByMarksAsWindow` — happy path
- `testRowNumberWithoutWindowCapabilityErrors` — capability-gap message
  names the window function
- `testStripAnnotationsPreservesRexOver` — strip path returns plain
  `LogicalProject(RexOver)` so isthmus can decode it

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@ahkcs ahkcs requested a review from a team as a code owner May 10, 2026 19:36
@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The annotateWindowCall method does not recursively annotate operands within the RexOver (partition keys, order keys, or aggregate arguments). If these operands contain nested RexCall expressions requiring backend-specific scalar functions, they will remain unannotated. This could cause substrait emission or backend execution to fail when the window function references a scalar function that only some backends support. The comment at line 159 claims "RexInputRef / RexLiteral" pass through unannotated, but RexOver can contain arbitrary RexCall operands (e.g., ROW_NUMBER() OVER (PARTITION BY UPPER(field))).

private RexNode annotateWindowCall(RexOver rexOver, List<String> childViableBackends) {
    CapabilityRegistry registry = context.getCapabilityRegistry();
    List<String> windowCapable = registry.operatorBackends(EngineCapability.WINDOW);
    List<String> viable = new ArrayList<>();
    for (String candidate : childViableBackends) {
        if (windowCapable.contains(candidate)) {
            viable.add(candidate);
        }
    }
    if (viable.isEmpty()) {
        throw new IllegalStateException(
            "No backend supports window function [" + rexOver.getAggOperator().getName() + "] among " + childViableBackends
        );
    }
    return new AnnotatedProjectExpression(rexOver.getType(), rexOver, viable, context.nextAnnotationId());
}
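
To make the concern above concrete, here is a small self-contained sketch of the expression shape in question. It is an illustration only: Node, InputRef, Call, Over, and unannotatedCalls are hypothetical stand-ins, not Calcite types or code from this PR. It walks the operands and partition keys of a window call and lists the nested calls that a non-recursive annotator would leave untouched.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedOperandSketch {
    // Hypothetical expression tree standing in for RexNode and its subclasses.
    static class Node {
        final List<Node> children;
        Node(List<Node> children) { this.children = children; }
    }

    static class InputRef extends Node {
        InputRef() { super(List.of()); }
    }

    static class Call extends Node {
        final String operator;
        Call(String operator, List<Node> operands) { super(operands); this.operator = operator; }
    }

    static class Over extends Call {
        final List<Node> partitionKeys;
        Over(String operator, List<Node> operands, List<Node> partitionKeys) {
            super(operator, operands);
            this.partitionKeys = partitionKeys;
        }
    }

    // Depth-first walk that records the operator of every call it meets.
    static void collect(Node node, List<String> out) {
        if (node instanceof Call) {
            out.add(((Call) node).operator);
        }
        for (Node child : node.children) {
            collect(child, out);
        }
    }

    // Calls nested inside the window call's operands and partition keys.
    static List<String> unannotatedCalls(Over over) {
        List<String> out = new ArrayList<>();
        for (Node operand : over.children) {
            collect(operand, out);
        }
        for (Node key : over.partitionKeys) {
            collect(key, out);
        }
        return out;
    }

    public static void main(String[] args) {
        // ROW_NUMBER() OVER (PARTITION BY UPPER(field))
        Node upper = new Call("UPPER", List.of(new InputRef()));
        Over over = new Over("ROW_NUMBER", List.of(), List.of(upper));
        System.out.println(unannotatedCalls(over)); // [UPPER]
    }
}
```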

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Add null check for operator

The method calls getAggOperator() on rexOver without null-checking. If
rexOver.getAggOperator() returns null, calling getName() will throw a
NullPointerException. Add a null check or use a fallback value to prevent runtime
failures when constructing the error message.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchProjectRule.java [162-177]

 private RexNode annotateWindowCall(RexOver rexOver, List<String> childViableBackends) {
     CapabilityRegistry registry = context.getCapabilityRegistry();
     List<String> windowCapable = registry.operatorBackends(EngineCapability.WINDOW);
     List<String> viable = new ArrayList<>();
     for (String candidate : childViableBackends) {
         if (windowCapable.contains(candidate)) {
             viable.add(candidate);
         }
     }
     if (viable.isEmpty()) {
+        String funcName = rexOver.getAggOperator() != null ? rexOver.getAggOperator().getName() : "UNKNOWN";
         throw new IllegalStateException(
-            "No backend supports window function [" + rexOver.getAggOperator().getName() + "] among " + childViableBackends
+            "No backend supports window function [" + funcName + "] among " + childViableBackends
         );
     }
     return new AnnotatedProjectExpression(rexOver.getType(), rexOver, viable, context.nextAnnotationId());
 }
Suggestion importance[1-10]: 5

Why: The suggestion adds defensive null-checking for rexOver.getAggOperator() before calling getName(). While this prevents a potential NullPointerException, the likelihood of getAggOperator() returning null in a valid RexOver instance is low in typical Calcite usage. This is a minor defensive programming improvement rather than fixing a critical bug.

Low

@github-actions

❌ Gradle check result for 114e8bf: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
