Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions engine/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ fn extract_otel_config(cfg: &EngineConfig) -> OtelConfig {
if let Some(service_name) = module_config.service_name {
otel_cfg.service_name = service_name;
}
if let Some(service_version) = module_config.service_version {
otel_cfg.service_version = service_version;
}
if let Some(service_namespace) = module_config.service_namespace {
otel_cfg.service_namespace = Some(service_namespace);
}
if let Some(exporter) = module_config.exporter {
otel_cfg.exporter = match exporter {
crate::workers::observability::config::OtelExporterType::Memory => ExporterType::Memory,
Expand Down Expand Up @@ -1042,6 +1048,8 @@ mod tests {
config: Some(serde_json::json!({
"enabled": true,
"service_name": "test-service",
"service_version": "1.2.3",
"service_namespace": "production",
"exporter": "memory",
"endpoint": "http://collector:4317",
"sampling_ratio": 0.25,
Expand All @@ -1054,6 +1062,8 @@ mod tests {
let otel = extract_otel_config(&cfg);
assert!(otel.enabled);
assert_eq!(otel.service_name, "test-service");
assert_eq!(otel.service_version, "1.2.3");
assert_eq!(otel.service_namespace.as_deref(), Some("production"));
assert!(matches!(otel.exporter, ExporterType::Memory));
assert_eq!(otel.endpoint, "http://collector:4317");
assert_eq!(otel.sampling_ratio, 0.25);
Expand Down
161 changes: 129 additions & 32 deletions engine/src/workers/observability/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ pub struct OtelConfig {
pub memory_max_spans: usize,
}

fn non_blank(value: String) -> Option<String> {
(!value.trim().is_empty()).then_some(value)
}

fn service_name_or_default(configured: &str) -> String {
non_blank(configured.to_string())
.or_else(|| env::var("OTEL_SERVICE_NAME").ok().and_then(non_blank))
.unwrap_or_else(|| "iii".to_string())
}

fn service_version_or_default(configured: &str) -> String {
non_blank(configured.to_string())
.or_else(|| env::var("SERVICE_VERSION").ok().and_then(non_blank))
.unwrap_or_else(|| "unknown".to_string())
}

impl Default for OtelConfig {
fn default() -> Self {
// First check global config from YAML, then fall back to environment variables
Expand All @@ -124,19 +140,22 @@ impl Default for OtelConfig {
})
.unwrap_or(false);

let service_name = global_cfg
.and_then(|c| c.service_name.clone())
.or_else(|| env::var("OTEL_SERVICE_NAME").ok())
.unwrap_or_else(|| "iii".to_string());
let service_name = service_name_or_default(
global_cfg
.and_then(|c| c.service_name.as_deref())
.unwrap_or(""),
);

let service_version = global_cfg
.and_then(|c| c.service_version.clone())
.or_else(|| env::var("SERVICE_VERSION").ok())
.unwrap_or_else(|| "unknown".to_string());
let service_version = service_version_or_default(
global_cfg
.and_then(|c| c.service_version.as_deref())
.unwrap_or(""),
);

let service_namespace = global_cfg
.and_then(|c| c.service_namespace.clone())
.or_else(|| env::var("SERVICE_NAMESPACE").ok());
.and_then(non_blank)
.or_else(|| env::var("SERVICE_NAMESPACE").ok().and_then(non_blank));

let exporter = global_cfg
.and_then(|c| c.exporter.clone())
Expand Down Expand Up @@ -657,6 +676,10 @@ impl SpanExporter for TeeSpanExporter {
fn shutdown_with_timeout(&mut self, timeout: std::time::Duration) -> OTelSdkResult {
self.otlp_exporter.shutdown_with_timeout(timeout)
}

fn set_resource(&mut self, resource: &Resource) {
self.otlp_exporter.set_resource(resource);
}
}

static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
Expand Down Expand Up @@ -852,19 +875,24 @@ where
// Build the sampler using advanced configuration if available
let sampler = build_sampler(config);

// Build resource attributes with OTEL semantic conventions
// Using string keys for attributes not available in the crate version
let service_name = service_name_or_default(&config.service_name);
let service_version = service_version_or_default(&config.service_version);
let service_namespace = config
.service_namespace
.clone()
.and_then(non_blank)
.or_else(|| env::var("SERVICE_NAMESPACE").ok().and_then(non_blank));

let mut resource_builder = Resource::builder()
.with_service_name(config.service_name.clone())
.with_service_name(service_name.clone())
.with_attributes([
KeyValue::new("service.version", config.service_version.clone()),
KeyValue::new("service.version", service_version),
KeyValue::new("service.instance.id", uuid::Uuid::new_v4().to_string()),
]);

// Only add namespace if provided (optional attribute)
if let Some(namespace) = &config.service_namespace {
if let Some(namespace) = service_namespace {
resource_builder =
resource_builder.with_attribute(KeyValue::new("service.namespace", namespace.clone()));
resource_builder.with_attribute(KeyValue::new("service.namespace", namespace));
}

let resource = resource_builder.build();
Expand Down Expand Up @@ -899,10 +927,8 @@ where
"Failed to create OTLP exporter, falling back to memory-only mode"
);
// Fall back to memory-only mode
let exporter = InMemorySpanExporter::new(
config.memory_max_spans,
config.service_name.clone(),
);
let exporter =
InMemorySpanExporter::new(config.memory_max_spans, service_name.clone());
SdkTracerProvider::builder()
.with_simple_exporter(exporter)
.with_sampler(sampler)
Expand All @@ -913,8 +939,7 @@ where
}
}
ExporterType::Memory => {
let exporter =
InMemorySpanExporter::new(config.memory_max_spans, config.service_name.clone());
let exporter = InMemorySpanExporter::new(config.memory_max_spans, service_name.clone());

SdkTracerProvider::builder()
.with_simple_exporter(exporter)
Expand All @@ -940,11 +965,8 @@ where
init_sdk_span_forwarder(&config.endpoint);

// Create tee exporter that sends to both
let tee_exporter = TeeSpanExporter::new(
otlp_exporter,
memory_storage,
config.service_name.clone(),
);
let tee_exporter =
TeeSpanExporter::new(otlp_exporter, memory_storage, service_name.clone());

SdkTracerProvider::builder()
.with_batch_exporter(tee_exporter)
Expand All @@ -960,10 +982,8 @@ where
"Failed to create OTLP exporter for 'both' mode, using memory-only"
);
// Fall back to memory-only with our already-created storage
let exporter = InMemorySpanExporter::with_storage(
memory_storage,
config.service_name.clone(),
);
let exporter =
InMemorySpanExporter::with_storage(memory_storage, service_name.clone());
SdkTracerProvider::builder()
.with_simple_exporter(exporter)
.with_sampler(sampler)
Expand Down Expand Up @@ -997,7 +1017,7 @@ where

println!(
"OpenTelemetry initialized: exporter={}, service_name={}, sampling_ratio={}",
exporter_info, config.service_name, config.sampling_ratio
exporter_info, service_name, config.sampling_ratio
);

Some(OpenTelemetryLayer::new(tracer))
Expand Down Expand Up @@ -3183,6 +3203,83 @@ mod tests {
use super::*;
use serial_test::serial;

fn restore_env_var(name: &str, value: Option<std::ffi::OsString>) {
unsafe {
match value {
Some(value) => env::set_var(name, value),
None => env::remove_var(name),
}
}
}

struct EnvVarGuard {
name: &'static str,
value: Option<std::ffi::OsString>,
}

impl EnvVarGuard {
fn set(name: &'static str, value: &str) -> Self {
let guard = Self {
name,
value: env::var_os(name),
};
unsafe {
env::set_var(name, value);
}
guard
}

fn remove(name: &'static str) -> Self {
let guard = Self {
name,
value: env::var_os(name),
};
unsafe {
env::remove_var(name);
}
guard
}
}

impl Drop for EnvVarGuard {
fn drop(&mut self) {
restore_env_var(self.name, self.value.clone());
}
}

#[test]
#[serial]
fn test_service_name_or_default_uses_env_when_config_is_blank() {
let _service_name = EnvVarGuard::set("OTEL_SERVICE_NAME", "iii-engine");

assert_eq!(service_name_or_default(""), "iii-engine");
assert_eq!(service_name_or_default(" "), "iii-engine");
}

#[test]
#[serial]
fn test_service_name_or_default_uses_builtin_default_when_config_and_env_are_blank() {
let _service_name = EnvVarGuard::set("OTEL_SERVICE_NAME", "");

assert_eq!(service_name_or_default(""), "iii");
}

#[test]
#[serial]
fn test_service_version_or_default_uses_env_when_config_is_blank() {
let _service_version = EnvVarGuard::set("SERVICE_VERSION", "1.2.3");

assert_eq!(service_version_or_default(""), "1.2.3");
}

#[test]
#[serial]
fn test_service_version_or_default_uses_builtin_default_when_config_and_env_are_blank() {
let _service_version = EnvVarGuard::remove("SERVICE_VERSION");

assert_eq!(service_version_or_default(""), "unknown");
}

#[tokio::test]
#[serial]
async fn test_ingest_otlp_metrics_gauge() {
Expand Down
Loading