FEAT: LeMatRho data fetch, transform. Downsample rho during fetch.#49
Conversation
Add `lematrho run` CLI command that downloads charge density data from S3, compresses via pyrho, optionally runs Bader/DDEC6 analysis, and writes Parquet files directly. Crash-safe via checkpoint file, atomic Parquet writes, work-stealing parallelism.
- Move shared constants (STATIC_CALC_TYPE, GRID_KEY_MAP, timeouts) and write_potcar() into utils.py as the single source of truth
- Convert all lematrho docstrings to Google style
- Add 11 new pipeline tests: _validate_tools, DDEC6 happy path, _structure_to_row with None fields, _push_to_huggingface, DDEC6 in _process_material
- Add @pytest.mark.integration scaffold with .env.integration loading
- Remove unused --force flag from lematrho transform (it was silently ignored)
- Fix batch: Any -> batch: str type annotation in fetch.py
- Add .env.* to .gitignore with a !.env.example exception
- Add LeMatRho/AWS/HuggingFace variables to .env.example
- Add pytest integration marker config to pyproject.toml
- Vasprun and Chgcar.from_file require filesystem paths, not BytesIO objects. Write bytes to temp files before parsing.
- Add --limit CLI option to cap the number of materials processed (useful for smoke testing without processing all ~76k materials).
- Verified with a real S3 smoke test: 2 materials processed end-to-end with compressed charge densities at 10x10x10 grid shape.
- Batch-checkpoint after Parquet flush instead of per-material to prevent
desync on crash (up to chunk_size materials could be lost)
- Track failed materials in .failures.txt, skip on resume
- Close S3 StreamingBody and free compressed buffer in download_gz_file_from_s3
- Replace NamedTemporaryFile(delete=True) with TemporaryDirectory for
reliable temp file lifetime in parse_vasprun_structure and compress_chgcar
- Fix _list_materials docstring ("Sorted" claim was incorrect)
- Document memory trade-off for raw_files kept for Bader/DDEC6
- Add 5 new tests (batch checkpoint, failure load/append/resume)
Replace manual subprocess Bader/DDEC6 calls with pymatgen's BaderAnalysis and ChargemolAnalysis wrappers. Replace perl chgsum.pl with Chgcar arithmetic. Extract shared run_bader_from_bytes() and run_ddec6_from_bytes() helpers to utils.py, eliminating ~100 lines of duplication between transform.py and pipeline.py. Remove chgsum_script_path from config/CLI and timeout constants from utils. Add module-level docstrings to all LeMatRho files.
…ct pipeline

Delete fetch.py, transform.py, and their tests. The traditional PostgreSQL-based pipeline added confusion and required double S3 downloads. Only the direct S3-to-Parquet pipeline (lematrho run) remains. Move get_cross_compatibility() to utils.py so pipeline.py has no dependency on the deleted transform.py. Remove lematrho fields from FetcherConfig and TransformerConfig. Remove the fetch/transform CLI commands and options. 131 tests pass, 2 skipped.
```python
try:
    # Fresh client per worker (boto3 clients are NOT multiprocess-safe)
    aws_client = get_authenticated_aws_client()
```
This creates a new authenticated client for every material. I don't think it is a bottleneck given the rest of the pipeline, but it could be done for batches of materials if it causes an issue or triggers rate limits on the AWS side.
You're right that it creates a client per material, but this is intentional: boto3 clients are not safe to share across ProcessPoolExecutor workers, and client creation is fast (~ms) relative to the multi-hundred-MB downloads and Bader/DDEC6 analysis per material. Added an inline comment explaining this trade-off. If we ever see rate limiting from AWS we can batch client creation per worker init, but so far it hasn't been an issue. Gemini suggests we could do something like this (best of both worlds: minimize clients, but don't share them across processes):
```python
import concurrent.futures

import boto3

# Global variable inside the worker's memory space
worker_s3_client = None


def initialize_worker():
    """Runs once when the worker process starts."""
    global worker_s3_client
    # Get the authenticated client once per process
    worker_s3_client = get_authenticated_aws_client()


def process_material(material_id):
    """Your task function."""
    global worker_s3_client
    # Use the process-local client (no need to recreate it!)
    response = worker_s3_client.get_object(Bucket='my-bucket', Key=material_id)
    # ... do the multi-hundred-MB download and Bader/DDEC6 analysis ...
    return f"Processed {material_id}"


# Main pipeline execution
def run_pipeline(materials):
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=4,
        initializer=initialize_worker,  # <-- This is the magic!
    ) as executor:
        results = list(executor.map(process_material, materials))
    return results
```
Let's come back to this if we see the code is too slow (for now, with 70k materials, I didn't notice this bottleneck).
```python
optimade_structure = OptimadeStructure(
    id=material_id,
    source="lematrho",
    immutable_id=material_id,
    last_modified=datetime.now(),
    **optimade_dict,
    functional=Functional.PBE,
    cross_compatibility=cross_compatibility,
    compressed_charge_density=compressed_grids.get("charge_density"),
    compressed_aeccar0=compressed_grids.get("aeccar0"),
    compressed_aeccar1=compressed_grids.get("aeccar1"),
    compressed_aeccar2=compressed_grids.get("aeccar2"),
    charge_density_grid_shape=list(grid_shape),
    bader_charges=bader_charges,
    bader_atomic_volume=bader_atomic_volume,
    ddec6_charges=ddec6_charges,
    compute_space_group=True,
    compute_bawl_hash=True,
)
```
Do you know whether it might be useful to also transfer the energy or forces keys for instance in the database? Or are they also included elsewhere?
Good question: yes, energy, forces, and stress are all available in the vasprun.xml we already download, so extracting them would be essentially free. Currently parse_vasprun_structure only pulls the final relaxed structure and discards the rest. @mfranckel will add energy/forces/stress extraction in a follow-up PR.
Ramlaoui left a comment
LGTM, no major changes requested, as the pipeline is pretty separate from the rest of the logic. Also, I am not very familiar with the Bader/DDEC6 code, so I can't comment on what gets executed there.
Just one high-level question: for the generic fetch/transform pipeline, why do you think it would need a double download? Wouldn't it be enough to download the AWS open data with all CHGCAR/AECCAR once during fetch into the raw Postgres database and then use that database during transform? Of course the big issue with that would be storage space, and then this leads us to wondering whether we really need local copies of online databases for reproducibility or not, but I guess that would be project dependent.
I also added a few comments on whether the bottleneck induced by creating connections and switching tasks for each material adds latency that could be avoided if materials can be batched or workers increased. But I guess that depends a lot on the Bader/DDEC6 workflow and its own internal bottlenecks and overheads.
Thanks a lot, it was very nice to read both the code and the documentation and follow along!
Thanks a lot for the thorough review Ali, with loads of constructive feedback and good finds. On the double download: you're right that it would technically be possible to store the raw CHGCAR/AECCAR bytes in PostgreSQL during fetch and avoid re-downloading in transform. The reason we didn't go that route is storage: each material has 4 charge density files (CHGCAR + AECCAR0/1/2) that can be 100–500 MB each when decompressed. Across even a few thousand materials that's multiple terabytes, which is way outside what makes sense for a Postgres DB (we could store file paths in the db, though). Mostly this was built on @inelgnu's suggestion to cut out the middleman Postgres and use Parquet directly (I think this is what he would've done had he known HF was the endpoint, but that wasn't known when most of LeMaterialFetcher was written).
LeMatRho uses a direct S3 → Parquet pipeline and never writes to Postgres. The 8 charge density columns (`compressed_charge_density`, `compressed_aeccar{0,1,2}`, `charge_density_grid_shape`, `bader_charges`, `bader_atomic_volume`, `ddec6_charges`) were dead weight in the Postgres schema and INSERT tuples. The fields remain on `OptimadeStructure` (Pydantic model), where they define the Parquet output schema.
Thanks for the explanation, that makes a lot of sense! I think this can then be viewed as a special case where the fetcher is not used or does not exist, and all there is is a transform that fetches the raw files and throws away the unnecessary data, which is exactly what you did. Congrats on the massive work with this PR and looking forward to using the dataset :)
## What Is LeMatRho?

LeMatRho is an effort to open source charge density results from DFT. The raw VASP outputs live in an authenticated S3 bucket (`lemat-rho`; talk to me for access) and include CHGCAR, AECCAR0/1/2, and vasprun.xml files for each material.

This PR adds LeMatRho as a new data source inside `lematerial-fetcher` so we can fetch, compress, analyse, and publish charge densities as a HuggingFace dataset.
## Why This Codebase?

We built on `lematerial-fetcher` (instead of writing from scratch) because:

- It already has a `BaseFetcher`/`BaseTransformer` pattern for pulling data from external sources (Materials Project, OQMD, Alexandria) into a shared OPTIMADE data model.
- Much of the infrastructure was already partially in place.
- The `OptimadeStructure` Pydantic model, PostgreSQL schema, and HuggingFace push tooling can be reused with minimal extensions.
## Two Pipeline Architectures

### 1. Traditional Pipeline (`lematrho fetch` + `lematrho transform`)

Follows the repo's existing two-step pattern:

- Fetch: downloads the charge density files from S3, compresses them with pyrho, stores compressed grids in PostgreSQL.
- Transform: converts raw PostgreSQL rows to `OptimadeStructure`, optionally re-downloads files from S3 to run Bader/DDEC6 charge analysis.

Included for consistency with the existing architecture. Not recommended for production because the transform step must re-download files from S3 for charge analysis (double download).
### 2. Direct Pipeline (`lematrho run`) -- RECOMMENDED

A single-pass architecture that bypasses PostgreSQL entirely: download from S3 once, compress with pyrho, optionally run Bader/DDEC6, and write Parquet directly, built on the shared `utils.py` wrappers used by all three pipeline files.
## File Map

All LeMatRho source code lives under `src/lematerial_fetcher/fetcher/lematrho/`:

- `__init__.py`
- `fetch.py` -- `LeMatRhoFetcher(BaseFetcher)` -- S3 to PostgreSQL
- `transform.py` -- `LeMatRhoTransformer(BaseTransformer)` -- PostgreSQL raw to OPTIMADE
- `pipeline.py` -- `LeMatRhoDirectPipeline` -- single-pass S3 to Parquet
- `utils.py`

Modified files elsewhere in the repo:

- `models/optimade.py`
- `database/postgres.py`
- `utils/config.py` -- `DirectPipelineConfig` dataclass, extended `FetcherConfig`/`TransformerConfig`
- `utils/cli.py` -- `add_lematrho_fetch_options`, `add_lematrho_transform_options`, `add_lematrho_direct_options`
- `utils/aws.py` -- `get_authenticated_aws_client()` (adaptive retry, credential chain)
- `cli.py` -- `lematrho` command group with `fetch`, `transform`, `run` subcommands
- `push.py`
- `models/utils/enums.py` -- `Source.LEMATRHO = "lematrho"`
- `pyproject.toml` -- `mp-pyrho>=0.3.1` dependency
- `.env.example`
## Data Model: New Fields on OptimadeStructure

All 8 fields are `Optional` and default to `None`, so existing data sources (MP, Alexandria, OQMD) are unaffected.

| Field | Type |
| --- | --- |
| `compressed_charge_density` | `Optional[list]` |
| `compressed_aeccar0` | `Optional[list]` |
| `compressed_aeccar1` | `Optional[list]` |
| `compressed_aeccar2` | `Optional[list]` |
| `charge_density_grid_shape` | `Optional[list[int]]`, e.g. `[15, 15, 15]` |
| `bader_charges` | `Optional[list[float]]` |
| `bader_atomic_volume` | `Optional[list[float]]` |
| `ddec6_charges` | `Optional[list[float]]` |

Per-site fields (`bader_charges`, `bader_atomic_volume`, `ddec6_charges`) are validated by `_validate_with_number_of_sites()` to ensure their length matches `nsites`.
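For orientation, here is a minimal sketch of how these optional fields sit on a Pydantic model. It is not the repo's actual `OptimadeStructure` definition (which also carries the length validators and many other fields); only the field names and defaults come from the table above.

```python
from typing import Optional

from pydantic import BaseModel


class ChargeDensityFieldsSketch(BaseModel):
    """Illustrative subset only -- the real fields live on OptimadeStructure."""

    compressed_charge_density: Optional[list] = None
    compressed_aeccar0: Optional[list] = None
    compressed_aeccar1: Optional[list] = None
    compressed_aeccar2: Optional[list] = None
    charge_density_grid_shape: Optional[list[int]] = None  # e.g. [15, 15, 15]
    bader_charges: Optional[list[float]] = None  # one value per site
    bader_atomic_volume: Optional[list[float]] = None  # one value per site
    ddec6_charges: Optional[list[float]] = None  # one value per site
```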
## Key Design Decisions

### Single-pass S3 access (direct pipeline)

Each S3 file is downloaded exactly once. The raw decompressed bytes are held in memory so they can be reused for both pyrho compression and Bader/DDEC6 analysis. The traditional pipeline cannot do this because fetch and transform are separate steps.
### No PostgreSQL (direct pipeline)

The end target is Parquet files on HuggingFace. PostgreSQL adds schema migration, connection management, and disk space overhead with no benefit for this use case. `DirectPipelineConfig` has zero database fields.

### Crash-safe checkpointing
A text file (`.checkpoint.txt`) records each successfully processed material ID. On restart, the pipeline skips already-processed materials and resumes writing Parquet chunks from the next index.
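A minimal sketch of the checkpoint idea (batch-append only after the Parquet chunk has been flushed, load and skip on resume); the helper names here are illustrative, not the pipeline's exact API.

```python
from pathlib import Path


def load_checkpoint(path: Path) -> set[str]:
    """Material IDs already processed; empty set if no checkpoint exists yet."""
    if not path.exists():
        return set()
    return {line.strip() for line in path.read_text().splitlines() if line.strip()}


def append_checkpoint(path: Path, material_ids: list[str]) -> None:
    """Append a whole batch only after its Parquet chunk has been flushed."""
    with path.open("a") as fh:
        fh.writelines(f"{mid}\n" for mid in material_ids)
```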
### Atomic Parquet writes

Chunks are written to a `.tmp` file first, then renamed via `os.rename()` (POSIX atomic on the same filesystem). Stale `.tmp` files from crashed runs are ignored on resume.
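The write-then-rename pattern, sketched here with hypothetical names (the actual chunk-writing code lives in `pipeline.py`):

```python
import os

import pyarrow as pa
import pyarrow.parquet as pq


def write_chunk_atomically(table: pa.Table, final_path: str) -> None:
    """Readers (and resumed runs) only ever see complete Parquet files."""
    tmp_path = final_path + ".tmp"
    pq.write_table(table, tmp_path)
    os.rename(tmp_path, final_path)  # atomic on POSIX when on the same filesystem
```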
### Work-stealing parallelism

Uses `concurrent.futures.wait(FIRST_COMPLETED)` with bounded submission (2x `num_workers` futures in flight) to avoid creating hundreds of thousands of Future objects in memory.
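Roughly, the bounded-submission loop looks like the sketch below (simplified: no checkpointing or failure tracking, and the function names are placeholders, not the pipeline's real ones):

```python
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
from itertools import islice


def run_bounded(materials, process_one, num_workers=4):
    """Keep at most 2 * num_workers futures in flight instead of submitting everything up front."""
    pending = iter(materials)
    results = []
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        in_flight = {executor.submit(process_one, m) for m in islice(pending, 2 * num_workers)}
        while in_flight:
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                results.append(future.result())
            # Top the pool back up as workers free up.
            for m in islice(pending, len(done)):
                in_flight.add(executor.submit(process_one, m))
    return results
```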
### Memory management

Sequential file processing within each worker, explicit `del` of raw bytes after use, `gc.collect()` after each material. Conservative default of 4 workers because each CHGCAR can be hundreds of MB when decompressed.
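The per-material cleanup is nothing more elaborate than the following pattern (illustrative only):

```python
import gc


def process_one_material(material_id, download):
    raw_files = download(material_id)  # multi-hundred-MB decompressed bytes
    try:
        ...  # pyrho compression, then Bader/DDEC6 analysis on raw_files
    finally:
        del raw_files  # drop the reference as soon as the analysis is done
        gc.collect()  # encourage prompt release of the large buffers
```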
### Graceful tool degradation

External tools (`bader`, `chargemol`) are validated at init. If any tool is missing, the corresponding output fields are set to `None` rather than failing the pipeline. Bader and DDEC6 are independent -- one can fail while the other succeeds.
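A sketch of the validation idea (the real `_validate_tools` in `pipeline.py` may differ in signature and behaviour):

```python
import shutil


def validate_tools(bader_path: str | None, chargemol_path: str | None) -> dict[str, bool]:
    """Report which optional external tools are usable; missing ones only disable their output fields."""
    return {
        "bader": bool(bader_path and shutil.which(bader_path)),
        "chargemol": bool(chargemol_path and shutil.which(chargemol_path)),
    }
```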
### Cross-compatibility

All LeMatRho structures are marked cross-compatible (no element exclusions). This differs from the Alexandria fetcher, which excludes Yb-containing structures. The `get_cross_compatibility()` function always returns `True`.

## Charge Density Processing: How It Works
### pyrho Compression

For each of CHGCAR, AECCAR0, AECCAR1, AECCAR2:

1. Parse the bytes with `pymatgen.io.vasp.Chgcar` (writes to a temp file first because pymatgen needs a file path).
2. Convert to a `pyrho.charge_density.ChargeDensity` via `ChargeDensity.from_pmg()`.
3. Downsample with `pgrids["total"].lossy_smooth_compression(grid_shape)`.

The default grid shape is `(15, 15, 15)`, configurable via `--grid-shape`. A condensed sketch of the flow follows.
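The sketch below assumes the CHGCAR bytes are already decompressed; the repo's `compress_chgcar` in `utils.py` may handle temp files and errors differently.

```python
import tempfile
from pathlib import Path

from pymatgen.io.vasp import Chgcar
from pyrho.charge_density import ChargeDensity


def compress_chgcar_sketch(chgcar_bytes: bytes, grid_shape=(15, 15, 15)):
    """Parse raw CHGCAR bytes and return the lossy-downsampled total grid."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "CHGCAR"
        path.write_bytes(chgcar_bytes)  # pymatgen needs a file path, not BytesIO
        chgcar = Chgcar.from_file(str(path))
    cden = ChargeDensity.from_pmg(chgcar)
    return cden.pgrids["total"].lossy_smooth_compression(grid_shape)
```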
### Bader Charge Analysis (optional)

Requires: the `bader` executable on PATH and the `PMG_VASP_PSP_DIR` environment variable.

Uses `pymatgen.command_line.bader_caller.BaderAnalysis`, which manages the external `bader` binary internally. The pipeline does not call subprocess directly.

Steps (in `run_bader_from_bytes()` in `utils.py`):

1. Write the POTCAR via `MatPESStaticSet(structure).potcar`.
2. Build the reference density with `Chgcar` arithmetic (`Chgcar.from_file("AECCAR0") + Chgcar.from_file("AECCAR2")`) and write the result as `CHGCAR_sum`.
3. Run `BaderAnalysis(chgcar_filename=..., potcar_filename=..., chgref_filename=..., bader_path=...)`.
4. Read `summary["charge_transfer"]` and `summary["atomic_volume"]`.
5. pymatgen's charge transfer is `electron_count - valence` (positive = gained electrons). We negate it, `[-ct for ct in ba.summary["charge_transfer"]]`, so positive = cationic, matching the convention used in the rest of the codebase.

Returns: `(net_charges: list[float], atomic_volumes: list[float])`, or `(None, None)` on failure.

### DDEC6 Charge Analysis (optional)

Requires: the `chargemol` executable, an atomic densities directory, and the `PMG_VASP_PSP_DIR` environment variable.

Uses `pymatgen.command_line.chargemol_caller.ChargemolAnalysis`, which manages the external `chargemol` binary internally.

Steps (in `run_ddec6_from_bytes()` in `utils.py`):

1. Write the POTCAR via `MatPESStaticSet(structure).potcar`.
2. Set the `CHARGEMOL_COMMAND` environment variable to the chargemol path (pymatgen reads this env var).
3. Run `ChargemolAnalysis(path=tmpdir, atomic_densities_path=..., run_chargemol=True)`.
4. Read `ca.ddec_charges`.
5. Restore the previous `CHARGEMOL_COMMAND` value in a `finally` block.

Thread safety note: the `CHARGEMOL_COMMAND` save/restore pattern is process-safe (each `ProcessPoolExecutor` worker has its own environment) but not thread-safe. Do not call it from multiple threads in the same process.

Returns: `list[float]` (net charges per site), or `None` on failure.

Much of the pyrho/Bader/DDEC6 code is derived from @msiron-entalpic's code here.
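To make the two analyses concrete, here are minimal sketches. They are illustrative, not the repo's exact `run_bader_from_bytes`/`run_ddec6_from_bytes` (which take raw bytes, write the VASP files themselves, and wrap everything in error handling). Both assume CHGCAR, AECCAR0, AECCAR2, and POTCAR have already been written to `tmpdir`, and the keyword names follow the descriptions above (they may vary with the pymatgen version).

```python
from pathlib import Path

from pymatgen.command_line.bader_caller import BaderAnalysis
from pymatgen.io.vasp import Chgcar


def bader_from_dir(tmpdir: str, bader_path: str):
    tmp = Path(tmpdir)
    # Reference density via Chgcar arithmetic (replaces the old chgsum.pl step).
    chgref = Chgcar.from_file(str(tmp / "AECCAR0")) + Chgcar.from_file(str(tmp / "AECCAR2"))
    chgref.write_file(str(tmp / "CHGCAR_sum"))
    ba = BaderAnalysis(
        chgcar_filename=str(tmp / "CHGCAR"),
        potcar_filename=str(tmp / "POTCAR"),
        chgref_filename=str(tmp / "CHGCAR_sum"),
        bader_path=bader_path,
    )
    # charge_transfer counts electrons gained; negate so positive = cationic.
    net_charges = [-ct for ct in ba.summary["charge_transfer"]]
    return net_charges, ba.summary["atomic_volume"]
```

And the DDEC6 side, showing the `CHARGEMOL_COMMAND` save/restore in a `finally` block:

```python
import os

from pymatgen.command_line.chargemol_caller import ChargemolAnalysis


def ddec6_from_dir(tmpdir: str, chargemol_path: str, atomic_densities_path: str):
    previous = os.environ.get("CHARGEMOL_COMMAND")
    os.environ["CHARGEMOL_COMMAND"] = chargemol_path  # pymatgen reads this env var
    try:
        ca = ChargemolAnalysis(
            path=tmpdir,
            atomic_densities_path=atomic_densities_path,
            run_chargemol=True,
        )
        return ca.ddec_charges
    finally:
        # Restore the previous value so the worker process does not leak the override.
        if previous is None:
            os.environ.pop("CHARGEMOL_COMMAND", None)
        else:
            os.environ["CHARGEMOL_COMMAND"] = previous
```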
## Shared Code Architecture

Both pipeline variants (traditional and direct) delegate charge analysis to the same two functions in `utils.py`:

- `run_bader_from_bytes(structure, raw_files, bader_path, material_id)`
- `run_ddec6_from_bytes(structure, raw_files, chargemol_path, atomic_densities_path, material_id)`

This eliminates code duplication. The difference between the two pipelines is only in how they obtain the raw bytes: the traditional transform re-downloads them via `self.aws_client` and then delegates, while the direct pipeline already holds them in memory and passes them directly.
### Constants in utils.py

Shared constants such as `STATIC_CALC_TYPE`, `GRID_KEY_MAP`, and `VALID_PREFIXES` live here as the single source of truth for all three pipeline files.

### Helper functions in utils.py

- `download_gz_file_from_s3(client, bucket, key)`
- `parse_vasprun_structure(vasprun_bytes)` -- returns a pymatgen `Structure`
- `compress_chgcar(chgcar_bytes, grid_shape)`
- `build_raw_structure(material_id, structure, compressed_grids, grid_shape, s3_prefix)` -- builds the `RawStructure` for PostgreSQL insertion
- `write_potcar(structure, tmpdir)` -- writes the POTCAR via `MatPESStaticSet`
- `run_bader_from_bytes(structure, raw_files, bader_path, material_id)`
- `run_ddec6_from_bytes(structure, raw_files, chargemol_path, atomic_densities_path, material_id)`

## S3 Bucket Structure
The `lemat-rho` bucket contains one folder per material. Material IDs have prefixes indicating their origin database:

- `agm-` -- Alexandria
- `mp-` -- Materials Project
- `oqmd-` -- OQMD

Only folders matching `VALID_PREFIXES` are processed.

## Parquet Schema (Direct Pipeline)

The direct pipeline writes 33-column Parquet files. Compressed grids are stored as JSON-serialised strings (not nested arrays) because Parquet handles variable-depth nesting poorly. Per-site arrays use `pa.list_(pa.float64())`.

Key column types:

| Column | Arrow type | Notes |
| --- | --- | --- |
| `compressed_charge_density` | `pa.string()` | JSON-serialised grid |
| `compressed_aeccar0` | `pa.string()` | JSON-serialised grid |
| `compressed_aeccar1` | `pa.string()` | JSON-serialised grid |
| `compressed_aeccar2` | `pa.string()` | JSON-serialised grid |
| `charge_density_grid_shape` | `pa.list_(pa.int32())` | e.g. `[15, 15, 15]` |
| `bader_charges` | `pa.list_(pa.float64())` | per site |
| `bader_atomic_volume` | `pa.list_(pa.float64())` | per site |
| `ddec6_charges` | `pa.list_(pa.float64())` | per site |
| `species` | `pa.string()` | |
| `functional` | `pa.string()` | stored as the enum's `.value` |
| `last_modified` | `pa.string()` | |

The full schema is defined as `PARQUET_SCHEMA` in `pipeline.py`.
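An illustrative subset of what such a schema looks like in pyarrow; column names follow the table above, and the authoritative 33-column `PARQUET_SCHEMA` is the one in `pipeline.py`.

```python
import pyarrow as pa

# Illustrative subset only -- see PARQUET_SCHEMA in pipeline.py for the full schema.
PARQUET_SCHEMA_SUBSET = pa.schema(
    [
        ("immutable_id", pa.string()),
        ("species", pa.string()),
        ("compressed_charge_density", pa.string()),  # JSON-serialised compressed grid
        ("charge_density_grid_shape", pa.list_(pa.int32())),
        ("bader_charges", pa.list_(pa.float64())),
        ("bader_atomic_volume", pa.list_(pa.float64())),
        ("ddec6_charges", pa.list_(pa.float64())),
        ("functional", pa.string()),
        ("last_modified", pa.string()),
    ]
)
```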
## Configuration

### DirectPipelineConfig (dataclass)
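The original field table did not survive formatting, so here is a hypothetical reconstruction inferred from the CLI flags and helper signatures mentioned elsewhere in this PR; field names and defaults are assumptions, check `utils/config.py` for the real dataclass.

```python
from dataclasses import dataclass


@dataclass
class DirectPipelineConfigSketch:
    """Hypothetical reconstruction -- see utils/config.py for the real DirectPipelineConfig."""

    output_dir: str = "./output"  # --output-dir
    num_workers: int = 4  # --num-workers (conservative default)
    parquet_chunk_size: int = 1000  # --parquet-chunk-size; default value is an assumption
    grid_shape: tuple[int, int, int] = (15, 15, 15)  # --grid-shape
    limit: int | None = None  # --limit (cap materials for smoke tests)
    bader_path: str | None = None  # optional external tools
    chargemol_path: str | None = None
    atomic_densities_path: str | None = None
```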
### Environment Variables

- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `AWS_DEFAULT_REGION`
- `PMG_VASP_PSP_DIR`

See `.env.example` for the full template.

## CLI Usage

See the Quick-Start Checklist below for an example `lematrho run` invocation.
## External Dependencies

- `mp-pyrho>=0.3.1` -- installed via `pip install` (in pyproject.toml)
- `bader` executable
- `chargemol` executable
- `PMG_VASP_PSP_DIR`

All external tools are optional. Missing tools result in `None` fields rather than pipeline failure.
## Test Coverage

182 total test functions across the repo, of which 90 are LeMatRho-specific:

- `test_lematrho_fetch.py`
- `test_lematrho_transform.py`
- `test_lematrho_pipeline.py`
- `test_cli.py` (lematrho portion)
- `test_config.py` (lematrho portion) -- `DirectPipelineConfig` construction and defaults
- `test_aws.py`
- `test_optimade_model.py`

Integration test scaffolds (`@pytest.mark.integration`) exist for real S3 testing but are deselected by default.
## Known Open Items

- **HuggingFace push needs rework.** The code currently treats LeMatRho as if it will be pushed into the existing LeMatBulk dataset. LeMatRho should be its own standalone dataset. The `push.py` script needs modification to handle LeMatRho-specific features and schema separately.
- **HuggingFace schema verification.** Need to confirm the HF dataset schema includes the new columns and that compressed grid fields serialise correctly as JSON strings when loaded back.
- **Stale `.env.example` reference.** The `.env.example` file still contains `LEMATERIALFETCHER_CHGSUM_SCRIPT_PATH` (line 105), which was removed from the actual codebase. This line should be deleted.
- **No subprocess timeout control.** The pymatgen wrappers (`BaderAnalysis`, `ChargemolAnalysis`) do not expose subprocess timeout parameters. If an external tool hangs, the worker process will block indefinitely. Monitor in production.
- **Memory profiling.** Each decompressed CHGCAR can be 100-500 MB. With 4 workers, peak RSS could reach multiple GB. Needs profiling with real data before scaling up `--num-workers`.

## Quick-Start Checklist for a New Engineer
1. `pip install -e ".[dev]"`
2. Copy `.env.example` to `.env` and fill in your AWS credentials.
3. Run `pytest tests/fetcher/lematrho/ -v` (no external tools needed; everything is mocked).
4. Try a small real run:

   ```
   lematerial_fetcher --debug lematrho run \
       --output-dir ./test_output \
       --num-workers 1 \
       --parquet-chunk-size 5 \
       --limit 10
   ```

5. Read `utils.py` first -- it's the shared foundation. Then `pipeline.py` for the direct pipeline, or `fetch.py` + `transform.py` for the traditional one.