Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions src/reboost/hpge/psd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import numpy as np
import pint
import pyg4ometry
from lgdo import Array, VectorOfVectors
from lgdo import Array
from numpy.typing import ArrayLike, NDArray

from .. import units
Expand All @@ -18,7 +18,7 @@
log = logging.getLogger(__name__)


def r90(edep: ak.Array, xloc: ak.Array, yloc: ak.Array, zloc: ak.Array) -> Array:
def r90(edep: ak.Array, xloc: ak.Array, yloc: ak.Array, zloc: ak.Array) -> ak.Array:
"""R90 HPGe pulse shape heuristic.

Parameters
Expand All @@ -31,18 +31,20 @@ def r90(edep: ak.Array, xloc: ak.Array, yloc: ak.Array, zloc: ak.Array) -> Array
array of y coordinate position.
zloc
array of z coordinate position.

Returns
-------
calculated r90 for each hit
"""
pos = [units.units_conv_ak(pos, "mm") for pos in [xloc, yloc, zloc]]

tot_energy = ak.sum(edep, axis=-1, keepdims=True)

def eweight_mean(field, energy):
return ak.sum(energy * field, axis=-1, keepdims=True) / tot_energy

# Compute distance of each edep to the weighted mean
dist = np.sqrt(
(xloc - eweight_mean(edep, xloc)) ** 2
+ (yloc - eweight_mean(edep, yloc)) ** 2
+ (zloc - eweight_mean(edep, zloc)) ** 2
)
dist = np.sqrt(sum((p - eweight_mean(edep, p)) ** 2 for p in pos))

# Sort distances and corresponding edep within each event
sorted_indices = ak.argsort(dist, axis=-1)
Expand Down Expand Up @@ -76,16 +78,16 @@ def _ak_cumsum(layout, **_kwargs):
r90_indices = ak.argmax(cumsum_edep_corrected >= threshold, axis=-1, keepdims=True)
r90 = sorted_dist[r90_indices]

return Array(ak.flatten(r90).to_numpy())
return ak.Array(ak.flatten(r90), attrs={"units": "mm"})


def drift_time(
xloc: ArrayLike,
yloc: ArrayLike,
zloc: ArrayLike,
xloc: ak.Array,
yloc: ak.Array,
zloc: ak.Array,
dt_map: HPGeScalarRZField,
coord_offset: pint.Quantity | pyg4ometry.gdml.Position = (0, 0, 0) * u.m,
) -> VectorOfVectors:
) -> ak.Array:
"""Calculates drift times for each step (cluster) in an HPGe detector.

Parameters
Expand All @@ -106,7 +108,7 @@ def drift_time(
# sanitize coord_offset
coord_offset = units.pg4_to_pint(coord_offset)

# unit handling (for matching with drift time map units)
# unit handling (.r_units) for data in (xloc, yloc)]
xu, yu = [units.units_convfact(data, dt_map.r_units) for data in (xloc, yloc)]
zu = units.units_convfact(zloc, dt_map.z_units)

Expand All @@ -122,7 +124,6 @@ def _ak_dt_map(layouts, **_kwargs):

return None

# transform coordinates
xloc = xu * xloc - coord_offset[0].to(dt_map.r_units).m
yloc = yu * yloc - coord_offset[1].to(dt_map.r_units).m
zloc = zu * zloc - coord_offset[2].to(dt_map.z_units).m
Expand All @@ -133,16 +134,16 @@ def _ak_dt_map(layouts, **_kwargs):
np.sqrt(xloc**2 + yloc**2),
zloc,
)
return VectorOfVectors(
return ak.Array(
dt_values,
attrs={"units": units.unit_to_lh5_attr(dt_map.φ_units)},
)


def drift_time_heuristic(
drift_time: ArrayLike,
edep: ArrayLike,
) -> Array:
drift_time: ak.Array,
edep: ak.Array,
) -> ak.Array:
"""HPGe drift-time-based pulse-shape heuristic.

