Catalog plugin#74

Draft
avaldebe wants to merge 61 commits into master from catalog-plugin

avaldebe (Collaborator) commented May 14, 2025

Uses the CLI plugin mechanism from #71 and the chained CLI commands from #73 to provide a new command:

$ airbase catalog --help
Usage: airbase catalog [OPTIONS]

  Combine station metadata with observation metadata from files found on the
  download root path (set by `airbase --path PATH`).

Options:
  --path, --data-path DATA        Override catalog search path.
  --catalog [DATA|PATH]/catalog.parquet
                                  Override combined metadata path
  --metadata PATH/metadata.csv    Override station metadata path.
  --help                          Show this message and exit.

The catalog file is meant to sit at the top of a data directory.
It contains the most relevant station metadata, restricted to the stations/pollutants that have observation files:

  • Country and country code (as defined by the Parquet API)
  • Station name and EoI code
  • Pollutant name and sampling point ID
  • Station longitude, latitude and altitude
  • Station type (background|industrial|traffic) and area (rural|...|suburban|urban)
  • Aggregation type/frequency (hour|day|var)
  • Actual timezone (e.g. Etc/GMT+1 for hourly data)

From the observation files, the catalog contains:

  • File path relative to the catalog file
  • Time start and end (of valid observations)

The catalog file should help to narrow down the files that need to be opened for a particular task.

With this new command, several tasks we do daily at work can be combined into a single command invocation:

airbase --quiet --overwrite --path /very/long/path/.../download \
  --metadata -p SO2 -p PM10 -p O3 -p NO2 -p CO -p NO -p PM2.5 \
  unverified --frequency hourly \
  unverified --frequency daily \
  catalog --data-path /very/long/path/.../download/unverified

This is what it does:

# update the station metadata file at `/very/long/path/.../download/metadata.csv`
# update the SO2, PM10, O3, NO2, CO, NO and PM2.5 observation files
airbase --quiet --overwrite --path /very/long/path/.../download \
  --metadata -p SO2 -p PM10 -p O3 -p NO2 -p CO -p NO -p PM2.5 \

# write hourly unverified files to `/very/long/path/.../download/unverified/hourly/`
  unverified --frequency hourly \

# write daily unverified files to `/very/long/path/.../download/unverified/daily/`
  unverified --frequency daily \

# update the catalog file at `/very/long/path/.../download/unverified/catalog.parquet`
# combining station metadata with the start/end times from all observation files
# found on `/very/long/path/.../download/unverified/`
  catalog --data-path /very/long/path/.../download/unverified
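If the invocation is driven from a script (for example a scheduled job), the chained command can be assembled as an argument list. This is only a sketch: `build_airbase_command` is a hypothetical helper, the download root is passed in by the caller, and only the flags and subcommands shown above are used.

```python
import shlex
from pathlib import Path


def build_airbase_command(root: Path, pollutants: list[str]) -> list[str]:
    """Hypothetical helper: argument list for the chained invocation above."""
    # global options: update the station metadata and select pollutants
    cmd = ["airbase", "--quiet", "--overwrite", "--path", str(root), "--metadata"]
    for pollutant in pollutants:
        cmd += ["-p", pollutant]
    # one `unverified` subcommand per frequency
    for frequency in ("hourly", "daily"):
        cmd += ["unverified", "--frequency", frequency]
    # finally, rebuild the catalog from the files under <root>/unverified
    cmd += ["catalog", "--data-path", str(root / "unverified")]
    return cmd


cmd = build_airbase_command(Path("download"), ["SO2", "PM10"])
print(shlex.join(cmd))
# To actually run it: subprocess.run(cmd, check=True)
```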

I'll probably fine-tune the catalog options, but this should give you a good idea of what this PR wants to accomplish.

@avaldebe (Collaborator, Author)

And these are the contents of the catalog file:

>>> import polars as pl
>>> df = pl.read_parquet("catalog.parquet")
>>> df
shape: (14_893, 16)
┌─────────────────────────────────┬─────────┬──────────────┬──────────────────────────────┬───┬─────────┬───────────┬─────────────────────────┬─────────────────────────┐
│ filename                        ┆ Country ┆ Country Code ┆ Air Quality Station EoI Code ┆ … ┆ AggType ┆ Timezone  ┆ Start                   ┆ End                     │
│ ---                             ┆ ---     ┆ ---          ┆ ---                          ┆   ┆ ---     ┆ ---       ┆ ---                     ┆ ---                     │
│ str                             ┆ enum    ┆ enum         ┆ str                          ┆   ┆ enum    ┆ cat       ┆ datetime[ns, UTC]       ┆ datetime[ns, UTC]       │
╞═════════════════════════════════╪═════════╪══════════════╪══════════════════════════════╪═══╪═════════╪═══════════╪═════════════════════════╪═════════════════════════╡
│ daily/CY/SPO-CY0002R_00005_102… ┆ Cyprus  ┆ CY           ┆ CY0002R                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2024-01-01 00:00:00 UTC ┆ 2024-12-31 23:00:00 UTC │
│ daily/CY/SPO-CY0002R_06001_100… ┆ Cyprus  ┆ CY           ┆ CY0002R                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2024-01-01 00:00:00 UTC ┆ 2024-12-31 23:00:00 UTC │
│ daily/CY/SPO-CY0004A_00005_100… ┆ Cyprus  ┆ CY           ┆ CY0004A                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2024-01-01 00:00:00 UTC ┆ 2024-12-31 23:00:00 UTC │
│ daily/DE/SPO.DE_DEBE010_PM2_da… ┆ Germany ┆ DE           ┆ DEBE010                      ┆ … ┆ day     ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2024-12-31 23:00:00 UTC │
│ daily/DE/SPO.DE_DEBE034_PM1_da… ┆ Germany ┆ DE           ┆ DEBE034                      ┆ … ┆ day     ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2024-12-31 23:00:00 UTC │
│ …                               ┆ …       ┆ …            ┆ …                            ┆ … ┆ …       ┆ …         ┆ …                       ┆ …                       │
│ hourly/XK/SPO-XK0012A_00007_10… ┆ Kosovo  ┆ XK           ┆ XK0012A                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2025-05-14 04:00:00 UTC │
│ hourly/XK/SPO-XK0012A_00008_10… ┆ Kosovo  ┆ XK           ┆ XK0012A                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2025-05-14 04:00:00 UTC │
│ hourly/XK/SPO-XK0012A_00010_10… ┆ Kosovo  ┆ XK           ┆ XK0012A                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2025-05-14 04:00:00 UTC │
│ hourly/XK/SPO-XK0012A_00038_10… ┆ Kosovo  ┆ XK           ┆ XK0012A                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2025-05-14 04:00:00 UTC │
│ hourly/XK/SPO-XK0012A_06001_10… ┆ Kosovo  ┆ XK           ┆ XK0012A                      ┆ … ┆ hour    ┆ Etc/GMT+1 ┆ 2023-12-31 23:00:00 UTC ┆ 2025-05-14 04:00:00 UTC │
└─────────────────────────────────┴─────────┴──────────────┴──────────────────────────────┴───┴─────────┴───────────┴─────────────────────────┴─────────────────────────┘
>>> from pprint import pprint
>>> pprint(df.schema)
Schema([('filename', String),
        ('Country',
         Enum(categories=['Andorra', 'Albania', 'Austria', 'Bosnia and Herzegovina', 'Belgium', 'Bulgaria', 'Switzerland', 'Cyprus', 'Czechia', 'Germany', 'Denmark', 'Estonia', 'Spain', 'Finland', 'France', 'United Kingdom', 'Greece', 'Croatia', 'Hungary', 'Ireland', 'Iceland', 'Italy', 'Lithuania', 'Luxembourg', 'Latvia', 'Montenegro', 'North Macedonia', 'Malta', 'Netherlands', 'Norway', 'Poland', 'Portugal', 'Romania', 'Serbia', 'Sweden', 'Slovenia', 'Slovakia', 'Turkey', 'Kosovo'])),
        ('Country Code',
         Enum(categories=['AD', 'AL', 'AT', 'BA', 'BE', 'BG', 'CH', 'CY', 'CZ', 'DE', 'DK', 'EE', 'ES', 'FI', 'FR', 'GB', 'GR', 'HR', 'HU', 'IE', 'IS', 'IT', 'LT', 'LU', 'LV', 'ME', 'MK', 'MT', 'NL', 'NO', 'PL', 'PT', 'RO', 'RS', 'SE', 'SI', 'SK', 'TR', 'XK'])),
        ('Air Quality Station EoI Code', String),
        ('Air Quality Station Name', String),
        ('Sampling Point Id', String),
        ('Air Pollutant', Categorical(ordering='physical')),
        ('Longitude', Float64),
        ('Latitude', Float64),
        ('Altitude', Float32),
        ('Air Quality Station Type',
         Enum(categories=['background', 'industrial', 'traffic'])),
        ('Air Quality Station Area',
         Enum(categories=['rural', 'rural-nearcity', 'rural-regional', 'rural-remote', 'suburban', 'urban'])),
        ('AggType', Enum(categories=['hour', 'day', 'var'])),
        ('Timezone', Categorical(ordering='physical')),
        ('Start', Datetime(time_unit='ns', time_zone='UTC')),
        ('End', Datetime(time_unit='ns', time_zone='UTC'))])

JohnPaton previously approved these changes Oct 3, 2025