Skip to content

Commit 4740a60

Browse files
Remove query params and use Endpoint path extension
1 parent 1fc655e commit 4740a60

File tree

3 files changed

+55
-122
lines changed

3 files changed

+55
-122
lines changed

data-pipeline/src/stats_exporter.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use tokio_util::sync::CancellationToken;
2121

2222
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};
2323

24-
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
25-
2624
/// An exporter that concentrates and sends stats to the agent
2725
#[derive(Debug)]
2826
pub struct StatsExporter {
@@ -173,15 +171,6 @@ fn encode_stats_payload(
173171
}
174172
}
175173

176-
/// Return the stats endpoint url to send stats to the agent at `agent_url`
177-
pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
178-
let mut parts = agent_url.parse::<hyper::Uri>()?.into_parts();
179-
parts.path_and_query = Some(hyper::http::uri::PathAndQuery::from_static(
180-
STATS_ENDPOINT_PATH,
181-
));
182-
Ok(hyper::Uri::from_parts(parts)?)
183-
}
184-
185174
#[cfg(test)]
186175
mod tests {
187176
use super::*;
@@ -195,6 +184,7 @@ mod tests {
195184
fn is_send<T: Send>() {}
196185
fn is_sync<T: Sync>() {}
197186

187+
const STATS_ENDPOINT: &str = "/v0.6/stats";
198188
const BUCKETS_DURATION: Duration = Duration::from_secs(10);
199189

200190
/// Fails to compile if stats exporter is not Send and Sync
@@ -261,7 +251,7 @@ mod tests {
261251
BUCKETS_DURATION,
262252
Arc::new(Mutex::new(get_test_concentrator())),
263253
get_test_metadata(),
264-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
254+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
265255
CancellationToken::new(),
266256
);
267257

@@ -288,7 +278,7 @@ mod tests {
288278
BUCKETS_DURATION,
289279
Arc::new(Mutex::new(get_test_concentrator())),
290280
get_test_metadata(),
291-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
281+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
292282
CancellationToken::new(),
293283
);
294284

@@ -320,7 +310,7 @@ mod tests {
320310
BUCKETS_DURATION,
321311
Arc::new(Mutex::new(get_test_concentrator())),
322312
get_test_metadata(),
323-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
313+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
324314
CancellationToken::new(),
325315
);
326316

@@ -360,7 +350,7 @@ mod tests {
360350
buckets_duration,
361351
Arc::new(Mutex::new(get_test_concentrator())),
362352
get_test_metadata(),
363-
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
353+
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
364354
cancellation_token.clone(),
365355
);
366356

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 44 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator,
1010
stats_exporter,
1111
};
12+
use anyhow::Context;
1213
use arc_swap::{ArcSwap, ArcSwapOption};
1314
use bytes::Bytes;
1415
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
@@ -24,13 +25,12 @@ use dogstatsd_client::{new, Client, DogStatsDAction};
2425
use either::Either;
2526
use error::BuilderErrorKind;
2627
use http_body_util::BodyExt;
27-
use hyper::http::uri::PathAndQuery;
28-
use hyper::{header::CONTENT_TYPE, Method, Uri};
28+
use hyper::{header::CONTENT_TYPE, Method};
2929
use log::{error, info};
3030
use std::io;
3131
use std::sync::{Arc, Mutex};
3232
use std::time::Duration;
33-
use std::{borrow::Borrow, collections::HashMap, str::FromStr, time};
33+
use std::{borrow::Borrow, collections::HashMap, time};
3434
use tokio::{runtime::Runtime, task::JoinHandle};
3535
use tokio_util::sync::CancellationToken;
3636

@@ -67,56 +67,12 @@ pub enum TraceExporterOutputFormat {
6767

6868
impl TraceExporterOutputFormat {
6969
/// Add the agent trace endpoint path to the URL.
70-
fn add_path(&self, url: &Uri) -> Uri {
71-
add_path(
72-
url,
73-
match self {
74-
TraceExporterOutputFormat::V04 => "/v0.4/traces",
75-
TraceExporterOutputFormat::V05 => "/v0.5/traces",
76-
},
77-
)
78-
}
79-
80-
#[cfg(feature = "test-utils")]
81-
// This function is only intended for testing purposes so we don't need to go to all the trouble
82-
// of breaking the uri down and parsing it as rigorously as we would if we were using it in
83-
// production code.
84-
fn add_query(&self, url: &Uri, query: &str) -> Uri {
85-
let url = format!("{}?{}", url, query);
86-
87-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
88-
#[allow(clippy::expect_used)]
89-
Uri::from_str(&url).expect("Failed to create Uri from string")
90-
}
91-
}
92-
93-
/// Add a path to the URL.
94-
///
95-
/// # Arguments
96-
///
97-
/// * `url` - The URL to which the path is to be added.
98-
/// * `path` - The path to be added to the URL.
99-
fn add_path(url: &Uri, path: &str) -> Uri {
100-
let p_and_q = url.path_and_query();
101-
102-
#[allow(clippy::unwrap_used)]
103-
let new_p_and_q = match p_and_q {
104-
Some(pq) => {
105-
let p = pq.path();
106-
let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
107-
p.push_str(path);
108-
109-
PathAndQuery::from_str(p.as_str())
70+
fn as_path(&self) -> &str {
71+
match self {
72+
TraceExporterOutputFormat::V04 => "/v0.4/traces",
73+
TraceExporterOutputFormat::V05 => "/v0.5/traces",
11074
}
111-
None => PathAndQuery::from_str(path),
11275
}
113-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
114-
.unwrap();
115-
let mut parts = url.clone().into_parts();
116-
parts.path_and_query = Some(new_p_and_q);
117-
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
118-
#[allow(clippy::unwrap_used)]
119-
Uri::from_parts(parts).unwrap()
12076
}
12177

12278
#[derive(Clone, Default, Debug)]
@@ -208,8 +164,6 @@ pub struct TraceExporter {
208164
agent_info: AgentInfoArc,
209165
previous_info_state: ArcSwapOption<String>,
210166
telemetry: Option<TelemetryClient>,
211-
#[cfg(feature = "test-utils")]
212-
query_params: Option<String>,
213167
}
214168

215169
impl TraceExporter {
@@ -331,7 +285,9 @@ impl TraceExporter {
331285
bucket_size,
332286
stats_concentrator.clone(),
333287
self.metadata.clone(),
334-
Endpoint::from_url(add_path(&self.endpoint.url, STATS_ENDPOINT)),
288+
self.endpoint
289+
.try_to_path(STATS_ENDPOINT)
290+
.context("failed to create Endpoint")?,
335291
cancellation_token.clone(),
336292
);
337293

@@ -442,23 +398,26 @@ impl TraceExporter {
442398
self.send_data_to_url(
443399
data,
444400
trace_count,
445-
self.output_format.add_path(&self.endpoint.url),
401+
self.endpoint
402+
.try_to_path(self.output_format.as_path())
403+
.map_err(|e| {
404+
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
405+
})?,
446406
)
447407
}
448408

449409
fn send_data_to_url(
450410
&self,
451411
data: &[u8],
452412
trace_count: usize,
453-
uri: Uri,
413+
endpoint: Endpoint,
454414
) -> Result<String, TraceExporterError> {
455415
self.runtime.block_on(async {
456-
let mut req_builder = hyper::Request::builder()
457-
.uri(uri)
458-
.header(
459-
hyper::header::USER_AGENT,
460-
concat!("Tracer/", env!("CARGO_PKG_VERSION")),
461-
)
416+
// SAFETY: the user agent is a valid header value
417+
#[allow(clippy::unwrap_used)]
418+
let mut req_builder = endpoint
419+
.to_request_builder(concat!("Tracer/", env!("CARGO_PKG_VERSION")))
420+
.unwrap()
462421
.method(Method::POST);
463422

464423
let headers: HashMap<&'static str, String> = self.metadata.borrow().into();
@@ -656,10 +615,12 @@ impl TraceExporter {
656615
};
657616

658617
let chunks = payload.size();
659-
let endpoint = Endpoint {
660-
url: self.get_agent_url(),
661-
..self.endpoint.clone()
662-
};
618+
let endpoint = self
619+
.endpoint
620+
.try_to_path(self.output_format.as_path())
621+
.map_err(|e| {
622+
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
623+
})?;
663624
let mut headers: HashMap<&str, String> = header_tags.into();
664625
headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
665626
headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
@@ -759,18 +720,6 @@ impl TraceExporter {
759720
}
760721
})
761722
}
762-
763-
fn get_agent_url(&self) -> Uri {
764-
#[cfg(feature = "test-utils")]
765-
{
766-
if let Some(query) = &self.query_params {
767-
let url = self.output_format.add_path(&self.endpoint.url);
768-
return self.output_format.add_query(&url, query);
769-
}
770-
}
771-
772-
self.output_format.add_path(&self.endpoint.url)
773-
}
774723
}
775724

776725
const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126";
@@ -801,9 +750,6 @@ pub struct TraceExporterBuilder {
801750
dogstatsd_url: Option<String>,
802751
client_computed_stats: bool,
803752
client_computed_top_level: bool,
804-
#[cfg(feature = "test-utils")]
805-
/// not supported in production, but useful for interacting with the test-agent
806-
query_params: Option<String>,
807753
// Stats specific fields
808754
/// A Some value enables stats-computation, None if it is disabled
809755
stats_bucket_size: Option<Duration>,
@@ -958,14 +904,6 @@ impl TraceExporterBuilder {
958904
self
959905
}
960906

961-
#[cfg(feature = "test-utils")]
962-
/// Set query parameters to be used in the URL when communicating with the test-agent. This is
963-
/// not supported in production as the real agent doesn't accept query params.
964-
pub fn set_query_params(&mut self, query_params: &str) -> &mut Self {
965-
self.query_params = Some(query_params.to_owned());
966-
self
967-
}
968-
969907
#[allow(missing_docs)]
970908
pub fn build(self) -> Result<TraceExporter, TraceExporterError> {
971909
if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) {
@@ -992,11 +930,18 @@ impl TraceExporterBuilder {
992930
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
993931
})?;
994932

933+
let endpoint = Endpoint {
934+
url: agent_url,
935+
..Default::default()
936+
};
937+
995938
let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));
996939
let mut stats = StatsComputationStatus::Disabled;
997940

998941
let info_fetcher = AgentInfoFetcher::new(
999-
Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)),
942+
endpoint.try_to_path(INFO_ENDPOINT).map_err(|e| {
943+
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
944+
})?,
1000945
Duration::from_secs(5 * 60),
1001946
);
1002947

@@ -1038,7 +983,7 @@ impl TraceExporterBuilder {
1038983
}
1039984

1040985
Ok(TraceExporter {
1041-
endpoint: Endpoint::from_url(agent_url),
986+
endpoint,
1042987
metadata: TracerMetadata {
1043988
tracer_version: self.tracer_version,
1044989
language_version: self.language_version,
@@ -1064,8 +1009,6 @@ impl TraceExporterBuilder {
10641009
agent_info,
10651010
previous_info_state: ArcSwapOption::new(None),
10661011
telemetry,
1067-
#[cfg(feature = "test-utils")]
1068-
query_params: self.query_params,
10691012
})
10701013
}
10711014

@@ -1132,8 +1075,10 @@ mod tests {
11321075

11331076
assert_eq!(
11341077
exporter
1135-
.output_format
1136-
.add_path(&exporter.endpoint.url)
1078+
.endpoint
1079+
.try_to_path(exporter.output_format.as_path())
1080+
.unwrap()
1081+
.url
11371082
.to_string(),
11381083
"http://192.168.1.1:8127/v0.4/traces"
11391084
);
@@ -1156,8 +1101,10 @@ mod tests {
11561101

11571102
assert_eq!(
11581103
exporter
1159-
.output_format
1160-
.add_path(&exporter.endpoint.url)
1104+
.endpoint
1105+
.try_to_path(exporter.output_format.as_path())
1106+
.unwrap()
1107+
.url
11611108
.to_string(),
11621109
"http://127.0.0.1:8126/v0.4/traces"
11631110
);

0 commit comments

Comments
 (0)