Temporal aggregation infrastructure (Steps 1-3)#15
Draft
espg wants to merge 2 commits into
Draft
Conversation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds the infrastructure for temporal aggregation pipelines, enabling magg to support use cases beyond spatial binning (e.g., computing storm summary statistics from reanalysis data). Implements Steps 1–3 from the implementation plan on #12.
Motivated by the aggregation patterns in antarctic_AR_dataset, which does identical orchestration/dispatch but for temporal reduction of gridded fields under spatial masks.
What's done
pipeline.typeconfig field — configs can now declare themselves asspatial(default, backward compatible),temporal, orevent. Validation branches per type: spatial pipelines validate grid/source/function; temporal pipelines validate collections, spatial_func, and temporal_reducer references.Shared orchestration (
orchestrate.py) — extracted the Lambda dispatch machinery (ThreadPoolExecutor + boto3 invoke + retry + log parsing + cost estimation) that was duplicated between magg and antarctic_AR_dataset.invoke_lambda.pyrefactored to use it.Generalized auth —
get_s3_credentials(daac=)accepts any DAAC (NSIDC, GES_DISC, etc.);get_nsidc_s3_credentialskept as alias.Temporal building blocks (
temporal.py) — ported from antarctic_AR_dataset:Max,Min,Sum,WeightedMean,FirstLandfallCapturemax,min,weighted_sum,weighted_mean,max_gradient,min_over_levelsspecs_from_config()bridge to convert YAML config to internal spec dictsmerra2_storm.yamlbuilt-in config — 15 storm metrics matching the antarctic_AR_dataset registry, fully validated by the temporal config path.Tests — 187 pass (88 new). Covers accumulators, spatial functions with synthetic xarray grids, config roundtrips, temporal validation, orchestrate pure functions.
What remains
-process_storm()— the main temporal processing function (analogous toprocess_morton_cell()for spatial). Reads MERRA-2 from S3 via xarray, applies spatial funcs per timestep, feeds streaming accumulators, returns scalar dict. To be ported/adapted fromartools/cloud/worker.py.- Temporal Lambda handler —deployment/aws/temporal_handler.pywrappingprocess_storm()with serialization/deserialization of event payloads (base64 storm masks, etc.).- Temporal orchestrator CLI — amain()equivalent that loads storm catalogs, builds granule indices, and dispatches temporal Lambda invocations usingorchestrate.dispatch_lambda().The above was the original plan, written before #13 landed. Now that
runner.pyprovides the composableagg()API with pluggable backends, the remaining work is:process_event()— the temporal equivalent ofprocess_morton_cell(). Reads gridded data from S3 via xarray, applies spatial funcs per timestep, feeds streaming accumulators, returns scalar dict per event. Ported/adapted fromartools/cloud/worker.py. Namedprocess_event()(notprocess_storm()) since it generalizes beyond storms.agg()inrunner.py— branch onget_pipeline_type(config) == "temporal"to call_run_temporal_local()/_run_temporal_lambda(), which iterate over events (not morton cells) and callprocess_event(). The existing CLI (invoke_lambda.py) would work for temporal pipelines with no changes — just pass--config merra2_storm.yaml.runner.pyLambda dispatch —_run_lambda()/_invoke_lambda_cell()duplicates retry + ThreadPoolExecutor logic thatorchestrate.dispatch_lambda()now provides. Should refactor to use it._rainfall = PRECCU + PRECLSin config (currently special-cased in artools worker).pipeline.type: event) — dynamic geometry, lifecycle metadata. Deferred until temporal works end-to-end.Closes #12 when all remaining items are complete.
Test plan
pytest tests/)default_config("atl06")works unchanged, defaults tospatialmerra2_storm.yamlloads and validates as temporal pipeline🤖 Generated with Claude Code