See :func:`_drift_time_heuristic_impl` for a description of the algorithm.
Expand All @@ -164,7 +165,9 @@ def drift_time_heuristic(
if t_units is not None and e_units is not None:
attrs["units"] = units.unit_to_lh5_attr(t_units / e_units)

return Array(_drift_time_heuristic_impl(drift_time, edep), attrs=attrs)
return units.units_conv_ak(
ak.Array(_drift_time_heuristic_impl(drift_time, edep)), attrs["units"]
)


@numba.njit(cache=True)
Expand Down Expand Up @@ -734,9 +737,9 @@ def _estimate_current_impl(


def maximum_current(
edep: ArrayLike,
drift_time: ArrayLike,
dist_to_nplus: ArrayLike | None = None,
edep: ak.Array,
drift_time: ak.Array,
dist_to_nplus: ak.Array | None = None,
*,
template: np.array,
times: np.array,
Expand All @@ -745,22 +748,22 @@ def maximum_current(
activeness_surface: ArrayLike | None = None,
surface_step_in_um: float = 10,
return_mode: str = "current",
) -> Array:
) -> ak.Array:
"""Estimate the maximum current in the HPGe detector based on :func:`_estimate_current_impl`.

Parameters
----------
edep
Array of energies for each step.
Energies for each step.
drift_time
Array of drift times for each step.
Drift times for each step.
dist_to_nplus
Distance to n-plus electrode, only needed if surface heuristics are enabled.
template
array of the bulk pulse template
Array of the bulk pulse template
times
time-stamps for the bulk pulse template
fccd
fccd_in_um
Value of the full-charge-collection depth, if `None` no surface corrections are performed.
surface_library
2D array (distance, time) of the rate of charge arriving at the p-n junction. Each row
Expand All @@ -777,9 +780,9 @@ def maximum_current(
"""
# extract LGDO data and units

drift_time, _ = units.unwrap_lgdo(drift_time)
edep, _ = units.unwrap_lgdo(edep)
dist_to_nplus, _ = units.unwrap_lgdo(dist_to_nplus)
drift_time = units.units_conv_ak(drift_time, "ns")
edep = units.units_conv_ak(edep, "keV")
dist_to_nplus = units.units_conv_ak(dist_to_nplus, "um")

include_surface_effects = False

Expand Down Expand Up @@ -817,11 +820,12 @@ def maximum_current(

# return
if return_mode == "max_time":
return Array(time)
return ak.Array(time, attrs={"units": "ns"})
if return_mode == "current":
return Array(curr)
# current has no unit (depends on the template)
return ak.Array(curr)
if return_mode == "energy":
return Array(energy)
return ak.Array(energy, attrs={"units": "kev"})

msg = f"Return mode {return_mode} is not implemented."
raise ValueError(msg)
34 changes: 18 additions & 16 deletions src/reboost/hpge/surface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,26 @@
import legendhpges
import numba
import numpy as np
from lgdo import VectorOfVectors
from lgdo.types import LGDO
from numpy.typing import ArrayLike
from numpy.typing import NDArray
from scipy import stats

from .. import units

log = logging.getLogger(__name__)


def distance_to_surface(
positions_x: VectorOfVectors,
positions_y: VectorOfVectors,
positions_z: VectorOfVectors,
positions_x: ak.Array,
positions_y: ak.Array,
positions_z: ak.Array,
hpge: legendhpges.base.HPGe,
det_pos: ArrayLike,
det_pos: ak.Array,
*,
surface_type: str | None = None,
unit: str = "mm",
distances_precompute: VectorOfVectors | None = None,
distances_precompute: ak.Array | None = None,
precompute_cutoff: float | None = None,
) -> VectorOfVectors:
) -> ak.Array:
"""Computes the distance from each step to the detector surface.

The calculation can be performed for any surface type `nplus`, `pplus`,
Expand All @@ -50,26 +50,24 @@ def distance_to_surface(
unit
unit for the hit tier positions table.
distances_precompute
VectorOfVectors of distance to any surface computed by remage.
Distance to any surface computed by remage.
precompute_cutoff
cutoff on distances_precompute to not compute the distance for (in mm)

Returns
-------
VectorOfVectors with the same shape as `positions_x/y/z` of the distance to the surface.
Distance to the surface for each hit with the same shape as `positions_x/y/z`.

Note
----
`positions_x/positions_y/positions_z` must all have the same shape.
"""
factor = np.array([1, 100, 1000])[unit == np.array(["mm", "cm", "m"])][0]

# compute local positions
pos = []
sizes = []

for idx, pos_tmp in enumerate([positions_x, positions_y, positions_z]):
local_pos_tmp = ak.Array(pos_tmp) * factor - det_pos[idx]
local_pos_tmp = units.units_conv_ak(pos_tmp, "mm") - det_pos[idx]
local_pos_flat_tmp = ak.flatten(local_pos_tmp).to_numpy()
pos.append(local_pos_flat_tmp)
sizes.append(ak.num(local_pos_tmp, axis=1))
Expand Down Expand Up @@ -106,7 +104,7 @@ def distance_to_surface(
local_positions[indices], surface_indices=surface_indices
)

return VectorOfVectors(ak.unflatten(distances, size), dtype=np.float32)
return ak.Array(ak.unflatten(distances, size), attrs={"units": "mm"})


@numba.njit(cache=True)
Expand Down Expand Up @@ -222,7 +220,7 @@ def get_surface_response(
factor: float = 0.29,
nsteps: int = 10000,
delta_x: float = 10,
):
) -> NDArray:
"""Extract the surface response current pulse based on diffusion.

This extracts the amount of charge arrived (cumulative) at the p-n
Expand All @@ -248,6 +246,10 @@ def get_surface_response(
the number of time steps.
delta_x
the width of each position bin.

Returns
-------
2D array of the amount of charge arriving at the p-n junction as a function of time for each depth.
"""
# number of position steps
nx = int(fccd / delta_x)
Expand Down
Loading
Loading