Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
# Robots
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
trossen_arm = ["trossen_arm==1.8.5"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"]
kinematics = ["lerobot[placo-dep]"]
Expand Down
2 changes: 1 addition & 1 deletion src/lerobot/async_inference/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05"]

# TODO: Add all other robots
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so100_follower"]
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so100_follower", "widow_ai_follower"]
1 change: 1 addition & 0 deletions src/lerobot/async_inference/robot_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
make_robot_from_config,
so100_follower,
so101_follower,
widow_ai_follower,
)
from lerobot.transport import (
services_pb2, # type: ignore
Expand Down
3 changes: 3 additions & 0 deletions src/lerobot/motors/trossen/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .widow_ai_arm_driver import TrossenArmDriver

__all__ = ["TrossenArmDriver"]
137 changes: 137 additions & 0 deletions src/lerobot/motors/trossen/widow_ai_arm_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import logging
import traceback
from typing import List, Union

import numpy as np
import trossen_arm as trossen


class TrossenArmDriver:
def __init__(
self,
port: str,
model: str = "V0_LEADER",
velocity_limit_scale: float | None = None,
):
self.port = port
self.model = model
self.velocity_limit_scale = velocity_limit_scale
self.driver = None
self.is_connected = False

self.MIN_TIME_TO_MOVE = 0.1 # seconds

@property
def is_connected(self) -> bool:
return self._is_connected and self.driver is not None

@is_connected.setter
def is_connected(self, value: bool):
self._is_connected = value

def _get_model_config(self):
trossen_arm_models = {
"V0_LEADER": [trossen.Model.wxai_v0, trossen.StandardEndEffector.wxai_v0_leader],
"V0_FOLLOWER": [trossen.Model.wxai_v0, trossen.StandardEndEffector.wxai_v0_follower],
}

try:
return trossen_arm_models[self.model]
except KeyError as e:
raise ValueError(f"Unsupported model: {self.model}") from e

def _scale_velocity_limits(self, scale: float) -> None:
logging.debug(f"Scaling {self.model} arm velocity limits to {scale * 100}% for safety...")

joint_limits = self.driver.get_joint_limits()

for i in range(len(joint_limits)):
original_velocity = joint_limits[i].velocity_max
joint_limits[i].velocity_max *= scale
logging.debug(f" Joint {i}: velocity_max scaled from {original_velocity:.3f} to {joint_limits[i].velocity_max:.3f}")

self.driver.set_joint_limits(joint_limits)
logging.debug(f"{self.model} arm velocity limits successfully scaled.")

def connect(self) -> None:
if self.is_connected:
logging.debug(f"TrossenArmDriver({self.port}) is already connected.")
return

logging.debug(f"Connecting to {self.model} arm at {self.port}...")

self.driver = trossen.TrossenArmDriver()

model_name, model_end_effector = self._get_model_config()

logging.debug("Configuring the drivers...")

try:
self.driver.configure(model_name, model_end_effector, self.port, False)
except Exception as e:
traceback.print_exc()
logging.error(f"Failed to configure the driver for the {self.model} arm at {self.port}.")
raise e

self.is_connected = True

if self.velocity_limit_scale is not None:
self._scale_velocity_limits(self.velocity_limit_scale)

def disconnect(self) -> None:
if not self.is_connected:
return
self.is_connected = False
logging.debug(f"{self.model} arm disconnected.")

def read(self, data_name: str) -> np.ndarray:
if not self.is_connected:
raise RuntimeError(f"TrossenArmDriver({self.port}) is not connected. You need to run `connect()`.")

if data_name == "Present_Position":
values = self.driver.get_all_positions()
elif data_name == "External_Efforts":
values = self.driver.get_all_external_efforts()
else:
raise ValueError(f"Data name: {data_name} is not supported for reading.")

return np.array(values, dtype=np.float32)

def write(self, data_name: str, values: Union[float, List[float], np.ndarray]) -> None:
if not self.is_connected:
raise RuntimeError(f"TrossenArmDriver({self.port}) is not connected. You need to run `connect()`.")

if data_name == "Goal_Position":
values = np.array(values, dtype=np.float32)
self.driver.set_all_positions(values.tolist(), self.MIN_TIME_TO_MOVE, False)
elif data_name == "External_Efforts":
values = np.array(values, dtype=np.float32)
self.driver.set_all_external_efforts(values.tolist(), 0.0, False)
else:
raise ValueError(f"Data name: {data_name} is not supported for writing.")

def initialize_for_teleoperation(self, is_leader: bool = True) -> None:
logging.debug(f"Initializing {self.model} arm for teleoperation...")

logging.debug(f"Setting {self.model} arm mode for teleoperation...")
if is_leader:
self.driver.set_all_modes(trossen.Mode.external_effort)
# Send zero efforts immediately to prevent instability
logging.debug(f"Sending zero efforts to {self.model} arm for stability...")
try:
zero_efforts = [0.0] * self.driver.get_num_joints()
self.driver.set_all_external_efforts(zero_efforts, 0.0, False)
logging.debug(f"Zero efforts sent to {self.model} arm - should be stable now")
except Exception as e:
logging.warning(f"Failed to send zero efforts to {self.model} arm: {e}")
else:
self.driver.set_all_modes(trossen.Mode.position)

logging.debug(f"{self.model} arm initialization complete!")

def __del__(self):
try:
if getattr(self, "_is_connected", False):
self.disconnect()
except Exception as e:
logging.warning(f"Exception during cleanup: {e}")
4 changes: 4 additions & 0 deletions src/lerobot/robots/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .reachy2 import Reachy2Robot

return Reachy2Robot(config)
elif config.type == "widow_ai_follower":
from .widow_ai_follower import WidowAIFollower

return WidowAIFollower(config)
elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot

Expand Down
2 changes: 2 additions & 0 deletions src/lerobot/robots/widow_ai_follower/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .config_widow_ai_follower import WidowAIFollowerConfig
from .widow_ai_follower import WidowAIFollower
32 changes: 32 additions & 0 deletions src/lerobot/robots/widow_ai_follower/config_widow_ai_follower.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python

from dataclasses import dataclass, field

from lerobot.cameras import CameraConfig

from ..config import RobotConfig


@RobotConfig.register_subclass("widow_ai_follower")
@dataclass
class WidowAIFollowerConfig(RobotConfig):
# Port to connect to the arm (IP address for Trossen arms)
port: str

# Trossen arm model to use
model: str = "V0_FOLLOWER" # Options: "V0_LEADER", "V0_FOLLOWER"

# Velocity limit scale for safety (1.0 = 100% of max velocity, 0.5 = 50% of max velocity)
# Scales maximum joint velocities on connection. Set to None to skip scaling.
velocity_limit_scale: float | None = None

# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: float | None = None

# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)

# Enable effort sensing to include effort measurements in observations
effort_sensing: bool = False
183 changes: 183 additions & 0 deletions src/lerobot/robots/widow_ai_follower/widow_ai_follower.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/usr/bin/env python

import logging
import time
from functools import cached_property
from typing import Any

from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.motors.trossen import TrossenArmDriver

from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_widow_ai_follower import WidowAIFollowerConfig

logger = logging.getLogger(__name__)


class WidowAIFollower(Robot):
config_class = WidowAIFollowerConfig
name = "widow_ai_follower"

def __init__(self, config: WidowAIFollowerConfig):
super().__init__(config)
self.config = config

self.bus = TrossenArmDriver(
port=self.config.port,
model=self.config.model,
velocity_limit_scale=self.config.velocity_limit_scale,
)

self.motor_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_1", "wrist_2", "wrist_3", "gripper"]

self.cameras = make_cameras_from_configs(config.cameras)

@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.motor_names}

