Skip to content

Commit b954af4

Browse files
Add pausable worker for telemetry
1 parent c3b43bc commit b954af4

File tree

4 files changed

+170
-63
lines changed

4 files changed

+170
-63
lines changed

data-pipeline/src/telemetry/mod.rs

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ use datadog_trace_utils::{
1212
};
1313
use ddcommon::tag::Tag;
1414
use ddtelemetry::worker::{
15-
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerFlavor,
16-
TelemetryWorkerHandle,
15+
LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder,
16+
TelemetryWorkerFlavor, TelemetryWorkerHandle,
1717
};
1818
use std::{collections::HashMap, time::Duration};
19-
use tokio::task::JoinHandle;
19+
use tokio::runtime::Handle;
2020

2121
/// Structure to build a Telemetry client.
2222
///
@@ -86,7 +86,10 @@ impl TelemetryClientBuilder {
8686
}
8787

8888
/// Builds the telemetry client.
89-
pub async fn build(self) -> Result<TelemetryClient, TelemetryError> {
89+
pub fn build(
90+
self,
91+
runtime: Handle,
92+
) -> Result<(TelemetryClient, TelemetryWorker), TelemetryError> {
9093
#[allow(clippy::unwrap_used)]
9194
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
9295
self.service_name.unwrap(),
@@ -102,16 +105,17 @@ impl TelemetryClientBuilder {
102105
builder.runtime_id = Some(id);
103106
}
104107

105-
let (worker, handle) = builder
106-
.spawn()
107-
.await
108+
let (worker_handle, worker) = builder
109+
.build_worker(runtime)
108110
.map_err(|e| TelemetryError::Builder(e.to_string()))?;
109111

110-
Ok(TelemetryClient {
111-
handle,
112-
metrics: Metrics::new(&worker),
112+
Ok((
113+
TelemetryClient {
114+
metrics: Metrics::new(&worker_handle),
115+
worker: worker_handle,
116+
},
113117
worker,
114-
})
118+
))
115119
}
116120
}
117121

@@ -120,7 +124,6 @@ impl TelemetryClientBuilder {
120124
pub struct TelemetryClient {
121125
metrics: Metrics,
122126
worker: TelemetryWorkerHandle,
123-
handle: JoinHandle<()>,
124127
}
125128

126129
/// Telemetry describing the sending of a trace payload
@@ -246,26 +249,18 @@ impl TelemetryClient {
246249

247250
/// Starts the client
248251
pub async fn start(&self) {
249-
if let Err(_e) = self
252+
_ = self
250253
.worker
251254
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
252-
.await
253-
{
254-
self.handle.abort();
255-
}
255+
.await;
256256
}
257257

258258
/// Shuts down the telemetry client.
259259
pub async fn shutdown(self) {
260-
if let Err(_e) = self
260+
_ = self
261261
.worker
262262
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Stop))
263-
.await
264-
{
265-
self.handle.abort();
266-
}
267-
268-
let _ = self.handle.await;
263+
.await;
269264
}
270265
}
271266

