diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index ba229419d4..2a81cb9b1d 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -18,8 +18,8 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::LogResult; use opentelemetry::KeyValue; use opentelemetry_appender_tracing::layer as tracing_layer; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; -use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider}; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::{LogData, LogProcessor, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; use tracing::error; @@ -34,7 +34,13 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export<'a>(&mut self, _: Vec<Cow<'a, LogData>>) -> LogResult<()> { + async fn export( + &mut self, + _: Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, + ) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 396dec680d..83f25c1f9f 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,13 +3,15 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::LogRecord; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let client = self .client .lock() @@ -19,13 +21,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - //TODO: avoid cloning here. - let owned_batch = batch - .into_iter() - .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData - .collect::<Vec<LogData>>(); - - let (body, content_type) = { self.build_logs_export_body(owned_batch)? }; + let (body, content_type) = { self.build_logs_export_body(batch)?
}; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 2fa3ff851b..1b60971d76 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -7,16 +7,18 @@ use crate::{ OTEL_EXPORTER_OTLP_TIMEOUT, }; use http::{HeaderName, HeaderValue, Uri}; +#[cfg(feature = "logs")] +use opentelemetry::InstrumentationLibrary; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; #[cfg(feature = "logs")] use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; #[cfg(feature = "trace")] use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; -#[cfg(feature = "logs")] -use opentelemetry_sdk::export::logs::LogData; #[cfg(feature = "trace")] use opentelemetry_sdk::export::trace::SpanData; +#[cfg(feature = "logs")] +use opentelemetry_sdk::logs::LogRecord; #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::data::ResourceMetrics; use prost::Message; @@ -328,7 +330,7 @@ impl OtlpHttpClient { #[cfg(feature = "logs")] fn build_logs_export_body( &self, - logs: Vec<LogData>, + logs: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index b529eda511..5a8d04b97e 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -4,13 +4,16 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::export::logs::LogExporter; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::logs::LogRecord; + pub(crate) struct TonicLogsClient { inner: Option<ClientInner>, #[allow(dead_code)] @@ -54,7 +57,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner @@ -67,13 +70,7 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - //TODO: avoid cloning here.
- let owned_batch = batch - .into_iter() - .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData - .collect::<Vec<LogData>>(); - - let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); client .export(Request::from_parts( diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 3f21697fb0..9a67bf66d9 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -14,7 +14,9 @@ use std::fmt::Debug; use opentelemetry::logs::LogError; -use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel, Resource}; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::logs::LogRecord; +use opentelemetry_sdk::{runtime::RuntimeChannel, Resource}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -98,9 +100,9 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export<'a>( + async fn export( &mut self, - batch: Vec<Cow<'a, LogData>>, + batch: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index dfd845c5d8..9ab688bac2 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -54,8 +54,8 @@ pub mod tonic { } } - impl From<opentelemetry_sdk::logs::LogRecord> for LogRecord { - fn from(log_record: opentelemetry_sdk::logs::LogRecord) -> Self { + impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord { + fn from(log_record: &opentelemetry_sdk::logs::LogRecord) -> Self { let trace_context = log_record.trace_context.as_ref(); let severity_number = match log_record.severity_number { Some(Severity::Trace) => SeverityNumber::Trace, @@ -118,7 +118,7 @@ pub mod tonic { }, severity_number: severity_number.into(), severity_text: log_record.severity_text.map(Into::into).unwrap_or_default(), - body: log_record.body.map(Into::into), + body: log_record.body.clone().map(Into::into), dropped_attributes_count: 0, flags: trace_context .map(|ctx| { @@ -139,17 +139,23 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogData, + ( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + ), &ResourceAttributesWithSchema, )> for ResourceLogs { fn from( data: ( - opentelemetry_sdk::export::logs::LogData, + ( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + ), &ResourceAttributesWithSchema, ), ) -> Self { - let (log_data, resource) = data; + let ((log_record, instrumentation), resource) = data; ResourceLogs { resource: Some(Resource { @@ -158,21 +164,23 @@ pub mod tonic { }), schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { - schema_url: log_data - .instrumentation + schema_url: instrumentation .schema_url .clone() .map(Into::into) .unwrap_or_default(), - scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()), - log_records: vec![log_data.record.into()], + scope: Some((instrumentation, log_record.target.clone()).into()), + log_records: vec![log_record.into()], }], } } } pub fn group_logs_by_resource_and_scope( - logs: Vec<LogData>, + logs: Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, resource: &ResourceAttributesWithSchema, ) -> Vec<ResourceLogs> { //
Group logs by target or instrumentation name @@ -180,15 +188,20 @@ pub mod tonic { HashMap::new(), |mut scope_map: HashMap< Cow<'static, str>, - Vec<&opentelemetry_sdk::export::logs::LogData>, + Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, >, - log| { - let key = log - .record + (log_record, instrumentation)| { + let key = log_record .target .clone() - .unwrap_or_else(|| log.instrumentation.name.clone()); - scope_map.entry(key).or_default().push(log); + .unwrap_or_else(|| Cow::Owned(instrumentation.name.clone().into_owned())); + scope_map + .entry(key) + .or_default() + .push((log_record, instrumentation)); scope_map }, ); @@ -197,13 +210,13 @@ pub mod tonic { .into_iter() .map(|(key, log_data)| ScopeLogs { scope: Some(InstrumentationScope::from(( - &log_data.first().unwrap().instrumentation, - Some(key), + log_data.first().unwrap().1, + Some(key.into_owned().into()), ))), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|log_data| log_data.record.clone().into()) + .map(|(log_record, _)| log_record.into()) .collect(), }) .collect(); @@ -223,30 +236,29 @@ pub mod tonic { mod tests { use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; - use opentelemetry_sdk::export::logs::LogData; + use opentelemetry::InstrumentationLibrary; use opentelemetry_sdk::{logs::LogRecord, Resource}; use std::time::SystemTime; - fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData { + fn create_test_log_data( + instrumentation_name: &str, + _message: &str, + ) -> (LogRecord, InstrumentationLibrary) { let mut logrecord = LogRecord::default(); logrecord.set_timestamp(SystemTime::now()); logrecord.set_observed_timestamp(SystemTime::now()); - LogData { - instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder( - instrumentation_name.to_string(), - ) - .build(), - record: logrecord, - } + let instrumentation = + InstrumentationLibrary::builder(instrumentation_name.to_string()).build(); + (logrecord, instrumentation) } #[test] fn test_group_logs_by_resource_and_scope_single_scope() { let resource = Resource::default(); - let log1 = create_test_log_data("test-lib", "Log 1"); - let log2 = create_test_log_data("test-lib", "Log 2"); + let log_data1 = create_test_log_data("test-lib", "Log 1"); + let log_data2 = create_test_log_data("test-lib", "Log 2"); - let logs = vec![log1, log2]; + let logs = vec![(&log_data1.0, &log_data1.1), (&log_data2.0, &log_data2.1)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = @@ -263,10 +275,10 @@ mod tests { #[test] fn test_group_logs_by_resource_and_scope_multiple_scopes() { let resource = Resource::default(); - let log1 = create_test_log_data("lib1", "Log 1"); - let log2 = create_test_log_data("lib2", "Log 2"); + let log_data1 = create_test_log_data("lib1", "Log 1"); + let log_data2 = create_test_log_data("lib2", "Log 2"); - let logs = vec![log1, log2]; + let logs = vec![(&log_data1.0, &log_data1.1), (&log_data2.0, &log_data2.1)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 496cc3543f..c674857dd2 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ 
b/opentelemetry-sdk/CHANGELOG.md @@ -25,6 +25,44 @@ [#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021) - Provide default implementation for `event_enabled` method in `LogProcessor` trait that returns `true` always. +- **Breaking** [#2035](https://github.com/open-telemetry/opentelemetry-rust/pull/2035) + - The `Exporter::export()` interface is modified as below: + Previous Signature: + ```rust + async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>; + ``` + + Updated Signature: + ```rust + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + ``` + This change simplifies the processing required by exporters. Exporters no longer need to determine whether the `LogData` is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of `LogRecord` and/or `InstrumentationLibrary` when needed, as the new interface only provides references to these structures. + + - The `LogData` struct is no longer part of the export interface, so it has been moved from the `opentelemetry_sdk::export::logs` namespace to `opentelemetry_sdk::logs`. Custom implementations of `LogProcessor` need to update their imports accordingly. + + - The `LogData` struct has been changed as below: + Previous definition: + ```rust + #[derive(Clone, Debug)] + pub struct LogData { + /// Log record + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, + } + ``` + + Updated definition: + ```rust + #[derive(Clone, Debug)] + pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: Cow<'a, InstrumentationLibrary>, + } + ``` + Custom implementations of `LogProcessor` need to modify their handling of the `LogData` received through the `LogProcessor::emit()` interface accordingly.
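To make the migration concrete, here is a minimal sketch of a custom exporter written against the updated interface. It is not taken from this PR; the `NoopExporter` name and the explicit clones are illustrative assumptions, showing only that references received in `export()` must be cloned if the data needs to outlive the call:

```rust
use async_trait::async_trait;
use opentelemetry::logs::LogResult;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::LogRecord;

#[derive(Debug)]
struct NoopExporter;

#[async_trait]
impl LogExporter for NoopExporter {
    async fn export(
        &mut self,
        batch: Vec<(&LogRecord, &InstrumentationLibrary)>,
    ) -> LogResult<()> {
        // The references are only valid for the duration of this call; an
        // exporter that buffers data must clone explicitly (hypothetical here).
        for (record, library) in batch {
            let _owned: (LogRecord, InstrumentationLibrary) =
                (record.clone(), library.clone());
        }
        Ok(())
    }
}
```

Compared to the previous `Vec<Cow<'a, LogData>>` parameter, this moves the owned-versus-borrowed decision out of the exporter entirely: cloning now happens only where the exporter actually needs ownership.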
## v0.24.1 diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 7fd1fafc7a..8f569fa6f9 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -61,15 +61,15 @@ name = "span_builder" harness = false [[bench]] -name = "metric_counter" +name = "metrics_counter" harness = false [[bench]] -name = "metric_gauge" +name = "metrics_gauge" harness = false [[bench]] -name = "metric_histogram" +name = "metrics_histogram" harness = false [[bench]] diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index 71d5fc699f..840560a1f4 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -26,7 +26,7 @@ use opentelemetry::logs::{ use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::Key; -use opentelemetry_sdk::export::logs::LogData; +use opentelemetry_sdk::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::{Logger, LoggerProvider}; use opentelemetry_sdk::trace; diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 97069db21c..73fde7d61d 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -18,8 +18,10 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; -use opentelemetry_sdk::export::logs::LogData; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; +use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::logs::LoggerProvider; use pprof::criterion::{Output, PProfProfiler}; use std::fmt::Debug; @@ -28,11 +30,11 @@ use std::fmt::Debug; // cargo bench --bench log_exporter #[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export(&mut self, batch: Vec<LogData>); + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); } pub trait LogExporterWithoutFuture: Send + Sync + Debug { - fn export(&mut self, batch: Vec<LogData>); + fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); } #[derive(Debug)] struct NoOpExporterWithFuture {} #[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { - async fn export(&mut self, _batch: Vec<LogData>) {} + async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} } #[derive(Debug)] struct NoOpExporterWithoutFuture {} impl LogExporterWithoutFuture for NoOpExporterWithoutFuture { - fn export(&mut self, _batch: Vec<LogData>) {} + fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} } #[derive(Debug)] @@ -65,7 +67,9 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, data: &mut LogData) { let mut exporter = self.exporter.lock().expect("lock error"); - futures_executor::block_on(exporter.export(vec![data.clone()])); + futures_executor::block_on( + exporter.export(vec![(data.record.as_ref(), data.instrumentation.as_ref())]), + ); } fn force_flush(&self) -> LogResult<()> { @@ -95,7 +99,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture { self.exporter .lock() .expect("lock error") - .export(vec![data.clone()]); + .export(vec![(data.record.as_ref(), data.instrumentation.as_ref())]); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index
c75dee65c1..7e78897669 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -19,10 +19,8 @@ use std::{ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; -use opentelemetry_sdk::{ - export::logs::LogData, - logs::{LogProcessor, LogRecord, Logger, LoggerProvider}, -}; +use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, Logger, LoggerProvider}; +use std::borrow::Cow; // Run this benchmark with: // cargo bench --bench log_processor @@ -45,7 +43,7 @@ fn create_log_record(logger: &Logger) -> LogRecord { struct NoopProcessor; impl LogProcessor for NoopProcessor { - fn emit(&self, _data: &mut LogData) {} + fn emit(&self, _data: &mut LogData<'_>) {} fn force_flush(&self) -> LogResult<()> { Ok(()) @@ -60,7 +58,7 @@ impl LogProcessor for NoopProcessor { struct CloningProcessor; impl LogProcessor for CloningProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { let _data_cloned = data.clone(); } @@ -75,8 +73,8 @@ impl LogProcessor for CloningProcessor { #[derive(Debug)] struct SendToChannelProcessor { - sender: std::sync::mpsc::Sender<LogData>, - receiver: Arc<Mutex<std::sync::mpsc::Receiver<LogData>>>, + sender: std::sync::mpsc::Sender<LogData<'static>>, + receiver: Arc<Mutex<std::sync::mpsc::Receiver<LogData<'static>>>>, } impl SendToChannelProcessor { @@ -104,7 +102,10 @@ impl SendToChannelProcessor { impl LogProcessor for SendToChannelProcessor { fn emit(&self, data: &mut LogData) { - let data_cloned = data.clone(); + let data_cloned = LogData { + record: Cow::Owned(data.record.clone().into_owned()), + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; let res = self.sender.send(data_cloned); if res.is_err() { println!("Error sending log data to channel {0}", res.err().unwrap()); diff --git a/opentelemetry-sdk/benches/metric_counter.rs b/opentelemetry-sdk/benches/metrics_counter.rs similarity index 99% rename from opentelemetry-sdk/benches/metric_counter.rs rename to opentelemetry-sdk/benches/metrics_counter.rs index 34afa4defc..b6951664cd 100644 --- a/opentelemetry-sdk/benches/metric_counter.rs +++ b/opentelemetry-sdk/benches/metrics_counter.rs @@ -36,7 +36,7 @@ static ATTRIBUTE_VALUES: [&str; 10] = [ ]; // Run this benchmark with: -// cargo bench --bench metric_counter +// cargo bench --bench metrics_counter fn create_counter(name: &'static str) -> Counter<u64> { let meter_provider: SdkMeterProvider = SdkMeterProvider::builder() .with_reader(ManualReader::builder().build()) diff --git a/opentelemetry-sdk/benches/metric_gauge.rs b/opentelemetry-sdk/benches/metrics_gauge.rs similarity index 98% rename from opentelemetry-sdk/benches/metric_gauge.rs rename to opentelemetry-sdk/benches/metrics_gauge.rs index b4d5686180..b63c8a7b52 100644 --- a/opentelemetry-sdk/benches/metric_gauge.rs +++ b/opentelemetry-sdk/benches/metrics_gauge.rs @@ -32,7 +32,7 @@ static ATTRIBUTE_VALUES: [&str; 10] = [ ]; // Run this benchmark with: -// cargo bench --bench metric_gauge +// cargo bench --bench metrics_gauge fn create_gauge() -> Gauge<u64> { let meter_provider: SdkMeterProvider = SdkMeterProvider::builder() .with_reader(ManualReader::builder().build()) diff --git a/opentelemetry-sdk/benches/metric_histogram.rs b/opentelemetry-sdk/benches/metrics_histogram.rs similarity index 98% rename from opentelemetry-sdk/benches/metric_histogram.rs rename to opentelemetry-sdk/benches/metrics_histogram.rs index eca50ec649..517877e673 100644 --- a/opentelemetry-sdk/benches/metric_histogram.rs +++
b/opentelemetry-sdk/benches/metrics_histogram.rs @@ -34,7 +34,7 @@ static ATTRIBUTE_VALUES: [&str; 10] = [ ]; // Run this benchmark with: -// cargo bench --bench metric_histogram +// cargo bench --bench metrics_histogram fn create_histogram(name: &'static str) -> Histogram<u64> { let meter_provider: SdkMeterProvider = SdkMeterProvider::builder() .with_reader(ManualReader::builder().build()) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index e1426553a1..353c89042c 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -8,14 +8,16 @@ use opentelemetry::{ logs::{LogError, LogResult}, InstrumentationLibrary, }; -use std::borrow::Cow; use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] pub trait LogExporter: Send + Sync + Debug { - /// Exports a batch of [`LogData`]. - async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>; + /// Exports a batch of [`LogRecord`, `InstrumentationLibrary`]. + async fn export( + &mut self, + records: Vec<(&LogRecord, &InstrumentationLibrary)>, + ) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -28,14 +30,5 @@ pub trait LogExporter: Send + Sync + Debug { fn set_resource(&mut self, _resource: &Resource) {} } -/// `LogData` represents a single log event without resource context. -#[derive(Clone, Debug)] -pub struct LogData { - /// Log record - pub record: LogRecord, - /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: InstrumentationLibrary, -} - /// Describes the result of an export. pub type ExportResult = Result<(), LogError>; diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c9d3e5a828..ff4f7b57ba 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,9 +1,6 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{ - export::logs::{LogData, LogExporter}, - runtime::RuntimeChannel, - Resource, -}; +use crate::logs::LogData; +use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; use opentelemetry::{ global, logs::{LogError, LogResult}, @@ -274,8 +271,8 @@ impl opentelemetry::logs::Logger for Logger { } let mut data = LogData { - record: log_record, - instrumentation: self.instrumentation_library().clone(), + record: Cow::Borrowed(&log_record), + instrumentation: Cow::Borrowed(self.instrumentation_library()), }; for p in processors { @@ -336,7 +333,7 @@ mod tests { } impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { self.is_shutdown .lock() .map(|is_shutdown| { @@ -566,7 +563,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { // nothing to do.
} diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7366f19791..072e350ca4 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,5 +1,6 @@ +use crate::logs::LogData; use crate::{ - export::logs::{ExportResult, LogData, LogExporter}, + export::logs::{ExportResult, LogExporter}, runtime::{RuntimeChannel, TrySend}, Resource, }; @@ -55,7 +56,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// /// # Parameters /// - `data`: A mutable reference to `LogData` representing the log record. - fn emit(&self, data: &mut LogData); + fn emit(&self, data: &mut LogData<'_>); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -93,7 +94,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -104,7 +105,10 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)])) + // Extract references to LogRecord and InstrumentationLibrary + let log_record = data.record.as_ref(); + let instrumentation = data.instrumentation.as_ref(); + futures_executor::block_on(exporter.export(vec![(log_record, instrumentation)])) }); if let Err(err) = result { global::handle_error(err); @@ -150,10 +154,14 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { + let owned_data = LogData { + record: Cow::Owned(data.record.clone().into_owned()), + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; let result = self .message_sender - .try_send(BatchMessage::ExportLog(data.clone())); + .try_send(BatchMessage::ExportLog(owned_data)); if let Err(err) = result { global::handle_error(LogError::Other(err.into())); @@ -300,7 +308,7 @@ async fn export_with_timeout<'a, R, E>( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec<Cow<'a, LogData>>, + batch: Vec<Cow<'a, LogData<'a>>>, ) -> ExportResult where R: RuntimeChannel, @@ -309,8 +317,13 @@ where if batch.is_empty() { return Ok(()); } + // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> + let export_batch = batch + .iter() + .map(|log_data| (log_data.record.as_ref(), log_data.instrumentation.as_ref())) + .collect(); - let export = exporter.export(batch); + let export = exporter.export(export_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -490,7 +503,7 @@ where #[derive(Debug)] enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog(LogData), + ExportLog(LogData<'static>), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function.
Flush(Option<oneshot::Sender<ExportResult>>), @@ -506,9 +519,11 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; + use crate::logs::LogData; + use crate::logs::LogRecord; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ - export::logs::{LogData, LogExporter}, + export::logs::LogExporter, logs::{ log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, @@ -523,6 +538,7 @@ mod tests { use async_trait::async_trait; use opentelemetry::logs::AnyValue; use opentelemetry::logs::{Logger, LoggerProvider as _}; + use opentelemetry::InstrumentationLibrary; use opentelemetry::Key; use opentelemetry::{logs::LogResult, KeyValue}; use std::borrow::Cow; @@ -536,7 +552,10 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> { + async fn export( + &mut self, + _batch: Vec<(&LogRecord, &InstrumentationLibrary)>, + ) -> LogResult<()> { Ok(()) } @@ -805,20 +824,26 @@ mod tests { #[derive(Debug)] struct FirstProcessor { - pub(crate) logs: Arc<Mutex<Vec<LogData>>>, + pub(crate) logs: Arc<Mutex<Vec<LogData<'static>>>>, } impl LogProcessor for FirstProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { // add attribute - data.record.attributes.get_or_insert(vec![]).push(( + let record = data.record.to_mut(); + record.attributes.get_or_insert(vec![]).push(( Key::from_static_str("processed_by"), AnyValue::String("FirstProcessor".into()), )); // update body - data.record.body = Some("Updated by FirstProcessor".into()); - - self.logs.lock().unwrap().push(data.clone()); //clone as the LogProcessor is storing the data. + record.body = Some("Updated by FirstProcessor".into()); + // Convert the modified LogData to an owned version + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Since record is already owned, no need to clone deeply + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; + + self.logs.lock().unwrap().push(owned_data); //clone as the LogProcessor is storing the data.
} fn force_flush(&self) -> LogResult<()> { @@ -832,11 +857,11 @@ mod tests { #[derive(Debug)] struct SecondProcessor { - pub(crate) logs: Arc<Mutex<Vec<LogData>>>, + pub(crate) logs: Arc<Mutex<Vec<LogData<'static>>>>, } impl LogProcessor for SecondProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { assert!(data.record.attributes.as_ref().map_or(false, |attrs| { attrs.iter().any(|(key, value)| { key.as_str() == "processed_by" @@ -847,7 +872,12 @@ mod tests { data.record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); - self.logs.lock().unwrap().push(data.clone()); + let record = data.record.to_mut(); + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Convert the record to owned + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; + self.logs.lock().unwrap().push(owned_data); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 5d2e72719b..92e384ee41 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -4,12 +4,23 @@ mod log_emitter; mod log_processor; mod record; +use crate::InstrumentationLibrary; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor, }; pub use record::{LogRecord, TraceContext}; +use std::borrow::Cow; + +/// `LogData` represents a single log event without resource context. +#[derive(Clone, Debug)] +pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: Cow<'a, InstrumentationLibrary>, +} #[cfg(all(test, feature = "testing"))] mod tests { diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 61619b2039..79a06f7242 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -5,11 +5,8 @@ use opentelemetry::KeyValue; use crate::metrics::data::{Aggregation, Gauge, Temporality}; use super::{ - exponential_histogram::ExpoHistogram, - histogram::Histogram, - last_value::LastValue, - sum::{PrecomputedSum, Sum}, - Number, + exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue, + precomputed_sum::PrecomputedSum, sum::Sum, Number, }; const STREAM_CARDINALITY_LIMIT: u32 = 2000; diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 3d89479f5a..41f97aa20b 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -2,6 +2,7 @@ mod aggregate; mod exponential_histogram; mod histogram; mod last_value; +mod precomputed_sum; mod sum; use core::fmt; diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs new file mode 100644 index 0000000000..14e9c19b25 --- /dev/null +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -0,0 +1,193 @@ +use opentelemetry::KeyValue; + +use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; + +use super::{Assign, AtomicTracker, Number, ValueMap}; +use std::{ + collections::{HashMap, HashSet}, + sync::{atomic::Ordering, Arc, Mutex}, + time::SystemTime, +}; + +/// Summarizes a set of pre-computed sums as their
arithmetic sum. +pub(crate) struct PrecomputedSum<T: Number<T>> { + value_map: ValueMap<T, T, Assign>, + monotonic: bool, + start: Mutex<SystemTime>, + reported: Mutex<HashMap<Vec<KeyValue>, T>>, +} + +impl<T: Number<T>> PrecomputedSum<T> { + pub(crate) fn new(monotonic: bool) -> Self { + PrecomputedSum { + value_map: ValueMap::new(), + monotonic, + start: Mutex::new(SystemTime::now()), + reported: Mutex::new(Default::default()), + } + } + + pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { + // The argument index is not applicable to PrecomputedSum. + self.value_map.measure(measurement, attrs, 0); + } + + pub(crate) fn delta( + &self, + dest: Option<&mut dyn Aggregation>, + ) -> (usize, Option<Box<dyn Aggregation>>) { + let t = SystemTime::now(); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + + let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); + let mut new_agg = if s_data.is_none() { + Some(data::Sum { + data_points: vec![], + temporality: Temporality::Delta, + is_monotonic: self.monotonic, + }) + } else { + None + }; + let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); + s_data.data_points.clear(); + s_data.temporality = Temporality::Delta; + s_data.is_monotonic = self.monotonic; + + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; + if n > s_data.data_points.capacity() { + s_data + .data_points + .reserve_exact(n - s_data.data_points.capacity()); + } + let mut new_reported = HashMap::with_capacity(n); + let mut reported = match self.reported.lock() { + Ok(r) => r, + Err(_) => return (0, None), + }; + + if self + .value_map + .has_no_attribute_value + .swap(false, Ordering::AcqRel) + { + let value = self.value_map.no_attribute_tracker.get_value(); + let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); + new_reported.insert(vec![], value); + + s_data.data_points.push(DataPoint { + attributes: vec![], + start_time: Some(prev_start), + time: Some(t), + value: delta, + exemplars: vec![], + }); + } + + let mut trackers = match self.value_map.trackers.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.drain() { + if seen.insert(Arc::as_ptr(&tracker)) { + let value = tracker.get_value(); + let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); + new_reported.insert(attrs.clone(), value); + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: delta, + exemplars: vec![], + }); + } + } + + // The delta collection cycle resets.
+ if let Ok(mut start) = self.start.lock() { + *start = t; + } + self.value_map.count.store(0, Ordering::SeqCst); + + *reported = new_reported; + drop(reported); // drop before values guard is dropped + + ( + s_data.data_points.len(), + new_agg.map(|a| Box::new(a) as Box<_>), + ) + } + + pub(crate) fn cumulative( + &self, + dest: Option<&mut dyn Aggregation>, + ) -> (usize, Option<Box<dyn Aggregation>>) { + let t = SystemTime::now(); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + + let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); + let mut new_agg = if s_data.is_none() { + Some(data::Sum { + data_points: vec![], + temporality: Temporality::Cumulative, + is_monotonic: self.monotonic, + }) + } else { + None + }; + let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); + s_data.data_points.clear(); + s_data.temporality = Temporality::Cumulative; + s_data.is_monotonic = self.monotonic; + + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; + if n > s_data.data_points.capacity() { + s_data + .data_points + .reserve_exact(n - s_data.data_points.capacity()); + } + + if self + .value_map + .has_no_attribute_value + .load(Ordering::Acquire) + { + s_data.data_points.push(DataPoint { + attributes: vec![], + start_time: Some(prev_start), + time: Some(t), + value: self.value_map.no_attribute_tracker.get_value(), + exemplars: vec![], + }); + } + + let trackers = match self.value_map.trackers.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.iter() { + if seen.insert(Arc::as_ptr(tracker)) { + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: tracker.get_value(), + exemplars: vec![], + }); + } + } + + ( + s_data.data_points.len(), + new_agg.map(|a| Box::new(a) as Box<_>), + ) + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 819c8ff58f..68a58d1e8d 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -2,13 +2,13 @@ use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use std::vec; -use std::{collections::HashMap, sync::Mutex, time::SystemTime}; +use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use opentelemetry::KeyValue; -use super::{Assign, Increment, ValueMap}; use super::{AtomicTracker, Number}; +use super::{Increment, ValueMap}; /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum<T: Number<T>> { @@ -185,186 +185,3 @@ impl<T: Number<T>> Sum<T> { ) } } - -/// Summarizes a set of pre-computed sums as their arithmetic sum. -pub(crate) struct PrecomputedSum<T: Number<T>> { - value_map: ValueMap<T, T, Assign>, - monotonic: bool, - start: Mutex<SystemTime>, - reported: Mutex<HashMap<Vec<KeyValue>, T>>, -} - -impl<T: Number<T>> PrecomputedSum<T> { - pub(crate) fn new(monotonic: bool) -> Self { - PrecomputedSum { - value_map: ValueMap::new(), - monotonic, - start: Mutex::new(SystemTime::now()), - reported: Mutex::new(Default::default()), - } - } - - pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - // The argument index is not applicable to PrecomputedSum.
- self.value_map.measure(measurement, attrs, 0); - } - - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option<Box<dyn Aggregation>>) { - let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - temporality: Temporality::Delta, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); - s_data.temporality = Temporality::Delta; - s_data.is_monotonic = self.monotonic; - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let mut new_reported = HashMap::with_capacity(n); - let mut reported = match self.reported.lock() { - Ok(r) => r, - Err(_) => return (0, None), - }; - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - let value = self.value_map.no_attribute_tracker.get_value(); - let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); - new_reported.insert(vec![], value); - - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.get_value(); - let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value); - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); - - *reported = new_reported; - drop(reported); // drop before values guard is dropped - - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) - } - - pub(crate) fn cumulative( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option<Box<dyn Aggregation>>) { - let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - temporality: Temporality::Cumulative, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); - s_data.temporality = Temporality::Cumulative; - s_data.is_monotonic = self.monotonic; - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute.
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), - exemplars: vec![], - }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.get_value(), - exemplars: vec![], - }); - } - } - - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) - } -} diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 9810e28487..6ad7303415 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -229,204 +229,42 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn counter_aggregation_overflow_delta() { - // Arrange - let mut test_context = TestContext::new(Temporality::Delta); - let counter = test_context.u64_counter("test", "my_counter", None); - - // Act - // Record measurements with A:0, A:1,.......A:1999, which just fits in the 2000 limit - for v in 0..2000 { - counter.add(100, &[KeyValue::new("A", v.to_string())]); - } - - // Empty attributes is specially treated and does not count towards the limit. - counter.add(3, &[]); - counter.add(3, &[]); - - // All of the below will now go into overflow. - counter.add(100, &[KeyValue::new("A", "foo")]); - counter.add(100, &[KeyValue::new("A", "another")]); - counter.add(100, &[KeyValue::new("A", "yet_another")]); - test_context.flush_metrics(); - - let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None); - - // Expecting 2002 metric points. (2000 + 1 overflow + Empty attributes) - assert_eq!(sum.data_points.len(), 2002); - - let data_point = - find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true") - .expect("overflow point expected"); - assert_eq!(data_point.value, 300); + counter_aggregation_overflow_helper(Temporality::Delta); + } - // TODO: This is relying on the current behavior of the SDK that 0th - // point is empty attributes, but it is not guaranteed to be the case in - // the future. - let empty_attrs_data_point = &sum.data_points[0]; - assert!( - empty_attrs_data_point.attributes.is_empty(), - "Non-empty attribute set" - ); - assert_eq!( - empty_attrs_data_point.value, 6, - "Empty attributes value should be 3+3=6" - ); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_aggregation_overflow_cumulative() { + counter_aggregation_overflow_helper(Temporality::Cumulative); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn counter_aggregation_attribute_order_sorted_first() { + async fn counter_aggregation_attribute_order_sorted_first_delta() { // Run this test with stdout enabled to see output.
- cargo test counter_aggregation_attribute_order_sorted_first --features=testing -- --nocapture - - counter_aggregation_attribute_order_sorted_first_helper(Temporality::Delta); - counter_aggregation_attribute_order_sorted_first_helper(Temporality::Cumulative); - - fn counter_aggregation_attribute_order_sorted_first_helper(temporality: Temporality) { - // Arrange - let mut test_context = TestContext::new(temporality); - let counter = test_context.u64_counter("test", "my_counter", None); - - // Act - // Add the same set of attributes in different order. (they are expected - // to be treated as same attributes) - // start with sorted order - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - ], - ); - test_context.flush_metrics(); - - let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None); - - // Expecting 1 time-series. - assert_eq!(sum.data_points.len(), 1); - - // validate the sole datapoint - let data_point1 = &sum.data_points[0]; - assert_eq!(data_point1.value, 6); - } + // cargo test counter_aggregation_attribute_order_sorted_first_delta --features=testing -- --nocapture + counter_aggregation_attribute_order_helper(Temporality::Delta, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn counter_aggregation_attribute_order_unsorted_first() { + async fn counter_aggregation_attribute_order_sorted_first_cumulative() { // Run this test with stdout enabled to see output. - // cargo test counter_aggregation_attribute_order_unsorted_first --features=testing -- --nocapture - - counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Delta); - counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Cumulative); - - fn counter_aggregation_attribute_order_unsorted_first_helper(temporality: Temporality) { - // Arrange - let mut test_context = TestContext::new(temporality); - let counter = test_context.u64_counter("test", "my_counter", None); + // cargo test counter_aggregation_attribute_order_sorted_first_cumulative --features=testing -- --nocapture + counter_aggregation_attribute_order_helper(Temporality::Cumulative, true); } - // Act - // Add the same set of attributes in different order.
(they are expected - // to be treated as same attributes) - // start with unsorted order - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - ], - ); - test_context.flush_metrics(); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_aggregation_attribute_order_unsorted_first_delta() { + // Run this test with stdout enabled to see output. + // cargo test counter_aggregation_attribute_order_unsorted_first_delta --features=testing -- --nocapture - let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None); + counter_aggregation_attribute_order_helper(Temporality::Delta, false); + } - // Expecting 1 time-series. - assert_eq!(sum.data_points.len(), 1); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_aggregation_attribute_order_unsorted_first_cumulative() { + // Run this test with stdout enabled to see output. + // cargo test counter_aggregation_attribute_order_unsorted_first_cumulative --features=testing -- --nocapture - // validate the sole datapoint - let data_point1 = &sum.data_points[0]; - assert_eq!(data_point1.value, 6); - } + counter_aggregation_attribute_order_helper(Temporality::Cumulative, false); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1224,6 +1062,15 @@ mod tests { histogram_multithreaded_aggregation_helper(Temporality::Delta); histogram_multithreaded_aggregation_helper(Temporality::Cumulative); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn histogram_f64_multithreaded() { + // Run this test with stdout enabled to see output. + // cargo test histogram_f64_multithreaded --features=testing -- --nocapture + + histogram_f64_multithreaded_aggregation_helper(Temporality::Delta); + histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative); + } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn synchronous_instruments_cumulative_with_gap_in_measurements() { // Run this test with stdout enabled to see output. @@ -1723,6 +1570,143 @@ mod tests { } } + fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").init()); + + for i in 0..10 { + thread::scope(|s| { + s.spawn(|| { + histogram.record(1.5, &[]); + histogram.record(4.6, &[]); + + histogram.record(5.0, &[KeyValue::new("key1", "value1")]); + histogram.record(7.3, &[KeyValue::new("key1", "value1")]); + histogram.record(18.1, &[KeyValue::new("key1", "value1")]); + + // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
+ if i % 2 == 0 { + test_context.flush_metrics(); + thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing + } + + histogram.record(35.1, &[KeyValue::new("key1", "value1")]); + histogram.record(35.1, &[KeyValue::new("key1", "value1")]); + }); + }); + } + + test_context.flush_metrics(); + + // Assert + // We invoke `test_context.flush_metrics()` six times. + let histograms = test_context.get_from_multiple_aggregations::<data::Histogram<f64>>( + "test_histogram", + None, + 6, + ); + + let ( + mut sum_zero_attributes, + mut count_zero_attributes, + mut min_zero_attributes, + mut max_zero_attributes, + ) = (0.0, 0, f64::MAX, f64::MIN); + let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) = + (0.0, 0, f64::MAX, f64::MIN); + + let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration + let mut bucket_counts_key1_value1 = vec![0; 16]; + + histograms.iter().for_each(|histogram| { + assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series. + assert_eq!(histogram.temporality, temporality); + + let data_point_zero_attributes = + find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap(); + let data_point_key1_value1 = + find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1") + .unwrap(); + + if temporality == Temporality::Delta { + sum_zero_attributes += data_point_zero_attributes.sum; + sum_key1_value1 += data_point_key1_value1.sum; + + count_zero_attributes += data_point_zero_attributes.count; + count_key1_value1 += data_point_key1_value1.count; + + min_zero_attributes = + min_zero_attributes.min(data_point_zero_attributes.min.unwrap()); + min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap()); + + max_zero_attributes = + max_zero_attributes.max(data_point_zero_attributes.max.unwrap()); + max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap()); + + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + + for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() { + bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i]; + } + + for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() { + bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i]; + } + } else { + sum_zero_attributes = data_point_zero_attributes.sum; + sum_key1_value1 = data_point_key1_value1.sum; + + count_zero_attributes = data_point_zero_attributes.count; + count_key1_value1 = data_point_key1_value1.count; + + min_zero_attributes = data_point_zero_attributes.min.unwrap(); + min_key1_value1 = data_point_key1_value1.min.unwrap(); + + max_zero_attributes = data_point_zero_attributes.max.unwrap(); + max_key1_value1 = data_point_key1_value1.max.unwrap(); + + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + + bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts); + bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts); + }; + }); + + // Default buckets: + // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], + // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).
+ + assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements. + assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads record measurements summing up to 6.1 (1.5 + 4.6) + assert_eq!(min_zero_attributes, 1.5); + assert_eq!(max_zero_attributes, 4.6); + + for (i, count) in bucket_counts_zero_attributes.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1.5 and 4.6 fall under the bucket (0, 5.0]. + _ => assert_eq!(*count, 0), + } + } + + assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements. + assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements summing up to 100.6 (5.0 + 7.3 + 18.1 + 35.1 + 35.1). + assert_eq!(min_key1_value1, 5.0); + assert_eq!(max_key1_value1, 35.1); + + for (i, count) in bucket_counts_key1_value1.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls under the bucket (0, 5.0]. + 2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls under the bucket (5.0, 10.0]. + 3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls under the bucket (10.0, 25.0]. + 4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35.1 (recorded twice) falls under the bucket (25.0, 50.0]. + _ => assert_eq!(*count, 0), + } + } + } + fn histogram_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality); @@ -2025,6 +2009,131 @@ } + fn counter_aggregation_overflow_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Act + // Record measurements with A:0, A:1,.......A:1999, which just fits in the 2000 limit + for v in 0..2000 { + counter.add(100, &[KeyValue::new("A", v.to_string())]); + } + + // Empty attributes is specially treated and does not count towards the limit. + counter.add(3, &[]); + counter.add(3, &[]); + + // All of the below will now go into overflow. + counter.add(100, &[KeyValue::new("A", "foo")]); + counter.add(100, &[KeyValue::new("A", "another")]); + counter.add(100, &[KeyValue::new("A", "yet_another")]); + test_context.flush_metrics(); + + let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None); + + // Expecting 2002 metric points. (2000 + 1 overflow + Empty attributes) + assert_eq!(sum.data_points.len(), 2002); + + let data_point = + find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true") + .expect("overflow point expected"); + assert_eq!(data_point.value, 300); + + // let empty_attrs_data_point = &sum.data_points[0]; + let empty_attrs_data_point = find_datapoint_with_no_attributes(&sum.data_points) + .expect("Empty attributes point expected"); + assert!( + empty_attrs_data_point.attributes.is_empty(), + "Non-empty attribute set" + ); + assert_eq!( + empty_attrs_data_point.value, 6, + "Empty attributes value should be 3+3=6" + ); + } + + fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) { + // Arrange + let mut test_context = TestContext::new(temporality); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Act + // Add the same set of attributes in different order.
+        // Start with the sorted order.
+        if start_sorted {
+            counter.add(
+                1,
+                &[
+                    KeyValue::new("A", "a"),
+                    KeyValue::new("B", "b"),
+                    KeyValue::new("C", "c"),
+                ],
+            );
+        } else {
+            counter.add(
+                1,
+                &[
+                    KeyValue::new("A", "a"),
+                    KeyValue::new("C", "c"),
+                    KeyValue::new("B", "b"),
+                ],
+            );
+        }
+
+        counter.add(
+            1,
+            &[
+                KeyValue::new("A", "a"),
+                KeyValue::new("C", "c"),
+                KeyValue::new("B", "b"),
+            ],
+        );
+        counter.add(
+            1,
+            &[
+                KeyValue::new("B", "b"),
+                KeyValue::new("A", "a"),
+                KeyValue::new("C", "c"),
+            ],
+        );
+        counter.add(
+            1,
+            &[
+                KeyValue::new("B", "b"),
+                KeyValue::new("C", "c"),
+                KeyValue::new("A", "a"),
+            ],
+        );
+        counter.add(
+            1,
+            &[
+                KeyValue::new("C", "c"),
+                KeyValue::new("B", "b"),
+                KeyValue::new("A", "a"),
+            ],
+        );
+        counter.add(
+            1,
+            &[
+                KeyValue::new("C", "c"),
+                KeyValue::new("A", "a"),
+                KeyValue::new("B", "b"),
+            ],
+        );
+        test_context.flush_metrics();
+
+        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
+
+        // Expecting a single time-series.
+        assert_eq!(sum.data_points.len(), 1);
+
+        // Validate the sole data point: six adds of 1 each.
+        let data_point1 = &sum.data_points[0];
+        assert_eq!(data_point1.value, 6);
+    }
+
     fn updown_counter_aggregation_helper(temporality: Temporality) {
         // Arrange
         let mut test_context = TestContext::new(temporality);
diff --git a/opentelemetry-sdk/src/metrics/test.md b/opentelemetry-sdk/src/metrics/test.md
index f5eed24b15..bc0ac92da4 100644
--- a/opentelemetry-sdk/src/metrics/test.md
+++ b/opentelemetry-sdk/src/metrics/test.md
@@ -8,17 +8,17 @@ Tests are located in [mod.rs](mod.rs)
 
 | Test Type                  | Counter (Delta) | Counter (Cumulative) | UpDownCounter (Delta) | UpDownCounter (Cumulative) | Gauge (Delta) | Gauge (Cumulative) | Histogram (Delta) | Histogram (Cumulative) |
 |----------------------------|-----------------|----------------------|----------------------|----------------------------|---------------|--------------------|-------------------|------------------------|
-| Regular aggregation test   | [Yes] | [Yes] | [No] | [No] | [No] | [No] | [No] | [No] |
-| No-attribute test          | [Yes] | [Yes] | [No] | [No] | [No] | [No] | [No] | [No] |
-| Overflow test              | [Yes] | [No]  | [No] | [No] | [No] | [No] | [No] | [No] |
-| Attr Order Sorted First    | [Yes] | [Yes] | [No] | [No] | [No] | [No] | [No] | [No] |
-| Attr Order Unsorted First  | [Yes] | [Yes] | [No] | [No] | [No] | [No] | [No] | [No] |
+| Regular aggregation test   | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | :x: |
+| No-attribute test          | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | :x: |
+| Overflow test              | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | :x: |
+| Attr Order Sorted First    | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | :x: |
+| Attr Order Unsorted First  | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | :x: |
 
 ## Observable Instruments
 
 | Test Type                  | ObservableCounter (Delta) | ObservableCounter (Cumulative) | ObservableGauge (Delta) | ObservableGauge (Cumulative) | ObservableUpDownCounter (Delta) | ObservableUpDownCounter (Cumulative) |
 |----------------------------|---------------------------|-------------------------------|-------------------------|------------------------------|---------------------------------|--------------------------------------|
-| Regular aggregation test   | [No] | [No] | [No] | [No] | [No] | [No] |
-| No-attribute test          | [No] | [No] | [No] | [No] | [No] | [No] |
-| Attr Order Sorted First    | [Yes] | [Yes] | [No] | [No] | [No] | [No] | [No] | [No] |
-| Attr Order Unsorted First  | [Yes] | [Yes] | [No] | [No] | [No] | [No] | [No] | [No] |
+| Regular aggregation test   | :x: | :x: | :x: | :x: | :x: | :x: |
+| No-attribute test          | :x: | :x: | :x: | :x: | :x: | :x: |
+| Attr Order Sorted First    | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: |
+| Attr Order Unsorted First  | :white_check_mark: | :white_check_mark: | :x: | :x: | :x: | :x: |
\ No newline at end of file
diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
index 8068fafaec..346ef9861a 100644
--- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
+++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
@@ -1,4 +1,4 @@
-use crate::export::logs::{LogData, LogExporter};
+use crate::export::logs::LogExporter;
 use crate::logs::LogRecord;
 use crate::Resource;
 use async_trait::async_trait;
@@ -37,13 +37,23 @@ use std::sync::{Arc, Mutex};
 ///# }
 /// ```
 ///
+///
 #[derive(Clone, Debug)]
 pub struct InMemoryLogsExporter {
-    logs: Arc<Mutex<Vec<LogData>>>,
+    logs: Arc<Mutex<Vec<OwnedLogData>>>,
     resource: Arc<Mutex<Resource>>,
     should_reset_on_shutdown: bool,
 }
 
+/// `OwnedLogData` represents a single log event, without the resource it was
+/// emitted under.
+#[derive(Debug, Clone)]
+pub struct OwnedLogData {
+    /// Owned log record.
+    pub record: LogRecord,
+    /// Instrumentation scope of the logger that produced this record.
+    pub instrumentation: InstrumentationLibrary,
+}
+
 impl Default for InMemoryLogsExporter {
     fn default() -> Self {
         InMemoryLogsExporterBuilder::new().build()
@@ -175,10 +185,14 @@ impl InMemoryLogsExporter {
 
 #[async_trait]
 impl LogExporter for InMemoryLogsExporter {
-    async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
+    async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
         let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
-        for log in batch.into_iter() {
-            logs_guard.push(log.into_owned());
+        for (log_record, instrumentation) in batch.into_iter() {
+            let owned_log = OwnedLogData {
+                record: log_record.clone(),
+                instrumentation: instrumentation.clone(),
+            };
+            logs_guard.push(owned_log);
         }
         Ok(())
     }
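With this change, `InMemoryLogsExporter` hands tests `OwnedLogData` values instead of `LogData`. A minimal sketch of the consuming side, assuming the type's existing `get_emitted_logs` accessor keeps its name:

```rust
use opentelemetry_sdk::testing::logs::InMemoryLogsExporter;

fn dump_logs(exporter: &InMemoryLogsExporter) {
    // Each entry is an `OwnedLogData`: a cloned record plus the scope
    // (`InstrumentationLibrary`) of the logger that emitted it.
    for log in exporter.get_emitted_logs().expect("exporter lock poisoned") {
        println!("scope={:?} record={:?}", log.instrumentation.name, log.record);
    }
}
```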
diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs
index dacefa3d8b..fd59701c0b 100644
--- a/opentelemetry-stdout/src/logs/exporter.rs
+++ b/opentelemetry-stdout/src/logs/exporter.rs
@@ -1,12 +1,12 @@
 use async_trait::async_trait;
 use core::fmt;
+use opentelemetry::InstrumentationLibrary;
 use opentelemetry::{
     logs::{LogError, LogResult},
     ExportError,
 };
-use opentelemetry_sdk::export::logs::{ExportResult, LogData};
+use opentelemetry_sdk::logs::LogRecord;
 use opentelemetry_sdk::Resource;
-use std::borrow::Cow;
 use std::io::{stdout, Write};
 
 type Encoder =
@@ -45,14 +45,9 @@ impl fmt::Debug for LogExporter {
 #[async_trait]
 impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
     /// Export logs to stdout
-    async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> ExportResult {
+    async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
         if let Some(writer) = &mut self.writer {
-            // TODO - Avoid cloning logdata if it is borrowed.
-            let log_data = crate::logs::transform::LogData::from((
-                batch.into_iter().map(Cow::into_owned).collect(),
-                &self.resource,
-            ));
-            let result = (self.encoder)(writer, log_data) as LogResult<()>;
+            let result: LogResult<()> = (self.encoder)(writer, (batch, &self.resource).into());
             result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into()))
         } else {
             Err("exporter is shut down".into())
diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs
index 0560e0c064..84e864f469 100644
--- a/opentelemetry-stdout/src/logs/transform.rs
+++ b/opentelemetry-stdout/src/logs/transform.rs
@@ -16,13 +16,19 @@ pub struct LogData {
 
 impl
     From<(
-        Vec<opentelemetry_sdk::export::logs::LogData>,
+        Vec<(
+            &opentelemetry_sdk::logs::LogRecord,
+            &opentelemetry::InstrumentationLibrary,
+        )>,
         &opentelemetry_sdk::Resource,
     )> for LogData
 {
     fn from(
         (sdk_logs, sdk_resource): (
-            Vec<opentelemetry_sdk::export::logs::LogData>,
+            Vec<(
+                &opentelemetry_sdk::logs::LogRecord,
+                &opentelemetry::InstrumentationLibrary,
+            )>,
             &opentelemetry_sdk::Resource,
         ),
     ) -> Self {
@@ -30,8 +36,8 @@ impl
         for sdk_log in sdk_logs {
             let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into());
 
-            let schema_url = sdk_log.instrumentation.schema_url.clone();
-            let scope: Scope = sdk_log.instrumentation.clone().into();
+            // `sdk_log` is a `(&LogRecord, &InstrumentationLibrary)` pair.
+            let schema_url = sdk_log.1.schema_url.clone();
+            let scope: Scope = sdk_log.1.clone().into();
             let resource: Resource = sdk_resource.into();
 
             let rl = resource_logs
@@ -43,10 +49,10 @@ impl
             });
 
             match rl.scope_logs.iter_mut().find(|sl| sl.scope == scope) {
-                Some(sl) => sl.log_records.push(sdk_log.into()),
+                Some(sl) => sl.log_records.push(sdk_log.0.into()),
                 None => rl.scope_logs.push(ScopeLogs {
                     scope,
-                    log_records: vec![sdk_log.into()],
+                    log_records: vec![sdk_log.0.into()],
                     schema_url,
                 }),
             }
@@ -104,18 +110,17 @@ struct LogRecord {
     trace_id: Option<String>,
 }
 
-impl From<opentelemetry_sdk::export::logs::LogData> for LogRecord {
-    fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self {
+impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord {
+    fn from(record: &opentelemetry_sdk::logs::LogRecord) -> Self {
         LogRecord {
             attributes: {
-                let attributes = value
-                    .record
+                let attributes = record
                     .attributes_iter()
                     .map(|(k, v)| KeyValue::from((k.clone(), v.clone()))) // Map each pair to a KeyValue
                     .collect::<Vec<KeyValue>>(); // Collect into a Vec
 
                 #[cfg(feature = "populate-logs-event-name")]
-                if let Some(event_name) = value.record.event_name {
+                if let Some(event_name) = record.event_name.clone() {
                     let mut attributes_with_name = attributes;
                     attributes_with_name.push(KeyValue::from((
                         "name".into(),
@@ -129,33 +134,24 @@ impl From<opentelemetry_sdk::export::logs::LogData> for LogRecord {
                 #[cfg(not(feature = "populate-logs-event-name"))]
                 attributes
             },
-            trace_id: value
-                .record
+            trace_id: record
                 .trace_context
                 .as_ref()
                 .map(|c| c.trace_id.to_string()),
-            span_id: value
-                .record
+            span_id: record.trace_context.as_ref().map(|c| c.span_id.to_string()),
+            flags: record
                 .trace_context
                 .as_ref()
-                .map(|c| c.span_id.to_string()),
-            flags: value
-                .record
-                .trace_context
                 .map(|c| c.trace_flags.map(|f| f.to_u8()))
                 .unwrap_or_default(),
-            time_unix_nano: value.record.timestamp,
-            time: value.record.timestamp,
-            observed_time_unix_nano: value.record.observed_timestamp.unwrap(),
-            observed_time: value.record.observed_timestamp.unwrap(),
-            severity_number: value
-                .record
-                .severity_number
-                .map(|u| u as u32)
-                .unwrap_or_default(),
+            time_unix_nano: record.timestamp,
+            time: record.timestamp,
+            observed_time_unix_nano: record.observed_timestamp.unwrap(),
+            observed_time: record.observed_timestamp.unwrap(),
+            severity_number: record.severity_number.map(|u| u as u32).unwrap_or_default(),
             dropped_attributes_count: 0,
-            severity_text: value.record.severity_text,
-            body: value.record.body.map(|a| a.into()),
+            severity_text: record.severity_text.clone(),
+            body: record.body.clone().map(|a| a.into()),
         }
     }
 }
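Taken together, these exporter changes mean implementors now receive a batch of borrowed `(&LogRecord, &InstrumentationLibrary)` pairs and must clone whatever they keep. A minimal sketch of a custom exporter against the new signature (the `CountingExporter` here is hypothetical, not part of this change):

```rust
use async_trait::async_trait;
use opentelemetry::logs::LogResult;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::LogRecord;

#[derive(Debug, Default)]
struct CountingExporter {
    exported: usize,
}

#[async_trait]
impl LogExporter for CountingExporter {
    async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
        // Records are only borrowed for the duration of this call; an exporter
        // that buffers them (like the in-memory one above) must clone them.
        self.exported += batch.len();
        Ok(())
    }
}
```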
diff --git a/stress/src/logs.rs b/stress/src/logs.rs
index 6cec97463c..1798401e32 100644
--- a/stress/src/logs.rs
+++ b/stress/src/logs.rs
@@ -20,7 +20,7 @@ mod throughput;
 pub struct NoOpLogProcessor;
 
 impl LogProcessor for NoOpLogProcessor {
-    fn emit(&self, _data: &mut opentelemetry_sdk::export::logs::LogData) {}
+    fn emit(&self, _data: &mut opentelemetry_sdk::logs::LogData) {}
 
     fn force_flush(&self) -> opentelemetry::logs::LogResult<()> {
         Ok(())
diff --git a/stress/src/metrics_histogram.rs b/stress/src/metrics_histogram.rs
index 3e31dc2d4b..e0f469fc33 100644
--- a/stress/src/metrics_histogram.rs
+++ b/stress/src/metrics_histogram.rs
@@ -6,7 +6,7 @@
     ~9.0 M/sec
 
     Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
-    ~2.2 M /sec // Needs to be updated.
+    ~12.0 M/sec
 */
 
 use lazy_static::lazy_static;