diff --git a/app/configs/user_config.py b/app/configs/user_config.py index 86dac7e9..ecf10dd7 100644 --- a/app/configs/user_config.py +++ b/app/configs/user_config.py @@ -58,6 +58,7 @@ def __init__( This adjustment is made to prevent complications with mounted NFS volumes where all files have root ownership. """ + if config_path is None: config_path = ConfigClass.config_path if config_filename is None: diff --git a/app/services/crypto/crypto.py b/app/services/crypto/crypto.py index 519b86df..f156f7c5 100644 --- a/app/services/crypto/crypto.py +++ b/app/services/crypto/crypto.py @@ -75,5 +75,4 @@ def decryption(encrypted_message, secret, interactive=True): ehandler.SrvErrorHandler.default_handle(str(ex) + ', please try login as a valid user.') else: raise ex - else: - ehandler.SrvErrorHandler.customized_handle(ehandler.ECustomizedError.LOGIN_SESSION_INVALID, True) + return '' diff --git a/app/services/file_manager/file_upload/file_upload.py b/app/services/file_manager/file_upload/file_upload.py index 93abdc3d..bec492de 100644 --- a/app/services/file_manager/file_upload/file_upload.py +++ b/app/services/file_manager/file_upload/file_upload.py @@ -254,13 +254,13 @@ def simple_upload( # noqa: C901 pool.close() pool.join() - if attribute: - continue_loop = True - while continue_loop: - # the last uploaded file - succeed = upload_client.check_status(file_object) - continue_loop = not succeed - time.sleep(0.5) + unfinished_files = pre_upload_infos + while len(unfinished_files) > 0: + temp = [] + mhandler.SrvOutPutHandler.finalize_upload() + for file_batchs in batch_generator(pre_upload_infos, batch_size=AppConfig.Env.upload_batch_size): + temp.extend(upload_client.check_status(file_batchs)) + unfinished_files = temp num_of_file = len(pre_upload_infos) logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files') @@ -341,7 +341,7 @@ def resume_upload( # out of thread pool. res = pool.apply_async( upload_client.on_succeed, - args=(file_object), + args=(file_object,), ) on_success_res.append(res) @@ -353,5 +353,14 @@ def resume_upload( pool.close() pool.join() + unfinished_files = unfinished_items + while len(unfinished_files) > 0: + temp = [] + mhandler.SrvOutPutHandler.finalize_upload() + for file_batchs in batch_generator(unfinished_items, batch_size=AppConfig.Env.upload_batch_size): + temp.extend(upload_client.check_status(file_batchs)) + unfinished_files = temp + time.sleep(1) + num_of_file = len(unfinished_items) logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files') diff --git a/app/services/file_manager/file_upload/upload_client.py b/app/services/file_manager/file_upload/upload_client.py index 6c5ad80f..f67a1181 100644 --- a/app/services/file_manager/file_upload/upload_client.py +++ b/app/services/file_manager/file_upload/upload_client.py @@ -29,6 +29,7 @@ from app.services.output_manager.error_handler import ECustomizedError from app.services.output_manager.error_handler import SrvErrorHandler from app.services.user_authentication.decorator import require_valid_token +from app.utils.aggregated import ItemStatus from app.utils.aggregated import get_file_info_by_geid from .exception import INVALID_CHUNK_ETAG @@ -354,10 +355,11 @@ def on_complete(result): # for resumable check ONLY if user resume the upload at 100% # just check if there is any active job, if not, set the event - while not chunk_upload_done.wait(timeout=60): + while not chunk_upload_done.wait(timeout=5): if self.active_jobs == 0: chunk_upload_done.set() - logger.warning('Waiting for all the chunks to be uploaded, remaining jobs: %s', file_object.progress) + else: + logger.warning('Waiting for all the chunks to be uploaded, remaining jobs: %s', self.active_jobs) f.close() @@ -447,7 +449,7 @@ def on_succeed(self, file_object: FileObject) -> None: result = response.json().get('result') return result - def check_status(self, file_object: FileObject) -> bool: + def check_status(self, file_objects: list[FileObject]) -> list[FileObject]: """ Summary: The function is to check the status of upload process. @@ -458,14 +460,15 @@ def check_status(self, file_object: FileObject) -> bool: - bool: if job success or not """ - # with pre-register upload, we can check if the file entity is already exist - # if exist, we can continue with manifest process - file_entity = get_file_info_by_geid([file_object.item_id])[0].get('result', {}) - mhandler.SrvOutPutHandler.finalize_upload() - if file_entity.get('status') == 'ACTIVE': - return True - else: - return False + file_ids = [file_object.item_id for file_object in file_objects] + results = get_file_info_by_geid(file_ids) + unfinished_files = [] + for r in results: + status = r.get('status') + if status != ItemStatus.ACTIVE: + unfinished_files.append(r.get('result')) + + return unfinished_files def set_finish_upload(self): self.finish_upload = True diff --git a/app/services/output_manager/message_handler.py b/app/services/output_manager/message_handler.py index 05a56982..e188c2b9 100644 --- a/app/services/output_manager/message_handler.py +++ b/app/services/output_manager/message_handler.py @@ -356,7 +356,8 @@ def newer_version_available(version, download_url, print_message=True): clickable_text = f'\033]8;;{download_url}\033\\latest cli version\033]8;;\033\\' message = ( f'\nNewer version available! Pilotcli v{version} is available. Please vist \n{clickable_text}. ' - 'This link will expire in 10 minutes.\n' + 'This link will expire in 10 minutes. If the link doesn\'t show up, Please visit the \n' + 'support page on portal to download the latest version.' ) if print_message: logger.warning(message) diff --git a/app/utils/aggregated.py b/app/utils/aggregated.py index 375de75c..b70037dc 100644 --- a/app/utils/aggregated.py +++ b/app/utils/aggregated.py @@ -234,24 +234,15 @@ def remove_the_output_file(filepath: str) -> None: def get_latest_cli_version() -> Tuple[Version, str]: - import logging - import time try: - start_time = time.time() httpx_client = BaseClient(AppConfig.Connections.url_fileops_greenroom) - logging.critical(f'http client init time: {time.time() - start_time}') user_config = UserConfig() - logging.critical(f'user config init time: {time.time() - start_time}') - t1 = time.time() if not user_config.is_access_token_exists(): return Version('0.0.0') - logging.critical(f'Check token time: {time.time() - t1}') - t2 = time.time() headers = {'Authorization': 'Bearer'} response = httpx_client._get('v1/download/cli/presigned', headers=headers) - logging.critical(f'Get latest version time: {time.time() - t2}') result = response.json().get('result', {}) latest_version = result.get('linux', {}).get('version', '0.0.0') download_url = result.get('linux', {}).get('download_url', '') diff --git a/pyproject.toml b/pyproject.toml index 9c1db4e2..ba97ea02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "app" -version = "3.14.0" +version = "3.14.1" description = "This service is designed to support pilot platform" authors = ["Indoc Systems"] diff --git a/tests/app/services/file_manager/file_upload/test_upload_client.py b/tests/app/services/file_manager/file_upload/test_upload_client.py index a72cc769..ab10f577 100644 --- a/tests/app/services/file_manager/file_upload/test_upload_client.py +++ b/tests/app/services/file_manager/file_upload/test_upload_client.py @@ -13,6 +13,7 @@ import pytest from app.configs.app_config import AppConfig +from app.models.item import ItemStatus from app.services.file_manager.file_upload.exception import INVALID_CHUNK_ETAG from app.services.file_manager.file_upload.models import FileObject from app.services.file_manager.file_upload.upload_client import UploadClient @@ -39,14 +40,14 @@ def test_check_status_success(httpx_mock, mocker): httpx_mock.add_response( method='POST', url=AppConfig.Connections.url_bff + '/v1/query/geid', - json={'result': [{'result': {'filename': 'test', 'status': 'ACTIVE'}}]}, + json={'result': [{'status': ItemStatus.ACTIVE, 'result': {'name': 'test', 'status': ItemStatus.ACTIVE}}]}, status_code=200, ) test_obj = FileObject('test', 'test', 'test', 'test', 'test') - result = upload_client.check_status(test_obj) + result = upload_client.check_status([test_obj]) - assert result is True + assert len(result) == 0 def test_check_status_fail(httpx_mock, mocker): @@ -58,14 +59,16 @@ def test_check_status_fail(httpx_mock, mocker): httpx_mock.add_response( method='POST', url=AppConfig.Connections.url_bff + '/v1/query/geid', - json={'result': [{'result': {'filename': 'test', 'status': 'REGISTERED'}}]}, + json={ + 'result': [{'status': ItemStatus.REGISTERED, 'result': {'name': 'test', 'status': ItemStatus.REGISTERED}}] + }, status_code=200, ) test_obj = FileObject('test', 'test', 'test', 'test', 'test') - result = upload_client.check_status(test_obj) + result = upload_client.check_status([test_obj]) - assert result is False + assert len(result) == 1 def test_chunk_upload(httpx_mock, mocker): diff --git a/tests/conftest.py b/tests/conftest.py index 9072d179..f9845784 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,7 +52,7 @@ def mock_upload_client(monkeypatch): monkeypatch.setattr(UploadClient, 'stream_upload', lambda *args, **kwargs: []) monkeypatch.setattr(UploadClient, 'on_succeed', lambda *args, **kwargs: None) monkeypatch.setattr(UploadClient, 'output_manifest', lambda *args, **kwargs: {}) - monkeypatch.setattr(UploadClient, 'check_status', lambda *args, **kwargs: True) + monkeypatch.setattr(UploadClient, 'check_status', lambda *args, **kwargs: []) @pytest.fixture