Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import static org.opensearch.sql.ast.dsl.AstDSL.agg;
import static org.opensearch.sql.ast.dsl.AstDSL.aggregate;
import static org.opensearch.sql.ast.dsl.AstDSL.alias;
import static org.opensearch.sql.ast.dsl.AstDSL.allFields;
import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta;
import static org.opensearch.sql.ast.dsl.AstDSL.compare;
import static org.opensearch.sql.ast.dsl.AstDSL.defaultStatsArgs;
import static org.opensearch.sql.ast.dsl.AstDSL.eval;
Expand All @@ -36,7 +36,7 @@ public class UnifiedQueryParserTest extends UnifiedQueryTestBase {
public void testParseSource() {
assertEqual(
"source = catalog.employees",
project(relation(qualifiedName("catalog", "employees")), allFields()));
project(relation(qualifiedName("catalog", "employees")), AllFieldsExcludeMeta.of()));
}

@Test
Expand All @@ -47,7 +47,7 @@ public void testParseFilter() {
filter(
relation(qualifiedName("catalog", "employees")),
compare(">", field("age"), intLiteral(30))),
allFields()));
AllFieldsExcludeMeta.of()));
}

@Test
Expand All @@ -58,7 +58,7 @@ public void testParseEval() {
eval(
relation(qualifiedName("catalog", "employees")),
let(field("f"), function("abs", field("id")))),
allFields()));
AllFieldsExcludeMeta.of()));
}

