diff --git a/movement/kinematics/__init__.py b/movement/kinematics/__init__.py index fd6de1b14..5f2d3667b 100644 --- a/movement/kinematics/__init__.py +++ b/movement/kinematics/__init__.py @@ -16,6 +16,7 @@ ) from movement.kinematics.kinetic_energy import compute_kinetic_energy from movement.kinematics.path import ( + compute_directional_change, compute_path_length, compute_path_straightness, compute_turning_angle, @@ -30,6 +31,7 @@ "compute_speed", "compute_path_length", "compute_path_straightness", + "compute_directional_change", "compute_time_derivative", "compute_pairwise_distances", "compute_forward_vector", diff --git a/movement/kinematics/path.py b/movement/kinematics/path.py index ca3c2d896..7b99e808e 100644 --- a/movement/kinematics/path.py +++ b/movement/kinematics/path.py @@ -1,4 +1,4 @@ -"""Compute path-level metrics such as path length and straightness. +"""Compute path metrics: length, straightness, and directional change. By 'path' we refer to the spatial trajectory of an individual over the time span of the data. While these metrics can be computed based on any @@ -296,6 +296,103 @@ def compute_turning_angle( return turning +def compute_directional_change( + data: xr.DataArray, + in_degrees: bool = False, + min_step_length: float = 0.0, +) -> xr.DataArray: + r"""Compute the directional change (DC) per time step. + + The directional change at step :math:`i` is the absolute turning + angle divided by the temporal interval spanning the two steps that + define it [1]_: + + .. math:: + \mathrm{DC}_i = \frac{|\theta_i|}{\Delta t_i} + + where :math:`\theta_i` is the signed turning angle at step :math:`i` + and :math:`\Delta t_i = t_i - t_{i-2}`. + + Parameters + ---------- + data : xarray.DataArray + The input data containing position information, with ``time`` + and ``space`` (in Cartesian coordinates) as required dimensions. + in_degrees : bool, optional + If ``True``, the turning angles (and hence the directional + change) are expressed in degrees rather than radians. Defaults + to ``False``. + min_step_length : float, optional + The minimum step length used when computing turning angles. + Steps shorter than or equal to this value produce ``NaN`` + turning angles, which propagate to ``NaN`` directional change + values. The default ``0.0`` only masks steps with exactly + zero length; near-zero steps from positional jitter may still + produce spurious turning angles. See + :func:`compute_turning_angle` for details. + + Returns + ------- + xarray.DataArray + Directional change values with the same dimensions as the + input, except ``space`` is removed. Values are in radians per + ``time`` unit (e.g. radians/second if ``time`` is in seconds), + or degrees per ``time`` unit if ``in_degrees`` is ``True``. + + Notes + ----- + **Boundary behaviour:** The first two time steps of the output are + always ``NaN``, because a turning angle requires three consecutive + positions (see :func:`compute_turning_angle`). + + References + ---------- + .. [1] Kitamura, T. & Imafuku, M. (2015). Behavioural mimicry in + flight path of Batesian intraspecific polymorphic butterfly + *Papilio polytes*. *Proc. R. Soc. B* 282(1809). + https://doi.org/10.1098/rspb.2015.0483 + + See Also + -------- + :func:`compute_turning_angle` : + The underlying function used to compute turning angles. + + Examples + -------- + >>> from movement.kinematics import compute_directional_change + + Compute directional change from the centroid trajectory of a + poses dataset ``ds``: + + >>> centroid = ds.position.mean(dim="keypoint") + >>> dc = compute_directional_change(centroid) + + Compute over a specific time window: + + >>> dc = compute_directional_change(centroid.sel(time=slice(0, 100))) + + Filter out pose-estimation jitter by setting ``min_step_length``: + + >>> dc = compute_directional_change(centroid, min_step_length=3) + + """ + data = _validate_time_points(data, "directional change") + + theta = compute_turning_angle( + data, in_degrees=in_degrees, min_step_length=min_step_length + ) + + time_coord = data["time"] + # Span the two steps that define theta_i (positions i-2..i), matching + # compute_turning_angle's support so DC is correct for non-uniform time. + dt = time_coord - time_coord.shift(time=2) + + dc = abs(theta) / dt + dc.name = "directional_change" + dc.attrs["long_name"] = "Directional Change" + return dc + + def _validate_time_points( data: xr.DataArray, metric_name: str, diff --git a/tests/test_unit/test_kinematics/test_path.py b/tests/test_unit/test_kinematics/test_path.py index c2deb9463..b6a06d052 100644 --- a/tests/test_unit/test_kinematics/test_path.py +++ b/tests/test_unit/test_kinematics/test_path.py @@ -6,11 +6,32 @@ import xarray as xr from movement.kinematics import ( + compute_directional_change, compute_path_length, compute_path_straightness, compute_turning_angle, ) +# Shared by all metrics that require at least 2 time points. +time_points_value_error = pytest.raises( + ValueError, + match="At least 2 time points are required", +) + +# Pre-sliced time-range cases shared by the straightness and directional +# change tests, which validate via the same minimum-time-points check. +time_range_cases = [ + pytest.param(slice(None, None), does_not_raise(), id="full-range"), + pytest.param(slice(0, 9), does_not_raise(), id="explicit-full-range"), + pytest.param(slice(1, 8), does_not_raise(), id="partial-range"), + pytest.param( + slice(9, 0), time_points_value_error, id="start-greater-than-stop" + ), + pytest.param( + slice(0, 0.5), time_points_value_error, id="too-few-time-points" + ), +] + # ───────────────────────────────────────────── # Fixtures # ───────────────────────────────────────────── @@ -98,11 +119,6 @@ def sharp_turn_paths(straight_paths): # Path length tests # ───────────────────────────────────────────── -time_points_value_error = pytest.raises( - ValueError, - match="At least 2 time points are required to compute path length", -) - @pytest.mark.parametrize( "time_slice, expected_exception", @@ -288,42 +304,8 @@ def test_path_length_nan_warn_threshold( # Path straightness tests # ───────────────────────────────────────────── -time_points_value_error_straightness = pytest.raises( - ValueError, - match=("At least 2 time points are required to compute path straightness"), -) - -@pytest.mark.parametrize( - "time_slice, expected_exception", - [ - pytest.param( - slice(None, None), - does_not_raise(), - id="full-range", - ), - pytest.param( - slice(0, 9), - does_not_raise(), - id="explicit-full-range", - ), - pytest.param( - slice(1, 8), - does_not_raise(), - id="partial-range", - ), - pytest.param( - slice(9, 0), - time_points_value_error_straightness, - id="start-greater-than-stop", - ), - pytest.param( - slice(0, 0.5), - time_points_value_error_straightness, - id="too-few-time-points", - ), - ], -) +@pytest.mark.parametrize("time_slice, expected_exception", time_range_cases) def test_path_straightness_across_time_ranges( valid_poses_dataset, time_slice, expected_exception ): @@ -633,3 +615,85 @@ def test_turning_angle_stationary_keypoint_independent_masking(): # Stationary keypoint (kp_1): should be all NaN angles_kp1 = angles.isel(keypoint=1) assert np.all(np.isnan(angles_kp1.values)) + + +# ───────────────────────────────────────────── +# Directional change tests +# ───────────────────────────────────────────── + + +@pytest.mark.parametrize( + "fixture_name, expected_value, expected_all_nan", + [ + pytest.param("straight_paths", 0.0, False, id="straight-line"), + pytest.param("stationary_paths", None, True, id="stationary"), + ], +) +def test_directional_change_known_values( + request, fixture_name, expected_value, expected_all_nan +): + """Test directional change for trajectories with known geometry. + + Straight-line motion produces zero turning angle, so DC is 0 at + every valid time step. Stationary paths produce NaN turning angles, + so DC is NaN everywhere. + """ + position = request.getfixturevalue(fixture_name) + dc = compute_directional_change(position) + assert dc.name == "directional_change" + assert dc.attrs["long_name"] == "Directional Change" + + if expected_all_nan: + assert dc.isnull().all() + else: + valid = dc.isel(time=slice(2, None)) + xr.testing.assert_allclose(valid, xr.full_like(valid, expected_value)) + # Only the first two time steps are NaN. + assert dc.isel(time=[0, 1]).isnull().all() + + +@pytest.mark.parametrize( + "in_degrees, expected_turn", + [ + pytest.param(False, 3 * np.pi / 4, id="radians"), + pytest.param(True, 135.0, id="degrees"), + ], +) +def test_directional_change_nonzero_value( + sharp_turn_paths, in_degrees, expected_turn +): + """Test DC against a known nonzero value on non-uniform time. + + In ``sharp_turn_paths`` both individuals move in a straight line for + 8 steps, then turn sharply on the final step, producing a turning + angle of ``3 * pi / 4`` (135 degrees) at the last time step. + Dividing by the turning interval ``t[-1] - t[-3]`` gives the + expected DC there. The non-uniform ``time`` coordinates ensure the + interval is aligned to the turning angle's support (positions + ``i-2..i``) rather than a centered difference around ``i``. + """ + time = np.array([0, 1, 2, 4, 7, 11, 16, 22, 29, 37], dtype=float) + path = sharp_turn_paths.assign_coords(time=time) + + dc = compute_directional_change(path, in_degrees=in_degrees) + + expected_last = expected_turn / (time[-1] - time[-3]) + last = dc.isel(time=-1) + xr.testing.assert_allclose(last, xr.full_like(last, expected_last)) + # Only the first two time steps are NaN; the last step is now valid. + assert dc.isel(time=[0, 1]).isnull().all() + assert dc.isel(time=slice(2, None)).notnull().all() + + +@pytest.mark.parametrize("time_slice, expected_exception", time_range_cases) +def test_directional_change_across_time_ranges( + valid_poses_dataset, time_slice, expected_exception +): + """Test that DC raises with too few time points, and works + otherwise. + """ + position = valid_poses_dataset.position.sel(time=time_slice) + with expected_exception: + dc = compute_directional_change(position) + assert dc.name == "directional_change" + assert dc.attrs["long_name"] == "Directional Change"