diff --git a/libdd-data-pipeline/src/health_metrics.rs b/libdd-data-pipeline/src/health_metrics.rs index 70308f2ff2..18823d5f16 100644 --- a/libdd-data-pipeline/src/health_metrics.rs +++ b/libdd-data-pipeline/src/health_metrics.rs @@ -69,6 +69,8 @@ //! - `type:`: Error type classification for non-HTTP errors (e.g., `type:network`, //! `type:timeout`, `type:response_body`, `type:build`, `type:unknown`) +use std::borrow::Cow; + // ============================================================================= // Trace Processing Metrics // ============================================================================= @@ -90,10 +92,8 @@ pub(crate) const DESERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.dese /// Number of trace serialization errors. /// /// **Type**: Count -/// **When Emitted**: Currently unused but reserved for future trace serialization error tracking -/// **Tags**: `libdatadog_version` -/// **Status**: Dead code (marked with `#[allow(dead_code)]`) -#[allow(dead_code)] // TODO (APMSP-1584) Add support for health metrics when using trace utils +/// **When Emitted**: When msgpack serialization fails +/// **Tags**: `libdatadog_version` pub(crate) const SERIALIZE_TRACES_ERRORS: &str = "datadog.tracer.exporter.serialize.errors"; // ============================================================================= @@ -186,7 +186,610 @@ pub(crate) const TRANSPORT_DROPPED_BYTES: &str = "datadog.tracer.exporter.transp pub(crate) const TRANSPORT_REQUESTS: &str = "datadog.tracer.exporter.transport.requests"; #[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] pub(crate) enum HealthMetric { Count(&'static str, i64), Distribution(&'static str, i64), } + +/// Categorization of errors from different sources (direct hyper responses vs +/// send_with_retry results) for consistent metric emission +#[derive(Debug, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Eq))] +pub(crate) enum TransportErrorType { + /// HTTP error with a specific status code (4xx, 5xx) + 
Http(u16), + /// Network/connection error + Network, + /// Request timeout + Timeout, + /// Failed to read response body + ResponseBody, + /// Failed to build the HTTP request + Build, +} + +impl TransportErrorType { + pub(crate) fn as_tag_value(&self) -> Cow<'static, str> { + match self { + TransportErrorType::Http(code) => Cow::Owned(code.to_string()), + TransportErrorType::Network => Cow::Borrowed("network"), + TransportErrorType::Timeout => Cow::Borrowed("timeout"), + TransportErrorType::ResponseBody => Cow::Borrowed("response_body"), + TransportErrorType::Build => Cow::Borrowed("build"), + } + } + + /// Per the health metrics specification: + /// - 404 and 415 status codes do NOT emit dropped metrics + /// - All other HTTP errors and non-HTTP errors emit dropped metrics + pub(crate) fn should_emit_dropped_metrics(&self) -> bool { + !matches!( + self, + TransportErrorType::Http(404) | TransportErrorType::Http(415) + ) + } +} + +/// Result structure for health metrics emission +/// +/// This structure captures all the information needed to emit the appropriate +/// health metric for a send operation regardless of where it came from +#[derive(Debug)] +#[cfg_attr(test, derive(Clone, PartialEq))] +pub(crate) struct SendResult { + /// The error type if the operation failed, or `None` if it succeeded.
+ pub error_type: Option<TransportErrorType>, + /// Size of the payload in bytes + pub payload_bytes: usize, + /// Number of trace chunks in the payload + pub trace_chunks: usize, + /// Number of HTTP request attempts (including retries) + pub request_attempts: u32, +} + +impl SendResult { + /// Create a new successful send result + pub(crate) fn success( + payload_bytes: usize, + trace_chunks: usize, + request_attempts: u32, + ) -> Self { + debug_assert!( + request_attempts > 0, + "SendResult::success called with zero request attempts" + ); + Self { + error_type: None, + payload_bytes, + trace_chunks, + request_attempts, + } + } + + /// Create a new failed send result + pub(crate) fn failure( + error_type: TransportErrorType, + payload_bytes: usize, + trace_chunks: usize, + request_attempts: u32, + ) -> Self { + debug_assert!( + request_attempts > 0, + "SendResult::failure called with zero request attempts" + ); + Self { + error_type: Some(error_type), + payload_bytes, + trace_chunks, + request_attempts, + } + } + + /// Returns whether the send operation was successful + #[cfg(test)] + pub(crate) fn is_success(&self) -> bool { + self.error_type.is_none() + } + + /// Collect all health metrics that should be emitted for this result + /// + /// This method encapsulates all the logic for determining which metrics to + /// emit based on the send operation.
It returns a vector of metrics that + /// should be sent to DogStatsD + /// + /// # Returns + /// + /// A vector of `(HealthMetric, Option<String>)` tuples where: + /// - The first element is the metric to emit + /// - The second element is an optional tag value for error classification + pub(crate) fn collect_metrics(&self) -> Vec<(HealthMetric, Option<String>)> { + // Max capacity: 3 base + 1 outcome + 2 dropped + let mut metrics = Vec::with_capacity(6); + + // Always emit: sent bytes, sent traces, request count + metrics.push(( + HealthMetric::Distribution(TRANSPORT_SENT_BYTES, self.payload_bytes as i64), + None, + )); + metrics.push(( + HealthMetric::Distribution(TRANSPORT_TRACES_SENT, self.trace_chunks as i64), + None, + )); + metrics.push(( + HealthMetric::Distribution(TRANSPORT_REQUESTS, self.request_attempts as i64), + None, + )); + + match &self.error_type { + None => { + // Emit successful traces count + metrics.push(( + HealthMetric::Count(TRANSPORT_TRACES_SUCCESSFUL, self.trace_chunks as i64), + None, + )); + } + Some(error_type) => { + // Emit failed metric with type tag + metrics.push(( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some(error_type.as_tag_value().into_owned()), + )); + + if error_type.should_emit_dropped_metrics() { + metrics.push(( + HealthMetric::Distribution( + TRANSPORT_DROPPED_BYTES, + self.payload_bytes as i64, + ), + None, + )); + metrics.push(( + HealthMetric::Distribution( + TRANSPORT_TRACES_DROPPED, + self.trace_chunks as i64, + ), + None, + )); + } + } + } + + metrics + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Test-only extension methods for SendResult + impl SendResult { + /// Create a `SendResult` from a `SendWithRetryResult`. + /// + /// This conversion handles all variants of the retry result and extracts the + /// appropriate error type and attempt count.
+ /// + /// # Arguments + /// + /// * `result` - The result from `send_with_retry` + /// * `payload_bytes` - Size of the payload that was sent + /// * `trace_chunks` - Number of trace chunks in the payload + pub(crate) fn from_retry_result( + result: &libdd_trace_utils::send_with_retry::SendWithRetryResult, + payload_bytes: usize, + trace_chunks: usize, + ) -> Self { + use libdd_trace_utils::send_with_retry::SendWithRetryError; + + match result { + Ok((response, attempts)) => { + if response.status().is_success() { + Self::success(payload_bytes, trace_chunks, *attempts) + } else { + // Non-success status in Ok variant (shouldn't happen with + // send_with_retry) + Self::failure( + TransportErrorType::Http(response.status().as_u16()), + payload_bytes, + trace_chunks, + *attempts, + ) + } + } + Err(err) => { + let (error_type, attempts) = match err { + SendWithRetryError::Http(response, attempts) => ( + TransportErrorType::Http(response.status().as_u16()), + *attempts, + ), + SendWithRetryError::Timeout(attempts) => { + (TransportErrorType::Timeout, *attempts) + } + SendWithRetryError::Network(_, attempts) => { + (TransportErrorType::Network, *attempts) + } + SendWithRetryError::Build(attempts) => { + (TransportErrorType::Build, *attempts) + } + }; + Self::failure(error_type, payload_bytes, trace_chunks, attempts) + } + } + } + } + + #[test] + fn test_http_tag_value() { + assert_eq!(TransportErrorType::Http(400).as_tag_value().as_ref(), "400"); + assert_eq!(TransportErrorType::Http(404).as_tag_value().as_ref(), "404"); + assert_eq!(TransportErrorType::Http(500).as_tag_value().as_ref(), "500"); + } + + #[test] + fn test_non_http_tag_values() { + assert_eq!( + TransportErrorType::Network.as_tag_value().as_ref(), + "network" + ); + assert_eq!( + TransportErrorType::Timeout.as_tag_value().as_ref(), + "timeout" + ); + assert_eq!( + TransportErrorType::ResponseBody.as_tag_value().as_ref(), + "response_body" + ); + 
assert_eq!(TransportErrorType::Build.as_tag_value().as_ref(), "build"); + } + + #[test] + fn test_dropped_excludes_404_and_415() { + assert!(!TransportErrorType::Http(404).should_emit_dropped_metrics()); + assert!(!TransportErrorType::Http(415).should_emit_dropped_metrics()); + } + + #[test] + fn test_dropped_includes_other_http() { + assert!(TransportErrorType::Http(400).should_emit_dropped_metrics()); + assert!(TransportErrorType::Http(401).should_emit_dropped_metrics()); + assert!(TransportErrorType::Http(403).should_emit_dropped_metrics()); + assert!(TransportErrorType::Http(500).should_emit_dropped_metrics()); + assert!(TransportErrorType::Http(502).should_emit_dropped_metrics()); + assert!(TransportErrorType::Http(503).should_emit_dropped_metrics()); + } + + #[test] + fn test_dropped_includes_non_http() { + assert!(TransportErrorType::Network.should_emit_dropped_metrics()); + assert!(TransportErrorType::Timeout.should_emit_dropped_metrics()); + assert!(TransportErrorType::ResponseBody.should_emit_dropped_metrics()); + assert!(TransportErrorType::Build.should_emit_dropped_metrics()); + } + + #[test] + fn test_success_construction() { + let result = SendResult::success(1024, 5, 1); + + assert!(result.is_success()); + assert_eq!(result.error_type, None); + assert_eq!(result.payload_bytes, 1024); + assert_eq!(result.trace_chunks, 5); + assert_eq!(result.request_attempts, 1); + } + + #[test] + fn test_failure_construction() { + let result = SendResult::failure(TransportErrorType::Http(500), 2048, 10, 3); + + assert!(!result.is_success()); + assert_eq!(result.error_type, Some(TransportErrorType::Http(500))); + assert_eq!(result.payload_bytes, 2048); + assert_eq!(result.trace_chunks, 10); + assert_eq!(result.request_attempts, 3); + } + + #[test] + fn test_success_metrics() { + let result = SendResult::success(1024, 5, 1); + let metrics = result.collect_metrics(); + + // Should emit 4 metrics for success + assert_eq!(metrics.len(), 4); + 
assert!(metrics.contains(&(HealthMetric::Distribution(TRANSPORT_SENT_BYTES, 1024), None))); + assert!(metrics.contains(&(HealthMetric::Distribution(TRANSPORT_TRACES_SENT, 5), None))); + assert!(metrics.contains(&(HealthMetric::Distribution(TRANSPORT_REQUESTS, 1), None))); + assert!(metrics.contains(&(HealthMetric::Count(TRANSPORT_TRACES_SUCCESSFUL, 5), None))); + } + + #[test] + fn test_success_no_failure_metrics() { + let result = SendResult::success(1024, 5, 1); + let metrics = result.collect_metrics(); + + for (metric, _) in &metrics { + match metric { + HealthMetric::Count(name, _) => { + assert_ne!(*name, TRANSPORT_TRACES_FAILED); + } + HealthMetric::Distribution(name, _) => { + assert_ne!(*name, TRANSPORT_DROPPED_BYTES); + assert_ne!(*name, TRANSPORT_TRACES_DROPPED); + } + } + } + } + + #[test] + fn test_http_400_emits_dropped() { + let result = SendResult::failure(TransportErrorType::Http(400), 2048, 10, 5); + let metrics = result.collect_metrics(); + + assert_eq!(metrics.len(), 6); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("400".to_string()) + ))); + assert!(metrics.contains(&( + HealthMetric::Distribution(TRANSPORT_DROPPED_BYTES, 2048), + None + ))); + assert!(metrics.contains(&( + HealthMetric::Distribution(TRANSPORT_TRACES_DROPPED, 10), + None + ))); + } + + #[test] + fn test_http_404_skips_dropped() { + let result = SendResult::failure(TransportErrorType::Http(404), 2048, 10, 5); + let metrics = result.collect_metrics(); + + assert_eq!(metrics.len(), 4); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("404".to_string()) + ))); + for (metric, _) in &metrics { + if let HealthMetric::Distribution(name, _) = metric { + assert_ne!(*name, TRANSPORT_DROPPED_BYTES); + assert_ne!(*name, TRANSPORT_TRACES_DROPPED); + } + } + } + + #[test] + fn test_http_415_skips_dropped() { + let result = SendResult::failure(TransportErrorType::Http(415), 1024, 3, 1); + let metrics = 
result.collect_metrics(); + + assert_eq!(metrics.len(), 4); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("415".to_string()) + ))); + } + + #[test] + fn test_network_error_emits_dropped() { + let result = SendResult::failure(TransportErrorType::Network, 512, 2, 3); + let metrics = result.collect_metrics(); + + assert_eq!(metrics.len(), 6); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("network".to_string()) + ))); + assert!(metrics.contains(&( + HealthMetric::Distribution(TRANSPORT_DROPPED_BYTES, 512), + None + ))); + } + + #[test] + fn test_timeout_emits_dropped() { + let result = SendResult::failure(TransportErrorType::Timeout, 1024, 5, 5); + let metrics = result.collect_metrics(); + + assert_eq!(metrics.len(), 6); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("timeout".to_string()) + ))); + assert!(metrics.contains(&( + HealthMetric::Distribution(TRANSPORT_DROPPED_BYTES, 1024), + None + ))); + } + + #[test] + fn test_build_error_emits_dropped() { + let result = SendResult::failure(TransportErrorType::Build, 256, 1, 1); + let metrics = result.collect_metrics(); + + assert_eq!(metrics.len(), 6); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("build".to_string()) + ))); + assert!(metrics.contains(&( + HealthMetric::Distribution(TRANSPORT_DROPPED_BYTES, 256), + None + ))); + } + + #[test] + fn test_response_body_error_emits_dropped() { + let result = SendResult::failure(TransportErrorType::ResponseBody, 4096, 20, 1); + let metrics = result.collect_metrics(); + + assert_eq!(metrics.len(), 6); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("response_body".to_string()) + ))); + } + + #[test] + fn test_base_metrics_always_emitted() { + let scenarios = vec![ + SendResult::success(100, 1, 1), + SendResult::failure(TransportErrorType::Http(500), 200, 2, 2), + 
SendResult::failure(TransportErrorType::Network, 300, 3, 3), + SendResult::failure(TransportErrorType::Http(404), 400, 4, 4), + ]; + + for result in scenarios { + let metrics = result.collect_metrics(); + + let has_sent_bytes = metrics.iter().any(|(m, _)| { + matches!(m, HealthMetric::Distribution(name, _) if *name == TRANSPORT_SENT_BYTES) + }); + assert!(has_sent_bytes, "Missing sent_bytes for {:?}", result); + + let has_sent_traces = metrics.iter().any(|(m, _)| { + matches!(m, HealthMetric::Distribution(name, _) if *name == TRANSPORT_TRACES_SENT) + }); + assert!(has_sent_traces, "Missing sent_traces for {:?}", result); + + let has_requests = metrics.iter().any(|(m, _)| { + matches!(m, HealthMetric::Distribution(name, _) if *name == TRANSPORT_REQUESTS) + }); + assert!(has_requests, "Missing requests for {:?}", result); + } + } + + #[test] + fn test_request_attempts_reflects_retries() { + let result = SendResult::failure(TransportErrorType::Http(503), 1024, 5, 5); + let metrics = result.collect_metrics(); + + assert!(metrics.contains(&(HealthMetric::Distribution(TRANSPORT_REQUESTS, 5), None))); + } + + mod send_with_retry_conversion { + use super::*; + use bytes::Bytes; + use hyper::{Response, StatusCode}; + use libdd_common::hyper_migration; + use libdd_trace_utils::send_with_retry::{SendWithRetryError, SendWithRetryResult}; + + /// Helper to create a mock HTTP response for testing + fn mock_response(status: StatusCode) -> hyper_migration::HttpResponse { + hyper_migration::mock_response( + Response::builder().status(status), + Bytes::from("test body"), + ) + .unwrap() + } + + #[test] + fn test_from_retry_result_success_2xx() { + let response = mock_response(StatusCode::OK); + let retry_result: SendWithRetryResult = Ok((response, 1)); + + let send_result = SendResult::from_retry_result(&retry_result, 1024, 5); + + assert!(send_result.is_success()); + assert_eq!(send_result.payload_bytes, 1024); + assert_eq!(send_result.trace_chunks, 5); + 
assert_eq!(send_result.request_attempts, 1); + } + + #[test] + fn test_from_retry_result_http_error() { + let response = mock_response(StatusCode::BAD_REQUEST); + let retry_result: SendWithRetryResult = Err(SendWithRetryError::Http(response, 3)); + + let send_result = SendResult::from_retry_result(&retry_result, 2048, 10); + + assert_eq!(send_result.error_type, Some(TransportErrorType::Http(400))); + assert_eq!(send_result.request_attempts, 3); + } + + #[test] + fn test_from_retry_result_timeout_error() { + let retry_result: SendWithRetryResult = Err(SendWithRetryError::Timeout(5)); + + let send_result = SendResult::from_retry_result(&retry_result, 512, 2); + + assert_eq!(send_result.error_type, Some(TransportErrorType::Timeout)); + assert_eq!(send_result.request_attempts, 5); + } + + #[test] + fn test_from_retry_result_network_error() { + // We can't really simulate network error, so we test the behavior + // via the API directly + let send_result = SendResult::failure(TransportErrorType::Network, 256, 1, 3); + + assert_eq!(send_result.error_type, Some(TransportErrorType::Network)); + assert_eq!(send_result.request_attempts, 3); + + let metrics = send_result.collect_metrics(); + assert!(metrics.contains(&( + HealthMetric::Count(TRANSPORT_TRACES_FAILED, 1), + Some("network".to_string()) + ))); + assert!(metrics.contains(&( + HealthMetric::Distribution(TRANSPORT_DROPPED_BYTES, 256), + None + ))); + } + + #[test] + fn test_from_retry_result_build_error() { + let retry_result: SendWithRetryResult = Err(SendWithRetryError::Build(1)); + + let send_result = SendResult::from_retry_result(&retry_result, 100, 1); + + assert_eq!(send_result.error_type, Some(TransportErrorType::Build)); + assert_eq!(send_result.request_attempts, 1); + } + + #[test] + fn test_from_retry_result_preserves_context() { + let response = mock_response(StatusCode::OK); + let retry_result: SendWithRetryResult = Ok((response, 2)); + + let send_result = SendResult::from_retry_result(&retry_result, 4096, 
25); + + assert_eq!(send_result.payload_bytes, 4096); + assert_eq!(send_result.trace_chunks, 25); + assert_eq!(send_result.request_attempts, 2); + } + } + + /// Tests for serialization/deserialization metric constants + mod serialization_metrics { + use super::*; + + #[test] + fn test_serialize_errors_constant_defined() { + assert_eq!( + SERIALIZE_TRACES_ERRORS, + "datadog.tracer.exporter.serialize.errors" + ); + } + + #[test] + fn test_deserialize_errors_constant_defined() { + assert_eq!( + DESERIALIZE_TRACES_ERRORS, + "datadog.tracer.exporter.deserialize.errors" + ); + } + + #[test] + fn test_serialize_metric_can_be_used() { + let metric = HealthMetric::Count(SERIALIZE_TRACES_ERRORS, 1); + match metric { + HealthMetric::Count(name, count) => { + assert_eq!(name, SERIALIZE_TRACES_ERRORS); + assert_eq!(count, 1); + } + _ => panic!("Expected Count metric"), + } + } + } +} diff --git a/libdd-data-pipeline/src/trace_exporter/metrics.rs b/libdd-data-pipeline/src/trace_exporter/metrics.rs index 3b47c5d67e..57f2765be5 100644 --- a/libdd-data-pipeline/src/trace_exporter/metrics.rs +++ b/libdd-data-pipeline/src/trace_exporter/metrics.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::health_metrics::HealthMetric; +use crate::health_metrics::{HealthMetric, SendResult}; use either::Either; use libdd_common::tag::Tag; use libdd_dogstatsd_client::{Client, DogStatsDAction}; @@ -62,6 +62,20 @@ impl<'a> MetricsEmitter<'a> { ); } } + + /// Emit all health metrics from a SendResult + /// + /// This method processes the SendResult and emits all appropriate metrics + /// based on the operation's outcome (success/failure, error type, etc.) 
+ pub(crate) fn emit_send_result(&self, result: &SendResult) { + for (metric, type_tag_value) in result.collect_metrics() { + let type_tag = type_tag_value + .as_ref() + .and_then(|v| Tag::new("type", v).ok()); + let custom_tags = type_tag.as_ref().map(|t| vec![t]); + self.emit(metric, custom_tags); + } + } } // Primary testing is done in the main TraceExporter module for now. diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index baec855df3..44d919e4b5 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -27,14 +27,14 @@ use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporte use crate::{ agent_info::{self, schema::AgentInfo}, health_metrics, - health_metrics::HealthMetric, + health_metrics::{HealthMetric, SendResult, TransportErrorType}, }; use arc_swap::{ArcSwap, ArcSwapOption}; use http_body_util::BodyExt; use hyper::http::uri::PathAndQuery; use hyper::Uri; +use libdd_common::tag::Tag; use libdd_common::{hyper_migration, Endpoint}; -use libdd_common::{tag, tag::Tag}; use libdd_common::{HttpClient, MutexExt}; use libdd_dogstatsd_client::Client; use libdd_telemetry::worker::TelemetryWorker; @@ -519,24 +519,66 @@ impl TraceExporter { trace_count: usize, uri: Uri, ) -> Result { - let transport_client = TransportClient::new( - &self.metadata, - self.health_metrics_enabled, - self.dogstatsd.as_ref(), - &self.common_stats_tags, - ); + let transport_client = TransportClient::new(&self.metadata); let req = transport_client.build_trace_request(data, trace_count, uri); match hyper_migration::new_default_client().request(req).await { Ok(response) => { let response = hyper_migration::into_response(response); - transport_client - .process_http_response(response, trace_count, data.len()) + // For proxy path, always 1 request attempt (no retry) + self.handle_proxy_response(response, trace_count, data.len()) .await } Err(err) => 
self.handle_request_error(err, data.len(), trace_count), } } + /// Handle response for proxy path (no retry) + async fn handle_proxy_response( + &self, + response: hyper::Response, + trace_count: usize, + payload_len: usize, + ) -> Result { + let status = response.status(); + + if !status.is_success() { + let send_result = SendResult::failure( + TransportErrorType::Http(status.as_u16()), + payload_len, + trace_count, + 1, + ); + self.emit_send_result(&send_result); + let body = Self::read_response_body(response).await.map_err(|e| { + error!(?e, "Error reading error response body"); + TraceExporterError::from(e) + })?; + return Err(TraceExporterError::Request(RequestError::new( + status, &body, + ))); + } + + match Self::read_response_body(response).await { + Ok(body) => { + debug!(trace_count, "Traces sent successfully to agent"); + let send_result = SendResult::success(payload_len, trace_count, 1); + self.emit_send_result(&send_result); + Ok(AgentResponse::Changed { body }) + } + Err(err) => { + error!(?err, "Failed to read agent response body"); + let send_result = SendResult::failure( + TransportErrorType::ResponseBody, + payload_len, + trace_count, + 1, + ); + self.emit_send_result(&send_result); + Err(TraceExporterError::from(err)) + } + } + } + /// Handle HTTP request errors fn handle_request_error( &self, @@ -548,26 +590,10 @@ impl TraceExporter { error = %err, "Request to agent failed" ); - let type_tag = tag!("type", "network"); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); - // Emit dropped bytes metric for network/connection errors - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_DROPPED_BYTES, - payload_size as i64, - ), - None, - ); - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_TRACES_DROPPED, - trace_count as i64, - ), - None, - ); + // For direct hyper errors (proxy path), always 1 request attempt + let send_result = + 
SendResult::failure(TransportErrorType::Network, payload_size, trace_count, 1); + self.emit_send_result(&send_result); Err(TraceExporterError::from(err)) } @@ -579,6 +605,14 @@ impl TraceExporter { } } + /// Emit all health metrics from a SendResult + fn emit_send_result(&self, result: &SendResult) { + if self.health_metrics_enabled { + let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags); + emitter.emit_send_result(result); + } + } + /// Send a list of trace chunks to the agent /// /// # Arguments @@ -640,21 +674,6 @@ impl TraceExporter { let result = send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await; - // Emit http.requests health metric based on number of attempts - let requests_count = match &result { - Ok((_, attempts)) => *attempts as i64, - Err(err) => match err { - SendWithRetryError::Http(_, attempts) => *attempts as i64, - SendWithRetryError::Timeout(attempts) => *attempts as i64, - SendWithRetryError::Network(_, attempts) => *attempts as i64, - SendWithRetryError::Build(attempts) => *attempts as i64, - }, - }; - self.emit_metric( - HealthMetric::Distribution(health_metrics::TRANSPORT_REQUESTS, requests_count), - None, - ); - // Send telemetry for the payload sending if let Some(telemetry) = &self.telemetry { if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result( @@ -687,7 +706,17 @@ impl TraceExporter { self.output_format, self.agent_payload_response_version.as_ref(), ); - let prepared = serializer.prepare_traces_payload(traces, header_tags)?; + let prepared = match serializer.prepare_traces_payload(traces, header_tags) { + Ok(p) => p, + Err(e) => { + error!("Error serializing traces: {e}"); + self.emit_metric( + HealthMetric::Count(health_metrics::SERIALIZE_TRACES_ERRORS, 1), + None, + ); + return Err(e); + } + }; let endpoint = Endpoint { url: self.get_agent_url(), @@ -712,19 +741,9 @@ impl TraceExporter { chunks: usize, payload_len: usize, ) -> Result { - // Always emit 
http.sent.* metrics regardless of success/failure - self.emit_metric( - HealthMetric::Distribution(health_metrics::TRANSPORT_SENT_BYTES, payload_len as i64), - None, - ); - self.emit_metric( - HealthMetric::Distribution(health_metrics::TRANSPORT_TRACES_SENT, chunks as i64), - None, - ); - match result { - Ok((response, _)) => { - self.handle_agent_response(chunks, response, payload_len) + Ok((response, attempts)) => { + self.handle_agent_response(chunks, response, payload_len, attempts) .await } Err(err) => self.handle_send_error(err, payload_len, chunks).await, @@ -740,47 +759,33 @@ impl TraceExporter { ) -> Result { error!(?err, "Error sending traces"); - // Only emit the error metric for non-HTTP errors here - // HTTP errors will be handled by handle_http_send_error with specific status codes - match &err { - SendWithRetryError::Http(_, _) => { - // Will be handled by handle_http_send_error - } - SendWithRetryError::Timeout(_) => { - let type_tag = tag!("type", "timeout"); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); + match err { + SendWithRetryError::Http(response, attempts) => { + self.handle_http_send_error(response, payload_len, chunks, attempts) + .await } - SendWithRetryError::Network(_, _) => { - let type_tag = tag!("type", "network"); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); + SendWithRetryError::Timeout(attempts) => { + let send_result = + SendResult::failure(TransportErrorType::Timeout, payload_len, chunks, attempts); + self.emit_send_result(&send_result); + Err(TraceExporterError::from(io::Error::from( + io::ErrorKind::TimedOut, + ))) } - SendWithRetryError::Build(_) => { - let type_tag = tag!("type", "build"); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); + SendWithRetryError::Network(err, attempts) => { + let send_result = + 
SendResult::failure(TransportErrorType::Network, payload_len, chunks, attempts); + self.emit_send_result(&send_result); + Err(TraceExporterError::from(err)) } - }; - - match err { - SendWithRetryError::Http(response, _) => { - self.handle_http_send_error(response, payload_len, chunks) - .await + SendWithRetryError::Build(attempts) => { + let send_result = + SendResult::failure(TransportErrorType::Build, payload_len, chunks, attempts); + self.emit_send_result(&send_result); + Err(TraceExporterError::from(io::Error::from( + io::ErrorKind::Other, + ))) } - SendWithRetryError::Timeout(_) => Err(TraceExporterError::from(io::Error::from( - io::ErrorKind::TimedOut, - ))), - SendWithRetryError::Network(err, _) => Err(TraceExporterError::from(err)), - SendWithRetryError::Build(_) => Err(TraceExporterError::from(io::Error::from( - io::ErrorKind::Other, - ))), } } @@ -790,34 +795,21 @@ impl TraceExporter { response: hyper::Response, payload_len: usize, chunks: usize, + attempts: u32, ) -> Result { let status = response.status(); // Check if the agent state has changed for error responses self.info_response_observer.check_response(&response); - // Emit send traces errors metric with status code type - let type_tag = - Tag::new("type", status.as_str()).unwrap_or_else(|_| tag!("type", "unknown")); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), + // Emit health metrics using SendResult + let send_result = SendResult::failure( + TransportErrorType::Http(status.as_u16()), + payload_len, + chunks, + attempts, ); - - // Emit dropped bytes metric for HTTP error responses, excluding 404 and 415 - if status.as_u16() != 404 && status.as_u16() != 415 { - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_DROPPED_BYTES, - payload_len as i64, - ), - None, - ); - self.emit_metric( - HealthMetric::Distribution(health_metrics::TRANSPORT_TRACES_DROPPED, chunks as i64), - None, - ); - } + 
self.emit_send_result(&send_result); let body = self.read_error_response_body(response).await?; Err(TraceExporterError::Request(RequestError::new( @@ -873,40 +865,24 @@ impl TraceExporter { /// Read response body and handle potential errors async fn read_response_body( - &self, response: hyper::Response, - ) -> Result { - match response.into_body().collect().await { - Ok(body) => Ok(String::from_utf8_lossy(&body.to_bytes()).to_string()), - Err(err) => { - error!(?err, "Error reading agent response body"); - let type_tag = tag!("type", "response_body"); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); - Err(TraceExporterError::from(err)) - } - } + ) -> Result { + let body = response.into_body().collect().await?; + Ok(String::from_utf8_lossy(&body.to_bytes()).to_string()) } /// Handle successful trace sending response fn handle_successful_trace_response( &self, chunks: usize, - status: hyper::StatusCode, + payload_len: usize, + attempts: u32, body: String, payload_version_changed: bool, ) -> Result { - debug!( - chunks = chunks, - status = %status, - "Trace chunks sent successfully to agent" - ); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_SUCCESSFUL, chunks as i64), - None, - ); + debug!(chunks = chunks, "Trace chunks sent successfully to agent"); + let send_result = SendResult::success(payload_len, chunks, attempts); + self.emit_send_result(&send_result); Ok(if payload_version_changed { AgentResponse::Changed { body } @@ -920,48 +896,53 @@ impl TraceExporter { chunks: usize, response: hyper::Response, payload_len: usize, + attempts: u32, ) -> Result { // Check if the agent state has changed self.info_response_observer.check_response(&response); let status = response.status(); let payload_version_changed = self.check_payload_version_changed(&response); - let body = self.read_response_body(response).await?; - if !status.is_success() { - warn!( - status = %status, - 
"Agent returned non-success status for trace send" - ); - let type_tag = - Tag::new("type", status.as_str()).unwrap_or_else(|_| tag!("type", "unknown")); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); - // Emit dropped bytes metric for non-success status codes, excluding 404 and 415 - if status.as_u16() != 404 && status.as_u16() != 415 { - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_DROPPED_BYTES, - payload_len as i64, - ), - None, - ); - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_TRACES_DROPPED, - chunks as i64, - ), - None, + match Self::read_response_body(response).await { + Ok(body) => { + if !status.is_success() { + warn!( + status = %status, + "Agent returned non-success status for trace send" + ); + let send_result = SendResult::failure( + TransportErrorType::Http(status.as_u16()), + payload_len, + chunks, + attempts, + ); + self.emit_send_result(&send_result); + return Err(TraceExporterError::Request(RequestError::new( + status, &body, + ))); + } + + self.handle_successful_trace_response( + chunks, + payload_len, + attempts, + body, + payload_version_changed, + ) + } + Err(err) => { + error!(?err, "Error reading agent response body"); + let send_result = SendResult::failure( + TransportErrorType::ResponseBody, + payload_len, + chunks, + attempts, ); + self.emit_send_result(&send_result); + Err(TraceExporterError::from(err)) } - return Err(TraceExporterError::Request(RequestError::new( - status, &body, - ))); } - - self.handle_successful_trace_response(chunks, status, body, payload_version_changed) } fn get_agent_url(&self) -> Uri { diff --git a/libdd-data-pipeline/src/trace_exporter/transport.rs b/libdd-data-pipeline/src/trace_exporter/transport.rs index 71fb392dea..7043ec01e8 100644 --- a/libdd-data-pipeline/src/trace_exporter/transport.rs +++ b/libdd-data-pipeline/src/trace_exporter/transport.rs @@ -1,41 +1,24 @@ // 
Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::health_metrics::{self, HealthMetric}; -use crate::trace_exporter::agent_response::AgentResponse; -use crate::trace_exporter::error::{RequestError, TraceExporterError}; -use crate::trace_exporter::metrics::MetricsEmitter; use crate::trace_exporter::TracerMetadata; use bytes::Bytes; -use http_body_util::BodyExt; use hyper::{Method, Uri}; use libdd_common::hyper_migration; -use libdd_common::{tag, tag::Tag}; use std::collections::HashMap; -use tracing::{debug, error, warn}; /// Transport client for trace exporter operations +/// +/// This struct is responsible for building HTTP requests for trace data. +/// Response handling and metric emission are handled by TraceExporter. pub(super) struct TransportClient<'a> { metadata: &'a TracerMetadata, - health_metrics_enabled: bool, - dogstatsd: Option<&'a libdd_dogstatsd_client::Client>, - common_stats_tags: &'a [Tag], } impl<'a> TransportClient<'a> { /// Create a new transport client - pub(super) fn new( - metadata: &'a TracerMetadata, - health_metrics_enabled: bool, - dogstatsd: Option<&'a libdd_dogstatsd_client::Client>, - common_stats_tags: &'a [Tag], - ) -> Self { - Self { - metadata, - health_metrics_enabled, - dogstatsd, - common_stats_tags, - } + pub(super) fn new(metadata: &'a TracerMetadata) -> Self { + Self { metadata } } /// Build HTTP request for sending trace data to agent @@ -51,73 +34,6 @@ impl<'a> TransportClient<'a> { self.build_request_with_body(req_builder, data) } - /// Handle HTTP error response and emit appropriate metrics - pub(super) async fn handle_http_error_response( - &self, - response: hyper::Response, - payload_size: usize, - trace_count: usize, - ) -> Result { - let response_status = response.status(); - let response_body = self.extract_response_body(response).await; - self.log_and_emit_error_metrics(response_status, payload_size, trace_count); - 
Err(TraceExporterError::Request(RequestError::new( - response_status, - &response_body, - ))) - } - - /// Handle successful HTTP response - pub(super) async fn handle_http_success_response( - &self, - response: hyper::Response, - trace_count: usize, - ) -> Result { - match response.into_body().collect().await { - Ok(body) => { - debug!(trace_count, "Traces sent successfully to agent"); - self.emit_metric( - HealthMetric::Count( - health_metrics::TRANSPORT_TRACES_SUCCESSFUL, - trace_count as i64, - ), - None, - ); - Ok(AgentResponse::Changed { - body: String::from_utf8_lossy(&body.to_bytes()).to_string(), - }) - } - Err(err) => { - error!( - error = %err, - "Failed to read agent response body" - ); - let type_tag = tag!("type", "response_body"); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&type_tag]), - ); - Err(TraceExporterError::from(err)) - } - } - } - - /// Process HTTP response based on status code - pub(super) async fn process_http_response( - &self, - response: hyper::Response, - trace_count: usize, - payload_size: usize, - ) -> Result { - if !response.status().is_success() { - self.handle_http_error_response(response, payload_size, trace_count) - .await - } else { - self.handle_http_success_response(response, trace_count) - .await - } - } - /// Create base HTTP request builder with URI, user agent, and method fn create_base_request_builder(&self, uri: Uri) -> hyper::http::request::Builder { hyper::Request::builder() @@ -166,78 +82,12 @@ impl<'a> TransportClient<'a> { // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). .unwrap() } - - /// Extract response body from HTTP response - async fn extract_response_body( - &self, - response: hyper::Response, - ) -> String { - // TODO: Properly handle non-OK states to prevent possible panics - // (APMSP-18190). 
- #[allow(clippy::unwrap_used)] - let body_bytes = response.into_body().collect().await.unwrap().to_bytes(); - String::from_utf8(body_bytes.to_vec()).unwrap_or_default() - } - - /// Log error and emit metrics for HTTP error response - fn log_and_emit_error_metrics( - &self, - response_status: hyper::StatusCode, - payload_size: usize, - trace_count: usize, - ) { - let resp_tag_res = &Tag::new("response_code", response_status.as_str()); - match resp_tag_res { - Ok(resp_tag) => { - warn!( - response_code = response_status.as_u16(), - "HTTP error response received from agent" - ); - let type_tag = Tag::new("type", response_status.as_str()) - .unwrap_or_else(|_| tag!("type", "unknown")); - self.emit_metric( - HealthMetric::Count(health_metrics::TRANSPORT_TRACES_FAILED, 1), - Some(vec![&resp_tag, &type_tag]), - ); - if response_status.as_u16() != 404 && response_status.as_u16() != 415 { - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_DROPPED_BYTES, - payload_size as i64, - ), - None, - ); - self.emit_metric( - HealthMetric::Distribution( - health_metrics::TRANSPORT_TRACES_DROPPED, - trace_count as i64, - ), - None, - ); - } - } - Err(tag_err) => { - error!(?tag_err, "Failed to serialize response_code to tag") - } - } - } - - /// Emit a health metric to dogstatsd - fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { - if self.health_metrics_enabled { - let emitter = MetricsEmitter::new(self.dogstatsd, self.common_stats_tags); - emitter.emit(metric, custom_tags); - } - } } #[cfg(test)] mod tests { use super::*; use crate::trace_exporter::TracerMetadata; - use bytes::Bytes; - use hyper::{Response, StatusCode}; - use libdd_common::tag; fn create_test_metadata() -> TracerMetadata { TracerMetadata { @@ -260,20 +110,15 @@ mod tests { #[test] fn test_transport_client_new() { let metadata = create_test_metadata(); - let tags = vec![tag!("env", "test")]; - let client = TransportClient::new(&metadata, true, None, &tags); + let client = 
TransportClient::new(&metadata); - assert!(client.health_metrics_enabled); - assert!(client.dogstatsd.is_none()); - assert_eq!(client.common_stats_tags.len(), 1); assert_eq!(client.metadata.service, "test-service"); } #[test] fn test_build_trace_request() { let metadata = create_test_metadata(); - let tags = vec![tag!("test", "value")]; - let client = TransportClient::new(&metadata, false, None, &tags); + let client = TransportClient::new(&metadata); let uri = "http://localhost:8126/v0.4/traces".parse().unwrap(); let data = b"test payload"; let trace_count = 5; @@ -299,106 +144,13 @@ mod tests { assert_eq!(headers.get("datadog-meta-tracer-version").unwrap(), "1.0.0"); } - #[tokio::test] - async fn test_handle_http_success_response() { - let metadata = create_test_metadata(); - let tags = vec![tag!("test", "value")]; - let client = TransportClient::new(&metadata, false, None, &tags); - let body = r#"{"rate_by_service": {"service:test": 1.0}}"#; - let response = - hyper_migration::mock_response(Response::builder().status(200), Bytes::from(body)) - .unwrap(); - - let result = client.handle_http_success_response(response, 10).await; - - assert!(result.is_ok()); - match result.unwrap() { - AgentResponse::Changed { - body: response_body, - } => { - assert_eq!(response_body, body); - } - _ => panic!("Expected Changed response"), - } - } - - #[tokio::test] - async fn test_handle_http_error_response() { - let metadata = create_test_metadata(); - let tags = vec![tag!("test", "value")]; - let client = TransportClient::new(&metadata, false, None, &tags); - let error_body = r#"{"error": "Bad Request"}"#; - let response = hyper_migration::mock_response( - Response::builder().status(400), - Bytes::from(error_body), - ) - .unwrap(); - - let result = client.handle_http_error_response(response, 1024, 5).await; - - assert!(result.is_err()); - match result.unwrap_err() { - TraceExporterError::Request(req_err) => { - assert_eq!(req_err.status(), StatusCode::BAD_REQUEST); - } - _ => 
panic!("Expected Request error"), - } - } - - #[tokio::test] - async fn test_process_http_response_success() { - let metadata = create_test_metadata(); - let tags = vec![tag!("test", "value")]; - let client = TransportClient::new(&metadata, false, None, &tags); - let body = r#"{"success": true}"#; - let response = - hyper_migration::mock_response(Response::builder().status(200), Bytes::from(body)) - .unwrap(); - - let result = client.process_http_response(response, 3, 512).await; - - assert!(result.is_ok()); - match result.unwrap() { - AgentResponse::Changed { - body: response_body, - } => { - assert_eq!(response_body, body); - } - _ => panic!("Expected Changed response"), - } - } - - #[tokio::test] - async fn test_process_http_response_error() { - let metadata = create_test_metadata(); - let tags = vec![tag!("test", "value")]; - let client = TransportClient::new(&metadata, false, None, &tags); - let error_body = r#"{"error": "Internal Server Error"}"#; - let response = hyper_migration::mock_response( - Response::builder().status(500), - Bytes::from(error_body), - ) - .unwrap(); - - let result = client.process_http_response(response, 2, 256).await; - - assert!(result.is_err()); - match result.unwrap_err() { - TraceExporterError::Request(req_err) => { - assert_eq!(req_err.status(), StatusCode::INTERNAL_SERVER_ERROR); - } - _ => panic!("Expected Request error"), - } - } - #[test] fn test_request_headers_metadata_integration() { let mut metadata = create_test_metadata(); metadata.language = "python".to_string(); metadata.tracer_version = "2.0.0".to_string(); - let tags = vec![tag!("region", "us-east-1")]; - let client = TransportClient::new(&metadata, false, None, &tags); + let client = TransportClient::new(&metadata); let uri = "http://localhost:8126/v0.4/traces".parse().unwrap(); let data = b"test";