def item_duplication_check(
    create_folder_flag: bool, file_objects: List[FileObject], upload_client: UploadClient
) -> List[FileObject]:
    '''
    Summary:
        Check whether the given files already exist on the platform.
        If some files are duplicated, the user is asked to confirm skipping
        them; if every file is a duplicate the upload process is cancelled.
    Parameter:
        - create_folder_flag(bool): the flag to indicate if need to create new folder.
            When True the destination folder is brand new, so no duplication is
            possible and the check is skipped entirely.
        - file_objects(List[FileObject]): the list of file object to check.
        - upload_client(UploadClient): the upload client object used to query
            the platform for already-uploaded items.
    Return:
        - non_duplicate_file_objects(List[FileObject]): the list of file object
            that is not duplicated on the platform.
    '''

    # make the file duplication check to allow folder merging
    logger.info('Start checking file duplication')
    non_duplicate_file_objects = []
    if create_folder_flag is True:
        non_duplicate_file_objects = file_objects
    else:
        mhandler.SrvOutPutHandler.file_duplication_check()
        duplicated_file = []
        debug_logger.debug(f'upload batch size: {AppConfig.Env.upload_batch_size}')
        # query the platform batch by batch to keep each request bounded
        for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size):
            start_time = time.time()
            non_duplicates, duplicate_path = upload_client.check_upload_duplication(file_batchs)
            debug_logger.debug(f'Check duplication time: {time.time() - start_time:.2f}s')
            non_duplicate_file_objects.extend(non_duplicates)
            duplicated_file.extend(duplicate_path)

        # if all file objects we check are already existed on the platform
        # then we will exit the upload process
        if len(non_duplicate_file_objects) == 0 and len(file_objects) != 0:
            mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same()
            SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True)
        elif len(duplicated_file) > 0:
            mhandler.SrvOutPutHandler.file_duplication_check_success()
            duplicate_warning_format = '\n'.join(duplicated_file)
            try:
                click.confirm(
                    customized_error_msg(ECustomizedError.UPLOAD_SKIP_DUPLICATION) % (duplicate_warning_format),
                    abort=True,
                )
            except Abort:
                mhandler.SrvOutPutHandler.cancel_upload()
                exit(1)

    return non_duplicate_file_objects
