Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,14 @@ public UnifiedQueryContext build() {
buildFrameworkConfig(langSpec), SysLimit.fromSettings(settings), queryType);
QueryProfiling.activate(profiling);
return new UnifiedQueryContext(
planContext, settings, createParser(planContext, settings), langSpec);
planContext, settings, createParser(planContext, settings, langSpec), langSpec);
}

private UnifiedQueryParser<?> createParser(CalcitePlanContext planContext, Settings settings) {
private UnifiedQueryParser<?> createParser(
CalcitePlanContext planContext, Settings settings, LanguageSpec langSpec) {
return switch (queryType) {
case PPL -> new PPLQueryParser(settings);
case SQL -> new CalciteSqlQueryParser(planContext);
case SQL -> new CalciteSqlQueryParser(planContext, langSpec.postParseRules());
};
}

Expand Down
137 changes: 117 additions & 20 deletions api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,38 @@

import static org.opensearch.sql.monitor.profile.MetricName.ANALYZE;

import lombok.RequiredArgsConstructor;
import java.util.Properties;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.CalciteSqlValidator;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.util.SqlVisitor;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.api.parser.UnifiedQueryParser;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
Expand Down Expand Up @@ -73,31 +93,108 @@ private interface PlanningStrategy {
RelNode plan(String query) throws Exception;
}

/** ANSI SQL planning using Calcite's native SqlParser → SqlValidator → SqlToRelConverter. */
@RequiredArgsConstructor
/**
* SQL planning using a custom validate+convert pipeline. Consumes pre-parsed SqlNode from {@link
* UnifiedQueryParser} and applies validation with a conformance that disables non-strict GROUP BY
* (preventing unwanted ANY_VALUE wrapping and NPE on CASE expressions).
*/
private static class CalciteNativeStrategy implements PlanningStrategy {
private final UnifiedQueryContext context;
private final UnifiedQueryParser<SqlNode> parser;

@SuppressWarnings("unchecked")
CalciteNativeStrategy(UnifiedQueryContext context) {
this.context = context;
this.parser = (UnifiedQueryParser<SqlNode>) context.getParser();
}

@Override
public RelNode plan(String query) throws Exception {
try (Planner planner = Frameworks.getPlanner(context.getPlanContext().config)) {
SqlNode parsed = planner.parse(query);
if (!parsed.isA(SqlKind.QUERY)) {
throw new UnsupportedOperationException(
"Only query statements are supported. Got: " + parsed.getKind());
}
SqlNode parsed = parser.parse(query);
if (!parsed.isA(SqlKind.QUERY)) {
throw new UnsupportedOperationException(
"Only query statements are supported. Got: " + parsed.getKind());
}
return validateAndConvert(parsed);
}

// TODO: move post-parse rewriting into CalciteSqlQueryParser
SqlNode rewritten = parsed;
for (SqlVisitor<SqlNode> visitor : context.getLangSpec().postParseRules()) {
rewritten = rewritten.accept(visitor);
}
private RelNode validateAndConvert(SqlNode parsed) {
FrameworkConfig config = context.getPlanContext().config;

SchemaPlus defaultSchema = config.getDefaultSchema();
SchemaPlus rootSchema = rootSchema(defaultSchema);
JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(config.getTypeSystem());

CalciteConnectionConfigImpl connectionConfig = buildConnectionConfig(config);
CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
CalciteSchema.from(rootSchema),
CalciteSchema.from(defaultSchema).path(null),
typeFactory,
connectionConfig);

SqlOperatorTable opTab = SqlOperatorTables.chain(config.getOperatorTable(), catalogReader);
SqlValidator validator =
new CalciteSqlValidator(
opTab,
catalogReader,
typeFactory,
config
.getSqlValidatorConfig()
.withDefaultNullCollation(connectionConfig.defaultNullCollation())
.withLenientOperatorLookup(connectionConfig.lenientOperatorLookup())
.withIdentifierExpansion(true));

SqlNode validated = validator.validate(parsed);

RelOptPlanner planner = new VolcanoPlanner(config.getCostFactory(), Contexts.empty());
RelOptUtil.registerDefaultRules(planner, false, false);
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);

RexBuilder rexBuilder = new RexBuilder(typeFactory);
RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);

SqlToRelConverter.Config converterConfig =
config.getSqlToRelConverterConfig().withTrimUnusedFields(false);
SqlToRelConverter sqlToRelConverter =
new SqlToRelConverter(
NOOP_VIEW_EXPANDER,
validator,
catalogReader,
cluster,
config.getConvertletTable(),
converterConfig);

RelRoot root = sqlToRelConverter.convertQuery(validated, false, true);
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));

RelBuilder relBuilder = converterConfig.getRelBuilderFactory().create(cluster, null);
root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
return root.project();
}

