diff --git a/ros2controlcli/doc/userdoc.rst b/ros2controlcli/doc/userdoc.rst index f6eb6cdc6a..d550dda158 100644 --- a/ros2controlcli/doc/userdoc.rst +++ b/ros2controlcli/doc/userdoc.rst @@ -20,6 +20,7 @@ Currently supported commands are - ros2 control switch_controllers - ros2 control unload_controller - ros2 control view_controller_chains + - ros2 control view_hardware_status list_controllers @@ -364,3 +365,24 @@ view_controller_chains --include-hidden-nodes Consider hidden nodes as well --ros-args ... Pass arbitrary arguments to the executable + + +view_hardware_status +---------------------- + +.. code-block:: console + + $ ros2 control view_hardware_status -h + usage: ros2 control view_hardware_status [-h] [--spin-time SPIN_TIME] [-s] [-i HARDWARE_ID] [-d DEVICE_ID] + + Echo hardware status messages with filtering capabilities + + options: + -h, --help show this help message and exit + --spin-time SPIN_TIME + Spin time in seconds to wait for discovery (only applies when not using an already running daemon) + -s, --use-sim-time Enable ROS simulation time + -i HARDWARE_ID, --hardware-id HARDWARE_ID + Filter by a specific hardware component ID. + -d DEVICE_ID, --device-id DEVICE_ID + Filter by a specific device ID within a hardware component. diff --git a/ros2controlcli/package.xml b/ros2controlcli/package.xml index a27f3a3602..6b57002d5d 100644 --- a/ros2controlcli/package.xml +++ b/ros2controlcli/package.xml @@ -18,6 +18,7 @@ ros2param controller_manager controller_manager_msgs + control_msgs rosidl_runtime_py python3-pygraphviz diff --git a/ros2controlcli/ros2controlcli/verb/view_hardware_status.py b/ros2controlcli/ros2controlcli/verb/view_hardware_status.py new file mode 100644 index 0000000000..4454c3fe7d --- /dev/null +++ b/ros2controlcli/ros2controlcli/verb/view_hardware_status.py @@ -0,0 +1,177 @@ +# Copyright 2025 ROS-Control Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import partial +import datetime + +import rclpy +from ros2cli.node.direct import add_arguments +from ros2cli.node.strategy import NodeStrategy +from ros2cli.verb import VerbExtension +from ros2topic.api import get_topic_names_and_types + +from control_msgs.msg import HardwareStatus +from controller_manager import bcolors + +from rosidl_runtime_py import message_to_yaml + +_DISCOVERY_THRESHOLD = 10 + + +class ViewHardwareStatusVerb(VerbExtension): + """Echo hardware status messages with filtering capabilities.""" + + def __init__(self): + super().__init__() + self.found_hardware_ids = set() + self.found_device_ids = set() + self.message_count = 0 + self.discovery_complete = False + + def add_arguments(self, parser, cli_name): + add_arguments(parser) + parser.add_argument( + "-i", + "--hardware-id", + dest="hardware_id", + help="Filter by a specific hardware component ID.", + ) + parser.add_argument( + "-d", + "--device-id", + dest="device_id", + help="Filter by a specific device ID within a hardware component.", + ) + + def _on_message(self, msg, args): + self.message_count += 1 + self.found_hardware_ids.add(msg.hardware_id) + for device_state in msg.hardware_device_states: + self.found_device_ids.add(device_state.device_id) + + if not self.discovery_complete and self.message_count >= _DISCOVERY_THRESHOLD: + self.discovery_complete = True + + if args.hardware_id and args.hardware_id not in self.found_hardware_ids: + print( + f"\n{bcolors.FAIL}Error: Hardware ID '{args.hardware_id}' not found.{bcolors.ENDC}" + ) + if self.found_hardware_ids: + print(f"{bcolors.OKBLUE}Available Hardware IDs:{bcolors.ENDC}") + for hw_id in sorted(self.found_hardware_ids): + print(f"\t{hw_id}") + else: + print(f"{bcolors.WARNING}No hardware IDs discovered.{bcolors.ENDC}") + rclpy.shutdown() + return + + if args.device_id and args.device_id not in self.found_device_ids: + print( + f"\n{bcolors.FAIL}Error: Device ID '{args.device_id}' not found.{bcolors.ENDC}" + ) + if self.found_device_ids: + print(f"{bcolors.OKBLUE}Available Device IDs:{bcolors.ENDC}") + for dev_id in sorted(self.found_device_ids): + print(f"\t{dev_id}") + else: + print(f"{bcolors.WARNING}No device IDs discovered.{bcolors.ENDC}") + rclpy.shutdown() + return + + if args.hardware_id and msg.hardware_id != args.hardware_id: + return + + if args.device_id and not any( + d.device_id == args.device_id for d in msg.hardware_device_states + ): + return + + try: + dt_object = datetime.datetime.fromtimestamp(msg.header.stamp.sec) + nano_str = f"{msg.header.stamp.nanosec:09d}" + timestamp = f"{dt_object.strftime('%H:%M:%S')}.{nano_str[:3]}" + except (ValueError, OSError): + timestamp = f"{msg.header.stamp.sec}.{msg.header.stamp.nanosec:09d}" + + print( + f"{bcolors.OKGREEN}Hardware ID: {bcolors.ENDC}{msg.hardware_id} ({bcolors.WARNING}stamp: {timestamp}{bcolors.ENDC})" + ) + + for device_state in msg.hardware_device_states: + if args.device_id and device_state.device_id != args.device_id: + continue + + print(f" {bcolors.OKCYAN}Device ID: {bcolors.ENDC}{device_state.device_id}") + + state_types = [ + ("Generic Hardware States", device_state.hardware_status), + ("CANopen States", device_state.canopen_states), + ("EtherCAT States", device_state.ethercat_states), + ("VDA5050 States", device_state.vda5050_states), + ] + + any_state_printed = False + for title, states in state_types: + if states: + any_state_printed = True + print(f" {bcolors.OKBLUE}{title}:{bcolors.ENDC}") + for state in states: + print(" -") + yaml_str = message_to_yaml(state, flow_style=False) + indented_str = "\n".join( + [f" {line}" for line in yaml_str.splitlines()] + ) + print(indented_str) + + if not any_state_printed: + print(f" {bcolors.FAIL}Status: No specific states reported{bcolors.ENDC}") + + print("---") + + def main(self, *, args): + with NodeStrategy(args).direct_node as node: + topic_names_and_types = get_topic_names_and_types( + node=node, include_hidden_topics=True + ) + + status_topics = sorted( + [ + name + for name, types in topic_names_and_types + if name.endswith("/hardware_status") + and "control_msgs/msg/HardwareStatus" in types + ] + ) + + if not status_topics: + print( + f"{bcolors.FAIL}No topics of type 'control_msgs/msg/HardwareStatus' found.{bcolors.ENDC}" + ) + return 1 + + print(f"{bcolors.OKBLUE}Subscribing to the following topics:{bcolors.ENDC}") + for topic in status_topics: + print(f"\t{topic}") + print("---") + + _ = [ + node.create_subscription( + HardwareStatus, topic, partial(self._on_message, args=args), 10 + ) + for topic in status_topics + ] + + rclpy.spin(node) + + return 0 diff --git a/ros2controlcli/setup.py b/ros2controlcli/setup.py index 21000f7f31..e6805c8e47 100644 --- a/ros2controlcli/setup.py +++ b/ros2controlcli/setup.py @@ -69,6 +69,7 @@ ros2controlcli.verb.set_hardware_component_state:SetHardwareComponentStateVerb", "switch_controllers = ros2controlcli.verb.switch_controllers:SwitchControllersVerb", "unload_controller = ros2controlcli.verb.unload_controller:UnloadControllerVerb", + "view_hardware_status = ros2controlcli.verb.view_hardware_status:ViewHardwareStatusVerb", ], }, )