Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions movement/kinematics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from movement.kinematics.kinetic_energy import compute_kinetic_energy
from movement.kinematics.path import (
compute_directional_change,
compute_path_length,
compute_path_straightness,
compute_turning_angle,
Expand All @@ -30,6 +31,7 @@
"compute_speed",
"compute_path_length",
"compute_path_straightness",
"compute_directional_change",
"compute_time_derivative",
"compute_pairwise_distances",
"compute_forward_vector",
Expand Down
99 changes: 98 additions & 1 deletion movement/kinematics/path.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Compute path-level metrics such as path length and straightness.
"""Compute path metrics: length, straightness, and directional change.

By 'path' we refer to the spatial trajectory of an individual over the
time span of the data. While these metrics can be computed based on any
Expand Down Expand Up @@ -296,6 +296,103 @@ def compute_turning_angle(
return turning


def compute_directional_change(
data: xr.DataArray,
in_degrees: bool = False,
min_step_length: float = 0.0,
) -> xr.DataArray:
r"""Compute the directional change (DC) per time step.

The directional change at step :math:`i` is the absolute turning
angle divided by the temporal interval spanning the two steps that
define it [1]_:

.. math::
\mathrm{DC}_i = \frac{|\theta_i|}{\Delta t_i}

where :math:`\theta_i` is the signed turning angle at step :math:`i`
and :math:`\Delta t_i = t_i - t_{i-2}`.

Parameters
----------
data : xarray.DataArray
The input data containing position information, with ``time``
and ``space`` (in Cartesian coordinates) as required dimensions.
in_degrees : bool, optional
If ``True``, the turning angles (and hence the directional
change) are expressed in degrees rather than radians. Defaults
to ``False``.
min_step_length : float, optional
The minimum step length used when computing turning angles.
Steps shorter than or equal to this value produce ``NaN``
turning angles, which propagate to ``NaN`` directional change
values. The default ``0.0`` only masks steps with exactly
zero length; near-zero steps from positional jitter may still
produce spurious turning angles. See
:func:`compute_turning_angle` for details.

Returns
-------
xarray.DataArray
Directional change values with the same dimensions as the
input, except ``space`` is removed. Values are in radians per
``time`` unit (e.g. radians/second if ``time`` is in seconds),
or degrees per ``time`` unit if ``in_degrees`` is ``True``.

Notes
-----
**Boundary behaviour:** The first two time steps of the output are
always ``NaN``, because a turning angle requires three consecutive
positions (see :func:`compute_turning_angle`).

References
----------
.. [1] Kitamura, T. & Imafuku, M. (2015). Behavioural mimicry in
flight path of Batesian intraspecific polymorphic butterfly
*Papilio polytes*. *Proc. R. Soc. B* 282(1809).
https://doi.org/10.1098/rspb.2015.0483

See Also
--------
:func:`compute_turning_angle` :
The underlying function used to compute turning angles.

Examples
--------
>>> from movement.kinematics import compute_directional_change

Compute directional change from the centroid trajectory of a
poses dataset ``ds``:

>>> centroid = ds.position.mean(dim="keypoint")
>>> dc = compute_directional_change(centroid)

Compute over a specific time window:

>>> dc = compute_directional_change(centroid.sel(time=slice(0, 100)))

Filter out pose-estimation jitter by setting ``min_step_length``:

>>> dc = compute_directional_change(centroid, min_step_length=3)

"""
data = _validate_time_points(data, "directional change")

theta = compute_turning_angle(
data, in_degrees=in_degrees, min_step_length=min_step_length
)

time_coord = data["time"]
# Span the two steps that define theta_i (positions i-2..i), matching
# compute_turning_angle's support so DC is correct for non-uniform time.
dt = time_coord - time_coord.shift(time=2)

dc = abs(theta) / dt
dc.name = "directional_change"
dc.attrs["long_name"] = "Directional Change"
return dc


def _validate_time_points(
data: xr.DataArray,
metric_name: str,
Expand Down
144 changes: 104 additions & 40 deletions tests/test_unit/test_kinematics/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,32 @@
import xarray as xr

from movement.kinematics import (
compute_directional_change,
compute_path_length,
compute_path_straightness,
compute_turning_angle,
)

# Shared by all metrics that require at least 2 time points.
time_points_value_error = pytest.raises(
ValueError,
match="At least 2 time points are required",
)

# Pre-sliced time-range cases shared by the straightness and directional
# change tests, which validate via the same minimum-time-points check.
time_range_cases = [
Comment thread
niksirbi marked this conversation as resolved.
pytest.param(slice(None, None), does_not_raise(), id="full-range"),
pytest.param(slice(0, 9), does_not_raise(), id="explicit-full-range"),
pytest.param(slice(1, 8), does_not_raise(), id="partial-range"),
pytest.param(
slice(9, 0), time_points_value_error, id="start-greater-than-stop"
),
pytest.param(
slice(0, 0.5), time_points_value_error, id="too-few-time-points"
),
]

# ─────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────
Expand Down Expand Up @@ -98,11 +119,6 @@ def sharp_turn_paths(straight_paths):
# Path length tests
# ─────────────────────────────────────────────

time_points_value_error = pytest.raises(
ValueError,
match="At least 2 time points are required to compute path length",
)


@pytest.mark.parametrize(
"time_slice, expected_exception",
Expand Down Expand Up @@ -288,42 +304,8 @@ def test_path_length_nan_warn_threshold(
# Path straightness tests
# ─────────────────────────────────────────────

time_points_value_error_straightness = pytest.raises(
ValueError,
match=("At least 2 time points are required to compute path straightness"),
)


@pytest.mark.parametrize(
"time_slice, expected_exception",
[
pytest.param(
slice(None, None),
does_not_raise(),
id="full-range",
),
pytest.param(
slice(0, 9),
does_not_raise(),
id="explicit-full-range",
),
pytest.param(
slice(1, 8),
does_not_raise(),
id="partial-range",
),
pytest.param(
slice(9, 0),
time_points_value_error_straightness,
id="start-greater-than-stop",
),
pytest.param(
slice(0, 0.5),
time_points_value_error_straightness,
id="too-few-time-points",
),
],
)
@pytest.mark.parametrize("time_slice, expected_exception", time_range_cases)
def test_path_straightness_across_time_ranges(
valid_poses_dataset, time_slice, expected_exception
):
Expand Down Expand Up @@ -633,3 +615,85 @@ def test_turning_angle_stationary_keypoint_independent_masking():
# Stationary keypoint (kp_1): should be all NaN
angles_kp1 = angles.isel(keypoint=1)
assert np.all(np.isnan(angles_kp1.values))


# ─────────────────────────────────────────────
# Directional change tests
# ─────────────────────────────────────────────


@pytest.mark.parametrize(
"fixture_name, expected_value, expected_all_nan",
[
pytest.param("straight_paths", 0.0, False, id="straight-line"),
pytest.param("stationary_paths", None, True, id="stationary"),
],
)
def test_directional_change_known_values(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests only assert DC = 0 (straight) and all-NaN (stationary). Neither exercises a known nonzero DC value, so neither the radians-per-time scaling nor the dt alignment is verified. A sharp_turn_paths fixture already exists is this file, btw. A case asserting a known nonzero DC, ideally with non-uniform time coords, would have caught the alignment issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, for better coverage, added test_directional_change_nonzero_value
Also parametrized over in_degrees, so the radians scaling is pinned down too

request, fixture_name, expected_value, expected_all_nan
):
"""Test directional change for trajectories with known geometry.

Straight-line motion produces zero turning angle, so DC is 0 at
every valid time step. Stationary paths produce NaN turning angles,
so DC is NaN everywhere.
"""
position = request.getfixturevalue(fixture_name)
dc = compute_directional_change(position)
assert dc.name == "directional_change"
assert dc.attrs["long_name"] == "Directional Change"

if expected_all_nan:
assert dc.isnull().all()
else:
valid = dc.isel(time=slice(2, None))
xr.testing.assert_allclose(valid, xr.full_like(valid, expected_value))
# Only the first two time steps are NaN.
assert dc.isel(time=[0, 1]).isnull().all()


@pytest.mark.parametrize(
"in_degrees, expected_turn",
[
pytest.param(False, 3 * np.pi / 4, id="radians"),
pytest.param(True, 135.0, id="degrees"),
],
)
def test_directional_change_nonzero_value(
sharp_turn_paths, in_degrees, expected_turn
):
"""Test DC against a known nonzero value on non-uniform time.

In ``sharp_turn_paths`` both individuals move in a straight line for
8 steps, then turn sharply on the final step, producing a turning
angle of ``3 * pi / 4`` (135 degrees) at the last time step.
Dividing by the turning interval ``t[-1] - t[-3]`` gives the
expected DC there. The non-uniform ``time`` coordinates ensure the
interval is aligned to the turning angle's support (positions
``i-2..i``) rather than a centered difference around ``i``.
"""
time = np.array([0, 1, 2, 4, 7, 11, 16, 22, 29, 37], dtype=float)
path = sharp_turn_paths.assign_coords(time=time)

dc = compute_directional_change(path, in_degrees=in_degrees)

expected_last = expected_turn / (time[-1] - time[-3])
last = dc.isel(time=-1)
xr.testing.assert_allclose(last, xr.full_like(last, expected_last))
# Only the first two time steps are NaN; the last step is now valid.
assert dc.isel(time=[0, 1]).isnull().all()
assert dc.isel(time=slice(2, None)).notnull().all()


@pytest.mark.parametrize("time_slice, expected_exception", time_range_cases)
def test_directional_change_across_time_ranges(
valid_poses_dataset, time_slice, expected_exception
):
"""Test that DC raises with too few time points, and works
otherwise.
"""
position = valid_poses_dataset.position.sel(time=time_slice)
with expected_exception:
dc = compute_directional_change(position)
assert dc.name == "directional_change"
assert dc.attrs["long_name"] == "Directional Change"
Loading