Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 38 additions & 181 deletions linkerd/trace-context/src/propagation.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
use crate::{Flags, Id, InsufficientBytes};
use bytes::Bytes;
use http::header::HeaderValue;
use linkerd_error::Error;
use rand::thread_rng;
use thiserror::Error;
use tracing::{trace, warn};

const HTTP_TRACE_ID_HEADER: &str = "x-b3-traceid";
const HTTP_SPAN_ID_HEADER: &str = "x-b3-spanid";
const HTTP_SAMPLED_HEADER: &str = "x-b3-sampled";
use thiserror::Error;
use tracing::debug;

const GRPC_TRACE_HEADER: &str = "grpc-trace-bin";
const GRPC_TRACE_FIELD_TRACE_ID: u8 = 0;
const GRPC_TRACE_FIELD_SPAN_ID: u8 = 1;
const GRPC_TRACE_FIELD_TRACE_OPTIONS: u8 = 2;
mod b3;
mod w3c;

#[derive(Debug)]
pub enum Propagation {
Http,
Grpc,
B3Http,
B3Grpc,
W3CHttp,
}

#[derive(Debug)]
Expand All @@ -41,189 +34,53 @@ impl TraceContext {
}
}

/// Given an http request, attempt to unpack a distributed tracing context from
/// the headers. Only w3c and b3 context propagation formats are supported. The
/// former is tried first, and if no headers are present, function will attempt
/// to unpack as b3.
pub fn unpack_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
unpack_grpc_trace_context(request).or_else(|| unpack_http_trace_context(request))
// Attempt to parse as w3c first since it's the newest interface in
// distributed tracing ecosystem
w3c::unpack_w3c_trace_context(request)
.or_else(|| b3::unpack_grpc_trace_context(request))
.or_else(|| b3::unpack_http_trace_context(request))
}

// Generates a new span id, writes it to the request in the appropriate
// propagation format and returns the generated span id.
pub fn increment_span_id<B>(request: &mut http::Request<B>, context: &TraceContext) -> Id {
match context.propagation {
Propagation::Grpc => increment_grpc_span_id(request, context),
Propagation::Http => increment_http_span_id(request),
}
}

fn unpack_grpc_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
get_header_str(request, GRPC_TRACE_HEADER)
.and_then(|header_str| {
base64::decode(header_str)
.map_err(|e| {
warn!(
"trace header {} is not base64 encoded: {}",
GRPC_TRACE_HEADER, e
)
})
.ok()
})
.and_then(|vec| {
let mut bytes = vec.into();
parse_grpc_trace_context_fields(&mut bytes)
})
}

fn parse_grpc_trace_context_fields(buf: &mut Bytes) -> Option<TraceContext> {
trace!(message = "reading binary trace context", ?buf);

let _version = try_split_to(buf, 1).ok()?;

let mut context = TraceContext {
propagation: Propagation::Grpc,
trace_id: Default::default(),
parent_id: Default::default(),
flags: Default::default(),
};

while !buf.is_empty() {
match parse_grpc_trace_context_field(buf, &mut context) {
Ok(()) => {}
Err(ref e) if e.is::<UnknownFieldId>() => break,
Err(e) => {
warn!("error parsing {} header: {}", GRPC_TRACE_HEADER, e);
return None;
}
};
}
Some(context)
}

fn parse_grpc_trace_context_field(
buf: &mut Bytes,
context: &mut TraceContext,
) -> Result<(), Error> {
let field_id = try_split_to(buf, 1)?[0];
match field_id {
GRPC_TRACE_FIELD_SPAN_ID => {
let id = try_split_to(buf, 8)?;
trace!(
"reading binary trace field {:?}: {:?}",
GRPC_TRACE_FIELD_SPAN_ID,
id
);
context.parent_id = id.into();
}
GRPC_TRACE_FIELD_TRACE_ID => {
let id = try_split_to(buf, 16)?;
trace!(
"reading binary trace field {:?}: {:?}",
GRPC_TRACE_FIELD_TRACE_ID,
id
);
context.trace_id = id.into();
}
GRPC_TRACE_FIELD_TRACE_OPTIONS => {
let flags = try_split_to(buf, 1)?;
trace!(
"reading binary trace field {:?}: {:?}",
GRPC_TRACE_FIELD_TRACE_OPTIONS,
flags
);
context.flags = flags.try_into()?;
}
id => {
return Err(UnknownFieldId(id).into());
}
};
Ok(())
}

