Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ podman run -it --rm --entrypoint="" sunflow bash
### Arguments

- `--dataset` - Choose between KNMI or DWD data sources
- `--bbox` - Predefined bounding boxes (DENMARK, NW_EUROPE, CUSTOM)
- `--custom-bbox` - Custom bounding box (lon_min,lat_min,lon_max,lat_max)
- `--domain_satellite` - Domain for required satellite input coverage (DENMARK, NW_EUROPE, NW_EUROPE_SATELLITE, CUSTOM)
- `--custom-domain-satellite` - Custom `domain_satellite` (lon_min,lat_min,lon_max,lat_max)
- `--domain_nowcast` - Domain written to output (DENMARK, NW_EUROPE, NW_EUROPE_SATELLITE, CUSTOM, defaults to `--domain_satellite`)
- `--custom-domain-nowcast` - Custom `domain_nowcast` (lon_min,lat_min,lon_max,lat_max)
- `--time` - Specific time for processing in ISO8601 format
- `--start-time` - Start of a time range in ISO8601 format (use with `--end-time`)
- `--end-time` - End of a time range in ISO8601 format, inclusive (use with `--start-time`)
Expand Down
8 changes: 4 additions & 4 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ KNMI:
sds: sds
sds_cs: sds_cs
# Required: Filename format for input data files
# filename_format: "{dataset_name}_{timestamp}_{bbox_choice}.nc"
# filename_format: "{dataset_name}_{timestamp}_{domain_satellite_choice}.nc"
# Format for PDS data:
filename_format: "NetCDF4_sds_{pds_timestamp}.nc"
# Available template variables:
# {dataset_name}: Dataset name (e.g., KNMI)
# {bbox_choice}: Bounding box identifier (e.g., NW_EUROPE)
# {domain_satellite_choice}: Satellite domain identifier (e.g., NW_EUROPE_SATELLITE)
# {timestamp}: Compact format YYYYMMDDHHMM
# {pds_timestamp}: PDS format YYYY-MM-DDTHH_MM_SSZ
# {year}: Four-digit year (e.g., 2026)
Expand All @@ -33,5 +33,5 @@ DWD:
sds: SIS
sds_cs: SISc
# Required: Filename format for input data files
# Supports subdirectories via path separators, e.g.: "{year}/{month}/{day}/{dataset_name}_{timestamp}_{bbox_choice}.nc"
filename_format: "{dataset_name}_{timestamp}_{bbox_choice}.nc"
# Supports subdirectories via path separators, e.g.: "{year}/{month}/{day}/{dataset_name}_{timestamp}_{domain_satellite_choice}.nc"
filename_format: "{dataset_name}_{timestamp}_{domain_satellite_choice}.nc"
6 changes: 4 additions & 2 deletions sunflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from dataclasses import dataclass
from typing import Self

