diff --git a/FF_calculation/FF_QCD.py b/FF_calculation/FF_QCD.py index 9d681a6..9c5545a 100644 --- a/FF_calculation/FF_QCD.py +++ b/FF_calculation/FF_QCD.py @@ -43,6 +43,7 @@ def calculation_QCD_FFs( sample_paths, # sample_paths: List[str], output_path, # output_path: str, logger, # logger: str, + lock, # lock: str, (needed for cache snapshots) *_, # SRlike_hists, ARlike_hists only used in ttbar calculation ) = args @@ -70,6 +71,7 @@ def calculation_QCD_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for QCD application-like region @@ -82,6 +84,7 @@ def calculation_QCD_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # get binning of the dependent variable @@ -222,6 +225,7 @@ def non_closure_correction( evaluator, corr_evaluators, for_DRtoSR, + lock, # lock: str, (needed for cache snapshots) ) = args log = logging.getLogger(logger) @@ -251,6 +255,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for QCD application-like region @@ -263,6 +268,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # evaluate the measured fake factors for the specific processes @@ -469,6 +475,7 @@ def DR_SR_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for QCD application-like region @@ -481,6 +488,7 @@ def DR_SR_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # evaluate the measured fake factors for the specific processes diff --git a/FF_calculation/FF_Wjets.py b/FF_calculation/FF_Wjets.py index ec15e78..808dea3 100644 --- a/FF_calculation/FF_Wjets.py +++ b/FF_calculation/FF_Wjets.py @@ -43,6 +43,7 @@ def calculation_Wjets_FFs( sample_paths, # sample_paths: List[str], output_path, # output_path: str, logger, # logger: str, + 
lock, # lock: str, (needed for cache snapshots) *_, # SRlike_hists, ARlike_hists only used in ttbar calculation ) = args @@ -73,6 +74,7 @@ def calculation_Wjets_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # QCD estimation from same sign in signal-like region @@ -89,6 +91,7 @@ def calculation_Wjets_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for Wjets application-like region @@ -101,6 +104,7 @@ def calculation_Wjets_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # QCD estimation from same sign in application-like region @@ -119,6 +123,7 @@ def calculation_Wjets_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # get binning of the dependent variable from the computed binning @@ -279,6 +284,7 @@ def non_closure_correction( evaluator, corr_evaluators, for_DRtoSR, + lock, # lock: str, (needed for cache snapshots) ) = args log = logging.getLogger(logger) @@ -312,6 +318,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # QCD estimation from same sign in signal-like region @@ -328,6 +335,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for Wjets application-like region @@ -340,6 +348,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # evaluate the measured fake factors for the specific processes @@ -372,6 +381,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # get binning of the dependent variable @@ -581,6 +591,7 @@ def DR_SR_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for Wjets application-like region @@ -593,6 +604,7 @@ def 
DR_SR_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) rdf_ARlike = evaluator.evaluate_fake_factor(rdf=rdf_ARlike) diff --git a/FF_calculation/FF_ttbar.py b/FF_calculation/FF_ttbar.py index 8ad473a..1356b0f 100644 --- a/FF_calculation/FF_ttbar.py +++ b/FF_calculation/FF_ttbar.py @@ -47,6 +47,7 @@ def calculation_ttbar_FFs( logger, # logger: str, SRlike_hists, # SRlike_hists: Dict[str, ROOT.TH1D], ARlike_hists, # ARlike_hists: Dict[str, ROOT.TH1D], + lock, # lock: str, (needed for cache snapshots) ) = args log = logging.getLogger(logger) @@ -75,6 +76,7 @@ def calculation_ttbar_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for ttbar application region @@ -87,6 +89,7 @@ def calculation_ttbar_FFs( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # get binning of the dependent variable @@ -182,6 +185,7 @@ def calculation_FF_data_scaling_factor( sample_paths: List[str], process: str, logger: str, + lock: multiprocessing.Lock, ) -> Tuple[Dict[str, ROOT.TH1D], Dict[str, ROOT.TH1D]]: """ This function calculates the global SR-like and AR-like histograms for the ttbar process @@ -225,6 +229,7 @@ def calculation_FF_data_scaling_factor( category_cuts=None, region_cuts=region_conf, logger=logger, + lock=lock, ) # QCD estimation from same sign in signal-like region @@ -241,6 +246,7 @@ def calculation_FF_data_scaling_factor( category_cuts=None, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for ttbar application-like region @@ -253,6 +259,7 @@ def calculation_FF_data_scaling_factor( category_cuts=None, region_cuts=region_conf, logger=logger, + lock=lock, ) # QCD estimation from same sign in application-like region @@ -269,6 +276,7 @@ def calculation_FF_data_scaling_factor( category_cuts=None, region_cuts=region_conf, logger=logger, + lock=lock, ) # make yield histograms for FF data correction @@ -348,6 +356,7 @@ 
def non_closure_correction( logger, evaluator, corr_evaluators, + lock, # lock: str, (needed for cache snapshots) *_, # for_DRtoSR not needed for ttbar ) = args @@ -379,6 +388,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for ttbar application region @@ -391,6 +401,7 @@ def non_closure_correction( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) rdf_AR = evaluator.evaluate_fake_factor(rdf=rdf_AR) diff --git a/FF_calculation/fractions.py b/FF_calculation/fractions.py index 7b250f9..6527927 100644 --- a/FF_calculation/fractions.py +++ b/FF_calculation/fractions.py @@ -41,6 +41,7 @@ def fraction_calculation( sample_paths, # sample_paths: List[str], output_path, # output_path: str, logger, # logger: str, + lock, # lock: str, (needed for cache snapshots) *_, # SRlike_hists, ARlike_hists only needed for ttbar ) = args @@ -67,6 +68,7 @@ def fraction_calculation( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # event filter for signal region; this is not needed for the FF calculation, just for control plots @@ -79,6 +81,7 @@ def fraction_calculation( category_cuts=splitting.split, region_cuts=region_conf, logger=logger, + lock=lock, ) # get binning of the dependent variable diff --git a/helper/ff_functions.py b/helper/ff_functions.py index 26d6cd1..ce6cb74 100644 --- a/helper/ff_functions.py +++ b/helper/ff_functions.py @@ -53,6 +53,7 @@ def cache_rdf_snapshot(cache_dir: str = "./.RDF_CACHE") -> Callable: def decorator(function: Callable) -> Callable: @functools.wraps(function) def wrapper(*args: Any, **kwargs: Any) -> ROOT.RDataFrame: + lock = kwargs.get("lock") log = logging.getLogger(kwargs.get("logger") or function.__module__ + '.' 
+ function.__name__) tree_name = "ntuple" @@ -81,25 +82,28 @@ def wrapper(*args: Any, **kwargs: Any) -> ROOT.RDataFrame: if os.path.exists(cache_filepath) and func.RuntimeVariables.USE_CACHED_INTERMEDIATE_STEPS: log.info(f"Using existent filtered Rdf: {cache_filepath}") - return ROOT.RDataFrame(tree_name, cache_filepath) + with lock: + rdf = ROOT.RDataFrame(tree_name, cache_filepath) + return rdf log.info(f"Creating filtered Rdf under: {cache_filepath}") - cols = [str(c) for c in base_rdf.GetColumnNames()] - filtered_rdf = function(*args, **kwargs) - - if filtered_rdf.Count().GetValue() == 0: - log.warning("Filter resulted in zero events. Creating an empty snapshot with the correct schema.") - f = ROOT.TFile(cache_filepath, "RECREATE") - tree = ROOT.TTree(tree_name, tree_name) - for c in cols: - arr = ROOT.std.vector("float")() - tree.Branch(c, arr) - tree.Write() - f.Close() - else: - snapshot_result = filtered_rdf.Snapshot(tree_name, cache_filepath, cols) - snapshot_result.GetValue() # force execution + with lock: + cols = [str(c) for c in base_rdf.GetColumnNames()] + filtered_rdf = function(*args, **kwargs) + + if filtered_rdf.Count().GetValue() == 0: + log.warning("Filter resulted in zero events. 
Creating an empty snapshot with the correct schema.") + f = ROOT.TFile(cache_filepath, "RECREATE") + tree = ROOT.TTree(tree_name, tree_name) + for c in cols: + arr = ROOT.std.vector("float")() + tree.Branch(c, arr) + tree.Write() + f.Close() + else: + snapshot_result = filtered_rdf.Snapshot(tree_name, cache_filepath, cols) + snapshot_result.GetValue() # force execution if not os.path.exists(cache_filepath): log.error(f"Snapshot failed, file {cache_filepath} not created") @@ -699,12 +703,14 @@ def fill_corrlib_expression( @cache_rdf_snapshot(cache_dir="./.RDF_CACHE") def apply_region_filters( + *, # require all arguments to be keyword arguments rdf: Any, channel: str, sample: str, category_cuts: Union[Dict[str, str], None], region_cuts: Dict[str, str], logger: str, + lock: multiprocessing.Lock, ) -> Any: """ Function which applies filters to a root DataFrame for the fake factor calculation. This includes the region cuts and the category splitting.