Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions linkerd/trace-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use thiserror::Error;

const SPAN_ID_LEN: usize = 8;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct Id(Vec<u8>);

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct Flags(u8);

#[derive(Debug, Error)]
Expand Down
142 changes: 133 additions & 9 deletions linkerd/trace-context/src/propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ const HTTP_TRACE_ID_HEADER: &str = "x-b3-traceid";
const HTTP_SPAN_ID_HEADER: &str = "x-b3-spanid";
const HTTP_SAMPLED_HEADER: &str = "x-b3-sampled";

const W3C_HTTP_TRACEPARENT: &str = "traceparent";
const W3C_HEADER_VALUE_SEPARATOR: &str = "-";
const W3C_HEADER_VERSION: &str = "00";

const GRPC_TRACE_HEADER: &str = "grpc-trace-bin";
const GRPC_TRACE_FIELD_TRACE_ID: u8 = 0;
const GRPC_TRACE_FIELD_SPAN_ID: u8 = 1;
const GRPC_TRACE_FIELD_TRACE_OPTIONS: u8 = 2;

#[derive(Debug)]
pub enum Propagation {
Http,
Grpc,
B3Http,
B3Grpc,
TraceContext,
}

#[derive(Debug)]
Expand All @@ -42,16 +47,111 @@ impl TraceContext {
}

pub fn unpack_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
unpack_grpc_trace_context(request).or_else(|| unpack_http_trace_context(request))
// Attempt to parse as w3c first since it's the newest interface in
// distributed tracing ecosystem
unpack_w3c_trace_context(request)
.or_else(|| unpack_grpc_trace_context(request))
.or_else(|| unpack_http_trace_context(request))
}

// Generates a new span id, writes it to the request in the appropriate
// propagation format and returns the generated span id.
pub fn increment_span_id<B>(request: &mut http::Request<B>, context: &TraceContext) -> Id {
match context.propagation {
Propagation::Grpc => increment_grpc_span_id(request, context),
Propagation::Http => increment_http_span_id(request),
Propagation::B3Grpc => increment_grpc_span_id(request, context),
Propagation::B3Http => increment_http_span_id(request),
Propagation::TraceContext => {
increment_w3c_http_span_id(request, context.trace_id.clone(), context.flags.clone())
}
}
}

fn unpack_w3c_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
get_header_str(request, W3C_HTTP_TRACEPARENT).and_then(parse_w3c_context)
}

fn parse_w3c_context(header_value: &str) -> Option<TraceContext> {
let rest = match header_value.split_once(W3C_HEADER_VALUE_SEPARATOR) {
Some((version, rest)) => {
if version != W3C_HEADER_VERSION {
warn!(
"Tracecontext header {} contains invalid version: {}",
header_value, version
);
return None;
}
rest
}
None => {
warn!("Tracecontext header {} is invalid", header_value);
return None;
}
};

let (trace_id, rest) = if let Some((id, rest)) = parse_w3c_header_value(rest, 16) {
(id, rest)
} else {
warn!("Tracecontext header {} contains invalid id", header_value);
return None;
};

let (parent_id, rest) = if let Some((id, rest)) = parse_w3c_header_value(rest, 8) {
(id, rest)
} else {
warn!("Tracecontext header {} contains invalid id", header_value);
return None;
};

let flags = hex::decode(rest)
.map_err(|e| {
warn!(
"Tracecontext header {} contains invalid flags value: {}",
header_value, e
)
})
.ok()
.map(|data| Flags(data[0] & 0x1))
.unwrap_or(Flags(0));

Some(TraceContext {
propagation: Propagation::TraceContext,
trace_id,
parent_id,
flags,
})
}

fn parse_w3c_header_value(next_header: &str, pad_to: usize) -> Option<(Id, &str)> {
next_header
.split_once(W3C_HEADER_VALUE_SEPARATOR)
.filter(|(id, _rest)| !id.chars().all(|c| c == '0'))
.and_then(|(id, rest)| decode_id_with_padding(id, pad_to).zip(Some(rest)))
}

