Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ Command line tool that allows the user to execute data operations on the platfor
| 3.15.0 | 2.14.2 | 2.14.2 |
| 3.15.1 | 2.14.2 | 2.14.2 |
| 3.15.2 | 2.15 | 2.15 |
| 3.16.0 | 2.15 | 2.15 |
| 3.17.0 | 2.15 | 2.15 |

### Build Instructions
1. Each system has its own credential, so building should be done after updating the env file.
Expand Down
2 changes: 1 addition & 1 deletion app/commands/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def file_resume(**kwargs): # noqa: C901

# since only file upload can attach manifest, take the first file object
srv_manifest = SrvFileManifests()
item_id = next(iter(resumable_manifest.get('file_objects')))
item_id = next(iter(resumable_manifest.get('registered_items')))
attribute = resumable_manifest.get('attributes')
zone = resumable_manifest.get('zone')
srv_manifest.attach_manifest(attribute, item_id, zone) if attribute else None
Expand Down
126 changes: 89 additions & 37 deletions app/services/file_manager/file_upload/file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,58 @@ def assemble_path(
return current_folder_node, parent_folder, create_folder_flag, target_folder


def item_duplication_check(
    create_folder_flag: bool, file_objects: List[FileObject], upload_client: UploadClient
) -> List[FileObject]:
    '''
    Summary:
        Check whether the given files already exist on the platform.
        If every checked file already exists, the upload is cancelled and the
        process exits. If only some of them exist, the user is asked to confirm
        skipping the duplicated files; declining aborts the upload.
    Parameter:
        - create_folder_flag(bool): the flag to indicate if need to create new folder.
          When True, a brand-new folder is being created so no duplication is
          possible and the platform check is skipped entirely.
        - file_objects(List[FileObject]): the list of file object
        - upload_client(UploadClient): the upload client object
    Return:
        - non_duplicate_file_objects(List[FileObject]): the list of file object that is not duplicated
    '''

    # make the file duplication check to allow folder merging
    logger.info('Start checking file duplication')
    non_duplicate_file_objects = []
    if create_folder_flag is True:
        # a new folder is being created, so nothing can collide on the platform
        non_duplicate_file_objects = file_objects
    else:
        mhandler.SrvOutPutHandler.file_duplication_check()
        duplicated_file = []
        debug_logger.debug(f'upload batch size: {AppConfig.Env.upload_batch_size}')
        # query the platform in batches to keep each duplication request small
        for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size):
            start_time = time.time()
            non_duplicates, duplicate_path = upload_client.check_upload_duplication(file_batchs)
            debug_logger.debug(f'Check duplication time: {time.time() - start_time:.2f}s')
            non_duplicate_file_objects.extend(non_duplicates)
            duplicated_file.extend(duplicate_path)

        # if all file objects we check are already existed on the platform
        # then we will exit the upload process
        # (the len(file_objects) != 0 guard keeps an empty input list from
        # being treated as "everything is duplicated")
        if len(non_duplicate_file_objects) == 0 and len(file_objects) != 0:
            mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same()
            SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True)
        elif len(duplicated_file) > 0:
            mhandler.SrvOutPutHandler.file_duplication_check_success()
            duplicate_warning_format = '\n'.join(duplicated_file)
            try:
                # ask the user to confirm skipping the duplicated files;
                # abort=True makes click raise Abort when the user answers "no"
                click.confirm(
                    customized_error_msg(ECustomizedError.UPLOAD_SKIP_DUPLICATION) % (duplicate_warning_format),
                    abort=True,
                )
            except Abort:
                mhandler.SrvOutPutHandler.cancel_upload()
                exit(1)

    return non_duplicate_file_objects


