Skip to content

Commit ed791d3

Browse files
committed
Restore static single threaded executor
Signed-off-by: Michael Carroll <[email protected]>
1 parent 14e009e commit ed791d3

9 files changed

+2151
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
// Copyright 2020 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
16+
#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
17+
18+
#include <chrono>
19+
#include <list>
20+
#include <map>
21+
#include <memory>
22+
#include <vector>
23+
24+
#include "rcl/guard_condition.h"
25+
#include "rcl/wait.h"
26+
27+
#include "rclcpp/experimental/executable_list.hpp"
28+
#include "rclcpp/macros.hpp"
29+
#include "rclcpp/memory_strategy.hpp"
30+
#include "rclcpp/visibility_control.hpp"
31+
#include "rclcpp/waitable.hpp"
32+
33+
namespace rclcpp
34+
{
35+
namespace executors
36+
{
37+
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
38+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
39+
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
40+
41+
class StaticExecutorEntitiesCollector final
42+
: public rclcpp::Waitable,
43+
public std::enable_shared_from_this<StaticExecutorEntitiesCollector>
44+
{
45+
public:
46+
RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector)
47+
48+
// Constructor
49+
RCLCPP_PUBLIC
50+
StaticExecutorEntitiesCollector() = default;
51+
52+
// Destructor
53+
RCLCPP_PUBLIC
54+
~StaticExecutorEntitiesCollector();
55+
56+
/// Initialize StaticExecutorEntitiesCollector
57+
/**
58+
* \param p_wait_set A reference to the wait set to be used in the executor
59+
* \param memory_strategy Shared pointer to the memory strategy to set.
60+
* \throws std::runtime_error if memory strategy is null
61+
*/
62+
RCLCPP_PUBLIC
63+
void
64+
init(
65+
rcl_wait_set_t * p_wait_set,
66+
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
67+
68+
/// Finalize StaticExecutorEntitiesCollector to clear resources
69+
RCLCPP_PUBLIC
70+
bool
71+
is_init() {return initialized_;}
72+
73+
RCLCPP_PUBLIC
74+
void
75+
fini();
76+
77+
/// Execute the waitable.
78+
RCLCPP_PUBLIC
79+
void
80+
execute(std::shared_ptr<void> & data) override;
81+
82+
/// Take the data so that it can be consumed with `execute`.
83+
/**
84+
* For `StaticExecutorEntitiesCollector`, this always return `nullptr`.
85+
* \sa rclcpp::Waitable::take_data()
86+
*/
87+
RCLCPP_PUBLIC
88+
std::shared_ptr<void>
89+
take_data() override;
90+
91+
/// Function to add_handles_to_wait_set and wait for work and
92+
/**
93+
* block until the wait set is ready or until the timeout has been exceeded.
94+
* \throws std::runtime_error if wait set couldn't be cleared or filled.
95+
* \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error()
96+
*/
97+
RCLCPP_PUBLIC
98+
void
99+
refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
100+
101+
/**
102+
* \throws std::runtime_error if it couldn't add guard condition to wait set
103+
*/
104+
RCLCPP_PUBLIC
105+
void
106+
add_to_wait_set(rcl_wait_set_t * wait_set) override;
107+
108+
RCLCPP_PUBLIC
109+
size_t
110+
get_number_of_ready_guard_conditions() override;
111+
112+
/// Add a callback group to an executor.
113+
/**
114+
* \see rclcpp::Executor::add_callback_group
115+
*/
116+
RCLCPP_PUBLIC
117+
bool
118+
add_callback_group(
119+
rclcpp::CallbackGroup::SharedPtr group_ptr,
120+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
121+
122+
/// Add a callback group to an executor.
123+
/**
124+
* \see rclcpp::Executor::add_callback_group
125+
* \return boolean whether the node from the callback group is new
126+
*/
127+
RCLCPP_PUBLIC
128+
bool
129+
add_callback_group(
130+
rclcpp::CallbackGroup::SharedPtr group_ptr,
131+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
132+
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
133+
134+
/// Remove a callback group from the executor.
135+
/**
136+
* \see rclcpp::Executor::remove_callback_group
137+
*/
138+
RCLCPP_PUBLIC
139+
bool
140+
remove_callback_group(
141+
rclcpp::CallbackGroup::SharedPtr group_ptr);
142+
143+
/// Remove a callback group from the executor.
144+
/**
145+
* \see rclcpp::Executor::remove_callback_group_from_map
146+
*/
147+
RCLCPP_PUBLIC
148+
bool
149+
remove_callback_group_from_map(
150+
rclcpp::CallbackGroup::SharedPtr group_ptr,
151+
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
152+
153+
/**
154+
* \see rclcpp::Executor::add_node()
155+
* \throw std::runtime_error if node was already added
156+
*/
157+
RCLCPP_PUBLIC
158+
bool
159+
add_node(
160+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
161+
162+
/**
163+
* \see rclcpp::Executor::remove_node()
164+
* \throw std::runtime_error if no guard condition is associated with node.
165+
*/
166+
RCLCPP_PUBLIC
167+
bool
168+
remove_node(
169+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
170+
171+
RCLCPP_PUBLIC
172+
std::vector<rclcpp::CallbackGroup::WeakPtr>
173+
get_all_callback_groups();
174+
175+
/// Get callback groups that belong to executor.
176+
/**
177+
* \see rclcpp::Executor::get_manually_added_callback_groups()
178+
*/
179+
RCLCPP_PUBLIC
180+
std::vector<rclcpp::CallbackGroup::WeakPtr>
181+
get_manually_added_callback_groups();
182+
183+
/// Get callback groups that belong to executor.
184+
/**
185+
* \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
186+
*/
187+
RCLCPP_PUBLIC
188+
std::vector<rclcpp::CallbackGroup::WeakPtr>
189+
get_automatically_added_callback_groups_from_nodes();
190+
191+
/// Complete all available queued work without blocking.
192+
/**
193+
* This function checks if after the guard condition was triggered
194+
* (or a spurious wakeup happened) we are really ready to execute
195+
* i.e. re-collect entities
196+
*/
197+
RCLCPP_PUBLIC
198+
bool
199+
is_ready(rcl_wait_set_t * wait_set) override;
200+
201+
/// Return number of timers
202+
/**
203+
* \return number of timers
204+
*/
205+
RCLCPP_PUBLIC
206+
size_t
207+
get_number_of_timers() {return exec_list_.number_of_timers;}
208+
209+
/// Return number of subscriptions
210+
/**
211+
* \return number of subscriptions
212+
*/
213+
RCLCPP_PUBLIC
214+
size_t
215+
get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;}
216+
217+
/// Return number of services
218+
/**
219+
* \return number of services
220+
*/
221+
RCLCPP_PUBLIC
222+
size_t
223+
get_number_of_services() {return exec_list_.number_of_services;}
224+
225+
/// Return number of clients
226+
/**
227+
* \return number of clients
228+
*/
229+
RCLCPP_PUBLIC
230+
size_t
231+
get_number_of_clients() {return exec_list_.number_of_clients;}
232+
233+
/// Return number of waitables
234+
/**
235+
* \return number of waitables
236+
*/
237+
RCLCPP_PUBLIC
238+
size_t
239+
get_number_of_waitables() {return exec_list_.number_of_waitables;}
240+
241+
/** Return a SubscritionBase Sharedptr by index.
242+
* \param[in] i The index of the SubscritionBase
243+
* \return a SubscritionBase shared pointer
244+
* \throws std::out_of_range if the argument is higher than the size of the structrue.
245+
*/
246+
RCLCPP_PUBLIC
247+
rclcpp::SubscriptionBase::SharedPtr
248+
get_subscription(size_t i) {return exec_list_.subscription[i];}
249+
250+
/** Return a TimerBase Sharedptr by index.
251+
* \param[in] i The index of the TimerBase
252+
* \return a TimerBase shared pointer
253+
* \throws std::out_of_range if the argument is higher than the size.
254+
*/
255+
RCLCPP_PUBLIC
256+
rclcpp::TimerBase::SharedPtr
257+
get_timer(size_t i) {return exec_list_.timer[i];}
258+
259+
/** Return a ServiceBase Sharedptr by index.
260+
* \param[in] i The index of the ServiceBase
261+
* \return a ServiceBase shared pointer
262+
* \throws std::out_of_range if the argument is higher than the size.
263+
*/
264+
RCLCPP_PUBLIC
265+
rclcpp::ServiceBase::SharedPtr
266+
get_service(size_t i) {return exec_list_.service[i];}
267+
268+
/** Return a ClientBase Sharedptr by index
269+
* \param[in] i The index of the ClientBase
270+
* \return a ClientBase shared pointer
271+
* \throws std::out_of_range if the argument is higher than the size.
272+
*/
273+
RCLCPP_PUBLIC
274+
rclcpp::ClientBase::SharedPtr
275+
get_client(size_t i) {return exec_list_.client[i];}
276+
277+
/** Return a Waitable Sharedptr by index
278+
* \param[in] i The index of the Waitable
279+
* \return a Waitable shared pointer
280+
* \throws std::out_of_range if the argument is higher than the size.
281+
*/
282+
RCLCPP_PUBLIC
283+
rclcpp::Waitable::SharedPtr
284+
get_waitable(size_t i) {return exec_list_.waitable[i];}
285+
286+
private:
287+
/// Function to reallocate space for entities in the wait set.
288+
/**
289+
* \throws std::runtime_error if wait set couldn't be cleared or resized.
290+
*/
291+
void
292+
prepare_wait_set();
293+
294+
void
295+
fill_executable_list();
296+
297+
void
298+
fill_memory_strategy();
299+
300+
/// Return true if the node belongs to the collector
301+
/**
302+
* \param[in] node_ptr a node base interface shared pointer
303+
* \param[in] weak_groups_to_nodes map to nodes to lookup
304+
* \return boolean whether a node belongs the collector
305+
*/
306+
bool
307+
has_node(
308+
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
309+
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
310+
311+
/// Add all callback groups that can be automatically added by any executor
312+
/// and is not already associated with an executor from nodes
313+
/// that are associated with executor
314+
/**
315+
* \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor()
316+
*/
317+
void
318+
add_callback_groups_from_nodes_associated_to_executor();
319+
320+
void
321+
fill_executable_list_from_map(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
322+
323+
/// Memory strategy: an interface for handling user-defined memory allocation strategies.
324+
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
325+
326+
// maps callback groups to nodes.
327+
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
328+
// maps callback groups to nodes.
329+
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
330+
331+
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
332+
const rclcpp::GuardCondition *,
333+
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
334+
WeakNodesToGuardConditionsMap;
335+
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
336+
337+
/// List of weak nodes registered in the static executor
338+
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
339+
340+
// Mutex to protect vector of new nodes.
341+
std::mutex new_nodes_mutex_;
342+
std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> new_nodes_;
343+
344+
/// Wait set for managing entities that the rmw layer waits on.
345+
rcl_wait_set_t * p_wait_set_ = nullptr;
346+
347+
/// Executable list: timers, subscribers, clients, services and waitables
348+
rclcpp::experimental::ExecutableList exec_list_;
349+
350+
/// Bool to check if the entities collector has been initialized
351+
bool initialized_ = false;
352+
};
353+
354+
} // namespace executors
355+
} // namespace rclcpp
356+
357+
#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_

0 commit comments

Comments
 (0)