Skip to content

Commit e3f6149

Browse files
committed
feat(sidecar): buffer integrations and dependencies
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent 91a4388 commit e3f6149

File tree

5 files changed

+96
-25
lines changed

5 files changed

+96
-25
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addDependency(
420420
let version =
421421
(!dependency_version.is_empty()).then(|| dependency_version.to_utf8_lossy().into_owned());
422422

423-
let dependency = TelemetryActions::AddDependecy(Dependency {
423+
let dependency = TelemetryActions::AddDependency(Dependency {
424424
name: dependency_name.to_utf8_lossy().into_owned(),
425425
version,
426426
});

datadog-sidecar/src/service/runtime_info.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ use crate::service::{
88
};
99
use datadog_live_debugger::sender::{generate_tags, PayloadSender};
1010
use ddcommon::{tag::Tag, MutexExt};
11+
use ddtelemetry::data::Integration;
1112
use futures::{
1213
future::{self, join_all, Shared},
1314
FutureExt,
1415
};
1516
use manual_future::{ManualFuture, ManualFutureCompleter};
1617
use simd_json::prelude::ArrayTrait;
17-
use std::collections::HashMap;
18+
use std::collections::{HashMap, HashSet};
1819
use std::fmt::Display;
20+
use std::path::PathBuf;
1921
use std::sync::{Arc, Mutex, MutexGuard};
2022
use tracing::{debug, info};
2123

@@ -55,6 +57,8 @@ pub(crate) struct ActiveApplication {
5557
pub live_debugger_tag_cache: Option<Arc<String>>,
5658
pub debugger_logs_payload_sender: Arc<tokio::sync::Mutex<Option<PayloadSender>>>,
5759
pub debugger_diagnostics_payload_sender: Arc<tokio::sync::Mutex<Option<PayloadSender>>>,
60+
pub buffered_integrations: HashSet<Integration>,
61+
pub buffered_composer_paths: HashSet<PathBuf>,
5862
}
5963

6064
impl RuntimeInfo {

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -488,22 +488,37 @@ impl SidecarInterface for SidecarServer {
488488
match applications.entry(queue_id) {
489489
Entry::Occupied(mut entry) => {
490490
let value = entry.get_mut();
491+
492+
let to_process: Vec<SidecarAction> = actions
493+
.into_iter()
494+
.filter_map(|action| match &action {
495+
SidecarAction::Telemetry(TelemetryActions::AddIntegration(integration)) => {
496+
value.buffered_integrations.insert(integration.clone());
497+
None
498+
}
499+
SidecarAction::PhpComposerTelemetryFile(path) => {
500+
value.buffered_composer_paths.insert(path.clone());
501+
None
502+
}
503+
_ => Some(action),
504+
})
505+
.collect();
506+
491507
match value.app_or_actions {
492508
AppOrQueue::Inactive => {
493-
if is_stop_actions(&actions) {
509+
if is_stop_actions(&to_process) {
494510
entry.remove();
495511
} else {
496512
value.app_or_actions =
497-
AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions));
513+
AppOrQueue::Queue(EnqueuedTelemetryData::processed(to_process));
498514
}
499515
}
500516
AppOrQueue::Queue(ref mut data) => {
501-
data.process(actions);
517+
data.process(to_process);
502518
}
503519
AppOrQueue::App(ref service_future) => {
504520
let service_future = service_future.clone();
505-
// drop on stop
506-
if actions.iter().any(|action| {
521+
if to_process.iter().any(|action| {
507522
matches!(
508523
action,
509524
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
@@ -522,9 +537,10 @@ impl SidecarInterface for SidecarServer {
522537
return;
523538
};
524539
if let Some(mut app) = app_future.await {
525-
let actions =
526-
EnqueuedTelemetryData::process_immediately(actions, &mut app)
527-
.await;
540+
let actions = EnqueuedTelemetryData::process_immediately(
541+
to_process, &mut app,
542+
)
543+
.await;
528544
app.telemetry.send_msgs(actions).await.ok();
529545
}
530546
});
@@ -533,12 +549,29 @@ impl SidecarInterface for SidecarServer {
533549
}
534550
Entry::Vacant(entry) => {
535551
if !is_stop_actions(&actions) {
536-
entry.insert(ActiveApplication {
537-
app_or_actions: AppOrQueue::Queue(EnqueuedTelemetryData::processed(
538-
actions,
539-
)),
540-
..Default::default()
541-
});
552+
let mut new_app = ActiveApplication::default();
553+
554+
// Buffer actions for new app entry
555+
let to_process: Vec<SidecarAction> = actions
556+
.into_iter()
557+
.filter_map(|action| match &action {
558+
SidecarAction::Telemetry(TelemetryActions::AddIntegration(
559+
integration,
560+
)) => {
561+
new_app.buffered_integrations.insert(integration.clone());
562+
None
563+
}
564+
SidecarAction::PhpComposerTelemetryFile(path) => {
565+
new_app.buffered_composer_paths.insert(path.clone());
566+
None
567+
}
568+
_ => Some(action),
569+
})
570+
.collect();
571+
572+
new_app.app_or_actions =
573+
AppOrQueue::Queue(EnqueuedTelemetryData::processed(to_process));
574+
entry.insert(new_app);
542575
}
543576
}
544577
}
@@ -605,6 +638,18 @@ impl SidecarInterface for SidecarServer {
605638
None
606639
};
607640

641+
let (buffered_paths, buffered_integrations) = {
642+
let mut apps = rt_info.lock_applications();
643+
if let Some(app_data) = apps.get_mut(&queue_id) {
644+
(
645+
app_data.buffered_composer_paths.drain().collect::<Vec<_>>(),
646+
app_data.buffered_integrations.drain().collect::<Vec<_>>(),
647+
)
648+
} else {
649+
(vec![], vec![])
650+
}
651+
};
652+
608653
tokio::spawn(async move {
609654
if let Some(instance_future) = instance_future {
610655
let instance_option = match instance_future.await {
@@ -644,6 +689,25 @@ impl SidecarInterface for SidecarServer {
644689
}
645690

646691
if let Some(mut app) = manual_app_future.app_future.await {
692+
let mut flush_actions: Vec<TelemetryActions> = vec![];
693+
694+
for integration in buffered_integrations {
695+
flush_actions.push(TelemetryActions::AddIntegration(integration));
696+
}
697+
698+
for path in buffered_paths {
699+
for dep in EnqueuedTelemetryData::extract_composer_telemetry(path)
700+
.await
701+
.iter()
702+
{
703+
flush_actions.push(TelemetryActions::AddDependency(dep.clone()));
704+
}
705+
}
706+
707+
if !flush_actions.is_empty() {
708+
app.telemetry.send_msgs(flush_actions).await.ok();
709+
}
710+
647711
// Register metrics
648712
for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() {
649713
app.register_metric(metric);

datadog-sidecar/src/service/telemetry/enqueued_telemetry_data.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl EnqueuedTelemetryData {
8282
SidecarAction::Telemetry(TelemetryActions::AddConfig(c)) => {
8383
self.configurations.insert(c)
8484
}
85-
SidecarAction::Telemetry(TelemetryActions::AddDependecy(d)) => {
85+
SidecarAction::Telemetry(TelemetryActions::AddDependency(d)) => {
8686
self.dependencies.insert(d)
8787
}
8888
SidecarAction::Telemetry(TelemetryActions::AddIntegration(i)) => {
@@ -124,11 +124,11 @@ impl EnqueuedTelemetryData {
124124
pub(crate) async fn extract_telemetry_actions(&mut self, actions: &mut Vec<TelemetryActions>) {
125125
for computed_deps in self.computed_dependencies.clone() {
126126
for d in computed_deps.await.iter() {
127-
actions.push(TelemetryActions::AddDependecy(d.clone()));
127+
actions.push(TelemetryActions::AddDependency(d.clone()));
128128
}
129129
}
130130
for d in self.dependencies.unflushed() {
131-
actions.push(TelemetryActions::AddDependecy(d.clone()));
131+
actions.push(TelemetryActions::AddDependency(d.clone()));
132132
}
133133
for c in self.configurations.unflushed() {
134134
actions.push(TelemetryActions::AddConfig(c.clone()));
@@ -159,7 +159,7 @@ impl EnqueuedTelemetryData {
159159
SidecarAction::Telemetry(t) => actions.push(t),
160160
SidecarAction::PhpComposerTelemetryFile(path) => {
161161
for nested in Self::extract_composer_telemetry(path).await.iter() {
162-
actions.push(TelemetryActions::AddDependecy(nested.clone()));
162+
actions.push(TelemetryActions::AddDependency(nested.clone()));
163163
}
164164
}
165165
SidecarAction::RegisterTelemetryMetric(metric) => app.register_metric(metric),
@@ -182,7 +182,7 @@ impl EnqueuedTelemetryData {
182182
/// # Returns
183183
///
184184
/// * A `ManualFuture` that resolves to an `Arc<Vec<data::Dependency>>>`.
185-
fn extract_composer_telemetry(path: PathBuf) -> ManualFuture<Arc<Vec<data::Dependency>>> {
185+
pub fn extract_composer_telemetry(path: PathBuf) -> ManualFuture<Arc<Vec<data::Dependency>>> {
186186
let (deps, completer) = ManualFuture::new();
187187
tokio::spawn(async {
188188
let mut cache = get_composer_cache().lock().await;

ddtelemetry/src/worker/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ macro_rules! telemetry_worker_log {
7575
pub enum TelemetryActions {
7676
AddPoint((f64, ContextKey, Vec<Tag>)),
7777
AddConfig(data::Configuration),
78-
AddDependecy(Dependency),
78+
AddDependency(Dependency),
7979
AddIntegration(Integration),
8080
AddLog((LogIdentifier, Log)),
8181
Lifecycle(LifecycleAction),
@@ -286,7 +286,7 @@ impl TelemetryWorker {
286286
.schedule_event(LifecycleAction::FlushData)
287287
.unwrap();
288288
}
289-
AddConfig(_) | AddDependecy(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
289+
AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {}
290290
Lifecycle(Stop) => {
291291
if !self.data.started {
292292
return BREAK;
@@ -361,7 +361,7 @@ impl TelemetryWorker {
361361
self.data.started = true;
362362
}
363363
}
364-
AddDependecy(dep) => self.data.dependencies.insert(dep),
364+
AddDependency(dep) => self.data.dependencies.insert(dep),
365365
AddIntegration(integration) => self.data.integrations.insert(integration),
366366
AddConfig(cfg) => self.data.configurations.insert(cfg),
367367
AddLog((identifier, log)) => {
@@ -831,7 +831,10 @@ impl TelemetryWorkerHandle {
831831

832832
pub fn add_dependency(&self, name: String, version: Option<String>) -> Result<()> {
833833
self.sender
834-
.try_send(TelemetryActions::AddDependecy(Dependency { name, version }))?;
834+
.try_send(TelemetryActions::AddDependency(Dependency {
835+
name,
836+
version,
837+
}))?;
835838
Ok(())
836839
}
837840

0 commit comments

Comments
 (0)