diff --git a/attic/scripts/delete_mirrored_files.py b/attic/scripts/delete_mirrored_files.py new file mode 100644 index 0000000000..f063c5d45f --- /dev/null +++ b/attic/scripts/delete_mirrored_files.py @@ -0,0 +1,66 @@ +from collections.abc import ( + Iterable, + Sequence, +) +import csv +import logging +import sys + +from more_itertools import ( + one, +) + +from azul import ( + CatalogName, + config, +) +from azul.azulclient import ( + AzulClient, +) +from azul.indexer.mirror_service import ( + MirrorService, +) +from azul.lib import ( + R, +) +from azul.logging import ( + configure_script_logging, +) +from azul.service.storage_service import ( + StorageService, +) + +log = logging.getLogger(__name__) + + +def delete_files(catalog: CatalogName, diff: Iterable[tuple[str, str]]): + assert False + checksums, sizes = zip(*diff) + mirror_service: MirrorService = AzulClient().mirror_service(catalog) + service: StorageService = mirror_service._storage + keys = set() + assert config.is_anvil_enabled(catalog) + for checksum in checksums: + keys.add(f'file/{checksum}.md5') + keys.add(f'info/{checksum}.json') + + assert len(keys) == 2 * len(checksums), R('There are duplicate checksums') + total_size = sum(map(int, sizes)) + print('This will permanently delete', len(checksums), 'files from', + service.bucket_name, 'totalling', total_size, 'bytes. Proceed? (y/N)') + if input() == 'y': + service.delete_objects(object_keys=keys) + else: + print('Cancelled.') + + +def main(argv: Sequence[str]): + path = one(argv) + with open(path) as f: + reader = csv.reader(f, delimiter='\t') + delete_files(config.default_catalog, reader) + + +if __name__ == '__main__': + configure_script_logging(log) + main(sys.argv[1:]) diff --git a/src/azul/service/storage_service.py b/src/azul/service/storage_service.py index 2adab0adf0..47e0366c66 100644 --- a/src/azul/service/storage_service.py +++ b/src/azul/service/storage_service.py @@ -14,9 +14,7 @@ from email.utils import ( parsedate_to_datetime, ) -from logging import ( - getLogger, -) +import logging import random import time from typing import ( @@ -58,14 +56,13 @@ CompleteMultipartUploadRequestTypeDef, CompletedPartTypeDef, CreateMultipartUploadRequestTypeDef, - DeleteObjectsRequestTypeDef, GetObjectOutputTypeDef, HeadObjectOutputTypeDef, PutObjectRequestTypeDef, PutObjectTaggingRequestTypeDef, ) -log = getLogger(__name__) +log = logging.getLogger(__name__) Tagging = Mapping[str, str] @@ -196,15 +193,38 @@ def delete_objects(self, object_keys: Collection[str], batch_size: int = 1000 ) -> None: + """ + Delete the objects with the given keys, in batches of the given size. + This method is idempotent: passing the key of an object that was already + deleted, or that never existed, will not cause an error. This method is + not atomic: a requirement error will be raised for the first batch with + an object that failed to be deleted, and any subsequent batches will be + ignored. + + :param object_keys: a collection of keys of objects to be deleted + + :param batch_size: the number of objects to delete per request + """ assert batch_size <= 1000, R('Batch size must <= 1000', batch_size) - num_keys = len(object_keys) + num_keys, num_deleted = len(object_keys), 0 for batch in chunked(object_keys, batch_size): - log.debug('Deleting batch of objects: %r', batch) - request: DeleteObjectsRequestTypeDef - request = dict(Bucket=self.bucket_name, - Delete=dict(Objects=[dict(Key=key) for key in batch])) - self._s3.delete_objects(**request) - log.info('Deleted %d objects overall', num_keys) + if log.isEnabledFor(logging.DEBUG): + log.debug('Deleting batch of objects: %r', batch) + else: + log.info('Deleting batch of %d object(s)', len(batch)) + response = self._s3.delete_objects( + Bucket=self.bucket_name, + Delete=dict(Quiet=True, + Objects=[dict(Key=key) for key in batch]) + ) + try: + errors = response['Errors'] + except KeyError: + num_deleted += len(batch) + else: + assert False, R('Failed to delete some objects', errors) + assert num_deleted == num_keys, (num_deleted, num_keys) + log.info('Deleted %d objects overall', num_deleted) def list_objects(self, prefix: str) -> OrderedSet[str]: keys: OrderedSet[str] = OrderedSet()