Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ tracing-opentelemetry = "0.32"
opentelemetry = "0.31"
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.31", features = ["tonic", "grpc-tonic", "logs", "tls-roots"] }
opentelemetry-proto = { version = "0.31", features = ["gen-tonic"] }
opentelemetry-semantic-conventions = "0.31"
tonic = { version = "0.14", features = ["transport", "tls-native-roots"] }
futures = "0.3"
futures-util = "0.3"
http-body-util = "0.1"
Expand Down Expand Up @@ -160,6 +162,7 @@ which = "7"

[dev-dependencies]
tokio = { version = "1", features = ["test-util"] }
tokio-stream = { version = "0.1", features = ["net"] }
mockall = "0.13"
serial_test = "3"
temp-env = "0.3"
Expand Down
1,145 changes: 449 additions & 696 deletions engine/src/workers/observability/otel.rs

Large diffs are not rendered by default.

131 changes: 131 additions & 0 deletions engine/tests/common/forwarder_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//! Shared mock OTLP gRPC collector for the `otel_forwarder_*` integration
//! tests. Extracted per #1618 review (Bridgebuilder F7) so the three
//! forwarder regression tests share one collector implementation and
//! one readiness-probe contract — keeps the suite in lockstep when the
//! `SDK_SPAN_FORWARDER` shape changes.
//!
//! Two flavors exist because the headers test needs to capture
//! `tonic::MetadataMap` (not the request body) while the
//! resource-preservation / partial-batch tests need the body:
//!
//! - [`spawn_body_capturing_collector`] — records the inbound
//! `ExportTraceServiceRequest` payloads.
//! - [`spawn_metadata_capturing_collector`] — records the inbound
//! `tonic::MetadataMap` per request.
//!
//! Both share the same readiness-probe behaviour (bounded
//! `TcpStream::connect` retry; replaces the older 50ms-sleep
//! heuristic that flaked on loaded CI runners).

use std::sync::Arc;
use std::time::Duration;

use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
trace_service_server::{TraceService, TraceServiceServer},
};
use tokio::sync::Mutex;
use tonic::metadata::MetadataMap;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

// ─── Body-capturing variant ────────────────────────────────────────────────

/// Mock `TraceService` that keeps the body of every export request it
/// receives.
///
/// Clones share one buffer through the `Arc`, so a handle taken before the
/// service is moved into the server still observes later pushes.
#[derive(Default, Clone)]
struct CapturingBody {
    /// Request payloads appended by `export`, one entry per call.
    received: Arc<Mutex<Vec<ExportTraceServiceRequest>>>,
}

#[tonic::async_trait]
impl TraceService for CapturingBody {
    /// Stores the request payload and always answers with a default
    /// (empty) success response.
    async fn export(
        &self,
        request: Request<ExportTraceServiceRequest>,
    ) -> Result<Response<ExportTraceServiceResponse>, Status> {
        let payload = request.into_inner();
        {
            // Keep the guard scoped to the push so the lock is released
            // before the response is built.
            let mut log = self.received.lock().await;
            log.push(payload);
        }
        let reply = ExportTraceServiceResponse::default();
        Ok(Response::new(reply))
    }
}

/// Starts a mock OTLP gRPC collector on an ephemeral loopback port and
/// returns its `http://` endpoint together with a shared handle to the
/// request bodies it has received.
///
/// The server runs on a detached tokio task. The function returns only
/// after `wait_for_collector_ready` has succeeded for the bound address.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be
/// read; the spawned task panics if the tonic server returns an error.
pub async fn spawn_body_capturing_collector() -> (String, Arc<Mutex<Vec<ExportTraceServiceRequest>>>)
{
    let collector = CapturingBody::default();
    // Grab the shared buffer before `collector` is moved into the server.
    let captured = Arc::clone(&collector.received);

    // Port 0 lets the OS choose a free port.
    let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = socket.local_addr().unwrap();

    tokio::spawn(async move {
        let connections = tokio_stream::wrappers::TcpListenerStream::new(socket);
        Server::builder()
            .add_service(TraceServiceServer::new(collector))
            .serve_with_incoming(connections)
            .await
            .expect("mock OTLP collector failed to start; see tonic transport error above")
    });

    wait_for_collector_ready(bound_addr).await;

    (format!("http://{bound_addr}"), captured)
}