@@ -276,21 +271,23 @@ mod tests {
276271
use httpmock::MockServer;
277272
use hyper::{Response, StatusCode};
278273
use regex::Regex;
274+
use tokio::time::sleep;
279275

280276
use super::*;
281277

282278
async fn get_test_client(url: &str) -> TelemetryClient {
283-
TelemetryClientBuilder::default()
279+
let (client, mut worker) = TelemetryClientBuilder::default()
284280
.set_service_name("test_service")
285281
.set_language("test_language")
286282
.set_language_version("test_language_version")
287283
.set_tracer_version("test_tracer_version")
288284
.set_url(url)
289285
.set_heartbeat(100)
290286
.set_debug_enabled(true)
291-
.build()
292-
.await
293-
.unwrap()
287+
.build(Handle::current())
288+
.unwrap();
289+
tokio::spawn(async move { worker.run().await });
290+
client
294291
}
295292

296293
#[test]
@@ -320,15 +317,14 @@ mod tests {
320317
}
321318

322319
#[cfg_attr(miri, ignore)]
323-
#[tokio::test]
320+
#[tokio::test(flavor = "multi_thread")]
324321
async fn spawn_test() {
325322
let client = TelemetryClientBuilder::default()
326323
.set_service_name("test_service")
327324
.set_language("test_language")
328325
.set_language_version("test_language_version")
329326
.set_tracer_version("test_tracer_version")
330-
.build()
331-
.await;
327+
.build(Handle::current());
332328

333329
assert!(client.is_ok());
334330
}
@@ -356,6 +352,9 @@ mod tests {
356352
client.start().await;
357353
let _ = client.send(&data);
358354
client.shutdown().await;
355+
while telemetry_srv.hits_async().await == 0 {
356+
sleep(Duration::from_millis(10)).await;
357+
}
359358
telemetry_srv.assert_hits_async(1).await;
360359
}
361360

@@ -382,6 +381,9 @@ mod tests {
382381
client.start().await;
383382
let _ = client.send(&data);
384383
client.shutdown().await;
384+
while telemetry_srv.hits_async().await == 0 {
385+
sleep(Duration::from_millis(10)).await;
386+
}
385387
telemetry_srv.assert_hits_async(1).await;
386388
}
387389

@@ -408,6 +410,9 @@ mod tests {
408410
client.start().await;
409411
let _ = client.send(&data);
410412
client.shutdown().await;
413+
while telemetry_srv.hits_async().await == 0 {
414+
sleep(Duration::from_millis(10)).await;
415+
}
411416
telemetry_srv.assert_hits_async(1).await;
412417
}
413418

@@ -434,6 +439,9 @@ mod tests {
434439
client.start().await;
435440
let _ = client.send(&data);
436441
client.shutdown().await;
442+
while telemetry_srv.hits_async().await == 0 {
443+
sleep(Duration::from_millis(10)).await;
444+
}
437445
telemetry_srv.assert_hits_async(1).await;
438446
}
439447

@@ -460,6 +468,9 @@ mod tests {
460468
client.start().await;
461469
let _ = client.send(&data);
462470
client.shutdown().await;
471+
while telemetry_srv.hits_async().await == 0 {
472+
sleep(Duration::from_millis(10)).await;
473+
}
463474
telemetry_srv.assert_hits_async(1).await;
464475
}
465476

@@ -486,6 +497,9 @@ mod tests {
486497
client.start().await;
487498
let _ = client.send(&data);
488499
client.shutdown().await;
500+
while telemetry_srv.hits_async().await == 0 {
501+
sleep(Duration::from_millis(10)).await;
502+
}
489503
telemetry_srv.assert_hits_async(1).await;
490504
}
491505

@@ -512,6 +526,9 @@ mod tests {
512526
client.start().await;
513527
let _ = client.send(&data);
514528
client.shutdown().await;
529+
while telemetry_srv.hits_async().await == 0 {
530+
sleep(Duration::from_millis(10)).await;
531+
}
515532
telemetry_srv.assert_hits_async(1).await;
516533
}
517534

@@ -538,6 +555,9 @@ mod tests {
538555
client.start().await;
539556
let _ = client.send(&data);
540557
client.shutdown().await;
558+
while telemetry_srv.hits_async().await == 0 {
559+
sleep(Duration::from_millis(10)).await;
560+
}
541561
telemetry_srv.assert_hits_async(1).await;
542562
}
543563

@@ -675,10 +695,10 @@ mod tests {
675695
.set_url(&server.url("/"))
676696
.set_heartbeat(100)
677697
.set_runtime_id("foo")
678-
.build()
679-
.await;
698+
.build(Handle::current());
680699

681-
let client = result.unwrap();
700+
let (client, mut worker) = result.unwrap();
701+
tokio::spawn(async move { worker.run().await });
682702

683703
client.start().await;
684704
client
@@ -688,6 +708,9 @@ mod tests {
688708
})
689709
.unwrap();
690710
client.shutdown().await;
711+
while telemetry_srv.hits_async().await == 0 {
712+
sleep(Duration::from_millis(10)).await;
713+
}
691714
// One payload generate-metrics
692715
telemetry_srv.assert_hits_async(1).await;
693716
}

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub mod agent_response;
44
pub mod error;
55
use crate::agent_info::{AgentInfoArc, AgentInfoFetcher};
66
use crate::stats_exporter::StatsExporter;
7-
use crate::telemetry::{SendPayloadTelemetry, TelemetryClient, TelemetryClientBuilder};
7+
use crate::telemetry::{self, SendPayloadTelemetry, TelemetryClient, TelemetryClientBuilder};
88
use crate::trace_exporter::error::{RequestError, TraceExporterError};
99
use crate::{
1010
health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator,
@@ -22,6 +22,7 @@ use ddcommon::header::{
2222
};
2323
use ddcommon::tag::Tag;
2424
use ddcommon::{hyper_migration, parse_uri, tag, Endpoint};
25+
use ddtelemetry::worker::{self, TelemetryWorker};
2526
use dogstatsd_client::{new, Client, DogStatsDAction};
2627
use either::Either;
2728
use error::BuilderErrorKind;
@@ -68,24 +69,37 @@ pub enum TraceExporterOutputFormat {
6869
}
6970

7071
mod pausable_worker {
72+
use ddtelemetry::worker::TelemetryWorker;
7173
use tokio::{runtime::Runtime, select, task::JoinHandle};
7274
use tokio_util::sync::CancellationToken;
7375

7476
use crate::{agent_info::AgentInfoFetcher, stats_exporter::StatsExporter};
7577

7678
pub trait Worker {
77-
fn run(&self) -> impl std::future::Future<Output = ()> + Send;
79+
fn run(&mut self) -> impl std::future::Future<Output = ()> + Send;
80+
fn on_pause(&mut self);
7881
}
7982

8083
impl Worker for StatsExporter {
81-
async fn run(&self) {
82-
self.run().await
84+
async fn run(&mut self) {
85+
Self::run(self).await
8386
}
87+
fn on_pause(&mut self) {}
8488
}
8589

8690
impl Worker for AgentInfoFetcher {
87-
async fn run(&self) {
88-
self.run().await
91+
async fn run(&mut self) {
92+
Self::run(self).await
93+
}
94+
fn on_pause(&mut self) {}
95+
}
96+
97+
impl Worker for TelemetryWorker {
98+
async fn run(&mut self) {
99+
Self::run(self).await
100+
}
101+
fn on_pause(&mut self) {
102+
self.cleanup();
89103
}
90104
}
91105

@@ -107,7 +121,7 @@ mod pausable_worker {
107121
}
108122

109123
pub fn start(&mut self, rt: &Runtime) {
110-
if let Self::Paused { worker } = std::mem::replace(self, Self::InvalidState) {
124+
if let Self::Paused { mut worker } = std::mem::replace(self, Self::InvalidState) {
111125
let stop_token = CancellationToken::new();
112126
let cloned_token = stop_token.clone();
113127
let handle = rt.spawn(async move {
@@ -240,6 +254,7 @@ use pausable_worker::PausableWorker;
240254
struct TraceExporterWorkers {
241255
pub info: PausableWorker<AgentInfoFetcher>,
242256
pub stats: Option<PausableWorker<StatsExporter>>,
257+
pub telemetry: Option<PausableWorker<TelemetryWorker>>,
243258
}
244259

245260
/// The TraceExporter ingests traces from the tracers serialized as messagepack and forwards them to
@@ -312,6 +327,12 @@ impl TraceExporter {
312327
if let Some(stats_worker) = &mut workers.stats {
313328
stats_worker.start(runtime);
314329
}
330+
if let Some(telemetry_worker) = &mut workers.telemetry {
331+
telemetry_worker.start(runtime);
332+
if let Some(client) = &self.telemetry {
333+
runtime.block_on(client.start());
334+
}
335+
}
315336
}
316337

317338
pub fn stop_worker(&self) {
@@ -324,6 +345,9 @@ impl TraceExporter {
324345
if let Some(stats_worker) = &mut workers.stats {
325346
stats_worker.stop().await
326347
};
348+
if let Some(telemetry_worker) = &mut workers.telemetry {
349+
telemetry_worker.stop().await
350+
};
327351
});
328352
}
329353
// Drop runtime to shutdown all threads
@@ -1137,15 +1161,21 @@ impl TraceExporterBuilder {
11371161
if let Some(id) = telemetry_config.runtime_id {
11381162
builder = builder.set_runtime_id(&id);
11391163
}
1140-
builder.build().await
1164+
builder.build(runtime.handle().clone())
11411165
})?)
11421166
} else {
11431167
None
11441168
};
11451169

1146-
if let Some(client) = &telemetry {
1147-
runtime.block_on(client.start());
1148-
}
1170+
let (telemetry_client, telemetry_worker) = match telemetry {
1171+
Some((client, worker)) => {
1172+
let mut telemetry_worker = PausableWorker::new(worker);
1173+
telemetry_worker.start(&runtime);
1174+
runtime.block_on(client.start());
1175+
(Some(client), Some(telemetry_worker))
1176+
}
1177+
None => (None, None),
1178+
};
11491179

11501180
Ok(TraceExporter {
11511181
endpoint: Endpoint {
@@ -1177,10 +1207,11 @@ impl TraceExporterBuilder {
11771207
client_side_stats: ArcSwap::new(stats.into()),
11781208
agent_info,
11791209
previous_info_state: ArcSwapOption::new(None),
1180-
telemetry,
1210+
telemetry: telemetry_client,
11811211
workers: Arc::new(Mutex::new(TraceExporterWorkers {
11821212
info: info_fetcher_worker,
11831213
stats: None,
1214+
telemetry: telemetry_worker,
11841215
})),
11851216
})
11861217
}

0 commit comments

Comments
 (0)