def simple_upload( # noqa: C901
upload_event,
num_of_thread: int = 1,
Expand Down Expand Up @@ -191,35 +243,8 @@ def simple_upload( # noqa: C901
else:
file_objects.append(file_object)

# make the file duplication check to allow folde merging
non_duplicate_file_objects = []
if create_folder_flag is True:
non_duplicate_file_objects = file_objects
else:
mhandler.SrvOutPutHandler.file_duplication_check()
duplicated_file = []
debug_logger.debug(f'upload batch size: {AppConfig.Env.upload_batch_size}')
for file_batchs in batch_generator(file_objects, batch_size=AppConfig.Env.upload_batch_size):
start_time = time.time()
non_duplicates, duplicate_path = upload_client.check_upload_duplication(file_batchs)
debug_logger.debug(f'Check duplication time: {time.time() - start_time:.2f}s')
non_duplicate_file_objects.extend(non_duplicates)
duplicated_file.extend(duplicate_path)

if len(non_duplicate_file_objects) == 0:
mhandler.SrvOutPutHandler.file_duplication_check_warning_with_all_same()
SrvErrorHandler.customized_handle(ECustomizedError.UPLOAD_CANCEL, if_exit=True)
elif len(duplicated_file) > 0:
mhandler.SrvOutPutHandler.file_duplication_check_success()
duplicate_warning_format = '\n'.join(duplicated_file)
try:
click.confirm(
customized_error_msg(ECustomizedError.UPLOAD_SKIP_DUPLICATION) % (duplicate_warning_format),
abort=True,
)
except Abort:
mhandler.SrvOutPutHandler.cancel_upload()
exit(1)
# make the file duplication check to allow folder merging
non_duplicate_file_objects = item_duplication_check(create_folder_flag, file_objects, upload_client)

# here is list of pre upload result. We decided to call pre upload api by batch
pre_upload_infos = []
Expand All @@ -228,8 +253,11 @@ def simple_upload( # noqa: C901
# the placeholder in object storage
pre_upload_infos.extend(upload_client.pre_upload(file_batchs, output_path))

# then output manifest file to the output path
upload_client.output_manifest(pre_upload_infos, output_path)
# then output manifest file to the output path AFTER EACH BATCH
# which will include unfinished objects
upload_client.output_manifest(
pre_upload_infos, non_duplicate_file_objects[len(pre_upload_infos) + 1 :], output_path
)

# now loop over each file under the folder and start
# the chunk upload
Expand Down Expand Up @@ -299,12 +327,12 @@ def resume_get_unfinished_items(
elif x.get('result').get('status') == ItemStatus.REGISTERED:
file_info = all_files.get(file_meta.get('id'))
# check if size is matched during resume vs preupload
local_file_size = os.path.getsize(file_info.get('local_path'))
logger.info(
f'Check file size: {file_info.get("object_path")}, '
f'expected size: {file_info.get("total_size")}, '
f'actual size: {x.get("result").get("size")}'
f'expected size: {x.get("result").get("size")}, '
f'actual size: {local_file_size}'
)
local_file_size = os.path.getsize(file_info.get('local_path'))
if file_info.get('total_size') != x.get('result').get('size') or local_file_size != x.get('result').get(
'size'
):
Expand Down Expand Up @@ -359,9 +387,33 @@ def resume_upload(
)

# check files in manifest if some of them are already uploaded
all_files = manifest_json.get('file_objects')
item_ids = list(all_files.keys())
unfinished_items = resume_get_unfinished_items(upload_client, all_files, item_ids)
registered_items = manifest_json.get('registered_items')
item_ids = list(registered_items.keys())
unfinished_items = resume_get_unfinished_items(upload_client, registered_items, item_ids)
logger.info(f'Registered items: {len(unfinished_items)}')

# make the file duplication check to allow folder merging
unregistered_items = manifest_json.get('unregistered_items')
unregistered_items = [
FileObject(
object_path=value.get('object_path'),
local_path=value.get('local_path'),
resumable_id=value.get('resumable_id'),
job_id=value.get('job_id'),
item_id=value.get('item_id'),
)
for _, value in unregistered_items.items()
]
unregistered_items = item_duplication_check(False, unregistered_items, upload_client)
logger.info(f'Unregistered items: {len(unregistered_items)}')

# redo preupload again
batch_count = 1
resumable_manifest_file = manifest_json.get('resumable_manifest_file')
for file_batchs in batch_generator(unregistered_items, batch_size=AppConfig.Env.upload_batch_size):
unfinished_items.extend(upload_client.pre_upload(file_batchs, resumable_manifest_file))
upload_client.output_manifest(unfinished_items, unregistered_items[batch_count + 1 :], resumable_manifest_file)
batch_count += 1

mhandler.SrvOutPutHandler.resume_warning(len(unfinished_items))
mhandler.SrvOutPutHandler.resume_check_success()
Expand Down
15 changes: 11 additions & 4 deletions app/services/file_manager/file_upload/upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,18 @@ def pre_upload(self, file_objects: List[FileObject], output_path: str) -> List[F
mhandler.SrvOutPutHandler.preupload_success()
return file_objets

def output_manifest(self, file_objects: List[FileObject], output_path: str) -> Dict[str, Any]:
def output_manifest(
self, registered_items: List[FileObject], unregistered_items: List[FileObject], output_path: str
) -> Dict[str, Any]:
"""
Summary:
The function is to output the manifest file.
Parameter:
- file_objects(list of FileObject): the file objects that contains correct
information for chunk uploading.
- registered_items(list of FileObject): the file object that has been registered
in metadata service.
- unregistered_items(list of FileObject): the file object that has not been registered
and still need to pass thought preupload
- output_path(str): the output path of manifest.
return:
- manifest_json(dict): the manifest file in json format.
"""
Expand All @@ -285,8 +290,10 @@ def output_manifest(self, file_objects: List[FileObject], output_path: str) -> D
'parent_folder_id': self.parent_folder_id,
'current_folder_node': self.current_folder_node,
'tags': self.tags,
'file_objects': {file_object.item_id: file_object.to_dict() for file_object in file_objects},
'registered_items': {file_object.item_id: file_object.to_dict() for file_object in registered_items},
'unregistered_items': {file_object.local_path: file_object.to_dict() for file_object in unregistered_items},
'attributes': self.attributes if self.attributes else {},
'resumable_manifest_file': output_path,
}

with open(output_path, 'w') as f:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "app"
version = "3.16.0"
version = "3.17.0"
description = "This service is designed to support pilot platform"
authors = ["Indoc Systems"]

Expand Down
8 changes: 6 additions & 2 deletions tests/app/commands/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ def test_resumable_upload_command_success(mocker, cli_runner):
with runner.isolated_filesystem():
mocker.patch('os.path.exists', return_value=True)
with open('test.json', 'w') as f:
json.dump({'file_objects': {'test_item_id': {'file_name': 'test.json'}}, 'zone': 1}, f)
json.dump(
{'registered_items': {'test_item_id': {'file_name': 'test.json'}}, 'unregistered_items': {}, 'zone': 1},
f,
)

mocker.patch('app.commands.file.resume_upload', return_value=None)
mocker.patch('os.remove', return_value=None)
Expand All @@ -205,7 +208,8 @@ def test_resumable_upload_command_with_file_attribute_success(mocker, cli_runner
with open('test.json', 'w') as f:
json.dump(
{
'file_objects': {'test_item_id': {'file_name': 'test.json'}},
'registered_items': {'test_item_id': {'file_name': 'test.json'}},
'unregistered_items': {},
'zone': 1,
'attributes': {'M1': {'attr1': '1'}},
},
Expand Down
10 changes: 7 additions & 3 deletions tests/app/services/file_manager/file_upload/test_file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def test_folder_merge_skip_with_all_duplication(mocker, mock_upload_client, capf
out, _ = capfd.readouterr()
expect = (
f'Starting upload of: {file_name}\n'
+ 'Start checking file duplication\n'
+ 'Checking for file duplication...\n'
+ '\nAll files already exist in the upload destination.\n\n'
+ customized_error_msg(ECustomizedError.UPLOAD_CANCEL)
Expand All @@ -328,7 +329,8 @@ def test_resume_upload(mocker):
'parent_folder_id': 'parent_folder_id',
'current_folder_node': 'current_folder_node',
'tags': 'tags',
'file_objects': {test_obj.item_id: test_obj.to_dict()},
'registered_items': {test_obj.item_id: test_obj.to_dict()},
'unregistered_items': {},
'total_size': 1,
}

Expand Down Expand Up @@ -364,7 +366,8 @@ def test_resume_upload_failed_when_REGISTERED_doesnt_exist(mocker, capfd):
'parent_folder_id': 'parent_folder_id',
'current_folder_node': 'current_folder_node',
'tags': 'tags',
'file_objects': {test_obj.item_id: test_obj.to_dict()},
'registered_items': {test_obj.item_id: test_obj.to_dict()},
'unregistered_items': {},
}

get_return = test_obj.to_dict()
Expand Down Expand Up @@ -399,7 +402,8 @@ def test_resume_upload_integrity_check_failed(mocker, capfd):
'parent_folder_id': 'parent_folder_id',
'current_folder_node': 'current_folder_node',
'tags': 'tags',
'file_objects': {test_obj.item_id: test_obj.to_dict()},
'registered_items': {test_obj.item_id: test_obj.to_dict()},
'unregistered_items': {},
'total_size': 1,
}

Expand Down
31 changes: 21 additions & 10 deletions tests/app/services/file_manager/file_upload/test_upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,22 +352,33 @@ def test_check_upload_duplication_fail_with_500(httpx_mock, mocker, capfd):
AssertionError('SystemExit not raised')


def test_output_manifest_success(mocker, tmp_path):
def test_output_manifest_success_for_resumable_upload(mocker, tmp_path):
    # Verify that output_manifest() splits the manifest into 'registered_items'
    # (keyed by item_id) and 'unregistered_items' (keyed by local_path).
    upload_client = UploadClient('project_code', 'parent_folder_id')
    # stub the on-disk write; we only assert it happens once
    json_dump_mocker = mocker.patch('json.dump', return_value=None)
    # skip real chunk/meta computation during FileObject construction
    mocker.patch('app.services.file_manager.file_upload.models.FileObject.generate_meta', return_value=(1, 1))
    test_obj_registered = FileObject(
        'object/test_obj_registered', 'local_path_1', 'resumable_id_1', 'job_id_1', 'item_id_1'
    )
    test_obj_unregistered = FileObject('object/test_obj_unregistered', 'local_path_2', 'resumable_id_2', 'job_id_2', '')

    res = upload_client.output_manifest(
        [test_obj_registered], [test_obj_unregistered], output_path=str(tmp_path / 'test')
    )

    assert res.get('project_code') == 'project_code'
    assert res.get('parent_folder_id') == 'parent_folder_id'
    assert len(res.get('registered_items')) == 1

    # registered items are keyed by their platform item id
    file_item = res.get('registered_items').get('item_id_1')
    assert file_item.get('resumable_id') == 'resumable_id_1'
    assert file_item.get('local_path') == 'local_path_1'
    assert file_item.get('object_path') == 'object/test_obj_registered'
    assert file_item.get('item_id') == 'item_id_1'

    assert len(res.get('unregistered_items')) == 1
    file_item = res.get('unregistered_items').get('local_path_2')  # unregistered items don't have an item id yet
    assert file_item.get('resumable_id') == 'resumable_id_2'
    assert file_item.get('local_path') == 'local_path_2'
    assert file_item.get('object_path') == 'object/test_obj_unregistered'

    # the manifest must be written to disk exactly once
    json_dump_mocker.assert_called_once()
Loading