Skip to content

Allow to extend Endpoint path #1008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use tokio_util::sync::CancellationToken;

use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};

const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";

/// An exporter that concentrates and sends stats to the agent
#[derive(Debug)]
pub struct StatsExporter {
Expand Down Expand Up @@ -173,15 +171,6 @@ fn encode_stats_payload(
}
}

/// Return the stats endpoint url to send stats to the agent at `agent_url`
pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
let mut parts = agent_url.parse::<hyper::Uri>()?.into_parts();
parts.path_and_query = Some(hyper::http::uri::PathAndQuery::from_static(
STATS_ENDPOINT_PATH,
));
Ok(hyper::Uri::from_parts(parts)?)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -195,6 +184,7 @@ mod tests {
fn is_send<T: Send>() {}
fn is_sync<T: Sync>() {}

const STATS_ENDPOINT: &str = "/v0.6/stats";
const BUCKETS_DURATION: Duration = Duration::from_secs(10);

/// Fails to compile if stats exporter is not Send and Sync
Expand Down Expand Up @@ -261,7 +251,7 @@ mod tests {
BUCKETS_DURATION,
Arc::new(Mutex::new(get_test_concentrator())),
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
CancellationToken::new(),
);

Expand All @@ -288,7 +278,7 @@ mod tests {
BUCKETS_DURATION,
Arc::new(Mutex::new(get_test_concentrator())),
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
CancellationToken::new(),
);

Expand Down Expand Up @@ -320,7 +310,7 @@ mod tests {
BUCKETS_DURATION,
Arc::new(Mutex::new(get_test_concentrator())),
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
CancellationToken::new(),
);

Expand Down Expand Up @@ -360,7 +350,7 @@ mod tests {
buckets_duration,
Arc::new(Mutex::new(get_test_concentrator())),
get_test_metadata(),
Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
Endpoint::from_slice(&server.url(STATS_ENDPOINT)),
cancellation_token.clone(),
);

Expand Down
116 changes: 53 additions & 63 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator,
stats_exporter,
};
use anyhow::Context;
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
Expand All @@ -24,7 +25,6 @@ use dogstatsd_client::{new, Client, DogStatsDAction};
use either::Either;
use error::BuilderErrorKind;
use http_body_util::BodyExt;
use hyper::http::uri::PathAndQuery;
use hyper::{header::CONTENT_TYPE, Method, Uri};
use log::{error, info};
use std::io;
Expand Down Expand Up @@ -66,15 +66,12 @@ pub enum TraceExporterOutputFormat {
}

impl TraceExporterOutputFormat {
/// Add the agent trace endpoint path to the URL.
fn add_path(&self, url: &Uri) -> Uri {
add_path(
url,
match self {
TraceExporterOutputFormat::V04 => "/v0.4/traces",
TraceExporterOutputFormat::V05 => "/v0.5/traces",
},
)
/// Return the agent trace endpoint path.
fn as_path(&self) -> &str {
match self {
TraceExporterOutputFormat::V04 => "/v0.4/traces",
TraceExporterOutputFormat::V05 => "/v0.5/traces",
}
}

#[cfg(feature = "test-utils")]
Expand All @@ -90,35 +87,6 @@ impl TraceExporterOutputFormat {
}
}

/// Add a path to the URL.
///
/// # Arguments
///
/// * `url` - The URL to which the path is to be added.
/// * `path` - The path to be added to the URL.
fn add_path(url: &Uri, path: &str) -> Uri {
let p_and_q = url.path_and_query();

#[allow(clippy::unwrap_used)]
let new_p_and_q = match p_and_q {
Some(pq) => {
let p = pq.path();
let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
p.push_str(path);

PathAndQuery::from_str(p.as_str())
}
None => PathAndQuery::from_str(path),
}
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
.unwrap();
let mut parts = url.clone().into_parts();
parts.path_and_query = Some(new_p_and_q);
// TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
#[allow(clippy::unwrap_used)]
Uri::from_parts(parts).unwrap()
}

#[derive(Clone, Default, Debug)]
pub struct TracerMetadata {
pub hostname: String,
Expand Down Expand Up @@ -331,7 +299,9 @@ impl TraceExporter {
bucket_size,
stats_concentrator.clone(),
self.metadata.clone(),
Endpoint::from_url(add_path(&self.endpoint.url, STATS_ENDPOINT)),
self.endpoint
.try_clone_with_subpath(STATS_ENDPOINT)
.context("failed to create Endpoint")?,
cancellation_token.clone(),
);

Expand Down Expand Up @@ -442,23 +412,26 @@ impl TraceExporter {
self.send_data_to_url(
data,
trace_count,
self.output_format.add_path(&self.endpoint.url),
self.endpoint
.try_clone_with_subpath(self.output_format.as_path())
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
})?,
)
}

