From d9c1b3841697799def1371d51ca8720ae30de04b Mon Sep 17 00:00:00 2001 From: Yuri Zmytrakov Date: Sun, 14 Dec 2025 20:01:33 +0100 Subject: [PATCH 1/4] feat: implement AST structure for cql2-json queries index selection - Added Abstract Syntax Tree (AST) representation for CQL2 queries replacing dictionary-based queries - Implemented datetime extraction logic to identify datetime, start_datetime, and end_datetime - Support for complex queries with logical operators, spatial operations, and property comparisons - Improved query parsing and maintainability of CQL2-JSON filter requests --- stac_fastapi/core/stac_fastapi/core/core.py | 11 +- .../stac_fastapi/core/extensions/filter.py | 62 ++++++- .../stac_fastapi/opensearch/database_logic.py | 39 +++- .../sfeos_helpers/filter/__init__.py | 18 ++ .../sfeos_helpers/filter/ast_parser.py | 113 ++++++++++++ .../sfeos_helpers/filter/ast_transform.py | 151 ++++++++++++++++ .../filter/datetime_optimizer.py | 170 ++++++++++++++++++ 7 files changed, 556 insertions(+), 8 deletions(-) create mode 100644 stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_parser.py create mode 100644 stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_transform.py create mode 100644 stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 18bea46f4..11b0ce870 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -851,6 +851,8 @@ async def post_search( search=search, intersects=getattr(search_request, "intersects") ) + collection_ids = getattr(search_request, "collections", None) + if hasattr(search_request, "query") and getattr(search_request, "query"): query_fields = set(getattr(search_request, "query").keys()) await self.queryables_cache.validate(query_fields) @@ -875,6 +877,13 @@ async def post_search( query_fields = get_properties_from_cql2_filter(cql2_filter) await self.queryables_cache.validate(query_fields) search = await self.database.apply_cql2_filter(search, cql2_filter) + date_str = getattr(search, "_cql2_date_str", None) + collection_ids = getattr(search, "_cql2_collection_ids", None) + if date_str is not None: + datetime_parsed = format_datetime_range(date_str=date_str) + search, datetime_search = self.database.apply_datetime_filter( + search=search, datetime=datetime_parsed + ) except HTTPException: raise except Exception as e: @@ -907,7 +916,7 @@ async def post_search( limit=limit, token=token_param, sort=sort, - collection_ids=getattr(search_request, "collections", None), + collection_ids=collection_ids, datetime_search=datetime_search, ) diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py index ddeb2600d..18edc1308 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py @@ -13,8 +13,9 @@ # defines spatial operators (S_INTERSECTS, S_CONTAINS, S_WITHIN, S_DISJOINT). 
# """ +from dataclasses import dataclass from enum import Enum -from typing import Any, Dict +from typing import Any, Dict, List, Optional DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { "id": { @@ -90,3 +91,62 @@ class SpatialOp(str, Enum): S_CONTAINS = "s_contains" S_WITHIN = "s_within" S_DISJOINT = "s_disjoint" + + +@dataclass +class CqlNode: + """Base class.""" + + pass + + +@dataclass +class LogicalNode(CqlNode): + """Logical operators (AND, OR, NOT).""" + + op: LogicalOp + children: List["CqlNode"] + + +@dataclass +class ComparisonNode(CqlNode): + """Comparison operators (=, <>, <, <=, >, >=, is null).""" + + op: ComparisonOp + field: str + value: Any + + +@dataclass +class AdvancedComparisonNode(CqlNode): + """Advanced comparison operators (like, between, in).""" + + op: AdvancedComparisonOp + field: str + value: Any + + +@dataclass +class SpatialNode(CqlNode): + """Spatial operators.""" + + op: SpatialOp + field: str + geometry: Dict[str, Any] + + +@dataclass +class DateTimeRangeNode(CqlNode): + """Datetime range queries.""" + + field: str = "properties.datetime" + start: Optional[str] = None + end: Optional[str] = None + + +@dataclass +class DateTimeExactNode(CqlNode): + """Exact datetime queries.""" + + field: str = "properties.datetime" + value: Optional[str] = None diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 14529ac37..295ea17f2 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -52,6 +52,12 @@ merge_to_operations, operations_to_script, ) +from stac_fastapi.sfeos_helpers.filter import ( + Cql2AstParser, + DatetimeOptimizer, + to_es_via_ast, +) +from stac_fastapi.sfeos_helpers.filter.datetime_optimizer import extract_from_ast from stac_fastapi.sfeos_helpers.mappings import ( AGGREGATION_MAPPING, COLLECTIONS_INDEX, @@ -683,11 +689,11 @@ async def apply_cql2_filter( self, search: Search, _filter: Optional[Dict[str, Any]] ): """ - Apply a CQL2 filter to an Opensearch Search object. + Apply a CQL2 filter to an OpenSearch Search object. - This method transforms a dictionary representing a CQL2 filter into an Opensearch query - and applies it to the provided Search object. If the filter is None, the original Search - object is returned unmodified. + This method transforms a CQL2 filter dictionary into an OpenSearch query using + an AST tree-based approach. If the filter is None, the original Search object is returned + unmodified. Args: search (Search): The Opensearch Search object to which the filter will be applied. @@ -701,8 +707,29 @@ async def apply_cql2_filter( otherwise the original Search object. 
""" if _filter is not None: - es_query = filter_module.to_es(await self.get_queryables_mapping(), _filter) - search = search.filter(es_query) + queryables_mapping = await self.get_queryables_mapping() + + try: + parser = Cql2AstParser(queryables_mapping) + ast = parser.parse(_filter) + + optimizer = DatetimeOptimizer() + optimized_ast = optimizer.optimize_query_structure(ast) + + date_str = extract_from_ast(optimized_ast, "datetime") + collection_ids = extract_from_ast(optimized_ast, "collection") or None + + es_query = to_es_via_ast(queryables_mapping, optimized_ast) + + search = search.filter(es_query) + search._cql2_date_str = date_str + search._cql2_collection_ids = collection_ids + + except Exception: + # Fallback to dictionary-based approach + es_query = filter_module.to_es(queryables_mapping, _filter) + search = search.filter(es_query) + return search return search diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py index 02b5db926..1f961ebf3 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/__init__.py @@ -11,6 +11,8 @@ - cql2.py: CQL2 pattern conversion helpers - transform.py: Query transformation functions - client.py: Filter client implementation +- ast_parser.py: AST parser for CQL2 queries +- datetime_optimizer.py: Datetime optimization for query structure When adding new functionality to this package, consider: 1. Will this code be used by both Elasticsearch and OpenSearch implementations? @@ -22,6 +24,14 @@ - Parameter names should be consistent across similar functions """ +from stac_fastapi.core.extensions.filter import ( + AdvancedComparisonOp, + ComparisonOp, + LogicalOp, +) + +from .ast_parser import Cql2AstParser +from .ast_transform import to_es_via_ast from .client import EsAsyncBaseFiltersClient # Re-export the main functions and classes for backward compatibility @@ -31,6 +41,7 @@ cql2_like_to_es, valid_like_substitutions, ) +from .datetime_optimizer import DatetimeOptimizer from .transform import to_es, to_es_field __all__ = [ @@ -40,5 +51,12 @@ "_replace_like_patterns", "to_es_field", "to_es", + "to_es_via_ast", "EsAsyncBaseFiltersClient", + "Cql2AstParser", + "AdvancedComparisonOp", + "ComparisonOp", + "LogicalOp", + "DatetimeOptimizer", + "extract_from_ast", ] diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_parser.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_parser.py new file mode 100644 index 000000000..1f8db898c --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_parser.py @@ -0,0 +1,113 @@ +"""AST parser for CQL2 queries.""" + +import json +from typing import Any, Dict, Union + +from stac_fastapi.core.extensions.filter import ( + AdvancedComparisonNode, + AdvancedComparisonOp, + ComparisonNode, + ComparisonOp, + CqlNode, + LogicalNode, + LogicalOp, + SpatialNode, + SpatialOp, +) + + +class Cql2AstParser: + """Parse CQL2 into AST tree.""" + + def __init__(self, queryables_mapping: Dict[str, Any]): + """Initialize the CQL2 AST parser.""" + self.queryables_mapping = queryables_mapping + + def parse(self, cql: Union[str, Dict[str, Any]]) -> CqlNode: + """Parse CQL2 into AST tree. 
+ + Args: + cql: CQL2 expression as string/dictionary + + Returns: + Node of AST tree + """ + if isinstance(cql, str): + data: Dict[str, Any] = json.loads(cql) + return self._parse_node(data) + + return self._parse_node(cql) + + def _parse_node(self, node: Dict[str, Any]) -> CqlNode: + """Parse a single CQL2 node into AST.""" + if "op" in node and node["op"] in ["and", "or", "not"]: + op = LogicalOp(node["op"]) + args = node.get("args", []) + + if op == LogicalOp.NOT: + children = [self._parse_node(args[0])] if args else [] + else: + children = [self._parse_node(arg) for arg in args] + + return LogicalNode(op=op, children=children) + + elif "op" in node and node["op"] in ["=", "<>", "<", "<=", ">", ">=", "isNull"]: + op = ComparisonOp(node["op"]) + args = node.get("args", []) + + if isinstance(args[0], dict) and "property" in args[0]: + field = args[0]["property"] + else: + field = str(args[0]) + + value = args[1] if len(args) > 1 else None + + return ComparisonNode(op=op, field=field, value=value) + + elif "op" in node and node["op"] in ["like", "between", "in"]: + op = AdvancedComparisonOp(node["op"]) + args = node.get("args", []) + + if isinstance(args[0], dict) and "property" in args[0]: + field = args[0]["property"] + else: + field = str(args[0]) + + if op == AdvancedComparisonOp.BETWEEN: + if len(args) != 3: + raise ValueError( + f"BETWEEN operator requires (property, lower, upper), got {args}" + ) + value = (args[1], args[2]) + + elif op == AdvancedComparisonOp.IN: + if not isinstance(args[1], list): + raise ValueError(f"IN operator expects list, got {type(args[1])}") + value = args[1] + + elif op == AdvancedComparisonOp.LIKE: + if len(args) != 2: + raise ValueError( + f"LIKE operator requires (property, pattern), got {args}" + ) + value = args[1] + + return AdvancedComparisonNode(op=op, field=field, value=value) + + elif "op" in node and node["op"] in [ + "s_intersects", + "s_contains", + "s_within", + "s_disjoint", + ]: + op = SpatialOp(node["op"]) + args = node.get("args", []) + + if isinstance(args[0], dict) and "property" in args[0]: + field = args[0]["property"] + else: + field = str(args[0]) + + geometry = args[1] + + return SpatialNode(op=op, field=field, geometry=geometry) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_transform.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_transform.py new file mode 100644 index 000000000..90033a6da --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/ast_transform.py @@ -0,0 +1,151 @@ +"""AST-based query transformation for Elasticsearch/OpenSearch.""" + +from typing import Any, Dict, Union + +from stac_fastapi.core.extensions.filter import ( + AdvancedComparisonNode, + AdvancedComparisonOp, + ComparisonNode, + ComparisonOp, + CqlNode, + LogicalNode, + LogicalOp, + SpatialNode, + SpatialOp, +) + + +def to_es_via_ast( + queryables_mapping: Dict[str, Any], query: Union[Dict[str, Any], CqlNode] +) -> Dict[str, Any]: + """Transform CQL2 query to Elasticsearch/Opensearch query via AST.""" + from .ast_parser import Cql2AstParser + + if isinstance(query, CqlNode): + ast = query + else: + parser = Cql2AstParser(queryables_mapping) + ast = parser.parse(query) + + result = _transform_ast_node(ast, queryables_mapping) + return result + + +def _transform_ast_node( + node: Any, queryables_mapping: Dict[str, Any] +) -> Dict[str, Any]: + """Transform AST node to Elasticsearch/Opensearch query.""" + if isinstance(node, LogicalNode): + bool_type = { + LogicalOp.AND: "must", 
+ LogicalOp.OR: "should", + LogicalOp.NOT: "must_not", + }[node.op] + + if node.op == LogicalOp.NOT: + return { + "bool": { + bool_type: _transform_ast_node(node.children[0], queryables_mapping) + } + } + else: + return { + "bool": { + bool_type: [ + _transform_ast_node(child, queryables_mapping) + for child in node.children + ] + } + } + + elif isinstance(node, ComparisonNode): + field = _to_es_field(queryables_mapping, node.field) + value = node.value + + if isinstance(value, dict) and "timestamp" in value: + value = value["timestamp"] + + if node.op == ComparisonOp.EQ: + return {"term": {field: value}} + elif node.op == ComparisonOp.NEQ: + return {"bool": {"must_not": [{"term": {field: value}}]}} + elif node.op in [ + ComparisonOp.LT, + ComparisonOp.LTE, + ComparisonOp.GT, + ComparisonOp.GTE, + ]: + range_op = { + ComparisonOp.LT: "lt", + ComparisonOp.LTE: "lte", + ComparisonOp.GT: "gt", + ComparisonOp.GTE: "gte", + }[node.op] + return {"range": {field: {range_op: value}}} + elif node.op == ComparisonOp.IS_NULL: + return {"bool": {"must_not": {"exists": {"field": field}}}} + + elif isinstance(node, AdvancedComparisonNode): + field = _to_es_field(queryables_mapping, node.field) + + if node.op == AdvancedComparisonOp.BETWEEN: + if isinstance(node.value, (list, tuple)) and len(node.value) == 2: + gte, lte = node.value[0], node.value[1] + if isinstance(gte, dict) and "timestamp" in gte: + gte = gte["timestamp"] + if isinstance(lte, dict) and "timestamp" in lte: + lte = lte["timestamp"] + return {"range": {field: {"gte": gte, "lte": lte}}} + + elif node.op == AdvancedComparisonOp.IN: + if not isinstance(node.value, list): + raise ValueError(f"IN operator expects list, got {type(node.value)}") + return {"terms": {field: node.value}} + + elif node.op == AdvancedComparisonOp.LIKE: + pattern = str(node.value) + + es_pattern = "" + i = 0 + while i < len(pattern): + if pattern[i] == "\\" and i + 1 < len(pattern): + i += 1 + if pattern[i] == "%": + es_pattern += "%" + elif pattern[i] == "_": + es_pattern += "_" + elif pattern[i] == "\\": + es_pattern += "\\" + else: + es_pattern += "\\" + pattern[i] + elif pattern[i] == "%": + es_pattern += "*" + elif pattern[i] == "_": + es_pattern += "?" 
+ else: + es_pattern += pattern[i] + i += 1 + + return { + "wildcard": {field: {"value": es_pattern, "case_insensitive": True}} + } + + elif isinstance(node, SpatialNode): + field = _to_es_field(queryables_mapping, node.field) + + relation_mapping = { + SpatialOp.S_INTERSECTS: "intersects", + SpatialOp.S_CONTAINS: "contains", + SpatialOp.S_WITHIN: "within", + SpatialOp.S_DISJOINT: "disjoint", + } + + relation = relation_mapping[node.op] + return {"geo_shape": {field: {"shape": node.geometry, "relation": relation}}} + + raise ValueError("Unsupported AST node") + + +def _to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str: + """Map field name using queryables mapping.""" + return queryables_mapping.get(field, field) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py new file mode 100644 index 000000000..10d73eed7 --- /dev/null +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py @@ -0,0 +1,170 @@ +"""Extracts datetime patterns from CQL2 AST.""" + +from typing import List, Union + +from stac_fastapi.core.extensions.filter import ( + AdvancedComparisonNode, + AdvancedComparisonOp, + ComparisonNode, + ComparisonOp, + CqlNode, + DateTimeExactNode, + DateTimeRangeNode, + LogicalNode, + LogicalOp, +) + + +class DatetimeOptimizer: + """Extract datetime nodes from CQL2 AST.""" + + def __init__(self) -> None: + """Initialize the datetime optimizer.""" + self.datetime_nodes: List[Union[DateTimeRangeNode, DateTimeExactNode]] = [] + + def extract_datetime_nodes( + self, node: CqlNode + ) -> List[Union[DateTimeRangeNode, DateTimeExactNode]]: + """Extract all datetime nodes from AST.""" + datetime_nodes = [] + + def _traverse(current: CqlNode): + if isinstance(current, ComparisonNode): + if self._is_datetime_field(current.field): + if current.op == ComparisonOp.EQ: + datetime_nodes.append( + DateTimeExactNode(field=current.field, value=current.value) + ) + elif current.op == ComparisonOp.GTE: + datetime_nodes.append( + DateTimeRangeNode(field=current.field, start=current.value) + ) + elif current.op == ComparisonOp.LTE: + datetime_nodes.append( + DateTimeRangeNode(field=current.field, end=current.value) + ) + + elif isinstance(current, AdvancedComparisonNode): + if ( + current.op == AdvancedComparisonOp.BETWEEN + and self._is_datetime_field(current.field) + ): + if ( + isinstance(current.value, (list, tuple)) + and len(current.value) == 2 + ): + datetime_nodes.append( + DateTimeRangeNode( + field=current.field, + start=current.value[0], + end=current.value[1], + ) + ) + + if isinstance(current, LogicalNode): + for child in current.children: + _traverse(child) + + _traverse(node) + return datetime_nodes + + def _is_datetime_field(self, field: str) -> bool: + """Check if a field is a datetime field.""" + field_lower = field.lower() + return any( + dt_field in field_lower + for dt_field in ["datetime", "start_datetime", "end_datetime"] + ) + + def optimize_query_structure(self, ast: CqlNode) -> CqlNode: + """Optimize AST structure for better query performance. + + Reorders AND clauses to put datetime filters first for better execution. 
+ """ + return self._reorder_for_datetime_priority(ast) + + def _reorder_for_datetime_priority(self, node: CqlNode) -> CqlNode: + """Reorder query tree to prioritize datetime filters.""" + if isinstance(node, LogicalNode): + if node.op == LogicalOp.AND: + datetime_children = [] + other_children = [] + + for child in node.children: + processed_child = self._reorder_for_datetime_priority(child) + if self._contains_datetime(processed_child): + datetime_children.append(processed_child) + else: + other_children.append(processed_child) + + reordered_children = datetime_children + other_children + + if len(reordered_children) == 1: + return reordered_children[0] + + return LogicalNode(op=LogicalOp.AND, children=reordered_children) + + elif node.op in [LogicalOp.OR, LogicalOp.NOT]: + processed_children = [ + self._reorder_for_datetime_priority(child) + for child in node.children + ] + return LogicalNode(op=node.op, children=processed_children) + + return node + + def _contains_datetime(self, node: CqlNode) -> bool: + """Check if node contains datetime filter.""" + if isinstance(node, (ComparisonNode, AdvancedComparisonNode)): + return self._is_datetime_field(node.field) + + elif isinstance(node, LogicalNode): + return any(self._contains_datetime(child) for child in node.children) + + return False + + +def extract_from_ast(node: CqlNode, field_name: str): + """Extract values from AST for a given field.""" + values = [] + datetime_start = None + datetime_end = None + + def recurse(n): + nonlocal datetime_start, datetime_end + + if hasattr(n, "children"): + for child in n.children: + recurse(child) + + if hasattr(n, "field") and hasattr(n, "value"): + if n.field == field_name: + if field_name == "datetime": + op = getattr(n, "op", None) + if op == ComparisonOp.GTE: + datetime_start = n.value + elif op == ComparisonOp.LTE: + datetime_end = n.value + else: + values.append(n.value) + else: + if isinstance(n.value, list): + values.extend(n.value) + else: + values.append(n.value) + + recurse(node) + + if field_name == "datetime": + if datetime_start and datetime_end: + return f"{datetime_start}/{datetime_end}" + elif datetime_start: + return f"{datetime_start}/.." 
+ elif datetime_end: + return f"../{datetime_end}" + elif values: + return values[0] + else: + return None + + return values From 906e4fa0d6ece9df8e4ad37039bf315b3286033c Mon Sep 17 00:00:00 2001 From: Yuri Zmytrakov Date: Mon, 15 Dec 2025 12:57:39 +0100 Subject: [PATCH 2/4] test: Add cql2-json datetime operator tests - Added tests for CQL2-JSON datetime filter operators - Tested datetime field operators: =, <=, >=, AND combinations - Tested start_datetime and end_datetime field operators - Verified range queries work correctly with date fields - Ensured AST parsing correctly handles datetime comparisons --- stac_fastapi/tests/resources/test_item.py | 202 ++++++++++++++++++++++ 1 file changed, 202 insertions(+) diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index 4231f1029..aabf5cfa4 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -1163,3 +1163,205 @@ async def test_search_datetime_with_null_datetime( await txn_client.delete_collection(test_collection["id"]) except Exception as e: logger.warning(f"Failed to delete collection: {e}") + + +@pytest.mark.asyncio +async def test_search_cql2_json_datetime_operators(app_client, ctx): + """Test POST search with CQL2-JSON datetime filter operators""" + + test_item = ctx.item + item_datetime = test_item["properties"]["datetime"] + + cql2_equal = { + "filter-lang": "cql2-json", + "filter": {"op": "=", "args": [{"property": "datetime"}, item_datetime]}, + } + resp = await app_client.post("/search", json=cql2_equal) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + later_datetime = "2020-02-13T12:30:22Z" + cql2_lte = { + "filter-lang": "cql2-json", + "filter": {"op": "<=", "args": [{"property": "datetime"}, later_datetime]}, + } + resp = await app_client.post("/search", json=cql2_lte) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + earlier_datetime = "2020-02-11T12:30:22Z" + cql2_gte = { + "filter-lang": "cql2-json", + "filter": {"op": ">=", "args": [{"property": "datetime"}, earlier_datetime]}, + } + resp = await app_client.post("/search", json=cql2_gte) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + range_start = "2020-02-12T00:00:00Z" + range_end = "2020-02-13T00:00:00Z" + cql2_between = { + "filter-lang": "cql2-json", + "filter": { + "op": "and", + "args": [ + {"op": ">=", "args": [{"property": "datetime"}, range_start]}, + {"op": "<=", "args": [{"property": "datetime"}, range_end]}, + ], + }, + } + resp = await app_client.post("/search", json=cql2_between) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + +@pytest.mark.asyncio +async def test_search_cql2_json_start_end_datetime_operators(app_client, ctx): + """Test POST search with CQL2-JSON start_datetime and end_datetime filter operators""" + + test_item = ctx.item + start_datetime = test_item["properties"]["start_datetime"] # "2020-02-08T12:30:22Z" + end_datetime = test_item["properties"]["end_datetime"] # "2020-02-16T12:30:22Z" + + cql2_start_equal = { + "filter-lang": "cql2-json", + "filter": {"op": "=", "args": [{"property": "start_datetime"}, start_datetime]}, + } + resp = await app_client.post("/search", json=cql2_start_equal) + assert resp.status_code == 200 + resp_json = resp.json() + assert 
resp_json["features"][0]["id"] == test_item["id"] + + later_start = "2020-02-09T12:30:22Z" + cql2_start_lte = { + "filter-lang": "cql2-json", + "filter": {"op": "<=", "args": [{"property": "start_datetime"}, later_start]}, + } + resp = await app_client.post("/search", json=cql2_start_lte) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + earlier_start = "2020-02-07T12:30:22Z" + cql2_start_gte = { + "filter-lang": "cql2-json", + "filter": {"op": ">=", "args": [{"property": "start_datetime"}, earlier_start]}, + } + resp = await app_client.post("/search", json=cql2_start_gte) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + start_range_start = "2020-02-08T00:00:00Z" + start_range_end = "2020-02-09T00:00:00Z" + cql2_start_between = { + "filter-lang": "cql2-json", + "filter": { + "op": "and", + "args": [ + { + "op": ">=", + "args": [{"property": "start_datetime"}, start_range_start], + }, + {"op": "<=", "args": [{"property": "start_datetime"}, start_range_end]}, + ], + }, + } + resp = await app_client.post("/search", json=cql2_start_between) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + cql2_end_equal = { + "filter-lang": "cql2-json", + "filter": {"op": "=", "args": [{"property": "end_datetime"}, end_datetime]}, + } + resp = await app_client.post("/search", json=cql2_end_equal) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + later_end = "2020-02-17T12:30:22Z" + cql2_end_lte = { + "filter-lang": "cql2-json", + "filter": {"op": "<=", "args": [{"property": "end_datetime"}, later_end]}, + } + resp = await app_client.post("/search", json=cql2_end_lte) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + earlier_end = "2020-02-15T12:30:22Z" + cql2_end_gte = { + "filter-lang": "cql2-json", + "filter": {"op": ">=", "args": [{"property": "end_datetime"}, earlier_end]}, + } + resp = await app_client.post("/search", json=cql2_end_gte) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + end_range_start = "2020-02-16T00:00:00Z" + end_range_end = "2020-02-17T00:00:00Z" + cql2_end_between = { + "filter-lang": "cql2-json", + "filter": { + "op": "and", + "args": [ + {"op": ">=", "args": [{"property": "end_datetime"}, end_range_start]}, + {"op": "<=", "args": [{"property": "end_datetime"}, end_range_end]}, + ], + }, + } + resp = await app_client.post("/search", json=cql2_end_between) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + cql2_start_end_range = { + "filter-lang": "cql2-json", + "filter": { + "op": "and", + "args": [ + { + "op": ">=", + "args": [{"property": "start_datetime"}, "2020-02-07T12:30:22Z"], + }, + { + "op": "<=", + "args": [{"property": "end_datetime"}, "2020-02-17T12:30:22Z"], + }, + ], + }, + } + resp = await app_client.post("/search", json=cql2_start_end_range) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == test_item["id"] + + +@pytest.mark.asyncio +async def test_select_indexes(txn_client, load_test_data): + """Run select_indexes test.""" + + database = txn_client.database + index_selector = database.async_index_selector + + collection = 
load_test_data("test_collection.json") + collection_id = "test-collection-1" + collection["id"] = collection_id + + test_item = load_test_data("test_item.json") + test_item["id"] = "test-item-1" + + result = await index_selector.select_indexes( + ["test-collection-1"], + {"gte": "2024-01-01T00:00:00Z", "lte": "2024-12-31T23:59:59Z"}, + ) + + assert result is not None + assert result == "items_test-collection-1" From c18714f142d70aa2fdeac192db2388b22ed98629 Mon Sep 17 00:00:00 2001 From: Yuri Zmytrakov Date: Mon, 15 Dec 2025 15:50:49 +0100 Subject: [PATCH 3/4] chore: update readme and changelog --- CHANGELOG.md | 2 ++ README.md | 26 ++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0285bc931..9a723bed8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added CQL2 Abstract Syntax Tree (AST) structure for efficient query parsing and datetime-based indexes. [#560](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/560) + - Environment variable `VALIDATE_QUERYABLES` to enable/disable validation of queryables in search/filter requests. When set to `true`, search requests will be validated against the defined queryables, returning an error for any unsupported fields. Defaults to `false` for backward compatibility.[#532](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/532) - Environment variable `QUERYABLES_CACHE_TTL` to configure the TTL (in seconds) for caching queryables. Default is `1800` seconds (30 minutes) to balance performance and freshness of queryables data. [#532](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/532) diff --git a/README.md b/README.md index 57ff24f4c..733106ce0 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI - [Examples](#examples) - [Performance](#performance) - [Direct Response Mode](#direct-response-mode) + - [CQL2 JSON Search with AST-based Parsing](#cql2-json-search-with-ast-based-parsing) - [Quick Start](#quick-start) - [Installation](#installation) - [Running Locally](#running-locally) @@ -379,6 +380,31 @@ These examples provide practical reference implementations for various deploymen - **Default setting**: `false` for safety. - **More information**: See [issue #347](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/347) for background and implementation details. + +### CQL2 JSON Search with AST-based Parsing + +SFEOS now uses an Abstract Syntax Tree (AST) in CQL2-JSON search queries for efficient query parsing and datetime extraction, enabling the selection and management of the appropriate searchable indexes. + +#### AST-based Query Processing + +The CQL2 implementation uses an Abstract Syntax Tree (AST) structure that replaces the previous dictionary-based processing. This enables: + +1. **Structured Query Representation**: Queries are parsed into a tree structure with different node types +2. **Efficient Parameter Access**: Easy traversal and extraction of query parameters +3. 
**Optimized Index Selection**: Selection of appropriate fields for selection and management of indexes + +#### AST Node Types + +The AST supports various node types representing different query operations: + +- **Logical Nodes**: `AND`, `OR`, `NOT` operators for combining conditions +- **Comparison Nodes**: `=`, `<>`, `<`, `<=`, `>`, `>=`, `isNull` operations +- **Advanced Comparison Nodes**: `LIKE`, `BETWEEN`, `IN` operations +- **Spatial Nodes**: `s_intersects`, `s_contains`, `s_within`, `s_disjoint` for geospatial queries +- **Datetime Nodes**: Special handling for datetime range and exact value queries + +The AST-based approach enables efficient extraction of datetime parameters (`datetime`, `start_datetime`, `end_datetime`) from complex queries. + ## Quick Start This section helps you get up and running with stac-fastapi-elasticsearch-opensearch quickly. From 89cb538d4d9aae17192cdb0097b92fe8a50491f2 Mon Sep 17 00:00:00 2001 From: Yuri Zmytrakov Date: Wed, 17 Dec 2025 16:49:17 +0100 Subject: [PATCH 4/4] fix parsing issues: --- stac_fastapi/core/stac_fastapi/core/core.py | 7 - .../stac_fastapi/opensearch/database_logic.py | 45 +++++-- .../filter/datetime_optimizer.py | 122 +++++++++++++----- 3 files changed, 130 insertions(+), 44 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 11b0ce870..90faf27d4 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -877,13 +877,6 @@ async def post_search( query_fields = get_properties_from_cql2_filter(cql2_filter) await self.queryables_cache.validate(query_fields) search = await self.database.apply_cql2_filter(search, cql2_filter) - date_str = getattr(search, "_cql2_date_str", None) - collection_ids = getattr(search, "_cql2_collection_ids", None) - if date_str is not None: - datetime_parsed = format_datetime_range(date_str=date_str) - search, datetime_search = self.database.apply_datetime_filter( - search=search, datetime=datetime_parsed - ) except HTTPException: raise except Exception as e: diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 01b9cd6c9..cb3394190 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -17,6 +17,7 @@ import stac_fastapi.sfeos_helpers.filter as filter_module from stac_fastapi.core.base_database_logic import BaseDatabaseLogic +from stac_fastapi.core.datetime_utils import format_datetime_range from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env from stac_fastapi.extensions.core.transaction.request import ( @@ -57,7 +58,9 @@ DatetimeOptimizer, to_es_via_ast, ) -from stac_fastapi.sfeos_helpers.filter.datetime_optimizer import extract_from_ast +from stac_fastapi.sfeos_helpers.filter.datetime_optimizer import ( + extract_collection_datetime, +) from stac_fastapi.sfeos_helpers.mappings import ( AGGREGATION_MAPPING, COLLECTIONS_INDEX, @@ -799,14 +802,12 @@ async def apply_cql2_filter( optimizer = DatetimeOptimizer() optimized_ast = optimizer.optimize_query_structure(ast) - date_str = extract_from_ast(optimized_ast, "datetime") - collection_ids = extract_from_ast(optimized_ast, "collection") or None + _cql2_collection_datetime = extract_collection_datetime(optimized_ast) es_query = 
to_es_via_ast(queryables_mapping, optimized_ast) search = search.filter(es_query) - search._cql2_date_str = date_str - search._cql2_collection_ids = collection_ids + search._cql2_collection_datetime = _cql2_collection_datetime except Exception: # Fallback to dictionary-based approach @@ -866,9 +867,37 @@ async def execute_search( search_body: Dict[str, Any] = {} query = search.query.to_dict() if search.query else None - index_param = await self.async_index_selector.select_indexes( - collection_ids, datetime_search - ) + cql2_collection_datetime = getattr(search, "_cql2_collection_datetime", None) + + # Special case for cql2 index selection + if cql2_collection_datetime: + index_param = "" + for node in cql2_collection_datetime: + if node[0]: + collection_id = node[0] + # Parse datetime for index select without changing Search object + _, datetime_search = self.apply_datetime_filter( + search, format_datetime_range(node[1]) + ) + if not isinstance(collection_id, list): + collection_id = [collection_id] + index_param_temp = await self.async_index_selector.select_indexes( + collection_id, datetime_search + ) + index_param = ",".join( + p for p in (index_param, index_param_temp) if p + ) + + collection_ids = [] + for collections, _ in cql2_collection_datetime: + if isinstance(collections, list): + collection_ids.extend(collections) + else: + collection_ids.append(collections) + else: + index_param = await self.async_index_selector.select_indexes( + collection_ids, datetime_search + ) if len(index_param) > ES_MAX_URL_LENGTH - 300: index_param = ITEM_INDICES query = add_collections_to_body(collection_ids, query) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py index 10d73eed7..01c3b53dc 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/filter/datetime_optimizer.py @@ -127,44 +127,108 @@ def _contains_datetime(self, node: CqlNode) -> bool: def extract_from_ast(node: CqlNode, field_name: str): """Extract values from AST for a given field.""" values = [] - datetime_start = None - datetime_end = None def recurse(n): - nonlocal datetime_start, datetime_end - + """Run field extraction in recursion.""" if hasattr(n, "children"): for child in n.children: recurse(child) if hasattr(n, "field") and hasattr(n, "value"): if n.field == field_name: - if field_name == "datetime": - op = getattr(n, "op", None) - if op == ComparisonOp.GTE: - datetime_start = n.value - elif op == ComparisonOp.LTE: - datetime_end = n.value - else: - values.append(n.value) + if isinstance(n.value, list): + values.extend(n.value) else: - if isinstance(n.value, list): - values.extend(n.value) - else: - values.append(n.value) + values.append(n.value) + # Handle datetime range optimization + elif hasattr(n, "op") and n.op == LogicalOp.AND and hasattr(n, "children"): + # Check if this is a datetime range (GTE and LTE on datetime field) + datetime_nodes = [] + for child in n.children: + if ( + hasattr(child, "field") + and child.field == "datetime" + and hasattr(child, "op") + and hasattr(child, "value") + ): + datetime_nodes.append(child) + + if len(datetime_nodes) == 2: + # Check if we have both GTE and LTE for datetime + gte_node = None + lte_node = None + for d_node in datetime_nodes: + if d_node.op == ComparisonOp.GTE: + gte_node = d_node + elif d_node.op == ComparisonOp.LTE: + lte_node = d_node + + 
if gte_node and lte_node: + values.append( + { + "type": "range", + "start": gte_node.value, + "end": lte_node.value, + } + ) recurse(node) - if field_name == "datetime": - if datetime_start and datetime_end: - return f"{datetime_start}/{datetime_end}" - elif datetime_start: - return f"{datetime_start}/.." - elif datetime_end: - return f"../{datetime_end}" - elif values: - return values[0] - else: - return None - - return values + return values if values else None + + +def extract_collection_datetime(node): + """Get (collection, datetime_range) pairs from node.""" + pairs = [] + + def recurse(n): + # Check if this is an AND node (we're looking for AND clauses) + if hasattr(n, "op") and hasattr(n.op, "value") and n.op.value == "and": + collection = None + gte_date = None + lte_date = None + + # Look through all children of this AND node + if hasattr(n, "children"): + for child in n.children: + # Check if it's a comparison node + if ( + hasattr(child, "op") + and hasattr(child, "field") + and hasattr(child, "value") + ): + if child.field == "collection": + collection = child.value + elif child.field in [ + "datetime", + "start_datetime", + "end_datetime", + ]: # Handle all datetime fields + if hasattr(child.op, "value"): + if child.op.value == ">=": + gte_date = child.value + elif child.op.value == "<=": + lte_date = child.value + + # If we found dates, add to pairs (even without collection) + if gte_date or lte_date: + if gte_date and lte_date: + date_range = f"{gte_date}/{lte_date}" + elif gte_date: + date_range = f"{gte_date}/.." + elif lte_date: + date_range = f"../{lte_date}" + + # Add to pairs - if no collection, use empty string + pairs.append((collection or "", date_range)) + # If we found a collection but no dates + elif collection is not None: + pairs.append((collection, "")) + + # Continue searching through children + if hasattr(n, "children"): + for child in n.children: + recurse(child) + + recurse(node) + return pairs
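
Illustrative usage of the AST helpers introduced in this series (a minimal sketch, not part of the patches themselves; the example filter payload and the queryables-mapping values are assumed for demonstration — in the service the mapping comes from get_queryables_mapping()):

    from stac_fastapi.sfeos_helpers.filter import (
        Cql2AstParser,
        DatetimeOptimizer,
        to_es_via_ast,
    )
    from stac_fastapi.sfeos_helpers.filter.datetime_optimizer import (
        extract_collection_datetime,
    )

    # Example values only; the real mapping is built from the index metadata.
    queryables_mapping = {"datetime": "properties.datetime", "collection": "collection"}

    cql2_filter = {
        "op": "and",
        "args": [
            {"op": "=", "args": [{"property": "collection"}, "sentinel-2-l2a"]},
            {"op": ">=", "args": [{"property": "datetime"}, "2024-01-01T00:00:00Z"]},
            {"op": "<=", "args": [{"property": "datetime"}, "2024-12-31T23:59:59Z"]},
        ],
    }

    # Parse the CQL2-JSON payload into AST nodes and move datetime clauses to
    # the front of AND groups, mirroring the steps in apply_cql2_filter() above.
    ast = Cql2AstParser(queryables_mapping).parse(cql2_filter)
    optimized = DatetimeOptimizer().optimize_query_structure(ast)

    # (collection, datetime-range) pairs drive index selection in execute_search();
    # here: [("sentinel-2-l2a", "2024-01-01T00:00:00Z/2024-12-31T23:59:59Z")]
    pairs = extract_collection_datetime(optimized)

    # Final OpenSearch bool query: range filters on properties.datetime plus a
    # term filter on collection.
    es_query = to_es_via_ast(queryables_mapping, optimized)

If any step raises, apply_cql2_filter() falls back to the existing dictionary-based filter_module.to_es() conversion, so malformed or unsupported filters keep their previous behaviour.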