fn increment_w3c_http_span_id<B>(request: &mut http::Request<B>, trace_id: Id, flags: Flags) -> Id {
let span_id = Id::new_span_id(&mut thread_rng());

trace!("incremented span id: {}", span_id);

let new_header = {
let mut buf = String::with_capacity(60);
buf.push_str(W3C_HEADER_VERSION);
buf.push_str(W3C_HEADER_VALUE_SEPARATOR);
buf.push_str(&hex::encode(trace_id));
buf.push_str(W3C_HEADER_VALUE_SEPARATOR);
buf.push_str(&hex::encode(span_id.as_ref()));
buf.push_str(W3C_HEADER_VALUE_SEPARATOR);
buf.push_str(&hex::encode(vec![flags.0]));
buf
};

if let Result::Ok(hv) = HeaderValue::from_str(&new_header) {
request.headers_mut().insert(W3C_HTTP_TRACEPARENT, hv);
} else {
warn!("invalid value {} for tracecontext header", new_header);
}

span_id
}

fn unpack_grpc_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
Expand All @@ -78,7 +178,7 @@ fn parse_grpc_trace_context_fields(buf: &mut Bytes) -> Option<TraceContext> {
let _version = try_split_to(buf, 1).ok()?;

let mut context = TraceContext {
propagation: Propagation::Grpc,
propagation: Propagation::B3Grpc,
trace_id: Default::default(),
parent_id: Default::default(),
flags: Default::default(),
Expand Down Expand Up @@ -180,7 +280,7 @@ fn unpack_http_trace_context<B>(request: &http::Request<B>) -> Option<TraceConte
_ => Flags(0),
};
Some(TraceContext {
propagation: Propagation::Http,
propagation: Propagation::B3Http,
trace_id,
parent_id,
flags,
Expand Down Expand Up @@ -211,7 +311,13 @@ fn get_header_str<'a, B>(request: &'a http::Request<B>, header: &str) -> Option<

fn parse_header_id<B>(request: &http::Request<B>, header: &str, pad_to: usize) -> Option<Id> {
let header_value = get_header_str(request, header)?;
hex::decode(header_value)
decode_id_with_padding(header_value, pad_to)
}

/// Attempt to decode a hex value to an id, padding the buffer up to the
/// specified argument. Used to decode header values from hex to binary.
fn decode_id_with_padding(value: &str, pad_to: usize) -> Option<Id> {
hex::decode(value)
.map(|mut data| {
if data.len() < pad_to {
let padding = pad_to - data.len();
Expand All @@ -222,7 +328,7 @@ fn parse_header_id<B>(request: &http::Request<B>, header: &str, pad_to: usize) -
Id(data)
}
})
.map_err(|e| warn!("Header {} does not contain a hex value: {}", header, e))
.map_err(|e| warn!("Header value {} does not contain a hex: {}", value, e))
.ok()
}

Expand All @@ -235,3 +341,21 @@ fn try_split_to(buf: &mut Bytes, n: usize) -> Result<Bytes, InsufficientBytes> {
Err(InsufficientBytes)
}
}

#[test]
fn test_w3c_context_parsed_successfully() {
let input = "00-94d7f6ec6b95f3e916179cb6cfd01390-55ccfce77f972614-01";
let actual = parse_w3c_context(input);

let expected_trace = hex::decode("94d7f6ec6b95f3e916179cb6cfd01390")
.expect("Failed to decode trace parent from hex");
let expected_parent =
hex::decode("55ccfce77f972614").expect("Failed to decode span parent from hex");
let expected_flags = 1;

assert!(actual.is_some());
let actual = actual.unwrap();
assert_eq!(expected_trace, actual.trace_id.0);
assert_eq!(expected_parent, actual.parent_id.0);
assert_eq!(expected_flags, actual.flags.0);
}