Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ dependencies = [
"numpy",
"pyg4ometry",
"pygama >=2.2.3",
"pylegendmeta >=1.3.3",
"pylegendmeta >=1.3.5",
"reboost >=0.8.3",
"snakemake>=8.16",
"revertex >=0.1.2",
"snakemake >=8.16",
"snakemake-storage-plugin-fs",
]

Expand Down Expand Up @@ -177,7 +178,7 @@ legend-pydataobj = "*"
legend-pygeom-l200 = ">=0.8,<0.9"
legend-pygeom-tools = ">=0.1,<0.2"
numpy = "*"
pylegendmeta = ">=1.3.3,<1.4"
pylegendmeta = ">=1.3.5,<1.4"

# execution
snakemake = ">=8.16,<9"
Expand Down
7 changes: 2 additions & 5 deletions workflow/rules/hit.smk
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,6 @@ rule extract_current_pulse_model:
# dynamically generated, and that would slow down the DAG generation
# input:
# unpack(smk_extract_current_pulse_model_inputs),
params:
# evt_idx=parse_input(input.raw_wf_idx_file, parser=mutils.extract_integer),
hit_tier_name="hit",
output:
pars_file=temp(patterns.output_currmod_filename(config)),
plot_file=patterns.plot_currmod_filename(config),
Expand All @@ -279,10 +276,10 @@ rule merge_current_pulse_model_pars:
patterns.output_currmod_merged_filename(config),
run:
import dbetto
from legendsimflow.aggregate import gen_list_of_hpges_valid_for_dtmap
from legendsimflow.aggregate import gen_list_of_hpges_valid_for_modeling

