Skip to content

Commit feb4ec9

Browse files
authored
Add traceparent header parsing for w3c tracecontext (#2179)
Support OpenTracing context propagation. Proxy will now participate in traces that are started with the `w3c` traceparent header, and will emit spans if the request should be sampled. Previously supported format (`b3`) can still be used, provided no other tracing headers are present on the request. When parsing trace headers, the proxy will now attempt to parse a `w3c` header first, and if not present, will attempt to parse `b3`. Signed-off-by: Matei David <[email protected]>
1 parent 0ce21db commit feb4ec9

File tree

3 files changed

+347
-181
lines changed

3 files changed

+347
-181
lines changed

linkerd/trace-context/src/propagation.rs

Lines changed: 38 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,17 @@
11
use crate::{Flags, Id, InsufficientBytes};
22
use bytes::Bytes;
3-
use http::header::HeaderValue;
4-
use linkerd_error::Error;
5-
use rand::thread_rng;
6-
use thiserror::Error;
7-
use tracing::{trace, warn};
83

9-
const HTTP_TRACE_ID_HEADER: &str = "x-b3-traceid";
10-
const HTTP_SPAN_ID_HEADER: &str = "x-b3-spanid";
11-
const HTTP_SAMPLED_HEADER: &str = "x-b3-sampled";
4+
use thiserror::Error;
5+
use tracing::debug;
126

13-
const GRPC_TRACE_HEADER: &str = "grpc-trace-bin";
14-
const GRPC_TRACE_FIELD_TRACE_ID: u8 = 0;
15-
const GRPC_TRACE_FIELD_SPAN_ID: u8 = 1;
16-
const GRPC_TRACE_FIELD_TRACE_OPTIONS: u8 = 2;
7+
mod b3;
8+
mod w3c;
179

1810
#[derive(Debug)]
1911
pub enum Propagation {
20-
Http,
21-
Grpc,
12+
B3Http,
13+
B3Grpc,
14+
W3CHttp,
2215
}
2316

2417
#[derive(Debug)]
@@ -41,189 +34,53 @@ impl TraceContext {
4134
}
4235
}
4336

37+
/// Given an http request, attempt to unpack a distributed tracing context from
38+
/// the headers. Only w3c and b3 context propagation formats are supported. The
39+
/// former is tried first, and if no headers are present, function will attempt
40+
/// to unpack as b3.
4441
pub fn unpack_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
45-
unpack_grpc_trace_context(request).or_else(|| unpack_http_trace_context(request))
42+
// Attempt to parse as w3c first since it's the newest interface in
43+
// distributed tracing ecosystem
44+
w3c::unpack_w3c_trace_context(request)
45+
.or_else(|| b3::unpack_grpc_trace_context(request))
46+
.or_else(|| b3::unpack_http_trace_context(request))
4647
}
4748

