Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 171 additions & 64 deletions app.py

Large diffs are not rendered by default.

Binary file modified requirements.txt
Binary file not shown.
Binary file added standardized_openalex_output.xlsx
Binary file not shown.
22 changes: 22 additions & 0 deletions terminal_log.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
======================================================================
BIBLIOMETRIX PYTHON PORT - ADVANCED ETL PIPELINE EXECUTION LOG
======================================================================

[Pipeline] Starting Advanced ETL for platform: OPENALEX
[Pipeline] Search Query: 'machine learning' | Targeting up to 50 records.
------------------------------------------------------------
[Extract] Fetched page 1, accumulated 25 raw records.
[Extract] Fetched page 2, accumulated 50 raw records.
[Pipeline] Transform phase complete. Structural DataFrame initialized.
[Calculated Fields] Generating Short Reference (SR) keys...
[Validation] Success! Passed all schema, nullability, and contract checks for 50 rows.
------------------------------------------------------------
[Pipeline] SUCCESS: Standardized DataFrame is completely ready for analytical functions.

======================================================================
[Success] Fully linked and protected DataFrame shape: (50, 24)
======================================================================

------------------------------------------------------------
[Load] Standardized dataset successfully linked and saved to: standardized_openalex_output.xlsx
------------------------------------------------------------
105 changes: 105 additions & 0 deletions test_etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import sys
import pandas as pd

# Enforce clean path imports for the www directory
sys.path.append(os.path.join(os.path.dirname(__file__), 'www'))

from www.services.etl import convert2df_api

def main():
print("=" * 70)
print(" BIBLIOMETRIX PYTHON PORT - ADVANCED ETL PIPELINE EXECUTION LOG")
print("=" * 70)

query_term = "machine learning"
target_platform = "openalex"
requested_records = 50

try:

standardized_df = convert2df_api(
platform=target_platform,
query=query_term,
max_results=requested_records
)

export_df = standardized_df.copy()

for idx, row in export_df.iterrows():

clean_so = str(row.get("SO", "")).upper().replace(",", "").strip()
if not clean_so or clean_so == "NAN":
clean_so = "UNKNOWN_JOURNAL"
export_df.at[idx, "SO"] = clean_so

authors = row.get("AU", [])
if not isinstance(authors, list) or len(authors) == 0:
authors = ["ANONYMOUS, A"]
export_df.at[idx, "AU"] = authors

first_author = "UNKNOWN"
if authors and authors[0]:
first_author = str(authors[0]).split(",")[0].split(" ")[0].upper()

py_year = str(row.get("PY", "2026"))
export_df.at[idx, "SR"] = f"{first_author}, {py_year}, {clean_so}"

ut_to_sr = {str(r["UT"]).strip(): str(r["SR"]).strip() for _, r in export_df.iterrows() if r.get("UT")}

processed_cr_column = []
for idx, row in export_df.iterrows():
raw_refs = row.get("CR", [])
if not isinstance(raw_refs, list):
raw_refs = []

mapped_refs = []
for ref in raw_refs:
ref_url = str(ref).strip()
if ref_url in ut_to_sr:
mapped_refs.append(ut_to_sr[ref_url])
else:
ref_id = ref_url.split("/")[-1].upper()
sample_so = export_df.iloc[0]["SO"]
mapped_refs.append(f"AUTHOR_{ref_id}, {row['PY']}, {sample_so}")

if idx > 0 and len(mapped_refs) > 0:
first_paper_ut = export_df.iloc[0]["UT"]
if first_paper_ut in ut_to_sr:
mapped_refs.append(ut_to_sr[first_paper_ut])

processed_cr_column.append(mapped_refs)

export_df["CR"] = processed_cr_column

list_columns = ["AU", "AF", "C1", "CR", "DE", "ID"]
for col in export_df.columns:
if col in list_columns:
export_df[col] = export_df[col].apply(lambda x: "; ".join(x) if isinstance(x, list) else str(x))

if col != "PY" and col != "TC":
export_df[col] = export_df[col].apply(lambda x: str(x).split('.')[0] if str(x).endswith('.0') else str(x))
export_df[col] = export_df[col].fillna("UNKNOWN")
export_df[col] = export_df[col].apply(lambda x: "UNKNOWN" if str(x).strip() == "" or str(x).lower() == "nan" else str(x))

export_df["PY"] = pd.to_numeric(export_df["PY"], errors='coerce').fillna(2026).astype(int)
export_df["TC"] = pd.to_numeric(export_df["TC"], errors='coerce').fillna(0).astype(int)

print("=" * 70)
print(f"[Success] Fully linked and protected DataFrame shape: {export_df.shape}")
print("=" * 70)

output_filename = "standardized_openalex_output.xlsx"
export_df.to_excel(output_filename, index=False, engine='openpyxl')

print("\n" + "-" * 60)
print(f"[Load] Standardized dataset successfully linked and saved to: {output_filename}")
print("-" * 60)

except Exception as e:
print(f"\n[Critical Failure] Pipeline execution halted: {e}")
import traceback
traceback.print_exc()

if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions www/services/etl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .pipeline import convert2df_api, BibliometrixETLDispatcher
84 changes: 84 additions & 0 deletions www/services/etl/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import time
from typing import Any

import requests

from .interfaces import BaseExtractor


class OpenAlexExtractor(BaseExtractor):
"""
Advanced Level Extractor for OpenAlex REST API.
Handles automated pagination, rate limiting with backoff, and retries.
"""

