Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions genesis/engine/sensors/contact_force.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,25 @@ def _kernel_get_contacts_forces(
link_b: qd.types.ndarray(),
links_quat: qd.types.ndarray(),
sensors_link_idx: qd.types.ndarray(),
filter_links_idx: qd.types.ndarray(), # (n_sensors, max_filter_links); unused slots are -1
output: qd.types.ndarray(),
):
for i_c, i_s, i_b in qd.ndrange(link_a.shape[-1], sensors_link_idx.shape[-1], output.shape[-1]):
contact_data_link_a = link_a[i_b, i_c]
contact_data_link_b = link_b[i_b, i_c]
if contact_data_link_a == sensors_link_idx[i_s] or contact_data_link_b == sensors_link_idx[i_s]:
# When this sensor matches one side of the contact the "other" link is the counterpart; skip the
# contact for this sensor if the counterpart is in the sensor's filter list. (-1 padding entries
# never match a real link index.)
counterpart_a_filtered = 0 # other side (link_b) blacklisted, when this sensor is link_a
counterpart_b_filtered = 0 # other side (link_a) blacklisted, when this sensor is link_b
for i_f in range(filter_links_idx.shape[-1]):
f = filter_links_idx[i_s, i_f]
if f == contact_data_link_b:
counterpart_a_filtered = 1
if f == contact_data_link_a:
counterpart_b_filtered = 1

j_s = i_s * 3 # per-sensor output dimension is 3

