Skip to content

Add EMORID rivers as datasource #1291

Description

@veenstrajelmer

The EMORID riverine database is available on https://dataverse.nioz.nl/dataset.xhtml?persistentId=doi:10.25850/nioz/7b.b.th. This data can be downloaded and converted to source/sink bc files. Some example code to create an index and extract data from the EMORID_v1.zip zipfile:

import os
import dfm_tools as dfmt
import matplotlib.pyplot as plt
plt.close("all")
import pandas as pd
import geopandas as gpd
import numpy as np
import zipfile
import hydrolib.core.dflowfm as hcdfm
import requests

dir_zip = r"c:\Users\veenstra\Downloads"
file_zip = os.path.join(dir_zip, "EMORID_v1.zip")

# download zipfile if not present
if not os.path.exists(file_zip):
    resp = requests.get(r"https://dataverse.nioz.nl/api/access/datafile/10223?gbrecs=true")
    resp.raise_for_status() #raise HTTPError if url not exists
    with open(file_zip, 'wb') as f:
        f.write(resp.content)

archive = zipfile.ZipFile(file_zip, "r")

def get_emorid_station_gdf():
    # create index
    filepaths = [x.filename for x in archive.filelist if x.filename.endswith(".dat")]
    df_index1 = pd.DataFrame(filepaths)
    df_index2 = df_index1[0].str.split("/", expand=True)
    df_index = pd.concat([df_index1, df_index2], axis=1)
    df_index.columns = ["filepath", "version", "country", "version_yr", "type", "filename"]
    list_lat = []
    list_lon = []
    for file_dat in df_index["filepath"]:
        with archive.open(file_dat) as datfile:
            coordsline = datfile.readline()
        coords_list = coordsline.decode("utf-8").strip().split(" ")
        list_lat.append(float(coords_list[-2].strip(",")))
        list_lon.append(float(coords_list[-1]))
    
    geom = gpd.points_from_xy(list_lon, list_lat)
    gdf_index = gpd.GeoDataFrame(df_index, geometry=geom, crs="EPSG:4326")
    return gdf_index

def get_station_data(gdf_row):
    locationname = gdf_row["filename"].replace(".dat","")
    file_dat = gdf_row["filepath"]
    datfile = archive.open(file_dat)
    for line in datfile:
        # if line.startswith(b"River"):
        #     coordsline = line
        if line.startswith(b"year"):
            headerline = line
        if line.startswith(b"[-]"):
            unitsline = line
            # break after the units line to avoid redundant i/o
            break
    # coords_list = coordsline.decode("utf-8").strip().split(" ")
    # coords_lat = float(coords_list[-2].strip(","))
    # coords_lon = float(coords_list[-1])
    header_list = headerline.decode("utf-8").strip().split("\t")
    units_list = unitsline.decode("utf-8").strip().split("\t")
    quantity_unit_pairs = dict(zip(header_list, units_list))
    # skiprows=0 because all header rows were skipped in the for-loop already
    data = pd.read_csv(datfile, sep="\\s+", skiprows=0, names=header_list)
    # TODO: TOC_flag=A while TOC=-999 (so TOC_flag should be M)
    
    # set dates index
    dates_str = data.iloc[:,0:3].astype(str).agg("-".join, axis=1)
    dates_pd = dates_str.apply(lambda x: pd.Timestamp(x))
    dates_pd.name="time"
    data = data.set_index(dates_pd)
    data = data.drop(["year", "month", "m", "day", "d"], axis=1, errors="ignore")
    
    # drop _flag columns
    columns_drop = [x for x in data.columns if x.endswith("_flag")]
    data = data.drop(columns_drop, axis=1)
    
    # drop Total* columns
    columns_drop = [x for x in data.columns if x.startswith("Total")]
    data = data.drop(columns_drop, axis=1)
    
    # mask -999 values and drop all-nan columns
    # TODO: alternatively set to 0 instead
    data[data==-999] = np.nan
    columns_drop = data.columns[data.isnull().all(axis=0)]
    data = data.drop(columns_drop, axis=1)
    
    # convert to xarray dataset
    ds = data.to_xarray()
    ds["time"].encoding["units"] = "days since 1970-01-01"
    for varn in ds.data_vars:
        unit = quantity_unit_pairs[varn]
        unit = unit.strip("[").strip("]") # required for FM kernel
        # TODO: necessary to standardize units?
        attrs = dict(
            units=unit,
            locationname=locationname,
            )
        ds[varn] = ds[varn].assign_attrs(attrs)
    
    # rename variables for FM
    rename_dict = {
        "Q": "sourcesink_discharge",
        "NO3": "sourcesink_tracerNO3Delta",
        }
    ds = ds.rename_vars(rename_dict)
    return ds


gdf_index = get_emorid_station_gdf()
bool_riv = gdf_index["type"] == "Rivers"
fig, ax = plt.subplots()
gdf_index.loc[~bool_riv].plot(ax=ax, color="g", marker=".")
gdf_index.loc[bool_riv].plot(ax=ax, color="red", marker=".")
dfmt.plot_coastlines()
# TODO: at least Portugal coverage does not correspond to fig 1.1 from database description (different version?)


