Skip to content

Commit 3782462

Browse files
Move PausableWorker to a separate file
1 parent 575db6c commit 3782462

File tree

3 files changed

+91
-87
lines changed

3 files changed

+91
-87
lines changed

data-pipeline/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
1313
pub mod agent_info;
1414
mod health_metrics;
15+
pub mod pausable_worker;
1516
#[allow(missing_docs)]
1617
pub mod span_concentrator;
1718
#[allow(missing_docs)]

data-pipeline/src/pausable_worker.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
//! Defines a pausable worker to be able to stop background processes before forks
2+
use ddtelemetry::worker::TelemetryWorker;
3+
use tokio::{runtime::Runtime, select, task::JoinHandle};
4+
use tokio_util::sync::CancellationToken;
5+
6+
use crate::{agent_info::AgentInfoFetcher, stats_exporter::StatsExporter};
7+
8+
pub trait Worker {
9+
/// Main worker loop
10+
fn run(&mut self) -> impl std::future::Future<Output = ()> + Send;
11+
/// Hook called on the worker when pausing.
12+
/// The worker can be paused on any await call. This hook can be use to clean the worker
13+
/// state after pausing.
14+
fn on_pause(&mut self);
15+
}
16+
17+
impl Worker for StatsExporter {
18+
async fn run(&mut self) {
19+
Self::run(self).await
20+
}
21+
fn on_pause(&mut self) {}
22+
}
23+
24+
impl Worker for AgentInfoFetcher {
25+
async fn run(&mut self) {
26+
Self::run(self).await
27+
}
28+
fn on_pause(&mut self) {}
29+
}
30+
31+
impl Worker for TelemetryWorker {
32+
async fn run(&mut self) {
33+
Self::run(self).await
34+
}
35+
fn on_pause(&mut self) {
36+
self.cleanup();
37+
}
38+
}
39+
40+
/// A pausable worker which can be paused and restarded on forks.
41+
#[derive(Debug)]
42+
pub enum PausableWorker<T: Worker + Send + Sync + 'static> {
43+
Running {
44+
handle: JoinHandle<T>,
45+
stop_token: CancellationToken,
46+
},
47+
Paused {
48+
worker: T,
49+
},
50+
InvalidState,
51+
}
52+
53+
impl<T: Worker + Send + Sync + 'static> PausableWorker<T> {
54+
pub fn new(worker: T) -> Self {
55+
Self::Paused { worker }
56+
}
57+
58+
pub fn start(&mut self, rt: &Runtime) {
59+
if let Self::Paused { mut worker } = std::mem::replace(self, Self::InvalidState) {
60+
let stop_token = CancellationToken::new();
61+
let cloned_token = stop_token.clone();
62+
let handle = rt.spawn(async move {
63+
select! {
64+
_ = worker.run() => {worker}
65+
_ = cloned_token.cancelled() => {worker}
66+
}
67+
});
68+
69+
*self = PausableWorker::Running { handle, stop_token };
70+
}
71+
}
72+
73+
pub async fn stop(&mut self) {
74+
if let PausableWorker::Running { handle, stop_token } = self {
75+
stop_token.cancel();
76+
let worker = handle.await.unwrap();
77+
worker.on_pause();
78+
*self = PausableWorker::Paused { worker };
79+
}
80+
}
81+
82+
/// Wait for the run method of the worker to exit.
83+
pub async fn join(&mut self) {
84+
if let PausableWorker::Running { handle, stop_token } = self {
85+
let worker = handle.await.unwrap();
86+
*self = PausableWorker::Paused { worker };
87+
}
88+
}
89+
}

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 1 addition & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub mod agent_response;
44
pub mod error;
55
use crate::agent_info::{AgentInfoArc, AgentInfoFetcher};
6+
use crate::pausable_worker::PausableWorker;
67
use crate::stats_exporter::StatsExporter;
78
use crate::telemetry::{self, SendPayloadTelemetry, TelemetryClient, TelemetryClientBuilder};
89
use crate::trace_exporter::error::{RequestError, TraceExporterError};
@@ -68,91 +69,6 @@ pub enum TraceExporterOutputFormat {
6869
V05,
6970
}
7071

71-
mod pausable_worker {
72-
use ddtelemetry::worker::TelemetryWorker;
73-
use tokio::{runtime::Runtime, select, task::JoinHandle};
74-
use tokio_util::sync::CancellationToken;
75-
76-
use crate::{agent_info::AgentInfoFetcher, stats_exporter::StatsExporter};
77-
78-
pub trait Worker {
79-
fn run(&mut self) -> impl std::future::Future<Output = ()> + Send;
80-
fn on_pause(&mut self);
81-
}
82-
83-
impl Worker for StatsExporter {
84-
async fn run(&mut self) {
85-
Self::run(self).await
86-
}
87-
fn on_pause(&mut self) {}
88-
}
89-
90-
impl Worker for AgentInfoFetcher {
91-
async fn run(&mut self) {
92-
Self::run(self).await
93-
}
94-
fn on_pause(&mut self) {}
95-
}
96-
97-
impl Worker for TelemetryWorker {
98-
async fn run(&mut self) {
99-
Self::run(self).await
100-
}
101-
fn on_pause(&mut self) {
102-
self.cleanup();
103-
}
104-
}
105-
106-
#[derive(Debug)]
107-
pub enum PausableWorker<T: Worker + Send + Sync + 'static> {
108-
Running {
109-
handle: JoinHandle<T>,
110-
stop_token: CancellationToken,
111-
},
112-
Paused {
113-
worker: T,
114-
},
115-
InvalidState,
116-
}
117-
118-
impl<T: Worker + Send + Sync + 'static> PausableWorker<T> {
119-
pub fn new(worker: T) -> Self {
120-
Self::Paused { worker }
121-
}
122-
123-
pub fn start(&mut self, rt: &Runtime) {
124-
if let Self::Paused { mut worker } = std::mem::replace(self, Self::InvalidState) {
125-
let stop_token = CancellationToken::new();
126-
let cloned_token = stop_token.clone();
127-
let handle = rt.spawn(async move {
128-
select! {
129-
_ = worker.run() => {worker}
130-
_ = cloned_token.cancelled() => {worker}
131-
}
132-
});
133-
134-
*self = PausableWorker::Running { handle, stop_token };
135-
}
136-
}
137-
138-
pub async fn stop(&mut self) {
139-
if let PausableWorker::Running { handle, stop_token } = self {
140-
stop_token.cancel();
141-
let worker = handle.await.unwrap();
142-
*self = PausableWorker::Paused { worker };
143-
}
144-
}
145-
146-
/// Wait for the run method of the worker to exit.
147-
pub async fn join(&mut self) {
148-
if let PausableWorker::Running { handle, stop_token } = self {
149-
let worker = handle.await.unwrap();
150-
*self = PausableWorker::Paused { worker };
151-
}
152-
}
153-
}
154-
}
155-
15672
impl TraceExporterOutputFormat {
15773
/// Add the agent trace endpoint path to the URL.
15874
fn add_path(&self, url: &Uri) -> Uri {
@@ -248,8 +164,6 @@ enum StatsComputationStatus {
248164
},
249165
}
250166

251-
use pausable_worker::PausableWorker;
252-
253167
#[derive(Debug)]
254168
struct TraceExporterWorkers {
255169
pub info: PausableWorker<AgentInfoFetcher>,

0 commit comments

Comments
 (0)