Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/configs/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class Env:
# the number of items to active interative mode
interative_threshold = 10

# number looping when waiting upload status
output_truncate_count = 10
max_waiting_count = 30

github_url = 'PilotDataPlatform/cli'

zone_int2string = {
Expand Down
1 change: 1 addition & 0 deletions app/resources/custom_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Error:
'The following files already exist in the upload destination: \n%s\n'
'Do you want to cancel the upload [N] or skip duplicates and continue uploading [y]?'
),
'UPLOAD_TIMEOUT': 'Upload task was timeout. Please check the portal for the upload status.',
'UPLOAD_ID_NOT_EXIST': (
'The specified multipart upload does not exist. '
'The upload ID may be invalid, or the upload may have been aborted or completed.'
Expand Down
28 changes: 9 additions & 19 deletions app/services/file_manager/file_upload/file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,8 @@ def simple_upload( # noqa: C901
pool.close()
pool.join()

unfinished_files = pre_upload_infos
while len(unfinished_files) > 0:
temp = []
mhandler.SrvOutPutHandler.finalize_upload()
for file_batchs in batch_generator(pre_upload_infos, batch_size=AppConfig.Env.upload_batch_size):
temp.extend(upload_client.check_status(file_batchs))
unfinished_files = temp

# check the status of the upload
upload_client.upload_status_check(pre_upload_infos)
num_of_file = len(pre_upload_infos)
logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files')

Expand Down Expand Up @@ -310,14 +304,17 @@ def resume_get_unfinished_items(
f'expected size: {file_info.get("total_size")}, '
f'actual size: {x.get("result").get("size")}'
)
if file_info.get('total_size') != x.get('result').get('size'):
local_file_size = os.path.getsize(file_info.get('local_path'))
if file_info.get('total_size') != x.get('result').get('size') or local_file_size != x.get('result').get(
'size'
):
SrvErrorHandler.customized_handle(
ECustomizedError.INVALID_RESUMABLE_FILE_SIZE,
if_exit=True,
value=(
file_info.get('object_path'),
x.get('result').get('size'),
file_info.get('total_size'),
local_file_size,
),
)

Expand Down Expand Up @@ -390,14 +387,7 @@ def resume_upload(
pool.close()
pool.join()

unfinished_files = unfinished_items
while len(unfinished_files) > 0:
temp = []
mhandler.SrvOutPutHandler.finalize_upload()
for file_batchs in batch_generator(unfinished_items, batch_size=AppConfig.Env.upload_batch_size):
temp.extend(upload_client.check_status(file_batchs))
unfinished_files = temp
time.sleep(1)

# check the status of the upload
upload_client.upload_status_check(unfinished_items)
num_of_file = len(unfinished_items)
logger.info(f'Upload Time: {time.time() - upload_start_time:.2f}s for {num_of_file:d} files')
29 changes: 29 additions & 0 deletions app/services/file_manager/file_upload/upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import math
import os
import threading
import time
from logging import getLogger
from multiprocessing.pool import ThreadPool
from typing import Any
Expand All @@ -30,6 +31,7 @@
from app.services.output_manager.error_handler import SrvErrorHandler
from app.services.user_authentication.decorator import require_valid_token
from app.utils.aggregated import ItemStatus
from app.utils.aggregated import batch_generator
from app.utils.aggregated import get_file_info_by_geid

from .exception import INVALID_CHUNK_ETAG
Expand Down Expand Up @@ -471,5 +473,32 @@ def check_status(self, file_objects: list[FileObject]) -> list[FileObject]:

return unfinished_files

def upload_status_check(self, file_objects: list[FileObject]) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can directly naming it as this and remove the first line of the function

Suggested change
def upload_status_check(self, file_objects: list[FileObject]) -> None:
def upload_status_check(self, unfinished_files: list[FileObject]) -> None:

'''
Summary:
The function is to check the list of upload status.

Parameter:
- file_objects(list[FileObject]): the list of file objects that need to be checked.

'''

unfinished_files = file_objects
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line

wait_count = 0
while len(unfinished_files) > 0:
temp = []
if wait_count % AppConfig.Env.output_truncate_count == 0:
mhandler.SrvOutPutHandler.finalize_upload()
elif wait_count > AppConfig.Env.max_waiting_count:
SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_TIMEOUT, True)

for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size):
temp.extend(self.check_status(file_batchs))
unfinished_files = temp
wait_count += 1
time.sleep(1)

return

def set_finish_upload(self):
self.finish_upload = True
1 change: 1 addition & 0 deletions app/services/output_manager/error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ECustomizedError(enum.Enum):
UPLOAD_CANCEL = 'UPLOAD_CANCEL'
UPLOAD_FAIL = 'UPLOAD_FAIL'
UPLOAD_SKIP_DUPLICATION = 'UPLOAD_SKIP_DUPLICATION'
UPLOAD_TIMEOUT = 'UPLOAD_TIMEOUT'
# the error when multipart upload id is not exist
UPLOAD_ID_NOT_EXIST = 'UPLOAD_ID_NOT_EXIST'
MANIFEST_OF_FOLDER_FILE_EXIST = 'MANIFEST_OF_FOLDER_FILE_EXIST'
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "app"
version = "3.15.0"
version = "3.15.2"
description = "This service is designed to support pilot platform"
authors = ["Indoc Systems"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ def test_resume_upload(mocker):
resume_upload_mock = mocker.patch(
'app.services.file_manager.file_upload.file_upload.UploadClient.resume_upload', return_value=[]
)
mocker.patch(
'os.path.getsize',
return_value=1,
)

resume_upload(manifest_json, 1)

Expand Down Expand Up @@ -406,6 +410,10 @@ def test_resume_upload_integrity_check_failed(mocker, capfd):
get_mock = mocker.patch(
'app.services.file_manager.file_upload.file_upload.get_file_info_by_geid', return_value=[{'result': get_return}]
)
mocker.patch(
'os.path.getsize',
return_value=2,
)

try:
resume_upload(manifest_json, 1)
Expand Down
26 changes: 26 additions & 0 deletions tests/app/services/file_manager/file_upload/test_upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,32 @@ def test_check_status_fail(httpx_mock, mocker):
assert len(result) == 1


def test_check_upload_status_with_timeout(httpx_mock, mocker, capfd):
upload_client = UploadClient('project_code', 'parent_folder_id')

mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1))
mocker.patch('app.services.user_authentication.token_manager.SrvTokenManager.check_valid', return_value=0)

httpx_mock.add_response(
method='POST',
url=AppConfig.Connections.url_bff + '/v1/query/geid',
json={'result': [{'status': ItemStatus.REGISTERED, 'result': {'name': 'test', 'status': ItemStatus.ACTIVE}}]},
status_code=200,
)

test_obj = FileObject('test', 'test', 'test', 'test', 'test')
try:
AppConfig.Env.max_waiting_count = 1
upload_client.upload_status_check([test_obj])
except SystemExit:
out, _ = capfd.readouterr()

expect_out = 'Upload task was timeout. Please check the portal for the upload status.\n'
assert expect_out in out
else:
AssertionError('SystemExit not raised')


def test_chunk_upload(httpx_mock, mocker):
upload_client = UploadClient('project_code', 'parent_folder_id')

Expand Down
Loading