Skip to content

Commit c2c152a

Browse files
feat(data-pipeline): Remove add_path
Replaces the add_path methods by Endpoint new path extension capability.
1 parent 812a4d6 commit c2c152a

File tree

2 files changed

+58
-78
lines changed

2 files changed

+58
-78
lines changed

data-pipeline/src/stats_exporter.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use tokio_util::sync::CancellationToken;
2121

2222
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};
2323

24-
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
25-
2624
/// An exporter that concentrates and sends stats to the agent
2725
#[derive(Debug)]
2826
pub struct StatsExporter {
@@ -173,15 +171,6 @@ fn encode_stats_payload(
173171
}
174172
}
175173

176-
/// Return the stats endpoint url to send stats to the agent at `agent_url`
177-
pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
178-
let mut parts = agent_url.parse::<hyper::Uri>()?.into_parts();
179-
parts.path_and_query = Some(hyper::http::uri::PathAndQuery::from_static(
180-
STATS_ENDPOINT_PATH,
181-
));
182-
Ok(hyper::Uri::from_parts(parts)?)
183-
}
184-
185174
#[cfg(test)]
186175
mod tests {
187176
use super::*;
@@ -195,6 +184,7 @@ mod tests {
195184
fn is_send<T: Send>() {}
196185
fn is_sync<T: Sync>() {}
197186

187+
const STATS_ENDPOINT: &str = "/v0.6/stats";
198188
const BUCKETS_DURATION: Duration = Duration::from_secs(10);
199189

200190
/// Fails to compile if stats exporter is not Send and Sync
@@ -261,7 +251,7 @@ mod tests {
261251
BUCKETS_DURATION,
262252
Arc::new(Mutex::new(get_test_concentrator())),
263253
get_test_metadata(),
264-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
254+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
265255
CancellationToken::new(),
266256
);
267257

@@ -288,7 +278,7 @@ mod tests {
288278
BUCKETS_DURATION,
289279
Arc::new(Mutex::new(get_test_concentrator())),
290280
get_test_metadata(),
291-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
281+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
292282
CancellationToken::new(),
293283
);
294284

@@ -320,7 +310,7 @@ mod tests {
320310
BUCKETS_DURATION,
321311
Arc::new(Mutex::new(get_test_concentrator())),
322312
get_test_metadata(),
323-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
313+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
324314
CancellationToken::new(),
325315
);
326316

@@ -360,7 +350,7 @@ mod tests {
360350
buckets_duration,
361351
Arc::new(Mutex::new(get_test_concentrator())),
362352
get_test_metadata(),
363-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
353+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
364354
cancellation_token.clone(),
365355
);
366356

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 53 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator,
1010
stats_exporter,
1111
};
12+
use anyhow::Context;
1213
use arc_swap::{ArcSwap, ArcSwapOption};
1314
use bytes::Bytes;
1415
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
@@ -24,7 +25,6 @@ use dogstatsd_client::{new, Client, DogStatsDAction};
2425
use either::Either;
2526
use error::BuilderErrorKind;
2627
use http_body_util::BodyExt;
27-
use hyper::http::uri::PathAndQuery;
2828
use hyper::{header::CONTENT_TYPE, Method, Uri};
2929
use log::{error, info};
3030
use std::io;
@@ -66,15 +66,12 @@ pub enum TraceExporterOutputFormat {
6666
}
6767

6868
impl TraceExporterOutputFormat {
69-
/// Add the agent trace endpoint path to the URL.
70-
fn add_path(&self, url: &Uri) -> Uri {
71-
add_path(
72-
url,
73-
match self {
74-
TraceExporterOutputFormat::V04 => "/v0.4/traces",
75-
TraceExporterOutputFormat::V05 => "/v0.5/traces",
76-
},
77-
)
69+
/// Return the agent trace endpoint path.
70+
fn as_path(&self) -> &str {
71+
match self {
72+
TraceExporterOutputFormat::V04 => "/v0.4/traces",
73+
TraceExporterOutputFormat::V05 => "/v0.5/traces",
74+
}
7875
}
7976

8077
#[cfg(feature = "test-utils")]
@@ -90,35 +87,6 @@ impl TraceExporterOutputFormat {
9087
}
9188
}
9289

93-
/// Add a path to the URL.
94-
///
95-
/// # Arguments
96-
///
97-
/// * `url` - The URL to which the path is to be added.
98-
/// * `path` - The path to be added to the URL.
99-
fn add_path(url: &Uri, path: &str) -> Uri {
100-
let p_and_q = url.path_and_query();
101-
102-
#[allow(clippy::unwrap_used)]
103-
let new_p_and_q = match p_and_q {
104-
Some(pq) => {
105-
let p = pq.path();
106-
let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
107-
p.push_str(path);
108-
109-
PathAndQuery::from_str(p.as_str())
110-
}
111-
None => PathAndQuery::from_str(path),
112-
}
113-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
114-
.unwrap();
115-
let mut parts = url.clone().into_parts();
116-
parts.path_and_query = Some(new_p_and_q);
117-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
118-
#[allow(clippy::unwrap_used)]
119-
Uri::from_parts(parts).unwrap()
120-
}
121-
12290
#[derive(Clone, Default, Debug)]
12391
pub struct TracerMetadata {
12492
pub hostname: String,
@@ -331,7 +299,9 @@ impl TraceExporter {
331299
bucket_size,
332300
stats_concentrator.clone(),
333301
self.metadata.clone(),
334-
Endpoint::from_url(add_path(&self.endpoint.url, STATS_ENDPOINT)),
302+
self.endpoint
303+
.try_clone_with_subpath(STATS_ENDPOINT)
304+
.context("failed to create Endpoint")?,
335305
cancellation_token.clone(),
336306
);
337307

@@ -442,23 +412,26 @@ impl TraceExporter {
442412
self.send_data_to_url(
443413
data,
444414
trace_count,
445-
self.output_format.add_path(&self.endpoint.url),
415+
self.endpoint
416+
.try_clone_with_subpath(self.output_format.as_path())
417+
.map_err(|e| {
418+
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
419+
})?,
446420
)
447421
}
448422

449423
fn send_data_to_url(
450424
&self,
451425
data: &[u8],
452426
trace_count: usize,
453-
uri: Uri,
427+
endpoint: Endpoint,
454428
) -> Result<String, TraceExporterError> {
455429
self.runtime.block_on(async {
456-
let mut req_builder = hyper::Request::builder()
457-
.uri(uri)
458-
.header(
459-
hyper::header::USER_AGENT,
460-
concat!("Tracer/", env!("CARGO_PKG_VERSION")),
461-
)
430+
// SAFETY: the user agent is a valid header value
431+
#[allow(clippy::unwrap_used)]
432+
let mut req_builder = endpoint
433+
.to_request_builder(concat!("Tracer/", env!("CARGO_PKG_VERSION")))
434+
.unwrap()
462435
.method(Method::POST);
463436

464437
let headers: HashMap<&'static str, String> = self.metadata.borrow().into();
@@ -656,10 +629,7 @@ impl TraceExporter {
656629
};
657630

658631
let chunks = payload.size();
659-
let endpoint = Endpoint {
660-
url: self.get_agent_url(),
661-
..self.endpoint.clone()
662-
};
632+
let endpoint = self.get_agent_endpoint();
663633
let mut headers: HashMap<&str, String> = header_tags.into();
664634
headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
665635
headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
@@ -760,16 +730,23 @@ impl TraceExporter {
760730
})
761731
}
762732

763-
fn get_agent_url(&self) -> Uri {
733+
fn get_agent_endpoint(&self) -> Endpoint {
734+
// Safety: Output format only contains valid path
735+
#[allow(clippy::unwrap_used)]
736+
let endpoint = self
737+
.endpoint
738+
.try_clone_with_subpath(self.output_format.as_path())
739+
.unwrap();
764740
#[cfg(feature = "test-utils")]
765741
{
766742
if let Some(query) = &self.query_params {
767-
let url = self.output_format.add_path(&self.endpoint.url);
768-
return self.output_format.add_query(&url, query);
743+
return Endpoint {
744+
url: self.output_format.add_query(&endpoint.url, query),
745+
..endpoint
746+
};
769747
}
770748
}
771-
772-
self.output_format.add_path(&self.endpoint.url)
749+
endpoint
773750
}
774751
}
775752

@@ -992,11 +969,20 @@ impl TraceExporterBuilder {
992969
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
993970
})?;
994971

972+
let endpoint = Endpoint {
973+
url: agent_url,
974+
..Default::default()
975+
};
976+
995977
let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));
996978
let mut stats = StatsComputationStatus::Disabled;
997979

998980
let info_fetcher = AgentInfoFetcher::new(
999-
Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)),
981+
endpoint
982+
.try_clone_with_subpath(INFO_ENDPOINT)
983+
.map_err(|e| {
984+
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
985+
})?,
1000986
Duration::from_secs(5 * 60),
1001987
);
1002988

