Skip to content

Commit 540ea1e

Browse files
feat(data-pipeline): expose API to send spans without going through deserialization (#992)
# What does this PR do? This PR adds the following features: * A public API to send trace chunks through the trace exporter. This function takes a `Vec<Vec<Span<T>>` so we can rapidly iterate in ddtrace-rs on which datastructure we want to use to pass spans * A public API to wait for the /info endpoint to be ready. This is needed for deterministic tests with the test agent, because otherwise it's possible to not have stats during the first submission * A way to specify env vars when instantiating an apm test agent. We need to skip some meta keys in dd-trace-rs using the `SNAPSHOT_IGNORED_ATTRS` env, and to recreate the snapshots when running outside of the CI with the `SNAPSHOT_CI` env parameters # Motivation Working on integrating the trace-exporter in dd-trace-rs
1 parent 18c8773 commit 540ea1e

File tree

10 files changed

+206
-91
lines changed

10 files changed

+206
-91
lines changed

data-pipeline/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,6 @@ tokio = { version = "1.23", features = [
6161
"time",
6262
"test-util",
6363
], default-features = false }
64+
65+
[features]
66+
test-utils = []

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 112 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use arc_swap::{ArcSwap, ArcSwapOption};
1313
use bytes::Bytes;
1414
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
1515
use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError};
16-
use datadog_trace_utils::span::SpanSlice;
16+
use datadog_trace_utils::span::{Span, SpanText};
1717
use datadog_trace_utils::trace_utils::{self, TracerHeaderTags};
1818
use datadog_trace_utils::tracer_payload;
1919
use ddcommon::header::{
@@ -199,51 +199,47 @@ pub struct TraceExporter {
199199
telemetry: Option<TelemetryClient>,
200200
}
201201

202+
enum DeserInputFormat {
203+
V04,
204+
V05,
205+
}
206+
202207
impl TraceExporter {
203208
#[allow(missing_docs)]
204209
pub fn builder() -> TraceExporterBuilder {
205210
TraceExporterBuilder::default()
206211
}
207212

208213
/// Send msgpack serialized traces to the agent
209-
#[allow(missing_docs)]
214+
///
215+
/// # Arguments
216+
///
217+
/// * data: A slice containing the serialized traces. This slice should be encoded following the
218+
/// input_format passed to the TraceExporter on creating.
219+
/// * trace_count: The number of traces in the data
220+
///
221+
/// # Returns
222+
/// * Ok(AgentResponse): The response from the agent
223+
/// * Err(TraceExporterError): An error detailling what went wrong in the process
210224
pub fn send(
211225
&self,
212226
data: &[u8],
213227
trace_count: usize,
214228
) -> Result<AgentResponse, TraceExporterError> {
215229
self.check_agent_info();
216230

217-
match self.input_format {
231+
let res = match self.input_format {
218232
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
219-
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_slice(data) {
220-
Ok((traces, _)) => self.send_deser_ser(traces),
221-
Err(e) => Err(TraceExporterError::Deserialization(e)),
222-
},
223-
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_slice(data) {
224-
Ok((traces, _)) => self.send_deser_ser(traces),
225-
Err(e) => Err(TraceExporterError::Deserialization(e)),
226-
},
233+
TraceExporterInputFormat::V04 => self.send_deser(data, DeserInputFormat::V04),
234+
TraceExporterInputFormat::V05 => self.send_deser(data, DeserInputFormat::V05),
235+
}?;
236+
if res.is_empty() {
237+
return Err(TraceExporterError::Agent(
238+
error::AgentErrorKind::EmptyResponse,
239+
));
227240
}
228-
.and_then(|res| {
229-
if res.is_empty() {
230-
return Err(TraceExporterError::Agent(
231-
error::AgentErrorKind::EmptyResponse,
232-
));
233-
}
234241

235-
Ok(AgentResponse::from(res))
236-
})
237-
.map_err(|err| {
238-
if let TraceExporterError::Deserialization(ref e) = err {
239-
error!("Error deserializing trace from request body: {e}");
240-
self.emit_metric(
241-
HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1),
242-
None,
243-
);
244-
}
245-
err
246-
})
242+
Ok(AgentResponse::from(res))
247243
}
248244

249245
/// Safely shutdown the TraceExporter and all related tasks
@@ -425,6 +421,35 @@ impl TraceExporter {
425421
}
426422
}
427423

424+
/// !!! This function is only for testing purposes !!!
425+
///
426+
/// Waits the agent info to be ready by checking the agent_info state.
427+
/// It will only return Ok after the agent info has been fetched at least once or Err if timeout
428+
/// has been reached
429+
///
430+
/// In production:
431+
/// 1) We should not synchronously wait for this to be ready before sending traces
432+
/// 2) It's not guaranteed to not block forever, since the /info endpoint might not be
433+
/// available.
434+
///
435+
/// The `send`` function will check agent_info when running, which will only be available if the
436+
/// fetcher had time to reach to the agent.
437+
/// Since agent_info can enable CSS computation, waiting for this during testing can make
438+
/// snapshots non-determinitic.
439+
#[cfg(feature = "test-utils")]
440+
pub fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> {
441+
let start = std::time::Instant::now();
442+
loop {
443+
if std::time::Instant::now().duration_since(start) > timeout {
444+
anyhow::bail!("Timeout waiting for agent info to be ready",);
445+
}
446+
if self.agent_info.load().is_some() {
447+
return Ok(());
448+
}
449+
std::thread::sleep(Duration::from_millis(10));
450+
}
451+
}
452+
428453
fn send_proxy(&self, data: &[u8], trace_count: usize) -> Result<String, TraceExporterError> {
429454
self.send_data_to_url(
430455
data,
@@ -546,42 +571,79 @@ impl TraceExporter {
546571
/// Add all spans from the given iterator into the stats concentrator
547572
/// # Panic
548573
/// Will panic if another thread panicked will holding the lock on `stats_concentrator`
549-
fn add_spans_to_stats(&self, traces: &[Vec<SpanSlice>]) {
550-
if let StatsComputationStatus::Enabled {
551-
stats_concentrator,
552-
cancellation_token: _,
553-
exporter_handle: _,
554-
} = &**self.client_side_stats.load()
555-
{
556-
#[allow(clippy::unwrap_used)]
557-
let mut stats_concentrator = stats_concentrator.lock().unwrap();
558-
559-
let spans = traces.iter().flat_map(|trace| trace.iter());
560-
for span in spans {
561-
stats_concentrator.add_span(span);
562-
}
574+
fn add_spans_to_stats<T: SpanText>(
575+
&self,
576+
stats_concentrator: &Mutex<SpanConcentrator>,
577+
traces: &[Vec<Span<T>>],
578+
) {
579+
#[allow(clippy::unwrap_used)]
580+
let mut stats_concentrator = stats_concentrator.lock().unwrap();
581+
582+
let spans = traces.iter().flat_map(|trace| trace.iter());
583+
for span in spans {
584+
stats_concentrator.add_span(span);
563585
}
564586
}
565587

566-
fn send_deser_ser(
588+
/// Send a list of trace chunks to the agent
589+
///
590+
/// # Arguments
591+
/// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
592+
///
593+
/// # Returns
594+
/// * Ok(String): The response from the agent
595+
/// * Err(TraceExporterError): An error detailing what went wrong in the process
596+
pub fn send_trace_chunks<T: SpanText>(
597+
&self,
598+
trace_chunks: Vec<Vec<Span<T>>>,
599+
) -> Result<String, TraceExporterError> {
600+
self.check_agent_info();
601+
self.send_trace_chunks_inner(trace_chunks)
602+
}
603+
604+
/// Deserializes, processes and sends trace chunks to the agent
605+
fn send_deser(
567606
&self,
568-
mut traces: Vec<Vec<SpanSlice>>,
607+
data: &[u8],
608+
format: DeserInputFormat,
569609
) -> Result<String, TraceExporterError> {
610+
let (traces, _) = match format {
611+
DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
612+
DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
613+
}
614+
.map_err(|e| {
615+
error!("Error deserializing trace from request body: {e}");
616+
self.emit_metric(
617+
HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1),
618+
None,
619+
);
620+
TraceExporterError::Deserialization(e)
621+
})?;
570622
self.emit_metric(
571623
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, traces.len() as i64),
572624
None,
573625
);
574626

627+
self.send_trace_chunks_inner(traces)
628+
}
629+
630+
fn send_trace_chunks_inner<T: SpanText>(
631+
&self,
632+
mut traces: Vec<Vec<Span<T>>>,
633+
) -> Result<String, TraceExporterError> {
575634
let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();
576635

577636
// Stats computation
578-
if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() {
637+
if let StatsComputationStatus::Enabled {
638+
stats_concentrator, ..
639+
} = &**self.client_side_stats.load()
640+
{
579641
if !self.client_computed_top_level {
580642
for chunk in traces.iter_mut() {
581643
datadog_trace_utils::span::trace_utils::compute_top_level_span(chunk);
582644
}
583645
}
584-
self.add_spans_to_stats(&traces);
646+
self.add_spans_to_stats(stats_concentrator, &traces);
585647
// Once stats have been computed we can drop all chunks that are not going to be
586648
// sampled by the agent
587649
let datadog_trace_utils::span::trace_utils::DroppedP0Stats {
@@ -597,18 +659,9 @@ impl TraceExporter {
597659
header_tags.dropped_p0_spans = dropped_p0_spans;
598660
}
599661

600-
let use_v05_format = match (self.input_format, self.output_format) {
601-
(TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04) => false,
602-
(TraceExporterInputFormat::V04, TraceExporterOutputFormat::V05)
603-
| (TraceExporterInputFormat::V05, TraceExporterOutputFormat::V05) => true,
604-
(TraceExporterInputFormat::V05, TraceExporterOutputFormat::V04) => {
605-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
606-
unreachable!("Conversion from v05 to v04 not implemented")
607-
}
608-
(TraceExporterInputFormat::Proxy, _) => {
609-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
610-
unreachable!("Codepath invalid for proxy mode",)
611-
}
662+
let use_v05_format = match self.output_format {
663+
TraceExporterOutputFormat::V05 => true,
664+
TraceExporterOutputFormat::V04 => false,
612665
};
613666
let payload = trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
614667
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))

data-pipeline/tests/test_fetch_info.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ mod tracing_integration_tests {
1212
#[cfg_attr(miri, ignore)]
1313
#[tokio::test]
1414
async fn test_fetch_info_from_test_agent() {
15-
let test_agent = DatadogTestAgent::new(None, None).await;
15+
let test_agent = DatadogTestAgent::new(None, None, &[]).await;
1616
let endpoint = Endpoint::from_url(test_agent.get_uri_for_endpoint("info", None).await);
1717
let info = fetch_info(&endpoint)
1818
.await
@@ -28,7 +28,7 @@ mod tracing_integration_tests {
2828
#[cfg_attr(miri, ignore)]
2929
#[tokio::test]
3030
async fn test_agent_info_fetcher_with_test_agent() {
31-
let test_agent = DatadogTestAgent::new(None, None).await;
31+
let test_agent = DatadogTestAgent::new(None, None, &[]).await;
3232
let endpoint = Endpoint::from_url(test_agent.get_uri_for_endpoint("info", None).await);
3333
let fetcher = AgentInfoFetcher::new(endpoint, Duration::from_secs(1));
3434
let info_arc = fetcher.get_info();

data-pipeline/tests/test_trace_exporter.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@ mod tracing_integration_tests {
99
use datadog_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent;
1010
use datadog_trace_utils::test_utils::{create_test_json_span, create_test_v05_span};
1111
use serde_json::json;
12-
#[cfg(target_os = "linux")]
13-
use std::fs::Permissions;
14-
#[cfg(target_os = "linux")]
15-
use std::os::unix::fs::PermissionsExt;
1612
use tokio::task;
1713

1814
fn get_v04_trace_snapshot_test_payload(name_prefix: &str) -> Vec<u8> {
@@ -105,7 +101,7 @@ mod tracing_integration_tests {
105101
async fn compare_v04_trace_snapshot_test() {
106102
let relative_snapshot_path = "data-pipeline/tests/snapshots/";
107103
let snapshot_name = "compare_exporter_v04_trace_snapshot_test";
108-
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None).await;
104+
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await;
109105
let url = test_agent.get_base_uri().await;
110106
let rate_param = "{\"service:test,env:test_env\": 0.5, \"service:test2,env:prod\": 0.2}";
111107
test_agent
@@ -154,7 +150,7 @@ mod tracing_integration_tests {
154150
async fn compare_v04_to_v05_trace_snapshot_test() {
155151
let relative_snapshot_path = "data-pipeline/tests/snapshots/";
156152
let snapshot_name = "compare_exporter_v04_to_v05_trace_snapshot_test";
157-
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None).await;
153+
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await;
158154
let url = test_agent.get_base_uri().await;
159155
let rate_param = "{\"service:test,env:test_env\": 0.5, \"service:test2,env:prod\": 0.2}";
160156
test_agent
@@ -197,7 +193,7 @@ mod tracing_integration_tests {
197193
async fn compare_v05_trace_snapshot_test() {
198194
let relative_snapshot_path = "data-pipeline/tests/snapshots/";
199195
let snapshot_name = "compare_exporter_v05_trace_snapshot_test";
200-
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None).await;
196+
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await;
201197
let url = test_agent.get_base_uri().await;
202198
let rate_param = "{\"service:test,env:test_env\": 0.5, \"service:test2,env:prod\": 0.2}";
203199
test_agent
@@ -243,6 +239,9 @@ mod tracing_integration_tests {
243239
// assign unique names to the spans and instantiate a unique session for the test to avoid flaky
244240
// behavior when running on CI
245241
async fn uds_snapshot_test() {
242+
use std::fs::Permissions;
243+
use std::os::unix::fs::PermissionsExt;
244+
246245
let relative_snapshot_path = "data-pipeline/tests/snapshots/";
247246
let snapshot_name = "compare_exporter_v04_trace_snapshot_uds_test";
248247
// Create a temporary directory for the socket to be mounted in the test agent container
@@ -266,6 +265,7 @@ mod tracing_integration_tests {
266265
let test_agent = DatadogTestAgent::new(
267266
Some(relative_snapshot_path),
268267
Some(&absolute_socket_dir_path),
268+
&[],
269269
)
270270
.await;
271271

datadog-trace-utils/src/span/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,21 @@ impl FromStr for SpanKey {
6363
/// Trait representing the requirements for a type to be used as a Span "string" type.
6464
/// Note: Borrow<str> is not required by the derived traits, but allows to access HashMap elements
6565
/// from a static str and check if the string is empty.
66-
pub trait SpanText: Eq + Hash + Borrow<str> + Serialize + Default + Clone {}
67-
/// Implement the SpanText trait for any type which satisfies the sub traits.
68-
impl<T: Eq + Hash + Borrow<str> + Serialize + Default + Clone> SpanText for T {}
66+
pub trait SpanText: Eq + Hash + Borrow<str> + Serialize + Default + Clone {
67+
fn from_static_str(value: &'static str) -> Self;
68+
}
69+
70+
impl SpanText for &str {
71+
fn from_static_str(value: &'static str) -> Self {
72+
value
73+
}
74+
}
75+
76+
impl SpanText for BytesString {
77+
fn from_static_str(value: &'static str) -> Self {
78+
BytesString::from_static(value)
79+
}
80+
}
6981

7082
/// Checks if the `value` represents an empty string. Used to skip serializing empty strings
7183
/// with serde.

datadog-trace-utils/src/span/trace_utils.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
1313
const MEASURED_KEY: &str = "_dd.measured";
1414
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
1515

16-
fn set_top_level_span<'a, T>(span: &mut Span<T>, is_top_level: bool)
16+
fn set_top_level_span<T>(span: &mut Span<T>, is_top_level: bool)
1717
where
18-
T: SpanText + From<&'a str>,
18+
T: SpanText,
1919
{
2020
if is_top_level {
21-
span.metrics.insert(TOP_LEVEL_KEY.into(), 1.0);
21+
span.metrics.insert(T::from_static_str(TOP_LEVEL_KEY), 1.0);
2222
} else {
2323
span.metrics.remove(TOP_LEVEL_KEY);
2424
}
@@ -30,9 +30,9 @@ where
3030
/// - OR its parent is unknown (other part of the code, distributed trace)
3131
/// - OR its parent belongs to another service (in that case it's a "local root" being the highest
3232
/// ancestor of other spans belonging to this service and attached to it).
33-
pub fn compute_top_level_span<'a, T>(trace: &mut [Span<T>])
33+
pub fn compute_top_level_span<T>(trace: &mut [Span<T>])
3434
where
35-
T: SpanText + Clone + From<&'a str>,
35+
T: SpanText,
3636
{
3737
let mut span_id_to_service: HashMap<u64, T> = HashMap::new();
3838
for span in trace.iter() {
@@ -102,7 +102,7 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr";
102102
/// dropped and the latter to the spans dropped.
103103
pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedP0Stats
104104
where
105-
T: SpanText + Default,
105+
T: SpanText,
106106
{
107107
let mut dropped_p0_traces = 0;
108108
let mut dropped_p0_spans = 0;

0 commit comments

Comments
 (0)