Skip to content

Commit e6a3bf1

Browse files
committed
switch to CounterPair struct, improve reliability of flaky test
1 parent ed8aba8 commit e6a3bf1

File tree

10 files changed

+67
-102
lines changed

10 files changed

+67
-102
lines changed

tokio/src/runtime/metrics/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ cfg_metrics! {
1919

2020
mod runtime;
2121
#[allow(unreachable_pub)] // rust-lang/rust#57411
22-
pub use runtime::RuntimeMetrics;
22+
pub use runtime::{RuntimeMetrics, CounterPair};
2323

2424
mod scheduler;
2525
pub(crate) use scheduler::SchedulerMetrics;

tokio/src/runtime/metrics/runtime.rs

+32-24
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,31 @@ pub struct RuntimeMetrics {
1515
handle: Handle,
1616
}
1717

18+
/// A gauge represented as two counters.
19+
///
20+
/// Instead of decrementing a gauge, we increment a decrements counter.
21+
/// This is beneficial as it allows you to observe activity spikes that occur
22+
/// inbetween a scrape interval
23+
#[derive(Copy, Clone, Debug, PartialEq)]
24+
pub struct CounterPair {
25+
/// Tracks how many times this gauge was incremented
26+
pub inc: u64,
27+
/// Tracks how many times this gauge was decremeneted
28+
pub dec: u64,
29+
}
30+
31+
impl CounterPair {
32+
/// Determines the current length of the pair
33+
pub fn len(&self) -> usize {
34+
(self.inc - self.dec) as usize
35+
}
36+
37+
/// Determines if the counter pair represents an empty collection
38+
pub fn is_empty(&self) -> bool {
39+
self.inc == self.dec
40+
}
41+
}
42+
1843
impl RuntimeMetrics {
1944
pub(crate) fn new(handle: Handle) -> RuntimeMetrics {
2045
RuntimeMetrics { handle }
@@ -85,10 +110,10 @@ impl RuntimeMetrics {
85110
/// }
86111
/// ```
87112
pub fn active_tasks_count(&self) -> usize {
88-
self.handle.inner.active_tasks_count()
113+
self.handle.inner.task_counts().len()
89114
}
90115

91-
/// Returns the number of started tasks in the runtime.
116+
/// Returns a counter pair representing the number of started and completed tasks in the runtime.
92117
///
93118
/// # Examples
94119
///
@@ -100,30 +125,13 @@ impl RuntimeMetrics {
100125
/// let metrics = Handle::current().metrics();
101126
///
102127
/// let n = metrics.start_tasks_count();
103-
/// println!("Runtime has {} active tasks", n);
104-
/// }
105-
/// ```
106-
pub fn start_tasks_count(&self) -> u64 {
107-
self.handle.inner.start_tasks_count()
108-
}
109-
110-
/// Returns the number of finished tasks in the runtime.
111-
///
112-
/// # Examples
113-
///
114-
/// ```
115-
/// use tokio::runtime::Handle;
116-
///
117-
/// #[tokio::main]
118-
/// async fn main() {
119-
/// let metrics = Handle::current().metrics();
120-
///
121-
/// let n = metrics.stop_tasks_count();
122-
/// println!("Runtime has {} active tasks", n);
128+
/// println!("Runtime has {} started tasks", n.inc);
129+
/// println!("Runtime has {} completed tasks", n.dec);
130+
/// println!("Runtime has {} active tasks", n.len());
123131
/// }
124132
/// ```
125-
pub fn stop_tasks_count(&self) -> u64 {
126-
self.handle.inner.stop_tasks_count()
133+
pub fn task_counts(&self) -> CounterPair {
134+
self.handle.inner.task_counts()
127135
}
128136

129137
/// Returns the number of idle threads, which have spawned by the runtime

tokio/src/runtime/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ cfg_rt! {
250250

251251
cfg_metrics! {
252252
mod metrics;
253-
pub use metrics::{RuntimeMetrics, HistogramScale};
253+
pub use metrics::{RuntimeMetrics, HistogramScale, CounterPair};
254254

255255
pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder};
256256

tokio/src/runtime/scheduler/current_thread/mod.rs

+3-10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::future::poll_fn;
22
use crate::loom::sync::atomic::AtomicBool;
33
use crate::loom::sync::Arc;
44
use crate::runtime::driver::{self, Driver};
5+
use crate::runtime::metrics::CounterPair;
56
use crate::runtime::scheduler::{self, Defer, Inject};
67
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
78
use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
@@ -539,16 +540,8 @@ cfg_metrics! {
539540
self.blocking_spawner.queue_depth()
540541
}
541542

542-
pub(crate) fn active_tasks_count(&self) -> usize {
543-
self.shared.owned.active_tasks_count()
544-
}
545-
546-
pub(crate) fn start_tasks_count(&self) -> u64 {
547-
self.shared.owned.start_tasks_count()
548-
}
549-
550-
pub(crate) fn stop_tasks_count(&self) -> u64 {
551-
self.shared.owned.stop_tasks_count()
543+
pub(crate) fn tasks_count(&self) -> CounterPair {
544+
self.shared.owned.tasks_count()
552545
}
553546
}
554547
}

tokio/src/runtime/scheduler/mod.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ cfg_rt! {
164164
}
165165

166166
cfg_metrics! {
167-
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
167+
use crate::runtime::{SchedulerMetrics, WorkerMetrics, metrics::CounterPair};
168168

169169
impl Handle {
170170
pub(crate) fn num_workers(&self) -> usize {
@@ -185,16 +185,8 @@ cfg_rt! {
185185
match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
186186
}
187187

188-
pub(crate) fn active_tasks_count(&self) -> usize {
189-
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
190-
}
191-
192-
pub(crate) fn start_tasks_count(&self) -> u64 {
193-
match_flavor!(self, Handle(handle) => handle.start_tasks_count())
194-
}
195-
196-
pub(crate) fn stop_tasks_count(&self) -> u64 {
197-
match_flavor!(self, Handle(handle) => handle.stop_tasks_count())
188+
pub(crate) fn task_counts(&self) -> CounterPair {
189+
match_flavor!(self, Handle(handle) => handle.tasks_count())
198190
}
199191

200192
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {

tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Handle;
22

3-
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
3+
use crate::runtime::{SchedulerMetrics, WorkerMetrics, metrics::CounterPair};
44

55
impl Handle {
66
pub(crate) fn num_workers(&self) -> usize {
@@ -15,16 +15,8 @@ impl Handle {
1515
self.blocking_spawner.num_idle_threads()
1616
}
1717

18-
pub(crate) fn active_tasks_count(&self) -> usize {
19-
self.shared.owned.active_tasks_count()
20-
}
21-
22-
pub(crate) fn start_tasks_count(&self) -> u64 {
23-
self.shared.owned.start_tasks_count()
24-
}
25-
26-
pub(crate) fn stop_tasks_count(&self) -> u64 {
27-
self.shared.owned.stop_tasks_count()
18+
pub(crate) fn tasks_count(&self) -> CounterPair {
19+
self.shared.owned.tasks_count()
2820
}
2921

3022
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {

tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::Handle;
22

3-
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
3+
use crate::runtime::{SchedulerMetrics, WorkerMetrics, metrics::CounterPair};
44

55
impl Handle {
66
pub(crate) fn num_workers(&self) -> usize {
@@ -15,16 +15,8 @@ impl Handle {
1515
self.blocking_spawner.num_idle_threads()
1616
}
1717

18-
pub(crate) fn active_tasks_count(&self) -> usize {
19-
self.shared.owned.active_tasks_count()
20-
}
21-
22-
pub(crate) fn start_tasks_count(&self) -> u64 {
23-
self.shared.owned.start_tasks_count()
24-
}
25-
26-
pub(crate) fn stop_tasks_count(&self) -> u64 {
27-
self.shared.owned.stop_tasks_count()
18+
pub(crate) fn tasks_count(&self) -> CounterPair {
19+
self.shared.owned.tasks_count()
2820
}
2921

3022
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {

tokio/src/runtime/task/list.rs

+7-10
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use crate::future::Future;
1010
use crate::loom::cell::UnsafeCell;
1111
use crate::loom::sync::Mutex;
12+
use crate::runtime::metrics::CounterPair;
1213
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
1314
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
1415

@@ -166,16 +167,12 @@ impl<S: 'static> OwnedTasks<S> {
166167
}
167168
}
168169

169-
pub(crate) fn active_tasks_count(&self) -> usize {
170-
self.inner.lock().list.count()
171-
}
172-
173-
pub(crate) fn start_tasks_count(&self) -> u64 {
174-
self.inner.lock().list.added()
175-
}
176-
177-
pub(crate) fn stop_tasks_count(&self) -> u64 {
178-
self.inner.lock().list.removed()
170+
pub(crate) fn tasks_count(&self) -> CounterPair {
171+
let lock = self.inner.lock();
172+
CounterPair {
173+
inc: lock.list.added(),
174+
dec: lock.list.removed(),
175+
}
179176
}
180177

181178
pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {

tokio/src/util/linked_list.rs

-6
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,6 @@ impl<L: Link> CountedLinkedList<L, L::Target> {
272272
val
273273
}
274274

275-
pub(crate) fn count(&self) -> usize {
276-
// this subtraction can't underflow.
277-
// this cast can't overflow because the length of the linked list can't exceed usize.
278-
(self.added - self.removed) as usize
279-
}
280-
281275
pub(crate) fn added(&self) -> u64 {
282276
self.added
283277
}

tokio/tests/rt_metrics.rs

+14-17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
66
use std::task::Poll;
77
use tokio::macros::support::poll_fn;
88

9+
use tokio::runtime::CounterPair;
910
use tokio::runtime::Runtime;
1011
use tokio::task::consume_budget;
1112
use tokio::time::{self, Duration};
@@ -104,36 +105,32 @@ fn active_tasks_count() {
104105
fn active_tasks_count_pairs() {
105106
let rt = current_thread();
106107
let metrics = rt.metrics();
107-
assert_eq!(0, metrics.start_tasks_count());
108-
assert_eq!(0, metrics.stop_tasks_count());
108+
assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts());
109109

110110
rt.block_on(rt.spawn(async move {
111-
assert_eq!(1, metrics.start_tasks_count());
112-
assert_eq!(0, metrics.stop_tasks_count());
111+
assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts());
113112
}))
114113
.unwrap();
115114

116-
assert_eq!(1, rt.metrics().start_tasks_count());
117-
assert_eq!(1, rt.metrics().stop_tasks_count());
115+
assert_eq!(CounterPair { inc: 1, dec: 1 }, rt.metrics().task_counts());
118116

119117
let rt = threaded();
120118
let metrics = rt.metrics();
121-
assert_eq!(0, metrics.start_tasks_count());
122-
assert_eq!(0, metrics.stop_tasks_count());
119+
assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts());
123120

124121
rt.block_on(rt.spawn(async move {
125-
assert_eq!(1, metrics.start_tasks_count());
126-
assert_eq!(0, metrics.stop_tasks_count());
122+
assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts());
127123
}))
128124
.unwrap();
129125

130-
// for some reason, sometimes the stop count doesn't get a chance to incremenet before we get here.
131-
// Only observed on single-cpu systems. Most likely the worker thread doesn't a chance to clean up
132-
// the spawned task yet. We yield to give it an opportunity.
133-
std::thread::yield_now();
134-
135-
assert_eq!(1, rt.metrics().start_tasks_count());
136-
assert_eq!(1, rt.metrics().stop_tasks_count());
126+
for _ in 0..100 {
127+
if rt.metrics().task_counts() == (CounterPair { inc: 1, dec: 1 }) {
128+
return;
129+
}
130+
// on single threaded machines (like in CI), we need to force the OS to run the runtime threads
131+
std::thread::yield_now();
132+
}
133+
panic!("runtime didn't decrement active task gauge")
137134
}
138135

139136
#[test]

0 commit comments

Comments
 (0)