Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve consumer Stream implementations #1049

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 117 additions & 135 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,57 +435,48 @@ impl<'a> futures::Stream for Sequence<'a> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.next.as_mut() {
None => {
let context = self.context.clone();
let subject = self.subject.clone();
let request = self.request.clone();
let pending_messages = self.pending_messages;
let this = self.as_mut().get_mut();

let next = this.next.get_or_insert_with(|| {
let context = this.context.clone();
let subject = this.subject.clone();
let request = this.request.clone();
let pending_messages = this.pending_messages;

Box::pin(async move {
let inbox = context.client.new_inbox();
let subscriber = context
.client
.subscribe(inbox.clone().into())
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

context
.client
.publish_with_reply(subject.into(), inbox.into(), request)
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

// TODO(tp): Add timeout config and defaults.
Ok(Batch {
pending_messages,
subscriber,
context,
terminated: false,
timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
})
})
});

let next = self.next.insert(Box::pin(async move {
let inbox = context.client.new_inbox();
let subscriber = context
.client
.subscribe(inbox.clone().into())
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
match next.as_mut().poll(cx) {
Poll::Ready(result) => {
self.next = None;

context
.client
.publish_with_reply(subject.into(), inbox.into(), request)
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

// TODO(tp): Add timeout config and defaults.
Ok(Batch {
pending_messages,
subscriber,
context,
terminated: false,
timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
})
}));

match next.as_mut().poll(cx) {
Poll::Ready(result) => {
self.next = None;
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}
Poll::Pending => Poll::Pending,
}
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}

Some(next) => match next.as_mut().poll(cx) {
Poll::Ready(result) => {
self.next = None;
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}
Poll::Pending => Poll::Pending,
},
Poll::Pending => Poll::Pending,
}
}
}
Expand Down Expand Up @@ -742,34 +733,32 @@ impl<'a> futures::Stream for Ordered<'a> {
// Poll messages
if let Some(stream) = self.stream.as_mut() {
match stream.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
Some(message) => {
// Do we bail out on all errors?
// Or we want to handle some? (like consumer deleted?)
let message = message?;
let info = message.info().map_err(|err| {
OrderedError::with_source(OrderedErrorKind::Other, err)
})?;
trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
Poll::Ready(Some(message)) => {
// Do we bail out on all errors?
// Or we want to handle some? (like consumer deleted?)
let message = message?;
let info = message
.info()
.map_err(|err| OrderedError::with_source(OrderedErrorKind::Other, err))?;
trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
self.consumer_sequence,
self.stream_sequence,
info.consumer_sequence,
info.stream_sequence);
if info.consumer_sequence != self.consumer_sequence + 1 {
debug!(
"ordered consumer mismatch. current {}, info: {}",
self.consumer_sequence, info.consumer_sequence
);
recreate = true;
self.consumer_sequence = 0;
} else {
self.stream_sequence = info.stream_sequence;
self.consumer_sequence = info.consumer_sequence;
return Poll::Ready(Some(Ok(message)));
}
if info.consumer_sequence != self.consumer_sequence + 1 {
debug!(
"ordered consumer mismatch. current {}, info: {}",
self.consumer_sequence, info.consumer_sequence
);
recreate = true;
self.consumer_sequence = 0;
} else {
self.stream_sequence = info.stream_sequence;
self.consumer_sequence = info.consumer_sequence;
return Poll::Ready(Some(Ok(message)));
}
None => return Poll::Ready(None),
},
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => (),
}
}
Expand All @@ -783,30 +772,35 @@ impl<'a> futures::Stream for Ordered<'a> {
let consumer_name = self.consumer_name.clone();
let sequence = self.consumer_sequence;
async move {
recreate_consumer_stream(context, config, stream_name, consumer_name, sequence)
.await
recreate_consumer_stream(
context,
config,
&stream_name,
&consumer_name,
sequence,
)
.await
}
}))
}
// check for recreation future
if let Some(result) = self.create_stream.as_mut() {
match result.poll_unpin(cx) {
Poll::Ready(result) => match result {
Ok(stream) => {
self.create_stream = None;
self.stream = Some(stream);
return self.poll_next(cx);
}
Err(err) => {
return Poll::Ready(Some(Err(OrderedError::with_source(
OrderedErrorKind::Recreate,
err,
))))
}
},
Poll::Ready(Ok(stream)) => {
self.create_stream = None;
self.stream = Some(stream);
return self.poll_next(cx);
}
Poll::Ready(Err(err)) => {
return Poll::Ready(Some(Err(OrderedError::with_source(
OrderedErrorKind::Recreate,
err,
))))
}
Poll::Pending => (),
}
}

Poll::Pending
}
}
Expand Down Expand Up @@ -1027,18 +1021,15 @@ impl futures::Stream for Stream {
if !self.batch_config.idle_heartbeat.is_zero() {
trace!("checking idle hearbeats");
let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
match self
let heartbeat_timeout = self
.heartbeat_timeout
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
.poll_unpin(cx)
{
Poll::Ready(_) => {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
Poll::Pending => (),
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)));

if heartbeat_timeout.poll_unpin(cx).is_ready() {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
}

Expand All @@ -1055,30 +1046,26 @@ impl futures::Stream for Stream {
}

match self.request_result_rx.poll_recv(cx) {
Poll::Ready(resp) => match resp {
Some(resp) => match resp {
Ok(reset) => {
trace!("request response: {:?}", reset);
debug!("request sent, setting pending messages");
if reset {
self.pending_messages = self.batch_config.batch;
self.pending_bytes = self.batch_config.max_bytes;
} else {
self.pending_messages += self.batch_config.batch;
self.pending_bytes += self.batch_config.max_bytes;
}
self.pending_request = false;
continue;
}
Err(err) => {
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Pull,
err,
))))
}
},
None => return Poll::Ready(None),
},
Poll::Ready(Some(Ok(reset))) => {
trace!("request response: {:?}", reset);
debug!("request sent, setting pending messages");
if reset {
self.pending_messages = self.batch_config.batch;
self.pending_bytes = self.batch_config.max_bytes;
} else {
self.pending_messages += self.batch_config.batch;
self.pending_bytes += self.batch_config.max_bytes;
}
self.pending_request = false;
continue;
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Pull,
err,
))))
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {
trace!("pending result");
}
Expand All @@ -1088,6 +1075,7 @@ impl futures::Stream for Stream {
match self.subscriber.receiver.poll_recv(cx) {
Poll::Ready(maybe_message) => {
self.heartbeat_timeout = None;

match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
Expand Down Expand Up @@ -2200,23 +2188,17 @@ pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
async fn recreate_consumer_stream(
context: Context,
config: Config,
stream_name: String,
consumer_name: String,
stream_name: &str,
consumer_name: &str,
sequence: u64,
) -> Result<Stream, ConsumerRecreateError> {
// TODO(jarema): retry whole operation few times?
let stream = context
.get_stream(stream_name.clone())
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;
stream
.delete_consumer(&consumer_name)
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
})?;
let stream = context.get_stream(stream_name).await.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;
stream.delete_consumer(consumer_name).await.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
})?;

let deliver_policy = {
if sequence == 0 {
Expand Down
Loading