# NOTE: this is guaranteed to be sorted as in the input file list
hpges = gen_list_of_hpges_valid_for_dtmap(
hpges = gen_list_of_hpges_valid_for_modeling(
config, wildcards.runid, cache=SIMFLOW_CONTEXT.modelable_hpges
)

Expand Down
70 changes: 33 additions & 37 deletions workflow/src/legendsimflow/hpge_pars.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from pathlib import Path

import awkward as ak
import dbetto
import numpy as np
from dspeed.vis import WaveformBrowser
from legendmeta import LegendMetadata
from lgdo import lh5
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
Expand All @@ -16,8 +16,8 @@
from reboost.hpge.psd import _current_pulse_model as current_pulse_model
from scipy.optimize import curve_fit

from . import SimflowConfig, utils
from . import metadata as mutils
from . import utils

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -207,7 +207,8 @@ def plot_currmod_fit_result(


def lookup_currmod_fit_inputs(
config: SimflowConfig,
l200data: str,
metadata: LegendMetadata,
runid: str,
hpge: str,
hit_tier_name: str = "hit",
Expand All @@ -216,40 +217,35 @@ def lookup_currmod_fit_inputs(

Parameters
----------
config
simflow configuration object.
l200data
The path to the L200 data production cycle.
metadata
The metadata instance
runid
LEGEND-200 run identifier.
LEGEND-200 run identifier, must be of the form `{EXPERIMENT}-{PERIOD}-{RUN}-{TYPE}`.
hpge
name of the HPGe detector
hit_tier_name
name of the hit tier. This is typically "hit" or "pht".
"""
l200data = config.paths.l200data

if isinstance(l200data, str):
l200data = Path(l200data)

dataflow_config = utils.lookup_dataflow_config(l200data)

# get the reference cal run
cal_runid = mutils.reference_cal_run(config.metadata, runid)
cal_runid = mutils.reference_cal_run(metadata, runid)
_, period, run, _ = re.split(r"\W+", cal_runid)

msg = f"inferred reference calibration run: {cal_runid}"
log.debug(msg)

# look for the dataflow config file
df_cfgs = [p for p in l200data.glob("*config.*") if not p.name.startswith(".")]

if len(df_cfgs) > 1:
msg = f"found multiple configuration files in {l200data}, this cannot be!"
raise RuntimeError(msg)

msg = f"found dataflow configuration file: {df_cfgs[0]}"
log.debug(msg)
central_config = dbetto.utils.load_dict(df_cfgs[0])

# get the paths to hit and raw tier files
df_cfg = (
central_config["setups"]["l200"]["paths"]
if ("setups" in central_config)
else central_config["paths"]
dataflow_config["setups"]["l200"]["paths"]
if ("setups" in dataflow_config)
else dataflow_config["paths"]
)

hit_path = Path(df_cfg[f"tier_{hit_tier_name}"].replace("$_", str(l200data)))
Expand All @@ -262,21 +258,6 @@ def lookup_currmod_fit_inputs(
msg = f"no hit tier files found in {hit_path}/cal/{period}/{run}"
raise ValueError(msg)

lh5_group = utils._get_lh5_table(config.metadata, hit_files[0], hpge, "hit", runid)

msg = "looking for best event to fit"
log.debug(msg)
wf_idx, file_idx = lookup_currmod_fit_data(hit_files, lh5_group)

raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data)))
hit_file = hit_files[file_idx]
raw_file = raw_path / str(hit_file.relative_to(hit_path)).replace(
hit_tier_name, "raw"
)

msg = f"determined raw file: {raw_file} (event index {wf_idx})"
log.debug(msg)

dsp_cfg_regex = r"l200-*-r%-T%-ICPC-dsp_proc_chain.*"
dsp_cfg_files = list(
(l200data / "inputs/dataprod/config/tier_dsp").glob(dsp_cfg_regex)
Expand All @@ -291,4 +272,19 @@ def lookup_currmod_fit_inputs(
msg = f"could not find a suitable dsp config file in {l200data} (or multiple found)"
raise RuntimeError(msg)

lh5_group = utils._get_lh5_table(metadata, hit_files[0], hpge, "hit", runid)

msg = "looking for best event to fit"
log.debug(msg)

wf_idx, file_idx = lookup_currmod_fit_data(hit_files, lh5_group)

raw_path = Path(df_cfg["tier_raw"].replace("$_", str(l200data)))
hit_file = hit_files[file_idx]
raw_file = raw_path / str(hit_file.relative_to(hit_path)).replace(
hit_tier_name, "raw"
)
msg = f"determined raw file: {raw_file} (event index {wf_idx})"
log.debug(msg)

return raw_file.resolve(), wf_idx, dsp_cfg_files[0]
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
args = nersc.dvs_ro_snakemake(snakemake) # noqa: F821

config = args.config

if "l200data" not in args.config.paths:
msg = "Cannot extract current pars without setting the path to the l200data in the simconfig file."
raise KeyError(msg)

runid = args.wildcards.runid
hpge = args.wildcards.hpge_detector
hit_tier_name = args.params.hit_tier_name
metadata = args.config.metadata
pars_file = args.output.pars_file
plot_file = args.output.plot_file
Expand All @@ -37,8 +41,16 @@
# setup logging
logger = ldfs.utils.build_log(metadata.simprod.config.logging, log_file)

l200data = args.config.paths.l200data
hit_tier_name = utils.get_hit_tier_name(l200data)

msg = f"... determined hit tier name is {hit_tier_name}"
logger.info(msg)

logger.info("... looking up the fit inputs")
raw_file, wf_idx, dsp_cfg_file = hpge_pars.lookup_currmod_fit_inputs(
config,
l200data,
metadata,
runid,
hpge,
hit_tier_name,
Expand All @@ -52,16 +64,17 @@
runid,
)

logger.info("fetching the current pulse")
logger.info("... fetching the current pulse")
t, A = hpge_pars.get_current_pulse(raw_file, lh5_group, wf_idx, str(dsp_cfg_file))

logger.info("fitting the current pulse to extract the model")
logger.info("... fitting the current pulse to extract the model")
popt, x, y = hpge_pars.fit_currmod(t, A)

# now plot
logger.info("plotting the fit result")
logger.info("... plotting the fit result")
fig, _ = hpge_pars.plot_currmod_fit_result(t, A, x, y)
decorate(fig)
plt.savefig(plot_file)

logger.info("... saving outputs")
dbetto.utils.write_dict(utils._curve_fit_popt_to_dict(popt), pars_file)
78 changes: 77 additions & 1 deletion workflow/src/legendsimflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
from __future__ import annotations

import inspect
import logging
import os
from datetime import datetime
from pathlib import Path

import dbetto
import legenddataflowscripts as ldfs
from dbetto import AttrsDict
from legendmeta import LegendMetadata
Expand All @@ -30,6 +32,8 @@
from . import SimflowConfig
from . import metadata as mutils

log = logging.getLogger(__name__)


def _merge_defaults(user: dict, default: dict) -> dict:
# merge default into user without overwriting user values
Expand Down Expand Up @@ -83,8 +87,10 @@ def _make_path(d):
# NOTE: this will attempt a clone of legend-metadata, if the directory does not exist
# NOTE: don't use lazy=True, we need a fully functional TextDB
metadata = LegendMetadata(config.paths.metadata)

if "legend_metadata_version" in config:
metadata.checkout(config.legend_metadata_version)

config["metadata"] = metadata

# make sure all simflow plots are made with a consistent style
Expand Down Expand Up @@ -137,10 +143,80 @@ def _get_lh5_table(

# otherwise fall back to the old format
timestamp = mutils.runinfo(metadata, runid).start_key
rawid = metadata.channelmap(timestamp)[hpge].daq.rawid
log.info(timestamp)

chmap = metadata.channelmap(timestamp)
log.info(chmap.keys())
rawid = chmap[hpge].daq.rawid
return f"ch{rawid}/{tier}"


def lookup_dataflow_config(l200data: Path | str) -> AttrsDict:
"""Find the paths to the data inputs.

Parameters
----------
l200data
The path to the L200 data production cycle.
Returns
-------
the dataflow configuration file as a dictionary.
"""
if not isinstance(l200data, Path):
l200data = Path(l200data)

# look for the dataflow config file
df_cfgs = [p for p in l200data.glob("*config.*") if not p.name.startswith(".")]

if len(df_cfgs) > 1:
msg = f"found multiple configuration files in {l200data}, this cannot be!"
raise RuntimeError(msg)

if len(df_cfgs) == 0:
msg = f"did not find any valid config file in {l200data}, this cannot be!"
raise RuntimeError(msg)

msg = f"found dataflow configuration file: {df_cfgs[0]}"
log.debug(msg)

return dbetto.utils.load_dict(df_cfgs[0])


def get_hit_tier_name(l200data: str) -> str:
"""Extract the name of the hit tier for this production cycle.

If the `pht` tier is present this is used else the `hit` tier is used.

Parameters
----------
l200data
Path to the production cycle of l200 data.
"""

dataflow_config = lookup_dataflow_config(l200data)

df_cfg = (
dataflow_config["setups"]["l200"]["paths"]
if ("setups" in dataflow_config)
else dataflow_config["paths"]
)

# first check if pht exists
has_pht = ("tier_pht" in df_cfg) and Path(
df_cfg["tier_pht"].replace("$_", str(l200data))
).exists()
has_hit = ("tier_hit" in df_cfg) and Path(
df_cfg["tier_hit"].replace("$_", str(l200data))
).exists()

if has_pht:
return "pht"
if has_hit:
return "hit"
msg = f"The l200data {l200data} does not contain a valid pht or hit tier"
raise RuntimeError(msg)


def _curve_fit_popt_to_dict(popt: ArrayLike) -> dict:
"""Get the :func:`scipy.curve_fit` parameter results as a dictionary"""
params = list(inspect.signature(current_pulse_model).parameters)
Expand Down
Loading