diff --git a/.codecov.yml b/.codecov.yml index bf866b089..c9dc0d1f2 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -4,3 +4,4 @@ ignore: - "libensemble/tools/live_data/*" - "libensemble/sim_funcs/executor_hworld.py" - "libensemble/gen_funcs/persistent_tasmanian.py" + - "libensemble/gen_classes/gpCAM.py" diff --git a/.flake8 b/.flake8 index 073989807..87d249502 100644 --- a/.flake8 +++ b/.flake8 @@ -38,6 +38,7 @@ per-file-ignores = # Need to set something before the APOSMM import libensemble/tests/regression_tests/test_persistent_aposmm*:E402 + libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py:E402 libensemble/tests/regression_tests/test_persistent_gp_multitask_ax.py:E402 libensemble/tests/functionality_tests/test_uniform_sampling_then_persistent_localopt_runs.py:E402 libensemble/tests/functionality_tests/test_stats_output.py:E402 diff --git a/.github/workflows/extra.yml b/.github/workflows/extra.yml index e41de99af..4b77ee4d5 100644 --- a/.github/workflows/extra.yml +++ b/.github/workflows/extra.yml @@ -113,6 +113,7 @@ jobs: rm ./libensemble/tests/regression_tests/test_persistent_fd_param_finder.py # needs octave, which doesn't yet support 3.13 rm ./libensemble/tests/regression_tests/test_persistent_aposmm_external_localopt.py # needs octave, which doesn't yet support 3.13 rm ./libensemble/tests/regression_tests/test_gpCAM.py # needs gpcam, which doesn't build on 3.13 + rm ./libensemble/tests/regression_tests/test_asktell_gpCAM.py # needs gpcam, which doesn't build on 3.13 - name: Install redis/proxystore run: | diff --git a/docs/function_guides/ask_tell_generator.rst b/docs/function_guides/ask_tell_generator.rst new file mode 100644 index 000000000..73f97124c --- /dev/null +++ b/docs/function_guides/ask_tell_generator.rst @@ -0,0 +1,21 @@ + +Ask/Tell Generators +=================== + +**BETA - SUBJECT TO CHANGE** + +These generators, implementations, methods, and subclasses are in BETA, and +may change in future releases. 
+ +The Generator interface is expected to roughly correspond with CAMPA's standard: +https://github.com/campa-consortium/gest-api + +libEnsemble is in the process of supporting generator objects that implement the following interface: + +.. automodule:: generators + :members: Generator, LibensembleGenerator + :undoc-members: + +.. autoclass:: Generator + :member-order: bysource + :members: diff --git a/docs/function_guides/function_guide_index.rst b/docs/function_guides/function_guide_index.rst index 621bf36d2..0539e24c6 100644 --- a/docs/function_guides/function_guide_index.rst +++ b/docs/function_guides/function_guide_index.rst @@ -13,6 +13,7 @@ These guides describe common development patterns and optional components: :caption: Writing User Functions generator + ask_tell_generator simulator allocator sim_gen_alloc_api diff --git a/libensemble/__init__.py b/libensemble/__init__.py index 605336821..8df3af207 100644 --- a/libensemble/__init__.py +++ b/libensemble/__init__.py @@ -12,3 +12,4 @@ from libensemble import logger from .ensemble import Ensemble +from .generators import Generator diff --git a/libensemble/comms/comms.py b/libensemble/comms/comms.py index 51042c463..52f71dad9 100644 --- a/libensemble/comms/comms.py +++ b/libensemble/comms/comms.py @@ -264,8 +264,8 @@ def __init__(self, main, nworkers, *args, **kwargs): self.inbox = Queue() self.outbox = Queue() super().__init__(self, main, *args, **kwargs) - comm = QComm(self.inbox, self.outbox, nworkers) - self.handle = Process(target=_qcomm_main, args=(comm, main) + args, kwargs=kwargs) + self.comm = QComm(self.inbox, self.outbox, nworkers) + self.handle = Process(target=_qcomm_main, args=(self.comm, main) + args, kwargs=kwargs) def terminate(self, timeout=None): """Terminate the process.""" diff --git a/libensemble/gen_classes/__init__.py b/libensemble/gen_classes/__init__.py new file mode 100644 index 000000000..d0524159d --- /dev/null +++ b/libensemble/gen_classes/__init__.py @@ -0,0 +1,2 @@ +from .aposmm 
import APOSMM # noqa: F401 +from .sampling import UniformSample # noqa: F401 diff --git a/libensemble/gen_classes/aposmm.py b/libensemble/gen_classes/aposmm.py new file mode 100644 index 000000000..886171821 --- /dev/null +++ b/libensemble/gen_classes/aposmm.py @@ -0,0 +1,229 @@ +import copy +from math import gamma, pi, sqrt +from typing import List + +import numpy as np +from gest_api.vocs import VOCS +from numpy import typing as npt + +from libensemble.generators import PersistentGenInterfacer +from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP + + +class APOSMM(PersistentGenInterfacer): + """ + APOSMM coordinates multiple local optimization runs, dramatically reducing time for + discovering multiple minima on parallel systems. + + This *generator* adheres to the `Generator Standard <https://github.com/campa-consortium/gest-api>`_. + + .. seealso:: + + `https://doi.org/10.1007/s12532-017-0131-4 <https://doi.org/10.1007/s12532-017-0131-4>`_ + + VOCS variables must include both regular and *_on_cube versions. E.g.,: + + vars_std = { + "var1": [-10.0, 10.0], + "var2": [0.0, 100.0], + "var3": [1.0, 50.0], + "var1_on_cube": [0, 1.0], + "var2_on_cube": [0, 1.0], + "var3_on_cube": [0, 1.0] + } + variables_mapping = { + "x": ["var1", "var2", "var3"], + "x_on_cube": ["var1_on_cube", "var2_on_cube", "var3_on_cube"], + } + gen = APOSMM(vocs, 3, 3, variables_mapping=variables_mapping, ...) + + Parameters + ---------- + vocs: VOCS + The VOCS object, adhering to the VOCS interface from the Generator Standard. + + max_active_runs: int + Bound on number of runs APOSMM is advancing. + + initial_sample_size: int + Number of uniformly sampled points to be evaluated internally before starting + the localopt runs. `.suggest()` will return samples from these points. + + History: npt.NDArray = [] + An optional history of previously evaluated points. + + sample_points: npt.NDArray = None + Points to be sampled (original domain). 
+ If more sample points are needed by APOSMM during the course of the + optimization, points will be drawn uniformly over the domain. + + localopt_method: str = "LN_BOBYQA" + The local optimization method to use. + + rk_const: float = None + Multiplier in front of the ``r_k`` value. + If not provided, it will be set to ``0.5 * ((gamma(1 + (n / 2)) * 5) ** (1 / n)) / sqrt(pi)`` + + xtol_abs: float = 1e-6 + Localopt method's convergence tolerance. + + ftol_abs: float = 1e-6 + Localopt method's convergence tolerance. + + dist_to_bound_multiple: float = 0.5 + What fraction of the distance to the nearest boundary should the initial + step size be in localopt runs. + + random_seed: int = 1 + Seed for the random number generator. + """ + + def __init__( + self, + vocs: VOCS, + max_active_runs: int, + initial_sample_size: int, + History: npt.NDArray = [], + sample_points: npt.NDArray = None, + localopt_method: str = "LN_BOBYQA", + rk_const: float = None, + xtol_abs: float = 1e-6, + ftol_abs: float = 1e-6, + dist_to_bound_multiple: float = 0.5, + random_seed: int = 1, + **kwargs, + ) -> None: + + from libensemble.gen_funcs.persistent_aposmm import aposmm + + self.VOCS = vocs + + gen_specs = {} + gen_specs["user"] = {} + persis_info = {} + libE_info = {} + gen_specs["gen_f"] = aposmm + n = len(list(vocs.variables.keys())) + + if not rk_const: + rk_const = 0.5 * ((gamma(1 + (n / 2)) * 5) ** (1 / n)) / sqrt(pi) + + FIELDS = [ + "initial_sample_size", + "sample_points", + "localopt_method", + "rk_const", + "xtol_abs", + "ftol_abs", + "dist_to_bound_multiple", + "max_active_runs", + ] + + for k in FIELDS: + val = locals().get(k) + if val is not None: + gen_specs["user"][k] = val + + super().__init__(vocs, History, persis_info, gen_specs, libE_info, **kwargs) + + # Set bounds using the correct x mapping + x_mapping = self.variables_mapping["x"] + self.gen_specs["user"]["lb"] = np.array([vocs.variables[var].domain[0] for var in x_mapping]) + self.gen_specs["user"]["ub"] = 
np.array([vocs.variables[var].domain[1] for var in x_mapping]) + + x_size = len(self.variables_mapping.get("x", [])) + x_on_cube_size = len(self.variables_mapping.get("x_on_cube", [])) + assert x_size > 0 and x_on_cube_size > 0, "Both x and x_on_cube must be specified in variables_mapping" + assert x_size == x_on_cube_size, f"x and x_on_cube must have same length but got {x_size} and {x_on_cube_size}" + + gen_specs["out"] = [ + ("x", float, x_size), + ("x_on_cube", float, x_on_cube_size), + ("sim_id", int), + ("local_min", bool), + ("local_pt", bool), + ] + + gen_specs["persis_in"] = ["sim_id", "x", "x_on_cube", "f", "sim_ended"] + if "components" in kwargs or "components" in gen_specs.get("user", {}): + gen_specs["persis_in"].append("fvec") + + # SH - Need to know if this is gen_on_manager or not. + self.persis_info["nworkers"] = gen_specs["user"].get("max_active_runs") + self.all_local_minima = [] + self._suggest_idx = 0 + self._last_suggest = None + self._ingest_buf = None + self._n_buffd_results = 0 + self._told_initial_sample = False + + def _slot_in_data(self, results): + """Slot in libE_calc_in and trial data into corresponding array fields. 
*Initial sample only!!*""" + self._ingest_buf[self._n_buffd_results : self._n_buffd_results + len(results)] = results + + def _enough_initial_sample(self): + return ( + self._n_buffd_results >= int(self.gen_specs["user"]["initial_sample_size"]) + ) or self._told_initial_sample + + def _ready_to_suggest_genf(self): + """ + We're presumably ready to be suggested IF: + - When we're working on the initial sample: + - We have no _last_suggest cached + - all points given out have returned AND we've been suggested *at least* as many points as we cached + - When we're done with the initial sample: + - we've been suggested *at least* as many points as we cached + """ + if not self._told_initial_sample and self._last_suggest is not None: + cond = all([i in self._ingest_buf["sim_id"] for i in self._last_suggest["sim_id"]]) + else: + cond = True + return self._last_suggest is None or (cond and (self._suggest_idx >= len(self._last_suggest))) + + def suggest_numpy(self, num_points: int = 0) -> npt.NDArray: + """Request the next set of points to evaluate, as a NumPy array.""" + if self._ready_to_suggest_genf(): + self._suggest_idx = 0 + self._last_suggest = super().suggest_numpy(num_points) + + if self._last_suggest["local_min"].any(): # filter out local minima rows + min_idxs = self._last_suggest["local_min"] + self.all_local_minima.append(self._last_suggest[min_idxs]) + self._last_suggest = self._last_suggest[~min_idxs] + + if num_points > 0: # we've been suggested for a selection of the last suggest + results = np.copy(self._last_suggest[self._suggest_idx : self._suggest_idx + num_points]) + self._suggest_idx += num_points + + else: + results = np.copy(self._last_suggest) + self._last_suggest = None + + return results + + def ingest_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: + if (results is None and tag == PERSIS_STOP) or self._told_initial_sample: + super().ingest_numpy(results, tag) + return + + # Initial sample buffering here: + + if 
self._n_buffd_results == 0: + self._ingest_buf = np.zeros(self.gen_specs["user"]["initial_sample_size"], dtype=results.dtype) + self._ingest_buf["sim_id"] = -1 + + if not self._enough_initial_sample(): + self._slot_in_data(np.copy(results)) + self._n_buffd_results += len(results) + + if self._enough_initial_sample(): + super().ingest_numpy(self._ingest_buf, tag) + self._told_initial_sample = True + self._n_buffd_results = 0 + + def suggest_updates(self) -> List[npt.NDArray]: + """Request a list of NumPy arrays containing entries that have been identified as minima.""" + minima = copy.deepcopy(self.all_local_minima) + self.all_local_minima = [] + return minima diff --git a/libensemble/gen_classes/gpCAM.py b/libensemble/gen_classes/gpCAM.py new file mode 100644 index 000000000..b54447883 --- /dev/null +++ b/libensemble/gen_classes/gpCAM.py @@ -0,0 +1,153 @@ +"""Generator class exposing gpCAM functionality""" + +import time +from typing import List + +import numpy as np +from gest_api.vocs import VOCS +from gpcam import GPOptimizer as GP +from numpy import typing as npt + +# While there are class / func duplicates - re-use functions. +from libensemble.gen_funcs.persistent_gpCAM import ( + _calculate_grid_distances, + _eval_var, + _find_eligible_points, + _generate_mesh, + _read_testpoints, +) +from libensemble.generators import LibensembleGenerator + +__all__ = [ + "GP_CAM", + "GP_CAM_Covar", +] + + +# Equivalent to function persistent_gpCAM_ask_tell +class GP_CAM(LibensembleGenerator): + """ + This generation function constructs a global surrogate of `f` values. + + It is a batched method that produces a first batch uniformly random from + (lb, ub). On subsequent iterations, it calls an optimization method to + produce the next batch of points. This optimization might be too slow + (relative to the simulation evaluation time) for some use cases. 
+ """ + + def __init__(self, VOCS: VOCS, ask_max_iter: int = 10, random_seed: int = 1, *args, **kwargs): + + super().__init__(VOCS, *args, **kwargs) + self.rng = np.random.default_rng(random_seed) + + self.lb = np.array([VOCS.variables[i].domain[0] for i in VOCS.variables]) + self.ub = np.array([VOCS.variables[i].domain[1] for i in VOCS.variables]) + self.n = len(self.lb) # dimension + self.all_x = np.empty((0, self.n)) + self.all_y = np.empty((0, 1)) + assert isinstance(self.n, int), "Dimension must be an integer" + assert isinstance(self.lb, np.ndarray), "lb must be a numpy array" + assert isinstance(self.ub, np.ndarray), "ub must be a numpy array" + + self.dtype = [("x", float, (self.n))] + + self.my_gp = None + self.noise = 1e-8 # 1e-12 + self.ask_max_iter = ask_max_iter + + def _validate_vocs(self, vocs): + assert len(vocs.variables), "VOCS must contain variables." + assert len(vocs.objectives), "VOCS must contain at least one objective." + + def suggest_numpy(self, n_trials: int) -> npt.NDArray: + if self.all_x.shape[0] == 0: + self.x_new = self.rng.uniform(self.lb, self.ub, (n_trials, self.n)) + else: + start = time.time() + self.x_new = self.my_gp.ask( + input_set=np.column_stack((self.lb, self.ub)), + n=n_trials, + pop_size=n_trials, + acquisition_function="total correlation", + max_iter=self.ask_max_iter, # Larger takes longer. gpCAM default is 20. + )["x"] + print(f"Ask time:{time.time() - start}") + H_o = np.zeros(n_trials, dtype=self.dtype) + H_o["x"] = self.x_new + return H_o + + def ingest_numpy(self, calc_in: npt.NDArray) -> None: + if calc_in is not None: + if "x" in calc_in.dtype.names: # SH should we require x in? 
+ self.x_new = np.atleast_2d(calc_in["x"]) + self.y_new = np.atleast_2d(calc_in["f"]).T + nan_indices = [i for i, fval in enumerate(self.y_new) if np.isnan(fval[0])] + self.x_new = np.delete(self.x_new, nan_indices, axis=0) + self.y_new = np.delete(self.y_new, nan_indices, axis=0) + + self.all_x = np.vstack((self.all_x, self.x_new)) + self.all_y = np.vstack((self.all_y, self.y_new)) + + noise_var = self.noise * np.ones(len(self.all_y)) + if self.my_gp is None: + self.my_gp = GP(self.all_x, self.all_y.flatten(), noise_variances=noise_var) + else: + self.my_gp.tell(self.all_x, self.all_y.flatten(), noise_variances=noise_var) + self.my_gp.train() + + +class GP_CAM_Covar(GP_CAM): + """ + This generation function constructs a global surrogate of `f` values. + + It is a batched method that produces a first batch uniformly random from + (lb, ub) and on following iterations samples the GP posterior covariance + function to find sample points. + """ + + def __init__(self, VOCS, test_points_file: str = None, use_grid: bool = False, *args, **kwargs): + super().__init__(VOCS, *args, **kwargs) + self.test_points = _read_testpoints({"test_points_file": test_points_file}) + self.x_for_var = None + self.var_vals = None + self.use_grid = use_grid + self.persis_info = {} + if self.use_grid: + self.num_points = 10 + self.x_for_var = _generate_mesh(self.lb, self.ub, self.num_points) + self.r_low_init, self.r_high_init = _calculate_grid_distances(self.lb, self.ub, self.num_points) + + def suggest_numpy(self, n_trials: int) -> List[dict]: + if self.all_x.shape[0] == 0: + x_new = self.rng.uniform(self.lb, self.ub, (n_trials, self.n)) + else: + if not self.use_grid: + x_new = self.x_for_var[np.argsort(self.var_vals)[-n_trials:]] + else: + r_high = self.r_high_init + r_low = self.r_low_init + x_new = [] + r_cand = r_high # Let's start with a large radius and stop when we have batchsize points + + sorted_indices = np.argsort(-self.var_vals) + while len(x_new) < n_trials: + x_new = 
_find_eligible_points(self.x_for_var, sorted_indices, r_cand, n_trials) + if len(x_new) < n_trials: + r_high = r_cand + r_cand = (r_high + r_low) / 2.0 + + self.x_new = x_new + H_o = np.zeros(n_trials, dtype=self.dtype) + H_o["x"] = self.x_new + return H_o + + def ingest_numpy(self, calc_in: npt.NDArray): + if calc_in is not None: + super().ingest_numpy(calc_in) + if not self.use_grid: + n_trials = len(self.y_new) + self.x_for_var = self.rng.uniform(self.lb, self.ub, (10 * n_trials, self.n)) + + self.var_vals = _eval_var( + self.my_gp, self.all_x, self.all_y, self.x_for_var, self.test_points, self.persis_info + ) diff --git a/libensemble/gen_classes/sampling.py b/libensemble/gen_classes/sampling.py new file mode 100644 index 000000000..5e8102c22 --- /dev/null +++ b/libensemble/gen_classes/sampling.py @@ -0,0 +1,36 @@ +"""Generator classes providing points using sampling""" + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.generators import LibensembleGenerator + +__all__ = [ + "UniformSample", +] + + +class UniformSample(LibensembleGenerator): + """ + Samples over the domain specified in the VOCS. 
+ """ + + def __init__(self, VOCS: VOCS, random_seed: int = 1, *args, **kwargs): + super().__init__(VOCS, *args, **kwargs) + self.rng = np.random.default_rng(random_seed) + + self.n = len(list(self.VOCS.variables.keys())) + self.np_dtype = [("x", float, (self.n))] + self.lb = np.array([VOCS.variables[i].domain[0] for i in VOCS.variables]) + self.ub = np.array([VOCS.variables[i].domain[1] for i in VOCS.variables]) + + def suggest_numpy(self, n_trials): + out = np.zeros(n_trials, dtype=self.np_dtype) + + for i in range(n_trials): + out[i]["x"] = self.rng.uniform(self.lb, self.ub, (self.n)) + + return out + + def ingest_numpy(self, calc_in): + pass # random sample so nothing to tell diff --git a/libensemble/gen_funcs/aposmm_localopt_support.py b/libensemble/gen_funcs/aposmm_localopt_support.py index 2de29c870..21ceb5e0e 100644 --- a/libensemble/gen_funcs/aposmm_localopt_support.py +++ b/libensemble/gen_funcs/aposmm_localopt_support.py @@ -18,6 +18,7 @@ import numpy as np import psutil +import traceback import libensemble.gen_funcs from libensemble.message_numbers import EVAL_GEN_TAG, STOP_TAG # Only used to simulate receiving from manager @@ -645,8 +646,8 @@ def run_local_tao(user_specs, comm_queue, x0, f0, child_can_read, parent_can_rea def opt_runner(run_local_opt, user_specs, comm_queue, x0, f0, child_can_read, parent_can_read): try: run_local_opt(user_specs, comm_queue, x0, f0, child_can_read, parent_can_read) - except Exception as e: - comm_queue.put(ErrorMsg(e)) + except Exception: + comm_queue.put(ErrorMsg(traceback.format_exc())) parent_can_read.set() @@ -743,7 +744,7 @@ def put_set_wait_get(x, comm_queue, parent_can_read, child_can_read, user_specs) if user_specs.get("periodic"): assert np.allclose(x % 1, values[0] % 1, rtol=1e-15, atol=1e-15), "The point I gave is not the point I got back" else: - assert np.allclose(x, values[0], rtol=1e-15, atol=1e-15), "The point I gave is not the point I got back" + assert np.allclose(x, values[0], rtol=1e-8, atol=1e-8), 
"The point I gave is not the point I got back" return values diff --git a/libensemble/generators.py b/libensemble/generators.py new file mode 100644 index 000000000..7c7c5b933 --- /dev/null +++ b/libensemble/generators.py @@ -0,0 +1,239 @@ +from abc import abstractmethod +from typing import List, Optional + +import numpy as np +from gest_api import Generator +from gest_api.vocs import VOCS +from numpy import typing as npt + +from libensemble.comms.comms import QCommProcess # , QCommThread +from libensemble.executors import Executor +from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP +from libensemble.tools.tools import add_unique_random_streams +from libensemble.utils.misc import list_dicts_to_np, np_to_list_dicts, unmap_numpy_array + + +class GeneratorNotStartedException(Exception): + """Exception raised by a threaded/multiprocessed generator upon being suggested without having been started""" + + +class LibensembleGenerator(Generator): + """ + Generator interface that accepts the classic History, persis_info, gen_specs, libE_info parameters after VOCS. + + ``suggest/ingest`` methods communicate lists of dictionaries, like the standard. + ``suggest_numpy/ingest_numpy`` methods communicate numpy arrays containing the same data. + + .. note:: + Most LibensembleGenerator instances operate on "x" for variables and "f" for objectives internally. + By default we map "x" to the VOCS variables and "f" to the VOCS objectives, which works for most use cases. + If a given generator iterates internally over multiple, multi-dimensional variables or objectives, + then providing a custom ``variables_mapping`` is recommended. + + For instance: + ``variables_mapping = {"x": ["core", "edge"], + "y": ["mirror-x", "mirror-y"], + "f": ["energy"], + "grad": ["grad_x", "grad_y"]}``. 
+ """ + + def __init__( + self, + VOCS: VOCS, + History: npt.NDArray = [], + persis_info: dict = {}, + gen_specs: dict = {}, + libE_info: dict = {}, + variables_mapping: dict = {}, + **kwargs, + ): + self._validate_vocs(VOCS) + self.VOCS = VOCS + self.History = History + self.gen_specs = gen_specs + self.libE_info = libE_info + + self.variables_mapping = variables_mapping + if not self.variables_mapping: + self.variables_mapping = {} + # Map variables to x if not already mapped + if "x" not in self.variables_mapping: + # SH TODO - is this check needed? + if len(list(self.VOCS.variables.keys())) > 1 or list(self.VOCS.variables.keys())[0] != "x": + self.variables_mapping["x"] = self._get_unmapped_keys(self.VOCS.variables, "x") + # Map objectives to f if not already mapped + if "f" not in self.variables_mapping: + if ( + len(list(self.VOCS.objectives.keys())) > 1 or list(self.VOCS.objectives.keys())[0] != "f" + ): # e.g. {"f": ["f"]} doesn't need mapping + self.variables_mapping["f"] = self._get_unmapped_keys(self.VOCS.objectives, "f") + + if len(kwargs) > 0: # so user can specify gen-specific parameters as kwargs to constructor + if not self.gen_specs.get("user"): + self.gen_specs["user"] = {} + self.gen_specs["user"].update(kwargs) + if not persis_info.get("rand_stream"): + self.persis_info = add_unique_random_streams({}, 4, seed=4321)[1] + else: + self.persis_info = persis_info + + def _validate_vocs(self, vocs) -> None: + pass + + def _get_unmapped_keys(self, vocs_dict, default_key): + """Get keys from vocs_dict that aren't already mapped to other keys in variables_mapping.""" + # Get all variables that aren't already mapped to other keys + mapped_vars = [] + for mapped_list in self.variables_mapping.values(): + mapped_vars.extend(mapped_list) + unmapped_vars = [v for v in list(vocs_dict.keys()) if v not in mapped_vars] + return unmapped_vars + + @abstractmethod + def suggest_numpy(self, num_points: Optional[int] = 0) -> npt.NDArray: + """Request the next set of 
points to evaluate, as a NumPy array.""" + + @abstractmethod + def ingest_numpy(self, results: npt.NDArray) -> None: + """Send the results, as a NumPy array, of evaluations to the generator.""" + + @staticmethod + def convert_np_types(dict_list): + return [ + {key: (value.item() if isinstance(value, np.generic) else value) for key, value in item.items()} + for item in dict_list + ] + + def suggest(self, num_points: Optional[int] = 0) -> List[dict]: + """Request the next set of points to evaluate.""" + return LibensembleGenerator.convert_np_types( + np_to_list_dicts(self.suggest_numpy(num_points), mapping=self.variables_mapping) + ) + + def ingest(self, results: List[dict]) -> None: + """Send the results of evaluations to the generator.""" + self.ingest_numpy(list_dicts_to_np(results, mapping=self.variables_mapping)) + + +class PersistentGenInterfacer(LibensembleGenerator): + """Implement suggest/ingest for traditionally written libEnsemble persistent generator functions. + Still requires a handful of libEnsemble-specific data-structures on initialization. + """ + + def __init__( + self, + VOCS: VOCS, + History: npt.NDArray = [], + persis_info: dict = {}, + gen_specs: dict = {}, + libE_info: dict = {}, + **kwargs, + ) -> None: + super().__init__(VOCS, History, persis_info, gen_specs, libE_info, **kwargs) + self.gen_f = gen_specs["gen_f"] + self.History = History + self.libE_info = libE_info + self.running_gen_f = None + self.gen_result = None + + def setup(self) -> None: + """Must be called once before calling suggest/ingest. Initializes the background thread.""" + if self.running_gen_f is not None: + return + # SH this contains the thread lock - removing.... wrong comm to pass on anyway. 
+ if hasattr(Executor.executor, "comm"): + del Executor.executor.comm + self.libE_info["executor"] = Executor.executor + + self.running_gen_f = QCommProcess( + self.gen_f, + None, + self.History, + self.persis_info, + self.gen_specs, + self.libE_info, + user_function=True, + ) + + # This can be set here since the object isnt started until the first suggest + self.libE_info["comm"] = self.running_gen_f.comm + + def _prep_fields(self, results: npt.NDArray) -> npt.NDArray: + """Filter out fields that are not in persis_in and add sim_ended to the dtype""" + filtered_dtype = [ + (name, results.dtype[name]) for name in results.dtype.names if name in self.gen_specs["persis_in"] + ] + + new_dtype = filtered_dtype + [("sim_ended", bool)] + new_results = np.zeros(len(results), dtype=new_dtype) + + for field in new_results.dtype.names: + try: + new_results[field] = results[field] + except ValueError: + continue + + new_results["sim_ended"] = True + return new_results + + def ingest(self, results: List[dict], tag: int = EVAL_GEN_TAG) -> None: + """Send the results of evaluations to the generator.""" + self.ingest_numpy(list_dicts_to_np(results, mapping=self.variables_mapping), tag) + + def suggest_numpy(self, num_points: int = 0) -> npt.NDArray: + """Request the next set of points to evaluate, as a NumPy array.""" + if self.running_gen_f is None: + self.setup() + self.running_gen_f.run() + _, suggest_full = self.running_gen_f.recv() + return suggest_full["calc_out"] + + def ingest_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: + """Send the results of evaluations to the generator, as a NumPy array.""" + if results is not None: + results = self._prep_fields(results) + Work = {"libE_info": {"H_rows": np.copy(results["sim_id"]), "persistent": True, "executor": None}} + self.running_gen_f.send(tag, Work) + self.running_gen_f.send( + tag, np.copy(results) + ) # SH for threads check - might need deepcopy due to dtype=object + else: + self.running_gen_f.send(tag, 
None) + + def finalize(self) -> None: + """Stop the generator process and store the returned data.""" + self.ingest_numpy(None, PERSIS_STOP) # conversion happens in ingest + self.gen_result = self.running_gen_f.result() + + def export( + self, user_fields: bool = False, as_dicts: bool = False + ) -> tuple[npt.NDArray | list | None, dict | None, int | None]: + """Return the generator's results + Parameters + ---------- + user_fields : bool, optional + If True, return local_H with variables unmapped from arrays back to individual fields. + Default is False. + as_dicts : bool, optional + If True, return local_H as list of dictionaries instead of numpy array. + Default is False. + Returns + ------- + local_H : npt.NDArray | list + Generator history array (unmapped if user_fields=True, as dicts if as_dicts=True). + persis_info : dict + Persistent information. + tag : int + Status flag (e.g., FINISHED_PERSISTENT_GEN_TAG). + """ + if not self.gen_result: + return (None, None, None) + local_H, persis_info, tag = self.gen_result + if user_fields and local_H is not None and self.variables_mapping: + local_H = unmap_numpy_array(local_H, self.variables_mapping) + if as_dicts and local_H is not None: + if user_fields and self.variables_mapping: + local_H = np_to_list_dicts(local_H, self.variables_mapping, allow_arrays=True) + else: + local_H = np_to_list_dicts(local_H, allow_arrays=True) + return (local_H, persis_info, tag) diff --git a/libensemble/libE.py b/libensemble/libE.py index af302d13c..2936ea7a1 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -280,7 +280,7 @@ def manager( logger.info(f"libE version v{__version__}") if "out" in gen_specs and ("sim_id", int) in gen_specs["out"]: - if "libensemble.gen_funcs" not in gen_specs["gen_f"].__module__: + if hasattr(gen_specs["gen_f"], "__module__") and "libensemble.gen_funcs" not in gen_specs["gen_f"].__module__: logger.manager_warning(_USER_SIM_ID_WARNING) try: @@ -458,6 +458,7 @@ def start_proc_team(nworkers, 
sim_specs, gen_specs, libE_specs, log_comm=True): for wcomm in wcomms: wcomm.run() + return wcomms diff --git a/libensemble/manager.py b/libensemble/manager.py index b12b96a77..97f8f8225 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -615,6 +615,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int): if self.live_data is not None: self.live_data.finalize(self.hist) + persis_info["num_gens_started"] = 0 return persis_info, exit_flag, self.elapsed() def _sim_max_given(self) -> bool: diff --git a/libensemble/sim_funcs/borehole_kills.py b/libensemble/sim_funcs/borehole_kills.py index 54a31256b..47a00af90 100644 --- a/libensemble/sim_funcs/borehole_kills.py +++ b/libensemble/sim_funcs/borehole_kills.py @@ -5,7 +5,7 @@ from libensemble.sim_funcs.surmise_test_function import borehole_true -def subproc_borehole(H, delay): +def subproc_borehole(H, delay, poll_manager): """This evaluates the Borehole function using a subprocess running compiled code. @@ -15,14 +15,14 @@ def subproc_borehole(H, delay): """ with open("input", "w") as f: - H["thetas"][0].tofile(f) - H["x"][0].tofile(f) + H["thetas"].tofile(f) + H["x"].tofile(f) exctr = Executor.executor args = "input" + " " + str(delay) task = exctr.submit(app_name="borehole", app_args=args, stdout="out.txt", stderr="err.txt") - calc_status = exctr.polling_loop(task, delay=0.01, poll_manager=True) + calc_status = exctr.polling_loop(task, delay=0.01, poll_manager=poll_manager) if calc_status in MAN_KILL_SIGNALS + [TASK_FAILED]: f = np.inf @@ -45,7 +45,7 @@ def borehole(H, persis_info, sim_specs, libE_info): if sim_id > sim_specs["user"]["init_sample_size"]: delay = 2 + np.random.normal(scale=0.5) - f, calc_status = subproc_borehole(H, delay) + f, calc_status = subproc_borehole(H, delay, sim_specs["user"].get("poll_manager", True)) if calc_status in MAN_KILL_SIGNALS and "sim_killed" in H_o.dtype.names: H_o["sim_killed"] = True # For calling script to print only. 
diff --git a/libensemble/specs.py b/libensemble/specs.py index 308491303..e386a2b4e 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -82,6 +82,11 @@ class GenSpecs(BaseModel): simulator function, and makes decisions based on simulator function output. """ + generator: object | None = None + """ + A pre-initialized generator object. + """ + inputs: list[str] | None = Field(default=[], alias="in") """ list of **field names** out of the complete history to pass @@ -109,6 +114,24 @@ class GenSpecs(BaseModel): calling them locally. """ + initial_batch_size: int = 0 + """ + Number of initial points to request that the generator create. If zero, falls back to ``batch_size``. + If both options are zero, defaults to the number of workers. + + Note: Certain generators included with libEnsemble decide + batch sizes via ``gen_specs["user"]`` or other methods. + """ + + batch_size: int = 0 + """ + Number of points to generate in each batch. If zero, falls back to the number of + completed evaluations most recently told to the generator. + + Note: Certain generators included with libEnsemble decide + batch sizes via ``gen_specs["user"]`` or other methods. + """ + threaded: bool | None = False """ Instruct Worker process to launch user function to a thread. diff --git a/libensemble/tests/functionality_tests/test_asktell_sampling.py b/libensemble/tests/functionality_tests/test_asktell_sampling.py new file mode 100644 index 000000000..55e3b7afc --- /dev/null +++ b/libensemble/tests/functionality_tests/test_asktell_sampling.py @@ -0,0 +1,109 @@ +""" +Runs libEnsemble with uniform random sampling on a simple 2D problem + +Execute via one of the following commands (e.g. 3 workers): + mpiexec -np 4 python test_asktell_sampling.py + python test_asktell_sampling.py --nworkers 3 --comms local + python test_asktell_sampling.py --nworkers 3 --comms tcp + +The number of concurrent evaluations of the objective function will be 4-1=3. 
+""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local +# TESTSUITE_NPROCS: 2 4 + +import numpy as np +from gest_api import Generator +from gest_api.vocs import VOCS + +# Import libEnsemble items for this test +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f +from libensemble.gen_classes.sampling import UniformSample +from libensemble.libE import libE +from libensemble.tools import add_unique_random_streams, parse_args + + +class StandardSample(Generator): + """ + This sampler only adheres to the complete standard interface, with no additional numpy methods. + """ + + def __init__(self, VOCS: VOCS): + self.VOCS = VOCS + self.rng = np.random.default_rng(1) + super().__init__(VOCS) + + def _validate_vocs(self, VOCS): + assert len(self.VOCS.variables), "VOCS must contain variables." + + def suggest(self, n_trials): + output = [] + for _ in range(n_trials): + trial = {} + for key in self.VOCS.variables.keys(): + trial[key] = self.rng.uniform(self.VOCS.variables[key].domain[0], self.VOCS.variables[key].domain[1]) + output.append(trial) + return output + + def ingest(self, calc_in): + pass # random sample so nothing to tell + + +def sim_f(In): + Out = np.zeros(1, dtype=[("f", float)]) + Out["f"] = np.linalg.norm(In) + return Out + + +if __name__ == "__main__": + nworkers, is_manager, libE_specs, _ = parse_args() + libE_specs["gen_on_manager"] = True + + sim_specs = { + "sim_f": sim_f, + "in": ["x"], + "out": [("f", float)], + } + + gen_specs = { + "persis_in": ["x", "f", "sim_id"], + "out": [("x", float, (2,))], + "initial_batch_size": 20, + "batch_size": 10, + "user": { + "initial_batch_size": 20, # for wrapper + "lb": np.array([-3, -2]), + "ub": np.array([3, 2]), + }, + } + + variables = {"x0": [-3, 3], "x1": [-2, 2]} + objectives = {"energy": "EXPLORE"} + + vocs = VOCS(variables=variables, objectives=objectives) + + alloc_specs = {"alloc_f": alloc_f} + exit_criteria = {"gen_max": 
201} + persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234) + + for test in range(3): + if test == 0: + generator = StandardSample(vocs) + + elif test == 1: + persis_info["num_gens_started"] = 0 + generator = UniformSample(vocs) + + elif test == 2: + persis_info["num_gens_started"] = 0 + generator = UniformSample(vocs, variables_mapping={"x": ["x0", "x1"], "f": ["energy"]}) + + gen_specs["generator"] = generator + H, persis_info, flag = libE( + sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs=libE_specs + ) + + if is_manager: + print(H[["sim_id", "x", "f"]][:10]) + assert len(H) >= 201, f"H has length {len(H)}" diff --git a/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py b/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py index 68c8aaaa0..d9b946508 100644 --- a/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py +++ b/libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py @@ -82,9 +82,7 @@ assert ( sum(counts == init_batch_size) >= ngens ), "The initial batch of each gen should be common among initial_batch_size number of points" - assert ( - len(counts) > 1 - ), "All gen_ended_times are the same; they should be different for the async case" + assert len(counts) > 1, "All gen_ended_times are the same; they should be different for the async case" gen_workers = np.unique(H["gen_worker"]) print("Generators that issued points", gen_workers) diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py new file mode 100644 index 000000000..67716dca1 --- /dev/null +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -0,0 +1,97 @@ +""" +Runs libEnsemble with APOSMM with the NLopt local optimizer. + +Execute via one of the following commands (e.g. 
3 workers): + mpiexec -np 4 python test_persistent_aposmm_nlopt.py + python test_persistent_aposmm_nlopt.py --nworkers 3 --comms local + python test_persistent_aposmm_nlopt.py --nworkers 3 --comms tcp + +When running with the above commands, the number of concurrent evaluations of +the objective function will be 2, as one of the three workers will be the +persistent generator. +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local mpi tcp +# TESTSUITE_NPROCS: 3 + +import sys +from math import gamma, pi, sqrt + +import numpy as np + +import libensemble.gen_funcs + +# Import libEnsemble items for this test +from libensemble.sim_funcs.six_hump_camel import six_hump_camel as sim_f + +libensemble.gen_funcs.rc.aposmm_optimizers = "nlopt" +from time import time + +from gest_api.vocs import VOCS + +from libensemble import Ensemble +from libensemble.alloc_funcs.persistent_aposmm_alloc import persistent_aposmm_alloc as alloc_f +from libensemble.gen_classes import APOSMM +from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, SimSpecs +from libensemble.tests.regression_tests.support import six_hump_camel_minima as minima + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + + workflow = Ensemble(parse_args=True) + + if workflow.is_manager: + start_time = time() + + if workflow.nworkers < 2: + sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") + + n = 2 + workflow.sim_specs = SimSpecs(sim_f=sim_f, inputs=["x"], outputs=[("f", float)]) + workflow.alloc_specs = AllocSpecs(alloc_f=alloc_f) + workflow.exit_criteria = ExitCriteria(sim_max=2000) + + vocs = VOCS( + variables={"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [-3, 3], "edge_on_cube": [-2, 2]}, + objectives={"energy": "MINIMIZE"}, + ) + + aposmm = APOSMM( + vocs, + max_active_runs=workflow.nworkers, # should this match nworkers always? practically? 
+ variables_mapping={"x": ["core", "edge"], "x_on_cube": ["core_on_cube", "edge_on_cube"], "f": ["energy"]}, + initial_sample_size=100, + sample_points=minima, + localopt_method="LN_BOBYQA", + rk_const=0.5 * ((gamma(1 + (n / 2)) * 5) ** (1 / n)) / sqrt(pi), + xtol_abs=1e-6, + ftol_abs=1e-6, + ) + + # SH TODO - dont want this stuff duplicated - pass with vocs instead + workflow.gen_specs = GenSpecs( + persis_in=["x", "x_on_cube", "sim_id", "local_min", "local_pt", "f"], + generator=aposmm, + batch_size=5, + initial_batch_size=10, + user={"initial_sample_size": 100}, + ) + + workflow.libE_specs.gen_on_manager = True + workflow.add_random_streams() + + H, _, _ = workflow.run() + + # Perform the run + + if workflow.is_manager: + print("[Manager]:", H[np.where(H["local_min"])]["x"]) + print("[Manager]: Time taken =", time() - start_time, flush=True) + + tol = 1e-5 + for m in minima: + # The minima are known on this test problem. + # We use their values to test APOSMM has identified all minima + print(np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)), flush=True) + assert np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)) < tol diff --git a/libensemble/tests/regression_tests/test_asktell_gpCAM.py b/libensemble/tests/regression_tests/test_asktell_gpCAM.py new file mode 100644 index 000000000..b093a0df7 --- /dev/null +++ b/libensemble/tests/regression_tests/test_asktell_gpCAM.py @@ -0,0 +1,96 @@ +""" +Tests libEnsemble with gpCAM + +Execute via one of the following commands (e.g. 3 workers): + mpiexec -np 4 python test_gpCAM_class.py + python test_gpCAM_class.py --nworkers 3 --comms local + +When running with the above commands, the number of concurrent evaluations of +the objective function will be 2, as one of the three workers will be the +persistent generator. + +See libensemble.gen_funcs.persistent_gpCAM for more details about the generator +setup. 
+""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local +# TESTSUITE_NPROCS: 4 +# TESTSUITE_EXTRA: true + +import sys +import warnings + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f +from libensemble.gen_classes.gpCAM import GP_CAM, GP_CAM_Covar + +# Import libEnsemble items for this test +from libensemble.libE import libE +from libensemble.sim_funcs.rosenbrock import rosenbrock_eval as sim_f +from libensemble.tools import parse_args, save_libE_output + +warnings.filterwarnings("ignore", message="Default hyperparameter_bounds") + + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + nworkers, is_manager, libE_specs, _ = parse_args() + + if nworkers < 2: + sys.exit("Cannot run with a persistent worker if only one worker -- aborting...") + + n = 4 + batch_size = 15 + + sim_specs = { + "sim_f": sim_f, + "in": ["x"], + "out": [ + ("f", float), + ], + } + + gen_specs = { + "persis_in": ["x", "f", "sim_id"], + "out": [("x", float, (n,))], + "batch_size": batch_size, + "user": { + "lb": np.array([-3, -2, -1, -1]), + "ub": np.array([3, 2, 1, 1]), + }, + } + + vocs = VOCS(variables={"x0": [-3, 3], "x1": [-2, 2], "x2": [-1, 1], "x3": [-1, 1]}, objectives={"f": "MINIMIZE"}) + + alloc_specs = {"alloc_f": alloc_f} + + gen = GP_CAM_Covar(vocs) + + for inst in range(3): + if inst == 0: + gen_specs["generator"] = gen + num_batches = 10 + exit_criteria = {"sim_max": num_batches * batch_size, "wallclock_max": 300} + libE_specs["save_every_k_gens"] = 150 + libE_specs["H_file_prefix"] = "gpCAM_nongrid" + if inst == 1: + gen = GP_CAM_Covar(vocs, use_grid=True, test_points_file="gpCAM_nongrid_after_gen_150.npy") + gen_specs["generator"] = gen + libE_specs["final_gen_send"] = True + del libE_specs["H_file_prefix"] + del libE_specs["save_every_k_gens"] + elif 
inst == 2: + gen = GP_CAM(vocs, ask_max_iter=1) + gen_specs["generator"] = gen + num_batches = 3 # Few because the ask_tell gen can be slow + exit_criteria = {"sim_max": num_batches * batch_size, "wallclock_max": 300} + + # Perform the run + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, {}, alloc_specs, libE_specs) + if is_manager: + assert len(np.unique(H["gen_ended_time"])) == num_batches + + save_libE_output(H, persis_info, __file__, nworkers) diff --git a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py index 3cf69bf5d..4ecd3229e 100644 --- a/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_persistent_aposmm_nlopt.py @@ -79,7 +79,7 @@ alloc_specs = {"alloc_f": alloc_f} - persis_info = add_unique_random_streams({}, nworkers + 1) + persis_info = add_unique_random_streams({}, nworkers + 1, seed=4321) exit_criteria = {"sim_max": 2000} diff --git a/libensemble/tests/unit_tests/test_asktell.py b/libensemble/tests/unit_tests/test_asktell.py new file mode 100644 index 000000000..d8c90d741 --- /dev/null +++ b/libensemble/tests/unit_tests/test_asktell.py @@ -0,0 +1,156 @@ +import numpy as np +from libensemble.utils.misc import unmap_numpy_array + + +def _check_conversion(H, npp, mapping={}): + + for field in H.dtype.names: + print(f"Comparing {field}: {H[field]} {npp[field]}") + + if isinstance(H[field], np.ndarray): + assert np.array_equal(H[field], npp[field]), f"Mismatch found in field {field}" + + elif isinstance(H[field], str) and isinstance(npp[field], str): + assert H[field] == npp[field], f"Mismatch found in field {field}" + + elif np.isscalar(H[field]) and np.isscalar(npp[field]): + assert np.isclose(H[field], npp[field]), f"Mismatch found in field {field}" + + else: + raise TypeError(f"Unhandled or mismatched types in field {field}: {type(H[field])} vs {type(npp[field])}") + + +def 
test_awkward_list_dict(): + from libensemble.utils.misc import list_dicts_to_np + + # test list_dicts_to_np on a weirdly formatted dictionary + # Unfortunately, we're not really checking against some original + # libE-styled source of truth, like H. + + weird_list_dict = [ + { + "x0": "abcd", + "x1": "efgh", + "y": 56, + "z0": 1, + "z1": 2, + "z2": 3, + "z3": 4, + "z4": 5, + "z5": 6, + "z6": 7, + "z7": 8, + "z8": 9, + "z9": 10, + "z10": 11, + "a0": "B", + } + ] + + out_np = list_dicts_to_np(weird_list_dict) + + assert all([i in ("x", "y", "z", "a0") for i in out_np.dtype.names]) + + weird_list_dict = [ + { + "sim_id": 77, + "core": 89, + "edge": 10.1, + "beam": 76.5, + "energy": 12.34, + "local_pt": True, + "local_min": False, + }, + { + "sim_id": 10, + "core": 32.8, + "edge": 16.2, + "beam": 33.5, + "energy": 99.34, + "local_pt": False, + "local_min": False, + }, + ] + + # target dtype: [("sim_id", int), ("x, float, (3,)), ("f", float), ("local_pt", bool), ("local_min", bool)] + + mapping = {"x": ["core", "edge", "beam"], "f": ["energy"]} + out_np = list_dicts_to_np(weird_list_dict, mapping=mapping) + + assert all([i in ("sim_id", "x", "f", "local_pt", "local_min") for i in out_np.dtype.names]) + + +def test_awkward_H(): + from libensemble.utils.misc import list_dicts_to_np, np_to_list_dicts + + dtype = [("a", "i4"), ("x", "f4", (3,)), ("y", "f4", (1,)), ("z", "f4", (12,)), ("greeting", "U10"), ("co2", "f8")] + H = np.zeros(2, dtype=dtype) + H[0] = (1, [1.1, 2.2, 3.3], [10.1], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], "hello", "1.23") + H[1] = (2, [4.4, 5.5, 6.6], [11.1], [51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62], "goodbye", "2.23") + + list_dicts = np_to_list_dicts(H) + npp = list_dicts_to_np(list_dicts, dtype=dtype) + _check_conversion(H, npp) + + +def test_unmap_numpy_array_basic(): + """Test basic unmapping of x and x_on_cube arrays""" + + dtype = [("sim_id", int), ("x", float, (3,)), ("x_on_cube", float, (3,)), ("f", float), ("grad", float, (3,))] + H = 
np.zeros(2, dtype=dtype) + H[0] = (0, [1.1, 2.2, 3.3], [0.1, 0.2, 0.3], 10.5, [0.1, 0.2, 0.3]) + H[1] = (1, [4.4, 5.5, 6.6], [0.4, 0.5, 0.6], 20.7, [0.4, 0.5, 0.6]) + + mapping = {"x": ["x0", "x1", "x2"], "x_on_cube": ["x0_cube", "x1_cube", "x2_cube"]} + H_unmapped = unmap_numpy_array(H, mapping) + + expected_fields = ["sim_id", "x0", "x1", "x2", "x0_cube", "x1_cube", "x2_cube", "f"] + assert all(field in H_unmapped.dtype.names for field in expected_fields) + + assert H_unmapped["x0"][0] == 1.1 + assert H_unmapped["x1"][0] == 2.2 + assert H_unmapped["x2"][0] == 3.3 + assert H_unmapped["x0_cube"][0] == 0.1 + assert H_unmapped["x1_cube"][0] == 0.2 + assert H_unmapped["x2_cube"][0] == 0.3 + # Test that non-mapped array fields are passed through unchanged + assert "grad" in H_unmapped.dtype.names + assert np.array_equal(H_unmapped["grad"], H["grad"]) + + +def test_unmap_numpy_array_single_dimension(): + """Test unmapping with single dimension""" + + dtype = [("sim_id", int), ("x", float, (1,)), ("f", float)] + H = np.zeros(1, dtype=dtype) + H[0] = (0, [5.5], 15.0) + + mapping = {"x": ["x0"]} + H_unmapped = unmap_numpy_array(H, mapping) + + assert "x0" in H_unmapped.dtype.names + assert H_unmapped["x0"][0] == 5.5 + + +def test_unmap_numpy_array_edge_cases(): + """Test edge cases for unmap_numpy_array""" + + dtype = [("sim_id", int), ("x", float, (2,)), ("f", float)] + H = np.zeros(1, dtype=dtype) + H[0] = (0, [1.0, 2.0], 10.0) + + # No mapping + H_no_mapping = unmap_numpy_array(H, {}) + assert H_no_mapping is H + + # None array + H_none = unmap_numpy_array(None, {"x": ["x0", "x1"]}) + assert H_none is None + + +if __name__ == "__main__": + test_awkward_list_dict() + test_awkward_H() + test_unmap_numpy_array_basic() + test_unmap_numpy_array_single_dimension() + test_unmap_numpy_array_edge_cases() diff --git a/libensemble/tests/unit_tests/RENAME_test_persistent_aposmm.py b/libensemble/tests/unit_tests/test_persistent_aposmm.py similarity index 52% rename from 
libensemble/tests/unit_tests/RENAME_test_persistent_aposmm.py rename to libensemble/tests/unit_tests/test_persistent_aposmm.py index b08bc85fa..05aee2137 100644 --- a/libensemble/tests/unit_tests/RENAME_test_persistent_aposmm.py +++ b/libensemble/tests/unit_tests/test_persistent_aposmm.py @@ -122,6 +122,53 @@ def test_standalone_persistent_aposmm(): assert min_found >= 6, f"Found {min_found} minima" +def _evaluate_aposmm_instance(my_APOSMM): + from libensemble.message_numbers import FINISHED_PERSISTENT_GEN_TAG + from libensemble.sim_funcs.six_hump_camel import six_hump_camel_func + from libensemble.tests.regression_tests.support import six_hump_camel_minima as minima + + initial_sample = my_APOSMM.suggest(100) + + total_evals = 0 + eval_max = 2000 + + for point in initial_sample: + point["energy"] = six_hump_camel_func(np.array([point["core"], point["edge"]])) + total_evals += 1 + + my_APOSMM.ingest(initial_sample) + + potential_minima = [] + + while total_evals < eval_max: + + sample, detected_minima = my_APOSMM.suggest(6), my_APOSMM.suggest_updates() + if len(detected_minima): + for m in detected_minima: + potential_minima.append(m) + for point in sample: + point["energy"] = six_hump_camel_func(np.array([point["core"], point["edge"]])) + total_evals += 1 + my_APOSMM.ingest(sample) + my_APOSMM.finalize() + H, persis_info, exit_code = my_APOSMM.export() + + assert exit_code == FINISHED_PERSISTENT_GEN_TAG, "Standalone persistent_aposmm didn't exit correctly" + assert persis_info.get("run_order"), "Standalone persistent_aposmm didn't do any localopt runs" + + assert len(potential_minima) >= 6, f"Found {len(potential_minima)} minima" + + tol = 1e-3 + min_found = 0 + for m in minima: + # The minima are known on this test problem. 
+ # We use their values to test APOSMM has identified all minima + print(np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)), flush=True) + if np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)) < tol: + min_found += 1 + assert min_found >= 6, f"Found {min_found} minima" + + @pytest.mark.extra def test_standalone_persistent_aposmm_combined_func(): from math import gamma, pi, sqrt @@ -168,8 +215,132 @@ def test_standalone_persistent_aposmm_combined_func(): assert persis_info.get("run_order"), "Standalone persistent_aposmm didn't do any localopt runs" +@pytest.mark.extra +def test_asktell_with_persistent_aposmm(): + from math import gamma, pi, sqrt + + from gest_api.vocs import VOCS + + import libensemble.gen_funcs + from libensemble.gen_classes import APOSMM + from libensemble.tests.regression_tests.support import six_hump_camel_minima as minima + + libensemble.gen_funcs.rc.aposmm_optimizers = "nlopt" + + n = 2 + + variables = {"core": [-3, 3], "edge": [-2, 2], "core_on_cube": [0, 1], "edge_on_cube": [0, 1]} + objectives = {"energy": "MINIMIZE"} + + variables_mapping = { + "x": ["core", "edge"], + "x_on_cube": ["core_on_cube", "edge_on_cube"], + "f": ["energy"], + } + + vocs = VOCS(variables=variables, objectives=objectives) + + my_APOSMM = APOSMM( + vocs, + max_active_runs=6, + initial_sample_size=100, + variables_mapping=variables_mapping, + sample_points=np.round(minima, 1), + localopt_method="LN_BOBYQA", + rk_const=0.5 * ((gamma(1 + (n / 2)) * 5) ** (1 / n)) / sqrt(pi), + xtol_abs=1e-6, + ftol_abs=1e-6, + dist_to_bound_multiple=0.5, + ) + + _evaluate_aposmm_instance(my_APOSMM) + + +def _run_aposmm_export_test(variables_mapping): + """Helper function to run APOSMM export tests with given variables_mapping""" + from gest_api.vocs import VOCS + + from libensemble.gen_classes import APOSMM + + variables = { + "core": [-3, 3], + "edge": [-2, 2], + "core_on_cube": [0, 1], + "edge_on_cube": [0, 1], + } + objectives = {"energy": "MINIMIZE"} + + vocs = 
VOCS(variables=variables, objectives=objectives) + aposmm = APOSMM( + vocs, + max_active_runs=6, + initial_sample_size=10, + variables_mapping=variables_mapping, + localopt_method="LN_BOBYQA", + xtol_abs=1e-6, + ftol_abs=1e-6, + dist_to_bound_multiple=0.5, + ) + # Test basic export before finalize + H, _, _ = aposmm.export() + print(f"Export before finalize: {H}") # Debug + assert H is None # Should be None before finalize + # Test export after suggest/ingest cycle + sample = aposmm.suggest(5) + for point in sample: + point["energy"] = 1.0 # Mock evaluation + aposmm.ingest(sample) + aposmm.finalize() + + # Test export with unmapped fields + H, _, _ = aposmm.export() + if H is not None: + assert "x" in H.dtype.names and H["x"].ndim == 2 + assert "f" in H.dtype.names and H["f"].ndim == 1 + + # Test export with user_fields + H_unmapped, _, _ = aposmm.export(user_fields=True) + print(f"H_unmapped: {H_unmapped}") # Debug + if H_unmapped is not None: + assert "core" in H_unmapped.dtype.names + assert "edge" in H_unmapped.dtype.names + assert "energy" in H_unmapped.dtype.names + # Test export with as_dicts + H_dicts, _, _ = aposmm.export(as_dicts=True) + assert isinstance(H_dicts, list) + assert isinstance(H_dicts[0], dict) + assert "x" in H_dicts[0] # x remains as array + assert "f" in H_dicts[0] + # Test export with both options + H_both, _, _ = aposmm.export(user_fields=True, as_dicts=True) + assert isinstance(H_both, list) + assert "core" in H_both[0] + assert "edge" in H_both[0] + assert "energy" in H_both[0] + + +@pytest.mark.extra +def test_aposmm_export(): + """Test APOSMM export function with different options""" + + # Test with full variables_mapping + full_mapping = { + "x": ["core", "edge"], + "x_on_cube": ["core_on_cube", "edge_on_cube"], + "f": ["energy"], + } + _run_aposmm_export_test(full_mapping) + # Test with just x_on_cube mapping (should auto-map x and f) + minimal_mapping = { + "x_on_cube": ["core_on_cube", "edge_on_cube"], + } + 
_run_aposmm_export_test(minimal_mapping) + + if __name__ == "__main__": test_persis_aposmm_localopt_test() test_update_history_optimal() test_standalone_persistent_aposmm() test_standalone_persistent_aposmm_combined_func() + test_asktell_with_persistent_aposmm() + test_aposmm_export() diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py index 09f17b07e..0b362700f 100644 --- a/libensemble/tests/unit_tests/test_ufunc_runners.py +++ b/libensemble/tests/unit_tests/test_ufunc_runners.py @@ -30,8 +30,8 @@ def get_ufunc_args(): def test_normal_runners(): calc_in, sim_specs, gen_specs = get_ufunc_args() - simrunner = Runner(sim_specs) - genrunner = Runner(gen_specs) + simrunner = Runner.from_specs(sim_specs) + genrunner = Runner.from_specs(gen_specs) assert not hasattr(simrunner, "globus_compute_executor") and not hasattr( genrunner, "globus_compute_executor" ), "Globus Compute use should not be detected without setting endpoint fields" @@ -47,7 +47,7 @@ def tupilize(arg1, arg2): sim_specs["sim_f"] = tupilize persis_info = {"hello": "threads"} - simrunner = Runner(sim_specs) + simrunner = Runner.from_specs(sim_specs) result = simrunner._result(calc_in, persis_info, {}) assert result == (calc_in, persis_info) assert hasattr(simrunner, "thread_handle") @@ -75,7 +75,7 @@ def test_globus_compute_runner_init(): sim_specs["globus_compute_endpoint"] = "1234" with mock.patch("globus_compute_sdk.Executor"): - runner = Runner(sim_specs) + runner = Runner.from_specs(sim_specs) assert hasattr( runner, "globus_compute_executor" @@ -89,7 +89,7 @@ def test_globus_compute_runner_pass(): sim_specs["globus_compute_endpoint"] = "1234" with mock.patch("globus_compute_sdk.Executor"): - runner = Runner(sim_specs) + runner = Runner.from_specs(sim_specs) # Creating Mock Globus ComputeExecutor and Globus Compute future object - no exception globus_compute_mock = mock.Mock() @@ -115,7 +115,7 @@ def test_globus_compute_runner_fail(): 
gen_specs["globus_compute_endpoint"] = "4321" with mock.patch("globus_compute_sdk.Executor"): - runner = Runner(gen_specs) + runner = Runner.from_specs(gen_specs) # Creating Mock Globus ComputeExecutor and Globus Compute future object - yes exception globus_compute_mock = mock.Mock() diff --git a/libensemble/tools/alloc_support.py b/libensemble/tools/alloc_support.py index bbb5e275e..a8c9a65c8 100644 --- a/libensemble/tools/alloc_support.py +++ b/libensemble/tools/alloc_support.py @@ -280,6 +280,7 @@ def gen_work(self, wid, H_fields, H_rows, persis_info, **libE_info): H_fields = AllocSupport._check_H_fields(H_fields) libE_info["H_rows"] = AllocSupport._check_H_rows(H_rows) + libE_info["batch_size"] = len(self.avail_worker_ids(gen_workers=False)) work = { "H_fields": H_fields, diff --git a/libensemble/utils/misc.py b/libensemble/utils/misc.py index cfb4f4df2..dfc39e538 100644 --- a/libensemble/utils/misc.py +++ b/libensemble/utils/misc.py @@ -2,8 +2,12 @@ Misc internal functions """ -from itertools import groupby +from itertools import chain, groupby from operator import itemgetter +from typing import List + +import numpy as np +import numpy.typing as npt def extract_H_ranges(Work: dict) -> str: @@ -57,3 +61,208 @@ def specs_checker_getattr(obj, key, default=None): def specs_checker_setattr(obj, key, value): obj.__dict__[key] = value + + +def _combine_names(names: list) -> list: + """combine fields with same name *except* for final digits""" + out_names = [] + stripped = list(i.rstrip("0123456789") for i in names) # ['x', 'x', y', 'z', 'a'] + for name in names: + stripped_name = name.rstrip("0123456789") + if stripped.count(stripped_name) > 1: # if name appears >= 1, will combine, don't keep int suffix + out_names.append(stripped_name) + else: + out_names.append(name) # name appears once, keep integer suffix, e.g. 
"co2" + + # intending [x, y, z, a0] from [x0, x1, y, z0, z1, z2, z3, a0] + return list(set(out_names)) + + +def _get_new_dtype_fields(first: dict, mapping: dict = {}) -> list: + """build list of fields that will be in the output numpy array""" + new_dtype_names = _combine_names([i for i in first.keys()]) # -> ['x', 'y'] + fields_to_convert = list( # combining all mapping lists + chain.from_iterable(list(mapping.values())) + ) # fields like ["beam_length", "beam_width"] that will become "x" + new_dtype_names = [i for i in new_dtype_names if i not in fields_to_convert] + list( + mapping.keys() + ) # array dtype needs "x". avoid fields from mapping values since we're converting those to "x" + return new_dtype_names + + +def _get_combinable_multidim_names(first: dict, new_dtype_names: list) -> list: + """inspect the input dict for fields that can be combined (e.g. x0, x1)""" + combinable_names = [] + for name in new_dtype_names: + combinable_group = [i for i in first.keys() if i.rstrip("0123456789") == name] + if len(combinable_group) > 1: # multiple similar names, e.g. x0, x1 + combinable_names.append(combinable_group) + else: # single name, e.g. 
local_pt, a0 *AS LONG AS THERE ISNT AN A1* + combinable_names.append([name]) + return combinable_names + + +def _decide_dtype(name: str, entry, size: int) -> tuple: + """decide dtype of field, and size if needed""" + if isinstance(entry, str): # use numpy style for string type + output_type = "U" + str(len(entry) + 1) + else: + output_type = type(entry) # use default "python" type + if size == 1 or not size: + return (name, output_type) + else: + return (name, output_type, (size,)) # 3-tuple for multi-dimensional + + +def _start_building_dtype( + first: dict, new_dtype_names: list, combinable_names: list, dtype: list, mapping: dict +) -> list: + """parse out necessary components of dtype for output numpy array""" + for i, entry in enumerate(combinable_names): + name = new_dtype_names[i] + size = len(combinable_names[i]) # e.g. 2 for [x0, x1] + if name not in mapping: # mapping keys are what we're converting *to* + dtype.append(_decide_dtype(name, first[entry[0]], size)) + return dtype + + +def _pack_field(input_dict: dict, field_names: list) -> tuple: + """pack dict data into tuple for slotting into numpy array""" + # {"x0": 1, "x1": 2} -> (1, 2) + return tuple(input_dict[name] for name in field_names) if len(field_names) > 1 else input_dict[field_names[0]] + + +def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray: + if list_dicts is None: + return None + + if not isinstance(list_dicts, list): # presumably already a numpy array, conversion not necessary + return list_dicts + + # entering gen: convert _id to sim_id + for entry in list_dicts: + if "_id" in entry: + entry["sim_id"] = entry.pop("_id") + + # first entry is used to determine dtype + first = list_dicts[0] + + # build a presumptive dtype + new_dtype_names = _get_new_dtype_fields(first, mapping) + combinable_names = _get_combinable_multidim_names(first, new_dtype_names) # [['x0', 'x1'], ['z']] + + if ( + dtype is None + ): # rather roundabout. 
I believe default value gets set upon function instantiation. (default is mutable!) + dtype = [] + + # build dtype of non-mapped fields. appending onto empty dtype + if not len(dtype): + dtype = _start_building_dtype(first, new_dtype_names, combinable_names, dtype, mapping) + + # append dtype of mapped float fields + if len(mapping): + for name in mapping: + size = len(mapping[name]) + dtype.append(_decide_dtype(name, 0.0, size)) # float + + out = np.zeros(len(list_dicts), dtype=dtype) + + # starting packing data from list of dicts into array + for j, input_dict in enumerate(list_dicts): + for output_name, input_names in zip(new_dtype_names, combinable_names): # [('x', ['x0', 'x1']), ...] + if output_name not in mapping: + out[output_name][j] = _pack_field(input_dict, input_names) + else: + out[output_name][j] = _pack_field(input_dict, mapping[output_name]) + return out + + +def _is_multidim(selection: npt.NDArray) -> bool: + return hasattr(selection, "__len__") and len(selection) > 1 and not isinstance(selection, str) + + +def _is_singledim(selection: npt.NDArray) -> bool: + return (hasattr(selection, "__len__") and len(selection) == 1) or selection.shape == () + + +def unmap_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray: + """Convert numpy array with mapped fields back to individual scalar fields. 
+ Parameters + ---------- + array : npt.NDArray + Input array with mapped fields like x = [x0, x1, x2] + mapping : dict + Mapping from field names to variable names + Returns + ------- + npt.NDArray + Array with unmapped fields like x0, x1, x2 as individual scalars + """ + if not mapping or array is None: + return array + # Create new dtype with unmapped fields + new_fields = [] + for field in array.dtype.names: + if field in mapping: + for var_name in mapping[field]: + new_fields.append((var_name, array[field].dtype.type)) + else: + # Preserve the original field structure including per-row shape + field_dtype = array.dtype[field] + new_fields.append((field, field_dtype)) + unmapped_array = np.zeros(len(array), dtype=new_fields) + for field in array.dtype.names: + if field in mapping: + # Unmap array fields + if len(array[field].shape) == 1: + # Scalar field mapped to single variable + unmapped_array[mapping[field][0]] = array[field] + else: + # Multi-dimensional field + for i, var_name in enumerate(mapping[field]): + unmapped_array[var_name] = array[field][:, i] + else: + # Copy non-mapped fields + unmapped_array[field] = array[field] + return unmapped_array + + +def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}, allow_arrays: bool = False) -> List[dict]: + if array is None: + return None + out = [] + + for row in array: + new_dict = {} + + for field in row.dtype.names: + # non-string arrays, lists, etc. 
+ if field not in list(mapping.keys()): + if _is_multidim(row[field]) and not allow_arrays: + for i, x in enumerate(row[field]): + new_dict[field + str(i)] = x + + else: + new_dict[field] = row[field] + + else: # keys from mapping and array unpacked into corresponding fields in dicts + field_shape = array.dtype[field].shape[0] if len(array.dtype[field].shape) > 0 else 1 + assert field_shape == len(mapping[field]), ( + "dimension mismatch between mapping and array with field " + field + ) + + for i, name in enumerate(mapping[field]): + if _is_multidim(row[field]): + new_dict[name] = row[field][i] + elif _is_singledim(row[field]): + new_dict[name] = row[field] + + out.append(new_dict) + + # exiting gen: convert sim_id to _id + for entry in out: + if "sim_id" in entry: + entry["_id"] = entry.pop("sim_id") + + return out diff --git a/libensemble/utils/pydantic_bindings.py b/libensemble/utils/pydantic_bindings.py index 6c297bb95..6ae28efe8 100644 --- a/libensemble/utils/pydantic_bindings.py +++ b/libensemble/utils/pydantic_bindings.py @@ -15,8 +15,8 @@ check_inputs_exist, check_logical_cores, check_mpi_runner_type, - check_output_fields, check_provided_ufuncs, + check_set_gen_specs_from_variables, check_valid_comms_type, check_valid_in, check_valid_out, @@ -77,6 +77,7 @@ __validators__={ "check_valid_out": check_valid_out, "check_valid_in": check_valid_in, + "check_set_gen_specs_from_variables": check_set_gen_specs_from_variables, "genf_set_in_out_from_attrs": genf_set_in_out_from_attrs, }, ) @@ -102,7 +103,6 @@ __base__=specs._EnsembleSpecs, __validators__={ "check_exit_criteria": check_exit_criteria, - "check_output_fields": check_output_fields, "check_H0": check_H0, "check_provided_ufuncs": check_provided_ufuncs, }, diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 9554b1176..af1fd28b8 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -1,22 +1,35 @@ import inspect import logging import logging.handlers 
class StandardGenRunner(Runner):
    """Interact with suggest/ingest generator. Base class initialized for third-party generators."""

    def __init__(self, specs):
        super().__init__(specs)
        self.gen = specs.get("generator")

    def _variables_mapping(self) -> dict:
        """Return the generator's ``variables_mapping`` attribute, or an empty dict if it has none."""
        return getattr(self.gen, "variables_mapping", {})

    def _suggest_updates(self):
        """Return ``self.gen.suggest_updates()`` if the generator has that callable, else ``None``."""
        suggest_updates = getattr(self.gen, "suggest_updates", None)
        return suggest_updates() if callable(suggest_updates) else None

    def _get_points_updates(self, batch_size: int) -> (npt.NDArray, npt.NDArray):
        """Ask the generator for ``batch_size`` points; return them as a NumPy array plus ``None`` updates."""
        # no suggest_updates on external gens
        return (
            list_dicts_to_np(
                self.gen.suggest(batch_size),
                dtype=self.specs.get("out"),
                mapping=self._variables_mapping(),
            ),
            None,
        )

    def _convert_ingest(self, x: npt.NDArray) -> None:
        """Convert results to a list of dicts and give them to the generator.

        The generator's variables mapping is applied here as well, so the
        dictionaries ingested inside the loop use the same keys as the ones
        built from the initial batch in ``_start_generator_loop``.
        """
        self.gen.ingest(np_to_list_dicts(x, mapping=self._variables_mapping()))

    def _loop_over_gen(self, tag, Work, H_in):
        """Interact with suggest/ingest generator that *does not* contain a background thread"""
        while tag not in [PERSIS_STOP, STOP_TAG]:
            # Fall back to the size of the last results batch when no batch_size is configured.
            batch_size = self.specs.get("batch_size") or len(H_in)
            H_out, _ = self._get_points_updates(batch_size)
            tag, Work, H_in = self.ps.send_recv(H_out)
            self._convert_ingest(H_in)
        return H_in

    def _get_initial_suggest(self, libE_info) -> npt.NDArray:
        """Get initial batch from generator based on generator type"""
        initial_batch = self.specs.get("initial_batch_size") or self.specs.get("batch_size") or libE_info["batch_size"]
        return self.gen.suggest(initial_batch)

    def _start_generator_loop(self, tag, Work, H_in):
        """Start the generator loop after choosing best way of giving initial results to gen"""
        self.gen.ingest(np_to_list_dicts(H_in, mapping=self._variables_mapping()))
        return self._loop_over_gen(tag, Work, H_in)

    def _persistent_result(self, calc_in, persis_info, libE_info):
        """Setup comms with manager, setup gen, loop gen to completion, return gen's results

        Returns the final array produced by the generator loop together with
        ``FINISHED_PERSISTENT_GEN_TAG``.
        """
        self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
        # libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array
        H_out = list_dicts_to_np(
            self._get_initial_suggest(libE_info),
            dtype=self.specs.get("out"),
            mapping=self._variables_mapping(),
        )
        tag, Work, H_in = self.ps.send_recv(H_out)  # evaluate the initial sample
        final_H_out = self._start_generator_loop(tag, Work, H_in)
        self.gen.finalize()
        return final_H_out, FINISHED_PERSISTENT_GEN_TAG

    def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, int):
        """Run the generator in persistent mode.

        Raises
        ------
        ValueError
            If ``libE_info`` does not request persistent mode.
        """
        if libE_info.get("persistent"):
            return self._persistent_result(calc_in, persis_info, libE_info)
        raise ValueError(
            "suggest/ingest generators must run in persistent mode. This may be the default in the future."
        )


