Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,919 changes: 1,562 additions & 1,357 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ indexmap = "2"
itertools = "0.14"
lazy_static = "1.5"
opentelemetry = "0.28"
opentelemetry-otlp = "0.28"
opentelemetry_sdk = "0.28"
# The default `reqwest-blocking-client` causes a runtime panic
opentelemetry-otlp = { version = "0.28", default-features = false, features = ["http-proto", "reqwest-client", "logs"]}
opentelemetry_sdk = {version = "0.28", features = ["experimental_metrics_periodicreader_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime", "experimental_logs_batch_log_processor_with_async_runtime"]}
path-absolutize = "3"
quote = "1"
rand = "0.9"
Expand Down
2 changes: 1 addition & 1 deletion crates/factor-otel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ anyhow = { workspace = true }
indexmap = "2.2.6"
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["http-proto", "http", "reqwest-client"] }
opentelemetry-otlp = { workspace = true }
spin-core = { path = "../core" }
spin-factors = { path = "../factors" }
spin-resource-table = { path = "../table" }
Expand Down
47 changes: 38 additions & 9 deletions crates/factor-otel/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::InstanceState;
use anyhow::anyhow;
use anyhow::Result;
use opentelemetry::trace::TraceContextExt;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::trace::SpanProcessor;
use spin_world::wasi::otel::tracing as wasi_otel;

use spin_world::wasi;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::InstanceState;

