From 0d67640287380a62a8675b5c0dde80876005b604 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Mon, 8 Aug 2022 15:35:19 +0200 Subject: [PATCH 01/11] kurtosis check --- mlmc/estimator.py | 84 +++++++++++++++++++++++++++++- mlmc/plot/diagnostic_plots.py | 0 mlmc/quantity/quantity_estimate.py | 53 +++++++++++++++++-- mlmc/tool/hdf5.py | 2 +- 4 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 mlmc/plot/diagnostic_plots.py diff --git a/mlmc/estimator.py b/mlmc/estimator.py index af4f6e03..9e9fad64 100644 --- a/mlmc/estimator.py +++ b/mlmc/estimator.py @@ -3,6 +3,7 @@ import scipy.integrate as integrate import mlmc.quantity.quantity_estimate as qe import mlmc.tool.simple_distribution +from mlmc.quantity.quantity_estimate import mask_nan_samples from mlmc.quantity.quantity_types import ScalarType from mlmc.plot import plots from mlmc.quantity.quantity_spec import ChunkSpec @@ -16,6 +17,7 @@ def __init__(self, quantity, sample_storage, moments_fn=None): self._quantity = quantity self._sample_storage = sample_storage self._moments_fn = moments_fn + self._moments_mean = None @property def quantity(self): @@ -29,6 +31,16 @@ def quantity(self, quantity): def n_moments(self): return self._moments_fn.size + @property + def moments_mean_obj(self): + return self._moments_mean + + @moments_mean_obj.setter + def moments_mean_obj(self, moments_mean): + if not isinstance(moments_mean, mlmc.quantity.quantity.QuantityMean): + raise TypeError + self._moments_mean = moments_mean + def estimate_moments(self, moments_fn=None): """ Use collected samples to estimate moments and variance of this estimate. @@ -39,6 +51,7 @@ def estimate_moments(self, moments_fn=None): moments_fn = self._moments_fn moments_mean = qe.estimate_mean(qe.moments(self._quantity, moments_fn)) + self.moments_mean_obj = moments_mean return moments_mean.mean, moments_mean.var def estimate_covariance(self, moments_fn=None): @@ -340,6 +353,51 @@ def get_level_samples(self, level_id, n_samples=None): chunk_spec = next(self._sample_storage.chunks(level_id=level_id, n_samples=n_samples)) return self._quantity.samples(chunk_spec=chunk_spec) + def kurtosis_check(self, quantity=None): + if quantity is None: + quantity = self._quantity + moments_mean_quantity = qe.estimate_mean(quantity) + kurtosis = qe.level_kurtosis(quantity, moments_mean_quantity) + return kurtosis + + +def consistency_check(quantity, sample_storage=None): + + fine_samples = {} + coarse_samples = {} + for chunk_spec in quantity.get_quantity_storage().chunks(): + samples = quantity.samples(chunk_spec) + chunk, n_mask_samples = mask_nan_samples(samples) + + # No samples in chunk + if chunk.shape[1] == 0: + continue + + fine_samples.setdefault(chunk_spec.level_id, []).extend(chunk[:, :, 0]) + if chunk_spec.level_id > 0: + coarse_samples.setdefault(chunk_spec.level_id, []).extend(chunk[:, :, 1]) + + cons_check_val = {} + for level_id in range(sample_storage.get_n_levels()): + if level_id > 0: + fine_mean = np.mean(fine_samples[level_id]) + coarse_mean = np.mean(coarse_samples[level_id]) + diff_mean = np.mean(np.array(fine_samples[level_id]) - np.array(coarse_samples[level_id])) + + fine_var = np.var(fine_samples[level_id]) + coarse_var = np.var(fine_samples[level_id]) + diff_var = np.var(np.array(fine_samples[level_id]) - np.array(coarse_samples[level_id])) + + val = np.abs(coarse_mean - fine_mean + diff_mean) / ( + 3 * (np.sqrt(coarse_var) + np.sqrt(fine_var) + np.sqrt(diff_var))) + + assert np.isclose(coarse_mean - fine_mean + diff_mean, 0) + assert val < 0.9 + + cons_check_val[level_id] = val + + return cons_check_val + def estimate_domain(quantity, sample_storage, quantile=None): """ @@ -363,7 +421,23 @@ def estimate_domain(quantity, sample_storage, quantile=None): return np.min(ranges[:, 0]), np.max(ranges[:, 1]) -def estimate_n_samples_for_target_variance(target_variance, prescribe_vars, n_ops, n_levels): +def coping_with_high_kurtosis(vars, costs, kurtosis, kurtosis_threshold=100): + """ + Coping with high kurtosis is recommended by prof. M. Giles in http://people.maths.ox.ac.uk/~gilesm/talks/MCQMC_22_b.pdf + :param vars: vars[L, M] for all levels L and moments_fn M safe the (zeroth) constant moment with zero variance. + :param costs: cost of level's sample + :param kurtosis: each level's sample kurtosis + :param kurtosis_threshold: Kurtosis is considered to be too high if it is above this threshold. + Original variances are underestimated and therefore modified in this metod + :return: vars + """ + for l_id in range(2, vars.shape[0]): + if kurtosis[l_id] > kurtosis_threshold: + vars[l_id] = np.maximum(vars[l_id], 0.5 * vars[l_id - 1] * costs[l_id - 1] / costs[l_id]) + return vars + + +def estimate_n_samples_for_target_variance(target_variance, prescribe_vars, n_ops, n_levels, theta=0, kurtosis=None): """ Estimate optimal number of samples for individual levels that should provide a target variance of resulting moment estimate. @@ -372,12 +446,20 @@ def estimate_n_samples_for_target_variance(target_variance, prescribe_vars, n_op :param prescribe_vars: vars[ L, M] for all levels L and moments_fn M safe the (zeroth) constant moment with zero variance. :param n_ops: number of operations at each level :param n_levels: number of levels + :param theta: number of samples N_l control parameter, suitable values: 0.25 ... 0.5 + :param kurtosis: levels' kurtosis :return: np.array with number of optimal samples for individual levels and moments_fn, array (LxR) """ vars = prescribe_vars + + if kurtosis is not None and len(vars) == len(kurtosis): + vars = coping_with_high_kurtosis(vars, n_ops, kurtosis) + sqrt_var_n = np.sqrt(vars.T * n_ops) # moments_fn in rows, levels in cols total = np.sum(sqrt_var_n, axis=1) # sum over levels n_samples_estimate = np.round((sqrt_var_n / n_ops).T * total / target_variance).astype(int) # moments_fn in cols + n_samples_estimate = 1/(1-theta) * n_samples_estimate + # Limit maximal number of samples per level n_samples_estimate_safe = np.maximum( np.minimum(n_samples_estimate, vars * n_levels / target_variance), 2) diff --git a/mlmc/plot/diagnostic_plots.py b/mlmc/plot/diagnostic_plots.py new file mode 100644 index 00000000..e69de29b diff --git a/mlmc/quantity/quantity_estimate.py b/mlmc/quantity/quantity_estimate.py index 2436d7b9..a1f63ddd 100644 --- a/mlmc/quantity/quantity_estimate.py +++ b/mlmc/quantity/quantity_estimate.py @@ -19,13 +19,17 @@ def cache_clear(): mlmc.quantity.quantity.QuantityConst.samples.cache_clear() -def estimate_mean(quantity): +def estimate_mean(quantity, form="diff", operation_func=None, **kwargs): """ MLMC mean estimator. The MLMC method is used to compute the mean estimate to the Quantity dependent on the collected samples. The squared error of the estimate (the estimator variance) is estimated using the central limit theorem. Data is processed by chunks, so that it also supports big data processing :param quantity: Quantity + :param form: if "diff" estimates based on difference between fine and coarse data = MLMC approach + "fine" estimates based on level's fine data + "coarse" estimates based on level's coarse data + :param operation_func: function to process level data, e.g. kurtosis estimation :return: QuantityMean which holds both mean and variance """ cache_clear() @@ -56,10 +60,26 @@ def estimate_mean(quantity): sums = [np.zeros(chunk.shape[0]) for _ in range(n_levels)] sums_of_squares = [np.zeros(chunk.shape[0]) for _ in range(n_levels)] - if chunk_spec.level_id == 0: - chunk_diff = chunk[:, :, 0] + # Estimates of level's fine data + if form == "fine": + if chunk_spec.level_id == 0: + chunk_diff = chunk[:, :, 0] + else: + chunk_diff = chunk[:, :, 0] + # Estimate of level's coarse data + elif form == "coarse": + if chunk_spec.level_id == 0: + chunk_diff = np.zeros(chunk[:, :, 0].shape) + else: + chunk_diff = chunk[:, :, 1] else: - chunk_diff = chunk[:, :, 0] - chunk[:, :, 1] + if chunk_spec.level_id == 0: + chunk_diff = chunk[:, :, 0] + else: + chunk_diff = chunk[:, :, 0] - chunk[:, :, 1] + + if operation_func is not None: + chunk_diff = operation_func(chunk_diff, chunk_spec, **kwargs) sums[chunk_spec.level_id] += np.sum(chunk_diff, axis=1) sums_of_squares[chunk_spec.level_id] += np.sum(chunk_diff**2, axis=1) @@ -154,3 +174,28 @@ def eval_cov(x): else: moments_qtype = qt.ArrayType(shape=(moments_fn.size, moments_fn.size, ), qtype=quantity.qtype) return mlmc.quantity.quantity.Quantity(quantity_type=moments_qtype, input_quantities=[quantity], operation=eval_cov) + + +def kurtosis_numerator(chunk_diff, chunk_spec, l_means): + """ + Estimate sample kurtosis nominator: + E[(Y_l - E[Y_l])^4] + :param chunk_diff: np.ndarray, [quantity shape, number of samples] + :param chunk_spec: quantity_spec.ChunkSpec + :return: np.ndarray, unchanged shape + """ + chunk_diff = (chunk_diff - l_means[chunk_spec.level_id]) ** 4 + return chunk_diff + + +def level_kurtosis(quantity, means_obj): + """ + Estimate sample kurtosis at each level as: + E[(Y_l - E[Y_l])^4] / (Var[Y_l])^2, where Y_l = fine_l - coarse_l + :param quantity: Quantity + :param means_obj: Quantity.QuantityMean + :return: np.ndarray, kurtosis per level + """ + numerator_means_obj = estimate_mean(quantity, operation_func=kurtosis_numerator, l_means=means_obj.l_means) + kurtosis = numerator_means_obj.l_means / (means_obj.l_vars)**2 + return kurtosis diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index f6f3219c..a83f6a8a 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -357,7 +357,7 @@ def chunks(self, n_samples=None): dataset = hdf_file["/".join([self.level_group_path, "collected_values"])] if n_samples is not None: - yield ChunkSpec(chunk_id=0, chunk_slice=slice(0, n_samples, 1), level_id=int(self.level_id)) + yield ChunkSpec(chunk_id=0, chunk_slice=slice(0, n_samples, ...), level_id=int(self.level_id)) else: for chunk_id, chunk in enumerate(dataset.iter_chunks()): yield ChunkSpec(chunk_id=chunk_id, chunk_slice=chunk[0], level_id=int(self.level_id)) # slice, level_id From 1d387443c7e6f2ec302f24a24381380c0251d977 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Tue, 23 Aug 2022 11:58:45 +0200 Subject: [PATCH 02/11] mlmc diagnostic plots --- examples/shooting/shooting_1D_mcqmc.py | 252 +++++++++++++++++++++++++ mlmc/plot/diagnostic_plots.py | 97 ++++++++++ test/test_estimates.py | 234 +++++++++++++++++++++++ test/test_sampler.py | 43 +++++ 4 files changed, 626 insertions(+) create mode 100644 examples/shooting/shooting_1D_mcqmc.py create mode 100644 test/test_estimates.py diff --git a/examples/shooting/shooting_1D_mcqmc.py b/examples/shooting/shooting_1D_mcqmc.py new file mode 100644 index 00000000..bcf5f5fe --- /dev/null +++ b/examples/shooting/shooting_1D_mcqmc.py @@ -0,0 +1,252 @@ +import numpy as np +import mlmc.estimator as est +from mlmc.estimator import Estimate, estimate_n_samples_for_target_variance +from mlmc.sampler import Sampler +from mlmc.sample_storage import Memory +from mlmc.sampling_pool import OneProcessPool +from examples.shooting.simulation_shooting_1D import ShootingSimulation1D +from mlmc.quantity.quantity import make_root_quantity +from mlmc.quantity.quantity_estimate import moments, estimate_mean +from mlmc.moments import Legendre +from mlmc.plot.plots import Distribution + + +# Tutorial class for 1D shooting simulation, includes +# - samples scheduling +# - process results: +# - create Quantity instance +# - approximate density +class ProcessShooting1D: + + def __init__(self): + n_levels = 3 + # Number of MLMC levels + step_range = [1, 1e-3] + # step_range [simulation step at the coarsest level, simulation step at the finest level] + level_parameters = ProcessShooting1D.determine_level_parameters(n_levels, step_range) + # Determine each level parameters (in this case, simulation step at each level), level_parameters should be + # simulation dependent + self._sample_sleep = 0#30 + # Time to do nothing just to make sure the simulations aren't constantly checked, useful mainly for PBS run + self._sample_timeout = 60 + # Maximum waiting time for running simulations + self._adding_samples_coef = 0.1 + self._n_moments = 20 + # number of generalized statistical moments used for MLMC number of samples estimation + self._quantile = 0.01 + # Setting parameters that are utilized when scheduling samples + ### + # MLMC run + ### + sampler = self.create_sampler(level_parameters=level_parameters) + # Create sampler (mlmc.Sampler instance) - crucial class that controls MLMC run + self.generate_samples(sampler, n_samples=None, target_var=1e-3) + # Generate MLMC samples, there are two ways: + # 1) set exact number of samples at each level, + # e.g. for 5 levels - self.generate_samples(sampler, n_samples=[1000, 500, 250, 100, 50]) + # 2) set target variance of MLMC estimates, + # e.g. self.generate_samples(sampler, n_samples=None, target_var=1e-6) + self.all_collect(sampler) + # Check if all samples are finished + ### + # Postprocessing + ### + self.process_results(sampler, n_levels) + # Postprocessing, MLMC is finished at this point + + def create_sampler(self, level_parameters): + """ + Create: + # sampling pool - the way sample simulations are executed + # sample storage - stores sample results + # sampler - controls MLMC execution + :param level_parameters: list of lists + :return: mlmc.sampler.Sampler instance + """ + # Create OneProcessPool - all run in the same process + sampling_pool = OneProcessPool() + # There is another option mlmc.sampling_pool.ProcessPool() - supports local parallel sample simulation run + # sampling_pool = ProcessPool(n), n - number of parallel simulations, depends on computer architecture + + # Simulation configuration which is passed to simulation constructor + simulation_config = { + "start_position": np.array([0, 0]), + "start_velocity": np.array([10, 0]), + "area_borders": np.array([-100, 200, -300, 400]), + "max_time": 10, + "complexity": 2, # used for initial estimate of number of operations per sample + 'fields_params': dict(model='gauss', dim=1, sigma=1, corr_length=0.1), + } + + # Create simulation factory, instance of class that inherits from mlmc.sim.simulation + simulation_factory = ShootingSimulation1D(config=simulation_config) + + # Create simple sample storage + # Memory() storage keeps samples in computer main memory + sample_storage = Memory() + # We support also HDF file storage mlmc.sample_storage_hdf.SampleStorageHDF() + # sample_storage = SampleStorageHDF(file_path=path_to_HDF5_file) + + # Create sampler + # Controls the execution of MLMC + sampler = Sampler(sample_storage=sample_storage, sampling_pool=sampling_pool, sim_factory=simulation_factory, + level_parameters=level_parameters) + return sampler + + def generate_samples(self, sampler, n_samples=None, target_var=None): + """ + Generate MLMC samples + :param sampler: mlmc.sampler.Sampler instance + :param n_samples: None or list, number of samples at each level + :param target_var: target variance of MLMC estimates + :return: None + """ + # The number of samples is set by user + if n_samples is not None: + sampler.set_initial_n_samples(n_samples) + # The number of initial samples is determined automatically + else: + sampler.set_initial_n_samples() + # Samples are scheduled and the program is waiting for all of them to be completed. + sampler.schedule_samples() + sampler.ask_sampling_pool_for_samples(sleep=self._sample_sleep, timeout=self._sample_timeout) + self.all_collect(sampler) + + # MLMC estimates target variance is set + if target_var is not None: + # The mlmc.quantity.quantity.Quantity instance is created + # parameters 'storage' and 'q_specs' are obtained from sample_storage, + # originally 'q_specs' is set in the simulation class + root_quantity = make_root_quantity(storage=sampler.sample_storage, + q_specs=sampler.sample_storage.load_result_format()) + + # Moment functions object is created + # The MLMC algorithm determines number of samples according to the moments variance, + # Type of moment functions (Legendre by default) might affect the total number of MLMC samples + moments_fn = self.set_moments(root_quantity, sampler.sample_storage, n_moments=self._n_moments) + estimate_obj = Estimate(root_quantity, sample_storage=sampler.sample_storage, + moments_fn=moments_fn) + + # Initial estimation of the number of samples at each level + variances, n_ops = estimate_obj.estimate_diff_vars_regression(sampler.n_finished_samples) + # Firstly, the variance of moments and execution time of samples at each level are calculated from already finished samples + n_estimated = estimate_n_samples_for_target_variance(target_var, variances, n_ops, + n_levels=sampler.n_levels) + + ##### + # MLMC sampling algorithm - gradually schedules samples and refines the total number of samples + ##### + # Loop until number of estimated samples is greater than the number of scheduled samples + while not sampler.process_adding_samples(n_estimated, self._sample_sleep, self._adding_samples_coef, + timeout=self._sample_timeout): + # New estimation according to already finished samples + variances, n_ops = estimate_obj.estimate_diff_vars_regression(sampler._n_scheduled_samples) + n_estimated = estimate_n_samples_for_target_variance(target_var, variances, n_ops, + n_levels=sampler.n_levels) + + def set_moments(self, quantity, sample_storage, n_moments=25): + true_domain = Estimate.estimate_domain(quantity, sample_storage, quantile=self._quantile) + return Legendre(n_moments, true_domain) + + def all_collect(self, sampler): + """ + Collect samples, wait until all samples are finished + :param sampler: mlmc.sampler.Sampler object + :return: None + """ + running = 1 + while running > 0: + running = 0 + running += sampler.ask_sampling_pool_for_samples() + print("N running: ", running) + + def process_results(self, sampler, n_levels): + """ + Process MLMC results + :param sampler: mlmc.sampler.Sampler instance + :param n_levels: int, number of MLMC levels + :return: None + """ + sample_storage = sampler.sample_storage + # Load result format from the sample storage + result_format = sample_storage.load_result_format() + # Create Quantity instance representing our real quantity of interest + root_quantity = make_root_quantity(sample_storage, result_format) + + print("N collected ", sample_storage.get_n_collected()) + + # It is possible to access items of the quantity according to the result format + target = root_quantity['target'] + time = target[10] + position = time['0'] + q_value = position[0] + + # Compute moments, first estimate domain of moment functions + estimated_domain = Estimate.estimate_domain(q_value, sample_storage, quantile=self._quantile) + moments_fn = Legendre(self._n_moments, estimated_domain) + + # Create estimator for the quantity + estimator = Estimate(quantity=q_value, sample_storage=sample_storage, moments_fn=moments_fn) + # Estimate moment means and variances + means, vars = estimator.estimate_moments(moments_fn) + + est.plot_checks(quantity=q_value, sample_storage=sample_storage, moments_fn=moments_fn) + est.consistency_check(quantity=q_value, sample_storage=sample_storage) + estimator.kurtosis_check(q_value) + + # Generally, root quantity has different domain than its items + root_quantity_estimated_domain = Estimate.estimate_domain(root_quantity, sample_storage, + quantile=self._quantile) + root_quantity_moments_fn = Legendre(self._n_moments, root_quantity_estimated_domain) + + # There is another possible approach to calculating all moments at once and then select desired quantity + moments_quantity = moments(root_quantity, moments_fn=root_quantity_moments_fn, mom_at_bottom=True) + moments_mean = estimate_mean(moments_quantity) + target_mean = moments_mean['target'] + time_mean = target_mean[10] # times: [1] + location_mean = time_mean['0'] # locations: ['0'] + value_mean = location_mean[0] # result shape: (1,) + + assert value_mean.mean[0] == 1 + self.approx_distribution(estimator, n_levels, tol=1e-8) + + def approx_distribution(self, estimator, n_levels, tol=1.95): + """ + Probability density function approximation + :param estimator: mlmc.estimator.Estimate instance, it contains quantity for which the density is approximated + :param n_levels: int, number of MLMC levels + :param tol: Tolerance of the fitting problem, with account for variances in moments. + :return: None + """ + distr_obj, result, _, _ = estimator.construct_density(tol=tol) + distr_plot = Distribution(title="distributions", error_plot=None) + distr_plot.add_distribution(distr_obj) + + if n_levels == 1: + samples = estimator.get_level_samples(level_id=0)[..., 0] + distr_plot.add_raw_samples(np.squeeze(samples)) + distr_plot.show(None) + distr_plot.reset() + + @staticmethod + def determine_level_parameters(n_levels, step_range): + """ + Determine level parameters, + In this case, a step of fine simulation at each level + :param n_levels: number of MLMC levels + :param step_range: simulation step range + :return: list of lists + """ + assert step_range[0] > step_range[1] + level_parameters = [] + for i_level in range(n_levels): + if n_levels == 1: + level_param = 1 + else: + level_param = i_level / (n_levels - 1) + level_parameters.append([step_range[0] ** (1 - level_param) * step_range[1] ** level_param]) + return level_parameters + + +if __name__ == "__main__": + ProcessShooting1D() diff --git a/mlmc/plot/diagnostic_plots.py b/mlmc/plot/diagnostic_plots.py index e69de29b..5d6a844e 100644 --- a/mlmc/plot/diagnostic_plots.py +++ b/mlmc/plot/diagnostic_plots.py @@ -0,0 +1,97 @@ +import numpy as np +import scipy.stats as st +from scipy import interpolate +import matplotlib + +matplotlib.rcParams.update({'font.size': 22}) +from matplotlib.patches import Patch +import matplotlib.pyplot as plt + + +# def log_var_level(variances, l_vars, err_variances=0, err_l_vars=0, moments=[1,2,3,4]): +# fig, ax1 = plt.subplots(figsize=(8, 5)) +# for m in moments: +# # line1, = ax1.errorbar(np.log2(variances[m]), yerr=err_variances, label="m={}".format(m), marker="o") +# # line2, = ax1.errorbar(np.log2(l_vars[:, m]), yerr=err_l_vars, label="m={}".format(m), marker="s") +# #line1, = ax1.plot(np.log2(variances[m]), label="m={}".format(m), marker="o") +# line2, = ax1.plot(np.log2(l_vars[:, m]), label="m={}".format(m), marker="s") +# +# ax1.set_ylabel('log' + r'$_2$' + 'variance') +# ax1.set_xlabel('level' + r'$l$') +# plt.legend() +# #plt.savefig("MLMC_cost_saves.pdf") +# plt.show() + + +def log_var_per_level(l_vars, err_variances=0, err_l_vars=0, moments=[1, 2, 3, 4]): + fig, ax1 = plt.subplots(figsize=(8, 5)) + for m in moments: + # line1, = ax1.errorbar(np.log2(variances[m]), yerr=err_variances, label="m={}".format(m), marker="o") + # line2, = ax1.errorbar(np.log2(l_vars[:, m]), yerr=err_l_vars, label="m={}".format(m), marker="s") + #line1, = ax1.plot(np.log2(variances[m]), label="m={}".format(m), marker="o") + line2, = ax1.plot(np.log2(l_vars[:, m]), label="m={}".format(m), marker="s") + + ax1.set_ylabel('log' + r'$_2$' + 'variance') + ax1.set_xlabel('level' + r'$l$') + plt.legend() + #plt.savefig("MLMC_cost_saves.pdf") + plt.show() + + +# def log_mean_level(means, l_means, err_means=0, err_l_means=0, moments=[1,2,3,4]): +# fig, ax1 = plt.subplots(figsize=(8, 5)) +# for m in moments: +# # line1, = ax1.errorbar(np.log2(variances[m]), yerr=err_variances, label="m={}".format(m), marker="o") +# # line2, = ax1.errorbar(np.log2(l_vars[:, m]), yerr=err_l_vars, label="m={}".format(m), marker="s") +# #line1, = ax1.plot(np.log2(variances[m]), label="m={}".format(m), marker="o") +# line2, = ax1.plot(np.log2(np.abs(l_means[:, m])), label="m={}".format(m), marker="s") +# +# ax1.set_ylabel('log' + r'$_2$' + 'mean') +# ax1.set_xlabel('level' + r'$l$') +# plt.legend() +# #plt.savefig("MLMC_cost_saves.pdf") +# plt.show() + + +def log_mean_per_level(l_means, err_means=0, err_l_means=0, moments=[1, 2, 3, 4]): + fig, ax1 = plt.subplots(figsize=(8, 5)) + for m in moments: + # line1, = ax1.errorbar(np.log2(variances[m]), yerr=err_variances, label="m={}".format(m), marker="o") + # line2, = ax1.errorbar(np.log2(l_vars[:, m]), yerr=err_l_vars, label="m={}".format(m), marker="s") + #line1, = ax1.plot(np.log2(variances[m]), label="m={}".format(m), marker="o") + line2, = ax1.plot(np.log2(np.abs(l_means[:, m])), label="m={}".format(m), marker="s") + + ax1.set_ylabel('log' + r'$_2$' + 'mean') + ax1.set_xlabel('level' + r'$l$') + plt.legend() + #plt.savefig("MLMC_cost_saves.pdf") + plt.show() + + +def sample_cost_per_level(costs): + fig, ax1 = plt.subplots(figsize=(8, 5)) + line2, = ax1.plot(np.log2(costs), marker="s") + + ax1.set_ylabel('log' + r'$_2$' + 'cost per sample') + ax1.set_xlabel('level' + r'$l$') + plt.legend() + #plt.savefig("MLMC_cost_saves.pdf") + plt.show() + + +def kurtosis_per_level(means, l_means, err_means=0, err_l_means=0, moments=[1, 2, 3, 4]): + fig, ax1 = plt.subplots(figsize=(8, 5)) + for m in moments: + # line1, = ax1.errorbar(np.log2(variances[m]), yerr=err_variances, label="m={}".format(m), marker="o") + # line2, = ax1.errorbar(np.log2(l_vars[:, m]), yerr=err_l_vars, label="m={}".format(m), marker="s") + #line1, = ax1.plot(np.log2(variances[m]), label="m={}".format(m), marker="o") + line2, = ax1.plot(np.log2(np.abs(l_means[:, m])), label="m={}".format(m), marker="s") + + ax1.set_ylabel('log ' + r'$_2$ ' + 'mean') + ax1.set_xlabel('level ' + r'$l$') + plt.legend() + #plt.savefig("MLMC_cost_saves.pdf") + plt.show() + + + diff --git a/test/test_estimates.py b/test/test_estimates.py new file mode 100644 index 00000000..b8eb1ac5 --- /dev/null +++ b/test/test_estimates.py @@ -0,0 +1,234 @@ +import os +import shutil +import numpy as np +from scipy import stats +import pytest +from mlmc.sim.synth_simulation import SynthSimulationWorkspace +from test.synth_sim_for_tests import SynthSimulationForTests +from mlmc.sampler import Sampler +from mlmc.sample_storage import Memory +from mlmc.sample_storage_hdf import SampleStorageHDF +from mlmc.sampling_pool import OneProcessPool, ProcessPool +from mlmc.moments import Legendre +from mlmc.quantity.quantity import make_root_quantity +import mlmc.estimator + +# Set work dir +os.chdir(os.path.dirname(os.path.realpath(__file__))) +work_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '_test_tmp') +if os.path.exists(work_dir): + shutil.rmtree(work_dir) +os.makedirs(work_dir) + +# Create simulations +failed_fraction = 0.0 +distr = stats.norm() +simulation_config = dict(distr=distr, complexity=2, nan_fraction=failed_fraction, sim_method='_sample_fn') + +shutil.copyfile('synth_sim_config.yaml', os.path.join(work_dir, 'synth_sim_config.yaml')) +simulation_config_workspace = {"config_yaml": os.path.join(work_dir, 'synth_sim_config.yaml')} + + +def hdf_storage_factory(file_name="mlmc_test.hdf5"): + os.chdir(os.path.dirname(os.path.realpath(__file__))) + work_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '_test_tmp') + if os.path.exists(work_dir): + shutil.rmtree(work_dir) + os.makedirs(work_dir) + + # Create sample storages + return SampleStorageHDF(file_path=os.path.join(work_dir, file_name)) + + +def mlmc_test(test_case): + #np.random.seed(1234) + n_moments = 20 + step_range = [[0.1], [0.005], [0.00025]] + + simulation_factory, sample_storage, sampling_pool = test_case + + if simulation_factory.need_workspace: + os.chdir(os.path.dirname(os.path.realpath(__file__))) + shutil.copyfile('synth_sim_config.yaml', os.path.join(work_dir, 'synth_sim_config.yaml')) + + sampler = Sampler(sample_storage=sample_storage, sampling_pool=sampling_pool, sim_factory=simulation_factory, + level_parameters=step_range) + + true_domain = distr.ppf([0.0001, 0.9999]) + moments_fn = Legendre(n_moments, true_domain) + # moments_fn = Monomial(n_moments, true_domain) + + sampler.set_initial_n_samples([100, 50, 25]) + # sampler.set_initial_n_samples([10000]) + sampler.schedule_samples() + sampler.ask_sampling_pool_for_samples() + + target_var = 1e-3 + sleep = 0 + add_coef = 0.1 + + quantity = make_root_quantity(sample_storage, q_specs=simulation_factory.result_format()) + + length = quantity['length'] + time = length[1] + location = time['10'] + value_quantity = location[0] + + estimator = mlmc.estimator.Estimate(value_quantity, sample_storage, moments_fn) + + # New estimation according to already finished samples + variances, n_ops = estimator.estimate_diff_vars_regression(sampler._n_scheduled_samples) + n_estimated = mlmc.estimator.estimate_n_samples_for_target_variance(target_var, variances, n_ops, + n_levels=sampler.n_levels) + + + # Loop until number of estimated samples is greater than the number of scheduled samples + while not sampler.process_adding_samples(n_estimated, sleep, add_coef): + print("n estimated ", n_estimated) + # New estimation according to already finished samples + variances, n_ops = estimator.estimate_diff_vars_regression(sampler._n_scheduled_samples) + n_estimated = mlmc.estimator.estimate_n_samples_for_target_variance(target_var, variances, n_ops, + n_levels=sampler.n_levels) + + means, vars = estimator.estimate_moments(moments_fn) + assert means[0] == 1 + assert vars[0] == 0 + + return estimator.moments_mean_obj, sample_storage.get_n_collected(), sample_storage.get_n_ops() + + +def mlmc_test_mcqmc(test_case): + # np.random.seed(1234) + n_moments = 10 + step_range = [[0.1], [0.005], [0.00025]] + + simulation_factory, sample_storage, sampling_pool = test_case + + if simulation_factory.need_workspace: + os.chdir(os.path.dirname(os.path.realpath(__file__))) + shutil.copyfile('synth_sim_config.yaml', os.path.join(work_dir, 'synth_sim_config.yaml')) + + sampler = Sampler(sample_storage=sample_storage, sampling_pool=sampling_pool, sim_factory=simulation_factory, + level_parameters=step_range) + + true_domain = distr.ppf([0.0001, 0.9999]) + moments_fn = Legendre(n_moments, true_domain) + # moments_fn = Monomial(n_moments, true_domain) + + sampler.set_initial_n_samples([100, 50, 25]) + # sampler.set_initial_n_samples([10000]) + sampler.schedule_samples() + sampler.ask_sampling_pool_for_samples() + + target_var = 1e-4 + sleep = 0 + add_coef = 0.1 + + quantity = make_root_quantity(sample_storage, q_specs=simulation_factory.result_format()) + + length = quantity['length'] + time = length[1] + location = time['10'] + value_quantity = location[0] + + estimator = mlmc.estimator.Estimate(value_quantity, sample_storage, moments_fn) + + # New estimation according to already finished samples + variances, n_ops = estimator.estimate_diff_vars_regression(sampler._n_scheduled_samples) + n_estimated = mlmc.estimator.estimate_n_samples_for_target_variance_giles(target_var, variances, n_ops, + n_levels=sampler.n_levels, theta=0.25) + + + # Loop until number of estimated samples is greater than the number of scheduled samples + while not sampler.process_adding_samples(n_estimated, sleep, add_coef): + # New estimation according to already finished samples + kurtosis = estimator.kurtosis_check() + variances, n_ops = estimator.estimate_diff_vars_regression(sampler._n_scheduled_samples) + n_estimated = mlmc.estimator.estimate_n_samples_for_target_variance_giles(target_var, variances, n_ops, + n_levels=sampler.n_levels, theta=0.25, + kurtosis=kurtosis) + + means, vars = estimator.estimate_moments(moments_fn) + + estimator.kurtosis_check(value_quantity) + + assert means[0] == 1 + assert vars[0] == 0 + + return estimator.moments_mean_obj, sample_storage.get_n_collected(), sample_storage.get_n_ops() + + +def mlmc_test_data(method): + from mlmc.plot.diagnostic_plots import log_var_per_level, log_mean_per_level, sample_cost_per_level, kurtosis_per_level + means = [] + l_means = [] + vars = [] + l_vars = [] + n_collected = [] + n_ops = [] + for _ in range(2): + moments_mean_obj, n_col, n_op = method((SynthSimulationForTests(simulation_config), hdf_storage_factory(file_name="mlmc_test.hdf5"), OneProcessPool())) + means.append(moments_mean_obj.mean) + vars.append(moments_mean_obj.var) + l_means.append(moments_mean_obj.l_means) + l_vars.append(moments_mean_obj.l_vars) + n_collected.append(n_col) + n_ops.append(n_op) + + print("means ", np.mean(means, axis=0)) + print("vars ", np.mean(vars, axis=0)) + + log_var_per_level(l_vars=np.mean(l_vars, axis=0), moments=[1, 2, 5]) + log_mean_per_level(l_means=np.mean(l_means, axis=0), moments=[1, 2, 5]) + #sample_cost_level(costs=np.mean(n_ops, axis=0)) + + print("n collected ", np.mean(n_collected, axis=0)) + print("cost on level ", np.mean(n_ops, axis=0)) + print("total cost ", np.sum(np.mean(n_ops * np.array(n_collected), axis=0))) + + +if __name__ == "__main__": + # means_mlmc = [] + # means_mlmc_giles = [] + # n_collected_mlmc = [] + # n_collected_mlmc_giles = [] + # vars_mlmc = [] + # vars_mlmc_giles = [] + for i in range(3): + # print("############### Original estimator ###################") + # mlmc_test_data(mlmc_test) + print("######################################################") + print("############### Improved estimator ###################") + mlmc_test_data(mlmc_test_mcqmc) + # mean, var, n_estimated, n_ops = mlmc_test((SynthSimulationForTests(simulation_config), + # hdf_storage_factory(file_name="mlmc_test.hdf5"), + # OneProcessPool())) + # means_mlmc.append(mean) + # vars_mlmc.append(var) + # n_collected_mlmc.append(n_estimated) + # mean, var, n_estimated, n_ops = mlmc_test_giles((SynthSimulationForTests(simulation_config), + # hdf_storage_factory(file_name="mlmc_giles_test.hdf5"), + # OneProcessPool())) + # means_mlmc_giles.append(mean) + # vars_mlmc_giles.append(var) + # n_collected_mlmc_giles.append(n_estimated) + + # + # print("mlmc means ", np.mean(means_mlmc, axis=0)) + # print("mlmc vars ", np.mean(vars_mlmc, axis=0)) + # print("mlmc n collected ", np.mean(n_collected_mlmc, axis=0)) + # + # print("mlmc giles means ", np.mean(means_mlmc_giles, axis=0)) + # print("mlmc giles vars ", np.mean(vars_mlmc_giles, axis=0)) + # print("mlmc giles n collected ", np.mean(n_collected_mlmc_giles, axis=0)) + + + + + + + + + + + diff --git a/test/test_sampler.py b/test/test_sampler.py index dc846f3a..d559b6ea 100644 --- a/test/test_sampler.py +++ b/test/test_sampler.py @@ -1,5 +1,8 @@ +import os +import shutil import numpy as np from scipy import stats +import mlmc from mlmc.sample_storage import Memory from mlmc.sim.synth_simulation import SynthSimulation from mlmc.sampling_pool import OneProcessPool @@ -34,3 +37,43 @@ def test_sampler(): n_estimated = np.array([100, 50, 20]) sampler.process_adding_samples(n_estimated, 0, 0.1) assert np.allclose(sampler._n_target_samples, init_samples + (n_estimated * 0.1), atol=1) + + +def test_sampler_hdf(): + # Create simulations + failed_fraction = 0.1 + distr = stats.norm() + simulation_config = dict(distr=distr, complexity=2, nan_fraction=failed_fraction, sim_method='_sample_fn') + simulation = SynthSimulation(simulation_config) + + work_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '_test_tmp') + if os.path.exists(work_dir): + shutil.rmtree(work_dir) + os.makedirs(work_dir) + file_path = os.path.join(work_dir, "mlmc_test.hdf5") + storage = mlmc.SampleStorageHDF(file_path=file_path) + sampling_pool = OneProcessPool() + + step_range = [[0.1], [0.01], [0.001]] + + sampler = Sampler(sample_storage=storage, sampling_pool=sampling_pool, sim_factory=simulation, + level_parameters=step_range) + + assert len(sampler._level_sim_objects) == len(step_range) + for step, level_sim in zip(step_range, sampler._level_sim_objects): + assert step[0] == level_sim.config_dict['fine']['step'] + + init_samples = list(np.ones(len(step_range)) * 10) + + sampler.set_initial_n_samples(init_samples) + assert np.allclose(sampler._n_target_samples, init_samples) + assert 0 == sampler.ask_sampling_pool_for_samples() + sampler.schedule_samples() + assert np.allclose(sampler._n_scheduled_samples, init_samples) + + n_estimated = np.array([100, 50, 20]) + sampler.process_adding_samples(n_estimated, 0, 0.1) + assert np.allclose(sampler._n_target_samples, init_samples + (n_estimated * 0.1), atol=1) + + +test_sampler_hdf() \ No newline at end of file From 067bbce3a984919ec118814250e6b0b52fa31570 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Tue, 9 May 2023 14:31:56 +0200 Subject: [PATCH 03/11] numpy 1.24 fix --- mlmc/sample_storage.py | 9 ++++++--- mlmc/sample_storage_hdf.py | 8 ++++---- mlmc/tool/hdf5.py | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/mlmc/sample_storage.py b/mlmc/sample_storage.py index 01be9082..ec05080f 100644 --- a/mlmc/sample_storage.py +++ b/mlmc/sample_storage.py @@ -168,12 +168,15 @@ def _save_successful(self, samples): :return: None """ for level_id, res in samples.items(): - res = np.array(res) + res = np.array(res, dtype=object) fine_coarse_res = res[:, 1] - result_type = np.dtype((np.float, np.array(fine_coarse_res[0]).shape)) + result_type = np.dtype((float, np.array(fine_coarse_res[0], dtype=object).shape)) results = np.empty(shape=(len(res),), dtype=result_type) - results[:] = [val for val in fine_coarse_res] + + for idx, val in enumerate(fine_coarse_res): + results[idx, 0] = val[0] + results[idx, 1] = val[1] # Save sample ids self._successful_sample_ids.setdefault(level_id, []).extend(res[:, 0]) diff --git a/mlmc/sample_storage_hdf.py b/mlmc/sample_storage_hdf.py index 7e5fbef5..68b1af9d 100644 --- a/mlmc/sample_storage_hdf.py +++ b/mlmc/sample_storage_hdf.py @@ -39,7 +39,7 @@ def _hdf_result_format(self, locations, times): :return: """ if len(locations[0]) == 3: - tuple_dtype = np.dtype((np.float, (3,))) + tuple_dtype = np.dtype((float, (3,))) loc_dtype = np.dtype((tuple_dtype, (len(locations),))) else: loc_dtype = np.dtype(('S50', (len(locations),))) @@ -48,14 +48,14 @@ def _hdf_result_format(self, locations, times): 'formats': ('S50', 'S50', np.dtype((np.int32, (2,))), - np.dtype((np.float, (len(times),))), + np.dtype((float, (len(times),))), loc_dtype ) } return result_dtype - def save_global_data(self, level_parameters: List[np.float], result_format: List[QuantitySpec]): + def save_global_data(self, level_parameters: List[float], result_format: List[QuantitySpec]): """ Save hdf5 file global attributes :param level_parameters: list of simulation steps @@ -125,7 +125,7 @@ def save_samples(self, successful, failed): def _save_succesful(self, successful_samples): for level, samples in successful_samples.items(): if len(samples) > 0: - self._level_groups[level].append_successful(np.array(samples)) + self._level_groups[level].append_successful(np.array(samples, dtype=object)) def _save_failed(self, failed_samples): for level, samples in failed_samples.items(): diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index f6f3219c..32e24149 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -309,7 +309,7 @@ def append_successful(self, samples: np.array): self._append_dataset(self.collected_ids_dset, samples[:, 0]) values = samples[:, 1] - result_type = np.dtype((np.float, np.array(values[0]).shape)) + result_type = np.dtype((float, np.array(values[0]).shape)) # Create dataset for failed samples self._make_dataset(name='collected_values', shape=(0,), From f8b7f3fa40812cd5beb5dd2a6693713aa0501eb8 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Thu, 3 Aug 2023 13:09:06 +0200 Subject: [PATCH 04/11] pbs run fix --- mlmc/random/correlated_field.py | 6 +- mlmc/sampler.py | 25 ++- mlmc/sampling_pool_pbs.py | 266 ++++++++++++++++++++++++-------- mlmc/tool/pbs_job.py | 13 +- 4 files changed, 240 insertions(+), 70 deletions(-) diff --git a/mlmc/random/correlated_field.py b/mlmc/random/correlated_field.py index ba83e15b..b5c809a7 100644 --- a/mlmc/random/correlated_field.py +++ b/mlmc/random/correlated_field.py @@ -226,7 +226,11 @@ def sample(self): for field in self.fields: sample = field.sample() if field.is_outer: - result[field.name] = np.zeros(self.n_elements) + if field.name == "cond_tn": + result[field.name] = np.zeros((self.n_elements, 3)) + else: + result[field.name] = np.zeros(self.n_elements) + #result[field.name] = np.zeros(self.n_elements) result[field.name][field.full_sample_ids] = sample return result diff --git a/mlmc/sampler.py b/mlmc/sampler.py index 6a550726..17b7dce0 100644 --- a/mlmc/sampler.py +++ b/mlmc/sampler.py @@ -119,7 +119,7 @@ def _get_sample_tag(self, level_id): """ return "L{:02d}_S{:07d}".format(level_id, int(self._n_scheduled_samples[level_id])) - def schedule_samples(self, timeout=None): + def schedule_samples(self, timeout=None, level_id=None, n_samples=None): """ Create simulation samples, loop through "levels" and its samples (given the number of target samples): 1) generate sample tag (same for fine and coarse simulation) @@ -132,7 +132,9 @@ def schedule_samples(self, timeout=None): self.ask_sampling_pool_for_samples(timeout=timeout) plan_samples = self._n_target_samples - self._n_scheduled_samples - for level_id, n_samples in enumerate(plan_samples): + if level_id is None: + level_id = len(plan_samples) - 1 + if n_samples is not None: samples = [] for _ in range(int(n_samples)): # Unique sample id @@ -143,11 +145,28 @@ def schedule_samples(self, timeout=None): self._sampling_pool.schedule_sample(sample_id, level_sim) # Increment number of created samples at current level self._n_scheduled_samples[level_id] += 1 - samples.append(sample_id) # Store scheduled samples self.sample_storage.save_scheduled_samples(level_id, samples) + else: + for n_samples in np.flip(plan_samples): + samples = [] + for _ in range(int(n_samples)): + # Unique sample id + sample_id = self._get_sample_tag(level_id) + level_sim = self._level_sim_objects[level_id] + + # Schedule current sample + self._sampling_pool.schedule_sample(sample_id, level_sim) + # Increment number of created samples at current level + self._n_scheduled_samples[level_id] += 1 + + samples.append(sample_id) + + # Store scheduled samples + self.sample_storage.save_scheduled_samples(level_id, samples) + level_id -= 1 def _check_failed_samples(self): """ diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index 01c4128b..2142fdcd 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -5,6 +5,8 @@ import pickle import json import glob +import time +import numpy as np from mlmc.level_simulation import LevelSimulation from mlmc.sampling_pool import SamplingPool from mlmc.tool.pbs_job import PbsJob @@ -133,9 +135,17 @@ def pbs_common_setting(self, **kwargs): :return: None """ # Script header - select_flags_list = kwargs.get('select_flags', []) - if select_flags_list: - kwargs['select_flags'] = ":" + ":".join(select_flags_list) + select_flags_dict = kwargs.get('select_flags', {}) + + # Set scratch dir + if any(re.compile('scratch.*').match(flag) for flag in list(select_flags_dict.keys())): + if kwargs['scratch_dir'] is None: + kwargs['scratch_dir'] = "$SCRATCHDIR" + else: + kwargs['scratch_dir'] = '' + + if select_flags_dict: + kwargs['select_flags'] = ":" + ':'.join('{}={}'.format(*item) for item in select_flags_dict.items()) else: kwargs['select_flags'] = "" @@ -162,7 +172,7 @@ def pbs_common_setting(self, **kwargs): kwargs['optional_pbs_requests']) # e.g. ['#PBS -m ae'] means mail is sent when the job aborts or terminates self._pbs_header_template.extend(('MLMC_WORKDIR=\"{}\"'.format(self._work_dir),)) self._pbs_header_template.extend(kwargs['env_setting']) - self._pbs_header_template.extend(('{python} -m mlmc.tool.pbs_job {output_dir} {job_name} >' + self._pbs_header_template.extend(('{python} -m mlmc.tool.pbs_job {output_dir} {job_name} {scratch_dir} >' '{pbs_output_dir}/{job_name}_STDOUT 2>&1',)) self._pbs_config = kwargs @@ -221,28 +231,31 @@ def execute(self): script_content = "\n".join(self.pbs_script) self.write_script(script_content, job_file) - process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE) - try: - if process.returncode != 0: - raise Exception(process.stderr.decode('ascii')) - # Find all finished jobs - self._qsub_failed_n = 0 - # Write current job count - self._job_count += 1 - - # Get pbs_id from qsub output - pbs_id = process.stdout.decode("ascii").split(".")[0] - # Store pbs id for future qstat calls - self._pbs_ids.append(pbs_id) - pbs_process.write_pbs_id(pbs_id) - - self._current_job_weight = 0 - self._n_samples_in_job = 0 - self._scheduled = [] - except: - self._qsub_failed_n += 1 - if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: - raise Exception(process.stderr.decode("ascii")) + while self._qsub_failed_n <= SamplingPoolPBS.QSUB_FAILED_MAX_N: + process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE) + try: + if process.returncode != 0: + raise Exception(process.stderr.decode('ascii')) + # Find all finished jobs + self._qsub_failed_n = 0 + # Write current job count + self._job_count += 1 + + # Get pbs_id from qsub output + pbs_id = process.stdout.decode("ascii").split(".")[0] + # Store pbs id for future qstat calls + self._pbs_ids.append(pbs_id) + pbs_process.write_pbs_id(pbs_id) + + self._current_job_weight = 0 + self._n_samples_in_job = 0 + self._scheduled = [] + break + except: + self._qsub_failed_n += 1 + time.sleep(30) + if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: + raise Exception(process.stderr.decode("ascii")) def _create_script(self): """ @@ -277,6 +290,64 @@ def get_finished(self): finished_pbs_jobs, unfinished_pbs_jobs = self._qstat_pbs_job() return self._get_result_files(finished_pbs_jobs, unfinished_pbs_jobs) + def collect_data(self): + successful_results = {} + failed_results = {} + times = {} + sim_data_results = {} + # running_times = {} + # extract_mesh_times = {} + # make_field_times = {} + # generate_rnd_times = {} + # fine_flow_times = {} + # coarse_flow_times = {} + n_running = 0 + + os.chdir(self._jobs_dir) + for file in glob.glob("*_STDOUT"): + job_id = re.findall(r'(\d+)_STDOUT', file)[0] + + successful, failed, time = PbsJob.read_results(job_id, self._jobs_dir) + + # Split results to levels + for level_id, results in successful.items(): + successful_results.setdefault(level_id, []).extend(results) + for level_id, results in failed.items(): + failed_results.setdefault(level_id, []).extend(results) + for level_id, results in time.items(): + if level_id in times: + times[level_id][0] += results[-1][0] + times[level_id][1] += results[-1][1] + else: + times[level_id] = list(results[-1]) + + # # Optional simulation data + # for level_id, results in sim_data.items(): + # sim_data_results.setdefault(level_id, []).extend(results) + + # for level_id, results in running_time.items(): + # running_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in extract_mesh.items(): + # extract_mesh_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in make_field.items(): + # make_field_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in generate_rnd.items(): + # generate_rnd_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in fine_flow.items(): + # fine_flow_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in coarse_flow.items(): + # coarse_flow_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + + return successful_results, failed_results, n_running #, sim_data_results #list(times.items()), list(running_times.items()), \ + # list(extract_mesh_times.items()), list(make_field_times.items()), list(generate_rnd_times.items()), \ + # list(fine_flow_times.items()), \ + # list(coarse_flow_times.items()) + def _qstat_pbs_job(self): """ Parse qstat output and get all unfinished job ids @@ -286,23 +357,36 @@ def _qstat_pbs_job(self): if len(self._pbs_ids) > 0: # Get PBS id's status, # '-x' - displays status information for finished and moved jobs in addition to queued and running jobs. - qstat_call = ["qstat", "-x"] + qstat_call = ["qstat", "-xs"] qstat_call.extend(self._pbs_ids) - # qstat call - process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE) - try: - if process.returncode != 0: - raise Exception(process.stderr.decode("ascii")) - output = process.stdout.decode("ascii") - # Find all finished jobs - finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output) - self._qstat_failed_n = 0 - except: - self._qstat_failed_n += 1 - if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N: - raise Exception(process.stderr.decode("ascii")) - finished_pbs_jobs = [] + while self._qstat_failed_n <= SamplingPoolPBS.QSTAT_FAILED_MAX_N: + # qstat call + unknown_job_ids = [] + process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE) + try: + if process.returncode != 0: + err_output = process.stderr.decode("ascii") + # Presumably, Job Ids are 'unknown' for PBS after some time of their inactivity + unknown_job_ids = re.findall(r"Unknown Job Id (\d+)\.", err_output) + + if len(unknown_job_ids) == 0: + raise Exception(process.stderr.decode("ascii")) + + output = process.stdout.decode("ascii") + # Find all finished jobs + finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output) + finished_moved_pbs_jobs = re.findall(r"(\d+)\..*\d+ M.*\n.*Job finished", output) + finished_pbs_jobs.extend(finished_moved_pbs_jobs) + finished_pbs_jobs.extend(unknown_job_ids) + self._qstat_failed_n = 0 + break + except: + self._qstat_failed_n += 1 + time.sleep(30) + if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N: + raise Exception(process.stderr.decode("ascii")) + finished_pbs_jobs = [] # Get unfinished as diff between planned and finished unfinished_pbs_jobs = [] @@ -327,7 +411,7 @@ def _get_result_files(self, finished_pbs_jobs, unfinished_pbs_jobs): :return: successful_results: Dict[level_id, List[Tuple[sample_id: str, Tuple[fine_result: np.ndarray, coarse_result: n.ndarray]]]] failed_results: Dict[level_id, List[Tuple[sample_id: str, err_msg: str]]] n_running: int, number of running samples - times: + times: """ os.chdir(self._jobs_dir) @@ -343,6 +427,14 @@ def _get_result_files(self, finished_pbs_jobs, unfinished_pbs_jobs): successful_results = {} failed_results = {} times = {} + #sim_data_results = {} + # running_times = {} + # extract_mesh_times = {} + # make_field_times = {} + # generate_rnd_times = {} + # fine_flow_times = {} + # coarse_flow_times = {} + for pbs_id in finished_pbs_jobs: reg = "*_{}".format(pbs_id) # JobID_PbsId file file = glob.glob(reg) @@ -359,6 +451,8 @@ def _get_result_files(self, finished_pbs_jobs, unfinished_pbs_jobs): successful_results.setdefault(level_id, []).extend(results) for level_id, results in failed.items(): failed_results.setdefault(level_id, []).extend(results) + # for level_id, results in sim_data.items(): + # sim_data_results.setdefault(level_id, []).extend(results) for level_id, results in time.items(): if level_id in times: times[level_id][0] += results[-1][0] @@ -366,14 +460,38 @@ def _get_result_files(self, finished_pbs_jobs, unfinished_pbs_jobs): else: times[level_id] = list(results[-1]) + # for level_id, results in running_time.items(): + # running_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in extract_mesh.items(): + # extract_mesh_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in make_field.items(): + # make_field_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in generate_rnd.items(): + # generate_rnd_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in fine_flow.items(): + # fine_flow_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in coarse_flow.items(): + # coarse_flow_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] # Delete pbsID file - it means job is finished SamplingPoolPBS.delete_pbs_id_file(file) if self._unfinished_sample_ids: successful_results, failed_results, times = self._collect_unfinished(successful_results, - failed_results, times) + failed_results, times, + ) + # running_times + # extract_mesh_times, + # make_field_times, + # generate_rnd_times, + # fine_flow_times, + # coarse_flow_times) - return successful_results, failed_results, n_running, list(times.items()) + return successful_results, failed_results, n_running, list(times.items())#, sim_data_results def _collect_unfinished(self, successful_results, failed_results, times): """ @@ -384,42 +502,64 @@ def _collect_unfinished(self, successful_results, failed_results, times): :return: all input dictionaries """ already_collected = set() + for sample_id in self._unfinished_sample_ids: if sample_id in already_collected: continue - job_id = PbsJob.job_id_from_sample_id(sample_id, self._jobs_dir) + try: + job_id = PbsJob.job_id_from_sample_id(sample_id, self._jobs_dir) + except (FileNotFoundError, KeyError) as e: + level_id = int(re.findall(r'L0?(\d*)', sample_id)[0]) + failed_results.setdefault(level_id, []).append((sample_id, "".format(e))) + continue + successful, failed, time = PbsJob.read_results(job_id, self._jobs_dir) # Split results to levels for level_id, results in successful.items(): - for res in results: - if res[0] in self._unfinished_sample_ids: - already_collected.add(res[0]) - successful_results.setdefault(level_id, []).append(res) - - for level_id, results in failed_results.items(): - for res in results: - if res[0] in self._unfinished_sample_ids: - already_collected.add(res[0]) - failed_results.setdefault(level_id, []).append(res) - - for level_id, results in times.items(): - for res in results: - if res[0] in self._unfinished_sample_ids: - times.setdefault(level_id, []).append(res) - times[level_id] = results + successful_results.setdefault(level_id, []).extend(results) + for level_id, results in failed.items(): + failed_results.setdefault(level_id, []).extend(results) + for level_id, results in time.items(): + times[level_id] = results[-1] + + # for level_id, results in sim_data.items(): + # sim_data_results.setdefault(level_id, []).extend(results) + + # for level_id, results in running_time.items(): + # running_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in extract_mesh.items(): + # extract_mesh_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in make_field.items(): + # make_field_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in generate_rnd.items(): + # generate_rnd_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in fine_flow.items(): + # fine_flow_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + # + # for level_id, results in coarse_flow.items(): + # coarse_flow_times[level_id] = [np.sum(results, axis=0)[0], results[-1][1]] + + level_id_sample_id_seed = PbsJob.get_scheduled_sample_ids(job_id, self._jobs_dir) + + for level_id, sample_id, _ in level_id_sample_id_seed: + already_collected.add(sample_id) # Delete pbsID file - it means job is finished # SamplingPoolPBS.delete_pbs_id_file(file) self._unfinished_sample_ids = set() - return successful_results, failed_results, times + return successful_results, failed_results, times#, sim_data_results def have_permanent_samples(self, sample_ids): """ - List of unfinished sample ids, the corresponding samples are collecting in next get_finished() call . + List of unfinished sample ids, the corresponding samples are collecting in next get_finished() call """ self._unfinished_sample_ids = set(sample_ids) diff --git a/mlmc/tool/pbs_job.py b/mlmc/tool/pbs_job.py index a39b8646..6b8a6535 100644 --- a/mlmc/tool/pbs_job.py +++ b/mlmc/tool/pbs_job.py @@ -149,11 +149,13 @@ def calculate_samples(self): current_samples = [] # Currently saved samples start_time = time.time() + successful_samples_time = 0 times = [] # Sample calculation time - Tuple(level_id, [n samples, cumul time for n sample]) n_times = 0 successful_dest_dir = os.path.join(self._output_dir, SamplingPool.SEVERAL_SUCCESSFUL_DIR) for level_id, sample_id, seed in level_id_sample_id_seed: + start_time = time.time() # Deserialize level simulation config if level_id not in self._level_simulations: self._get_level_sim(level_id) @@ -161,9 +163,10 @@ def calculate_samples(self): # Start measuring time if current_level != level_id: # Save previous level times - times.append((current_level, time.time() - start_time, n_times)) + times.append((current_level, successful_samples_time, n_times)) n_times = 0 start_time = time.time() + successful_samples_time = 0 current_level = level_id level_sim = self._level_simulations[current_level] @@ -178,6 +181,10 @@ def calculate_samples(self): SamplingPool.move_successful_rm(sample_id, level_sim, output_dir=self._output_dir, dest_dir=SamplingPool.SEVERAL_SUCCESSFUL_DIR) + n_times += 1 + successful_samples_time += (time.time() - start_time) + print("sample time ", time.time() - start_time) + # times.append((current_level, time.time() - start_time, n_times)) else: failed.append((current_level, sample_id, err_msg)) SamplingPool.move_failed_rm(sample_id, level_sim, @@ -185,8 +192,8 @@ def calculate_samples(self): dest_dir=SamplingPool.FAILED_DIR) current_samples.append(sample_id) - n_times += 1 - times.append((current_level, time.time() - start_time, n_times)) + #n_times += 1 + times.append((current_level, successful_samples_time, n_times)) self._save_to_file(success, failed, times, current_samples) success = [] From 95633ded1e80f0640218d70694fa77063841eb53 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Mon, 12 Aug 2024 12:08:54 +0200 Subject: [PATCH 05/11] scikit-learn --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 765d3159..f9301bd2 100644 --- a/setup.py +++ b/setup.py @@ -61,5 +61,5 @@ def read(*names, **kwargs): # include automatically all files in the template MANIFEST.in include_package_data=True, zip_safe=False, - install_requires=['numpy', 'scipy', 'sklearn', 'h5py>=3.1.0', 'ruamel.yaml', 'attrs', 'gstools', 'memoization'], + install_requires=['numpy', 'scipy', 'scikit-learn', 'h5py>=3.1.0', 'ruamel.yaml', 'attrs', 'gstools', 'memoization'], ) From 9d94434dda74ad50eb06eec227a1d01998c2b74d Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Mon, 12 Aug 2024 12:47:41 +0200 Subject: [PATCH 06/11] hdf estimator fix --- mlmc/estimator.py | 66 ++++++++++++++++++++++++++++------------------- mlmc/tool/hdf5.py | 2 +- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/mlmc/estimator.py b/mlmc/estimator.py index 9e9fad64..b5957f80 100644 --- a/mlmc/estimator.py +++ b/mlmc/estimator.py @@ -82,7 +82,9 @@ def estimate_diff_vars_regression(self, n_created_samples, moments_fn=None, raw_ raw_vars, n_samples = self.estimate_diff_vars(moments_fn) sim_steps = np.squeeze(self._sample_storage.get_level_parameters()) + # print("sim steps ", sim_steps) vars = self._all_moments_variance_regression(raw_vars, sim_steps) + # We need to get n_ops_estimate from storage return vars, self._sample_storage.get_n_ops() @@ -95,6 +97,8 @@ def estimate_diff_vars(self, moments_fn=None): n_samples - shape L, num samples for individual levels. """ moments_mean = qe.estimate_mean(qe.moments(self._quantity, moments_fn)) + # print("moments_mean.l_vars ", moments_mean.l_vars) + # print("moments_mean.n_samples ", moments_mean.n_samples) return moments_mean.l_vars, moments_mean.n_samples def _all_moments_variance_regression(self, raw_vars, sim_steps): @@ -196,7 +200,7 @@ def est_bootstrap(self, n_subsamples=100, sample_vector=None, moments_fn=None): bs_l_means = [] bs_l_vars = [] for i in range(n_subsamples): - quantity_subsample = self.quantity.select(self.quantity.subsample(sample_vec=sample_vector)) + quantity_subsample = self.quantity.subsample(sample_vec=sample_vector) moments_quantity = qe.moments(quantity_subsample, moments_fn=moments_fn, mom_at_bottom=False) q_mean = qe.estimate_mean(moments_quantity) @@ -205,6 +209,9 @@ def est_bootstrap(self, n_subsamples=100, sample_vector=None, moments_fn=None): bs_l_means.append(q_mean.l_means) bs_l_vars.append(q_mean.l_vars) + self.bs_mean = bs_mean + self.bs_var = bs_var + self.mean_bs_mean = np.mean(bs_mean, axis=0) self.mean_bs_var = np.mean(bs_var, axis=0) self.mean_bs_l_means = np.mean(bs_l_means, axis=0) @@ -217,14 +224,15 @@ def est_bootstrap(self, n_subsamples=100, sample_vector=None, moments_fn=None): self._bs_level_mean_variance = self.var_bs_l_means * np.array(self._sample_storage.get_n_collected())[:, None] - def bs_target_var_n_estimated(self, target_var, sample_vec=None): + def bs_target_var_n_estimated(self, target_var, sample_vec=None, n_subsamples=100): sample_vec = determine_sample_vec(n_collected_samples=self._sample_storage.get_n_collected(), n_levels=self._sample_storage.get_n_levels(), sample_vector=sample_vec) - self.est_bootstrap(n_subsamples=300, sample_vector=sample_vec) + self.est_bootstrap(n_subsamples=n_subsamples, sample_vector=sample_vec) variances, n_ops = self.estimate_diff_vars_regression(sample_vec, raw_vars=self.mean_bs_l_vars) + n_estimated = estimate_n_samples_for_target_variance(target_var, variances, n_ops, n_levels=self._sample_storage.get_n_levels()) @@ -304,10 +312,17 @@ def estimate_domain(quantity, sample_storage, quantile=None): except AttributeError: print("No collected values for level {}".format(level_id)) break - chunk_spec = next(sample_storage.chunks(n_samples=sample_storage.get_n_collected()[level_id])) - fine_samples = quantity.samples(chunk_spec)[..., 0] # Fine samples at level 0 + #print("sample_storage.get_n_collected()[level_id] ", type(sample_storage.get_n_collected()[level_id])) + print("sample_storage.get_n_collected() ", type(sample_storage.get_n_collected()[0])) + + if isinstance(sample_storage.get_n_collected()[level_id], AttributeError): + print("continue") + continue + chunk_spec = next(sample_storage.chunks(level_id=level_id, n_samples=sample_storage.get_n_collected()[level_id])) + fine_samples = quantity.samples(chunk_spec)[..., 0] # Fine samples at level 0 fine_samples = np.squeeze(fine_samples) + print("fine samples ", fine_samples) fine_samples = fine_samples[~np.isnan(fine_samples)] # remove NaN ranges.append(np.percentile(fine_samples, [100 * quantile, 100 * (1 - quantile)])) @@ -399,26 +414,26 @@ def consistency_check(quantity, sample_storage=None): return cons_check_val -def estimate_domain(quantity, sample_storage, quantile=None): - """ - Estimate moments domain from MLMC samples. - :param quantity: mlmc.quantity.Quantity instance, represents the real quantity - :param sample_storage: mlmc.sample_storage.SampleStorage instance, provides all the samples - :param quantile: float in interval (0, 1), None means whole sample range - :return: lower_bound, upper_bound - """ - ranges = [] - if quantile is None: - quantile = 0.01 - - for level_id in range(sample_storage.get_n_levels()): - fine_samples = quantity.samples(ChunkSpec(level_id=level_id, n_samples=sample_storage.get_n_collected()[0]))[..., 0] - - fine_samples = np.squeeze(fine_samples) - ranges.append(np.percentile(fine_samples, [100 * quantile, 100 * (1 - quantile)])) - - ranges = np.array(ranges) - return np.min(ranges[:, 0]), np.max(ranges[:, 1]) +# def estimate_domain(quantity, sample_storage, quantile=None): +# """ +# Estimate moments domain from MLMC samples. +# :param quantity: mlmc.quantity.Quantity instance, represents the real quantity +# :param sample_storage: mlmc.sample_storage.SampleStorage instance, provides all the samples +# :param quantile: float in interval (0, 1), None means whole sample range +# :return: lower_bound, upper_bound +# """ +# ranges = [] +# if quantile is None: +# quantile = 0.01 +# +# for level_id in range(sample_storage.get_n_levels()): +# fine_samples = quantity.samples(ChunkSpec(level_id=level_id, n_samples=sample_storage.get_n_collected()[0]))[..., 0] +# +# fine_samples = np.squeeze(fine_samples) +# ranges.append(np.percentile(fine_samples, [100 * quantile, 100 * (1 - quantile)])) +# +# ranges = np.array(ranges) +# return np.min(ranges[:, 0]), np.max(ranges[:, 1]) def coping_with_high_kurtosis(vars, costs, kurtosis, kurtosis_threshold=100): @@ -531,4 +546,3 @@ def determine_n_samples(n_levels, n_samples=None): return n_samples - diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index 62a9ac99..32e24149 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -357,7 +357,7 @@ def chunks(self, n_samples=None): dataset = hdf_file["/".join([self.level_group_path, "collected_values"])] if n_samples is not None: - yield ChunkSpec(chunk_id=0, chunk_slice=slice(0, n_samples, ...), level_id=int(self.level_id)) + yield ChunkSpec(chunk_id=0, chunk_slice=slice(0, n_samples, 1), level_id=int(self.level_id)) else: for chunk_id, chunk in enumerate(dataset.iter_chunks()): yield ChunkSpec(chunk_id=chunk_id, chunk_slice=chunk[0], level_id=int(self.level_id)) # slice, level_id From edf1f5a4160eb68e34879024acf4904fdc0ced8e Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Mon, 12 Aug 2024 13:13:05 +0200 Subject: [PATCH 07/11] srf --- mlmc/random/correlated_field.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/mlmc/random/correlated_field.py b/mlmc/random/correlated_field.py index b5c809a7..cebd1572 100644 --- a/mlmc/random/correlated_field.py +++ b/mlmc/random/correlated_field.py @@ -198,7 +198,9 @@ def set_points(self, points, region_ids=[], region_map={}): :return: """ self.n_elements = len(points) - assert len(points) == len(region_ids) + print("n elements: {}, len(points): {}".format(self.n_elements, len(points))) + + #assert len(points) == len(region_ids) reg_points = {} for i, reg_id in enumerate(region_ids): reg_list = reg_points.get(reg_id, []) @@ -504,14 +506,14 @@ def _sample(self): class GSToolsSpatialCorrelatedField(RandomFieldBase): - def __init__(self, model, mode_no=1000, log=False, sigma=1): + def __init__(self, model, mode_no=1000, log=False, sigma=1, seed=None): """ :param model: instance of covariance model class, which parent is gstools.covmodel.CovModel :param mode_no: number of Fourier modes, default: 1000 as in gstools package """ self.model = model self.mode_no = mode_no - self.srf = gstools.SRF(model, mode_no=mode_no) + self.srf = gstools.SRF(model, mode_no=mode_no, seed=seed) self.mu = self.srf.mean self.sigma = sigma self.dim = model.dim @@ -753,7 +755,3 @@ def _sample(self): :return: Random field evaluated in points given by 'set_points'. """ return self.random_field() - - if not self.log: - return field - return np.exp(field) From 6e06ecc6e8a3640a05b3dc8bfd68136b23f89051 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Wed, 21 Aug 2024 14:34:09 +0200 Subject: [PATCH 08/11] ruamel.yaml lower version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index ad19c689..83fe5a4a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ numpy scipy sklearn h5py>=3.1.0 -ruamel.yaml +ruamel.yaml<0.18.0 attrs gstools memoization From bc22fa969fb4f56ceec5ffe181f182aa42ab415c Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Wed, 21 Aug 2024 14:48:10 +0200 Subject: [PATCH 09/11] remove py36 from tox --- tox.ini | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tox.ini b/tox.ini index 7a4d4ef6..704732ff 100644 --- a/tox.ini +++ b/tox.ini @@ -3,12 +3,11 @@ # content of: tox.ini , put in same dir as setup.py [tox] -envlist = py36, py37, py38 +envlist = py37, py38 #envlist = py36 [gh-actions] python = - 3.6: py36 3.7: py37 3.8: py38 From b6f5df1e43806935bf7c2a0264f92e0842f24a30 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Wed, 21 Aug 2024 15:10:21 +0200 Subject: [PATCH 10/11] ruaml.yaml 0.17 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 83fe5a4a..8931ac1d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ numpy scipy sklearn h5py>=3.1.0 -ruamel.yaml<0.18.0 +ruamel.yaml==0.17.26 attrs gstools memoization From 9bc3168df84b64d69bf662a2247d537d3ecc2675 Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Mon, 14 Jul 2025 15:37:39 +0200 Subject: [PATCH 11/11] error msg print --- mlmc/sampling_pool.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mlmc/sampling_pool.py b/mlmc/sampling_pool.py index 0d402325..cbbbb360 100644 --- a/mlmc/sampling_pool.py +++ b/mlmc/sampling_pool.py @@ -122,6 +122,7 @@ def calculate_sample(sample_id, level_sim, work_dir=None, seed=None): except Exception: str_list = traceback.format_exception(*sys.exc_info()) err_msg = "".join(str_list) + print("Error msg: ", err_msg) return sample_id, res, err_msg, running_time