4849
// Generates a new span id, writes it to the request in the appropriate
4950
// propagation format and returns the generated span id.
5051
pub fn increment_span_id<B>(request: &mut http::Request<B>, context: &TraceContext) -> Id {
5152
match context.propagation {
52-
Propagation::Grpc => increment_grpc_span_id(request, context),
53-
Propagation::Http => increment_http_span_id(request),
54-
}
55-
}
56-
57-
fn unpack_grpc_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
58-
get_header_str(request, GRPC_TRACE_HEADER)
59-
.and_then(|header_str| {
60-
base64::decode(header_str)
61-
.map_err(|e| {
62-
warn!(
63-
"trace header {} is not base64 encoded: {}",
64-
GRPC_TRACE_HEADER, e
65-
)
66-
})
67-
.ok()
68-
})
69-
.and_then(|vec| {
70-
let mut bytes = vec.into();
71-
parse_grpc_trace_context_fields(&mut bytes)
72-
})
73-
}
74-
75-
fn parse_grpc_trace_context_fields(buf: &mut Bytes) -> Option<TraceContext> {
76-
trace!(message = "reading binary trace context", ?buf);
77-
78-
let _version = try_split_to(buf, 1).ok()?;
79-
80-
let mut context = TraceContext {
81-
propagation: Propagation::Grpc,
82-
trace_id: Default::default(),
83-
parent_id: Default::default(),
84-
flags: Default::default(),
85-
};
86-
87-
while !buf.is_empty() {
88-
match parse_grpc_trace_context_field(buf, &mut context) {
89-
Ok(()) => {}
90-
Err(ref e) if e.is::<UnknownFieldId>() => break,
91-
Err(e) => {
92-
warn!("error parsing {} header: {}", GRPC_TRACE_HEADER, e);
93-
return None;
94-
}
95-
};
96-
}
97-
Some(context)
98-
}
99-
100-
fn parse_grpc_trace_context_field(
101-
buf: &mut Bytes,
102-
context: &mut TraceContext,
103-
) -> Result<(), Error> {
104-
let field_id = try_split_to(buf, 1)?[0];
105-
match field_id {
106-
GRPC_TRACE_FIELD_SPAN_ID => {
107-
let id = try_split_to(buf, 8)?;
108-
trace!(
109-
"reading binary trace field {:?}: {:?}",
110-
GRPC_TRACE_FIELD_SPAN_ID,
111-
id
112-
);
113-
context.parent_id = id.into();
114-
}
115-
GRPC_TRACE_FIELD_TRACE_ID => {
116-
let id = try_split_to(buf, 16)?;
117-
trace!(
118-
"reading binary trace field {:?}: {:?}",
119-
GRPC_TRACE_FIELD_TRACE_ID,
120-
id
121-
);
122-
context.trace_id = id.into();
123-
}
124-
GRPC_TRACE_FIELD_TRACE_OPTIONS => {
125-
let flags = try_split_to(buf, 1)?;
126-
trace!(
127-
"reading binary trace field {:?}: {:?}",
128-
GRPC_TRACE_FIELD_TRACE_OPTIONS,
129-
flags
130-
);
131-
context.flags = flags.try_into()?;
132-
}
133-
id => {
134-
return Err(UnknownFieldId(id).into());
135-
}
136-
};
137-
Ok(())
138-
}
139-
140-
// This code looks significantly weirder if some of the elements are added using
141-
// the `vec![]` macro, despite clippy's suggestions otherwise...
142-
#[allow(clippy::vec_init_then_push)]
143-
fn increment_grpc_span_id<B>(request: &mut http::Request<B>, context: &TraceContext) -> Id {
144-
let span_id = Id::new_span_id(&mut thread_rng());
145-
146-
trace!(message = "incremented span id", %span_id);
147-
148-
let mut bytes = Vec::<u8>::new();
149-
150-
// version
151-
bytes.push(0);
152-
153-
// trace id
154-
bytes.push(GRPC_TRACE_FIELD_TRACE_ID);
155-
bytes.extend(context.trace_id.0.iter());
156-
157-
// span id
158-
bytes.push(GRPC_TRACE_FIELD_SPAN_ID);
159-
bytes.extend(span_id.0.iter());
160-
161-
// trace options
162-
bytes.push(GRPC_TRACE_FIELD_TRACE_OPTIONS);
163-
bytes.push(context.flags.0);
164-
165-
let bytes_b64 = base64::encode(&bytes);
166-
167-
if let Result::Ok(hv) = HeaderValue::from_str(&bytes_b64) {
168-
request.headers_mut().insert(GRPC_TRACE_HEADER, hv);
169-
} else {
170-
warn!("invalid header: {:?}", &bytes_b64);
53+
Propagation::B3Grpc => b3::increment_grpc_span_id(request, context),
54+
Propagation::B3Http => b3::increment_http_span_id(request),
55+
Propagation::W3CHttp => w3c::increment_http_span_id(request, context),
17156
}
172-
span_id
173-
}
174-
175-
fn unpack_http_trace_context<B>(request: &http::Request<B>) -> Option<TraceContext> {
176-
let parent_id = parse_header_id(request, HTTP_SPAN_ID_HEADER, 8)?;
177-
let trace_id = parse_header_id(request, HTTP_TRACE_ID_HEADER, 16)?;
178-
let flags = match get_header_str(request, HTTP_SAMPLED_HEADER) {
179-
Some("1") => Flags(1),
180-
_ => Flags(0),
181-
};
182-
Some(TraceContext {
183-
propagation: Propagation::Http,
184-
trace_id,
185-
parent_id,
186-
flags,
187-
})
18857
}
18958

190-
fn increment_http_span_id<B>(request: &mut http::Request<B>) -> Id {
191-
let span_id = Id::new_span_id(&mut thread_rng());
192-
193-
trace!("incremented span id: {}", span_id);
194-
195-
let span_str = hex::encode(span_id.as_ref());
196-
197-
if let Result::Ok(hv) = HeaderValue::from_str(&span_str) {
198-
request.headers_mut().insert(HTTP_SPAN_ID_HEADER, hv);
199-
} else {
200-
warn!("invalid {} header: {:?}", HTTP_SPAN_ID_HEADER, span_str);
201-
}
202-
span_id
203-
}
59+
// === Header parse utils ===
20460

205-
fn get_header_str<'a, B>(request: &'a http::Request<B>, header: &str) -> Option<&'a str> {
61+
fn get_header_str<'a, B>(
62+
request: &'a http::Request<B>,
63+
header: &http::header::HeaderName,
64+
) -> Option<&'a str> {
20665
let hv = request.headers().get(header)?;
20766
hv.to_str()
208-
.map_err(|e| warn!("Invalid trace header {}: {}", header, e))
67+
.map_err(|_| debug!(header_value = %header, "Invalid non-ASCII or control character in header value"))
20968
.ok()
21069
}
21170

212-
fn parse_header_id<B>(request: &http::Request<B>, header: &str, pad_to: usize) -> Option<Id> {
213-
let header_value = get_header_str(request, header)?;
214-
hex::decode(header_value)
215-
.map(|mut data| {
216-
if data.len() < pad_to {
217-
let padding = pad_to - data.len();
218-
let mut padded = vec![0u8; padding];
219-
padded.append(&mut data);
220-
Id(padded)
221-
} else {
222-
Id(data)
223-
}
224-
})
225-
.map_err(|e| warn!("Header {} does not contain a hex value: {}", header, e))
226-
.ok()
71+
// Attempt to decode a hex value to an id, padding the buffer up to the
72+
// specified argument. Used to decode header values from hex to binary.
73+
fn decode_id_with_padding(value: &str, pad_to: usize) -> Result<Id, hex::FromHexError> {
74+
hex::decode(value).map(|mut data| {
75+
if data.len() < pad_to {
76+
let padding = pad_to - data.len();
77+
let mut padded = vec![0u8; padding];
78+
padded.append(&mut data);
79+
Id(padded)
80+
} else {
81+
Id(data)
82+
}
83+
})
22784
}
22885

22986
/// Attempt to split_to the given index. If there are not enough bytes then

0 commit comments

Comments
 (0)