diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a27e58f..2485889 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -51,6 +51,11 @@ jobs: uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} + - name: Install non-python (homebrew) dependencies + if: ${{ matrix.os == 'macOS-latest' }} + run: | + brew update + brew install opencascade cgal gmp mpfr boost - name: Get dependencies and install legend-simflow run: | python -m pip install --upgrade pip wheel setuptools diff --git a/docs/source/conf.py b/docs/source/conf.py index 4f28e0b..ba00e70 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -79,6 +79,7 @@ "dbetto": ("https://dbetto.readthedocs.io/en/stable", None), "pylegendmeta": ("https://pylegendmeta.readthedocs.io/en/stable", None), "reboost": ("https://reboost.readthedocs.io/en/stable", None), + "scipy": ("https://docs.scipy.org/doc/scipy", None), } # add new intersphinx mappings here # sphinx-autodoc diff --git a/docs/source/manual/setup.md b/docs/source/manual/setup.md index ad3dcb0..988c99f 100644 --- a/docs/source/manual/setup.md +++ b/docs/source/manual/setup.md @@ -23,6 +23,9 @@ the workflow in great detail. Here's a basic description of its fields: - `n_primaries`: number of primary events to be simulated in the lower tiers `ver` and `raw`. - `paths`: customize paths to input or output files. + - `l200data`: path to a LEGEND-200 data production folder (e.g. + `<...>/public/prodenv/prod-blind/ref-v1.0.0`) used to extract production + parameters (e.g. energy resolution) :::{tip} diff --git a/simflow-config.yaml b/simflow-config.yaml index 1c06e6b..ff61e0a 100644 --- a/simflow-config.yaml +++ b/simflow-config.yaml @@ -27,6 +27,7 @@ benchmark: stp: 10_000 paths: + l200data: <...>/public/prodenv/prod-blind/ref-v1.0.0 benchmarks: $_/generated/benchmarks log: $_/generated/log diff --git a/tests/conftest.py b/tests/conftest.py index 9627d07..cb1476c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,7 +17,7 @@ @pytest.fixture(scope="session") def legend_testdata(): ldata = LegendTestData() - ldata.checkout("8247690") + ldata.checkout("c564b05") return ldata diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index 0f9881f..1d4eb2a 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -62,7 +62,7 @@ def test_dtmap_stuff(config): assert agg.start_key(config, "l200-p02-r005-phy") == "20220602T000000Z" - assert agg.gen_list_of_hpges_valid_for_dtmap(config, "l200-p02-r005-phy") == [ + assert agg.gen_list_of_hpges_valid_for_modeling(config, "l200-p02-r005-phy") == [ "V99000A" ] diff --git a/tests/test_hpge_pars.py b/tests/test_hpge_pars.py new file mode 100644 index 0000000..f9e72d0 --- /dev/null +++ b/tests/test_hpge_pars.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from pathlib import Path + +import numpy as np +from lgdo import lh5 +from matplotlib.axes import Axes +from matplotlib.figure import Figure +from scipy.stats import norm + +from legendsimflow import hpge_pars + + +def test_fit(): + t = np.linspace(-1000, 2000, 3001) + A = norm.pdf(t, loc=0, scale=50) + + res = hpge_pars.fit_currmod(t, A) + + assert isinstance(res[0], np.ndarray) + assert isinstance(res[1], np.ndarray) + assert isinstance(res[2], np.ndarray) + + +def test_get_index(legend_testdata): + ref_path = legend_testdata.get_path("lh5/prod-ref-l200/") + path = ref_path / Path("generated/tier/hit/cal/p03/r001") + files = [str(p) for p in path.glob("*")] + + # we have to be very generous with the (low stats) test file + idx, file_idx = hpge_pars.lookup_currmod_fit_data( + files, "ch1084803/hit", ewin_center=100, ewin_width=20 + ) + + assert idx > -1 + assert file_idx > -1 + + # now read back in and check + + energy = lh5.read( + "ch1084803/hit/cuspEmax_ctc_cal", files[file_idx], idx=[idx] + ).view_as("np") + AoE = lh5.read("ch1084803/hit/AoE_Classifier", files[file_idx], idx=[idx]).view_as( + "np" + ) + + assert (energy > 90) & (energy < 110) + assert abs(AoE) < 1.5 + + +def test_get_waveform(legend_testdata): + times, wf = hpge_pars.get_current_pulse( + legend_testdata.get_path( + "lh5/prod-ref-l200/generated/tier/raw/cal/p03/r001/l200-p03-r001-cal-20230318T012144Z-tier_raw.lh5" + ), + "ch1084803/raw", + idx=0, + dsp_config=None, + dsp_output="waveform", + align=None, + ) + + assert isinstance(times, np.ndarray) + assert isinstance(wf, np.ndarray) + assert len(times) == len(wf) + + +def test_plot(): + fig, ax = hpge_pars.plot_currmod_fit_result( + [1, 2, 3], [0, 10, 20], np.linspace(0, 3, 1000), np.linspace(0, 3, 1000) + ) + + assert isinstance(fig, Figure) + assert isinstance(ax, Axes) diff --git a/tests/test_metadata.py b/tests/test_metadata.py index 8cd41cd..90f7907 100644 --- a/tests/test_metadata.py +++ b/tests/test_metadata.py @@ -55,3 +55,16 @@ def test_run_stuff(config): assert metadata.get_runlist(config, "sis1_z8224_src1_Ra224_to_Pb208") == [ f"l200-p02-r00{r}-phy" for r in range(8) ] + + assert ( + metadata.reference_cal_run(config.metadata, "l200-p16-r006-phy") + == "l200-p16-r006-cal" + ) + assert ( + metadata.reference_cal_run(config.metadata, "l200-p16-r008-ssc") + == "l200-p16-r007-cal" + ) + assert ( + metadata.reference_cal_run(config.metadata, "l200-p16-r009-ssc") + == "l200-p16-r007-cal" + ) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..ef92d5b --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from legendsimflow import utils + + +def test_get_parameter_dict(): + popt = [100, 10, 60, 0.6, 100, 0.2, 60] + + popt_dict = utils._curve_fit_popt_to_dict(popt) + assert len(popt_dict) == 8 diff --git a/workflow/rules/hit.smk b/workflow/rules/hit.smk index 3ba5fe7..76c8949 100644 --- a/workflow/rules/hit.smk +++ b/workflow/rules/hit.smk @@ -1,6 +1,8 @@ +import re + from dbetto.utils import load_dict -from legendsimflow import patterns, aggregate +from legendsimflow import patterns, aggregate, hpge_pars from legendsimflow import metadata as mutils from legendsimflow import nersc @@ -68,9 +70,10 @@ rule build_tier_hit: input: geom=patterns.geom_gdml_filename(config, tier="stp"), stp_file=patterns.output_simjob_filename(config, tier="stp"), - # NOTE: we pass here the full list of maps, but likely not all of them - # will be used. room for improvement + # NOTE: we pass here the full list of maps/current models, but likely + # not all of them will be used. room for improvement hpge_dtmaps=lambda wc: aggregate.gen_list_of_merged_dtmaps(config, wc.simid), + hpge_currmods=lambda wc: aggregate.gen_list_of_merged_currmods(config, wc.simid), # NOTE: technically this rule only depends on one block in the # partitioning file, but in practice the full file will always change simstat_part_file=rules.make_simstat_partition_file.output[0], @@ -87,6 +90,7 @@ rule build_tier_hit: def smk_hpge_drift_time_map_inputs(wildcards): + """Prepare inputs for the HPGe drift time map rule.""" meta = config.metadata.hardware.detectors.germanium.diodes[wildcards.hpge_detector] ids = {"bege": "B", "coax": "C", "ppc": "P", "icpc": "V"} crystal_name = ( @@ -197,3 +201,85 @@ rule merge_hpge_drift_time_maps: done done """ + + +def smk_extract_current_pulse_model_inputs(wildcards): + """Prepare inputs for the HPGe current model extraction rule.""" + raw_file, wf_idx, dsp_cfg_file = hpge_pars.find_current_pulse_model_inputs( + config, + wildcards.runid, + wildcards.hpge_detector, + hit_tier_name="hit", + use_hpge_name="true", + ) + + evt_idx_file = patterns.input_currmod_evt_idx_file( + config, runid=wildcards.runid, hpge_detector=wildcards.hpge_detector + ) + + with evt_idx_file.open() as f: + f.write(wf_idx) + + return { + "raw_file": raw_file, + "raw_wf_idx_file": evt_idx_file, + "dsp_cfg_file": dsp_cfg_file, + } + + +rule extract_current_pulse_model: + """Extract the HPGe current signal model. + + Perform a fit of the current waveform and stores the best-fit model + parameters in a YAML file. + + :::{warning} + This rule does not have the relevant LEGEND-200 data files as input, since + they are dynamically discovered and this would therefore slow down the DAG + generation. Therefore, remember to force-rerun if the input data is + updated! + ::: + + Uses wildcards `runid` and `hpge_detector`. + """ + message: + "Extracting current model for detector {wildcards.hpge_detector} in {wildcards.runid}" + # NOTE: we don't list the file dependencies here because they are + # dynamically generated, and that would slow down the DAG generation + # input: + # unpack(smk_extract_current_pulse_model_inputs), + params: + # evt_idx=parse_input(input.raw_wf_idx_file, parser=mutils.extract_integer), + hit_tier_name="hit", + output: + pars_file=temp(patterns.output_currmod_filename(config)), + plot_file=patterns.plot_currmod_filename(config), + log: + patterns.log_currmod_filename(config, SIMFLOW_CONTEXT.proctime), + script: + "../src/legendsimflow/scripts/extract_hpge_current_pulse_model.py" + + +rule merge_current_pulse_model_pars: + """Merge the HPGe current signal model parameters in a single file per `runid`. + + Uses wildcard `runid`. + """ + message: + "Merging current model parameters in {wildcards.runid}" + input: + lambda wc: aggregate.gen_list_of_currmods(config, wc.runid), + output: + patterns.output_currmod_merged_filename(config), + run: + import dbetto + from legendsimflow.aggregate import gen_list_of_hpges_valid_for_dtmap + + # NOTE: this is guaranteed to be sorted as in the input file list + hpges = gen_list_of_hpges_valid_for_dtmap(config, wildcards.runid) + + out_dict = {} + for i, f in enumerate(input): + out_dict[hpges[i]] = dbetto.utils.load_dict(f) + + dbetto.utils.write_dict(out_dict, output[0]) diff --git a/workflow/src/legendsimflow/aggregate.py b/workflow/src/legendsimflow/aggregate.py index f08d6e9..3c64d5c 100644 --- a/workflow/src/legendsimflow/aggregate.py +++ b/workflow/src/legendsimflow/aggregate.py @@ -67,7 +67,10 @@ def gen_list_of_simid_outputs( def gen_list_of_plots_outputs(config: SimflowConfig, tier: str, simid: str): """Generate the list of plots files for a `tier.simid`.""" if tier == "hit": - return gen_list_of_dtmap_plots_outputs(config, simid) + return [ + *gen_list_of_dtmap_plots_outputs(config, simid), + *gen_list_of_currmod_plots_outputs(config, simid), + ] if tier == "stp": return [patterns.plot_tier_stp_vertices_filename(config, simid=simid)] return [] @@ -126,21 +129,32 @@ def start_key(config: SimflowConfig, runid: str) -> str: return config.metadata.datasets.runinfo[period][run][datatype].start_key -def gen_list_of_hpges_valid_for_dtmap(config: SimflowConfig, runid: str) -> list[str]: - """Make a list of HPGe detector for which we want to generate a drift time map. +def gen_list_of_hpges_valid_for_modeling( + config: SimflowConfig, runid: str +) -> list[str]: + """Make a sorted list of HPGe detectors for which we want to compute a model. It generates the list of deployed detectors in `runid` via the LEGEND channelmap, then checks if in the crystal metadata there's all the - information required to generate a drift time map. + information required to generate a drift time map etc. """ - chmap = config.metadata.hardware.configuration.channelmaps.on( - start_key(config, runid) - ) + timestamp = start_key(config, runid) + metadata = config.metadata + chmap = metadata.hardware.configuration.channelmaps.on(timestamp) + statuses = metadata.datasets.statuses.on(timestamp) hpges = [] for _, hpge in chmap.group("system").geds.items(): + # TEMPORARY HACK + if hpge.name in ("V00050A", "V13046A", "V00048B", "V14654A"): + continue + + # we don't model detectors that are OFF or AC + if statuses[hpge.name].usability != "on": + continue + m = crystal_meta( - config, config.metadata.hardware.detectors.germanium.diodes[hpge.name] + config, metadata.hardware.detectors.germanium.diodes[hpge.name] ) if m is not None: @@ -157,7 +171,7 @@ def gen_list_of_hpges_valid_for_dtmap(config: SimflowConfig, runid: str) -> list msg = f"the list of HPGes valid for drift time map generation in {runid} is empty!" log.warning(msg) - return hpges + return sorted(hpges) def gen_list_of_all_runids(config): @@ -171,7 +185,7 @@ def gen_list_of_all_runids(config): def gen_list_of_dtmaps(config: SimflowConfig, runid: str) -> list[str]: """Generate the list of HPGe drift time map files for a `runid`.""" - hpges = gen_list_of_hpges_valid_for_dtmap(config, runid) + hpges = gen_list_of_hpges_valid_for_modeling(config, runid) return [ patterns.output_dtmap_filename(config, hpge_detector=hpge, runid=runid) for hpge in hpges @@ -190,7 +204,7 @@ def gen_list_of_dtmap_plots_outputs(config: SimflowConfig, simid: str) -> set[st """Generate the list of HPGe drift time map plot outputs.""" files = set() for runid in get_runlist(config, simid): - for hpge in gen_list_of_hpges_valid_for_dtmap(config, runid): + for hpge in gen_list_of_hpges_valid_for_modeling(config, runid): files.add( patterns.plot_dtmap_filename(config, hpge_detector=hpge, runid=runid) ) @@ -207,6 +221,44 @@ def gen_list_of_all_dtmap_plots_outputs(config: SimflowConfig) -> set[str]: return files +def gen_list_of_currmods(config: SimflowConfig, runid: str) -> list[str]: + """Generate the list of HPGe current model parameter files for a `runid`.""" + hpges = gen_list_of_hpges_valid_for_modeling(config, runid) + return [ + patterns.output_currmod_filename(config, hpge_detector=hpge, runid=runid) + for hpge in hpges + ] + + +def gen_list_of_merged_currmods(config: SimflowConfig, simid: str) -> list[str]: + r"""Generate the list of (merged) HPGe current model parameter files for all requested `runid`\ s.""" + return [ + patterns.output_currmod_merged_filename(config, runid=runid) + for runid in get_runlist(config, simid) + ] + + +def gen_list_of_currmod_plots_outputs(config: SimflowConfig, simid: str) -> set[str]: + """Generate the list of HPGe drift time map plot outputs.""" + files = set() + for runid in get_runlist(config, simid): + for hpge in gen_list_of_hpges_valid_for_modeling(config, runid): + files.add( + patterns.plot_currmod_filename(config, hpge_detector=hpge, runid=runid) + ) + + return files + + +def gen_list_of_all_currmod_plots_outputs(config: SimflowConfig) -> set[str]: + """Generate the list of HPGe drift time map plot outputs.""" + files = set() + for simid in gen_list_of_all_simids(config): + files.update(gen_list_of_currmod_plots_outputs(config, simid)) + + return files + + def process_simlist( config: SimflowConfig, simlist: Iterable[str] | None = None ) -> list[Path]: diff --git a/workflow/src/legendsimflow/hpge_pars.py b/workflow/src/legendsimflow/hpge_pars.py new file mode 100644 index 0000000..07449ba --- /dev/null +++ b/workflow/src/legendsimflow/hpge_pars.py @@ -0,0 +1,299 @@ +from __future__ import annotations + +import logging +import re +from pathlib import Path + +import awkward as ak +import dbetto +import numpy as np +from dspeed.vis import WaveformBrowser +from lgdo import lh5 +from matplotlib import pyplot as plt +from matplotlib.axes import Axes +from matplotlib.figure import Figure +from numpy.typing import NDArray +from reboost.hpge.psd import _current_pulse_model as current_pulse_model +from scipy.optimize import curve_fit + +from . import SimflowConfig, utils +from . import metadata as mutils + +log = logging.getLogger(__name__) + + +def lookup_currmod_fit_data( + hit_files: list[str, Path], + lh5_group: str, + ewin_center: float = 1593, + ewin_width: float = 10, +) -> tuple[int, int]: + """Extract the index of the event to fit. + + Considers events with ``abs(A/E) < 1.5`` and finds the one that is closest + to the median of the distribution. Returns the index of the event in the + file and the index of the file in the input file list. + + Parameters + ---------- + hit_files + tier-hit files used to determine the best index. + lh5_group + where the tier-hit data is found in the files. + ewin_center + center of the energy window to use for the event search (same units as + in data). + ewin_width + width of the energy window to use for the event search (dame units as + in data). + """ + idxs = [] + energies = [] + dts = [] + + for file in hit_files: + energy = lh5.read(f"{lh5_group}/cuspEmax_ctc_cal", file).view_as("np") + aoe = lh5.read(f"{lh5_group}/AoE_Classifier", file).view_as("np") + + # get drift time if possible + if f"{lh5_group}/dt_eff" in lh5.ls(file): + dt_eff = lh5.read(f"{lh5_group}/dt_eff", file).view_as("np") + else: + dt_eff = np.zeros(len(aoe)) + + idx = np.where((abs(energy - ewin_center) < ewin_width / 2) & (abs(aoe) < 1.5))[ + 0 + ] + idxs.append(idx) + energies.append(energy[idx]) + dts.append(dt_eff[idx]) + + n = sum(len(d) for d in dts) + + if n > 500: + break + + # now chose the best index (closest to median) + if len(dts) == 0: + msg = "no data found in the considered hit files" + raise ValueError(msg) + + dts = ak.Array(dts) + idxs = ak.Array(idxs) + energies = ak.Array(energies) + + med = np.median(ak.flatten(dts)) + best = ak.argmin(abs(dts - med)) + n = ak.num(dts, axis=-1) + + array = np.full(len(ak.flatten(dts)), False) + array[best] = True + + sel = ak.unflatten(array, n) + + idx = ak.flatten(idxs[sel])[0] + file_idx = ak.where(ak.num(dts[sel], axis=-1) == 1)[0][0] + + return idx, file_idx + + +def fit_currmod(times: NDArray, current: NDArray) -> tuple: + """Fit the model to the raw HPGe current pulse. + + Uses :func:`scipy.curve_fit` to fit + :func:`reboost.hpge.psd._current_pulse_model` to the input raw pulse. + + Parameters + ---------- + times + the timesteps of the pulse. + current + the values of the pulse. + + Returns + ------- + tuple of the best fit parameters, and arrays of the best fit model + (time and current). + """ + t = times + A = current + + maxi = np.argmax(A) + max_t = t[maxi] + + low = max_t - 1000 + high = max_t + 1000 + + height = A[maxi] + + # initial guesses and ranges + p0 = [height, max_t, 60, 0.6, 100, 0.2, 60] + + ranges = [ + [0, max_t - 100, 1, 0, 1, 0, 1], + [height * 2, max_t + 100, 500, 1, 800, 1, 800], + ] + x = np.linspace(low, high, 1000) + y = current_pulse_model(x, *p0) + + # do the fit + popt, _ = curve_fit( + current_pulse_model, + t[(t > low) & (t < high)], + A[(t > low) & (t < high)], + p0=p0, + bounds=ranges, + ) + + x = np.linspace(low, high, int(100 * (high - low))) + y = current_pulse_model(x, *popt) + + return popt, x, y + + +def get_current_pulse( + raw_file: Path | str, + lh5_group: str, + idx: int, + dsp_config: str, + dsp_output: str = "curr_av", + align: str = "tp_aoe_max", +) -> tuple: + """Extract the current pulse. + + Parameters + ---------- + raw_file + path to the raw tier file. + lh5_group + where to find the waveform table. + idx + the index of the waveform to read. + dsp_config + the :mod:`dspeed` configuration file defining the DSP processing chain + to estimate the current pulse. + dsp_output + the name of the DSP output corresponding to the current pulse. + align + DSP value around which the pulses are aligned. + """ + + browser = WaveformBrowser( + str(raw_file), + lh5_group, + dsp_config=dsp_config, + lines=[dsp_output], + align=align, + ) + + browser.find_entry(idx) + + t = browser.lines[dsp_output][0].get_xdata() + A = browser.lines[dsp_output][0].get_ydata() + + return t, A + + +def plot_currmod_fit_result( + t: NDArray, A: NDArray, model_t: NDArray, model_A: NDArray +) -> tuple[Figure, Axes]: + """Plot the best fit results.""" + fig, ax = plt.subplots() + + ax.plot(t, A, linewidth=2, label="Waveform") + ax.plot(model_t, model_A, linewidth=1, label="Fit") + + ax.legend() + + ax.set_xlabel("Time [ns]") + ax.set_ylabel("Current [ADC]") + + return fig, ax + + +def lookup_currmod_fit_inputs( + config: SimflowConfig, + runid: str, + hpge: str, + hit_tier_name: str = "hit", +) -> tuple[Path, int, Path]: + """Find the raw file, event index and DSP configuration file. + + Parameters + ---------- + config + simflow configuration object. + runid + LEGEND-200 run identifier. + hpge + name of the HPGe detector + hit_tier_name + name of the hit tier. This is typically "hit" or "pht". + """ + l200data = config.paths.l200data + + # get the reference cal run + cal_runid = mutils.reference_cal_run(config.metadata, runid) + _, period, run, _ = re.split(r"\W+", cal_runid) + + msg = f"inferred reference calibration run: {cal_runid}" + log.debug(msg) + + # look for the dataflow config file + df_cfgs = [p for p in l200data.glob("*config.*") if not p.name.startswith(".")] + + if len(df_cfgs) > 1: + msg = f"found multiple configuration files in {l200data}, this cannot be!" + raise RuntimeError(msg) + + msg = f"found dataflow configuration file: {df_cfgs[0]}" + log.debug(msg) + central_config = dbetto.utils.load_dict(df_cfgs[0]) + + # get the paths to hit and raw tier files + df_cfg = ( + central_config["setups"]["l200"]["paths"] + if ("setups" in central_config) + else central_config["paths"] + ) + + hit_path = Path(df_cfg[f"tier_{hit_tier_name}"].replace("$_", str(l200data))) + msg = f"looking for hit tier files in {hit_path / 'cal' / period / run}/*" + log.debug(msg) + + hit_files = list((hit_path / "cal" / period / run).glob("*")) + + if len(hit_files) == 0: + msg = f"no hit tier files found in {hit_path}/cal/{period}/{run}" + raise ValueError(msg) + + lh5_group = utils._get_lh5_table(config.metadata, hit_files[0], hpge, "hit", runid) + + msg = "looking for best event to fit" + log.debug(msg) + wf_idx, file_idx = lookup_currmod_fit_data(hit_files, lh5_group) + + raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data))) + hit_file = hit_files[file_idx] + raw_file = raw_path / str(hit_file.relative_to(hit_path)).replace( + hit_tier_name, "raw" + ) + + msg = f"determined raw file: {raw_file} (event index {wf_idx})" + log.debug(msg) + + dsp_cfg_regex = r"l200-*-r%-T%-ICPC-dsp_proc_chain.*" + dsp_cfg_files = list( + (l200data / "inputs/dataprod/config/tier_dsp").glob(dsp_cfg_regex) + ) + + if dsp_cfg_files == []: + dsp_cfg_files = list( + (l200data / "inputs/dataprod/config/tier/dsp").glob(dsp_cfg_regex) + ) + + if len(dsp_cfg_files) != 1: + msg = f"could not find a suitable dsp config file in {l200data} (or multiple found)" + raise RuntimeError(msg) + + return raw_file.resolve(), wf_idx, dsp_cfg_files[0] diff --git a/workflow/src/legendsimflow/metadata.py b/workflow/src/legendsimflow/metadata.py index dd2b476..9a6a80d 100644 --- a/workflow/src/legendsimflow/metadata.py +++ b/workflow/src/legendsimflow/metadata.py @@ -20,6 +20,7 @@ import logging import re from collections.abc import Collection +from pathlib import Path from dbetto import AttrsDict from legendmeta import LegendMetadata @@ -124,6 +125,11 @@ def smk_hash_simconfig( return hash_dict(scfg) +def extract_integer(file_path: Path) -> int: + with file_path.open() as f: + return int(f.read().strip()) + + def runinfo(metadata: LegendMetadata, runid: str) -> str: """Get the `datasets.runinfo` entry for a LEGEND run identifier. @@ -138,6 +144,38 @@ def runinfo(metadata: LegendMetadata, runid: str) -> str: return metadata.datasets.runinfo[period][run][datatype] +def reference_cal_run(metadata: LegendMetadata, runid: str) -> str: + """The reference calibration run for `runid`. + + Warning + ------- + This function does not account for dataflow overrides (e.g. calibration + back-applying)! + """ + exp, period, run, datatype = re.split(r"\W+", runid) + + if datatype == "cal": + return runid + + p_runinfo = metadata.datasets.runinfo[period] + + if "cal" in p_runinfo[run]: + return f"{exp}-{period}-{run}-cal" + + runs = sorted(p_runinfo.keys()) + index = runs.index(run) + + while True: + index -= 1 + if index < 0: + msg = f"there is no previous calibration run in {period} for {runid}" + raise RuntimeError(msg) + + prev_r = runs[index] + if "cal" in p_runinfo[prev_r]: + return f"{exp}-{period}-{prev_r}-cal" + + def simpars(metadata: LegendMetadata, par: str, runid: str) -> AttrsDict: """Extract simflow parameters for a certain LEGEND run. diff --git a/workflow/src/legendsimflow/patterns.py b/workflow/src/legendsimflow/patterns.py index 71ba9c1..c44b9fd 100644 --- a/workflow/src/legendsimflow/patterns.py +++ b/workflow/src/legendsimflow/patterns.py @@ -221,7 +221,7 @@ def plot_dtmap_filename(config: SimflowConfig, **kwargs) -> Path: return _expand(pat, **kwargs) -def benchmark_dtmap_filename(config: SimflowConfig, **kwargs): +def benchmark_dtmap_filename(config: SimflowConfig, **kwargs) -> Path: pat = ( config.paths.benchmarks / "hpge/dtmaps/{runid}-{hpge_detector}-drift-time-map.tsv" @@ -229,6 +229,38 @@ def benchmark_dtmap_filename(config: SimflowConfig, **kwargs): return expand(pat, **kwargs, allow_missing=True)[0] +# hpge current model + + +def input_currmod_evt_idx_file(config: SimflowConfig, **kwargs) -> Path: + pat = config.paths.genmeta / "hpge/currmod/{runid}-{hpge_detector}-best-evt-idx.txt" + return expand(pat, **kwargs, allow_missing=True)[0] + + +def output_currmod_filename(config: SimflowConfig, **kwargs) -> Path: + return _expand( + config.paths.genmeta / "hpge/currmod/{runid}-{hpge_detector}-model.yaml", + **kwargs, + ) + + +def output_currmod_merged_filename(config: SimflowConfig, **kwargs) -> Path: + return _expand( + config.paths.genmeta / "hpge/currmod/{runid}-models.yaml", + **kwargs, + ) + + +def log_currmod_filename(config: SimflowConfig, time: str, **kwargs) -> Path: + pat = log_dirname(config, time) / "hpge/currmod/{runid}-{hpge_detector}-model.log" + return _expand(pat, **kwargs) + + +def plot_currmod_filename(config: SimflowConfig, **kwargs) -> Path: + pat = config.paths.plots / "hpge/currmod/{runid}-{hpge_detector}-fit-result.pdf" + return _expand(pat, **kwargs) + + # hit tier diff --git a/workflow/src/legendsimflow/reboost.py b/workflow/src/legendsimflow/reboost.py index a645cbd..e83db6f 100644 --- a/workflow/src/legendsimflow/reboost.py +++ b/workflow/src/legendsimflow/reboost.py @@ -15,6 +15,7 @@ from __future__ import annotations import logging +from collections.abc import Mapping from pathlib import Path import awkward as ak @@ -197,7 +198,7 @@ def get_remage_hit_range( return i_start, n_entries -def hpge_corrected_dt_heuristic( +def hpge_corrected_drift_time( chunk: ak.Array, dt_map: reboost.hpge.utils.HPGeScalarRZField, det_loc: pyg4ometry.gdml.Defines.Position, @@ -208,14 +209,14 @@ def hpge_corrected_dt_heuristic( ---- This function will be moved to :mod:`reboost`. """ - _phi = np.arctan2( + phi = np.arctan2( chunk.yloc * 1000 - det_loc.eval()[1], chunk.xloc * 1000 - det_loc.eval()[0], ) - _drift_time = {} + drift_time = {} for angle, _map in dt_map.items(): - _drift_time[angle] = reboost.hpge.psd.drift_time( + drift_time[angle] = reboost.hpge.psd.drift_time( chunk.xloc, chunk.yloc, chunk.zloc, @@ -223,12 +224,48 @@ def hpge_corrected_dt_heuristic( coord_offset=det_loc, ).view_as("ak") - _drift_time_corr = ( - _drift_time["045"] - + (_drift_time["000"] - _drift_time["045"]) * (1 - np.cos(4 * _phi)) / 2 + return ( + drift_time["045"] + + (drift_time["000"] - drift_time["045"]) * (1 - np.cos(4 * phi)) / 2 ) - return reboost.hpge.psd.drift_time_heuristic(_drift_time_corr, chunk.edep) + +def hpge_max_current_cal( + edep: ak.Array, + drift_time: ak.Array, + currmod_pars: Mapping, +) -> ak.Array: + """Calculate the maximum of the current pulse. + + Parameters + ---------- + edep + energy deposited at each step. + drift_time + drift time of each energy deposit. + currmod_pars + dictionary storing the parameters of the current model (see + :func:`reboost.hpge.psd.get_current_template`) + """ + # current pulse template domain in ns (step is 1 ns) + t_domain = {"low": -1000, "high": 4000, "step": 1} + + # set the maximum of the template to unity, so the A/E will be + # already "calibrated" + currmod_pars["mean_aoe"] = 1 + + # instantiate the template + a_tmpl = reboost.hpge.psd.get_current_template( + **t_domain, + **currmod_pars, + ) + # and calculate the maximum current + return reboost.hpge.psd.maximum_current( + edep, + drift_time, + template=a_tmpl, + times=np.arange(t_domain["low"], t_domain["high"]), + ) def build_tcm(hit_file: str | Path) -> None: diff --git a/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py new file mode 100644 index 0000000..e5a859a --- /dev/null +++ b/workflow/src/legendsimflow/scripts/extract_hpge_current_pulse_model.py @@ -0,0 +1,67 @@ +# ruff: noqa: I002 + +# Copyright (C) 2025 Luigi Pertoldi , +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +import dbetto +import legenddataflowscripts as ldfs +import legenddataflowscripts.utils +import matplotlib.pyplot as plt + +from legendsimflow import hpge_pars, nersc, utils +from legendsimflow.plot import decorate + +args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821 + +config = args.config +runid = args.wildcards.runid +hpge = args.wildcards.hpge_detector +hit_tier_name = args.params.hit_tier_name +metadata = args.config.metadata +pars_file = args.output.pars_file +plot_file = args.output.plot_file +log_file = args.log[0] + +# setup logging +logger = ldfs.utils.build_log(metadata.simprod.config.logging, log_file) + +raw_file, wf_idx, dsp_cfg_file = hpge_pars.lookup_currmod_fit_inputs( + config, + runid, + hpge, + hit_tier_name, +) + +lh5_group = utils._get_lh5_table( + metadata, + raw_file, + hpge, + "raw", + runid, +) + +logger.info("fetching the current pulse") +t, A = hpge_pars.get_current_pulse(raw_file, lh5_group, wf_idx, str(dsp_cfg_file)) + +logger.info("fitting the current pulse to extract the model") +popt, x, y = hpge_pars.fit_currmod(t, A) + +# now plot +logger.info("plotting the fit result") +fig, _ = hpge_pars.plot_currmod_fit_result(t, A, x, y) +decorate(fig) +plt.savefig(plot_file) + +dbetto.utils.write_dict(utils._curve_fit_popt_to_dict(popt), pars_file) diff --git a/workflow/src/legendsimflow/scripts/tier/hit.py b/workflow/src/legendsimflow/scripts/tier/hit.py index 80e9c18..d1867de 100644 --- a/workflow/src/legendsimflow/scripts/tier/hit.py +++ b/workflow/src/legendsimflow/scripts/tier/hit.py @@ -33,7 +33,7 @@ from lgdo.lh5 import LH5Iterator from legendsimflow import metadata as mutils -from legendsimflow import nersc, partitioning +from legendsimflow import nersc, partitioning, patterns from legendsimflow import reboost as reboost_utils args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821 @@ -45,10 +45,10 @@ log_file = args.log[0] metadata = args.config.metadata hpge_dtmap_files = args.input.hpge_dtmaps +hpge_currmods_files = args.input.hpge_currmods simstat_part_file = args.input.simstat_part_file buffer_len = args.params.buffer_len - # setup logging log = ldfs.utils.build_log(metadata.simprod.config.logging, log_file) @@ -64,6 +64,13 @@ msg = f"processing partition corresponding to {runid}, event range {evt_idx_range}" log.info(msg) + # load in parameters of the HPGe current signal model + currmod_pars_file = patterns.output_currmod_merged_filename( + snakemake.config, # noqa: F821 + runid=runid, + ) + currmod_pars_all = dbetto.utils.load_dict(currmod_pars_file) + # loop over the sensitive volumes registered in the geometry for det_name, geom_meta in sensvols.items(): msg = f"looking for data from sensitive volume {det_name} (uid={geom_meta.uid})..." @@ -93,64 +100,83 @@ buffer_len=buffer_len, ) - # process the HPGe output - if geom_meta.detector_type == "germanium": - msg = f"processing the {det_name} output table..." - log.info(msg) + # only process the HPGe output + if geom_meta.detector_type != "germanium": + continue + + msg = f"processing the {det_name} output table..." + log.info(msg) - log.debug("creating an pygeomhpges.HPGe object") - pyobj = pygeomhpges.make_hpge( - geom_meta.metadata, registry=None, allow_cylindrical_asymmetry=False + log.debug("creating an pygeomhpges.HPGe object") + pyobj = pygeomhpges.make_hpge( + geom_meta.metadata, registry=None, allow_cylindrical_asymmetry=False + ) + + fccd = mutils.get_sanitized_fccd(metadata, det_name) + det_loc = geom.physicalVolumeDict[det_name].position + # NOTE: we don't use the script arg but we use the (known) file patterns. more robust + dt_map = reboost_utils.load_hpge_dtmaps(snakemake.config, det_name, runid) # noqa: F821 + + # load parameters of the current model + currmod_pars = currmod_pars_all.get(det_name, None) + + # iterate over input data + for lgdo_chunk in iterator: + chunk = lgdo_chunk.view_as("ak") + + _distance_to_nplus = reboost.hpge.surface.distance_to_surface( + chunk.xloc * 1000, # mm + chunk.yloc * 1000, # mm + chunk.zloc * 1000, # mm + pyobj, + det_loc.eval(), + distances_precompute=chunk.dist_to_surf * 1000, + precompute_cutoff=(fccd + 1), + surface_type="nplus", ) - fccd = mutils.get_sanitized_fccd(metadata, det_name) - det_loc = geom.physicalVolumeDict[det_name].position - dt_map = reboost_utils.load_hpge_dtmaps(snakemake.config, det_name, runid) # noqa: F821 - - # iterate over input data - for lgdo_chunk in iterator: - chunk = lgdo_chunk.view_as("ak") - - _distance_to_nplus = reboost.hpge.surface.distance_to_surface( - chunk.xloc * 1000, # mm - chunk.yloc * 1000, # mm - chunk.zloc * 1000, # mm - pyobj, - det_loc.eval(), - distances_precompute=chunk.dist_to_surf * 1000, - precompute_cutoff=(fccd + 1), - surface_type="nplus", - ) + _activeness = reboost.math.functions.piecewise_linear_activeness( + _distance_to_nplus, + fccd=fccd, + dlf=0.2, + ) - _activeness = reboost.math.functions.piecewise_linear_activeness( - _distance_to_nplus, - fccd=fccd, - dlf=0.2, - ) + edep_active = chunk.edep * _activeness + energy = ak.sum(edep_active, axis=-1) - energy = ak.sum(chunk.edep * _activeness, axis=-1) + # PSD: if the drift time map is none, it means that we don't + # have the detector model to simulate PSD in a more advanced + # way - if dt_map is not None: - rdt_heuristic = reboost_utils.hpge_corrected_dt_heuristic( - chunk, dt_map, det_loc - ) - else: - dt_heuristic = np.full(len(chunk), np.nan) + # default to NaN + dt_heuristic = np.full(len(chunk), np.nan) + aoe = np.full(len(chunk), np.nan) - out_table = reboost_utils.make_output_chunk(lgdo_chunk) - out_table.add_field( - "energy", lgdo.Array(energy, attrs={"units": "keV"}) + if dt_map is not None: + _drift_time = reboost_utils.hpge_corrected_drift_time( + chunk, dt_map, det_loc + ) + dt_heuristic = reboost.hpge.psd.drift_time_heuristic( + _drift_time, chunk.edep + ) + _a_max = reboost_utils.hpge_max_current_cal( + edep_active, _drift_time, currmod_pars ) - out_table.add_field("drift_time_heuristic", lgdo.Array(dt_heuristic)) + aoe = _a_max / energy - partitioning.add_field_runid(out_table, runid) + out_table = reboost_utils.make_output_chunk(lgdo_chunk) + out_table.add_field("energy", lgdo.Array(energy, attrs={"units": "keV"})) + out_table.add_field("drift_time_heuristic", lgdo.Array(dt_heuristic)) + out_table.add_field("aoe", lgdo.Array(aoe)) - reboost_utils.write_chunk( - out_table, - f"/hit/{det_name}", - hit_file, - geom_meta.uid, - ) + partitioning.add_field_runid(out_table, runid) + + reboost_utils.write_chunk( + out_table, + f"/hit/{det_name}", + hit_file, + geom_meta.uid, + ) log.debug("building the TCM") reboost_utils.build_tcm(hit_file) diff --git a/workflow/src/legendsimflow/utils.py b/workflow/src/legendsimflow/utils.py index 2671f46..d4805eb 100644 --- a/workflow/src/legendsimflow/utils.py +++ b/workflow/src/legendsimflow/utils.py @@ -15,6 +15,7 @@ from __future__ import annotations +import inspect import os from datetime import datetime from pathlib import Path @@ -22,8 +23,12 @@ import legenddataflowscripts as ldfs from dbetto import AttrsDict from legendmeta import LegendMetadata +from lgdo import lh5 +from numpy.typing import ArrayLike +from reboost.hpge.psd import _current_pulse_model as current_pulse_model from . import SimflowConfig +from . import metadata as mutils def _merge_defaults(user: dict, default: dict) -> dict: @@ -38,6 +43,20 @@ def _merge_defaults(user: dict, default: dict) -> dict: def init_simflow_context(raw_config: dict, workflow) -> AttrsDict: + """Pre-process and sanitize the Simflow configuration. + + - set default configuration fields; + - substitute ``$_`` and environment variables; + - convert to :class:`~dbetto.attrsdict.AttrsDict`; + - cast filesystem paths to :class:`pathlib.Path`; + - clone and configure `legend-metadata`; + - attach a :class:`legendmeta.LegendMetadata` instance to the Simflow + configuration; + - export important environment variables. + + Returns a dictionary with useful objects to be used in the Simflow + Snakefiles (i.e. the "context"). + """ if not raw_config: msg = "you must set a config file with --configfile" raise RuntimeError(msg) @@ -82,6 +101,7 @@ def _make_path(d): def setup_logdir_link(config: SimflowConfig, proctime): + """Set up the timestamp-tagged directory for the workflow log files.""" logdir = Path(config.paths.log) logdir.mkdir(parents=True, exist_ok=True) @@ -90,3 +110,40 @@ def setup_logdir_link(config: SimflowConfig, proctime): if link.exists() or link.is_symlink(): link.unlink() link.symlink_to(proctime, target_is_directory=True) + + +# FIXME: this should be removed once the PRL25 data is reprocessed +def _get_lh5_table( + metadata: LegendMetadata, + fname: str | Path, + hpge: str, + tier: str, + runid: str, +) -> str: + """The correct LH5 table path. + + Determines the correct path to a `hpge` detector table in tier `tier`. + """ + # check if the latest format is available + path = f"{tier}/{hpge}" + if lh5.ls(fname, path) == [path]: + return path + + # otherwise fall back to the old format + timestamp = mutils.runinfo(metadata, runid).start_key + rawid = metadata.channelmap(timestamp)[hpge].daq.rawid + return f"ch{rawid}/{tier}" + + +def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict: + """Get the :func:`scipy.curve_fit` parameter results as a dictionary""" + params = list(inspect.signature(current_pulse_model).parameters) + param_names = params[1:] + + popt_dict = dict(zip(param_names, popt, strict=True)) + popt_dict["mean_aoe"] = popt_dict["amax"] / 1593 + + for key, value in popt_dict.items(): + popt_dict[key] = float(f"{value:.3g}") + + return popt_dict