Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 123 additions & 38 deletions env/env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1655,42 +1655,6 @@ void GenerateFilesAndRequest(Env* env, const std::string& fname,
}
}

TEST_F(EnvPosixTest, MultiReadIOUringError) {
  // Reads below are unaligned, so direct I/O must stay disabled.
  EnvOptions soptions;
  soptions.use_direct_reads = soptions.use_direct_writes = false;
  std::string fname = test::PerThreadDBPath(env_, "testfile");

  // Prepare a data file plus the read requests that cover it.
  std::vector<std::string> scratches;
  std::vector<ReadRequest> reqs;
  GenerateFilesAndRequest(env_, fname, &reqs, &scratches);

  // Open the file for reading the generated data back.
  std::unique_ptr<RandomAccessFile> file;
  ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));

  // Inject a failure (non-zero return value) into the first
  // io_uring_wait_cqe call issued by MultiRead; later calls are untouched.
  bool fault_injected = false;
  SyncPoint::GetInstance()->SetCallBack(
      "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return",
      [&](void* arg) {
        if (fault_injected) {
          return;
        }
        fault_injected = true;
        *(static_cast<ssize_t*>(arg)) = 1;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = file->MultiRead(reqs.data(), reqs.size());
  if (fault_injected) {
    // The injected wait_cqe error must surface as a non-OK status.
    ASSERT_NOK(s);
  } else {
    // The io_uring path was never taken (e.g. unsupported platform), so the
    // callback did not fire; just mark the status as checked.
    s.PermitUncheckedError();
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(EnvPosixTest, MultiReadIOUringError2) {
// In this test we don't do aligned read, so we can't do direct I/O.
EnvOptions soptions;
Expand All @@ -1717,8 +1681,9 @@ TEST_F(EnvPosixTest, MultiReadIOUringError2) {
[&](void* arg) {
struct io_uring* iu = static_cast<struct io_uring*>(arg);
struct io_uring_cqe* cqe;
assert(io_uring_wait_cqe(iu, &cqe) == 0);
io_uring_cqe_seen(iu, cqe);
// CQ should be empty after drain - peek should fail
int ret = io_uring_peek_cqe(iu, &cqe);
assert(-EAGAIN == ret); // No CQEs available
});
SyncPoint::GetInstance()->EnableProcessing();

Expand Down Expand Up @@ -3640,6 +3605,126 @@ TEST_F(TestAsyncRead, ReadAsync) {
}
}

// Test ReadAsync -> MultiRead -> Poll with real io_uring (not mock).
// This verifies that MultiRead doesn't interfere with async read buffers.
// The ordering is deliberate: async reads are submitted first, MultiRead
// runs while they are still in flight, and only then is Poll called — so a
// shared/clobbered io_uring instance or scratch buffer would be detected.
TEST_F(TestAsyncRead, InterleavingIOUringOperations) {
#if defined(ROCKSDB_IOURING_PRESENT)
  // Use the real filesystem directly (not the mock ReadAsyncFS).
  std::shared_ptr<FileSystem> fs = env_->GetFileSystem();
  std::string fname = test::PerThreadDBPath(env_, "testfile_iouring");

  constexpr size_t kSectorSize = 4096;
  constexpr size_t kNumSectors = 8;

  // 1. Create & write to a file.
  {
    std::unique_ptr<FSWritableFile> wfile;
    ASSERT_OK(
        fs->NewWritableFile(fname, FileOptions(), &wfile, nullptr /*dbg*/));

    // Sector i is filled with the repeated byte value (i + 1), so reads can
    // later be validated by regenerating the same pattern.
    for (size_t i = 0; i < kNumSectors; ++i) {
      auto data = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
      Slice slice(data.get(), kSectorSize);
      ASSERT_OK(wfile->Append(slice, IOOptions(), nullptr));
    }
    ASSERT_OK(wfile->Close(IOOptions(), nullptr));
  }

  // 2. Test interleaved ReadAsync and MultiRead operations.
  {
    std::unique_ptr<FSRandomAccessFile> file;
    ASSERT_OK(fs->NewRandomAccessFile(fname, FileOptions(), &file, nullptr));

    IOOptions opts;
    std::vector<void*> io_handles(kNumSectors);
    std::vector<FSReadRequest> async_reqs(kNumSectors);
    // Owns the async scratch buffers; must outlive the Poll() call below.
    std::vector<std::unique_ptr<char, Deleter>> async_data;
    // Stable storage for the per-request index handed to the callback as
    // cb_arg (addresses are taken only after all push_backs are done).
    std::vector<size_t> vals;
    IOHandleDeleter del_fn;

    // Initialize async read requests: one sector-sized read per sector.
    for (size_t i = 0; i < kNumSectors; i++) {
      async_reqs[i].offset = i * kSectorSize;
      async_reqs[i].len = kSectorSize;
      async_data.emplace_back(NewAligned(kSectorSize, 0));
      async_reqs[i].scratch = async_data.back().get();
      vals.push_back(i);
    }

    // Callback function for async reads: copies the completed request's
    // offset/result/status back into async_reqs[i], where i comes from
    // cb_arg. Captures by reference, which is safe because Poll() is called
    // within this scope.
    std::function<void(FSReadRequest&, void*)> callback =
        [&](FSReadRequest& req, void* cb_arg) {
          assert(cb_arg != nullptr);
          size_t i = *(reinterpret_cast<size_t*>(cb_arg));
          async_reqs[i].offset = req.offset;
          async_reqs[i].result = req.result;
          async_reqs[i].status = req.status;
        };

    // Submit asynchronous read requests.
    for (size_t i = 0; i < kNumSectors; i++) {
      void* cb_arg = static_cast<void*>(&(vals[i]));
      IOStatus s = file->ReadAsync(async_reqs[i], opts, callback, cb_arg,
                                   &(io_handles[i]), &del_fn, nullptr);
      if (s.IsNotSupported()) {
        // io_uring not supported on this system, skip the test.
        fprintf(stderr, "Skipping test - io_uring not supported: %s\n",
                s.ToString().c_str());
        // Release handles from the requests that were already submitted
        // before bailing out (request i itself produced no handle).
        for (size_t j = 0; j < i; j++) {
          if (io_handles[j] != nullptr) {
            del_fn(io_handles[j]);
          }
        }
        return;
      }
      // For any other error, fail the test.
      ASSERT_OK(s);
    }

    // Do a MultiRead on same sectors while async reads are submitted.
    std::vector<FSReadRequest> multi_reqs(kNumSectors);
    std::vector<std::unique_ptr<char, Deleter>> multi_data;
    for (size_t i = 0; i < kNumSectors; i++) {
      multi_reqs[i].offset = i * kSectorSize;
      multi_reqs[i].len = kSectorSize;
      multi_data.emplace_back(NewAligned(kSectorSize, 0));
      multi_reqs[i].scratch = multi_data.back().get();
    }
    ASSERT_OK(file->MultiRead(multi_reqs.data(), kNumSectors, opts, nullptr));

    // Check the status of MultiRead requests (should all succeed).
    for (size_t i = 0; i < kNumSectors; i++) {
      // Regenerate the expected pattern for sector i (byte value i + 1).
      auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
      Slice expected_data(buf.get(), kSectorSize);

      ASSERT_EQ(multi_reqs[i].offset, i * kSectorSize);
      ASSERT_OK(multi_reqs[i].status);
      ASSERT_EQ(expected_data.ToString(), multi_reqs[i].result.ToString());
    }

    // Poll for the submitted async requests.
    ASSERT_OK(fs->Poll(io_handles, kNumSectors));

    // Check the status of async read requests (should all succeed).
    // MultiRead above must not have corrupted these buffers or results.
    for (size_t i = 0; i < kNumSectors; i++) {
      auto buf = NewAligned(kSectorSize * 8, static_cast<char>(i + 1));
      Slice expected_data(buf.get(), kSectorSize);

      ASSERT_EQ(async_reqs[i].offset, i * kSectorSize);
      ASSERT_OK(async_reqs[i].status);
      ASSERT_EQ(expected_data.ToString(), async_reqs[i].result.ToString());
    }

    // Delete io_handles.
    for (size_t i = 0; i < io_handles.size(); i++) {
      del_fn(io_handles[i]);
    }
  }
#else
  fprintf(stderr, "Skipping test - ROCKSDB_IOURING_PRESENT not defined\n");
#endif
}

struct StaticDestructionTester {
bool activated = false;
~StaticDestructionTester() {
Expand Down
21 changes: 14 additions & 7 deletions env/fs_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ class PosixFileSystem : public FileSystem {
options
#if defined(ROCKSDB_IOURING_PRESENT)
,
!IsIOUringEnabled() ? nullptr : thread_local_io_urings_.get()
!IsIOUringEnabled() ? nullptr
: thread_local_async_read_io_urings_.get(),
!IsIOUringEnabled() ? nullptr
: thread_local_multi_read_io_urings_.get()
#endif
));
}
Expand Down Expand Up @@ -1087,8 +1090,9 @@ class PosixFileSystem : public FileSystem {
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
if (thread_local_async_read_io_urings_) {
iu = static_cast<struct io_uring*>(
thread_local_async_read_io_urings_->Get());
}

// Init failed, platform doesn't support io_uring.
Expand Down Expand Up @@ -1161,8 +1165,9 @@ class PosixFileSystem : public FileSystem {
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
if (thread_local_async_read_io_urings_) {
iu = static_cast<struct io_uring*>(
thread_local_async_read_io_urings_->Get());
}

// Init failed, platform doesn't support io_uring.
Expand Down Expand Up @@ -1277,7 +1282,8 @@ class PosixFileSystem : public FileSystem {

#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring instance
std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
std::unique_ptr<ThreadLocalPtr> thread_local_async_read_io_urings_;
std::unique_ptr<ThreadLocalPtr> thread_local_multi_read_io_urings_;
#endif

size_t page_size_;
Expand Down Expand Up @@ -1337,7 +1343,8 @@ PosixFileSystem::PosixFileSystem()
// io_uring can be created.
struct io_uring* new_io_uring = CreateIOUring();
if (new_io_uring != nullptr) {
thread_local_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
thread_local_async_read_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
thread_local_multi_read_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
delete new_io_uring;
}
#endif
Expand Down
Loading
Loading