diff --git a/.vscode/settings.json b/.vscode/settings.json
index ebd13ae25..6bfb03ca1 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -3,6 +3,7 @@
"autoDocstring.startOnNewLine": true,
"cSpell.language": "en,de-de,lorem",
"cSpell.words": [
+ "animatable",
"ansible",
"autoconnect",
"basler",
@@ -87,6 +88,7 @@
"seaborn",
"segmentations",
"ssid",
+ "std_srvs",
"taskset",
"teamplayer",
"teleop",
diff --git a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
index c9232b0eb..7a3eaa9a2 100644
--- a/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
+++ b/bitbots_behavior/bitbots_blackboard/bitbots_blackboard/capsules/game_status_capsule.py
@@ -101,32 +101,32 @@ def get_team_id(self):
def get_red_cards(self):
return self.gamestate.team_mates_with_red_card
- def gamestate_callback(self, gs: GameState):
- if self.gamestate.penalized and not gs.penalized:
+ def gamestate_callback(self, gamestate_msg: GameState):
+ if self.gamestate.penalized and not gamestate_msg.penalized:
self.unpenalized_time = self.node.get_clock().now().nanoseconds / 1e9
- if gs.own_score > self.gamestate.own_score:
+ if gamestate_msg.own_score > self.gamestate.own_score:
self.last_goal_from_us_time = self.node.get_clock().now().nanoseconds / 1e9
self.last_goal_time = self.node.get_clock().now().nanoseconds / 1e9
- if gs.rival_score > self.gamestate.rival_score:
+ if gamestate_msg.rival_score > self.gamestate.rival_score:
self.last_goal_time = self.node.get_clock().now().nanoseconds / 1e9
if (
- gs.secondary_state_mode == 2
+ gamestate_msg.secondary_state_mode == 2
and self.gamestate.secondary_state_mode != 2
- and gs.game_state == GameState.GAMESTATE_PLAYING
+ and gamestate_msg.game_state == GameState.GAMESTATE_PLAYING
):
# secondary action is now executed but we will not see this in the new messages.
# it will look like a normal kick off, but we need to remember that this is some sort of free kick
# we set the kickoff value accordingly, then we will not be allowed to move if it is a kick for the others
- self.free_kick_kickoff_team = gs.secondary_state_team
+ self.free_kick_kickoff_team = gamestate_msg.secondary_state_team
- if gs.secondary_state_mode != 2 and gs.secondary_seconds_remaining == 0:
+ if gamestate_msg.secondary_state_mode != 2 and gamestate_msg.secondary_seconds_remaining == 0:
self.free_kick_kickoff_team = None
if self.free_kick_kickoff_team is not None:
- gs.has_kick_off = self.free_kick_kickoff_team == self.team_id
+ gamestate_msg.has_kick_off = self.free_kick_kickoff_team == self.team_id
self.last_update = self.node.get_clock().now().nanoseconds / 1e9
- self.gamestate = gs
+ self.gamestate = gamestate_msg
diff --git a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/actions/go_to_corner_kick_position.py b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/actions/go_to_corner_kick_position.py
index 582ac4115..cf41cbc15 100644
--- a/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/actions/go_to_corner_kick_position.py
+++ b/bitbots_behavior/bitbots_body_behavior/bitbots_body_behavior/actions/go_to_corner_kick_position.py
@@ -17,7 +17,7 @@ def __init__(self, blackboard, dsd, parameters):
self.mode = parameters.get("mode", None)
if self.mode is None or self.mode not in ("striker", "supporter", "others"):
self.blackboard.node.get_logger().error("mode for corner kick not specified")
- exit()
+ raise ValueError("mode for corner kick not specified")
def perform(self, reevaluate=False):
# The defense position should be a position between the ball and the own goal.
diff --git a/bitbots_behavior/bitbots_body_behavior/launch/behavior.launch b/bitbots_behavior/bitbots_body_behavior/launch/behavior.launch
index 7d777a7bc..83b4884b3 100644
--- a/bitbots_behavior/bitbots_body_behavior/launch/behavior.launch
+++ b/bitbots_behavior/bitbots_body_behavior/launch/behavior.launch
@@ -1,13 +1,22 @@
+
+
-
-
-
-
-
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/bitbots_behavior/bitbots_body_behavior/launch/body_behavior.launch b/bitbots_behavior/bitbots_body_behavior/launch/body_behavior.launch
deleted file mode 100644
index 83b4884b3..000000000
--- a/bitbots_behavior/bitbots_body_behavior/launch/body_behavior.launch
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/bitbots_lowlevel/bitbots_buttons/src/button_node.cpp b/bitbots_lowlevel/bitbots_buttons/src/button_node.cpp
index 92e8c1225..f2bf324e6 100644
--- a/bitbots_lowlevel/bitbots_buttons/src/button_node.cpp
+++ b/bitbots_lowlevel/bitbots_buttons/src/button_node.cpp
@@ -51,7 +51,7 @@ class ButtonNode : public rclcpp::Node {
RCLCPP_INFO(this->get_logger(), "service switch_power not available, waiting again...");
}
- teaching_mode_client_ = this->create_client("set_teaching_mode");
+ teaching_mode_client_ = this->create_client("teaching_mode");
buttons_sub_ = this->create_subscription(
"/buttons", 1, std::bind(&bitbots_buttons::ButtonNode::buttonCb, this, _1));
gamestate_sub_ = this->create_subscription(
diff --git a/bitbots_misc/bitbots_bringup/launch/motion.launch b/bitbots_misc/bitbots_bringup/launch/motion.launch
index 75cf5f944..e0f77f5c8 100644
--- a/bitbots_misc/bitbots_bringup/launch/motion.launch
+++ b/bitbots_misc/bitbots_bringup/launch/motion.launch
@@ -31,6 +31,11 @@
+
+
+
+
+
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
new file mode 100644
index 000000000..3631eb9e4
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/animation_recording.py
@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+
+import json
+import math
+import os
+from copy import deepcopy
+from datetime import datetime
+from typing import Optional
+
+import regex as re
+from rclpy.action import ActionClient
+from rclpy.client import Client as ServiceClient
+from rclpy.node import Node
+from std_srvs.srv import SetBool
+
+from bitbots_msgs.action import PlayAnimation
+from bitbots_msgs.srv import AddAnimation
+
+
+class AnimationData:
+ """
+ Defines a current status of the recorded Animation
+ """
+
+ def __init__(self):
+ self.key_frames: list[dict] = []
+ self.author: str = "Unknown"
+ self.description: str = "Edit me!"
+ self.last_edited: datetime = datetime.now().isoformat(" ")
+ self.name: str = "None yet"
+ self.version: int = 0
+
+
+class Recorder:
+ def __init__(
+ self,
+ node: Node,
+ animation_action_client: ActionClient,
+ add_animation_client: ServiceClient,
+ hcm_record_mode_client: ServiceClient,
+ ):
+ self._node = node
+ self.steps: list[tuple[AnimationData, str, bool]] = []
+ self.redo_steps: list[tuple[AnimationData, str, AnimationData]] = []
+ self.current_state = AnimationData()
+ self.animation_client = animation_action_client
+ self.add_animation_client = add_animation_client
+ self.hcm_record_mode_client = hcm_record_mode_client
+ self.save_step("Initial step", frozen=True)
+
+ def get_keyframes(self) -> list[dict]:
+ """
+ Gets the keyframes of the current animation
+
+ :return: the keyframes of the current animation
+ """
+ return self.current_state.key_frames
+
+ def get_keyframe(self, name: str) -> Optional[dict]:
+ """
+ Gets the keyframe with the given name
+
+ :param name: the name of the keyframe
+ :return: the keyframe with the given name
+ """
+ # Get the first keyframe with the given name or None
+ return next(filter(lambda key_frame: key_frame["name"] == name, self.get_keyframes()), None)
+
+ def get_keyframe_index(self, name: str) -> Optional[int]:
+ """
+ Gets the index of the keyframe with the given name
+
+ :param name: the name of the keyframe
+ :return: the index of the keyframe with the given name
+ """
+ # Get the index of the first keyframe with the given name or None
+ key_frame = self.get_keyframe(name)
+ if key_frame is None:
+ return None
+ return self.get_keyframes().index(key_frame)
+
+ def get_meta_data(self) -> tuple[str, int, str, str]:
+ """
+ Returns the meta data of the current animation
+ """
+ data = self.current_state
+ return data.name, data.version, data.author, data.description
+
+ def set_meta_data(self, name: str, version: int, author: str, description: str) -> None:
+ self.current_state.name = name
+ self.current_state.version = version
+ self.current_state.author = author
+ self.current_state.description = description
+
+ def save_step(self, description: str, state: Optional[dict] = None, frozen: bool = False) -> None:
+ """Save the current state of the Animation
+ for later restoration by the undo-command
+
+ (Yes we might save only a diff, but the Memory consumption
+ should still be relatively low this way and saving / undoing
+ is really cheap in terms of CPU and effort spent programming)
+
+ :param description: A string describing the saved action for the user
+ :param state: a AnimState can be given otherwise the current one is used
+ :param frozen: if True, the step cannot be undone
+ """
+
+ self._node.get_logger().debug(f"Saving step: {description}")
+ if not state:
+ state = deepcopy(self.current_state)
+ self.steps.append((state, description, frozen))
+
+ def undo(self) -> str:
+ """Undo the last Step if omitted"""
+ if self.steps[-1][2]:
+ self._node.get_logger().warn("Reaching frozen step (e.g. the initial state). Cannot undo further!")
+ return "Reaching frozen step (e.g. the initial state). Cannot undo further!"
+ state, description, _ = self.steps.pop()
+ self.redo_steps.append((state, description, self.current_state))
+ self._node.get_logger().info(f"Undoing: {description}")
+ if self.steps:
+ state, description, _ = self.steps[-1]
+ self.current_state = state
+ self._node.get_logger().info(f"Last noted action: {description}")
+ return f"Undoing. Last noted action: {description}"
+ else:
+ self._node.get_logger().info("There are no previously noted steps")
+ return "Undoing. There are no more previous steps."
+
+ def redo(self) -> str:
+ """Redo the last step if omitted"""
+ post_state = None
+ if not self.redo_steps:
+ self._node.get_logger().warn("Cannot redo what was not undone!")
+ return "Cannot redo what was not undone!"
+ pre_state, description, post_state = self.redo_steps.pop()
+ self.steps.append((pre_state, description, False))
+ self.current_state = post_state
+ self._node.get_logger().info(f"Last noted step is now: {self.steps[-1][1]}")
+ return f"Last noted step is now: {self.steps[-1][1]}"
+
+ def record(
+ self,
+ motor_pos: dict[str, float],
+ motor_torque: dict[str, int], # TODO: check if we can use bool instead of int
+ frame_name: str,
+ duration: float,
+ pause: float,
+ seq_pos: Optional[int] = None,
+ override: bool = False,
+ frozen: bool = False,
+ ):
+ """Record Command, save current keyframe-data
+
+ :param motor_pos: A position for each motor we want to control
+ :param motor_torque: Wether or not the motor should apply torque
+ :param frame_name: The name of the frame
+ :param duration: The duration of the frame
+ :param pause: We pause for this amount of time after the frame
+ :param seq_pos: The position in the sequence where the frame should be inserted
+ :param override: Wether or not to override the frame at the given position
+ :param frozen: if True, the step cannot be undone
+ """
+ frame = {"name": frame_name, "duration": duration, "pause": pause, "goals": motor_pos, "torque": motor_torque}
+ new_frame = deepcopy(frame)
+ if seq_pos is None:
+ self.current_state.key_frames.append(new_frame)
+ self.save_step(f"Appending new keyframe '{frame_name}'", frozen=frozen)
+ elif not override:
+ self.current_state.key_frames.insert(seq_pos, new_frame)
+ self.save_step(f"Inserting new keyframe '{frame_name}' at position {seq_pos}", frozen=frozen)
+ else:
+ self.current_state.key_frames[seq_pos] = new_frame
+ self.save_step(f"Overriding keyframe at position {seq_pos} with '{frame_name}'", frozen=frozen)
+ return True
+
+ def save_animation(self, path: str, file_name: Optional[str] = None) -> None:
+ """Dumps all keyframe data to an animation .json file
+
+ :param path: The folder where the file should be saved
+ :param file_name: The name of the file
+ """
+ if not self.current_state.key_frames:
+ self._node.get_logger().info("There is nothing to save.")
+ return "There is nothing to save."
+
+ # Use the animation name as file name if none is given
+ if not file_name:
+ file_name = self.current_state.name
+
+ path = os.path.join(path, f"{file_name}.json")
+ self._node.get_logger().debug(f"Saving to '{path}'")
+
+ # Convert the angles from radians to degrees
+ keyframes = deepcopy(self.current_state.key_frames)
+ for kf in keyframes:
+ for k, v in kf["goals"].items():
+ kf["goals"][k] = math.degrees(v)
+
+ # Construct the animation dictionary with meta data and keyframes
+ animation_dict = {
+ "name": self.current_state.name,
+ "version": self.current_state.version,
+ "last_edited": datetime.isoformat(datetime.now(), " "),
+ "author": self.current_state.author,
+ "description": self.current_state.description,
+ "keyframes": keyframes,
+ }
+
+ # Save the animation to a file
+ with open(path, "w") as fp:
+ json.dump(animation_dict, fp, sort_keys=True, indent=4)
+
+ return "Saving to '%s'" % path + ". Done."
+
+ def load_animation(self, path: str) -> None:
+ """Record command, load a animation '.json' file
+
+ :param path: path of the animation to load
+ """
+ # Load the json file
+ with open(path) as fp:
+ data = json.load(fp)
+ # Check if the file is a valid animation and convert the angles from degrees to radians
+ try:
+ for keyframe in data["keyframes"]:
+ for k, v in keyframe["goals"].items():
+ keyframe["goals"][k] = math.radians(v)
+ except ValueError as e:
+ self._node.get_logger().error("Animation {} is corrupt:\n {}".format(path, e.message.partition("\n")[0]))
+ return "Animation {} is corrupt:\n {}".format(path, e.message.partition("\n")[0])
+
+ # Set the current state to the loaded animation
+ self.current_state.key_frames = data["keyframes"]
+ self.current_state.name = data["name"]
+ self.current_state.version = data["version"] if "version" in data else 0
+ self.current_state.last_edited = data["last_edited"] if "last_edited" in data else datetime.now().isoformat(" ")
+ self.current_state.author = data["author"] if "author" in data else "Unknown"
+ self.current_state.description = data["description"] if "description" in data else ""
+
+ # Save the current state
+ self.save_step(f"Loading of animation named '{data['name']}'")
+
+ def play(self, from_frame: int = 0, until_frame: int = -1) -> tuple[str, bool]:
+ """Plays (a range of) the current animation using the animation server
+
+ :param from_frame: the keyframe from which the animation should be played
+ :param until_frame: the keyframe until which the animation should be played
+ """
+ if not self.current_state.key_frames:
+ msg = "Refusing to play, because nothing to play exists!"
+ self._node.get_logger().warn(msg)
+ return msg, False
+
+ if until_frame == -1:
+ # Play complete animation
+ until_frame = len(self.current_state.key_frames)
+ else:
+ # Check if the given frame id is in bounds
+ assert until_frame > 0, "Upper bound must be positive"
+ assert until_frame <= len(
+ self.current_state.key_frames
+ ), "Upper bound must be less than or equal to the number of frames"
+ assert from_frame >= 0, "Lower bound must be positive"
+
+ # Create name for the temporary animation that is send to the animation server
+ # We can call this animation by name to play it
+ # We do not want to overwrite the current animation
+ tmp_animation_name = "bitbots_animation_rqt_tmp"
+
+ # Create a dictionary with the animation data
+ animation_dict = {
+ "name": tmp_animation_name,
+ "version": self.current_state.version,
+ "last_edited": datetime.isoformat(datetime.now(), " "),
+ "author": self.current_state.author,
+ "description": self.current_state.description,
+ "keyframes": deepcopy(self.current_state.key_frames),
+ }
+
+ # Convert the angles from radians to degrees
+ for key_frame in animation_dict["keyframes"]:
+ for joint, v in key_frame["goals"].items():
+ key_frame["goals"][joint] = math.degrees(v)
+
+ self._node.get_logger().debug(f"Playing {len(animation_dict['keyframes'])} frames...")
+
+ # Wait for the service to be available before sending the animation
+ if not self.add_animation_client.wait_for_service(timeout_sec=2):
+ self._node.get_logger().error("Add Animation Service not available! Is the animation server running?")
+ return "Add Animation Service not available! Is the animation server running?", False
+
+ # Send request to the service (blocking to make sure the animation is added before playing it)
+ self.add_animation_client.call(AddAnimation.Request(json=json.dumps(animation_dict, sort_keys=True, indent=4)))
+
+ # Set the HCM into record mode
+ if self.hcm_record_mode_client.wait_for_service(timeout_sec=2):
+ self.hcm_record_mode_client.call(SetBool.Request(data=True))
+ else:
+ self._node.get_logger().error(
+ "HCM Record Mode Service not available! Is the HCM running? The animation might not be played if the HCM is not in record mode."
+ )
+
+ # Wait for animation action server to be available
+ if not self.animation_client.wait_for_server(timeout_sec=2):
+ self._node.get_logger().error("Animation Action Server not available! Is the animation server running?")
+ return "Animation Action Server not available! Is the animation server running?", False
+
+ # Send a goal to play the animation
+ self.animation_client.send_goal_async(
+ PlayAnimation.Goal(animation=tmp_animation_name, bounds=True, start=from_frame, end=until_frame)
+ ) # TODO maybe blocking or something else
+
+ return f"Playing {len(animation_dict['keyframes'])} frames...", True
+
+ def change_frame_order(self, new_order: list[str]) -> None:
+ """Changes the order of the frames given an array of frame names"""
+ new_ordered_frames = [self.get_keyframe(frame_name) for frame_name in new_order]
+ assert None not in new_ordered_frames, "One of the given frame names does not exist"
+ self.current_state.key_frames = new_ordered_frames
+ self.save_step("Reordered frames")
+
+ def duplicate(self, frame_name: str) -> None:
+ """
+ Duplicates a frame
+ """
+ # Get the index of the frame
+ index = self.get_keyframe_index(frame_name)
+ assert index is not None, "The given frame name does not exist"
+ # Create a deep copy of the frame
+ new_frame = deepcopy(self.get_keyframe(frame_name))
+ # Add suffix to the frame name if it already exists and increment the number instead of making it longer
+ while new_frame["name"] in [frame["name"] for frame in self.current_state.key_frames]:
+ # Check if the frame name ends with "_copy_" and a number, if so increment the number
+ if re.match(r".*?_copy_(\d+)", new_frame["name"]):
+ new_frame["name"] = re.sub(
+ r"(_copy_)(\d+)", lambda m: f"{m.group(1)}{int(m.group(2)) + 1}", new_frame["name"]
+ )
+ else:
+ new_frame["name"] = f"{frame_name}_copy_1"
+ # Insert the new frame after the original frame
+ self.current_state.key_frames.insert(index + 1, new_frame)
+ self.save_step(f"Duplicated Frame '{frame_name}'")
+
+ def delete(self, frame_name: str) -> None:
+ """
+ Deletes a frame
+ """
+ index = self.get_keyframe_index(frame_name)
+ assert index is not None, "The given frame name does not exist"
+ self.current_state.key_frames.pop(index)
+ self.save_step(f"Deleted Frame '{frame_name}'")
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
new file mode 100755
index 000000000..e4e0aedf6
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/record_ui.py
@@ -0,0 +1,749 @@
+#!/usr/bin/env python3
+import math
+import os
+import sys
+
+import rclpy
+from ament_index_python import get_package_share_directory
+from PyQt5.QtCore import Qt
+from PyQt5.QtGui import QKeySequence
+from PyQt5.QtWidgets import (
+ QAbstractItemView,
+ QFileDialog,
+ QGroupBox,
+ QLabel,
+ QLineEdit,
+ QListWidgetItem,
+ QMainWindow,
+ QMessageBox,
+ QShortcut,
+ QTreeWidgetItem,
+ QVBoxLayout,
+)
+from PyQt5.uic import loadUi
+from rclpy.action import ActionClient
+from rclpy.node import Node
+from rqt_gui.main import Main
+from rqt_gui_py.plugin import Plugin
+from sensor_msgs.msg import JointState
+from std_srvs.srv import SetBool
+
+from bitbots_animation_rqt.animation_recording import Recorder
+from bitbots_animation_rqt.utils import DragDropList, JointStateCommunicate, flatten_dict_of_lists
+from bitbots_msgs.action import Dynup, PlayAnimation
+from bitbots_msgs.msg import JointTorque
+from bitbots_msgs.srv import AddAnimation
+
+
+class RecordUI(Plugin):
+ """
+ This class is the main class for the RecordUI. It is a plugin for the rqt framework and is used to record animations.
+ """
+
+ def __init__(self, context):
+ super().__init__(context)
+ # Store reference to node
+ self._node: Node = context.node
+
+ # Set Name of the plugin
+ self.setObjectName("Record Animation")
+
+ # Create publishers
+ self.effort_pub = self._node.create_publisher(JointTorque, "set_torque_individual", 1)
+
+ # Create a action client to play animations
+ self.animation_client: ActionClient = ActionClient(self._node, PlayAnimation, "animation")
+ if not self.animation_client.wait_for_server(timeout_sec=5.0):
+ self._node.get_logger().error("Animation action server not available after waiting 5 seconds")
+ self.dynup_client = ActionClient(self._node, Dynup, "dynup")
+ if not self.dynup_client.wait_for_server(timeout_sec=5.0):
+ self._node.get_logger().error("Dynup action server not available after waiting 5 seconds")
+
+ # Create a service clients
+ self.add_animation_client = self._node.create_client(AddAnimation, "add_temporary_animation")
+ if not self.add_animation_client.wait_for_service(timeout_sec=5.0):
+ self._node.get_logger().error("AddAnimation service not available after waiting 5 seconds")
+ self.hcm_record_mode_client = self._node.create_client(SetBool, "record_mode")
+ if not self.hcm_record_mode_client.wait_for_service(timeout_sec=5.0):
+ self._node.get_logger().error("RecordMode service not available after waiting 5 seconds")
+
+ # Initialize the recorder module
+ self.create_initialized_recorder()
+
+ # Initialize the window
+ self._widget = QMainWindow()
+
+ # Load XML ui definition
+ ui_file = os.path.join(get_package_share_directory("bitbots_animation_rqt"), "resource", "RecordUI.ui")
+ loadUi(ui_file, self._widget, {})
+
+ # Initialize the GUI state
+ self._motor_controller_text_fields = {}
+ self._joint_states = JointState()
+
+ # Initialize the working values
+ self._working_angles: dict[str, float] = {}
+
+ self.update_time = 0.1 # TODO what is this for?
+
+ # QT directory for saving files
+ self._save_directory = None
+
+ # Initialize the motor tree structure where we can select which motors are stiff
+ self._motor_switcher_active_checkbox: dict[str, QTreeWidgetItem] = {}
+ self._motor_controller_torque_checkbox: dict[str, QTreeWidgetItem] = {}
+
+ # Motor hierarchy
+ self._motor_hierarchy = { # TODO this should be a parameter / loaded from the urdf
+ "Body": {
+ "Head": ["HeadPan", "HeadTilt"],
+ "Arms": {
+ "Left": [
+ "LShoulderPitch",
+ "LShoulderRoll",
+ "LElbow",
+ ],
+ "Right": [
+ "RShoulderPitch",
+ "RShoulderRoll",
+ "RElbow",
+ ],
+ },
+ "Legs": {
+ "Left": [
+ "LHipYaw",
+ "LHipRoll",
+ "LHipPitch",
+ "LKnee",
+ "LAnklePitch",
+ "LAnkleRoll",
+ ],
+ "Right": [
+ "RHipYaw",
+ "RHipRoll",
+ "RHipPitch",
+ "RKnee",
+ "RAnklePitch",
+ "RAnkleRoll",
+ ],
+ },
+ }
+ }
+ self._motor_hierarchy_flat = flatten_dict_of_lists(self._motor_hierarchy)
+
+ # Create drag and dop list for keyframes
+ self._widget.frameList = DragDropList(self._widget, self.change_keyframe_order)
+ self._widget.verticalLayout_2.insertWidget(0, self._widget.frameList)
+ self._widget.frameList.setDragDropMode(QAbstractItemView.InternalMove)
+
+ # Create a list of all motors
+ def get_motor_names(hierarchy: dict) -> list:
+ names = []
+ for element in hierarchy.values():
+ if isinstance(element, dict):
+ names.extend(get_motor_names(element))
+ else:
+ names.extend(element)
+ return names
+
+ self.motors = get_motor_names(self._motor_hierarchy)
+
+ # Create the initial joint state
+ self._initial_joints = JointState(
+ name=self.motors,
+ position=[0.0] * len(self.motors),
+ )
+
+ # Create GUI components
+ self.create_motor_controller()
+ self.create_motor_switcher()
+ # Update ticks
+ self.react_to_motor_selection()
+ self.update_frames()
+ # Connect callbacks to GUI components
+ self.connect_gui_callbacks()
+
+ # Add the widget to the context
+ context.add_widget(self._widget)
+
+ # Create subscriptions
+ self.state_sub = self._node.create_subscription(JointState, "joint_states", self.joint_state_callback, 1)
+
+ # Tell the user that the initialization is complete
+ self._node.get_logger().info("Initialization complete.")
+ self._widget.statusBar.showMessage("Initialization complete.")
+
+ def joint_state_callback(self, joint_states: JointState) -> None:
+ """
+ Callback method for /joint_states.
+ """
+ # Store the joint states
+ self._joint_states = joint_states
+
+ # Send the joint states to the main thread in a safe way
+ c = JointStateCommunicate()
+ c.signal.connect(self.q_joint_state_update)
+ c.signal.emit(joint_states)
+
+ def q_joint_state_update(self, joint_states: JointState) -> None:
+ """
+ Main thread callback for joint state updates.
+ """
+ # Check if we are doing a shutdown. Otherwise this callback might be called after the plugin was shut down and tries to access resources that are not available anymore
+ if not rclpy.ok():
+ return
+ # Update working values of non stiff motors
+ for motor_name in self.motors:
+ if self._motor_controller_torque_checkbox[motor_name].checkState(0) != Qt.CheckState.Checked:
+ # Update textfield
+ self._motor_controller_text_fields[motor_name].setText(
+ str(round(math.degrees(joint_states.position[joint_states.name.index(motor_name)]), 2))
+ )
+ # React to textfield changes
+ self.textfield_update()
+
+ def create_motor_controller(self) -> None:
+ """
+ Sets up the GUI in the middle of the Screen to control the motors.
+ Uses self._motorValues to determine which motors are present.
+ """
+ # Iterate over all motors (we iterate over the flat hierarchy to get the correct order of the motors)
+ for i, motor_name in enumerate(self._motor_hierarchy_flat.values()):
+ # Create a group of UI elements for each motor
+ group = QGroupBox()
+ layout = QVBoxLayout()
+ # Horizontally center the group
+ layout.setAlignment(Qt.AlignHCenter)
+ # Create a label for the motor name
+ label = QLabel()
+ label.setText(motor_name)
+ layout.addWidget(label)
+
+ # Add a textfield to display the exact value of the motor
+ textfield = QLineEdit()
+ textfield.setText("0.0")
+ textfield.textEdited.connect(self.textfield_update)
+ layout.addWidget(textfield)
+ self._motor_controller_text_fields[motor_name] = textfield
+
+ # Add the layout to the group and the group to the main layout (at the correct position in the 5 x n grid)
+ layout.setAlignment(Qt.AlignCenter)
+ group.setLayout(layout)
+ self._widget.motorControlLayout.addWidget(group, i // 4, i % 4)
+
+ def create_motor_switcher(self) -> None:
+ """
+ Loads the motors into the tree and adds the checkboxes
+ """
+
+ # Create a recursive function to build the tree
+ def build_widget_tree(parent, hierarchy: dict, reference_dict: dict[str, QTreeWidgetItem]) -> None:
+ # Iterate over all elements in the hierarchy
+ for key, value in hierarchy.items():
+ # If the element is a dict, create a new group in the tree
+ if isinstance(value, dict):
+ # Create a new group in the tree
+ child = QTreeWidgetItem(parent)
+ child.setText(0, key)
+ child.setFlags(child.flags() | Qt.ItemIsTristate | Qt.ItemIsUserCheckable)
+ child.setExpanded(True)
+ # Recursively call the function to add the children of the group
+ build_widget_tree(child, value, reference_dict)
+ # If the element is a list we are at the lowest level of the hierarchy and add the motors
+ elif isinstance(value, list):
+ # Create a new group in the tree
+ child = QTreeWidgetItem(parent)
+ child.setText(0, key)
+ child.setFlags(child.flags() | Qt.ItemIsTristate | Qt.ItemIsUserCheckable)
+ child.setExpanded(True)
+ for motor_name in value:
+ # Add motor enable checkbox
+ enable_checkbox = QTreeWidgetItem(child)
+ enable_checkbox.setText(0, motor_name)
+ enable_checkbox.setFlags(enable_checkbox.flags() | Qt.ItemIsTristate | Qt.ItemIsUserCheckable)
+ enable_checkbox.setCheckState(0, Qt.Checked)
+ enable_checkbox.setExpanded(True)
+ reference_dict[motor_name] = enable_checkbox
+ else:
+ raise ValueError("Invalid hierarchy")
+
+ # Build the tree for active motors
+ build_widget_tree(self._widget.motorTree, self._motor_hierarchy, self._motor_switcher_active_checkbox)
+ self._widget.motorTree.setHeaderLabel("Active Motors (for the current keyframe)")
+
+ # Build the tree for stiff motors
+ build_widget_tree(self._widget.torqueTree, self._motor_hierarchy, self._motor_controller_torque_checkbox)
+ self._widget.torqueTree.setHeaderLabel("Stiff Motors (for the current keyframe)")
+
+ # Register hook that executes our callback when the user clicks on a checkbox
+ self._widget.motorTree.itemClicked.connect(self.react_to_motor_selection)
+ self._widget.torqueTree.itemClicked.connect(self.react_to_motor_selection)
+
+ def connect_gui_callbacks(self) -> None:
+ """
+ Connects the actions in the top bar to the corresponding functions, and sets their shortcuts
+ :return:
+ """
+ self._widget.actionNew.triggered.connect(self.new)
+ self._widget.actionOpen.triggered.connect(self.open)
+ self._widget.actionSave.triggered.connect(self.save)
+ self._widget.actionSave_as.triggered.connect(lambda: self.save(new_location=True))
+ self._widget.actionInit.triggered.connect(self.goto_init)
+ self._widget.actionCurrent_Frame.triggered.connect(self.goto_frame)
+ self._widget.actionNext_Frame.triggered.connect(self.goto_next)
+ self._widget.actionAnimation.triggered.connect(self.play)
+ self._widget.actionAnimation_until_Frame.triggered.connect(self.play_until)
+ self._widget.actionDuplicate_Frame.triggered.connect(self.duplicate)
+ self._widget.actionDelete_Frame.triggered.connect(self.delete)
+ self._widget.actionLeft.triggered.connect(lambda: self.mirror_frame("L"))
+ self._widget.actionRight.triggered.connect(lambda: self.mirror_frame("R"))
+ self._widget.actionInvert.triggered.connect(self.invert_frame)
+ self._widget.actionUndo.triggered.connect(self.undo)
+ self._widget.actionRedo.triggered.connect(self.redo)
+ self._widget.actionWalkready.triggered.connect(self.play_walkready)
+ self._widget.actionHelp.triggered.connect(self.help)
+ self._widget.buttonRecord.clicked.connect(self.record)
+ self._widget.buttonRecord.shortcut = QShortcut(QKeySequence("Space"), self._widget)
+ self._widget.buttonRecord.shortcut.activated.connect(self.record)
+ self._widget.frameList.key_pressed.connect(self.delete)
+ self._widget.frameList.itemSelectionChanged.connect(self.frame_select)
+
+ def play_walkready(self) -> None:
+ """
+ Plays the walkready animation on the robot
+ """
+ result: Dynup.Result = self.dynup_client.send_goal(Dynup.Goal(direction="walkready")).result
+ if not result.successful:
+ self._node.get_logger().error("Could not execute walkready animation")
+
+ def help(self) -> None:
+ """
+ Prints out the help dialogue
+ """
+ message = "This is RecordUI, a tool to record robot animations.\n \n Keyboard shortcuts: \n \n \
+ New: Ctrl + N \n \
+ Open: Ctrl + O \n \
+ Save: Ctrl + S \n \
+ Save as: Ctrl + Shift + S \n \n \
+ Play Init: Ctrl + I \n \
+ Play Frame: Ctrl + P \n \
+ Play Next: Alt + P \n \
+ Play Animation: Ctrl + Shift + P \n \
+ Play Until: Ctrl + Alt + P \n \n \
+ Record Frame: Space \n \
+ Duplicate Frame: '+' \n \
+ Delete Frame: '-' \n \
+ Undo: Ctrl + Z \n \
+ Redo: Ctrl + Y \n \
+ Help: Ctrl + H \n \n \
+ Mirror to Left: Ctrl + Left Arrow \n \
+ Mirror to Right: Ctrl + Right Arrow \n \
+ Invert: Ctrl + Down Arrow"
+ QMessageBox.about(self._widget, "About RecordUI", message)
+
+ def new(self) -> None:
+ """
+ Deletes all currently recorded frames
+ """
+ if len(self._widget.frameList) > 1:
+ message = "This will discard your current Animation. Continue?"
+ sure = QMessageBox.question(self._widget, "Sure?", message, QMessageBox.Yes | QMessageBox.No)
+ if sure == QMessageBox.Yes:
+ # Create a new recorder (deletes all states and starts from scratch)
+ self.create_initialized_recorder()
+ self.update_frames()
+
+ def create_initialized_recorder(self) -> None:
+ """
+ Creates a new recorder and adds an initial keyframe to it
+ """
+ # Create a new recorder (deletes all states and starts from scratch)
+ self._recorder = Recorder(
+ self._node, self.animation_client, self.add_animation_client, self.hcm_record_mode_client
+ )
+ # Find a name for the first keyframe
+ self._selected_frame = "start frame"
+ # Add an initial keyframe to the recorder (it has no motors active, but serves as a starting point)
+ self._recorder.record({}, {}, self._selected_frame, 1.0, 0.0, frozen=True)
+
+ def save(self, new_location: bool = False) -> None:
+ """
+ Saves all currently recorded frames into a json file
+ """
+ # Check if there is anything to save
+ if self._recorder.get_keyframes():
+ # Add metadata from UI to the animation
+ self._recorder.set_meta_data(
+ self._widget.lineAnimationName.text(),
+ self._widget.lineVersion.text(),
+ self._widget.lineAuthor.text(),
+ self._widget.fieldDescription.toPlainText(),
+ )
+ # Ask for a save location if there is none or if the user wants to save to a new location
+ if not self._save_directory or new_location:
+ # QFileDialogue throws a gtk warning, that can not be suppressed or fixed. Should be ignored.
+ self._save_directory = QFileDialog.getExistingDirectory(
+ self._widget, "Select Directory for Animation Files", os.path.expanduser("~")
+ )
+ # Check if the user selected a directory
+ if not self._save_directory:
+ self._widget.statusBar.showMessage("Aborted saving.")
+ return
+ # Save the animation
+ status = self._recorder.save_animation(self._save_directory, self._widget.lineAnimationName.text())
+ self._widget.statusBar.showMessage(status)
+ else:
+ self._widget.statusBar.showMessage("There is nothing to save!")
+
+ def open(self) -> None:
+ """
+ Deletes all current frames and instead loads an animation from a json file
+ """
+ # Check for unsaved changes and ask the user if they want to discard them if there are any
+ if len(self._recorder.get_keyframes()) > 1:
+ message = "This will discard your current Animation. Continue?"
+ sure = QMessageBox.question(self._widget, "Sure?", message, QMessageBox.Yes | QMessageBox.No)
+ # Cancel the open if the user does not want to discard the current animation
+ if sure == QMessageBox.No:
+ return
+ # Open the file dialog in the animations build directory
+ my_file = QFileDialog.getOpenFileName(
+ directory=os.path.join(get_package_share_directory("wolfgang_animations"), "animations"), filter="*.json"
+ )
+
+ # Cancel the open if the user does not select a file
+ if not my_file or not my_file[0]:
+ return
+
+ # Load the animation
+ status = self._recorder.load_animation(my_file[0])
+
+ # Update the UI
+ if status == "":
+ status = "Load successful."
+ self._widget.statusBar.showMessage(status)
+
+ # Update the frames in the UI
+ self.update_frames()
+
+ # Update the metadata input in the UI
+ metadata = self._recorder.get_meta_data()
+ self._widget.lineAnimationName.setText(metadata[0])
+ self._widget.lineAuthor.setText(metadata[2])
+ self._widget.lineVersion.setText(str(metadata[1]))
+ self._widget.fieldDescription.setPlainText(metadata[3])
+
+ def play(self):
+ """
+ Plays the animation
+ """
+ status, success = self._recorder.play()
+ self._widget.statusBar.showMessage(status)
+ if not success:
+ QMessageBox.warning(self._widget, "Warning", status)
+
+ def play_until(self):
+ """
+ Plays the animation up to a certain frame
+ """
+ # Get the index of the selected frame
+ end_index = self._recorder.get_keyframe_index(self._selected_frame) + 1
+
+ # Play the animation
+ status, success = self._recorder.play(until_frame=end_index)
+ if not success:
+ QMessageBox.warning(self._widget, "Warning", status)
+
+ def goto_frame(self):
+ """
+ Plays one single frame
+ """
+ index = self._recorder.get_keyframe_index(self._selected_frame)
+ assert index is not None, "Selected frame not found in list of keyframes"
+
+ self._recorder.play(from_frame=index, until_frame=index + 1)
+
+ def goto_next(self):
+ # Get current index
+ index = self._recorder.get_keyframe_index(self._selected_frame)
+ assert index is not None, "Selected frame not found in list of keyframes"
+
+ # Get the next frame (keep the current frame if we are at the end)
+ next_frame_index = min(index + 1, len(self._recorder.get_keyframes()) - 1)
+
+ # Play the next frame
+ self._recorder.play(from_frame=index, until_frame=next_frame_index + 1)
+
+ # Go to the frame in the UI
+ self._widget.frameList.setCurrentRow(next_frame_index)
+ self._selected_frame = self._recorder.get_keyframes()[next_frame_index]["name"]
+ self.react_to_frame_change()
+
+ def goto_init(self):
+ """
+ Plays init frame
+ """
+ # Request record mode from HCM
+ self.hcm_record_mode_client.call(SetBool.Request(data=True))
+ # Play the init animation
+ self.animation_client.send_goal_async(
+ PlayAnimation.Goal(
+ animation="init",
+ )
+ )
+
+ def duplicate(self):
+ """
+ Creates a copy of the selected frame
+ """
+ try:
+ frame = self._widget.frameList.selectedItems()[0].text()
+ except Exception as e:
+ self._node.get_logger().error(str(e))
+ return
+ assert self._recorder.get_keyframe(frame) is not None, "Selected frame not found in list of keyframes"
+ self._recorder.duplicate(frame)
+ self._widget.statusBar.showMessage("Duplicated frame " + frame)
+ self.update_frames()
+
+ def delete(self):
+ """
+ Deletes a frame
+ """
+ try:
+ frame = self._widget.frameList.selectedItems()[0].text()
+ except Exception as e:
+ self._node.get_logger().error(str(e))
+ return
+ assert self._recorder.get_keyframe(frame) is not None, "Selected frame not found in list of keyframes"
+ self._recorder.delete(frame)
+ self._widget.statusBar.showMessage(f"Deleted frame {frame}")
+ self.update_frames()
+
+ def record(self):
+ """
+ Records a frame, meaning it saves the current motor positions and settings in the recorder at the selected frame
+ """
+ # Get the index of the currently selected frame
+ index = self._recorder.get_keyframe_index(self._selected_frame)
+ assert index is not None, "Selected frame not found in list of keyframes"
+
+ # Check if we added a valid name
+ new_name = self._widget.lineFrameName.text()
+ if not new_name:
+ QMessageBox.warning(self._widget, "Warning", "Please enter a unique name for the keyframe.")
+ return
+
+ # Check if the name is unique
+ if new_name != self._selected_frame and self._recorder.get_keyframe(new_name) is not None:
+ QMessageBox.warning(self._widget, "Warning", f"A keyframe with the name '{new_name}' already exists.")
+ return
+
+ # Record the frame
+ self._recorder.record(
+ # Only record the active motors
+ self._working_angles,
+ {},
+ new_name,
+ self._widget.spinBoxDuration.value(),
+ self._widget.spinBoxPause.value(),
+ index,
+ True,
+ )
+
+ # Update the selected frame
+ self._selected_frame = new_name
+
+ # Display a message
+ self._widget.statusBar.showMessage(f"Recorded frame {self._selected_frame}")
+
+ # Update the frames in the UI
+ self.update_frames()
+
+ def undo(self):
+ """
+ Undo the previous action
+ """
+ status = self._recorder.undo()
+ self._widget.statusBar.showMessage(status)
+ self.update_frames()
+
+ def redo(self):
+ """
+ Redos an action
+ """
+ status = self._recorder.redo()
+ self._widget.statusBar.showMessage(status)
+ self.update_frames()
+
+ def mirror_frame(self, direction):
+ """
+ Copies all motor values from one side of the robot to the other. Inverts values, if necessary
+ """
+ raise NotImplementedError("This function is not yet implemented")
+ self._widget.statusBar.showMessage("Mirrored frame to " + direction)
+
+ def invert_frame(self):
+ """
+ Copies all values from the left side to the right and all values from the right side to the left.
+ Inverts values, if necessary
+ """
+ raise NotImplementedError("This function is not yet implemented")
+ self._widget.statusBar.showMessage("Inverted frame")
+
+ def frame_select(self):
+ """
+ Gets called when a frame is selected in the list of frames
+ """
+ # Check if a frame is selected at all
+ if self._widget.frameList.currentItem() is not None:
+ # Get the selected frame
+ selected_frame_name = self._widget.frameList.currentItem().text()
+ selected_frame = self._recorder.get_keyframe(selected_frame_name)
+ if selected_frame is not None:
+ # Update state so we have a new selected frame
+ self._selected_frame = selected_frame_name
+
+ # Update the UI
+ self.react_to_frame_change()
+
+ def react_to_frame_change(self):
+ """
+ Updates the UI when the frame changes
+ """
+ # Get the selected frame
+ selected_frame = self._recorder.get_keyframe(self._selected_frame)
+
+ # Update the name
+ self._widget.lineFrameName.setText(self._selected_frame)
+
+ # Update what motors are active, what are stiff and set the angles accordingly
+ for motor_name in self.motors:
+ # Update checkbox if the motor is active or not
+ active = motor_name in selected_frame["goals"]
+ self._motor_switcher_active_checkbox[motor_name].setCheckState(0, Qt.Checked if active else Qt.Unchecked)
+
+ # Update checkbox if the motor is stiff or not
+ stiff = "torques" not in selected_frame or (
+ motor_name in selected_frame["torques"] and bool(selected_frame["torques"][motor_name])
+ )
+ self._motor_controller_torque_checkbox[motor_name].setCheckState(0, Qt.Checked if stiff else Qt.Unchecked)
+
+ # Update the motor angle controls (value and active state)
+ if active:
+ self._motor_controller_text_fields[motor_name].setText(
+ str(round(math.degrees(selected_frame["goals"][motor_name]), 2))
+ )
+ else:
+ self._motor_controller_text_fields[motor_name].setText("0.0")
+
+ # Update the duration and pause
+ self._widget.spinBoxDuration.setValue(selected_frame["duration"])
+ self._widget.spinBoxPause.setValue(selected_frame["pause"])
+
+ self.react_to_motor_selection()
+
+ def textfield_update(self):
+ """
+ If the textfield is updated, update working values
+ """
+ for motor_name, text_field in self._motor_controller_text_fields.items():
+ try:
+ # Get the angle from the textfield
+ angle = float(text_field.text())
+ except ValueError:
+ # Display QMessageBox stating that the value is not a number
+ QMessageBox.warning(
+ self._widget,
+ "Warning",
+ f"Please enter a valid number.\n '{text_field.text()}' is not a valid number.",
+ )
+ return
+ # Clip the angle to the maximum and minimum, we do this in degrees,
+ # because we do not want introduce rounding errors in the textfield
+ angle = round(max(-180.0, min(angle, 180.0)), 2)
+ # Set the angle in the textfield
+ if float(text_field.text()) != float(angle):
+ text_field.setText(str(angle))
+ # Set the angle in the working values if the motor is active
+ if self._motor_switcher_active_checkbox[motor_name].checkState(0) == Qt.CheckState.Checked:
+ self._working_angles[motor_name] = math.radians(angle)
+
+ def react_to_motor_selection(self):
+ """
+ Fetches which motors are active from the UI. It also fetches whether the motor is stiff or not. It then
+ publishes the motor torques to the robot.
+ """
+ # Go through all motors
+ for motor_name in self.motors:
+ active = self._motor_switcher_active_checkbox[motor_name].checkState(0) == Qt.CheckState.Checked
+ # Enable or disable the motor controls
+ self._motor_controller_text_fields[motor_name].setEnabled(active)
+ # Enable or disable the torque controls
+ if not active:
+ self._motor_controller_torque_checkbox[motor_name].setCheckState(0, Qt.Checked)
+ self._motor_controller_torque_checkbox[motor_name].setFlags(
+ self._motor_controller_torque_checkbox[motor_name].flags() | Qt.ItemIsEnabled
+ if active
+ else self._motor_controller_torque_checkbox[motor_name].flags() & ~Qt.ItemIsEnabled
+ )
+ # Remove the motor from the working angles if it is not active
+ if not active and motor_name in self._working_angles:
+ self._working_angles.pop(motor_name)
+
+ # Publish the torque message
+ self.effort_pub.publish(
+ JointTorque(
+ joint_names=self.motors,
+ on=[
+ self._motor_controller_torque_checkbox[motor_name].checkState(0) == Qt.CheckState.Checked
+ for motor_name in self.motors
+ ],
+ )
+ )
+
+ def update_frames(self) -> None:
+ """
+ Updates the list of frames in the GUI based on the keyframes in the recorder
+ """
+ # Get the current state of the recorder and the index of the selected frame
+ keyframes = self._recorder.get_keyframes()
+ index = self._recorder.get_keyframe_index(self._selected_frame)
+
+ # Check if the selected frame is still in the list of frames
+ if index is None:
+ # If it is not, select the first frame
+ index = 0
+ self._selected_frame = keyframes[index]["name"]
+ print(f"Selected frame {self._selected_frame}")
+
+ # Clear the list of frames
+ self._widget.frameList.clear()
+
+ # Add all frames to the list
+ for frame in keyframes:
+ item = QListWidgetItem()
+ item.setText(frame["name"])
+ self._widget.frameList.addItem(item)
+
+ # Select the correct frame
+ self._widget.frameList.setCurrentRow(index)
+
+ # Variables depending on the frame selection
+ self.react_to_frame_change()
+
+ def change_keyframe_order(self, new_order: list[str]) -> None:
+ """Calls the recorder to update frame order and updates the gui"""
+ self._recorder.change_frame_order(new_order)
+ self.update_frames()
+
+ def shutdown_plugin(self):
+ """Clean up by sending the HCM that we are not in record mode anymore"""
+ if self.hcm_record_mode_client.wait_for_service(timeout_sec=1.0):
+ self.hcm_record_mode_client.call(SetBool.Request(data=False))
+
+
+def main():
+ plugin = "bitbots_animation_rqt.record_ui.RecordUI"
+ main = Main(filename=plugin)
+ sys.exit(main.main(standalone=plugin))
diff --git a/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
new file mode 100644
index 000000000..f9f46c05a
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/bitbots_animation_rqt/utils.py
@@ -0,0 +1,66 @@
+from typing import Callable
+
+from PyQt5.QtCore import QObject, Qt, pyqtSignal
+from PyQt5.QtWidgets import QListWidget, QWidget
+from sensor_msgs.msg import JointState
+
+
+class DragDropList(QListWidget):
+ """QListWidget with an event that is called when a drag and drop action was performed."""
+
+ key_pressed = pyqtSignal()
+
+ def __init__(self, parent: QWidget, frame_order_callback: Callable[[list[str]], None]):
+ super().__init__(parent)
+
+ self.frame_order_callback = frame_order_callback
+ self.setAcceptDrops(True)
+
+ # fmt: off
+ def dropEvent(self, e): # noqa: N802
+ # fmt: on
+ super().dropEvent(e)
+ items = []
+ for i in range(0, self.count()):
+ items.append(self.item(i).text())
+ self.frame_order_callback(items)
+
+ # fmt: off
+ def keyPressEvent(self, event): # noqa: N802
+ # fmt: on
+ if event.key() == Qt.Key_Delete:
+ super().keyPressEvent(event)
+ self.key_pressed.emit()
+ elif event.key() == Qt.Key_Up and self.currentRow() - 1 >= 0:
+ self.setCurrentRow(self.currentRow() - 1)
+ elif event.key() == Qt.Key_Down and self.currentRow() + 1 < self.count():
+ self.setCurrentRow(self.currentRow() + 1)
+
+
+class JointStateCommunicate(QObject):
+ signal = pyqtSignal(JointState)
+
+
+def flatten_dict(input_dict: dict, parent_key: str = "", sep: str = ".") -> dict:
+ """Flatten a nested dictionary."""
+ items = []
+ for k, v in input_dict.items():
+ new_key = f"{parent_key}{sep}{k}" if parent_key else k
+ if isinstance(v, dict):
+ items.extend(flatten_dict(v, new_key, sep=sep).items())
+ else:
+ items.append((new_key, v))
+ return dict(items)
+
+
+def flatten_dict_of_lists(input_dict: dict, sep: str = ".") -> dict:
+ """Flatten a nested dictionary and convert lists to separate keys."""
+ flat_dict = flatten_dict(input_dict, sep=sep)
+ new_dict = {}
+ for key, value in flat_dict.items():
+ if isinstance(value, list):
+ for i, list_value in enumerate(value):
+ new_dict[f"{key}{sep}{i}"] = list_value
+ else:
+ new_dict[key] = value
+ return new_dict
diff --git a/bitbots_motion/bitbots_animation_rqt/package.xml b/bitbots_motion/bitbots_animation_rqt/package.xml
new file mode 100644
index 000000000..aec6962c0
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/package.xml
@@ -0,0 +1,30 @@
+
+
+
+ bitbots_animation_rqt
+ 2.0.0
+
+ A Python GUI plugin to record joint space animations.
+
+
+ Florian Vahl
+ Hamburg Bit-Bots
+
+ Jasper Güldenstein
+
+ MIT
+
+ bitbots_hcm
+ python3-pyqt5
+ python3-rospkg
+ rclpy
+ rqt_gui_py
+ rqt_gui
+ std_srvs
+
+
+
+
+ ament_python
+
+
diff --git a/bitbots_motion/bitbots_animation_rqt/plugin.xml b/bitbots_motion/bitbots_animation_rqt/plugin.xml
new file mode 100644
index 000000000..969a3f1a4
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/plugin.xml
@@ -0,0 +1,17 @@
+
+
+
+ A Python GUI plugin to record joint space animations.
+
+
+
+
+ folder
+ Plugins related to RoboCup.
+
+
+ preferences-system-network
+ A Python GUI plugin to record joint space animations.
+
+
+
diff --git a/bitbots_motion/bitbots_animation_rqt/resource/RecordUI.ui b/bitbots_motion/bitbots_animation_rqt/resource/RecordUI.ui
new file mode 100644
index 000000000..774c3e6bf
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/resource/RecordUI.ui
@@ -0,0 +1,345 @@
+
+
+ MainWindow
+
+
+
+ 0
+ 0
+ 1285
+ 670
+
+
+
+ Bit-Bots Animation
+
+
+
+
+
+
+ QLayout::SetNoConstraint
+
+
+
+
+
+
+
+
+
+
+
+ Author
+
+
+
+
+
+
+
+ 16777215
+ 150
+
+
+
+
+
+
+
+ Animation Name
+
+
+
+
+
+
+
+
+
+ Version
+
+
+
+
+
+
+
+
+
+ Description
+
+
+ Qt::AlignLeading|Qt::AlignLeft|Qt::AlignTop
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Frame Name
+
+
+
+
+
+
+
+
+
+ Duration
+
+
+
+
+
+
+ 0.100000000000000
+
+
+ 1.000000000000000
+
+
+ 0.000000000000000
+
+
+
+
+
+
+ Pause
+
+
+
+
+
+
+ 0.100000000000000
+
+
+ 0.000000000000000
+
+
+
+
+
+
+ Record
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 1
+
+
+
+
+
+
+
+
+ Torque
+
+
+
+
+
+
+
+
+
+
+ toolBar
+
+
+ TopToolBarArea
+
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ New
+
+
+ Ctrl+N
+
+
+
+
+ Open
+
+
+ Ctrl+O
+
+
+
+
+ Save
+
+
+ Ctrl+S
+
+
+
+
+ Save as
+
+
+ Ctrl+Shift+S
+
+
+
+
+ Play Init
+
+
+ Ctrl+I
+
+
+
+
+ Play Frame
+
+
+ Ctrl+P
+
+
+
+
+ Play Next
+
+
+ Alt+P
+
+
+
+
+ Play Animation
+
+
+ Ctrl+Shift+P
+
+
+
+
+ Play Until
+
+
+ Ctrl+Alt+P
+
+
+
+
+ Duplicate Frame
+
+
+ +
+
+
+
+
+ Delete Frame
+
+
+ -
+
+
+
+
+ Undo
+
+
+ Ctrl+Z
+
+
+
+
+ Redo
+
+
+ Ctrl+Y
+
+
+
+
+ Mirror to Left
+
+
+ Ctrl+Left
+
+
+
+
+ Mirror to Right
+
+
+ Ctrl+Right
+
+
+
+
+ Invert
+
+
+ Ctrl+Down
+
+
+
+
+ Go to Walkready
+
+
+ Ctrl+W
+
+
+
+
+ Help
+
+
+ Ctrl+H
+
+
+
+
+
+
diff --git a/bitbots_motion/bitbots_animation_rqt/resource/bitbots_animation_rqt b/bitbots_motion/bitbots_animation_rqt/resource/bitbots_animation_rqt
new file mode 100644
index 000000000..e69de29bb
diff --git a/bitbots_motion/bitbots_animation_rqt/setup.cfg b/bitbots_motion/bitbots_animation_rqt/setup.cfg
new file mode 100644
index 000000000..45d0cbda9
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/bitbots_animation_rqt
+[install]
+install_scripts=$base/lib/bitbots_animation_rqt
diff --git a/bitbots_motion/bitbots_animation_rqt/setup.py b/bitbots_motion/bitbots_animation_rqt/setup.py
new file mode 100644
index 000000000..ff22687d2
--- /dev/null
+++ b/bitbots_motion/bitbots_animation_rqt/setup.py
@@ -0,0 +1,21 @@
+from setuptools import find_packages, setup
+
+package_name = "bitbots_animation_rqt"
+
+setup(
+ name=package_name,
+ packages=find_packages(),
+ data_files=[
+ ("share/" + package_name + "/resource", ["resource/RecordUI.ui"]),
+ ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
+ ("share/" + package_name, ["package.xml"]),
+ ("share/" + package_name, ["plugin.xml"]),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ entry_points={
+ "console_scripts": [
+ "animation_gui = " + package_name + ".record_ui:main",
+ ],
+ },
+)
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
index 6f86294cd..83e0fc745 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation.py
@@ -19,7 +19,7 @@ class Animation:
def __init__(self, name, keyframes, default_interpolator=None):
self.name: str = name
- self.keyframes: list(Keyframe) = keyframes
+ self.keyframes: list[Keyframe] = keyframes
self.default_interpolator = default_interpolator
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
index b4eec1b9a..1c24b3bd0 100755
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/animation_node.py
@@ -1,24 +1,28 @@
#!/usr/bin/env python3
import json
+import sys
import traceback
+from typing import Optional
import numpy as np
import rclpy
-from rclpy.action import ActionServer
+from rclpy.action import ActionServer, GoalResponse
from rclpy.action.server import ServerGoalHandle
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.duration import Duration
from rclpy.executors import ExternalShutdownException, MultiThreadedExecutor
from rclpy.node import Node
from sensor_msgs.msg import JointState
-from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
+from std_msgs.msg import Header
+from std_srvs.srv import SetBool
from bitbots_animation_server.animation import Animation, parse
from bitbots_animation_server.resource_manager import ResourceManager
from bitbots_animation_server.spline_animator import SplineAnimator
from bitbots_msgs.action import PlayAnimation
from bitbots_msgs.msg import Animation as AnimationMsg
-from bitbots_msgs.msg import RobotControlState
+from bitbots_msgs.msg import JointCommand
+from bitbots_msgs.srv import AddAnimation
class AnimationNode(Node):
@@ -29,10 +33,10 @@ def __init__(self):
super().__init__("animation_server")
self.get_logger().debug("Starting Animation Server")
- self.current_joint_states = None
- self.hcm_state = 0
+ self.current_joint_states: Optional[JointState] = None
self.current_animation = None
self.animation_cache: dict[str, Animation] = {}
+ self.animation_running: bool = False
# Get robot type and create resource manager
self.declare_parameter("robot_type", "wolfgang")
@@ -55,70 +59,135 @@ def __init__(self):
traceback.print_exc()
callback_group = ReentrantCallbackGroup()
+
+ # Subscribers
self.create_subscription(JointState, "joint_states", self.update_current_pose, 1, callback_group=callback_group)
- self.create_subscription(
- RobotControlState, "robot_state", self.update_hcm_state, 1, callback_group=callback_group
- )
+ # Service clients
+ self.hcm_animation_mode = self.create_client(SetBool, "play_animation_mode")
+
+ # Publisher for sending joint states to the HCM
self.hcm_publisher = self.create_publisher(AnimationMsg, "animation", 1)
- self._as = ActionServer(self, PlayAnimation, "animation", self.execute_cb, callback_group=callback_group)
+ # Action server for playing animations
+ self._action_server = ActionServer(
+ self, PlayAnimation, "animation", self.execute_cb, callback_group=callback_group, goal_callback=self.goal_cb
+ )
+
+ # Service to temporarily add an animation to the cache
+ self.add_animation_service = self.create_service(AddAnimation, "add_temporary_animation", self.add_animation)
+
+ def goal_cb(self, request: PlayAnimation.Goal) -> GoalResponse:
+ """This checks whether the goal is acceptable."""
+
+ # Check if we know the animation
+ if request.animation not in self.animation_cache:
+ self.get_logger().error(f"Animation '{request.animation}' not found")
+ return GoalResponse.REJECT
+
+ # Check if another animation is currently running
+ if self.animation_running:
+ self.get_logger().error("Another animation is currently running. Rejecting...")
+ return GoalResponse.REJECT
+
+ # Check if bounds are valid
+ if request.bounds:
+ # Get the length of the animation
+ animation_length = len(self.animation_cache[request.animation].keyframes)
+ # Check if the bounds are valid
+ if request.start < 0 or request.start >= animation_length:
+ self.get_logger().error(
+ f"Start index {request.start} out of bounds for animation '{request.animation}'"
+ )
+ return GoalResponse.REJECT
+ if request.end < 0 or request.end > animation_length:
+ self.get_logger().error(f"End index {request.end} out of bounds for animation '{request.animation}'")
+ return GoalResponse.REJECT
+ if request.start >= request.end:
+ self.get_logger().error(f"Start index {request.start} must be smaller than end index {request.end}")
+ return GoalResponse.REJECT
+
+ # Everything is fine we are good to go
+ return GoalResponse.ACCEPT
+
+ def execute_cb(self, goal: ServerGoalHandle) -> PlayAnimation.Result:
+ """This is called, when the action should be executed."""
- def execute_cb(self, goal: ServerGoalHandle):
- """This is called, when someone calls the animation action"""
+ # Store the fact that an animation is running
+ self.animation_running = True
+
+ # Define a function to finish the action and return the resource
+ def finish(successful: bool) -> PlayAnimation.Result:
+ """This is called when the action is finished."""
+ self.animation_running = False
+ return PlayAnimation.Result(successful=successful)
# Set type for request
request: PlayAnimation.Goal = goal.request
- first = True
self.current_animation = request.animation
# publish info to the console for the user
- self.get_logger().info(f"Request to play animation {request.animation}")
+ self.get_logger().info(f"Request to play animation {self.current_animation}")
- if self.hcm_state != 0 and not request.hcm: # 0 means controllable
- # we cant play an animation right now
- # but we send a request, so that we may can soon
- self.send_animation_request()
+ # If the request is not from the HCM we might need to wait for the HCM to be controllable
+ if not request.hcm:
+ # Wait for HCM to be up and running
+ if not self.hcm_animation_mode.wait_for_service(timeout_sec=5.0):
+ self.get_logger().error("HCM not available. Aborting animation...")
+ goal.abort()
+ return finish(successful=False)
- # Wait for the hcm to be controllable
+ # Send request to make the HCM to go into animation play mode
num_tries = 0
- while rclpy.ok() and self.hcm_state != 0 and num_tries < 10:
+ while rclpy.ok() and (not self.hcm_animation_mode.call(SetBool.Request(data=True)).success):
+ if num_tries >= 10:
+ self.get_logger().error("Failed to request HCM to go into animation play mode")
+ goal.abort()
+ return finish(successful=False)
num_tries += 1
- self.get_logger().info(f"HCM not controllable. Waiting... (try {num_tries})")
- self.get_clock().sleep_until(self.get_clock().now() + Duration(seconds=0.1))
+ self.get_clock().sleep_for(Duration(seconds=0.1))
- if self.hcm_state != 0:
- self.get_logger().info(
- "HCM not controllable. Only sent request to make it come controllable, "
- "but it was not successful until timeout"
- )
- goal.abort()
- return PlayAnimation.Result(successful=False)
+ # Create splines
+ animator = SplineAnimator(
+ self.animation_cache[self.current_animation], self.current_joint_states, self.get_logger(), self.get_clock()
+ )
- animator = self.get_animation_splines(self.current_animation)
+ # Flag that determines that we have send something once
+ once = False
+ # Loop to play the animation
while rclpy.ok() and animator:
try:
last_time = self.get_clock().now()
# if we're here we want to play the next keyframe, cause there is no other goal
# compute next pose
- t = self.get_clock().now().nanoseconds / 1e9 - animator.get_start_time()
+ t = (
+ self.get_clock().now().nanoseconds / 1e9 - animator.get_start_time()
+ ) # time since start of animation
+ # Start later if we have set bounds and only play the part of the animation
+ if request.bounds and request.start > 0:
+ t += animator.get_keyframe_times()[request.start]
+
+ # Get the robot pose at the current time from the spline interpolator
pose = animator.get_positions_rad(t)
- if pose is None:
- # animation is finished
- # tell it to the hcm
- self.send_animation(first=False, last=True, hcm=request.hcm, pose=None, torque=None)
+ # Finish if spline ends or we reaches the explicitly defined premature end
+ if pose is None or (request.bounds and once and t > animator.get_keyframe_times()[request.end - 1]):
+ # Animation is finished, tell it to the hcm (except if it is from the hcm)
+ if not request.hcm:
+ hcm_result = self.hcm_animation_mode.call(SetBool.Request(data=False))
+ if not hcm_result.success:
+ self.get_logger().error(f"Failed to finish animation on HCM. Reason: {hcm_result.message}")
+
+ # We give a positive result
goal.publish_feedback(PlayAnimation.Feedback(percent_done=100))
- # we give a positive result
goal.succeed()
- return PlayAnimation.Result(successful=True)
+ return finish(successful=True)
- self.send_animation(first=first, last=False, hcm=request.hcm, pose=pose, torque=animator.get_torque(t))
+ self.send_animation(from_hcm=request.hcm, pose=pose, torque=animator.get_torque(t))
- first = False # we have sent the first frame, all frames after this can't be the first
perc_done = int(
((self.get_clock().now().nanoseconds / 1e9 - animator.get_start_time()) / animator.get_duration())
* 100
@@ -127,55 +196,55 @@ def execute_cb(self, goal: ServerGoalHandle):
goal.publish_feedback(PlayAnimation.Feedback(percent_done=perc_done))
+ once = True
+
self.get_clock().sleep_until(last_time + Duration(seconds=0.02))
except (ExternalShutdownException, KeyboardInterrupt):
- exit()
- return PlayAnimation.Result(successful=False)
-
- def get_animation_splines(self, animation_name: str):
- if animation_name not in self.animation_cache:
- self.get_logger().error(f"Animation '{animation_name}' not found")
- return
- return SplineAnimator(
- self.animation_cache[animation_name], self.current_joint_states, self.get_logger(), self.get_clock()
- )
+ sys.exit(0)
+ return finish(successful=False)
def update_current_pose(self, msg):
"""Gets the current motor positions and updates the representing pose accordingly."""
self.current_joint_states = msg
- def update_hcm_state(self, msg: RobotControlState):
- self.hcm_state = msg.state
-
- def send_animation_request(self):
- anim_msg = AnimationMsg()
- anim_msg.request = True
- anim_msg.header.stamp = self.get_clock().now().to_msg()
- self.hcm_publisher.publish(anim_msg)
-
- def send_animation(self, first: bool, last: bool, hcm: bool, pose: dict, torque: dict):
+ def send_animation(self, from_hcm: bool, pose: dict, torque: Optional[dict]):
"""Sends an animation to the hcm"""
- anim_msg = AnimationMsg()
- anim_msg.request = False
- anim_msg.first = first
- anim_msg.last = last
- anim_msg.hcm = hcm
- if pose is not None: #
- traj_msg = JointTrajectory()
- traj_msg.joint_names = []
- traj_msg.points = [JointTrajectoryPoint()]
- # We are only using a single point in the trajectory message, since we only want to send a single joint goal
- traj_msg.points[0].positions = []
- traj_msg.points[0].effort = []
- for joint in pose:
- traj_msg.joint_names.append(joint)
- traj_msg.points[0].positions.append(pose[joint])
- if torque:
- # 1 and 2 should be mapped to 1
- traj_msg.points[0].effort.append(np.clip((torque[joint]), 0, 1))
- anim_msg.position = traj_msg
- anim_msg.header.stamp = self.get_clock().now().to_msg()
- self.hcm_publisher.publish(anim_msg)
+ self.hcm_publisher.publish(
+ AnimationMsg(
+ from_hcm=from_hcm,
+ joint_command=JointCommand(
+ header=Header(
+ stamp=self.get_clock().now().to_msg(),
+ ),
+ joint_names=pose.keys(),
+ positions=pose.values(),
+ velocities=[-1.0] * len(pose),
+ accelerations=[-1.0] * len(pose),
+ max_currents=[np.clip((torque[joint]), 0.0, 1.0) for joint in pose]
+ if torque and False # TODO
+ else [-1.0] * len(pose), # fmt: skip
+ ),
+ )
+ )
+
+ def add_animation(self, request: AddAnimation.Request, response: AddAnimation.Response) -> AddAnimation.Response:
+ """
+ Adds an animation to the cache (non persistent).
+ This is useful if e.g. the recording GUI wants to play an uncommitted animation.
+ """
+ try:
+ # Parse the animation
+ new_animation = parse(json.loads(request.json))
+ # Get the name of the animation and add it to the cache
+ self.animation_cache[new_animation.name] = new_animation
+ except ValueError:
+ self.get_logger().error(
+ "Animation provided by service call had a ValueError. "
+ "Probably there is a syntax error in the animation file. "
+ "See traceback"
+ )
+ traceback.print_exc()
+ return response
def main(args=None):
diff --git a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
index a5482b9b9..e2eaac723 100644
--- a/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
+++ b/bitbots_motion/bitbots_animation_server/bitbots_animation_server/spline_animator.py
@@ -1,4 +1,5 @@
import math
+from copy import deepcopy
from bitbots_splines.smooth_spline import SmoothSpline
from rclpy.clock import Clock
@@ -10,45 +11,55 @@
class SplineAnimator:
def __init__(self, animation: Animation, current_joint_states: JointState, logger: Logger, clock: Clock):
- self.anim = animation
+ self.anim = deepcopy(animation)
self.start_time = clock.now().nanoseconds / 1e9
- self.animation_duration = 0.0
- self.current_point_time = 0.0
+ self.animation_duration: float = 0.0
self.spline_dict: dict[str, SmoothSpline] = {}
self.torques = {}
+ self.keyframe_times: list[float] = []
- # add current joint positions as start
- if current_joint_states is not None:
- i = 0
- for joint in current_joint_states.name:
- if joint not in self.spline_dict:
- self.spline_dict[joint] = SmoothSpline()
- self.spline_dict[joint].add_point(0, math.degrees(current_joint_states.position[i]))
- i += 1
- else:
+ # Add current joint positions as start
+ if current_joint_states is None:
logger.warn("No current joint positions. Will play animation starting from first keyframe.")
- for joint in self.anim.keyframes[0].goals:
- if joint not in self.spline_dict:
- self.spline_dict[joint] = SmoothSpline()
- self.spline_dict[joint].add_point(0, self.anim.keyframes[0].goals[joint])
- # load keyframe positions into the splines
+ # Load keyframe positions into the splines
+ current_point_time = 0.0
for keyframe in self.anim.keyframes:
+ # Calculate important times
self.animation_duration += keyframe.duration + keyframe.pause
- self.current_point_time += keyframe.duration
+ current_point_time += keyframe.duration
+ self.keyframe_times.append(current_point_time)
+ # Add spline point for each joint in this keyframe
for joint in keyframe.goals:
+ # Check if this joint already has a spline from previous keyframes
if joint not in self.spline_dict:
+ # If not, create a new spline
self.spline_dict[joint] = SmoothSpline()
- self.spline_dict[joint].add_point(self.current_point_time, keyframe.goals[joint])
- self.spline_dict[joint].add_point(self.current_point_time + keyframe.pause, keyframe.goals[joint])
- self.current_point_time += keyframe.pause
- self.torques[self.current_point_time] = keyframe.torque
+ # Add the current joint position (if available) as the first point
+ if current_joint_states is not None:
+ # Get the number of the joint in the joint_states message
+ joint_index = current_joint_states.name.index(joint)
+ # Add the current joint position as the point before the first keyframe
+ self.spline_dict[joint].add_point(
+ current_point_time - keyframe.duration,
+ math.degrees(current_joint_states.position[joint_index]),
+ )
+ # Add the keyframe position as the next point
+ self.spline_dict[joint].add_point(current_point_time, keyframe.goals[joint])
+ # Add a second point if we want to hold the position for a while
+ self.spline_dict[joint].add_point(current_point_time + keyframe.pause, keyframe.goals[joint])
+ current_point_time += keyframe.pause
+ self.torques[current_point_time] = keyframe.torque
- # compute the splines
+ # Compute the splines
for joint in self.spline_dict:
self.spline_dict[joint].compute_spline()
+ def get_keyframe_times(self) -> list[float]:
+ assert len(self.keyframe_times) == len(self.anim.keyframes)
+ return self.keyframe_times
+
def get_positions_deg(self, time):
if time < 0 or time > self.animation_duration:
return None
@@ -63,6 +74,7 @@ def get_positions_rad(self, time):
ret_dict = {}
for joint in self.spline_dict:
ret_dict[joint] = math.radians(self.spline_dict[joint].pos(time))
+
return ret_dict
def get_torque(self, current_time):
diff --git a/bitbots_motion/bitbots_dynamic_kick/scripts/dummy_client.py b/bitbots_motion/bitbots_dynamic_kick/scripts/dummy_client.py
index 54db17c94..51e1ec0ed 100755
--- a/bitbots_motion/bitbots_dynamic_kick/scripts/dummy_client.py
+++ b/bitbots_motion/bitbots_dynamic_kick/scripts/dummy_client.py
@@ -73,7 +73,7 @@ def feedback_cb(feedback):
sys.stdout.flush()
client = ActionClient(node, Kick, "dynamic_kick")
if not client.wait_for_server():
- exit(1)
+ sys.exit(1)
print("\r[OK] Connecting to action server 'dynamic_kick'")
print()
diff --git a/bitbots_motion/bitbots_dynamic_kick/scripts/interactive_test.py b/bitbots_motion/bitbots_dynamic_kick/scripts/interactive_test.py
index eb45d597e..affdf4581 100644
--- a/bitbots_motion/bitbots_dynamic_kick/scripts/interactive_test.py
+++ b/bitbots_motion/bitbots_dynamic_kick/scripts/interactive_test.py
@@ -124,7 +124,7 @@ def feedback_cb(feedback):
sys.stdout.flush()
client = ActionClient(KickAction, "dynamic_kick")
if not client.wait_for_server():
- exit(1)
+ sys.exit(1)
robot_x = 2.8
robot_y = 0
diff --git a/bitbots_motion/bitbots_dynup/scripts/dummy_client.py b/bitbots_motion/bitbots_dynup/scripts/dummy_client.py
index 83329b0a8..5e3444b25 100755
--- a/bitbots_motion/bitbots_dynup/scripts/dummy_client.py
+++ b/bitbots_motion/bitbots_dynup/scripts/dummy_client.py
@@ -61,7 +61,7 @@ def feedback_cb(feedback):
sys.stdout.flush()
client = ActionClient(node, Dynup, "dynup")
if not client.wait_for_server():
- exit(1)
+ sys.exit(1)
print("\r[OK] Connecting to action server 'dynup'")
print()
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
index 269a47e70..c35350f38 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/play_animation.py
@@ -199,3 +199,14 @@ def animation_finished(self):
self.blackboard.dynup_action_current_goal.done()
and self.blackboard.dynup_action_current_goal.result().status == GoalStatus.STATUS_SUCCEEDED
) or self.blackboard.dynup_action_current_goal.cancelled()
+
+
+class CancelAnimation(AbstractHCMActionElement):
+ """
+ This action is used to cancel an animation that is currently playing
+ """
+
+ def perform(self, reevaluate=False):
+ self.blackboard.node.get_logger().info("Canceling animation")
+ self.blackboard.external_animation_running = False
+ return self.pop()
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait_for.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait_for.py
index b500669e7..25705eac8 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait_for.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/actions/wait_for.py
@@ -45,14 +45,6 @@ def perform(self, reevaluate=False):
)
-class WaitForMotorStartup(AbstractHCMActionElement):
- """
- Waits for the motors on startup without complaining if it takes a moment.
- """
-
- pass
-
-
class WaitForMotors(AbstractHCMActionElement):
"""
Waits for the motors to connect and publishes warnings while doing so
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
index 05690041c..6157dbaae 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/decisions/animation.py
@@ -23,10 +23,36 @@ class PlayingExternalAnimation(AbstractHCMDecisionElement):
"""
def perform(self, reevaluate=False):
- if self.blackboard.external_animation_running:
- return "ANIMATION_RUNNING"
- else:
+ # Check if the robot is currently playing an animation
+ if not self.blackboard.external_animation_running:
return "FREE"
+ self.blackboard.node.get_logger().info(str(self.blackboard.last_animation_goal_time))
+
+ # We can safely assume that the last animation start time is set because the animation is running
+ # Calculate time since last animation start
+ time_delta_since_last_animation_start = (
+ self.blackboard.node.get_clock().now().nanoseconds / 1e9
+ - self.blackboard.last_animation_start_time.nanoseconds / 1e9
+ )
+ # If the time since the last animation start is less than 0.5 seconds the animation just started and we might not have received the first command yet
+ if time_delta_since_last_animation_start < 0.5:
+ return "ANIMATION_RUNNING"
+
+ # If the animation is running for more longer we check if the last goal was more than 0.5 seconds ago and abort the animation if it was
+ # Check if the last animation goal was more than 0.5 seconds ago
+ if (
+ self.blackboard.last_animation_goal_time is None
+ or (
+ self.blackboard.node.get_clock().now().nanoseconds / 1e9
+ - self.blackboard.last_animation_goal_time.nanoseconds / 1e9
+ )
+ > 0.5
+ ):
+ return "ANIMATION_SERVER_TIMEOUT"
+
+ # We are recieving goals and the animation is running
+ return "ANIMATION_RUNNING"
+
def get_reevaluate(self):
return True
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd
index 6cefe0515..e300439ea 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm.dsd
@@ -1,14 +1,14 @@
-->HCM
$StartHCM
START_UP --> @RobotStateStartup, @PlayAnimationDynup + direction:walkready
- RUNNING --> $RecordAnimation
- RECORD_ACTIVE --> @RobotStateRecord, @Wait
- FREE --> $CheckMotors
- MOTORS_NOT_STARTED --> @RobotStateStartup, @WaitForMotorStartup
- OVERLOAD --> @RobotStateMotorOff, @CancelGoals, @StopWalking, @PlayAnimationFallingFront, @TurnMotorsOff, @Wait
- PROBLEM --> @RobotStateHardwareProblem, @WaitForMotors
- TURN_ON --> @TurnMotorsOn, @PlayAnimationDynup + direction:walkready, @Wait
- OKAY --> $TeachingMode
+ RUNNING --> $CheckMotors
+ MOTORS_NOT_STARTED --> @RobotStateStartup, @Wait
+ OVERLOAD --> @RobotStateMotorOff, @CancelGoals, @StopWalking, @PlayAnimationFallingFront, @TurnMotorsOff, @Wait
+ PROBLEM --> @RobotStateHardwareProblem, @WaitForMotors
+ TURN_ON --> @TurnMotorsOn, @PlayAnimationDynup + direction:walkready, @Wait
+ OKAY --> $RecordAnimation
+ RECORD_ACTIVE --> @RobotStateRecord, @Wait
+ FREE --> $TeachingMode
TEACH --> @RobotStateRecord, @SetTorque + stiff:false, @Wait
HOLD --> @SetTorque + stiff:true, @Wait
FINISHED --> @SetTorque + stiff:true + r:false, @RobotStateControllable, @PlayAnimationDynup + direction:walkready
@@ -34,6 +34,7 @@ $StartHCM
FALLING_BACK --> @RobotStateFalling, @CancelGoals, @StopWalking, @PlayAnimationFallingBack, @Wait
NOT_FALLING --> $PlayingExternalAnimation
ANIMATION_RUNNING --> @StopWalking, @RobotStateAnimationRunning, @Wait
+ ANIMATION_SERVER_TIMEOUT --> @CancelAnimation
FREE --> $RecentWalkingGoals
STAY_WALKING --> @RobotStateWalking, @Wait
NOT_WALKING --> $RecentKickGoals
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
index 5711896ba..7c19e8b5d 100644
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/hcm_dsd/hcm_blackboard.py
@@ -37,11 +37,6 @@ def __init__(self, node: Node):
self.foot_zero_service = self.node.create_client(EmptySrv, "set_foot_zero")
self.motor_switch_service = self.node.create_client(SetBool, "core/switch_power")
- # Create services
- self.teaching_mode_service = self.node.create_service(
- SetTeachingMode, "set_teaching_mode", self.set_teaching_mode_callback
- )
-
# Create action clients and corresponding goal handles
self.animation_action_client: ActionClient = ActionClient(self.node, PlayAnimation, "animation")
self.animation_action_current_goal: Optional[Future] = None
@@ -64,9 +59,9 @@ def __init__(self, node: Node):
# Animation
# Animation states
- self.animation_requested: bool = False
self.external_animation_running: bool = False
- self.last_animation_goal_time: Time = self.node.get_clock().now()
+ self.last_animation_goal_time: Optional[Time] = None
+ self.last_animation_start_time: Optional[Time] = None
self.record_active: bool = False
# Get animation parameters
@@ -122,36 +117,3 @@ def __init__(self, node: Node):
def cancel_path_planning(self):
self.cancel_path_planning_pub.publish(EmptyMsg())
-
- def set_teaching_mode_callback(
- self, request: SetTeachingMode.Request, response: SetTeachingMode.Response
- ) -> SetTeachingMode.Response:
- # Store modifiable version of the requested state.
- state = request.state
-
- # Modify requested state if request state is SWITCH so that it switches between HOLD and TEACH after it was turned on once.
- if state == SetTeachingMode.Request.SWITCH:
- if self.teaching_mode_state in [SetTeachingMode.Request.HOLD, SetTeachingMode.Request.OFF]:
- state = SetTeachingMode.Request.TEACH
- elif self.teaching_mode_state == SetTeachingMode.Request.TEACH:
- state = SetTeachingMode.Request.HOLD
-
- # Check if we are able to start the teaching mode
- if state == SetTeachingMode.Request.TEACH and self.current_state not in [
- RobotControlState.CONTROLLABLE,
- RobotControlState.PICKED_UP,
- RobotControlState.PENALTY,
- RobotControlState.RECORD,
- ]:
- # Respond that we can not activate the teaching mode in the current state
- response.success = False
- return response
-
- if state == SetTeachingMode.Request.HOLD and self.teaching_mode_state != SetTeachingMode.Request.TEACH:
- response.success = False
- return response
-
- # Activate / Deactivate teaching mode
- self.teaching_mode_state = state
- response.success = True
- return response
diff --git a/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py b/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
index c3cda8855..fba0e670a 100755
--- a/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
+++ b/bitbots_motion/bitbots_hcm/bitbots_hcm/humanoid_control_module.py
@@ -20,9 +20,11 @@
from ros2_numpy import numpify
from sensor_msgs.msg import Imu, JointState
from std_msgs.msg import Bool
+from std_srvs.srv import SetBool
from bitbots_hcm.hcm_dsd.hcm_blackboard import HcmBlackboard
from bitbots_msgs.msg import FootPressure, RobotControlState
+from bitbots_msgs.srv import SetTeachingMode
class HardwareControlManager:
@@ -83,10 +85,17 @@ def __init__(self, use_sim_time, simulation_active, visualization_active):
self.node.create_subscription(Bool, "hcm_deactivate", self.deactivate_cb, 1)
self.node.create_subscription(DiagnosticArray, "diagnostics_agg", self.diag_cb, 1)
+ # Create services
+ self.node.create_service(SetBool, "record_mode", self.set_record_mode_callback)
+ self.node.create_service(SetBool, "play_animation_mode", self.set_animation_mode_callback)
+ self.teaching_mode_service = self.node.create_service(
+ SetTeachingMode, "teaching_mode", self.set_teaching_mode_callback
+ )
+
# Store time of the last tick
self.last_tick_start_time = self.node.get_clock().now()
- # Anounce the HCM startup
+ # Announce the HCM startup
speak("Starting HCM", self.blackboard.speak_publisher, priority=50)
def tick(self):
@@ -140,19 +149,75 @@ def get_state(self) -> RobotControlState:
"""Returns the current state of the HCM."""
return self.blackboard.current_state
- # The following methods are used to set the blackboard values from the cpp part
-
- def set_animation_requested(self, animation_requested: bool):
- self.blackboard.animation_requested = animation_requested
-
- def set_external_animation_running(self, running: bool):
- self.blackboard.external_animation_running = running
+ def set_animation_mode_callback(self, request: SetBool.Request, response: SetBool.Response):
+ # Check if the robot is in a state where it is allowed to play animations
+ if request.data: # We want to go into the animation mode
+ if self.blackboard.current_state in [RobotControlState.CONTROLLABLE, RobotControlState.RECORD]:
+ self.blackboard.last_animation_start_time = self.node.get_clock().now()
+ self.blackboard.external_animation_running = True
+ response.success = True
+ response.message = "Robot is now in animation mode, have fun!"
+ return response
+ else:
+ response.success = False
+ response.message = "Robot is not in a state where it is allowed to play new animations"
+ return response
+ else: # We want to go out of the animation mode
+ if self.blackboard.current_state in [RobotControlState.ANIMATION_RUNNING, RobotControlState.RECORD]:
+ self.blackboard.external_animation_running = False
+ response.success = True
+ response.message = "Animation stopped successfully"
+ return response
+ else:
+ response.success = False
+ response.message = "Robot is not in a state where it is allowed to stop animations"
+ return response
+
+ def set_record_mode_callback(self, request: SetBool.Request, response: SetBool.Response):
+ self.blackboard.record_active = request.data
+ response.success = True
+ return response
+
+ def set_teaching_mode_callback(
+ self, request: SetTeachingMode.Request, response: SetTeachingMode.Response
+ ) -> SetTeachingMode.Response:
+ # Store modifiable version of the requested state.
+ state = request.state
+
+ # Modify requested state if request state is SWITCH so that it switches between HOLD and TEACH after it was turned on once.
+ if state == SetTeachingMode.Request.SWITCH:
+ if self.blackboard.teaching_mode_state in [SetTeachingMode.Request.HOLD, SetTeachingMode.Request.OFF]:
+ state = SetTeachingMode.Request.TEACH
+ elif self.blackboard.teaching_mode_state == SetTeachingMode.Request.TEACH:
+ state = SetTeachingMode.Request.HOLD
+
+ # Check if we are able to start the teaching mode
+ if state == SetTeachingMode.Request.TEACH and self.blackboard.current_state not in [
+ RobotControlState.CONTROLLABLE,
+ RobotControlState.PICKED_UP,
+ RobotControlState.PENALTY,
+ RobotControlState.RECORD,
+ ]:
+ # Respond that we can not activate the teaching mode in the current state
+ response.success = False
+ return response
+
+ if (
+ state == SetTeachingMode.Request.HOLD
+ and self.blackboard.teaching_mode_state != SetTeachingMode.Request.TEACH
+ ):
+ response.success = False
+ return response
+
+ # Activate / Deactivate teaching mode
+ self.blackboard.teaching_mode_state = state
+ response.success = True
+ return response
- def set_record_active(self, active: bool):
- self.blackboard.record_active = active
+ # The following methods are used to set the blackboard values from the cpp part
def set_last_animation_goal_time(self, time_msg_serialized: bytes):
- self.blackboard.last_animation_goal_time = deserialize_message(time_msg_serialized, TimeMsg)
+ self.blackboard.last_animation_goal_time = Time.from_msg(deserialize_message(time_msg_serialized, TimeMsg))
def set_last_walking_goal_time(self, time_msg_serialized: bytes):
self.blackboard.last_walking_goal_time = Time.from_msg(deserialize_message(time_msg_serialized, TimeMsg))
diff --git a/bitbots_motion/bitbots_hcm/src/hcm.cpp b/bitbots_motion/bitbots_hcm/src/hcm.cpp
index e29f89c6f..b5ea88f23 100644
--- a/bitbots_motion/bitbots_hcm/src/hcm.cpp
+++ b/bitbots_motion/bitbots_hcm/src/hcm.cpp
@@ -33,27 +33,6 @@ class HCM_CPP : public rclcpp::Node {
this->get_parameter("simulation_active", simulation_active);
this->get_parameter("visualization_active", visualization_active);
- // HCM state
- current_state_ = bitbots_msgs::msg::RobotControlState::STARTUP;
-
- // Sensor states
- current_imu_ = sensor_msgs::msg::Imu();
- current_pressure_left_ = bitbots_msgs::msg::FootPressure();
- current_pressure_right_ = bitbots_msgs::msg::FootPressure();
- current_joint_state_ = sensor_msgs::msg::JointState();
-
- // Walking state
- last_walking_time_ = builtin_interfaces::msg::Time();
-
- // Kick state
- last_kick_time_ = builtin_interfaces::msg::Time();
-
- // Animation state
- record_active_ = false;
- external_animation_running_ = false;
- animation_requested_ = false;
- last_animation_goal_time_ = builtin_interfaces::msg::Time();
-
// Initialize HCM logic
// Import Python module
// "from bitbots_hcm.humanoid_control_module import HardwareControlManager"
@@ -94,73 +73,16 @@ class HCM_CPP : public rclcpp::Node {
void animation_callback(bitbots_msgs::msg::Animation msg) {
// The animation server is sending us goal positions for the next keyframe
- last_animation_goal_time_ = msg.header.stamp;
-
- // Check if the message is an animation request
- if (msg.request) {
- RCLCPP_INFO(this->get_logger(), "Got Animation request. HCM will try to get controllable now.");
- // Animation has to wait
- // DSD should try to become controllable
- animation_requested_ = true;
- return;
- }
-
- // Check if the message is the start of an animation
- if (msg.first) {
- if (msg.hcm) {
- // This was an animation from the DSD
- // We don't have to do anything, since we must be in the right state
- } else {
- // Coming from outside
- // Check if we can run an animation now
- if (current_state_ != bitbots_msgs::msg::RobotControlState::CONTROLLABLE) {
- RCLCPP_WARN(this->get_logger(), "HCM is not controllable, animation refused.");
- } else {
- // We're already controllable, go to animation running
- external_animation_running_ = true;
- }
- }
- }
-
- // Check if the message is the end of an animation
- if (msg.last) {
- if (msg.hcm) {
- // This was an animation from the DSD
- // We don't have to do anything, since we must be in the right state
- } else {
- // This is the last frame, we want to tell the DSD that we're finished with the animations
- external_animation_running_ = false;
- if (msg.position.points.size() == 0) {
- // Probably this was just to tell us we're finished
- // we don't need to set another position to the motors
- return;
- }
- }
- }
+ last_animation_goal_time_ = msg.joint_command.header.stamp;
// Forward joint positions to motors if there are any and we're in the right state
- if (msg.position.points.size() > 0 && (current_state_ == bitbots_msgs::msg::RobotControlState::CONTROLLABLE ||
- current_state_ == bitbots_msgs::msg::RobotControlState::ANIMATION_RUNNING ||
- current_state_ == bitbots_msgs::msg::RobotControlState::FALLING ||
- current_state_ == bitbots_msgs::msg::RobotControlState::FALLEN)) {
- bitbots_msgs::msg::JointCommand out_msg = bitbots_msgs::msg::JointCommand();
- out_msg.positions = msg.position.points[0].positions;
- out_msg.joint_names = msg.position.joint_names;
- int number_joints = out_msg.joint_names.size();
- std::vector values = {};
- for (int i = 0; i < number_joints; i++) {
- values.push_back(-1.0);
- }
- out_msg.accelerations = values;
- out_msg.velocities = values;
- out_msg.max_currents = values;
- if (msg.position.points[0].effort.size() != 0) {
- out_msg.max_currents = {};
- for (int i = 0; i < msg.position.points[0].effort.size(); i++) {
- out_msg.max_currents.push_back(msg.position.points[0].effort[i]);
- }
- }
- pub_controller_command_->publish(out_msg);
+ // The right state is either one of the animation robot control states or if the animation is from the HCM
+ if (msg.joint_command.positions.size() > 0 and
+ ((current_state_ == bitbots_msgs::msg::RobotControlState::ANIMATION_RUNNING or
+ current_state_ == bitbots_msgs::msg::RobotControlState::RECORD) or
+ msg.from_hcm)) {
+ // We can forward the animation goal to the motors
+ pub_controller_command_->publish(msg.joint_command);
}
}
@@ -191,11 +113,7 @@ class HCM_CPP : public rclcpp::Node {
}
void record_goal_callback(const bitbots_msgs::msg::JointCommand msg) {
- if (msg.joint_names.size() == 0) {
- // record tells us that its finished
- record_active_ = false;
- } else {
- record_active_ = true;
+ if (msg.joint_names.size() == 0 && current_state_ == bitbots_msgs::msg::RobotControlState::RECORD) {
pub_controller_command_->publish(msg);
}
}
@@ -219,23 +137,33 @@ class HCM_CPP : public rclcpp::Node {
void tick() {
// Performs one tick of the HCM DSD
- // Pass all the data nessesary data to the python module
- hcm_py_.attr("set_imu")(ros2_python_extension::toPython(current_imu_));
- hcm_py_.attr("set_pressure_left")(
- ros2_python_extension::toPython(current_pressure_left_));
- hcm_py_.attr("set_pressure_right")(
- ros2_python_extension::toPython(current_pressure_right_));
- hcm_py_.attr("set_current_joint_state")(
- ros2_python_extension::toPython(current_joint_state_));
- hcm_py_.attr("set_last_walking_goal_time")(
- ros2_python_extension::toPython(last_walking_time_));
- hcm_py_.attr("set_last_kick_goal_time")(
- ros2_python_extension::toPython(last_kick_time_));
- hcm_py_.attr("set_record_active")(record_active_);
- hcm_py_.attr("set_external_animation_running")(external_animation_running_);
- hcm_py_.attr("set_animation_requested")(animation_requested_);
- hcm_py_.attr("set_last_animation_goal_time")(
- ros2_python_extension::toPython(last_animation_goal_time_));
+ // Pass the data we have got until now to the python module
+ if (current_imu_) {
+ hcm_py_.attr("set_imu")(ros2_python_extension::toPython(current_imu_.value()));
+ }
+ if (current_pressure_left_) {
+ hcm_py_.attr("set_pressure_left")(
+ ros2_python_extension::toPython(current_pressure_left_.value()));
+ }
+ if (current_pressure_right_) {
+ hcm_py_.attr("set_pressure_right")(
+ ros2_python_extension::toPython(current_pressure_right_.value()));
+ }
+ if (current_joint_state_)
+ hcm_py_.attr("set_current_joint_state")(
+ ros2_python_extension::toPython(current_joint_state_.value()));
+ if (last_walking_time_) {
+ hcm_py_.attr("set_last_walking_goal_time")(
+ ros2_python_extension::toPython(last_walking_time_.value()));
+ }
+ if (last_kick_time_) {
+ hcm_py_.attr("set_last_kick_goal_time")(
+ ros2_python_extension::toPython(last_kick_time_.value()));
+ }
+ if (last_animation_goal_time_) {
+ hcm_py_.attr("set_last_animation_goal_time")(
+ ros2_python_extension::toPython(last_animation_goal_time_.value()));
+ }
// Run HCM Python DSD code
hcm_py_.attr("tick")();
@@ -257,25 +185,22 @@ class HCM_CPP : public rclcpp::Node {
// Python hcm module
py::object hcm_py_;
// The current robot state
- int current_state_;
+ int current_state_ = bitbots_msgs::msg::RobotControlState::STARTUP;
// Sensor states
- sensor_msgs::msg::Imu current_imu_;
- bitbots_msgs::msg::FootPressure current_pressure_left_;
- bitbots_msgs::msg::FootPressure current_pressure_right_;
- sensor_msgs::msg::JointState current_joint_state_;
+ std::optional current_imu_;
+ std::optional current_pressure_left_;
+ std::optional current_pressure_right_;
+ std::optional current_joint_state_;
// Walking state
- builtin_interfaces::msg::Time last_walking_time_;
+ std::optional last_walking_time_;
// Kick state
- builtin_interfaces::msg::Time last_kick_time_;
+ std::optional last_kick_time_;
// Animation states
- bool record_active_;
- bool external_animation_running_;
- bool animation_requested_;
- builtin_interfaces::msg::Time last_animation_goal_time_;
+ std::optional last_animation_goal_time_;
// Publishers
rclcpp::Publisher::SharedPtr pub_controller_command_;
diff --git a/bitbots_motion/bitbots_head_mover/src/move_head.cpp b/bitbots_motion/bitbots_head_mover/src/move_head.cpp
index 478e47ecd..8d9e61ef5 100644
--- a/bitbots_motion/bitbots_head_mover/src/move_head.cpp
+++ b/bitbots_motion/bitbots_head_mover/src/move_head.cpp
@@ -56,8 +56,8 @@ class HeadMover {
std::shared_ptr tf_listener_;
// Declare variables
- uint head_mode_;
- std::shared_ptr current_joint_state_;
+ uint head_mode_ = bitbots_msgs::msg::HeadMode::LOOK_FORWARD;
+ std::optional current_joint_state_;
bitbots_msgs::msg::JointCommand pos_msg_;
geometry_msgs::msg::PoseWithCovarianceStamped tf_precision_pose_;
@@ -192,7 +192,7 @@ class HeadMover {
/**
* @brief Callback used to get updates of the current joint states of the robot
*/
- void joint_state_callback(const sensor_msgs::msg::JointState::SharedPtr msg) { current_joint_state_ = msg; }
+ void joint_state_callback(const sensor_msgs::msg::JointState::SharedPtr msg) { current_joint_state_ = *msg; }
/***
* @brief Handles the goal request for the look at action
@@ -714,6 +714,11 @@ class HeadMover {
// Pull the parameters from the parameter server
params_ = param_listener_->get_params();
+ // Check if we received the joint states yet and if not, return
+ if (!current_joint_state_) {
+ return;
+ }
+
// Check if the head mode changed and if so, update the search pattern
if (prev_head_mode_ != curr_head_mode) {
switch (curr_head_mode) {
diff --git a/bitbots_motion/bitbots_moveit_bindings/bitbots_moveit_bindings/__init__.py b/bitbots_motion/bitbots_moveit_bindings/bitbots_moveit_bindings/__init__.py
index 4f784ca84..5fae02b8b 100644
--- a/bitbots_motion/bitbots_moveit_bindings/bitbots_moveit_bindings/__init__.py
+++ b/bitbots_motion/bitbots_moveit_bindings/bitbots_moveit_bindings/__init__.py
@@ -27,8 +27,7 @@ def set_moveit_parameters(parameters: [Parameter], type):
elif type == "collision":
collision_state = BitbotsMoveitBindings("moveit_bindings_collision", serialized_parameters)
else:
- print("Type for bitbots_moveit_bindings state not correctly defined")
- exit(1)
+ raise ValueError("Type for bitbots_moveit_bindings state not correctly defined")
def get_position_ik(request: GetPositionIK.Request, approximate=False):
diff --git a/bitbots_motion/bitbots_rl_motion/scripts/rl_walk.py b/bitbots_motion/bitbots_rl_motion/scripts/rl_walk.py
index d0bc4c900..c614c8232 100755
--- a/bitbots_motion/bitbots_rl_motion/scripts/rl_walk.py
+++ b/bitbots_motion/bitbots_rl_motion/scripts/rl_walk.py
@@ -1,6 +1,7 @@
#! /usr/bin/env python3
import os
+import sys
import rclpy
import yaml
@@ -29,7 +30,7 @@
env_kwargs = loaded_args["env_kwargs"]
else:
node.get_logger().fatal(f"No args.yml found in {args_path}")
- exit()
+ sys.exit()
env_kwargs["node"] = node
print(env_kwargs)
diff --git a/bitbots_motion/bitbots_splines/bitbots_splines/smooth_spline.py b/bitbots_motion/bitbots_splines/bitbots_splines/smooth_spline.py
index 119b2d074..f3ae15821 100644
--- a/bitbots_motion/bitbots_splines/bitbots_splines/smooth_spline.py
+++ b/bitbots_motion/bitbots_splines/bitbots_splines/smooth_spline.py
@@ -85,7 +85,9 @@ def polynom_fit(self, t, pos1, vel1, acc1, pos2, vel2, acc2):
def interpolation(self, x, func):
# Empty case
if len(self.splines_parts) == 0:
- exit("Error: no spline parts. did you call compute_spline()?")
+ raise ValueError(
+ "Error: no spline parts. did you call compute_spline()? Did you add any point to the spline?"
+ )
# Bound asked abscisse into spline range
if x <= self.splines_parts[0].min:
diff --git a/bitbots_msgs/CMakeLists.txt b/bitbots_msgs/CMakeLists.txt
index 53ccf0b2b..2488d45d1 100644
--- a/bitbots_msgs/CMakeLists.txt
+++ b/bitbots_msgs/CMakeLists.txt
@@ -41,6 +41,7 @@ rosidl_generate_interfaces(
"msg/TeamData.msg"
"msg/Workload.msg"
"srv/AccelerometerCalibration.srv"
+ "srv/AddAnimation.srv"
"srv/ComplementaryFilterParams.srv"
"srv/FootScale.srv"
"srv/IMURanges.srv"
diff --git a/bitbots_msgs/action/PlayAnimation.action b/bitbots_msgs/action/PlayAnimation.action
index b1d90f3ac..de93461d1 100644
--- a/bitbots_msgs/action/PlayAnimation.action
+++ b/bitbots_msgs/action/PlayAnimation.action
@@ -3,11 +3,20 @@
# Name of the animation
string animation
-# If the animation call comes from the hardware control manager,
+# If the animation call comes from the humanoid control module,
# it should be played directly, interrupting other animations and walking
# It's probably falling or standing up
bool hcm
+# Debugging controls
+bool bounds # If true, only a specific range of keyframes will be played
+# Keyframe where we want to start (use 0 as the start of the animation)
+uint8 start
+# Keyframe where we want to end (use len(animation) as the end of the animation)
+uint8 end
+
+
+
---
# Result definition
bool successful
diff --git a/bitbots_msgs/msg/Animation.msg b/bitbots_msgs/msg/Animation.msg
index f89162176..7f28d52ff 100644
--- a/bitbots_msgs/msg/Animation.msg
+++ b/bitbots_msgs/msg/Animation.msg
@@ -1,16 +1,4 @@
-std_msgs/Header header
+# Is this animation coming from the humanoid control module and can be executed during all robot states
+bool from_hcm
-# This is a request to make HCM controllable, e.g. stop walking
-bool request
-
-# First message of this animation
-bool first
-
-# Last message of this animation
-bool last
-
-# Is this animation coming from the hardware control manager
-bool hcm
-
-# Joint goals
-trajectory_msgs/JointTrajectory position
+bitbots_msgs/JointCommand joint_command
diff --git a/bitbots_msgs/srv/AddAnimation.srv b/bitbots_msgs/srv/AddAnimation.srv
new file mode 100644
index 000000000..e0f48037f
--- /dev/null
+++ b/bitbots_msgs/srv/AddAnimation.srv
@@ -0,0 +1,4 @@
+string json
+
+---
+
diff --git a/bitbots_wolfgang/wolfgang_animations/animations/motion/cheering.json b/bitbots_wolfgang/wolfgang_animations/animations/motion/cheering.json
index 75c6fb515..58eb799ac 100644
--- a/bitbots_wolfgang/wolfgang_animations/animations/motion/cheering.json
+++ b/bitbots_wolfgang/wolfgang_animations/animations/motion/cheering.json
@@ -1,171 +1,171 @@
{
- "author": "Unknown",
- "description": "Cheers with the arms up.",
- "keyframes": [
- {
- "duration": 1.0,
- "goals": {
- "HeadPan": 0,
- "HeadTilt": 0,
- "LAnklePitch": -25.850701706506673,
- "LAnkleRoll": 3.579863153426792,
- "LElbow": 2,
- "LHipPitch": 27.113534608980995,
- "LHipRoll": 3.5219474542654146,
- "LHipYaw": -0.6417206866754275,
- "LKnee": 57.63156282496905,
- "LShoulderPitch": -103,
- "LShoulderRoll": -28,
- "RAnklePitch": 25.85125887645997,
- "RAnkleRoll": -3.5800332870267404,
- "RElbow": 4,
- "RHipPitch": -27.11437498138635,
- "RHipRoll": -3.5219753606223687,
- "RHipYaw": 0.641732775157822,
- "RKnee": -57.63268422169942,
- "RShoulderPitch": 99,
- "RShoulderRoll": 46
- },
- "name": "new framed",
- "pause": 0.5
- },
- {
- "duration": 1.0,
- "goals": {
- "HeadPan": 0,
- "HeadTilt": 0,
- "LAnklePitch": -25.850701706506673,
- "LAnkleRoll": 3.579863153426792,
- "LElbow": -66,
- "LHipPitch": 27.113534608980995,
- "LHipRoll": 3.5219474542654146,
- "LHipYaw": -0.6417206866754275,
- "LKnee": 57.63156282496905,
- "LShoulderPitch": -170,
- "LShoulderRoll": -18,
- "RAnklePitch": 25.85125887645997,
- "RAnkleRoll": -3.5800332870267404,
- "RElbow": 73,
- "RHipPitch": -27.11437498138635,
- "RHipRoll": -3.5219753606223687,
- "RHipYaw": 0.641732775157822,
- "RKnee": -57.63268422169942,
- "RShoulderPitch": 174,
- "RShoulderRoll": 14
- },
- "name": "new framedd",
- "pause": 0.5
- },
- {
- "duration": 1.0,
- "goals": {
- "HeadPan": 0,
- "HeadTilt": 0,
- "LAnklePitch": -25.850701706506673,
- "LAnkleRoll": 3.579863153426792,
- "LElbow": -58,
- "LHipPitch": 27.113534608980995,
- "LHipRoll": 3.5219474542654146,
- "LHipYaw": -0.6417206866754275,
- "LKnee": 57.63156282496905,
- "LShoulderPitch": -166,
- "LShoulderRoll": -37,
- "RAnklePitch": 25.85125887645997,
- "RAnkleRoll": -3.5800332870267404,
- "RElbow": 74,
- "RHipPitch": -27.11437498138635,
- "RHipRoll": -3.5219753606223687,
- "RHipYaw": 0.641732775157822,
- "RKnee": -57.63268422169942,
- "RShoulderPitch": 175,
- "RShoulderRoll": 0
- },
- "name": "new frameddd",
- "pause": 0.5
- },
- {
- "duration": 0.5,
- "goals": {
- "HeadPan": 0,
- "HeadTilt": 0,
- "LAnklePitch": -25.850701706506673,
- "LAnkleRoll": 3.579863153426792,
- "LElbow": -54,
- "LHipPitch": 27.113534608980995,
- "LHipRoll": 3.5219474542654146,
- "LHipYaw": -0.6417206866754275,
- "LKnee": 57.63156282496905,
- "LShoulderPitch": -165,
- "LShoulderRoll": 0,
- "RAnklePitch": 25.85125887645997,
- "RAnkleRoll": -3.5800332870267404,
- "RElbow": 71,
- "RHipPitch": -27.11437498138635,
- "RHipRoll": -3.5219753606223687,
- "RHipYaw": 0.641732775157822,
- "RKnee": -57.63268422169942,
- "RShoulderPitch": 174,
- "RShoulderRoll": 23
- },
- "name": "new framedddd",
- "pause": 0.5
- },
- {
- "duration": 0.5,
- "goals": {
- "HeadPan": 0,
- "HeadTilt": 0,
- "LAnklePitch": -25.850701706506673,
- "LAnkleRoll": 3.579863153426792,
- "LElbow": -54,
- "LHipPitch": 27.113534608980995,
- "LHipRoll": 3.5219474542654146,
- "LHipYaw": -0.6417206866754275,
- "LKnee": 57.63156282496905,
- "LShoulderPitch": -165,
- "LShoulderRoll": 0,
- "RAnklePitch": 25.85125887645997,
- "RAnkleRoll": -3.5800332870267404,
- "RElbow": 71,
- "RHipPitch": -27.11437498138635,
- "RHipRoll": -3.5219753606223687,
- "RHipYaw": 0.641732775157822,
- "RKnee": -57.63268422169942,
- "RShoulderPitch": 174,
- "RShoulderRoll": 23
- },
- "name": "new framedddddd",
- "pause": 0.5
- },
- {
- "duration": 1.0,
- "goals": {
- "HeadPan": 0.0,
- "HeadTilt": 0.0,
- "LHipYaw": -0.00014488551936841167,
- "LHipRoll": 5.843569741199503,
- "LHipPitch": 16.921439726925605,
- "LKnee": 63.779899882296974,
- "LAnklePitch": -31.85822805341032,
- "LAnkleRoll": 5.843636389466469,
- "LShoulderPitch": 75.27910266806222,
- "LShoulderRoll": -0.020062925889149368,
- "LElbow": 35.86471433552932,
- "RHipYaw": 0.000308978186241276,
- "RHipRoll": -5.563113355828915,
- "RHipPitch": -16.845416637922956,
- "RKnee": -63.811661089644105,
- "RAnklePitch": 31.965934474853857,
- "RAnkleRoll": -6.850938895767449,
- "RShoulderPitch": -75.58926547530199,
- "RShoulderRoll": 0.019571467194590405,
- "RElbow": -36.10375237446914
- },
- "name": "walkready",
- "pause": 0.5
- }
- ],
- "last_edited": "2018-06-20 18:05:13.810759",
- "name": "cheering",
- "version": "0"
+ "author": "Unknown",
+ "description": "Cheers with the arms up.",
+ "keyframes": [
+ {
+ "duration": 1.0,
+ "goals": {
+ "HeadPan": 0.0,
+ "HeadTilt": 0.0,
+ "LAnklePitch": -25.850701706506673,
+ "LAnkleRoll": 3.579863153426792,
+ "LElbow": 2.0,
+ "LHipPitch": 27.113534608980995,
+ "LHipRoll": 3.5219474542654146,
+ "LHipYaw": -0.6417206866754275,
+ "LKnee": 57.63156282496905,
+ "LShoulderPitch": -103.0,
+ "LShoulderRoll": -28.0,
+ "RAnklePitch": 25.85125887645997,
+ "RAnkleRoll": -3.5800332870267404,
+ "RElbow": 4.0,
+ "RHipPitch": -27.11437498138635,
+ "RHipRoll": -3.5219753606223687,
+ "RHipYaw": 0.641732775157822,
+ "RKnee": -57.63268422169942,
+ "RShoulderPitch": 99.0,
+ "RShoulderRoll": 46.0
+ },
+ "name": "new framed",
+ "pause": 0.5
+ },
+ {
+ "duration": 1.0,
+ "goals": {
+ "HeadPan": 0.0,
+ "HeadTilt": 0.0,
+ "LAnklePitch": -25.850701706506673,
+ "LAnkleRoll": 3.579863153426792,
+ "LElbow": -66.0,
+ "LHipPitch": 27.113534608980995,
+ "LHipRoll": 3.5219474542654146,
+ "LHipYaw": -0.6417206866754275,
+ "LKnee": 57.63156282496905,
+ "LShoulderPitch": -170.0,
+ "LShoulderRoll": -18.0,
+ "RAnklePitch": 25.85125887645997,
+ "RAnkleRoll": -3.5800332870267404,
+ "RElbow": 73.0,
+ "RHipPitch": -27.11437498138635,
+ "RHipRoll": -3.5219753606223687,
+ "RHipYaw": 0.641732775157822,
+ "RKnee": -57.63268422169942,
+ "RShoulderPitch": 174.0,
+ "RShoulderRoll": 14.0
+ },
+ "name": "new framedd",
+ "pause": 0.5
+ },
+ {
+ "duration": 1.0,
+ "goals": {
+ "HeadPan": 0.0,
+ "HeadTilt": 0.0,
+ "LAnklePitch": -25.850701706506673,
+ "LAnkleRoll": 3.579863153426792,
+ "LElbow": -58.00000000000001,
+ "LHipPitch": 27.113534608980995,
+ "LHipRoll": 3.5219474542654146,
+ "LHipYaw": -0.6417206866754275,
+ "LKnee": 57.63156282496905,
+ "LShoulderPitch": -166.0,
+ "LShoulderRoll": -37.0,
+ "RAnklePitch": 25.85125887645997,
+ "RAnkleRoll": -3.5800332870267404,
+ "RElbow": 74.0,
+ "RHipPitch": -27.11437498138635,
+ "RHipRoll": -3.5219753606223687,
+ "RHipYaw": 0.641732775157822,
+ "RKnee": -57.63268422169942,
+ "RShoulderPitch": 175.0,
+ "RShoulderRoll": 0.0
+ },
+ "name": "new frameddd",
+ "pause": 0.5
+ },
+ {
+ "duration": 0.5,
+ "goals": {
+ "HeadPan": 0.0,
+ "HeadTilt": 0.0,
+ "LAnklePitch": -25.850701706506673,
+ "LAnkleRoll": 3.579863153426792,
+ "LElbow": -54.0,
+ "LHipPitch": 27.113534608980995,
+ "LHipRoll": 3.5219474542654146,
+ "LHipYaw": -0.6417206866754275,
+ "LKnee": 57.63156282496905,
+ "LShoulderPitch": -165.0,
+ "LShoulderRoll": 0.0,
+ "RAnklePitch": 25.85125887645997,
+ "RAnkleRoll": -3.5800332870267404,
+ "RElbow": 71.0,
+ "RHipPitch": -27.11437498138635,
+ "RHipRoll": -3.5219753606223687,
+ "RHipYaw": 0.641732775157822,
+ "RKnee": -57.63268422169942,
+ "RShoulderPitch": 174.0,
+ "RShoulderRoll": 23.0
+ },
+ "name": "new framedddd",
+ "pause": 0.5
+ },
+ {
+ "duration": 0.5,
+ "goals": {
+ "HeadPan": 0.0,
+ "HeadTilt": 0.0,
+ "LAnklePitch": -25.850701706506673,
+ "LAnkleRoll": 3.579863153426792,
+ "LElbow": -54.0,
+ "LHipPitch": 27.113534608980995,
+ "LHipRoll": 3.5219474542654146,
+ "LHipYaw": -0.6417206866754275,
+ "LKnee": 57.63156282496905,
+ "LShoulderPitch": -165.0,
+ "LShoulderRoll": 0.0,
+ "RAnklePitch": 25.85125887645997,
+ "RAnkleRoll": -3.5800332870267404,
+ "RElbow": 71.0,
+ "RHipPitch": -27.11437498138635,
+ "RHipRoll": -3.5219753606223687,
+ "RHipYaw": 0.641732775157822,
+ "RKnee": -57.63268422169942,
+ "RShoulderPitch": 174.0,
+ "RShoulderRoll": 23.0
+ },
+ "name": "new framedddddd",
+ "pause": 0.5
+ },
+ {
+ "duration": 1.0,
+ "goals": {
+ "HeadPan": 0.0,
+ "HeadTilt": 0.0,
+ "LAnklePitch": -31.860000000000003,
+ "LAnkleRoll": 5.84,
+ "LElbow": 35.86,
+ "LHipPitch": 27.11,
+ "LHipRoll": 5.84,
+ "LHipYaw": -0.0,
+ "LKnee": 63.78,
+ "LShoulderPitch": 75.28,
+ "LShoulderRoll": -0.02,
+ "RAnklePitch": 31.970000000000002,
+ "RAnkleRoll": -6.8500000000000005,
+ "RElbow": -36.1,
+ "RHipPitch": -27.11,
+ "RHipRoll": -5.56,
+ "RHipYaw": 0.0,
+ "RKnee": -63.81,
+ "RShoulderPitch": -75.59,
+ "RShoulderRoll": 0.02
+ },
+ "name": "walkready",
+ "pause": 0.5
+ }
+ ],
+ "last_edited": "2024-03-13 21:02:47.391643",
+ "name": "cheering",
+ "version": "0"
}
diff --git a/bitbots_wolfgang/wolfgang_webots_sim/scripts/fix_urdf_for_webots.py b/bitbots_wolfgang/wolfgang_webots_sim/scripts/fix_urdf_for_webots.py
index e8a3afdd1..f773964d8 100755
--- a/bitbots_wolfgang/wolfgang_webots_sim/scripts/fix_urdf_for_webots.py
+++ b/bitbots_wolfgang/wolfgang_webots_sim/scripts/fix_urdf_for_webots.py
@@ -5,7 +5,7 @@
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} ")
- exit(1)
+ sys.exit(1)
infile_path = sys.argv[1]
outfile_path = sys.argv[2]
diff --git a/bitbots_wolfgang/wolfgang_webots_sim/scripts/start_simulator.py b/bitbots_wolfgang/wolfgang_webots_sim/scripts/start_simulator.py
index 352f1722d..57d696dd6 100755
--- a/bitbots_wolfgang/wolfgang_webots_sim/scripts/start_simulator.py
+++ b/bitbots_wolfgang/wolfgang_webots_sim/scripts/start_simulator.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import argparse
import subprocess
+import sys
import threading
import rclpy
@@ -49,9 +50,9 @@ def __init__(self, nogui, multi_robot, headless, sim_port, robot_type):
def run_simulation(self):
# join with child process
try:
- exit(self.sim_proc.wait())
+ sys.exit(self.sim_proc.wait())
except KeyboardInterrupt:
- exit(self.sim_proc.returncode)
+ sys.exit(self.sim_proc.returncode)
if __name__ == "__main__":
diff --git a/bitbots_wolfgang/wolfgang_webots_sim/wolfgang_webots_sim/webots_robot_controller.py b/bitbots_wolfgang/wolfgang_webots_sim/wolfgang_webots_sim/webots_robot_controller.py
index ee7f06eaa..0ba93ed38 100644
--- a/bitbots_wolfgang/wolfgang_webots_sim/wolfgang_webots_sim/webots_robot_controller.py
+++ b/bitbots_wolfgang/wolfgang_webots_sim/wolfgang_webots_sim/webots_robot_controller.py
@@ -574,8 +574,7 @@ def __init__(
gyro_name = "imu gyro"
camera_name = "camera"
else:
- self.ros_node.get_logger().error("Robot type not supported: %s" % robot)
- exit()
+ raise ValueError(f"Robot type not supported: {robot}")
self.motor_names_to_external_names = {}
self.external_motor_names_to_motor_names = {}
diff --git a/scripts/deploy/deploy_robots.py b/scripts/deploy/deploy_robots.py
index faa3dcb6f..ea1dd07cc 100644
--- a/scripts/deploy/deploy_robots.py
+++ b/scripts/deploy/deploy_robots.py
@@ -1,5 +1,6 @@
import argparse
import os
+import sys
from typing import Optional
from deploy.misc import (
@@ -226,8 +227,8 @@ def run_tasks(self) -> None:
if not results.failed:
print_success(f"{task_prefix} completed.")
elif results.failed:
- print_err(f"{task_prefix} failed on the following hosts: {task._succeeded_hosts(results)}")
- exit(1)
+ print_err(f"{task_prefix} failed on the following hosts: {task._failed_hosts(results)}")
+ sys.exit(1)
current_task += 1
# Close connections
diff --git a/scripts/deploy/misc.py b/scripts/deploy/misc.py
index 156d5ba62..1feec6a4b 100644
--- a/scripts/deploy/misc.py
+++ b/scripts/deploy/misc.py
@@ -2,6 +2,7 @@
import ipaddress
import os
import subprocess
+import sys
from typing import Any, Iterable, Optional
import yaml
@@ -93,7 +94,7 @@ def hide_output() -> bool | str:
KNOWN_TARGETS: dict[str, dict[str, str]] = yaml.safe_load(f)
except FileNotFoundError:
print_err(f"Could not find known_targets.yaml in {_known_targets_path}")
- exit(1)
+ sys.exit(1)
def print_known_targets() -> None:
@@ -108,7 +109,7 @@ def print_known_targets() -> None:
table.add_row(values.get("hostname", ""), values.get("robot_name", ""), ip)
print_info("You can enter the following values as targets:")
CONSOLE.print(table)
- exit(0)
+ sys.exit(0)
def get_known_targets() -> dict[str, dict[str, str]]:
@@ -175,10 +176,10 @@ def _parse_targets(input_targets: str) -> list[str]:
target_ip = _identify_ip(input_target)
except ValueError:
print_err(f"Could not determine IP address from input: '{input_target}'")
- exit(1)
+ sys.exit(1)
if target_ip is None:
print_err(f"Could not determine IP address from input:' {input_target}'")
- exit(1)
+ sys.exit(1)
target_ips.append(target_ip)
return target_ips
@@ -231,7 +232,7 @@ def _concat_exception_args(e: Exception) -> str:
failure_table.add_column("Reason")
[failure_table.add_row(connection.host, reason) for connection, reason in failures]
CONSOLE.print(failure_table)
- exit(1)
+ sys.exit(1)
return connections
@@ -265,7 +266,7 @@ def _get_connections_from_all_known_targets(user: str, connection_timeout: Optio
open_connections.append(connection)
if len(open_connections) == 0:
print_err("Could not establish any connection to the known targets. Exiting...")
- exit(1)
+ sys.exit(1)
return ThreadingGroup.from_connections(open_connections)
@@ -311,6 +312,6 @@ def error(self, message):
if "the following arguments are required" in message:
print_err(message)
print_known_targets()
- exit(0)
+ sys.exit(0)
else:
super().error(message)
diff --git a/scripts/deploy/tasks/build.py b/scripts/deploy/tasks/build.py
index 1e2d50a6f..79fbcd78f 100644
--- a/scripts/deploy/tasks/build.py
+++ b/scripts/deploy/tasks/build.py
@@ -89,7 +89,7 @@ def _build(self, connections: Group) -> GroupResult:
results = connections.run(cmd, hide=hide_output())
print_debug(f"Build succeeded on the following hosts: {self._succeeded_hosts(results)}")
except GroupException as e:
- for connection, result in e.result.failed.items():
- print_err(f"Build on {connection.host} failed with the following errors: {result.stderr}")
+ for connection in e.result.failed.keys():
+ print_err(f"Build on {connection.host} failed!")
return e.result
return results
diff --git a/scripts/deploy_robots.py b/scripts/deploy_robots.py
index 9b3c9e38b..df9c19ead 100755
--- a/scripts/deploy_robots.py
+++ b/scripts/deploy_robots.py
@@ -1,4 +1,6 @@
#!/usr/bin/env python3
+import sys
+
from deploy.deploy_robots import DeployRobots
from deploy.misc import print_err
@@ -7,4 +9,4 @@
DeployRobots()
except KeyboardInterrupt:
print_err("Interrupted by user")
- exit(1)
+ sys.exit(1)
diff --git a/scripts/ros_bind_robot.sh b/scripts/ros_bind_robot.sh
new file mode 100755
index 000000000..235ddf462
--- /dev/null
+++ b/scripts/ros_bind_robot.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+# This script sets the ROS_DOMAIN_ID environment variable to the value associated with the provided robot name.
+# It also sets environment variables to enable automatic discovery of nodes in the local network.
+
+# Usage: source robot_connect.sh
+
+# Detect if this script is being sourced or run as a command
+if [ "$0" = "$BASH_SOURCE" ]; then
+ echo "This script must be sourced, not executed. Use: source $0 "
+ exit 1
+fi
+
+# Define a map of robot names to ROS_DOMAIN_IDs
+declare -A robot_map
+robot_map[amy]=11
+robot_map[rory]=12
+robot_map[jack]=13
+robot_map[donna]=14
+robot_map[melody]=15
+robot_map[rose]=16
+
+# Check if the provided argument is a valid robot name or a numeric ROS_DOMAIN_ID
+if ! ( [ -n "${robot_map[$1]}" ] || [[ $1 =~ ^[0-9]+$ ]]); then
+ echo "Usage: source $0 "
+ return 1
+fi
+
+echo "Setting up ROS connection..."
+
+# If the provided argument is a valid robot name, set the ROS_DOMAIN_ID
+if [ -n "${robot_map[$1]}" ]; then
+ export ROS_DOMAIN_ID=${robot_map[$1]}
+ echo "Binding to robot '$1' with ROS_DOMAIN_ID=$ROS_DOMAIN_ID"
+else
+ export ROS_DOMAIN_ID=$1
+ echo "Binding to robot with ROS_DOMAIN_ID=$ROS_DOMAIN_ID"
+fi
+
+# Set discovery range to local network
+export ROS_AUTOMATIC_DISCOVERY_RANGE=SUBNET
+
+# Unset local variables that may interfere with the discovery process
+unset ROS_LOCALHOST_ONLY
+
+echo "Restarting ros2 daemon..."
+ros2 daemon stop
+
+echo "Configuration complete!"
+
+# Print a list of discovered nodes if there are any
+if [ -n "$(ros2 node list 2>/dev/null)" ]; then
+ echo "\nDiscovered nodes:"
+ ros2 node list
+fi