6 changes: 3 additions & 3 deletions src/endorse/common/__init__.py
@@ -1,8 +1,8 @@
from .memoize import CallCache, memoize, File
from .common import substitute_placeholders, sample_from_population, workdir
from .memoize import EndorseCache, memoize, File
from .common import substitute_placeholders, sample_from_population, workdir, force_mkdir
from .config import dotdict, load_config, apply_variant, dump_config
from .report import report
from .flow_call import call_flow, FlowOutput
from .flow_call import flow_call, flow_check, FlowOutput


year = 365.2425 * 24 * 60 * 60
14 changes: 14 additions & 0 deletions src/endorse/common/common.py
@@ -94,6 +94,20 @@ def __exit__(self, type, value, traceback):
shutil.rmtree(self.work_dir)


def force_mkdir(path, force=False):
"""
Make directory 'path' with all parents,
remove the leaf dir recursively if it already exists.
:param path: path to directory
:param force: if dir already exists then remove it and create new one
:return: None
"""
if force:
if os.path.isdir(path):
shutil.rmtree(path)
os.makedirs(path, mode=0o775, exist_ok=True)
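
# A minimal usage sketch of force_mkdir; the path below is a hypothetical example:
#
#     force_mkdir("sandbox/run_1")              # create if missing, keep existing contents
#     force_mkdir("sandbox/run_1", force=True)  # remove the existing dir first, then recreate it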


def substitute_placeholders(file_in: str, file_out: str, params: Dict[str, Any]):
"""
In the template `file_in` substitute the placeholders in format '<name>'
3 changes: 1 addition & 2 deletions src/endorse/common/config.py
@@ -140,12 +140,11 @@ def _item_update(key:Key, val:dotdict, sub_path:Key, sub:dotdict):
def deep_update(cfg: dotdict, iter:AddrIter, substitute:dotdict):
if iter.is_leaf():
return substitute
new_cfg = dotdict(cfg)
if isinstance(cfg, list):
key, sub_path = iter.idx()
new_cfg = list(cfg)
elif isinstance(cfg, (dict, dotdict)):
key, sub_path = iter.key()
new_cfg = dotdict(cfg)
else:
raise TypeError(f"Variant substitution: Unknown type {type(cfg)}")
new_cfg[key] = deep_update(cfg[key], sub_path, substitute)
116 changes: 80 additions & 36 deletions src/endorse/common/flow_call.py
@@ -1,10 +1,11 @@
from typing import *
import logging
import os
import attrs
from . import dotdict, memoize, File, report, substitute_placeholders, workdir
from . import dotdict, memoize, File, report, substitute_placeholders
import bp_simunek.common
import subprocess
import yaml
from pathlib import Path

def search_file(basename, extensions):
"""
@@ -21,7 +21,7 @@ class EquationOutput:
def __init__(self, eq_name, balance_name):
self.eq_name: str = eq_name
self.spatial_file: File = search_file(eq_name+"_fields", (".msh", ".pvd"))
self.balance_file: File = search_file(balance_name+"_balance", ".txt"),
self.balance_file: File = search_file(balance_name+"_balance", (".yaml", ".txt"))
self.observe_file: File = search_file(eq_name+"_observe", ".yaml")

def _load_yaml_output(self, file, basename):
@@ -52,12 +53,13 @@ def balance_df(self):

class FlowOutput:

def __init__(self, process: subprocess.CompletedProcess, stdout: File, stderr: File, output_dir="output"):
def __init__(self, workdir: Path, process: subprocess.CompletedProcess):
self.process = process
self.stdout = stdout
self.stderr = stderr
self.failed_convergence_reason = 0
with workdir(output_dir):
output_dir = "output"
with bp_simunek.common.workdir(str(workdir)):
self.stdout = File("stdout")
self.stderr = File("stderr")
with bp_simunek.common.workdir(str(workdir/output_dir)):
self.log = File("flow123.0.log")
# TODO: flow ver 4.0 unify output file names
self.hydro = EquationOutput("flow", "water")
@@ -89,52 +91,94 @@ def check_conv_reasons(self):
continue
return True

@memoize
def _prepare_inputs(file_in, params):
in_dir, template = os.path.split(file_in)

def _prepare_inputs(workdir, file_in, params):
in_dir, template = os.path.split(file_in.path)
suffix = "_tmpl.yaml"
assert template[-len(suffix):] == suffix
filebase = template[:-len(suffix)]
main_input = filebase + ".yaml"
main_input, used_params = substitute_placeholders(file_in, main_input, params)
main_input = workdir/(filebase + ".yaml")
main_input, used_params = substitute_placeholders(file_in.path, main_input, params)
return main_input

@memoize

def _flow_subprocess(arguments, main_input):
filebase, ext = os.path.splitext(os.path.basename(main_input.path))
arguments.append(main_input.path)
logging.info("Running Flow123d: " + " ".join(arguments))
logging.info("Flow123d running with: " + " ".join(arguments))

stdout_path = filebase + "_stdout"
stderr_path = filebase + "_stderr"
stdout_path = "stdout"
stderr_path = "stderr"
with open(stdout_path, "w") as stdout:
with open(stderr_path, "w") as stderr:
completed = subprocess.run(arguments, stdout=stdout, stderr=stderr)
return File(stdout_path), File(stderr_path), completed
completed_process = subprocess.run(arguments, stdout=stdout, stderr=stderr)
return completed_process, File(stdout_path), File(stderr_path)


@report
@memoize
def call_flow(cfg:'dotdict', file_in:File, params: Dict[str,str]) -> FlowOutput:
def flow_call_with_check(workdir: Path, executable_list, input_template: File, params: Dict[str,str],
result_files:List[Path]=[], timeout=0):
"""
Convenience wrapper combining `flow_call` and `flow_check`.
"""
completed_process, stdout, stderr = flow_call(workdir, executable_list, input_template, params)
res, fo = flow_check(workdir, completed_process, result_files, timeout)
return res, fo
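
# A minimal usage sketch of the combined call; the executable name, template
# file name and parameter names are illustrative assumptions, not part of this module:
#
#     workdir = Path("sandbox/flow_run")
#     ok, fo = flow_call_with_check(
#         workdir,
#         executable_list=["flow123d"],
#         input_template=File("flow_input_tmpl.yaml"),   # name must end with "_tmpl.yaml"
#         params={"conductivity": "1e-8"},
#         result_files=[Path("output/flow_observe.yaml")])
#     if ok:
#         df = fo.hydro.balance_df()   # balance data of the water equation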


@report
def flow_call(workdir: Path, executable_list, input_template: File, params: Dict[str,str])\
-> (subprocess.CompletedProcess, File, File):
"""
Run Flow123d in the given work dir, with the main input produced from the given template and a dictionary of parameters.

1. prepare the main input file from filebase_in + "_tmpl.yaml"
2. run Flow123d
1. change dir to workdir (resolve abs path)
2. prepare the main input file from the input_template file; the filename must end with "_tmpl.yaml"
3. run Flow123d

Returns the CompletedProcess (from subprocess) and the standard output and standard error as Files.

TODO: pass only flow configuration
"""
main_input = _prepare_inputs(file_in, params)
stdout, stderr, completed = _flow_subprocess(cfg.flow_executable.copy(), main_input)
logging.info(f"Exit status: {completed.returncode}")
if completed.returncode != 0:
with open(stderr.path, "r") as stderr:
print(stderr.read())
raise Exception("Flow123d ended with error")

fo = FlowOutput(completed, stdout.path, stderr.path)
conv_check = fo.check_conv_reasons()
logging.info(f"converged: {conv_check}")
return fo
workdir_abs = workdir.absolute()
orig_dir = os.getcwd()
os.chdir(workdir)

main_input = _prepare_inputs(workdir_abs, input_template, params)
completed_process, stdout, stderr = _flow_subprocess(executable_list.copy(), main_input)
logging.info(f"Flow123d exit status: {completed_process.returncode}")

os.chdir(orig_dir)
return completed_process, stdout, stderr


def flow_check(workdir: Path, completed_process, result_files:List[Path]=[], timeout=0) -> (bool, FlowOutput):
"""
Check the results of Flow123d, typically the output of `flow_call`.

Create FlowOutput object.
If any `result_files` are requested, check their existence.
If they all exist, then check the Flow123d convergence reason and return.
Otherwise check only the return code of the Flow123d subprocess and return.

TODO: wait for the output files to appear within the given timeout
"""
res = False
fo = FlowOutput(workdir, completed_process)

# check that the user-requested result files exist:
if all([(workdir/f).exists() for f in result_files]):
conv_check = fo.check_conv_reasons()
logging.info(f"Flow123d convergence reason: {conv_check}")
res = conv_check >= 0 # Flow123d algebraic solver converged
return res, fo

# if completed_process.returncode != 0:
# with open(fo.stderr.path, "r") as stderr:
# print(stderr.read())
# raise Exception("Flow123d ended with error")

res = completed_process.returncode == 0
return res, fo
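
# A sketch of re-checking existing outputs without re-running the solver,
# given a previously stored CompletedProcess (illustrative):
#
#     ok, fo = flow_check(Path("sandbox/flow_run"), completed_process,
#                         result_files=[Path("output/flow_observe.yaml")])
#     if not ok:
#         print(fo.stderr.path)   # inspect the captured stderr of Flow123d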

# TODO:
# - call_flow variant with creating dir, copy,
125 changes: 48 additions & 77 deletions src/endorse/common/memoize.py
@@ -1,97 +1,65 @@
import logging
from typing import *
# import redis_cache
import pathlib
import joblib
#import redis_cache
import hashlib
from functools import wraps
import time
import os

"""
Caching of pure function calls, currently based on joblib.
- File wrapper class allows safe file results with appropriate hashes.

TODO:
- support for other storage methods
- hashing of function implementation and subcalls
(working prototype in endorse-experiment, but does not generalize to more complex programs)
"""
TODO: modify redis_simple_cache or our memoize decorator to hash also the function code
see https://stackoverflow.com/questions/18134087/how-do-i-check-if-a-python-function-changed-in-live-code
that one should also hash the called functions .. the whole tree
moreover we should also hash over the serialization of classes

- program execution view in browser (? how related to Ray, Dask, ..)

"""


class CallCache:
"""
Global singleton for the function call cache.
Configuration is lazy: parameters passed to the `instance` method
are stored and updated by subsequent calls, but the actual
instance is only created during the first call of a memoized function.
"""
__instance_args__ = {}
__singleton_instance__ = None

@staticmethod
def __instance__():
if CallCache.__singleton_instance__ is None:
CallCache.__singleton_instance__ = CallCache(**CallCache.__instance_args__)
return CallCache.__singleton_instance__

@staticmethod
def instance(**kwargs):
"""
Parameters:
workdir - str or Path where to place the cache
expire_all - if True, delete the whole cache

parameters passed to joblib.Memory:
verbose
"""
CallCache.__instance_args__.update(kwargs)

def __init__(self, workdir="", expire_all=False, **kwargs):
# TODO: possibly start redis server
self.workdir = pathlib.Path(workdir)

self.mem_cache = joblib.Memory(
location=self.workdir / "joblib_cache",
**kwargs)

if expire_all:
self.mem_cache.clear()

def expire_all(self):
"""
Deprecated, call instance with 'expire_all=True' instead.
"""
CallCache.instance(expire_all=True)

class EndorseCache:
pass
# __instance__ = None
# @staticmethod
# def instance(*args, **kwargs):
# if EndorseCache.__instance__ is None:
# EndorseCache.__instance__ = EndorseCache(*args, **kwargs)
# return EndorseCache.__instance__
#
# def __init__(self, host="localhost", port=6379):
# # TODO: possibly start redis server
# self.cache = redis_cache.SimpleCache(10000, hashkeys=True, host=host, port=port)
#
#
# def expire_all(self):
# self.cache.expire_all_in_set()

# Workaround missing module in the function call key
# def memoize():
# endorse_cache = EndorseCache.__instance__
# def decorator(fn):
# # redis-simple-cache does not include the function module into the key
# # we pass in a function with an additional parameter
# def key_fn(fn_id , *args, **kwargs):
# return fn(*args, **kwargs)
#
# modif_fn = redis_cache.cache_it(limit=10000, expire=redis_cache.DEFAULT_EXPIRY, cache=endorse_cache.cache)(key_fn)
#
# @wraps(fn)
# def wrapper(*args, **kwargs):
# return modif_fn((fn.__name__, fn.__module__), *args, **kwargs)
# return wrapper
# return decorator

def memoize(fn):
decorated_fn = None
@wraps(fn)
def wrapper(*args, **kwargs):
nonlocal decorated_fn
if decorated_fn is None:
mem: joblib.Memory = CallCache.__instance__().mem_cache
decorated_fn = mem.cache(fn)
return decorated_fn(*args, **kwargs)
return wrapper

return fn
# endorse_cache = EndorseCache.instance()
# redis_cache_deco = redis_cache.cache_it(limit=10000, expire=redis_cache.DEFAULT_EXPIRY, cache=endorse_cache.cache)
# return redis_cache_deco(fn)
#
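
# A sketch of how the joblib-backed CallCache and memoize were used together;
# the workdir value is an illustrative assumption:
#
#     CallCache.instance(workdir="sandbox", verbose=0)   # extra kwargs go to joblib.Memory
#
#     @memoize
#     def square(x):
#         return x * x
#
#     square(2)   # computed and stored under sandbox/joblib_cache
#     square(2)   # served from the cache on the repeated call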



class File:
"""
An object that should represent a file as a computation result.
Use cases:
- pass File(file_path) to a memoized function that reads from a file
- return File(file_path) from a memoized function that writes to a file

Contains the path and the file content hash.
The system should also prevent modification of the files that are already created.
To this end one has to use File.open instead of the standard open().
@@ -104,8 +72,7 @@ class File:

Ideally, the File class could operate as the file handle and context manager.
However that means calling the system open() and then modifying its __exit__ method.
However I was unable to do that. It seems that __exit__ is changed, but changed back to the original one somewhere later, as
it is not called. Another possibility is to wrap the standard file handle and use it like:

@joblib.task
Expand All @@ -114,6 +81,10 @@ def make_file(file_path, content):`
f.handle.write(content)
# called File.__exit__ which calls close(self.handle) and performs hashing.
return f

TODO: there is an (unsuccessful) effort to provide a special handle for writing.
TODO: Override deserialization in order to check that the file is unchanged.
It seems that caching just returns the object without actually checking.
"""

# @classmethod
Expand All @@ -129,8 +100,7 @@ def make_file(file_path, content):`
# """
# return cls(path, postponed=True)
_hash_fn = hashlib.md5

def __init__(self, path: str, files: List['File'] = None): # , hash:Union[bytes, str]=None) #, postponed=False):
def __init__(self, path: str, files:List['File'] = None): # , hash:Union[bytes, str]=None) #, postponed=False):
"""
For file 'path' create an object containing both the path and the content hash.
Optionally, files referenced by the file 'path' can be passed via the `files` argument
@@ -183,6 +153,7 @@ def __hash__(self):
def __str__(self):
return f"File('{self.path}', hash={self.hash})"


"""
Could be used from Python 3.11
@staticmethod