# Predefined Bounding Box (BBOX) options
BBOX_OPTIONS: dict[str, str | None] = {
# Predefined domain options
# Format: lon_min,lat_min,lon_max,lat_max
DOMAIN_OPTIONS: dict[str, str | None] = {
"DENMARK": "4,50,18,62",
"NW_EUROPE": "-10.75,47.25,20,63.5",
"NW_EUROPE_SATELLITE": "-20.75,37.25,30,73.5",
"CUSTOM": None,
}

Expand Down
88 changes: 56 additions & 32 deletions sunflow/data_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
from loguru import logger

from .config import NowcastConfig, S3Config
from .geospatial import subset_to_bbox
from .geospatial import subset_to_bbox, validate_dataset_covers_domain
from .validation import DataNotAvailableError


def fetch_current_data_with_retry(
time_step: datetime,
run_mode: str,
config: dict[str, Any],
bbox: str,
domain_satellite: str,
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
nowcast_config: NowcastConfig,
s3_config: S3Config,
custom_time: bool = False,
Expand All @@ -32,9 +32,9 @@ def fetch_current_data_with_retry(
time_step: Datetime object for data to fetch
run_mode: One of 'download', 'files', or 's3'
config: Dataset configuration dict
bbox: Bounding box string
domain_satellite: Domain string for required satellite input coverage
dataset_name: Name of dataset (options: KNMI, DWD)
bbox_choice: Bounding box choice string
domain_satellite_choice: Domain choice used in input filenames
nowcast_config: NowcastConfig object
s3_config: S3Config object
custom_time: Whether a custom time was specified (no retry if True)
Expand All @@ -57,26 +57,28 @@ def fetch_current_data_with_retry(
download_current_data(
time_step,
config,
bbox,
domain_satellite,
dataset_name,
bbox_choice,
domain_satellite_choice,
nowcast_config.satellite_data_directory,
)
case "files":
check_current_data_existence_file(
time_step,
dataset_name,
bbox_choice,
domain_satellite_choice,
nowcast_config.satellite_data_directory,
config["filename_format"],
domain_satellite,
)
case "s3":
check_current_data_existence_s3(
time_step,
dataset_name,
bbox_choice,
domain_satellite_choice,
s3_config,
config["filename_format"],
domain_satellite,
)

logger.info(f"Data successfully retrieved for {time_step_str}")
Expand Down Expand Up @@ -108,7 +110,7 @@ def fetch_current_data_with_retry(
def generate_input_filename(
time_step: datetime,
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
filename_format: str,
) -> str:
"""Generate input filename based on a format template string.
Expand All @@ -118,15 +120,15 @@ def generate_input_filename(
Args:
time_step: Datetime of the data timestep.
dataset_name: Name of dataset (options: KNMI, DWD).
bbox_choice: Bounding box identifier.
domain_satellite_choice: Satellite domain identifier.
filename_format: Template string from config['filename_format'].

Returns:
Filename string with all template variables substituted.

Template variables supported:
{dataset_name}: Name of the dataset
{bbox_choice}: Bounding box identifier
{domain_satellite_choice}: Satellite domain identifier
{timestamp}: Compact format YYYYMMDDHHMM
{pds_timestamp}: PDS format YYYY-MM-DDTHH_MM_SSZ
{year}: Four-digit year (e.g. 2026)
Expand All @@ -138,16 +140,14 @@ def generate_input_filename(
``{year}/{month}/{day}/{dataset_name}_{timestamp}.nc`` resolves to a
file inside a date-structured subdirectory of *satellite_data_directory*.
"""
format_template = filename_format

# Generate both time formats
timestamp_compact = time_step.strftime("%Y%m%d%H%M")
timestamp_pds = time_step.strftime("%Y-%m-%dT%H_%M_%SZ")

# Substitute template variables
filename = format_template.format(
filename = filename_format.format(
dataset_name=dataset_name,
bbox_choice=bbox_choice,
domain_satellite_choice=domain_satellite_choice,
timestamp=timestamp_compact,
pds_timestamp=timestamp_pds,
year=time_step.strftime("%Y"),
Expand All @@ -161,35 +161,46 @@ def generate_input_filename(
def check_current_data_existence_file(
request_time: datetime,
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
satellite_data_directory: str,
filename_format: str,
required_domain: str | None = None,
) -> None:
"""Check for existence of current data file.

Args:
request_time: Python datetime object.
dataset_name: Name of dataset (options: KNMI, DWD).
bbox_choice: Bounding box identifier.
domain_satellite_choice: Satellite domain identifier.
satellite_data_directory: Directory containing data files.
filename_format: Template string from config['filename_format'].
required_domain: Optional domain string that must be covered by the file.

Raises:
FileNotFoundError: If the expected file does not exist.
RuntimeError: If required_domain is provided and file coverage is insufficient.
"""
filename = generate_input_filename(
request_time, dataset_name, bbox_choice, filename_format
request_time, dataset_name, domain_satellite_choice, filename_format
)
filepath = os.path.join(satellite_data_directory, filename)
logger.info(f"Checking existence of data at {filepath}")
if not os.path.exists(filepath):
raise FileNotFoundError(f"Input file not found: {filepath}")

if required_domain:
with xr.open_dataset(filepath) as ds:
validate_dataset_covers_domain(
ds,
required_domain,
f"Input file {filepath}",
)


def load_data_from_files(
time_steps: list[datetime],
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
satellite_data_directory: str,
data_type: str,
filename_format: str,
Expand All @@ -200,7 +211,7 @@ def load_data_from_files(
Args:
time_steps: List of timesteps to load.
dataset_name: Name of dataset (options: KNMI, DWD).
bbox_choice: Bounding box identifier.
domain_satellite_choice: Satellite domain identifier.
satellite_data_directory: Directory containing data files.
data_type: Type of data for logging (options: past data, clearsky data).
filename_format: Template string from config['filename_format'].
Expand All @@ -215,7 +226,7 @@ def load_data_from_files(

for time_step in time_steps:
filename = generate_input_filename(
time_step, dataset_name, bbox_choice, filename_format
time_step, dataset_name, domain_satellite_choice, filename_format
)
filepath = os.path.join(satellite_data_directory, filename)

Expand All @@ -241,24 +252,27 @@ def load_data_from_files(
def check_current_data_existence_s3(
request_time: datetime,
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
s3_config: S3Config,
filename_format: str,
required_domain: str | None = None,
) -> None:
"""Check for existence of current data file in S3.

Args:
request_time: Python datetime object.
dataset_name: Name of dataset (options: KNMI, DWD).
bbox_choice: Bounding box identifier.
domain_satellite_choice: Satellite domain identifier.
s3_config: S3 configuration object.
filename_format: Template string from config['filename_format'].
required_domain: Optional domain string that must be covered by the file.

Raises:
FileNotFoundError: If the expected file does not exist in S3.
RuntimeError: If required_domain is provided and file coverage is insufficient.
"""
filename = generate_input_filename(
request_time, dataset_name, bbox_choice, filename_format
request_time, dataset_name, domain_satellite_choice, filename_format
)
s3_path = f"s3://{s3_config.bucket}/{s3_config.input_prefix}/{filename}"
logger.info(f"Checking existence of data at {s3_path}")
Expand All @@ -270,6 +284,16 @@ def check_current_data_existence_s3(
)
if not fs.exists(s3_path):
raise FileNotFoundError(f"Input file not yet found in S3: {s3_path}")

if required_domain:
with fs.open(s3_path, "rb") as f:
with xr.open_dataset(f, engine="h5netcdf") as ds:
ds_loaded = ds.load()
validate_dataset_covers_domain(
ds_loaded,
required_domain,
f"Input file {s3_path}",
)
except FileNotFoundError:
raise
except Exception as e:
Expand All @@ -280,7 +304,7 @@ def check_current_data_existence_s3(
def load_data_from_s3(
time_steps: list[datetime],
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
s3_config: S3Config,
data_type: str,
filename_format: str,
Expand All @@ -291,7 +315,7 @@ def load_data_from_s3(
Args:
time_steps: List of timesteps to load.
dataset_name: Name of dataset (options: KNMI, DWD).
bbox_choice: Bounding box identifier.
domain_satellite_choice: Satellite domain identifier.
s3_config: S3 configuration object.
data_type: Type of data for logging (options: past data, clearsky data).
filename_format: Template string from config['filename_format'].
Expand All @@ -310,7 +334,7 @@ def load_data_from_s3(

for time_step in time_steps:
filename = generate_input_filename(
time_step, dataset_name, bbox_choice, filename_format
time_step, dataset_name, domain_satellite_choice, filename_format
)
s3_path = f"s3://{s3_config.bucket}/{s3_config.input_prefix}/{filename}"

Expand Down Expand Up @@ -340,7 +364,7 @@ def fetch_clearsky_with_fallback(
config: dict[str, Any],
bbox: str,
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
nowcast_config: NowcastConfig,
s3_config: S3Config,
) -> xr.Dataset:
Expand All @@ -358,7 +382,7 @@ def fetch_clearsky_with_fallback(
config: Dataset configuration dict.
bbox: Bounding box string.
dataset_name: Name of dataset (options: KNMI, DWD).
bbox_choice: Bounding box identifier.
domain_satellite_choice: Satellite domain identifier.
nowcast_config: NowcastConfig object.
s3_config: S3Config object.

Expand Down Expand Up @@ -393,7 +417,7 @@ def fetch_clearsky_with_fallback(
fetched = load_data_from_files(
[source_time],
dataset_name,
bbox_choice,
domain_satellite_choice,
nowcast_config.satellite_data_directory,
"clearsky data",
config["filename_format"],
Expand All @@ -403,7 +427,7 @@ def fetch_clearsky_with_fallback(
fetched = load_data_from_s3(
[source_time],
dataset_name,
bbox_choice,
domain_satellite_choice,
s3_config,
"clearsky data",
config["filename_format"],
Expand Down
9 changes: 6 additions & 3 deletions sunflow/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def download_current_data(
config: dict[str, Any],
bbox: str,
dataset_name: str,
bbox_choice: str,
domain_satellite_choice: str,
satellite_data_directory: str,
) -> None:
"""Download satellite data for a single timestep and save it to disk.
Expand All @@ -92,7 +92,7 @@ def download_current_data(
base_url, variables, format, crs, and filename_format.
bbox: Bounding box string lon_min,lat_min,lon_max,lat_max.
dataset_name: Name of the dataset source (options: KNMI, DWD).
bbox_choice: Bounding box identifier used in the output filename.
domain_satellite_choice: Satellite domain identifier used in the output filename.
satellite_data_directory: Directory where the NetCDF file is saved.
"""
if dataset_name == "KNMI":
Expand Down Expand Up @@ -120,7 +120,10 @@ def download_current_data(
# Save data
current_time_dt = current_time.astype("datetime64[s]").astype(datetime)
filename = generate_input_filename(
current_time_dt, dataset_name, bbox_choice, config["filename_format"]
current_time_dt,
dataset_name,
domain_satellite_choice,
config["filename_format"],
)
output_path = os.path.join(satellite_data_directory, filename)
merged_ds.to_netcdf(output_path)
Expand Down
Loading
Loading