Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
539 changes: 102 additions & 437 deletions deployment/aws/invoke_lambda.py

Large diffs are not rendered by default.

332 changes: 311 additions & 21 deletions notebooks/custom_aggregations.ipynb

Large diffs are not rendered by default.

584 changes: 584 additions & 0 deletions notebooks/jupyterhub_example.ipynb

Large diffs are not rendered by default.

54 changes: 27 additions & 27 deletions notebooks/rasterized_zarr.ipynb

Large diffs are not rendered by default.

15 changes: 13 additions & 2 deletions src/magg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
__version__ = "0.1.0"

# Export main processing functions
from .auth import get_nsidc_s3_credentials
from .config import PipelineConfig, default_config, get_child_order, get_store_path, load_config
from .auth import get_edl_token, get_nsidc_s3_credentials
from .config import (
PipelineConfig,
default_config,
get_child_order,
get_driver,
get_store_path,
load_config,
)
from .processing import (
calculate_cell_statistics,
process_morton_cell,
write_dataframe_to_zarr,
)
from .runner import agg
from .schema import xdggs_spec, xdggs_zarr_template
from .store import open_store, parse_s3_path

Expand All @@ -26,12 +34,15 @@
"calculate_cell_statistics",
"default_config",
"get_child_order",
"get_driver",
"get_edl_token",
"get_nsidc_s3_credentials",
"get_store_path",
"load_config",
"open_store",
"parse_s3_path",
"process_morton_cell",
"agg",
"write_dataframe_to_zarr",
"xdggs_spec",
"xdggs_zarr_template",
Expand Down
180 changes: 41 additions & 139 deletions src/magg/__main__.py
Original file line number Diff line number Diff line change
@@ -1,180 +1,82 @@
"""Local processing runner for magg.
"""CLI entry point for magg processing.

Usage:
python -m magg --config atl06.yaml --catalog catalog.json
python -m magg --config atl06.yaml --catalog catalog.json --store ./test.zarr
python -m magg --config atl06.yaml --catalog catalog.json --max-cells 5
python -m magg --config atl06.yaml --catalog catalog.json --morton-cell -4211322
python -m magg --config atl06.yaml --catalog catalog.json --backend lambda
"""

import argparse
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

from zarr import consolidate_metadata

from magg.auth import get_nsidc_s3_credentials
from magg.config import get_child_order, get_store_path, load_config
from magg.processing import process_morton_cell, write_dataframe_to_zarr
from magg.schema import xdggs_zarr_template
from magg.store import open_store

logger = logging.getLogger(__name__)


def _process_and_write(cell, chunk_idx, granule_urls, parent_order, child_order,
s3_creds, store, config):
"""Process a single cell and write results to store."""
df_out, metadata = process_morton_cell(
parent_morton=int(cell),
parent_order=parent_order,
child_order=child_order,
granule_urls=granule_urls,
s3_credentials=s3_creds,
config=config,
)
if not df_out.empty:
write_dataframe_to_zarr(
df_out, store,
chunk_idx=chunk_idx,
child_order=child_order,
parent_order=parent_order,
)
return metadata
from magg.config import load_config
from magg.runner import agg


def main():
parser = argparse.ArgumentParser(
description="magg local processing runner",
description="magg processing runner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
examples:
python -m magg --config atl06.yaml --catalog catalog.json
python -m magg --config atl06.yaml --catalog catalog.json --store ./test.zarr
python -m magg --config atl06.yaml --catalog catalog.json --max-cells 5
python -m magg --config atl06.yaml --catalog catalog.json --morton-cell -4211322
python -m magg --config atl06.yaml --catalog catalog.json --backend lambda
""",
)
parser.add_argument("--config", required=True, help="Path to pipeline config YAML")
parser.add_argument("--catalog", default=None, help="Path to granule catalog JSON (overrides config)")
parser.add_argument("--store", default=None, help="Output store path (overrides config)")
parser.add_argument("--backend", default="local", choices=["local", "lambda"],
help="Execution backend (default: local)")
parser.add_argument("--driver", default=None, choices=["s3", "https"],
help="Data access driver (default: from config, or s3)")
parser.add_argument("--max-cells", type=int, default=None, help="Limit number of cells (for testing)")
parser.add_argument("--morton-cell", type=str, default=None, help="Process a specific morton cell")
parser.add_argument("--max-workers", type=int, default=4, help="Max concurrent workers (default: 4)")
parser.add_argument("--max-workers", type=int, default=None, help="Max concurrent workers")
parser.add_argument("--overwrite", action="store_true", help="Overwrite existing Zarr template")
parser.add_argument("--dry-run", action="store_true", help="Show what would be processed")
parser.add_argument("--region", default="us-west-2", help="AWS region (default: us-west-2)")
parser.add_argument(
"--function-name",
default=os.environ.get("MAGG_LAMBDA_FUNCTION_NAME", "process-morton-cell"),
help="Lambda function name (default: env MAGG_LAMBDA_FUNCTION_NAME or 'process-morton-cell')",
)
args = parser.parse_args()

