Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions willisapi_client/services/metadata/archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
import requests

from willisapi_client.willisapi_client import WillisapiClient
from willisapi_client.logging_setup import logger as logger


def _archive_headers(api_key):
return {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"token {api_key}",
}


def archive_metadata_csv(
    api_key, csv_path, total_rows, upload_type, env=None, *, timeout=(10, 120)
):
    """Archive the source metadata CSV alongside the data upload.

    Creates a server-side tracking row, then PUTs the raw CSV to the returned
    presigned S3 URL. Returns the tracking ``record_id`` on success (so the
    caller can finalize it with row counts), or ``None`` on failure.

    All failures are logged and swallowed — archiving must never abort the
    actual data upload. Once a tracking row exists, every failure path marks
    it as ``"failed"`` so that it is not left without an outcome.

    Args:
        api_key: API key sent in the ``Authorization`` header.
        csv_path: Path of the CSV file to archive; only its base name is
            sent to the server as ``filename``.
        total_rows: Row count reported in the tracking record.
        upload_type: Value sent as ``upload_type`` in the tracking record.
        env: Passed through to ``WillisapiClient`` to select the environment.
        timeout: ``requests`` timeout applied to both HTTP calls, so that a
            stalled archive request cannot block the data upload forever.

    Returns:
        The ``record_id`` from the server, or ``None`` if archiving failed.
    """
    # Tracked outside the ``try`` so the handler below can still finalize a
    # record that was created before something went wrong.
    record_id = None
    try:
        wc = WillisapiClient(env=env)
        payload = {
            "filename": os.path.basename(csv_path),
            "total_rows": total_rows,
            "upload_type": upload_type,
        }
        res = requests.post(
            wc.get_csv_archive_url(),
            headers=_archive_headers(api_key),
            json=payload,
            timeout=timeout,
        )
        if res.status_code != 201:
            logger.warning(f"CSV archive init failed: {res.status_code} {res.text}")
            return None

        body = res.json()
        record_id = body.get("record_id")
        presigned = body.get("presigned_url")
        if not presigned:
            logger.warning("CSV archive: no presigned URL returned")
            # The row may already exist server-side; close it out as failed.
            finalize_metadata_csv(api_key, record_id, "failed", 0, 0, env=env)
            return None

        # Stream the file straight from disk instead of loading it in memory.
        with open(csv_path, "rb") as f:
            put_res = requests.put(
                presigned,
                data=f,
                headers={"Content-Type": "text/csv"},
                timeout=timeout,
            )
        if put_res.status_code not in (200, 204):
            logger.warning(f"CSV archive S3 upload failed: {put_res.status_code}")
            finalize_metadata_csv(api_key, record_id, "failed", 0, 0, env=env)
            return None

        return record_id
    except Exception as ex:
        # Deliberately broad: archiving is best-effort and must not propagate.
        logger.warning(f"CSV archive error: {ex}")
        # No-op when no record was created (finalize ignores a falsy id).
        finalize_metadata_csv(api_key, record_id, "failed", 0, 0, env=env)
        return None


def finalize_metadata_csv(
    api_key, record_id, status, successful_rows, failed_rows, env=None, *, timeout=30
):
    """Report the CSV archive outcome and parsed row counts to the server.

    Sends a PATCH to the archive record's URL. This is best-effort
    bookkeeping: a non-200 response or any exception is logged as a warning
    and otherwise ignored, and nothing is returned.

    Args:
        api_key: API key sent in the ``Authorization`` header.
        record_id: Tracking record to finalize; when falsy the call is a no-op.
        status: Outcome string sent as ``status``.
        successful_rows: Count sent as ``successful_rows``.
        failed_rows: Count sent as ``failed_rows``.
        env: Passed through to ``WillisapiClient`` to select the environment.
        timeout: ``requests`` timeout for the PATCH call, so that a stalled
            server cannot block the caller indefinitely.
    """
    if not record_id:
        # Nothing to finalize when archiving never produced a record.
        return
    try:
        wc = WillisapiClient(env=env)
        payload = {
            "status": status,
            "successful_rows": successful_rows,
            "failed_rows": failed_rows,
        }
        res = requests.patch(
            wc.get_csv_archive_finalize_url(record_id),
            headers=_archive_headers(api_key),
            json=payload,
            timeout=timeout,
        )
        if res.status_code != 200:
            logger.warning(f"CSV archive finalize failed: {res.status_code} {res.text}")
    except Exception as ex:
        # Deliberately broad: finalizing must never break the caller's upload.
        logger.warning(f"CSV archive finalize error: {ex}")
