Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Command line tool that allows the user to execute data operations on the platfor

Linux example for each environment:

pyinstaller -F --distpath ./app/bundled_app/linux --specpath ./app/build/linux --workpath ./app/build/linux --paths=./.venv/lib/python3.8/site-packages ./app/pilotcli.py -n <app-name>
pyinstaller -F --distpath ./app/bundled_app/linux --specpath ./app/build/linux --workpath ./app/build/linux --paths=./.venv/lib/python3.10/site-packages ./app/pilotcli.py -n <app-name>

Note: Building for ARM Mac may require a newer version of `pyinstaller`.

Expand Down
4 changes: 0 additions & 4 deletions app/configs/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ class Env:
greenroom_bucket_prefix = 'gr'
# the number of items required to activate interactive mode
interative_threshold = 10
# set a hard limit for pending jobs, otherwise the cli will consume all
# memory to cache jobs. If the speed of chunk delivery becomes faster
# later on, we can increase the concurrency number.
num_of_jobs = ConfigClass.concurrent_job_limit

github_url = 'PilotDataPlatform/cli'

Expand Down
1 change: 0 additions & 1 deletion app/configs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class Settings(BaseSettings):
apikey_endpoint: str = 'api-key'

upload_batch_size: int = 100
concurrent_job_limit: int = 10
upload_chunk_size: int = 1024 * 1024 * 20 # 20MB

def __init__(self, **data):
Expand Down
31 changes: 18 additions & 13 deletions app/configs/user_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class UserConfig(metaclass=Singleton):
This user config is global.
"""

_api_key: str
_access_token: str
_refresh_token: str
_username: str

def __init__(
self,
config_path: Union[str, Path, None] = None,
Expand All @@ -53,7 +58,6 @@ def __init__(

This adjustment is made to prevent complications with mounted NFS volumes where all files have root ownership.
"""

if config_path is None:
config_path = ConfigClass.config_path
if config_filename is None:
Expand Down Expand Up @@ -106,6 +110,12 @@ def __init__(
}
self.save()

# load all config into memory
self._api_key = decryption(self.config['USER']['api_key'], self.secret)
self._access_token = decryption(self.config['USER']['access_token'], self.secret)
self._refresh_token = decryption(self.config['USER']['refresh_token'], self.secret)
self._username = decryption(self.config['USER']['username'], self.secret)

def _check_user_permissions(self, path: Path, expected_bits: Iterable[int]) -> Union[str, None]:
"""Check if file or folder is owned by the user and has proper access mode."""

Expand Down Expand Up @@ -147,42 +157,37 @@ def is_access_token_exists(self) -> bool:

@property
def username(self):
return decryption(self.config['USER']['username'], self.secret)
return self._username

@username.setter
def username(self, val):
self.config['USER']['username'] = encryption(val, self.secret)

@property
def password(self):
return decryption(self.config['USER']['password'], self.secret)

@password.setter
def password(self, val):
self.config['USER']['password'] = encryption(val, self.secret)

@property
def api_key(self):
return decryption(self.config['USER']['api_key'], self.secret)
return self._api_key

@api_key.setter
def api_key(self, val):
self._api_key = val
self.config['USER']['api_key'] = encryption(val, self.secret)

@property
def access_token(self):
return decryption(self.config['USER'].get('access_token', ''), self.secret)
return self._access_token

@access_token.setter
def access_token(self, val):
self._access_token = val
self.config['USER']['access_token'] = encryption(val, self.secret)

@property
def refresh_token(self):
return decryption(self.config['USER']['refresh_token'], self.secret)
return self._refresh_token

@refresh_token.setter
def refresh_token(self, val):
self._refresh_token = val
self.config['USER']['refresh_token'] = encryption(val, self.secret)

@property
Expand Down
3 changes: 2 additions & 1 deletion app/services/clients/base_auth_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
class BaseAuthClient(BaseClient):

token_manager: SrvTokenManager
user = UserConfig()
user: UserConfig

def __init__(self, endpoint: str, timeout: int = 10) -> None:
super().__init__(endpoint, timeout)