fn send_data_to_url(
&self,
data: &[u8],
trace_count: usize,
uri: Uri,
endpoint: Endpoint,
) -> Result<String, TraceExporterError> {
self.runtime.block_on(async {
let mut req_builder = hyper::Request::builder()
.uri(uri)
.header(
hyper::header::USER_AGENT,
concat!("Tracer/", env!("CARGO_PKG_VERSION")),
)
// SAFETY: the user agent is a valid header value
#[allow(clippy::unwrap_used)]
let mut req_builder = endpoint
.to_request_builder(concat!("Tracer/", env!("CARGO_PKG_VERSION")))
.unwrap()
.method(Method::POST);

let headers: HashMap<&'static str, String> = self.metadata.borrow().into();
Expand Down Expand Up @@ -656,10 +629,7 @@ impl TraceExporter {
};

let chunks = payload.size();
let endpoint = Endpoint {
url: self.get_agent_url(),
..self.endpoint.clone()
};
let endpoint = self.get_agent_endpoint();
let mut headers: HashMap<&str, String> = header_tags.into();
headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
Expand Down Expand Up @@ -760,16 +730,23 @@ impl TraceExporter {
})
}

fn get_agent_url(&self) -> Uri {
fn get_agent_endpoint(&self) -> Endpoint {
// Safety: Output format only contains valid path
#[allow(clippy::unwrap_used)]
let endpoint = self
.endpoint
.try_clone_with_subpath(self.output_format.as_path())
.unwrap();
#[cfg(feature = "test-utils")]
{
if let Some(query) = &self.query_params {
let url = self.output_format.add_path(&self.endpoint.url);
return self.output_format.add_query(&url, query);
return Endpoint {
url: self.output_format.add_query(&endpoint.url, query),
..endpoint
};
}
}

self.output_format.add_path(&self.endpoint.url)
endpoint
}
}

Expand Down Expand Up @@ -992,11 +969,20 @@ impl TraceExporterBuilder {
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
})?;

let endpoint = Endpoint {
url: agent_url,
..Default::default()
};

let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));
let mut stats = StatsComputationStatus::Disabled;

let info_fetcher = AgentInfoFetcher::new(
Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)),
endpoint
.try_clone_with_subpath(INFO_ENDPOINT)
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidUri(e.to_string()))
})?,
Duration::from_secs(5 * 60),
);

Expand Down Expand Up @@ -1038,7 +1024,7 @@ impl TraceExporterBuilder {
}

Ok(TraceExporter {
endpoint: Endpoint::from_url(agent_url),
endpoint,
metadata: TracerMetadata {
tracer_version: self.tracer_version,
language_version: self.language_version,
Expand Down Expand Up @@ -1132,8 +1118,10 @@ mod tests {

assert_eq!(
exporter
.output_format
.add_path(&exporter.endpoint.url)
.endpoint
.try_clone_with_subpath(exporter.output_format.as_path())
.unwrap()
.url
.to_string(),
"http://192.168.1.1:8127/v0.4/traces"
);
Expand All @@ -1156,8 +1144,10 @@ mod tests {

assert_eq!(
exporter
.output_format
.add_path(&exporter.endpoint.url)
.endpoint
.try_clone_with_subpath(exporter.output_format.as_path())
.unwrap()
.url
.to_string(),
"http://127.0.0.1:8126/v0.4/traces"
);
Expand Down
63 changes: 63 additions & 0 deletions ddcommon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]

use http::{uri::PathAndQuery, Uri};
use hyper::{
header::HeaderValue,
http::uri::{self},
Expand Down Expand Up @@ -229,6 +230,40 @@ impl Endpoint {
/// Default value for the timeout field in milliseconds.
pub const DEFAULT_TIMEOUT: u64 = 3_000;

/// Add a path to the url of an `Endpoint`.
///
/// The given path must start with a slash (e.g. "/v0.4/traces").
/// Returns an error if the path is not valid.
pub fn add_path(&mut self, path: &str) -> anyhow::Result<()> {
let mut parts = self.url.clone().into_parts();
parts.path_and_query = Some(match parts.path_and_query {
Some(pq) => {
let p = pq.path();
let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
p.push_str(path);
if let Some(q) = pq.query() {
p.push('?');
p.push_str(q);
}
PathAndQuery::from_str(p.as_str())
}
None => PathAndQuery::from_str(path),
}?);
self.url = Uri::from_parts(parts)?;
Ok(())
}

/// Create a new `Endpoint` by extending the url with the given path.
///
/// The given path must start with a slash (e.g. "/v0.4/traces").
/// Returns an error if the path is not valid.
/// All the other fields are copied.
pub fn try_clone_with_subpath(&self, path: &str) -> anyhow::Result<Self> {
let mut endpoint = self.clone();
endpoint.add_path(path)?;
Ok(endpoint)
}

/// Return a request builder with the following headers:
/// - User agent
/// - Api key
Expand Down Expand Up @@ -286,3 +321,31 @@ impl Endpoint {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_add_path() {
let test_cases = [
("http://test.com/", "/foo", "http://test.com/foo"),
("http://test.com/bar", "/foo", "http://test.com/bar/foo"),
(
"http://test.com/bar",
"/foo/baz",
"http://test.com/bar/foo/baz",
),
(
"http://test.com/bar?data=dog&product=apm",
"/foo/baz",
"http://test.com/bar/foo/baz?data=dog&product=apm",
),
];
for (url, path, expected) in test_cases {
let mut endpoint = Endpoint::from_url(url.parse().unwrap());
endpoint.add_path(path).unwrap();
assert_eq!(endpoint.url, expected);
}
}
}
Loading