// ─── Metadata-capturing variant ───────────────────────────────────────────

/// Mock `TraceService` that keeps the gRPC metadata of every export
/// request it receives and ignores the request body.
///
/// Clones share one buffer through the `Arc`, so a handle taken before the
/// service is moved into the server still observes later pushes.
#[derive(Default, Clone)]
struct CapturingMetadata {
    /// One cloned `MetadataMap` per `export` call.
    received: Arc<Mutex<Vec<MetadataMap>>>,
}

#[tonic::async_trait]
impl TraceService for CapturingMetadata {
    /// Stores a copy of the request metadata and always answers with a
    /// default (empty) success response.
    async fn export(
        &self,
        request: Request<ExportTraceServiceRequest>,
    ) -> Result<Response<ExportTraceServiceResponse>, Status> {
        let headers = request.metadata().clone();
        {
            // Keep the guard scoped to the push so the lock is released
            // before the response is built.
            let mut log = self.received.lock().await;
            log.push(headers);
        }
        let reply = ExportTraceServiceResponse::default();
        Ok(Response::new(reply))
    }
}

/// Starts a mock OTLP gRPC collector on an ephemeral loopback port and
/// returns its `http://` endpoint together with a shared handle to the
/// per-request `MetadataMap`s it has received.
///
/// The server runs on a detached tokio task. The function returns only
/// after `wait_for_collector_ready` has succeeded for the bound address.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be
/// read; the spawned task panics if the tonic server returns an error.
pub async fn spawn_metadata_capturing_collector() -> (String, Arc<Mutex<Vec<MetadataMap>>>) {
    let collector = CapturingMetadata::default();
    // Grab the shared buffer before `collector` is moved into the server.
    let captured = Arc::clone(&collector.received);

    // Port 0 lets the OS choose a free port.
    let socket = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = socket.local_addr().unwrap();

    tokio::spawn(async move {
        let connections = tokio_stream::wrappers::TcpListenerStream::new(socket);
        Server::builder()
            .add_service(TraceServiceServer::new(collector))
            .serve_with_incoming(connections)
            .await
            .expect("mock OTLP collector failed to start; see tonic transport error above")
    });

    wait_for_collector_ready(bound_addr).await;

    (format!("http://{bound_addr}"), captured)
}

// ─── Readiness probe (shared) ─────────────────────────────────────────────

/// Waits until a TCP connection to `addr` succeeds, retrying for up to
/// two seconds. Replaces the older 50ms-sleep heuristic.
///
/// What the probe proves: the listening socket is reachable. Both callers
/// bind the listener before spawning the server, and the kernel completes
/// handshakes for a listening socket and parks them in its backlog even
/// while nothing is accepting. A successful connect is therefore NOT by
/// itself evidence that tonic's accept loop is being polled. That is
/// harmless here: a client that connects early is simply queued until the
/// server task starts accepting.
///
/// The probe connection is dropped immediately on success.
///
/// # Panics
///
/// Panics if no connection attempt has succeeded once the two-second
/// deadline has passed.
async fn wait_for_collector_ready(addr: std::net::SocketAddr) {
    const READY_TIMEOUT: Duration = Duration::from_secs(2);
    // Pause between failed attempts. The previous `yield_now` retry
    // hot-spun, opening sockets as fast as the runtime could poll.
    const RETRY_INTERVAL: Duration = Duration::from_millis(10);

    let deadline = std::time::Instant::now() + READY_TIMEOUT;
    loop {
        match tokio::net::TcpStream::connect(addr).await {
            Ok(_) => return,
            Err(_) if std::time::Instant::now() < deadline => {
                tokio::time::sleep(RETRY_INTERVAL).await;
            }
            Err(e) => panic!("mock collector not reachable within 2s: {e}"),
        }
    }
}
1 change: 1 addition & 0 deletions engine/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod forwarder_mock;
pub mod http_helpers;
pub mod queue_helpers;

