From 578706b70a53116fa6813973d2ba89c21c80d423 Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 10 Mar 2025 16:08:18 -0400 Subject: [PATCH 1/6] move upload status check into dedicated function for upload/resume logic --- app/configs/app_config.py | 4 +++ app/resources/custom_error.py | 1 + .../file_manager/file_upload/file_upload.py | 21 +++----------- .../file_manager/file_upload/upload_client.py | 29 +++++++++++++++++++ app/services/output_manager/error_handler.py | 1 + 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/app/configs/app_config.py b/app/configs/app_config.py index 54791112..da5d7ae1 100644 --- a/app/configs/app_config.py +++ b/app/configs/app_config.py @@ -29,6 +29,10 @@ class Env: # the number of items to active interative mode interative_threshold = 10 + # number looping when waiting upload status + output_truncate_count = 10 + max_waiting_count = 30 + github_url = 'PilotDataPlatform/cli' zone_int2string = { diff --git a/app/resources/custom_error.py b/app/resources/custom_error.py index 3db703c0..f98b2e01 100644 --- a/app/resources/custom_error.py +++ b/app/resources/custom_error.py @@ -69,6 +69,7 @@ class Error: 'The following files already exist in the upload destination: \n%s\n' 'Do you want to cancel the upload [N] or skip duplicates and continue uploading [y]?' ), + 'UPLOAD_TIMEOUT': 'Upload task was timeout. Please check the portal for the upload status.', 'UPLOAD_ID_NOT_EXIST': ( 'The specified multipart upload does not exist. ' 'The upload ID may be invalid, or the upload may have been aborted or completed.' diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index cc22e1d4..097e20a8 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ b/app/services/file_manager/file_upload/file_upload.py @@ -254,14 +254,8 @@ def simple_upload( # noqa: C901 pool.close() pool.join() - unfinished_files = pre_upload_infos - while len(unfinished_files) > 0: - temp = [] - mhandler.SrvOutPutHandler.finalize_upload() - for file_batchs in batch_generator(pre_upload_infos, batch_size=AppConfig.Env.upload_batch_size): - temp.extend(upload_client.check_status(file_batchs)) - unfinished_files = temp - + # check the status of the upload + upload_client.upload_status_check(pre_upload_infos) num_of_file = len(pre_upload_infos) logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files') @@ -390,14 +384,7 @@ def resume_upload( pool.close() pool.join() - unfinished_files = unfinished_items - while len(unfinished_files) > 0: - temp = [] - mhandler.SrvOutPutHandler.finalize_upload() - for file_batchs in batch_generator(unfinished_items, batch_size=AppConfig.Env.upload_batch_size): - temp.extend(upload_client.check_status(file_batchs)) - unfinished_files = temp - time.sleep(1) - + # check the status of the upload + upload_client.upload_status_check(unfinished_items) num_of_file = len(unfinished_items) logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files') diff --git a/app/services/file_manager/file_upload/upload_client.py b/app/services/file_manager/file_upload/upload_client.py index c77f1bde..c0fc84c6 100644 --- a/app/services/file_manager/file_upload/upload_client.py +++ b/app/services/file_manager/file_upload/upload_client.py @@ -8,6 +8,7 @@ import math import os import threading +import time from logging import getLogger from multiprocessing.pool import ThreadPool from typing import Any @@ -30,6 +31,7 @@ from app.services.output_manager.error_handler import SrvErrorHandler from app.services.user_authentication.decorator import require_valid_token from app.utils.aggregated import ItemStatus +from app.utils.aggregated import batch_generator from app.utils.aggregated import get_file_info_by_geid from .exception import INVALID_CHUNK_ETAG @@ -471,5 +473,32 @@ def check_status(self, file_objects: list[FileObject]) -> list[FileObject]: return unfinished_files + def upload_status_check(self, file_objects: list[FileObject]) -> None: + ''' + Summary: + The function is to check the list of upload status. + + Parameter: + - file_objects(list[FileObject]): the list of file objects that need to be checked. + + ''' + + unfinished_files = file_objects + wait_count = 0 + while len(unfinished_files) > 0: + temp = [] + if wait_count % AppConfig.Env.output_truncate_count == 0: + mhandler.SrvOutPutHandler.finalize_upload() + elif wait_count > AppConfig.Env.max_waiting_count: + SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_TIMEOUT, True) + + for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size): + temp.extend(self.check_status(file_batchs)) + unfinished_files = temp + wait_count += 1 + time.sleep(1) + + return + def set_finish_upload(self): self.finish_upload = True diff --git a/app/services/output_manager/error_handler.py b/app/services/output_manager/error_handler.py index e16724cf..5416fe85 100644 --- a/app/services/output_manager/error_handler.py +++ b/app/services/output_manager/error_handler.py @@ -42,6 +42,7 @@ class ECustomizedError(enum.Enum): UPLOAD_CANCEL = 'UPLOAD_CANCEL' UPLOAD_FAIL = 'UPLOAD_FAIL' UPLOAD_SKIP_DUPLICATION = 'UPLOAD_SKIP_DUPLICATION' + UPLOAD_TIMEOUT = 'UPLOAD_TIMEOUT' # the error when multipart upload id is not exist UPLOAD_ID_NOT_EXIST = 'UPLOAD_ID_NOT_EXIST' MANIFEST_OF_FOLDER_FILE_EXIST = 'MANIFEST_OF_FOLDER_FILE_EXIST' From 3ae66408852668d5a4ff70d8bdc58cc75279f630 Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 10 Mar 2025 16:34:00 -0400 Subject: [PATCH 2/6] add test case for checking status timeout --- .../file_upload/test_upload_client.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/app/services/file_manager/file_upload/test_upload_client.py b/tests/app/services/file_manager/file_upload/test_upload_client.py index ab10f577..c9a998f3 100644 --- a/tests/app/services/file_manager/file_upload/test_upload_client.py +++ b/tests/app/services/file_manager/file_upload/test_upload_client.py @@ -71,6 +71,32 @@ def test_check_status_fail(httpx_mock, mocker): assert len(result) == 1 +def test_check_upload_status_with_timeout(httpx_mock, mocker, capfd): + upload_client = UploadClient('project_code', 'parent_folder_id') + + mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1)) + mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0) + + httpx_mock.add_response( + method='POST', + url=AppConfig.Connections.url_bff + '/v1/query/geid', + json={'result': [{'status': ItemStatus.REGISTERED, 'result': {'name': 'test', 'status': ItemStatus.ACTIVE}}]}, + status_code=200, + ) + + test_obj = FileObject('test', 'test', 'test', 'test', 'test') + try: + AppConfig.Env.max_waiting_count = 1 + upload_client.upload_status_check([test_obj]) + except SystemExit: + out, _ = capfd.readouterr() + + expect_out = 'Upload task was timeout. Please check the portal for the upload status.\n' + assert expect_out in out + else: + AssertionError('SystemExit not raised') + + def test_chunk_upload(httpx_mock, mocker): upload_client = UploadClient('project_code', 'parent_folder_id') From 2c27a9f5631b1efbb8f21fba5f0f2b7cab47297b Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 10 Mar 2025 16:55:08 -0400 Subject: [PATCH 3/6] fixup the size check when resumable uploading --- app/services/file_manager/file_upload/file_upload.py | 7 +++++-- .../services/file_manager/file_upload/test_file_upload.py | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index 097e20a8..2f278ad0 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ b/app/services/file_manager/file_upload/file_upload.py @@ -304,14 +304,17 @@ def resume_get_unfinished_items( f'expected size: {file_info.get("total_size")}, ' f'actual size: {x.get("result").get("size")}' ) - if file_info.get('total_size') != x.get('result').get('size'): + local_file_size = os.path.getsize(file_info.get('local_path')) + if file_info.get('total_size') != x.get('result').get('size') or local_file_size != x.get('result').get( + 'size' + ): SrvErrorHandler.customized_handle( ECustomizedError.INVALID_RESUMABLE_FILE_SIZE, if_exit=True, value=( file_info.get('object_path'), x.get('result').get('size'), - file_info.get('total_size'), + local_file_size, ), ) diff --git a/tests/app/services/file_manager/file_upload/test_file_upload.py b/tests/app/services/file_manager/file_upload/test_file_upload.py index 2880b210..20547caf 100644 --- a/tests/app/services/file_manager/file_upload/test_file_upload.py +++ b/tests/app/services/file_manager/file_upload/test_file_upload.py @@ -342,6 +342,10 @@ def test_resume_upload(mocker): resume_upload_mock = mocker.patch( 'app.services.file_manager.file_upload.file_upload.UploadClient.resume_upload', return_value=[] ) + mocker.patch( + 'os.path.getsize', + return_value=1, + ) resume_upload(manifest_json, 1) @@ -406,6 +410,10 @@ def test_resume_upload_integrity_check_failed(mocker, capfd): get_mock = mocker.patch( 'app.services.file_manager.file_upload.file_upload.get_file_info_by_geid', return_value=[{'result': get_return}] ) + mocker.patch( + 'os.path.getsize', + return_value=2, + ) try: resume_upload(manifest_json, 1) From 5f5e20525a84d4c592f98764b2c0c5711444dfcf Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 10 Mar 2025 16:55:23 -0400 Subject: [PATCH 4/6] bumpup version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6575043e..d1d3d070 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.15.0" +version = "3.15.1" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] From a93264930eb8679abd918c267354e2af8ce3df5d Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 10 Mar 2025 17:03:02 -0400 Subject: [PATCH 5/6] bumpup version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d1d3d070..597588a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.15.1" +version = "3.16.0" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] From 00d2a9ee44dcd1a3771508aacab7bdd62d655240 Mon Sep 17 00:00:00 2001 From: zhiren Date: Mon, 10 Mar 2025 17:05:18 -0400 Subject: [PATCH 6/6] bumpup version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 597588a3..d1153397 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.16.0" +version = "3.15.2" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"]