logging.basicConfig(level=logging.INFO, format="%(message)s")

# Load config
config = load_config(args.config)
child_order = get_child_order(config)

# Resolve catalog: CLI > config > error
catalog_path = args.catalog or config.catalog
if not catalog_path:
parser.error("No catalog specified (use --catalog or set catalog: in config)")

# Resolve store: CLI > config > error
store_path = args.store or get_store_path(config)
if not store_path:
parser.error("No store path specified (use --store or set output.store: in config)")

# Load catalog
print(f"Loading catalog from {catalog_path}...")
with open(catalog_path) as f:
catalog_data = json.load(f)
metadata = catalog_data["metadata"]
catalog = catalog_data["catalog"]
parent_order = metadata["parent_order"]

print(f" Product: {metadata.get('short_name', '?')}")
print(f" Cells: {metadata['total_cells']}, Granules: {metadata['total_granules']}")
print(f" Parent order: {parent_order}, Child order: {child_order}")

# Select cells
all_cells = list(catalog.keys())
if args.morton_cell:
if args.morton_cell not in catalog:
parser.error(f"Morton cell '{args.morton_cell}' not in catalog")
cells = [args.morton_cell]
elif args.max_cells:
cells = all_cells[:args.max_cells]
else:
cells = all_cells

print(f" Processing {len(cells)} of {len(all_cells)} cells")

if args.dry_run:
granule_counts = [len(catalog[c]) for c in cells]
print(f"\n[DRY RUN] Would process {len(cells)} cells")
print(f" Granules per cell: min={min(granule_counts)}, "
f"max={max(granule_counts)}, avg={sum(granule_counts)/len(granule_counts):.1f}")
print(f" Output: {store_path}")
return

# Authenticate
print("\nAuthenticating with NASA Earthdata...")
s3_creds = get_nsidc_s3_credentials()

