From 48b60cd9e33bdc2605f0bf62da4caa0a684b17f4 Mon Sep 17 00:00:00 2001 From: tdixon Date: Sat, 20 Dec 2025 07:49:18 -0800 Subject: [PATCH 01/15] add utilities to extract A/E model pars --- tests/test_hpge_pars.py | 101 ++++++ workflow/src/legendsimflow/hpge_pars.py | 299 ++++++++++++++++++ .../scripts/extract_hpge_pulse_pars.py | 53 ++++ 3 files changed, 453 insertions(+) create mode 100644 tests/test_hpge_pars.py create mode 100644 workflow/src/legendsimflow/hpge_pars.py create mode 100644 workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py diff --git a/tests/test_hpge_pars.py b/tests/test_hpge_pars.py new file mode 100644 index 0000000..8bfa3fb --- /dev/null +++ b/tests/test_hpge_pars.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pytest +from lgdo import lh5 +from matplotlib.axes import Axes +from matplotlib.figure import Figure +from scipy.stats import norm + +from legendsimflow.hpge_pars import ( + fit_waveform, + get_index, + get_parameter_dict, + get_raw_file, + get_waveform, + plot_fit, +) + + +def test_fit(): + t = np.linspace(-1000, 2000, 3001) + A = norm.pdf(t, loc=0, scale=50) + + res = fit_waveform(t, A) + + assert isinstance(res[0], np.ndarray) + assert isinstance(res[1], np.ndarray) + assert isinstance(res[2], np.ndarray) + + +def test_get_index(legend_testdata): + ref_path = legend_testdata.get_path("lh5/prod-ref-l200/") + path = ref_path / Path("generated/tier/hit/cal/p03/r001") + files = [str(p) for p in path.glob("*")] + + # we have to be very generous with the (low stats) test file + idx, file_idx = get_index(files, "ch1084803/hit", peak=100, peak_range=10) + + assert idx > -1 + assert file_idx > -1 + + # now read back in and check + + energy = lh5.read( + "ch1084803/hit/cuspEmax_ctc_cal", files[file_idx], idx=[idx] + ).view_as("np") + AoE = lh5.read("ch1084803/hit/AoE_Classifier", files[file_idx], idx=[idx]).view_as( + "np" + ) + + assert (energy > 90) & (energy < 110) + assert abs(AoE) < 1.5 + + +@pytest.fixture(scope="session") +def test_get_rawfile(legend_testdata): + ref_path = legend_testdata.get_path("lh5/prod-ref-l200/") + path = ref_path / Path("generated/tier/hit/") + raw_path = ref_path / Path("generated/tier/raw/") + + hit_file = next(p for p in path.glob("cal/p03/r001/*")) + + file = get_raw_file(hit_file, path, raw_path, hit_name="hit") + + # check file is lh5 + assert file.suffix == ".lh5" + + return file + + +def test_get_waveform(test_get_rawfile): + times, wf = get_waveform( + test_get_rawfile, + "ch1084803/raw", + idx=0, + dsp_config=None, + align=None, + current="waveform", + ) + + assert isinstance(times, np.ndarray) + assert isinstance(wf, np.ndarray) + assert len(times) == len(wf) + + +def test_plot(): + fig, ax = plot_fit( + [1, 2, 3], [0, 10, 20], np.linspace(0, 3, 1000), np.linspace(0, 3, 1000) + ) + + assert isinstance(fig, Figure) + assert isinstance(ax, Axes) + + +def test_get_parameter_dict(): + popt = [100, 10, 60, 0.6, 100, 0.2, 60] + + popt_dict = get_parameter_dict(popt) + assert len(popt_dict) == 8 diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py new file mode 100644 index 0000000..2b63791 --- /dev/null +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -0,0 +1,299 @@ +from __future__ import annotations + +import inspect +from pathlib import Path + +import awkward as ak +import dbetto +import numpy as np +from dspeed.vis import WaveformBrowser +from legendmeta import LegendMetadata +from lgdo import lh5 +from 
matplotlib import pyplot as plt +from matplotlib.axes import Axes +from matplotlib.figure import Figure +from numpy.typing import ArrayLike, NDArray +from reboost.hpge.psd import _current_pulse_model as model +from scipy.optimize import curve_fit + + +def get_index( + files: list, lh5_group: str, peak: float = 1593, peak_range: float = 5 +) -> tuple[int, int]: + """Extract the index of the event to fit""" + idxs = [] + energies = [] + dts = [] + + for file in files: + energy = lh5.read(f"{lh5_group}/cuspEmax_ctc_cal", file).view_as("np") + AoE = lh5.read(f"{lh5_group}/AoE_Classifier", file).view_as("np") + + # get drift time if possible + if f"{lh5_group}/dt_eff" in lh5.ls(file): + dt_eff = lh5.read(f"{lh5_group}/dt_eff", file).view_as("np") + else: + dt_eff = np.zeros(len(AoE)) + + idx = np.where((abs(energy - peak) < peak_range) & (abs(AoE) < 1.5))[0] + idxs.append(idx) + energies.append(energy[idx]) + dts.append(dt_eff[idx]) + n = sum(len(d) for d in dts) + + if n > 500: + break + + # now chose the best index (closest to median) + + if len(dts) == 0: + msg = "No data found in files" + raise ValueError(msg) + + dts = ak.Array(dts) + idxs = ak.Array(idxs) + energies = ak.Array(energies) + + med = np.median(ak.flatten(dts)) + best = ak.argmin(abs(dts - med)) + n = ak.num(dts, axis=-1) + + array = np.full(len(ak.flatten(dts)), False) + array[best] = True + + sel = ak.unflatten(array, n) + + idx = ak.flatten(idxs[sel])[0] + file_idx = ak.where(ak.num(dts[sel], axis=-1) == 1)[0][0] + + return idx, file_idx + + +def get_raw_file( + hit_file: Path, hit_path: Path, raw_path: Path, hit_name: str = "pht" +) -> Path: + """Get the raw file from the paths""" + + raw_file = hit_file.relative_to(hit_path) + file = Path(raw_path) / raw_file + + return file.parent / file.name.replace(hit_name, "raw") + + +def fit_waveform(times: NDArray, current: NDArray) -> tuple: + """Fit the waveform + + Parameters + ---------- + times + The timesteps of the waveform + current + The values of the waveform + + Returns + ------- + tuple of the best fit parameters, and arrays of the best fit model (time and current). 
+ """ + t = times + A = current + + maxi = np.argmax(A) + max_t = t[maxi] + + low = max_t - 1000 + high = max_t + 1000 + + height = A[maxi] + + # initial guesses and ranges + p0 = [height, max_t, 60, 0.6, 100, 0.2, 60] + + ranges = [ + [0, max_t - 100, 1, 0, 1, 0, 1], + [height * 2, max_t + 100, 500, 1, 800, 1, 800], + ] + x = np.linspace(low, high, 1000) + y = model(x, *p0) + + # do the fit + popt, _ = curve_fit( + model, + t[(t > low) & (t < high)], + A[(t > low) & (t < high)], + p0=p0, + bounds=ranges, + ) + + x = np.linspace(low, high, int(100 * (high - low))) + y = model(x, *popt) + + return popt, x, y + + +def get_waveform( + raw_file: Path | str, + lh5_group: str, + idx: int, + dsp_config: str, + *, + current: str = "curr_av", + align: str = "tp_aoe_max", +) -> tuple: + """Extract the current waveform""" + + browser = WaveformBrowser( + str(raw_file), + lh5_group, + dsp_config=dsp_config, + lines=[current], + align=align, + ) + + browser.find_entry(idx) + + t = browser.lines[current][0].get_xdata() + A = browser.lines[current][0].get_ydata() + + return t, A + + +def plot_fit( + t: NDArray, A: NDArray, model_t: NDArray, model_A: NDArray +) -> tuple[Figure, Axes]: + """Plot the best fit reconstruction""" + fig, ax = plt.subplots() + + ax.plot(t, A, linewidth=2, label="Waveform") + ax.plot(model_t, model_A, color="red", linewidth=2, label="Fit") + + ax.legend() + ax.set_xlim(-1200, 1200) + + ax.set_xlabel("time [ns]") + ax.set_ylabel("Current [ADC]") + + return fig, ax + + +def get_parameter_dict(popt: ArrayLike) -> dict: + """Get the parameter results as a dictionary""" + + params = list(inspect.signature(model).parameters) + param_names = params[1:] + + popt_dict = dict(zip(param_names, popt, strict=True)) + popt_dict["mean_AoE"] = popt_dict["amax"] / 1593 + + for key, value in popt_dict.items(): + popt_dict[key] = float(f"{value:.3g}") + + return popt_dict + + +def get_pulse_pars( + hpge: str, + prod_cycle_path: str, + period: str, + run: str, + plot_path: str | None = None, + hit_tier: str = "pht", + use_channel_name: bool = False, + meta: LegendMetadata | None = None, +) -> dict: + """Get the parameters of the fitted waveform + + Parameters + ---------- + hpge + The hpgeector name + prod_cycle_path + The path to the production cycle with data. + period + The period to inspect. + run + The run to inspect. + plot_path + Optional path to save a plot. + hit_tier + The name of the hit level tier (typically hit or pht) + use_channel_name + Whether the data is the new format referenced by channel name, or the old one. + meta + The legendMetadata instance, if `None` will be constructed. 
+ """ + + # extract some metadata + + config_path = next( + p + for p in Path(f"{prod_cycle_path}/").glob("*config.*") + if not p.name.startswith(".") + ) + central_config = dbetto.utils.load_dict(str(config_path)) + + if meta is None: + meta = LegendMetadata() + + timestamp = meta.datasets.runinfo[period][run].cal.start_key + + chmap = meta.channelmap(timestamp) + rawid = chmap[hpge].daq.rawid + + # get the paths + c_dict = ( + central_config["setups"]["l200"]["paths"] + if ("setups" in central_config) + else central_config["paths"] + ) + + hit_path = c_dict[f"tier_{hit_tier}"].replace("$_", prod_cycle_path) + + raw_path = c_dict["tier_raw"].replace("$_", prod_cycle_path) + + files = list(Path(f"{hit_path}/cal/{period}/{run}/").glob("*")) + + if len(files) == 0: + msg = f"No files found in {hit_path}/cal/{period}/{run}" + raise ValueError(msg) + + idx, file_idx = get_index( + files, f"hit/{hpge}" if use_channel_name else f"ch{rawid}/hit" + ) + + raw_file = get_raw_file( + files[file_idx], hit_path, raw_path, hit_name=hit_tier + ).resolve() + + # extract the waveform to fit + + if Path(f"{prod_cycle_path}/inputs/dataprod/config/tier_dsp/").exists(): + path = Path(f"{prod_cycle_path}/inputs/dataprod/config/tier_dsp/") + elif Path(f"{prod_cycle_path}/inputs/dataprod/config/tier/dsp/").exists(): + path = Path(f"{prod_cycle_path}/inputs/dataprod/config/tier/dsp/") + else: + msg = "No path with dsp config file found!" + raise RuntimeError(msg) + + config = str(next(path.glob("l200-*-r%-T%-ICPC-dsp_proc_chain.*"))) + + t, A = get_waveform( + raw_file, + current="curr_av", + lh5_group=f"raw/{hpge}" if use_channel_name else f"ch{rawid}/raw", + dsp_config=config, + idx=idx, + ) + + # now perform the fit + popt, x, y = fit_waveform(t, A) + + # now plot + if plot_path is not None: + fig, _ = plot_fit(t, A, x, y) + plt.savefig(f"{plot_path}/current_fit-{period}-{run}-{hpge}.pdf") + plt.close(fig) + + # get the results as a dict + popt_dict = get_parameter_dict(popt) + + return popt_dict, t, A diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py b/workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py new file mode 100644 index 0000000..657f094 --- /dev/null +++ b/workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import argparse + +import dbetto + +from legendsimflow.hpge_pars import get_pulse_pars + + +def parse_args(): + parser = argparse.ArgumentParser(description="Example script") + + parser.add_argument( + "-i", + "--data-path", + required=True, + help="Path to the data production environment", + ) + parser.add_argument( + "-p", + "--plot-path", + required=False, + default=None, + help="Path to the folder to save plots.", + ) + parser.add_argument( + "-o", "--out-file", required=True, help="Path to the output file" + ) + + parser.add_argument( + "-r", "--run", type=str, required=True, help="Run number formatted as pXY-rXYZ" + ) + parser.add_argument( + "-d", "--detectors", nargs="+", required=True, help="One or more detector names" + ) + + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_args() + + period = args.run.split("-")[0] + run = args.run.split("-")[1] + + out = { + det: get_pulse_pars( + det, args.data_path, period=period, run=run, plot_path=args.plot_path + ) + for det in args.detectors + } + + dbetto.utils.write_dict(out, args.out_file) From a98e600f5c8237b66adfe28e4de2cabf0b9cf6e3 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Wed, 24 Dec 2025 10:15:53 -0800 
Subject: [PATCH 02/15] add function to get the reference calibration run --- tests/conftest.py | 2 +- tests/test_metadata.py | 13 +++++++++++ workflow/src/legendsimflow/metadata.py | 32 ++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9627d07..cb1476c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,7 +17,7 @@ @pytest.fixture(scope="session") def legend_testdata(): ldata = LegendTestData() - ldata.checkout("8247690") + ldata.checkout("c564b05") return ldata diff --git a/tests/test_metadata.py b/tests/test_metadata.py index 8cd41cd..90f7907 100644 --- a/tests/test_metadata.py +++ b/tests/test_metadata.py @@ -55,3 +55,16 @@ def test_run_stuff(config): assert metadata.get_runlist(config, "sis1_z8224_src1_Ra224_to_Pb208") == [ f"l200-p02-r00{r}-phy" for r in range(8) ] + + assert ( + metadata.reference_cal_run(config.metadata, "l200-p16-r006-phy") + == "l200-p16-r006-cal" + ) + assert ( + metadata.reference_cal_run(config.metadata, "l200-p16-r008-ssc") + == "l200-p16-r007-cal" + ) + assert ( + metadata.reference_cal_run(config.metadata, "l200-p16-r009-ssc") + == "l200-p16-r007-cal" + ) diff --git a/workflow/src/legendsimflow/metadata.py b/workflow/src/legendsimflow/metadata.py index dd2b476..c561f79 100644 --- a/workflow/src/legendsimflow/metadata.py +++ b/workflow/src/legendsimflow/metadata.py @@ -138,6 +138,38 @@ def runinfo(metadata: LegendMetadata, runid: str) -> str: return metadata.datasets.runinfo[period][run][datatype] +def reference_cal_run(metadata: LegendMetadata, runid: str) -> str: + """The reference calibration run for `runid`. + + Warning + ------- + This function does not account for dataflow overrides (e.g. calibration + back-applying)! + """ + exp, period, run, datatype = re.split(r"\W+", runid) + + if datatype == "cal": + return runid + + p_runinfo = metadata.datasets.runinfo[period] + + if "cal" in p_runinfo[run]: + return f"{exp}-{period}-{run}-cal" + + runs = sorted(p_runinfo.keys()) + index = runs.index(run) + + while True: + index -= 1 + if index < 0: + msg = f"there is no previous calibration run in {period} for {runid}" + raise RuntimeError(msg) + + prev_r = runs[index] + if "cal" in p_runinfo[prev_r]: + return f"{exp}-{period}-{prev_r}-cal" + + def simpars(metadata: LegendMetadata, par: str, runid: str) -> AttrsDict: """Extract simflow parameters for a certain LEGEND run. 
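The backward search in reference_cal_run() above is easiest to see on a concrete run table. The following standalone sketch is not part of the patch: it replaces LegendMetadata with a hypothetical runinfo dictionary (the run layout is made up, chosen only to mirror the expectations in test_metadata.py) and reproduces the same fall-back logic with a plain string split instead of re.split.

# Standalone sketch of the fall-back logic in reference_cal_run(), using a
# hypothetical runinfo table instead of LegendMetadata. The run layout below
# is made up; it only mirrors the expectations in test_metadata.py.
runinfo = {
    "r006": {"cal": {}, "phy": {}},
    "r007": {"cal": {}, "phy": {}},
    "r008": {"ssc": {}},
    "r009": {"ssc": {}},
}


def toy_reference_cal_run(runid: str) -> str:
    exp, period, run, datatype = runid.split("-")
    if datatype == "cal" or "cal" in runinfo[run]:
        return f"{exp}-{period}-{run}-cal"
    runs = sorted(runinfo)
    index = runs.index(run)
    while index > 0:
        index -= 1
        if "cal" in runinfo[runs[index]]:
            return f"{exp}-{period}-{runs[index]}-cal"
    msg = f"there is no previous calibration run for {runid}"
    raise RuntimeError(msg)


assert toy_reference_cal_run("l200-p16-r006-phy") == "l200-p16-r006-cal"
assert toy_reference_cal_run("l200-p16-r009-ssc") == "l200-p16-r007-cal"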
From c1808c8c7921333c002777798f1f763804a2c877 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sat, 27 Dec 2025 08:34:11 -0800 Subject: [PATCH 03/15] refactor current model code and integrate into simflow --- simflow-config.yaml | 1 + tests/test_hpge_pars.py | 46 +-- workflow/rules/hit.smk | 92 +++++- workflow/src/legendsimflow/aggregate.py | 25 +- workflow/src/legendsimflow/hpge_pars.py | 294 ++++++++++-------- workflow/src/legendsimflow/metadata.py | 6 + workflow/src/legendsimflow/patterns.py | 34 +- .../extract_hpge_current_pulse_model.py | 73 +++++ .../scripts/extract_hpge_pulse_pars.py | 53 ---- 9 files changed, 405 insertions(+), 219 deletions(-) create mode 100644 workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py delete mode 100644 workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py diff --git a/simflow-config.yaml b/simflow-config.yaml index 1c06e6b..ff61e0a 100644 --- a/simflow-config.yaml +++ b/simflow-config.yaml @@ -27,6 +27,7 @@ benchmark: stp: 10_000 paths: + l200data: <...>/public/prodenv/prod-blind/ref-v1.0.0 benchmarks: $_/generated/benchmarks log: $_/generated/log diff --git a/tests/test_hpge_pars.py b/tests/test_hpge_pars.py index 8bfa3fb..c95cc92 100644 --- a/tests/test_hpge_pars.py +++ b/tests/test_hpge_pars.py @@ -3,27 +3,19 @@ from pathlib import Path import numpy as np -import pytest from lgdo import lh5 from matplotlib.axes import Axes from matplotlib.figure import Figure from scipy.stats import norm -from legendsimflow.hpge_pars import ( - fit_waveform, - get_index, - get_parameter_dict, - get_raw_file, - get_waveform, - plot_fit, -) +from legendsimflow import hpge_pars def test_fit(): t = np.linspace(-1000, 2000, 3001) A = norm.pdf(t, loc=0, scale=50) - res = fit_waveform(t, A) + res = hpge_pars.fit_current_pulse(t, A) assert isinstance(res[0], np.ndarray) assert isinstance(res[1], np.ndarray) @@ -36,7 +28,9 @@ def test_get_index(legend_testdata): files = [str(p) for p in path.glob("*")] # we have to be very generous with the (low stats) test file - idx, file_idx = get_index(files, "ch1084803/hit", peak=100, peak_range=10) + idx, file_idx = hpge_pars.find_best_event_idx( + files, "ch1084803/hit", ewin_center=100, ewin_width=20 + ) assert idx > -1 assert file_idx > -1 @@ -54,30 +48,16 @@ def test_get_index(legend_testdata): assert abs(AoE) < 1.5 -@pytest.fixture(scope="session") -def test_get_rawfile(legend_testdata): - ref_path = legend_testdata.get_path("lh5/prod-ref-l200/") - path = ref_path / Path("generated/tier/hit/") - raw_path = ref_path / Path("generated/tier/raw/") - - hit_file = next(p for p in path.glob("cal/p03/r001/*")) - - file = get_raw_file(hit_file, path, raw_path, hit_name="hit") - - # check file is lh5 - assert file.suffix == ".lh5" - - return file - - -def test_get_waveform(test_get_rawfile): - times, wf = get_waveform( - test_get_rawfile, +def test_get_waveform(legend_testdata): + times, wf = hpge_pars.get_current_pulse( + legend_testdata.get_path( + "lh5/prod-ref-l200/generated/tier/raw/cal/p03/r001/l200-p03-r001-cal-20230318T012144Z-tier_raw.lh5" + ), "ch1084803/raw", idx=0, dsp_config=None, + dsp_output="waveform", align=None, - current="waveform", ) assert isinstance(times, np.ndarray) @@ -86,7 +66,7 @@ def test_get_waveform(test_get_rawfile): def test_plot(): - fig, ax = plot_fit( + fig, ax = hpge_pars.plot_fit_result( [1, 2, 3], [0, 10, 20], np.linspace(0, 3, 1000), np.linspace(0, 3, 1000) ) @@ -97,5 +77,5 @@ def test_plot(): def test_get_parameter_dict(): popt = [100, 10, 60, 0.6, 100, 0.2, 60] - 
popt_dict = get_parameter_dict(popt) + popt_dict = hpge_pars._curve_fit_popt_to_dict(popt) assert len(popt_dict) == 8 diff --git a/workflow/rules/hit.smk b/workflow/rules/hit.smk index 3ba5fe7..bbba7f7 100644 --- a/workflow/rules/hit.smk +++ b/workflow/rules/hit.smk @@ -1,6 +1,8 @@ +import re + from dbetto.utils import load_dict -from legendsimflow import patterns, aggregate +from legendsimflow import patterns, aggregate, hpge_pars from legendsimflow import metadata as mutils from legendsimflow import nersc @@ -68,9 +70,10 @@ rule build_tier_hit: input: geom=patterns.geom_gdml_filename(config, tier="stp"), stp_file=patterns.output_simjob_filename(config, tier="stp"), - # NOTE: we pass here the full list of maps, but likely not all of them - # will be used. room for improvement + # NOTE: we pass here the full list of maps/current models, but likely + # not all of them will be used. room for improvement hpge_dtmaps=lambda wc: aggregate.gen_list_of_merged_dtmaps(config, wc.simid), + hpge_currmods=lambda wc: aggregate.gen_list_of_merged_currmods(config, wc.simid), # NOTE: technically this rule only depends on one block in the # partitioning file, but in practice the full file will always change simstat_part_file=rules.make_simstat_partition_file.output[0], @@ -87,6 +90,7 @@ rule build_tier_hit: def smk_hpge_drift_time_map_inputs(wildcards): + """Prepare inputs for the HPGe drift time map rule.""" meta = config.metadata.hardware.detectors.germanium.diodes[wildcards.hpge_detector] ids = {"bege": "B", "coax": "C", "ppc": "P", "icpc": "V"} crystal_name = ( @@ -197,3 +201,85 @@ rule merge_hpge_drift_time_maps: done done """ + + +def smk_extract_current_pulse_model_inputs(wildcards): + """Prepare inputs for the HPGe current model extraction rule.""" + raw_file, wf_idx, dsp_cfg_file = hpge_pars.current_pulse_model_inputs( + config, + wildcards.runid, + wildcards.hpge_detector, + hit_tier_name="hit", + use_hpge_name="true", + ) + + evt_idx_file = patterns.input_currmod_evt_idx_file( + config, runid=wildcards.runid, hpge_detector=wildcards.hpge_detector + ) + + with evt_idx_file.open() as f: + f.write(wf_idx) + + return { + "raw_file": raw_file, + "raw_wf_idx_file": evt_idx_file, + "dsp_cfg_file": dsp_cfg_file, + } + + +rule extract_current_pulse_model: + """Extract the HPGe current signal model. + + Perform a fit of the current waveform and stores the best-fit model + parameters in a YAML file. + + :::{warning} + This rule does not have the relevant LEGEND-200 data files as input, since + they are dynamically discovered and this would therefore slow down the DAG + generation. Therefore, remember to force-rerun if the input data is + updated! + ::: + + Uses wildcards `runid` and `hpge_detector`. + """ + message: + "Extracting current model for detector {wildcards.hpge_detector} in {wildcards.runid}" + # NOTE: we don't list the file dependencies here because they are + # dynamically generated, and that would slow down the DAG generation + # input: + # unpack(smk_extract_current_pulse_model_inputs), + params: + # evt_idx=parse_input(input.raw_wf_idx_file, parser=mutils.extract_integer), + hit_tier_name="hit", + output: + pars_file=temp(patterns.output_currmod_filename(config)), + plot_file=patterns.plot_currmod_filename(config), + log: + patterns.log_currmod_filename(config, SIMFLOW_CONTEXT.proctime), + script: + "../src/legendsimflow/scripts/extract_hpge_current_pulse_model.py" + + +rule merge_current_pulse_model_pars: + """Merge the HPGe current signal model parameters in a single file per `runid`. 
+ + Uses wildcard `runid`. + """ + message: + "Merging current model parameters in {wildcards.runid}" + input: + lambda wc: aggregate.gen_list_of_currmods(config, wc.runid), + output: + patterns.output_currmod_merged_filename(config), + run: + import dbetto + from legendsimflow.aggregate import gen_list_of_hpges_valid_for_dtmap + + # NOTE: this is guaranteed to be sorted as in the input file list + hpges = gen_list_of_hpges_valid_for_dtmap(config, wildcards.runid) + + out_dict = {} + for i, f in enumerate(input): + out_dict[hpges[i]] = dbetto.utils.load_dict(f) + + dbetto.utils.write_dict(out_dict, output[0]) diff --git a/workflow/src/legendsimflow/aggregate.py b/workflow/src/legendsimflow/aggregate.py index f08d6e9..2de3ea4 100644 --- a/workflow/src/legendsimflow/aggregate.py +++ b/workflow/src/legendsimflow/aggregate.py @@ -127,7 +127,7 @@ def start_key(config: SimflowConfig, runid: str) -> str: def gen_list_of_hpges_valid_for_dtmap(config: SimflowConfig, runid: str) -> list[str]: - """Make a list of HPGe detector for which we want to generate a drift time map. + """Make a sorted list of HPGe detector for which we want to generate a drift time map. It generates the list of deployed detectors in `runid` via the LEGEND channelmap, then checks if in the crystal metadata there's all the @@ -139,6 +139,10 @@ def gen_list_of_hpges_valid_for_dtmap(config: SimflowConfig, runid: str) -> list hpges = [] for _, hpge in chmap.group("system").geds.items(): + # TEMPORARY HACK + if hpge.name in ("V00050A", "V13046A", "V00048B", "V14654A"): + continue + m = crystal_meta( config, config.metadata.hardware.detectors.germanium.diodes[hpge.name] ) @@ -157,7 +161,7 @@ def gen_list_of_hpges_valid_for_dtmap(config: SimflowConfig, runid: str) -> list msg = f"the list of HPGes valid for drift time map generation in {runid} is empty!" log.warning(msg) - return hpges + return sorted(hpges) def gen_list_of_all_runids(config): @@ -207,6 +211,23 @@ def gen_list_of_all_dtmap_plots_outputs(config: SimflowConfig) -> set[str]: return files +def gen_list_of_currmods(config: SimflowConfig, runid: str) -> list[str]: + """Generate the list of HPGe current model parameter files for a `runid`.""" + hpges = gen_list_of_hpges_valid_for_dtmap(config, runid) + return [ + patterns.output_currmod_filename(config, hpge_detector=hpge, runid=runid) + for hpge in hpges + ] + + +def gen_list_of_merged_currmods(config: SimflowConfig, simid: str) -> list[str]: + r"""Generate the list of (merged) HPGe current model parameter files for all requested `runid`\ s.""" + return [ + patterns.output_currmod_merged_filename(config, runid=runid) + for runid in get_runlist(config, simid) + ] + + def process_simlist( config: SimflowConfig, simlist: Iterable[str] | None = None ) -> list[Path]: diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index 2b63791..e2498ad 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -1,6 +1,8 @@ from __future__ import annotations import inspect +import logging +import re from pathlib import Path import awkward as ak @@ -13,19 +15,81 @@ from matplotlib.axes import Axes from matplotlib.figure import Figure from numpy.typing import ArrayLike, NDArray -from reboost.hpge.psd import _current_pulse_model as model +from reboost.hpge.psd import _current_pulse_model as current_pulse_model from scipy.optimize import curve_fit +from . import SimflowConfig +from . 
import metadata as mutils -def get_index( - files: list, lh5_group: str, peak: float = 1593, peak_range: float = 5 +log = logging.getLogger(__name__) + + +# FIXME: this should be removed once the PRL25 data is reprocessed +def get_lh5_table( + metadata: LegendMetadata, + fname: str | Path, + hpge: str, + tier: str, + runid: str, +) -> str: + """The correct LH5 table path. + + Determines the correct path to a `hpge` detector table in tier `tier`. + """ + # check if the latest format is available + path = f"{tier}/{hpge}" + if lh5.ls(fname, path) == [path]: + return path + + # otherwise fall back to the old format + timestamp = mutils.runinfo(metadata, runid).start_key + rawid = metadata.channelmap(timestamp)[hpge].daq.rawid + return f"ch{rawid}/{tier}" + + +def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict: + """Get the ``scipy.curve_fit()`` parameter results as a dictionary""" + params = list(inspect.signature(current_pulse_model).parameters) + param_names = params[1:] + + popt_dict = dict(zip(param_names, popt, strict=True)) + popt_dict["mean_AoE"] = popt_dict["amax"] / 1593 + + for key, value in popt_dict.items(): + popt_dict[key] = float(f"{value:.3g}") + + return popt_dict + + +def find_best_event_idx( + hit_files: list[str, Path], + lh5_group: str, + ewin_center: float = 1593, + ewin_width: float = 10, ) -> tuple[int, int]: - """Extract the index of the event to fit""" + """Extract the index of the event to fit. + + Returns the index of the event in the file and the index of the file in the + input file list. + + Parameters + ---------- + hit_files + tier-hit files used to determine the best index. + lh5_group + where the tier-hit data is found in the files. + ewin_center + center of the energy window to use for the event search (same units as + in data). + ewin_width + width of the energy window to use for the event search (dame units as + in data). + """ idxs = [] energies = [] dts = [] - for file in files: + for file in hit_files: energy = lh5.read(f"{lh5_group}/cuspEmax_ctc_cal", file).view_as("np") AoE = lh5.read(f"{lh5_group}/AoE_Classifier", file).view_as("np") @@ -35,19 +99,21 @@ def get_index( else: dt_eff = np.zeros(len(AoE)) - idx = np.where((abs(energy - peak) < peak_range) & (abs(AoE) < 1.5))[0] + idx = np.where((abs(energy - ewin_center) < ewin_width / 2) & (abs(AoE) < 1.5))[ + 0 + ] idxs.append(idx) energies.append(energy[idx]) dts.append(dt_eff[idx]) + n = sum(len(d) for d in dts) if n > 500: break # now chose the best index (closest to median) - if len(dts) == 0: - msg = "No data found in files" + msg = "no data found in the considered hit files" raise ValueError(msg) dts = ak.Array(dts) @@ -69,30 +135,23 @@ def get_index( return idx, file_idx -def get_raw_file( - hit_file: Path, hit_path: Path, raw_path: Path, hit_name: str = "pht" -) -> Path: - """Get the raw file from the paths""" - - raw_file = hit_file.relative_to(hit_path) - file = Path(raw_path) / raw_file +def fit_current_pulse(times: NDArray, current: NDArray) -> tuple: + """Fit the model to the raw HPGe current pulse. - return file.parent / file.name.replace(hit_name, "raw") - - -def fit_waveform(times: NDArray, current: NDArray) -> tuple: - """Fit the waveform + Uses :func:`scipy.curve_fit` to fit + :func:`reboost.hpge.psd._current_pulse_model` to the input raw pulse. Parameters ---------- times - The timesteps of the waveform + the timesteps of the pulse. current - The values of the waveform + the values of the pulse. 
Returns ------- - tuple of the best fit parameters, and arrays of the best fit model (time and current). + tuple of the best fit parameters, and arrays of the best fit model + (time and current). """ t = times A = current @@ -113,11 +172,11 @@ def fit_waveform(times: NDArray, current: NDArray) -> tuple: [height * 2, max_t + 100, 500, 1, 800, 1, 800], ] x = np.linspace(low, high, 1000) - y = model(x, *p0) + y = current_pulse_model(x, *p0) # do the fit popt, _ = curve_fit( - model, + current_pulse_model, t[(t > low) & (t < high)], A[(t > low) & (t < high)], p0=p0, @@ -125,42 +184,59 @@ def fit_waveform(times: NDArray, current: NDArray) -> tuple: ) x = np.linspace(low, high, int(100 * (high - low))) - y = model(x, *popt) + y = current_pulse_model(x, *popt) return popt, x, y -def get_waveform( +def get_current_pulse( raw_file: Path | str, lh5_group: str, idx: int, dsp_config: str, *, - current: str = "curr_av", + dsp_output: str = "curr_av", align: str = "tp_aoe_max", ) -> tuple: - """Extract the current waveform""" + """Extract the current pulse. + + Parameters + ---------- + raw_file + path to the raw tier file. + lh5_group + where to find the waveform table. + idx + the index of the waveform to read. + dsp_config + the :mod:`dspeed` configuration file defining the DSP processing chain + to estimate the current pulse. + dsp_output + the name of the DSP output corresponding to the current pulse. + align + DSP value around which the pulses are aligned. + """ browser = WaveformBrowser( str(raw_file), lh5_group, dsp_config=dsp_config, - lines=[current], + lines=[dsp_output], align=align, ) browser.find_entry(idx) - t = browser.lines[current][0].get_xdata() - A = browser.lines[current][0].get_ydata() + t = browser.lines[dsp_output][0].get_xdata() + A = browser.lines[dsp_output][0].get_ydata() return t, A -def plot_fit( +def plot_fit_result( t: NDArray, A: NDArray, model_t: NDArray, model_A: NDArray ) -> tuple[Figure, Axes]: - """Plot the best fit reconstruction""" + """Plot the best fit results.""" fig, ax = plt.subplots() ax.plot(t, A, linewidth=2, label="Waveform") @@ -175,125 +251,89 @@ def plot_fit( return fig, ax -def get_parameter_dict(popt: ArrayLike) -> dict: - """Get the parameter results as a dictionary""" - - params = list(inspect.signature(model).parameters) - param_names = params[1:] - - popt_dict = dict(zip(param_names, popt, strict=True)) - popt_dict["mean_AoE"] = popt_dict["amax"] / 1593 - - for key, value in popt_dict.items(): - popt_dict[key] = float(f"{value:.3g}") - - return popt_dict - - -def get_pulse_pars( +def current_pulse_model_inputs( + config: SimflowConfig, + runid: str, hpge: str, - prod_cycle_path: str, - period: str, - run: str, - plot_path: str | None = None, - hit_tier: str = "pht", - use_channel_name: bool = False, - meta: LegendMetadata | None = None, -) -> dict: - """Get the parameters of the fitted waveform + hit_tier_name: str = "hit", +) -> tuple[Path, int, Path]: + """Find the raw file, event index and DSP configuration file. Parameters ---------- + config + simflow configuration object. + runid + LEGEND-200 run identifier. hpge - The hpgeector name - prod_cycle_path - The path to the production cycle with data. - period - The period to inspect. - run - The run to inspect. - plot_path - Optional path to save a plot. - hit_tier - The name of the hit level tier (typically hit or pht) - use_channel_name - Whether the data is the new format referenced by channel name, or the old one. - meta - The legendMetadata instance, if `None` will be constructed. 
+ name of the HPGe detector + hit_tier_name + name of the hit tier. This is typically "hit" or "pht". """ + l200data = config.paths.l200data - # extract some metadata + # get the reference cal run + cal_runid = mutils.reference_cal_run(config.metadata, runid) + _, period, run, _ = re.split(r"\W+", cal_runid) - config_path = next( - p - for p in Path(f"{prod_cycle_path}/").glob("*config.*") - if not p.name.startswith(".") - ) - central_config = dbetto.utils.load_dict(str(config_path)) + msg = f"inferred reference calibration run: {cal_runid}" + log.debug(msg) - if meta is None: - meta = LegendMetadata() + # look for the dataflow config file + df_cfgs = [p for p in l200data.glob("*config.*") if not p.name.startswith(".")] - timestamp = meta.datasets.runinfo[period][run].cal.start_key + if len(df_cfgs) > 1: + msg = f"found multiple configuration files in {l200data}, this cannot be!" + raise RuntimeError(msg) - chmap = meta.channelmap(timestamp) - rawid = chmap[hpge].daq.rawid + msg = f"found dataflow configuration file: {df_cfgs[0]}" + log.debug(msg) + central_config = dbetto.utils.load_dict(df_cfgs[0]) - # get the paths - c_dict = ( + # get the paths to hit and raw tier files + df_cfg = ( central_config["setups"]["l200"]["paths"] if ("setups" in central_config) else central_config["paths"] ) - hit_path = c_dict[f"tier_{hit_tier}"].replace("$_", prod_cycle_path) - - raw_path = c_dict["tier_raw"].replace("$_", prod_cycle_path) + hit_path = Path(df_cfg[f"tier_{hit_tier_name}"].replace("$_", str(l200data))) + msg = f"looking for hit tier files in {hit_path / 'cal' / period / run}/*" + log.debug(msg) - files = list(Path(f"{hit_path}/cal/{period}/{run}/").glob("*")) + hit_files = list((hit_path / "cal" / period / run).glob("*")) - if len(files) == 0: - msg = f"No files found in {hit_path}/cal/{period}/{run}" + if len(hit_files) == 0: + msg = f"no hit tier files found in {hit_path}/cal/{period}/{run}" raise ValueError(msg) - idx, file_idx = get_index( - files, f"hit/{hpge}" if use_channel_name else f"ch{rawid}/hit" - ) - - raw_file = get_raw_file( - files[file_idx], hit_path, raw_path, hit_name=hit_tier - ).resolve() + lh5_group = get_lh5_table(config.metadata, hit_files[0], hpge, "hit", runid) - # extract the waveform to fit + msg = "looking for best event to fit" + log.debug(msg) + wf_idx, file_idx = find_best_event_idx(hit_files, lh5_group) - if Path(f"{prod_cycle_path}/inputs/dataprod/config/tier_dsp/").exists(): - path = Path(f"{prod_cycle_path}/inputs/dataprod/config/tier_dsp/") - elif Path(f"{prod_cycle_path}/inputs/dataprod/config/tier/dsp/").exists(): - path = Path(f"{prod_cycle_path}/inputs/dataprod/config/tier/dsp/") - else: - msg = "No path with dsp config file found!" 
- raise RuntimeError(msg) + raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data))) + hit_file = hit_files[file_idx] + raw_file = raw_path / str(hit_file.relative_to(hit_path)).replace( + hit_tier_name, "raw" + ) - config = str(next(path.glob("l200-*-r%-T%-ICPC-dsp_proc_chain.*"))) + msg = f"determined raw file: {raw_file} (event index {wf_idx})" + log.debug(msg) - t, A = get_waveform( - raw_file, - current="curr_av", - lh5_group=f"raw/{hpge}" if use_channel_name else f"ch{rawid}/raw", - dsp_config=config, - idx=idx, + dsp_cfg_regex = r"l200-*-r%-T%-ICPC-dsp_proc_chain.*" + dsp_cfg_files = list( + (l200data / "inputs/dataprod/config/tier_dsp").glob(dsp_cfg_regex) ) - # now perform the fit - popt, x, y = fit_waveform(t, A) - - # now plot - if plot_path is not None: - fig, _ = plot_fit(t, A, x, y) - plt.savefig(f"{plot_path}/current_fit-{period}-{run}-{hpge}.pdf") - plt.close(fig) + if dsp_cfg_files == []: + dsp_cfg_files = list( + (l200data / "inputs/dataprod/config/tier/dsp").glob(dsp_cfg_regex) + ) - # get the results as a dict - popt_dict = get_parameter_dict(popt) + if len(dsp_cfg_files) != 1: + msg = f"could not find a suitable dsp config file in {l200data} (or multiple found)" + raise RuntimeError(msg) - return popt_dict, t, A + return raw_file.resolve(), wf_idx, dsp_cfg_files[0] diff --git a/workflow/src/legendsimflow/metadata.py b/workflow/src/legendsimflow/metadata.py index c561f79..9a6a80d 100644 --- a/workflow/src/legendsimflow/metadata.py +++ b/workflow/src/legendsimflow/metadata.py @@ -20,6 +20,7 @@ import logging import re from collections.abc import Collection +from pathlib import Path from dbetto import AttrsDict from legendmeta import LegendMetadata @@ -124,6 +125,11 @@ def smk_hash_simconfig( return hash_dict(scfg) +def extract_integer(file_path: Path) -> int: + with file_path.open() as f: + return int(f.read().strip()) + + def runinfo(metadata: LegendMetadata, runid: str) -> str: """Get the `datasets.runinfo` entry for a LEGEND run identifier. 
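The event selection in the hpge_pars module above is done with awkward arrays, which can obscure what is actually being chosen. The sketch below is illustrative only: it redoes the same choice (the candidate event whose effective drift time is closest to the median over all files, reported as an in-file index plus a file index) with plain NumPy on made-up per-file arrays.

# Plain-NumPy version of the selection performed with awkward arrays above:
# among all candidate events collected per file, pick the one whose drift
# time is closest to the median, and return both the in-file event index and
# the index of the file it lives in. All numbers below are made up.
import numpy as np

idxs = [np.array([3, 17]), np.array([5]), np.array([2, 8, 11])]
dts = [np.array([610.0, 655.0]), np.array([640.0]), np.array([620.0, 700.0, 630.0])]

flat_dts = np.concatenate(dts)
flat_idxs = np.concatenate(idxs)
file_of = np.concatenate([np.full(len(d), i) for i, d in enumerate(dts)])

best = np.argmin(np.abs(flat_dts - np.median(flat_dts)))
evt_idx, file_idx = int(flat_idxs[best]), int(file_of[best])
# fit the waveform at index evt_idx of hit_files[file_idx]
print(evt_idx, file_idx)  # -> 5 1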
diff --git a/workflow/src/legendsimflow/patterns.py b/workflow/src/legendsimflow/patterns.py index 71ba9c1..c44b9fd 100644 --- a/workflow/src/legendsimflow/patterns.py +++ b/workflow/src/legendsimflow/patterns.py @@ -221,7 +221,7 @@ def plot_dtmap_filename(config: SimflowConfig, **kwargs) -> Path: return _expand(pat, **kwargs) -def benchmark_dtmap_filename(config: SimflowConfig, **kwargs): +def benchmark_dtmap_filename(config: SimflowConfig, **kwargs) -> Path: pat = ( config.paths.benchmarks / "hpge/dtmaps/{runid}-{hpge_detector}-drift-time-map.tsv" @@ -229,6 +229,38 @@ def benchmark_dtmap_filename(config: SimflowConfig, **kwargs): return expand(pat, **kwargs, allow_missing=True)[0] +# hpge current model + + +def input_currmod_evt_idx_file(config: SimflowConfig, **kwargs) -> Path: + pat = config.paths.genmeta / "hpge/currmod/{runid}-{hpge_detector}-best-evt-idx.txt" + return expand(pat, **kwargs, allow_missing=True)[0] + + +def output_currmod_filename(config: SimflowConfig, **kwargs) -> Path: + return _expand( + config.paths.genmeta / "hpge/currmod/{runid}-{hpge_detector}-model.yaml", + **kwargs, + ) + + +def output_currmod_merged_filename(config: SimflowConfig, **kwargs) -> Path: + return _expand( + config.paths.genmeta / "hpge/currmod/{runid}-models.yaml", + **kwargs, + ) + + +def log_currmod_filename(config: SimflowConfig, time: str, **kwargs) -> Path: + pat = log_dirname(config, time) / "hpge/currmod/{runid}-{hpge_detector}-model.log" + return _expand(pat, **kwargs) + + +def plot_currmod_filename(config: SimflowConfig, **kwargs) -> Path: + pat = config.paths.plots / "hpge/currmod/{runid}-{hpge_detector}-fit-result.pdf" + return _expand(pat, **kwargs) + + # hit tier diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py new file mode 100644 index 0000000..722cd03 --- /dev/null +++ b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py @@ -0,0 +1,73 @@ +# ruff: noqa: I002 + +# Copyright (C) 2025 Luigi Pertoldi , +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . 
+ +import dbetto +import legenddataflowscripts as ldfs +import legenddataflowscripts.utils +import matplotlib.pyplot as plt + +from legendsimflow import hpge_pars, nersc +from legendsimflow.plot import decorate + +args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821 + +config = args.config +runid = args.wildcards.runid +hpge = args.wildcards.hpge_detector +hit_tier_name = args.params.hit_tier_name +metadata = args.config.metadata +pars_file = args.output.pars_file +plot_file = args.output.plot_file +log_file = args.log[0] + +# setup logging +logger = ldfs.utils.build_log(metadata.simprod.config.logging, log_file) + +raw_file, wf_idx, dsp_cfg_file = hpge_pars.current_pulse_model_inputs( + config, + runid, + hpge, + hit_tier_name=hit_tier_name, +) + +lh5_group = hpge_pars.get_lh5_table( + metadata, + raw_file, + hpge, + "raw", + runid, +) + +logger.info("fetching the current pulse") +t, A = hpge_pars.get_current_pulse( + raw_file, + current="curr_av", + lh5_group=lh5_group, + dsp_config=str(dsp_cfg_file), + idx=wf_idx, +) + +logger.info("fitting the current pulse to extract the model") +popt, x, y = hpge_pars.fit_current_pulse(t, A) + +# now plot +logger.info("plotting the fit result") +fig, _ = hpge_pars.plot_fit_result(t, A, x, y) +decorate(fig) +plt.savefig(plot_file) + +dbetto.utils.write_dict(hpge_pars._curve_fit_popt_to_dict(popt), pars_file) diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py b/workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py deleted file mode 100644 index 657f094..0000000 --- a/workflow/src/legendsimflow/scripts/extract_hpge_pulse_pars.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import annotations - -import argparse - -import dbetto - -from legendsimflow.hpge_pars import get_pulse_pars - - -def parse_args(): - parser = argparse.ArgumentParser(description="Example script") - - parser.add_argument( - "-i", - "--data-path", - required=True, - help="Path to the data production environment", - ) - parser.add_argument( - "-p", - "--plot-path", - required=False, - default=None, - help="Path to the folder to save plots.", - ) - parser.add_argument( - "-o", "--out-file", required=True, help="Path to the output file" - ) - - parser.add_argument( - "-r", "--run", type=str, required=True, help="Run number formatted as pXY-rXYZ" - ) - parser.add_argument( - "-d", "--detectors", nargs="+", required=True, help="One or more detector names" - ) - - return parser.parse_args() - - -if __name__ == "__main__": - args = parse_args() - - period = args.run.split("-")[0] - run = args.run.split("-")[1] - - out = { - det: get_pulse_pars( - det, args.data_path, period=period, run=run, plot_path=args.plot_path - ) - for det in args.detectors - } - - dbetto.utils.write_dict(out, args.out_file) From e9072df24db2351fa1f81c87ed081cc692284a33 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sat, 27 Dec 2025 18:12:33 +0100 Subject: [PATCH 04/15] rename hpge_pars functions --- tests/test_hpge_pars.py | 13 +--- tests/test_utils.py | 10 +++ workflow/rules/hit.smk | 2 +- workflow/src/legendsimflow/hpge_pars.py | 63 ++++--------------- .../extract_hpge_current_pulse_model.py | 20 +++--- workflow/src/legendsimflow/utils.py | 42 +++++++++++++ 6 files changed, 75 insertions(+), 75 deletions(-) create mode 100644 tests/test_utils.py diff --git a/tests/test_hpge_pars.py b/tests/test_hpge_pars.py index c95cc92..f9e72d0 100644 --- a/tests/test_hpge_pars.py +++ b/tests/test_hpge_pars.py @@ -15,7 +15,7 @@ def test_fit(): t = np.linspace(-1000, 2000, 3001) A = 
norm.pdf(t, loc=0, scale=50) - res = hpge_pars.fit_current_pulse(t, A) + res = hpge_pars.fit_currmod(t, A) assert isinstance(res[0], np.ndarray) assert isinstance(res[1], np.ndarray) @@ -28,7 +28,7 @@ def test_get_index(legend_testdata): files = [str(p) for p in path.glob("*")] # we have to be very generous with the (low stats) test file - idx, file_idx = hpge_pars.find_best_event_idx( + idx, file_idx = hpge_pars.lookup_currmod_fit_data( files, "ch1084803/hit", ewin_center=100, ewin_width=20 ) @@ -66,16 +66,9 @@ def test_get_waveform(legend_testdata): def test_plot(): - fig, ax = hpge_pars.plot_fit_result( + fig, ax = hpge_pars.plot_currmod_fit_result( [1, 2, 3], [0, 10, 20], np.linspace(0, 3, 1000), np.linspace(0, 3, 1000) ) assert isinstance(fig, Figure) assert isinstance(ax, Axes) - - -def test_get_parameter_dict(): - popt = [100, 10, 60, 0.6, 100, 0.2, 60] - - popt_dict = hpge_pars._curve_fit_popt_to_dict(popt) - assert len(popt_dict) == 8 diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..ef92d5b --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from legendsimflow import utils + + +def test_get_parameter_dict(): + popt = [100, 10, 60, 0.6, 100, 0.2, 60] + + popt_dict = utils._curve_fit_popt_to_dict(popt) + assert len(popt_dict) == 8 diff --git a/workflow/rules/hit.smk b/workflow/rules/hit.smk index bbba7f7..76c8949 100644 --- a/workflow/rules/hit.smk +++ b/workflow/rules/hit.smk @@ -205,7 +205,7 @@ rule merge_hpge_drift_time_maps: def smk_extract_current_pulse_model_inputs(wildcards): """Prepare inputs for the HPGe current model extraction rule.""" - raw_file, wf_idx, dsp_cfg_file = hpge_pars.current_pulse_model_inputs( + raw_file, wf_idx, dsp_cfg_file = hpge_pars.find_current_pulse_model_inputs( config, wildcards.runid, wildcards.hpge_detector, diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index e2498ad..4885f90 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -1,6 +1,5 @@ from __future__ import annotations -import inspect import logging import re from pathlib import Path @@ -9,59 +8,21 @@ import dbetto import numpy as np from dspeed.vis import WaveformBrowser -from legendmeta import LegendMetadata from lgdo import lh5 from matplotlib import pyplot as plt from matplotlib.axes import Axes from matplotlib.figure import Figure -from numpy.typing import ArrayLike, NDArray +from numpy.typing import NDArray from reboost.hpge.psd import _current_pulse_model as current_pulse_model from scipy.optimize import curve_fit -from . import SimflowConfig +from . import SimflowConfig, utils from . import metadata as mutils log = logging.getLogger(__name__) -# FIXME: this should be removed once the PRL25 data is reprocessed -def get_lh5_table( - metadata: LegendMetadata, - fname: str | Path, - hpge: str, - tier: str, - runid: str, -) -> str: - """The correct LH5 table path. - - Determines the correct path to a `hpge` detector table in tier `tier`. 
- """ - # check if the latest format is available - path = f"{tier}/{hpge}" - if lh5.ls(fname, path) == [path]: - return path - - # otherwise fall back to the old format - timestamp = mutils.runinfo(metadata, runid).start_key - rawid = metadata.channelmap(timestamp)[hpge].daq.rawid - return f"ch{rawid}/{tier}" - - -def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict: - """Get the ``scipy.curve_fit()`` parameter results as a dictionary""" - params = list(inspect.signature(current_pulse_model).parameters) - param_names = params[1:] - - popt_dict = dict(zip(param_names, popt, strict=True)) - popt_dict["mean_AoE"] = popt_dict["amax"] / 1593 - - for key, value in popt_dict.items(): - popt_dict[key] = float(f"{value:.3g}") - - return popt_dict - - -def find_best_event_idx( +def lookup_currmod_fit_data( hit_files: list[str, Path], lh5_group: str, ewin_center: float = 1593, @@ -69,8 +30,9 @@ def find_best_event_idx( ) -> tuple[int, int]: """Extract the index of the event to fit. - Returns the index of the event in the file and the index of the file in the - input file list. + Considers events with |A/E| < 1.5 and finds the one that is closest to the + median of the distribution. Returns the index of the event in the file and + the index of the file in the input file list. Parameters ---------- @@ -135,7 +97,7 @@ def find_best_event_idx( return idx, file_idx -def fit_current_pulse(times: NDArray, current: NDArray) -> tuple: +def fit_currmod(times: NDArray, current: NDArray) -> tuple: """Fit the model to the raw HPGe current pulse. Uses :func:`scipy.curve_fit` to fit @@ -194,7 +156,6 @@ def get_current_pulse( lh5_group: str, idx: int, dsp_config: str, - *, dsp_output: str = "curr_av", align: str = "tp_aoe_max", ) -> tuple: @@ -233,7 +194,7 @@ def get_current_pulse( return t, A -def plot_fit_result( +def plot_currmod_fit_result( t: NDArray, A: NDArray, model_t: NDArray, model_A: NDArray ) -> tuple[Figure, Axes]: """Plot the best fit results.""" @@ -245,13 +206,13 @@ def plot_fit_result( ax.legend() ax.set_xlim(-1200, 1200) - ax.set_xlabel("time [ns]") + ax.set_xlabel("Time [ns]") ax.set_ylabel("Current [ADC]") return fig, ax -def current_pulse_model_inputs( +def lookup_currmod_fit_inputs( config: SimflowConfig, runid: str, hpge: str, @@ -307,11 +268,11 @@ def current_pulse_model_inputs( msg = f"no hit tier files found in {hit_path}/cal/{period}/{run}" raise ValueError(msg) - lh5_group = get_lh5_table(config.metadata, hit_files[0], hpge, "hit", runid) + lh5_group = utils._get_lh5_table(config.metadata, hit_files[0], hpge, "hit", runid) msg = "looking for best event to fit" log.debug(msg) - wf_idx, file_idx = find_best_event_idx(hit_files, lh5_group) + wf_idx, file_idx = lookup_currmod_fit_data(hit_files, lh5_group) raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data))) hit_file = hit_files[file_idx] diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py index 722cd03..a337eff 100644 --- a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py +++ b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py @@ -20,7 +20,7 @@ import legenddataflowscripts.utils import matplotlib.pyplot as plt -from legendsimflow import hpge_pars, nersc +from legendsimflow import hpge_pars, nersc, utils from legendsimflow.plot import decorate args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821 @@ -37,11 +37,11 @@ # setup logging logger = 
ldfs.utils.build_log(metadata.simprod.config.logging, log_file) -raw_file, wf_idx, dsp_cfg_file = hpge_pars.current_pulse_model_inputs( +raw_file, wf_idx, dsp_cfg_file = hpge_pars.lookup_currmod_fit_inputs( config, runid, hpge, - hit_tier_name=hit_tier_name, + hit_tier_name, ) lh5_group = hpge_pars.get_lh5_table( @@ -53,21 +53,15 @@ ) logger.info("fetching the current pulse") -t, A = hpge_pars.get_current_pulse( - raw_file, - current="curr_av", - lh5_group=lh5_group, - dsp_config=str(dsp_cfg_file), - idx=wf_idx, -) +t, A = hpge_pars.get_current_pulse(raw_file, lh5_group, wf_idx, str(dsp_cfg_file)) logger.info("fitting the current pulse to extract the model") -popt, x, y = hpge_pars.fit_current_pulse(t, A) +popt, x, y = hpge_pars.fit_currmod(t, A) # now plot logger.info("plotting the fit result") -fig, _ = hpge_pars.plot_fit_result(t, A, x, y) +fig, _ = hpge_pars.plot_currmod_fit_result(t, A, x, y) decorate(fig) plt.savefig(plot_file) -dbetto.utils.write_dict(hpge_pars._curve_fit_popt_to_dict(popt), pars_file) +dbetto.utils.write_dict(utils._curve_fit_popt_to_dict(popt), pars_file) diff --git a/workflow/src/legendsimflow/utils.py b/workflow/src/legendsimflow/utils.py index 2671f46..69bd996 100644 --- a/workflow/src/legendsimflow/utils.py +++ b/workflow/src/legendsimflow/utils.py @@ -15,6 +15,7 @@ from __future__ import annotations +import inspect import os from datetime import datetime from pathlib import Path @@ -22,8 +23,12 @@ import legenddataflowscripts as ldfs from dbetto import AttrsDict from legendmeta import LegendMetadata +from lgdo import lh5 +from numpy.typing import ArrayLike +from reboost.hpge.psd import _current_pulse_model as current_pulse_model from . import SimflowConfig +from . import metadata as mutils def _merge_defaults(user: dict, default: dict) -> dict: @@ -90,3 +95,40 @@ def setup_logdir_link(config: SimflowConfig, proctime): if link.exists() or link.is_symlink(): link.unlink() link.symlink_to(proctime, target_is_directory=True) + + +# FIXME: this should be removed once the PRL25 data is reprocessed +def _get_lh5_table( + metadata: LegendMetadata, + fname: str | Path, + hpge: str, + tier: str, + runid: str, +) -> str: + """The correct LH5 table path. + + Determines the correct path to a `hpge` detector table in tier `tier`. 
+ """ + # check if the latest format is available + path = f"{tier}/{hpge}" + if lh5.ls(fname, path) == [path]: + return path + + # otherwise fall back to the old format + timestamp = mutils.runinfo(metadata, runid).start_key + rawid = metadata.channelmap(timestamp)[hpge].daq.rawid + return f"ch{rawid}/{tier}" + + +def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict: + """Get the ``scipy.curve_fit()`` parameter results as a dictionary""" + params = list(inspect.signature(current_pulse_model).parameters) + param_names = params[1:] + + popt_dict = dict(zip(param_names, popt, strict=True)) + popt_dict["mean_AoE"] = popt_dict["amax"] / 1593 + + for key, value in popt_dict.items(): + popt_dict[key] = float(f"{value:.3g}") + + return popt_dict From e4506cba35531028384d766acd9cd4cba23e9abf Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sat, 27 Dec 2025 18:23:58 +0100 Subject: [PATCH 05/15] fix: docs, macOS ci --- .github/workflows/main.yml | 5 +++++ docs/source/conf.py | 1 + workflow/src/legendsimflow/hpge_pars.py | 6 +++--- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a27e58f..2485889 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -51,6 +51,11 @@ jobs: uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} + - name: Install non-python (homebrew) dependencies + if: ${{ matrix.os == 'macOS-latest' }} + run: | + brew update + brew install opencascade cgal gmp mpfr boost - name: Get dependencies and install legend-simflow run: | python -m pip install --upgrade pip wheel setuptools diff --git a/docs/source/conf.py b/docs/source/conf.py index 4f28e0b..ba00e70 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -79,6 +79,7 @@ "dbetto": ("https://dbetto.readthedocs.io/en/stable", None), "pylegendmeta": ("https://pylegendmeta.readthedocs.io/en/stable", None), "reboost": ("https://reboost.readthedocs.io/en/stable", None), + "scipy": ("https://docs.scipy.org/doc/scipy", None), } # add new intersphinx mappings here # sphinx-autodoc diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index 4885f90..bf09592 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -30,9 +30,9 @@ def lookup_currmod_fit_data( ) -> tuple[int, int]: """Extract the index of the event to fit. - Considers events with |A/E| < 1.5 and finds the one that is closest to the - median of the distribution. Returns the index of the event in the file and - the index of the file in the input file list. + Considers events with ``abs(A/E) < 1.5`` and finds the one that is closest + to the median of the distribution. Returns the index of the event in the + file and the index of the file in the input file list. 
Parameters ---------- From d061bbe4d55a22d3c09b338c3b99b2af1ad94da1 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sat, 27 Dec 2025 18:23:58 +0100 Subject: [PATCH 06/15] fix: docs, macOS ci --- .github/workflows/main.yml | 5 +++++ docs/source/conf.py | 1 + workflow/src/legendsimflow/hpge_pars.py | 6 +++--- .../scripts/extract_hpge_current_pulse_model.py | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a27e58f..2485889 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -51,6 +51,11 @@ jobs: uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} + - name: Install non-python (homebrew) dependencies + if: ${{ matrix.os == 'macOS-latest' }} + run: | + brew update + brew install opencascade cgal gmp mpfr boost - name: Get dependencies and install legend-simflow run: | python -m pip install --upgrade pip wheel setuptools diff --git a/docs/source/conf.py b/docs/source/conf.py index 4f28e0b..ba00e70 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -79,6 +79,7 @@ "dbetto": ("https://dbetto.readthedocs.io/en/stable", None), "pylegendmeta": ("https://pylegendmeta.readthedocs.io/en/stable", None), "reboost": ("https://reboost.readthedocs.io/en/stable", None), + "scipy": ("https://docs.scipy.org/doc/scipy", None), } # add new intersphinx mappings here # sphinx-autodoc diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index 4885f90..bf09592 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -30,9 +30,9 @@ def lookup_currmod_fit_data( ) -> tuple[int, int]: """Extract the index of the event to fit. - Considers events with |A/E| < 1.5 and finds the one that is closest to the - median of the distribution. Returns the index of the event in the file and - the index of the file in the input file list. + Considers events with ``abs(A/E) < 1.5`` and finds the one that is closest + to the median of the distribution. Returns the index of the event in the + file and the index of the file in the input file list. 
Parameters ---------- diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py index a337eff..e5a859a 100644 --- a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py +++ b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py @@ -44,7 +44,7 @@ hit_tier_name, ) -lh5_group = hpge_pars.get_lh5_table( +lh5_group = utils._get_lh5_table( metadata, raw_file, hpge, From d817f2435899f44caa271a2dbbd7f85caf1e11fc Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sat, 27 Dec 2025 09:41:09 -0800 Subject: [PATCH 07/15] adjust currmod plot --- workflow/src/legendsimflow/hpge_pars.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index bf09592..141ba40 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -201,10 +201,9 @@ def plot_currmod_fit_result( fig, ax = plt.subplots() ax.plot(t, A, linewidth=2, label="Waveform") - ax.plot(model_t, model_A, color="red", linewidth=2, label="Fit") + ax.plot(model_t, model_A, linewidth=1, label="Fit") ax.legend() - ax.set_xlim(-1200, 1200) ax.set_xlabel("Time [ns]") ax.set_ylabel("Current [ADC]") From 7351af9b9e2f8a532096f827ec681b8c02248de3 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 11:34:46 +0100 Subject: [PATCH 08/15] rename currmod fit param mean_AoE -> mean_aoe --- workflow/src/legendsimflow/hpge_pars.py | 6 +++--- workflow/src/legendsimflow/utils.py | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py index 141ba40..07449ba 100644 --- a/workflow/src/legendsimflow/hpge_pars.py +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -53,15 +53,15 @@ def lookup_currmod_fit_data( for file in hit_files: energy = lh5.read(f"{lh5_group}/cuspEmax_ctc_cal", file).view_as("np") - AoE = lh5.read(f"{lh5_group}/AoE_Classifier", file).view_as("np") + aoe = lh5.read(f"{lh5_group}/AoE_Classifier", file).view_as("np") # get drift time if possible if f"{lh5_group}/dt_eff" in lh5.ls(file): dt_eff = lh5.read(f"{lh5_group}/dt_eff", file).view_as("np") else: - dt_eff = np.zeros(len(AoE)) + dt_eff = np.zeros(len(aoe)) - idx = np.where((abs(energy - ewin_center) < ewin_width / 2) & (abs(AoE) < 1.5))[ + idx = np.where((abs(energy - ewin_center) < ewin_width / 2) & (abs(aoe) < 1.5))[ 0 ] idxs.append(idx) diff --git a/workflow/src/legendsimflow/utils.py b/workflow/src/legendsimflow/utils.py index 69bd996..d4805eb 100644 --- a/workflow/src/legendsimflow/utils.py +++ b/workflow/src/legendsimflow/utils.py @@ -43,6 +43,20 @@ def _merge_defaults(user: dict, default: dict) -> dict: def init_simflow_context(raw_config: dict, workflow) -> AttrsDict: + """Pre-process and sanitize the Simflow configuration. + + - set default configuration fields; + - substitute ``$_`` and environment variables; + - convert to :class:`~dbetto.attrsdict.AttrsDict`; + - cast filesystem paths to :class:`pathlib.Path`; + - clone and configure `legend-metadata`; + - attach a :class:`legendmeta.LegendMetadata` instance to the Simflow + configuration; + - export important environment variables. + + Returns a dictionary with useful objects to be used in the Simflow + Snakefiles (i.e. the "context"). 
+ """ if not raw_config: msg = "you must set a config file with --configfile" raise RuntimeError(msg) @@ -87,6 +101,7 @@ def _make_path(d): def setup_logdir_link(config: SimflowConfig, proctime): + """Set up the timestamp-tagged directory for the workflow log files.""" logdir = Path(config.paths.log) logdir.mkdir(parents=True, exist_ok=True) @@ -121,12 +136,12 @@ def _get_lh5_table( def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict: - """Get the ``scipy.curve_fit()`` parameter results as a dictionary""" + """Get the :func:`scipy.curve_fit` parameter results as a dictionary""" params = list(inspect.signature(current_pulse_model).parameters) param_names = params[1:] popt_dict = dict(zip(param_names, popt, strict=True)) - popt_dict["mean_AoE"] = popt_dict["amax"] / 1593 + popt_dict["mean_aoe"] = popt_dict["amax"] / 1593 for key, value in popt_dict.items(): popt_dict[key] = float(f"{value:.3g}") From e2e28113f448bde11853182dd163677cb5519ed4 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 14:41:10 +0100 Subject: [PATCH 09/15] integrate A/E into hit.py --- workflow/src/legendsimflow/reboost.py | 16 +++---- .../src/legendsimflow/scripts/tier/hit.py | 47 ++++++++++++++++--- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/workflow/src/legendsimflow/reboost.py b/workflow/src/legendsimflow/reboost.py index a645cbd..d4e998f 100644 --- a/workflow/src/legendsimflow/reboost.py +++ b/workflow/src/legendsimflow/reboost.py @@ -197,7 +197,7 @@ def get_remage_hit_range( return i_start, n_entries -def hpge_corrected_dt_heuristic( +def hpge_corrected_drift_time( chunk: ak.Array, dt_map: reboost.hpge.utils.HPGeScalarRZField, det_loc: pyg4ometry.gdml.Defines.Position, @@ -208,14 +208,14 @@ def hpge_corrected_dt_heuristic( ---- This function will be moved to :mod:`reboost`. """ - _phi = np.arctan2( + phi = np.arctan2( chunk.yloc * 1000 - det_loc.eval()[1], chunk.xloc * 1000 - det_loc.eval()[0], ) - _drift_time = {} + drift_time = {} for angle, _map in dt_map.items(): - _drift_time[angle] = reboost.hpge.psd.drift_time( + drift_time[angle] = reboost.hpge.psd.drift_time( chunk.xloc, chunk.yloc, chunk.zloc, @@ -223,13 +223,11 @@ def hpge_corrected_dt_heuristic( coord_offset=det_loc, ).view_as("ak") - _drift_time_corr = ( - _drift_time["045"] - + (_drift_time["000"] - _drift_time["045"]) * (1 - np.cos(4 * _phi)) / 2 + return ( + drift_time["045"] + + (drift_time["000"] - drift_time["045"]) * (1 - np.cos(4 * phi)) / 2 ) - return reboost.hpge.psd.drift_time_heuristic(_drift_time_corr, chunk.edep) - def build_tcm(hit_file: str | Path) -> None: """Re-create the TCM table from remage. 
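As an illustration of the angular correction introduced in hpge_corrected_drift_time above: the two per-detector drift-time maps (labelled "000" and "045") are blended with fourfold symmetry in the azimuth phi of each step. Below is a minimal NumPy sketch of just this blend; the drift-time values and coordinates are made-up numbers standing in for the per-step map lookups done by reboost.hpge.psd.drift_time.

import numpy as np

def blend_drift_time(dt_000, dt_045, x_mm, y_mm):
    # azimuth of the energy deposit in the detector frame
    phi = np.arctan2(y_mm, x_mm)
    # weight with 90-degree periodicity: 0 at phi = 0, 90, ...; 1 at phi = 45, 135, ...
    w = (1 - np.cos(4 * phi)) / 2
    # same expression as the return statement in hpge_corrected_drift_time
    return dt_045 + (dt_000 - dt_045) * w

# toy check with three azimuths (drift times in ns are invented)
phi_deg = np.array([0.0, 45.0, 90.0])
x, y = np.cos(np.radians(phi_deg)), np.sin(np.radians(phi_deg))
print(blend_drift_time(dt_000=800.0, dt_045=950.0, x_mm=x, y_mm=y))
# -> [950. 800. 950.], i.e. the blend repeats every 90 degrees

The 90-degree periodicity matches the fourfold symmetry implied by computing drift-time maps only at the 0- and 45-degree orientations.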
diff --git a/workflow/src/legendsimflow/scripts/tier/hit.py b/workflow/src/legendsimflow/scripts/tier/hit.py index 80e9c18..2ad4305 100644 --- a/workflow/src/legendsimflow/scripts/tier/hit.py +++ b/workflow/src/legendsimflow/scripts/tier/hit.py @@ -33,7 +33,7 @@ from lgdo.lh5 import LH5Iterator from legendsimflow import metadata as mutils -from legendsimflow import nersc, partitioning +from legendsimflow import nersc, partitioning, patterns from legendsimflow import reboost as reboost_utils args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821 @@ -45,10 +45,10 @@ log_file = args.log[0] metadata = args.config.metadata hpge_dtmap_files = args.input.hpge_dtmaps +hpge_currmods_files = args.input.hpge_currmods simstat_part_file = args.input.simstat_part_file buffer_len = args.params.buffer_len - # setup logging log = ldfs.utils.build_log(metadata.simprod.config.logging, log_file) @@ -64,6 +64,13 @@ msg = f"processing partition corresponding to {runid}, event range {evt_idx_range}" log.info(msg) + # load in parameters of the HPGe current signal model + currmod_pars_file = patterns.output_currmod_merged_filename( + snakemake.config, # noqa: F821 + runid=runid, + ) + currmod_pars_all = dbetto.utils.load_dict(currmod_pars_file) + # loop over the sensitive volumes registered in the geometry for det_name, geom_meta in sensvols.items(): msg = f"looking for data from sensitive volume {det_name} (uid={geom_meta.uid})..." @@ -105,8 +112,11 @@ fccd = mutils.get_sanitized_fccd(metadata, det_name) det_loc = geom.physicalVolumeDict[det_name].position + # NOTE: we don't use the script arg but we use the (known) file patterns. more robust dt_map = reboost_utils.load_hpge_dtmaps(snakemake.config, det_name, runid) # noqa: F821 + currmod_pars = currmod_pars_all.get(det_name, None) + # iterate over input data for lgdo_chunk in iterator: chunk = lgdo_chunk.view_as("ak") @@ -128,20 +138,45 @@ dlf=0.2, ) - energy = ak.sum(chunk.edep * _activeness, axis=-1) + edep_active = chunk.edep * _activeness + energy = ak.sum(edep_active, axis=-1) + + # default to NaN for some PSD fields + dt_heuristic = np.full(len(chunk), np.nan) + aoe = np.full(len(chunk), np.nan) if dt_map is not None: - rdt_heuristic = reboost_utils.hpge_corrected_dt_heuristic( + drift_time = reboost_utils.hpge_corrected_drift_time( chunk, dt_map, det_loc ) - else: - dt_heuristic = np.full(len(chunk), np.nan) + dt_heuristic = reboost.hpge.psd.drift_time_heuristic( + drift_time, chunk.edep + ) + + if currmod_pars is not None: + # current pulse template domain in ns (step is 1 ns) + t_domain = {"low": -1000, "high": 4000, "step": 1} + + # instantiate the template + a_tmpl = reboost.hpge.psd.get_current_template( + **t_domain, + **currmod_pars, + ) + # and calculate A/E + a_max = reboost.hpge.psd.maximum_current( + chunk.edep_active, + drift_time, + template=a_tmpl, + times=np.arange(t_domain["low"], t_domain["high"]), + ) + aoe = a_max / energy out_table = reboost_utils.make_output_chunk(lgdo_chunk) out_table.add_field( "energy", lgdo.Array(energy, attrs={"units": "keV"}) ) out_table.add_field("drift_time_heuristic", lgdo.Array(dt_heuristic)) + out_table.add_field("aoe", lgdo.Array(aoe)) partitioning.add_field_runid(out_table, runid) From 9bba6a4606ffe888482483e8abba0d537e7fd0f9 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 14:49:10 +0100 Subject: [PATCH 10/15] rename function that aggregates hpges good for modeling --- tests/test_aggregate.py | 2 +- workflow/src/legendsimflow/aggregate.py | 14 ++++++++------ 2 files changed, 9 
insertions(+), 7 deletions(-) diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index 0f9881f..1d4eb2a 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -62,7 +62,7 @@ def test_dtmap_stuff(config): assert agg.start_key(config, "l200-p02-r005-phy") == "20220602T000000Z" - assert agg.gen_list_of_hpges_valid_for_dtmap(config, "l200-p02-r005-phy") == [ + assert agg.gen_list_of_hpges_valid_for_modeling(config, "l200-p02-r005-phy") == [ "V99000A" ] diff --git a/workflow/src/legendsimflow/aggregate.py b/workflow/src/legendsimflow/aggregate.py index 2de3ea4..d02c2b7 100644 --- a/workflow/src/legendsimflow/aggregate.py +++ b/workflow/src/legendsimflow/aggregate.py @@ -126,12 +126,14 @@ def start_key(config: SimflowConfig, runid: str) -> str: return config.metadata.datasets.runinfo[period][run][datatype].start_key -def gen_list_of_hpges_valid_for_dtmap(config: SimflowConfig, runid: str) -> list[str]: - """Make a sorted list of HPGe detector for which we want to generate a drift time map. +def gen_list_of_hpges_valid_for_modeling( + config: SimflowConfig, runid: str +) -> list[str]: + """Make a sorted list of HPGe detectors for which we want to compute a model. It generates the list of deployed detectors in `runid` via the LEGEND channelmap, then checks if in the crystal metadata there's all the - information required to generate a drift time map. + information required to generate a drift time map etc. """ chmap = config.metadata.hardware.configuration.channelmaps.on( start_key(config, runid) @@ -175,7 +177,7 @@ def gen_list_of_all_runids(config): def gen_list_of_dtmaps(config: SimflowConfig, runid: str) -> list[str]: """Generate the list of HPGe drift time map files for a `runid`.""" - hpges = gen_list_of_hpges_valid_for_dtmap(config, runid) + hpges = gen_list_of_hpges_valid_for_modeling(config, runid) return [ patterns.output_dtmap_filename(config, hpge_detector=hpge, runid=runid) for hpge in hpges @@ -194,7 +196,7 @@ def gen_list_of_dtmap_plots_outputs(config: SimflowConfig, simid: str) -> set[st """Generate the list of HPGe drift time map plot outputs.""" files = set() for runid in get_runlist(config, simid): - for hpge in gen_list_of_hpges_valid_for_dtmap(config, runid): + for hpge in gen_list_of_hpges_valid_for_modeling(config, runid): files.add( patterns.plot_dtmap_filename(config, hpge_detector=hpge, runid=runid) ) @@ -213,7 +215,7 @@ def gen_list_of_all_dtmap_plots_outputs(config: SimflowConfig) -> set[str]: def gen_list_of_currmods(config: SimflowConfig, runid: str) -> list[str]: """Generate the list of HPGe current model parameter files for a `runid`.""" - hpges = gen_list_of_hpges_valid_for_dtmap(config, runid) + hpges = gen_list_of_hpges_valid_for_modeling(config, runid) return [ patterns.output_currmod_filename(config, hpge_detector=hpge, runid=runid) for hpge in hpges From b3e7c2294b8c4bfdb170cb656fdf3bcb1c14b4cd Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 14:49:41 +0100 Subject: [PATCH 11/15] we don't want to model off or ac hpge detectors --- workflow/src/legendsimflow/aggregate.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/workflow/src/legendsimflow/aggregate.py b/workflow/src/legendsimflow/aggregate.py index d02c2b7..33bae43 100644 --- a/workflow/src/legendsimflow/aggregate.py +++ b/workflow/src/legendsimflow/aggregate.py @@ -135,9 +135,10 @@ def gen_list_of_hpges_valid_for_modeling( channelmap, then checks if in the crystal metadata there's all the information required to generate a 
drift time map etc. """ - chmap = config.metadata.hardware.configuration.channelmaps.on( - start_key(config, runid) - ) + timestamp = start_key(config, runid) + metadata = config.metadata + chmap = metadata.hardware.configuration.channelmaps.on(timestamp) + statuses = metadata.datasets.statuses.on(timestamp) hpges = [] for _, hpge in chmap.group("system").geds.items(): @@ -145,8 +146,12 @@ if hpge.name in ("V00050A", "V13046A", "V00048B", "V14654A"): continue + # we don't model detectors that are OFF or AC + if statuses[hpge.name].usability != "on": + continue + m = crystal_meta( - config, config.metadata.hardware.detectors.germanium.diodes[hpge.name] + config, metadata.hardware.detectors.germanium.diodes[hpge.name] ) if m is not None: From 9b3dcd29ebf81ecc6fdc3fe8c16ab0e1abf51e5c Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 15:06:15 +0100 Subject: [PATCH 12/15] add currmod plot targets --- workflow/src/legendsimflow/aggregate.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/workflow/src/legendsimflow/aggregate.py b/workflow/src/legendsimflow/aggregate.py index 33bae43..3c64d5c 100644 --- a/workflow/src/legendsimflow/aggregate.py +++ b/workflow/src/legendsimflow/aggregate.py @@ -67,7 +67,10 @@ def gen_list_of_simid_outputs( def gen_list_of_plots_outputs(config: SimflowConfig, tier: str, simid: str): """Generate the list of plots files for a `tier.simid`.""" if tier == "hit": - return gen_list_of_dtmap_plots_outputs(config, simid) + return [ + *gen_list_of_dtmap_plots_outputs(config, simid), + *gen_list_of_currmod_plots_outputs(config, simid), + ] if tier == "stp": return [patterns.plot_tier_stp_vertices_filename(config, simid=simid)] return [] @@ -235,6 +238,27 @@ def gen_list_of_merged_currmods(config: SimflowConfig, simid: str) -> list[str]: ] +def gen_list_of_currmod_plots_outputs(config: SimflowConfig, simid: str) -> set[str]: + """Generate the list of HPGe current pulse model plot outputs.""" + files = set() + for runid in get_runlist(config, simid): + for hpge in gen_list_of_hpges_valid_for_modeling(config, runid): + files.add( + patterns.plot_currmod_filename(config, hpge_detector=hpge, runid=runid) + ) + + return files + + +def gen_list_of_all_currmod_plots_outputs(config: SimflowConfig) -> set[str]: + """Generate the list of HPGe current pulse model plot outputs for all simulation IDs.""" + files = set() + for simid in gen_list_of_all_simids(config): + files.update(gen_list_of_currmod_plots_outputs(config, simid)) + + return files + + def process_simlist( config: SimflowConfig, simlist: Iterable[str] | None = None ) -> list[Path]: From 3f1adc4e01300c5d54fb071e0d0ce8fa678fc2db Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 15:12:19 +0100 Subject: [PATCH 13/15] docs update --- docs/source/manual/setup.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/source/manual/setup.md b/docs/source/manual/setup.md index ad3dcb0..988c99f 100644 --- a/docs/source/manual/setup.md +++ b/docs/source/manual/setup.md @@ -23,6 +23,9 @@ the workflow in great detail. Here's a basic description of its fields: - `n_primaries`: number of primary events to be simulated in the lower tiers `ver` and `raw`. - `paths`: customize paths to input or output files. + - `l200data`: path to a LEGEND-200 data production folder (e.g. + `<...>/public/prodenv/prod-blind/ref-v1.0.0`) used to extract production + parameters (e.g.
energy resolution) :::{tip} From d70991cd28bc2b633b6d9d931719cb93c4b9a25e Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 16:03:04 +0100 Subject: [PATCH 14/15] move a max calculation to a separate function --- workflow/src/legendsimflow/reboost.py | 39 +++++++++++++++++++ .../src/legendsimflow/scripts/tier/hit.py | 33 ++++++---------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/workflow/src/legendsimflow/reboost.py b/workflow/src/legendsimflow/reboost.py index d4e998f..e83db6f 100644 --- a/workflow/src/legendsimflow/reboost.py +++ b/workflow/src/legendsimflow/reboost.py @@ -15,6 +15,7 @@ from __future__ import annotations import logging +from collections.abc import Mapping from pathlib import Path import awkward as ak @@ -229,6 +230,44 @@ def hpge_corrected_drift_time( ) +def hpge_max_current_cal( + edep: ak.Array, + drift_time: ak.Array, + currmod_pars: Mapping, +) -> ak.Array: + """Calculate the maximum of the current pulse. + + Parameters + ---------- + edep + energy deposited at each step. + drift_time + drift time of each energy deposit. + currmod_pars + dictionary storing the parameters of the current model (see + :func:`reboost.hpge.psd.get_current_template`) + """ + # current pulse template domain in ns (step is 1 ns) + t_domain = {"low": -1000, "high": 4000, "step": 1} + + # set the maximum of the template to unity, so the A/E will be + # already "calibrated" + currmod_pars["mean_aoe"] = 1 + + # instantiate the template + a_tmpl = reboost.hpge.psd.get_current_template( + **t_domain, + **currmod_pars, + ) + # and calculate the maximum current + return reboost.hpge.psd.maximum_current( + edep, + drift_time, + template=a_tmpl, + times=np.arange(t_domain["low"], t_domain["high"]), + ) + + def build_tcm(hit_file: str | Path) -> None: """Re-create the TCM table from remage. diff --git a/workflow/src/legendsimflow/scripts/tier/hit.py b/workflow/src/legendsimflow/scripts/tier/hit.py index 2ad4305..fa9732e 100644 --- a/workflow/src/legendsimflow/scripts/tier/hit.py +++ b/workflow/src/legendsimflow/scripts/tier/hit.py @@ -115,6 +115,7 @@ # NOTE: we don't use the script arg but we use the (known) file patterns. 
more robust dt_map = reboost_utils.load_hpge_dtmaps(snakemake.config, det_name, runid) # noqa: F821 + # load parameters of the current model currmod_pars = currmod_pars_all.get(det_name, None) # iterate over input data @@ -141,35 +142,25 @@ edep_active = chunk.edep * _activeness energy = ak.sum(edep_active, axis=-1) - # default to NaN for some PSD fields + # PSD: if the drift time map is none, it means that we don't + # have the detector model to simulate PSD in a more advanced + # way + + # default to NaN dt_heuristic = np.full(len(chunk), np.nan) aoe = np.full(len(chunk), np.nan) if dt_map is not None: - drift_time = reboost_utils.hpge_corrected_drift_time( + _drift_time = reboost_utils.hpge_corrected_drift_time( chunk, dt_map, det_loc ) dt_heuristic = reboost.hpge.psd.drift_time_heuristic( - drift_time, chunk.edep + _drift_time, chunk.edep ) - - if currmod_pars is not None: - # current pulse template domain in ns (step is 1 ns) - t_domain = {"low": -1000, "high": 4000, "step": 1} - - # instantiate the template - a_tmpl = reboost.hpge.psd.get_current_template( - **t_domain, - **currmod_pars, - ) - # and calculate A/E - a_max = reboost.hpge.psd.maximum_current( - chunk.edep_active, - drift_time, - template=a_tmpl, - times=np.arange(t_domain["low"], t_domain["high"]), - ) - aoe = a_max / energy + _a_max = reboost_utils.hpge_max_current_cal( + edep_active, _drift_time, currmod_pars + ) + aoe = _a_max / energy out_table = reboost_utils.make_output_chunk(lgdo_chunk) out_table.add_field( From 9da5e380b0bc461e872a1514a0a4e414eab79c59 Mon Sep 17 00:00:00 2001 From: Luigi Pertoldi Date: Sun, 28 Dec 2025 16:05:06 +0100 Subject: [PATCH 15/15] code cosmetics --- .../src/legendsimflow/scripts/tier/hit.py | 140 +++++++++--------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/workflow/src/legendsimflow/scripts/tier/hit.py b/workflow/src/legendsimflow/scripts/tier/hit.py index fa9732e..d1867de 100644 --- a/workflow/src/legendsimflow/scripts/tier/hit.py +++ b/workflow/src/legendsimflow/scripts/tier/hit.py @@ -100,83 +100,83 @@ buffer_len=buffer_len, ) - # process the HPGe output - if geom_meta.detector_type == "germanium": - msg = f"processing the {det_name} output table..." - log.info(msg) - - log.debug("creating an pygeomhpges.HPGe object") - pyobj = pygeomhpges.make_hpge( - geom_meta.metadata, registry=None, allow_cylindrical_asymmetry=False + # only process the HPGe output + if geom_meta.detector_type != "germanium": + continue + + msg = f"processing the {det_name} output table..." + log.info(msg) + + log.debug("creating an pygeomhpges.HPGe object") + pyobj = pygeomhpges.make_hpge( + geom_meta.metadata, registry=None, allow_cylindrical_asymmetry=False + ) + + fccd = mutils.get_sanitized_fccd(metadata, det_name) + det_loc = geom.physicalVolumeDict[det_name].position + # NOTE: we don't use the script arg but we use the (known) file patterns. 
more robust + dt_map = reboost_utils.load_hpge_dtmaps(snakemake.config, det_name, runid) # noqa: F821 + + # load parameters of the current model + currmod_pars = currmod_pars_all.get(det_name, None) + + # iterate over input data + for lgdo_chunk in iterator: + chunk = lgdo_chunk.view_as("ak") + + _distance_to_nplus = reboost.hpge.surface.distance_to_surface( + chunk.xloc * 1000, # mm + chunk.yloc * 1000, # mm + chunk.zloc * 1000, # mm + pyobj, + det_loc.eval(), + distances_precompute=chunk.dist_to_surf * 1000, + precompute_cutoff=(fccd + 1), + surface_type="nplus", ) - fccd = mutils.get_sanitized_fccd(metadata, det_name) - det_loc = geom.physicalVolumeDict[det_name].position - # NOTE: we don't use the script arg but we use the (known) file patterns. more robust - dt_map = reboost_utils.load_hpge_dtmaps(snakemake.config, det_name, runid) # noqa: F821 - - # load parameters of the current model - currmod_pars = currmod_pars_all.get(det_name, None) - - # iterate over input data - for lgdo_chunk in iterator: - chunk = lgdo_chunk.view_as("ak") - - _distance_to_nplus = reboost.hpge.surface.distance_to_surface( - chunk.xloc * 1000, # mm - chunk.yloc * 1000, # mm - chunk.zloc * 1000, # mm - pyobj, - det_loc.eval(), - distances_precompute=chunk.dist_to_surf * 1000, - precompute_cutoff=(fccd + 1), - surface_type="nplus", - ) + _activeness = reboost.math.functions.piecewise_linear_activeness( + _distance_to_nplus, + fccd=fccd, + dlf=0.2, + ) - _activeness = reboost.math.functions.piecewise_linear_activeness( - _distance_to_nplus, - fccd=fccd, - dlf=0.2, - ) + edep_active = chunk.edep * _activeness + energy = ak.sum(edep_active, axis=-1) - edep_active = chunk.edep * _activeness - energy = ak.sum(edep_active, axis=-1) - - # PSD: if the drift time map is none, it means that we don't - # have the detector model to simulate PSD in a more advanced - # way - - # default to NaN - dt_heuristic = np.full(len(chunk), np.nan) - aoe = np.full(len(chunk), np.nan) - - if dt_map is not None: - _drift_time = reboost_utils.hpge_corrected_drift_time( - chunk, dt_map, det_loc - ) - dt_heuristic = reboost.hpge.psd.drift_time_heuristic( - _drift_time, chunk.edep - ) - _a_max = reboost_utils.hpge_max_current_cal( - edep_active, _drift_time, currmod_pars - ) - aoe = _a_max / energy - - out_table = reboost_utils.make_output_chunk(lgdo_chunk) - out_table.add_field( - "energy", lgdo.Array(energy, attrs={"units": "keV"}) - ) - out_table.add_field("drift_time_heuristic", lgdo.Array(dt_heuristic)) - out_table.add_field("aoe", lgdo.Array(aoe)) + # PSD: if the drift time map is none, it means that we don't + # have the detector model to simulate PSD in a more advanced + # way - partitioning.add_field_runid(out_table, runid) + # default to NaN + dt_heuristic = np.full(len(chunk), np.nan) + aoe = np.full(len(chunk), np.nan) - reboost_utils.write_chunk( - out_table, - f"/hit/{det_name}", - hit_file, - geom_meta.uid, + if dt_map is not None: + _drift_time = reboost_utils.hpge_corrected_drift_time( + chunk, dt_map, det_loc + ) + dt_heuristic = reboost.hpge.psd.drift_time_heuristic( + _drift_time, chunk.edep ) + _a_max = reboost_utils.hpge_max_current_cal( + edep_active, _drift_time, currmod_pars + ) + aoe = _a_max / energy + + out_table = reboost_utils.make_output_chunk(lgdo_chunk) + out_table.add_field("energy", lgdo.Array(energy, attrs={"units": "keV"})) + out_table.add_field("drift_time_heuristic", lgdo.Array(dt_heuristic)) + out_table.add_field("aoe", lgdo.Array(aoe)) + + partitioning.add_field_runid(out_table, runid) + + 
reboost_utils.write_chunk( + out_table, + f"/hit/{det_name}", + hit_file, + geom_meta.uid, + ) log.debug("building the TCM") reboost_utils.build_tcm(hit_file)
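To make the A/E chain above easier to follow, here is a self-contained toy version of the step that hpge_max_current_cal delegates to reboost: build a unit-maximum current-pulse template, superpose one copy per step shifted by its drift time and scaled by its active energy, take the maximum of the summed pulse, and divide by the total active energy. The Gaussian template and all numbers below are placeholders; the production code builds the template from the fitted currmod parameters with reboost.hpge.psd.get_current_template and computes the maximum with reboost.hpge.psd.maximum_current, whose internals may differ from this sketch.

import numpy as np

# template domain in ns, same bounds as t_domain in hpge_max_current_cal
times = np.arange(-1000, 4000)

# placeholder for the fitted current-pulse shape: a unit-maximum Gaussian
template = np.exp(-0.5 * (times / 80.0) ** 2)

def toy_max_current(edep_active_kev, drift_time_ns):
    """Sum one template per step, shifted by its drift time and scaled by
    its active energy, and return the maximum of the summed pulse."""
    pulse = np.zeros(times.size)
    for e, dt in zip(edep_active_kev, drift_time_ns):
        # np.interp evaluates the template shifted right by dt
        pulse += e * np.interp(times, times + dt, template, left=0.0, right=0.0)
    return pulse.max()

# toy two-site event (invented energies and drift times)
edep = np.array([1000.0, 500.0])  # keV, active energy per step
dts = np.array([600.0, 1100.0])   # ns
a_max = toy_max_current(edep, dts)
print(a_max, a_max / edep.sum())  # ~1000 and ~0.67

With a unit-maximum template (mean_aoe forced to 1), a single-site deposit gives A/E close to 1, while the two well-separated deposits in the toy event give roughly 0.67, which is the multi-site suppression the hit tier is after.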