Skip to content

Commit 8eed8e9

Browse files
authored
chore(core): Replace EventArray::for_each_X with an iterator (#13601)
The `EventArray` functions `for_each_X` are equivalent to composing an `iter_X` iterator with that iterator's `for_each` method. Since the composable solution provides for more possible uses, drop the `for_each_X` special cases in favour of new `iter_X_mut` iterator generators.
1 parent 9cc6226 commit 8eed8e9

File tree

7 files changed

+58
-39
lines changed

7 files changed

+58
-39
lines changed

lib/vector-core/src/event/array.rs

+47-28
Original file line numberDiff line numberDiff line change
@@ -137,42 +137,29 @@ pub enum EventArray {
137137
}
138138

139139
impl EventArray {
140-
/// Call the given update function over each `LogEvent` in this array.
141-
pub fn for_each_log(&mut self, update: impl FnMut(&mut LogEvent)) {
142-
if let Self::Logs(logs) = self {
143-
logs.iter_mut().for_each(update);
144-
}
145-
}
146-
147-
/// Call the given update function over each `Metric` in this array.
148-
pub fn for_each_metric(&mut self, update: impl FnMut(&mut Metric)) {
149-
if let Self::Metrics(metrics) = self {
150-
metrics.iter_mut().for_each(update);
151-
}
152-
}
153-
154-
/// Run the given update function over each `Trace` in this array.
155-
pub fn for_each_trace(&mut self, update: impl FnMut(&mut TraceEvent)) {
156-
if let Self::Traces(traces) = self {
157-
traces.iter_mut().for_each(update);
140+
/// Iterate over references to this array's events.
141+
pub fn iter_events(&self) -> impl Iterator<Item = EventRef> {
142+
match self {
143+
Self::Logs(array) => EventArrayIter::Logs(array.iter()),
144+
Self::Metrics(array) => EventArrayIter::Metrics(array.iter()),
145+
Self::Traces(array) => EventArrayIter::Traces(array.iter()),
158146
}
159147
}
160148

161-
/// Call the given update function over each event in this array.
162-
pub fn for_each_event(&mut self, mut update: impl FnMut(EventMutRef<'_>)) {
149+
/// Iterate over mutable references to this array's events.
150+
pub fn iter_events_mut(&mut self) -> impl Iterator<Item = EventMutRef> {
163151
match self {
164-
Self::Logs(array) => array.iter_mut().for_each(|log| update(log.into())),
165-
Self::Metrics(array) => array.iter_mut().for_each(|metric| update(metric.into())),
166-
Self::Traces(array) => array.iter_mut().for_each(|trace| update(trace.into())),
152+
Self::Logs(array) => EventArrayIterMut::Logs(array.iter_mut()),
153+
Self::Metrics(array) => EventArrayIterMut::Metrics(array.iter_mut()),
154+
Self::Traces(array) => EventArrayIterMut::Traces(array.iter_mut()),
167155
}
168156
}
169157

170-
/// Iterate over this array's events.
171-
pub fn iter_events(&self) -> impl Iterator<Item = EventRef> {
158+
/// Iterate over references to the logs in this array.
159+
pub fn iter_logs_mut(&mut self) -> impl Iterator<Item = &mut LogEvent> {
172160
match self {
173-
Self::Logs(array) => EventArrayIter::Logs(array.iter()),
174-
Self::Metrics(array) => EventArrayIter::Metrics(array.iter()),
175-
Self::Traces(array) => EventArrayIter::Traces(array.iter()),
161+
Self::Logs(array) => TypedArrayIterMut(Some(array.iter_mut())),
162+
_ => TypedArrayIterMut(None),
176163
}
177164
}
178165
}
@@ -348,6 +335,29 @@ impl<'a> Iterator for EventArrayIter<'a> {
348335
}
349336
}
350337

338+
/// The iterator type for `EventArray::iter_events_mut`.
339+
#[derive(Debug)]
340+
pub enum EventArrayIterMut<'a> {
341+
/// An iterator over type `LogEvent`.
342+
Logs(slice::IterMut<'a, LogEvent>),
343+
/// An iterator over type `Metric`.
344+
Metrics(slice::IterMut<'a, Metric>),
345+
/// An iterator over type `Trace`.
346+
Traces(slice::IterMut<'a, TraceEvent>),
347+
}
348+
349+
impl<'a> Iterator for EventArrayIterMut<'a> {
350+
type Item = EventMutRef<'a>;
351+
352+
fn next(&mut self) -> Option<Self::Item> {
353+
match self {
354+
Self::Logs(i) => i.next().map(EventMutRef::from),
355+
Self::Metrics(i) => i.next().map(EventMutRef::from),
356+
Self::Traces(i) => i.next().map(EventMutRef::from),
357+
}
358+
}
359+
}
360+
351361
/// The iterator type for `EventArray::into_events`.
352362
#[derive(Debug)]
353363
pub enum EventArrayIntoIter {
@@ -371,6 +381,15 @@ impl Iterator for EventArrayIntoIter {
371381
}
372382
}
373383

384+
struct TypedArrayIterMut<'a, T>(Option<slice::IterMut<'a, T>>);
385+
386+
impl<'a, T> Iterator for TypedArrayIterMut<'a, T> {
387+
type Item = &'a mut T;
388+
fn next(&mut self) -> Option<Self::Item> {
389+
self.0.as_mut().and_then(Iterator::next)
390+
}
391+
}
392+
374393
/// Intermediate buffer for conversion of a sequence of individual
375394
/// `Event`s into a sequence of `EventArray`s by coalescing contiguous
376395
/// events of the same type into one array. This is used by

src/sinks/aws_cloudwatch_logs/integration_tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async fn cloudwatch_insert_log_events_sorted() {
119119
if doit {
120120
let timestamp = chrono::Utc::now() - chrono::Duration::days(1);
121121

122-
events.for_each_log(|log| {
122+
events.iter_logs_mut().for_each(|log| {
123123
log.insert(log_schema().timestamp_key(), Value::Timestamp(timestamp));
124124
});
125125
}

src/sinks/datadog/events/tests.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn random_events_with_stream(
3131
(
3232
lines,
3333
stream.map(|mut events| {
34-
events.for_each_log(|log| {
34+
events.iter_logs_mut().for_each(|log| {
3535
log.insert("title", "All!");
3636
log.insert("invalid", "Tik");
3737
});
@@ -128,7 +128,7 @@ async fn api_key_in_metadata() {
128128
let (expected, events) = random_events_with_stream(100, 10, None);
129129

130130
let events = events.map(|mut events| {
131-
events.for_each_log(|log| {
131+
events.iter_logs_mut().for_each(|log| {
132132
log.metadata_mut()
133133
.set_datadog_api_key(Arc::from("from_metadata"));
134134
});

src/sinks/datadog/logs/tests.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async fn api_key_in_metadata_inner(api_status: ApiStatus) {
220220
let api_key = "0xDECAFBAD";
221221
let events = events.map(|mut e| {
222222
println!("EVENT: {:?}", e);
223-
e.for_each_log(|log| {
223+
e.iter_logs_mut().for_each(|log| {
224224
log.metadata_mut().set_datadog_api_key(Arc::from(api_key));
225225
});
226226
e
@@ -360,7 +360,7 @@ async fn enterprise_headers_inner(api_status: ApiStatus) {
360360
let api_key = "0xDECAFBAD";
361361
let events = events.map(|mut e| {
362362
println!("EVENT: {:?}", e);
363-
e.for_each_log(|log| {
363+
e.iter_logs_mut().for_each(|log| {
364364
log.metadata_mut().set_datadog_api_key(Arc::from(api_key));
365365
});
366366
e
@@ -423,7 +423,7 @@ async fn no_enterprise_headers_inner(api_status: ApiStatus) {
423423
let api_key = "0xDECAFBAD";
424424
let events = events.map(|mut e| {
425425
println!("EVENT: {:?}", e);
426-
e.for_each_log(|log| {
426+
e.iter_logs_mut().for_each(|log| {
427427
log.metadata_mut().set_datadog_api_key(Arc::from(api_key));
428428
});
429429
e

src/sinks/elasticsearch/integration_tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ async fn run_insert_tests_with_config(
394394
let mut doit = false;
395395
let events = events.map(move |mut events| {
396396
if doit {
397-
events.for_each_log(|log| {
397+
events.iter_logs_mut().for_each(|log| {
398398
log.insert("_type", 1);
399399
});
400400
}

src/sinks/kafka/tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ mod integration_test {
271271
header_1_key.to_owned(),
272272
Value::Bytes(Bytes::from(header_1_value)),
273273
);
274-
events.for_each_log(move |log| {
274+
events.iter_logs_mut().for_each(move |log| {
275275
log.insert(headers_key.as_str(), header_values.clone());
276276
});
277277
events

src/source_sender/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl SourceSender {
102102
// events, so we have to add a map to the receiver to handle the
103103
// finalization.
104104
let recv = recv.into_stream().flat_map(move |mut events| {
105-
events.for_each_event(|mut event| {
105+
events.iter_events_mut().for_each(|mut event| {
106106
let metadata = event.metadata_mut();
107107
metadata.update_status(status);
108108
metadata.update_sources();
@@ -126,7 +126,7 @@ impl SourceSender {
126126
EventStatus::Delivered
127127
};
128128
count += 1;
129-
events.for_each_event(|mut event| {
129+
events.iter_events_mut().for_each(|mut event| {
130130
let metadata = event.metadata_mut();
131131
metadata.update_status(status);
132132
metadata.update_sources();
@@ -144,7 +144,7 @@ impl SourceSender {
144144
) -> impl Stream<Item = EventArray> + Unpin {
145145
let (inner, recv) = Inner::new_with_buffer(100, name.clone());
146146
let recv = recv.into_stream().map(move |mut events| {
147-
events.for_each_event(|mut event| {
147+
events.iter_events_mut().for_each(|mut event| {
148148
let metadata = event.metadata_mut();
149149
metadata.update_status(status);
150150
metadata.update_sources();

0 commit comments

Comments
 (0)