feat: harden migration pipeline - parallelism, streaming, resume, re-auth#255

Open
vidiecan wants to merge 15 commits into main from optim

Conversation


@vidiecan vidiecan commented Feb 24, 2026

Summary

This PR improves migration reliability, observability, and performance in the import pipeline, with a strong focus on validation signal quality and long-running operation diagnostics.

What Changed

1) Runtime profiling and duration accuracy

  • src/repo_import.py
    • Fixed end-of-run profiling so total runtime reflects full wall-clock duration.
    • Added total_elapsed to final profile checkpoint logging.
    • Added human-readable duration formatting (e.g. 1d 03:12:45).
    • Kept phase elapsed timing while adding total elapsed context.
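
The duration formatter itself is not shown in the PR; a minimal sketch that produces the `1d 03:12:45` shape could look like this (the function name is illustrative, not the PR's):

```python
def fmt_duration(seconds: float) -> str:
    """Render elapsed seconds as 'Nd HH:MM:SS', showing days only when nonzero."""
    total = int(seconds)
    days, rem = divmod(total, 86400)
    hours, rem = divmod(rem, 3600)
    minutes, secs = divmod(rem, 60)
    hms = f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{days}d {hms}" if days else hms
```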

2) Logging UX and status semantics

  • src/utils.py

    • Added console color formatter with Windows/PyCharm ANSI handling (colorama support).
    • Standardized level rendering to 4-char codes (DEBU, INFO, WARN, CRIT).
    • Status color semantics:
      • CRIT -> red
      • WARN -> orange
      • OK -> green
    • Removed transient highlight markers from emitted message text.
  • src/pump/_utils.py

    • Standardized post-import status lines:
      • OK when expected == imported
      • WARN otherwise
    • Improved alignment and count readability in one-line status summaries.
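
The 4-char level codes fall out of printf-style precision in the logging format string (`%(levelname).4s`, which the PR's `init_logging` change uses); a quick illustration:

```python
import logging

# Precision .4 truncates the level name: DEBUG -> DEBU, WARNING -> WARN,
# CRITICAL -> CRIT. Color would be layered on via a custom Formatter
# (colorama on Windows); that part is not reproduced here.
fmt = logging.Formatter('%(levelname).4s: %(message)s')

for level in (logging.DEBUG, logging.WARNING, logging.CRITICAL):
    rec = logging.LogRecord("demo", level, __file__, 1, "status line", None, None)
    print(fmt.format(rec))
```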

3) Validation severity and signal quality

  • src/pump/_db.py
    • Introduced severity classification for length comparisons:
      • CRIT when v7 < v5
      • WARN for other mismatches
      • OK when equal
    • Applied severity to table len checks, non-null checks, and SQL fallback checks.
    • Improved bool-field diagnostics (withdrawn, in_archive, discoverable, deleted) with true/false breakdowns.
    • Removed noisy SQL-validation progress logs from INFO; moved fetch/normalize/set-diff step logs to DEBUG.
    • Simplified SQL-validation progress phrasing (preparing fetch v5 and v7).
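
The severity rule reduces to a small pure function; a sketch (the name is hypothetical, the classification matches the rules above):

```python
def classify_len(v5_count: int, v7_count: int) -> str:
    """Severity for v5 -> v7 length comparisons: losing rows is critical,
    any other mismatch is a warning, equality is OK."""
    if v7_count < v5_count:
        return "CRIT"
    if v7_count != v5_count:
        return "WARN"
    return "OK"
```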

4) Metadata normalization performance improvements

  • src/pump/_metadata.py
    • Optimized hot path in _metadatavalue_process:
      • hoisted repeated property/method lookups
      • cached int-key maps for UUID and field-id mapping
      • reduced per-row conversions/dispatch overhead
      • replaced regex-in-loop pattern checks with cheaper substring checks in the complex branch
      • tightened text normalization path and append bindings
    • Improved ignored field reporting to include human-readable field names.
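
The PR's exact code is not reproduced here, but the hoisting/caching pattern it describes can be sketched like this (row keys and map shapes are hypothetical):

```python
def process_rows(rows, uuid_map, field_map):
    """Illustrative hot loop: `rows` are v5 metadatavalue dicts, `uuid_map`
    and `field_map` are str-keyed mappings from the dump (assumed shapes)."""
    # Cache int-keyed maps once, so the str->int conversion leaves the loop.
    uuid_by_int = {int(k): v for k, v in uuid_map.items()}
    field_by_int = {int(k): v for k, v in field_map.items()}
    out = []
    append = out.append          # hoist bound-method lookups out of the loop
    uuid_get = uuid_by_int.get
    field_get = field_by_int.get
    for row in rows:
        uuid = uuid_get(row["resource_id"])
        field = field_get(row["metadata_field_id"])
        if uuid is None or field is None:
            continue             # unmapped resource or field: skip the row
        append((uuid, field, row["text_value"]))
    return out
```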

5) Import progress and resiliency enhancements

  • src/pump/_bitstream.py

    • Enhanced progress postfix with richer counters (skipped_deleted, resumed, checkpoints).
    • Made checkpoint feedback less noisy and more progress-oriented.
  • src/pump/_resourcepolicy.py

    • Added SQL-based validation definitions for embargo-related policy checks.
    • Improved import accounting for skipped deleted targets/missing bitstreams.
    • Added progress postfix updates for worker-mode imports.
  • src/pump/_eperson.py

    • Added non-null validation coverage for epersongroup2eperson.
  • src/pump/_sequences.py

    • Improved sequence drift warning readability with aligned structured values.

6) Dependencies

  • requirements.txt
    • Added colorama>=0.4.6 for better ANSI support on Windows/PyCharm.
    • pysolr currently remains unpinned in this branch.

Results / Impact

  • Cleaner and more actionable validation output:
    • critical regressions (v7 < v5) now stand out as CRIT.
    • warning and success lines are easier to visually scan.
  • Reduced INFO log noise during SQL validation by shifting operational progress to DEBUG.
  • Improved runtime visibility with accurate total elapsed and readable duration.
  • The metadata normalization path was optimized; user-reported logs from iterative runs show improved throughput over the earlier baseline.

Validation Performed

  • Pre-commit checks passed on commit:
    • autopep8 -> passed
    • ruff -> passed
  • No new diagnostic errors in the modified Python files during implementation iterations.

Commit

  • 671ebb6 feat(import): improve validation signals, profiling, and normalization performance

Copilot AI review requested due to automatic review settings February 24, 2026 11:28

Copilot AI left a comment


Pull request overview

This pull request implements performance optimizations for the DSpace import process, focusing on memory efficiency and import speed improvements.

Changes:

  • Added memory profiling and performance checkpointing throughout the import process
  • Implemented streaming JSONL processing for large metadata tables to reduce memory usage
  • Optimized database operations including chunked exports and connection reuse
  • Enhanced error handling with fail-fast behavior on critical import failures

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 11 comments.

| File | Description |
| --- | --- |
| src/utils.py | Added memory logging configuration parameter |
| src/repo_import.py | Added memory profiling, performance checkpointing, and changed default --resume to False |
| src/pump/_repo.py | Implemented chunked table export with progress tracking and JSONL support |
| src/pump/_metadata.py | Converted to streaming JSONL processing with tuple-based storage for memory efficiency |
| src/pump/_item.py | Added reset_progress method for progress tracking |
| src/pump/_handle.py | Added database check to skip already-imported handles |
| src/pump/_group.py | Enhanced error handling with concurrent creation detection |
| src/pump/_eperson.py | Added hydrate_uuid_map for efficient eperson mapping and netid support |
| src/pump/_db.py | Refactored connection health checks and added estimated vs exact count modes |
| src/pump/_community.py | Changed error from log-and-continue to raise exception |
| src/pump/_collection.py | Optimized to accept pre-computed collection-to-group mapping |
| src/pump/_bundle.py | Added reset_progress method |
| src/pump/_bitstream.py | Enhanced error handling with validation and fail-fast behavior |
| src/project_settings.py | Added memory log file config and changed localhost to 127.0.0.1 |
| src/dspace/_rest.py | Modified post method and added HTTP timeouts |
| scripts/start.local.dspace.db.bat | Enhanced script with environment variables and localhost binding |
| scripts/Readme.md | Added OXFORD mock setup documentation |


vidiecan and others added 7 commits February 24, 2026 12:51
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…d improve operator safety

- add startup configuration confirmation prompt before import begins, including key runtime flags and license URL rewrite settings
- introduce backend config options for parallel item and bundle imports and wire them through the import pipeline
- add worker-client spawning in REST layer to support safe per-thread API sessions
- improve auth resilience by retrying 401/403 responses with forced re-authentication
- add reusable parallel task runner and use it in item and bundle import flows
- optionally skip deleted bitstreams via configuration
- improve log messages
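
The 401/403 re-auth retry described above can be sketched as a transport-agnostic wrapper; `send` and `reauthenticate` are hypothetical stand-ins for the REST client's request and login methods, since the PR's actual code is not shown:

```python
def request_with_reauth(send, reauthenticate, max_retries: int = 1):
    """Call send(); on a 401/403 response, force re-authentication and retry.

    `send` returns a response object exposing `.status_code`;
    `reauthenticate` refreshes the session credentials.
    """
    resp = send()
    for _ in range(max_retries):
        if resp.status_code not in (401, 403):
            break                # success or a non-auth error: stop retrying
        reauthenticate()
        resp = send()
    return resp
```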
@vidiecan vidiecan changed the title from Optim to feat: harden migration pipeline - parallelism, streaming, resume, re-auth on Mar 3, 2026
@vidiecan vidiecan requested a review from Copilot March 3, 2026 23:41

Copilot AI left a comment


Pull request overview

Copilot reviewed 23 out of 24 changed files in this pull request and generated 6 comments.



Comment on lines 3 to +7

```python
import os
import argparse
import logging
import gc
import tracemalloc
```

Copilot AI Mar 3, 2026


time.time() / time.perf_counter() are used, but time is not imported in the shown imports. Add import time to avoid a runtime NameError.

Comment on lines +192 to +195

```python
s = time.time()
if args.memory_profile and not tracemalloc.is_tracing():
    tracemalloc.start(25)
checkpoint_ts = time.perf_counter()
```

Copilot AI Mar 3, 2026


time.time() / time.perf_counter() are used, but time is not imported in the shown imports. Add import time to avoid a runtime NameError.

Comment on lines +49 to +51

```python
if memory_log_file:
    memory_log_dir = os.path.dirname(memory_log_file)
    os.makedirs(memory_log_dir, exist_ok=True)
```

Copilot AI Mar 3, 2026


os.path.dirname(memory_log_file) can be an empty string when memory_log_file is just a filename (no directory). os.makedirs('') raises FileNotFoundError. Guard with if memory_log_dir: before calling os.makedirs, or default to '.'.
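
A guarded version along the lines the review suggests (the helper name is illustrative):

```python
import os

def ensure_parent_dir(path: str) -> None:
    """Create the parent directory of `path`, if it has one.

    os.path.dirname('mem.log') is '' and os.makedirs('') raises
    FileNotFoundError, so the truthiness guard is required for bare
    filenames.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
```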

```python
        self._col2group[int(m.group(1))] = meta['resource_id']
        if col2group is not None:
            self._col2group = col2group
        else:
```

Copilot AI Mar 3, 2026


metadata_file_str is now optional, but it’s still required when col2group is not provided. If metadata_file_str is None, this will call read_json(None) and fail unclearly. Consider validating inputs (raise a clear ValueError) when col2group is None and metadata_file_str is missing.

Suggested change

```diff
 else:
+    if metadata_file_str is None:
+        raise ValueError(
+            "metadata_file_str must be provided when col2group is None"
+        )
```

Comment on lines +69 to +76

```shell
run_with_retry "createdb clarin-dspace" createdb --username=postgres --owner=dspace --encoding=UNICODE clarin-dspace
run_with_retry "psql import clarin-dspace" bash -lc "psql -U postgres clarin-dspace < ../dump/clarin-dspace.sql > /dev/null 2>&1"
run_with_retry "psql import clarin-dspace log" bash -lc "psql -U postgres clarin-dspace < ../dump/clarin-dspace.sql > ./__clarin-dspace.log 2>&1"

echo "Importing clarin-utilities"
createdb --username=postgres --encoding=UNICODE clarin-utilities
psql -U postgres clarin-utilities < ../dump/clarin-utilities.sql &> /dev/null
psql -U postgres clarin-utilities < ../dump/clarin-utilities.sql &> ./__clarin-utilities.log
run_with_retry "createdb clarin-utilities" createdb --username=postgres --encoding=UNICODE clarin-utilities
run_with_retry "psql import clarin-utilities" bash -lc "psql -U postgres clarin-utilities < ../dump/clarin-utilities.sql > /dev/null 2>&1"
run_with_retry "psql import clarin-utilities log" bash -lc "psql -U postgres clarin-utilities < ../dump/clarin-utilities.sql > ./__clarin-utilities.log 2>&1"
```

Copilot AI Mar 3, 2026


Each SQL dump is imported twice (once to /dev/null, then again to a log file), which is slow and can break if the dump isn’t idempotent. Prefer a single import that logs output (e.g., redirect once to the log, optionally tail/print summary on failure).

Comment on lines +174 to +175

```python
expected = len([b for b in (self._bs or []) if not (
    skip_deleted and b.get('deleted', False))])
```

Copilot AI Mar 3, 2026


This builds a full filtered list just to compute expected, which can be expensive for large bitstream tables. Use a generator-based count (e.g., sum(1 for b in ...)) to avoid allocating a large intermediate list.

Suggested change

```diff
-expected = len([b for b in (self._bs or []) if not (
-    skip_deleted and b.get('deleted', False))])
+expected = sum(
+    1
+    for b in (self._bs or [])
+    if not (skip_deleted and b.get('deleted', False))
+)
```

jm added 4 commits March 4, 2026 19:07
…n performance

- add total runtime to final profile checkpoints with human-readable duration
- improve console log UX: 4-letter levels, status colorization (OK/WARN/CRIT), PyCharm ANSI handling
- remove redundant SQL fetch progress at INFO and move SQL-validation step telemetry to DEBUG
- classify table-length diffs by severity: CRIT when v7 < v5, WARN otherwise
- standardize status lines and alignment for import/test comparison output
- optimize metadatavalue normalization hot path by reducing per-row overhead and lookup costs
- add/adjust validation coverage for resourcepolicy and epersongroup2eperson paths
- improve sequence drift logging readability and bitstream/resourcepolicy progress reporting
@vidiecan vidiecan requested a review from Copilot March 6, 2026 11:53

Copilot AI left a comment


Pull request overview

Copilot reviewed 25 out of 26 changed files in this pull request and generated 4 comments.



Comment on lines +94 to +100
```python
try:
    _logger.info(
        "Press ENTER to continue if this configuration is OK (Ctrl+C to abort)...")
    input()
except EOFError:
    _logger.warning(
        "No interactive input available; continuing without confirmation keypress.")
```

Copilot AI Mar 6, 2026


The unconditional input() can block indefinitely in non-interactive runs (e.g., CI, redirected stdin that isn’t EOF). Consider guarding with sys.stdin.isatty() (or a --no-confirm / --non-interactive flag) and skipping the prompt when not interactive.
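
The suggested guard can be sketched as follows (the function name is illustrative; the PR's actual prompt code differs slightly):

```python
import sys

def confirm_or_skip(logger):
    """Prompt for confirmation only when stdin is interactive."""
    if not sys.stdin.isatty():
        logger.warning("Non-interactive stdin; skipping confirmation prompt.")
        return
    try:
        logger.info("Press ENTER to continue (Ctrl+C to abort)...")
        input()
    except EOFError:
        logger.warning("No interactive input; continuing without confirmation.")
```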

Comment on lines +204 to +213
```python
def _update_progress(pbar_ref):
    """Centralized progress-bar postfix update."""
    pbar_ref.set_postfix(
        imported=self._imported['bitstream'],
        skipped_deleted=skipped_deleted,
        resumed=skipped_already_imported,
        errored=errored,
        checkpoints=checkpoints_saved,
        to_checkpoint=checkpoint_every - checkpoint_counter,
    )
```

Copilot AI Mar 6, 2026


progress_bar() can return a plain iterator (when len(arr) < 2), which does not have set_postfix(). This will raise AttributeError for very small bitstream datasets (0/1 elements) because _update_progress() is called unconditionally later. Make _update_progress() resilient (e.g., if hasattr(pbar_ref, "set_postfix"):) or ensure a tqdm instance is always used when postfix updates are needed.
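
The `hasattr` guard the review proposes is a one-liner (helper name illustrative):

```python
def update_postfix(pbar, **counters):
    """Update the tqdm postfix only when the bar supports it.

    progress_bar() may hand back a plain iterator for tiny inputs,
    which has no set_postfix; the guard makes the update a no-op then.
    """
    if hasattr(pbar, "set_postfix"):
        pbar.set_postfix(**counters)
```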

Comment on lines +149 to +151
```python
def run_tasks(tasks, worker, workers: int = 1, desc: str = None, on_result=None):
    tasks = list(tasks or [])
    workers = max(1, int(workers or 1))
```

Copilot AI Mar 6, 2026


Converting tasks to a list unconditionally duplicates memory and defeats streaming for large iterables (which matters for migration workloads). Consider only materializing when necessary (e.g., if tasks isn’t a Sequence and you need len()/multiple passes), or accept a total parameter and iterate without copying.
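
One way to keep this behavior without materializing the iterable is a bounded submission window; this is a sketch under the review's suggestion, not the PR's code, and completion order is not preserved:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_tasks_streaming(tasks, worker, workers=1, on_result=None):
    """Run worker(task) over an iterable with at most 2*workers tasks
    in flight, so a huge iterable is never copied into a list."""
    workers = max(1, int(workers or 1))
    results = []

    def drain(futures):
        for f in futures:
            r = f.result()          # blocks until the future completes
            if on_result:
                on_result(r)
            results.append(r)

    with ThreadPoolExecutor(max_workers=workers) as ex:
        pending = set()
        for task in tasks:
            pending.add(ex.submit(worker, task))
            if len(pending) >= 2 * workers:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                drain(done)
        drain(pending)              # finish whatever is still in flight
    return results
```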

Comment on lines 90 to +96
```diff
 def init_logging(
-        logger,
-        log_file: str,
-        console_level=logging.INFO,
-        file_level=logging.INFO,
-        format: str = '%(asctime)s:%(levelname)s: %(message)s'):
+        logger,
+        log_file: str,
+        memory_log_file: str = None,
+        console_level=logging.INFO,
+        file_level=logging.INFO,
+        format: str = '%(asctime)s:%(levelname).4s: %(message)s'):
```

Copilot AI Mar 6, 2026


memory_log_file: str = None should be typed as Optional[str] (and format is oddly indented compared to the other parameters). Adjusting the typing/formatting will improve readability and static analysis without changing behavior.
