Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core/types): Implement concurrent read for blocking read #4545

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

hoslo
Copy link
Contributor

@hoslo hoslo commented Apr 28, 2024

No description provided.

@hoslo
Copy link
Contributor Author

hoslo commented Apr 28, 2024

@Xuanwo Plz take a review for this design.

let (interval_size, mut intervals) = end
.map(|end| {
// let interval_size = (end - start + concurrent - 1) / concurrent;
let interval_size = (end - start) / concurrent;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interval_size should be decided by chunk.

core/src/types/blocking_read/blocking_reader.rs Outdated Show resolved Hide resolved
@hoslo hoslo force-pushed the feat-concurrent-blocking-read branch from 274fa3d to 5a597d3 Compare April 28, 2024 08:35
@hoslo hoslo force-pushed the feat-concurrent-blocking-read branch 2 times, most recently from d7640e3 to 9720e2b Compare April 28, 2024 11:02
/// BlockingReader is designed to read data from given path in an blocking
/// manner.
pub struct BlockingReader {
pub(crate) inner: oio::BlockingReader,
pub(crate) inner: Arc<dyn oio::BlockingRead>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change oio::BlockingReader to Arc<dyn oio::BlockingRead>, maybe worth a seperate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like to submit this PR first?

let mut bufs = Vec::with_capacity(self.concurrent);
let interval_size = self.chunk.unwrap_or(4 * 1024 * 1024) as u64;

let intervals: Vec<(u64, u64)> = (0..self.concurrent as u64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't calcluate ranges at once.

We can build a RangeIterator that only generate the next range to read. BufferIterator stores RangeIterator, every time users call next, BufferIterator fetchs a new range and than read as a new buffer.

Copy link
Contributor Author

@hoslo hoslo Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't calcluate ranges at once.

How can this be paralleled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this be paralleled?

After re-visit the rayon API docs, I found that we can't call next on a ParallelIterator directly. So we can change the RangeIterator to return a ParallelIterator instead.

Every time users call next on BufferIterator, we build a new ParallelIterator, extend into buffer and yield from it.

The whole workflow looks like:

RangeIterator::next() -> Range<u64>


struct BufferIterator {
  it: RangeIterator,
  buf: Vec<Buffer>
}

impl Iterator for BufferIterator {
    type Item = Result<Buffer>;

    fn next(&mut self) -> Option<Self::Item> {
        // fecth from buffer first.
        // Build a new ParallelIterator and collect it.
        // Return None if ranges have been all consumed.
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Xuanwo Plz review.

@hoslo hoslo force-pushed the feat-concurrent-blocking-read branch from 9720e2b to d500961 Compare May 6, 2024 07:30
/// BlockingReader is designed to read data from given path in an blocking
/// manner.
pub struct BlockingReader {
pub(crate) inner: oio::BlockingReader,
pub(crate) inner: Arc<dyn oio::BlockingRead>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like to submit this PR first?

@@ -74,24 +84,26 @@ impl BlockingReader {
}
}

let iter = BufferIterator::new(
self.inner.clone(),
self.options.chunk(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can accpet an OpReader directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like to submit this PR first?

Do you mean submit a new PR to change oio::BlockingReader to Arc first?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean submit a new PR to change oio::BlockingReader to Arc first?

Yes

offset += n as u64;
if Some(offset) == end {
return Ok(bufs.into_iter().flatten().collect());
for buffer in iter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use collect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to return err early when using collect?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to return err early when using collect?

collect will exit once error happened. See impl<A, E, V> FromIterator<Result<A, E>> for Result<V, E>:

Takes each element in the Iterator: if it is an Err, no further elements are taken, and the Err is returned. Should no Err occur, a container with the values of each Result is returned.

return Ok(read as _);
let mut total_len = 0;
for buffer in iter {
match buffer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try use if-let-else:

let mut read = 0;
loop {
    let Some(bs) = iter.try_next()? else {
        return Ok(read);
    };
    read += bs.len();
    buf.put(bs);
}

}

/// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`],
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`].
#[inline]
pub fn into_std_read(self, range: Range<u64>) -> StdReader {
// TODO: the capacity should be decided by services.
StdReader::new(self.inner, range)
StdReader::new(self.inner.clone(), range)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should pass OpReader?


let mut bufs = Vec::with_capacity(self.concurrent);

let intervals: Vec<Range<u64>> = (0..self.concurrent)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put self.finished inside the RangeIterator should it can know when to stop returning new ranges.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put self.finished inside the RangeIterator should it can know when to stop returning new ranges.

When end is None, We only know end when after read.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When end is None, We only know end when after read.

That's why we need to store finished in RangeIterator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean read also put into RangeIterator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Xuanwo I submit a new one, is this what you want?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close but not what I want. Can I push changes to this PR directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close but not what I want. Can I push changes to this PR directly?

Of course, thank for your patient.

Copy link
Contributor Author

@hoslo hoslo Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Xuanwo Do you still remember this?

@hoslo hoslo force-pushed the feat-concurrent-blocking-read branch 3 times, most recently from 661ba73 to 40322ce Compare May 7, 2024 06:56
@hoslo hoslo force-pushed the feat-concurrent-blocking-read branch from 40322ce to cb882aa Compare May 7, 2024 07:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants