Skip to content

Async Execution #421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 52 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
9b9de10
Drafting traits for generic executors
mxgrey Oct 7, 2024
2dba330
Fleshing out interfaces for async execution
mxgrey Oct 7, 2024
f10036d
Beginning to migrate subscriptions to async
mxgrey Oct 8, 2024
b1dda94
Implementing async execution for subscriptions
mxgrey Oct 8, 2024
c31ad7d
Update wait set behavior
mxgrey Oct 9, 2024
cda131d
Reimagining the way wait sets are handled
mxgrey Oct 10, 2024
d28ed70
Finished reworking wait sets -- need to migrate all waitables
mxgrey Oct 10, 2024
abb8367
Finished migrating guard conditions and services
mxgrey Oct 11, 2024
3a851be
Change Waitable from trait to struct
mxgrey Oct 12, 2024
2f91083
Migrate clients to new async implementation
mxgrey Oct 12, 2024
cc5b264
Fleshing out basic executor
mxgrey Oct 13, 2024
06cb0ab
Finished implementing basic executor -- needs testing
mxgrey Oct 13, 2024
40d8746
Add support for waiting on graph events
mxgrey Oct 13, 2024
329cafb
Migrate parameter service tests to new async executor
mxgrey Oct 13, 2024
d15f762
Try async-std timeouts instead of tokio
mxgrey Oct 13, 2024
fe56cc9
Debugging strange client failure to take
mxgrey Oct 13, 2024
8f0f192
Experimenting with only taking from services and clients in the same …
mxgrey Oct 14, 2024
c99f410
Migrate subscription to use shared callback instead of an async task
mxgrey Oct 14, 2024
26b41f8
Remove task modules that are no longer used
mxgrey Oct 14, 2024
500b083
Rename wait module to wait_set
mxgrey Oct 14, 2024
ee15de5
Move the wait_set_runner to the wait_set module
mxgrey Oct 14, 2024
d395f5f
Remove unnecessary debug outputs
mxgrey Oct 14, 2024
5c59592
Big cleanup
mxgrey Oct 14, 2024
0138e33
Update doctests
mxgrey Oct 14, 2024
22d2c84
Remove old comments
mxgrey Oct 14, 2024
29fe10e
Migrate Context, Executor, and Node creation to new API
mxgrey Nov 16, 2024
1b5c187
Update examples
mxgrey Nov 16, 2024
1ec9f10
Fix documentation
mxgrey Nov 19, 2024
0874d8d
Fix formatting
mxgrey Nov 20, 2024
433a348
Migrate to SubscriptionOptions
mxgrey Nov 20, 2024
f12f874
Migrate to PublisherOptions
mxgrey Nov 20, 2024
2c32e20
Migrate to ServiceOptions
mxgrey Nov 20, 2024
6c61c9c
Migrate to ClientOptions
mxgrey Nov 20, 2024
da05361
Enable direct creation of the _Options for all primitive types
mxgrey Nov 20, 2024
bf3d01a
Migrate Node to shared state pattern
mxgrey Nov 20, 2024
bbb5333
Migrate primitives to state pattern
mxgrey Nov 20, 2024
4c2a67b
Fix example formatting
mxgrey Nov 20, 2024
daebaa8
Fix example formatting
mxgrey Nov 20, 2024
2cff0b7
Fix examples
mxgrey Nov 20, 2024
126aaca
Fix docs
mxgrey Nov 20, 2024
012ff2e
Make deadline, liveliness_lease, and lifespan all symmetric
mxgrey Nov 21, 2024
f33e8d5
Add an API to the primitive options builder for avoiding ROS namespac…
mxgrey Nov 21, 2024
98b8ea2
Retrigger CI
mxgrey Nov 21, 2024
6059506
Implicitly cast &str to NodeOptions
mxgrey Nov 23, 2024
259fcb3
Remove debug outputs
mxgrey Nov 23, 2024
e1ceb70
Fix formatting
mxgrey Nov 23, 2024
0918476
Merge with latest main
mxgrey Dec 9, 2024
f1c4716
Merge execution structure PR
mxgrey Dec 9, 2024
2a7ff60
Merge options pattern PR
mxgrey Dec 9, 2024
e86707e
Merge in shared state pattern
mxgrey Dec 9, 2024
1f1d826
Refining merge
mxgrey Dec 9, 2024
6d3f7e4
Update examples
mxgrey Dec 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::TryInto, env, sync::Arc};
use std::convert::TryInto;

use anyhow::{Error, Result};
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};
Expand Down Expand Up @@ -138,38 +138,32 @@ fn demonstrate_sequences() {
fn demonstrate_pubsub() -> Result<(), Error> {
println!("================== Interoperability demo ==================");
// Demonstrate interoperability between idiomatic and RMW-native message types
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, "message_demo")?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let node = executor.create_node("message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let direct_publisher = node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let idiomatic_publisher =
node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>("topic")?;
let direct_publisher =
node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>("topic")?;

let _idiomatic_subscription = node
.create_subscription::<rclrs_example_msgs::msg::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"),
)?;
let _direct_subscription = node
.create_subscription::<rclrs_example_msgs::msg::rmw::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| {
println!("Got RMW-native message!")
},
)?;
println!("Sending idiomatic message.");
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(rclrs::SpinOptions::spin_once())?;
println!("Sending RMW-native message.");
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(rclrs::SpinOptions::spin_once())?;

