diff --git a/crates/fluvio/src/consumer/stream.rs b/crates/fluvio/src/consumer/stream.rs index fbd9005ceb..7b3f593396 100644 --- a/crates/fluvio/src/consumer/stream.rs +++ b/crates/fluvio/src/consumer/stream.rs @@ -1,20 +1,18 @@ -use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; use async_channel::Sender; use fluvio_protocol::{link::ErrorCode, record::ConsumerRecord as Record}; +use futures_util::future::BoxFuture; use futures_util::stream::select_all; -use futures_util::{future::try_join_all, ready, Future, FutureExt}; +use futures_util::{future::try_join_all, ready, FutureExt}; use futures_util::Stream; use tracing::{info, warn}; use super::config::OffsetManagementStrategy; use super::{offset::OffsetLocalStore, StreamToServer}; -type OffsetFlushFuture<'a> = Pin> + Send + 'a>>; - /// Extension of [`Stream`] trait with offset management capabilities. pub trait ConsumerStream: Stream> + Unpin { /// Mark the offset of the last yelded record as committed. Depending on [`OffsetManagementStrategy`] @@ -22,7 +20,7 @@ pub trait ConsumerStream: Stream> + Unpin { fn offset_commit(&mut self) -> Result<(), ErrorCode>; /// Send the committed offset to the server. The method waits for the server's acknowledgment before it finishes. - fn offset_flush(&mut self) -> OffsetFlushFuture<'_>; + fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>>; } pub struct MultiplePartitionConsumerStream { @@ -120,7 +118,7 @@ where self.get_mut().offset_commit() } - fn offset_flush(&mut self) -> OffsetFlushFuture<'_> { + fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { self.get_mut().offset_flush() } } @@ -132,7 +130,7 @@ impl> + Unpin> ConsumerStream self.offset_mngt.commit() } - fn offset_flush(&mut self) -> OffsetFlushFuture<'_> { + fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { Box::pin(self.offset_mngt.flush()) } } @@ -147,7 +145,7 @@ impl> + Unpin> ConsumerStream Ok(()) } - fn offset_flush(&mut self) -> OffsetFlushFuture<'_> { + fn offset_flush(&mut self) -> BoxFuture<'_, Result<(), ErrorCode>> { let futures: Vec<_> = self.offset_mgnts.iter().map(|p| p.flush()).collect(); Box::pin(try_join_all(futures).map(|r| r.map(|_| ()))) }