diff --git a/app/resources/custom_error.py b/app/resources/custom_error.py index ef844391..3db703c0 100644 --- a/app/resources/custom_error.py +++ b/app/resources/custom_error.py @@ -49,6 +49,10 @@ class Error: 'File: %s does not exist in the folder.\n' 'Please remove the resumable upload log and retry uploading the entire folder again.' ), + 'INVALID_RESUMABLE_FILE_SIZE': ( + 'The file size of %s is not the same as the previous upload. ' + 'Expected size: %s, Actual size: %s. Please verify the file content and try again.' + ), 'INVALID_FOLDERNAME': ( 'The input folder name is not valid. Please follow the rule:\n' ' - cannot contains special characters.\n' diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index bec492de..cc22e1d4 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ b/app/services/file_manager/file_upload/file_upload.py @@ -268,35 +268,24 @@ def simple_upload( # noqa: C901 return [file_object.item_id for file_object in pre_upload_infos] -def resume_upload( - manifest_json: Dict[str, Any], - num_of_thread: int = 1, -): - """ +def resume_get_unfinished_items( + upload_client: UploadClient, all_files: Dict[str, Any], item_ids: List[str] +) -> List[FileObject]: + ''' Summary: - Resume upload from the manifest file - Parameters: - - manifest_json: the manifest json which store the upload information - - num_of_thread: the number of thread to upload the file - """ - upload_start_time = time.time() + Function will loop over `all_files` batchly and check if the file is already uploaded. + During the process, the logic wll check if the size registered in the backend is matched + with the local file size. If not, the function will raise an error. - upload_client = UploadClient( - project_code=manifest_json.get('project_code'), - zone=manifest_json.get('zone'), - job_type='AS_FOLDER', - current_folder_node=manifest_json.get('current_folder_node', ''), - parent_folder_id=manifest_json.get('parent_folder_id', ''), - tags=manifest_json.get('tags'), - ) + Parameter: + - upload_client(UploadClient): the upload client object + - all_files(Dict[str, Any]): the file object dictionary + - item_ids(List[str]): the list of item ids that will be checked + Return: + - unfinished_items(List[FileObject]): the list of file object that is not uploaded yet + ''' - # check files in manifest if some of them are already uploaded unfinished_items = [] - all_files = manifest_json.get('file_objects') - item_ids = [] - for item_id in all_files: - item_ids.append(item_id) - # here add the batch of 500 per loop, the pre upload api cannot # process very large amount of file at same time. otherwise it will timeout # here is list of pre upload result. We decided to call pre upload api by batch @@ -312,8 +301,26 @@ def resume_upload( SrvErrorHandler.customized_handle( ECustomizedError.INVALID_RESUMABLE_UPLOAD, if_exit=True, value=missing_item.get('object_path') ) + # check if the file is already registered elif x.get('result').get('status') == ItemStatus.REGISTERED: file_info = all_files.get(file_meta.get('id')) + # check if size is matched during resume vs preupload + logger.info( + f'Check file size: {file_info.get("object_path")}, ' + f'expected size: {file_info.get("total_size")}, ' + f'actual size: {x.get("result").get("size")}' + ) + if file_info.get('total_size') != x.get('result').get('size'): + SrvErrorHandler.customized_handle( + ECustomizedError.INVALID_RESUMABLE_FILE_SIZE, + if_exit=True, + value=( + file_info.get('object_path'), + x.get('result').get('size'), + file_info.get('total_size'), + ), + ) + unfinished_files.append( FileObject( file_info.get('object_path'), @@ -329,6 +336,36 @@ def resume_upload( if len(unfinished_files) > 0: unfinished_items.extend(upload_client.resume_upload(unfinished_files)) + return unfinished_items + + +def resume_upload( + manifest_json: Dict[str, Any], + num_of_thread: int = 1, +): + """ + Summary: + Resume upload from the manifest file + Parameters: + - manifest_json: the manifest json which store the upload information + - num_of_thread: the number of thread to upload the file + """ + upload_start_time = time.time() + + upload_client = UploadClient( + project_code=manifest_json.get('project_code'), + zone=manifest_json.get('zone'), + job_type='AS_FOLDER', + current_folder_node=manifest_json.get('current_folder_node', ''), + parent_folder_id=manifest_json.get('parent_folder_id', ''), + tags=manifest_json.get('tags'), + ) + + # check files in manifest if some of them are already uploaded + all_files = manifest_json.get('file_objects') + item_ids = list(all_files.keys()) + unfinished_items = resume_get_unfinished_items(upload_client, all_files, item_ids) + mhandler.SrvOutPutHandler.resume_warning(len(unfinished_items)) mhandler.SrvOutPutHandler.resume_check_success() diff --git a/app/services/file_manager/file_upload/upload_client.py b/app/services/file_manager/file_upload/upload_client.py index f67a1181..c77f1bde 100644 --- a/app/services/file_manager/file_upload/upload_client.py +++ b/app/services/file_manager/file_upload/upload_client.py @@ -225,7 +225,8 @@ def pre_upload(self, file_objects: List[FileObject], output_path: str) -> List[F 'parent_folder_id': self.parent_folder_id, 'folder_tags': self.tags, 'data': [ - {'resumable_filename': x.file_name, 'resumable_relative_path': x.parent_path} for x in file_objects + {'resumable_filename': x.file_name, 'resumable_relative_path': x.parent_path, 'size': x.total_size} + for x in file_objects ], } if self.source_id: diff --git a/app/services/output_manager/error_handler.py b/app/services/output_manager/error_handler.py index 25708ecf..e16724cf 100644 --- a/app/services/output_manager/error_handler.py +++ b/app/services/output_manager/error_handler.py @@ -35,6 +35,7 @@ class ECustomizedError(enum.Enum): INVALID_PATHS = 'INVALID_PATHS' INVALID_RESUMABLE_FILE = 'INVALID_RESUMABLE_FILE' INVALID_RESUMABLE_UPLOAD = 'INVALID_RESUMABLE_UPLOAD' + INVALID_RESUMABLE_FILE_SIZE = 'INVALID_RESUMABLE_FILE_SIZE' TOU_CONTENT = 'TOU_CONTENT' INVALID_TOKEN = 'INVALID_TOKEN' PERMISSION_DENIED = 'PERMISSION_DENIED' diff --git a/pyproject.toml b/pyproject.toml index ba97ea02..6575043e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.14.1" +version = "3.15.0" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] diff --git a/tests/app/services/file_manager/file_upload/test_file_upload.py b/tests/app/services/file_manager/file_upload/test_file_upload.py index 16bc1b41..2880b210 100644 --- a/tests/app/services/file_manager/file_upload/test_file_upload.py +++ b/tests/app/services/file_manager/file_upload/test_file_upload.py @@ -319,6 +319,7 @@ def test_folder_merge_skip_with_all_duplication(mocker, mock_upload_client, capf def test_resume_upload(mocker): mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1)) test_obj = FileObject('object/path', 'local_path', 'resumable_id', 'job_id', 'item_id') + test_obj.total_size = 1 manifest_json = { 'project_code': 'project_code', @@ -328,11 +329,13 @@ def test_resume_upload(mocker): 'current_folder_node': 'current_folder_node', 'tags': 'tags', 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'total_size': 1, } get_return = test_obj.to_dict() get_return.update({'status': ItemStatus.REGISTERED}) get_return.update({'id': get_return.get('item_id')}) + get_return.update({'size': 1}) get_mock = mocker.patch( 'app.services.file_manager.file_upload.file_upload.get_file_info_by_geid', return_value=[{'result': get_return}] ) @@ -378,3 +381,39 @@ def test_resume_upload_failed_when_REGISTERED_doesnt_exist(mocker, capfd): get_mock.assert_called_once() assert resume_upload_mock.call_count == 0 + + +def test_resume_upload_integrity_check_failed(mocker, capfd): + mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1)) + test_obj = FileObject('object/path', 'local_path', 'resumable_id', 'job_id', 'item_id') + test_obj.total_size = 2 # wrong size + + manifest_json = { + 'project_code': 'project_code', + 'operator': 'operator', + 'zone': AppConfig.Env.green_zone, + 'parent_folder_id': 'parent_folder_id', + 'current_folder_node': 'current_folder_node', + 'tags': 'tags', + 'file_objects': {test_obj.item_id: test_obj.to_dict()}, + 'total_size': 1, + } + + get_return = test_obj.to_dict() + get_return.update({'status': ItemStatus.REGISTERED}) + get_return.update({'id': get_return.get('item_id')}) + get_return.update({'size': 1}) + get_mock = mocker.patch( + 'app.services.file_manager.file_upload.file_upload.get_file_info_by_geid', return_value=[{'result': get_return}] + ) + + try: + resume_upload(manifest_json, 1) + except SystemExit: + out, _ = capfd.readouterr() + expect = customized_error_msg(ECustomizedError.INVALID_RESUMABLE_FILE_SIZE) % ('object/path', 1, 2) + assert expect in out + else: + AssertionError('SystemExit not raised') + + get_mock.assert_called_once() diff --git a/tests/app/services/project_manager/test_project.py b/tests/app/services/project_manager/test_project.py index 9fac30c4..08c11fd2 100644 --- a/tests/app/services/project_manager/test_project.py +++ b/tests/app/services/project_manager/test_project.py @@ -9,7 +9,7 @@ def test_list_project(httpx_mock, mocker, capsys): mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) httpx_mock.add_response( method='GET', - url='http://bff_cli/v1/projects?page=0&page_size=10&order=created_at&order_by=desc', + url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=created_at', json={ 'code': 200, 'error_msg': '', @@ -38,7 +38,7 @@ def test_list_project(httpx_mock, mocker, capsys): }, ) project_mgr = SrvProjectManager() - project_mgr.list_projects(page=0, page_size=10, order='created_at', order_by='desc') + project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='created_at') out, _ = capsys.readouterr() print_out = out.split('\n') assert print_out[0] == ' Project Name Project Code ' @@ -54,11 +54,11 @@ def test_list_project_no_project(httpx_mock, mocker, capsys): mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) httpx_mock.add_response( method='GET', - url='http://bff_cli/v1/projects?page=0&page_size=10&order=created_at&order_by=desc', + url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=created_at', json={'code': 200, 'error_msg': '', 'result': [], 'total': 0, 'page': 0}, ) project_mgr = SrvProjectManager() - project_mgr.list_projects(page=0, page_size=10, order='created_at', order_by='desc') + project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='created_at') out, _ = capsys.readouterr() print_out = out.split('\n') assert print_out[0] == ' Project Name Project Code ' @@ -71,7 +71,7 @@ def test_list_project_desc_by_code(httpx_mock, mocker, capsys): mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) httpx_mock.add_response( method='GET', - url='http://bff_cli/v1/projects?page=0&page_size=10&order=code&order_by=desc', + url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=code', json={ 'code': 200, 'error_msg': '', @@ -92,7 +92,7 @@ def test_list_project_desc_by_code(httpx_mock, mocker, capsys): }, ) project_mgr = SrvProjectManager() - project_mgr.list_projects(page=0, page_size=10, order='code', order_by='desc') + project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='code') out, _ = capsys.readouterr() print_out = out.split('\n') assert print_out[0] == ' Project Name Project Code ' @@ -115,7 +115,7 @@ def test_list_project_desc_by_name(httpx_mock, mocker, capsys): mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) httpx_mock.add_response( method='GET', - url='http://bff_cli/v1/projects?page=0&page_size=10&order=code&order_by=desc', + url='http://bff_cli/v1/projects?page=0&page_size=10&order=desc&order_by=name', json={ 'code': 200, 'error_msg': '', @@ -136,7 +136,7 @@ def test_list_project_desc_by_name(httpx_mock, mocker, capsys): }, ) project_mgr = SrvProjectManager() - project_mgr.list_projects(page=0, page_size=10, order='code', order_by='desc') + project_mgr.list_projects(page=0, page_size=10, order='desc', order_by='name') out, _ = capsys.readouterr() print_out = out.split('\n') assert print_out[0] == ' Project Name Project Code ' @@ -159,7 +159,7 @@ def test_list_project_desc_by_name_with_page_size(httpx_mock, mocker, capsys): mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) httpx_mock.add_response( method='GET', - url='http://bff_cli/v1/projects?page=0&page_size=3&order=code&order_by=desc', + url='http://bff_cli/v1/projects?page=0&page_size=3&order=desc&order_by=name', json={ 'code': 200, 'error_msg': '', @@ -173,7 +173,7 @@ def test_list_project_desc_by_name_with_page_size(httpx_mock, mocker, capsys): }, ) project_mgr = SrvProjectManager() - project_mgr.list_projects(page=0, page_size=3, order='code', order_by='desc') + project_mgr.list_projects(page=0, page_size=3, order='desc', order_by='name') out, _ = capsys.readouterr() print_out = out.split('\n') assert print_out[0] == ' Project Name Project Code ' @@ -189,7 +189,7 @@ def test_list_project_desc_by_name_with_page_size_and_page(httpx_mock, mocker, c mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) httpx_mock.add_response( method='GET', - url='http://bff_cli/v1/projects?page=1&page_size=3&order=code&order_by=desc', + url='http://bff_cli/v1/projects?page=1&page_size=3&order=desc&order_by=name', json={ 'code': 200, 'error_msg': '', @@ -203,7 +203,7 @@ def test_list_project_desc_by_name_with_page_size_and_page(httpx_mock, mocker, c }, ) project_mgr = SrvProjectManager() - project_mgr.list_projects(page=1, page_size=3, order='code', order_by='desc') + project_mgr.list_projects(page=1, page_size=3, order='desc', order_by='name') out, _ = capsys.readouterr() print_out = out.split('\n') assert print_out[0] == ' Project Name Project Code '