Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5147e34
distr in progress
Feb 12, 2026
da49cee
deep model config update
Feb 15, 2026
2174e6c
start end time config
Feb 15, 2026
e016a2d
seed propagation
Feb 16, 2026
13d2506
kalman save results
Feb 17, 2026
e6c32bb
Merge remote-tracking branch 'origin/JS_fix_dependencies' into MS_distr
Feb 19, 2026
0be4073
run kalman
Feb 19, 2026
6a04622
deep model charon run
Feb 25, 2026
156c8e2
kalman run script
Feb 25, 2026
3c785b4
composed update
Mar 2, 2026
67c7df2
Merge branch 'main' into MS_distr
Mar 2, 2026
6404c64
composed update
Mar 10, 2026
86e9b3d
composed split into 1d and 3d model
Mar 10, 2026
b451a6d
work in progress
Mar 10, 2026
9e7c422
model1d test in progress
Mar 10, 2026
dc9bc42
model 1d in progress
Mar 10, 2026
09992a0
data classes
Mar 10, 2026
23a4c80
1d model in progress
Mar 12, 2026
f0d3505
Merge remote-tracking branch 'origin/main' into MS_distr
jbrezmorf Mar 13, 2026
f52124e
model 1d progress
Mar 17, 2026
c4d5b80
Merge branch 'MS_distr' of https://github.com/jbrezmorf/HLAVO into MS…
Mar 17, 2026
69cf7c5
Merge remote-tracking branch 'origin/JS_clm' into MS_distr
Mar 18, 2026
73c889a
model 1d in progress
Mar 19, 2026
2659e38
mock meteo data procesing
Mar 20, 2026
016f939
model 1d test
Mar 26, 2026
d1b3e96
model 1d readme
Apr 8, 2026
159dc2f
readme update
Apr 8, 2026
f829dfa
Merge branch 'main' into MS_distr
jbrezmorf Apr 23, 2026
53f832e
meteo usage
Apr 23, 2026
ec4d33e
Merge branch 'MS_distr' of https://github.com/jbrezmorf/HLAVO into MS…
Apr 23, 2026
f7262e1
model 1d run fix
Apr 23, 2026
c6463ac
1d model run
Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/conda-requirements.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- matplotlib

# Data / parallel
- joblib
- xarray
- dask
- distributed
Expand Down
10 changes: 5 additions & 5 deletions hlavo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# DRAFT
"""Top-level HLAVO package exports."""

from . import composed
from . import deep_model
from . import kalman
from . import soil_parflow
from . import soil_py
# from . import composed
# from . import deep_model
# from . import kalman
# from . import soil_parflow
# from . import soil_py