file_objects.append(file_object) - # make the file duplication check to allow folde merging - non_duplicate_file_objects = [] - if create_folder_flag is True: - non_duplicate_file_objects = file_objects - else: - mhandler.SrvOutPutHandler.file_duplication_check() - duplicated_file = [] - debug_logger.debug(f'upload batch size: {AppConfig.Env.upload_batch_size}') - for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size): - start_time = time.time() - non_duplicates, duplicate_path = upload_client.check_upload_duplication(file_batchs) - debug_logger.debug(f'Check duplication time: {time.time() - start_time:.2f}s') - non_duplicate_file_objects.extend(non_duplicates) - duplicated_file.extend(duplicate_path) - - if len(non_duplicate_file_objects) == 0: - mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same() - SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True) - elif len(duplicated_file) > 0: - mhandler.SrvOutPutHandler.file_duplication_check_success() - duplicate_warning_format = '\n'.join(duplicated_file) - try: - click.confirm( - customized_error_msg(ECustomizedError.UPLOAD_SKIP_DUPLICATION) % (duplicate_warning_format), - abort=True, - ) - except Abort: - mhandler.SrvOutPutHandler.cancel_upload() - exit(1) + # make the file duplication check to allow folder merging + non_duplicate_file_objects = item_duplication_check(create_folder_flag, file_objects, upload_client) # here is list of pre upload result. 
We decided to call pre upload api by batch pre_upload_infos = [] @@ -228,8 +253,11 @@ def simple_upload( # noqa: C901 # the placeholder in object storage pre_upload_infos.extend(upload_client.pre_upload(file_batchs, output_path)) - # then output manifest file to the output path - upload_client.output_manifest(pre_upload_infos, output_path) + # then output manifest file to the output path AFTER EACH BATCH + # which will include unfinished objects + upload_client.output_manifest( + pre_upload_infos, non_duplicate_file_objects[len(pre_upload_infos) + 1 :], output_path + ) # now loop over each file under the folder and start # the chunk upload @@ -299,12 +327,12 @@ def resume_get_unfinished_items( elif x.get('result').get('status') == ItemStatus.REGISTERED: file_info = all_files.get(file_meta.get('id')) # check if size is matched during resume vs preupload + local_file_size = os.path.getsize(file_info.get('local_path')) logger.info( f'Check file size: {file_info.get("object_path")}, ' - f'expected size: {file_info.get("total_size")}, ' - f'actual size: {x.get("result").get("size")}' + f'expected size: {x.get("result").get("size")}, ' + f'actual size: {local_file_size}' ) - local_file_size = os.path.getsize(file_info.get('local_path')) if file_info.get('total_size') != x.get('result').get('size') or local_file_size != x.get('result').get( 'size' ): @@ -359,9 +387,33 @@ def resume_upload( ) # check files in manifest if some of them are already uploaded - all_files = manifest_json.get('file_objects') - item_ids = list(all_files.keys()) - unfinished_items = resume_get_unfinished_items(upload_client, all_files, item_ids) + registered_items = manifest_json.get('registered_items') + item_ids = list(registered_items.keys()) + unfinished_items = resume_get_unfinished_items(upload_client, registered_items, item_ids) + logger.info(f'Registered items: {len(unfinished_items)}') + + # make the file duplication check to allow folder merging + unregistered_items = 
manifest_json.get('unregistered_items') + unregistered_items = [ + FileObject( + object_path=value.get('object_path'), + local_path=value.get('local_path'), + resumable_id=value.get('resumable_id'), + job_id=value.get('job_id'), + item_id=value.get('item_id'), + ) + for _, value in unregistered_items.items() + ] + unregistered_items = item_duplication_check(False, unregistered_items, upload_client) + logger.info(f'Unregistered items: {len(unregistered_items)}') + + # redo preupload again + batch_count = 1 + resumable_manifest_file = manifest_json.get('resumable_manifest_file') + for file_batchs in batch_generator(unregistered_items, batch_size=AppConfig.Env.upload_batch_size): + unfinished_items.extend(upload_client.pre_upload(file_batchs, resumable_manifest_file)) + upload_client.output_manifest(unfinished_items, unregistered_items[batch_count + 1 :], resumable_manifest_file) + batch_count += 1 mhandler.SrvOutPutHandler.resume_warning(len(unfinished_items)) mhandler.SrvOutPutHandler.resume_check_success() diff --git a/app/services/file_manager/file_upload/upload_client.py b/app/services/file_manager/file_upload/upload_client.py index c0fc84c6..2d88aaf1 100644 --- a/app/services/file_manager/file_upload/upload_client.py +++ b/app/services/file_manager/file_upload/upload_client.py @@ -267,13 +267,18 @@ def pre_upload(self, file_objects: List[FileObject], output_path: str) -> List[F mhandler.SrvOutPutHandler.preupload_success() return file_objets - def output_manifest(self, file_objects: List[FileObject], output_path: str) -> Dict[str, Any]: + def output_manifest( + self, registered_items: List[FileObject], unregistered_items: List[FileObject], output_path: str + ) -> Dict[str, Any]: """ Summary: The function is to output the manifest file. Parameter: - - file_objects(list of FileObject): the file objects that contains correct - information for chunk uploading. + - registered_items(list of FileObject): the file object that has been registered + in metadata service. 
+ - unregistered_items(list of FileObject): the file object that has not been registered + and still need to pass thought preupload + - output_path(str): the output path of manifest. return: - manifest_json(dict): the manifest file in json format. """ @@ -285,8 +290,10 @@ def output_manifest(self, file_objects: List[FileObject], output_path: str) -> D 'parent_folder_id': self.parent_folder_id, 'current_folder_node': self.current_folder_node, 'tags': self.tags, - 'file_objects': {file_object.item_id: file_object.to_dict() for file_object in file_objects}, + 'registered_items': {file_object.item_id: file_object.to_dict() for file_object in registered_items}, + 'unregistered_items': {file_object.local_path: file_object.to_dict() for file_object in unregistered_items}, 'attributes': self.attributes if self.attributes else {}, + 'resumable_manifest_file': output_path, } with open(output_path, 'w') as f: diff --git a/pyproject.toml b/pyproject.toml index 597588a3..08d6ca28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.16.0" +version = "3.17.0" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] diff --git a/tests/app/commands/test_file.py b/tests/app/commands/test_file.py index c3e39456..64c5af71 100644 --- a/tests/app/commands/test_file.py +++ b/tests/app/commands/test_file.py @@ -189,7 +189,10 @@ def test_resumable_upload_command_success(mocker, cli_runner): with runner.isolated_filesystem(): mocker.patch('os.path.exists', return_value=True) with open('test.json', 'w') as f: - json.dump({'file_objects': {'test_item_id': {'file_name': 'test.json'}}, 'zone': 1}, f) + json.dump( + {'registered_items': {'test_item_id': {'file_name': 'test.json'}}, 'unregistered_items': {}, 'zone': 1}, + f, + ) mocker.patch('app.commands.file.resume_upload', return_value=None) mocker.patch('os.remove', return_value=None) @@ -205,7 +208,8 @@ def 
test_resumable_upload_command_with_file_attribute_success(mocker, cli_runner with open('test.json', 'w') as f: json.dump( { - 'file_objects': {'test_item_id': {'file_name': 'test.json'}}, + 'registered_items': {'test_item_id': {'file_name': 'test.json'}}, + 'unregistered_items': {}, 'zone': 1, 'attributes': {'M1': {'attr1': '1'}}, }, diff --git a/tests/app/services/file_manager/file_upload/test_file_upload.py b/tests/app/services/file_manager/file_upload/test_file_upload.py index 20547caf..f7c70bd5 100644 --- a/tests/app/services/file_manager/file_upload/test_file_upload.py +++ b/tests/app/services/file_manager/file_upload/test_file_upload.py @@ -306,6 +306,7 @@ def test_folder_merge_skip_with_all_duplication(mocker, mock_upload_client, capf out, _ = capfd.readouterr() expect = ( f'Starting upload of: {file_name}\n' + + 'Start checking file duplication\n' + 'Checking for file duplication...\n' + '\nAll files already exist in the upload destination.\n\n' + customized_error_msg(ECustomizedError.UPLOAD_CANCEL) @@ -328,7 +329,8 @@ def test_resume_upload(mocker): 'parent_folder_id': 'parent_folder_id', 'current_folder_node': 'current_folder_node', 'tags': 'tags', - 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'registered_items': {test_obj.item_id: test_obj.to_dict()}, + 'unregistered_items': {}, 'total_size': 1, } @@ -364,7 +366,8 @@ def test_resume_upload_failed_when_REGISTERED_doesnt_exist(mocker, capfd): 'parent_folder_id': 'parent_folder_id', 'current_folder_node': 'current_folder_node', 'tags': 'tags', - 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'registered_items': {test_obj.item_id: test_obj.to_dict()}, + 'unregistered_items': {}, } get_return = test_obj.to_dict() @@ -399,7 +402,8 @@ def test_resume_upload_integrity_check_failed(mocker, capfd): 'parent_folder_id': 'parent_folder_id', 'current_folder_node': 'current_folder_node', 'tags': 'tags', - 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'registered_items': 
def test_output_manifest_success_for_resumable_upload(mocker, tmp_path):
    """Manifest output splits registered/unregistered items and records the manifest path."""
    upload_client = UploadClient('project_code', 'parent_folder_id')
    json_dump_mocker = mocker.patch('json.dump', return_value=None)
    mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1))
    test_obj_registered = FileObject(
        'object/test_obj_registered', 'local_path_1', 'resumable_id_1', 'job_id_1', 'item_id_1'
    )
    test_obj_unregistered = FileObject('object/test_obj_unregistered', 'local_path_2', 'resumable_id_2', 'job_id_2', '')

    output_path = str(tmp_path / 'test')
    res = upload_client.output_manifest([test_obj_registered], [test_obj_unregistered], output_path=output_path)

    assert res.get('project_code') == 'project_code'
    assert res.get('parent_folder_id') == 'parent_folder_id'
    # the manifest must remember its own location so resume can rewrite it
    assert res.get('resumable_manifest_file') == output_path

    assert len(res.get('registered_items')) == 1
    file_item = res.get('registered_items').get('item_id_1')
    assert file_item.get('resumable_id') == 'resumable_id_1'
    assert file_item.get('local_path') == 'local_path_1'
    assert file_item.get('object_path') == 'object/test_obj_registered'
    assert file_item.get('item_id') == 'item_id_1'

    assert len(res.get('unregistered_items')) == 1
    # unregistered items don't have an item id yet, so they are keyed by local path
    file_item = res.get('unregistered_items').get('local_path_2')
    assert file_item.get('resumable_id') == 'resumable_id_2'
    assert file_item.get('local_path') == 'local_path_2'
    assert file_item.get('object_path') == 'object/test_obj_unregistered'

    json_dump_mocker.assert_called_once()