42 changes: 42 additions & 0 deletions willisapi_client/services/metadata/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
find_files_with_pattern,
get_last_n_directories,
)
from willisapi_client.services.metadata.archive import (
archive_metadata_csv,
finalize_metadata_csv,
)

VALID_SCORE_TYPES = ["rater", "reviewer"]

Expand All @@ -38,6 +42,15 @@ def upload(api_key: str, csv_path: str, **kwargs):
headers["Authorization"] = f"token {api_key}"
logger.info(f'{datetime.now().strftime("%H:%M:%S")}: beginning upload')

# Archive the source metadata CSV and open a tracking record.
archive_record_id = archive_metadata_csv(
api_key,
csv_path,
int(csv.transformed_df.shape[0]),
upload_type="data",
env=kwargs.get("env"),
)

results = []
for index, row in tqdm(
csv.transformed_df.iterrows(), total=csv.transformed_df.shape[0]
Expand Down Expand Up @@ -96,6 +109,16 @@ def upload(api_key: str, csv_path: str, **kwargs):
result_row["error"] = f"{err}"
results.append(result_row)

successful_rows = sum(1 for r in results if r.get("upload_status") == "Success")
finalize_metadata_csv(
api_key,
archive_record_id,
"successful",
successful_rows,
len(results) - successful_rows,
env=kwargs.get("env"),
)

results_df = pd.DataFrame(results)
return results_df
else:
Expand Down Expand Up @@ -128,6 +151,15 @@ def processed_upload(api_key: str, csv_path: str, output_path: str, **kwargs):
headers["Authorization"] = f"token {api_key}"
logger.info(f'{datetime.now().strftime("%H:%M:%S")}: beginning upload')

# Archive the source processed-data metadata CSV and open a tracking record.
archive_record_id = archive_metadata_csv(
api_key,
csv_path,
int(csv.transformed_df.shape[0]),
upload_type="processed_data",
env=kwargs.get("env"),
)

results = []
for index, row in tqdm(
csv.transformed_df.iterrows(), total=csv.transformed_df.shape[0]
Expand Down Expand Up @@ -205,6 +237,16 @@ def processed_upload(api_key: str, csv_path: str, output_path: str, **kwargs):
result_row["error"] = f"{err}"
results.append(result_row)

successful_rows = sum(1 for r in results if r.get("upload_status") == "Success")
finalize_metadata_csv(
api_key,
archive_record_id,
"successful",
successful_rows,
len(results) - successful_rows,
env=kwargs.get("env"),
)

results_df = pd.DataFrame(results)
return results_df
else:
Expand Down
6 changes: 6 additions & 0 deletions willisapi_client/willisapi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,11 @@ def get_upload_url(self):
def get_processed_upload_url(self):
    """Return the processed-data metadata upload endpoint URL.

    The endpoint path is appended to the value of ``get_base_v2_url()``.
    """
    base_url = self.get_base_v2_url()
    return base_url + "metadata/processed-data/upload"

def get_csv_archive_url(self):
    """Return the CSV-archive endpoint URL.

    The endpoint path is appended to the value of ``get_base_v2_url()``.
    """
    base_url = self.get_base_v2_url()
    return base_url + "metadata/csv-archive"

def get_csv_archive_finalize_url(self, record_id):
    """Return the URL of a single CSV-archive record.

    The record id is appended as a path segment to ``get_csv_archive_url()``.
    """
    archive_url = self.get_csv_archive_url()
    record_segment = f"/{record_id}"
    return archive_url + record_segment

def get_headers(self):
    """Return a new dict of default headers declaring JSON in both directions."""
    json_mime = "application/json"
    return {"Content-Type": json_mime, "Accept": json_mime}
Loading