quat_a = qd.Vector.zero(gs.qd_float, 4)
Expand All @@ -62,10 +75,10 @@ def _kernel_get_contacts_forces(
force_a = qd_inv_transform_by_quat(-force_vec, quat_a)
force_b = qd_inv_transform_by_quat(force_vec, quat_b)

if contact_data_link_a == sensors_link_idx[i_s]:
if contact_data_link_a == sensors_link_idx[i_s] and counterpart_a_filtered == 0:
for j in qd.static(range(3)):
output[j_s + j, i_b] += force_a[j]
if contact_data_link_b == sensors_link_idx[i_s]:
if contact_data_link_b == sensors_link_idx[i_s] and counterpart_b_filtered == 0:
for j in qd.static(range(3)):
output[j_s + j, i_b] += force_b[j]

Expand Down Expand Up @@ -201,6 +214,11 @@ class ContactForceSensorMetadata(RigidSensorMetadataMixin, ImperfectSensorMetada

min_force: torch.Tensor = make_tensor_field((0, 3))
max_force: torch.Tensor = make_tensor_field((0, 3))
# (num_force_sensors, max_num_filter_links); unused slots are -1.
filter_links_idx: torch.Tensor = make_tensor_field((0, 0), dtype_factory=lambda: gs.tc_int)
# Indices into links_idx of sensors that have at least one filter link. Lets the GT update skip the
# 4D contact-vs-filter comparison for the (typically larger) subset of sensors with no filter.
filtered_sensor_idx: torch.Tensor = make_tensor_field((0,), dtype_factory=lambda: gs.tc_int)


class ContactForceSensor(
Expand Down Expand Up @@ -230,6 +248,20 @@ def build(self):
self._shared_metadata.max_force, self._options.max_force, expand=(1, 3)
)

num_sensors, cur_num_filter_links = self._shared_metadata.filter_links_idx.shape
max_num_filter_links = max(cur_num_filter_links, len(self._options.filter_link_idx))
filter_links_idx = torch.full((num_sensors + 1, max_num_filter_links), -1, dtype=gs.tc_int, device=gs.device)
filter_links_idx[:num_sensors, :cur_num_filter_links] = self._shared_metadata.filter_links_idx
filter_links_idx[num_sensors, : len(self._options.filter_link_idx)] = torch.tensor(
self._options.filter_link_idx, dtype=gs.tc_int, device=gs.device
)
self._shared_metadata.filter_links_idx = filter_links_idx

if len(self._options.filter_link_idx) > 0:
self._shared_metadata.filtered_sensor_idx = concat_with_tensor(
self._shared_metadata.filtered_sensor_idx, num_sensors, expand=(1,), dim=0
)

def _get_return_format(self) -> tuple[int, ...]:
return (3,)

Expand Down Expand Up @@ -262,6 +294,15 @@ def _update_shared_ground_truth_cache(
# Forces are aggregated BEFORE moving them in local frame for efficiency
force_mask_a = link_a[:, None] == shared_metadata.links_idx[None, :, None]
force_mask_b = link_b[:, None] == shared_metadata.links_idx[None, :, None]
# Apply the (more expensive) filter-aware update only on sensors that declared a filter; other
# sensors keep the cheap masks computed above.
if shared_metadata.filtered_sensor_idx.numel() > 0:
filt = shared_metadata.filtered_sensor_idx
sub_filter = shared_metadata.filter_links_idx[filt][None, :, None, :]
filtered_a = (link_b[:, None, :, None] == sub_filter).any(dim=-1)
filtered_b = (link_a[:, None, :, None] == sub_filter).any(dim=-1)
force_mask_a[:, filt, :] = force_mask_a[:, filt, :] & ~filtered_a
force_mask_b[:, filt, :] = force_mask_b[:, filt, :] & ~filtered_b
force_mask = force_mask_b.to(dtype=gs.tc_float) - force_mask_a.to(dtype=gs.tc_float)
sensors_force = (force_mask[..., None] * force[:, None]).sum(dim=2)
sensors_quat = links_quat[:, shared_metadata.links_idx]
Expand All @@ -277,6 +318,7 @@ def _update_shared_ground_truth_cache(
link_b.contiguous(),
links_quat.contiguous(),
shared_metadata.links_idx,
shared_metadata.filter_links_idx,
shared_ground_truth_cache,
)

Expand Down
15 changes: 15 additions & 0 deletions genesis/options/sensors/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ class ContactForce(RigidSensorOptionsMixin["ContactForceSensor"], ImperfectSenso

Parameters
----------
filter_link_idx : array-like[int], optional
Global rigid link indices (solver link space). Contacts with the sensor link where the other
participant is one of these links are ignored (their force is not included). Default is empty
(no filtering).
min_force : float | array-like[float, float, float], optional
The minimum detectable absolute force per each axis. Values below this will be treated as 0. Default is 0.
max_force : float | array-like[float, float, float], optional
Expand All @@ -204,6 +208,8 @@ class ContactForce(RigidSensorOptionsMixin["ContactForceSensor"], ImperfectSenso
The scale factor for the debug force arrow. Defaults to 0.01.
"""

filter_link_idx: OptionalIArrayType = Field(default_factory=tuple)

resolution: LaxVec3FType = 0.0

min_force: LaxNonNegativeUnboundedVec3FType = 0.0
Expand All @@ -217,6 +223,15 @@ def model_post_init(self, context: Any) -> None:
if np.any(np.array(self.max_force) <= np.array(self.min_force)):
gs.raise_exception(f"min_force should be less than max_force, got: {self.min_force} and {self.max_force}")

def validate_scene(self, scene: "Scene"):
super().validate_scene(scene)
if self.filter_link_idx:
n_links = scene.sim.rigid_solver.n_links
if np.any(np.array(self.filter_link_idx) < 0) or np.any(np.array(self.filter_link_idx) >= n_links):
gs.raise_exception(
f"ContactForce sensor filter_link_idx should be in range [0, {n_links}). Got {self.filter_link_idx}"
)


class TemperatureProperties(NamedTuple):
"""
Expand Down
55 changes: 55 additions & 0 deletions tests/test_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,61 @@ def test_contact_sensor_filter_link_idx(show_viewer):
assert filtered_data[1], "Contact sensor with filter_link_idx should still detect contact with the box"


@pytest.mark.required
def test_contact_force_sensor_filter_link_idx(show_viewer, tol):
"""ContactForce sensor filter_link_idx drops the force contributed by contacts with listed links."""
scene = gs.Scene(
sim_options=gs.options.SimOptions(
gravity=(0.0, 0.0, -10.0),
),
profiling_options=gs.options.ProfilingOptions(show_FPS=False),
show_viewer=show_viewer,
)
floor = scene.add_entity(morph=gs.morphs.Plane())
box_on_floor = scene.add_entity(
morph=gs.morphs.Box(
size=(0.2, 0.2, 0.2),
pos=(0.0, 0.0, 0.1),
),
)
box = scene.add_entity(
morph=gs.morphs.Box(
size=(0.2, 0.2, 0.2),
pos=(0.0, 0.5, 0.1),
),
)
sensor = scene.add_sensor(
gs.sensors.ContactForce(
entity_idx=box_on_floor.idx,
)
)
sensor_filtered = scene.add_sensor(
gs.sensors.ContactForce(
entity_idx=box_on_floor.idx,
filter_link_idx=(floor.link_start,),
)
)
scene.build(n_envs=2)
box.set_pos(
(
(0.0, 0.5, 0.1), # box not touching box_on_floor
(0.0, 0.0, 0.3), # box on top of box_on_floor
)
)
for _ in range(20): # make sure the boxes are stably resting
scene.step()
force = sensor.read()
force_filtered = sensor_filtered.read()
# env 0: box_on_floor only touches the floor.
assert torch.linalg.norm(force[0]) > 1.0, "ContactForce should report the floor support force"
assert force[0][2] > 1.0, "the floor pushes box_on_floor up (+z)"
assert torch.linalg.norm(force_filtered[0]) < tol, "filtering the floor (the only contact) leaves zero force"
# env 1: box_on_floor touches the floor (below) and the box (above).
assert torch.linalg.norm(force[1]) > 1.0, "ContactForce should report a non-zero net contact force"
assert force_filtered[1][2] < -1.0, "filtering the floor leaves only the box-on-top contact, which pushes down (-z)"
assert not torch.allclose(force[1], force_filtered[1], atol=tol), "filtering the floor should change the result"


# ------------------------------------------------------------------------------------------
# ------------------------------------ Raycast Sensors -------------------------------------
# ------------------------------------------------------------------------------------------
Expand Down