From dc71dd468b0c4f661cf1f9f113dcdb754e5b3386 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Mon, 6 Apr 2026 23:37:50 -0700 Subject: [PATCH 1/9] Normalize import --- src/azul/service/query_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index 0a3c739cb9..05ccf3e0ed 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -53,6 +53,7 @@ ) from azul.indexer.document import ( DocumentType, + FieldPath, IndexName, ) from azul.indexer.document_service import ( @@ -77,7 +78,6 @@ ) from azul.plugins import ( DocumentSlice, - FieldPath, MetadataPlugin, dotted, ) From ba3d151562ea14ee690cdabd62a53a337aee5a33 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 7 Apr 2026 17:49:52 -0700 Subject: [PATCH 2/9] Fix definition of TranslatedFilters Non-primitives (mappings) are present when filtering by nested fields --- src/azul/service/query_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index 05ccf3e0ed..9b75fa3c45 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -8,7 +8,6 @@ from collections.abc import ( Iterable, Mapping, - Sequence, ) from functools import ( partial, @@ -66,6 +65,7 @@ from azul.lib.types import ( AnyJSON, JSON, + JSONArray, JSONTypedDict, JSONs, MutableJSON, @@ -168,7 +168,7 @@ def wrap[R0](self, other: OpenSearchStage[R0, R1]) -> OpenSearchChain[R0, R1, R2 return OpenSearchChain(inner=other, outer=self) -TranslatedFilters = Mapping[FieldPath, Mapping[str, Sequence[PrimitiveJSON]]] +TranslatedFilters = Mapping[FieldPath, Mapping[str, JSONArray]] @attr.s(frozen=True, auto_attribs=True, kw_only=True) From 39fecf1493ee35faab4d46654762426bc79ddd9b Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 7 Apr 2026 18:14:21 -0700 Subject: [PATCH 3/9] Fix type annotation --- src/azul/service/query_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index 9b75fa3c45..b0780c6218 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -225,7 +225,7 @@ def _translate_filters(self, filters: FiltersJSON) -> TranslatedFilters: translated_filters[field] = {operator: list(values)} return translated_filters - def prepare_query(self, skip_field_paths: tuple[FieldPath] = ()) -> Query: + def prepare_query(self, skip_field_paths: tuple[FieldPath, ...] = ()) -> Query: """ Converts the given filters into an OpenSearch DSL Query object. """ From 9c7ff6f293eb26b551fa81022d1ea7dd06389f7b Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Thu, 9 Apr 2026 15:59:21 -0700 Subject: [PATCH 4/9] Move misplaced comment --- src/azul/service/query_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index b0780c6218..8fbacc97ea 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -590,8 +590,6 @@ def process_response(self, response: JSON) -> ResponseTriple: """ Returns hits and pagination as dict """ - # The slice is necessary because we may have fetched an extra entry to - # determine if there is a previous or next page. hits = self._extract_hits(response) hits = self._translate_hits(hits) pagination = self._process_pagination(response) @@ -599,6 +597,8 @@ def process_response(self, response: JSON) -> ResponseTriple: return hits, pagination, aggregations def _extract_hits(self, response): + # The slice is necessary because we may have fetched an extra entry to + # determine if there is a previous or next page. hits = response['hits']['hits'][0:self.pagination.size] if self.pagination.search_before is not None: hits = reversed(hits) From 3395aa14390c8ca242de65f087e1c98ee9eb26a4 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 7 Apr 2026 18:20:01 -0700 Subject: [PATCH 5/9] Extract local variables in _populate_accessible --- src/azul/service/query_service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index 8fbacc97ea..0ef6990d7a 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -412,10 +412,12 @@ def _populate_accessible(self, aggs: MutableJSON) -> None: special_fields = plugin.special_fields agg = aggs.pop(special_fields.source_id.name) counts_by_accessibility: dict[bool, int] = defaultdict(int) - for bucket in agg['myTerms']['buckets']: + terms = agg['myTerms'] + buckets = terms['buckets'] + for bucket in buckets: accessible = bucket['key'] in source_ids counts_by_accessibility[accessible] += bucket['doc_count'] - agg['myTerms']['buckets'] = [ + terms['buckets'] = [ {'key': accessible, 'doc_count': count} for accessible, count in counts_by_accessibility.items() ] From de9cd39c477cdba05a36a771697ab3b54bde799b Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 7 Apr 2026 18:20:44 -0700 Subject: [PATCH 6/9] Refactor pagination response processing --- src/azul/service/query_service.py | 41 ++++++++++++++----------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index 0ef6990d7a..25223f5e6b 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -9,9 +9,6 @@ Iterable, Mapping, ) -from functools import ( - partial, -) import json import logging from typing import ( @@ -592,35 +589,35 @@ def process_response(self, response: JSON) -> ResponseTriple: """ Returns hits and pagination as dict """ - hits = self._extract_hits(response) + hits, total = self._extract_hits(response) + pagination = self._process_pagination(hits, total) hits = self._translate_hits(hits) - pagination = self._process_pagination(response) aggregations = response.get('aggregations', {}) return hits, pagination, aggregations - def _extract_hits(self, response): + def _extract_hits(self, response: JSON) -> tuple[JSONs, int]: + hits = response['hits'] + total = hits['total'] + # FIXME: Handle other relations + # https://github.com/DataBiosphere/azul/issues/3770 + assert total['relation'] == 'eq' + return hits['hits'], total['value'] + + def _translate_hits(self, hits: JSONs) -> JSONs: # The slice is necessary because we may have fetched an extra entry to # determine if there is a previous or next page. - hits = response['hits']['hits'][0:self.pagination.size] + hits = hits[0:self.pagination.size] if self.pagination.search_before is not None: hits = reversed(hits) - hits = [hit['_source'] for hit in hits] - return hits - - def _translate_hits(self, hits): - f = partial(self.service.translate_fields, self.catalog, forward=False) - hits = list(map(f, hits)) - return hits + return [ + self.service.translate_fields(self.catalog, hit['_source'], forward=False) + for hit in hits + ] - def _process_pagination(self, response: JSON) -> MutableJSON: - total = response['hits']['total'] - # FIXME: Handle other relations - # https://github.com/DataBiosphere/azul/issues/3770 - assert total['relation'] == 'eq' - pages = -(-total['value'] // self.pagination.size) + def _process_pagination(self, hits: JSONs, total: int) -> MutableJSON: + pages = -(-total // self.pagination.size) # ... else use search_after/search_before pagination - hits: JSONs = response['hits']['hits'] count = len(hits) if self.pagination.search_before is None: # hits are normal sorted @@ -656,7 +653,7 @@ def page_link(*, previous): return None if url is None else str(url) return ResponsePagination(count=count, - total=total['value'], + total=total, size=pagination.size, next=page_link(previous=False), previous=page_link(previous=True), From 510aa4b072a8bfc83acb5574306e44ea430ae10a Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 7 Apr 2026 18:18:31 -0700 Subject: [PATCH 7/9] [p] Cover azul.service.query_service with mypy (#6821) --- .mypy.ini | 1 + src/azul/lib/types.py | 5 ++ src/azul/service/query_service.py | 101 ++++++++++++++++++------------ 3 files changed, 66 insertions(+), 41 deletions(-) diff --git a/.mypy.ini b/.mypy.ini index dbc569f195..898a5a6287 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -71,6 +71,7 @@ modules = azul.service.manifest_service, azul.field_type, azul.source, + azul.service.query_service, packages = diff --git a/src/azul/lib/types.py b/src/azul/lib/types.py index 4d7dcf8b7c..71b55a7ef6 100644 --- a/src/azul/lib/types.py +++ b/src/azul/lib/types.py @@ -132,6 +132,11 @@ def json_items_are_sequences_of_mappings(vs: AnyJSON) -> TypeGuard[Mapping[str, return True +def json_primitive(v: AnyJSON) -> PrimitiveJSON: + assert v is None or isinstance(v, (str, int, float, bool)), type(v) + return v + + def json_dict(v: AnyMutableJSON) -> MutableJSON: assert isinstance(v, dict), type(v) return v diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index 25223f5e6b..f9fad9a284 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -67,7 +67,17 @@ JSONs, MutableJSON, PrimitiveJSON, - json_list, + json_dict, + json_dict_of_dicts, + json_element_dicts, + json_element_strings, + json_int, + json_item_sequences, + json_list_of_dicts, + json_mapping, + json_primitive, + json_sequence, + json_sequence_of_mappings, json_str, ) from azul.opensearch import ( @@ -79,6 +89,7 @@ dotted, ) from azul.service import ( + FilterJSON, Filters, FiltersJSON, ) @@ -213,14 +224,20 @@ def _translate_filters(self, filters: FiltersJSON) -> TranslatedFilters: """ catalog = self.catalog field_mapping = self.plugin.field_mapping - translated_filters = {} - for field, filter in filters.items(): - field = field_mapping[field] + + def translate_filter(field_name: str, + filter: FilterJSON + ) -> tuple[FieldPath, Mapping[str, JSONArray]]: + field_path = field_mapping[field_name] operator, values = one(filter.items()) - field_type = self.service.field_type(catalog, field) - values = field_type.filter(operator, values) - translated_filters[field] = {operator: list(values)} - return translated_filters + field_type = self.service.field_type(catalog, field_path) + values: JSONArray = list(field_type.filter(operator, values)) + return field_path, {operator: values} + + return dict( + translate_filter(field, filter) + for field, filter in filters.items() + ) def prepare_query(self, skip_field_paths: tuple[FieldPath, ...] = ()) -> Query: """ @@ -229,14 +246,14 @@ def prepare_query(self, skip_field_paths: tuple[FieldPath, ...] = ()) -> Query: filter_list = [] for field_path, filter in self.prepared_filters.items(): if field_path not in skip_field_paths: - operator, values = one(filter.items()) + operator, values = one(json_item_sequences(filter)) # Note that `is_not` is only used internally (for filtering by # inaccessible sources) if operator in ('is', 'is_not'): field_type = self.service.field_type(self.catalog, field_path) if isinstance(field_type, Nested): term_queries = [] - for nested_field, nested_value in one(values).items(): + for nested_field, nested_value in json_mapping(one(values)).items(): nested_body = {dotted(field_path, nested_field, 'keyword'): nested_value} term_queries.append(Q('term', **nested_body)) query = Q('nested', path=dotted(field_path), query=Q('bool', must=term_queries)) @@ -255,7 +272,7 @@ def prepare_query(self, skip_field_paths: tuple[FieldPath, ...] = ()) -> Query: filter_list.append(query) elif operator in ('contains', 'within', 'intersects'): for value in values: - value = value | {'relation': operator} + value = {**json_mapping(value), 'relation': operator} filter_list.append(Q('range', **{dotted(field_path): value})) else: assert False @@ -304,7 +321,7 @@ def prepare_request(self, request: Search) -> Search: def process_response(self, response: MutableJSON) -> MutableJSON: try: - aggs = response['aggregations'] + aggs = json_dict(response['aggregations']) except KeyError: pass else: @@ -327,7 +344,7 @@ def _prepare_aggregation(self, *, facet: str, facet_path: FieldPath) -> Agg: nested_agg = agg.bucket(name='nested', agg_type='nested', path=dotted(facet_path)) - facet_path = dotted(facet_path, field_type.agg_property) + facet_path = (*facet_path, field_type.agg_property) else: nested_agg = agg # Make an inner agg that will contain the terms in question @@ -365,9 +382,9 @@ def annotate(agg: Agg): annotate(request.aggs[agg_name]) def _flatten_nested_aggs(self, aggs: MutableJSON): - for facet, agg in aggs.items(): + for facet, agg in json_dict_of_dicts(aggs).items(): try: - nested_agg = agg.pop('nested') + nested_agg = json_dict(agg.pop('nested')) except KeyError: pass else: @@ -379,26 +396,27 @@ def _translate_response_aggs(self, aggs: MutableJSON): OpenSearch response. """ - def translate(k, v: MutableJSON): + def translate(k: str, v: MutableJSON): try: buckets = v['buckets'] except KeyError: - for k, v in v.items(): - if isinstance(v, dict): - translate(k, v) + for ki, vi in v.items(): + if isinstance(vi, dict): + translate(ki, vi) else: try: - path = v['meta']['path'] + path = json_dict(v['meta'])['path'] except KeyError: pass else: - field_type = self.service.field_type(self.catalog, tuple(path)) - for bucket in buckets: + field_type = self.service.field_type(self.catalog, + tuple(json_element_strings(path))) + for bucket in json_element_dicts(buckets): bucket['key'] = field_type.from_index(bucket['key']) translate(k, bucket) for k, v in aggs.items(): - translate(k, v) + translate(k, json_dict(v)) def _populate_accessible(self, aggs: MutableJSON) -> None: # Because the value of the `accessible` field depends on the provided @@ -407,13 +425,13 @@ def _populate_accessible(self, aggs: MutableJSON) -> None: source_ids = self.filter_stage.filters.source_ids plugin = self.service.metadata_plugin(self.catalog) special_fields = plugin.special_fields - agg = aggs.pop(special_fields.source_id.name) + agg = json_dict(aggs.pop(special_fields.source_id.name)) counts_by_accessibility: dict[bool, int] = defaultdict(int) - terms = agg['myTerms'] - buckets = terms['buckets'] + terms = json_dict(agg['myTerms']) + buckets = json_list_of_dicts(terms['buckets']) for bucket in buckets: accessible = bucket['key'] in source_ids - counts_by_accessibility[accessible] += bucket['doc_count'] + counts_by_accessibility[accessible] += json_int(bucket['doc_count']) terms['buckets'] = [ {'key': accessible, 'doc_count': count} for accessible, count in counts_by_accessibility.items() @@ -464,8 +482,8 @@ def process_response(self, response: Response) -> MutableJSON: def sort_key_from_json(s: AnyJSON) -> SortKey: - a, b = json_list(s) - return a, json_str(b) + a, b = json_sequence(s) + return json_primitive(a), json_str(b) def sort_key_to_json(s: SortKey) -> AnyJSON: @@ -592,29 +610,30 @@ def process_response(self, response: JSON) -> ResponseTriple: hits, total = self._extract_hits(response) pagination = self._process_pagination(hits, total) hits = self._translate_hits(hits) - aggregations = response.get('aggregations', {}) + aggregations = json_mapping(response.get('aggregations', {})) return hits, pagination, aggregations def _extract_hits(self, response: JSON) -> tuple[JSONs, int]: - hits = response['hits'] - total = hits['total'] + hits = json_mapping(response['hits']) + total = json_mapping(hits['total']) # FIXME: Handle other relations # https://github.com/DataBiosphere/azul/issues/3770 assert total['relation'] == 'eq' - return hits['hits'], total['value'] + return json_sequence_of_mappings(hits['hits']), json_int(total['value']) def _translate_hits(self, hits: JSONs) -> JSONs: # The slice is necessary because we may have fetched an extra entry to # determine if there is a previous or next page. hits = hits[0:self.pagination.size] - if self.pagination.search_before is not None: - hits = reversed(hits) + hits = iter(hits) if self.pagination.search_before is None else reversed(hits) return [ - self.service.translate_fields(self.catalog, hit['_source'], forward=False) + self.service.translate_fields(self.catalog, + json_mapping(hit['_source']), + forward=False) for hit in hits ] - def _process_pagination(self, hits: JSONs, total: int) -> MutableJSON: + def _process_pagination(self, hits: JSONs, total: int) -> ResponsePagination: pages = -(-total // self.pagination.size) # ... else use search_after/search_before pagination @@ -624,12 +643,12 @@ def _process_pagination(self, hits: JSONs, total: int) -> MutableJSON: if count > self.pagination.size: # There is an extra hit, indicating a next page. count -= 1 - search_after = tuple(hits[count - 1]['sort']) + search_after = sort_key_from_json(hits[count - 1]['sort']) else: # No next page search_after = None if self.pagination.search_after is not None: - search_before = tuple(hits[0]['sort']) + search_before = sort_key_from_json(hits[0]['sort']) else: search_before = None else: @@ -637,11 +656,11 @@ def _process_pagination(self, hits: JSONs, total: int) -> MutableJSON: if count > self.pagination.size: # There is an extra hit, indicating a previous page. count -= 1 - search_before = tuple(hits[count - 1]['sort']) + search_before = sort_key_from_json(hits[count - 1]['sort']) else: # No previous page search_before = None - search_after = tuple(hits[0]['sort']) + search_after = sort_key_from_json(hits[0]['sort']) pagination = self.pagination.advance(search_before=search_before, search_after=search_after) From 03a9859601b2ab4310764ecd86f58b630bfb22f3 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Thu, 9 Apr 2026 16:06:16 -0700 Subject: [PATCH 8/9] fixup! [p] Cover azul.service.query_service with mypy (#6821) --- src/azul/lib/types.py | 10 +++++----- src/azul/service/query_service.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/azul/lib/types.py b/src/azul/lib/types.py index 71b55a7ef6..7eba3e5990 100644 --- a/src/azul/lib/types.py +++ b/src/azul/lib/types.py @@ -132,11 +132,6 @@ def json_items_are_sequences_of_mappings(vs: AnyJSON) -> TypeGuard[Mapping[str, return True -def json_primitive(v: AnyJSON) -> PrimitiveJSON: - assert v is None or isinstance(v, (str, int, float, bool)), type(v) - return v - - def json_dict(v: AnyMutableJSON) -> MutableJSON: assert isinstance(v, dict), type(v) return v @@ -199,6 +194,11 @@ def json_sorted(vs: Iterable[PrimitiveJSON]) -> MutableJSONArray: return sorted(vs, key=none_safe_key(none_last=True)) +def json_primitive(v: AnyJSON) -> PrimitiveJSON: + assert v is None or isinstance(v, (str, int, float, bool)), type(v) + return v + + def json_str(v: AnyMutableJSON | AnyJSON) -> str: return any_str(v) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index f9fad9a284..fd9dce937a 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -549,12 +549,12 @@ class PaginationStage(_OpenSearchStage[JSON, ResponseTriple]): def prepare_request(self, request: Search) -> Search: sort_order = self.pagination.order - sort_field = self.plugin.field_mapping[self.pagination.sort] - field_type = self.service.field_type(self.catalog, sort_field) + sort_field_path = self.plugin.field_mapping[self.pagination.sort] + field_type = self.service.field_type(self.catalog, sort_field_path) sort_mode = field_type.es_sort_mode - sort_field = dotted(sort_field, 'keyword') + sort_field = dotted(sort_field_path, 'keyword') - def sort(order): + def sort(order: str) -> tuple[JSON, JSON]: assert order in ('asc', 'desc'), order return ( { @@ -665,7 +665,7 @@ def _process_pagination(self, hits: JSONs, total: int) -> ResponsePagination: pagination = self.pagination.advance(search_before=search_before, search_after=search_after) - def page_link(*, previous): + def page_link(*, previous: bool) -> str | None: url = pagination.link(previous=previous, catalog=self.catalog, filters=json.dumps(self.filters.explicit)) From 59295f06253815aa0b379f76971a1c21b2a0fc8d Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 7 Apr 2026 18:27:15 -0700 Subject: [PATCH 9/9] Suppress mypy error and add FIXME (#6821) --- src/azul/service/query_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/azul/service/query_service.py b/src/azul/service/query_service.py index fd9dce937a..feb4a8fbf0 100644 --- a/src/azul/service/query_service.py +++ b/src/azul/service/query_service.py @@ -231,7 +231,9 @@ def translate_filter(field_name: str, field_path = field_mapping[field_name] operator, values = one(filter.items()) field_type = self.service.field_type(catalog, field_path) - values: JSONArray = list(field_type.filter(operator, values)) + # FIXME: remove `type: ignore` + # https://github.com/DataBiosphere/azul/issues/6821 + values: JSONArray = list(field_type.filter(operator, values)) # type: ignore return field_path, {operator: values} return dict(