Skip to content

Commit d0dd988

Browse files
committed
feat(sidecar): use cached telemetry clients
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent a4fc2ae commit d0dd988

File tree

1 file changed

+139
-71
lines changed

1 file changed

+139
-71
lines changed

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 139 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use crate::log;
55
use crate::log::{get_multi_log_filter, get_multi_log_writer, TemporarilyRetainedMapStats};
6+
use crate::primary_sidecar_identifier;
67
use crate::service::{
78
sidecar_interface::ServeSidecarInterface,
89
telemetry::{AppInstance, AppOrQueue},
@@ -11,7 +12,8 @@ use crate::service::{
1112
RuntimeInfo, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SessionInfo,
1213
SidecarAction, SidecarInterface, SidecarInterfaceRequest, SidecarInterfaceResponse,
1314
};
14-
use data_pipeline::telemetry::TelemetryClient;
15+
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
16+
use base64::Engine;
1517
use datadog_ipc::platform::{AsyncChannel, ShmHandle};
1618
use datadog_ipc::tarpc;
1719
use datadog_ipc::tarpc::context::Context;
@@ -21,19 +23,25 @@ use datadog_trace_utils::tracer_payload::decode_to_trace_chunks;
2123
use datadog_trace_utils::tracer_payload::TraceEncoding;
2224
use ddcommon::{Endpoint, MutexExt};
2325
use ddtelemetry::worker::{
24-
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerStats,
26+
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerHandle,
27+
TelemetryWorkerStats,
2528
};
2629
use futures::future;
2730
use futures::future::{join_all, Ready};
2831
use manual_future::{ManualFuture, ManualFutureCompleter};
2932
use std::borrow::Cow;
3033
use std::collections::hash_map::Entry;
3134
use std::collections::{HashMap, HashSet};
35+
use std::ffi::CString;
36+
use std::hash::{Hash, Hasher};
37+
use std::os::unix::ffi::OsStrExt;
38+
use std::path::PathBuf;
3239
use std::pin::Pin;
3340
use std::sync::atomic::{AtomicU64, Ordering};
3441
use std::sync::{Arc, Mutex};
3542
use std::time::{Duration, Instant};
3643
use tracing::{debug, error, info, trace, warn};
44+
use zwohash::ZwoHasher;
3745

3846
use futures::FutureExt;
3947
use serde::{Deserialize, Serialize};
@@ -95,7 +103,7 @@ unsafe impl Send for ProcessHandle {}
95103
unsafe impl Sync for ProcessHandle {}
96104

97105
struct TelemetryCachedClient {
98-
client: TelemetryClient,
106+
client: TelemetryWorkerHandle,
99107
last_used: Instant,
100108
}
101109

@@ -167,6 +175,66 @@ impl Default for SidecarServer {
167175
}
168176

169177
impl SidecarServer {
178+
pub fn get_or_create_telemetry_worker_handle(
179+
&self,
180+
service: &str,
181+
env: &str,
182+
version: &str,
183+
instance_id: &InstanceId,
184+
runtime_meta: &RuntimeMetadata,
185+
) -> Option<TelemetryWorkerHandle> {
186+
let key = (service.to_string(), env.to_string(), version.to_string());
187+
188+
let mut clients = self.telemetry_clients.lock_or_panic();
189+
190+
if let Some(existing) = clients.get_mut(&key) {
191+
existing.last_used = Instant::now();
192+
return Some(existing.client.clone());
193+
}
194+
195+
// Build and spawn worker
196+
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
197+
service.to_string(),
198+
runtime_meta.language_name.to_string(),
199+
runtime_meta.language_version.to_string(),
200+
runtime_meta.tracer_version.to_string(),
201+
);
202+
203+
builder.runtime_id = Some(instance_id.runtime_id.clone());
204+
builder.application.env = Some(env.to_string());
205+
206+
let session_info = self.get_session(&instance_id.session_id);
207+
let config = session_info
208+
.session_config
209+
.lock_or_panic()
210+
.clone()
211+
.unwrap_or_else(ddtelemetry::config::Config::from_env);
212+
213+
builder.config = config.clone();
214+
215+
match builder.spawn().now_or_never() {
216+
Some(Ok((handle, _join))) => {
217+
clients.insert(
218+
key,
219+
TelemetryCachedClient {
220+
client: handle.clone(),
221+
last_used: Instant::now(),
222+
},
223+
);
224+
info!("spawning telemetry worker {config:?}");
225+
Some(handle)
226+
}
227+
Some(Err(e)) => {
228+
error!("failed to spawn telemetry worker: {:?}", e);
229+
None
230+
}
231+
None => {
232+
error!("failed to spawn telemetry worker: spawn did not complete immediately");
233+
None
234+
}
235+
}
236+
}
237+
170238
/// Accepts a new connection and starts processing requests.
171239
///
172240
/// This function creates a new `tarpc` server with the provided `async_channel` and starts
@@ -467,6 +535,22 @@ impl SidecarServer {
467535
}
468536
}
469537

