From 7d59a03ca4fa1035a924e616e91f3a58c15317be Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 10 Mar 2026 16:21:04 -0700 Subject: [PATCH 1/8] Add StorageService.delete_prefix --- src/azul/indexer/mirror_service.py | 6 +----- src/azul/service/storage_service.py | 6 ++++++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/azul/indexer/mirror_service.py b/src/azul/indexer/mirror_service.py index 32cbf69e02..75e961e05a 100644 --- a/src/azul/indexer/mirror_service.py +++ b/src/azul/indexer/mirror_service.py @@ -524,11 +524,7 @@ def delete_it_files(self): """ assert self.catalog in config.integration_test_catalogs, R( 'Not an IT catalog', self.catalog) - prefix = self._mirror_prefix - assert len(prefix) > 1 and prefix.endswith('/'), prefix - object_keys = self._storage.list_objects(prefix) - assert len(object_keys) <= 300, R('Too many objects', len(object_keys)) - self._storage.delete_objects(object_keys, batch_size=100) + self._storage.delete_prefix(self._mirror_prefix) @attrs.frozen(kw_only=True, slots=False) diff --git a/src/azul/service/storage_service.py b/src/azul/service/storage_service.py index 2adab0adf0..ef8ccc0568 100644 --- a/src/azul/service/storage_service.py +++ b/src/azul/service/storage_service.py @@ -206,6 +206,12 @@ def delete_objects(self, self._s3.delete_objects(**request) log.info('Deleted %d objects overall', num_keys) + def delete_prefix(self, prefix: str) -> None: + assert len(prefix) > 1 and prefix.endswith('/'), prefix + object_keys = self.list_objects(prefix) + assert len(object_keys) <= 300, R('Too many objects', len(object_keys)) + self.delete_objects(object_keys, batch_size=100) + def list_objects(self, prefix: str) -> OrderedSet[str]: keys: OrderedSet[str] = OrderedSet() num_keys = 0 From 54bded161767275533fb5669cf52ae7577c80c1d Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 10 Mar 2026 16:22:13 -0700 Subject: [PATCH 2/8] Extract classmethod --- src/azul/service/manifest_service.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 8d248e5b95..302905369d 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -1005,9 +1005,13 @@ def manifest_key(self) -> ManifestKey: manifest_hash=manifest_hash, source_hash=source_hash) + @classmethod + def _manifest_prefix(cls) -> str: + return 'manifests/' + @classmethod def s3_object_key(cls, manifest_key: ManifestKey) -> str: - return 'manifests' + '/' + cls.s3_object_key_base(manifest_key) + return cls._manifest_prefix() + cls.s3_object_key_base(manifest_key) @classmethod def s3_object_key_base(cls, manifest_key: ManifestKey) -> str: From 077e0edc7ad856d4adf610d0be78bb35a1b2f35a Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 10 Mar 2026 15:26:37 -0700 Subject: [PATCH 3/8] Extract base classes --- src/azul/service/manifest_service.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 302905369d..faa54de0a8 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -571,14 +571,17 @@ class CachedManifestNotFound(Exception): manifest_key: ManifestKey -@attrs.frozen(kw_only=True) -class ManifestService(QueryService): - file_url_func: FileUrlFunc +class BaseManifestService: @cached_property def storage_service(self) -> StorageService: return StorageService() + +@attrs.frozen(kw_only=True) +class ManifestService(BaseManifestService, QueryService): + file_url_func: FileUrlFunc + def get_manifest(self, *, format: ManifestFormat, @@ -785,7 +788,14 @@ def command_lines(self, type Cells = dict[str, str] -class ManifestGenerator(metaclass=ABCMeta): +class ManifestAccessor: + + @classmethod + def _manifest_prefix(cls) -> str: + return 'manifests/' + + +class ManifestGenerator(ManifestAccessor, metaclass=ABCMeta): """ A generator for manifests. A manifest is an exhaustive representation of the documents in the aggregate index for a particular entity type. The @@ -1005,10 +1015,6 @@ def manifest_key(self) -> ManifestKey: manifest_hash=manifest_hash, source_hash=source_hash) - @classmethod - def _manifest_prefix(cls) -> str: - return 'manifests/' - @classmethod def s3_object_key(cls, manifest_key: ManifestKey) -> str: return cls._manifest_prefix() + cls.s3_object_key_base(manifest_key) From a07493d9d7c28e7c569615ca82d2a553c1d37051 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 10 Mar 2026 15:00:04 -0700 Subject: [PATCH 4/8] Use '_it/' prefix for manifests --- src/azul/service/manifest_service.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index faa54de0a8..d0765b5387 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -121,6 +121,7 @@ ) from azul.lib.functions import ( compose, + iif, ) from azul.lib.json import ( copy_json, @@ -791,8 +792,8 @@ def command_lines(self, class ManifestAccessor: @classmethod - def _manifest_prefix(cls) -> str: - return 'manifests/' + def _manifest_prefix(cls, is_it: bool) -> str: + return 'manifests/' + iif(is_it, '_it/') class ManifestGenerator(ManifestAccessor, metaclass=ABCMeta): @@ -1017,7 +1018,8 @@ def manifest_key(self) -> ManifestKey: @classmethod def s3_object_key(cls, manifest_key: ManifestKey) -> str: - return cls._manifest_prefix() + cls.s3_object_key_base(manifest_key) + is_it = config.catalogs[manifest_key.catalog].is_integration_test_catalog + return cls._manifest_prefix(is_it) + cls.s3_object_key_base(manifest_key) @classmethod def s3_object_key_base(cls, manifest_key: ManifestKey) -> str: From 72576fc52cc74a4161def44321bfed361280d79f Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 10 Mar 2026 16:28:58 -0700 Subject: [PATCH 5/8] Pull up property to base class --- src/azul/service/manifest_service.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index d0765b5387..c61d66b9a3 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -790,6 +790,14 @@ def command_lines(self, class ManifestAccessor: + service: BaseManifestService + + def __init__(self, service: BaseManifestService): + self.service = service + + @property + def storage(self) -> StorageService: + return self.service.storage_service @classmethod def _manifest_prefix(cls, is_it: bool) -> str: @@ -808,6 +816,8 @@ class ManifestGenerator(ManifestAccessor, metaclass=ABCMeta): # descendants must be inexpensive. If a property getter performs and # expensive computation or I/O, it should cache its return value. + service: ManifestService + @classmethod @abstractmethod def format(cls) -> ManifestFormat: @@ -970,8 +980,7 @@ def __init__(self, :param service: the service to use when querying the index """ - super().__init__() - self.service = service + super().__init__(service) self.catalog = catalog self.filters = filters self.file_url_func = service.file_url_func @@ -1276,10 +1285,6 @@ def write(self, """ raise NotImplementedError - @property - def storage(self) -> StorageService: - return self.service.storage_service - class ClientSidePagingManifestGenerator(ManifestGenerator, metaclass=ABCMeta): """ From 0cad66d9c88a341db8e3042d64639bba6efe593f Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Thu, 9 Apr 2026 20:57:59 -0700 Subject: [PATCH 6/8] Fix typo --- test/integration_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration_test.py b/test/integration_test.py index 42784b768e..dd395a2d22 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -649,7 +649,7 @@ def worker(_): self.assertEqual(2, len(execution_ids)) elif i == 2: # Only fetch mode changed, cached manifest will be used, - # and no additional executions are expectect + # and no additional executions are expected self.assertEqual(2, len(execution_ids)) else: assert False From f2eb145c789da09d43ba428f674df0bdc4fbeb85 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 10 Mar 2026 16:29:40 -0700 Subject: [PATCH 7/8] Clean up cached manifests before and after IT (#7835) --- src/azul/service/manifest_service.py | 3 +++ test/integration_test.py | 13 +++++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index c61d66b9a3..4c279741e5 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -803,6 +803,9 @@ def storage(self) -> StorageService: def _manifest_prefix(cls, is_it: bool) -> str: return 'manifests/' + iif(is_it, '_it/') + def delete_it_files(self): + self.storage.delete_prefix(self._manifest_prefix(is_it=True)) + class ManifestGenerator(ManifestAccessor, metaclass=ABCMeta): """ diff --git a/test/integration_test.py b/test/integration_test.py index dd395a2d22..86edea0184 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -169,6 +169,8 @@ Token, ) from azul.service.manifest_service import ( + BaseManifestService, + ManifestAccessor, ManifestFormat, ManifestGenerator, ) @@ -470,6 +472,8 @@ class Catalog: for flag in ['index', 'delete', 'mirror'] ] + manifest_accessor = ManifestAccessor(BaseManifestService()) + self._assert_queues_empty(config.indexer_fail_queue_names) if index: self._reset_indexer() @@ -518,6 +522,12 @@ class Catalog: bundle_fqids=catalog.bundles) self._test_single_entity_response(catalog=catalog.name) + # `test_manifest` and `test_manifest_tagging_race` assert how many times + # the step function is executed when retrieving the manifests, with the + # expectation that there are no pre-existing cached manifests. This + # deletion is necessary to enforce that expectation, especially when + # performing consecutive IT runs with the same seed. + manifest_accessor.delete_it_files() for catalog in catalogs: self._test_manifest(catalog.name) self._test_manifest_tagging_race(catalog.name) @@ -527,6 +537,9 @@ class Catalog: public_source=catalog.public_source, ma_source=catalog.ma_source) + if delete: + manifest_accessor.delete_it_files() + if mirror and config.enable_mirroring: self._test_mirroring(delete=delete) From d48ebee1077c07f114a42ce983f237bf59878b46 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Thu, 9 Apr 2026 20:57:34 -0700 Subject: [PATCH 8/8] Compensate for stale executions in manifest IT (#7835) --- test/integration_test.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/integration_test.py b/test/integration_test.py index 86edea0184..65d263ce68 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -618,6 +618,7 @@ def _test_manifest(self, catalog: CatalogName): filters = self._manifest_filters(catalog) execution_ids = set() coin_flip = bool(self.random.getrandbits(1)) + num_old_executions = 0 for i, fetch in enumerate([coin_flip, coin_flip, not coin_flip]): with self.subTest('manifest', catalog=catalog, format=format, i=i, fetch=fetch): args = dict(catalog=catalog, filters=json.dumps(filters)) @@ -655,15 +656,18 @@ def worker(_): bucket, key = one(self._manifest_objects(responses)) if i == 0: aws.s3.delete_object(Bucket=bucket, Key=key) - # One execution to generate the manifest - self.assertEqual(1, len(execution_ids)) + # One execution to generate the manifest. However, if + # this test was recently run using the same seed, + # previous executions will be tracked in the token. + self.assertLessEqual(1, len(execution_ids)) + num_old_executions = len(execution_ids) - 1 elif i == 1: # One more execution to re-generate the manifest - self.assertEqual(2, len(execution_ids)) + self.assertEqual(num_old_executions + 2, len(execution_ids)) elif i == 2: # Only fetch mode changed, cached manifest will be used, # and no additional executions are expected - self.assertEqual(2, len(execution_ids)) + self.assertEqual(num_old_executions + 2, len(execution_ids)) else: assert False