# Open store and create template
print(f"Opening store: {store_path}")
store = open_store(store_path)
store = xdggs_zarr_template(
store, parent_order, child_order,
results = agg(
config,
catalog=args.catalog,
store=args.store,
backend=args.backend,
driver=args.driver,
max_cells=args.max_cells,
morton_cell=args.morton_cell,
max_workers=args.max_workers,
overwrite=args.overwrite,
n_parent_cells=metadata["total_cells"],
config=config,
dry_run=args.dry_run,
function_name=args.function_name,
region=args.region,
)

# Build cell-to-index mapping (must use all_cells for correct chunk indices)
cell_to_idx = {cell: idx for idx, cell in enumerate(all_cells)}

# Process
print(f"\nProcessing with {args.max_workers} workers...")
start_time = time.time()
total_obs = 0
cells_with_data = 0
cells_error = 0

with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
futures = {
executor.submit(
_process_and_write,
cell, cell_to_idx[cell], catalog[cell],
parent_order, child_order,
s3_creds, store, config,
): cell
for cell in cells
}

for i, future in enumerate(as_completed(futures), 1):
cell = futures[future]
try:
meta = future.result()
if meta.get("error"):
print(f" [{i}/{len(cells)}] {cell}: {meta['error']}")
else:
obs = meta.get("total_obs", 0)
total_obs += obs
cells_with_data += 1
if i % 10 == 0 or len(cells) <= 20:
print(f" [{i}/{len(cells)}] {cell}: {obs:,} obs")
except Exception as e:
cells_error += 1
print(f" [{i}/{len(cells)}] {cell}: ERROR {e}")

# Consolidate
print("\nConsolidating metadata...")
consolidate_metadata(store, zarr_format=3)

elapsed = time.time() - start_time
print(f"\nDone: {cells_with_data} cells with data, {total_obs:,} obs, "
f"{cells_error} errors, {elapsed:.1f}s")
print(f"Output: {store_path}")
if args.dry_run:
print(f"\n[DRY RUN] Would process {results['total_cells']} cells")
print(f" Granules per cell: min={results['granules_per_cell_min']}, "
f"max={results['granules_per_cell_max']}, "
f"avg={results['granules_per_cell_avg']:.1f}")
print(f" Output: {results['store_path']}")
else:
print(f"\nDone: {results['cells_with_data']} cells with data, "
f"{results['total_obs']:,} obs, {results['cells_error']} errors, "
f"{results['wall_time_s']:.1f}s")
print(f"Output: {results['store_path']}")


if __name__ == "__main__":
Expand Down
28 changes: 24 additions & 4 deletions src/magg/auth.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
"""
Orchestrator authentication helper for NASA Earthdata S3 access.
Orchestrator authentication helpers for NASA Earthdata access.

Call get_nsidc_s3_credentials() ONCE in your orchestrator before invoking
Lambda functions. Pass the returned credentials to each Lambda invocation.
Two credential types:

Credentials are valid for approximately 1 hour.
- **S3**: ``get_nsidc_s3_credentials()`` returns STS temporary credentials
for direct S3 access. Only works from within us-west-2.
- **HTTPS**: ``get_edl_token()`` returns a bearer token for HTTPS access.
Works from anywhere.

Call ONCE in the orchestrator before processing. Credentials are valid
for approximately 1 hour.
"""

import earthaccess


def get_edl_token() -> str:
"""Return an Earthdata Login bearer token for HTTPS data access.

Works from any network location (not region-restricted like S3).
The token is used by h5coro's HTTPDriver.

Returns
-------
str
Bearer token string.
"""
auth = earthaccess.login()
return auth.token["access_token"]


def get_nsidc_s3_credentials() -> dict:
"""
Authenticate with NASA Earthdata and return S3 credentials for NSIDC.
Expand Down
54 changes: 53 additions & 1 deletion src/magg/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,13 @@ def extract_granule_info(granule: dict) -> dict:

related_urls = umm.get("RelatedUrls", [])
s3_url = None
https_url = None
for url_obj in related_urls:
url = url_obj.get("URL", "")
if url.startswith("s3://") and url.endswith(".h5"):
s3_url = url
break
elif url.startswith("https://") and url.endswith(".h5") and url_obj.get("Type") == "GET DATA":
https_url = url

points = []
spatial_extent = umm.get("SpatialExtent", {})
Expand All @@ -313,6 +315,7 @@ def extract_granule_info(granule: dict) -> dict:
return {
"granule_id": granule_id,
"s3_url": s3_url,
"https_url": https_url,
"points": points,
}

Expand Down Expand Up @@ -421,6 +424,51 @@ def build_catalog(
return catalog, timings


def _extract_base_urls(granules: list) -> dict:
"""Extract S3 and HTTPS base URLs from the first granule with both.

The base URLs are stored in catalog metadata so the runner can rewrite
S3 URLs to HTTPS URLs at runtime without hardcoding provider paths.

The S3 URL ``s3://bucket/path/file.h5`` maps to the HTTPS URL
``https://host/bucket/path/file.h5``. The file path (everything after
the bucket name) is identical in both, so we extract the S3 bucket
prefix and the HTTPS host+bucket prefix.

Parameters
----------
granules : list
CMR granule list (only the first with both URLs is used).

Returns
-------
dict
``{"s3_base": "s3://bucket", "https_base": "https://host/bucket"}``
or empty.
"""
for granule in granules:
info = extract_granule_info(granule)
s3_url = info.get("s3_url")
https_url = info.get("https_url")
if s3_url and https_url:
# S3: s3://bucket/path/file.h5 → bucket
s3_after = s3_url.split("//", 1)[1] # bucket/path/file.h5
s3_bucket = s3_after.split("/", 1)[0] # bucket
s3_base = f"s3://{s3_bucket}"

# HTTPS: https://host/bucket/path/file.h5 → https://host/bucket
https_after = https_url.split("//", 1)[1] # host/bucket/path/file.h5
# Find where the bucket name appears in the HTTPS path
bucket_idx = https_after.find(f"/{s3_bucket}")
if bucket_idx >= 0:
https_base = "https://" + https_after[: bucket_idx + 1 + len(s3_bucket)]
else:
# Bucket name not in HTTPS path — can't derive mapping
continue
return {"s3_base": s3_base, "https_base": https_base}
return {}


def main():
parser = argparse.ArgumentParser(
description="Build granule catalog from CMR",
Expand Down Expand Up @@ -535,6 +583,9 @@ def main():
f"catalog_{args.short_name}_{start_date}_{end_date}_order{args.parent_order}.json"
)

# Derive base URLs from first granule for driver URL rewriting
access_urls = _extract_base_urls(granules)

output_metadata = {
"short_name": args.short_name,
"version": args.version,
Expand All @@ -545,6 +596,7 @@ def main():
"total_granules": len(granules),
"total_cells": len(catalog),
"created": datetime.now().isoformat(),
**access_urls,
}
if args.cycle:
output_metadata["cycle"] = args.cycle
Expand Down
Loading
Loading