Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### New Features
* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.
* Support backward iteration when `ReadOptions::iter_start_ts` is set.
* Provide support for ReadOptions.async_io with direct_io to improve Seek latency by using async IO to parallelize child iterator seek and doing asynchronous prefetching on sequential scans.

### Public API changes
* Add metadata related structs and functions in C API, including
Expand Down
8 changes: 7 additions & 1 deletion env/fs_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,15 +1102,21 @@ class PosixFileSystem : public FileSystem {
req.scratch = posix_handle->scratch;
req.offset = posix_handle->offset;
req.len = posix_handle->len;

size_t finished_len = 0;
size_t bytes_read = 0;
bool read_again = false;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
true /*async_read*/, finished_len, &req, bytes_read);
true /*async_read*/, posix_handle->use_direct_io,
posix_handle->alignment, finished_len, &req, bytes_read,
read_again);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);

(void)finished_len;
(void)bytes_read;
(void)read_again;

if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;
Expand Down
68 changes: 18 additions & 50 deletions env/io_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,23 +199,6 @@ bool IsSyncFileRangeSupported(int fd) {

} // anonymous namespace

/*
* DirectIOHelper
*/
namespace {

bool IsSectorAligned(const size_t off, size_t sector_size) {
assert((sector_size & (sector_size - 1)) == 0);
return (off & (sector_size - 1)) == 0;
}

#ifndef NDEBUG
bool IsSectorAligned(const void* ptr, size_t sector_size) {
return uintptr_t(ptr) % sector_size == 0;
}
#endif
} // namespace

/*
* PosixSequentialFile
*/
Expand Down Expand Up @@ -760,32 +743,21 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,

FSReadRequest* req = req_wrap->req;
size_t bytes_read = 0;
bool read_again = false;
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
false /*async_read*/, req_wrap->finished_len, req,
bytes_read);
false /*async_read*/, use_direct_io(),
GetRequiredBufferAlignment(), req_wrap->finished_len, req,
bytes_read, read_again);
int32_t res = cqe->res;
if (res >= 0) {
if (bytes_read == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() &&
!IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
if (bytes_read == 0 && read_again) {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
Expand Down Expand Up @@ -910,19 +882,15 @@ IOStatus PosixRandomAccessFile::ReadAsync(
args = nullptr;
};

Posix_IOHandle* posix_handle = new Posix_IOHandle();
*io_handle = static_cast<void*>(posix_handle);
*del_fn = deletefn;

// Initialize Posix_IOHandle.
posix_handle->iu = iu;
Posix_IOHandle* posix_handle =
new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch,
use_direct_io(), GetRequiredBufferAlignment());
posix_handle->iov.iov_base = req.scratch;
posix_handle->iov.iov_len = req.len;
posix_handle->cb = cb;
posix_handle->cb_arg = cb_arg;
posix_handle->offset = req.offset;
posix_handle->len = req.len;
posix_handle->scratch = req.scratch;

*io_handle = static_cast<void*>(posix_handle);
*del_fn = deletefn;

// Step 3: io_uring_sqe_set_data
struct io_uring_sqe* sqe;
Expand Down
59 changes: 53 additions & 6 deletions env/io_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,57 @@ class PosixHelper {
size_t* size);
};

/*
* DirectIOHelper
*/
inline bool IsSectorAligned(const size_t off, size_t sector_size) {
assert((sector_size & (sector_size - 1)) == 0);
return (off & (sector_size - 1)) == 0;
}

#ifndef NDEBUG
inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
return uintptr_t(ptr) % sector_size == 0;
}
#endif

#if defined(ROCKSDB_IOURING_PRESENT)
struct Posix_IOHandle {
Posix_IOHandle(struct io_uring* _iu,
std::function<void(const FSReadRequest&, void*)> _cb,
void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch,
bool _use_direct_io, size_t _alignment)
: iu(_iu),
cb(_cb),
cb_arg(_cb_arg),
offset(_offset),
len(_len),
scratch(_scratch),
use_direct_io(_use_direct_io),
alignment(_alignment),
is_finished(false),
req_count(0) {}

struct iovec iov;
struct io_uring* iu;
std::function<void(const FSReadRequest&, void*)> cb;
void* cb_arg;
uint64_t offset;
size_t len;
char* scratch;
bool is_finished = false;
bool use_direct_io;
size_t alignment;
bool is_finished;
// req_count is used by AbortIO API to keep track of number of requests.
uint32_t req_count = 0;
uint32_t req_count;
};

inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
size_t len, size_t iov_len, bool async_read,
bool use_direct_io, uint32_t alignment,
size_t& finished_len, FSReadRequest* req,
size_t& bytes_read) {
size_t& bytes_read, bool& read_again) {
read_again = false;
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
Expand All @@ -80,10 +113,24 @@ inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
req->result = Slice(req->scratch, req->len);
req->status = IOStatus::OK();
} else if (bytes_read == 0) {
if (async_read) {
// No bytes read. It can means EOF.
req->result = Slice(req->scratch, 0);
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, finished_len);
req->status = IOStatus::OK();
} else {
if (async_read) {
// No bytes read. It can means EOF. In case of partial results, it's
// caller responsibility to call read/readasync again.
req->result = Slice(req->scratch, 0);
req->status = IOStatus::OK();
} else {
read_again = true;
}
}
} else if (bytes_read < iov_len) {
assert(bytes_read > 0);
Expand Down
18 changes: 8 additions & 10 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts,
Slice result;
Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result,
bufs_[index].buffer_.BufferStart() + chunk_len,
nullptr, rate_limiter_priority);
/*aligned_buf=*/nullptr, rate_limiter_priority);
#ifndef NDEBUG
if (result.size() < read_len) {
// Fake an IO error to force db_stress fault injection to ignore
Expand All @@ -108,7 +108,6 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts,

Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
Env::IOPriority rate_limiter_priority,
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index) {
// callback for async read request.
Expand All @@ -120,8 +119,9 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
req.offset = rounddown_start + chunk_len;
req.result = result;
req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len;
Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_,
&del_fn_, rate_limiter_priority);
Status s = reader->ReadAsync(req, opts, fp,
/*cb_arg=*/nullptr, &io_handle_, &del_fn_,
/*aligned_buf=*/nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep rate_limiter_priority. And, perhaps in another PR, ReadAsync should use the rate_limiter_priority and also pass it in IOOptions to the file system.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning to create another PR supporting rate_limiter. I will pass the argument there.

req.status.PermitUncheckedError();
if (s.ok()) {
async_read_in_progress_ = true;
Expand Down Expand Up @@ -373,8 +373,7 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
bufs_[second].offset_ = rounddown_start2;
assert(roundup_len2 >= chunk_len2);
uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2,
rounddown_start2, second)
ReadAsync(opts, reader, read_len2, chunk_len2, rounddown_start2, second)
.PermitUncheckedError();
}

Expand Down Expand Up @@ -544,7 +543,8 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
if (req.status.ok()) {
if (req.offset + req.result.size() <=
bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
// All requested bytes are already in the buffer. So no need to update.
// All requested bytes are already in the buffer or no data is read
// because of EOF. So no need to update.
return;
}
if (req.offset < bufs_[index].offset_) {
Expand All @@ -560,7 +560,6 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority,
Slice* result) {
assert(reader != nullptr);
if (!enable_) {
Expand Down Expand Up @@ -630,8 +629,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,

size_t read_len = static_cast<size_t>(roundup_len - chunk_len);

s = ReadAsync(opts, reader, rate_limiter_priority, read_len, chunk_len,
rounddown_start, second);
s = ReadAsync(opts, reader, read_len, chunk_len, rounddown_start, second);

if (!s.ok()) {
return s;
Expand Down
10 changes: 3 additions & 7 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,13 @@ class FilePrefetchBuffer {
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
// rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to
// bypass.
// result : if data already exists in the buffer, result will
// be updated with the data.
//
// If data already exist in the buffer, it will return Status::OK, otherwise
// it will send asynchronous request and return Status::TryAgain.
Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority, Slice* result);
uint64_t offset, size_t n, Slice* result);

// Tries returning the data for a file read from this buffer if that data is
// in the buffer.
Expand Down Expand Up @@ -246,9 +243,8 @@ class FilePrefetchBuffer {
uint64_t chunk_len, uint64_t rounddown_start, uint32_t index);

Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader,
Env::IOPriority rate_limiter_priority, uint64_t read_len,
uint64_t chunk_len, uint64_t rounddown_start,
uint32_t index);
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index);

// Copy the data from src to third buffer.
void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length);
Expand Down
Loading