diff --git a/dev/conda-requirements.yml b/dev/conda-requirements.yml index 267580a5..622b6107 100644 --- a/dev/conda-requirements.yml +++ b/dev/conda-requirements.yml @@ -13,6 +13,7 @@ dependencies: - matplotlib # Data / parallel + - joblib - xarray - dask - distributed diff --git a/hlavo/__init__.py b/hlavo/__init__.py index 7c2ea8fc..e733ce4d 100644 --- a/hlavo/__init__.py +++ b/hlavo/__init__.py @@ -1,11 +1,11 @@ # DRAFT """Top-level HLAVO package exports.""" -from . import composed -from . import deep_model -from . import kalman -from . import soil_parflow -from . import soil_py +# from . import composed +# from . import deep_model +# from . import kalman +# from . import soil_parflow +# from . import soil_py __all__ = [ "soil_py", diff --git a/hlavo/composed/__init__.py b/hlavo/composed/__init__.py index 949fff68..87090688 100644 --- a/hlavo/composed/__init__.py +++ b/hlavo/composed/__init__.py @@ -1,12 +1,14 @@ # DRAFT """Exports for composed.""" -from .composed_model_mock import Model1D, Model3D, setup_models +from .composed_model_mock import setup_models +from .model_1d import Model1D +from .model_3d import Model3D from .run_map import main __all__ = [ "main", "setup_models", "Model1D", - "Model3D", + "Model3D" ] diff --git a/hlavo/composed/composed_model_mock.py b/hlavo/composed/composed_model_mock.py index 73c5239c..cdb49178 100644 --- a/hlavo/composed/composed_model_mock.py +++ b/hlavo/composed/composed_model_mock.py @@ -21,156 +21,57 @@ keep relation to the date time series of the measurements. 
""" +import os import sys import argparse -import numpy as np from pathlib import Path +import numpy as np from dask.distributed import Client, LocalCluster, get_client, Queue -from hlavo.kalman.kalman import KalmanFilter -from hlavo.ingress.moist_profile.load_data import load_data - - -# --------------------------------------------------------------------------- -# 1D model class -# --------------------------------------------------------------------------- - -class Model1D: - def __init__(self, idx, initial_state=0.0, work_dir=None, kalman_config={}): - self.idx = idx - self.state = initial_state - - self.kalman = KalmanFilter.from_config(work_dir, kalman_config, verbose=False) - self.ukf = self._prepare_kalman_measurements() - - #kalman_filter.run() # load measurements - - def prepare_kalman_measurements(self): - noisy_measurements, noisy_measurements_to_test, meas_model_iter_flux, measurement_state_flag = load_data( - self.kalman.train_measurements_struc, - self.kalman.test_measurements_struc, - data_csv=self.kalman.measurements_config["measurements_file"], - measurements_config=self.kalman.measurements_config - ) - - precipitation_list = [] - for (time_prec, precipitation) in meas_model_iter_flux: - precipitation_list.extend([precipitation] * time_prec) - self.kalman.measurements_config["precipitation_list"] = precipitation_list - - noisy_measurements, noisy_measurements_to_test, measurement_state_flag_sampled, meas_model_iter_time, meas_model_iter_flux = \ - self.kalman.process_loaded_measurements(noisy_measurements, noisy_measurements_to_test, measurement_state_flag) - - sample_variance = np.nanvar(noisy_measurements, axis=0) - measurement_noise_covariance = np.diag(sample_variance) - - self.kalman.results.times_measurements = np.cumsum(meas_model_iter_time) - self.kalman.results.precipitation_flux_measurements = meas_model_iter_flux +from hlavo.composed.model_1d import Model1D +from hlavo.composed.model_3d import Model3D - return 
###
# Process paths
###

# Config keys whose string values are interpreted as filesystem paths.
_PATH_KEY_SUFFIXES = ("_file", "_path", "_dir")


def relative_to_absolute_paths(config_dict, base_dir: Path):
    """
    Recursively rewrite relative path strings inside a configuration structure.

    Dict values whose key ends with ``_file``/``_path``/``_dir`` and list
    items that are strings are resolved against ``base_dir`` when they are
    not already absolute.  The structure is mutated in place.

    :param config_dict: Nested dict/list configuration structure.
    :param Path base_dir: Directory that relative paths are resolved against.
    :return: The same (mutated) structure, for call chaining.
    """
    if isinstance(config_dict, dict):
        for key, value in config_dict.items():
            if isinstance(value, str) and key.endswith(_PATH_KEY_SUFFIXES):
                candidate = Path(value)
                if not candidate.is_absolute():
                    config_dict[key] = (base_dir / candidate).resolve()
            else:
                # Nested containers are processed recursively; recursing on
                # scalars (non-path strings, numbers, ...) is a no-op.
                relative_to_absolute_paths(value, base_dir)

    elif isinstance(config_dict, list):
        for pos, value in enumerate(config_dict):
            if isinstance(value, str):
                # NOTE(review): list items are resolved regardless of any key
                # suffix — intentionally broader than the dict branch.
                candidate = Path(value)
                if not candidate.is_absolute():
                    config_dict[pos] = (base_dir / candidate).resolve()
            else:
                relative_to_absolute_paths(value, base_dir)

    return config_dict


def model1d_worker_entry(idx, start_datetime, end_datetime, queue_name_in, queue_name_out, work_dir, model_kalman_config_dict, seed=123):
    """
    Entry function running on a Dask worker.

    Builds a :class:`Model1D` for the given site and drives its queue loop
    from ``start_datetime`` until ``end_datetime``.

    :return: Completion message produced by ``Model1D.run_loop``.
    """
    site_model = Model1D(
        site_id=idx,
        initial_state=0.0,
        work_dir=work_dir,
        model_kalman_config_dict=model_kalman_config_dict,
        seed=seed,
    )
    return site_model.run_loop(start_datetime, end_datetime, queue_name_in, queue_name_out)
time loop at t={self.time} (t_end={t_end}), state={self.state}") - return self.state + model = Model1D(site_id=idx, initial_state=0.0, work_dir=work_dir, model_kalman_config_dict=model_kalman_config_dict, seed=seed) + return model.run_loop(start_datetime, end_datetime, queue_name_in, queue_name_out) # --------------------------------------------------------------------------- # Setup function # --------------------------------------------------------------------------- -def setup_models(n_1d, t_end, work_dir, kalman_config): +def setup_models(work_dir, config_dir, start_datetime, end_datetime, deep_model_config): client = get_client() queue_names_3d_to_1d = [] @@ -179,26 +80,36 @@ def setup_models(n_1d, t_end, work_dir, kalman_config): queue_name_1d_to_3d = "q-1d-to-3d" Queue(queue_name_1d_to_3d, client=client) # ensure creation - for i in range(n_1d): - q_name_3d_to_1d = f"q-3d-to-1d-{i}" + for i, model_1d_config in enumerate(deep_model_config["1d_models"]): + model_1d_config_path = config_dir / Path(model_1d_config) + model_kalman_config_dict = load_config(model_1d_config_path) + + model_kalman_config_dict = relative_to_absolute_paths(model_kalman_config_dict, config_dir) + + model_work_dir = os.path.join(work_dir, "model_1d_{}".format(i+1)) + print("model_work_dir ", model_work_dir) + os.makedirs(model_work_dir, exist_ok=True) + + q_name_3d_to_1d = f"q-3d-to-1d-{i+1}" Queue(q_name_3d_to_1d, client=client) # ensure creation queue_names_3d_to_1d.append(q_name_3d_to_1d) fut = client.submit( model1d_worker_entry, - i, - t_end, + i+1, + start_datetime, end_datetime, q_name_3d_to_1d, - queue_name_1d_to_3d, work_dir, kalman_config, + queue_name_1d_to_3d, model_work_dir, model_kalman_config_dict, seed=deep_model_config["seed"], pure=False, ) futures_1d.append(fut) print(f"[SETUP] Submitted Model1D idx={i}") - model_3d = Model3D(n_1d=n_1d) + + model_3d = Model3D(n_1d=i+1) final_state_3d = model_3d.run_loop( - t_end, + start_datetime, end_datetime, 
queue_names_out_to_1d=queue_names_3d_to_1d, queue_name_in_from_1d=queue_name_1d_to_3d, ) @@ -214,23 +125,39 @@ def setup_models(n_1d, t_end, work_dir, kalman_config): # Main # --------------------------------------------------------------------------- +def load_config(config_path): + import yaml + with config_path.open("r") as f: + config_dict = yaml.safe_load(f) + return config_dict + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('work_dir', help='Path to work dir') parser.add_argument('config_file', help='Path to configuration file') + args = parser.parse_args(sys.argv[1:]) work_dir = Path(args.work_dir) - kalman_config = Path(args.config_file).resolve() + composed_config_path = Path(args.config_file).resolve() + config_dir = composed_config_path.parent + + composed_config = load_config(composed_config_path) cluster = LocalCluster(n_workers=4, threads_per_worker=1) client = Client(cluster) - n_1d = 3 - t_end = 5.0 + start_datetime = np.datetime64(composed_config["start_datetime"]) + end_datetime = np.datetime64(composed_config["end_datetime"]) + + print("start date time ", start_datetime) + print("end date time ", end_datetime) + + if end_datetime <= start_datetime: + raise ValueError("end-datetime must be after start-datetime") - final_state = setup_models(n_1d, t_end, work_dir, kalman_config) + final_state = setup_models(work_dir, config_dir, start_datetime, end_datetime, composed_config) print("\n[MAIN] Final 3D state:", final_state) client.close() diff --git a/hlavo/composed/model_1d.py b/hlavo/composed/model_1d.py new file mode 100644 index 00000000..e25df6ac --- /dev/null +++ b/hlavo/composed/model_1d.py @@ -0,0 +1,269 @@ +import numpy as np +from dask.distributed import Queue +from hlavo.kalman.kalman import KalmanFilter +from hlavo.ingress.moist_profile.load_zarr_data import load_measurments_data, load_meteo_data +from hlavo.composed.data_1d_to_3d import Data1DTo3D + +# 
# ---------------------------------------------------------------------------
# 1D model class
# ---------------------------------------------------------------------------
# NOTE: the enclosing module (model_1d.py) imports numpy as np,
# dask.distributed.Queue, KalmanFilter, the zarr loaders and Data1DTo3D.

class Model1D:
    def __init__(self, site_id, initial_state=0.0, work_dir=None, model_kalman_config_dict=None, seed=None):
        """
        Initialize the 1D model instance for a specific site.

        :param int site_id: Identifier of the site used for selecting measurements.
        :param float initial_state: Initial state value for the model.
        :param Path work_dir: Working directory used by the Kalman filter.
        :param dict model_kalman_config_dict: Configuration dictionary for Kalman filter setup.
        :param int seed: Random seed for reproducibility.
        """
        self.site_id = site_id
        self.state = initial_state
        self.measurements_dataset = None

        # Initialize Kalman filter from configuration.
        self.kalman = KalmanFilter.from_config(
            work_dir,
            config_dict=model_kalman_config_dict,
            verbose=False,
            seed=seed
        )

        # Load meteorological data required for model forcing.
        self._get_meteo_data()

        # Prepare measurement dataset and initialize the UKF.
        self.ukf = self._prepare_kalman_measurements()

    def _get_meteo_data(self):
        """
        Load the meteorological dataset and restrict it to this site.

        :return: None
        """
        meteo_dataset = load_meteo_data(self.kalman.measurements_config["meteo_scheme_file"])
        # TODO: meteo and measurement datasets are not yet temporally
        # consistent; a temporary year-shift hack used to live here.
        self.meteo_dataset = meteo_dataset.sel(site_id=self.site_id)

        print("self.meteo_dataset ", self.meteo_dataset)

    def _prepare_kalman_measurements(self):
        """
        Load measurements, filter them for this site and initialize the Kalman filter.

        :return: Configured Unscented Kalman Filter (UKF) instance.
        """
        measurements_xarray = load_measurments_data(
            scheme_file=self.kalman.measurements_config["measurements_scheme_file"]
        )

        # Select data for the specific site.
        self.measurements_dataset = measurements_xarray.sel(site_id=self.site_id)

        # TODO: define a proper measurement covariance matrix (R);
        # currently an identity placeholder sized by the number of depths.
        kalman_R_matrix = Model1D.calculate_kalman_R_matrix(len(self.measurements_dataset["depth_level"]))

        return self.kalman.set_kalman_filter(kalman_R_matrix)

    def get_dataset_values_for_time(self, dataset, start_time: np.datetime64, stop_time: np.datetime64):
        """
        Retrieve the part of ``dataset`` within a specified time window.

        :param dataset: xarray dataset with a ``date_time`` coordinate.
        :param numpy.datetime64 start_time: Inclusive start of the time window.
        :param numpy.datetime64 stop_time: Inclusive end of the time window.
        :return: Subset of the dataset for the selected time range.
        """
        return dataset.sel(date_time=slice(start_time, stop_time))

    def get_long_lat(self, target_time: np.datetime64) -> tuple[float, float]:
        """
        Retrieve longitude and latitude for the site at the specified time.

        :param numpy.datetime64 target_time: Timestamp of interest.
        :return: Tuple (longitude, latitude) in decimal degrees.
        """
        meas_at_target_time = self.measurements_dataset.sel(date_time=target_time)

        # .compute() — coordinates appear to be lazily (dask-)backed;
        # pull scalar values out.
        longitude = meas_at_target_time.longitude.compute().item()
        latitude = meas_at_target_time.latitude.compute().item()

        return longitude, latitude

    def step(self, start_time, target_time, pressure_at_bottom):
        """
        Advance the 1D model state from start_time to target_time.

        :param numpy.datetime64 start_time: Start of the integration window.
        :param numpy.datetime64 target_time: Target time to which the model advances.
        :param float pressure_at_bottom: Boundary condition (pressure head at bottom).
        :return: Data1DTo3D message carrying this site's contribution.
        """
        print(f"[1D {self.site_id}] step at t={target_time}")

        # Spatial coordinates of the site at the target time.
        longitude, latitude = self.get_long_lat(target_time)

        # Measurement and meteo data within the time window.
        measurements = self.get_dataset_values_for_time(self.measurements_dataset, start_time, target_time)
        meteo = self.get_dataset_values_for_time(self.meteo_dataset, start_time, target_time)

        darcy_velocity = None

        # Run the Kalman update only when the window actually contains
        # measurement samples.  BUGFIX: ``len(ds)`` on an xarray Dataset
        # counts data variables (always > 0), so the size of the time
        # dimension is tested instead.
        if measurements.sizes.get("date_time", 0) > 0:
            darcy_velocity = self.kalman.kalman_step(
                self.ukf,
                measurements,
                meteo,
                pressure_at_bottom
            )

        # Package result for the downstream 3D model.
        return Data1DTo3D(
            date_time=target_time,
            site_id=self.site_id,
            longitude=longitude,
            latitude=latitude,
            velocity=darcy_velocity
        )

    def run_loop(self, t_start, t_end, queue_name_in, queue_name_out):
        """
        Main processing loop driven by the 3D model via Dask queues.

        :param numpy.datetime64 t_start: Initial simulation time.
        :param numpy.datetime64 t_end: Final simulation time.
        :param str queue_name_in: Input queue name (3D -> 1D messages).
        :param str queue_name_out: Output queue name (1D -> 3D messages).
        :return: Completion message with the final state.
        """
        q_in = Queue(queue_name_in)
        q_out = Queue(queue_name_out)

        current_time = t_start

        while current_time < t_end:
            # Blocking call: waits for the next 3D -> 1D message.
            target_time, data_to_1d = q_in.get()

            print("current time:{}, target time: {}".format(current_time, target_time))

            # The message must belong to this site.
            assert self.site_id == data_to_1d.site_id

            # One model step using the received boundary condition.
            data_to_3d = self.step(current_time, target_time, data_to_1d.pressure_head)

            # Send the contribution back to the 3D model.
            q_out.put(data_to_3d)

            current_time = target_time

        # Persist Kalman filter results after loop completion.
        self.kalman.save_results()

        # BUGFIX: the original referenced the nonexistent ``self.idx``
        # (the attribute is ``site_id``), raising AttributeError here.
        print(f"[1D {self.site_id}] finished loop at t={current_time} (t_end={t_end})")

        return f"1D model {self.site_id} done; final state={self.state}"

    @staticmethod
    def calculate_kalman_R_matrix(num_meas_per_probe) -> np.ndarray:
        """
        Construct the measurement noise covariance matrix R for the Kalman filter.

        :param int num_meas_per_probe: Number of measurement depths per probe.
        :return: Identity covariance matrix of shape
            (num_meas_per_probe, num_meas_per_probe).
        """
        # TODO: replace the identity placeholder with a calibrated covariance.
        return np.eye(num_meas_per_probe)
# ---------------------------------------------------------------------------
# 3D model class
# ---------------------------------------------------------------------------
# NOTE: the enclosing module (model_3d.py) imports numpy as np,
# dask.distributed.{get_client, Queue}, datetime.{datetime, timedelta}
# and Data3DTo1D.

class Model3D:
    def __init__(self, n_1d, initial_state=0.0, initial_time=0.0, base_dt=timedelta(minutes=120)):
        """
        Mock 3D model coupled to ``n_1d`` site-local 1D models via Dask queues.

        :param int n_1d: Number of coupled 1D models (site ids are 1..n_1d).
        :param float initial_state: Initial (scalar placeholder) model state.
        :param initial_time: Unused; kept for interface compatibility.
        :param timedelta base_dt: Nominal 3D model time-step length.
        """
        self.n_1d = n_1d
        self.state = initial_state
        self.base_dt = base_dt

    def choose_dt(self, current_time, t_end):
        """
        Choose the next step length: ``base_dt`` clipped to the remaining
        simulation time, never negative.

        :return: ``datetime.timedelta`` step length (possibly zero).
        """
        remaining = t_end - current_time

        # BUGFIX: when the times are numpy datetime64 values, ``remaining``
        # is a numpy timedelta64, which has no ``total_seconds()`` — the
        # final partial step then crashed in run_loop.  Normalize to
        # datetime.timedelta so callers get a uniform type.
        if isinstance(remaining, np.timedelta64):
            remaining = remaining.astype("timedelta64[us]").item()

        if remaining <= timedelta(0):
            return timedelta(0)

        return min(self.base_dt, remaining)

    def step(self, target_time, contributions):
        """
        Advance the 3D state to ``target_time`` given the 1D contributions.

        @TODO: actually integrate the 1D Darcy-velocity contributions;
        the state is currently returned unchanged (mock behaviour).
        """
        print(f"[3D] step to t={target_time}, "
              f"current_state={self.state}, contributions={contributions}")
        return self.state

    def run_loop(self, start_datetime, end_datetime, queue_names_out_to_1d, queue_name_in_from_1d):
        """
        Drive the coupled simulation from ``start_datetime`` to ``end_datetime``.

        Each iteration sends one ``Data3DTo1D`` message per 1D model and then
        waits for one ``Data1DTo3D`` reply from each of them.

        :param queue_names_out_to_1d: Names of the 3D->1D queues (one per site).
        :param queue_name_in_from_1d: Name of the shared 1D->3D queue.
        :return: Final 3D state.
        """
        # Ensures we are running inside a Dask client/worker context.
        get_client()
        q_3d_to_1d = [Queue(name) for name in queue_names_out_to_1d]
        q_1d_to_3d = Queue(queue_name_in_from_1d)

        current_time = start_datetime

        while current_time < end_datetime:
            dt = self.choose_dt(current_time, end_datetime)
            if dt <= timedelta(0):
                print("[3D] dt <= 0, stopping to avoid infinite loop.")
                break

            # Simulation times are numpy datetime64; convert the timedelta.
            target_time = current_time + np.timedelta64(int(dt.total_seconds()), "s")
            print(f"\n[3D] === Step: t={current_time} -> t={target_time} ===")
            print(f"[3D] current state={self.state}")

            # Send boundary data to every 1D model (site ids are 1-based).
            for i in range(self.n_1d):
                data_for_i = self.state + i + 1  # dummy placeholder
                print(f"[3D] sending to 1D {i+1}: t={target_time}, data={data_for_i}")
                message = Data3DTo1D(site_id=i + 1, date_time=target_time, pressure_head=data_for_i)
                q_3d_to_1d[i].put((target_time, message))

            # Receive one reply per 1D model.  BUGFIX: Model1D.run_loop puts
            # the Data1DTo3D object itself on the queue, so read its
            # attributes instead of unpacking a (idx, time, tuple) triple.
            contributions = [None] * self.n_1d
            for _ in range(self.n_1d):
                reply = q_1d_to_3d.get()
                print(f"[3D] received from 1D {reply.site_id}: "
                      f"t={reply.date_time}, contrib={reply.velocity}")
                # BUGFIX: site_id is 1-based -> map to the 0-based slot
                # (indexing with site_id raised IndexError for the last site).
                # velocity may be None when the 1D model had no measurements.
                contributions[reply.site_id - 1] = reply.velocity

            self.step(target_time, contributions)
            current_time = target_time

        print(f"[3D] finished time loop at t={current_time} (t_end={end_datetime}), state={self.state}")
        return self.state
target_time + + print(f"[3D] finished time loop at t={current_time} (t_end={end_datetime}), state={self.state}") + return self.state diff --git a/hlavo/ingress/moist_profile/load_data.py b/hlavo/ingress/moist_profile/load_data.py index 1a594f81..6e186395 100644 --- a/hlavo/ingress/moist_profile/load_data.py +++ b/hlavo/ingress/moist_profile/load_data.py @@ -33,7 +33,7 @@ def get_measurements(time_step, measurements_struc, probe_data): for measurement_name, measure_obj in measurements_struc.items(): measurements = [] for zp in measure_obj.z_pos: - measurements.append(probe_data[pos_to_index[zp]][time_step]) + measurements.append(probe_data[pos_to_index[zp]].iloc[time_step]) measurements_dict[measurement_name] = measurements return measurements_dict @@ -193,6 +193,8 @@ def load_data(train_measurements_struc, test_measurements_struc, data_csv, measu data = preprocess_data(data[min_idx:max_idx]) + timestamps = data["DateTime"].to_numpy() + measurement_state_flag = [] if "State" in data: measurement_state_flag = data["State"].to_numpy() @@ -217,4 +219,4 @@ def load_data(train_measurements_struc, test_measurements_struc, data_csv, measu train_measurements.append(train_measurements_struc.encode(measurement_train)) test_measurements.append(test_measurements_struc.encode(measurement_test)) - return train_measurements, test_measurements, precipitations, measurement_state_flag + return train_measurements, test_measurements, precipitations, measurement_state_flag, timestamps diff --git a/hlavo/ingress/moist_profile/load_zarr_data.py b/hlavo/ingress/moist_profile/load_zarr_data.py new file mode 100644 index 00000000..a2d1df2b --- /dev/null +++ b/hlavo/ingress/moist_profile/load_zarr_data.py @@ -0,0 +1,30 @@ +import zarr_fuse as zf +from pathlib import Path + + +def load_measurments_data(scheme_file): + """ + Load measurement profile dataset from a Zarr scheme file. + :param str | Path scheme_file: Path to the measurement scheme file. 
def load_measurments_data(scheme_file):
    """
    Load the measurement-profile dataset referenced by a Zarr scheme file.

    :param str | Path scheme_file: Path to the measurement scheme file.
    :return: Dataset containing measurement profiles.
    """
    store_root = zf.open_store(Path(scheme_file))

    # NOTE(review): node path is hard-coded for the "Uhelna" site — confirm
    # whether this should be configurable.
    return store_root["Uhelna"]["profiles"].dataset