Expand Down
113 changes: 113 additions & 0 deletions engine/tests/otel_forwarder_headers_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Regression test for iii-hq/iii#1618 review (ytallo's gherkin §"SDK span
// forwarder honors OTEL_EXPORTER_OTLP_HEADERS").
//
// The pre-fix `opentelemetry_otlp::SpanExporter::builder().with_tonic()`
// parsed `OTEL_EXPORTER_OTLP_HEADERS` (and the signal-specific
// `OTEL_EXPORTER_OTLP_TRACES_HEADERS` override) automatically per the OTel
// SDK spec, injecting each `key=value` pair as a tonic `MetadataMap` entry
// on every export. The raw-`tonic::Channel` replacement in #1618 lost that
// behaviour silently — API-keyed collectors (Grafana Cloud, Honeycomb,
// SigNoz Cloud) would reject every forwarded span at auth with no signal
// surfaced engine-side. This test pins the env-var → metadata contract.
//
// Owns its own process so the env var and the process-global
// `SDK_SPAN_FORWARDER` `OnceLock` are both isolated from any other
// integration-test binary.

mod common;

use std::time::Duration;

use common::forwarder_mock::spawn_metadata_capturing_collector;

// Smallest OTLP/JSON trace payload the test needs: one resource
// (`service.name` = "test-svc"), one instrumentation scope, and a single
// span with fixed trace/span ids and timestamps. The test only inspects the
// gRPC metadata of the forwarded export, so the span contents are
// otherwise arbitrary.
const MINIMAL_PAYLOAD: &str = r#"{
"resourceSpans": [{
"resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "test-svc"}}]},
"scopeSpans": [{
"scope": {"name": "iii-1618-headers-regression"},
"spans": [{
"traceId": "00112233445566778899aabbccddeeff",
"spanId": "0011223344556677",
"name": "headers-test-span",
"kind": 1,
"startTimeUnixNano": "1700000000000000000",
"endTimeUnixNano": "1700000000001000000"
}]
}]
}]
}"#;

