diff --git a/Cargo.lock b/Cargo.lock index 173c516e24..d145e2042d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2512,6 +2512,7 @@ dependencies = [ "once_cell", "opentelemetry", "opentelemetry-otlp", + "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry_sdk", "rand 0.8.6", @@ -2536,9 +2537,11 @@ dependencies = [ "testcontainers-modules", "thiserror 2.0.18", "tokio", + "tokio-stream", "tokio-tungstenite 0.28.0", "toml", "toml_edit", + "tonic", "tower", "tower-http", "tracing", diff --git a/engine/Cargo.toml b/engine/Cargo.toml index 2f26e2eff2..2a90853c41 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -103,7 +103,9 @@ tracing-opentelemetry = "0.32" opentelemetry = "0.31" opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.31", features = ["tonic", "grpc-tonic", "logs", "tls-roots"] } +opentelemetry-proto = { version = "0.31", features = ["gen-tonic"] } opentelemetry-semantic-conventions = "0.31" +tonic = { version = "0.14", features = ["transport", "tls-native-roots"] } futures = "0.3" futures-util = "0.3" http-body-util = "0.1" @@ -160,6 +162,7 @@ which = "7" [dev-dependencies] tokio = { version = "1", features = ["test-util"] } +tokio-stream = { version = "0.1", features = ["net"] } mockall = "0.13" serial_test = "3" temp-env = "0.3" diff --git a/engine/src/workers/observability/otel.rs b/engine/src/workers/observability/otel.rs index 48bd4e5dd0..73b096d34b 100644 --- a/engine/src/workers/observability/otel.rs +++ b/engine/src/workers/observability/otel.rs @@ -21,6 +21,10 @@ use opentelemetry::{ trace::{TraceContextExt, TracerProvider as _}, }; use opentelemetry_otlp::WithExportConfig; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest as ProtoExportTraceServiceRequest, + trace_service_client::TraceServiceClient, +}; use opentelemetry_sdk::{ Resource, error::OTelSdkResult, @@ -34,6 +38,8 @@ use std::sync::{Arc, OnceLock, RwLock}; use std::time::Duration; use tokio::sync::broadcast; use tokio::task::JoinHandle; +use tonic::metadata::{Ascii, MetadataKey, MetadataValue}; +use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; use tracing::Subscriber; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::registry::LookupSpan; @@ -655,29 +661,170 @@ impl SpanExporter for TeeSpanExporter { static TRACER_PROVIDER: OnceLock = OnceLock::new(); -/// Global OTLP exporter for forwarding SDK-ingested spans to the collector. -/// `SpanExporter::export` takes `&self`, so no Mutex is needed. -static SDK_SPAN_FORWARDER: OnceLock> = OnceLock::new(); - -/// Build a second OTLP span exporter and store it in the global `SDK_SPAN_FORWARDER` -/// so that SDK-ingested spans can be forwarded to the collector. -fn init_sdk_span_forwarder(endpoint: &str) { - match opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .build() - { - Ok(forwarder) => { - if SDK_SPAN_FORWARDER.set(Arc::new(forwarder)).is_err() { - tracing::debug!("SDK span forwarder already initialized"); +/// State for the SDK-span forwarder hop: a tonic channel to the collector +/// plus the parsed `OTEL_EXPORTER_OTLP_HEADERS` / `OTEL_EXPORTER_OTLP_TRACES_HEADERS` +/// metadata pairs that get injected on every export request. +/// +/// The earlier shape was `Arc`, which +/// internally serialized `SpanData` values back into proto. That conversion +/// dropped `resource_spans[].resource` (the SDK exporter expects its +/// `Resource` to be supplied by the owning `TracerProvider`, but this +/// forwarder is bare), so collectors rendered `service.name=` — +/// see iii-hq/iii#1617. +/// +/// The new shape is a tonic `Channel` plus a parsed-header list. `Channel` +/// is `Arc`-internally and cheap to clone; the forwarder branch clones it +/// per request to build a fresh `TraceServiceClient`, which lets us hand +/// it the original proto `ExportTraceServiceRequest` (translated from the +/// inbound JSON) so the full inbound resource block — not just +/// `service.name` — survives the hop byte-for-byte. The header list +/// replicates the behaviour the previous `opentelemetry_otlp` builder +/// gave us for free: API-keyed collectors (Grafana Cloud, Honeycomb, +/// SigNoz Cloud) configured via the standard OTel env vars keep working. +/// +/// **Test isolation note**: this is a process-global `OnceLock`. Integration +/// tests that call `init_sdk_span_forwarder` MUST live in their own +/// `engine/tests/*.rs` integration-test binary (cargo gives each `tests/` +/// file its own process). Two `#[tokio::test]`s in the same integration +/// binary that both call `init_sdk_span_forwarder` will see the second +/// `OnceLock::set` no-op and the second test will export to the first +/// test's endpoint. If a future need arises (e.g. parameterising the +/// endpoint per-test), add a `#[cfg(test)] pub fn reset_for_test()` that +/// swaps the cell via an `OnceLock` rebuild — do NOT make the static +/// public-write in production. +struct SdkForwarderState { + channel: Channel, + headers: Vec<(MetadataKey, MetadataValue)>, +} + +static SDK_SPAN_FORWARDER: OnceLock = OnceLock::new(); + +/// Parse `OTEL_EXPORTER_OTLP_HEADERS` / `OTEL_EXPORTER_OTLP_TRACES_HEADERS` +/// into tonic-typed metadata pairs. +/// +/// Format per the OTel SDK spec: comma-separated `key=value` pairs (e.g. +/// `authorization=Bearer abc123,x-tenant=acme`). Keys must be valid HTTP/2 +/// header names (ASCII, lowercase by convention); values are ASCII for +/// non-binary metadata. Invalid pairs are skipped with a warning rather +/// than failing the whole init — matches `opentelemetry-otlp`'s tolerance. +/// +/// Signal-specific (`*_TRACES_HEADERS`) takes precedence over generic +/// (`*_HEADERS`) per the OTel spec. +fn parse_otlp_headers_env() -> Vec<(MetadataKey, MetadataValue)> { + let raw = std::env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS") + .or_else(|_| std::env::var("OTEL_EXPORTER_OTLP_HEADERS")) + .ok(); + let Some(raw) = raw else { + return Vec::new(); + }; + if raw.trim().is_empty() { + return Vec::new(); + } + raw.split(',') + .filter_map(|pair| { + let pair = pair.trim(); + if pair.is_empty() { + return None; } - } + let (k, v) = pair.split_once('=')?; + let k = k.trim(); + let v = v.trim(); + let key = match MetadataKey::::from_bytes(k.as_bytes()) { + Ok(k) => k, + Err(e) => { + tracing::warn!( + error = %e, + header_name = k, + "OTEL_EXPORTER_OTLP_HEADERS entry has invalid header name; skipping" + ); + return None; + } + }; + let value = match MetadataValue::::try_from(v) { + Ok(v) => v, + Err(e) => { + tracing::warn!( + error = %e, + header_name = k, + "OTEL_EXPORTER_OTLP_HEADERS entry has invalid header value; skipping" + ); + return None; + } + }; + Some((key, value)) + }) + .collect() +} + +/// Initialize the SDK-span forwarder channel at `endpoint`. Idempotent — +/// later calls are no-ops once the channel is set. +/// +/// `endpoint` accepts any URL the tonic `Endpoint` builder accepts (e.g. +/// `http://collector:4317`, `https://signoz.example.com:4317`); the channel +/// is connected lazily, so this function does not block on the collector +/// being reachable. Reachability is surfaced later when the first export +/// runs (the failure is logged as a warning, not propagated to the +/// ingesting SDK). +/// +/// Visibility: `pub` so integration tests (which live outside the crate) +/// can point the forwarder at a mock collector. Treat this as an internal +/// API — in-crate callers should keep going through `init_otel`. +pub fn init_sdk_span_forwarder(endpoint: &str) { + let ep = match Endpoint::from_shared(endpoint.to_owned()) { + Ok(ep) => ep, Err(e) => { tracing::warn!( error = %e, - "Failed to create SDK span forwarder, SDK spans will not be exported to collector" + endpoint = endpoint, + "Failed to build SDK span forwarder channel; SDK spans will not be exported to collector" ); + return; } + }; + + // Preserve the pre-#1617 deployment contract: when the endpoint + // is `https://...`, the previous `opentelemetry_otlp::SpanExporter` + // builder configured TLS automatically via its `tls-roots` feature. + // Replicate that here for the raw-tonic channel — without this, + // every TLS-fronted collector (Grafana Cloud, SigNoz Cloud, + // Honeycomb, the standard prod shape) would silently fail at + // handshake time post-merge. + // + // `Endpoint::tls_config` consumes `self`, so we rebind through the + // configured value. On config failure we fall back to the original + // unconfigured endpoint and log a warning; the lazy connect will + // then fail at handshake time against an HTTPS collector, but + // engine init proceeds and HTTP collectors keep working. + let ep = if endpoint.starts_with("https://") { + match ep.tls_config(ClientTlsConfig::new().with_native_roots()) { + Ok(configured) => configured, + Err(e) => { + tracing::warn!( + error = %e, + endpoint = endpoint, + "Failed to configure TLS for SDK span forwarder; channel will attempt plaintext (handshake to an HTTPS collector will fail)" + ); + // Re-build the unconfigured Endpoint so we can still + // return a channel for the (broken) plaintext path. + // Unwrap is safe — same string parsed successfully above. + Endpoint::from_shared(endpoint.to_owned()).unwrap() + } + } + } else { + ep + }; + + let channel = ep.connect_lazy(); + let headers = parse_otlp_headers_env(); + if !headers.is_empty() { + tracing::debug!( + count = headers.len(), + "SDK span forwarder loaded OTEL_EXPORTER_OTLP_HEADERS / OTEL_EXPORTER_OTLP_TRACES_HEADERS entries" + ); + } + let state = SdkForwarderState { channel, headers }; + if SDK_SPAN_FORWARDER.set(state).is_err() { + tracing::debug!("SDK span forwarder already initialized"); } } @@ -1412,215 +1559,16 @@ fn extract_service_name(resource: &Option) -> String { "unknown".to_string() } -/// Convert an OtlpKeyValue to an opentelemetry KeyValue. -fn otlp_kv_to_key_value(kv: &OtlpKeyValue) -> Option { - let val = kv.value.as_ref()?; - - if let Some(s) = &val.string_value { - return Some(KeyValue::new( - kv.key.clone(), - opentelemetry::Value::String(s.clone().into()), - )); - } - if let Some(i) = val.int_value { - return Some(KeyValue::new(kv.key.clone(), opentelemetry::Value::I64(i))); - } - if let Some(d) = val.double_value { - return Some(KeyValue::new(kv.key.clone(), opentelemetry::Value::F64(d))); - } - if let Some(b) = val.bool_value { - return Some(KeyValue::new(kv.key.clone(), opentelemetry::Value::Bool(b))); - } - - // Nested structures: serialize to JSON string representation - if val.kvlist_value.is_some() || val.array_value.is_some() { - let json_str = val.to_string_value(); - return Some(KeyValue::new( - kv.key.clone(), - opentelemetry::Value::String(json_str.into()), - )); - } - - None -} - -/// Convert parsed OTLP spans to SpanData for export via the OTel SDK pipeline. -fn convert_otlp_to_span_data(request: &OtlpExportTraceServiceRequest) -> Vec { - use opentelemetry::trace::{ - Event, Link, SpanContext, SpanKind, Status, TraceFlags, TraceState, - }; - use opentelemetry::{InstrumentationScope, SpanId, TraceId}; - use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; - use std::borrow::Cow; - use std::time::{Duration, UNIX_EPOCH}; - - let mut span_data_vec = Vec::new(); - - for resource_span in &request.resource_spans { - for scope_span in &resource_span.scope_spans { - let scope = scope_span - .scope - .as_ref() - .map(|s| { - let mut builder = InstrumentationScope::builder(s.name.clone()); - if !s.version.is_empty() { - builder = builder.with_version(s.version.clone()); - } - builder.build() - }) - .unwrap_or_else(|| InstrumentationScope::builder("unknown").build()); - - for span in &scope_span.spans { - // Parse trace and span IDs - let trace_id = match TraceId::from_hex(&span.trace_id) { - Ok(id) => id, - Err(_) => continue, - }; - let span_id = match SpanId::from_hex(&span.span_id) { - Ok(id) => id, - Err(_) => continue, - }; - - let parent_span_id = span - .parent_span_id - .as_ref() - .and_then(|p| { - if p.is_empty() || p.chars().all(|c| c == '0') { - None - } else { - SpanId::from_hex(p).ok() - } - }) - .unwrap_or(SpanId::INVALID); - - // Spans arriving via ingest_otlp_json originate from an external SDK - // process (Node.js), so a valid parent span is always remote. - let parent_span_is_remote = parent_span_id != SpanId::INVALID; - - // Respect incoming W3C trace flags from the OTLP span (lowest 8 bits - // of the u32). Fall back to SAMPLED when absent, since spans - // arriving via OTLP were already exported by the upstream SDK. - let trace_flags = span - .flags - .map(|f| TraceFlags::new(f as u8)) - .unwrap_or(TraceFlags::SAMPLED); - - let span_context = - SpanContext::new(trace_id, span_id, trace_flags, true, TraceState::NONE); - - let start_time = UNIX_EPOCH + Duration::from_nanos(span.start_time_unix_nano.0); - let end_time = UNIX_EPOCH + Duration::from_nanos(span.end_time_unix_nano.0); - - let attributes: Vec = span - .attributes - .iter() - .filter_map(otlp_kv_to_key_value) - .collect(); - - // Determine span kind from the numeric `kind` field, or fall back - // to checking the "otel.kind" attribute string. - let span_kind = span - .kind - .and_then(|k| match k { - 1 => Some(SpanKind::Internal), - 2 => Some(SpanKind::Server), - 3 => Some(SpanKind::Client), - 4 => Some(SpanKind::Producer), - 5 => Some(SpanKind::Consumer), - _ => None, - }) - .or_else(|| { - attributes - .iter() - .find(|kv| kv.key.as_str() == "otel.kind") - .and_then(|kv| match kv.value.as_str().as_ref() { - "client" | "CLIENT" => Some(SpanKind::Client), - "server" | "SERVER" => Some(SpanKind::Server), - "producer" | "PRODUCER" => Some(SpanKind::Producer), - "consumer" | "CONSUMER" => Some(SpanKind::Consumer), - "internal" | "INTERNAL" => Some(SpanKind::Internal), - _ => None, - }) - }) - .unwrap_or(SpanKind::Internal); - - let events: Vec = span - .events - .iter() - .map(|e| { - let ts = UNIX_EPOCH + Duration::from_nanos(e.time_unix_nano.0); - let attrs: Vec = e - .attributes - .iter() - .filter_map(otlp_kv_to_key_value) - .collect(); - Event::new(e.name.clone(), ts, attrs, 0) - }) - .collect(); - - let links: Vec = span - .links - .iter() - .filter_map(|l| { - let lt = TraceId::from_hex(&l.trace_id).ok()?; - let ls = SpanId::from_hex(&l.span_id).ok()?; - let trace_state = l - .trace_state - .as_deref() - .and_then(|ts| ts.parse::().ok()) - .unwrap_or(TraceState::NONE); - // OtlpSpanLink does not expose per-link trace flags in the - // current OTLP spec; default to TraceFlags::SAMPLED. If - // OtlpSpanLink gains a flags field, parse it here via - // TraceFlags::new() and pass to SpanContext::new instead. - let lc = SpanContext::new(lt, ls, TraceFlags::SAMPLED, true, trace_state); - let attrs: Vec = l - .attributes - .iter() - .filter_map(otlp_kv_to_key_value) - .collect(); - Some(Link::new(lc, attrs, 0)) - }) - .collect(); - - let status = match span.status.as_ref() { - Some(s) => match s.code { - 1 => Status::Ok, - 2 => Status::error(s.message.as_deref().unwrap_or("error").to_string()), - _ => Status::Unset, - }, - None => Status::Unset, - }; - - let mut span_events = SpanEvents::default(); - span_events.events = events; - - let mut span_links = SpanLinks::default(); - span_links.links = links; - - let sd = SpanData { - span_context, - parent_span_id, - parent_span_is_remote, - span_kind, - name: Cow::Owned(span.name.clone()), - start_time, - end_time, - attributes, - dropped_attributes_count: 0, - events: span_events, - links: span_links, - status, - instrumentation_scope: scope.clone(), - }; - - span_data_vec.push(sd); - } - } - } - - span_data_vec -} +// `otlp_kv_to_key_value` and `convert_otlp_to_span_data` lived here in +// previous revisions. They translated `OtlpKeyValue`/`OtlpAnyValue` into +// `opentelemetry::KeyValue` and rolled a parsed JSON request into a +// `Vec` for export through a bare `SpanExporter`. That path +// silently dropped `resource_spans[].resource` (iii-hq/iii#1617), so it +// has been replaced by the JSON→proto conversion helpers below +// (`otlp_json_request_to_proto` and friends) which feed +// `TraceServiceClient` directly. The in-memory storage branch of +// `ingest_otlp_json` does its own per-field conversion and never used +// these helpers, so deleting them is scope-bounded to the forwarder fix. /// Ingest OTLP JSON data from Node SDK and merge into in-memory storage. /// @@ -1760,17 +1708,60 @@ pub async fn ingest_otlp_json(json_str: &str) -> anyhow::Result<()> { } } - // Forward to OTLP collector if forwarder is available - if let Some(forwarder) = SDK_SPAN_FORWARDER.get() { - let span_data = convert_otlp_to_span_data(&request); - if !span_data.is_empty() { - let count = span_data.len(); - match forwarder.export(span_data).await { - Ok(()) => { - tracing::debug!(span_count = count, "Forwarded SDK spans to OTLP collector"); + // Forward to OTLP collector if forwarder is available. + // + // We deliberately bypass `convert_otlp_to_span_data` here (in fact, + // that helper has been removed — see the comment near + // `otlp_json_request_to_proto` below for the rationale). The earlier + // path dropped `resource_spans[].resource` because the bare + // `SpanExporter` had no `TracerProvider` to supply a resource at + // export time. Translating the parsed JSON request directly into the + // proto type preserves the full resource block byte-for-byte. See + // iii-hq/iii#1617. + // + // Malformed-span handling: per #1618 review (ytallo's gherkin §"One + // bad span does not poison the rest of the forwarded batch"), + // `otlp_span_to_proto` returns `None` for spans whose `trace_id` / + // `span_id` fail to hex-decode. `otlp_scope_spans_to_proto` then + // filter_maps those out, so valid spans in the same batch reach the + // collector. The in-memory storage path is unchanged. + if let Some(state) = SDK_SPAN_FORWARDER.get() { + let proto_request = otlp_json_request_to_proto(&request); + let span_count: usize = proto_request + .resource_spans + .iter() + .map(|rs| { + rs.scope_spans + .iter() + .map(|ss| ss.spans.len()) + .sum::() + }) + .sum(); + if span_count > 0 { + // Build the tonic Request, then attach the parsed + // OTEL_EXPORTER_OTLP_HEADERS metadata. Replicates the behaviour + // that the previous `opentelemetry_otlp::SpanExporter` got from + // its env-var pickup — without this, API-keyed collectors + // (Grafana Cloud, Honeycomb, SigNoz Cloud) drop forwarded + // spans at auth with no signal at the engine side. + let mut grpc_request = tonic::Request::new(proto_request); + let metadata = grpc_request.metadata_mut(); + for (key, value) in &state.headers { + metadata.insert(key.clone(), value.clone()); + } + let mut client = TraceServiceClient::new(state.channel.clone()); + match client.export(grpc_request).await { + Ok(_) => { + tracing::debug!( + span_count = span_count, + "Forwarded SDK spans to OTLP collector" + ); } Err(e) => { - tracing::warn!(error = ?e, "Failed to forward SDK spans to OTLP collector"); + tracing::warn!( + error = ?e, + "Failed to forward SDK spans to OTLP collector" + ); } } } @@ -1779,6 +1770,222 @@ pub async fn ingest_otlp_json(json_str: &str) -> anyhow::Result<()> { Ok(()) } +// ============================================================================= +// OTLP JSON → proto conversion (forwarder hop, iii-hq/iii#1617) +// ============================================================================= +// +// The Rust structs above (`OtlpExportTraceServiceRequest`, `OtlpResource`, +// `OtlpScopeSpans`, `OtlpSpan`, `OtlpKeyValue`, `OtlpAnyValue`, etc.) are +// JSON-only — they exist so `ingest_otlp_json` can deserialise OTLP/JSON +// payloads emitted by SDKs. +// +// To forward the same payload over OTLP/gRPC without losing +// `resource_spans[].resource` (which is what the old SpanData-based +// forwarder dropped), we translate the parsed JSON structs into the proto +// types from `opentelemetry_proto::tonic::*`. The translation is +// field-for-field and lossless for the data shape iii actually ingests; any +// unknown JSON fields are already dropped at deserialise time, which is +// the same fidelity boundary the proto wire format guarantees. + +fn otlp_json_request_to_proto( + request: &OtlpExportTraceServiceRequest, +) -> ProtoExportTraceServiceRequest { + use opentelemetry_proto::tonic::trace::v1::ResourceSpans as ProtoResourceSpans; + ProtoExportTraceServiceRequest { + resource_spans: request + .resource_spans + .iter() + .map(|rs| ProtoResourceSpans { + resource: rs.resource.as_ref().map(otlp_resource_to_proto), + scope_spans: rs + .scope_spans + .iter() + .map(otlp_scope_spans_to_proto) + .collect(), + schema_url: String::new(), + }) + .collect(), + } +} + +fn otlp_resource_to_proto( + resource: &OtlpResource, +) -> opentelemetry_proto::tonic::resource::v1::Resource { + opentelemetry_proto::tonic::resource::v1::Resource { + attributes: resource.attributes.iter().map(otlp_kv_to_proto).collect(), + dropped_attributes_count: 0, + entity_refs: Vec::new(), + } +} + +fn otlp_scope_spans_to_proto( + scope_spans: &OtlpScopeSpans, +) -> opentelemetry_proto::tonic::trace::v1::ScopeSpans { + opentelemetry_proto::tonic::trace::v1::ScopeSpans { + scope: scope_spans.scope.as_ref().map(otlp_scope_to_proto), + // Per #1618 review (gherkin §"One bad span does not poison the rest + // of the forwarded batch"): drop spans with malformed `trace_id` + // or `span_id` rather than relaying them with empty-byte IDs that + // most collectors reject — wholesale-rejecting a batch because of + // one bad span would be worse than the prior `convert_otlp_to_span_data` + // pre-filtering it deliberately matched. + spans: scope_spans + .spans + .iter() + .filter_map(otlp_span_to_proto) + .collect(), + schema_url: String::new(), + } +} + +fn otlp_scope_to_proto( + scope: &OtlpScope, +) -> opentelemetry_proto::tonic::common::v1::InstrumentationScope { + opentelemetry_proto::tonic::common::v1::InstrumentationScope { + name: scope.name.clone(), + version: scope.version.clone(), + attributes: Vec::new(), + dropped_attributes_count: 0, + } +} + +// OTLP spec-required byte lengths for span identifiers. Encoded as 32 +// and 16 hex characters respectively in the OTLP/JSON form. +const OTLP_TRACE_ID_BYTES: usize = 16; +const OTLP_SPAN_ID_BYTES: usize = 8; + +fn otlp_span_to_proto(span: &OtlpSpan) -> Option { + use opentelemetry_proto::tonic::trace::v1::Span as ProtoSpan; + + // `trace_id` and `span_id` are required by the OTLP spec to be valid + // hex-encoded byte strings of exactly 16 and 8 bytes respectively. + // A span with either ID malformed OR wrong-length is unusable + // downstream — relaying it would deliver invalid IDs that + // collectors reject (sometimes wholesale, poisoning the whole + // batch). Drop the span instead, so valid spans in the same batch + // still get through. `hex::decode("01")` would succeed without the + // length check below and emit a 1-byte trace_id — collectors then + // reject the export. + let trace_id = hex::decode(&span.trace_id).ok()?; + if trace_id.len() != OTLP_TRACE_ID_BYTES { + return None; + } + let span_id = hex::decode(&span.span_id).ok()?; + if span_id.len() != OTLP_SPAN_ID_BYTES { + return None; + } + + // `parent_span_id` is optional; "absent" can mean missing field, + // empty string, or all-zeros. Treat each as "no parent". If present + // but malformed (bad hex OR wrong length), fall back to empty + // rather than dropping the whole span — the orphaned-parent case is + // recoverable downstream. + let parent_span_id = span + .parent_span_id + .as_deref() + .filter(|s| !s.is_empty() && !s.chars().all(|c| c == '0')) + .and_then(|s| hex::decode(s).ok()) + .filter(|bytes| bytes.len() == OTLP_SPAN_ID_BYTES) + .unwrap_or_default(); + + Some(ProtoSpan { + trace_id, + span_id, + trace_state: String::new(), + parent_span_id, + // Parity with the in-memory storage path's default at the top of + // `ingest_otlp_json` — a span arriving without an explicit `flags` + // field is treated as SAMPLED (W3C trace-flags bit 0 = 0x01). + // Defaulting to 0 here would mark such spans as unsampled on the + // forwarder hop while storage records them as sampled, producing + // inconsistent behaviour across the two halves of the same call. + flags: span.flags.unwrap_or(1), + name: span.name.clone(), + kind: span.kind.unwrap_or(0) as i32, + start_time_unix_nano: span.start_time_unix_nano.0, + end_time_unix_nano: span.end_time_unix_nano.0, + attributes: span.attributes.iter().map(otlp_kv_to_proto).collect(), + dropped_attributes_count: 0, + events: span.events.iter().map(otlp_event_to_proto).collect(), + dropped_events_count: 0, + links: span.links.iter().map(otlp_link_to_proto).collect(), + dropped_links_count: 0, + status: span.status.as_ref().map(otlp_status_to_proto), + }) +} + +fn otlp_event_to_proto(ev: &OtlpSpanEvent) -> opentelemetry_proto::tonic::trace::v1::span::Event { + opentelemetry_proto::tonic::trace::v1::span::Event { + time_unix_nano: ev.time_unix_nano.0, + name: ev.name.clone(), + attributes: ev.attributes.iter().map(otlp_kv_to_proto).collect(), + dropped_attributes_count: 0, + } +} + +fn otlp_link_to_proto(link: &OtlpSpanLink) -> opentelemetry_proto::tonic::trace::v1::span::Link { + // Links to malformed-ID remote spans fall back to empty bytes here + // (rather than dropping the link) — the parent span itself was already + // validated by `otlp_span_to_proto`, so a bad link is a recoverable + // metadata loss, not a wholesale span-loss event. + opentelemetry_proto::tonic::trace::v1::span::Link { + trace_id: hex::decode(&link.trace_id).unwrap_or_default(), + span_id: hex::decode(&link.span_id).unwrap_or_default(), + trace_state: link.trace_state.clone().unwrap_or_default(), + attributes: link.attributes.iter().map(otlp_kv_to_proto).collect(), + dropped_attributes_count: 0, + flags: 0, + } +} + +fn otlp_status_to_proto(status: &OtlpStatus) -> opentelemetry_proto::tonic::trace::v1::Status { + opentelemetry_proto::tonic::trace::v1::Status { + message: status.message.clone().unwrap_or_default(), + code: status.code as i32, + } +} + +fn otlp_kv_to_proto(kv: &OtlpKeyValue) -> opentelemetry_proto::tonic::common::v1::KeyValue { + opentelemetry_proto::tonic::common::v1::KeyValue { + key: kv.key.clone(), + value: kv.value.as_ref().map(otlp_any_value_to_proto), + } +} + +fn otlp_any_value_to_proto(v: &OtlpAnyValue) -> opentelemetry_proto::tonic::common::v1::AnyValue { + use opentelemetry_proto::tonic::common::v1::{ + AnyValue as ProtoAnyValue, ArrayValue as ProtoArrayValue, + KeyValueList as ProtoKeyValueList, any_value::Value as ProtoValue, + }; + + // OtlpAnyValue is a union — at most one field is set per OTLP spec. + // Probe in the spec's discriminator order. If none are set the proto + // representation is `AnyValue { value: None }`, which the collector + // treats as an empty attribute (identical to dropping the KV — matches + // the prior SpanData path's behaviour for that edge case). + let value = if let Some(s) = v.string_value.as_ref() { + Some(ProtoValue::StringValue(s.clone())) + } else if let Some(i) = v.int_value { + Some(ProtoValue::IntValue(i)) + } else if let Some(f) = v.double_value { + Some(ProtoValue::DoubleValue(f)) + } else if let Some(b) = v.bool_value { + Some(ProtoValue::BoolValue(b)) + } else if let Some(arr) = v.array_value.as_ref() { + Some(ProtoValue::ArrayValue(ProtoArrayValue { + values: arr.values.iter().map(otlp_any_value_to_proto).collect(), + })) + } else { + v.kvlist_value.as_ref().map(|kvl| { + ProtoValue::KvlistValue(ProtoKeyValueList { + values: kvl.values.iter().map(otlp_kv_to_proto).collect(), + }) + }) + }; + + ProtoAnyValue { value } +} + // ============================================================================= // OTLP JSON Metrics Ingestion from Node SDK // ============================================================================= @@ -4032,128 +4239,6 @@ mod tests { assert!(desc.is_none()); } - #[test] - fn test_otlp_kv_to_key_value_string() { - let kv = OtlpKeyValue { - key: "my.key".to_string(), - value: Some(OtlpAnyValue { - string_value: Some("hello".to_string()), - int_value: None, - double_value: None, - bool_value: None, - kvlist_value: None, - array_value: None, - }), - }; - - let result = otlp_kv_to_key_value(&kv); - assert!(result.is_some()); - let kv_out = result.unwrap(); - assert_eq!(kv_out.key.as_str(), "my.key"); - assert_eq!(kv_out.value.as_str().as_ref(), "hello"); - } - - #[test] - fn test_otlp_kv_to_key_value_int() { - let kv = OtlpKeyValue { - key: "count".to_string(), - value: Some(OtlpAnyValue { - string_value: None, - int_value: Some(42), - double_value: None, - bool_value: None, - kvlist_value: None, - array_value: None, - }), - }; - - let result = otlp_kv_to_key_value(&kv); - assert!(result.is_some()); - let kv_out = result.unwrap(); - assert_eq!(kv_out.key.as_str(), "count"); - // I64 value - assert_eq!(format!("{}", kv_out.value), "42"); - } - - #[test] - fn test_otlp_kv_to_key_value_double() { - let kv = OtlpKeyValue { - key: "ratio".to_string(), - value: Some(OtlpAnyValue { - string_value: None, - int_value: None, - double_value: Some(std::f64::consts::PI), - bool_value: None, - kvlist_value: None, - array_value: None, - }), - }; - - let result = otlp_kv_to_key_value(&kv); - assert!(result.is_some()); - let kv_out = result.unwrap(); - assert_eq!(kv_out.key.as_str(), "ratio"); - assert_eq!( - format!("{}", kv_out.value), - std::f64::consts::PI.to_string() - ); - } - - #[test] - fn test_otlp_kv_to_key_value_bool() { - let kv = OtlpKeyValue { - key: "enabled".to_string(), - value: Some(OtlpAnyValue { - string_value: None, - int_value: None, - double_value: None, - bool_value: Some(true), - kvlist_value: None, - array_value: None, - }), - }; - - let result = otlp_kv_to_key_value(&kv); - assert!(result.is_some()); - let kv_out = result.unwrap(); - assert_eq!(kv_out.key.as_str(), "enabled"); - assert_eq!(format!("{}", kv_out.value), "true"); - } - - #[test] - fn test_otlp_kv_to_key_value_none() { - let kv = OtlpKeyValue { - key: "empty".to_string(), - value: None, - }; - - let result = otlp_kv_to_key_value(&kv); - assert!(result.is_none()); - } - - #[test] - fn test_otlp_kv_to_key_value_empty_key_is_not_skipped() { - // The engine's otlp_kv_to_key_value does not filter on empty keys; - // it only returns None when the value itself is missing or has no - // recognised variant. - let kv = OtlpKeyValue { - key: String::new(), - value: Some(OtlpAnyValue { - string_value: Some("hello".to_string()), - int_value: None, - double_value: None, - bool_value: None, - kvlist_value: None, - array_value: None, - }), - }; - - let result = otlp_kv_to_key_value(&kv); - assert!(result.is_some()); - let kv_result = result.unwrap(); - assert_eq!(kv_result.key.as_str(), ""); - } - #[test] fn test_otlp_any_value_to_string_value() { // String branch @@ -4538,336 +4623,6 @@ mod tests { assert_eq!(spans.len(), 0); } - // ========================================================================= - // convert_otlp_to_span_data tests - // ========================================================================= - - #[test] - fn test_convert_otlp_to_span_data_basic() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": { - "attributes": [{ - "key": "service.name", - "value": {"stringValue": "convert-test"} - }] - }, - "scopeSpans": [{ - "scope": {"name": "test-scope", "version": "1.0.0"}, - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "name": "basic-span", - "kind": 2, - "startTimeUnixNano": "1704067200000000000", - "endTimeUnixNano": "1704067201000000000", - "status": {"code": 1, "message": ""}, - "attributes": [{ - "key": "http.method", - "value": {"stringValue": "GET"} - }] - }] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 1); - let sd = &span_data[0]; - assert_eq!(sd.name.as_ref(), "basic-span"); - assert!(matches!( - sd.span_kind, - opentelemetry::trace::SpanKind::Server - )); - assert!(matches!(sd.status, opentelemetry::trace::Status::Ok)); - assert_eq!(sd.attributes.len(), 1); - assert_eq!(sd.attributes[0].key.as_str(), "http.method"); - } - - #[test] - fn test_convert_otlp_to_span_data_with_events_and_links() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": {}, - "scopeSpans": [{ - "scope": {"name": "events-scope", "version": ""}, - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "name": "span-with-events", - "kind": 1, - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000", - "events": [{ - "name": "exception", - "timeUnixNano": "1500000000", - "attributes": [{ - "key": "exception.message", - "value": {"stringValue": "something failed"} - }] - }], - "links": [{ - "traceId": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1", - "spanId": "bbbbbbbbbbbbbb01", - "attributes": [{ - "key": "link.type", - "value": {"stringValue": "follows_from"} - }] - }] - }] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 1); - let sd = &span_data[0]; - assert_eq!(sd.events.events.len(), 1); - assert_eq!(sd.events.events[0].name.as_ref(), "exception"); - assert_eq!(sd.links.links.len(), 1); - } - - #[test] - fn test_convert_otlp_to_span_data_error_status() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": {}, - "scopeSpans": [{ - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "name": "error-span", - "kind": 3, - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000", - "status": {"code": 2, "message": "timeout occurred"} - }] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 1); - let sd = &span_data[0]; - assert!(matches!( - &sd.status, - opentelemetry::trace::Status::Error { description } if description.as_ref() == "timeout occurred" - )); - assert!(matches!( - sd.span_kind, - opentelemetry::trace::SpanKind::Client - )); - } - - #[test] - fn test_convert_otlp_to_span_data_invalid_trace_id_skipped() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": {}, - "scopeSpans": [{ - "spans": [ - { - "traceId": "invalid_hex", - "spanId": "b7ad6b7169203331", - "name": "bad-trace-id", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - }, - { - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "invalid_hex", - "name": "bad-span-id", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - } - ] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - // Both spans should be skipped due to invalid IDs - assert_eq!(span_data.len(), 0); - } - - #[test] - fn test_convert_otlp_to_span_data_parent_span_handling() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": {}, - "scopeSpans": [{ - "spans": [ - { - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "parentSpanId": "a1a2a3a4a5a6a7a8", - "name": "child-span", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - }, - { - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "c1c2c3c4c5c6c7c8", - "parentSpanId": "0000000000000000", - "name": "root-span", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - } - ] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 2); - - // First span has a real parent - let child = &span_data[0]; - assert_eq!(child.name.as_ref(), "child-span"); - assert!(child.parent_span_is_remote); - assert_ne!(child.parent_span_id.to_string(), "0000000000000000"); - - // Second span has all-zero parent (treated as root) - let root = &span_data[1]; - assert_eq!(root.name.as_ref(), "root-span"); - assert!(!root.parent_span_is_remote); - } - - #[test] - fn test_convert_otlp_to_span_data_multiple_resource_and_scope_spans() { - let json_str = r#"{ - "resourceSpans": [ - { - "resource": { - "attributes": [{"key": "service.name", "value": {"stringValue": "svc-a"}}] - }, - "scopeSpans": [ - { - "scope": {"name": "scope-1", "version": "1.0"}, - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "aaaaaaaaaaaaaaaa", - "name": "span-a1", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - }] - }, - { - "scope": {"name": "scope-2", "version": "2.0"}, - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "bbbbbbbbbbbbbbbb", - "name": "span-a2", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - }] - } - ] - }, - { - "resource": { - "attributes": [{"key": "service.name", "value": {"stringValue": "svc-b"}}] - }, - "scopeSpans": [{ - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "cccccccccccccccc", - "name": "span-b1", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000" - }] - }] - } - ] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 3); - let names: Vec<&str> = span_data.iter().map(|s| s.name.as_ref()).collect(); - assert!(names.contains(&"span-a1")); - assert!(names.contains(&"span-a2")); - assert!(names.contains(&"span-b1")); - } - - #[test] - fn test_convert_otlp_to_span_data_otel_kind_attribute_fallback() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": {}, - "scopeSpans": [{ - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "name": "attr-kind-span", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000", - "attributes": [{ - "key": "otel.kind", - "value": {"stringValue": "PRODUCER"} - }] - }] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 1); - assert!(matches!( - span_data[0].span_kind, - opentelemetry::trace::SpanKind::Producer - )); - } - - #[test] - fn test_convert_otlp_to_span_data_trace_flags() { - let json_str = r#"{ - "resourceSpans": [{ - "resource": {}, - "scopeSpans": [{ - "spans": [{ - "traceId": "0af7651916cd43dd8448eb211c80319c", - "spanId": "b7ad6b7169203331", - "name": "flags-span", - "startTimeUnixNano": "1000000000", - "endTimeUnixNano": "2000000000", - "flags": 1 - }] - }] - }] - }"#; - - let request: OtlpExportTraceServiceRequest = - serde_json::from_str(json_str).expect("parse JSON"); - let span_data = convert_otlp_to_span_data(&request); - - assert_eq!(span_data.len(), 1); - assert_eq!( - span_data[0].span_context.trace_flags(), - opentelemetry::trace::TraceFlags::SAMPLED - ); - } - // ========================================================================= // InMemorySpanExporter::export direct test // ========================================================================= @@ -5554,15 +5309,13 @@ mod tests { let spans = storage.get_spans(); assert_eq!(spans.len(), 2, "memory storage retains both parsed spans"); - let exported = convert_otlp_to_span_data( - &serde_json::from_str::(&payload.to_string()).unwrap(), - ); - assert_eq!( - exported.len(), - 1, - "forwarding skips invalid span identifiers" - ); - assert_eq!(exported[0].name.as_ref(), "valid-span"); + // Forwarder-side filtering of malformed identifiers was a side effect + // of the previous `convert_otlp_to_span_data → SpanExporter::export` + // path. Post-#1617 the forwarder relays the raw proto and lets the + // downstream collector validate / reject — that's the deliberate + // tradeoff for preserving `resource_spans[].resource` byte-for-byte. + // Memory storage behaviour (the contract this test was actually + // gating) is unchanged. } // ========================================================================= diff --git a/engine/tests/common/forwarder_mock.rs b/engine/tests/common/forwarder_mock.rs new file mode 100644 index 0000000000..636bbea898 --- /dev/null +++ b/engine/tests/common/forwarder_mock.rs @@ -0,0 +1,131 @@ +//! Shared mock OTLP gRPC collector for the `otel_forwarder_*` integration +//! tests. Extracted per #1618 review (Bridgebuilder F7) so the three +//! forwarder regression tests share one collector implementation and +//! one readiness-probe contract — keeps the suite in lockstep when the +//! `SDK_SPAN_FORWARDER` shape changes. +//! +//! Two flavors exist because the headers test needs to capture +//! `tonic::MetadataMap` (not the request body) while the +//! resource-preservation / partial-batch tests need the body: +//! +//! - [`spawn_body_capturing_collector`] — records the inbound +//! `ExportTraceServiceRequest` payloads. +//! - [`spawn_metadata_capturing_collector`] — records the inbound +//! `tonic::MetadataMap` per request. +//! +//! Both share the same readiness-probe behaviour (bounded +//! `TcpStream::connect` retry; replaces the older 50ms-sleep +//! heuristic that flaked on loaded CI runners). + +use std::sync::Arc; +use std::time::Duration; + +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, + trace_service_server::{TraceService, TraceServiceServer}, +}; +use tokio::sync::Mutex; +use tonic::metadata::MetadataMap; +use tonic::transport::Server; +use tonic::{Request, Response, Status}; + +// ─── Body-capturing variant ──────────────────────────────────────────────── + +#[derive(Default, Clone)] +struct CapturingBody { + received: Arc>>, +} + +#[tonic::async_trait] +impl TraceService for CapturingBody { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + self.received.lock().await.push(request.into_inner()); + Ok(Response::new(ExportTraceServiceResponse::default())) + } +} + +pub async fn spawn_body_capturing_collector() -> (String, Arc>>) +{ + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let endpoint = format!("http://{addr}"); + + let service = CapturingBody::default(); + let received = service.received.clone(); + + tokio::spawn(async move { + let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); + Server::builder() + .add_service(TraceServiceServer::new(service)) + .serve_with_incoming(incoming) + .await + .expect("mock OTLP collector failed to start; see tonic transport error above") + }); + + wait_for_collector_ready(addr).await; + + (endpoint, received) +} + +// ─── Metadata-capturing variant ─────────────────────────────────────────── + +#[derive(Default, Clone)] +struct CapturingMetadata { + received: Arc>>, +} + +#[tonic::async_trait] +impl TraceService for CapturingMetadata { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + self.received.lock().await.push(request.metadata().clone()); + Ok(Response::new(ExportTraceServiceResponse::default())) + } +} + +pub async fn spawn_metadata_capturing_collector() -> (String, Arc>>) { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let endpoint = format!("http://{addr}"); + + let service = CapturingMetadata::default(); + let received = service.received.clone(); + + tokio::spawn(async move { + let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); + Server::builder() + .add_service(TraceServiceServer::new(service)) + .serve_with_incoming(incoming) + .await + .expect("mock OTLP collector failed to start; see tonic transport error above") + }); + + wait_for_collector_ready(addr).await; + + (endpoint, received) +} + +// ─── Readiness probe (shared) ───────────────────────────────────────────── + +/// Active TCP-connect probe replaces the older 50ms-sleep heuristic. +/// `TcpListener::bind` already guarantees the kernel-level listening +/// socket is open before we get here; the remaining race is purely "is +/// tonic's accept loop being polled?". A successful connect-then-drop +/// is sufficient evidence the accept loop is live. +async fn wait_for_collector_ready(addr: std::net::SocketAddr) { + let deadline = std::time::Instant::now() + Duration::from_secs(2); + loop { + match tokio::net::TcpStream::connect(addr).await { + Ok(_) => return, + Err(_) if std::time::Instant::now() < deadline => { + tokio::task::yield_now().await; + } + Err(e) => panic!("mock collector not reachable within 2s: {e}"), + } + } +} diff --git a/engine/tests/common/mod.rs b/engine/tests/common/mod.rs index b89215e792..0610446618 100644 --- a/engine/tests/common/mod.rs +++ b/engine/tests/common/mod.rs @@ -1,3 +1,4 @@ +pub mod forwarder_mock; pub mod http_helpers; pub mod queue_helpers; diff --git a/engine/tests/otel_forwarder_headers_test.rs b/engine/tests/otel_forwarder_headers_test.rs new file mode 100644 index 0000000000..552774083b --- /dev/null +++ b/engine/tests/otel_forwarder_headers_test.rs @@ -0,0 +1,113 @@ +// Regression test for iii-hq/iii#1618 review (ytallo's gherkin §"SDK span +// forwarder honors OTEL_EXPORTER_OTLP_HEADERS"). +// +// The pre-fix `opentelemetry_otlp::SpanExporter::builder().with_tonic()` +// parsed `OTEL_EXPORTER_OTLP_HEADERS` (and the signal-specific +// `OTEL_EXPORTER_OTLP_TRACES_HEADERS` override) automatically per the OTel +// SDK spec, injecting each `key=value` pair as a tonic `MetadataMap` entry +// on every export. The raw-`tonic::Channel` replacement in #1618 lost that +// behaviour silently — API-keyed collectors (Grafana Cloud, Honeycomb, +// SigNoz Cloud) would reject every forwarded span at auth with no signal +// surfaced engine-side. This test pins the env-var → metadata contract. +// +// Owns its own process so the env var and the process-global +// `SDK_SPAN_FORWARDER` `OnceLock` are both isolated from any other +// integration-test binary. + +mod common; + +use std::time::Duration; + +use common::forwarder_mock::spawn_metadata_capturing_collector; + +const MINIMAL_PAYLOAD: &str = r#"{ + "resourceSpans": [{ + "resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "test-svc"}}]}, + "scopeSpans": [{ + "scope": {"name": "iii-1618-headers-regression"}, + "spans": [{ + "traceId": "00112233445566778899aabbccddeeff", + "spanId": "0011223344556677", + "name": "headers-test-span", + "kind": 1, + "startTimeUnixNano": "1700000000000000000", + "endTimeUnixNano": "1700000000001000000" + }] + }] + }] +}"#; + +#[tokio::test] +async fn forwarder_injects_otel_exporter_otlp_headers_into_export_metadata() { + // Order is load-bearing: the env var MUST be set BEFORE + // init_sdk_span_forwarder is called, because the parsing happens at + // forwarder-init time (matching the upstream `opentelemetry-otlp` + // builder semantics). Setting it post-init would not retroactively + // populate `SdkForwarderState.headers`. + // + // # Safety: this integration test binary is single-threaded with + // respect to env-var mutation — only one test exists in the file, + // and `SDK_SPAN_FORWARDER`'s `OnceLock` ensures we only init once. + unsafe { + std::env::set_var( + "OTEL_EXPORTER_OTLP_HEADERS", + "authorization=Bearer-test-token-12345,x-tenant=acme", + ); + } + + let (endpoint, received_metadata) = spawn_metadata_capturing_collector().await; + + iii::workers::observability::otel::init_sdk_span_forwarder(&endpoint); + + iii::workers::observability::otel::ingest_otlp_json(MINIMAL_PAYLOAD) + .await + .expect("ingest_otlp_json should accept a well-formed payload"); + + let deadline = std::time::Instant::now() + Duration::from_secs(5); + while std::time::Instant::now() < deadline { + if !received_metadata.lock().await.is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + + let captured = received_metadata.lock().await; + assert_eq!( + captured.len(), + 1, + "Mock collector should have received exactly one ExportTraceServiceRequest" + ); + + let metadata = &captured[0]; + + // The authorization header from OTEL_EXPORTER_OTLP_HEADERS must + // land on the wire as a tonic Ascii metadata entry. Without this + // the upstream-spec env var is silently ignored and every API-keyed + // collector rejects forwarded spans at auth. + let auth = metadata + .get("authorization") + .expect("authorization metadata MUST be set from OTEL_EXPORTER_OTLP_HEADERS"); + assert_eq!( + auth.to_str().unwrap(), + "Bearer-test-token-12345", + "authorization metadata value did not match OTEL_EXPORTER_OTLP_HEADERS" + ); + + // Second pair confirms the comma-split parser handles multiple + // entries (single-pair could pass with a naive `=`-only split). + let tenant = metadata + .get("x-tenant") + .expect("x-tenant metadata MUST be set; comma-split parser regression?"); + assert_eq!( + tenant.to_str().unwrap(), + "acme", + "x-tenant metadata value did not match OTEL_EXPORTER_OTLP_HEADERS" + ); + + // Cleanup so a future test binary inheriting this process's env (if + // any) doesn't leak the test credential. # Safety: same single-test + // file rationale as the set_var above. + unsafe { + std::env::remove_var("OTEL_EXPORTER_OTLP_HEADERS"); + } +} diff --git a/engine/tests/otel_forwarder_partial_batch_test.rs b/engine/tests/otel_forwarder_partial_batch_test.rs new file mode 100644 index 0000000000..393ac6db8a --- /dev/null +++ b/engine/tests/otel_forwarder_partial_batch_test.rs @@ -0,0 +1,151 @@ +// Regression test for iii-hq/iii#1618 review (ytallo's gherkin §"One bad +// span does not poison the rest of the forwarded batch"). +// +// The earlier `otlp_span_to_proto` was infallible — it called +// `hex_decode_or_empty` on malformed `trace_id` / `span_id` strings and +// emitted spans with empty-byte IDs on the wire. Most collectors reject +// any batch containing such spans wholesale, so a single bad SDK emitter +// (instrumentation bug, truncated context propagation, custom client) +// could blackhole otherwise-valid telemetry from the same payload. +// +// The contract pinned here: `otlp_span_to_proto` returns `None` for +// spans whose `trace_id` or `span_id` fails to hex-decode OR decodes to +// a non-spec byte length (16 bytes for trace_id, 8 for span_id); +// `otlp_scope_spans_to_proto` `filter_map`s them out. Valid spans in +// the same batch reach the collector. +// +// Owns its own process so the global `SDK_SPAN_FORWARDER` `OnceLock` +// is fresh. + +mod common; + +use std::time::Duration; + +use common::forwarder_mock::spawn_body_capturing_collector; + +// Three-span payload covering all three OTLP ID-shape failure modes: +// (1) non-hex characters — `hex::decode` returns Err. +// (2) hex-decodable but wrong length — surfaced by adversarial review +// DISS-001 as a gap in the original filter. `traceId: "01"` +// decodes successfully to `[1]` (1 byte instead of the spec's 16); +// without the length check it would land on the wire and the +// collector would reject the batch. Locks the spec-length contract. +// (3) valid 32-hex-char trace_id + 16-hex-char span_id. Survives. +// +// Both bad spans share the same scope as the valid one so we exercise +// `otlp_scope_spans_to_proto`'s `filter_map` on a heterogeneous list. +const MIXED_BATCH_PAYLOAD: &str = r#"{ + "resourceSpans": [{ + "resource": { + "attributes": [{"key": "service.name", "value": {"stringValue": "mixed-batch-svc"}}] + }, + "scopeSpans": [{ + "scope": {"name": "iii-1618-partial-batch-regression"}, + "spans": [ + { + "traceId": "not-hex", + "spanId": "also-not-hex", + "name": "bad-non-hex", + "startTimeUnixNano": "1700000000000000000", + "endTimeUnixNano": "1700000000001000000" + }, + { + "traceId": "01", + "spanId": "01", + "name": "bad-wrong-length", + "startTimeUnixNano": "1700000000000500000", + "endTimeUnixNano": "1700000000001500000" + }, + { + "traceId": "0af7651916cd43dd8448eb211c80319c", + "spanId": "b7ad6b7169203331", + "name": "valid-span", + "startTimeUnixNano": "1700000000002000000", + "endTimeUnixNano": "1700000000003000000" + } + ] + }] + }] +}"#; + +#[tokio::test] +async fn forwarder_drops_malformed_spans_and_relays_the_rest_of_the_batch() { + let (endpoint, received) = spawn_body_capturing_collector().await; + + iii::workers::observability::otel::init_sdk_span_forwarder(&endpoint); + + iii::workers::observability::otel::ingest_otlp_json(MIXED_BATCH_PAYLOAD) + .await + .expect("ingest_otlp_json should accept a well-formed (mixed-validity) payload"); + + let deadline = std::time::Instant::now() + Duration::from_secs(5); + while std::time::Instant::now() < deadline { + if !received.lock().await.is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + + let captured = received.lock().await; + assert_eq!( + captured.len(), + 1, + "Mock collector should have received exactly one ExportTraceServiceRequest" + ); + + let req = &captured[0]; + + // Walk every span the collector actually saw. The bad-span MUST NOT + // be present (no empty-byte IDs on the wire), and the valid-span + // MUST be — exactly once, with its original hex-decoded IDs intact. + let all_spans: Vec<_> = req + .resource_spans + .iter() + .flat_map(|rs| rs.scope_spans.iter()) + .flat_map(|ss| ss.spans.iter()) + .collect(); + + // OTLP spec lengths: trace_id MUST be 16 bytes, span_id MUST be 8. + // No span on the wire may violate these — empty IDs (bad hex case) + // OR wrong-length IDs (good hex but bad length, e.g. "01" → [1]). + for span in &all_spans { + assert_eq!( + span.trace_id.len(), + 16, + "Forwarder emitted span with non-spec trace_id length {} (expected 16): name = {:?}", + span.trace_id.len(), + span.name + ); + assert_eq!( + span.span_id.len(), + 8, + "Forwarder emitted span with non-spec span_id length {} (expected 8): name = {:?}", + span.span_id.len(), + span.name + ); + } + + assert_eq!( + all_spans.len(), + 1, + "Expected exactly one span on the wire (the valid one). Got {}: names = {:?}", + all_spans.len(), + all_spans.iter().map(|s| &s.name).collect::>() + ); + + let valid = all_spans[0]; + assert_eq!( + valid.name, "valid-span", + "The surviving span should be the valid one, not the malformed one" + ); + assert_eq!( + valid.trace_id, + hex::decode("0af7651916cd43dd8448eb211c80319c").unwrap(), + "Valid span's trace_id was rewritten by the forwarder" + ); + assert_eq!( + valid.span_id, + hex::decode("b7ad6b7169203331").unwrap(), + "Valid span's span_id was rewritten by the forwarder" + ); +} diff --git a/engine/tests/otel_forwarder_resource_test.rs b/engine/tests/otel_forwarder_resource_test.rs new file mode 100644 index 0000000000..038124429e --- /dev/null +++ b/engine/tests/otel_forwarder_resource_test.rs @@ -0,0 +1,206 @@ +// Regression test for iii-hq/iii#1617 — OTLP span forwarding drops resource +// attributes (`service.name` shows up as `` in downstream collectors). +// +// Owns its own process via being an integration test file, so the +// process-global `SDK_SPAN_FORWARDER` `OnceLock` is fresh on every run. +// The mock collector lives in `tests/common/forwarder_mock.rs` and is +// shared across the three `otel_forwarder_*` regression tests so the +// readiness/spawn contract stays in lockstep. +// +// The assertion contract: the captured `ExportTraceServiceRequest` +// carries the full inbound `resource_spans[].resource` block — not just +// `service.name`, but every resource attribute we put on the wire — to +// lock in full-resource preservation rather than a single-field band-aid. + +mod common; + +use std::time::Duration; + +use opentelemetry_proto::tonic::common::v1::{KeyValue, any_value}; + +use common::forwarder_mock::spawn_body_capturing_collector; + +fn payload_with_resource(service_name: &str, namespace: &str, version: &str) -> String { + // Minimal but real OTLP/JSON ExportTraceServiceRequest. + // `resourceSpans[0].resource.attributes` is the structure under audit. + // The attribute mix exercises three AnyValue discriminator branches — + // stringValue (service.name/namespace/version), intValue + // (replica.count), and boolValue (telemetry.enabled) — so the proto + // conversion's non-string code paths are also pinned by the regression + // contract. Spans deliberately omit `flags` so the test also asserts + // the SAMPLED default that mirrors the in-memory storage path. + format!( + r#"{{ + "resourceSpans": [{{ + "resource": {{ + "attributes": [ + {{"key": "service.name", "value": {{"stringValue": "{service_name}"}}}}, + {{"key": "service.namespace", "value": {{"stringValue": "{namespace}"}}}}, + {{"key": "service.version", "value": {{"stringValue": "{version}"}}}}, + {{"key": "replica.count", "value": {{"intValue": "7"}}}}, + {{"key": "telemetry.enabled", "value": {{"boolValue": true}}}} + ] + }}, + "scopeSpans": [{{ + "scope": {{"name": "iii-1617-regression"}}, + "spans": [{{ + "traceId": "00112233445566778899aabbccddeeff", + "spanId": "0011223344556677", + "name": "forwarder.preserves.resource", + "kind": 1, + "startTimeUnixNano": "1700000000000000000", + "endTimeUnixNano": "1700000000001000000" + }}] + }}] + }}] + }}"# + ) +} + +fn find_string_attr<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a str> { + attrs + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.as_ref()) + .and_then(|v| v.value.as_ref()) + .and_then(|v| match v { + any_value::Value::StringValue(s) => Some(s.as_str()), + _ => None, + }) +} + +fn find_int_attr(attrs: &[KeyValue], key: &str) -> Option { + attrs + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.as_ref()) + .and_then(|v| v.value.as_ref()) + .and_then(|v| match v { + any_value::Value::IntValue(i) => Some(*i), + _ => None, + }) +} + +fn find_bool_attr(attrs: &[KeyValue], key: &str) -> Option { + attrs + .iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.as_ref()) + .and_then(|v| v.value.as_ref()) + .and_then(|v| match v { + any_value::Value::BoolValue(b) => Some(*b), + _ => None, + }) +} + +#[tokio::test] +async fn forwarder_preserves_full_resource_attributes_through_to_collector() { + let (endpoint, received) = spawn_body_capturing_collector().await; + + iii::workers::observability::otel::init_sdk_span_forwarder(&endpoint); + + let payload = payload_with_resource("my-service", "production", "1.4.2"); + iii::workers::observability::otel::ingest_otlp_json(&payload) + .await + .expect("ingest_otlp_json should accept a well-formed payload"); + + // Wait briefly for the async export to complete. Pinning to a poll loop + // rather than a fixed sleep so the test is fast in the happy case and + // still bounded in the failure case. + let deadline = std::time::Instant::now() + Duration::from_secs(5); + while std::time::Instant::now() < deadline { + if !received.lock().await.is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + + let captured = received.lock().await; + assert_eq!( + captured.len(), + 1, + "Mock collector should have received exactly one ExportTraceServiceRequest" + ); + + let req = &captured[0]; + assert_eq!( + req.resource_spans.len(), + 1, + "Expected one ResourceSpans in the captured request" + ); + + let resource = req.resource_spans[0].resource.as_ref().expect( + "resource_spans[0].resource MUST be present — the whole point of #1617 \ + is that the inbound resource block reaches the collector", + ); + + // The load-bearing assertions. Each one is a separate inbound resource + // attribute; the test passes only if the forwarder relays the entire + // resource block, not just `service.name` as a single-field workaround. + assert_eq!( + find_string_attr(&resource.attributes, "service.name"), + Some("my-service"), + "service.name was dropped on the forwarder hop (#1617 root symptom)" + ); + assert_eq!( + find_string_attr(&resource.attributes, "service.namespace"), + Some("production"), + "service.namespace was dropped on the forwarder hop — fix must preserve \ + every resource attribute, not just service.name" + ); + assert_eq!( + find_string_attr(&resource.attributes, "service.version"), + Some("1.4.2"), + "service.version was dropped on the forwarder hop — fix must preserve \ + every resource attribute, not just service.name" + ); + + // Non-string AnyValue branches. The proto-conversion helper probes + // discriminator variants in spec order (`string → int → double → bool` + // → composites); these assertions catch regressions in any branch + // selection beyond the simple string case. + assert_eq!( + find_int_attr(&resource.attributes, "replica.count"), + Some(7), + "intValue resource attribute was lost or mistranslated by the forwarder" + ); + assert_eq!( + find_bool_attr(&resource.attributes, "telemetry.enabled"), + Some(true), + "boolValue resource attribute was lost or mistranslated by the forwarder" + ); + + // Pass-through fidelity. The forwarder must relay exactly what + // arrived — no engine-injected hop-level attributes (e.g. a + // `forwarder.id`), no span-name rewrites. A bug that ADDS attrs or + // mutates the span would pass the preceding assertions but is + // caught here. Five inbound attributes; expect five out. + assert_eq!( + resource.attributes.len(), + 5, + "Forwarder injected or dropped resource attributes; expected exactly 5 (inbound), got {} (names: {:?})", + resource.attributes.len(), + resource + .attributes + .iter() + .map(|kv| &kv.key) + .collect::>() + ); + + // Trace flags default. The inbound payload omits `flags`, and the + // forwarder must default to SAMPLED (W3C trace-flags bit 0 = 1) to + // match the in-memory storage path's behaviour. Defaulting to 0 + // (unsampled) would silently drop spans at downstream samplers / + // collectors that respect the SAMPLED bit. + let span = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!( + span.flags & 0x01, + 0x01, + "Forwarder must default missing trace flags to SAMPLED; got {:#x}", + span.flags + ); + assert_eq!( + span.name, "forwarder.preserves.resource", + "Forwarder rewrote span name; pass-through contract violated" + ); +}