From 8ae6edaf51367667e3dd4ab9252631e8f485b6d8 Mon Sep 17 00:00:00 2001 From: dishantsethi Date: Fri, 12 Jun 2026 23:26:28 +0530 Subject: [PATCH] add support to upload csv --- willisapi_client/services/metadata/archive.py | 83 +++++++++++++++++++ willisapi_client/services/metadata/upload.py | 42 ++++++++++ willisapi_client/willisapi_client.py | 6 ++ 3 files changed, 131 insertions(+) create mode 100644 willisapi_client/services/metadata/archive.py diff --git a/willisapi_client/services/metadata/archive.py b/willisapi_client/services/metadata/archive.py new file mode 100644 index 0000000..cea8395 --- /dev/null +++ b/willisapi_client/services/metadata/archive.py @@ -0,0 +1,83 @@ +import os +import requests + +from willisapi_client.willisapi_client import WillisapiClient +from willisapi_client.logging_setup import logger as logger + + +def _archive_headers(api_key): + return { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"token {api_key}", + } + + +def archive_metadata_csv(api_key, csv_path, total_rows, upload_type, env=None): + """Archive the source metadata CSV alongside the data upload. + + Creates a server-side tracking row, then PUTs the raw CSV to the returned + presigned S3 URL. Returns the tracking ``record_id`` on success (so the + caller can finalize it with row counts), or ``None`` on failure. + + All failures are logged and swallowed — archiving must never abort the + actual data upload. + """ + try: + wc = WillisapiClient(env=env) + payload = { + "filename": os.path.basename(csv_path), + "total_rows": total_rows, + "upload_type": upload_type, + } + res = requests.post( + wc.get_csv_archive_url(), headers=_archive_headers(api_key), json=payload + ) + if res.status_code != 201: + logger.warning(f"CSV archive init failed: {res.status_code} {res.text}") + return None + + body = res.json() + record_id = body.get("record_id") + presigned = body.get("presigned_url") + if not presigned: + logger.warning("CSV archive: no presigned URL returned") + return None + + with open(csv_path, "rb") as f: + put_res = requests.put( + presigned, data=f, headers={"Content-Type": "text/csv"} + ) + if put_res.status_code not in (200, 204): + logger.warning(f"CSV archive S3 upload failed: {put_res.status_code}") + finalize_metadata_csv(api_key, record_id, "failed", 0, 0, env=env) + return None + + return record_id + except Exception as ex: + logger.warning(f"CSV archive error: {ex}") + return None + + +def finalize_metadata_csv( + api_key, record_id, status, successful_rows, failed_rows, env=None +): + """Report the CSV archive outcome and parsed row counts to the server.""" + if not record_id: + return + try: + wc = WillisapiClient(env=env) + payload = { + "status": status, + "successful_rows": successful_rows, + "failed_rows": failed_rows, + } + res = requests.patch( + wc.get_csv_archive_finalize_url(record_id), + headers=_archive_headers(api_key), + json=payload, + ) + if res.status_code != 200: + logger.warning(f"CSV archive finalize failed: {res.status_code} {res.text}") + except Exception as ex: + logger.warning(f"CSV archive finalize error: {ex}") diff --git a/willisapi_client/services/metadata/upload.py b/willisapi_client/services/metadata/upload.py index 6fc9e67..97a1cb8 100644 --- a/willisapi_client/services/metadata/upload.py +++ b/willisapi_client/services/metadata/upload.py @@ -19,6 +19,10 @@ find_files_with_pattern, get_last_n_directories, ) +from willisapi_client.services.metadata.archive import ( + archive_metadata_csv, + finalize_metadata_csv, +) VALID_SCORE_TYPES = ["rater", "reviewer"] @@ -38,6 +42,15 @@ def upload(api_key: str, csv_path: str, **kwargs): headers["Authorization"] = f"token {api_key}" logger.info(f'{datetime.now().strftime("%H:%M:%S")}: beginning upload') + # Archive the source metadata CSV and open a tracking record. + archive_record_id = archive_metadata_csv( + api_key, + csv_path, + int(csv.transformed_df.shape[0]), + upload_type="data", + env=kwargs.get("env"), + ) + results = [] for index, row in tqdm( csv.transformed_df.iterrows(), total=csv.transformed_df.shape[0] @@ -96,6 +109,16 @@ def upload(api_key: str, csv_path: str, **kwargs): result_row["error"] = f"{err}" results.append(result_row) + successful_rows = sum(1 for r in results if r.get("upload_status") == "Success") + finalize_metadata_csv( + api_key, + archive_record_id, + "successful", + successful_rows, + len(results) - successful_rows, + env=kwargs.get("env"), + ) + results_df = pd.DataFrame(results) return results_df else: @@ -128,6 +151,15 @@ def processed_upload(api_key: str, csv_path: str, output_path: str, **kwargs): headers["Authorization"] = f"token {api_key}" logger.info(f'{datetime.now().strftime("%H:%M:%S")}: beginning upload') + # Archive the source processed-data metadata CSV and open a tracking record. + archive_record_id = archive_metadata_csv( + api_key, + csv_path, + int(csv.transformed_df.shape[0]), + upload_type="processed_data", + env=kwargs.get("env"), + ) + results = [] for index, row in tqdm( csv.transformed_df.iterrows(), total=csv.transformed_df.shape[0] @@ -205,6 +237,16 @@ def processed_upload(api_key: str, csv_path: str, output_path: str, **kwargs): result_row["error"] = f"{err}" results.append(result_row) + successful_rows = sum(1 for r in results if r.get("upload_status") == "Success") + finalize_metadata_csv( + api_key, + archive_record_id, + "successful", + successful_rows, + len(results) - successful_rows, + env=kwargs.get("env"), + ) + results_df = pd.DataFrame(results) return results_df else: diff --git a/willisapi_client/willisapi_client.py b/willisapi_client/willisapi_client.py index 500bdb0..5b49f90 100644 --- a/willisapi_client/willisapi_client.py +++ b/willisapi_client/willisapi_client.py @@ -40,5 +40,11 @@ def get_upload_url(self): def get_processed_upload_url(self): return self.get_base_v2_url() + "metadata/processed-data/upload" + def get_csv_archive_url(self): + return self.get_base_v2_url() + "metadata/csv-archive" + + def get_csv_archive_finalize_url(self, record_id): + return self.get_csv_archive_url() + f"/{record_id}" + def get_headers(self): return {"Content-Type": "application/json", "Accept": "application/json"}