#[tokio::test]
async fn forwarder_injects_otel_exporter_otlp_headers_into_export_metadata() {
    const GENERIC_HEADERS_VAR: &str = "OTEL_EXPORTER_OTLP_HEADERS";
    const TRACES_HEADERS_VAR: &str = "OTEL_EXPORTER_OTLP_TRACES_HEADERS";

    // Remember whatever the process inherited so both variables can be put
    // back exactly as they were once the assertions have run.
    let inherited_generic = std::env::var_os(GENERIC_HEADERS_VAR);
    let inherited_traces = std::env::var_os(TRACES_HEADERS_VAR);

    // Order is load-bearing: the env var MUST be set BEFORE
    // init_sdk_span_forwarder is called, because the parsing happens at
    // forwarder-init time (matching the upstream `opentelemetry-otlp`
    // builder semantics). Setting it post-init would not retroactively
    // populate `SdkForwarderState.headers`.
    //
    // The signal-specific `OTEL_EXPORTER_OTLP_TRACES_HEADERS` overrides the
    // generic variable, so an inherited value would replace the headers
    // asserted on below. Clear it to keep the test deterministic.
    //
    // SAFETY: this integration test binary is single-threaded with respect
    // to env-var mutation. Only one test exists in the file, nothing has
    // been spawned yet at this point, and `SDK_SPAN_FORWARDER`'s `OnceLock`
    // ensures we only init once.
    unsafe {
        std::env::remove_var(TRACES_HEADERS_VAR);
        std::env::set_var(
            GENERIC_HEADERS_VAR,
            "authorization=Bearer-test-token-12345,x-tenant=acme",
        );
    }

    let (endpoint, received_metadata) = spawn_metadata_capturing_collector().await;

    iii::workers::observability::otel::init_sdk_span_forwarder(&endpoint);

    iii::workers::observability::otel::ingest_otlp_json(MINIMAL_PAYLOAD)
        .await
        .expect("ingest_otlp_json should accept a well-formed payload");

    // Forwarding is asynchronous: poll the collector for up to five seconds
    // rather than assuming the export has landed by the time ingest returns.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while std::time::Instant::now() < deadline {
        if !received_metadata.lock().await.is_empty() {
            break;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    let captured = received_metadata.lock().await;
    assert_eq!(
        captured.len(),
        1,
        "Mock collector should have received exactly one ExportTraceServiceRequest"
    );

    let metadata = &captured[0];

    // The authorization header from OTEL_EXPORTER_OTLP_HEADERS must
    // land on the wire as a tonic Ascii metadata entry. Without this
    // the upstream-spec env var is silently ignored and every API-keyed
    // collector rejects forwarded spans at auth.
    let auth = metadata
        .get("authorization")
        .expect("authorization metadata MUST be set from OTEL_EXPORTER_OTLP_HEADERS");
    assert_eq!(
        auth.to_str().unwrap(),
        "Bearer-test-token-12345",
        "authorization metadata value did not match OTEL_EXPORTER_OTLP_HEADERS"
    );

    // Second pair confirms the comma-split parser handles multiple
    // entries (single-pair could pass with a naive `=`-only split).
    let tenant = metadata
        .get("x-tenant")
        .expect("x-tenant metadata MUST be set; comma-split parser regression?");
    assert_eq!(
        tenant.to_str().unwrap(),
        "acme",
        "x-tenant metadata value did not match OTEL_EXPORTER_OTLP_HEADERS"
    );

    // Cleanup: restore both variables to their inherited state so the test
    // credential does not leak and a pre-existing configuration is not
    // clobbered.
    //
    // SAFETY: same single-test-file rationale as the mutation above.
    // NOTE(review): the collector task and the forwarder exist by now; this
    // assumes neither reads the environment concurrently. Confirm against
    // the forwarder implementation.
    unsafe {
        for (name, inherited) in [
            (GENERIC_HEADERS_VAR, inherited_generic),
            (TRACES_HEADERS_VAR, inherited_traces),
        ] {
            match inherited {
                Some(value) => std::env::set_var(name, value),
                None => std::env::remove_var(name),
            }
        }
    }
Comment on lines +99 to +170
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test isolation is incomplete for header precedence.

Because OTEL_EXPORTER_OTLP_TRACES_HEADERS overrides OTEL_EXPORTER_OTLP_HEADERS, this test can flake if the process inherits *_TRACES_HEADERS. Clear (and restore) both vars around init to make the assertion deterministic.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/tests/otel_forwarder_headers_test.rs` around lines 99 - 170, The test
forwarder_injects_otel_exporter_otlp_headers_into_export_metadata must
explicitly guard against existing OTEL_EXPORTER_OTLP_TRACES_HEADERS precedence:
capture the current values of OTEL_EXPORTER_OTLP_HEADERS and
OTEL_EXPORTER_OTLP_TRACES_HEADERS, clear or set both appropriately before
calling iii::workers::observability::otel::init_sdk_span_forwarder(&endpoint)
(ensuring the desired OTEL_EXPORTER_OTLP_HEADERS is set and
OTEL_EXPORTER_OTLP_TRACES_HEADERS is cleared), and then restore the original
values at the end of the test; this guarantees deterministic header precedence
for the assertions without changing init_sdk_span_forwarder itself.

}
Loading
Loading