Skip to content

Commit 9f2ec0c

Browse files
Janosch MachowinskiJanosch Machowinski
Janosch Machowinski
authored and
Janosch Machowinski
committed
Revert "Avoid losing waitable handles while using MultiThreadedExecutor (#2109)"
This reverts commit 232262c. Signed-off-by: Janosch Machowinski <[email protected]>
1 parent c06c617 commit 9f2ec0c

File tree

2 files changed

+19
-180
lines changed

2 files changed

+19
-180
lines changed

rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp

+11-12
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
#include <memory>
1919
#include <vector>
20-
#include <utility>
2120

2221
#include "rcl/allocator.h"
2322

@@ -121,8 +120,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
121120
}
122121
}
123122
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
124-
if (waitable_handles_[i]->is_ready(wait_set)) {
125-
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
123+
if (!waitable_handles_[i]->is_ready(wait_set)) {
124+
waitable_handles_[i].reset();
126125
}
127126
}
128127

@@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
146145
timer_handles_.end()
147146
);
148147

149-
waitable_handles_.clear();
148+
waitable_handles_.erase(
149+
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
150+
waitable_handles_.end()
151+
);
150152
}
151153

152154
bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
@@ -390,17 +392,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
390392
rclcpp::AnyExecutable & any_exec,
391393
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
392394
{
393-
auto & waitable_handles = waitable_triggered_handles_;
394-
auto it = waitable_handles.begin();
395-
while (it != waitable_handles.end()) {
395+
auto it = waitable_handles_.begin();
396+
while (it != waitable_handles_.end()) {
396397
std::shared_ptr<Waitable> & waitable = *it;
397398
if (waitable) {
398399
// Find the group for this handle and see if it can be serviced
399400
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
400401
if (!group) {
401402
// Group was not found, meaning the waitable is not valid...
402403
// Remove it from the ready list and continue looking
403-
it = waitable_handles.erase(it);
404+
it = waitable_handles_.erase(it);
404405
continue;
405406
}
406407
if (!group->can_be_taken_from().load()) {
@@ -413,11 +414,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
413414
any_exec.waitable = waitable;
414415
any_exec.callback_group = group;
415416
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
416-
waitable_handles.erase(it);
417+
waitable_handles_.erase(it);
417418
return;
418419
}
419420
// Else, the waitable is no longer valid, remove it and continue
420-
it = waitable_handles.erase(it);
421+
it = waitable_handles_.erase(it);
421422
}
422423
}
423424

@@ -498,8 +499,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
498499
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
499500
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
500501

501-
VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;
502-
503502
std::shared_ptr<VoidAlloc> allocator_;
504503
};
505504

rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp

+8-168
Original file line numberDiff line numberDiff line change
@@ -63,57 +63,6 @@ class TestWaitable : public rclcpp::Waitable
6363
}
6464
};
6565

66-
static bool test_waitable_result2 = false;
67-
68-
class TestWaitable2 : public rclcpp::Waitable
69-
{
70-
public:
71-
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
72-
: pub_ptr_(pub_ptr),
73-
pub_event_(rcl_get_zero_initialized_event())
74-
{
75-
EXPECT_EQ(
76-
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
77-
RCL_RET_OK);
78-
}
79-
80-
~TestWaitable2()
81-
{
82-
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
83-
}
84-
85-
void add_to_wait_set(rcl_wait_set_t * wait_set) override
86-
{
87-
EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
88-
}
89-
90-
bool is_ready(rcl_wait_set_t *) override
91-
{
92-
return test_waitable_result2;
93-
}
94-
95-
std::shared_ptr<void>
96-
take_data() override
97-
{
98-
return nullptr;
99-
}
100-
101-
void execute(std::shared_ptr<void> & data) override
102-
{
103-
(void) data;
104-
}
105-
106-
size_t get_number_of_ready_events() override
107-
{
108-
return 1;
109-
}
110-
111-
private:
112-
rcl_publisher_t * pub_ptr_;
113-
rcl_event_t pub_event_;
114-
size_t wait_set_event_index_;
115-
};
116-
11766
struct RclWaitSetSizes
11867
{
11968
size_t size_of_subscriptions = 0;
@@ -708,129 +657,20 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
708657
}
709658