// This code looks significantly weirder if some of the elements are added using
// the `vec![]` macro, despite clippy's suggestions otherwise...
#[allow(clippy::vec_init_then_push)]
fn increment_grpc_span_id<B>(request: &mut http::Request<B>, context: &TraceContext) -> Id {
let span_id = Id::new_span_id(&mut thread_rng());

trace!(message = "incremented span id", %span_id);

let mut bytes = Vec::<u8>::new();

// version
bytes.push(0);

// trace id
bytes.push(GRPC_TRACE_FIELD_TRACE_ID);
bytes.extend(context.trace_id.0.iter());

// span id
bytes.push(GRPC_TRACE_FIELD_SPAN_ID);
bytes.extend(span_id.0.iter());

// trace options
bytes.push(GRPC_TRACE_FIELD_TRACE_OPTIONS);
bytes.push(context.flags.0);

let bytes_b64 = base64::encode(&bytes);

if let Result::Ok(hv) = HeaderValue::from_str(&bytes_b64) {
request.headers_mut().insert(GRPC_TRACE_HEADER, hv);
} else {
warn!("invalid header: {:?}", &bytes_b64);
Propagation::B3Grpc => b3::increment_grpc_span_id(request, context),
Propagation::B3Http => b3::increment_http_span_id(request),
Propagation::W3CHttp => w3c::increment_http_span_id(request, context),
}
span_id
}

fn unpack_http_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
let parent_id = parse_header_id(request, HTTP_SPAN_ID_HEADER, 8)?;
let trace_id = parse_header_id(request, HTTP_TRACE_ID_HEADER, 16)?;
let flags = match get_header_str(request, HTTP_SAMPLED_HEADER) {
Some("1") => Flags(1),
_ => Flags(0),
};
Some(TraceContext {
propagation: Propagation::Http,
trace_id,
parent_id,
flags,
})
}

fn increment_http_span_id<B>(request: &mut http::Request<B>) -> Id {
let span_id = Id::new_span_id(&mut thread_rng());

trace!("incremented span id: {}", span_id);

let span_str = hex::encode(span_id.as_ref());

if let Result::Ok(hv) = HeaderValue::from_str(&span_str) {
request.headers_mut().insert(HTTP_SPAN_ID_HEADER, hv);
} else {
warn!("invalid {} header: {:?}", HTTP_SPAN_ID_HEADER, span_str);
}
span_id
}
// === Header parse utils ===

fn get_header_str<'a, B>(request: &'a http::Request<B>, header: &str) -> Option<&'a str> {
fn get_header_str<'a, B>(
request: &'a http::Request<B>,
header: &http::header::HeaderName,
) -> Option<&'a str> {
let hv = request.headers().get(header)?;
hv.to_str()
.map_err(|e| warn!("Invalid trace header {}: {}", header, e))
.map_err(|_| debug!(header_value = %header, "Invalid non-ASCII or control character in header value"))
.ok()
}

fn parse_header_id<B>(request: &http::Request<B>, header: &str, pad_to: usize) -> Option<Id> {
let header_value = get_header_str(request, header)?;
hex::decode(header_value)
.map(|mut data| {
if data.len() < pad_to {
let padding = pad_to - data.len();
let mut padded = vec![0u8; padding];
padded.append(&mut data);
Id(padded)
} else {
Id(data)
}
})
.map_err(|e| warn!("Header {} does not contain a hex value: {}", header, e))
.ok()
// Attempt to decode a hex value to an id, padding the buffer up to the
// specified argument. Used to decode header values from hex to binary.
fn decode_id_with_padding(value: &str, pad_to: usize) -> Result<Id, hex::FromHexError> {
hex::decode(value).map(|mut data| {
if data.len() < pad_to {
let padding = pad_to - data.len();
let mut padded = vec![0u8; padding];
padded.append(&mut data);
Id(padded)
} else {
Id(data)
}
})
}

/// Attempt to split_to the given index. If there are not enough bytes then
Expand Down
Loading