@@ -1038,7 +1024,7 @@ impl TraceExporterBuilder {
10381024
}
10391025

10401026
Ok(TraceExporter {
1041-
endpoint: Endpoint::from_url(agent_url),
1027+
endpoint,
10421028
metadata: TracerMetadata {
10431029
tracer_version: self.tracer_version,
10441030
language_version: self.language_version,
@@ -1132,8 +1118,10 @@ mod tests {
11321118

11331119
assert_eq!(
11341120
exporter
1135-
.output_format
1136-
.add_path(&exporter.endpoint.url)
1121+
.endpoint
1122+
.try_clone_with_subpath(exporter.output_format.as_path())
1123+
.unwrap()
1124+
.url
11371125
.to_string(),
11381126
"http://192.168.1.1:8127/v0.4/traces"
11391127
);
@@ -1156,8 +1144,10 @@ mod tests {
11561144

11571145
assert_eq!(
11581146
exporter
1159-
.output_format
1160-
.add_path(&exporter.endpoint.url)
1147+
.endpoint
1148+
.try_clone_with_subpath(exporter.output_format.as_path())
1149+
.unwrap()
1150+
.url
11611151
.to_string(),
11621152
"http://127.0.0.1:8126/v0.4/traces"
11631153
);

0 commit comments

Comments
 (0)