impl wasi_otel::Host for InstanceState {
async fn on_start(&mut self, context: wasi_otel::SpanContext) -> Result<()> {
impl wasi::otel::tracing::Host for InstanceState {
async fn on_start(&mut self, context: wasi::otel::tracing::SpanContext) -> Result<()> {
let mut state = self.state.write().unwrap();

// Before we do anything make sure we track the original host span ID for reparenting
Expand All @@ -32,7 +32,7 @@ impl wasi_otel::Host for InstanceState {
Ok(())
}

async fn on_end(&mut self, span_data: wasi_otel::SpanData) -> Result<()> {
async fn on_end(&mut self, span_data: wasi::otel::tracing::SpanData) -> Result<()> {
let mut state = self.state.write().unwrap();

let span_context: opentelemetry::trace::SpanContext = span_data.span_context.clone().into();
Expand All @@ -42,12 +42,12 @@ impl wasi_otel::Host for InstanceState {
Err(anyhow!("Trying to end a span that was not started"))?;
}

self.processor.on_end(span_data.into());
self.span_processor.on_end(span_data.into());

Ok(())
}

async fn outer_span_context(&mut self) -> Result<wasi_otel::SpanContext> {
async fn outer_span_context(&mut self) -> Result<wasi::otel::tracing::SpanContext> {
Ok(tracing::Span::current()
.context()
.span()
Expand All @@ -56,3 +56,32 @@ impl wasi_otel::Host for InstanceState {
.into())
}
}

impl wasi::otel::metrics::Host for InstanceState {
async fn export(
&mut self,
metrics: wasi::otel::metrics::ResourceMetrics,
) -> spin_core::wasmtime::Result<std::result::Result<(), wasi::otel::metrics::Error>> {
let mut rm: opentelemetry_sdk::metrics::data::ResourceMetrics = metrics.into();
match self.metric_exporter.export(&mut rm).await {
Ok(_) => Ok(Ok(())),
Err(e) => match e {
OTelSdkError::AlreadyShutdown => {
let msg = "Shutdown has already been invoked";
tracing::error!(msg);
Ok(Err(msg.to_string()))
}
OTelSdkError::InternalFailure(e) => {
let detailed_msg = format!("Internal failure: {}", e);
tracing::error!(detailed_msg);
Ok(Err("Internal failure.".to_string()))
}
OTelSdkError::Timeout(d) => {
let detailed_msg = format!("Operation timed out after {} seconds", d.as_secs());
tracing::error!(detailed_msg);
Ok(Err("Operation timed out.".to_string()))
}
},
}
}
}
66 changes: 41 additions & 25 deletions crates/factor-otel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
mod host;

use std::{
sync::{Arc, RwLock},
};

use anyhow::bail;
use indexmap::IndexMap;
use opentelemetry::{
trace::{SpanContext, SpanId, TraceContextExt},
Context,
};
use opentelemetry_otlp::MetricExporter;
use opentelemetry_sdk::{
resource::{EnvResourceDetector, TelemetryResourceDetector},
trace::{BatchSpanProcessor, SpanProcessor},
resource::{EnvResourceDetector, ResourceDetector, TelemetryResourceDetector},
runtime::Tokio,
trace::{span_processor_with_async_runtime::BatchSpanProcessor, SpanProcessor},
Resource,
};
use spin_factors::{Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder};
use spin_telemetry::{detector::SpinResourceDetector, env::OtlpProtocol};
use std::sync::{Arc, RwLock};
use tracing_opentelemetry::OpenTelemetrySpanExt;

pub struct OtelFactor {
processor: Arc<BatchSpanProcessor>,
span_processor: Arc<BatchSpanProcessor<Tokio>>,
metric_exporter: Arc<MetricExporter>,
}

impl Factor for OtelFactor {
Expand All @@ -30,6 +30,7 @@ impl Factor for OtelFactor {

fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
ctx.link_bindings(spin_world::wasi::otel::tracing::add_to_linker::<_, FactorData<Self>>)?;
ctx.link_bindings(spin_world::wasi::otel::metrics::add_to_linker::<_, FactorData<Self>>)?;
Ok(())
}

Expand All @@ -49,15 +50,31 @@ impl Factor for OtelFactor {
guest_span_contexts: Default::default(),
original_host_span_id: None,
})),
processor: self.processor.clone(),
span_processor: self.span_processor.clone(),
metric_exporter: self.metric_exporter.clone(),
})
}
}

impl OtelFactor {
pub fn new() -> anyhow::Result<Self> {
// This is a hack b/c we know the version of this crate will be the same as the version of Spin
let spin_version = env!("CARGO_PKG_VERSION").to_string();

let resource = Resource::builder()
.with_detectors(&[
// Set service.name from env OTEL_SERVICE_NAME > env OTEL_RESOURCE_ATTRIBUTES > spin
// Set service.version from Spin metadata
Box::new(SpinResourceDetector::new(spin_version)) as Box<dyn ResourceDetector>,
// Sets fields from env OTEL_RESOURCE_ATTRIBUTES
Box::new(EnvResourceDetector::new()),
// Sets telemetry.sdk{name, language, version}
Box::new(TelemetryResourceDetector),
])
.build();

// This will configure the exporter based on the OTEL_EXPORTER_* environment variables.
let exporter = match OtlpProtocol::traces_protocol_from_env() {
let span_exporter = match OtlpProtocol::traces_protocol_from_env() {
OtlpProtocol::Grpc => opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()?,
Expand All @@ -67,32 +84,31 @@ impl OtelFactor {
OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
};

let mut processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter).build();
let mut span_processor = BatchSpanProcessor::builder(span_exporter, Tokio).build();

// This is a hack b/c we know the version of this crate will be the same as the version of Spin
let spin_version = env!("CARGO_PKG_VERSION").to_string();
span_processor.set_resource(&resource);

let detectors: &[Box<dyn opentelemetry_sdk::resource::ResourceDetector>; 3] = &[
// Set service.name from env OTEL_SERVICE_NAME > env OTEL_RESOURCE_ATTRIBUTES > spin
// Set service.version from Spin metadata
Box::new(SpinResourceDetector::new(spin_version)),
// Sets fields from env OTEL_RESOURCE_ATTRIBUTES
Box::new(EnvResourceDetector::new()),
// Sets telemetry.sdk{name, language, version}
Box::new(TelemetryResourceDetector),
];

processor.set_resource(&Resource::builder().with_detectors(detectors).build());
let metric_exporter = match OtlpProtocol::metrics_protocol_from_env() {
OtlpProtocol::Grpc => opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.build()?,
OtlpProtocol::HttpProtobuf => opentelemetry_otlp::MetricExporter::builder()
.with_http()
.build()?,
OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
};

Ok(Self {
processor: Arc::new(processor),
span_processor: Arc::new(span_processor),
metric_exporter: Arc::new(metric_exporter),
})
}
}

pub struct InstanceState {
pub(crate) state: Arc<RwLock<State>>,
pub(crate) processor: Arc<BatchSpanProcessor>,
pub(crate) span_processor: Arc<BatchSpanProcessor<Tokio>>,
pub(crate) metric_exporter: Arc<MetricExporter>,
}

impl SelfInstanceBuilder for InstanceState {}
Expand Down
5 changes: 3 additions & 2 deletions crates/telemetry/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::{ascii::escape_default, sync::OnceLock};
use anyhow::bail;
use opentelemetry::logs::{LogRecord, Logger, LoggerProvider};
use opentelemetry_sdk::{
logs::{BatchConfigBuilder, BatchLogProcessor, SdkLogger},
logs::{log_processor_with_async_runtime::BatchLogProcessor, BatchConfigBuilder, SdkLogger},
resource::{EnvResourceDetector, ResourceDetector, TelemetryResourceDetector},
runtime::Tokio,
Resource,
};

Expand Down Expand Up @@ -97,7 +98,7 @@ pub(crate) fn init_otel_logging_backend(spin_version: String) -> anyhow::Result<
let provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
.with_resource(resource)
.with_log_processor(
BatchLogProcessor::builder(exporter)
BatchLogProcessor::builder(exporter, Tokio)
.with_batch_config(BatchConfigBuilder::default().build())
.build(),
)
Expand Down
5 changes: 3 additions & 2 deletions crates/telemetry/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::{bail, Result};
use opentelemetry::global;
use opentelemetry_sdk::{
metrics::{PeriodicReader, SdkMeterProvider},
metrics::{periodic_reader_with_async_runtime::PeriodicReader, SdkMeterProvider},
resource::{EnvResourceDetector, ResourceDetector, TelemetryResourceDetector},
runtime::Tokio,
Resource,
};
use tracing::Subscriber;
Expand Down Expand Up @@ -45,7 +46,7 @@ pub(crate) fn otel_metrics_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
};

let reader = PeriodicReader::builder(exporter).build();
let reader = PeriodicReader::builder(exporter, Tokio).build();
let meter_provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource)
Expand Down
7 changes: 6 additions & 1 deletion crates/telemetry/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::bail;
use opentelemetry::{global, trace::TracerProvider};
use opentelemetry_sdk::{
resource::{EnvResourceDetector, ResourceDetector, TelemetryResourceDetector},
runtime::Tokio,
Resource,
};
use tracing::Subscriber;
Expand Down Expand Up @@ -42,7 +43,11 @@ pub(crate) fn otel_tracing_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
};

let span_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter).build();
let span_processor =
opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
exporter, Tokio,
)
.build();

let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_resource(resource)
Expand Down
Loading