diff --git a/src/azul/indexer/mirror_service.py b/src/azul/indexer/mirror_service.py index 32cbf69e02..75e961e05a 100644 --- a/src/azul/indexer/mirror_service.py +++ b/src/azul/indexer/mirror_service.py @@ -524,11 +524,7 @@ def delete_it_files(self): """ assert self.catalog in config.integration_test_catalogs, R( 'Not an IT catalog', self.catalog) - prefix = self._mirror_prefix - assert len(prefix) > 1 and prefix.endswith('/'), prefix - object_keys = self._storage.list_objects(prefix) - assert len(object_keys) <= 300, R('Too many objects', len(object_keys)) - self._storage.delete_objects(object_keys, batch_size=100) + self._storage.delete_prefix(self._mirror_prefix) @attrs.frozen(kw_only=True, slots=False) diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 8d248e5b95..4c279741e5 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -121,6 +121,7 @@ ) from azul.lib.functions import ( compose, + iif, ) from azul.lib.json import ( copy_json, @@ -571,14 +572,17 @@ class CachedManifestNotFound(Exception): manifest_key: ManifestKey -@attrs.frozen(kw_only=True) -class ManifestService(QueryService): - file_url_func: FileUrlFunc +class BaseManifestService: @cached_property def storage_service(self) -> StorageService: return StorageService() + +@attrs.frozen(kw_only=True) +class ManifestService(BaseManifestService, QueryService): + file_url_func: FileUrlFunc + def get_manifest(self, *, format: ManifestFormat, @@ -785,7 +789,25 @@ def command_lines(self, type Cells = dict[str, str] -class ManifestGenerator(metaclass=ABCMeta): +class ManifestAccessor: + service: BaseManifestService + + def __init__(self, service: BaseManifestService): + self.service = service + + @property + def storage(self) -> StorageService: + return self.service.storage_service + + @classmethod + def _manifest_prefix(cls, is_it: bool) -> str: + return 'manifests/' + iif(is_it, '_it/') + + def delete_it_files(self): + self.storage.delete_prefix(self._manifest_prefix(is_it=True)) + + +class ManifestGenerator(ManifestAccessor, metaclass=ABCMeta): """ A generator for manifests. A manifest is an exhaustive representation of the documents in the aggregate index for a particular entity type. The @@ -797,6 +819,8 @@ class ManifestGenerator(metaclass=ABCMeta): # descendants must be inexpensive. If a property getter performs and # expensive computation or I/O, it should cache its return value. + service: ManifestService + @classmethod @abstractmethod def format(cls) -> ManifestFormat: @@ -959,8 +983,7 @@ def __init__(self, :param service: the service to use when querying the index """ - super().__init__() - self.service = service + super().__init__(service) self.catalog = catalog self.filters = filters self.file_url_func = service.file_url_func @@ -1007,7 +1030,8 @@ def manifest_key(self) -> ManifestKey: @classmethod def s3_object_key(cls, manifest_key: ManifestKey) -> str: - return 'manifests' + '/' + cls.s3_object_key_base(manifest_key) + is_it = config.catalogs[manifest_key.catalog].is_integration_test_catalog + return cls._manifest_prefix(is_it) + cls.s3_object_key_base(manifest_key) @classmethod def s3_object_key_base(cls, manifest_key: ManifestKey) -> str: @@ -1264,10 +1288,6 @@ def write(self, """ raise NotImplementedError - @property - def storage(self) -> StorageService: - return self.service.storage_service - class ClientSidePagingManifestGenerator(ManifestGenerator, metaclass=ABCMeta): """ diff --git a/src/azul/service/storage_service.py b/src/azul/service/storage_service.py index 2adab0adf0..ef8ccc0568 100644 --- a/src/azul/service/storage_service.py +++ b/src/azul/service/storage_service.py @@ -206,6 +206,12 @@ def delete_objects(self, self._s3.delete_objects(**request) log.info('Deleted %d objects overall', num_keys) + def delete_prefix(self, prefix: str) -> None: + assert len(prefix) > 1 and prefix.endswith('/'), prefix + object_keys = self.list_objects(prefix) + assert len(object_keys) <= 300, R('Too many objects', len(object_keys)) + self.delete_objects(object_keys, batch_size=100) + def list_objects(self, prefix: str) -> OrderedSet[str]: keys: OrderedSet[str] = OrderedSet() num_keys = 0 diff --git a/test/integration_test.py b/test/integration_test.py index 42784b768e..65d263ce68 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -169,6 +169,8 @@ Token, ) from azul.service.manifest_service import ( + BaseManifestService, + ManifestAccessor, ManifestFormat, ManifestGenerator, ) @@ -470,6 +472,8 @@ class Catalog: for flag in ['index', 'delete', 'mirror'] ] + manifest_accessor = ManifestAccessor(BaseManifestService()) + self._assert_queues_empty(config.indexer_fail_queue_names) if index: self._reset_indexer() @@ -518,6 +522,12 @@ class Catalog: bundle_fqids=catalog.bundles) self._test_single_entity_response(catalog=catalog.name) + # `test_manifest` and `test_manifest_tagging_race` assert how many times + # the step function is executed when retrieving the manifests, with the + # expectation that there are no pre-existing cached manifests. This + # deletion is necessary to enforce that expectation, especially when + # performing consecutive IT runs with the same seed. + manifest_accessor.delete_it_files() for catalog in catalogs: self._test_manifest(catalog.name) self._test_manifest_tagging_race(catalog.name) @@ -527,6 +537,9 @@ class Catalog: public_source=catalog.public_source, ma_source=catalog.ma_source) + if delete: + manifest_accessor.delete_it_files() + if mirror and config.enable_mirroring: self._test_mirroring(delete=delete) @@ -605,6 +618,7 @@ def _test_manifest(self, catalog: CatalogName): filters = self._manifest_filters(catalog) execution_ids = set() coin_flip = bool(self.random.getrandbits(1)) + num_old_executions = 0 for i, fetch in enumerate([coin_flip, coin_flip, not coin_flip]): with self.subTest('manifest', catalog=catalog, format=format, i=i, fetch=fetch): args = dict(catalog=catalog, filters=json.dumps(filters)) @@ -642,15 +656,18 @@ def worker(_): bucket, key = one(self._manifest_objects(responses)) if i == 0: aws.s3.delete_object(Bucket=bucket, Key=key) - # One execution to generate the manifest - self.assertEqual(1, len(execution_ids)) + # One execution to generate the manifest. However, if + # this test was recently run using the same seed, + # previous executions will be tracked in the token. + self.assertLessEqual(1, len(execution_ids)) + num_old_executions = len(execution_ids) - 1 elif i == 1: # One more execution to re-generate the manifest - self.assertEqual(2, len(execution_ids)) + self.assertEqual(num_old_executions + 2, len(execution_ids)) elif i == 2: # Only fetch mode changed, cached manifest will be used, - # and no additional executions are expectect - self.assertEqual(2, len(execution_ids)) + # and no additional executions are expected + self.assertEqual(num_old_executions + 2, len(execution_ids)) else: assert False