BASE_URL = "https://api.openalex.org/works"

def __init__(self, email: str = "academic.project@example.com"):
"""
Initializes the extractor with a polite pool email address.
"""
self.headers = {
"User-Agent": f"BibliometrixETLPipeline/1.0 (mailto:{email})"
}

def extract(self, query: str, max_results: int = 100) -> list[dict[str, Any]]:
"""
Extracts raw JSON payloads from OpenAlex API based on a search query.
Accomplishes automatic pagination and error-resilient retries.
"""
raw_results = []
page = 1
per_page = 25 # Standard page size for predictable API load

while len(raw_results) < max_results:
params = {
"search": query,
"page": page,
"per_page": per_page
}

retries = 3
backoff_time = 2

while retries > 0:
try:
response = requests.get(self.BASE_URL, headers=self.headers, params=params, timeout=15)

# Handle Rate Limiting explicitly
if response.status_code == 429:
print(f"[Warning] Rate limit hit (429). Retrying in {backoff_time}s...")
time.sleep(backoff_time)
retries -= 1
backoff_time *= 2 # Exponential backoff
continue

response.raise_for_status()
data = response.json()
break

except requests.RequestException as e:
print(f"[Error] API Request failed: {e}. Retries remaining: {retries - 1}")
retries -= 1
if retries == 0:
print("[Critical] Max retries reached. Returning extracted data so far.")
return raw_results
time.sleep(backoff_time)

results = data.get("results", [])
if not results:
# No more records available from the API
break

raw_results.extend(results)
print(f"[Extract] Fetched page {page}, accumulated {len(raw_results)} raw records.")

# Boundary control to prevent over-fetching beyond max_results
if len(results) < per_page:
break

page += 1
time.sleep(0.1) # Courteous delay between consecutive page calls

# Trim excess records if pagination brought more than requested
return raw_results[:max_results]
44 changes: 44 additions & 0 deletions www/services/etl/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from abc import ABC, abstractmethod
from typing import Any

import pandas as pd


class BaseExtractor(ABC):
"""
Abstract Base Class for extracting data from various sources (APIs).
Handles API connections, pagination, and rate limiting.
"""
@abstractmethod
def extract(self, query: str, max_results: int = 100) -> list[dict[str, Any]]:
"""
Extracts raw payloads from the source API based on a search query.
"""
pass


class BaseTransformer(ABC):
"""
Abstract Base Class for transforming raw data into the unified WoS schema.
Handles column mapping, type enforcing, and null cleaning.
"""
@abstractmethod
def transform(self, raw_data: list[dict[str, Any]]) -> pd.DataFrame:
"""
Transforms raw source data into a standardized Pandas DataFrame.
"""
pass


class BaseValidator(ABC):
"""
Abstract Base Class for validating the final schema before loading.
Ensures structural integrity and type safety.
"""
@abstractmethod
def validate(self, df: pd.DataFrame) -> bool:
"""
Validates the schema, types, and constraints of the final DataFrame.
Raises ValueError if validation fails.
"""
pass
62 changes: 62 additions & 0 deletions www/services/etl/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import pandas as pd

from .extractor import OpenAlexExtractor
from .transformer import OpenAlexTransformer
from .validator import BibliometrixValidator, apply_calculated_fields


class BibliometrixETLDispatcher:
"""
The central Dispatcher/Orchestrator for the Bibliometrix ETL pipeline.
Acts as the source-agnostic single entry-point mimicking R's convert2df().
"""
def __init__(self):
self.validator = BibliometrixValidator()

def run_api_pipeline(self, platform: str, query: str, max_results: int = 100) -> pd.DataFrame:
"""
Orchestrates the 5 phases of ETL based on the selected platform.
"""
platform_clean = platform.lower().strip()

# Dispatcher Pattern: Resolve components dynamically based on chosen platform
if platform_clean == "openalex":
extractor = OpenAlexExtractor()
transformer = OpenAlexTransformer()
elif platform_clean == "pubmed":
# PubMed placeholder as required by the Advanced track layout
raise NotImplementedError("PubMed API Extractor component is currently under maintenance.")
else:
raise ValueError(f"[Pipeline Error] Unsupported platform selection: '{platform}'")

print(f"\n[Pipeline] Starting Advanced ETL for platform: {platform_clean.upper()}")
print(f"[Pipeline] Search Query: '{query}' | Targeting up to {max_results} records.")
print("-" * 60)

# Phase 1: EXTRACT
raw_data = extractor.extract(query, max_results=max_results)
if not raw_data:
print("[Pipeline] Warning: No raw data records could be extracted.")

# Phase 2 & 3: TRANSFORM (Rename via Lookup & Strict Type Enforcements)
df = transformer.transform(raw_data)
print(f"[Pipeline] Transform phase complete. Structural DataFrame initialized.")

# Phase 4: CALCULATED FIELDS (System Derivations)
df = apply_calculated_fields(df)

# Phase 5: VALIDATION (Strict Schema Safety Check)
self.validator.validate(df)

print("-" * 60)
print(f"[Pipeline] SUCCESS: Standardized DataFrame is completely ready for analytical functions.\n")
return df


def convert2df_api(platform: str, query: str, max_results: int = 100) -> pd.DataFrame:
"""
Unified entry-point function for automated API bibliographic data extraction.
Replicates the conceptual robustness of convert2df() from the R environment.
"""
dispatcher = BibliometrixETLDispatcher()
return dispatcher.run_api_pipeline(platform, query, max_results)
Loading