diff --git a/refl1d/uncertainty.py b/refl1d/uncertainty.py index cea1647d..810eead4 100644 --- a/refl1d/uncertainty.py +++ b/refl1d/uncertainty.py @@ -19,9 +19,11 @@ "show_residuals", ] +import multiprocessing import os import sys +import dill import numpy as np from bumps.errplot import reload_errors from bumps.plotutil import dhsv, form_quantiles, next_color, plot_quantiles @@ -117,7 +119,19 @@ def _usage(): print(run_errors.__doc__) -def calc_errors(problem, points): +def _initialize_worker(shared_serialized_problem): + global _shared_problem + _shared_problem = dill.loads(shared_serialized_problem[:].tobytes()) + + +_shared_problem = None # used by multiprocessing pool to hold problem + + +def _worker_eval_point(point): + return _eval_point(_shared_problem, point) + + +def calc_errors(problem, points, parallel: int = 0): """ Align the sample profiles and compute the residual difference from the measured reflectivity for a set of points. @@ -126,6 +140,9 @@ def calc_errors(problem, points): distribution computed from MCMC, bootstrapping or sampled from the error ellipse calculated at the minimum. + The *parallel* parameter controls the number of parallel processes + (set to 1 to disable use of ProcessPoolExecutor, or 0 to use all processors). + Each of the returned arguments is a dictionary mapping model number to error sample data as follows: @@ -159,8 +176,26 @@ def calc_errors(problem, points): # Put best at slot 0, no alignment data = [_eval_point(problem, problem.getp())] - for p in points: - data.append(_eval_point(problem, p)) + + if parallel != 1: + import concurrent.futures + from functools import partial + + max_workers = parallel if parallel > 0 else None + serialized_problem = dill.dumps(problem) + + with multiprocessing.Manager() as manager: + shared_serialized_problem = manager.Array("B", serialized_problem) + args = [(shared_serialized_problem, point) for point in points] + + with concurrent.futures.ProcessPoolExecutor( + max_workers=max_workers, initializer=_initialize_worker, initargs=(shared_serialized_problem,) + ) as executor: + results = executor.map(_worker_eval_point, points) + data.extend(results) + else: + for p in points: + data.append(_eval_point(problem, p)) profiles, slabs, residuals = zip(*data) diff --git a/refl1d/validation/gepore_runner.py b/refl1d/validation/gepore_runner.py index 43fcf037..22a99584 100644 --- a/refl1d/validation/gepore_runner.py +++ b/refl1d/validation/gepore_runner.py @@ -77,7 +77,7 @@ def run(self, layers, QS, DQ, NQ, EPS, H, zeeman_corrections=True, output_folder for IP, IM in ((0.0, 1.0), (1.0, 0.0)): with open(header, "w") as fid: fid.write( - f"{NL} {NC} {QS} {DQ} {NQ} {radians(EPS)} ({IP},0.0) ({IM},0.0) {1e-6 * ROINP} {1e-6 * ROINM} {1e-6 * ROSUP} {1e-6* ROSUM}\n" + f"{NL} {NC} {QS} {DQ} {NQ} {radians(EPS)} ({IP},0.0) ({IM},0.0) {1e-6 * ROINP} {1e-6 * ROINM} {1e-6 * ROSUP} {1e-6 * ROSUM}\n" # "%d %d %f %f %d %f (%f,0.0) (%f,0.0) %e %e %e %e\n" # % (NL, NC, QS, DQ, NQ, radians(EPS), IP, IM, 1e-6 * ROINP, 1e-6 * ROINM, 1e-6 * ROSUP, 1e-6 * ROSUM) )