Skip to content

Commit 74e40d7

Browse files
committed
feat(sidecar): apply feedbacks
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent 2671326 commit 74e40d7

File tree

8 files changed

+202
-189
lines changed

8 files changed

+202
-189
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
540540
remote_config_products_count: usize,
541541
remote_config_capabilities: *const RemoteConfigCapabilities,
542542
remote_config_capabilities_count: usize,
543+
remote_config_enabled: bool,
543544
is_fork: bool,
544545
) -> MaybeError {
545546
#[cfg(unix)]
@@ -583,6 +584,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
583584
)
584585
.as_slice()
585586
.to_vec(),
587+
remote_config_enabled,
586588
},
587589
is_fork
588590
));
@@ -736,7 +738,7 @@ pub unsafe extern "C" fn ddog_sidecar_set_remote_config_data(
736738
app_version: ffi::CharSlice,
737739
global_tags: &ddcommon_ffi::Vec<Tag>,
738740
) -> MaybeError {
739-
try_c!(blocking::set_remote_config_data(
741+
try_c!(blocking::set_universal_service_tags(
740742
transport,
741743
instance_id,
742744
queue_id,

datadog-sidecar-ffi/tests/sidecar.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ fn test_ddog_sidecar_register_app() {
107107
null(),
108108
0,
109109
false,
110+
false,
110111
)
111112
.unwrap_none();
112113

@@ -156,6 +157,7 @@ fn test_ddog_sidecar_register_app() {
156157
null(),
157158
0,
158159
false,
160+
false,
159161
)
160162
.unwrap_none();
161163

datadog-sidecar/src/service/blocking.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ pub fn acquire_exception_hash_rate_limiter(
383383
/// # Returns
384384
///
385385
/// An `io::Result<()>` indicating the result of the operation.
386-
pub fn set_remote_config_data(
386+
pub fn set_universal_service_tags(
387387
transport: &mut SidecarTransport,
388388
instance_id: &InstanceId,
389389
queue_id: &QueueId,
@@ -392,7 +392,7 @@ pub fn set_remote_config_data(
392392
app_version: String,
393393
global_tags: Vec<Tag>,
394394
) -> io::Result<()> {
395-
transport.send(SidecarInterfaceRequest::SetRemoteConfigData {
395+
transport.send(SidecarInterfaceRequest::SetUniversalServiceTags {
396396
instance_id: instance_id.clone(),
397397
queue_id: *queue_id,
398398
service_name,

datadog-sidecar/src/service/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub struct SessionConfig {
5959
pub log_file: config::LogMethod,
6060
pub remote_config_products: Vec<RemoteConfigProduct>,
6161
pub remote_config_capabilities: Vec<RemoteConfigCapabilities>,
62+
pub remote_config_enabled: bool,
6263
}
6364

6465
#[derive(Debug, Deserialize, Serialize)]

datadog-sidecar/src/service/session_info.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub(crate) struct SessionInfo {
4242
Arc<Mutex<Option<(MultiEnvFilterGuard<'static>, MultiWriterGuard<'static>)>>>,
4343
pub(crate) session_id: String,
4444
pub(crate) pid: Arc<AtomicI32>,
45+
pub(crate) remote_config_enabled: bool,
4546
}
4647

4748
impl Clone for SessionInfo {
@@ -60,6 +61,7 @@ impl Clone for SessionInfo {
6061
log_guard: self.log_guard.clone(),
6162
session_id: self.session_id.clone(),
6263
pid: self.pid.clone(),
64+
remote_config_enabled: self.remote_config_enabled,
6365
}
6466
}
6567
}

datadog-sidecar/src/service/sidecar_interface.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pub trait SidecarInterface {
137137
/// * `granularity` - how much time needs to pass between two exceptions
138138
async fn acquire_exception_hash_rate_limiter(exception_hash: u64, granularity: Duration);
139139

140-
/// Sets contextual data for the remote config client.
140+
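/// Sets the universal service tags (service name, environment name, application version) and the global tags for the given instance and queue.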
141141
///
142142
/// # Arguments
143143
/// * `instance_id` - The ID of the instance.
@@ -146,7 +146,7 @@ pub trait SidecarInterface {
146146
/// * `env_name` - The name of the environment.
147147
/// * `app_version` - The application version.
148148
/// * `global_tags` - Global tags which need to be propagated.
149-
async fn set_remote_config_data(
149+
async fn set_universal_service_tags(
150150
instance_id: InstanceId,
151151
queue_id: QueueId,
152152
service_name: String,

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 45 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33

44
use crate::log::{TemporarilyRetainedMapStats, MULTI_LOG_FILTER, MULTI_LOG_WRITER};
55
use crate::service::{
6-
sidecar_interface::ServeSidecarInterface, tracing::TraceFlusher, InstanceId, QueueId,
7-
RequestIdentification, RequestIdentifier, RuntimeInfo, RuntimeMetadata,
6+
sidecar_interface::ServeSidecarInterface,
7+
telemetry::{TelemetryCachedClient, TelemetryCachedClientSet},
8+
tracing::TraceFlusher,
9+
InstanceId, QueueId, RequestIdentification, RequestIdentifier, RuntimeInfo, RuntimeMetadata,
810
SerializedTracerHeaderTags, SessionConfig, SessionInfo, SidecarAction, SidecarInterface,
911
SidecarInterfaceRequest, SidecarInterfaceResponse,
1012
};
11-
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
12-
use base64::Engine;
1313
use datadog_ipc::platform::{AsyncChannel, ShmHandle};
1414
use datadog_ipc::tarpc;
1515
use datadog_ipc::tarpc::context::Context;
@@ -18,25 +18,18 @@ use datadog_trace_utils::trace_utils::SendData;
1818
use datadog_trace_utils::tracer_payload::decode_to_trace_chunks;
1919
use datadog_trace_utils::tracer_payload::TraceEncoding;
2020
use ddcommon::{Endpoint, MutexExt};
21-
use ddtelemetry::worker::{
22-
LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerStats,
23-
};
21+
use ddtelemetry::worker::{LifecycleAction, TelemetryActions, TelemetryWorkerStats};
2422
use futures::future;
2523
use futures::future::Ready;
2624
use manual_future::ManualFutureCompleter;
2725
use std::borrow::Cow;
2826
use std::collections::hash_map::Entry;
2927
use std::collections::{HashMap, HashSet};
30-
use std::ffi::CString;
31-
use std::hash::{Hash, Hasher};
32-
use std::os::unix::ffi::OsStrExt;
33-
use std::path::PathBuf;
3428
use std::pin::Pin;
3529
use std::sync::atomic::{AtomicU64, Ordering};
3630
use std::sync::{Arc, Mutex};
37-
use std::time::{Duration, Instant};
31+
use std::time::Duration;
3832
use tracing::{debug, error, info, trace, warn};
39-
use zwohash::ZwoHasher;
4033

4134
use futures::FutureExt;
4235
use serde::{Deserialize, Serialize};
@@ -122,107 +115,7 @@ pub struct SidecarServer {
122115
process_handle: Option<ProcessHandle>,
123116
}
124117

125-
pub fn path_for_telemetry(service: &str, env: &str, version: &str) -> CString {
126-
let mut hasher = ZwoHasher::default();
127-
service.hash(&mut hasher);
128-
env.hash(&mut hasher);
129-
version.hash(&mut hasher);
130-
let hash = hasher.finish();
131-
132-
let mut path = format!(
133-
"/ddtl{}-{}",
134-
primary_sidecar_identifier(),
135-
BASE64_URL_SAFE_NO_PAD.encode(hash.to_ne_bytes()),
136-
);
137-
path.truncate(31);
138-
139-
#[allow(clippy::unwrap_used)]
140-
CString::new(path).unwrap()
141-
}
142-
143118
impl SidecarServer {
144-
pub fn get_or_create_telemetry_client(
145-
&self,
146-
service: &str,
147-
env: &str,
148-
version: &str,
149-
instance_id: &InstanceId,
150-
runtime_meta: &RuntimeMetadata,
151-
) -> Option<TelemetryCachedClient> {
152-
let key = (service.to_string(), env.to_string(), version.to_string());
153-
154-
let mut clients = self.telemetry_clients.inner.lock_or_panic();
155-
156-
if let Some(existing) = clients.get_mut(&key) {
157-
existing.last_used = Instant::now();
158-
let telemetry = existing.clone();
159-
tokio::spawn(async move {
160-
telemetry
161-
.client
162-
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
163-
.await
164-
.ok();
165-
});
166-
return Some(existing.clone());
167-
}
168-
169-
// Build and spawn worker
170-
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
171-
service.to_string(),
172-
runtime_meta.language_name.to_string(),
173-
runtime_meta.language_version.to_string(),
174-
runtime_meta.tracer_version.to_string(),
175-
);
176-
177-
builder.runtime_id = Some(instance_id.runtime_id.clone());
178-
builder.application.env = Some(env.to_string());
179-
180-
let session_info = self.get_session(&instance_id.session_id);
181-
let config = session_info
182-
.session_config
183-
.lock_or_panic()
184-
.clone()
185-
.unwrap_or_else(ddtelemetry::config::Config::from_env);
186-
187-
builder.config = config.clone();
188-
189-
match builder.spawn().now_or_never() {
190-
Some(Ok((handle, _join))) => {
191-
let telemetry_client = TelemetryCachedClient {
192-
client: handle.clone(),
193-
shmem_file: PathBuf::from(std::ffi::OsStr::from_bytes(
194-
path_for_telemetry(service, env, version).as_bytes(),
195-
)),
196-
last_used: Instant::now(),
197-
buffered_integrations: HashSet::new(),
198-
buffered_composer_paths: HashSet::new(),
199-
telemetry_metrics: Default::default(),
200-
};
201-
clients.insert(key, telemetry_client.clone());
202-
203-
let telemetry = telemetry_client.clone();
204-
tokio::spawn(async move {
205-
telemetry
206-
.client
207-
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
208-
.await
209-
.ok();
210-
});
211-
212-
info!("spawning telemetry worker {config:?}");
213-
Some(telemetry_client)
214-
}
215-
Some(Err(e)) => {
216-
error!("failed to spawn telemetry worker: {:?}", e);
217-
None
218-
}
219-
None => {
220-
error!("failed to spawn telemetry worker: spawn did not complete immediately");
221-
None
222-
}
223-
}
224-
}
225-
226119
/// Accepts a new connection and starts processing requests.
227120
///
228121
/// This function creates a new `tarpc` server with the provided `async_channel` and starts
@@ -494,30 +387,41 @@ impl SidecarInterface for SidecarServer {
494387
let version = value.app_version.clone().unwrap();
495388
let service = value.service_name.clone().unwrap();
496389

497-
let mut telemetry = match self.get_or_create_telemetry_client(
390+
let mut telemetry = match self.telemetry_clients.get_or_create(
498391
&service,
499392
&env,
500393
&version,
501394
&instance_id,
502395
&runtime_metadata,
396+
|| {
397+
self.get_session(&instance_id.session_id)
398+
.session_config
399+
.lock_or_panic()
400+
.clone()
401+
},
503402
) {
504403
Some(client) => client,
505404
None => return no_response(),
506405
};
507406

508407
let mut actions_to_process = vec![];
509408
let mut composer_paths_to_process = vec![];
409+
let mut buffered_info_changed = false;
510410

511411
for action in actions {
512-
match &action {
513-
SidecarAction::Telemetry(TelemetryActions::AddIntegration(integration)) => {
412+
match action {
413+
SidecarAction::Telemetry(TelemetryActions::AddIntegration(
414+
ref integration,
415+
)) => {
514416
if telemetry.buffered_integrations.insert(integration.clone()) {
515417
actions_to_process.push(action);
418+
buffered_info_changed = true;
516419
}
517420
}
518421
SidecarAction::PhpComposerTelemetryFile(path) => {
519422
if telemetry.buffered_composer_paths.insert(path.clone()) {
520-
composer_paths_to_process.push(path.clone());
423+
composer_paths_to_process.push(path);
424+
buffered_info_changed = true;
521425
}
522426
}
523427
_ => {
@@ -560,16 +464,13 @@ impl SidecarInterface for SidecarServer {
560464
});
561465
}
562466

563-
// Synchronous SHM write
564-
if let Ok(buf) = bincode::serialize(&(
565-
&telemetry.buffered_integrations,
566-
&telemetry.buffered_composer_paths,
567-
)) {
568-
if let Err(e) = std::fs::write(&telemetry.shmem_file, buf) {
569-
error!(
570-
"Failed to write telemetry SHM file to {:?}: {:?}",
571-
telemetry.shmem_file, e
572-
);
467+
// Synchronous SHM write if changes have been made
468+
if buffered_info_changed {
469+
if let Ok(buf) = bincode::serialize(&(
470+
&telemetry.buffered_integrations,
471+
&telemetry.buffered_composer_paths,
472+
)) {
473+
telemetry.shm_writer.write(&buf);
573474
}
574475
}
575476
}
@@ -594,7 +495,7 @@ impl SidecarInterface for SidecarServer {
594495
) -> Self::SetSessionConfigFut {
595496
debug!("Set session config for {session_id} to {config:?}");
596497

597-
let session = self.get_session(&session_id);
498+
let mut session = self.get_session(&session_id);
598499
#[cfg(unix)]
599500
{
600501
session.pid.store(pid, Ordering::Relaxed);
@@ -604,6 +505,7 @@ impl SidecarInterface for SidecarServer {
604505
{
605506
*session.remote_config_notify_function.lock().unwrap() = remote_config_notify_function;
606507
}
508+
session.remote_config_enabled = config.remote_config_enabled;
607509
session.modify_telemetry_config(|cfg| {
608510
cfg.telemetry_heartbeat_interval = config.telemetry_heartbeat_interval;
609511
let endpoint =
@@ -833,9 +735,9 @@ impl SidecarInterface for SidecarServer {
833735
no_response()
834736
}
835737

836-
type SetRemoteConfigDataFut = NoResponse;
738+
type SetUniversalServiceTagsFut = NoResponse;
837739

838-
fn set_remote_config_data(
740+
fn set_universal_service_tags(
839741
self,
840742
_: Context,
841743
instance_id: InstanceId,
@@ -844,7 +746,7 @@ impl SidecarInterface for SidecarServer {
844746
env_name: String,
845747
app_version: String,
846748
global_tags: Vec<Tag>,
847-
) -> Self::SetRemoteConfigDataFut {
749+
) -> Self::SetUniversalServiceTagsFut {
848750
debug!("Registered remote config metadata: instance {instance_id:?}, queue_id: {queue_id:?}, service: {service_name}, env: {env_name}, version: {app_version}");
849751

850752
let session = self.get_session(&instance_id.session_id);
@@ -871,16 +773,18 @@ impl SidecarInterface for SidecarServer {
871773
let runtime_info = session.get_runtime(&instance_id.runtime_id);
872774
let mut applications = runtime_info.lock_applications();
873775
let app = applications.entry(queue_id).or_default();
874-
app.remote_config_guard = Some(self.remote_configs.add_runtime(
875-
invariants,
876-
*session.remote_config_interval.lock_or_panic(),
877-
instance_id.runtime_id,
878-
notify_target,
879-
env_name.clone(),
880-
service_name.clone(),
881-
app_version.clone(),
882-
global_tags.clone(),
883-
));
776+
if session.remote_config_enabled {
777+
app.remote_config_guard = Some(self.remote_configs.add_runtime(
778+
invariants,
779+
*session.remote_config_interval.lock_or_panic(),
780+
instance_id.runtime_id,
781+
notify_target,
782+
env_name.clone(),
783+
service_name.clone(),
784+
app_version.clone(),
785+
global_tags.clone(),
786+
));
787+
}
884788
app.set_metadata(env_name, app_version, service_name, global_tags);
885789

886790
no_response()

0 commit comments

Comments
 (0)