Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions src/lerobot/scripts/lerobot_teleoperate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,22 @@
--display_data=true
```

Example with latency emulation (0.5 seconds delay):

```shell
lerobot-teleoperate \
--robot.type=so101_follower \
--robot.port=/dev/tty.usbmodem58760431541 \
--teleop.type=so101_leader \
--teleop.port=/dev/tty.usbmodem58760431551 \
--latency=0.5
```

"""

import logging
import time
from collections import deque
from dataclasses import asdict, dataclass
from pprint import pformat

Expand Down Expand Up @@ -104,6 +116,8 @@ class TeleoperateConfig:
teleop_time_s: float | None = None
# Display all cameras on screen
display_data: bool = False
# Emulated latency in seconds (0.0 means no latency)
latency: float = 0.0


def teleop_loop(
Expand All @@ -115,6 +129,7 @@ def teleop_loop(
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
display_data: bool = False,
duration: float | None = None,
latency: float = 0.0,
):
"""
This function continuously reads actions from a teleoperation device, processes them through optional
Expand All @@ -130,13 +145,19 @@ def teleop_loop(
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
robot_observation_processor: An optional pipeline to process raw observations from the robot.
latency: Emulated latency in seconds. Actions are queued and delayed by this amount before being sent to the robot.
"""

display_len = max(len(key) for key in robot.action_features)
start = time.perf_counter()

# Initialize action queue for latency emulation
# Each item in queue is a tuple: (execution_time, robot_action_to_send)
action_queue = deque()

while True:
loop_start = time.perf_counter()
current_time = time.perf_counter()

# Get robot observation
# Not really needed for now other than for visualization
Expand All @@ -153,8 +174,36 @@ def teleop_loop(
# Process action for robot through pipeline
robot_action_to_send = robot_action_processor((teleop_action, obs))

# Send processed action to robot (robot_action_processor.to_output should return dict[str, Any])
_ = robot.send_action(robot_action_to_send)
# Add action to queue with its execution time
execution_time = current_time + latency
action_queue.append((execution_time, robot_action_to_send))

# Determine which action to send to robot based on latency
action_to_execute = None
if latency <= 0.0:
# No latency: send the current action immediately
action_to_execute = robot_action_to_send
# Clear queue since we're not using it
action_queue.clear()
else:
# Check if there's an action ready to execute
while action_queue:
exec_time, queued_action = action_queue[0]
# Check if current time has reached or passed the execution time
if current_time >= exec_time:
# Remove and use this action
action_queue.popleft()
action_to_execute = queued_action
break
else:
# Action not ready yet, stop checking
break

# Send action to robot if we have one to execute
if action_to_execute is not None:
_ = robot.send_action(action_to_execute)
# Use the executed action for display
robot_action_to_send = action_to_execute

if display_data:
# Process robot observation through pipeline
Expand Down Expand Up @@ -205,6 +254,7 @@ def teleoperate(cfg: TeleoperateConfig):
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
latency=cfg.latency,
)
except KeyboardInterrupt:
pass
Expand Down