Skip to content

Commit

Permalink
Improve stream Stream implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Oct 8, 2023
1 parent 4222b83 commit fac8a7e
Showing 1 changed file with 105 additions and 96 deletions.
201 changes: 105 additions & 96 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
use base64::engine::general_purpose::STANDARD;
use base64::engine::Engine;
use bytes::Bytes;
use futures::{future::BoxFuture, TryFutureExt};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};
Expand Down Expand Up @@ -1696,60 +1696,64 @@ impl futures::Stream for ConsumerNames<'_> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.page_request.as_mut() {
Some(page) => match page.try_poll_unpin(cx) {
std::task::Poll::Ready(page) => {
self.page_request = None;
let page = page.map_err(|err| {
ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err)
) -> Poll<Option<Self::Item>> {
let this = self.as_mut().get_mut();

loop {
if let Some(consumer) = this.consumers.pop() {
return Poll::Ready(Some(Ok(consumer)));
}

if this.done {
return Poll::Ready(None);
}

let page_request = this.page_request.get_or_insert_with(|| {
let context = this.context.clone();
let offset = this.offset;
let stream = this.stream.clone();

Box::pin(async move {
match context
.request(
format!("CONSUMER.NAMES.{stream}").into(),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => Err(RequestError::with_source(
ConsumerNamesErrorKind::Other,
error,
)),
Response::Ok(page) => Ok(page),
}
})
});

match page_request.poll_unpin(cx) {
Poll::Ready(page_result) => {
this.page_request = None;

let page = page_result.map_err(|err| {
ConsumersError::with_source(ConsumerNamesErrorKind::Other, err)
})?;
match page.consumers {
Some(consumers) => {
this.consumers = consumers;

if let Some(consumers) = page.consumers {
self.offset += consumers.len();
self.consumers = consumers;
if self.offset >= page.total {
self.done = true;
this.offset += this.consumers.len();
this.done = this.offset >= page.total;
continue;
}
match self.consumers.pop() {
Some(stream) => Poll::Ready(Some(Ok(stream))),
None => Poll::Ready(None),
None => {
this.done = true;
return Poll::Ready(None);
}
} else {
Poll::Ready(None)
}
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
None => {
if let Some(stream) = self.consumers.pop() {
Poll::Ready(Some(Ok(stream)))
} else {
if self.done {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
let stream = self.stream.clone();
self.page_request = Some(Box::pin(async move {
match context
.request(
format!("CONSUMER.NAMES.{stream}").into(),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => Err(RequestError::with_source(
super::context::RequestErrorKind::Other,
error,
)),
Response::Ok(page) => Ok(page),
}
}));
self.poll_next(cx)
}
Poll::Pending => return Poll::Pending,
}
}
}
Expand All @@ -1774,59 +1778,64 @@ impl futures::Stream for Consumers<'_> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.page_request.as_mut() {
Some(page) => match page.try_poll_unpin(cx) {
std::task::Poll::Ready(page) => {
self.page_request = None;
let page = page.map_err(|err| {
) -> Poll<Option<Self::Item>> {
let this = self.as_mut().get_mut();

loop {
if let Some(consumer) = this.consumers.pop() {
return Poll::Ready(Some(Ok(consumer)));
}

if this.done {
return Poll::Ready(None);
}

let page_request = this.page_request.get_or_insert_with(|| {
let context = this.context.clone();
let offset = this.offset;
let stream = this.stream.clone();

Box::pin(async move {
match context
.request(
format!("CONSUMER.LIST.{stream}").into(),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => Err(RequestError::with_source(
super::context::RequestErrorKind::Other,
error,
)),
Response::Ok(page) => Ok(page),
}
})
});

match page_request.poll_unpin(cx) {
Poll::Ready(page_result) => {
this.page_request = None;

let page = page_result.map_err(|err| {
ConsumersError::with_source(ConsumersErrorKind::Other, err)
})?;
if let Some(consumers) = page.consumers {
self.offset += consumers.len();
self.consumers = consumers;
if self.offset >= page.total {
self.done = true;
match page.consumers {
Some(consumers) => {
this.consumers = consumers;

this.offset += this.consumers.len();
this.done = this.offset >= page.total;
continue;
}
match self.consumers.pop() {
Some(consumer) => Poll::Ready(Some(Ok(consumer))),
None => Poll::Ready(None),
None => {
this.done = true;
return Poll::Ready(None);
}
} else {
Poll::Ready(None)
}
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
None => {
if let Some(stream) = self.consumers.pop() {
Poll::Ready(Some(Ok(stream)))
} else {
if self.done {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
let stream = self.stream.clone();
self.page_request = Some(Box::pin(async move {
match context
.request(
format!("CONSUMER.LIST.{stream}").into(),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => Err(RequestError::with_source(
super::context::RequestErrorKind::Other,
error,
)),
Response::Ok(page) => Ok(page),
}
}));
self.poll_next(cx)
}
Poll::Pending => return Poll::Pending,
}
}
}
Expand Down

0 comments on commit fac8a7e

Please sign in to comment.