Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions controller_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,16 @@ if(BUILD_TESTING)
)

find_package(ament_cmake_pytest REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)
find_package(rclpy REQUIRED)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
find_package(launch_testing_ament_cmake REQUIRED)
find_package(rclpy REQUIRED)

As this is also included in the cmake file in the subdir

install(FILES test/test_ros2_control_node.yaml
DESTINATION test)
ament_add_pytest_test(test_ros2_control_node test/test_ros2_control_node_launch.py)
ament_add_pytest_test(test_test_utils test/test_test_utils.py)

# Now include the launch_utils subfolder
add_subdirectory(test/test_launch_utils)

endif()

install(
Expand Down
42 changes: 42 additions & 0 deletions controller_manager/test/test_launch_utils/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This subdirectory handles pytest-based launch tests for controller_manager

find_package(ament_cmake_pytest REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)
find_package(rclpy REQUIRED)

# Install YAML test files
#
install(
FILES
test_controller_load.yaml
test_ros2_control_node_combined.yaml
DESTINATION test
)

# Make sure test files get installed into the test tree
install(
FILES
test_launch_utils_unit.py
test_launch_utils_integration_list.py
test_launch_utils_integration_dict.py
test_launch_utils_integration_load.py
DESTINATION test
)

# Register each test with ament
ament_add_pytest_test(test_launch_utils_unit test_launch_utils_unit.py
APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path}
PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_CURRENT_SOURCE_DIR}:${PYTHONPATH}
)
ament_add_pytest_test(test_launch_utils_integration_list
test_launch_utils_integration_list.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_dict
test_launch_utils_integration_dict.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_load
test_launch_utils_integration_load.py
TIMEOUT 60
)
Comment on lines 39 to 47
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ament_add_pytest_test(test_launch_utils_integration_list
test_launch_utils_integration_list.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_dict
test_launch_utils_integration_dict.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_load
test_launch_utils_integration_load.py
TIMEOUT 60
)
ament_add_pytest_test(test_launch_utils_integration_list
test_launch_utils_integration_list.py
)
ament_add_pytest_test(test_launch_utils_integration_dict
test_launch_utils_integration_dict.py
)
ament_add_pytest_test(test_launch_utils_integration_load
test_launch_utils_integration_load.py
)

nitpick: Isn't 60 the default value?

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
controller_manager:
ros__parameters:
update_rate: 100 # Hz

