Skip to content

Commit 8a3d733

Browse files
mergify[bot]saikishorchristophfroehlich
authored
Add new API for the RealtimePublisher (backport #323) (#352)
--------- Co-authored-by: Sai Kishor Kothakota <[email protected]> Co-authored-by: Christoph Froehlich <[email protected]>
1 parent 9dc08a7 commit 8a3d733

File tree

5 files changed

+114
-20
lines changed

5 files changed

+114
-20
lines changed

doc/migration.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ This list summarizes important changes between Humble (previous) and Jazzy (curr
1111
RealtimeBox
1212
*******************************
1313
* ``RealtimeBox`` is deprecated. Update your code to use ``realtime_thread_safe_box.hpp`` header, and class name ``RealtimeThreadSafeBox`` instead. (`#318 <https://github.com/ros-controls/realtime_tools/pull/318>`__, `#342 <https://github.com/ros-controls/realtime_tools/pull/342>`__).
14+
* ``RealtimePublisher`` is updated with a new ``try_publish`` API. Update your code with a local message variable and call ``try_publish`` with that variable. The old API is deprecated and will be removed in a future release. (`#323 <https://github.com/ros-controls/realtime_tools/pull/323>`__).

doc/release_notes.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ This list summarizes the changes between Humble (previous) and Jazzy (current) r
1111
RealtimeBox
1212
*******************************
1313
* ``RealtimeBox`` got renamed to ``RealtimeThreadSafeBox`` and uses real-time mutexes now (`#318 <https://github.com/ros-controls/realtime_tools/pull/318>`__, `#342 <https://github.com/ros-controls/realtime_tools/pull/342>`__).
14+
* ``RealtimePublisher`` is updated with a new ``try_publish`` API and few methods are deprecated (`#323 <https://github.com/ros-controls/realtime_tools/pull/323>`__).

realtime_tools/doc/realtime_publisher.rst

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The ``realtime_tools::RealtimePublisher`` allows users that write C++ ros2_contr
1212
private:
1313
std::shared_ptr<realtime_tools::RealtimePublisher<my_msgs::msg::MyMsg>> state_publisher_;
1414
std::shared_ptr<rclcpp::Publisher<my_msgs::msg::MyMsg>> s_publisher_;
15+
my_msgs::msg::MyMsg some_msg_;
1516
}
1617
1718
controller_interface::CallbackReturn MyController::on_configure(
@@ -21,7 +22,8 @@ The ``realtime_tools::RealtimePublisher`` allows users that write C++ ros2_contr
2122
s_publisher_ = get_node()->create_publisher<my_msgs::msg::MyMsg>(
2223
"~/status", rclcpp::SystemDefaultsQoS());
2324
state_publisher_ =
24-
std::make_unique<realtime_tools::RealtimePublisher<ControllerStateMsg>>(s_publisher_);
25+
std::make_unique<realtime_tools::RealtimePublisher<my_msgs::msg::MyMsg>>(s_publisher_);
26+
some_msg_.header.frame_id = params_.frame_id;
2527
...
2628
}
2729
@@ -30,7 +32,8 @@ The ``realtime_tools::RealtimePublisher`` allows users that write C++ ros2_contr
3032
{
3133
...
3234
// Publish controller state
33-
state_publisher_->lock();
34-
state_publisher_->msg_ = some_msg;
35-
state_publisher_->unlockAndPublish();
35+
some_msg_.header.stamp = get_node()->now();
36+
// Fill in the rest of the message
37+
....
38+
state_publisher_->try_publish(some_msg_);
3639
}

realtime_tools/include/realtime_tools/realtime_publisher.hpp

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <mutex>
4545
#include <string>
4646
#include <thread>
47+
#include <utility>
4748

4849
#include "rclcpp/publisher.hpp"
4950

@@ -62,7 +63,6 @@ class RealtimePublisher
6263

6364
RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher<MessageT>)
6465

65-
/// The msg_ variable contains the data that will get published on the ROS topic.
6666
MessageT msg_;
6767