__all__ = [
"soil_py",
Expand Down
6 changes: 4 additions & 2 deletions hlavo/composed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# DRAFT
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • add README-md files to individual runs dirs, describe:
    • how to run it (including getting SIF images on charon)
    • on which data it depends and how to get them
    • what the run test, what kind of calculation it performs

"""Exports for composed."""

from .composed_model_mock import Model1D, Model3D, setup_models
from .composed_model_mock import setup_models
from .model_1d import Model1D
from .model_3d import Model3D
from .run_map import main

__all__ = [
"main",
"setup_models",
"Model1D",
"Model3D",
"Model3D"
]
207 changes: 67 additions & 140 deletions hlavo/composed/composed_model_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,156 +21,57 @@
keep relation to the date time series of the measurements.

"""
import os
import sys
import argparse
import numpy as np
from pathlib import Path
import numpy as np
from dask.distributed import Client, LocalCluster, get_client, Queue
from hlavo.kalman.kalman import KalmanFilter
from hlavo.ingress.moist_profile.load_data import load_data


# ---------------------------------------------------------------------------
# 1D model class
# ---------------------------------------------------------------------------

class Model1D:
def __init__(self, idx, initial_state=0.0, work_dir=None, kalman_config={}):
self.idx = idx
self.state = initial_state

self.kalman = KalmanFilter.from_config(work_dir, kalman_config, verbose=False)
self.ukf = self._prepare_kalman_measurements()

#kalman_filter.run() # load measurements

def prepare_kalman_measurements(self):
noisy_measurements, noisy_measurements_to_test, meas_model_iter_flux, measurement_state_flag = load_data(
self.kalman.train_measurements_struc,
self.kalman.test_measurements_struc,
data_csv=self.kalman.measurements_config["measurements_file"],
measurements_config=self.kalman.measurements_config
)

precipitation_list = []
for (time_prec, precipitation) in meas_model_iter_flux:
precipitation_list.extend([precipitation] * time_prec)
self.kalman.measurements_config["precipitation_list"] = precipitation_list

noisy_measurements, noisy_measurements_to_test, measurement_state_flag_sampled, meas_model_iter_time, meas_model_iter_flux = \
self.kalman.process_loaded_measurements(noisy_measurements, noisy_measurements_to_test, measurement_state_flag)

sample_variance = np.nanvar(noisy_measurements, axis=0)
measurement_noise_covariance = np.diag(sample_variance)

self.kalman.results.times_measurements = np.cumsum(meas_model_iter_time)
self.kalman.results.precipitation_flux_measurements = meas_model_iter_flux
from hlavo.composed.model_1d import Model1D
from hlavo.composed.model_3d import Model3D

return self.set_kalman_filter(measurement_noise_covariance)
###
# Process paths
###

def step(self, target_time, data_for_step):
print(f"[1D {self.idx}] step at t={target_time}, "
f"data={data_for_step}, current_state={self.state}")
self.state += data_for_step
print(f"[1D {self.idx}] new state={self.state}")
return self.state
def relative_to_absolute_paths(config_dict, base_dir: Path):
if isinstance(config_dict, dict):
for k, v in config_dict.items():
if isinstance(v, str) and k.endswith(("_file", "_path", "_dir")):
p = Path(v)
if not p.is_absolute():
config_dict[k] = (base_dir / p).resolve()
else:
relative_to_absolute_paths(v, base_dir)

def run_loop(self, t_end, queue_name_in, queue_name_out):
"""
Input queue processing loop for the 1D model.
"""
#client = get_client()
q_in = Queue(queue_name_in) # correct API
q_out = Queue(queue_name_out)
elif isinstance(config_dict, list):
for i, v in enumerate(config_dict):
if isinstance(v, str):
# optional: only resolve if needed
p = Path(v)
if not p.is_absolute():
config_dict[i] = (base_dir / p).resolve()
else:
relative_to_absolute_paths(v, base_dir)

current_time = 0.0
return config_dict

while current_time < t_end:
target_time, data = q_in.get() # blocks
contribution = self.step(target_time, data) # # call kalman, meas - neni ve fronte, musi se odnekud nacist
q_out.put((self.idx, target_time, contribution))
print(f"[1D {self.idx}] sent contribution={contribution} at t={target_time}")

current_time = target_time

print(f"[1D {self.idx}] finished loop at t={current_time} (t_end={t_end})")
return f"1D model {self.idx} done; final state={self.state}"


def model1d_worker_entry(idx, t_end, queue_name_in, queue_name_out, work_dir, kalman_config):
def model1d_worker_entry(idx, start_datetime, end_datetime, queue_name_in, queue_name_out, work_dir, model_kalman_config_dict, seed=123):
"""
Entry function running on a Dask worker.
"""
model = Model1D(idx=idx, initial_state=0.0, work_dir=work_dir, kalman_config=kalman_config)
return model.run_loop(t_end, queue_name_in, queue_name_out)


# ---------------------------------------------------------------------------
# 3D model class
# ---------------------------------------------------------------------------

class Model3D:
def __init__(self, n_1d, initial_state=0.0, initial_time=0.0, base_dt=1.0):
self.n_1d = n_1d
self.state = initial_state
self.time = initial_time
self.base_dt = base_dt

def choose_dt(self, t_end):
remaining = t_end - self.time
return max(min(self.base_dt, remaining), 0.0)

def step(self, target_time, contributions):
print(f"[3D] step to t={target_time}, "
f"current_state={self.state}, contributions={contributions}")
total_contrib = sum(contributions)
self.state += total_contrib
self.time = target_time
print(f"[3D] new state={self.state}")
return self.state

def run_loop(self, t_end, queue_names_out_to_1d, queue_name_in_from_1d):
client = get_client()
q_3d_to_1d = [Queue(name) for name in queue_names_out_to_1d]
q_1d_to_3d = Queue(queue_name_in_from_1d)

while self.time < t_end:
dt = self.choose_dt(t_end)
if dt <= 0.0:
print("[3D] dt <= 0, stopping to avoid infinite loop.")
break

target_time = self.time + dt
print(f"\n[3D] === Step: t={self.time} -> t={target_time} ===")
print(f"[3D] current state={self.state}")

# send to 1D
for i in range(self.n_1d):
data_for_i = self.state + i # dummy placeholder
print(f"[3D] sending to 1D {i}: t={target_time}, data={data_for_i}")
q_3d_to_1d[i].put((target_time, data_for_i))

# receive contributions
contributions = [None] * self.n_1d
received = 0

while received < self.n_1d:
idx, t_recv, contrib = q_1d_to_3d.get()
print(f"[3D] received from 1D {idx}: t={t_recv}, contrib={contrib}")
contributions[idx] = contrib
received += 1

self.step(target_time, contributions)

print(f"[3D] finished time loop at t={self.time} (t_end={t_end}), state={self.state}")
return self.state
model = Model1D(site_id=idx, initial_state=0.0, work_dir=work_dir, model_kalman_config_dict=model_kalman_config_dict, seed=seed)
return model.run_loop(start_datetime, end_datetime, queue_name_in, queue_name_out)


# ---------------------------------------------------------------------------
# Setup function
# ---------------------------------------------------------------------------

def setup_models(n_1d, t_end, work_dir, kalman_config):
def setup_models(work_dir, config_dir, start_datetime, end_datetime, deep_model_config):
client = get_client()

queue_names_3d_to_1d = []
Expand All @@ -179,26 +80,36 @@ def setup_models(n_1d, t_end, work_dir, kalman_config):
queue_name_1d_to_3d = "q-1d-to-3d"
Queue(queue_name_1d_to_3d, client=client) # ensure creation

for i in range(n_1d):
q_name_3d_to_1d = f"q-3d-to-1d-{i}"
for i, model_1d_config in enumerate(deep_model_config["1d_models"]):
model_1d_config_path = config_dir / Path(model_1d_config)
model_kalman_config_dict = load_config(model_1d_config_path)

model_kalman_config_dict = relative_to_absolute_paths(model_kalman_config_dict, config_dir)

model_work_dir = os.path.join(work_dir, "model_1d_{}".format(i+1))
print("model_work_dir ", model_work_dir)
os.makedirs(model_work_dir, exist_ok=True)

q_name_3d_to_1d = f"q-3d-to-1d-{i+1}"
Queue(q_name_3d_to_1d, client=client) # ensure creation

queue_names_3d_to_1d.append(q_name_3d_to_1d)

fut = client.submit(
model1d_worker_entry,
i,
t_end,
i+1,
start_datetime, end_datetime,
q_name_3d_to_1d,
queue_name_1d_to_3d, work_dir, kalman_config,
queue_name_1d_to_3d, model_work_dir, model_kalman_config_dict, seed=deep_model_config["seed"],
pure=False,
)
futures_1d.append(fut)
print(f"[SETUP] Submitted Model1D idx={i}")

model_3d = Model3D(n_1d=n_1d)

model_3d = Model3D(n_1d=i+1)
final_state_3d = model_3d.run_loop(
t_end,
start_datetime, end_datetime,
queue_names_out_to_1d=queue_names_3d_to_1d,
queue_name_in_from_1d=queue_name_1d_to_3d,
)
Expand All @@ -214,23 +125,39 @@ def setup_models(n_1d, t_end, work_dir, kalman_config):
# Main
# ---------------------------------------------------------------------------

def load_config(config_path):
import yaml
with config_path.open("r") as f:
config_dict = yaml.safe_load(f)
return config_dict


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('work_dir', help='Path to work dir')
parser.add_argument('config_file', help='Path to configuration file')

args = parser.parse_args(sys.argv[1:])

work_dir = Path(args.work_dir)
kalman_config = Path(args.config_file).resolve()
composed_config_path = Path(args.config_file).resolve()
config_dir = composed_config_path.parent

composed_config = load_config(composed_config_path)

cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

n_1d = 3
t_end = 5.0
start_datetime = np.datetime64(composed_config["start_datetime"])
end_datetime = np.datetime64(composed_config["end_datetime"])

print("start date time ", start_datetime)
print("end date time ", end_datetime)

if end_datetime <= start_datetime:
raise ValueError("end-datetime must be after start-datetime")

final_state = setup_models(n_1d, t_end, work_dir, kalman_config)
final_state = setup_models(work_dir, config_dir, start_datetime, end_datetime, composed_config)
print("\n[MAIN] Final 3D state:", final_state)

client.close()
Expand Down
Loading
Loading