Skip to content

Commit c305218

Browse files
committed
In-process log submission in sidecar (for appsec helper)
1 parent 7dfc1e2 commit c305218

File tree

13 files changed

+396
-3
lines changed

13 files changed

+396
-3
lines changed

datadog-sidecar/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ datadog-remote-config = { path = "../datadog-remote-config", features = ["test"]
104104
datadog-trace-utils = { path = "../datadog-trace-utils", features = ["test-utils"] }
105105
datadog-dynamic-configuration = { path = "../datadog-dynamic-configuration", features = ["test"] }
106106

107+
[build-dependencies]
108+
cbindgen = "0.26"
109+
107110
[lints.rust]
108111
unexpected_cfgs = { level = "warn", check-cfg = [
109112
'cfg(tokio_taskdump,windows_seh_wrapper)',

datadog-sidecar/cbindgen.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
language = "C"
5+
6+
include_guard = "DDOG_TELEMETRY_API_H"
7+
autogen_warning = "/* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */"
8+
9+
[export]
10+
item_types = ["functions", "structs", "enums"]

datadog-sidecar/include/sidecar_ffi.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#ifndef DDOG_TELEMETRY_API_H
2+
#define DDOG_TELEMETRY_API_H
3+
4+
/* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */
5+
6+
#include <stdarg.h>
7+
#include <stdbool.h>
8+
#include <stdint.h>
9+
#include <stdlib.h>
10+
11+
typedef enum CLogLevel {
12+
Error = 1,
13+
Warn = 2,
14+
Debug = 3,
15+
} CLogLevel;
16+
17+
typedef enum FfiError {
18+
Ok = 0,
19+
PointerNull = 1,
20+
OperationFailed = 2,
21+
InvalidUtf8 = 3,
22+
QueueFull = 4,
23+
} FfiError;
24+
25+
typedef struct FfiString {
26+
const char *ptr;
27+
uintptr_t len;
28+
} FfiString;
29+
30+
/**
31+
* Enqueues a telemetry log action to be processed internally.
32+
* Non-blocking. Logs might be dropped if the internal queue is full.
33+
*
34+
* # Safety
35+
* Pointers must be valid, strings must be null-terminated if not null.
36+
*/
37+
enum FfiError ddog_sidecar_enqueue_telemetry_log(struct FfiString session_id_ffi,
38+
struct FfiString runtime_id_ffi,
39+
uint64_t queue_id,
40+
struct FfiString identifier_ffi,
41+
enum CLogLevel level,
42+
struct FfiString message_ffi,
43+
struct FfiString *stack_trace_ffi,
44+
struct FfiString *tags_ffi,
45+
bool is_sensitive);
46+
47+
#endif /* DDOG_TELEMETRY_API_H */

datadog-sidecar/src/entry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::setup::{self, IpcClient, IpcServer, Liaison};
2727

2828
use crate::config::{self, Config};
2929
use crate::self_telemetry::self_telemetry;
30+
use crate::service::telemetry_action_receiver_task;
3031
use crate::tracer::get_shm_limiter;
3132
use crate::watchdog::Watchdog;
3233
use crate::{ddog_daemon_entry_point, setup_daemon_process};
@@ -91,6 +92,9 @@ where
9192
drop(get_shm_limiter().lock());
9293

9394
let server = SidecarServer::default();
95+
96+
tokio::spawn(telemetry_action_receiver_task(server.clone()));
97+
9498
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel::<()>(1);
9599

96100
let watchdog_handle =

datadog-sidecar/src/service/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub(crate) mod sidecar_server;
4444
mod telemetry;
4545
pub(crate) mod tracing;
4646

47+
pub(crate) use telemetry::telemetry_action_receiver_task;
48+
4749
#[derive(Clone, Debug, Serialize, Deserialize)]
4850
pub struct SessionConfig {
4951
pub endpoint: Endpoint,

datadog-sidecar/src/service/queue_id.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
99
#[derive(Default, Copy, Clone, Hash, PartialEq, Eq, Debug, Serialize, Deserialize)]
1010
#[repr(transparent)]
1111
pub struct QueueId {
12-
inner: u64,
12+
pub(crate) inner: u64,
1313
}
1414

1515
impl QueueId {

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub struct SidecarServer {
109109
pub self_telemetry_config:
110110
Arc<Mutex<Option<ManualFutureCompleter<ddtelemetry::config::Config>>>>,
111111
/// Keeps track of the number of submitted payloads.
112-
pub submitted_payloads: Arc<AtomicU64>,
112+
pub(crate) submitted_payloads: Arc<AtomicU64>,
113113
/// All tracked agent infos per endpoint
114114
pub agent_infos: AgentInfos,
115115
/// All remote config handling
@@ -224,7 +224,7 @@ impl SidecarServer {
224224
}
225225
}
226226

227-
fn get_session(&self, session_id: &String) -> SessionInfo {
227+
pub(crate) fn get_session(&self, session_id: &String) -> SessionInfo {
228228
let mut sessions = self.sessions.lock_or_panic();
229229
match sessions.get(session_id) {
230230
Some(session) => session.clone(),
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
#![allow(clippy::missing_safety_doc)]
4+
5+
use crate::service::{InstanceId, QueueId};
6+
use ddtelemetry::data::{Log as DDLog, LogLevel as DDLogLevel};
7+
use ddtelemetry::worker::{LogIdentifier, TelemetryActions};
8+
use std::collections::hash_map::DefaultHasher;
9+
use std::ffi::CStr;
10+
use std::hash::{Hash, Hasher};
11+
use std::ptr::NonNull;
12+
use tokio::sync::mpsc;
13+
use tracing::{error, warn};
14+
15+
use super::{get_telemetry_action_sender, InternalTelemetryActions};
16+
17+
#[repr(C)]
18+
pub enum FfiError {
19+
Ok = 0,
20+
PointerNull = 1,
21+
OperationFailed = 2,
22+
InvalidUtf8 = 3,
23+
QueueFull = 4,
24+
}
25+
26+
#[repr(C)]
27+
pub enum CLogLevel {
28+
Error = 1,
29+
Warn = 2,
30+
Debug = 3,
31+
}
32+
33+
impl From<CLogLevel> for DDLogLevel {
34+
fn from(level: CLogLevel) -> Self {
35+
match level {
36+
CLogLevel::Error => DDLogLevel::Error,
37+
CLogLevel::Warn => DDLogLevel::Warn,
38+
CLogLevel::Debug => DDLogLevel::Debug,
39+
}
40+
}
41+
}
42+
43+
#[repr(C)]
44+
pub struct FfiString {
45+
ptr: *const libc::c_char,
46+
len: usize,
47+
}
48+
49+
impl FfiString {
50+
fn is_empty(&self) -> bool {
51+
self.len == 0 || self.ptr.is_null()
52+
}
53+
}
54+
55+
impl TryFrom<&FfiString> for String {
56+
type Error = FfiError;
57+
58+
fn try_from(vref: &FfiString) -> Result<Self, Self::Error> {
59+
if vref.ptr.is_null() {
60+
return Err(FfiError::PointerNull);
61+
}
62+
let cstr = unsafe {
63+
CStr::from_ptr(vref.ptr)
64+
.to_str()
65+
.map_err(|_| FfiError::InvalidUtf8)
66+
}?;
67+
Ok(cstr.to_string())
68+
}
69+
}
70+
71+
impl TryFrom<FfiString> for String {
72+
type Error = FfiError;
73+
74+
fn try_from(value: FfiString) -> Result<Self, Self::Error> {
75+
String::try_from(&value)
76+
}
77+
}
78+
79+
/// Enqueues a telemetry log action to be processed internally.
80+
/// Non-blocking. Logs might be dropped if the internal queue is full.
81+
///
82+
/// # Safety
83+
/// Pointers must be valid, strings must be null-terminated if not null.
84+
#[no_mangle]
85+
pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_log(
86+
session_id_ffi: FfiString,
87+
runtime_id_ffi: FfiString,
88+
queue_id: u64,
89+
identifier_ffi: FfiString,
90+
level: CLogLevel,
91+
message_ffi: FfiString,
92+
stack_trace_ffi: Option<NonNull<FfiString>>,
93+
tags_ffi: Option<NonNull<FfiString>>,
94+
is_sensitive: bool,
95+
) -> FfiError {
96+
match ddog_sidecar_enqueue_telemetry_log_result(
97+
session_id_ffi,
98+
runtime_id_ffi,
99+
queue_id,
100+
identifier_ffi,
101+
level,
102+
message_ffi,
103+
stack_trace_ffi,
104+
tags_ffi,
105+
is_sensitive,
106+
) {
107+
Ok(result) => result,
108+
Err(e) => e,
109+
}
110+
}
111+
112+
#[allow(clippy::too_many_arguments)]
113+
fn ddog_sidecar_enqueue_telemetry_log_result(
114+
session_id_ffi: FfiString,
115+
runtime_id_ffi: FfiString,
116+
queue_id: u64,
117+
identifier_ffi: FfiString,
118+
level: CLogLevel,
119+
message_ffi: FfiString,
120+
stack_trace_ffi: Option<NonNull<FfiString>>,
121+
tags_ffi: Option<NonNull<FfiString>>,
122+
is_sensitive: bool,
123+
) -> Result<FfiError, FfiError> {
124+
if session_id_ffi.is_empty()
125+
|| runtime_id_ffi.is_empty()
126+
|| queue_id == 0
127+
|| identifier_ffi.is_empty()
128+
|| message_ffi.is_empty()
129+
{
130+
return Err(FfiError::PointerNull);
131+
}
132+
133+
let sender = match get_telemetry_action_sender() {
134+
Ok(s) => s,
135+
Err(e) => {
136+
error!("Failed to get telemetry action sender: {}", e);
137+
return Err(FfiError::OperationFailed);
138+
}
139+
};
140+
141+
let instance_id = InstanceId::new(
142+
String::try_from(session_id_ffi)?,
143+
String::try_from(runtime_id_ffi)?,
144+
);
145+
let queue_id = QueueId { inner: queue_id };
146+
let identifier: String = identifier_ffi.try_into()?;
147+
let message: String = message_ffi.try_into()?;
148+
149+
let stack_trace = stack_trace_ffi
150+
.map(|s| String::try_from(unsafe { s.as_ref() }))
151+
.transpose()?;
152+
let tags: Option<String> = tags_ffi
153+
.map(|s| String::try_from(unsafe { s.as_ref() }))
154+
.transpose()?;
155+
156+
let mut hasher = DefaultHasher::new();
157+
identifier.hash(&mut hasher);
158+
let log_id = LogIdentifier {
159+
indentifier: hasher.finish(),
160+
};
161+
162+
let log_data = DDLog {
163+
message,
164+
level: level.into(),
165+
stack_trace,
166+
count: 1,
167+
tags: tags.unwrap_or("".into()),
168+
is_sensitive,
169+
};
170+
let log_action = TelemetryActions::AddLog((log_id, log_data));
171+
172+
let msg = InternalTelemetryActions {
173+
instance_id,
174+
queue_id,
175+
actions: vec![log_action],
176+
};
177+
178+
match sender.try_send(msg) {
179+
Ok(_) => Ok(FfiError::Ok),
180+
Err(mpsc::error::TrySendError::Full(_)) => {
181+
warn!("Telemetry action queue full. Action dropped.");
182+
Err(FfiError::QueueFull)
183+
}
184+
Err(mpsc::error::TrySendError::Closed(_)) => {
185+
error!("Telemetry action receiver closed.");
186+
Err(FfiError::OperationFailed)
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)