Skip to content

Commit 25cce7a

Browse files
authored
Replace stream attribute filter with allowed keys (open-telemetry#1161)
1 parent b28e41a commit 25cce7a

File tree

4 files changed

+24
-33
lines changed

4 files changed

+24
-33
lines changed

opentelemetry-sdk/benches/metric.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ fn counters(c: &mut Criterion) {
309309
Some(
310310
new_view(
311311
Instrument::new().name("*"),
312-
Stream::new().attribute_filter(|kv| kv.key == Key::new("K")),
312+
Stream::new().allowed_attribute_keys([Key::new("K")]),
313313
)
314314
.unwrap(),
315315
),

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::{any::Any, borrow::Cow, fmt, hash::Hash, marker, sync::Arc};
1+
use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};
22

33
use opentelemetry_api::{
44
metrics::{
55
AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
66
},
7-
KeyValue,
7+
Key, KeyValue,
88
};
99

1010
use crate::{
@@ -158,7 +158,7 @@ impl Instrument {
158158
/// let view = new_view(criteria, mask);
159159
/// # drop(view);
160160
/// ```
161-
#[derive(Default)]
161+
#[derive(Default, Debug)]
162162
#[non_exhaustive]
163163
pub struct Stream {
164164
/// The human-readable identifier of the stream.
@@ -169,12 +169,14 @@ pub struct Stream {
169169
pub unit: Unit,
170170
/// Aggregation the stream uses for an instrument.
171171
pub aggregation: Option<Aggregation>,
172-
/// applied to all attributes recorded for an instrument.
173-
pub attribute_filter: Option<Filter>,
172+
/// An allow-list of attribute keys that will be preserved for the stream.
173+
///
174+
/// Any attribute recorded for the stream with a key not in this set will be
175+
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
176+
/// attributes will be kept.
177+
pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
174178
}
175179

176-
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
177-
178180
impl Stream {
179181
/// Create a new stream with empty values.
180182
pub fn new() -> Self {
@@ -205,25 +207,14 @@ impl Stream {
205207
self
206208
}
207209

208-
/// Set the stream attribute filter.
209-
pub fn attribute_filter(
210-
mut self,
211-
filter: impl Fn(&KeyValue) -> bool + Send + Sync + 'static,
212-
) -> Self {
213-
self.attribute_filter = Some(Arc::new(filter));
214-
self
215-
}
216-
}
210+
/// Set the stream allowed attribute keys.
211+
///
212+
/// Any attribute recorded for the stream with a key not in this set will be
213+
/// dropped. If this set is empty all attributes will be dropped.
214+
pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
215+
self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
217216

218-
impl fmt::Debug for Stream {
219-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220-
f.debug_struct("Stream")
221-
.field("name", &self.name)
222-
.field("description", &self.description)
223-
.field("unit", &self.unit)
224-
.field("aggregation", &self.aggregation)
225-
.field("attribute_filter", &self.attribute_filter.is_some())
226-
.finish()
217+
self
227218
}
228219
}
229220

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ where
303303
description: inst.description,
304304
unit: inst.unit,
305305
aggregation: None,
306-
attribute_filter: None,
306+
allowed_attribute_keys: None,
307307
};
308308

309309
match self.cached_aggregator(&inst.scope, kind, stream) {
@@ -369,8 +369,8 @@ where
369369
other => return other, // Drop aggregator or error
370370
};
371371

372-
if let Some(filter) = &stream.attribute_filter {
373-
agg = internal::new_filter(agg, Arc::clone(filter));
372+
if let Some(allowed) = stream.allowed_attribute_keys.as_ref().map(Arc::clone) {
373+
agg = internal::new_filter(agg, Arc::new(move |kv| allowed.contains(&kv.key)));
374374
}
375375

376376
self.pipeline.add_sync(

opentelemetry-sdk/src/metrics/view.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ impl View for Box<dyn View> {
8686
/// The [Stream] mask only applies updates for non-empty fields. By default, the
8787
/// [Instrument] the [View] matches against will be use for the name,
8888
/// description, and unit of the returned [Stream] and no `aggregation` or
89-
/// `attribute_filter` are set. All non-empty fields of mask are used instead of
90-
/// the default. If you need to set a an empty value in the returned stream,
91-
/// create a custom [View] directly.
89+
/// `allowed_attribute_keys` are set. All non-empty fields of mask are used
90+
/// instead of the default. If you need to set a an empty value in the returned
91+
/// stream, create a custom [View] directly.
9292
///
9393
/// # Example
9494
///
@@ -167,7 +167,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> Result<Box<dyn View>> {
167167
i.unit.clone()
168168
},
169169
aggregation: agg.clone(),
170-
attribute_filter: mask.attribute_filter.clone(),
170+
allowed_attribute_keys: mask.allowed_attribute_keys.clone(),
171171
})
172172
} else {
173173
None

0 commit comments

Comments
 (0)