class LibensembleGenRunner(StandardGenRunner):
    """Runner that talks to the generator through ``suggest_numpy`` / ``ingest_numpy``."""

    def _get_initial_suggest(self, libE_info) -> npt.NDArray:
        """Get initial batch from generator based on generator type"""
        initial_batch = self.specs.get("initial_batch_size") or self.specs.get("batch_size") or libE_info["batch_size"]
        return self.gen.suggest_numpy(initial_batch)

    def _get_points_updates(self, batch_size: int) -> (npt.NDArray, list):
        """Ask the generator for ``batch_size`` points and, if it supports them, any updates."""
        numpy_out = self.gen.suggest_numpy(batch_size)
        return numpy_out, self._suggest_updates()

    def _convert_ingest(self, x: npt.NDArray) -> None:
        """Give results to the generator directly as a NumPy array."""
        self.gen.ingest_numpy(x)

    def _start_generator_loop(self, tag, Work, H_in) -> npt.NDArray:
        """Start the generator loop after choosing best way of giving initial results to gen"""
        self.gen.ingest_numpy(H_in)
        return self._loop_over_gen(tag, Work, H_in)  # see parent class


class LibensembleGenThreadRunner(StandardGenRunner):
    """Runner that relays messages between the manager and a generator exposing ``running_gen_f``."""

    def _get_initial_suggest(self, libE_info) -> npt.NDArray:
        """Get initial batch from generator based on generator type"""
        return self.gen.suggest_numpy()  # libE really needs to receive the *entire* initial batch from a threaded gen

    def _suggest_and_send(self):
        """Loop over generator's outbox contents, send to manager"""
        while not self.gen.running_gen_f.outbox.empty():  # recv/send any outstanding messages
            points = self.gen.suggest_numpy()
            updates = self._suggest_updates()
            self.ps.send(points)
            if updates is not None and len(updates):
                for update in updates:
                    self.ps.send(update, keep_state=True)  # keep_state since an update doesn't imply "new points"

    def _loop_over_gen(self, *args):
        """Cycle between moving all outbound / inbound messages between threaded gen and manager"""
        while True:
            time.sleep(0.0025)  # don't need to ping the gen relentlessly. Let it calculate. 400hz
            self._suggest_and_send()
            while self.ps.comm.mail_flag():  # receive any new messages from Manager, give all to gen
                tag, _, H_in = self.ps.recv()
                if tag in [STOP_TAG, PERSIS_STOP]:
                    # Forward the stop signal with the last results, then return the generator's result.
                    self.gen.ingest_numpy(H_in, PERSIS_STOP)
                    return self.gen.running_gen_f.result()
                self.gen.ingest_numpy(H_in)
