diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 014f70e56..3d539d394 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -435,57 +435,48 @@ impl<'a> futures::Stream for Sequence<'a> { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - match self.next.as_mut() { - None => { - let context = self.context.clone(); - let subject = self.subject.clone(); - let request = self.request.clone(); - let pending_messages = self.pending_messages; + let this = self.as_mut().get_mut(); + + let next = this.next.get_or_insert_with(|| { + let context = this.context.clone(); + let subject = this.subject.clone(); + let request = this.request.clone(); + let pending_messages = this.pending_messages; + + Box::pin(async move { + let inbox = context.client.new_inbox(); + let subscriber = context + .client + .subscribe(inbox.clone().into()) + .await + .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; + + context + .client + .publish_with_reply(subject.into(), inbox.into(), request) + .await + .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; + + // TODO(tp): Add timeout config and defaults. + Ok(Batch { + pending_messages, + subscriber, + context, + terminated: false, + timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))), + }) + }) + }); - let next = self.next.insert(Box::pin(async move { - let inbox = context.client.new_inbox(); - let subscriber = context - .client - .subscribe(inbox.clone().into()) - .await - .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; + match next.as_mut().poll(cx) { + Poll::Ready(result) => { + self.next = None; - context - .client - .publish_with_reply(subject.into(), inbox.into(), request) - .await - .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; - - // TODO(tp): Add timeout config and defaults. - Ok(Batch { - pending_messages, - subscriber, - context, - terminated: false, - timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))), - }) - })); - - match next.as_mut().poll(cx) { - Poll::Ready(result) => { - self.next = None; - Poll::Ready(Some(result.map_err(|err| { - MessagesError::with_source(MessagesErrorKind::Pull, err) - }))) - } - Poll::Pending => Poll::Pending, - } + Poll::Ready(Some(result.map_err(|err| { + MessagesError::with_source(MessagesErrorKind::Pull, err) + }))) } - - Some(next) => match next.as_mut().poll(cx) { - Poll::Ready(result) => { - self.next = None; - Poll::Ready(Some(result.map_err(|err| { - MessagesError::with_source(MessagesErrorKind::Pull, err) - }))) - } - Poll::Pending => Poll::Pending, - }, + Poll::Pending => Poll::Pending, } } } @@ -742,34 +733,32 @@ impl<'a> futures::Stream for Ordered<'a> { // Poll messages if let Some(stream) = self.stream.as_mut() { match stream.poll_next_unpin(cx) { - Poll::Ready(message) => match message { - Some(message) => { - // Do we bail out on all errors? - // Or we want to handle some? (like consumer deleted?) - let message = message?; - let info = message.info().map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Other, err) - })?; - trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", + Poll::Ready(Some(message)) => { + // Do we bail out on all errors? + // Or we want to handle some? (like consumer deleted?) + let message = message?; + let info = message + .info() + .map_err(|err| OrderedError::with_source(OrderedErrorKind::Other, err))?; + trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", self.consumer_sequence, self.stream_sequence, info.consumer_sequence, info.stream_sequence); - if info.consumer_sequence != self.consumer_sequence + 1 { - debug!( - "ordered consumer mismatch. current {}, info: {}", - self.consumer_sequence, info.consumer_sequence - ); - recreate = true; - self.consumer_sequence = 0; - } else { - self.stream_sequence = info.stream_sequence; - self.consumer_sequence = info.consumer_sequence; - return Poll::Ready(Some(Ok(message))); - } + if info.consumer_sequence != self.consumer_sequence + 1 { + debug!( + "ordered consumer mismatch. current {}, info: {}", + self.consumer_sequence, info.consumer_sequence + ); + recreate = true; + self.consumer_sequence = 0; + } else { + self.stream_sequence = info.stream_sequence; + self.consumer_sequence = info.consumer_sequence; + return Poll::Ready(Some(Ok(message))); } - None => return Poll::Ready(None), - }, + } + Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => (), } } @@ -783,30 +772,35 @@ impl<'a> futures::Stream for Ordered<'a> { let consumer_name = self.consumer_name.clone(); let sequence = self.consumer_sequence; async move { - recreate_consumer_stream(context, config, stream_name, consumer_name, sequence) - .await + recreate_consumer_stream( + context, + config, + &stream_name, + &consumer_name, + sequence, + ) + .await } })) } // check for recreation future if let Some(result) = self.create_stream.as_mut() { match result.poll_unpin(cx) { - Poll::Ready(result) => match result { - Ok(stream) => { - self.create_stream = None; - self.stream = Some(stream); - return self.poll_next(cx); - } - Err(err) => { - return Poll::Ready(Some(Err(OrderedError::with_source( - OrderedErrorKind::Recreate, - err, - )))) - } - }, + Poll::Ready(Ok(stream)) => { + self.create_stream = None; + self.stream = Some(stream); + return self.poll_next(cx); + } + Poll::Ready(Err(err)) => { + return Poll::Ready(Some(Err(OrderedError::with_source( + OrderedErrorKind::Recreate, + err, + )))) + } Poll::Pending => (), } } + Poll::Pending } } @@ -1027,18 +1021,15 @@ impl futures::Stream for Stream { if !self.batch_config.idle_heartbeat.is_zero() { trace!("checking idle hearbeats"); let timeout = self.batch_config.idle_heartbeat.saturating_mul(2); - match self + let heartbeat_timeout = self .heartbeat_timeout - .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout))) - .poll_unpin(cx) - { - Poll::Ready(_) => { - self.heartbeat_timeout = None; - return Poll::Ready(Some(Err(MessagesError::new( - MessagesErrorKind::MissingHeartbeat, - )))); - } - Poll::Pending => (), + .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout))); + + if heartbeat_timeout.poll_unpin(cx).is_ready() { + self.heartbeat_timeout = None; + return Poll::Ready(Some(Err(MessagesError::new( + MessagesErrorKind::MissingHeartbeat, + )))); } } @@ -1055,30 +1046,26 @@ impl futures::Stream for Stream { } match self.request_result_rx.poll_recv(cx) { - Poll::Ready(resp) => match resp { - Some(resp) => match resp { - Ok(reset) => { - trace!("request response: {:?}", reset); - debug!("request sent, setting pending messages"); - if reset { - self.pending_messages = self.batch_config.batch; - self.pending_bytes = self.batch_config.max_bytes; - } else { - self.pending_messages += self.batch_config.batch; - self.pending_bytes += self.batch_config.max_bytes; - } - self.pending_request = false; - continue; - } - Err(err) => { - return Poll::Ready(Some(Err(MessagesError::with_source( - MessagesErrorKind::Pull, - err, - )))) - } - }, - None => return Poll::Ready(None), - }, + Poll::Ready(Some(Ok(reset))) => { + trace!("request response: {:?}", reset); + debug!("request sent, setting pending messages"); + if reset { + self.pending_messages = self.batch_config.batch; + self.pending_bytes = self.batch_config.max_bytes; + } else { + self.pending_messages += self.batch_config.batch; + self.pending_bytes += self.batch_config.max_bytes; + } + self.pending_request = false; + continue; + } + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Some(Err(MessagesError::with_source( + MessagesErrorKind::Pull, + err, + )))) + } + Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => { trace!("pending result"); } @@ -1088,6 +1075,7 @@ impl futures::Stream for Stream { match self.subscriber.receiver.poll_recv(cx) { Poll::Ready(maybe_message) => { self.heartbeat_timeout = None; + match maybe_message { Some(message) => match message.status.unwrap_or(StatusCode::OK) { StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => { @@ -2200,23 +2188,17 @@ pub type ConsumerRecreateError = Error; async fn recreate_consumer_stream( context: Context, config: Config, - stream_name: String, - consumer_name: String, + stream_name: &str, + consumer_name: &str, sequence: u64, ) -> Result { // TODO(jarema): retry whole operation few times? - let stream = context - .get_stream(stream_name.clone()) - .await - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) - })?; - stream - .delete_consumer(&consumer_name) - .await - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err) - })?; + let stream = context.get_stream(stream_name).await.map_err(|err| { + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) + })?; + stream.delete_consumer(consumer_name).await.map_err(|err| { + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err) + })?; let deliver_policy = { if sequence == 0 { diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index e1b9c6a6e..eeda1c369 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -276,15 +276,15 @@ pub struct Config { impl FromConsumer for Config { fn try_from_consumer_config(config: super::Config) -> Result { - if config.deliver_subject.is_none() { - return Err(Box::new(io::Error::new( + let deliver_subject = config.deliver_subject.ok_or_else(|| { + Box::new(io::Error::new( ErrorKind::Other, "push consumer must have delivery subject", - ))); - } + )) + })?; Ok(Config { - deliver_subject: config.deliver_subject.unwrap(), + deliver_subject, durable_name: config.durable_name, name: config.name, description: config.description, @@ -511,7 +511,7 @@ impl Consumer { recreate_ephemeral_consumer( context.clone(), config.clone(), - stream_name.clone(), + &stream_name, last_sequence.load(Ordering::Relaxed), ) }) @@ -563,24 +563,22 @@ impl<'a> futures::Stream for Ordered<'a> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - match self - .heartbeat_sleep - .get_or_insert_with(|| { - Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2))) - }) - .poll_unpin(cx) - { - Poll::Ready(_) => { - self.heartbeat_sleep = None; - return Poll::Ready(Some(Err(OrderedError::new( - OrderedErrorKind::MissingHeartbeat, - )))); - } - Poll::Pending => (), + let this = self.as_mut().get_mut(); + + let heartbeat_sleep = this.heartbeat_sleep.get_or_insert_with(|| { + Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2))) + }); + + if heartbeat_sleep.poll_unpin(cx).is_ready() { + this.heartbeat_sleep = None; + + return Poll::Ready(Some(Err(OrderedError::new( + OrderedErrorKind::MissingHeartbeat, + )))); } loop { - match self.shutdown.try_recv() { + match this.shutdown.try_recv() { Ok(err) => { return Poll::Ready(Some(Err(OrderedError::with_source( OrderedErrorKind::Other, @@ -595,140 +593,118 @@ impl<'a> futures::Stream for Ordered<'a> { } Err(TryRecvError::Empty) => {} } - if self.subscriber.is_none() { - match self.subscriber_future.as_mut() { - None => { - trace!( - "subscriber and subscriber future are None. Recreating the consumer" - ); - let context = self.context.clone(); - let sequence = self.stream_sequence.clone(); - let config = self.consumer.config.clone(); - let stream_name = self.consumer.info.stream_name.clone(); - let subscriber_future = - self.subscriber_future.insert(Box::pin(async move { - recreate_consumer_and_subscription( - context, - config, - stream_name, - sequence.load(Ordering::Relaxed), - ) - .await - })); - match subscriber_future.as_mut().poll(cx) { - Poll::Ready(subscriber) => { - self.subscriber_future = None; - self.consumer_sequence.store(0, Ordering::Relaxed); - self.subscriber = Some(subscriber.map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Recreate, err) - })?); - } - Poll::Pending => { - return Poll::Pending; - } - } - } - Some(subscriber) => match subscriber.as_mut().poll(cx) { + + let subscriber = match &mut this.subscriber { + Some(subscriber) => subscriber, + None => { + let subscriber_future = this.subscriber_future.get_or_insert_with(|| { + let context = this.context.clone(); + let sequence = this.stream_sequence.clone(); + let config = this.consumer.config.clone(); + let stream_name = this.consumer.info.stream_name.clone(); + + Box::pin(async move { + recreate_consumer_and_subscription( + context, + config, + stream_name, + sequence.load(Ordering::Relaxed), + ) + .await + }) + }); + + match subscriber_future.as_mut().poll(cx) { Poll::Ready(subscriber) => { - self.subscriber_future = None; - self.consumer_sequence.store(0, Ordering::Relaxed); - self.subscriber = Some(subscriber.map_err(|err| { + this.subscriber_future = None; + this.consumer_sequence.store(0, Ordering::Relaxed); + this.subscriber.insert(subscriber.map_err(|err| { OrderedError::with_source(OrderedErrorKind::Recreate, err) - })?); - } - Poll::Pending => { - return Poll::Pending; + })?) } - }, + Poll::Pending => return Poll::Pending, + } } - } - if let Some(subscriber) = self.subscriber.as_mut() { - match subscriber.receiver.poll_recv(cx) { - Poll::Ready(maybe_message) => match maybe_message { - Some(message) => { - self.heartbeat_sleep = None; - match message.status { - Some(StatusCode::IDLE_HEARTBEAT) => { - debug!("received idle heartbeats"); - if let Some(headers) = message.headers.as_ref() { - if let Some(sequence) = - headers.get(crate::header::NATS_LAST_CONSUMER) - { - let sequence: u64 = - sequence.as_str().parse().map_err(|err| { - OrderedError::with_source( - OrderedErrorKind::Other, - err, - ) - })?; - - let last_sequence = - self.consumer_sequence.load(Ordering::Relaxed); - - if sequence != last_sequence { - debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence); - self.subscriber = None; - } - } + }; + + match subscriber.receiver.poll_recv(cx) { + Poll::Ready(Some(message)) => { + this.heartbeat_sleep = None; + + match message.status { + Some(StatusCode::IDLE_HEARTBEAT) => { + debug!("received idle heartbeats"); + if let Some(headers) = message.headers.as_ref() { + if let Some(sequence) = + headers.get(crate::header::NATS_LAST_CONSUMER) + { + let sequence: u64 = + sequence.as_str().parse().map_err(|err| { + OrderedError::with_source(OrderedErrorKind::Other, err) + })?; + + let last_sequence = + this.consumer_sequence.load(Ordering::Relaxed); + + if sequence != last_sequence { + debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence); + this.subscriber = None; } - // flow control. - if let Some(subject) = message.reply.clone() { - trace!("received flow control message"); - let client = self.context.client.clone(); - tokio::task::spawn(async move { - client - .publish(subject, Bytes::from_static(b"")) - .await - .ok(); - }); - } - continue; - } - Some(status) => { - debug!("received status message: {}", status); - continue; - } - None => { - trace!("received a message"); - let jetstream_message = jetstream::message::Message { - message, - context: self.context.clone(), - }; - - let info = jetstream_message.info().map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Other, err) - })?; - trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", - self.consumer_sequence, - self.stream_sequence, - info.consumer_sequence, - info.stream_sequence); - if info.consumer_sequence - != self.consumer_sequence.load(Ordering::Relaxed) + 1 - { - debug!( - "ordered consumer mismatch. current {}, info: {}", - self.consumer_sequence.load(Ordering::Relaxed), - info.consumer_sequence - ); - self.subscriber = None; - self.consumer_sequence.store(0, Ordering::Relaxed); - continue; - } - self.stream_sequence - .store(info.stream_sequence, Ordering::Relaxed); - self.consumer_sequence - .store(info.consumer_sequence, Ordering::Relaxed); - return Poll::Ready(Some(Ok(jetstream_message))); } } + // flow control. + if let Some(subject) = message.reply.clone() { + trace!("received flow control message"); + let client = this.context.client.clone(); + + tokio::task::spawn(async move { + client.publish(subject, Bytes::from_static(b"")).await.ok(); + client.flush().await.ok(); + }); + } + continue; + } + Some(status) => { + debug!("received status message: {}", status); + continue; } None => { - return Poll::Ready(None); + trace!("received a message"); + let jetstream_message = jetstream::message::Message { + message, + context: this.context.clone(), + }; + + let info = jetstream_message.info().map_err(|err| { + OrderedError::with_source(OrderedErrorKind::Other, err) + })?; + trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", + this.consumer_sequence, + this.stream_sequence, + info.consumer_sequence, + info.stream_sequence); + if info.consumer_sequence + != this.consumer_sequence.load(Ordering::Relaxed) + 1 + { + debug!( + "ordered consumer mismatch. current {}, info: {}", + this.consumer_sequence.load(Ordering::Relaxed), + info.consumer_sequence + ); + this.subscriber = None; + this.consumer_sequence.store(0, Ordering::Relaxed); + continue; + } + this.stream_sequence + .store(info.stream_sequence, Ordering::Relaxed); + this.consumer_sequence + .store(info.consumer_sequence, Ordering::Relaxed); + return Poll::Ready(Some(Ok(jetstream_message))); } - }, - Poll::Pending => return Poll::Pending, + } } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, } } } @@ -836,21 +812,18 @@ async fn recreate_consumer_and_subscription( ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err) })?; - recreate_ephemeral_consumer(context, config, stream_name, sequence).await?; + recreate_ephemeral_consumer(context, config, &stream_name, sequence).await?; Ok(subscriber) } async fn recreate_ephemeral_consumer( context: Context, config: OrderedConfig, - stream_name: String, + stream_name: &str, sequence: u64, ) -> Result<(), ConsumerRecreateError> { - let stream = context - .get_stream(stream_name.clone()) - .await - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) - })?; + let stream = context.get_stream(stream_name).await.map_err(|err| { + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) + })?; let deliver_policy = { if sequence == 0 {