Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions source/isaaclab/docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Changelog
---------

0.46.2 (2025-09-15)
~~~~~~~~~~~~~~~~~~~

Added
^^^^^^^

* Added argument :attr:`is_single_shot` to :class:`~isaaclab.managers.EventTermCfg` to control whether the event is only applied once.


0.46.1 (2025-09-10)
~~~~~~~~~~~~~~~~~~~

Expand Down
15 changes: 11 additions & 4 deletions source/isaaclab/isaaclab/managers/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,24 @@ def apply(
# note: we compare with a small value to handle floating point errors
if term_cfg.is_global_time:
if time_left < 1e-6:
lower, upper = term_cfg.interval_range_s
sampled_interval = torch.rand(1) * (upper - lower) + lower
if not term_cfg.is_single_shot:
lower, upper = term_cfg.interval_range_s
sampled_interval = torch.rand(1) * (upper - lower) + lower
else:
sampled_interval = float("inf")
self._interval_term_time_left[index][:] = sampled_interval

# call the event term (with None for env_ids)
term_cfg.func(self._env, None, **term_cfg.params)
else:
valid_env_ids = (time_left < 1e-6).nonzero().flatten()
if len(valid_env_ids) > 0:
lower, upper = term_cfg.interval_range_s
sampled_time = torch.rand(len(valid_env_ids), device=self.device) * (upper - lower) + lower
if not term_cfg.is_single_shot:
lower, upper = term_cfg.interval_range_s
sampled_time = torch.rand(len(valid_env_ids), device=self.device) * (upper - lower) + lower
else:
sampled_time = float("inf")

self._interval_term_time_left[index][valid_env_ids] = sampled_time

# call the event term
Expand Down
8 changes: 8 additions & 0 deletions source/isaaclab/isaaclab/managers/manager_term_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ class EventTermCfg(ManagerTermBaseCfg):
This is only used if the mode is ``"reset"``.
"""

is_single_shot: bool = False
"""Whether the event is only applied once. Defaults to False.

If True, the event is only applied once when the condition is met. After that, it is never applied again until
the environment is reset. This is useful for events that should only happen once, such as a one-time
perturbation.
"""


##
# Reward manager.
Expand Down
96 changes: 96 additions & 0 deletions source/isaaclab/test/managers/test_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,102 @@ def test_apply_interval_mode_with_global_time(env):
term_2_interval_time = event_man._interval_term_time_left[1].clone()


def test_apply_interval_mode_with_single_shot(env):
"""Test the application of event terms that are in interval mode without global time and single shot event.

During local time, each environment instance has its own time for the interval term. Single shot events
only happen once when the condition is met.
"""
# make two intervals -- one is fixed and the other is random
term_1_interval_range_s = (10 * env.dt, 10 * env.dt)
term_2_interval_range_s = (2 * env.dt, 10 * env.dt)

cfg = {
"term_1": EventTermCfg(
func=increment_dummy1_by_one,
mode="interval",
interval_range_s=term_1_interval_range_s,
is_global_time=False,
is_single_shot=True,
),
"term_2": EventTermCfg(
func=increment_dummy2_by_one,
mode="interval",
interval_range_s=term_2_interval_range_s,
is_global_time=False,
is_single_shot=True,
),
}

event_man = EventManager(cfg, env)

# obtain the initial time left for the interval terms
term_2_interval_time = event_man._interval_term_time_left[1].clone()
expected_dummy2_value = torch.zeros_like(env.dummy2)

for count in range(50):
# apply the event terms
event_man.apply("interval", dt=env.dt)
# Make sure dummy1 is only incremented once when the interval time is reached
torch.testing.assert_close(
env.dummy1, ((count + 1) * env.dt >= term_1_interval_range_s[1]) * torch.ones_like(env.dummy1)
)

# Make sure dummy2 is only incremented once when the interval time is reached
expected_dummy2_value = torch.zeros_like(env.dummy2) + (
(count + 1) * env.dt >= term_2_interval_time
).float().unsqueeze(1)
torch.testing.assert_close(env.dummy2, expected_dummy2_value)


def test_apply_interval_mode_with_single_shot_global_time(env):
"""Test the application of event terms that are in interval mode with global time and single shot event.

During global time, all the environment instances share the same time for the interval term. Single shot events
only happen once when the condition is met.
"""
# make two intervals -- one is fixed and the other is random
term_1_interval_range_s = (10 * env.dt, 10 * env.dt)
term_2_interval_range_s = (2 * env.dt, 10 * env.dt)

cfg = {
"term_1": EventTermCfg(
func=increment_dummy1_by_one,
mode="interval",
interval_range_s=term_1_interval_range_s,
is_global_time=True,
is_single_shot=True,
),
"term_2": EventTermCfg(
func=increment_dummy2_by_one,
mode="interval",
interval_range_s=term_2_interval_range_s,
is_global_time=True,
is_single_shot=True,
),
}

event_man = EventManager(cfg, env)

# obtain the initial time left for the interval terms
term_1_interval_time = event_man._interval_term_time_left[0].clone()
term_2_interval_time = event_man._interval_term_time_left[1].clone()
expected_dummy2_value = torch.zeros_like(env.dummy2)

for count in range(50):
# apply the event terms
event_man.apply("interval", dt=env.dt)
# check the values
# we increment the dummy1 by 1 at the fixed interval. Afterwards the event should not be triggered again
torch.testing.assert_close(
env.dummy1, ((count + 1) * env.dt >= term_1_interval_time) * torch.ones_like(env.dummy1)
)

# we increment the dummy2 by 1 at the random interval. Afterwards the event should not be triggered again
expected_dummy2_value = torch.zeros_like(env.dummy2) + ((count + 1) * env.dt >= term_2_interval_time).float()
torch.testing.assert_close(env.dummy2, expected_dummy2_value)


def test_apply_reset_mode(env):
"""Test the application of event terms that are in reset mode."""
cfg = {
Expand Down
Loading