Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}"
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.41.0'
testCompileOnly 'org.immutables:value-annotations:2.8.8'

testFixturesApi group: 'junit', name: 'junit', version: '4.13.2'
testFixturesApi group: 'org.hamcrest', name: 'hamcrest', version: "${hamcrest_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,28 @@

package org.opensearch.sql.api;

import java.util.List;
import org.antlr.v4.runtime.tree.ParseTree;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.config.Lex;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.Programs;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
import org.opensearch.sql.common.antlr.Parser;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
Expand Down Expand Up @@ -44,19 +57,25 @@ public class UnifiedQueryPlanner {
* @param context the unified query context containing CalcitePlanContext
*/
public UnifiedQueryPlanner(UnifiedQueryContext context) {
this.parser = buildQueryParser(context.getPlanContext().queryType);
this.parser =
context.getPlanContext().queryType == QueryType.SQL
? null
: buildQueryParser(context.getPlanContext().queryType);
this.context = context;
}

/**
* Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate
* optimal physical plan to fully unify query execution and leverage Calcite's optimizer.
*
* @param query the raw query string in PPL or other supported syntax
* @param query the raw query string in PPL, SQL, or other supported syntax
* @return a logical plan representing the query
*/
public RelNode plan(String query) {
try {
if (context.getPlanContext().queryType == QueryType.SQL) {
return planWithCalcite(query);
}
return preserveCollation(analyze(parse(query)));
} catch (SyntaxCheckException e) {
// Re-throw syntax error without wrapping
Expand All @@ -66,6 +85,42 @@ public RelNode plan(String query) {
}
}

/**
* Optimizes a logical plan using the VolcanoPlanner with rules registered by the schema's table
* scan nodes. Adapter-specific pushdown rules (filter, project, aggregate) are applied here.
*
* @param logical the logical plan from {@link #plan(String)}
* @return an optimized plan with adapter-specific physical operators
*/
public RelNode optimize(RelNode logical) {
try {
RelTraitSet targetTraits = logical.getCluster().traitSetOf(EnumerableConvention.INSTANCE);
Program program = Programs.standard();
// Create a fresh VolcanoPlanner to avoid state conflicts with the HepPlanner from plan()
return program.run(
logical.getCluster().getPlanner(), logical, targetTraits, List.of(), List.of());
} catch (Exception e) {
throw new IllegalStateException("Failed to optimize plan", e);
}
}

private RelNode planWithCalcite(String query) throws Exception {
CalcitePlanContext planContext = context.getPlanContext();
FrameworkConfig sqlConfig =
Frameworks.newConfigBuilder(planContext.config)
.parserConfig(SqlParser.config().withLex(Lex.JAVA))
.build();
Planner planner = Frameworks.getPlanner(sqlConfig);
try {
SqlNode parsed = planner.parse(query);
SqlNode validated = planner.validate(parsed);
RelRoot relRoot = planner.rel(validated);
return relRoot.rel;
} finally {
planner.close();
}
}

private Parser buildQueryParser(QueryType queryType) {
if (queryType == QueryType.PPL) {
return new PPLSyntaxParser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public void testMissingQueryType() {
UnifiedQueryContext.builder().catalog("opensearch", testSchema).build();
}

@Test(expected = IllegalArgumentException.class)
public void testUnsupportedQueryType() {
@Test
public void testSqlQueryType() {
UnifiedQueryContext context =
UnifiedQueryContext.builder()
.language(QueryType.SQL) // only PPL is supported for now
.language(QueryType.SQL)
.catalog("opensearch", testSchema)
.build();
new UnifiedQueryPlanner(context);
assertNotNull("SQL planner should be created", new UnifiedQueryPlanner(context));
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api;

import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
import static org.apache.calcite.test.Matchers.hasTree;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.List;
import java.util.Map;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.linq4j.tree.Blocks;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opensearch.sql.executor.QueryType;

/**
* Demonstrates that {@link UnifiedQueryPlanner#optimize(RelNode)} triggers the VolcanoPlanner with
* adapter-specific pushdown rules. Simulates a real Analytics engine that provides a custom
* TableScan with a filter absorption rule.
*/
public class UnifiedQueryPlannerOptimizeTest {

private UnifiedQueryContext context;
private UnifiedQueryPlanner planner;

@Before
public void setUp() {
AbstractSchema testSchema =
new AbstractSchema() {
@Override
protected Map<String, Table> getTableMap() {
return Map.of("test_table", new EngineTable());
}
};
context =
UnifiedQueryContext.builder()
.language(QueryType.SQL)
.catalog("catalog", testSchema)
.build();
planner = new UnifiedQueryPlanner(context);
}

@After
public void tearDown() throws Exception {
if (context != null) {
context.close();
}
}

@Test
public void optimizePushesFilterIntoEngineScan() {
RelNode logical = planner.plan("SELECT name FROM catalog.test_table WHERE id > 10");

// Before: EngineTableScan (from TranslatableTable.toRel) with LogicalFilter on top
assertThat(
logical,
hasTree(
"""
LogicalProject(name=[$1])
LogicalFilter(condition=[>($0, 10)])
EngineTableScan(table=[[catalog, test_table]])
"""));

// After: filter absorbed into EngineTableScan by the pushdown rule
RelNode optimized = planner.optimize(logical);
assertThat(
optimized,
hasTree(
"""
EnumerableCalc(expr#0..1=[{inputs}], name=[$t1])
EngineTableScan(table=[[catalog, test_table]], filter=[>($0, 10)])
"""));
}

// --- Simulated Analytics engine adapter ---

/** Table that produces EngineTableScan via toRel(). */
static class EngineTable extends AbstractTable
implements org.apache.calcite.schema.TranslatableTable {
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return typeFactory.builder().add("id", INTEGER).add("name", VARCHAR).build();
}

@Override
public RelNode toRel(RelOptTable.ToRelContext ctx, RelOptTable table) {
return new EngineTableScan(ctx.getCluster(), table, null);
}
}

/** Engine-specific scan node with filter absorption. */
static class EngineTableScan extends TableScan implements EnumerableRel {
private final RexNode filter;

EngineTableScan(RelOptCluster cluster, RelOptTable table, RexNode filter) {
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), List.of(), table);
this.filter = filter;
}

@Override
public void register(RelOptPlanner planner) {
planner.addRule(EngineFilterAbsorptionRule.INSTANCE);
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new EngineTableScan(getCluster(), getTable(), filter);
}

@Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw).itemIf("filter", filter, filter != null);
}

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());
return implementor.result(
physType, Blocks.toBlock(Expressions.call(Expressions.constant(List.of()), "iterator")));
}
}

/** Absorbs LogicalFilter into EngineTableScan. */
static class EngineFilterAbsorptionRule extends RelOptRule {
static final EngineFilterAbsorptionRule INSTANCE = new EngineFilterAbsorptionRule();

EngineFilterAbsorptionRule() {
super(operand(LogicalFilter.class, operand(EngineTableScan.class, none())));
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalFilter filter = call.rel(0);
EngineTableScan scan = call.rel(1);
call.transformTo(
new EngineTableScan(scan.getCluster(), scan.getTable(), filter.getCondition()));
}
}
}
Loading
Loading