Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 38 additions & 38 deletions src/reboost/hpge/psd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import numpy as np
import pint
import pyg4ometry
from lgdo import Array, VectorOfVectors
from lgdo import Array
from numpy.typing import ArrayLike, NDArray

from .. import units
Expand All @@ -18,7 +18,7 @@
log = logging.getLogger(__name__)


def r90(edep: ak.Array, xloc: ak.Array, yloc: ak.Array, zloc: ak.Array) -> Array:
def r90(edep: ak.Array, xloc: ak.Array, yloc: ak.Array, zloc: ak.Array) -> ak.Array:
"""R90 HPGe pulse shape heuristic.

Parameters
Expand All @@ -31,18 +31,20 @@ def r90(edep: ak.Array, xloc: ak.Array, yloc: ak.Array, zloc: ak.Array) -> Array
array of y coordinate position.
zloc
array of z coordinate position.

Returns
-------
calculated r90 for each hit
"""
pos = [units.units_conv_ak(pos, "mm") for pos in [xloc, yloc, zloc]]

tot_energy = ak.sum(edep, axis=-1, keepdims=True)

def eweight_mean(field, energy):
return ak.sum(energy * field, axis=-1, keepdims=True) / tot_energy

# Compute distance of each edep to the weighted mean
dist = np.sqrt(
(xloc - eweight_mean(edep, xloc)) ** 2
+ (yloc - eweight_mean(edep, yloc)) ** 2
+ (zloc - eweight_mean(edep, zloc)) ** 2
)
dist = np.sqrt(sum((p - eweight_mean(edep, p)) ** 2 for p in pos))

# Sort distances and corresponding edep within each event
sorted_indices = ak.argsort(dist, axis=-1)
Expand Down Expand Up @@ -76,16 +78,16 @@ def _ak_cumsum(layout, **_kwargs):
r90_indices = ak.argmax(cumsum_edep_corrected >= threshold, axis=-1, keepdims=True)
r90 = sorted_dist[r90_indices]

return Array(ak.flatten(r90).to_numpy())
return ak.Array(ak.flatten(r90), attrs={"units": "mm"})


def drift_time(
xloc: ArrayLike,
yloc: ArrayLike,
zloc: ArrayLike,
xloc: ak.Array,
yloc: ak.Array,
zloc: ak.Array,
dt_map: HPGeScalarRZField,
coord_offset: pint.Quantity | pyg4ometry.gdml.Position = (0, 0, 0) * u.m,
) -> VectorOfVectors:
) -> ak.Array:
"""Calculates drift times for each step (cluster) in an HPGe detector.

Parameters
Expand All @@ -106,7 +108,7 @@ def drift_time(
# sanitize coord_offset
coord_offset = units.pg4_to_pint(coord_offset)

# unit handling (for matching with drift time map units)
# unit handling (.r_units) for data in (xloc, yloc)]
xu, yu = [units.units_convfact(data, dt_map.r_units) for data in (xloc, yloc)]
zu = units.units_convfact(zloc, dt_map.z_units)

Expand All @@ -122,7 +124,6 @@ def _ak_dt_map(layouts, **_kwargs):

return None

# transform coordinates
xloc = xu * xloc - coord_offset[0].to(dt_map.r_units).m
yloc = yu * yloc - coord_offset[1].to(dt_map.r_units).m
zloc = zu * zloc - coord_offset[2].to(dt_map.z_units).m
Expand All @@ -133,16 +134,13 @@ def _ak_dt_map(layouts, **_kwargs):
np.sqrt(xloc**2 + yloc**2),
zloc,
)
return VectorOfVectors(
dt_values,
attrs={"units": units.unit_to_lh5_attr(dt_map.φ_units)},
)
return units.attach_units(ak.Array(dt_values), units.unit_to_lh5_attr(dt_map.φ_units))


def drift_time_heuristic(
drift_time: ArrayLike,
edep: ArrayLike,
) -> Array:
drift_time: ak.Array,
edep: ak.Array,
) -> ak.Array:
"""HPGe drift-time-based pulse-shape heuristic.