def load_meteo_data(scheme_file):
    """
    Load the meteorological dataset referenced by a Zarr scheme file.

    :param str | Path scheme_file: Path to the meteorological scheme file.
    :return: Dataset containing meteorological data.
    """
    store_root = zf.open_store(Path(scheme_file))

    # Currently reads the ParFlow-prepared meteo node for the Uhelna site;
    # alternative nodes ("chmi_aladin_10m", "parflow_input") were considered.
    return store_root["Uhelna"]["parflow"]["version_01"].dataset
:param workdir: Working directory where outputs are written :param config_path: Path to YAML configuration file + :param config_dict: Configuration dictionary :param verbose: Whether to print verbose runtime logs :return: Configured KalmanFilter instance """ - with config_path.open("r") as f: - config_dict = yaml.safe_load(f) + if config_dict is None: + with config_path.open("r") as f: + config_dict = yaml.safe_load(f) + if seed is not None: + config_dict["seed"] = seed return KalmanFilter(config_dict, workdir, verbose) def __init__(self, config, workdir, verbose=False): @@ -74,10 +82,11 @@ def __init__(self, config, workdir, verbose=False): self.lock = threading.Lock() # Expand rainfall schedule into a per-timestep list - precipitation_list = [] - for (time_prec, precipitation) in self.measurements_config['rain_periods']: - precipitation_list.extend([precipitation] * time_prec) - self.measurements_config["precipitation_list"] = precipitation_list + if "rain_periods" in self.measurements_config: + precipitation_list = [] + for (time_prec, precipitation) in self.measurements_config['rain_periods']: + precipitation_list.extend([precipitation] * time_prec) + self.measurements_config["precipitation_list"] = precipitation_list self.results = KalmanResults( workdir, nodes_z, self.state_struc, @@ -97,7 +106,7 @@ def _make_model(self): raise NotImplemented("Import desired class") return model_class(self.model_config, workdir=self.work_dir / "output-toy") - def process_loaded_measurements(self, noisy_measurements_train, noisy_measurements_test, measurement_state_flag): + def process_loaded_measurements(self, noisy_measurements_train, noisy_measurements_test, measurement_state_flag, timestamps): """ Align preloaded measurements with precipitation schedule and iteration grouping. 
@@ -114,6 +123,7 @@ def process_loaded_measurements(self, noisy_measurements_train, noisy_measuremen noisy_train_measurements = [] noisy_test_measurements = [] measurement_state_flag_sampled = [] + meas_model_iter_timestamps = [] print("len(noisy_measurements_train) ", len(noisy_measurements_train)) total_index = 0 @@ -141,16 +151,18 @@ def process_loaded_measurements(self, noisy_measurements_train, noisy_measuremen noisy_train_measurements.append(noisy_measurements_train[total_index]) noisy_test_measurements.append(noisy_measurements_test[total_index]) measurement_state_flag_sampled.append(measurement_state_flag[total_index]) + meas_model_iter_timestamps.append(datetime.fromisoformat(timestamps[total_index])) except IndexError as idxerr: print("idx_error ", idxerr) noisy_train_measurements.append(noisy_measurements_train[total_index - 1]) noisy_test_measurements.append(noisy_measurements_test[total_index - 1]) measurement_state_flag_sampled.append(measurement_state_flag[total_index - 1]) + meas_model_iter_timestamps.append(timestamps[total_index - 1]) meas_model_iter_time.append(prec_time) meas_model_iter_flux.append(prec_flux) - return noisy_train_measurements, noisy_test_measurements, measurement_state_flag_sampled, meas_model_iter_time, meas_model_iter_flux + return noisy_train_measurements, noisy_test_measurements, measurement_state_flag_sampled, meas_model_iter_time, meas_model_iter_flux, meas_model_iter_timestamps def run(self): """ @@ -164,7 +176,7 @@ def run(self): ### Generate measurements ### ############################# if "measurements_file" in self.measurements_config: - noisy_measurements, noisy_measurements_to_test, meas_model_iter_flux, measurement_state_flag = load_data( + noisy_measurements, noisy_measurements_to_test, meas_model_iter_flux, measurement_state_flag, timestamps = load_data( self.train_measurements_struc, self.test_measurements_struc, data_csv=self.measurements_config["measurements_file"], @@ -176,8 +188,11 @@ def run(self): 
precipitation_list.extend([precipitation] * time_prec) self.measurements_config["precipitation_list"] = precipitation_list - noisy_measurements, noisy_measurements_to_test, measurement_state_flag_sampled, meas_model_iter_time, meas_model_iter_flux = \ - self.process_loaded_measurements(noisy_measurements, noisy_measurements_to_test, measurement_state_flag) + print("precipitation_list ", len(precipitation_list)) + print("noisy measurements ", len(noisy_measurements)) + + noisy_measurements, noisy_measurements_to_test, measurement_state_flag_sampled, meas_model_iter_time, meas_model_iter_flux, meas_model_iter_timestamps = \ + self.process_loaded_measurements(noisy_measurements, noisy_measurements_to_test, measurement_state_flag, timestamps) sample_variance = np.nanvar(noisy_measurements, axis=0) measurement_noise_covariance = np.diag(sample_variance) @@ -187,6 +202,7 @@ def run(self): # Generate synthetic measurements via forward model runs measurements, noisy_measurements, measurements_to_test, noisy_measurements_to_test, \ state_data_iters, meas_model_iter_time, meas_model_iter_flux = self.generate_measurements() + measurement_state_flag_sampled = [] measurement_state_flag = [] # No state flag for synthetic data residuals = noisy_measurements - measurements @@ -201,6 +217,7 @@ def run(self): self.results.precipitation_flux_measurements = meas_model_iter_flux + ####################################### ### UKF settings: sigma points, Q/R ### ####################################### @@ -356,7 +373,7 @@ def get_measurement(self, current_time_step, measurements_struct): ##################### ### Kalman filter ### ##################### - def state_transition_function(self, state_vec, dt, iter_duration, precipitation_flux): + def state_transition_function(self, state_vec, dt, model_num_iters, precipitation_flux=None, met_data=None): """ UKF state transition function: advance model state over one iteration. 
@@ -364,12 +381,14 @@ def state_transition_function(self, state_vec, dt, iter_duration, precipitation_ :param state_vec: Encoded current state vector :param dt: UKF dt parameter (not used directly when iter_duration provided) - :param iter_duration: Physical duration of this iteration (model time) + :param model_num_iters: number of model iterations :param precipitation_flux: Precipitation flux applied during this iteration + :param met_data: xarray dataset of meteo data :return: Encoded next state vector """ - print("dt: ", dt, "iter duration time: ", iter_duration) + print("dt: ", dt, "number of model iterations: ", model_num_iters) print("process PID:", os.getpid(), "thread:", threading.get_ident()) + pid = os.getpid() timestamp = int(time.time()) if os.environ.get("SCRATCHDIR"): @@ -383,23 +402,30 @@ def state_transition_function(self, state_vec, dt, iter_duration, precipitation_ pressure_data = state["pressure_field"] et_per_time = 0 # placeholder for ET computation - iter_duration = float(iter_duration) + model_num_iters = float(model_num_iters) - self.model.run( - init_pressure=pressure_data, precipitation_value=precipitation_flux, - state_params=state, start_time=0, stop_time=iter_duration, - time_step=self.kalman_config["model_time_step"], - working_dir=parflow_working_dir - ) + if met_data is not None: + self.model.run( + init_pressure=pressure_data, met_data=met_data, + state_params=state, working_dir=parflow_working_dir + ) + else: + assert precipitation_flux is not None + self.model.run( + init_pressure=pressure_data, precipitation_value=precipitation_flux, + state_params=state, start_time=0, stop_time=model_num_iters, + time_step=self.kalman_config["model_time_step"], + working_dir=parflow_working_dir + ) - state["pressure_field"] = self.model.get_data(current_time_step=iter_duration, data_name="pressure") + state["pressure_field"] = self.model.get_data(current_time_step=model_num_iters, data_name="pressure") - velocity = 
self.model.get_data(current_time_step=iter_duration, data_name="velocity") - moisture = self.model.get_data(current_time_step=iter_duration, data_name="moisture") + velocity = self.model.get_data(current_time_step=model_num_iters, data_name="velocity") + moisture = self.model.get_data(current_time_step=model_num_iters, data_name="moisture") - measurements_train = self.get_measurement(current_time_step=iter_duration, + measurements_train = self.get_measurement(current_time_step=model_num_iters, measurements_struct=self.train_measurements_struc) - measurements_test = self.get_measurement(current_time_step=iter_duration, + measurements_test = self.get_measurement(current_time_step=model_num_iters, measurements_struct=self.test_measurements_struc) new_state_vec = self.state_struc.encode_state(state) @@ -494,11 +520,18 @@ def set_kalman_filter(self, measurement_noise_covariance): time_step = 1 # UKF internal dt (hours) — physical duration passed via kwargs - ukf = ParallelUKF( - dim_x=num_state_params, dim_z=dim_z, dt=time_step, - fx=self.state_transition_function, hx=self.measurement_function, - points=sigma_points - ) + if "parallel_sigmas" in self.kalman_config and self.kalman_config["parallel_sigmas"]: + ukf = ParallelUKF( + dim_x=num_state_params, dim_z=dim_z, dt=time_step, + fx=self.state_transition_function, hx=self.measurement_function, + points=sigma_points + ) + else: + ukf = UnscentedKalmanFilter( + dim_x=num_state_params, dim_z=dim_z, dt=time_step, + fx=self.state_transition_function, hx=self.measurement_function, + points=sigma_points + ) Q_state = self.state_struc.compose_Q() ukf.Q = Q_state @@ -529,6 +562,389 @@ def set_kalman_filter(self, measurement_noise_covariance): return ukf + + def align_meteo_to_measurements(self, meteo_ds: xr.Dataset, meas_ds: xr.Dataset) -> xr.Dataset: + """ + Align meteorological dataset to measurement time grid. 
+ + Performs: + - Linear interpolation of continuous variables + - Handling of accumulated variables + (convert to rate → interpolate → re-accumulate) + + :param meteo_ds: Meteorological dataset with coarse time resolution (e.g. hourly) + :param meas_ds: Measurement dataset defining target time grid (e.g. 15-min) + :return: Meteo dataset aligned to measurement time grid + """ + # Extract target time coordinate + target_time = meas_ds["date_time"] + + # Define variable groups + continuous_vars = [ + "air_pressure_at_sea_level", + "surface_temperature", + "wind_from_direction_10m", + "wet_bulb_temperature_2m", + "cloud_fraction", + "cloud_fraction_low", + "cloud_fraction_medium", + "cloud_fraction_high", + "surface_direct_solar_radiation_downwards", + ] + + accum_vars = [ + "precipitation_amount_accum", + "snowfall_amount_accum", + ] + + # Keep only variables present in dataset + continuous_vars = [v for v in continuous_vars if v in meteo_ds] + accum_vars = [v for v in accum_vars if v in meteo_ds] + + # ------------------------------------------------------------------ + # Interpolate continuous variables + # ------------------------------------------------------------------ + print("target time ", target_time) + meteo_cont_interp = meteo_ds[continuous_vars].interp(date_time=target_time) + + print("meteo_cont_interp.date_time.values ", meteo_cont_interp.date_time.values) + + # ------------------------------------------------------------------ + # Process accumulated variables + # ------------------------------------------------------------------ + meteo_accum_interp = [] + + print("meteo_ds.date_time ", meteo_ds.date_time) + + if accum_vars: + rate_ds = {} + for var in accum_vars: + diff = meteo_ds[var].diff("date_time") + + dt = ( + meteo_ds["date_time"].diff("date_time") + / np.timedelta64(1, "s") + ) + + # ensure identical coordinates + dt = dt.assign_coords(date_time=diff["date_time"]) + + rate = diff / dt + + # create first timestamp explicitly + first_time = 
meteo_ds["date_time"].values[0] + + first_value = rate.isel(date_time=0) + + first_value = first_value.assign_coords( + date_time=first_time + ) + + rate = xr.concat([first_value, rate], dim="date_time") + + rate_ds[var] = rate + + rate_ds = xr.Dataset(rate_ds) + + print("rate ds", rate_ds.date_time) + + # Interpolate rate to target time grid + rate_interp = rate_ds.interp(date_time=target_time) + + time = target_time.values + + dt_seconds = np.diff(time) / np.timedelta64(1, "s") + + dt_target = xr.DataArray( + np.concatenate(([dt_seconds[0]], dt_seconds)), + dims=["date_time"], + coords={"date_time": time} + ) + + accum_interp = {} + + for var in accum_vars: + # Reconstruct accumulation from interpolated rate + accum = (rate_interp[var] * dt_target).cumsum("date_time") + # Ensure no NaNs at start + accum = accum.fillna(0) + accum_interp[var] = accum + + meteo_accum_interp = xr.Dataset(accum_interp) + + # ------------------------------------------------------------------ + # Merge continuous and accumulated variables + # ------------------------------------------------------------------ + datasets_to_merge = [meteo_cont_interp] + + print("datasets_to_merge ", datasets_to_merge) + print("meteo_accum_interp ", meteo_accum_interp) + + if accum_vars: + datasets_to_merge.append(meteo_accum_interp) + + meteo_final = xr.merge(datasets_to_merge) + + # Ensure exact coordinate alignment + meteo_final = meteo_final.assign_coords( + date_time=("date_time", target_time.values) + ) + + # Update dataset attributes + meteo_final = meteo_final.assign_attrs( + **meteo_ds.attrs, + time_step=(target_time[1] - target_time[0]).values, + time_interval=(target_time[-1] - target_time[0]).values, + ) + + return meteo_final + + def resample_meteo_to_model_timestep( + self, + meteo_ds: xr.Dataset, + model_time_step: pd.Timedelta + ) -> xr.Dataset: + """ + Resample meteorological dataset to a fixed model time step. 
+ + Handles: + - continuous variables via linear interpolation + - accumulated variables (per-interval totals) via constant-rate redistribution + + :param meteo_ds: Input dataset with datetime coordinate + :param model_time_step: Target time step + :return: Resampled dataset (aligned on interval grid) + """ + + # ------------------------------------------------------------------ + # Clean time coordinate + # ------------------------------------------------------------------ + meteo_ds = meteo_ds.sortby("date_time") + + print("meteo_ds ", meteo_ds) + + _, idx = np.unique(meteo_ds.date_time.values, return_index=True) + meteo_ds = meteo_ds.isel(date_time=np.sort(idx)) + + time = meteo_ds.date_time.values + assert len(time) >= 2, "Need at least 2 timestamps" + + # ------------------------------------------------------------------ + # Build new time grid + # ------------------------------------------------------------------ + new_time = pd.date_range( + start=time[0], + end=time[-1], + freq=model_time_step + ) + + # interval grid (IMPORTANT) + new_time_mid = new_time[1:] + + # ------------------------------------------------------------------ + # Define variable groups + # ------------------------------------------------------------------ + accum_vars = [ + "precipitation_amount_accum", + "snowfall_amount_accum", + ] + accum_vars = [v for v in accum_vars if v in meteo_ds] + + cont_vars = [v for v in meteo_ds.data_vars if v not in accum_vars] + + # ------------------------------------------------------------------ + # 1. Continuous variables → interpolate then align to interval grid + # ------------------------------------------------------------------ + if cont_vars: + ds_cont = meteo_ds[cont_vars].interp(date_time=new_time_mid) + else: + ds_cont = xr.Dataset() + + # ------------------------------------------------------------------ + # 2. 
Accumulated variables → redistribute (constant rate) + # ------------------------------------------------------------------ + ds_accum = {} + + if accum_vars: + time_np = meteo_ds.date_time.values + + # original timestep (seconds) + dt_orig = (time_np[1] - time_np[0]) / np.timedelta64(1, "s") + + # new timestep durations + dt_new = np.diff(new_time.values) / np.timedelta64(1, "s") + + dt_new_da = xr.DataArray( + dt_new, + dims=["date_time"], + coords={"date_time": new_time_mid} + ) + + for var in accum_vars: + # interval total (ONLY second value!) + total = meteo_ds[var].isel(date_time=1) + + # constant rate over interval + rate = total / dt_orig + + # broadcast to new grid + rate_broadcast = rate.expand_dims(date_time=new_time_mid) + + # compute redistributed values + accum = (rate_broadcast * dt_new_da).transpose("loc", "date_time") + + ds_accum[var] = accum + + ds_accum = xr.Dataset(ds_accum) if ds_accum else xr.Dataset() + + # ------------------------------------------------------------------ + # Merge (now aligned!) + # ------------------------------------------------------------------ + ds_final = xr.merge([ds_cont, ds_accum]) + + # ------------------------------------------------------------------ + # Assertions (mass conservation) + # ------------------------------------------------------------------ + for var in accum_vars: + original_total = meteo_ds[var].isel(date_time=slice(1, None)).sum().values + resampled_total = ds_final[var].sum().values + + assert np.isclose(original_total, resampled_total, rtol=1e-5), \ + f"{var}: mass not conserved ({original_total} vs {resampled_total})" + + return ds_final + + def kalman_step(self, ukf, measurements_dataset, meteo_data, pressure_at_bottom): + """ + Execute one full UKF assimilation cycle over a meteo time window. + + For each interval: + 1. Resample meteorological forcing to model timestep + 2. Run UKF prediction for the corresponding number of model steps + 3. 
Perform measurement update (if valid) + + :param ukf: Unscented Kalman Filter instance + :param xarray.Dataset measurements_dataset: Measurement data indexed by date_time + :param xarray.Dataset meteo_data: Meteorological forcing (interval-based) + :param float pressure_at_bottom: Boundary condition for the physical model + :return: Final estimated velocity from the state model + """ + print(f"[UKF] Running Kalman step (pid={os.getpid()})") + + # ------------------------------------------------------------------ + # Apply boundary condition to the physical model + # ------------------------------------------------------------------ + self.model.set_pressure_at_bottom(pressure_at_bottom) + + # ------------------------------------------------------------------ + # Resolve model timestep (in hours → Timedelta) + # ------------------------------------------------------------------ + parflow_model_time_step = self.kalman_config["model_time_step"] + model_time_step = pd.Timedelta(hours=parflow_model_time_step) + + meteo_times = meteo_data.date_time.values + + collected_measurements = [] + + # ------------------------------------------------------------------ + # Main time loop (interval-based) + # ------------------------------------------------------------------ + for i in range(1, len(meteo_times)): + t_start = meteo_times[i - 1] + t_end = meteo_times[i] + + print(f"[UKF] Step {i}: {t_start} → {t_end}") + + # -------------------------------------------------------------- + # 1. Extract measurement for current timestep + # -------------------------------------------------------------- + measurement = measurements_dataset.isel(date_time=i) + encoded_measurement = self.train_measurements_struc.encode(measurement) + + # -------------------------------------------------------------- + # 2. 
Extract meteo data for current interval + # -------------------------------------------------------------- + met_interval = meteo_data.sel(date_time=slice(t_start, t_end)) + + # Resample forcing to model timestep + met_resampled = self.resample_meteo_to_model_timestep( + met_interval, + model_time_step + ) + + # Store timing metadata + met_resampled.attrs["time_step"] = model_time_step + met_resampled.attrs["time_interval"] = t_end - t_start + + # -------------------------------------------------------------- + # 3. Compute number of model iterations + # -------------------------------------------------------------- + dt_step = model_time_step / np.timedelta64(1, "h") + dt_interval = (t_end - t_start) / np.timedelta64(1, "h") + + model_num_iters = int(dt_interval / dt_step) + + # Safety check + assert model_num_iters > 0, "Model iteration count must be positive" + + # -------------------------------------------------------------- + # 4. UKF prediction step + # -------------------------------------------------------------- + ukf.predict( + model_num_iters=model_num_iters, + met_data=met_resampled + ) + + # -------------------------------------------------------------- + # 5. 
UKF update step (if valid measurement)
+            # --------------------------------------------------------------
+            status = measurements_dataset.site_status.isel(date_time=i).values.item()
+
+            if not (10 <= status < 20):
+                print(f"[UKF] Skipping update (invalid status={status})")
+
+            elif np.isnan(encoded_measurement).any():
+                print("[UKF] Skipping update (NaN in measurement)")
+
+            else:
+                ukf.update(encoded_measurement)
+
+            # --------------------------------------------------------------
+            # Debug / diagnostics
+            # --------------------------------------------------------------
+            print(f"[UKF] Covariance sum: {np.sum(ukf.P)}")
+            print(f"[UKF] State estimate: {ukf.x}")
+
+            collected_measurements.append(measurement)
+
+        # ------------------------------------------------------------------
+        # Store results
+        # ------------------------------------------------------------------
+        final_time = meteo_times[-1]
+
+        self.results.times.append(final_time)
+        self.results.ukf_x.append(ukf.x.copy())
+        self.results.ukf_P.append(ukf.P.copy())
+        self.results.measurement_in.append(collected_measurements)
+
+        # Extract latest state-derived quantities
+        last_key = list(self.state_measurements.keys())[-1]
+
+        velocity, moisture = self.state_model_velocity_moisture[last_key]
+        self.results.velocities.append(velocity)
+        self.results.moistures.append(moisture)
+
+        measurements_train_dict, measurements_test_dict = self.state_measurements[last_key]
+
+        self.results.ukf_train_meas.append(
+            self.train_measurements_struc.encode(measurements_train_dict)
+        )
+        self.results.ukf_test_meas.append(
+            self.test_measurements_struc.encode(measurements_test_dict)
+        )
+
+        return velocity
+
     def run_kalman_filter(self, ukf, noisy_measurements, measurement_state_flag):
         """
         Run the UKF predict/update loop for all measurement timesteps.
@@ -541,6 +957,7 @@ def run_kalman_filter(self, ukf, noisy_measurements, measurement_state_flag): iter_durations = [self.results.times_measurements[0]] + list( np.array(self.results.times_measurements[1:]) - np.array(self.results.times_measurements[:-1]) ) + print("RUN kalman filter, process id:", os.getpid()) measurement_state_flag = np.array(measurement_state_flag) for i, measurement in enumerate(noisy_measurements): @@ -571,10 +988,19 @@ def run_kalman_filter(self, ukf, noisy_measurements, measurement_state_flag): self.results.ukf_train_meas.append(self.train_measurements_struc.encode(measurements_train_dict)) self.results.ukf_test_meas.append(self.test_measurements_struc.encode(measurements_test_dict)) - joblib.dump(self.results, self.work_dir / 'kalman_results.pkl') + self.save_results() return self.results + def get_longitude_latitude(self): + longitude, latitude = 0, 0 #@TODO: get correct values + return longitude, latitude + + + def save_results(self): + joblib.dump(self.results, self.work_dir / 'kalman_results.pkl') + + # @memory.cache def run_kalman(workdir, cfg_file): """ @@ -606,18 +1032,4 @@ def main(): if __name__ == "__main__": - import cProfile - import pstats - - pr = cProfile.Profile() - pr.enable() - main() - - # Configure ParFlow executable paths if needed - # os.environ['PARFLOW_HOME'] = '/opt/parflow_install' - # os.environ['PATH'] += ':/opt/parflow_install/bin' - - pr.disable() - ps = pstats.Stats(pr).sort_stats('cumtime') - ps.print_stats(50) diff --git a/hlavo/soil_parflow/parflow_model.py b/hlavo/soil_parflow/parflow_model.py index 46b37f83..9a2b0688 100644 --- a/hlavo/soil_parflow/parflow_model.py +++ b/hlavo/soil_parflow/parflow_model.py @@ -35,6 +35,18 @@ def __init__(self, config, workdir=None): # Check if the directory exists assert parflow_path.is_dir(), f"The PARFLOW_DIR environment variable is set, but the directory does not exist: {parflow_path}" + if config.get("use_clm", False): + assert "clm_files" in config + self._clm_files = 
config["clm_files"] + self.configure_clm() + + + def configure_clm(self): + # Setup CLM + self._run.Solver.LSM = "CLM" + self._run.Solver.CLM.MetForcing = "1D" + self._run.Solver.CLM.MetFileName = "narr_1hr.txt" + def get_nodes_z(self): """ Grid nodes @@ -374,8 +386,8 @@ def set_porosity(self, z_values, porosity_values): def prepare_clm(self, working_dir, input_dir, ds): # We assume these files exist in current dir. # They should be either generated runtime or stored somewhere globally. - shutil.copy(input_dir / "drv_vegm.dat", working_dir / "drv_vegm.dat") - shutil.copy(input_dir / "drv_vegp.dat", working_dir / "drv_vegp.dat") + shutil.copy(self._clm_files["drv_vegm_file"], working_dir / pathlib.Path("drv_vegm.dat")) + shutil.copy(self._clm_files["drv_vegp_file"], working_dir / pathlib.Path("drv_vegp.dat")) # Create subdirectories necessary for Parflow/CLM coupling. for dir in [ @@ -395,11 +407,14 @@ def prepare_clm(self, working_dir, input_dir, ds): "qflx_evap_veg", "qflx_top_soil", ]: - os.makedirs(working_dir / dir) + os.makedirs(working_dir / pathlib.Path(dir)) # Create main CLM parameter file drv_clmin.dat - start=ds.time[0].dt - end=ds.time[-1].dt + start=ds.date_time[0].dt + end=ds.date_time[-1].dt + + print("ds ", ds) + drv_clmin_params = dict( # CLM Domain (Read into 1D drv_module variables) : maxt= 1, # Maximum tiles per grid (originally 3; changed it, because we have one type per cell) @@ -434,13 +449,12 @@ def prepare_clm(self, working_dir, input_dir, ds): clm_ic= 2, # 1=restart file,2=defined CLM Initial Condition Source # CLM initial conditions (1-D) : used in drv_clmini.f90_ - t_ini= ds.air_temperature_2m[0,0].values.item(), # Initial temperature [K] + t_ini= ds.Temp[0].values.item(), # Initial temperature [K] #h2osno_ini= 0., # Initial snow cover, water equivalent [mm] ) - with open(working_dir / "drv_clmin.dat", "w") as f: - f.write("\n".join(f"{name:20} {value}" for name,value in drv_clmin_params.items())) - + with open(working_dir / 
pathlib.Path("drv_clmin.dat"), "w") as f: + f.write("\n".join(f"{name:20} {value}" for name,value in drv_clmin_params.items())) # Prepare file with meteorological data for CLM. stop_time = ds.time_interval / np.timedelta64(1, 's') @@ -450,58 +464,44 @@ def prepare_clm(self, working_dir, input_dir, ds): clm_met_data = { # Downward Visible or Short-Wave radiation [W/m2]. # surface_solar_radiation_downwards [J/m2] / time_step [s] - "DSWR": ds.surface_solar_radiation_downwards / time_step, + "DSWR": ds.DSWR, #ds.surface_solar_radiation_downwards / time_step, # Downward Infa-Red or Long-Wave radiation [W/m2]. # surface_thermal_radiation_downwards [J/m2] / time_step [s] - "DLWR": ds.surface_thermal_radiation_downwards / time_step, + "DLWR": ds.DLWR, #ds.surface_thermal_radiation_downwards / time_step, # Precipitation rate [mm/s]. # precipitation_amount_accum [kg/m2] / water_density [kg/m3] / total_time [s] * 1000 [mm/m] # Note: Here we probably need some else field than the total amount since forecast start. - "APCP": ds.precipitation_amount_accum / 1000 / stop_time * 1000, + "APCP": ds.APCP, #ds.precipitation_amount_accum / 1000 / stop_time * 1000, # Air temperature [K]. # air_temperature_2m - "Temp": ds.air_temperature_2m, + "Temp": ds.Temp, #ds.air_temperature_2m, # West-to-East or U-component of wind [m/s]. # -wind_speed_10m * cos(wind_from_direction_10m * pi/180) - "UGRD": -ds.wind_speed_10m * np.cos(ds.wind_from_direction_10m * np.pi/180), + "UGRD": ds.UGRD, #-ds.wind_speed_10m * np.cos(ds.wind_from_direction_10m * np.pi/180), # South-to-North or V-component of wind [m/s]. # -wind_speed_10m * sin(wind_from_direction_10m * pi/180) - "VGRD": -ds.wind_speed_10m * np.cos(ds.wind_from_direction_10m * np.pi/180), + "VGRD": ds.VGRD, #-ds.wind_speed_10m * np.cos(ds.wind_from_direction_10m * np.pi/180), # Atmospheric pressure [Pa]. # air_pressure_at_sea_level # Note: Can we get the pressure at current terrain level? 
- "Press": ds.air_pressure_at_sea_level, + "Press": ds.Press, #ds.air_pressure_at_sea_level, # Water-vapor specific humidity [kg/kg]. # relative_humidity_2m - "SPFH": ds.relative_humidity_2m, + "SPFH": ds.SPFH, #ds.relative_humidity_2m, } - with open(working_dir / self._run.Solver.CLM.MetFileName, "w") as f: - for i,time in enumerate(ds.time.values): - f.write( " ".join(str(v[0,i].item()) for v in clm_met_data.values()) + "\n") - # with open(working_dir / "lai.dat", "w") as f: - # for time in ds.time.values: - # f.write( "6.00 6.00 6.00 6.00 6.00 6.00 6.00 6.00 6.00 2.00 6.00 6.00 5.00 6.00 0.00 6.00 0.00 0.00\n" ) - # - # with open(working_dir / "sai.dat", "w") as f: - # for time in ds.time.values: - # f.write( "2.00 2.00 2.00 2.00 2.00 2.00 2.00 2.00 2.00 4.00 2.00 0.50 2.00 2.00 2.00 2.00 2.00 0.00\n" ) - # - # with open(working_dir / "z0m.dat", "w") as f: - # for time in ds.time.values: - # f.write( "1.00 2.20 1.00 0.80 0.80 0.10 0.10 0.70 0.10 0.03 0.03 0.06 0.50 0.06 0.01 0.05 0.002 0.01\n" ) - # - # with open(working_dir / "displa.dat", "w") as f: - # for time in ds.time.values: - # f.write( "11.0 23.00 11.0 13.0 13.0 0.30 0.30 6.50 0.70 0.30 0.30 0.30 3.00 0.30 0.00 0.10 0.00 0.00\n" ) + ds_loaded = ds.compute() + with open(working_dir / pathlib.Path(self._run.Solver.CLM.MetFileName), "w") as f: + for i,time in enumerate(ds_loaded.date_time.values): + f.write( " ".join(str(v[i].item()) for v in ds_loaded.data_vars.values()) + "\n") def run(self, @@ -635,6 +635,9 @@ def set_dynamic_params(self, model_params): #return model_params_new_values + def set_pressure_at_bottom(self, pressure_at_bottom): + self._run.Patch.bottom.BCPressure.alltime.Value = pressure_at_bottom + def save_pressure(self, image_file): cwd = settings.get_working_directory() settings.set_working_directory(self._workdir) diff --git a/runs/composed_1d_only/README.md b/runs/composed_1d_only/README.md new file mode 100644 index 00000000..3dfc31cd --- /dev/null +++ 
b/runs/composed_1d_only/README.md @@ -0,0 +1,56 @@ +# Model1D – Usage Guide + +## Overview + +`hlavo.composed.model_1d.Model1D` is a one-dimensional hydrological model component designed to operate within a coupled 1D–3D simulations. It integrates: + +* Measurement data +* Meteorological data +* An Unscented Kalman filter (UKF) + +The model processes time-stepped data and communicates with external components via queues. + +## Testing + +Run tests using: + +```bash +pytest tests/model_1d/model_1d_test_run.py +``` + +Or manually: + +```bash +python tests/model_1d/model_1d_test_run.py +``` + +### Covered tests + +* Measurement preparation +* Time slicing +* Coordinate (lon, lat) retrieval +* Step execution + +--- + +## Known Limitations / TODOs + +* Meteorological data is currently mocked +* Time alignment hack (forcing year = 2025) +* Measurement covariance matrix is simplistic (identity) + +--- + + + +## Charon cluster run - not tested +First singularity image has to be created: +```bash +export SINGULARITY_CACHEDIR="user home dir" +export SINGULARITY_LOCALCACHEDIR="user scratch dir" +export SINGULARITY_TMPDIR=$SCRATCHDIR + +singularity build hlavo_0_1_0.sif docker://flow123d/hlavo:0.1.0 + + +./pbs_run.sh diff --git a/runs/composed_1d_only/composed_1d_part_config.yaml b/runs/composed_1d_only/composed_1d_part_config.yaml new file mode 100644 index 00000000..40ba411e --- /dev/null +++ b/runs/composed_1d_only/composed_1d_part_config.yaml @@ -0,0 +1,6 @@ +seed: 123456 +start_datetime: "2025-03-06T00:00:00" +end_datetime: "2025-03-09T00:00:00" +1d_models: + - model_1d_config.yaml + diff --git a/runs/composed_1d_only/composed_config.yaml b/runs/composed_1d_only/composed_config.yaml new file mode 100644 index 00000000..ebec009c --- /dev/null +++ b/runs/composed_1d_only/composed_config.yaml @@ -0,0 +1,174 @@ +seed: 123456 +start_datetime: "2025-03-06T00:00:00" +end_datetime: "2025-03-09T00:00:00" +#model_3d: +# common: +# class_name: "Model3D" +# name: "uhelna" +# 
total_time_days: 15.0 +# # Total simulated time in days. If omitted, a single step is run. +# executable: "mf6" +# # executable to run for the 3d model - some version of modflow +# +# geometry: +# qgis_project_path: ../../hlavo/deep_model/GIS/uhelna_all.qgs +# boundary_layer_name: domain_around_mine +# raster_group_name: HG model layers +# meshsteps: +# x: 40.0 +# y: 40.0 +# z: 4.0 +# +# materials: +# all: +# # AGENT: document each parameter here (a demo file) +# # Comment quantity name and unit according to modflow6 documentation. +# # Put parameters that does not apply to volumetric materials out of the all section +# # logic should be that what is in all is applied to all materials as common/default value +# horizontal_conductivity: 1.0e-06 +# vertical_conductivity: 1.0e-06 +# porosity: 0.3 +# vG_n: 2.0 +# vG_alpha: 1.0 +# recharge_rate: 0.0001 +# vks: 1.0e-06 +# thtr: 0.05 +# thts: 0.3 +# thti: 0.2 +# eps: 3.5 +# surfdep: 0.0 +# pet: 0.0 +# extdp: 1.0 +# extwc: 0.1 +# ha: 0.0 +# hroot: 0.0 +# rootact: 0.0 +# ntrailwaves: 7 +# nwavesets: 40 +# simulate_et: false +# unsat_etwc: false +# unsat_etae: false +# simulate_gwseep: true +# hydraulic_conductivity: 1.0e-06 +# specific_yield: 0.1 +# specific_storage: 1.0e-05 +# initial_head_offset: -1.0 +# perlen: 1.0 +# nstp: 1 +# tsmult: 1.0 +# sand: +# horizontal_conductivity: 1.0e-05 +# vertical_conductivity: 1.0e-05 +# clay: +# horizontal_conductivity: 1.0e-07 +# vertical_conductivity: 1.0e-07 +# +# plots: +# enabled: true +# dpi: 150 +# xsection_depth_window: 40.0 + +model_1d: + class_name: "Model1DMock" + +# sites: +# - longitude: 14.8800 +# latitude: 50.8600 +# - longitude: 14.9000 +# latitude: 50.8800 +# - longitude: 14.9200 +# latitude: 50.8900 + model_config: + model_class_name: "ToyProblem" + use_clm: True + clm_files: + drv_vegm_file: drv_vegm.dat # relative path to composed config file + drv_vegp_file: drv_vegm.dat + init_pressure: [-40, -100] #[-0.1, -4] # bottom, top ; [temporary solution] + static_params: + 
ComputationalGrid: + "Lower.Z": -135 + "DZ": 9 #0.2 + "NZ": 15 #, 0] # Number of grid cells in z-direction + params: + vG_K_s: "Geom.domain.Perm.Value" + vG_n: ["Geom.domain.Saturation.N", "Geom.domain.RelPerm.N"] + vG_Th_s: "Geom.domain.Saturation.SSat" + vG_Th_r: "Geom.domain.Saturation.SRes" + vG_alpha: ["Geom.domain.Saturation.Alpha", "Geom.domain.RelPerm.Alpha"] + measurements_config: + measurements_scheme_file: ../../hlavo/ingress/moist_profile/profile_schema.yaml + meteo_scheme_file: ../../hlavo/ingress/meteo_playground/chmi_stations/chmi_stations_schema.yaml + kalman_config: + model_time_step: 2.5e-2 + state_params: + pressure_field: + mean_linear: + top: -40 + bottom: -100 # top, bottom + cov_exponential: + std: 2.0 + rel_std_Q: 0.007 + ref: + # ref: TODO: make ref accept mean clases + # correlation_length: 0.5 + vG_K_s: + ref: 0.0737 + #mean_std: [12.833, 0.01] + mean_std: [-2.60775248, 0.01] #[0.0737, 0.001] # transformed mean: + rel_std_Q: 0.00028 #0.01 + transform: lognormal # transform to normal distribution + #conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #vG_n: + # ref: 1.89 + # mean_std: [1.89, 0.05] + # mean_std: [-0.116533816, 0.005] + # rel_std_Q: 0.081 #0.005 #0.007273926 + # transform: log_minus_one # transform to normal distribution + # conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #vG_Th_s: + # ref: 0.41 + # mean_std: [-0.363, 0.01] #[0.41, 0.01] + # rel_std_Q: 0.001 #0.000243902 + # transform: logit + #vG_Th_r: + # ref: 0.065 + # mean_std: [-2.667, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # transform: logit + #vG_alpha: + # ref: 0.75 + # mean_std: [-0.287682072, 0.001] + # rel_std_Q: 0.0026 # 0.001 #0.000133333 + # transform: lognormal # transform to normal distribution + # conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #Th_s: + # transform: none + # + # conf_interval: [0.35, 0.41, 0.9] # explicit 0.9 confidence interval + train_measurements: + moisture: + 
z_pos: [-10, -20, -60] #[-10, -20, -30, -40, -50, -60, -70, -80, -90, -100, 110, -120, -130, -140, -150] ##[-0.5, -2.5] + noise_level: 0.01 + noise_distr_type: gaussian + #pressure: + # z_pos: [-15] + # noise_level: 0.1 + # noise_distr_type: uniform + #velocity: + # z_pos: [0.75] + # noise_level: 0.1 + # noise_distr_type: uniform + test_measurements: + moisture: + z_pos: [-40] + noise_level: 0.01 + noise_distr_type: gaussian + noise_distr_type: gaussian + pressure_saturation_data_noise_level: 0.01 # init state noise + init_cov_P_multiplicator: 1 + sigma_points_params: {"alpha": 0.01, "beta": 2, "kappa": 1} + parallel_sigmas: false + postprocess: + show: True + diff --git a/runs/composed_1d_only/drv_vegm.dat b/runs/composed_1d_only/drv_vegm.dat new file mode 100644 index 00000000..247b04dd --- /dev/null +++ b/runs/composed_1d_only/drv_vegm.dat @@ -0,0 +1,3 @@ + x y lat lon sand clay color fractional coverage of grid by vegetation class (Must/Should Add to 1.0) + (Deg) (Deg) (%/100) index 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 + 1 1 50.0 14.5 0.16 0.265 2 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 diff --git a/runs/composed_1d_only/drv_vegp.dat b/runs/composed_1d_only/drv_vegp.dat new file mode 100644 index 00000000..a460e09b --- /dev/null +++ b/runs/composed_1d_only/drv_vegp.dat @@ -0,0 +1,111 @@ +!========================================================================= +! +! CLMCLMCLMCLMCLMCLMCLMCLMCL A community developed and sponsored, freely +! L M available land surface process model. +! M --COMMON LAND MODEL-- C +! C L CLM WEB INFO: http://clm.gsfc.nasa.gov +! LMCLMCLMCLMCLMCLMCLMCLMCLM CLM ListServ/Mailing List: +! +!========================================================================= +! clm_vegp.dat: +! +! DESCRIPTION: +! Vegetation parameter data file for IGBP classification. +! Please note that any other classification can be used with CLM +! provided that the following parameter values are given. +! 
This data file is read in by drv_getvegp.f90 - this program expects +! the format given below, but does not assume an exact parameter order. +! +! REVISION HISTORY: +! 15 September 1999: Yongjiu Dai; Initial code +! 15 December 1999: Paul Houser; F90 Revision +! 25 November 2000: Mariana Vertenstein +!========================================================================= +!IGBP Land Cover Types (other classes can be used by changing this file) +! 1 evergreen needleleaf forests +! 2 evergreen broadleaf forests +! 3 deciduous needleleaf forests +! 4 deciduous broadleaf forests +! 5 mixed forests +! 6 closed shrublands +! 7 open shrublands +! 8 woody savannas +! 9 svannas +! 10 grasslands +! 11 permanent wetlands +! 12 croplands +! 13 urban and built-up lands +! 14 cropland / natural vegetation mosaics +! 15 snow and ice +! 16 barren or sparsely vegetated +! 17 water bodies +! 18 bare soil +!========================================================================= +! +itypwat (1-soil, 2-land ice, 3-deep lake, 4-shallow lake, 5-wetland: swamp, marsh) +1 1 1 1 1 1 1 1 1 1 5 1 1 1 2 1 3 1 +! +lai Maximum leaf area index [-] +6.00 6.00 6.00 6.00 6.00 6.00 6.00 6.00 6.00 2.00 6.00 6.00 5.00 6.00 0.00 6.00 0.00 0.00 +! +lai0 Minimum leaf area index [-] +5.00 5.00 1.00 1.00 3.00 2.00 1.00 2.00 1.00 0.50 0.50 0.50 1.00 2.00 0.00 0.50 0.00 0.00 +! +sai Stem area index [-] +2.00 2.00 2.00 2.00 2.00 2.00 2.00 2.00 2.00 4.00 2.00 0.50 2.00 2.00 2.00 2.00 2.00 0.00 +! +z0m Aerodynamic roughness length [m] +1.00 2.20 1.00 0.80 0.80 0.10 0.10 0.70 0.10 0.03 0.03 0.06 0.50 0.06 0.01 0.05 0.002 0.01 +! +displa Displacement height [m] +11.0 23.00 11.0 13.0 13.0 0.30 0.30 6.50 0.70 0.30 0.30 0.30 3.00 0.30 0.00 0.10 0.00 0.00 +! +dleaf Leaf dimension [m] +0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.00 +! 
+roota Fitted numerical index of rooting distribution +7.00 7.00 7.00 6.00 5.00 6.00 5.00 6.00 5.00 1.00 6.00 6.00 5.00 5.00 0.00 5.00 0.00 0.00 +! +rootb Fitted numerical index of rooting distribution +2.00 1.00 2.00 2.00 1.50 1.50 2.50 2.50 1.00 2.50 2.00 2.50 2.00 2.00 0.00 2.00 0.00 0.00 +! +rhol_vis !leaf reflectance vis +0.07 0.10 0.07 0.10 0.08 0.08 0.08 0.09 0.11 0.11 0.11 0.11 0.09 0.09 -99. 0.09 -99. -99. +! +rhol_nir !leaf reflectance nir +0.35 0.45 0.35 0.45 0.40 0.40 0.40 0.49 0.58 0.58 0.35 0.58 0.47 0.47 -99. 0.47 -99. -99. +! +rhos_vis !stem reflectance vis +0.16 0.16 0.16 0.16 0.16 0.16 0.16 0.26 0.36 0.36 0.36 0.36 0.24 0.24 -99. 0.24 -99. -99. +! +rhos_nir !stem reflectance nir +0.39 0.39 0.39 0.39 0.39 0.39 0.39 0.48 0.58 0.58 0.39 0.58 0.47 0.47 -99. 0.47 -99. -99. +! +taul_vis !leaf transmittance vis +0.05 0.05 0.05 0.05 0.05 0.05 0.05 0.06 0.07 0.07 0.07 0.07 0.06 0.06 -99. 0.06 -99. -99. +! +taul_nir !leaf transmittance nir +0.10 0.25 0.10 0.25 0.17 0.17 0.17 0.21 0.25 0.25 0.10 0.25 0.20 0.20 -99. 0.20 -99. -99. +! +taus_vis !stem transmittance vis +0.001 0.001 0.001 0.001 0.001 0.001 0.001 0.11 0.22 0.22 0.22 0.22 0.09 0.09 -99. 0.09 -99. -99. +! +taus_nir !stem transmittance nir +0.001 0.001 0.001 0.001 0.001 0.001 0.001 0.19 0.38 0.38 0.001 0.38 0.15 0.15 -99. 0.15 -99. -99. +! +xl !leaf/stem orientation index +0.01 0.10 0.01 0.25 0.13 0.13 0.13 -0.08 -0.3 -0.3 -0.3 -0.3 -0.07 -0.07 -99. -0.07 -99. -99. +! +vw !btran exponent:[(h2osoi_vol-watdry)/(watopt-watdry)]**vw +1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. -99. 1. -99. -99. +! +irrig !(irrig=0 -> no irrigation, irrig=1 -> irrigate) +0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +! 
+ + + + + + + diff --git a/runs/composed_1d_only/model_1d_config.yaml b/runs/composed_1d_only/model_1d_config.yaml new file mode 100644 index 00000000..7e1bab25 --- /dev/null +++ b/runs/composed_1d_only/model_1d_config.yaml @@ -0,0 +1,97 @@ +model_config: + model_class_name: "ToyProblem" + use_clm: True + clm_files: + drv_vegm_file: drv_vegm.dat # relative path to composed config file + drv_vegp_file: drv_vegp.dat + init_pressure: [-40, -100] #[-0.1, -4] # bottom, top ; [temporary solution] + static_params: + ComputationalGrid: + "Lower.Z": -135 + "DZ": 9 #0.2 + "NZ": 15 #, 0] # Number of grid cells in z-direction + params: + vG_K_s: "Geom.domain.Perm.Value" + vG_n: ["Geom.domain.Saturation.N", "Geom.domain.RelPerm.N"] + vG_Th_s: "Geom.domain.Saturation.SSat" + vG_Th_r: "Geom.domain.Saturation.SRes" + vG_alpha: ["Geom.domain.Saturation.Alpha", "Geom.domain.RelPerm.Alpha"] + +measurements_config: + #zarr_dir: /home/martin/Documents/HLAVO/tests/ingress/moist_profile/storage_2025 + measurements_scheme_file: ../../hlavo/ingress/moist_profile/profile_schema.yaml + #meteo_scheme_file: ../../hlavo/ingress/scrapper/schemas/hlavo_surface_schema.yaml + meteo_scheme_file: ../../hlavo/ingress/meteo_playground/chmi_stations/chmi_stations_schema.yaml +kalman_config: + model_time_step: 2.5e-2 # in hours + state_params: + pressure_field: + mean_linear: + top: -40 + bottom: -100 # top, bottom + cov_exponential: + std: 2.0 + rel_std_Q: 0.007 + ref: + # ref: TODO: make ref accept mean clases + # correlation_length: 0.5 + vG_K_s: + ref: 0.0737 + #mean_std: [12.833, 0.01] + mean_std: [-2.60775248, 0.01] #[0.0737, 0.001] # transformed mean: + rel_std_Q: 0.00028 #0.01 + transform: lognormal # transform to normal distribution + #conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #vG_n: + # ref: 1.89 + #mean_std: [1.89, 0.05] + # mean_std: [-0.116533816, 0.005] + # rel_std_Q: 0.081 #0.005 #0.007273926 + # transform: log_minus_one # transform to normal distribution + 
#conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #vG_Th_s: + # ref: 0.41 + # mean_std: [-0.363, 0.01] #[0.41, 0.01] + # rel_std_Q: 0.001 #0.000243902 + # transform: logit + #vG_Th_r: + # ref: 0.065 + # mean_std: [-2.667, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # transform: logit + #vG_alpha: + # ref: 0.75 + # mean_std: [-0.287682072, 0.001] + # rel_std_Q: 0.0026 # 0.001 #0.000133333 + # transform: lognormal # transform to normal distribution + #conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #Th_s: + # transform: none + # + # conf_interval: [0.35, 0.41, 0.9] # explicit 0.9 confidence interval + train_measurements: + moisture: + z_pos: [-10, -20,-40, -60, -100] + noise_level: 0.01 + noise_distr_type: gaussian + #pressure: + # z_pos: [-15] + # noise_level: 0.1 + # noise_distr_type: uniform + #velocity: + # z_pos: [0.75] + # noise_level: 0.1 + # noise_distr_type: uniform + test_measurements: + moisture: + z_pos: [-40] + noise_level: 0.01 + noise_distr_type: gaussian + + noise_distr_type: gaussian + pressure_saturation_data_noise_level: 0.01 # init state noise + init_cov_P_multiplicator: 1 + sigma_points_params: {"alpha": 0.01, "beta": 2, "kappa": 1} + parallel_sigmas: false +postprocess: + show: True diff --git a/runs/composed_1d_only/pbs_dask_map.sh b/runs/composed_1d_only/pbs_dask_map.sh new file mode 100644 index 00000000..5e6a2cbf --- /dev/null +++ b/runs/composed_1d_only/pbs_dask_map.sh @@ -0,0 +1,122 @@ +#!/bin/bash +#PBS -N dask_map_demo +#PBS -l select=1:ncpus=4:mem=16gb:scratch_local=1gb +#PBS -l place=scatter +#PBS -l walltime=2:00:00 +#PBS -j oe + +set -euo pipefail +set -x + +############################################ +# CONFIGURATION +############################################ + +work_dir="" +HLAVODIR="" # HLAVO repository code dir +CONTAINER="hlavo_0_1_0.sif" + +ENV_PY="/home/hlavo/miniconda3/envs/hlavo/bin/python" + +SCHED_FILE="$PBS_O_WORKDIR/scheduler.json" + +cd "$PBS_O_WORKDIR" + +echo 
"Job started at: $(date)" +echo "Host: $(hostname -f)" +echo "CPUs allocated: ${PBS_NCPUS:-4}" +echo "SCRATCHDIR: ${SCRATCHDIR:-}" + +############################################ +# CLEANUP HANDLER +############################################ + +cleanup() { + echo "Stopping scheduler..." + [[ -n "${SCHED_PID:-}" ]] && kill "$SCHED_PID" 2>/dev/null || true + wait || true +} +trap cleanup EXIT + +############################################ +# START DASK SCHEDULER +############################################ + +: > "$SCHED_FILE" + +echo "Starting Dask scheduler..." + +singularity exec \ + -B "$HLAVODIR" \ + -B "$work_dir" \ + -B "$SCRATCHDIR" \ + "$CONTAINER" \ + "$ENV_PY" -m distributed.cli.dask_scheduler \ + --scheduler-file "$SCHED_FILE" \ + --host "$(hostname -f)" \ + --protocol tcp \ + --port 0 \ + --dashboard-address ":0" \ + > scheduler.log 2>&1 & + +SCHED_PID=$! + +echo -n "Waiting for scheduler" +for i in {1..40}; do + if [[ -s "$SCHED_FILE" ]] && grep -q '"address"' "$SCHED_FILE"; then + echo " - ready" + break + fi + sleep 0.5 + echo -n "." +done + +if ! grep -q '"address"' "$SCHED_FILE"; then + echo "Scheduler failed to start" + tail -n 200 scheduler.log || true + exit 1 +fi + +############################################ +# START DASK WORKERS (SINGLE NODE) +############################################ + +echo "Starting Dask workers..." + +NWORKERS=${PBS_NCPUS:-4} + +for i in $(seq 1 "$NWORKERS"); do + singularity exec \ + -B "$HLAVODIR" \ + -B "$work_dir" \ + -B "$SCRATCHDIR" \ + "$CONTAINER" \ + "$ENV_PY" -m distributed.cli.dask_worker \ + --scheduler-file "$SCHED_FILE" \ + --nthreads 1 \ + --memory-limit 0 \ + --local-directory "$SCRATCHDIR" \ + > worker_${i}.log 2>&1 & +done + +sleep 5 + +############################################ +# RUN CLIENT +############################################ + +echo "Running composed model..." 
+ +singularity exec $CONTAINER $ENV_PY -m pip install joblib + +singularity exec \ + -B "$HLAVODIR" \ + -B "$work_dir" \ + -B "$SCRATCHDIR" \ + "$CONTAINER" \ + env PYTHONPATH="$HLAVODIR" \ + "$ENV_PY" -m hlavo.composed.composed_model_mock \ + "${work_dir}/results_dir" \ + "${work_dir}/config.yaml" + +echo "Job finished at: $(date)" \ No newline at end of file diff --git a/runs/composed_1d_only/pbs_run.sh b/runs/composed_1d_only/pbs_run.sh new file mode 100644 index 00000000..b635655f --- /dev/null +++ b/runs/composed_1d_only/pbs_run.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +qsub -q charon_2h /storage/liberec3-tul/home/martin_spetlik/HLAVO_distr/runs/composed_1d_only/pbs_dask_map.sh \ No newline at end of file diff --git a/runs/composed_1d_only/run_model_1d.py b/runs/composed_1d_only/run_model_1d.py new file mode 100644 index 00000000..56b42399 --- /dev/null +++ b/runs/composed_1d_only/run_model_1d.py @@ -0,0 +1,68 @@ +import os +import numpy as np +from pathlib import Path +from hlavo.composed.model_1d import Model1D +from hlavo.composed.data_3d_to_1d import Data3DTo1D +from hlavo.composed.composed_model_mock import relative_to_absolute_paths +from dask.distributed import Client, LocalCluster, get_client, Queue + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def load_config(config_path: Path): + import yaml + with config_path.open("r") as f: + return yaml.safe_load(f) + + +def build_model(composed_model_config: dict): + composed_model_config = relative_to_absolute_paths(composed_model_config, config_dir) + + site_id = 1 + seed = composed_model_config["seed"] + + model_1d_config = composed_model_config["model_1d"] + #model_1d_config = load_config(Path(kalman_config_path).resolve()) + model_1d_config = relative_to_absolute_paths(model_1d_config, config_dir) + + model = Model1D( + site_id=site_id, + initial_state=0.0, + work_dir=work_dir, + 
model_kalman_config_dict=model_1d_config, + seed=seed + ) + + return model + +if __name__ == "__main__": + cluster = LocalCluster(n_workers=4, threads_per_worker=1) + client = Client(cluster) + + queue_name_1d_to_3d = "q-1d-to-3d" + Queue(queue_name_1d_to_3d, client=client) # ensure creation + + q_name_3d_to_1d = "q-3d-to-1d-1" + q_3d_to_1d = Queue(q_name_3d_to_1d, client=client) + + work_dir = Path(os.getcwd()) + config_file = work_dir / 'composed_config.yaml' + print("config file ", config_file) + composed_model_config_path = Path(config_file).resolve() + config_dir = composed_model_config_path.parent + + composed_model_config = load_config(composed_model_config_path) + + start_datetime = np.datetime64(composed_model_config["start_datetime"]) + end_datetime = np.datetime64(composed_model_config["end_datetime"]) + + data_to_1d = Data3DTo1D(site_id=1, date_time=end_datetime, pressure_head=0) + q_3d_to_1d.put((end_datetime, data_to_1d)) + + model = build_model(composed_model_config) + + model.run_loop(start_datetime, end_datetime, q_name_3d_to_1d, queue_name_1d_to_3d) + + client.close() + cluster.close() diff --git a/runs/kalman/README.md b/runs/kalman/README.md new file mode 100644 index 00000000..ff9af401 --- /dev/null +++ b/runs/kalman/README.md @@ -0,0 +1 @@ +`run_kalman.sh` is deprecated and no longer maintained. 
diff --git a/runs/kalman/config_0_experiments_distr.yaml b/runs/kalman/config_0_experiments_distr.yaml new file mode 100644 index 00000000..c2c8cb17 --- /dev/null +++ b/runs/kalman/config_0_experiments_distr.yaml @@ -0,0 +1,175 @@ +model_config: + model_class_name: "ToyProblem" + init_pressure: [-40, -100] #[-0.1, -4] # bottom, top ; [temporary solution] + static_params: + ComputationalGrid: + "Lower.Z": -135 + "DZ": 9 #0.2 + "NZ": 15 #, 0] # Number of grid cells in z-direction + params: + vG_K_s: "Geom.domain.Perm.Value" + vG_n: ["Geom.domain.Saturation.N", "Geom.domain.RelPerm.N"] + vG_Th_s: "Geom.domain.Saturation.SSat" + vG_Th_r: "Geom.domain.Saturation.SRes" + vG_alpha: ["Geom.domain.Saturation.Alpha", "Geom.domain.RelPerm.Alpha"] + # "Geom.domain": + # : + # "Saturation.N": [3.7, 0.1] + evapotranspiration_params: + names: + - "n" + - "T" + - "u2" + - "Tmax" + - "Tmin" + - "RHmax" + - "RHmin" + - "month" + values: + - 14 + - 20 + - 10 + - 27 + - 12 + - 0.55 + - 0.35 + - 6 +measurements_config: + measurements_file: /home/martin/Documents/HLAVO/runs/column/hlavo_lab_merged_data_2025_06_12_state_smoothed_subset.csv + zarr_dir: /home/martin/Documents/HLAVO/tests/ingress/moist_profile/test_storage + scheme_file: /home/martin/Documents/HLAVO/hlavo/ingress/moist_profile/profile_schema.yaml + measurements_time_step: 5 + probe: "pr2" + model_time_step: 0.025 #2.5e-2 + model_n_time_steps_per_iter: 1200 #40 + rain_periods: + #- [2, 0] + - [284, -0.09523809523809525] + #- [284, -0.01667] + - [40, 0] + - [8, -0.01667] + - [4, 0] +kalman_config: + model_time_step: 2.5e-2 + #model_n_time_steps_per_iter: 40 + state_params: + pressure_field: + mean_linear: + top: -40 + bottom: -100 # top, bottom + cov_exponential: + std: 2.0 + rel_std_Q: 0.007 + ref: + # ref: TODO: make ref accept mean clases + # correlation_length: 0.5 + vG_K_s: + ref: 0.0737 + #mean_std: [12.833, 0.01] + mean_std: [-2.60775248, 0.01] #[0.0737, 0.001] # transformed mean: + rel_std_Q: 0.00028 #0.01 + 
transform: lognormal # transform to normal distribution + #conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #vG_n: + # ref: 1.89 + #mean_std: [1.89, 0.05] + # mean_std: [-0.116533816, 0.005] + # rel_std_Q: 0.081 #0.005 #0.007273926 + # transform: log_minus_one # transform to normal distribution + #conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #vG_Th_s: + # ref: 0.41 + # mean_std: [-0.363, 0.01] #[0.41, 0.01] + # rel_std_Q: 0.001 #0.000243902 + # transform: logit + #vG_Th_r: + # ref: 0.065 + # mean_std: [-2.667, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # transform: logit + #vG_alpha: + # ref: 0.75 + # mean_std: [-0.287682072, 0.001] + # rel_std_Q: 0.0026 # 0.001 #0.000133333 + # transform: lognormal # transform to normal distribution + #conf_interval: [0.0001, 0.01] # implicit 0.95 confidence interval + #Th_s: + # transform: none + # + # conf_interval: [0.35, 0.41, 0.9] # explicit 0.9 confidence interval + #calibration_coeffs: + # - z_pos: -10 + # ref: 1 + # mean_std: [1, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + #- z_pos: -15 + # ref: 1 + # mean_std: [1, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # - z_pos: -20 + # ref: 1 + # mean_std: [1, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # - z_pos: -40 + # ref: 1 + # mean_std: [1, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # - z_pos: -60 + # ref: 1 + # mean_std: [1, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + # - z_pos: -100 + # ref: 1 + # mean_std: [1, 0.01] #[0.065, 0.01] + # rel_std_Q: 0.0001 #0.001538462 + train_measurements: + moisture: + z_pos: [-10, -20, -60] #[-10, -20, -30, -40, -50, -60, -70, -80, -90, -100, 110, -120, -130, -140, -150] ##[-0.5, -2.5] + noise_level: 0.01 + noise_distr_type: gaussian + #pressure: + # z_pos: [-15] + # noise_level: 0.1 + # noise_distr_type: uniform + #velocity: + # z_pos: [0.75] + # noise_level: 0.1 + # noise_distr_type: uniform + test_measurements: 
+ moisture: + z_pos: [-40] + noise_level: 0.01 + noise_distr_type: gaussian + #saturation: + # z_pos: [-10, -20, -40] + # noise_level: 0.01 + # noise_distr_type: gaussian + #pressure: + # z_pos: [1.6, 3.0] + # noise_level: 0.1 + # noise_distr_type: uniform + + + #measurements_data_name: "saturation" # Possible values: "saturation" or "pressure" + #mes_locations_train: + # - 0.5 + # - 1.4 + #- 2.5 + #- 3.5 + #mes_locations_test: + # - 0.2 + # - 0.8 + # - 1.6 + # - 3.0 + #measurements_noise_level: 0.01 + noise_distr_type: gaussian #gaussian + # model_params_noise_level: 0.1 + pressure_saturation_data_noise_level: 0.01 # init state noise + init_cov_P_multiplicator: 1 + #pressure_saturation_data_std: 3.2 + #Q_var: 5.0e-5 + #Q_model_params_var: 1e-8 + sigma_points_params: {"alpha": 0.01, "beta": 2, "kappa": 1} + parallel_sigmas: false +postprocess: + show: True diff --git a/runs/kalman/run_kalman.sh b/runs/kalman/run_kalman.sh new file mode 100755 index 00000000..767793f3 --- /dev/null +++ b/runs/kalman/run_kalman.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +work_dir=$1 +hlavo_dir=$2 + + +docker pull flow123d/hlavo:0.1.0 + +docker run -it --name hlavo_tmp \ + flow123d/hlavo:0.1.0 \ + python3 -m pip install joblib + +docker commit hlavo_tmp flow123d/hlavo:0.1.0 +docker rm hlavo_tmp + +docker run --rm \ + --user $(id -u):$(id -g) \ + -v ${hlavo_dir}:${hlavo_dir} \ + -v ${work_dir}:${work_dir} \ + -e PYTHONPATH=${hlavo_dir} \ + flow123d/hlavo:0.1.0 \ + python3 -m hlavo.kalman.kalman \ + ${work_dir} \ + ${hlavo_dir}/runs/kalman/config_test.yaml diff --git a/runs/kalman/run_parflow_model.py b/runs/kalman/run_parflow_model.py index 2eaf2522..6f5f5b79 100644 --- a/runs/kalman/run_parflow_model.py +++ b/runs/kalman/run_parflow_model.py @@ -4,7 +4,7 @@ import argparse import numpy as np from hlavo.soil_parflow.parflow_model import ToyProblem -from hlavo.kalman.visualization.plots import RichardsSolverOutput, plot_richards_output +from hlavo.kalman.plots import RichardsSolverOutput, 
plot_richards_output def run_parflow_model(tmp_path: Path): diff --git a/tests/deep_model/run_deep_model_test.py b/tests/deep_model/run_deep_model_test.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/model_1d/drv_vegm.dat b/tests/model_1d/drv_vegm.dat new file mode 100644 index 00000000..247b04dd --- /dev/null +++ b/tests/model_1d/drv_vegm.dat @@ -0,0 +1,3 @@ + x y lat lon sand clay color fractional coverage of grid by vegetation class (Must/Should Add to 1.0) + (Deg) (Deg) (%/100) index 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 + 1 1 50.0 14.5 0.16 0.265 2 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 diff --git a/tests/model_1d/drv_vegp.dat b/tests/model_1d/drv_vegp.dat new file mode 100644 index 00000000..a460e09b --- /dev/null +++ b/tests/model_1d/drv_vegp.dat @@ -0,0 +1,111 @@ +!========================================================================= +! +! CLMCLMCLMCLMCLMCLMCLMCLMCL A community developed and sponsored, freely +! L M available land surface process model. +! M --COMMON LAND MODEL-- C +! C L CLM WEB INFO: http://clm.gsfc.nasa.gov +! LMCLMCLMCLMCLMCLMCLMCLMCLM CLM ListServ/Mailing List: +! +!========================================================================= +! clm_vegp.dat: +! +! DESCRIPTION: +! Vegetation parameter data file for IGBP classification. +! Please note that any other classification can be used with CLM +! provided that the following parameter values are given. +! This data file is read in by drv_getvegp.f90 - this program expects +! the format given below, but does not assume an exact parameter order. +! +! REVISION HISTORY: +! 15 September 1999: Yongjiu Dai; Initial code +! 15 December 1999: Paul Houser; F90 Revision +! 25 November 2000: Mariana Vertenstein +!========================================================================= +!IGBP Land Cover Types (other classes can be used by changing this file) +! 1 evergreen needleleaf forests +! 2 evergreen broadleaf forests +! 
3 deciduous needleleaf forests +! 4 deciduous broadleaf forests +! 5 mixed forests +! 6 closed shrublands +! 7 open shrublands +! 8 woody savannas +! 9 svannas +! 10 grasslands +! 11 permanent wetlands +! 12 croplands +! 13 urban and built-up lands +! 14 cropland / natural vegetation mosaics +! 15 snow and ice +! 16 barren or sparsely vegetated +! 17 water bodies +! 18 bare soil +!========================================================================= +! +itypwat (1-soil, 2-land ice, 3-deep lake, 4-shallow lake, 5-wetland: swamp, marsh) +1 1 1 1 1 1 1 1 1 1 5 1 1 1 2 1 3 1 +! +lai Maximum leaf area index [-] +6.00 6.00 6.00 6.00 6.00 6.00 6.00 6.00 6.00 2.00 6.00 6.00 5.00 6.00 0.00 6.00 0.00 0.00 +! +lai0 Minimum leaf area index [-] +5.00 5.00 1.00 1.00 3.00 2.00 1.00 2.00 1.00 0.50 0.50 0.50 1.00 2.00 0.00 0.50 0.00 0.00 +! +sai Stem area index [-] +2.00 2.00 2.00 2.00 2.00 2.00 2.00 2.00 2.00 4.00 2.00 0.50 2.00 2.00 2.00 2.00 2.00 0.00 +! +z0m Aerodynamic roughness length [m] +1.00 2.20 1.00 0.80 0.80 0.10 0.10 0.70 0.10 0.03 0.03 0.06 0.50 0.06 0.01 0.05 0.002 0.01 +! +displa Displacement height [m] +11.0 23.00 11.0 13.0 13.0 0.30 0.30 6.50 0.70 0.30 0.30 0.30 3.00 0.30 0.00 0.10 0.00 0.00 +! +dleaf Leaf dimension [m] +0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.04 0.00 +! +roota Fitted numerical index of rooting distribution +7.00 7.00 7.00 6.00 5.00 6.00 5.00 6.00 5.00 1.00 6.00 6.00 5.00 5.00 0.00 5.00 0.00 0.00 +! +rootb Fitted numerical index of rooting distribution +2.00 1.00 2.00 2.00 1.50 1.50 2.50 2.50 1.00 2.50 2.00 2.50 2.00 2.00 0.00 2.00 0.00 0.00 +! +rhol_vis !leaf reflectance vis +0.07 0.10 0.07 0.10 0.08 0.08 0.08 0.09 0.11 0.11 0.11 0.11 0.09 0.09 -99. 0.09 -99. -99. +! +rhol_nir !leaf reflectance nir +0.35 0.45 0.35 0.45 0.40 0.40 0.40 0.49 0.58 0.58 0.35 0.58 0.47 0.47 -99. 0.47 -99. -99. +! +rhos_vis !stem reflectance vis +0.16 0.16 0.16 0.16 0.16 0.16 0.16 0.26 0.36 0.36 0.36 0.36 0.24 0.24 -99. 
0.24 -99. -99. +! +rhos_nir !stem reflectance nir +0.39 0.39 0.39 0.39 0.39 0.39 0.39 0.48 0.58 0.58 0.39 0.58 0.47 0.47 -99. 0.47 -99. -99. +! +taul_vis !leaf transmittance vis +0.05 0.05 0.05 0.05 0.05 0.05 0.05 0.06 0.07 0.07 0.07 0.07 0.06 0.06 -99. 0.06 -99. -99. +! +taul_nir !leaf transmittance nir +0.10 0.25 0.10 0.25 0.17 0.17 0.17 0.21 0.25 0.25 0.10 0.25 0.20 0.20 -99. 0.20 -99. -99. +! +taus_vis !stem transmittance vis +0.001 0.001 0.001 0.001 0.001 0.001 0.001 0.11 0.22 0.22 0.22 0.22 0.09 0.09 -99. 0.09 -99. -99. +! +taus_nir !stem transmittance nir +0.001 0.001 0.001 0.001 0.001 0.001 0.001 0.19 0.38 0.38 0.001 0.38 0.15 0.15 -99. 0.15 -99. -99. +! +xl !leaf/stem orientation index +0.01 0.10 0.01 0.25 0.13 0.13 0.13 -0.08 -0.3 -0.3 -0.3 -0.3 -0.07 -0.07 -99. -0.07 -99. -99. +! +vw !btran exponent:[(h2osoi_vol-watdry)/(watopt-watdry)]**vw +1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. -99. 1. -99. -99. +! +irrig !(irrig=0 -> no irrigation, irrig=1 -> irrigate) +0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +! + + + + + + + diff --git a/tests/model_1d/model_1d_test_run.py b/tests/model_1d/model_1d_test_run.py new file mode 100644 index 00000000..4e38e430 --- /dev/null +++ b/tests/model_1d/model_1d_test_run.py @@ -0,0 +1,126 @@ +import os +import numpy as np +import xarray as xr +import pytest +from pathlib import Path + +#from dask.distributed import Client, LocalCluster + +from hlavo.composed.model_1d import Model1D +from hlavo.composed.composed_model_mock import relative_to_absolute_paths +from filterpy.kalman import UnscentedKalmanFilter +from hlavo.kalman.parallel_ukf import ParallelUKF + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def load_config(config_path: Path): + import yaml + with config_path.open("r") as f: + return yaml.safe_load(f) + + +def build_model(): + """ + Shared factory usable by pytest fixtures AND __main__. 
+ """ + work_dir = Path(__file__).parent + config_file = work_dir / 'composed_1d_part_config_test.yaml' + composed_model_config_path = Path(config_file).resolve() + + composed_model_config = load_config(composed_model_config_path) + + composed_model_config = relative_to_absolute_paths(composed_model_config, work_dir) + + site_id = 1 + seed = composed_model_config["seed"] + + kalman_config_path = composed_model_config["1d_models"][0] + model_1d_config = load_config(Path(kalman_config_path).resolve()) + model_1d_config = relative_to_absolute_paths(model_1d_config, work_dir) + + model = Model1D( + site_id=site_id, + initial_state=0.0, + work_dir=work_dir, + model_kalman_config_dict=model_1d_config, + seed=seed + ) + + return model + + +# --------------------------------------------------------------------------- +# Pytest fixtures +# --------------------------------------------------------------------------- +@pytest.fixture +def model(): + return build_model() + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +def test_measurements_preparation(model): + ukf = model._prepare_kalman_measurements() + + assert isinstance(ukf, (UnscentedKalmanFilter, ParallelUKF)) + assert isinstance(model.measurements_dataset, xr.Dataset) + assert np.all(model.measurements_dataset["site_id"].values == model.site_id) + + +def test_measurement_for_time(model): + start_datetime = np.datetime64("2025-03-18T01:00:00") + end_datetime = np.datetime64("2025-04-16T03:00:00") + + meas = model.get_dataset_values_for_time(model.measurements_dataset, start_datetime, end_datetime) + times = meas["date_time"].values + + assert times.min() >= start_datetime + assert times.max() <= end_datetime + + +def test_get_long_lat(model): + target = np.datetime64("2025-03-18T00:00:00") + lon, lat = model.get_long_lat(target) + + assert lon == 14.889853 + assert lat == 50.863565 + + +def 
test_step(model): + start = np.datetime64("2025-03-06T00:00:00") + target = np.datetime64("2025-03-06T02:00:00") + + result = model.step(start, target, pressure_at_bottom=1.0) + + assert result is not None + assert hasattr(result, "velocity") + + +# --------------------------------------------------------------------------- +# __main__ support (manual execution) +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + print("Running tests manually...\n") + + model = build_model() + + test_measurements_preparation(model) + print("✓ measurements_preparation passed") + + test_measurement_for_time(model) + print("✓ measurement_for_time passed") + + test_get_long_lat(model) + print("✓ get_long_lat passed") + + test_step(model) + print("✓ step passed") + + + print("\nAll tests passed.") diff --git a/tests/soil_parflow/test_parflow_model.py b/tests/soil_parflow/test_parflow_model.py index f63684ca..f9c52ecd 100644 --- a/tests/soil_parflow/test_parflow_model.py +++ b/tests/soil_parflow/test_parflow_model.py @@ -29,6 +29,10 @@ "Geom.domain.RelPerm.Alpha", ], }, + "use_clm": True, + "clm_files": { + "drv_vegm_file": "drv_vegm.dat", # relative path to composed config file + "drv_vegp_file": "drv_vegp.dat"} } @@ -89,7 +93,7 @@ def test_parflow_model_with_clm(): coords=dict( lon=("loc", lon), lat=("loc", lat), - time=time, + date_time=time, ), attrs=dict( description="Meteorological dataset from CHMI opendata.",