From 9a453c5a35a5d69aec8f2c52dad7102eaf59a322 Mon Sep 17 00:00:00 2001 From: zhiren Date: Fri, 7 Mar 2025 17:10:53 -0500 Subject: [PATCH 1/6] update manifest logic to output resumable information after each batch --- .../file_manager/file_upload/file_upload.py | 117 +++++++++++++----- .../file_manager/file_upload/upload_client.py | 15 ++- 2 files changed, 94 insertions(+), 38 deletions(-) diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index cc22e1d4..669d3280 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ b/app/services/file_manager/file_upload/file_upload.py @@ -119,6 +119,55 @@ def assemble_path( return current_folder_node, parent_folder, create_folder_flag, target_folder +def item_duplication_check( + create_folder_flag: bool, file_objects: List[FileObject], upload_client: UploadClient +) -> List[FileObject]: + ''' + Summary: + The function will check if the file is already uploaded on the platform. + If the file is already uploaded, the function will ask if to skip the file. 
+ Otherwise, the function will exit + Parameter: + - create_folder_flag(bool): the flag to indicate if need to create new folder + - file_objects(List[FileObject]): the list of file object + - upload_client(UploadClient): the upload client object + Return: + - non_duplicate_file_objects(List[FileObject]): the list of file object that is not duplicated + ''' + + # make the file duplication check to allow folde merging + non_duplicate_file_objects = [] + if create_folder_flag is True: + non_duplicate_file_objects = file_objects + else: + mhandler.SrvOutPutHandler.file_duplication_check() + duplicated_file = [] + debug_logger.debug(f'upload batch size: {AppConfig.Env.upload_batch_size}') + for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size): + start_time = time.time() + non_duplicates, duplicate_path = upload_client.check_upload_duplication(file_batchs) + debug_logger.debug(f'Check duplication time: {time.time() - start_time:.2f}s') + non_duplicate_file_objects.extend(non_duplicates) + duplicated_file.extend(duplicate_path) + + if len(non_duplicate_file_objects) == 0: + mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same() + SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True) + elif len(duplicated_file) > 0: + mhandler.SrvOutPutHandler.file_duplication_check_success() + duplicate_warning_format = '\n'.join(duplicated_file) + try: + click.confirm( + customized_error_msg(ECustomizedError.UPLOAD_SKIP_DUPLICATION) % (duplicate_warning_format), + abort=True, + ) + except Abort: + mhandler.SrvOutPutHandler.cancel_upload() + exit(1) + + return non_duplicate_file_objects + + def simple_upload( # noqa: C901 upload_event, num_of_thread: int = 1, @@ -191,35 +240,8 @@ def simple_upload( # noqa: C901 else: file_objects.append(file_object) - # make the file duplication check to allow folde merging - non_duplicate_file_objects = [] - if create_folder_flag is True: - non_duplicate_file_objects = 
file_objects - else: - mhandler.SrvOutPutHandler.file_duplication_check() - duplicated_file = [] - debug_logger.debug(f'upload batch size: {AppConfig.Env.upload_batch_size}') - for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size): - start_time = time.time() - non_duplicates, duplicate_path = upload_client.check_upload_duplication(file_batchs) - debug_logger.debug(f'Check duplication time: {time.time() - start_time:.2f}s') - non_duplicate_file_objects.extend(non_duplicates) - duplicated_file.extend(duplicate_path) - - if len(non_duplicate_file_objects) == 0: - mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same() - SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True) - elif len(duplicated_file) > 0: - mhandler.SrvOutPutHandler.file_duplication_check_success() - duplicate_warning_format = '\n'.join(duplicated_file) - try: - click.confirm( - customized_error_msg(ECustomizedError.UPLOAD_SKIP_DUPLICATION) % (duplicate_warning_format), - abort=True, - ) - except Abort: - mhandler.SrvOutPutHandler.cancel_upload() - exit(1) + # make the file duplication check to allow folder merging + non_duplicate_file_objects = item_duplication_check(create_folder_flag, file_objects, upload_client) # here is list of pre upload result. 
We decided to call pre upload api by batch pre_upload_infos = [] @@ -228,8 +250,11 @@ def simple_upload( # noqa: C901 # the placeholder in object storage pre_upload_infos.extend(upload_client.pre_upload(file_batchs, output_path)) - # then output manifest file to the output path - upload_client.output_manifest(pre_upload_infos, output_path) + # then output manifest file to the output path AFTER EACH BATCH + # which will include unfinished objects + upload_client.output_manifest( + pre_upload_infos, non_duplicate_file_objects[len(pre_upload_infos) :], output_path + ) # now loop over each file under the folder and start # the chunk upload @@ -362,9 +387,33 @@ def resume_upload( ) # check files in manifest if some of them are already uploaded - all_files = manifest_json.get('file_objects') - item_ids = list(all_files.keys()) - unfinished_items = resume_get_unfinished_items(upload_client, all_files, item_ids) + registered_items = manifest_json.get('registered_items') + item_ids = list(registered_items.keys()) + unfinished_items = resume_get_unfinished_items(upload_client, registered_items, item_ids) + logger.info(f'Registered items: {len(unfinished_items)}') + + # make the file duplication check to allow folder merging + unregistered_items = manifest_json.get('unregistered_items') + unregistered_items = [ + FileObject( + object_path=value.get('object_path'), + local_path=value.get('local_path'), + resumable_id=value.get('resumable_id'), + job_id=value.get('job_id'), + item_id=value.get('item_id'), + ) + for _, value in unregistered_items.items() + ] + unregistered_items = item_duplication_check(False, unregistered_items, upload_client) + logger.info(f'Unregistered items: {len(unregistered_items)}') + + # redo preupload again + registered_count = 0 + resumable_manifest_file = manifest_json.get('resumable_manifest_file') + for file_batchs in batch_generator(unregistered_items, batch_size=AppConfig.Env.upload_batch_size): + 
unfinished_items.extend(upload_client.pre_upload(file_batchs, resumable_manifest_file)) + registered_count += len(file_batchs) + upload_client.output_manifest(unfinished_items, unregistered_items[registered_count:], resumable_manifest_file) mhandler.SrvOutPutHandler.resume_warning(len(unfinished_items)) mhandler.SrvOutPutHandler.resume_check_success() diff --git a/app/services/file_manager/file_upload/upload_client.py b/app/services/file_manager/file_upload/upload_client.py index c77f1bde..7498251c 100644 --- a/app/services/file_manager/file_upload/upload_client.py +++ b/app/services/file_manager/file_upload/upload_client.py @@ -265,13 +265,18 @@ def pre_upload(self, file_objects: List[FileObject], output_path: str) -> List[F mhandler.SrvOutPutHandler.preupload_success() return file_objets - def output_manifest(self, file_objects: List[FileObject], output_path: str) -> Dict[str, Any]: + def output_manifest( + self, registered_items: List[FileObject], unregistered_items: List[FileObject], output_path: str + ) -> Dict[str, Any]: """ Summary: The function is to output the manifest file. Parameter: - - file_objects(list of FileObject): the file objects that contains correct - information for chunk uploading. + - registered_items(list of FileObject): the file object that has been registered + in metadata service. + - unregistered_items(list of FileObject): the file object that has not been registered + and still need to pass through preupload + - output_path(str): the output path of manifest. return: - manifest_json(dict): the manifest file in json format. 
""" @@ -283,8 +288,10 @@ def output_manifest(self, file_objects: List[FileObject], output_path: str) -> D 'parent_folder_id': self.parent_folder_id, 'current_folder_node': self.current_folder_node, 'tags': self.tags, - 'file_objects': {file_object.item_id: file_object.to_dict() for file_object in file_objects}, + 'registered_items': {file_object.item_id: file_object.to_dict() for file_object in registered_items}, + 'unregistered_items': {file_object.local_path: file_object.to_dict() for file_object in unregistered_items}, 'attributes': self.attributes if self.attributes else {}, + 'resumable_manifest_file': output_path, } with open(output_path, 'w') as f: From 333201d495374665d1ca777e30a0a6c91847fe6b Mon Sep 17 00:00:00 2001 From: zhiren Date: Tue, 11 Mar 2025 00:09:16 -0400 Subject: [PATCH 2/6] update test cases for new resumable upload files --- app/commands/file.py | 2 +- .../file_manager/file_upload/file_upload.py | 4 ++- tests/app/commands/test_file.py | 8 +++-- .../file_upload/test_file_upload.py | 9 ++++-- .../file_upload/test_upload_client.py | 29 +++++++++++++------ 5 files changed, 36 insertions(+), 16 deletions(-) diff --git a/app/commands/file.py b/app/commands/file.py index 9972678c..0e2d2323 100644 --- a/app/commands/file.py +++ b/app/commands/file.py @@ -288,7 +288,7 @@ def file_resume(**kwargs): # noqa: C901 # since only file upload can attach manifest, take the first file object srv_manifest = SrvFileManifests() - item_id = next(iter(resumable_manifest.get('file_objects'))) + item_id = next(iter(resumable_manifest.get('registered_items'))) attribute = resumable_manifest.get('attributes') zone = resumable_manifest.get('zone') srv_manifest.attach_manifest(attribute, item_id, zone) if attribute else None diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index 669d3280..37ef42e1 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ 
b/app/services/file_manager/file_upload/file_upload.py @@ -150,7 +150,9 @@ def item_duplication_check( non_duplicate_file_objects.extend(non_duplicates) duplicated_file.extend(duplicate_path) - if len(non_duplicate_file_objects) == 0: + # if all file objects we check are already existed on the platform + # then we will exit the upload process + if len(non_duplicate_file_objects) == 0 and len(file_objects) != 0: mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same() SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True) elif len(duplicated_file) > 0: diff --git a/tests/app/commands/test_file.py b/tests/app/commands/test_file.py index c3e39456..64c5af71 100644 --- a/tests/app/commands/test_file.py +++ b/tests/app/commands/test_file.py @@ -189,7 +189,10 @@ def test_resumable_upload_command_success(mocker, cli_runner): with runner.isolated_filesystem(): mocker.patch('os.path.exists', return_value=True) with open('test.json', 'w') as f: - json.dump({'file_objects': {'test_item_id': {'file_name': 'test.json'}}, 'zone': 1}, f) + json.dump( + {'registered_items': {'test_item_id': {'file_name': 'test.json'}}, 'unregistered_items': {}, 'zone': 1}, + f, + ) mocker.patch('app.commands.file.resume_upload', return_value=None) mocker.patch('os.remove', return_value=None) @@ -205,7 +208,8 @@ def test_resumable_upload_command_with_file_attribute_success(mocker, cli_runner with open('test.json', 'w') as f: json.dump( { - 'file_objects': {'test_item_id': {'file_name': 'test.json'}}, + 'registered_items': {'test_item_id': {'file_name': 'test.json'}}, + 'unregistered_items': {}, 'zone': 1, 'attributes': {'M1': {'attr1': '1'}}, }, diff --git a/tests/app/services/file_manager/file_upload/test_file_upload.py b/tests/app/services/file_manager/file_upload/test_file_upload.py index 2880b210..137c011e 100644 --- a/tests/app/services/file_manager/file_upload/test_file_upload.py +++ b/tests/app/services/file_manager/file_upload/test_file_upload.py @@ 
-328,7 +328,8 @@ def test_resume_upload(mocker): 'parent_folder_id': 'parent_folder_id', 'current_folder_node': 'current_folder_node', 'tags': 'tags', - 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'registered_items': {test_obj.item_id: test_obj.to_dict()}, + 'unregistered_items': {}, 'total_size': 1, } @@ -360,7 +361,8 @@ def test_resume_upload_failed_when_REGISTERED_doesnt_exist(mocker, capfd): 'parent_folder_id': 'parent_folder_id', 'current_folder_node': 'current_folder_node', 'tags': 'tags', - 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'registered_items': {test_obj.item_id: test_obj.to_dict()}, + 'unregistered_items': {}, } get_return = test_obj.to_dict() @@ -395,7 +397,8 @@ def test_resume_upload_integrity_check_failed(mocker, capfd): 'parent_folder_id': 'parent_folder_id', 'current_folder_node': 'current_folder_node', 'tags': 'tags', - 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'registered_items': {test_obj.item_id: test_obj.to_dict()}, + 'unregistered_items': {}, 'total_size': 1, } diff --git a/tests/app/services/file_manager/file_upload/test_upload_client.py b/tests/app/services/file_manager/file_upload/test_upload_client.py index ab10f577..0773d410 100644 --- a/tests/app/services/file_manager/file_upload/test_upload_client.py +++ b/tests/app/services/file_manager/file_upload/test_upload_client.py @@ -330,18 +330,29 @@ def test_output_manifest_success(mocker, tmp_path): upload_client = UploadClient('project_code', 'parent_folder_id') json_dump_mocker = mocker.patch('json.dump', return_value=None) mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1)) - test_obj = FileObject('object/path', 'local_path', 'resumable_id', 'job_id', 'item_id') + test_obj_registered = FileObject( + 'object/test_obj_registered', 'local_path_1', 'resumable_id_1', 'job_id_1', 'item_id_1' + ) + test_obj_unregistered = FileObject('object/test_obj_unregistered', 'local_path_2', 
'resumable_id_2', 'job_id_2', '') - res = upload_client.output_manifest([test_obj], output_path=str(tmp_path / 'test')) + res = upload_client.output_manifest( + [test_obj_registered], [test_obj_unregistered], output_path=str(tmp_path / 'test') + ) assert res.get('project_code') == 'project_code' assert res.get('parent_folder_id') == 'parent_folder_id' - assert len(res.get('file_objects')) == 1 - - file_item = res.get('file_objects').get('item_id') - assert file_item.get('resumable_id') == 'resumable_id' - assert file_item.get('local_path') == 'local_path' - assert file_item.get('object_path') == 'object/path' - assert file_item.get('item_id') == 'item_id' + assert len(res.get('registered_items')) == 1 + + file_item = res.get('registered_items').get('item_id_1') + assert file_item.get('resumable_id') == 'resumable_id_1' + assert file_item.get('local_path') == 'local_path_1' + assert file_item.get('object_path') == 'object/test_obj_registered' + assert file_item.get('item_id') == 'item_id_1' + + assert len(res.get('unregistered_items')) == 1 + file_item = res.get('unregistered_items').get('local_path_2') # unregistered item dont have id + assert file_item.get('resumable_id') == 'resumable_id_2' + assert file_item.get('local_path') == 'local_path_2' + assert file_item.get('object_path') == 'object/test_obj_unregistered' json_dump_mocker.assert_called_once() From 9a54ff638ac99c3e15dabdb5c9de3917007cfe40 Mon Sep 17 00:00:00 2001 From: zhiren Date: Tue, 11 Mar 2025 09:18:43 -0400 Subject: [PATCH 3/6] bumpup version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6575043e..597588a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.15.0" +version = "3.16.0" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] From 70a1f9ff4b1f9b89273d51a60a6a57c6c8ce84e0 Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 
17 Mar 2025 10:56:50 -0400 Subject: [PATCH 4/6] update PR --- app/services/file_manager/file_upload/file_upload.py | 1 + .../app/services/file_manager/file_upload/test_upload_client.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index 53681d8b..dbc084ca 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ b/app/services/file_manager/file_upload/file_upload.py @@ -136,6 +136,7 @@ def item_duplication_check( ''' # make the file duplication check to allow folde merging + logger.info('Start checking file duplication') non_duplicate_file_objects = [] if create_folder_flag is True: non_duplicate_file_objects = file_objects diff --git a/tests/app/services/file_manager/file_upload/test_upload_client.py b/tests/app/services/file_manager/file_upload/test_upload_client.py index 82f6f63d..c98ae077 100644 --- a/tests/app/services/file_manager/file_upload/test_upload_client.py +++ b/tests/app/services/file_manager/file_upload/test_upload_client.py @@ -352,7 +352,7 @@ def test_check_upload_duplication_fail_with_500(httpx_mock, mocker, capfd): AssertionError('SystemExit not raised') -def test_output_manifest_success(mocker, tmp_path): +def test_output_manifest_success_for_resumable_upload(mocker, tmp_path): upload_client = UploadClient('project_code', 'parent_folder_id') json_dump_mocker = mocker.patch('json.dump', return_value=None) mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1)) From f9af654607ee154d10bff72f04b9cf7a6f69625f Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 17 Mar 2025 11:42:58 -0400 Subject: [PATCH 5/6] fixup test cases due to new output --- tests/app/services/file_manager/file_upload/test_file_upload.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/app/services/file_manager/file_upload/test_file_upload.py 
b/tests/app/services/file_manager/file_upload/test_file_upload.py index 6c42dcff..f7c70bd5 100644 --- a/tests/app/services/file_manager/file_upload/test_file_upload.py +++ b/tests/app/services/file_manager/file_upload/test_file_upload.py @@ -306,6 +306,7 @@ def test_folder_merge_skip_with_all_duplication(mocker, mock_upload_client, capf out, _ = capfd.readouterr() expect = ( f'Starting upload of: {file_name}\n' + + 'Start checking file duplication\n' + 'Checking for file duplication...\n' + '\nAll files already exist in the upload destination.\n\n' + customized_error_msg(ECustomizedError.UPLOAD_CANCEL) From bae669eff2cb1612d376194d04cb2f9dfa88c4ab Mon Sep 17 00:00:00 2001 From: zhiren Date: Wed, 19 Mar 2025 11:19:00 -0400 Subject: [PATCH 6/6] merged --- README.md | 2 ++ pyproject.toml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9d968c54..9a99f74f 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ Command line tool that allows the user to execute data operations on the platfor | 3.15.0 | 2.14.2 | 2.14.2 | | 3.15.1 | 2.14.2 | 2.14.2 | | 3.15.2 | 2.15 | 2.15 | +| 3.16.0 | 2.15 | 2.15 | +| 3.17.0 | 2.15 | 2.15 | ### Build Instructions 1. Each system has its own credential, so building should be done after the updated the env file. diff --git a/pyproject.toml b/pyproject.toml index 597588a3..08d6ca28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.16.0" +version = "3.17.0" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"]