
Temporal aggregation #12

@espg

Motivation

The antarctic_AR_dataset repo implements a Lambda-based aggregation pipeline for computing storm attributes from MERRA-2 reanalysis data. The architecture there — declarative metric definitions, Lambda orchestration, catalog-driven dispatch, S3 data access — is structurally very similar to what magg already does for spatial aggregation. This issue proposes unifying the temporal aggregation pattern into magg as a second pipeline type.

Common Design Elements

Both repos share similar architecture at the structural level:

| Pattern | magg | antarctic_AR_dataset |
|---|---|---|
| Declarative metrics | YAML config with function/expression strings | AggregationSpec dataclass with spatial_func/temporal_reducer strings |
| Function dispatch | resolve_function("np.min") → callable | String lookup in spatial_functions.py |
| Lambda orchestration | Catalog → dispatch → collect results | Catalog → dispatch → collect results |
| Catalog building | CMR query → morton→granule_urls mapping | earthaccess query → collection→date→url index |
| S3 data access | h5coro byte-range reads | s3fs + xarray |
| Auth | earthaccess.login() → temp S3 creds | Same |
| Output | Zarr (gridded) | HDF5/DataFrame (tabular) |

The orchestrator/worker/catalog trinity is nearly identical. The config-driven function dispatch is the same idea expressed differently (YAML vs dataclass).
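The "Function dispatch" row is easier to compare with a concrete resolver in hand. The minimal sketch below covers both styles: dotted numpy/stdlib paths like the `np.min` strings magg's config uses, and a project registry like AR's lookup table. Every name here (`resolve_callable`, `register`, `MODULE_ALIASES`, `range_span`) is hypothetical. This is not magg's actual `resolve_function`.

```python
import builtins
import importlib
from typing import Callable, Dict

# Hypothetical: short module prefixes a config string might use.
MODULE_ALIASES = {"np": "numpy"}

# Hypothetical project registry, analogous in spirit to AR's spatial_functions.py.
REGISTRY: Dict[str, Callable] = {}


def register(name: str) -> Callable[[Callable], Callable]:
    """Decorator: make a function resolvable by the config string `name`."""
    def wrap(func: Callable) -> Callable:
        REGISTRY[name] = func
        return func
    return wrap


def resolve_callable(spec: str) -> Callable:
    """Turn 'np.min', 'math.sqrt', 'max', or a registered name into a callable."""
    if spec in REGISTRY:                      # project-specific functions win
        return REGISTRY[spec]
    if "." in spec:                           # dotted path: module.attribute
        module_name, attr = spec.rsplit(".", 1)
        module = importlib.import_module(MODULE_ALIASES.get(module_name, module_name))
        return getattr(module, attr)
    if hasattr(builtins, spec):               # bare builtin such as max or sum
        return getattr(builtins, spec)
    raise KeyError(f"unknown function: {spec!r}")


@register("range_span")
def range_span(values):
    """Example registered function (hypothetical): max minus min."""
    return max(values) - min(values)
```

Registry lookup comes first, so a project can shadow a builtin name deliberately. Both repos' styles then reduce to the same string-to-callable step.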

Key Architectural Differences

1. Processing axis — this is the fundamental divergence

  • magg: spatial. Group irregular points by cell, aggregate within each cell. No time dimension.
  • AR: temporal. Track an object across timesteps, apply spatial funcs per timestep, then reduce across time.

2. Statefulness

  • magg: stateless — load all data, groupby, apply. Each cell is independent.
  • AR: stateful — streaming accumulators (MaxAccumulator, WeightedMeanAccumulator) maintain running state across timesteps because you can't hold all timesteps in memory at once.

3. Processing unit

  • magg: a fixed spatial cell (morton parent at order 6, ~100 km)
  • AR: a dynamic spatiotemporal object (storm with evolving binary mask)

4. Data model

  • magg: point cloud (irregular observations) → regular grid
  • AR: regular grid (MERRA-2) → scalar attributes per object
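A toy NumPy sketch shows the processing-axis and data-model differences side by side. The spatial path groups irregular points by cell and reduces each cell independently. The temporal path applies a spatial function under a mask at every timestep and then reduces across time. Function names are hypothetical, and for illustration the temporal version holds all timesteps in one array, which the real AR pipeline avoids.

```python
import numpy as np


def aggregate_points_by_cell(cell_ids, values, func=np.mean):
    """Spatial (magg-style): group irregular points by cell, reduce each cell on its own."""
    cell_ids = np.asarray(cell_ids)
    values = np.asarray(values, dtype=float)
    return {int(c): float(func(values[cell_ids == c])) for c in np.unique(cell_ids)}


def aggregate_over_time(fields, mask, spatial_func=np.max, temporal_reducer=np.max):
    """Temporal (AR-style): spatial_func under the mask per timestep, then reduce over time.

    fields: (time, y, x) gridded values; mask: (y, x) boolean footprint.
    """
    mask = np.asarray(mask, dtype=bool)
    per_step = [spatial_func(field[mask]) for field in np.asarray(fields)]
    return float(temporal_reducer(per_step))
```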

What Could Be Unified

Three layers are ready to share today with minimal refactoring:

  1. Config format — the YAML data_source / aggregation / output structure works for both. AR's AggregationSpec maps directly:

    aggregation:
      variables:
        max_T2m_ais:
          source: T2M
          spatial_func: max
          temporal_reducer: max
          mask: ais
  2. Orchestration — catalog building, Lambda dispatch, credential management, result collection. The shape is identical; only the "what does one worker do?" part differs.

  3. Function resolution: resolve_function() already handles numpy/builtins. AR's spatial function registry is the same pattern.
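A small sketch can test the claim that AR's AggregationSpec maps directly onto the YAML. It builds typed per-variable specs from the parsed `aggregation:` mapping. `VariableSpec` and `specs_from_config` are hypothetical. The fields simply mirror the YAML keys shown in this issue and are not AR's actual dataclass. The input is a plain dict standing in for the loaded YAML.

```python
from dataclasses import dataclass, fields
from typing import Dict, List, Optional


@dataclass(frozen=True)
class VariableSpec:
    """Hypothetical per-variable spec mirroring the YAML keys in this issue."""
    name: str
    source: str
    spatial_func: str
    temporal_reducer: Optional[str] = None  # None for purely spatial pipelines
    mask: Optional[str] = None
    anomaly: bool = False


def specs_from_config(aggregation: Dict) -> List[VariableSpec]:
    """Build specs from the parsed `aggregation:` mapping, rejecting unknown keys."""
    allowed = {f.name for f in fields(VariableSpec)} - {"name"}
    specs = []
    for name, entry in aggregation["variables"].items():
        unknown = set(entry) - allowed
        if unknown:
            raise ValueError(f"{name}: unknown keys {sorted(unknown)}")
        specs.append(VariableSpec(name=name, **entry))
    return specs
```

The only schema additions over a spatial config are the optional `temporal_reducer`, `mask`, and `anomaly` fields, and their defaults keep existing spatial entries valid.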

What a Temporal Pipeline Config Might Look Like

pipeline:
  type: temporal          # ← new: tells magg which processing core to use

data_source:
  reader: xarray_s3
  collections:
    T2M_TQV_SLP:
      variables: [T2M, TQV, SLP]
    VFLXQV_PRECIP:
      variables: [VFLXQV, PRECLSC, PRECCON]
  masks:
    ais: path/to/ais_mask.nc

aggregation:
  variables:
    max_T2m_ais:
      source: T2M
      spatial_func: max
      temporal_reducer: max
      mask: ais
      anomaly: true

output:
  format: dataframe       # ← not gridded
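A temporal config names several collections, so each worker has to know which collection supplies each output variable. Below is a minimal validation and planning sketch over the parsed config dict. `plan_reads` is a hypothetical helper. It assumes each variable name appears in exactly one collection.

```python
from typing import Dict, List


def plan_reads(config: Dict) -> Dict[str, List[str]]:
    """Map each collection to the output variables that need it.

    Raises ValueError if a variable's source is not provided by any collection,
    or if it names a mask that is not declared under data_source.masks.
    """
    ds = config["data_source"]
    provider: Dict[str, str] = {}
    for coll, spec in ds["collections"].items():
        for var in spec["variables"]:
            provider[var] = coll  # assumes no variable appears in two collections
    masks = ds.get("masks") or {}
    plan: Dict[str, List[str]] = {}
    for name, var in config["aggregation"]["variables"].items():
        if var["source"] not in provider:
            raise ValueError(f"{name}: source {var['source']!r} not in any collection")
        if var.get("mask") and var["mask"] not in masks:
            raise ValueError(f"{name}: unknown mask {var['mask']!r}")
        plan.setdefault(provider[var["source"]], []).append(name)
    return plan
```

A worker can then open only the collections that appear as keys in the plan.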

An event/object pipeline would add object identity:

pipeline:
  type: event

data_source:
  objects:
    catalog: storms.h5     # object catalog with masks per timestep
    identity: storm_id
  # ... same collections as temporal

aggregation:
  # same as temporal, but applied per-object

Major Challenges and Blockers

1. The accumulator abstraction (hard, but solvable)
magg currently does batch reduction: load everything, groupby, apply. The AR pattern needs streaming accumulators because you can't fit all timesteps in memory. These are fundamentally different execution models. We'd need to either:

  • Add an Accumulator protocol to magg and let the config specify which mode to use, or
  • Keep batch mode for spatial pipelines and streaming mode for temporal ones
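Here is a minimal sketch of what an Accumulator protocol could look like. The class names MaxAccumulator and WeightedMeanAccumulator come from the AR repo, but these bodies are illustrative assumptions, not its implementation. `reduce_stream` is a hypothetical driver that feeds in one timestep's value at a time, so memory use stays constant however long the series is.

```python
import itertools
import math
from typing import Iterable, Optional, Protocol


class Accumulator(Protocol):
    """Streaming reducer: fed one timestep at a time, never holds the full series."""
    def update(self, value: float, weight: float = 1.0) -> None: ...
    def result(self) -> float: ...


class MaxAccumulator:
    def __init__(self) -> None:
        self._max = -math.inf

    def update(self, value: float, weight: float = 1.0) -> None:
        self._max = max(self._max, value)  # weight is irrelevant for a max

    def result(self) -> float:
        return self._max


class WeightedMeanAccumulator:
    def __init__(self) -> None:
        self._sum = 0.0
        self._weight = 0.0

    def update(self, value: float, weight: float = 1.0) -> None:
        self._sum += value * weight
        self._weight += weight

    def result(self) -> float:
        return self._sum / self._weight if self._weight else math.nan


def reduce_stream(values: Iterable[float], acc: Accumulator,
                  weights: Optional[Iterable[float]] = None) -> float:
    """Feed values one at a time; only the accumulator's running state is kept."""
    if weights is None:
        weights = itertools.repeat(1.0)
    for value, weight in zip(values, weights):
        acc.update(value, weight)
    return acc.result()
```

Batch mode is the special case where `values` is an in-memory array, so both execution models could share one reducer interface.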

2. Multi-collection data fusion (medium difficulty)
magg reads one product. AR reads 5+ MERRA-2 collections per worker, joining by time. The config system would need a collections concept (plural data sources joined along a dimension), not just one data_source.
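The collections concept could sit on top of AR's collection→date→url index. One approach is to invert the index to date→collection→url and keep only dates where every collection has a granule, reporting incomplete dates rather than dropping them silently. `join_by_date` is hypothetical, and the URLs in the test are placeholders. Opening and merging the granules along time (for example with `xarray.merge`) is left to the worker.

```python
from typing import Dict, List, Tuple

Index = Dict[str, Dict[str, str]]  # collection -> date -> url


def join_by_date(index: Index) -> Tuple[Index, List[str]]:
    """Invert collection -> date -> url into date -> collection -> url.

    Only dates covered by every collection are kept; the rest are returned
    separately so gaps in the catalog are visible instead of silently lost.
    """
    if not index:
        return {}, []
    date_sets = [set(dates) for dates in index.values()]
    common = set.intersection(*date_sets)
    incomplete = sorted(set().union(*date_sets) - common)
    joined = {d: {coll: index[coll][d] for coll in index} for d in sorted(common)}
    return joined, incomplete
```

Each entry of the joined index is then one self-contained unit of work: everything a worker needs for one date, across all collections.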

3. Mask-based vs point-based spatial operations (hard)
These are fundamentally different function signatures:

  • magg: f(array_of_point_values) → scalar (e.g., np.mean(h_li_values))
  • AR: f(gridded_field, binary_mask, cell_areas) → scalar (e.g., area-weighted max under mask)

A unified function dispatch would need to know which kind of spatial operation it's dealing with.
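The two signatures can be sketched with NumPy, along with a dispatch that knows which kind it is calling. The registry tags each function as point-based or mask-based. All names (`SPATIAL_OPS`, `apply_spatial`, and the three functions) are hypothetical. Only the area-weighted mean uses `cell_areas`; the masked max accepts the argument only to keep a uniform signature.

```python
import numpy as np


def point_mean(values):
    """Point-based: f(array_of_point_values) -> scalar."""
    return float(np.mean(values))


def masked_area_mean(field, mask, cell_areas):
    """Mask-based: area-weighted mean of a gridded field inside a binary mask."""
    field = np.asarray(field, dtype=float)
    mask = np.asarray(mask, dtype=bool)
    weights = np.where(mask, cell_areas, 0.0)
    # np.where keeps NaNs outside the mask from leaking into the sum
    return float(np.sum(np.where(mask, field * weights, 0.0)) / np.sum(weights))


def masked_max(field, mask, cell_areas=None):
    """Mask-based max; cell_areas accepted for a uniform signature but unused."""
    return float(np.max(np.asarray(field)[np.asarray(mask, dtype=bool)]))


# Hypothetical registry recording which kind of spatial operation each name is.
SPATIAL_OPS = {
    "mean": ("points", point_mean),
    "area_mean": ("masked", masked_area_mean),
    "max_under_mask": ("masked", masked_max),
}


def apply_spatial(name, *, values=None, field=None, mask=None, cell_areas=None):
    """Dispatch on the registered kind so callers cannot mix up the two signatures."""
    kind, func = SPATIAL_OPS[name]
    if kind == "points":
        return func(values)
    return func(field, mask, cell_areas)
```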

4. Dynamic vs static geometry (design challenge)
magg cells are fixed geometry. Storm events evolve over time — the mask changes every timestep. An "event" pipeline would need the concept of a time-varying spatial footprint, plus lifecycle metadata (first_landfall, duration). This is closer to object tracking than aggregation.
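The sketch below computes lifecycle metadata from a time-varying footprint. The definitions are assumptions made for illustration. Duration is the number of timesteps in which the object's mask is non-empty, multiplied by a fixed `timestep_hours` (3 hours by default here, an assumed cadence). `first_landfall` is the index of the first timestep whose mask overlaps a land mask such as `ais`. The function name is hypothetical.

```python
import numpy as np


def lifecycle(masks, land_mask, timestep_hours=3.0):
    """Hypothetical lifecycle fields for one object with a time-varying mask.

    masks: (time, y, x) boolean footprint; land_mask: (y, x) boolean.
    """
    masks = np.asarray(masks, dtype=bool)
    land_mask = np.asarray(land_mask, dtype=bool)   # broadcast over the time axis
    present = masks.any(axis=(1, 2))                # object exists at step t?
    touches_land = (masks & land_mask).any(axis=(1, 2))
    landfall_steps = np.flatnonzero(touches_land)
    return {
        "duration_hours": float(present.sum()) * timestep_hours,
        "first_landfall": int(landfall_steps[0]) if landfall_steps.size else None,
    }
```

Unlike a per-cell reduction, both fields depend on the whole sequence of masks. That is why this is closer to object tracking than to aggregation.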

5. Output format divergence
magg writes Zarr (gridded, chunked by morton cell). Temporal/event pipelines produce tabular output (one row per object × many scalar columns). The output layer needs to be polymorphic.
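Here is a minimal sketch of a polymorphic output layer. Both writers share a `write`/`finalize` interface, and the config's `output.format` selects one. `OutputWriter`, `TabularWriter`, `GriddedWriter`, and `WRITERS` are hypothetical. `GriddedWriter` is only a stand-in for a real Zarr writer, and `TabularWriter.finalize` returns a dict of columns that `pandas.DataFrame` can take directly.

```python
from typing import Any, Dict, List, Protocol


class OutputWriter(Protocol):
    def write(self, key: Any, result: Dict[str, float]) -> None: ...
    def finalize(self) -> Any: ...


class TabularWriter:
    """One row per object; columns are the scalar attributes."""
    def __init__(self, index_name: str = "object_id") -> None:
        self.index_name = index_name
        self.rows: List[Dict[str, Any]] = []

    def write(self, key: Any, result: Dict[str, float]) -> None:
        self.rows.append({self.index_name: key, **result})

    def finalize(self) -> Dict[str, List[Any]]:
        # Union of column names in first-seen order; missing values become None.
        names = list(dict.fromkeys(k for row in self.rows for k in row))
        return {n: [row.get(n) for row in self.rows] for n in names}


class GriddedWriter:
    """Stand-in for a Zarr writer: results keyed by cell (e.g. a morton index)."""
    def __init__(self) -> None:
        self.cells: Dict[Any, Dict[str, float]] = {}

    def write(self, key: Any, result: Dict[str, float]) -> None:
        self.cells[key] = dict(result)

    def finalize(self) -> Dict[Any, Dict[str, float]]:
        return self.cells


WRITERS = {"zarr": GriddedWriter, "dataframe": TabularWriter}
```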

Proposed Approach

Unify incrementally, from the infrastructure up rather than from the processing cores down:

  1. Now: Add a pipeline.type field to the config schema (default: spatial). Existing configs keep working unchanged, and later pipeline types get a place to plug in.

  2. Short-term: Extract the orchestration layer (catalog dispatch, Lambda management, auth, result collection) into a shared module. Both repos already do the same thing — just DRY it up.

  3. Medium-term: Add temporal as a second pipeline type with its own processing function, but sharing config loading, validation, function resolution, and orchestration. The AR repo's AggregationSpec maps cleanly to the existing YAML schema with a few new fields (temporal_reducer, mask, anomaly).

  4. Longer-term: Event/object pipelines. This is where it gets genuinely hard because of dynamic geometry and lifecycle tracking. Defer until temporal works.
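The shared orchestration layer from step 2 reduces to "catalog in, per-unit results out", with the worker injected. This minimal sketch uses a local thread pool in place of Lambda. In a deployment, `invoke` would wrap a Lambda call (for example a boto3 Lambda client's `invoke`), but any callable works. `orchestrate` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Tuple


def orchestrate(
    catalog: Dict[str, Any],
    invoke: Callable[[str, Any], Any],
    max_workers: int = 8,
) -> Tuple[Dict[str, Any], Dict[str, BaseException]]:
    """Catalog -> dispatch -> collect, independent of what one worker does.

    catalog: work-unit key -> payload (e.g. morton cell -> granule URLs,
             or date -> collection URLs).
    invoke:  runs one unit of work and returns its result.
    Returns (results, failures) so one bad unit does not sink the whole run.
    """
    results: Dict[str, Any] = {}
    failures: Dict[str, BaseException] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {key: pool.submit(invoke, key, payload) for key, payload in catalog.items()}
        for key, future in futures.items():
            try:
                results[key] = future.result()
            except Exception as exc:  # record the failure and keep collecting
                failures[key] = exc
    return results, failures
```

Spatial and temporal pipelines would differ only in the catalog they build and the `invoke` they pass, which is the "only the worker differs" point above.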

The biggest risk is over-abstracting too early. The shared infrastructure (config, orchestration, auth) is low-risk to unify. The processing cores should stay separate and pluggable — trying to make one function handle points-in-cells AND gridded-field-under-mask AND streaming-accumulators-over-time would create a mess. Better to have process_spatial(), process_temporal(), and eventually process_event() as peers under a common framework.
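A small registry can express the "peers under a common framework" shape, together with the `pipeline.type` default from step 1. Names are hypothetical, and the process functions are placeholders that only report which core ran.

```python
from typing import Callable, Dict

PIPELINES: Dict[str, Callable[[dict], str]] = {}


def pipeline(kind: str):
    """Register a processing core under a pipeline.type value."""
    def wrap(func):
        PIPELINES[kind] = func
        return func
    return wrap


@pipeline("spatial")
def process_spatial(config: dict) -> str:
    return "spatial"   # placeholder for the existing points-in-cells core


@pipeline("temporal")
def process_temporal(config: dict) -> str:
    return "temporal"  # placeholder for a streaming, mask-based core


def run(config: dict) -> str:
    """Pick the core from pipeline.type; a missing field means 'spatial'."""
    kind = (config.get("pipeline") or {}).get("type", "spatial")
    if kind not in PIPELINES:
        raise ValueError(f"unknown pipeline.type {kind!r}; known: {sorted(PIPELINES)}")
    return PIPELINES[kind](config)
```

Adding `process_event()` later means registering one more function. Shared config loading and orchestration stay untouched.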

Related

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