710659
TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
660+
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
661+
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
662+
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
663+
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
664+
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
665+
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);
666+
711667
auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
712668
rclcpp::AnyExecutable result;
713669
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
714670
return result;
715671
};
716672

717-
{
718-
auto node1 = std::make_shared<rclcpp::Node>(
719-
"waitable_node", "ns",
720-
rclcpp::NodeOptions()
721-
.start_parameter_event_publisher(false)
722-
.start_parameter_services(false));
723-
724-
rclcpp::PublisherOptions pub_options;
725-
pub_options.use_default_callbacks = false;
726-
727-
auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
728-
"test_topic_1", rclcpp::QoS(10), pub_options);
729-
730-
auto waitable1 =
731-
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
732-
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
733-
734-
auto basic_node = create_node_with_disabled_callback_groups("basic_node");
735-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
736-
basic_node->for_each_callback_group(
737-
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
738-
{
739-
weak_groups_to_nodes.insert(
740-
std::pair<rclcpp::CallbackGroup::WeakPtr,
741-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
742-
group_ptr,
743-
basic_node->get_node_base_interface()));
744-
});
745-
node1->for_each_callback_group(
746-
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
747-
{
748-
weak_groups_to_nodes.insert(
749-
std::pair<rclcpp::CallbackGroup::WeakPtr,
750-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
751-
group_ptr,
752-
node1->get_node_base_interface()));
753-
});
754-
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);
755-
756-
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
757-
ASSERT_EQ(
758-
rcl_wait_set_init(
759-
&wait_set,
760-
allocator_memory_strategy()->number_of_ready_subscriptions(),
761-
allocator_memory_strategy()->number_of_guard_conditions(),
762-
allocator_memory_strategy()->number_of_ready_timers(),
763-
allocator_memory_strategy()->number_of_ready_clients(),
764-
allocator_memory_strategy()->number_of_ready_services(),
765-
allocator_memory_strategy()->number_of_ready_events(),
766-
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
767-
allocator_memory_strategy()->get_allocator()),
768-
RCL_RET_OK);
769-
770-
ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));
771-
772-
ASSERT_EQ(
773-
rcl_wait(
774-
&wait_set,
775-
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
776-
.count()),
777-
RCL_RET_OK);
778-
test_waitable_result2 = true;
779-
allocator_memory_strategy()->remove_null_handles(&wait_set);
780-
781-
rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
782-
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
783-
test_waitable_result2 = false;
784-
785-
EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
786-
}
787-
788-
{
789-
auto node2 = std::make_shared<rclcpp::Node>(
790-
"waitable_node2", "ns",
791-
rclcpp::NodeOptions()
792-
.start_parameter_services(false)
793-
.start_parameter_event_publisher(false));
794-
795-
rclcpp::PublisherOptions pub_options;
796-
pub_options.use_default_callbacks = false;
797-
798-
auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
799-
"test_topic_2", rclcpp::QoS(10), pub_options);
800-
801-
auto waitable2 =
802-
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
803-
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);
804-
805-
auto basic_node2 = std::make_shared<rclcpp::Node>(
806-
"basic_node2", "ns",
807-
rclcpp::NodeOptions()
808-
.start_parameter_services(false)
809-
.start_parameter_event_publisher(false));
810-
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
811-
basic_node2->for_each_callback_group(
812-
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
813-
{
814-
weak_groups_to_uncollected_nodes.insert(
815-
std::pair<rclcpp::CallbackGroup::WeakPtr,
816-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
817-
group_ptr,
818-
basic_node2->get_node_base_interface()));
819-
});
820-
node2->for_each_callback_group(
821-
[node2,
822-
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
823-
{
824-
weak_groups_to_uncollected_nodes.insert(
825-
std::pair<rclcpp::CallbackGroup::WeakPtr,
826-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
827-
group_ptr,
828-
node2->get_node_base_interface()));
829-
});
830-
831-
rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
832-
EXPECT_EQ(failed_result.node_base, nullptr);
833-
}
673+
EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
834674
}
835675

836676
TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {

0 commit comments

Comments
 (0)