SqlNode validated = planner.validate(rewritten);
RelRoot relRoot = planner.rel(validated);
return relRoot.project();
private static CalciteConnectionConfigImpl buildConnectionConfig(FrameworkConfig config) {
Properties props = new Properties();
props.setProperty(
CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
String.valueOf(config.getParserConfig().caseSensitive()));
return new CalciteConnectionConfigImpl(props);
}

private static SchemaPlus rootSchema(SchemaPlus schema) {
for (; ; ) {
SchemaPlus parent = schema.getParentSchema();
if (parent == null) {
return schema;
}
schema = parent;
}
}

private static final ViewExpander NOOP_VIEW_EXPANDER =
(rowType, queryString, schemaPath, viewPath) -> {
throw new UnsupportedOperationException("Views not supported");
};
}

/** AST-based planning via context-owned parser → UnresolvedPlan → CalciteRelNodeVisitor. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,39 @@

package org.opensearch.sql.api.parser;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.SqlVisitor;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.common.antlr.SyntaxCheckException;

/** Calcite SQL query parser that produces {@link SqlNode} as the native parse result. */
/**
* Calcite SQL query parser that produces {@link SqlNode} as the native parse result. Applies
* post-parse rewrite rules (e.g., named argument normalization) before returning.
*/
@RequiredArgsConstructor
public class CalciteSqlQueryParser implements UnifiedQueryParser<SqlNode> {

/** Calcite plan context providing parser configuration (e.g., case sensitivity, conformance). */
private final CalcitePlanContext planContext;

/** Post-parse rewrite rules applied after parsing and before validation. */
private final List<SqlVisitor<SqlNode>> postParseRules;

@Override
public SqlNode parse(String query) {
try {
SqlParser parser = SqlParser.create(query, planContext.config.getParserConfig());
return parser.parseQuery();
SqlNode parsed = parser.parseQuery();

SqlNode result = parsed;
for (SqlVisitor<SqlNode> visitor : postParseRules) {
result = result.accept(visitor);
}
return result;
} catch (SqlParseException e) {
throw new SyntaxCheckException("Failed to parse SQL query: " + e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.calcite.sql.parser.babel.SqlBabelParserImpl;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql.validate.SqlDelegatingConformance;
import org.apache.calcite.sql.validate.SqlValidator;
import org.opensearch.sql.api.spec.search.SearchExtension;

Expand All @@ -29,6 +31,19 @@
@Accessors(fluent = true)
public class UnifiedSqlSpec implements LanguageSpec {

/**
* BABEL conformance with strict GROUP BY validation. Delegates all behavior to BABEL except
* isNonStrictGroupBy which returns false — preventing the validator from wrapping non-grouped
* expressions in ANY_VALUE and avoiding a NPE on CASE expressions in GROUP BY.
*/
private static final SqlConformance BABEL_STRICT_GROUP_BY =
new SqlDelegatingConformance(SqlConformanceEnum.BABEL) {
@Override
public boolean isNonStrictGroupBy() {
return false;
}
};

/** Lexical rules: identifier quoting, character escaping, and special identifier support. */
private final Lex lex;

Expand Down Expand Up @@ -63,6 +78,6 @@ public SqlParser.Config parserConfig() {

@Override
public SqlValidator.Config validatorConfig() {
return SqlValidator.Config.DEFAULT.withConformance(conformance);
return SqlValidator.Config.DEFAULT.withConformance(BABEL_STRICT_GROUP_BY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.junit.Test;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.executor.QueryType;

public class UnifiedQueryPlannerSqlTest extends UnifiedQueryTestBase {
Expand Down Expand Up @@ -210,7 +211,7 @@ public void testSqlQueryPlanningWithMultipleCatalogs() {

@Test
public void testInvalidSqlThrowsException() {
assertThrows(IllegalStateException.class, () -> planner.plan("SELECT FROM"));
assertThrows(SyntaxCheckException.class, () -> planner.plan("SELECT FROM"));
}

@Test
Expand Down Expand Up @@ -257,4 +258,30 @@ SELECT department, count(*)
""")
.assertErrorMessage("Encountered");
}

@Test
public void testGroupByOrdinalNoAnyValueWrapping() {
givenQuery(
"""
SELECT 1, department, COUNT(*) AS c
FROM catalog.employees
GROUP BY 1, department
ORDER BY c DESC\
""")
.assertPlanContains("LogicalAggregate(group=[{0, 1}], c=[COUNT()])")
.assertFields("EXPR$0", "department", "c");
}

@Test
public void testGroupByWithCaseExpression() {
givenQuery(
"""
SELECT CASE WHEN age > 30 THEN 'senior' ELSE 'junior' END AS seniority,
COUNT(*) AS cnt
FROM catalog.employees
GROUP BY seniority\
""")
.assertPlanContains("LogicalAggregate")
.assertFields("seniority", "cnt");
}
}
Loading