Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures"]
[features]
# Include nothing by default
default = []

bwos = []
# enable everything
full = [
"fs",
Expand All @@ -37,6 +37,7 @@ full = [
"rt",
"rt-multi-thread",
"signal",
"stats",
"sync",
"time",
]
Expand Down Expand Up @@ -72,6 +73,7 @@ process = [
# Includes basic task execution capabilities
rt = []
rt-multi-thread = [
"bwos",
"num_cpus",
"rt",
]
Expand Down
21 changes: 18 additions & 3 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ macro_rules! cfg_metrics {
$(
// For now, metrics is only disabled in loom tests.
// When stabilized, it might have a dedicated feature flag.
#[cfg(all(tokio_unstable, not(loom)))]
#[cfg(all(tokio_unstable, not(loom), feature="stats"))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
$item
)*
Expand All @@ -206,15 +206,15 @@ macro_rules! cfg_metrics {
macro_rules! cfg_not_metrics {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, not(loom))))]
#[cfg(not(all(tokio_unstable, not(loom), feature = "stats")))]
$item
)*
}
}

macro_rules! cfg_not_rt_and_metrics_and_net {
($($item:item)*) => {
$( #[cfg(not(all(feature = "net", feature = "rt", all(tokio_unstable, not(loom)))))]$item )*
$( #[cfg(not(all(feature = "net", feature = "rt", all(tokio_unstable, not(loom), feature = "stats"))))]$item )*
}
}

Expand Down Expand Up @@ -373,6 +373,21 @@ macro_rules! cfg_not_rt_multi_thread {
}
}

macro_rules! cfg_rt_multi_thread_bwos {
($($item:item)*) => {
$(
#[cfg(all(
tokio_unstable,
feature = "rt-multi-thread",
feature = "bwos",
not(tokio_wasi)
))]
#[cfg_attr(docsrs, doc(cfg(feature = "bwos")))]
$item
)*
}
}

macro_rules! cfg_taskdump {
($($item:item)*) => {
$(
Expand Down
39 changes: 31 additions & 8 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,25 @@ cfg_unstable! {

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;

cfg_rt_multi_thread!(
#[derive(Clone, Copy)]
#[allow(dead_code)]
pub(crate) enum MultiThreadFlavor {
/// The default multithreaded tokio runqueue, based on the golang runqueue.
Default,
// There may be more (sub-) variants in the future influencing e.g. queue size
// or stealing strategy
/// A Block-based workstealing queue offering better performance
//#[cfg(all(tokio_unstable, feature = "bwos"))]
Bwos,
}
);

#[derive(Clone, Copy)]
pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread,
MultiThread(MultiThreadFlavor),
}

impl Builder {
Expand All @@ -214,15 +228,23 @@ impl Builder {
Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL)
}

cfg_not_wasi! {
cfg_rt_multi_thread! {
/// Returns a new builder with the multi thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
Builder::new(Kind::MultiThread(MultiThreadFlavor::Bwos), 61, 61)
}
}

cfg_rt_multi_thread_bwos! {
/// Returns a new builder with the BWoS multi thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
pub fn new_multi_thread_bwos() -> Builder {
// The number `61` is copied from `new_multi_thread()`.
Builder::new(Kind::MultiThread(MultiThreadFlavor::Bwos), 61, 61)
}
}

Expand Down Expand Up @@ -649,7 +671,7 @@ impl Builder {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
Kind::MultiThread(flavor) => self.build_threaded_runtime(*flavor),
}
}

Expand All @@ -658,7 +680,7 @@ impl Builder {
enable_pause_time: match self.kind {
Kind::CurrentThread => true,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => false,
Kind::MultiThread(_) => false,
},
enable_io: self.enable_io,
enable_time: self.enable_time,
Expand Down Expand Up @@ -1163,7 +1185,7 @@ cfg_test_util! {

cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
fn build_threaded_runtime(&mut self, flavor: MultiThreadFlavor) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::{self, MultiThread};
Expand All @@ -1183,6 +1205,7 @@ cfg_rt_multi_thread! {

let (scheduler, handle, launch) = MultiThread::new(
core_threads,
flavor,
driver,
driver_handle,
blocking_spawner,
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub(crate) use park::{Parker, Unparker};

pub(crate) mod queue;

//pub(crate) mod queue;

mod worker;
pub(crate) use worker::{Context, Launch};

Expand All @@ -27,6 +29,7 @@ use crate::runtime::{
};
use crate::util::RngSeedGenerator;

use crate::runtime::builder::MultiThreadFlavor;
use std::fmt;
use std::future::Future;

Expand All @@ -38,6 +41,7 @@ pub(crate) struct MultiThread;
impl MultiThread {
pub(crate) fn new(
size: usize,
flavor: MultiThreadFlavor,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
Expand All @@ -47,6 +51,7 @@ impl MultiThread {
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
flavor,
parker,
driver_handle,
blocking_spawner,
Expand Down
Loading