diff --git a/db/c.cc b/db/c.cc
index b02c7bc4bd19..82dfeb2c4100 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -4299,6 +4299,16 @@ unsigned char rocksdb_options_get_use_direct_io_for_flush_and_compaction(
   return opt->rep.use_direct_io_for_flush_and_compaction;
 }
 
+void rocksdb_options_set_use_direct_io_for_wal(rocksdb_options_t* opt,
+                                               unsigned char v) {
+  opt->rep.use_direct_io_for_wal = v;
+}
+
+unsigned char rocksdb_options_get_use_direct_io_for_wal(
+    rocksdb_options_t* opt) {
+  return opt->rep.use_direct_io_for_wal;
+}
+
 void rocksdb_options_set_allow_mmap_reads(rocksdb_options_t* opt,
                                           unsigned char v) {
   opt->rep.allow_mmap_reads = v;
diff --git a/db/c_test.c b/db/c_test.c
index ca5a76fba063..037aa70872ba 100644
--- a/db/c_test.c
+++ b/db/c_test.c
@@ -2650,6 +2650,9 @@ int main(int argc, char** argv) {
     CheckCondition(
         1 == rocksdb_options_get_use_direct_io_for_flush_and_compaction(o));
 
+    rocksdb_options_set_use_direct_io_for_wal(o, 1);
+    CheckCondition(1 == rocksdb_options_get_use_direct_io_for_wal(o));
+
     rocksdb_options_set_is_fd_close_on_exec(o, 1);
     CheckCondition(1 == rocksdb_options_get_is_fd_close_on_exec(o));
 
@@ -2857,6 +2860,7 @@ int main(int argc, char** argv) {
     CheckCondition(1 == rocksdb_options_get_use_direct_reads(copy));
     CheckCondition(
         1 == rocksdb_options_get_use_direct_io_for_flush_and_compaction(copy));
+    CheckCondition(1 == rocksdb_options_get_use_direct_io_for_wal(copy));
     CheckCondition(1 == rocksdb_options_get_is_fd_close_on_exec(copy));
     CheckCondition(18 == rocksdb_options_get_stats_dump_period_sec(copy));
     CheckCondition(5 == rocksdb_options_get_stats_persist_period_sec(copy));
@@ -3127,6 +3131,10 @@ int main(int argc, char** argv) {
     CheckCondition(
         1 == rocksdb_options_get_use_direct_io_for_flush_and_compaction(o));
 
+    rocksdb_options_set_use_direct_io_for_wal(copy, 0);
+    CheckCondition(0 == rocksdb_options_get_use_direct_io_for_wal(copy));
+    CheckCondition(1 == rocksdb_options_get_use_direct_io_for_wal(o));
+
     rocksdb_options_set_is_fd_close_on_exec(copy, 0);
     CheckCondition(0 == rocksdb_options_get_is_fd_close_on_exec(copy));
     CheckCondition(1 == rocksdb_options_get_is_fd_close_on_exec(o));
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index cccc3ea2c708..1db4021414f8 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -2344,6 +2344,16 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
   lfile->SetWriteLifeTimeHint(opt_file_options.write_hint);
   lfile->SetPreallocationBlockSize(preallocate_block_size);
 
+  // For direct I/O WAL, pre-allocation is mandatory even if the user
+  // specified 0; fall back to the default block size (1MB) in that case.
+  uint64_t direct_io_preallocation_block_size =
+      immutable_db_options_.wal_direct_io_preallocation_block_size;
+  if (immutable_db_options_.use_direct_io_for_wal &&
+      direct_io_preallocation_block_size == 0) {
+    direct_io_preallocation_block_size =
+        DBOptions{}.wal_direct_io_preallocation_block_size;
+  }
+
   const auto& listeners = immutable_db_options_.listeners;
   FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
   std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
@@ -2351,7 +2361,8 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
       immutable_db_options_.clock, io_tracer_, nullptr /* stats */,
       Histograms::HISTOGRAM_ENUM_MAX /* hist_type */, listeners, nullptr,
       tmp_set.Contains(FileType::kWalFile),
-      tmp_set.Contains(FileType::kWalFile)));
+      tmp_set.Contains(FileType::kWalFile),
+      direct_io_preallocation_block_size, immutable_db_options_.env));
   *new_log = new log::Writer(std::move(file_writer), log_file_num,
                              immutable_db_options_.recycle_log_file_num > 0,
                              immutable_db_options_.manual_wal_flush,
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 75e13724a75e..4d6bab0ccf3a 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -3144,6 +3144,389 @@ TEST_F(DBWALTest, WALWriteErrorNoRecovery) {
   fault_fs->DisableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
   Destroy(options);
 }
+
+#if defined(ROCKSDB_PLATFORM_POSIX)
+TEST_F(DBWALTest, DirectIOForWAL) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.create_if_missing = true;
+
+  // Check if direct I/O is supported
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    // Direct I/O might not be supported on this file system
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write some data to generate WAL writes
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
+  }
+
+  // Flush to ensure WAL is written
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  // Verify data can be read back
+  for (int i = 0; i < 100; i++) {
+    ASSERT_EQ("value" + std::to_string(i), Get("key" + std::to_string(i)));
+  }
+
+  // Reopen and verify data persisted
+  Reopen(options);
+  for (int i = 0; i < 100; i++) {
+    ASSERT_EQ("value" + std::to_string(i), Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOWithPreallocation) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 1024 * 1024;  // 1MB
+  options.create_if_missing = true;
+  options.wal_bytes_per_sync = 0;  // Disable incremental WAL syncing
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write enough data to trigger multiple pre-allocation blocks
+  const int num_writes = 1000;
+  for (int i = 0; i < num_writes; i++) {
+    std::string value(1024, 'a' + (i % 26));  // 1KB value
+    ASSERT_OK(Put("key" + std::to_string(i), value));
+  }
+
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  // Verify all data is readable
+  for (int i = 0; i < num_writes; i++) {
+    std::string value(1024, 'a' + (i % 26));
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+
+  // Reopen and verify persistence
+  Reopen(options);
+  for (int i = 0; i < num_writes; i++) {
+    std::string value(1024, 'a' + (i % 26));
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationWithSmallBlockSize) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 64 * 1024;  // 64KB
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write data that will require multiple preallocation blocks
+  for (int i = 0; i < 200; i++) {
+    std::string value(512, 'x');
+    ASSERT_OK(Put("key" + std::to_string(i), value));
+  }
+
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  // Verify data integrity
+  for (int i = 0; i < 200; i++) {
+    std::string value(512, 'x');
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationWithLargeBlockSize) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 4 * 1024 * 1024;  // 4MB
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write data smaller than one preallocation block
+  for (int i = 0; i < 500; i++) {
+    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
+  }
+
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  // Verify data
+  for (int i = 0; i < 500; i++) {
+    ASSERT_EQ("value" + std::to_string(i), Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationMultipleFlushes) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 512 * 1024;  // 512KB
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Perform multiple write-flush cycles
+  for (int round = 0; round < 5; round++) {
+    for (int i = 0; i < 100; i++) {
+      std::string key =
+          "round" + std::to_string(round) + "_key" + std::to_string(i);
+      std::string value(256, 'a' + round);
+      ASSERT_OK(Put(key, value));
+    }
+    ASSERT_OK(dbfull()->FlushWAL(false));
+  }
+
+  // Verify all data across all rounds
+  for (int round = 0; round < 5; round++) {
+    for (int i = 0; i < 100; i++) {
+      std::string key =
+          "round" + std::to_string(round) + "_key" + std::to_string(i);
+      std::string value(256, 'a' + round);
+      ASSERT_EQ(value, Get(key));
+    }
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationWithSync) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 256 * 1024;  // 256KB
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  WriteOptions write_opts;
+  write_opts.sync = true;
+
+  // Write with sync enabled to test interaction with preallocation
+  for (int i = 0; i < 100; i++) {
+    std::string value(512, 'y');
+    ASSERT_OK(Put("key" + std::to_string(i), value, write_opts));
+  }
+
+  // Verify data
+  for (int i = 0; i < 100; i++) {
+    std::string value(512, 'y');
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+
+  // Reopen to ensure synced data is durable
+  Reopen(options);
+  for (int i = 0; i < 100; i++) {
+    std::string value(512, 'y');
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationRecovery) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 1024 * 1024;
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write data
+  for (int i = 0; i < 500; i++) {
+    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
+  }
+  ASSERT_OK(dbfull()->FlushWAL(true));
+
+  // Close and reopen to test WAL recovery
+  Close();
+  ASSERT_OK(TryReopen(options));
+
+  // Verify recovered data
+  for (int i = 0; i < 500; i++) {
+    ASSERT_EQ("value" + std::to_string(i), Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationZeroBlockSize) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 0;  // triggers default 1MB fallback
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // A block size of 0 does not disable preallocation here; CreateWAL falls
+  // back to the default block size (1MB), and direct I/O should still work
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
+  }
+
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  for (int i = 0; i < 100; i++) {
+    ASSERT_EQ("value" + std::to_string(i), Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationConcurrentWrites) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 512 * 1024;
+  options.create_if_missing = true;
+  options.max_background_jobs = 4;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Concurrent writes to test thread safety of preallocation
+  std::atomic<int> write_count{0};
+  auto writer = [this, &write_count]() {
+    for (int i = 0; i < 50; i++) {
+      int id = write_count.fetch_add(1);
+      std::string key = "key" + std::to_string(id);
+      std::string value(256, 'a' + (id % 26));
+      ASSERT_OK(Put(key, value));
+    }
+  };
+
+  std::vector<std::thread> threads;
+  for (int i = 0; i < 4; i++) {
+    threads.emplace_back(writer);
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  // Verify all writes succeeded
+  for (int i = 0; i < 200; i++) {
+    std::string key = "key" + std::to_string(i);
+    std::string value(256, 'a' + (i % 26));
+    ASSERT_EQ(value, Get(key));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationWithMemtableFlush) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 256 * 1024;
+  options.create_if_missing = true;
+  options.write_buffer_size = 64 * 1024;  // Small memtable for frequent flushes
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write enough to trigger memtable flushes
+  for (int i = 0; i < 1000; i++) {
+    std::string value(128, 'z');
+    ASSERT_OK(Put("key" + std::to_string(i), value));
+  }
+
+  // Wait for flushes to complete
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+  // Verify data
+  for (int i = 0; i < 1000; i++) {
+    std::string value(128, 'z');
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationEdgeCases) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 128 * 1024;
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Test 1: Empty write
+  ASSERT_OK(dbfull()->FlushWAL(false));
+
+  // Test 2: Single small write
+  ASSERT_OK(Put("key1", "value1"));
+  ASSERT_OK(dbfull()->FlushWAL(false));
+  ASSERT_EQ("value1", Get("key1"));
+
+  // Test 3: Write close to block boundary
+  std::string large_value(128 * 1024 - 100, 'b');  // Close to boundary
+  ASSERT_OK(Put("large_key", large_value));
+  ASSERT_OK(dbfull()->FlushWAL(false));
+  ASSERT_EQ(large_value, Get("large_key"));
+
+  // Test 4: Very small writes
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put("tiny" + std::to_string(i), "v"));
+  }
+  ASSERT_OK(dbfull()->FlushWAL(false));
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ("v", Get("tiny" + std::to_string(i)));
+  }
+}
+
+TEST_F(DBWALTest, DirectIOPreallocationCloseWhileActive) {
+  Options options = CurrentOptions();
+  options.use_direct_io_for_wal = true;
+  options.wal_direct_io_preallocation_block_size = 1024 * 1024;
+  options.create_if_missing = true;
+
+  Status s = TryReopen(options);
+  if (!s.ok()) {
+    ROCKSDB_GTEST_SKIP("Direct I/O not supported");
+    return;
+  }
+
+  // Write data to potentially trigger background preallocation
+  for (int i = 0; i < 500; i++) {
+    std::string value(1024, 'c');
+    ASSERT_OK(Put("key" + std::to_string(i), value));
+  }
+
+  // Immediately close - this should wait for preallocation to complete
+  Close();
+
+  // Reopen and verify data was persisted correctly
+  ASSERT_OK(TryReopen(options));
+  for (int i = 0; i < 500; i++) {
+    std::string value(1024, 'c');
+    ASSERT_EQ(value, Get("key" + std::to_string(i)));
+  }
+}
+#endif  // ROCKSDB_PLATFORM_POSIX
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h
index 619c24e75b40..b44ac288cbe6 100644
--- a/db_stress_tool/db_stress_common.h
+++ b/db_stress_tool/db_stress_common.h
@@ -182,6 +182,7 @@ DECLARE_bool(mmap_read);
 DECLARE_bool(mmap_write);
 DECLARE_bool(use_direct_reads);
 DECLARE_bool(use_direct_io_for_flush_and_compaction);
+DECLARE_bool(use_direct_io_for_wal);
 DECLARE_bool(mock_direct_io);
 DECLARE_bool(statistics);
 DECLARE_bool(sync);
diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc
index e9f7e172bd15..fae522cfc0d2 100644
--- a/db_stress_tool/db_stress_gflags.cc
+++ b/db_stress_tool/db_stress_gflags.cc
@@ -639,6 +639,10 @@ DEFINE_bool(use_direct_io_for_flush_and_compaction,
             ROCKSDB_NAMESPACE::Options().use_direct_io_for_flush_and_compaction,
             "Use O_DIRECT for writing data");
 
+DEFINE_bool(use_direct_io_for_wal,
+            ROCKSDB_NAMESPACE::Options().use_direct_io_for_wal,
+            "Use O_DIRECT for WAL writes");
+
 DEFINE_bool(mock_direct_io, false,
             "Mock direct IO by not using O_DIRECT for direct IO read");
 
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 6a37af5a4c66..6da8227827e1 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -4397,6 +4397,7 @@ void InitializeOptionsFromFlags(
   options.use_direct_reads = FLAGS_use_direct_reads;
   options.use_direct_io_for_flush_and_compaction =
       FLAGS_use_direct_io_for_flush_and_compaction;
+  options.use_direct_io_for_wal = FLAGS_use_direct_io_for_wal;
   options.recycle_log_file_num =
       static_cast<size_t>(FLAGS_recycle_log_file_num);
   options.target_file_size_base = FLAGS_target_file_size_base;
diff --git a/env/file_system.cc b/env/file_system.cc
index fad48cc1175f..39de618db990 100644
--- a/env/file_system.cc
+++ b/env/file_system.cc
@@ -146,6 +146,7 @@ FileOptions FileSystem::OptimizeForLogWrite(const FileOptions& file_options,
   optimized_file_options.bytes_per_sync = db_options.wal_bytes_per_sync;
   optimized_file_options.writable_file_max_buffer_size =
       db_options.writable_file_max_buffer_size;
+  optimized_file_options.use_direct_writes = db_options.use_direct_io_for_wal;
   return optimized_file_options;
 }
 
diff --git a/env/fs_posix.cc b/env/fs_posix.cc
index c93d9ce8675f..ea2deba49bc5 100644
--- a/env/fs_posix.cc
+++ b/env/fs_posix.cc
@@ -921,7 +921,7 @@ class PosixFileSystem : public FileSystem {
                                  const DBOptions& db_options) const override {
     FileOptions optimized = file_options;
     optimized.use_mmap_writes = false;
-    optimized.use_direct_writes = false;
+    optimized.use_direct_writes = db_options.use_direct_io_for_wal;
     optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
     // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
    // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
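With the file_system.cc and fs_posix.cc changes above, OptimizeForLogWrite() now propagates the DB-level flag into FileOptions::use_direct_writes for the WAL file. A minimal usage sketch of the new options (illustrative only; the function name, path handling, and 4MB block size are this example's choices, not part of the patch):

    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    rocksdb::Status OpenWithDirectWal(const std::string& path,
                                      rocksdb::DB** db) {
      rocksdb::Options options;
      options.create_if_missing = true;
      // WAL writes bypass the OS page cache via O_DIRECT.
      options.use_direct_io_for_wal = true;
      // Rounded up internally to the file's required buffer alignment.
      options.wal_direct_io_preallocation_block_size = 4 * 1024 * 1024;
      return rocksdb::DB::Open(options, path, db);
    }

The same toggle is exposed as --use_direct_io_for_wal in db_stress (above) and db_bench (below).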
diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc
index 2a92c0754dcd..6105f9abf873 100644
--- a/file/writable_file_writer.cc
+++ b/file/writable_file_writer.cc
@@ -250,6 +250,11 @@ IOStatus WritableFileWriter::Pad(const IOOptions& opts, const size_t pad_bytes,
 
 IOStatus WritableFileWriter::Close(const IOOptions& opts) {
   IOOptions io_options = FinalizeIOOptions(opts);
+
+  // Disable pre-allocation and wait for any ongoing pre-allocation to finish
+  direct_io_preallocation_disabled_.store(true, std::memory_order_release);
+  WaitForPreallocation();
+
   if (seen_error()) {
     IOStatus interim;
     if (writable_file_.get() != nullptr) {
@@ -367,12 +372,37 @@ IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
 
   if (buf_.CurrentSize() > 0) {
     if (use_direct_io()) {
-      if (pending_sync_) {
+      // Direct I/O path:
+      // We only issue a positional write if there is *new* logical data
+      // beyond what we previously flushed. After a prior direct write we
+      // may have refit a leftover (partial page) tail back into the buffer;
+      // that tail has already been persisted once (together with zero
+      // padding). Without this guard we'd rewrite the same tail page again
+      // on every Flush() call that finds a non-empty buffer containing no
+      // newly appended bytes. The condition below prevents that by
+      // comparing the logical file size to the last flushed logical size
+      // (`flushed_filesize_`).
+      uint64_t cur_size = next_write_offset_ + buf_.CurrentSize();
+      uint64_t flushed_filesize =
+          flushed_filesize_.load(std::memory_order_acquire);
+      if (pending_sync_ && cur_size > flushed_filesize) {
+        // Check if we need to wait for pre-allocation
+        uint64_t preallocated_size =
+            direct_io_preallocated_size_.load(std::memory_order_acquire);
+        if (UseDirectIOPreallocation() && cur_size > preallocated_size) {
+          WaitForPreallocation();
+        }
+
         if (perform_data_verification_ && buffered_data_with_checksum_) {
           s = WriteDirectWithChecksum(io_options);
         } else {
           s = WriteDirect(io_options);
         }
+
+        if (s.ok()) {
+          // Check if we should trigger the next pre-allocation
+          MaybeSchedulePreallocation();
+        }
       }
     } else {
       if (perform_data_verification_ && buffered_data_with_checksum_) {
@@ -484,6 +514,27 @@ IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
       return s;
     }
   }
+  if (use_direct_io() && UseDirectIOPreallocation()) {
+    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
+    uint64_t preallocated_size =
+        direct_io_preallocated_size_.load(std::memory_order_acquire);
+    if (cur_size > preallocated_size) {
+      // Ensure any ongoing pre-allocation is complete before returning
+      WaitForPreallocation();
+
+      // Check again after waiting
+      preallocated_size =
+          direct_io_preallocated_size_.load(std::memory_order_acquire);
+      // If still not enough, sync to ensure data durability
+      if (cur_size > preallocated_size) {
+        s = SyncInternal(io_options, use_fsync);
+        if (!s.ok()) {
+          set_seen_error(s);
+          return s;
+        }
+      }
+    }
+  }
   TEST_KILL_RANDOM("WritableFileWriter::Sync:1");
   pending_sync_ = false;
   return IOStatus::OK();
@@ -656,6 +707,7 @@ IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
     src += allowed;
     uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
     flushed_size_.store(cur_size + allowed, std::memory_order_release);
+    flushed_filesize_.store(cur_size + allowed, std::memory_order_release);
   }
   buf_.Size(0);
   buffered_data_crc32c_checksum_ = 0;
@@ -749,6 +801,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
   buffered_data_crc32c_checksum_ = 0;
   uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
   flushed_size_.store(cur_size + left, std::memory_order_release);
+  flushed_filesize_.store(cur_size + left, std::memory_order_release);
   if (!s.ok()) {
     set_seen_error(s);
   }
@@ -803,6 +856,14 @@ IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
   // whole page fills out.
   const size_t leftover_tail = buf_.CurrentSize() - file_advance;
 
+  // After the positional write succeeds we `RefitTail()` to move the
+  // leftover logical bytes (which were already persisted once together with
+  // zero padding) to the beginning of the buffer so that future appends can
+  // extend them. Until more logical data is appended `filesize_ ==
+  // flushed_filesize_` and a subsequent Flush() must not re-issue another
+  // PositionedAppend for this same tail. That is why Flush() adds the
+  // `filesize_ > flushed_filesize_` guard for direct I/O.
+
   // Round up and pad
   buf_.PadToAlignmentWith(0);
 
@@ -860,8 +921,10 @@ IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
     left -= size;
     src += size;
     write_offset += size;
-    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
-    flushed_size_.store(cur_size + size, std::memory_order_release);
+    flushed_size_.store(write_offset, std::memory_order_release);
+    flushed_filesize_.store(
+        std::min(write_offset, filesize_.load(std::memory_order_acquire)),
+        std::memory_order_release);
     assert((next_write_offset_ % alignment) == 0);
   }
 
@@ -966,8 +1029,10 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
   IOSTATS_ADD(bytes_written, left);
   assert((next_write_offset_ % alignment) == 0);
 
-  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
-  flushed_size_.store(cur_size + left, std::memory_order_release);
+  flushed_size_.store(next_write_offset_ + left, std::memory_order_release);
+  flushed_filesize_.store(std::min(next_write_offset_ + left,
+                                   filesize_.load(std::memory_order_acquire)),
+                          std::memory_order_release);
 
   if (s.ok()) {
     // Move the tail to the beginning of the buffer
@@ -1013,4 +1078,124 @@ IOOptions WritableFileWriter::FinalizeIOOptions(const IOOptions& opts) const {
   }
   return io_options;
 }
+
+void WritableFileWriter::InitialPreallocation() {
+  if (!UseDirectIOPreallocation() || env_ == nullptr) {
+    return;
+  }
+
+  // Trigger initial pre-allocation in background with HIGH priority
+  direct_io_preallocation_in_progress_.store(true, std::memory_order_release);
+  env_->Schedule(&WritableFileWriter::BGWorkPreallocation, this,
+                 Env::Priority::HIGH);
+}
+
+void WritableFileWriter::WaitForPreallocation() {
+  if (!UseDirectIOPreallocation()) {
+    return;
+  }
+
+  // Wait for pre-allocation to complete
+  auto lock = std::unique_lock(direct_io_preallocation_mutex_);
+  direct_io_preallocation_cv_.wait(lock, [this]() {
+    return !direct_io_preallocation_in_progress_.load(
+        std::memory_order_acquire);
+  });
+}
+
+void WritableFileWriter::MaybeSchedulePreallocation() {
+  if (!UseDirectIOPreallocation() || env_ == nullptr) {
+    return;
+  }
+
+  // Don't schedule new pre-allocations if disabled (e.g., during Close)
+  if (direct_io_preallocation_disabled_.load(std::memory_order_acquire)) {
+    return;
+  }
+
+  uint64_t cur_size = flushed_filesize_.load(std::memory_order_acquire);
+  uint64_t preallocated_size =
+      direct_io_preallocated_size_.load(std::memory_order_acquire);
+
+  // Trigger the next pre-allocation when remaining space < 50% of block size
+  // Example: block_size=1MB, preallocated=3MB -> trigger when filesize > 2.5MB
+  if (cur_size + direct_io_preallocation_block_size_ / 2 > preallocated_size) {
+    bool expected = false;
+    if (direct_io_preallocation_in_progress_.compare_exchange_strong(
+            expected, true, std::memory_order_acq_rel)) {
+      // Schedule background work
+      env_->Schedule(&WritableFileWriter::BGWorkPreallocation, this,
+                     Env::Priority::LOW);
+    }
+  }
+}
+
+void WritableFileWriter::BGWorkPreallocation(void* writer) {
+  reinterpret_cast<WritableFileWriter*>(writer)->DoPreallocation();
+}
+
+void WritableFileWriter::DoPreallocation() {
+  assert(UseDirectIOPreallocation());
+
+  auto lock = std::unique_lock(direct_io_preallocation_mutex_);
+
+  // Check again if we should proceed (might have been disabled)
+  if (direct_io_preallocation_disabled_.load(std::memory_order_acquire)) {
+    direct_io_preallocation_in_progress_.store(false,
+                                               std::memory_order_release);
+    lock.unlock();
+    direct_io_preallocation_cv_.notify_one();
+    return;
+  }
+
+  const size_t alignment = buf_.Alignment();
+  uint64_t block_size = direct_io_preallocation_block_size_;
+  uint64_t preallocated_size =
+      direct_io_preallocated_size_.load(std::memory_order_acquire);
+  assert(preallocated_size % alignment == 0);
+  uint64_t flushed_size = flushed_size_.load(std::memory_order_acquire);
+  assert(flushed_size % alignment == 0);
+  uint64_t preallocation_offset = std::max(preallocated_size, flushed_size);
+
+  // Use AlignedBuffer for zero-filling to support direct I/O
+  AlignedBuffer zero_buffer;
+  zero_buffer.Alignment(writable_file_->GetRequiredBufferAlignment());
+  zero_buffer.AllocateNewBuffer(block_size);
+  memset(zero_buffer.BufferStart(), 0, block_size);
+
+  IOOptions io_opts;
+  io_opts.io_activity = Env::IOActivity::kCustomIOActivity80;
+
+  // Write zeros to extend the file
+  IOStatus s;
+  uint64_t written = 0;
+  while (written < block_size && s.ok()) {
+    size_t to_write = block_size - written;
+    s = writable_file_->PositionedAppend(
+        Slice(zero_buffer.BufferStart(), to_write),
+        preallocation_offset + written, io_opts, nullptr);
+    if (s.ok()) {
+      written += to_write;
+      assert((preallocation_offset + written) % alignment == 0);
+    }
+  }
+
+  if (s.ok()) {
+    // Sync to persist file length metadata
+    s = writable_file_->Sync(io_opts, nullptr /* dbg */);
+  }
+
+  if (s.ok()) {
+    // Update preallocated size
+    direct_io_preallocated_size_.store(preallocation_offset + written,
+                                       std::memory_order_release);
+  } else {
+    // Ignore pre-allocation errors. They will be surfaced during actual
+    // writes.
+  }
+
+  direct_io_preallocation_in_progress_.store(false, std::memory_order_release);
+  lock.unlock();
+  direct_io_preallocation_cv_.notify_one();  // Wake up any waiting threads
+}
+
 }  // namespace ROCKSDB_NAMESPACE
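To make the scheduling condition above concrete, here is a self-contained sketch of the half-block trigger rule used in MaybeSchedulePreallocation() (the free function and parameter names are local to this example, not part of the patch):

    #include <cstdint>

    // Schedule the next preallocation block once less than half a block of
    // preallocated space remains beyond the flushed logical size.
    bool ShouldSchedulePreallocation(uint64_t flushed_filesize,
                                     uint64_t preallocated_size,
                                     uint64_t block_size) {
      return flushed_filesize + block_size / 2 > preallocated_size;
    }

    // With block_size = 1MB and preallocated_size = 3MB, this first returns
    // true once flushed_filesize exceeds 2.5MB, matching the comment above.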
diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h
index 619821204b3e..27e25f7a310e 100644
--- a/file/writable_file_writer.h
+++ b/file/writable_file_writer.h
@@ -9,6 +9,7 @@
 #pragma once
 
 #include <atomic>
+#include <condition_variable>
 
 #include "db/version_edit.h"
@@ -145,6 +146,19 @@ class WritableFileWriter {
   // Actually written data size can be used for truncate
   // not counting padding data
   std::atomic<uint64_t> filesize_;
+  // Actually flushed logical data size (excluding any page-alignment padding
+  // bytes written during direct I/O). In direct I/O mode we may write a whole
+  // aligned page that includes a "leftover tail" of logical data plus zero
+  // padding; that tail is then kept in the buffer (refit) for future appends.
+  // When a subsequent Flush() occurs with no new logical data appended,
+  // `filesize_ == flushed_filesize_` and we must avoid re-writing the same
+  // tail page again. The Flush() direct I/O branch checks
+  // `filesize_ > flushed_filesize_` to decide whether there is new logical
+  // data to persist. This prevents redundant positional writes of the tail
+  // page. Contrast with `flushed_size_`, which tracks physical bytes written
+  // (including padding and page alignment effects) and is used for range
+  // sync heuristics.
+  std::atomic<uint64_t> flushed_filesize_;
   std::atomic<uint64_t> flushed_size_;
   // This is necessary when we use unbuffered access
   // and writes must happen on aligned offsets
@@ -168,6 +182,14 @@ class WritableFileWriter {
   bool buffered_data_with_checksum_;
   Temperature temperature_;
 
+  uint64_t direct_io_preallocation_block_size_;
+  std::atomic<uint64_t> direct_io_preallocated_size_;
+  std::atomic<bool> direct_io_preallocation_in_progress_;
+  std::atomic<bool> direct_io_preallocation_disabled_;
+  std::mutex direct_io_preallocation_mutex_;
+  std::condition_variable direct_io_preallocation_cv_;
+  Env* env_;  // For scheduling background work
+
  public:
   WritableFileWriter(
       std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
@@ -178,13 +200,15 @@ class WritableFileWriter {
       const std::vector<std::shared_ptr<EventListener>>& listeners = {},
       FileChecksumGenFactory* file_checksum_gen_factory = nullptr,
       bool perform_data_verification = false,
-      bool buffered_data_with_checksum = false)
+      bool buffered_data_with_checksum = false,
+      uint64_t direct_io_preallocation_block_size = 0, Env* env = nullptr)
       : file_name_(_file_name),
         writable_file_(std::move(file), io_tracer, _file_name),
         clock_(clock),
         buf_(),
         max_buffer_size_(options.writable_file_max_buffer_size),
         filesize_(0),
+        flushed_filesize_(0),
         flushed_size_(0),
         next_write_offset_(0),
         pending_sync_(false),
@@ -202,9 +226,24 @@ class WritableFileWriter {
         checksum_finalized_(false),
         perform_data_verification_(perform_data_verification),
         buffered_data_crc32c_checksum_(0),
-        buffered_data_with_checksum_(buffered_data_with_checksum) {
+        buffered_data_with_checksum_(buffered_data_with_checksum),
+        direct_io_preallocation_block_size_(direct_io_preallocation_block_size),
+        direct_io_preallocated_size_(0),
+        direct_io_preallocation_in_progress_(false),
+        direct_io_preallocation_disabled_(false),
+        env_(env) {
     temperature_ = options.temperature;
     assert(!use_direct_io() || max_buffer_size_ > 0);
+    // Adjust the direct IO preallocation block size to be a multiple of the
+    // required alignment
+    if (use_direct_io()) {
+      if (direct_io_preallocation_block_size_ > 0) {
+        direct_io_preallocation_block_size_ =
+            Roundup(direct_io_preallocation_block_size_,
+                    writable_file_->GetRequiredBufferAlignment());
+      }
+    } else {
+      direct_io_preallocation_block_size_ = 0;
+    }
     TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
                              reinterpret_cast<void*>(max_buffer_size_));
     buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
@@ -222,6 +261,8 @@ class WritableFileWriter {
       file_checksum_gen_factory->CreateFileChecksumGenerator(
           checksum_gen_context);
     }
+    // If enabled, trigger initial preallocation in background.
+    InitialPreallocation();
   }
 
   static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
@@ -367,5 +408,20 @@ class WritableFileWriter {
   // `opts` should've been called with `FinalizeIOOptions()` before passing in
   IOStatus SyncInternal(const IOOptions& opts, bool use_fsync);
   IOOptions FinalizeIOOptions(const IOOptions& opts) const;
+
+  // Check if direct I/O preallocation is enabled
+  bool UseDirectIOPreallocation() const {
+    return direct_io_preallocation_block_size_ > 0;
+  }
+  // Trigger initial pre-allocation for a newly created file
+  void InitialPreallocation();
+  // Check if we should trigger pre-allocation (>50% of current block used)
+  void MaybeSchedulePreallocation();
+  // Wait for ongoing pre-allocation to complete
+  void WaitForPreallocation();
+  // Background work for pre-allocation
+  static void BGWorkPreallocation(void* writer);
+  // Perform pre-allocation
+  void DoPreallocation();
 };
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h
index ce9e5229c824..c17957a2fb46 100644
--- a/include/rocksdb/c.h
+++ b/include/rocksdb/c.h
@@ -1779,6 +1779,10 @@ rocksdb_options_set_use_direct_io_for_flush_and_compaction(rocksdb_options_t*,
                                                            unsigned char);
 extern ROCKSDB_LIBRARY_API unsigned char
 rocksdb_options_get_use_direct_io_for_flush_and_compaction(rocksdb_options_t*);
+extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_io_for_wal(
+    rocksdb_options_t*, unsigned char);
+extern ROCKSDB_LIBRARY_API unsigned char
+rocksdb_options_get_use_direct_io_for_wal(rocksdb_options_t*);
 extern ROCKSDB_LIBRARY_API void rocksdb_options_set_is_fd_close_on_exec(
     rocksdb_options_t*, unsigned char);
 extern ROCKSDB_LIBRARY_API unsigned char
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index d8acfe8f7175..ba7e64dd6a00 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1082,6 +1082,18 @@ struct DBOptions {
   // Default: false
   bool use_direct_io_for_flush_and_compaction = false;
 
+  // Use O_DIRECT for WAL writes.
+  // When enabled, WAL writes will bypass the OS page cache and use direct
+  // I/O. This requires proper alignment (typically 512 bytes). The log
+  // writer will automatically pad records to meet alignment requirements.
+  // Default: false
+  bool use_direct_io_for_wal = false;
+
+  // The block size for preallocation when using direct I/O for the WAL.
+  // Must be a multiple of the logical sector size of the underlying storage.
+  // Default: 1MB
+  uint64_t wal_direct_io_preallocation_block_size = 1024 * 1024;
+
   // If false, fallocate() calls are bypassed, which disables file
   // preallocation. The file space preallocation is used to increase the file
   // write/append performance. By default, RocksDB preallocates space for WAL,
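Because the two new DBOptions fields are registered in the option type map below (options/db_options.cc), they can also be set from an options string. A small sketch using the string-based API from rocksdb/convenience.h (the function name and the 2MB value are this example's, not part of the patch):

    #include <cassert>
    #include "rocksdb/convenience.h"
    #include "rocksdb/options.h"

    void ConfigureDirectWalFromString() {
      rocksdb::DBOptions base, out;
      rocksdb::ConfigOptions cfg;
      // Option names below match the keys registered in db_options.cc.
      rocksdb::Status s = rocksdb::GetDBOptionsFromString(
          cfg, base,
          "use_direct_io_for_wal=true;"
          "wal_direct_io_preallocation_block_size=2097152;",
          &out);
      assert(s.ok());
      assert(out.use_direct_io_for_wal);
      assert(out.wal_direct_io_preallocation_block_size == 2 * 1024 * 1024);
    }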
diff --git a/options/db_options.cc b/options/db_options.cc
index dfacea8e5b22..1792a7177a01 100644
--- a/options/db_options.cc
+++ b/options/db_options.cc
@@ -183,6 +183,15 @@ static std::unordered_map<std::string, OptionTypeInfo>
                      use_direct_io_for_flush_and_compaction),
             OptionType::kBoolean, OptionVerificationType::kNormal,
             OptionTypeFlags::kNone}},
+        {"use_direct_io_for_wal",
+         {offsetof(struct ImmutableDBOptions, use_direct_io_for_wal),
+          OptionType::kBoolean, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+        {"wal_direct_io_preallocation_block_size",
+         {offsetof(struct ImmutableDBOptions,
+                   wal_direct_io_preallocation_block_size),
+          OptionType::kUInt64T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
         {"allow_2pc",
          {offsetof(struct ImmutableDBOptions, allow_2pc), OptionType::kBoolean,
           OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
@@ -751,6 +760,9 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
       use_direct_reads(options.use_direct_reads),
       use_direct_io_for_flush_and_compaction(
           options.use_direct_io_for_flush_and_compaction),
+      use_direct_io_for_wal(options.use_direct_io_for_wal),
+      wal_direct_io_preallocation_block_size(
+          options.wal_direct_io_preallocation_block_size),
       allow_fallocate(options.allow_fallocate),
       is_fd_close_on_exec(options.is_fd_close_on_exec),
       advise_random_on_open(options.advise_random_on_open),
@@ -873,6 +885,11 @@ void ImmutableDBOptions::Dump(Logger* log) const {
                    "                        "
                    "Options.use_direct_io_for_flush_and_compaction: %d",
                    use_direct_io_for_flush_and_compaction);
+  ROCKS_LOG_HEADER(log, "                 Options.use_direct_io_for_wal: %d",
+                   use_direct_io_for_wal);
+  ROCKS_LOG_HEADER(
+      log, "   Options.wal_direct_io_preallocation_block_size: %" PRIu64,
+      wal_direct_io_preallocation_block_size);
   ROCKS_LOG_HEADER(log, "         Options.create_missing_column_families: %d",
                    create_missing_column_families);
   ROCKS_LOG_HEADER(log, "                             Options.db_log_dir: %s",
diff --git a/options/db_options.h b/options/db_options.h
index ef8607d8bba1..9fea238b4997 100644
--- a/options/db_options.h
+++ b/options/db_options.h
@@ -55,6 +55,8 @@ struct ImmutableDBOptions {
   bool allow_mmap_writes;
   bool use_direct_reads;
   bool use_direct_io_for_flush_and_compaction;
+  bool use_direct_io_for_wal;
+  uint64_t wal_direct_io_preallocation_block_size;
   bool allow_fallocate;
   bool is_fd_close_on_exec;
   bool advise_random_on_open;
diff --git a/options/options_helper.cc b/options/options_helper.cc
index e5622d0a3238..63e9e5bca793 100644
--- a/options/options_helper.cc
+++ b/options/options_helper.cc
@@ -113,6 +113,9 @@ void BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
   options.use_direct_reads = immutable_db_options.use_direct_reads;
   options.use_direct_io_for_flush_and_compaction =
       immutable_db_options.use_direct_io_for_flush_and_compaction;
+  options.use_direct_io_for_wal = immutable_db_options.use_direct_io_for_wal;
+  options.wal_direct_io_preallocation_block_size =
+      immutable_db_options.wal_direct_io_preallocation_block_size;
   options.allow_fallocate = immutable_db_options.allow_fallocate;
   options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec;
   options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec;
diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc
index c752b2401718..576885ee339f 100644
--- a/options/options_settable_test.cc
+++ b/options/options_settable_test.cc
@@ -438,6 +438,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
       "allow_mmap_reads=false;"
"use_direct_reads=false;" "use_direct_io_for_flush_and_compaction=false;" + "use_direct_io_for_wal=false;" "max_log_file_size=4607;" "advise_random_on_open=true;" "enable_pipelined_write=false;" diff --git a/options/options_test.cc b/options/options_test.cc index 7111872f541b..fcf6239a109a 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -169,6 +169,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"allow_mmap_writes", "false"}, {"use_direct_reads", "false"}, {"use_direct_io_for_flush_and_compaction", "false"}, + {"use_direct_io_for_wal", "false"}, {"is_fd_close_on_exec", "true"}, {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, @@ -352,6 +353,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.allow_mmap_writes, false); ASSERT_EQ(new_db_opt.use_direct_reads, false); ASSERT_EQ(new_db_opt.use_direct_io_for_flush_and_compaction, false); + ASSERT_EQ(new_db_opt.use_direct_io_for_wal, false); ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U); @@ -2490,6 +2492,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"allow_mmap_writes", "false"}, {"use_direct_reads", "false"}, {"use_direct_io_for_flush_and_compaction", "false"}, + {"use_direct_io_for_wal", "false"}, {"is_fd_close_on_exec", "true"}, {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, @@ -2678,6 +2681,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.allow_mmap_writes, false); ASSERT_EQ(new_db_opt.use_direct_reads, false); ASSERT_EQ(new_db_opt.use_direct_io_for_flush_and_compaction, false); + ASSERT_EQ(new_db_opt.use_direct_io_for_wal, false); ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 655bba868f6e..73cc44192bb0 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1672,6 +1672,10 @@ DEFINE_bool(use_direct_io_for_flush_and_compaction, ROCKSDB_NAMESPACE::Options().use_direct_io_for_flush_and_compaction, "Use O_DIRECT for background flush and compaction writes"); +DEFINE_bool(use_direct_io_for_wal, + ROCKSDB_NAMESPACE::Options().use_direct_io_for_wal, + "Use O_DIRECT for WAL writes"); + DEFINE_bool(advise_random_on_open, ROCKSDB_NAMESPACE::Options().advise_random_on_open, "Advise random access on table file open"); @@ -4396,6 +4400,7 @@ class Benchmark { options.use_direct_reads = FLAGS_use_direct_reads; options.use_direct_io_for_flush_and_compaction = FLAGS_use_direct_io_for_flush_and_compaction; + options.use_direct_io_for_wal = FLAGS_use_direct_io_for_wal; options.manual_wal_flush = FLAGS_manual_wal_flush; options.wal_compression = FLAGS_wal_compression_e; options.ttl = FLAGS_fifo_compaction_ttl; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 3ac8b9fe782b..155037ad405a 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -334,6 +334,30 @@ TEST_F(WritableFileWriterTest, BufferWithZeroCapacityDirectIO) { } } +TEST_F(WritableFileWriterTest, RepeatedFlushWithDirectIO) { + EnvOptions env_opts; + env_opts.use_direct_writes = true; + env_opts.writable_file_max_buffer_size = 1024 * 1024; + + std::unique_ptr writer; + auto s = + WritableFileWriter::Create(FileSystem::Default(), /*fname=*/"dont_care_2", + FileOptions(env_opts), &writer, + /*dbg=*/nullptr); + 
+  EXPECT_OK(s);
+
+  const auto slice = Slice("hello");
+  EXPECT_OK(writer->Append(IOOptions(), slice));
+  ASSERT_EQ(writer->GetFileSize(), slice.size());
+
+  EXPECT_OK(writer->Flush(IOOptions()));
+  const auto flushed_size = writer->GetFlushedSize();
+
+  // Flush a second time; the flushed size should not change.
+  EXPECT_OK(writer->Flush(IOOptions()));
+  ASSERT_EQ(writer->GetFlushedSize(), flushed_size);
+}
+
 class DBWritableFileWriterTest : public DBTestBase {
  public:
  DBWritableFileWriterTest()
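The new C getters/setters mirror the C++ option. A minimal exercise of the rocksdb_options_set/get_use_direct_io_for_wal pair added in db/c.cc above (a sketch only; the function name is this example's, and error handling is elided):

    #include <cassert>
    #include "rocksdb/c.h"

    void ConfigureDirectWalViaCApi() {
      rocksdb_options_t* opts = rocksdb_options_create();
      // Takes an unsigned char, matching the other direct-I/O toggles.
      rocksdb_options_set_use_direct_io_for_wal(opts, 1);
      assert(1 == rocksdb_options_get_use_direct_io_for_wal(opts));
      rocksdb_options_destroy(opts);
    }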