diff --git a/pyproject.toml b/pyproject.toml index f85c628..06b7060 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,9 +60,10 @@ dependencies = [ "numpy", "pyg4ometry", "pygama >=2.2.3", - "pylegendmeta >=1.3.3", + "pylegendmeta >=1.3.5", "reboost >=0.8.3", - "snakemake>=8.16", + "revertex >=0.1.2", + "snakemake >=8.16", "snakemake-storage-plugin-fs", ] @@ -177,7 +178,7 @@ legend-pydataobj = "*" legend-pygeom-l200 = ">=0.8,<0.9" legend-pygeom-tools = ">=0.1,<0.2" numpy = "*" -pylegendmeta = ">=1.3.3,<1.4" +pylegendmeta = ">=1.3.5,<1.4" # execution snakemake = ">=8.16,<9" diff --git a/workflow/rules/hit.smk b/workflow/rules/hit.smk index e2fa6b5..863acbd 100644 --- a/workflow/rules/hit.smk +++ b/workflow/rules/hit.smk @@ -252,9 +252,6 @@ rule extract_current_pulse_model: # dynamically generated, and that would slow down the DAG generation # input: # unpack(smk_extract_current_pulse_model_inputs), - params: - # evt_idx=parse_input(input.raw_wf_idx_file, parser=mutils.extract_integer), - hit_tier_name="hit", output: pars_file=temp(patterns.output_currmod_filename(config)), plot_file=patterns.plot_currmod_filename(config), @@ -279,10 +276,10 @@ rule merge_current_pulse_model_pars: patterns.output_currmod_merged_filename(config), run: import dbetto - from legendsimflow.aggregate import gen_list_of_hpges_valid_for_dtmap + from legendsimflow.aggregate import gen_list_of_hpges_valid_for_modeling # NOTE: this is guaranteed to be sorted as in the input file list - hpges = gen_list_of_hpges_valid_for_dtmap( + hpges = gen_list_of_hpges_valid_for_modeling( config, wildcards.runid, cache=SIMFLOW_CONTEXT.modelable_hpges ) diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index 066f78d..ca907ec 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -5,9 +5,9 @@ from pathlib import Path import awkward as ak -import dbetto import numpy as np from dspeed.vis import WaveformBrowser +from legendmeta import LegendMetadata from lgdo import lh5 from matplotlib import pyplot as plt from matplotlib.axes import Axes @@ -16,8 +16,8 @@ from reboost.hpge.psd import _current_pulse_model as current_pulse_model from scipy.optimize import curve_fit -from . import SimflowConfig, utils from . import metadata as mutils +from . import utils log = logging.getLogger(__name__) @@ -207,7 +207,8 @@ def plot_currmod_fit_result( def lookup_currmod_fit_inputs( - config: SimflowConfig, + l200data: str, + metadata: LegendMetadata, runid: str, hpge: str, hit_tier_name: str = "hit", @@ -216,40 +217,35 @@ def lookup_currmod_fit_inputs( Parameters ---------- - config - simflow configuration object. + l200data + The path to the L200 data production cycle. + metadata + The metadata instance runid - LEGEND-200 run identifier. + LEGEND-200 run identifier, must be of the form `{EXPERIMENT}-{PERIOD}-{RUN}-{TYPE}`. hpge name of the HPGe detector hit_tier_name name of the hit tier. This is typically "hit" or "pht". """ - l200data = config.paths.l200data + + if isinstance(l200data, str): + l200data = Path(l200data) + + dataflow_config = utils.lookup_dataflow_config(l200data) # get the reference cal run - cal_runid = mutils.reference_cal_run(config.metadata, runid) + cal_runid = mutils.reference_cal_run(metadata, runid) _, period, run, _ = re.split(r"\W+", cal_runid) msg = f"inferred reference calibration run: {cal_runid}" log.debug(msg) - # look for the dataflow config file - df_cfgs = [p for p in l200data.glob("*config.*") if not p.name.startswith(".")] - - if len(df_cfgs) > 1: - msg = f"found multiple configuration files in {l200data}, this cannot be!" - raise RuntimeError(msg) - - msg = f"found dataflow configuration file: {df_cfgs[0]}" - log.debug(msg) - central_config = dbetto.utils.load_dict(df_cfgs[0]) - # get the paths to hit and raw tier files df_cfg = ( - central_config["setups"]["l200"]["paths"] - if ("setups" in central_config) - else central_config["paths"] + dataflow_config["setups"]["l200"]["paths"] + if ("setups" in dataflow_config) + else dataflow_config["paths"] ) hit_path = Path(df_cfg[f"tier_{hit_tier_name}"].replace("$_", str(l200data))) @@ -262,21 +258,6 @@ def lookup_currmod_fit_inputs( msg = f"no hit tier files found in {hit_path}/cal/{period}/{run}" raise ValueError(msg) - lh5_group = utils._get_lh5_table(config.metadata, hit_files[0], hpge, "hit", runid) - - msg = "looking for best event to fit" - log.debug(msg) - wf_idx, file_idx = lookup_currmod_fit_data(hit_files, lh5_group) - - raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data))) - hit_file = hit_files[file_idx] - raw_file = raw_path / str(hit_file.relative_to(hit_path)).replace( - hit_tier_name, "raw" - ) - - msg = f"determined raw file: {raw_file} (event index {wf_idx})" - log.debug(msg) - dsp_cfg_regex = r"l200-*-r%-T%-ICPC-dsp_proc_chain.*" dsp_cfg_files = list( (l200data / "inputs/dataprod/config/tier_dsp").glob(dsp_cfg_regex) @@ -291,4 +272,19 @@ def lookup_currmod_fit_inputs( msg = f"could not find a suitable dsp config file in {l200data} (or multiple found)" raise RuntimeError(msg) + lh5_group = utils._get_lh5_table(metadata, hit_files[0], hpge, "hit", runid) + + msg = "looking for best event to fit" + log.debug(msg) + + wf_idx, file_idx = lookup_currmod_fit_data(hit_files, lh5_group) + + raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data))) + hit_file = hit_files[file_idx] + raw_file = raw_path / str(hit_file.relative_to(hit_path)).replace( + hit_tier_name, "raw" + ) + msg = f"determined raw file: {raw_file} (event index {wf_idx})" + log.debug(msg) + return raw_file.resolve(), wf_idx, dsp_cfg_files[0] diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py index e5a859a..a4dcda4 100644 --- a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py +++ b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py @@ -26,9 +26,13 @@ args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821 config = args.config + +if "l200data" not in args.config.paths: + msg = "Cannot extract current pars without setting the path to the l200data in the simconfig file." + raise KeyError(msg) + runid = args.wildcards.runid hpge = args.wildcards.hpge_detector -hit_tier_name = args.params.hit_tier_name metadata = args.config.metadata pars_file = args.output.pars_file plot_file = args.output.plot_file @@ -37,8 +41,16 @@ # setup logging logger = ldfs.utils.build_log(metadata.simprod.config.logging, log_file) +l200data = args.config.paths.l200data +hit_tier_name = utils.get_hit_tier_name(l200data) + +msg = f"... determined hit tier name is {hit_tier_name}" +logger.info(msg) + +logger.info("... looking up the fit inputs") raw_file, wf_idx, dsp_cfg_file = hpge_pars.lookup_currmod_fit_inputs( - config, + l200data, + metadata, runid, hpge, hit_tier_name, @@ -52,16 +64,17 @@ runid, ) -logger.info("fetching the current pulse") +logger.info("... fetching the current pulse") t, A = hpge_pars.get_current_pulse(raw_file, lh5_group, wf_idx, str(dsp_cfg_file)) -logger.info("fitting the current pulse to extract the model") +logger.info("... fitting the current pulse to extract the model") popt, x, y = hpge_pars.fit_currmod(t, A) # now plot -logger.info("plotting the fit result") +logger.info("... plotting the fit result") fig, _ = hpge_pars.plot_currmod_fit_result(t, A, x, y) decorate(fig) plt.savefig(plot_file) +logger.info("... saving outputs") dbetto.utils.write_dict(utils._curve_fit_popt_to_dict(popt), pars_file) diff --git a/workflow/src/legendsimflow/utils.py b/workflow/src/legendsimflow/utils.py index 31a67f6..d30abc4 100644 --- a/workflow/src/legendsimflow/utils.py +++ b/workflow/src/legendsimflow/utils.py @@ -16,10 +16,12 @@ from __future__ import annotations import inspect +import logging import os from datetime import datetime from pathlib import Path +import dbetto import legenddataflowscripts as ldfs from dbetto import AttrsDict from legendmeta import LegendMetadata @@ -30,6 +32,8 @@ from . import SimflowConfig from . import metadata as mutils +log = logging.getLogger(__name__) + def _merge_defaults(user: dict, default: dict) -> dict: # merge default into user without overwriting user values @@ -83,8 +87,10 @@ def _make_path(d): # NOTE: this will attempt a clone of legend-metadata, if the directory does not exist # NOTE: don't use lazy=True, we need a fully functional TextDB metadata = LegendMetadata(config.paths.metadata) + if "legend_metadata_version" in config: metadata.checkout(config.legend_metadata_version) + config["metadata"] = metadata # make sure all simflow plots are made with a consistent style @@ -137,10 +143,80 @@ def _get_lh5_table( # otherwise fall back to the old format timestamp = mutils.runinfo(metadata, runid).start_key - rawid = metadata.channelmap(timestamp)[hpge].daq.rawid + log.info(timestamp) + + chmap = metadata.channelmap(timestamp) + log.info(chmap.keys()) + rawid = chmap[hpge].daq.rawid return f"ch{rawid}/{tier}" +def lookup_dataflow_config(l200data: Path | str) -> AttrsDict: + """Find the paths to the data inputs. + + Parameters + ---------- + l200data + The path to the L200 data production cycle. + Returns + ------- + the dataflow configuration file as a dictionary. + """ + if not isinstance(l200data, Path): + l200data = Path(l200data) + + # look for the dataflow config file + df_cfgs = [p for p in l200data.glob("*config.*") if not p.name.startswith(".")] + + if len(df_cfgs) > 1: + msg = f"found multiple configuration files in {l200data}, this cannot be!" + raise RuntimeError(msg) + + if len(df_cfgs) == 0: + msg = f"did not find any valid config file in {l200data}, this cannot be!" + raise RuntimeError(msg) + + msg = f"found dataflow configuration file: {df_cfgs[0]}" + log.debug(msg) + + return dbetto.utils.load_dict(df_cfgs[0]) + + +def get_hit_tier_name(l200data: str) -> str: + """Extract the name of the hit tier for this production cycle. + + If the `pht` tier is present this is used else the `hit` tier is used. + + Parameters + ---------- + l200data + Path to the production cycle of l200 data. + """ + + dataflow_config = lookup_dataflow_config(l200data) + + df_cfg = ( + dataflow_config["setups"]["l200"]["paths"] + if ("setups" in dataflow_config) + else dataflow_config["paths"] + ) + + # first check if pht exists + has_pht = ("tier_pht" in df_cfg) and Path( + df_cfg["tier_pht"].replace("$_", str(l200data)) + ).exists() + has_hit = ("tier_hit" in df_cfg) and Path( + df_cfg["tier_hit"].replace("$_", str(l200data)) + ).exists() + + if has_pht: + return "pht" + if has_hit: + return "hit" + msg = f"The l200data {l200data} does not contain a valid pht or hit tier" + raise RuntimeError(msg) + + def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict: """Get the :func:`scipy.curve_fit` parameter results as a dictionary""" params = list(inspect.signature(current_pulse_model).parameters)