# Lake IJssel: 'EMORID_CS_2025_06_06/NL/2025/Rivers/Lake_IJssel_1990_2024.dat'
# gdf_index_sel = gdf_index.loc[[1482]]
# Faroe Islands
# lon_min, lat_min, lon_max, lat_max = -2.5, 59.5, 0.0, 61.5
# gdf_index_sel = gdf_index.loc[bool_riv].clip((lon_min, lat_min, lon_max, lat_max))
# # Portugal rivers
# lon_min, lat_min, lon_max, lat_max = -7.5, 36.3, -6, 37.5
# gdf_index_sel = gdf_index.loc[bool_riv].clip((lon_min, lat_min, lon_max, lat_max))
# Portugal first river
gdf_index_sel = gdf_index.loc[[782]]

dir_model = r"c:\DATA\checkouts\dfm_tools\docs\notebooks\Portugal_2D_model"
os.chdir(dir_model)

file_ext_new = "Portugal_new.ext"
file_ext_new_waq = "Portugal_new_WAQ.ext"
file_riv_bc = "EMORID_v1_selection.bc"
file_mdu = "Portugal.mdu"

ext_new = hcdfm.ExtModel(file_ext_new)
ext_new.filepath = file_ext_new_waq
forcingmodel = hcdfm.ForcingModel()
# setting the filepath is required for the discharge keyword to be set in exfile (otherwise None, so skipped)
forcingmodel.filepath = file_riv_bc
for irow, gdf_row in gdf_index_sel.iterrows():
    # retrieve data for single station
    ds = get_station_data(gdf_row)
    locationname = ds["sourcesink_discharge"].attrs["locationname"]
    
    # plot
    fig, ax = plt.subplots()
    ds.to_pandas().plot(ax=ax)
    ax.set_title(gdf_row["country"] + ": " + locationname)
    
    # convert to hydrolib-core TimeSeries object (can be appended to ForcingModel object)
    hcdfm_timeseries = dfmt.Dataset_to_TimeSeries(ds)
    forcingmodel.forcing.append(hcdfm_timeseries)

    # # convert to hydrolib-core ForcingModel object (and bc-file)
    # import xarray as xr
    # from dfm_tools.hydrolib_helpers import get_ncbnd_construct
    # ncbnd_construct = get_ncbnd_construct()
    # dimn_station = ncbnd_construct["dimn_point"]
    # # TODO: currently "node", but the dimension name will change with an update in the FM kernel
    # ds_allstats = ds.expand_dims(dimn_station)
    # ds_allstats["station_id"] = xr.DataArray([locationname], dims=dimn_station)
    # forcingmodel = dfmt.plipointsDataset_to_ForcingModel(ds_allstats)
    
    source_obj = hcdfm.SourceSink(
        id=locationname,
        name="discharge_salinity_temperature_sorsin",
        numcoordinates=1,
        xcoordinates=[gdf_row.geometry.x],
        ycoordinates=[gdf_row.geometry.y],
        discharge=forcingmodel,
        # salinityDelta=forcingmodel, # TODO: missing salinity and temperature
        # temperatureDelta=forcingmodel,
        # tracerNO3Delta=forcingmodel, # TODO: hcdfm.SourceSink() doet not support tracer/WAQ (like tracerNO3Delta)
        )
    ext_new.sourcesink.append(source_obj)

forcingmodel.save()
ext_new.save()
mdu = hcdfm.FMModel(file_mdu, recurse=False) # TODO: fails with recurse=True if sourcesink already added
mdu.external_forcing.extforcefilenew = ext_new
mdu.save()

Gives the geodataframe index:
Image

And an example of the data:
Image

Todo EMORID data:

  • final data format or changes possible?
  • fixed amount of header columns?
  • actual header column always starts with "year", month/m and day/d are mixed
  • Lake_IJssel has TOC_flag=A while TOC=-999 (so TOC_flag should be M?)
  • how to retrieve the river/dd name properly?
  • README.txt on https://dataverse.nioz.nl/dataset.xhtml?persistentId=doi:10.25850/nioz/7b.b.th speaks of V3, but V2 and V3 are not available on the download page
  • Portugal has less datapoints as in figure in database description, maybe because V1 instead of V3?

Todo code:

  • Add support for multiple quantities in dfmt.plipointsDataset_to_ForcingModel #1266
  • are there river discharge+WAQ datasets with global coverage (preferrably from a global public datasource)? GRDC has no API and no coastal points. Swedish SMHI EHYPE dataset? GRADES-HYDRODL at reachhydro.org
  • add functions to: download, creating gdf_index and getting data for subset, consider merging into observations.py
  • convert to source/sink bc files (with dfmt.plipointsDataset_to_ForcingModel())
  • hcdfm.SourceSink() only has no forcingfile attr like hcdfm.Boundary(), but discharge/salinitydelta/etc, so no support for WAQ, also not in FM kernel?
  • convert units from per day to something else
  • add writing of pli and ext files?
  • add to modelbuilder notebook (add Portugal example, for instance)
  • add temperature and salinity (from E-HYPE climatology)
  • add test
  • update whatsnew
  • use this in current OSPAR EMORID script that converts river discharges from EMORID csv/dat sourcefiles to bc-files: p:\11209731-002-ospar-nutrients\scripts\1-pre-processing\river_inputs\Rivers_DD_bc_EMORID_2025.ipynb

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions