From 06db2aa956bccb06f25fe7aa20bba08c4ffa96ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20=C5=A0im=C5=AFnek?= Date: Thu, 27 Nov 2025 11:05:00 +0100 Subject: [PATCH] Add modified src/common files --- src/endorse/common/__init__.py | 6 +- src/endorse/common/common.py | 14 ++++ src/endorse/common/config.py | 7 +- src/endorse/common/flow_call.py | 115 ++++++++++++++++++++--------- src/endorse/common/memoize.py | 125 ++++++++++++-------------------- 5 files changed, 148 insertions(+), 119 deletions(-) diff --git a/src/endorse/common/__init__.py b/src/endorse/common/__init__.py index a55c19c9..b953d20c 100644 --- a/src/endorse/common/__init__.py +++ b/src/endorse/common/__init__.py @@ -1,7 +1,7 @@ -from .memoize import CallCache, memoize, File -from .common import substitute_placeholders, sample_from_population, workdir +from .memoize import EndorseCache, memoize, File +from .common import substitute_placeholders, sample_from_population, workdir, force_mkdir from .config import dotdict, load_config, apply_variant, dump_config from .report import report -from .flow_call import call_flow, FlowOutput +from .flow_call import flow_call, flow_check, FlowOutput year = 365.2425 * 24 * 60 * 60 \ No newline at end of file diff --git a/src/endorse/common/common.py b/src/endorse/common/common.py index 63c15fed..fc73201b 100644 --- a/src/endorse/common/common.py +++ b/src/endorse/common/common.py @@ -91,6 +91,20 @@ def __exit__(self, type, value, traceback): shutil.rmtree(self.work_dir) +def force_mkdir(path, force=False): + """ + Make directory 'path' with all parents, + remove the leaf dir recursively if it already exists. + :param path: path to directory + :param force: if dir already exists then remove it and create new one + :return: None + """ + if force: + if os.path.isdir(path): + shutil.rmtree(path) + os.makedirs(path, mode=0o775, exist_ok=True) + + def substitute_placeholders(file_in: str, file_out: str, params: Dict[str, Any]): """ In the template `file_in` substitute the placeholders in format '' diff --git a/src/endorse/common/config.py b/src/endorse/common/config.py index 54c62d3a..28d9ef04 100644 --- a/src/endorse/common/config.py +++ b/src/endorse/common/config.py @@ -119,12 +119,11 @@ def _item_update(key:Key, val:dotdict, sub_path:Key, sub:dotdict): def deep_update(cfg: dotdict, iter:PathIter, substitute:dotdict): if iter.is_leaf(): return substitute + new_cfg = dotdict(cfg) if isinstance(cfg, list): key, sub_path = iter.idx() - new_cfg = list(cfg) elif isinstance(cfg, (dict, dotdict)): key, sub_path = iter.key() - new_cfg = dotdict(cfg) else: raise TypeError(f"Variant substitution: Unknown type {type(cfg)}") new_cfg[key] = deep_update(cfg[key], sub_path, substitute) @@ -146,11 +145,11 @@ def apply_variant(cfg:dotdict, variant:VariantPatch) -> dotdict: :param variant: dictionary path -> dotdict :return: """ - new_cfg = dotdict.create(cfg) + new_cfg = cfg for path_str, val in variant.items(): path = tuple(path_str.split('/')) assert path - new_cfg = deep_update(new_cfg, PathIter(path), dotdict.create(val)) + new_cfg = deep_update(new_cfg, PathIter(path), val) return new_cfg def is_glob_pattern(s): diff --git a/src/endorse/common/flow_call.py b/src/endorse/common/flow_call.py index 948e38fb..34d4f345 100644 --- a/src/endorse/common/flow_call.py +++ b/src/endorse/common/flow_call.py @@ -1,10 +1,11 @@ from typing import * import logging import os -import attrs -from . import dotdict, memoize, File, report, substitute_placeholders, workdir +from . import dotdict, memoize, File, report, substitute_placeholders +import bp_simunek.common import subprocess import yaml +from pathlib import Path def search_file(basename, extensions): """ @@ -21,7 +22,7 @@ class EquationOutput: def __init__(self, eq_name, balance_name): self.eq_name: str = eq_name self.spatial_file: File = search_file(eq_name+"_fields", (".msh", ".pvd")) - self.balance_file: File = search_file(balance_name+"_balance", ".txt"), + self.balance_file: File = search_file(balance_name+"_balance", (".yaml", ".txt")) self.observe_file: File = search_file(eq_name+"_observe", ".yaml") def _load_yaml_output(self, file, basename): @@ -52,11 +53,13 @@ def balance_df(self): class FlowOutput: - def __init__(self, process: subprocess.CompletedProcess, stdout: File, stderr: File, output_dir="output"): + def __init__(self, workdir: Path, process: subprocess.CompletedProcess): self.process = process - self.stdout = stdout - self.stderr = stderr - with workdir(output_dir): + output_dir = "output" + with bp_simunek.common.workdir(str(workdir)): + self.stdout = File("stdout") + self.stderr = File("stderr") + with bp_simunek.common.workdir(str(workdir/output_dir)): self.log = File("flow123.0.log") # TODO: flow ver 4.0 unify output file names self.hydro = EquationOutput("flow", "water") @@ -87,52 +90,94 @@ def check_conv_reasons(self): continue return True -@memoize -def _prepare_inputs(file_in, params): - in_dir, template = os.path.split(file_in) + +def _prepare_inputs(workdir, file_in, params): + in_dir, template = os.path.split(file_in.path) suffix = "_tmpl.yaml" assert template[-len(suffix):] == suffix filebase = template[:-len(suffix)] - main_input = filebase + ".yaml" - main_input, used_params = substitute_placeholders(file_in, main_input, params) + main_input = workdir/(filebase + ".yaml") + main_input, used_params = substitute_placeholders(file_in.path, main_input, params) return main_input -@memoize + def _flow_subprocess(arguments, main_input): - filebase, ext = os.path.splitext(os.path.basename(main_input.path)) arguments.append(main_input.path) - logging.info("Running Flow123d: " + " ".join(arguments)) + logging.info("Flow123d running with: " + " ".join(arguments)) - stdout_path = filebase + "_stdout" - stderr_path = filebase + "_stderr" + stdout_path = "stdout" + stderr_path = "stderr" with open(stdout_path, "w") as stdout: with open(stderr_path, "w") as stderr: - completed = subprocess.run(arguments, stdout=stdout, stderr=stderr) - return File(stdout_path), File(stderr_path), completed + completed_process = subprocess.run(arguments, stdout=stdout, stderr=stderr) + return completed_process, File(stdout_path), File(stderr_path) + @report -@memoize -def call_flow(cfg:'dotdict', file_in:File, params: Dict[str,str]) -> FlowOutput: +def flow_call_with_check(workdir: Path, executable_list, input_template: File, params: Dict[str,str], + result_files:List[Path]=[], timeout=0): + """ + Common call of `flow_call` and `flow_check`. + """ + completed_process, stdout, stderr = flow_call(workdir, executable_list, input_template, params) + res, fo = flow_check(workdir, completed_process, result_files, timeout) + return res, fo + + +@report +def flow_call(workdir: Path, executable_list, input_template: File, params: Dict[str,str])\ + -> (subprocess.CompletedProcess, File, File): """ Run Flow123d in actual work dir with main input given be given template and dictionary of parameters. - 1. prepare the main input file from filebase_in + "_tmpl.yamlL" - 2. run Flow123d + 1. change dir to workdir (resolve abs path) + 2. prepare the main input file from input_template file: suppose filename ends with + "_tmpl.yaml" + 3. run Flow123d + + Returns CompletedProcess (by subprocess), standard output, standard error output. TODO: pass only flow configuration """ - main_input = _prepare_inputs(file_in, params) - stdout, stderr, completed = _flow_subprocess(cfg.flow_executable.copy(), main_input) - logging.info(f"Exit status: {completed.returncode}") - if completed.returncode != 0: - with open(stderr.path, "r") as stderr: - print(stderr.read()) - raise Exception("Flow123d ended with error") - - fo = FlowOutput(completed, stdout.path, stderr.path) - conv_check = fo.check_conv_reasons() - logging.info(f"converged: {conv_check}") - return fo + workdir_abs = workdir.absolute() + orig_dir = os.getcwd() + os.chdir(workdir) + + main_input = _prepare_inputs(workdir_abs, input_template, params) + completed_process, stdout, stderr = _flow_subprocess(executable_list.copy(), main_input) + logging.info(f"Flow123d exit status: {completed_process.returncode}") + + os.chdir(orig_dir) + return completed_process, stdout, stderr + + +def flow_check(workdir: Path, completed_process, result_files:List[Path]=[], timeout=0) -> (bool, FlowOutput): + """ + Check results of Flow123d, possibly output of `flow_call`. + + Create FlowOutput object. + If any `result_files` are requested, check their existence. + If they exist, then check Flow123d for convergence reason and return. + Else check only the return code of subprocess(Flow123d) and return. + + TODO: wait for existence of output files in given timeout + """ + res = False + fo = FlowOutput(workdir, completed_process) + + # check the user requsted result files exist: + if all([(workdir/f).exists() for f in result_files]): + conv_check = fo.check_conv_reasons() + logging.info(f"Flow123d convergence reason: {conv_check}") + res = conv_check >= 0 # Flow123d algebraic solver converged + return res, fo + + # if completed_process.returncode != 0: + # with open(fo.stderr.path, "r") as stderr: + # print(stderr.read()) + # raise Exception("Flow123d ended with error") + + res = completed_process.returncode == 0 + return res, fo # TODO: # - call_flow variant with creating dir, copy, diff --git a/src/endorse/common/memoize.py b/src/endorse/common/memoize.py index 59f6238d..a0cee196 100644 --- a/src/endorse/common/memoize.py +++ b/src/endorse/common/memoize.py @@ -1,97 +1,65 @@ import logging from typing import * -# import redis_cache -import pathlib -import joblib +#import redis_cache import hashlib from functools import wraps import time import os -""" -Caching of pure function calls currently based on the joblib. -- File wrapper class allows safe file results with appropriate hashes. -TODO: -- support for other storage methods -- hashing of function implementation and subcalls - (working prototype in endorse-experiment, but does not generilize to more complex programs) +""" +TODO: modify redis_simple_cache or our memoize decorator to hash also function code see https://stackoverflow.com/questions/18134087/how-do-i-check-if-a-python-function-changed-in-live-code that one should aslo hash called function .. the whole tree more over we should also hash over serialization of classes - -- program execution view in browser (? how related to Ray, Dask, ..) - """ - -class CallCache: - """ - Global singleton for the function call cache. - Configuration is lazy: parameters passed to the instance method - are stored and updated by subsequent instance calls, but the actual - instance is created during first call of the memoized function. - """ - __instance_args__ = {} - __singleton_instance__ = None - - @staticmethod - def __instance__(): - if CallCache.__singleton_instance__ is None: - CallCache.__singleton_instance__ = CallCache(**CallCache.__instance_args__) - return CallCache.__singleton_instance__ - - @staticmethod - def instance(**kwargs): - """ - Parameters: - workdir - str or Path where to place the cache - expire_all - if True, delete whole cache - - parameters passed to joblib.Memory: - verbose - """ - CallCache.__instance_args__.update(kwargs) - - def __init__(self, workdir="", expire_all=False, **kwargs): - # TODO: possibly start redis server - self.workdir = pathlib.Path(workdir) - - self.mem_cache = joblib.Memory( - location=self.workdir / "joblib_cache", - **kwargs) - - if expire_all: - self.mem_cache.clear() - - def expire_all(self): - """ - Deprecated, call instance with 'expire_all=True' instead. - """ - CallCache.instance(expire_all=True) - +class EndorseCache: + pass +# __instance__ = None +# @staticmethod +# def instance(*args, **kwargs): +# if EndorseCache.__instance__ is None: +# EndorseCache.__instance__ = EndorseCache(*args, **kwargs) +# return EndorseCache.__instance__ +# +# def __init__(self, host="localhost", port=6379): +# # TODO: possibly start redis server +# self.cache = redis_cache.SimpleCache(10000, hashkeys=True, host=host, port=port) +# +# +# def expire_all(self): +# self.cache.expire_all_in_set() + +# Workaround missing module in the function call key +# def memoize(): +# endorse_cache = EndorseCache.__instance__ +# def decorator(fn): +# # redis-simple-cache does not include the function module into the key +# # we poss in a functions with additional parameter +# def key_fn(fn_id , *args, **kwargs): +# return fn(*args, **kwargs) +# +# modif_fn = redis_cache.cache_it(limit=10000, expire=redis_cache.DEFAULT_EXPIRY, cache=endorse_cache.cache)(key_fn) +# +# @wraps(fn) +# def wrapper(*args, **kwargs): +# return modif_fn((fn.__name__, fn.__module__), *args, **kwargs) +# return wrapper +# return decorator def memoize(fn): - decorated_fn = None - @wraps(fn) - def wrapper(*args, **kwargs): - nonlocal decorated_fn - if decorated_fn is None: - mem: joblib.Memory = CallCache.__instance__().mem_cache - decorated_fn = mem.cache(fn) - return decorated_fn(*args, **kwargs) - return wrapper - + return fn +# endorse_cache = EndorseCache.instance() +# redis_cache_deco = redis_cache.cache_it(limit=10000, expire=redis_cache.DEFAULT_EXPIRY, cache=endorse_cache.cache) +# return redis_cache_deco(fn) +# class File: """ An object that should represent a file as a computation result. - Use cases: - - pass File(file_path) to a memoize function that reads from a file - - return File(file_path) from a memoize function that writes to a file - Contains the path and the file content hash. The system should also prevent modification of the files that are already created. To this end one has to use File.open instead of the standard open(). @@ -104,8 +72,7 @@ class File: Ideally, the File class could operate as the file handle and context manager. However that means calling system open() and then modify its __exit__ method. - However I was unable to do that. Seems like __exit__ is changed, but changed to the original - one smowere latter as + However I was unable to do that. Seems like __exit__ is changed, but changed to the original one smowere latter as it is not called. Other possibility is to wrap standard file handle and use it like: @joblib.task @@ -114,6 +81,10 @@ def make_file(file_path, content):` f.handle.write(content) # called File.__exit__ which calls close(self.handle) and performs hashing. return f + + TODO: there is an (unsuccessful) effort to provide special handle for writting. + TODO: Override deserialization in order to check that the file is unchanged. + Seems that caching just returns the object without actuall checking. """ # @classmethod @@ -129,8 +100,7 @@ def make_file(file_path, content):` # """ # return cls(path, postponed=True) _hash_fn = hashlib.md5 - - def __init__(self, path: str, files: List['File'] = None): # , hash:Union[bytes, str]=None) #, postponed=False): + def __init__(self, path: str, files:List['File'] = None): # , hash:Union[bytes, str]=None) #, postponed=False): """ For file 'path' create object containing both path and content hash. Optionaly the files referenced by the file 'path' could be passed by `files` argument @@ -183,6 +153,7 @@ def __hash__(self): def __str__(self): return f"File('{self.path}', hash={self.hash})" + """ Could be used from Python 3.11 @staticmethod