test_controller_load:
ros__parameters:
type: "controller_manager/test_controller"
joint_names: ["joint1"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
#!/usr/bin/env python3
# Copyright 2025 Robert Kwan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import unittest
import tempfile
import time
from pathlib import Path

from ament_index_python.packages import get_package_share_directory, get_package_prefix
from launch import LaunchDescription
import launch_testing
from launch_testing.actions import ReadyToTest
import launch_ros.actions

import rclpy

from controller_manager.launch_utils import (
generate_controllers_spawner_launch_description_from_dict,
)


@pytest.mark.launch_test
def generate_test_description():
"""
Generate launch description for testing.
THIS VERSION CREATES ALL NEEDED FILES DYNAMICALLY AND USES THE COMBINED CONFIG.
"""

# Create temporary directory for all test files
temp_dir = tempfile.mkdtemp()
print(f"Creating test files in: {temp_dir}")

# URDF path (pathlib version, no xacro)
urdf = (
Path(get_package_share_directory("ros2_control_test_assets"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Path(get_package_share_directory("ros2_control_test_assets"))
PathSubstitution(FindPackageShare("ros2_control_test_assets"))

From

from launch.substitutions import PathSubstitution
from launch_ros.substitutions import FindPackageShare

would be the ROS-ish way.

/ "urdf"
/ "test_hardware_components.urdf"
)

with open(urdf) as infp:
robot_description_content = infp.read()
robot_description = {"robot_description": robot_description_content}

# Path to combined YAML
robot_controllers = (
Path(get_package_prefix("controller_manager"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Path(get_package_prefix("controller_manager"))
PathSubstitution(FindPackagePrefix("controller_manager"))

from

from launch_ros.substitutions import FindPackagePrefix

/ "test"
/ "test_ros2_control_node_combined.yaml"
)

# Verify files exist (Path method)
assert robot_controllers.is_file(), f"Controller config not found: {robot_controllers}"
assert urdf.is_file(), f"URDF not found: {urdf}"

robot_state_pub_node = launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
)

# ===== START CONTROLLER MANAGER (ros2_control_node) =====
control_node = launch_ros.actions.Node(
package="controller_manager",
executable="ros2_control_node",
parameters=[str(robot_controllers)], # Use the combined config file
output="both",
)

# The dictionary keys are the controller names to be spawned/started.
# Values can be empty lists since config is provided via the main YAML.
ctrl_dict = {
"joint_state_broadcaster": [str(robot_controllers)],
"controller1": [str(robot_controllers)],
"controller2": [str(robot_controllers)],
}
controller_list = list(ctrl_dict.keys())

# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
print(f"Spawning controllers: {controller_list}")

# Correct function name and call
spawner_ld = generate_controllers_spawner_launch_description_from_dict(
controller_info_dict=ctrl_dict,
)

# ===== CREATE LAUNCH DESCRIPTION =====
ld = LaunchDescription(
[robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities
)

# Return tuple with launch description and test context
return ld, {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
robot_state_pub_node = launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
)
# ===== START CONTROLLER MANAGER (ros2_control_node) =====
control_node = launch_ros.actions.Node(
package="controller_manager",
executable="ros2_control_node",
parameters=[str(robot_controllers)], # Use the combined config file
output="both",
)
# The dictionary keys are the controller names to be spawned/started.
# Values can be empty lists since config is provided via the main YAML.
ctrl_dict = {
"joint_state_broadcaster": [str(robot_controllers)],
"controller1": [str(robot_controllers)],
"controller2": [str(robot_controllers)],
}
controller_list = list(ctrl_dict.keys())
# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
print(f"Spawning controllers: {controller_list}")
# Correct function name and call
spawner_ld = generate_controllers_spawner_launch_description_from_dict(
controller_info_dict=ctrl_dict,
)
# ===== CREATE LAUNCH DESCRIPTION =====
ld = LaunchDescription(
[robot_state_pub_node, control_node, ReadyToTest()] + spawner_ld.entities
)
# Return tuple with launch description and test context
return ld, {
# The dictionary keys are the controller names to be spawned/started.
# Values can be empty lists since config is provided via the main YAML.
ctrl_dict = {
"joint_state_broadcaster": [str(robot_controllers)],
"controller1": [str(robot_controllers)],
"controller2": [str(robot_controllers)],
}
controller_list = list(ctrl_dict.keys())
# ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
print(f"Spawning controllers: {controller_list}")
# Return tuple with launch description and test context
return LaunchDescription([
launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
),
launch_ros.actions.Node(
package="controller_manager",
executable="ros2_control_node",
parameters=[str(robot_controllers)], # Use the combined config file
output="both",
),
generate_controllers_spawner_launch_description_from_dict(
controller_info_dict=ctrl_dict,
),
ReadyToTest()]), {

Do we need the variables, can't we just directly return them?

"controller_list": controller_list, # Key name updated to match the test function
"robot_controllers": robot_controllers,
"urdf_file": urdf,
"temp_dir": temp_dir,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return ld, {
"controller_list": controller_list, # Key name updated to match the test function
"robot_controllers": robot_controllers,
"urdf_file": urdf,
"temp_dir": temp_dir,
}
return ld

Why returning this? (I'm not very familiar with pytest, just following the ROS integration tests tutorials).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @christophfroehlich for the detailed review. I will try to look into it in the next two days.



# Active tests
class TestControllerSpawnerList(unittest.TestCase):
"""Active tests that run while the launch is active."""

@classmethod
def setUpClass(cls):
rclpy.init()

@classmethod
def tearDownClass(cls):
rclpy.shutdown()

def test_spawner_nodes_launched(self, proc_info):
"""Ensure processes are running."""
process_names = proc_info.process_names()
self.assertGreater(len(process_names), 0)
print("\n[TEST] Active processes: {process_names}")

def test_controllers_loaded(self, proc_info, controller_list):
"""Test that controllers were loaded (poll until they appear)."""
node = rclpy.create_node("test_controller_query_node")

try:
from controller_manager_msgs.srv import ListControllers

client = node.create_client(ListControllers, "/controller_manager/list_controllers")

print("\n[TEST] Waiting for controller_manager service...")
wait_for_svc_timeout = 30.0
if not client.wait_for_service(timeout_sec=wait_for_svc_timeout):
process_names = proc_info.process_names()
self.fail(
f"Controller manager service not available after {wait_for_svc_timeout}s.\n"
f"Active processes: {process_names}"
)

# Poll for controllers to be registered
print("[TEST] Service available, polling for controllers (timeout 30s)...")
deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0)
seen = []
while node.get_clock().now() < deadline:
req = ListControllers.Request()
fut = client.call_async(req)
rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0)
if fut.done() and fut.result() is not None:
response = fut.result()
seen = [c.name for c in response.controller]
if all(ctrl in seen for ctrl in controller_list):
print(f"[TEST] Loaded controllers: {seen}")
break
# small sleep to avoid tight-loop
time.sleep(0.2)
else:
# timeout expired
self.fail(
f"Timeout waiting for controllers to be loaded. "
f"Expected: {controller_list}, saw: {seen}"
)

# Final assert (defensive)
for controller in controller_list:
self.assertIn(
controller,
seen,
f"Controller '{controller}' was not loaded. Available: {seen}",
)

print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully")

finally:
node.destroy_node()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using this?

Suggested change
def test_controllers_loaded(self, proc_info, controller_list):
"""Test that controllers were loaded (poll until they appear)."""
node = rclpy.create_node("test_controller_query_node")
try:
from controller_manager_msgs.srv import ListControllers
client = node.create_client(ListControllers, "/controller_manager/list_controllers")
print("\n[TEST] Waiting for controller_manager service...")
wait_for_svc_timeout = 30.0
if not client.wait_for_service(timeout_sec=wait_for_svc_timeout):
process_names = proc_info.process_names()
self.fail(
f"Controller manager service not available after {wait_for_svc_timeout}s.\n"
f"Active processes: {process_names}"
)
# Poll for controllers to be registered
print("[TEST] Service available, polling for controllers (timeout 30s)...")
deadline = node.get_clock().now() + rclpy.duration.Duration(seconds=30.0)
seen = []
while node.get_clock().now() < deadline:
req = ListControllers.Request()
fut = client.call_async(req)
rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0)
if fut.done() and fut.result() is not None:
response = fut.result()
seen = [c.name for c in response.controller]
if all(ctrl in seen for ctrl in controller_list):
print(f"[TEST] Loaded controllers: {seen}")
break
# small sleep to avoid tight-loop
time.sleep(0.2)
else:
# timeout expired
self.fail(
f"Timeout waiting for controllers to be loaded. "
f"Expected: {controller_list}, saw: {seen}"
)
# Final assert (defensive)
for controller in controller_list:
self.assertIn(
controller,
seen,
f"Controller '{controller}' was not loaded. Available: {seen}",
)
print(f"[TEST] ? All {len(controller_list)} controllers loaded successfully")
finally:
node.destroy_node()
def test_controllers_start(self):
cnames = ["ctrl_with_parameters_and_type"]
check_controllers_running(self.node, cnames, state="active")

Needs
from controller_manager.test_utils import check_controllers_running

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you see an improvement to check_controllers_running, please open a PR for it! :)


def test_spawner_exit_code(self, proc_info):
"""Test that spawner process ran (may have completed already)."""
process_names = proc_info.process_names()
print(f"\n[TEST] Checking for spawner in: {process_names}")

# The spawner may have already completed successfully and exited
# So we just verify that we have processes running
self.assertGreater(len(process_names), 0)
print(f"[TEST] ? Launch has {len(process_names)} active processes")


@launch_testing.post_shutdown_test()
class TestProcessOutput(unittest.TestCase):
"""Post-shutdown tests."""

def test_exit_codes(self, proc_info):
"""Verify all processes exited successfully."""
print("\n[POST-SHUTDOWN] Process exit codes:")
for process_name in proc_info.process_names():
info = proc_info[process_name]
print(f" {process_name}: {info.returncode}")

for process_name in proc_info.process_names():
info = proc_info[process_name]

if "ros2_control_node" in process_name:
self.assertEqual(
info.returncode, 0, f"{process_name} exited with {info.returncode}"
)
elif "spawner" in process_name:
# Spawner should complete successfully (0) or be terminated
self.assertIn(
info.returncode,
[0, -2, -15],
f"Spawner {process_name} exited with {info.returncode}",
)
else:
self.assertIn(
info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}"
)

print("[POST-SHUTDOWN] ? All processes exited as expected")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this verbose test case necessary? Wouldn't this one be also fine?

Suggested change
print("\n[POST-SHUTDOWN] Process exit codes:")
for process_name in proc_info.process_names():
info = proc_info[process_name]
print(f" {process_name}: {info.returncode}")
for process_name in proc_info.process_names():
info = proc_info[process_name]
if "ros2_control_node" in process_name:
self.assertEqual(
info.returncode, 0, f"{process_name} exited with {info.returncode}"
)
elif "spawner" in process_name:
# Spawner should complete successfully (0) or be terminated
self.assertIn(
info.returncode,
[0, -2, -15],
f"Spawner {process_name} exited with {info.returncode}",
)
else:
self.assertIn(
info.returncode, [0, -2, -15], f"{process_name} exited with {info.returncode}"
)
print("[POST-SHUTDOWN] ? All processes exited as expected")
launch_testing.asserts.assertExitCodes(proc_info)


def test_cleanup_temp_files(self, temp_dir):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain why this is necessary? We don't do any cleanups in other pytests.

"""Clean up temporary test files."""
import shutil

print(f"\n[CLEANUP] Removing temporary directory: {temp_dir}")

# The original clean-up logic was commented out, enabling it for safety
try:
if temp_dir.exists():
shutil.rmtree(temp_dir)

print("[CLEANUP] ? Temporary files removed")
except Exception as e:
print(f"[CLEANUP] Warning: Cleanup failed: {e}")
Loading