def _check_set_gen_specs_from_variables(values):
    """Fill in empty ``outputs`` from the generator object's ``gen_specs["out"]``.

    Nothing is changed when ``outputs`` is already non-empty, when no
    generator is set, or when the generator's ``gen_specs["out"]`` is empty.
    ``values`` is returned in every case.
    """
    # Guard clause: explicitly provided outputs always win.
    if len(scg(values, "outputs")):
        return values

    generator = scg(values, "generator")
    if generator and len(generator.gen_specs["out"]):
        scs(values, "outputs", generator.gen_specs["out"])
    return values
return self diff --git a/libensemble/worker.py b/libensemble/worker.py index 44d5f0dde..8113f5d40 100644 --- a/libensemble/worker.py +++ b/libensemble/worker.py @@ -172,8 +172,8 @@ def __init__( self.workerID = workerID self.libE_specs = libE_specs self.stats_fmt = libE_specs.get("stats_fmt", {}) - self.sim_runner = Runner(sim_specs) - self.gen_runner = Runner(gen_specs) + self.sim_runner = Runner.from_specs(sim_specs) + self.gen_runner = Runner.from_specs(gen_specs) self.runners = {EVAL_SIM_TAG: self.sim_runner.run, EVAL_GEN_TAG: self.gen_runner.run} self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0} Worker._set_executor(self.workerID, self.comm) @@ -262,6 +262,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, try: logger.debug(f"Starting {enum_desc}: {calc_id}") + out = None calc = self.runners[calc_type] with timer: if self.EnsembleDirectory.use_calc_dirs(calc_type): @@ -285,8 +286,8 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, if tag in [STOP_TAG, PERSIS_STOP] and message is MAN_SIGNAL_FINISH: calc_status = MAN_SIGNAL_FINISH - if out: - if len(out) >= 3: # Out, persis_info, calc_status + if out is not None: + if not isinstance(out, np.ndarray) and len(out) >= 3: # Out, persis_info, calc_status calc_status = out[2] return out elif len(out) == 2: # Out, persis_info OR Out, calc_status diff --git a/pyproject.toml b/pyproject.toml index 80535d8cb..edf4bc6c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ authors = [{name = "Jeffrey Larson"}, {name = "Stephen Hudson"}, {name = "Stefan M. Wild"}, {name = "David Bindel"}, {name = "John-Luke Navarro"}] -dependencies = [ "numpy", "psutil", "pydantic", "pyyaml", "tomli"] +dependencies = ["numpy", "psutil", "pyyaml", "tomli", "gest-api @ git+https://github.com/campa-consortium/gest-api@main", "pydantic"] description = "A Python toolkit for coordinating asynchronous and dynamic ensembles of calculations." 
name = "libensemble" @@ -95,7 +95,7 @@ python = ">=3.10,<3.14" pip = ">=24.3.1,<25" setuptools = ">=75.6.0,<76" numpy = ">=1.21,<3" -pydantic = ">=1.10,<3" +pydantic = ">=2.11.7,<3" pyyaml = ">=6.0,<7" tomli = ">=1.2.1,<3" psutil = ">=5.9.4,<7" @@ -105,7 +105,7 @@ clang_osx-arm64 = ">=19.1.2,<20" [tool.black] line-length = 120 -target-version = ['py39', 'py310', 'py311', 'py312', 'py313'] +target-version = ['py310', 'py311', 'py312', 'py313'] force-exclude = ''' ( /( @@ -143,4 +143,4 @@ extend-exclude = ["*.bib", "*.xml", "docs/nitpicky"] disable_error_code = ["import-not-found", "import-untyped"] [dependency-groups] -dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4"] +dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4", "wat>=0.6.0,<0.7"]