6868
/**
@@ -137,31 +137,61 @@ class RealtimePublisher
137137
*/
138138
bool trylock()
139139
{
140-
if (turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock()) {
140+
return turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock();
141+
}
142+
143+
/**
144+
* \brief Check if the realtime publisher is in a state to publish messages
145+
* \return true if the publisher is in a state to publish messages
146+
* \note The msg_ variable can be safely accessed if this function returns true
147+
*/
148+
bool can_publish() const
149+
{
150+
std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
151+
return can_publish(lock);
152+
}
153+
154+
/**
155+
* \brief Try to publish the given message
156+
*
157+
* This method attempts to publish the given message if the publisher is in a state to do so.
158+
* It uses a try_lock to avoid blocking if the mutex is already held by another thread.
159+
*
160+
* \param [in] msg The message to publish
161+
* \return true if the message was successfully published, false otherwise
162+
*/
163+
bool try_publish(const MessageT & msg)
164+
{
165+
std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
166+
if (can_publish(lock)) {
167+
{
168+
std::unique_lock<std::mutex> scoped_lock(std::move(lock));
169+
msg_ = msg;
170+
turn_.store(State::NON_REALTIME, std::memory_order_release);
171+
}
172+
updated_cond_.notify_one(); // Notify the publishing thread
141173
return true;
142-
} else {
143-
return false;
144174
}
175+
return false;
145176
}
146177

147178
/**
148-
* \brief Try to get the data lock from realtime and publish the given message
179+
* \brief Try to publish the given message (deprecated)
180+
* \deprecated This method is deprecated and should be replaced with try_publish()
149181
*
150-
* Tries to gain unique access to msg_ variable. If this succeeds
151-
* update the msg_ variable and call unlockAndPublish
182+
* This method is deprecated and should be replaced with try_publish().
183+
* It attempts to publish the given message if the publisher is in a state to do so.
184+
* It uses a try_lock to avoid blocking if the mutex is already held by another thread.
152185
*
153186
* \param [in] msg The message to publish
154-
* \return false in case no lock for the realtime variable is acquired. This implies the message will not be published.
187+
* \return true if the message was successfully published, false otherwise
155188
*/
189+
[[deprecated(
190+
"Use try_publish() method instead of this method. This method may be removed in future "
191+
"versions.")]]
156192
bool tryPublish(const MessageT & msg)
157193
{
158-
if (!trylock()) {
159-
return false;
160-
}
161-
162-
msg_ = msg;
163-
unlockAndPublish();
164-
return true;
194+
return try_publish(msg);
165195
}
166196

167197
/**
@@ -199,7 +229,24 @@ class RealtimePublisher
199229

200230
const std::thread & get_thread() const { return thread_; }
201231

232+
const MessageT & get_msg() const { return msg_; }
233+
234+
std::mutex & get_mutex() { return msg_mutex_; }
235+
236+
const std::mutex & get_mutex() const { return msg_mutex_; }
237+
202238
private:
239+
/**
240+
* \brief Check if the realtime publisher is in a state to publish messages
241+
* \param lock A unique_lock that is already acquired on the msg_mutex_
242+
* \return true if the publisher is in a state to publish messages
243+
* \note The msg_ variable can be safely accessed if this function returns true
244+
*/
245+
bool can_publish(std::unique_lock<std::mutex> & lock) const
246+
{
247+
return turn_.load(std::memory_order_acquire) == State::REALTIME && lock.owns_lock();
248+
}
249+
203250
// non-copyable
204251
RealtimePublisher(const RealtimePublisher &) = delete;
205252
RealtimePublisher & operator=(const RealtimePublisher &) = delete;
@@ -246,7 +293,7 @@ class RealtimePublisher
246293

247294
std::thread thread_;
248295

249-
std::mutex msg_mutex_; // Protects msg_
296+
mutable std::mutex msg_mutex_; // Protects msg_
250297
std::condition_variable updated_cond_;
251298

252299
enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };

realtime_tools/test/realtime_publisher_tests.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,45 @@ TEST(RealtimePublisher, rt_try_publish)
132132
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
133133
rclcpp::shutdown();
134134
}
135+
136+
TEST(RealtimePublisher, rt_can_try_publish)
137+
{
138+
rclcpp::init(0, nullptr);
139+
const size_t ATTEMPTS = 10;
140+
const std::chrono::milliseconds DELAY(250);
141+
142+
const char * expected_msg = "Hello World";
143+
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
144+
rclcpp::QoS qos(10);
145+
qos.reliable().transient_local();
146+
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
147+
RealtimePublisher<StringMsg> rt_pub(pub);
148+
ASSERT_TRUE(rt_pub.can_publish());
149+
150+
// try publish a latched message
151+
bool publish_success = false;
152+
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
153+
StringMsg msg;
154+
msg.string_value = expected_msg;
155+
156+
if (rt_pub.can_publish()) {
157+
ASSERT_TRUE(rt_pub.try_publish(msg));
158+
publish_success = true;
159+
}
160+
std::this_thread::sleep_for(DELAY);
161+
}
162+
ASSERT_TRUE(publish_success);
163+
164+
// make sure subscriber gets it
165+
StringCallback str_callback;
166+
167+
auto sub = node->create_subscription<StringMsg>(
168+
"~/rt_publish", qos,
169+
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
170+
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
171+
rclcpp::spin_some(node);
172+
std::this_thread::sleep_for(DELAY);
173+
}
174+
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
175+
rclcpp::shutdown();
176+
}

0 commit comments

Comments
 (0)