Skip to content

Commit f9ea5b7

Browse files
committed
feat: added example for ompl and cleaned up ompl code
1 parent fd5d69b commit f9ea5b7

File tree

3 files changed

+176
-6
lines changed

3 files changed

+176
-6
lines changed

examples/fr3/grasp_ompl_demo.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import logging
2+
import pathlib
3+
from typing import Any, cast
4+
5+
import gymnasium as gym
6+
import mujoco
7+
import numpy as np
8+
from rcs._core.common import Pose
9+
from rcs.envs.base import ControlMode, GripperWrapper, RobotEnv
10+
from rcs.envs.creators import FR3SimplePickUpSimEnvCreator
11+
from rcs.ompl.mj_ompl import MjOMPL
12+
13+
import rcs
14+
15+
logger = logging.getLogger(__name__)
16+
logger.setLevel(logging.INFO)
17+
18+
RECORD_PATH = "/home/sbien/Documents/Development/RCS/internal_rcs/test_data"
19+
SCENE_ROOT = pathlib.Path("~", "Documents", "Development", "RCS", "models", "scenes", "viral").expanduser()
20+
SCENE_FILE = pathlib.Path(SCENE_ROOT, "scene.xml")
21+
CAMERAS = [
22+
"wrist",
23+
"arro",
24+
"bird_eye",
25+
"full_side",
26+
"front_up",
27+
"front_down",
28+
"side_right",
29+
"side_left_up",
30+
"back_right_up",
31+
"back_center",
32+
"back_left",
33+
"side_opposite",
34+
"side_center",
35+
]
36+
37+
38+
class OmplTrajectoryDemo:
39+
def __init__(self, env: gym.Env, planner: MjOMPL):
40+
self.env = env
41+
self.unwrapped: RobotEnv = cast(RobotEnv, self.env.unwrapped)
42+
self.home_pose: Pose = self.unwrapped.robot.get_cartesian_position()
43+
self.home_qpos: np.ndarray = self.unwrapped.robot.get_joint_position()
44+
self.sol_path = None
45+
self.planner = planner
46+
47+
def _action(self, pose: Pose, gripper: float) -> dict[str, Any]:
48+
return {"xyzrpy": pose.xyzrpy(), "gripper": gripper}
49+
50+
def _jaction(self, joints: np.ndarray, gripper: float) -> dict[str, Any]:
51+
return {"joints": joints, "gripper": gripper}
52+
53+
def get_object_pose(self, geom_name) -> Pose:
54+
model = self.env.get_wrapper_attr("sim").model
55+
data = self.env.get_wrapper_attr("sim").data
56+
57+
geom_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_GEOM, geom_name)
58+
obj_pose_world_coordinates = Pose(
59+
translation=data.geom_xpos[geom_id], rotation=data.geom_xmat[geom_id].reshape(3, 3)
60+
) * Pose(
61+
rpy_vector=np.array([0, 0, np.pi]), translation=[0, 0, 0] # type: ignore
62+
)
63+
return self.unwrapped.robot.to_pose_in_robot_coordinates(obj_pose_world_coordinates)
64+
65+
def plan_path_to_object(self, obj_name: str, delta_up):
66+
self.move_home()
67+
obj_pose_og = self.get_object_pose(geom_name=obj_name)
68+
obj_pose = obj_pose_og * Pose(
69+
translation=np.array([0, 0, delta_up]), quaternion=np.array([1, 0, 0, 0]) # type: ignore
70+
) # type: ignore
71+
goal_q = self.planner.ik(obj_pose, None)
72+
if goal_q is None:
73+
return False
74+
sol, path = self.planner.plan(goal=goal_q, start=self.home_qpos)
75+
if not sol:
76+
return False
77+
for waypoint in path:
78+
self.step(self._jaction(waypoint, GripperWrapper.BINARY_GRIPPER_OPEN))
79+
return True
80+
81+
def approach_and_grasp(self, obj_name: str, delta_up: float = 0.2):
82+
obj_pose_og = self.get_object_pose(geom_name=obj_name)
83+
84+
obj_pose_grasp = obj_pose_og * Pose(translation=np.array([0, 0, delta_up]), quaternion=np.array([1, 0, 0, 0])) # type: ignore
85+
waypoints = self.generate_waypoints(
86+
start_pose=self.unwrapped.robot.get_cartesian_position(), end_pose=obj_pose_grasp, num_waypoints=5
87+
)
88+
for waypoint in waypoints:
89+
self.step(self._jaction(waypoint, GripperWrapper.BINARY_GRIPPER_OPEN)) # type: ignore
90+
for _ in range(3): # Required for simulation stability
91+
self.step(self._jaction(waypoints[-1], GripperWrapper.BINARY_GRIPPER_CLOSED)) # type: ignore
92+
93+
def generate_waypoints(self, start_pose: Pose, end_pose: Pose, num_waypoints: int) -> list[Pose]:
94+
start_q = self.planner.ik(start_pose, None)
95+
start_q = start_q[:7] # ignore finger joints
96+
waypoints = [start_q]
97+
for i in range(num_waypoints + 1):
98+
t = i / (num_waypoints)
99+
waypoints.append(self.planner.ik(start_pose.interpolate(end_pose, t), None)[:7]) # ignore finger joints
100+
return waypoints
101+
102+
def step(self, action: dict) -> dict:
103+
return self.env.step(action)[0]
104+
105+
def execute_motion(self, waypoints: list[Pose], gripper: float = GripperWrapper.BINARY_GRIPPER_OPEN) -> dict:
106+
for i in range(len(waypoints)):
107+
obs = self.step(self._jaction(waypoints[i], gripper)) # type: ignore
108+
return obs
109+
110+
def move_home(self):
111+
end_eff_pose = self.unwrapped.robot.get_cartesian_position()
112+
waypoints = self.generate_waypoints(end_eff_pose, self.home_pose, num_waypoints=15)
113+
self.execute_motion(waypoints=waypoints, gripper=GripperWrapper.BINARY_GRIPPER_CLOSED)
114+
115+
116+
def main():
117+
# Make an environment as usual
118+
env = FR3SimplePickUpSimEnvCreator()(render_mode="human", delta_actions=False, control_mode=ControlMode.JOINTS)
119+
robot_tcp = rcs.common.Pose(
120+
translation=np.array([0.0, 0.0, 0.1034]), # type: ignore
121+
rotation=np.array([[0.707, 0.707, 0], [-0.707, 0.707, 0], [0, 0, 1]]), # type: ignore
122+
)
123+
planner = "RRTConnect" # one of ["PRM","RRT","RRTConnect","RRTstar","EST","FMT","BITstar"]
124+
125+
for _ in range(50):
126+
env.reset()
127+
128+
# For each episode, we create a new planner instance to reset its space information.
129+
ompl_planner = MjOMPL(
130+
robot_name="fr3", # Name is only for logging purposes.
131+
robot_env=env, # Pass the environment created above to the planner.
132+
njoints=7, # Specify the number of joints
133+
robot_tcp=robot_tcp, # Specify the TCP of the robot to be planned around.
134+
robot_xml_name=rcs.scenes[
135+
"fr3_simple_pick_up"
136+
].mjcf_robot, # Path to the robot xml file (NOT the scene.xml)
137+
robot_root_name="base_0", # Name of the robot root body in the xml
138+
robot_joint_name="fr3_joint#_0", # Joint name pattern of the robot in the xml, where # is [1~njoints]
139+
robot_actuator_name="fr3_joint#_0", # Actuator name pattern, used to ensure that we only plan for actuated joints.
140+
obstacle_body_names=[], # What MuJoCo bodies to consider as obstacles
141+
obstacle_geom_names=["floor", "box_geom"], # What MuJoCo geoms to consider as obstacles
142+
interpolate_num=50, # Number of interpolation points between start and goal configuration
143+
)
144+
ompl_planner.set_planner(planner)
145+
controller = OmplTrajectoryDemo(env, ompl_planner)
146+
147+
# Since the target object is part of the collision to be checked, we need sufficient clearance (delta_up=0.1)
148+
# for the planner to find a solution.
149+
success = controller.plan_path_to_object("box_geom", delta_up=0.05)
150+
if success:
151+
logger.info(f"Successfully planned with {planner}.")
152+
# Once successful, we approach the object and grasp it.
153+
controller.approach_and_grasp("box_geom", delta_up=0.01)
154+
# And then return to home position.
155+
controller.move_home()
156+
else:
157+
logger.warning(f"No solution found. Failed planning with {planner}.")
158+
continue
159+
# Since we are creating the planner object at each loop to reset its space information, we delete it here.
160+
# This also frees the memory and prevents a potential crash.
161+
ompl_planner = None
162+
logger.info("Finished all episodes.")
163+
env.close()
164+
165+
166+
if __name__ == "__main__":
167+
main()