See :func:`_drift_time_heuristic_impl` for a description of the algorithm.
Expand All @@ -160,11 +158,12 @@ def drift_time_heuristic(
edep, e_units = units.unwrap_lgdo(edep)

# we want to attach the right units to the dt heuristic, if possible
attrs = {}
unit = None
if t_units is not None and e_units is not None:
attrs["units"] = units.unit_to_lh5_attr(t_units / e_units)
unit = units.unit_to_lh5_attr(t_units / e_units)
return units.attach_units(ak.Array(_drift_time_heuristic_impl(drift_time, edep)), unit)

return Array(_drift_time_heuristic_impl(drift_time, edep), attrs=attrs)
return ak.Array(_drift_time_heuristic_impl(drift_time, edep))


@numba.njit(cache=True)
Expand Down Expand Up @@ -754,9 +753,9 @@ def _estimate_current_impl(


def maximum_current(
edep: ArrayLike,
drift_time: ArrayLike,
dist_to_nplus: ArrayLike | None = None,
edep: ak.Array,
drift_time: ak.Array,
dist_to_nplus: ak.Array | None = None,
*,
template: np.array,
times: np.array,
Expand All @@ -765,22 +764,22 @@ def maximum_current(
activeness_surface: ArrayLike | None = None,
surface_step_in_um: float = 10,
return_mode: str = "current",
) -> Array:
) -> ak.Array:
"""Estimate the maximum current in the HPGe detector based on :func:`_estimate_current_impl`.

Parameters
----------
edep
Array of energies for each step.
Energies for each step.
drift_time
Array of drift times for each step.
Drift times for each step.
dist_to_nplus
Distance to n-plus electrode, only needed if surface heuristics are enabled.
template
array of the bulk pulse template
Array of the bulk pulse template
times
time-stamps for the bulk pulse template
fccd
fccd_in_um
Value of the full-charge-collection depth, if `None` no surface corrections are performed.
surface_library
2D array (distance, time) of the rate of charge arriving at the p-n junction. Each row
Expand All @@ -797,9 +796,9 @@ def maximum_current(
"""
# extract LGDO data and units

drift_time, _ = units.unwrap_lgdo(drift_time)
edep, _ = units.unwrap_lgdo(edep)
dist_to_nplus, _ = units.unwrap_lgdo(dist_to_nplus)
drift_time = units.units_conv_ak(drift_time, "ns")
edep = units.units_conv_ak(edep, "keV")
dist_to_nplus = units.units_conv_ak(dist_to_nplus, "um")

include_surface_effects = False

Expand Down Expand Up @@ -837,11 +836,12 @@ def maximum_current(

# return
if return_mode == "max_time":
return Array(time)
return units.attach_units(ak.Array(time), "ns")
if return_mode == "current":
return Array(curr)
# current has no unit (depends on the template)
return ak.Array(curr)
if return_mode == "energy":
return Array(energy)
return units.attach_units(ak.Array(energy), "keV")

msg = f"Return mode {return_mode} is not implemented."
raise ValueError(msg)
49 changes: 30 additions & 19 deletions src/reboost/hpge/surface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@
import awkward as ak
import numba
import numpy as np
import pygeomhpges
from lgdo import VectorOfVectors
import pint
import pyg4ometry
from lgdo.types import LGDO
from numpy.typing import ArrayLike
from numpy.typing import NDArray
from scipy import stats

from reboost.units import ureg as u

from .. import units

log = logging.getLogger(__name__)


def distance_to_surface(
positions_x: VectorOfVectors,
positions_y: VectorOfVectors,
positions_z: VectorOfVectors,
hpge: pygeomhpges.base.HPGe,
det_pos: ArrayLike,
positions_x: ak.Array,
positions_y: ak.Array,
positions_z: ak.Array,
hpge: legendhpges.base.HPGe,
det_pos: pint.Quantity | pyg4ometry.gdml.Position | tuple = (0, 0, 0) * u.m,
*,
surface_type: str | None = None,
unit: str = "mm",
distances_precompute: VectorOfVectors | None = None,
distances_precompute: ak.Array | None = None,
precompute_cutoff: float | None = None,
) -> VectorOfVectors:
) -> ak.Array:
"""Computes the distance from each step to the detector surface.

The calculation can be performed for any surface type `nplus`, `pplus`,
Expand All @@ -44,32 +47,36 @@ def distance_to_surface(
hpge
HPGe object.
det_pos
position of the detector origin, must be a 3 component array corresponding to `(x,y,z)`.
position of the detector origin, must be a 3 component array corresponding to `(x,y,z)`. If no units
are specified mm is assumed.
surface_type
string of which surface to use, can be `nplus`, `pplus` `passive` or None (in which case the distance to any surface is calculated).
unit
unit for the hit tier positions table.
distances_precompute
VectorOfVectors of distance to any surface computed by remage.
Distance to any surface computed by remage.
precompute_cutoff
cutoff on distances_precompute to not compute the distance for (in mm)

Returns
-------
VectorOfVectors with the same shape as `positions_x/y/z` of the distance to the surface.
Distance to the surface for each hit with the same shape as `positions_x/y/z`.

Note
----
`positions_x/positions_y/positions_z` must all have the same shape.
"""
factor = np.array([1, 100, 1000])[unit == np.array(["mm", "cm", "m"])][0]

# compute local positions
pos = []
sizes = []

if not isinstance(det_pos, pint.Quantity | pyg4ometry.gdml.Position):
det_pos = det_pos * u.mm

det_pos = units.pg4_to_pint(det_pos)

for idx, pos_tmp in enumerate([positions_x, positions_y, positions_z]):
local_pos_tmp = ak.Array(pos_tmp) * factor - det_pos[idx]
local_pos_tmp = units.units_conv_ak(pos_tmp, "mm") - det_pos[idx].to("mm").m
local_pos_flat_tmp = ak.flatten(local_pos_tmp).to_numpy()
pos.append(local_pos_flat_tmp)
sizes.append(ak.num(local_pos_tmp, axis=1))
Expand Down Expand Up @@ -106,7 +113,7 @@ def distance_to_surface(
local_positions[indices], surface_indices=surface_indices
)

return VectorOfVectors(ak.unflatten(distances, size), dtype=np.float32)
return units.attach_units(ak.Array(ak.unflatten(distances, size)), "mm")


@numba.njit(cache=True)
Expand Down Expand Up @@ -222,7 +229,7 @@ def get_surface_response(
factor: float = 0.29,
nsteps: int = 10000,
delta_x: float = 10,
):
) -> NDArray:
"""Extract the surface response current pulse based on diffusion.

This extracts the amount of charge arrived (cumulative) at the p-n
Expand All @@ -248,6 +255,10 @@ def get_surface_response(
the number of time steps.
delta_x
the width of each position bin.

Returns
-------
2D array of the amount of charge arriving at the p-n junction as a function of time for each depth.
"""
# number of position steps
nx = int(fccd / delta_x)
Expand Down
Loading
Loading