self.user = UserConfig()
self.token_manager = SrvTokenManager()
self.headers = {
'Authorization': 'Bearer ' + self.user.access_token,
Expand Down
1 change: 1 addition & 0 deletions app/services/clients/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def _request(
if response.status_code not in self.retry_status:
response.raise_for_status()
return response

time.sleep(self.retry_interval)

logger.debug(f'failed with over {self.retry_count} retries.')
Expand Down
4 changes: 3 additions & 1 deletion app/services/file_manager/file_manifests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ def dupe_checking_hook(pairs):

class SrvFileManifests(BaseAuthClient, metaclass=MetaService):
app_config = AppConfig()
user = UserConfig()
user: UserConfig

def __init__(self, interactive=True):
super().__init__(self.app_config.Connections.url_bff)

self.user = UserConfig()

self.interactive = interactive
self.endpoint = self.app_config.Connections.url_bff + '/v1'

Expand Down
3 changes: 2 additions & 1 deletion app/services/file_manager/file_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

class SrvFileTag(metaclass=MetaService):
appconfig = AppConfig()
user = UserConfig()
user: UserConfig

def __init__(self, interactive=True):
self.interactive = interactive
self.user = UserConfig()

@staticmethod
def validate_tag(tag):
Expand Down
13 changes: 2 additions & 11 deletions app/services/file_manager/file_upload/file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,7 @@ def simple_upload( # noqa: C901

# now loop over each file under the folder and start
# the chunk upload

# thread number +1 reserve one thread to refresh token
# and remove the token decorator in functions

pool = ThreadPool(num_of_thread + 1)
pool.apply_async(upload_client.upload_token_refresh)
pool = ThreadPool(num_of_thread)
on_success_res = []

file_object: FileObject
Expand Down Expand Up @@ -338,11 +333,7 @@ def resume_upload(
mhandler.SrvOutPutHandler.resume_check_success()

# lastly, start resumable upload for the rest of the chunks
# thread number +1 reserve one thread to refresh token
# and remove the token decorator in functions

pool = ThreadPool(num_of_thread + 1)
pool.apply_async(upload_client.upload_token_refresh)
pool = ThreadPool(num_of_thread)
on_success_res = []
for file_object in unfinished_items:
upload_client.stream_upload(file_object, pool)
Expand Down
30 changes: 10 additions & 20 deletions app/services/file_manager/file_upload/upload_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import math
import os
import threading
import time
from logging import getLogger
from multiprocessing.pool import ThreadPool
from typing import Any
Expand All @@ -22,7 +21,6 @@

import app.services.output_manager.message_handler as mhandler
from app.configs.app_config import AppConfig
from app.configs.config import ConfigClass
from app.configs.user_config import UserConfig
from app.models.upload_form import generate_on_success_form
from app.services.clients.base_auth_client import BaseAuthClient
Expand All @@ -31,7 +29,6 @@
from app.services.output_manager.error_handler import ECustomizedError
from app.services.output_manager.error_handler import SrvErrorHandler
from app.services.user_authentication.decorator import require_valid_token
from app.services.user_authentication.token_manager import SrvTokenManager
from app.utils.aggregated import get_file_info_by_geid

from .exception import INVALID_CHUNK_ETAG
Expand Down Expand Up @@ -92,7 +89,6 @@ def __init__(
# for tracking the multi-threading chunk upload
self.active_jobs = 0
self.lock = threading.Lock()
self.chunk_upload_done = threading.Event()

def generate_meta(self, local_path: str) -> Tuple[int, int]:
"""
Expand Down Expand Up @@ -309,14 +305,15 @@ def stream_upload(self, file_object: FileObject, pool: ThreadPool) -> None:
been uploaded.
"""
count = 0
semaphore = threading.Semaphore(AppConfig.Env.num_of_jobs)
semaphore = threading.Semaphore(pool._processes + 1)
chunk_upload_done = threading.Event()

def on_complete(result):
semaphore.release()
with self.lock:
self.active_jobs -= 1
if self.active_jobs == 0:
self.chunk_upload_done.set()
chunk_upload_done.set()

# process on the file content
f = open(file_object.local_path, 'rb')
Expand Down Expand Up @@ -355,8 +352,14 @@ def on_complete(result):

count += 1

# for the resumable check ONLY if the user resumes the upload at 100%:
# just check whether there is any active job; if not, set the event
while not chunk_upload_done.wait(timeout=60):
if self.active_jobs == 0:
chunk_upload_done.set()
logger.warning('Waiting for all the chunks to be uploaded, remaining jobs: %s', file_object.progress)

f.close()
self.chunk_upload_done.wait()

def upload_chunk(self, file_object: FileObject, chunk_number: int, chunk: str, etag: str, chunk_size: int) -> None:
"""
Expand Down Expand Up @@ -466,16 +469,3 @@ def check_status(self, file_object: FileObject) -> bool:

def set_finish_upload(self):
self.finish_upload = True

def upload_token_refresh(self, azp: str = ConfigClass.keycloak_device_client_id) -> None:
    """Periodically refresh the auth token while an upload is in progress.

    Intended to run in a dedicated thread: it loops until another thread
    flips ``self.finish_upload`` to True, checking every ``check_interval``
    seconds and refreshing the token once the accumulated wait reaches
    ``AppConfig.Env.token_refresh_interval``.

    :param azp: the client id (authorized party) used for the token refresh.
    """
    token_manager = SrvTokenManager()
    check_interval = 2  # seconds between checks for upload completion
    elapsed = 0  # seconds accumulated since the last token refresh

    # NOTE: `not self.finish_upload` replaces the non-idiomatic
    # `self.finish_upload is not True` identity comparison (PEP 8).
    while not self.finish_upload:
        if elapsed >= AppConfig.Env.token_refresh_interval:
            token_manager.refresh(azp)
            elapsed = 0

        # sleep briefly so upload completion is noticed promptly
        time.sleep(check_interval)
        elapsed += check_interval
6 changes: 1 addition & 5 deletions app/services/user_authentication/token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
class SrvTokenManager(BaseClient, metaclass=MetaService):
def __init__(self):
super().__init__(AppConfig.Connections.url_keycloak_token, 10)
user_config = UserConfig()
if user_config.is_logged_in():
self.config = user_config
else:
raise Exception('Login session not found, please login first.')
self.config = UserConfig()

def update_token(self, access_token, refresh_token):
self.config.access_token = access_token
Expand Down
16 changes: 14 additions & 2 deletions app/utils/aggregated.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from app.models.item import ItemStatus
from app.models.item import ItemType
from app.services.clients.base_auth_client import BaseAuthClient
from app.services.clients.base_auth_client import BaseClient
from app.services.logger_services.debugging_log import debug_logger
from app.services.output_manager.error_handler import ECustomizedError
from app.services.output_manager.error_handler import SrvErrorHandler
Expand Down Expand Up @@ -233,13 +234,24 @@ def remove_the_output_file(filepath: str) -> None:


def get_latest_cli_version() -> Tuple[Version, str]:
import logging
import time

try:
httpx_client = BaseAuthClient(AppConfig.Connections.url_fileops_greenroom)
start_time = time.time()
httpx_client = BaseClient(AppConfig.Connections.url_fileops_greenroom)
logging.critical(f'http client init time: {time.time() - start_time}')
user_config = UserConfig()
logging.critical(f'user config init time: {time.time() - start_time}')
t1 = time.time()
if not user_config.is_access_token_exists():
return Version('0.0.0')
logging.critical(f'Check token time: {time.time() - t1}')
t2 = time.time()

response = httpx_client._get('v1/download/cli/presigned')
headers = {'Authorization': 'Bearer'}
response = httpx_client._get('v1/download/cli/presigned', headers=headers)
logging.critical(f'Get latest version time: {time.time() - t2}')
result = response.json().get('result', {})
latest_version = result.get('linux', {}).get('version', '0.0.0')
download_url = result.get('linux', {}).get('download_url', '')
Expand Down
Loading
Loading