538+
pub fn path_for_telemetry(instance_id: &InstanceId) -> CString {
539+
let mut hasher = ZwoHasher::default();
540+
instance_id.runtime_id.hash(&mut hasher);
541+
let hash = hasher.finish();
542+
543+
let mut path = format!(
544+
"/ddtl{}-{}",
545+
primary_sidecar_identifier(),
546+
BASE64_URL_SAFE_NO_PAD.encode(hash.to_ne_bytes()),
547+
);
548+
path.truncate(31);
549+
550+
#[allow(clippy::unwrap_used)]
551+
CString::new(path).unwrap()
552+
}
553+
470554
impl SidecarInterface for SidecarServer {
471555
type EnqueueActionsFut = NoResponse;
472556

@@ -543,6 +627,20 @@ impl SidecarInterface for SidecarServer {
543627
to_process, &mut app,
544628
)
545629
.await;
630+
631+
let path = PathBuf::from(std::ffi::OsStr::from_bytes(
632+
path_for_telemetry(&instance_id).as_bytes(),
633+
));
634+
if let Err(e) = std::fs::write(
635+
&path,
636+
simd_json::to_vec(&actions).unwrap_or_default(),
637+
) {
638+
error!(
639+
"Failed to write telemetry SHM file at {:?}: {:?}",
640+
path, e
641+
);
642+
}
643+
546644
app.telemetry.send_msgs(actions).await.ok();
547645
}
548646
});
@@ -613,33 +711,6 @@ impl SidecarInterface for SidecarServer {
613711
let rt_info = self.get_runtime(&instance_id);
614712
let manual_app_future = rt_info.get_app(&service_name, &env_name);
615713

616-
let instance_future = if manual_app_future.completer.is_some() {
617-
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
618-
service_name.to_owned(),
619-
runtime_meta.language_name.to_owned(),
620-
runtime_meta.language_version.to_owned(),
621-
runtime_meta.tracer_version.to_owned(),
622-
);
623-
builder.runtime_id = Some(instance_id.runtime_id.to_owned());
624-
builder.application.env = Some(env_name.to_owned());
625-
let session_info = self.get_session(&instance_id.session_id);
626-
let mut config = session_info
627-
.session_config
628-
.lock_or_panic()
629-
.clone()
630-
.unwrap_or_else(ddtelemetry::config::Config::from_env);
631-
config.restartable = true;
632-
builder.config = config.clone();
633-
Some(builder.spawn().map(move |result| {
634-
if result.is_ok() {
635-
info!("spawning telemetry worker {config:?}");
636-
}
637-
result
638-
}))
639-
} else {
640-
None
641-
};
642-
643714
let (buffered_paths, buffered_integrations) = {
644715
let mut apps = rt_info.lock_applications();
645716
if let Some(app_data) = apps.get_mut(&queue_id) {
@@ -652,64 +723,62 @@ impl SidecarInterface for SidecarServer {
652723
}
653724
};
654725

655-
tokio::spawn(async move {
656-
if let Some(instance_future) = instance_future {
657-
let instance_option = match instance_future.await {
658-
Ok((handle, worker_join)) => {
659-
let instance = AppInstance {
660-
telemetry: handle,
661-
telemetry_worker_shutdown: worker_join
662-
.map(Result::ok)
663-
.boxed()
664-
.shared(),
665-
telemetry_metrics: Default::default(),
666-
};
667-
668-
let mut actions: Vec<TelemetryActions> = vec![];
669-
enqueued_data.extract_telemetry_actions(&mut actions).await;
670-
instance.telemetry.send_msgs(actions).await.ok();
726+
let worker_handle = if manual_app_future.completer.is_some() {
727+
self.get_or_create_telemetry_worker_handle(
728+
&service_name,
729+
&env_name,
730+
&runtime_meta.tracer_version,
731+
&instance_id,
732+
&runtime_meta,
733+
)
734+
} else {
735+
None
736+
};
671737

672-
instance
673-
.telemetry
674-
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
675-
.await
676-
.ok();
677-
Some(instance)
678-
}
679-
Err(e) => {
680-
error!("could not spawn telemetry worker {:?}", e);
681-
None
682-
}
738+
tokio::spawn(async move {
739+
if let Some(worker) = worker_handle {
740+
let instance = AppInstance {
741+
telemetry: worker.clone(),
742+
telemetry_worker_shutdown: futures::future::pending().boxed().shared(),
743+
telemetry_metrics: Default::default(),
683744
};
684745

685-
#[allow(clippy::expect_used)]
686-
manual_app_future
687-
.completer
688-
.expect("Completed expected Some ManualFuture for application instance, but found none")
689-
.complete(instance_option)
690-
.await;
691-
}
746+
let mut actions = vec![];
692747

693-
if let Some(mut app) = manual_app_future.app_future.await {
694-
let mut flush_actions: Vec<TelemetryActions> = vec![];
748+
enqueued_data.extract_telemetry_actions(&mut actions).await;
695749

696750
for integration in buffered_integrations {
697-
flush_actions.push(TelemetryActions::AddIntegration(integration));
751+
actions.push(TelemetryActions::AddIntegration(integration));
698752
}
699753

700754
for path in buffered_paths {
701755
for dep in EnqueuedTelemetryData::extract_composer_telemetry(path)
702756
.await
703757
.iter()
704758
{
705-
flush_actions.push(TelemetryActions::AddDependency(dep.clone()));
759+
actions.push(TelemetryActions::AddDependency(dep.clone()));
706760
}
707761
}
708762

709-
if !flush_actions.is_empty() {
710-
app.telemetry.send_msgs(flush_actions).await.ok();
763+
if !actions.is_empty() {
764+
worker.send_msgs(actions).await.ok();
711765
}
712766

767+
instance
768+
.telemetry
769+
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
770+
.await
771+
.ok();
772+
773+
#[allow(clippy::expect_used)]
774+
manual_app_future
775+
.completer
776+
.expect("Expected Some completer")
777+
.complete(Some(instance))
778+
.await;
779+
}
780+
781+
if let Some(mut app) = manual_app_future.app_future.await {
713782
// Register metrics
714783
for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() {
715784
app.register_metric(metric);
@@ -726,7 +795,6 @@ impl SidecarInterface for SidecarServer {
726795
if actions.iter().any(|action| {
727796
matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
728797
}) {
729-
// Avoid self.get_runtime(), it could create a new one.
730798
if let Some(session) =
731799
self.sessions.lock_or_panic().get(&instance_id.session_id)
732800
{

0 commit comments

Comments
 (0)