@Test
Expand All @@ -72,7 +72,7 @@ public void testParseStats() {
emptyList(),
exprList(alias("department", field("department"))),
defaultStatsArgs()),
allFields()));
AllFieldsExcludeMeta.of()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class Query extends Statement {
protected final UnresolvedPlan plan;
protected final int fetchSize;
private final QueryType queryType;
private final boolean includeMetadata;
private HighlightConfig highlightConfig;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public class CalcitePlanContext {
*/
@Getter @Setter private boolean isProjectVisited = false;

/**
* Whether to include metadata fields like _id, _index, _score in the result. When true, metadata
* fields are included in wildcard field selections. When false (default), metadata fields are
* excluded.
*/
@Getter @Setter private boolean includeMetadata = false;

private final Stack<RexCorrelVariable> correlVar = new Stack<>();
private final Stack<List<RexNode>> windowPartitions = new Stack<>();

Expand Down Expand Up @@ -99,6 +106,7 @@ private CalcitePlanContext(CalcitePlanContext parent) {
this.rexBuilder = parent.rexBuilder; // Share the same rexBuilder
this.functionProperties = parent.functionProperties;
this.highlightConfig = parent.highlightConfig;
this.includeMetadata = parent.includeMetadata; // Preserve parent's metadata setting
this.rexLambdaRefMap = new HashMap<>(); // New map for lambda variables
this.capturedVariables = new ArrayList<>(); // New list for captured variables
this.inLambdaContext = true; // Mark that we're inside a lambda
Expand Down Expand Up @@ -147,6 +155,13 @@ public static CalcitePlanContext create(
return new CalcitePlanContext(config, sysLimit, queryType);
}

public static CalcitePlanContext create(
FrameworkConfig config, SysLimit sysLimit, QueryType queryType, boolean includeMetadata) {
CalcitePlanContext context = new CalcitePlanContext(config, sysLimit, queryType);
context.setIncludeMetadata(includeMetadata);
return context;
}

/**
* Executes {@code action} with the thread-local legacy flag set according to the supplied
* settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,18 @@ private RelNode handleAllFieldsProject(Project node, CalcitePlanContext context)
"Invalid field exclusion: operation would exclude all fields from the result set");
}
AllFields allFields = (AllFields) node.getProjectList().getFirst();
if (!(allFields instanceof AllFieldsExcludeMeta)) {
// Should not remove nested fields for AllFieldsExcludeMeta.

if (allFields instanceof AllFieldsExcludeMeta) {
// Do NOT remove nested fields for AllFieldsExcludeMeta
tryToRemoveMetaFields(context, true); // Force exclude metadata fields
} else {
// For AllFields (include_metadata=true), include metadata fields
tryToRemoveNestedFields(context);
// Mark as project visited to prevent automatic metadata field removal
context.setProjectVisited(true);
// Don't force exclude metadata fields - let them remain
tryToRemoveMetaFields(context, false);
}
tryToRemoveMetaFields(context, allFields instanceof AllFieldsExcludeMeta);
return context.relBuilder.peek();
}

Expand Down Expand Up @@ -645,6 +652,11 @@ private static void forceProjectExcept(RelBuilder relBuilder, Iterable<RexNode>
* @param excludeByForce whether exclude metadata fields by force
*/
private static void tryToRemoveMetaFields(CalcitePlanContext context, boolean excludeByForce) {
// If include_metadata=true, never remove metadata fields
if (context.isIncludeMetadata() && !excludeByForce) {
return;
}

if (excludeByForce || !context.isProjectVisited()) {
List<String> originalFields = context.relBuilder.peek().getRowType().getFieldNames();
List<RexNode> metaFieldsRef =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,22 @@ public void execute(
QueryType queryType,
HighlightConfig highlightConfig,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
execute(plan, queryType, highlightConfig, false, listener);
}

/** Execute with optional highlight config and include metadata flag. */
public void execute(
UnresolvedPlan plan,
QueryType queryType,
HighlightConfig highlightConfig,
boolean includeMetadata,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
if (shouldUseCalcite(queryType)) {
executeWithCalcite(plan, queryType, highlightConfig, listener);
executeWithCalcite(plan, queryType, highlightConfig, includeMetadata, listener);
} else {
// Legacy engine always includes basic metadata (schema information)
// The includeMetadata flag doesn't affect legacy engine behavior since
// it already provides column names, types, and aliases in the schema
executeWithLegacy(plan, queryType, listener, Optional.empty());
}
}
Expand All @@ -123,9 +136,22 @@ public void explain(
HighlightConfig highlightConfig,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
ExplainMode mode) {
explain(plan, queryType, highlightConfig, false, listener, mode);
}

/** Explain with optional highlight config and include metadata flag. */
public void explain(
UnresolvedPlan plan,
QueryType queryType,
HighlightConfig highlightConfig,
boolean includeMetadata,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
ExplainMode mode) {
if (shouldUseCalcite(queryType)) {
explainWithCalcite(plan, queryType, highlightConfig, listener, mode);
explainWithCalcite(plan, queryType, highlightConfig, includeMetadata, listener, mode);
} else {
// Legacy engine provides basic explain information
// The includeMetadata flag doesn't significantly affect legacy explain behavior
explainWithLegacy(plan, queryType, listener, mode, Optional.empty());
}
}
Expand All @@ -135,6 +161,15 @@ public void executeWithCalcite(
QueryType queryType,
HighlightConfig highlightConfig,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
executeWithCalcite(plan, queryType, highlightConfig, false, listener);
}

public void executeWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
HighlightConfig highlightConfig,
boolean includeMetadata,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
CalcitePlanContext.run(
() -> {
try {
Expand All @@ -144,7 +179,10 @@ public void executeWithCalcite(
long analyzeStart = System.nanoTime();
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
buildFrameworkConfig(),
SysLimit.fromSettings(settings),
queryType,
includeMetadata);

context.setHighlightConfig(highlightConfig);

Expand Down Expand Up @@ -172,6 +210,7 @@ public void executeWithCalcite(
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
log.warn("Fallback to V2 query engine since got exception", t);
// Legacy engine provides basic metadata support, so fallback is acceptable
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
propagateCalciteError(t, listener);
Expand All @@ -187,13 +226,26 @@ public void explainWithCalcite(
HighlightConfig highlightConfig,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
ExplainMode mode) {
explainWithCalcite(plan, queryType, highlightConfig, false, listener, mode);
}

public void explainWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
HighlightConfig highlightConfig,
boolean includeMetadata,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
ExplainMode mode) {
CalcitePlanContext.run(
() -> {
try {
QueryProfiling.noop();
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
buildFrameworkConfig(),
SysLimit.fromSettings(settings),
queryType,
includeMetadata);
context.setHighlightConfig(highlightConfig);
context.run(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ public class QueryPlan extends AbstractPlan {

protected final HighlightConfig highlightConfig;

protected final boolean includeMetadata;

/** Constructor. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
this(queryId, queryType, plan, queryService, listener, null);
this(queryId, queryType, plan, queryService, listener, null, false);
}

/** Constructor with highlight config. */
Expand All @@ -50,12 +52,25 @@ public QueryPlan(
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
HighlightConfig highlightConfig) {
this(queryId, queryType, plan, queryService, listener, highlightConfig, false);
}

/** Constructor with highlight config and include metadata flag. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
HighlightConfig highlightConfig,
boolean includeMetadata) {
super(queryId, queryType);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
this.pageSize = Optional.empty();
this.highlightConfig = highlightConfig;
this.includeMetadata = includeMetadata;
}

/** Constructor with page size. */
Expand All @@ -66,20 +81,33 @@ public QueryPlan(
int pageSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
this(queryId, queryType, plan, pageSize, queryService, listener, false);
}

/** Constructor with page size and include metadata flag. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
int pageSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
boolean includeMetadata) {
super(queryId, queryType);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
this.pageSize = Optional.of(pageSize);
this.highlightConfig = null;
this.includeMetadata = includeMetadata;
}

@Override
public void execute() {
if (pageSize.isPresent()) {
queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), listener);
queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), highlightConfig, includeMetadata, listener);
} else {
queryService.execute(plan, getQueryType(), highlightConfig, listener);
queryService.execute(plan, getQueryType(), highlightConfig, includeMetadata, listener);
}
}

Expand All @@ -91,7 +119,7 @@ public void explain(
new NotImplementedException(
"`explain` feature for paginated requests is not implemented yet."));
} else {
queryService.explain(plan, getQueryType(), highlightConfig, listener, mode);
queryService.explain(plan, getQueryType(), highlightConfig, includeMetadata, listener, mode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public AbstractPlan visitQuery(
node.getPlan(),
node.getFetchSize(),
queryService,
context.getLeft());
context.getLeft(),
node.isIncludeMetadata());
} else {
// This should be picked up by the legacy engine.
throw new UnsupportedCursorRequestException();
Expand All @@ -127,7 +128,8 @@ public AbstractPlan visitQuery(
node.getPlan(),
queryService,
context.getLeft(),
node.getHighlightConfig());
node.getHighlightConfig(),
node.isIncludeMetadata());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ void init() {

@Test
public void create_from_query_should_success() {
Statement query = new Query(plan, 0, queryType);
Statement query = new Query(plan, 0, queryType, false);
AbstractPlan queryExecution = factory.create(query, queryListener, explainListener);
assertTrue(queryExecution instanceof QueryPlan);
}

@Test
public void create_from_explain_should_success() {
Statement query = new Explain(new Query(plan, 0, queryType), queryType);
Statement query = new Explain(new Query(plan, 0, queryType, false), queryType);
AbstractPlan queryExecution = factory.create(query, queryListener, explainListener);
assertTrue(queryExecution instanceof ExplainPlan);
}
Expand Down Expand Up @@ -103,7 +103,7 @@ public void no_consumer_response_channel() {
public void create_query_with_fetch_size_which_can_be_paged() {
when(plan.accept(any(CanPaginateVisitor.class), any())).thenReturn(Boolean.TRUE);
factory = new QueryPlanFactory(queryService);
Statement query = new Query(plan, 10, queryType);
Statement query = new Query(plan, 10, queryType, false);
AbstractPlan queryExecution = factory.create(query, queryListener, explainListener);
assertTrue(queryExecution instanceof QueryPlan);
}
Expand All @@ -112,7 +112,7 @@ public void create_query_with_fetch_size_which_can_be_paged() {
public void create_query_with_fetch_size_which_cannot_be_paged() {
when(plan.accept(any(CanPaginateVisitor.class), any())).thenReturn(Boolean.FALSE);
factory = new QueryPlanFactory(queryService);
Statement query = new Query(plan, 10, queryType);
Statement query = new Query(plan, 10, queryType, false);
assertThrows(
UnsupportedCursorRequestException.class,
() -> factory.create(query, queryListener, explainListener));
Expand Down
Loading