Ok(())
}
Expand Down
32 changes: 15 additions & 17 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::{Context, SpinOptions, Promise};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

println!("Starting client");

while !client.service_is_ready()? {
std::thread::sleep(std::time::Duration::from_millis(10));
}

client.async_send_request_with_callback(
&request,
move |response: example_interfaces::srv::AddTwoInts_Response| {
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);
},
)?;
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

let response: Promise<example_interfaces::srv::AddTwoInts_Response> = client.call(&request).unwrap();

std::thread::sleep(std::time::Duration::from_millis(500));
let promise = executor.commands().run(async move {
let response = response.await.unwrap();
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum,
);
});

println!("Waiting for response");
rclrs::spin(node).map_err(|err| err.into())
executor.spin(SpinOptions::new().until_promise_resolved(promise))?;
Ok(())
}
31 changes: 14 additions & 17 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::{Context, SpinOptions};

#[tokio::main]
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
fn main() -> Result<(), Error> {
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -18,18 +16,17 @@ async fn main() -> Result<(), Error> {

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

let future = client.call_async(&request);
let promise = client.call_then(
&request,
move |response: example_interfaces::srv::AddTwoInts_Response| {
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum,
);
}
).unwrap();

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));

let response = future.await?;
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);

rclrs_spin.await.ok();
executor.spin(SpinOptions::new().until_promise_resolved(promise))?;
Ok(())
}
19 changes: 12 additions & 7 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::{Context, ServiceInfo, SpinOptions};

fn handle_service(
_request_header: &rclrs::rmw_request_id_t,
request: example_interfaces::srv::AddTwoInts_Request,
info: ServiceInfo,
) -> example_interfaces::srv::AddTwoInts_Response {
println!("request: {} + {}", request.a, request.b);
let timestamp = info
.received_timestamp
.map(|t| format!(" at [{t:?}]"))
.unwrap_or(String::new());

println!("request{timestamp}: {} + {}", request.a, request.b);
example_interfaces::srv::AddTwoInts_Response {
sum: request.a + request.b,
}
}

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_service")?;
let node = executor.create_node("minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(node).map_err(|err| err.into())
executor.spin(SpinOptions::default())?;
Ok(())
}
10 changes: 4 additions & 6 deletions examples/minimal_pub_sub/src/minimal_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::String>("topic")?;

let mut message = std_msgs::msg::String::default();

Expand Down
22 changes: 12 additions & 10 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::env;

use anyhow::{Error, Result};
use std::sync::Mutex;
use rclrs::{Context, SpinOptions};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let context = Context::default_from_env()?;
let mut executor = context.create_basic_executor();

let mut num_messages: usize = 0;
let node = executor.create_node("minimal_subscriber")?;

let num_messages = Mutex::new(0usize);
let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
num_messages += 1;
let mut num = num_messages.lock().unwrap();
*num += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
println!("(Got {} messages so far)", num);
},
)?;

rclrs::spin(node).map_err(|err| err.into())
println!("Waiting for messages...");
executor.spin(SpinOptions::default())?;
Ok(())
}
46 changes: 21 additions & 25 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use std::{
env,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};

use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
node: rclrs::Node,
subscription: Mutex<Option<rclrs::Subscription<std_msgs::msg::String>>>,
}

impl MinimalSubscriber {
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, name)?;
pub fn new(
executor: &rclrs::Executor,
name: &str,
topic: &str,
) -> Result<Arc<Self>, rclrs::RclrsError> {
let node = executor.create_node(name)?;
let minimal_subscriber = Arc::new(MinimalSubscriber {
num_messages: 0.into(),
node,
Expand All @@ -29,7 +29,6 @@ impl MinimalSubscriber {
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
Expand All @@ -50,14 +49,15 @@ impl MinimalSubscriber {
}

fn main() -> Result<(), Error> {
let publisher_context = rclrs::Context::new(env::args())?;
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let publisher_node = executor.create_node("minimal_publisher")?;

let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;
let _subscriber_node_one =
MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?;
let _subscriber_node_two =
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = publisher_node.create_publisher::<std_msgs::msg::String>("topic")?;

std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
let mut message = std_msgs::msg::String::default();
Expand All @@ -71,11 +71,7 @@ fn main() -> Result<(), Error> {
}
});

let executor = rclrs::SingleThreadedExecutor::new();

executor.add_node(&publisher_node)?;
executor.add_node(&subscriber_node_one.node)?;
executor.add_node(&subscriber_node_two.node)?;

executor.spin().map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
10 changes: 4 additions & 6 deletions examples/minimal_pub_sub/src/zero_copy_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic")?;

let mut publish_count: u32 = 1;

Expand Down
21 changes: 11 additions & 10 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use std::env;

use anyhow::{Error, Result};
use std::sync::Mutex;
use rclrs::ReadOnlyLoanedMessage;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = executor.create_node("minimal_subscriber")?;

let mut num_messages: usize = 0;
let num_messages = Mutex::new(0usize);

let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
num_messages += 1;
move |msg: ReadOnlyLoanedMessage<std_msgs::msg::UInt32>| {
let mut num = num_messages.lock().unwrap();
*num += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
println!("(Got {} messages so far)", *num);
},
)?;

rclrs::spin(node).map_err(|err| err.into())
executor.spin(rclrs::SpinOptions::default())?;
Ok(())
}
Loading
Loading