python/rcs/envs/sim.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def reset(
381381
iso_cube = np.array([0.498, 0.0, 0.226])
382382
iso_cube_pose = rcs.common.Pose(translation=np.array(iso_cube), rpy_vector=np.array([0, 0, 0])) # type: ignore
383383
iso_cube = self.unwrapped.robot.to_pose_in_world_coordinates(iso_cube_pose).translation()
384-
pos_z = 0.0288 / 2
384+
pos_z = 0.0288
385385
pos_x = iso_cube[0] + np.random.random() * 0.2 - 0.1
386386
pos_y = iso_cube[1] + np.random.random() * 0.2 - 0.1
387387

python/rcs/ompl/mj_ompl.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def __init__(
6262
robot_name: str,
6363
env: RobotEnv,
6464
njoints: int = 7,
65+
robot_tcp: Pose | None = None,
6566
robot_xml_name: str = "",
6667
robot_root_name: str = "base_0",
6768
robot_joint_name: str = "fr3_joint#_0",
@@ -99,6 +100,7 @@ def __init__(
99100
), f"Robot XML file {robot_xml_name} does not exist. Please provide a valid path."
100101

101102
self.robot_name = robot_name
103+
self.tcp_offset = robot_tcp if robot_tcp is not None else self.franka_hand_tcp
102104
self.env = env
103105
self.model = self.env.get_wrapper_attr("sim").model
104106
self.data = self.env.get_wrapper_attr("sim").data
@@ -268,7 +270,7 @@ def get_joint_positions(self):
268270
# Deepcopy might not be necessary
269271
return [deepcopy(self.data.qpos[joint_id]) for joint_id in self.joint_ids]
270272

271-
def ik(self, pose, q0=None, tcp_offset=None):
273+
def ik(self, pose, q0=None):
272274
"""
273275
Perform inverse kinematics to find joint positions that achieve the desired pose.
274276
@@ -280,8 +282,7 @@ def ik(self, pose, q0=None, tcp_offset=None):
280282
Returns:
281283
numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found.
282284
"""
283-
tcp_offset = tcp_offset if tcp_offset is not None else self.franka_hand_tcp
284-
return self.env.unwrapped.robot.get_ik().inverse(pose, q0, tcp_offset) # type: ignore[attr-defined]
285+
return self.env.unwrapped.robot.get_ik().inverse(pose, q0, self.tcp_offset) # type: ignore[attr-defined]
285286

286287

287288
class MjOStateSpace(ob.RealVectorStateSpace):
@@ -318,6 +319,7 @@ def __init__(
318319
robot_joint_name: str,
319320
robot_actuator_name: str,
320321
njoints: int,
322+
robot_tcp: Pose | None = None,
321323
obstacle_body_names: list | None = None,
322324
obstacle_geom_names: list | None = None,
323325
interpolate_num: int = INTERPOLATE_NUM,
@@ -368,6 +370,7 @@ def __init__(
368370
robot_name,
369371
robot_env,
370372
njoints=njoints,
373+
robot_tcp=robot_tcp,
371374
robot_xml_name=robot_xml_name,
372375
robot_root_name=robot_root_name,
373376
robot_joint_name=robot_joint_name,
@@ -485,7 +488,7 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time=DEF
485488
sol_path_list = [np.array(sol_path, dtype=np.float32) for sol_path in sol_path_list]
486489
return res, sol_path_list
487490

488-
def ik(self, pose: Pose, q0: np.ndarray | None = None, tcp_offset: Pose | None = None):
491+
def ik(self, pose: Pose, q0: np.ndarray | None = None):
489492
"""
490493
Perform inverse kinematics to find joint positions that achieve the desired pose.
491494
@@ -500,7 +503,7 @@ def ik(self, pose: Pose, q0: np.ndarray | None = None, tcp_offset: Pose | None =
500503
"""
501504
if q0 is None:
502505
q0 = self.robot.get_joint_positions()
503-
return self.robot.ik(pose, q0, tcp_offset)
506+
return self.robot.ik(pose, q0)
504507

505508
def state_to_list(self, state):
506509
"""

0 commit comments

Comments
 (0)