@property
def _efforts_ft(self) -> dict[str, type]:
return {f"{motor}.effort": float for motor in self.motor_names}

@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}

@cached_property
def observation_features(self) -> dict[str, type | tuple]:
if self.config.effort_sensing:
return {**self._motors_ft, **self._efforts_ft, **self._cameras_ft}
else:
return {**self._motors_ft, **self._cameras_ft}

@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft

@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())

def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")

self.bus.connect()

for cam in self.cameras.values():
cam.connect()

self.configure()
logger.info(f"{self} connected.")

@property
def is_calibrated(self) -> bool:
# Trossen arms are pre-calibrated
return True

def calibrate(self) -> None:
logger.info(f"{self} is pre-calibrated, no calibration needed.")

def configure(self) -> None:
self.bus.initialize_for_teleoperation(is_leader=False)

def setup_motors(self) -> None:
logger.info(f"{self} motors are pre-configured.")

def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")

start = time.perf_counter()
positions = self.bus.read("Present_Position")

# Convert numpy array to dict with motor names
obs_dict = {}
for i, motor in enumerate(self.motor_names):
if i < len(positions):
obs_dict[f"{motor}.pos"] = float(positions[i])
else:
obs_dict[f"{motor}.pos"] = 0.0

dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")

if self.config.effort_sensing:
try:
start = time.perf_counter()
efforts = self.bus.read("External_Efforts")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read external efforts: {dt_ms:.1f}ms")

# Convert numpy array to dict with motor names
for i, motor in enumerate(self.motor_names):
if i < len(efforts):
obs_dict[f"{motor}.effort"] = float(efforts[i])
else:
obs_dict[f"{motor}.effort"] = 0.0
except Exception as e:
logger.debug(f"{self} failed to read external efforts, using zeros: {e}")
for motor in self.motor_names:
obs_dict[f"{motor}.effort"] = 0.0

# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")

return obs_dict

def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command arm to move to a target joint configuration.

Raises:
RobotDeviceNotConnectedError: if robot is not connected.

Returns:
the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")

start = time.perf_counter()

# Extract goal positions from action dict
goal_pos = []
for motor in self.motor_names:
pos_key = f"{motor}.pos"
if pos_key in action:
goal_pos.append(action[pos_key])
else:
goal_pos.append(0.0)

# Cap goal position when too far away from present position.
if self.config.max_relative_target is not None:
present_pos = self.bus.read("Present_Position")
present_dict = {motor: float(present_pos[i]) for i, motor in enumerate(self.motor_names) if i < len(present_pos)}
goal_dict = {motor: goal_pos[i] for i, motor in enumerate(self.motor_names)}
goal_present_pos = {key: (goal_dict[key], present_dict.get(key, 0.0)) for key in goal_dict}
safe_goal_dict = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
goal_pos = [safe_goal_dict.get(motor, 0.0) for motor in self.motor_names]

self.bus.write("Goal_Position", goal_pos)

dt_ms = (time.perf_counter() - start) * 1e3

return {f"{motor}.pos": val for motor, val in zip(self.motor_names, goal_pos)}

def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")

self.bus.disconnect()
for cam in self.cameras.values():
cam.disconnect()

logger.info(f"{self} disconnected.")
Loading