From bf98dcf9a8ffd1f0af2707571ff4a283d0788092 Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Thu, 21 Mar 2024 12:38:53 -0700
Subject: [PATCH] Fix kBlockCacheTier read when merge-chain base value is in a
 blob file (#12462)

Summary:
The original goal is to propagate failures from `GetContext::SaveValue()` ->
`GetContext::GetBlobValue()` -> `BlobFetcher::FetchBlob()` up to the user. This
call sequence happens when a merge chain ends with a base value in a blob file.

There are also fixes for bugs encountered along the way where non-ok statuses
were ignored/overwritten, and a bit of plumbing work for functions that had no
capability to return a status.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12462
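To make the contract concrete before the repro, here is a minimal sketch of
the user-visible behavior (illustration only; it mirrors the
`GetMergeBlobFromMemoryTier` unit test added by this patch and assumes a DB
opened with `enable_blob_files=1`, `min_blob_size=0`, and a string-append
merge operator):

```
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// `db` is assumed to be opened as described above (blob files enabled,
// string-append merge operator configured).
void DemoBlockCacheTierMergeRead(rocksdb::DB* db) {
  rocksdb::WriteOptions wo;
  rocksdb::FlushOptions fo;

  // Build a merge chain whose base value lands in a blob file.
  assert(db->Put(wo, "k", "v1").ok());
  assert(db->Flush(fo).ok());  // "v1" becomes the base value in a blob file
  assert(db->Merge(wo, "k", "v2").ok());
  assert(db->Flush(fo).ok());

  // A normal read pays the blob I/O and returns the merged result.
  std::string value;
  assert(db->Get(rocksdb::ReadOptions(), "k", &value).ok());
  assert(value == "v1,v2");

  // A memory-only read cannot fetch the uncached base value blob. With this
  // fix the failed blob fetch is propagated as Incomplete; previously it was
  // swallowed, which could trip asserts downstream (see the trace below).
  rocksdb::ReadOptions ro;
  ro.read_tier = rocksdb::kBlockCacheTier;
  assert(db->Get(ro, "k", &value).IsIncomplete());
}
```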
Test Plan:
A repro command

```
db=/dev/shm/dbstress_db ; exp=/dev/shm/dbstress_exp ; rm -rf $db $exp ; mkdir -p $db $exp

./db_stress \
  --clear_column_family_one_in=0 \
  --test_batches_snapshots=0 \
  --write_fault_one_in=0 \
  --use_put_entity_one_in=0 \
  --prefixpercent=0 \
  --read_fault_one_in=0 \
  --readpercent=0 \
  --reopen=0 \
  --set_options_one_in=10000 \
  --delpercent=0 \
  --delrangepercent=0 \
  --open_metadata_write_fault_one_in=0 \
  --open_read_fault_one_in=0 \
  --open_write_fault_one_in=0 \
  --destroy_db_initially=0 \
  --ingest_external_file_one_in=0 \
  --iterpercent=0 \
  --nooverwritepercent=0 \
  --db=$db \
  --enable_blob_files=1 \
  --expected_values_dir=$exp \
  --max_background_compactions=20 \
  --max_bytes_for_level_base=2097152 \
  --max_key=100000 \
  --min_blob_size=0 \
  --open_files=-1 \
  --ops_per_thread=100000000 \
  --prefix_size=-1 \
  --target_file_size_base=524288 \
  --use_merge=1 \
  --value_size_mult=32 \
  --write_buffer_size=524288 \
  --writepercent=100
```

It used to fail like:

```
...
frame #9: 0x00007fc63903bc93 libc.so.6`__GI___assert_fail(assertion="HasDefaultColumn(columns)", file="fbcode/internal_repo_rocksdb/repo/db/wide/wide_columns_helper.h", line=33, function="static const rocksdb::Slice &rocksdb::WideColumnsHelper::GetDefaultColumn(const rocksdb::WideColumns &)") at assert.c:101:3
frame #10: 0x00000000006f7e92 db_stress`rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, rocksdb::PinnableWideColumns*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, rocksdb::PinnedIteratorsManager*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) [inlined] rocksdb::WideColumnsHelper::GetDefaultColumn(columns=size=0) at wide_columns_helper.h:33
frame #11: 0x00000000006f7e76 db_stress`rocksdb::Version::Get(this=0x00007fc5ec763000, read_options=<unavailable>, k=<unavailable>, value=0x0000000000000000, columns=0x00007fc6035fd1d8, timestamp=<unavailable>, status=0x00007fc6035fd250, merge_context=0x00007fc6035fce40, max_covering_tombstone_seq=0x00007fc6035fce90, pinned_iters_mgr=0x00007fc6035fcdf0, value_found=0x0000000000000000, key_exists=0x0000000000000000, seq=0x0000000000000000, callback=0x0000000000000000, is_blob=0x0000000000000000, do_merge=<unavailable>) at version_set.cc:2492
frame #12: 0x000000000051e245 db_stress`rocksdb::DBImpl::GetImpl(this=0x00007fc637a86000, read_options=0x00007fc6035fcf60, key=<unavailable>, get_impl_options=0x00007fc6035fd000) at db_impl.cc:2408
frame #13: 0x000000000050cec2 db_stress`rocksdb::DBImpl::GetEntity(this=0x00007fc637a86000, _read_options=<unavailable>, column_family=<unavailable>, key=0x00007fc6035fd3c8, columns=0x00007fc6035fd1d8) at db_impl.cc:2109
frame #14: 0x000000000074f688 db_stress`rocksdb::(anonymous namespace)::MemTableInserter::MergeCF(this=0x00007fc6035fd450, column_family_id=2, key=0x00007fc6035fd3c8, value=0x00007fc6035fd3a0) at write_batch.cc:2656
frame #15: 0x00000000007476fc db_stress`rocksdb::WriteBatchInternal::Iterate(wb=0x00007fc6035fe698, handler=0x00007fc6035fd450, begin=12, end=<unavailable>) at write_batch.cc:607
frame #16: 0x000000000074d7dd db_stress`rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) [inlined] rocksdb::WriteBatch::Iterate(this=<unavailable>, handler=0x00007fc6035fd450) const at write_batch.cc:505
frame #17: 0x000000000074d77b db_stress`rocksdb::WriteBatchInternal::InsertInto(write_group=<unavailable>, sequence=<unavailable>, memtables=<unavailable>, flush_scheduler=<unavailable>, trim_history_scheduler=<unavailable>, ignore_missing_column_families=<unavailable>, recovery_log_number=0, db=0x00007fc637a86000, concurrent_memtable_writes=<unavailable>, seq_per_batch=false, batch_per_txn=<unavailable>) at write_batch.cc:3084
frame #18: 0x0000000000631d77 db_stress`rocksdb::DBImpl::PipelinedWriteImpl(this=0x00007fc637a86000, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000) at db_impl_write.cc:807
frame #19: 0x000000000062ceeb db_stress`rocksdb::DBImpl::WriteImpl(this=<unavailable>, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000, batch_cnt=0, pre_release_callback=0x0000000000000000, post_memtable_callback=0x0000000000000000) at db_impl_write.cc:312
frame #20: 0x000000000062c8ec db_stress`rocksdb::DBImpl::Write(this=0x00007fc637a86000, write_options=0x00007fc6035feca8, my_batch=0x00007fc6035fe698) at db_impl_write.cc:157
frame #21: 0x000000000062b847 db_stress`rocksdb::DB::Merge(this=0x00007fc637a86000, opt=0x00007fc6035feca8, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, value=0x00007fc6035fe830) at db_impl_write.cc:2544
frame #22: 0x000000000062b6ef db_stress`rocksdb::DBImpl::Merge(this=0x00007fc637a86000, o=<unavailable>, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, val=0x00007fc6035fe830) at db_impl_write.cc:72
frame #23: 0x00000000004d6397 db_stress`rocksdb::NonBatchedOpsStressTest::TestPut(this=0x00007fc637041000, thread=0x00007fc6370dbc00, write_opts=0x00007fc6035feca8, read_opts=0x00007fc6035fe9c8, rand_column_families=<unavailable>, rand_keys=size=1, value={P\xe9_\x03\xc6\x7f\0\0}) at no_batched_ops_stress.cc:1317
frame #24: 0x000000000049361d db_stress`rocksdb::StressTest::OperateDb(this=0x00007fc637041000, thread=0x00007fc6370dbc00) at db_stress_test_base.cc:1148
...
```
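Mechanically, the fix gives `GetContext::SaveValue()` a `Status` out-parameter
and makes every table reader check it before continuing its scan. The
recurring caller-side pattern looks roughly like this (a sketch only;
`SaveEntryOrFail` is a hypothetical helper, not code from this patch):

```
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/get_context.h"  // repo-internal header

namespace ROCKSDB_NAMESPACE {
// Sketch of the pattern the readers below adopt: capture SaveValue()'s read
// status and stop scanning on a non-ok status instead of dropping it.
inline bool SaveEntryOrFail(GetContext* get_context,
                            const ParsedInternalKey& parsed_key,
                            const Slice& value, bool* matched, Status* s) {
  Status read_status;
  bool keep_scanning =
      get_context->SaveValue(parsed_key, value, matched, &read_status);
  if (!read_status.ok()) {
    *s = read_status;  // e.g. Incomplete when kBlockCacheTier forbids blob I/O
    return false;      // caller must stop and surface *s
  }
  return keep_scanning;
}
}  // namespace ROCKSDB_NAMESPACE
```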
Reviewed By: ltamasi

Differential Revision: D55157795

Pulled By: ajkr

fbshipit-source-id: 5f7c1380ead5794c29d41680028e34b839744764
---
 db/blob/db_blob_basic_test.cc                 | 24 ++++++++++++++
 db/table_cache.cc                             | 13 ++++----
 db/table_cache.h                              |  3 +-
 db/table_cache_sync_and_async.h               | 10 ++++--
 table/block_based/block_based_table_reader.cc | 17 +++++++---
 .../block_based_table_reader_sync_and_async.h | 16 +++++++--
 table/cuckoo/cuckoo_table_reader.cc           |  5 ++-
 table/get_context.cc                          | 33 +++++++++++--------
 table/get_context.h                           | 13 ++++----
 table/mock_table.cc                           |  8 ++++-
 table/plain/plain_table_reader.cc             |  8 +++--
 .../blockcachetier_blob_as_mergebase.md       |  1 +
 12 files changed, 110 insertions(+), 41 deletions(-)
 create mode 100644 unreleased_history/bug_fixes/blockcachetier_blob_as_mergebase.md

diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc
index ef48844b432..e41933d3549 100644
--- a/db/blob/db_blob_basic_test.cc
+++ b/db/blob/db_blob_basic_test.cc
@@ -1182,6 +1182,30 @@ TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
   ASSERT_EQ(Get("Key1"), "v1,v2,v3");
 }
 
+TEST_F(DBBlobBasicTest, GetMergeBlobFromMemoryTier) {
+  Options options = GetDefaultOptions();
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  ASSERT_OK(Put(Key(0), "v1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge(Key(0), "v2"));
+  ASSERT_OK(Flush());
+
+  // Regular `Get()` loads data block to cache.
+  std::string value;
+  ASSERT_OK(db_->Get(ReadOptions(), Key(0), &value));
+  ASSERT_EQ("v1,v2", value);
+
+  // Base value blob is still uncached, so an in-memory read will fail.
+  ReadOptions read_options;
+  read_options.read_tier = kBlockCacheTier;
+  ASSERT_TRUE(db_->Get(read_options, Key(0), &value).IsIncomplete());
+}
+
 TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
   constexpr size_t num_keys = 3;
 
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 13cbbe58418..02956c7c29c 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -395,7 +395,7 @@ uint64_t TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
 
 bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
                                  size_t prefix_size, GetContext* get_context,
-                                 SequenceNumber seq_no) {
+                                 Status* read_status, SequenceNumber seq_no) {
   bool found = false;
 
   row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
@@ -414,8 +414,8 @@ bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
     row_cache.RegisterReleaseAsCleanup(row_handle, value_pinner);
     // If row cache hit, knowing cache key is the same to row_cache_key,
     // can use row_cache_key's seq no to construct InternalKey.
-    replayGetContextLog(*row_cache.Value(row_handle), user_key, get_context,
-                        &value_pinner, seq_no);
+    *read_status = replayGetContextLog(*row_cache.Value(row_handle), user_key,
+                                       get_context, &value_pinner, seq_no);
     RecordTick(ioptions_.stats, ROW_CACHE_HIT);
     found = true;
   } else {
@@ -440,21 +440,20 @@ Status TableCache::Get(
 
   // Check row cache if enabled.
   // Reuse row_cache_key sequence number when row cache hits.
+  Status s;
   if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
     auto user_key = ExtractUserKey(k);
     uint64_t cache_entry_seq_no =
         CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
     done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
-                           get_context, cache_entry_seq_no);
+                           get_context, &s, cache_entry_seq_no);
     if (!done) {
       row_cache_entry = &row_cache_entry_buffer;
     }
   }
-  Status s;
   TableReader* t = fd.table_reader;
   TypedHandle* handle = nullptr;
-  if (!done) {
-    assert(s.ok());
+  if (s.ok() && !done) {
     if (t == nullptr) {
       s = FindTable(options, file_options_, internal_comparator, file_meta,
                     &handle, block_protection_bytes_per_key, prefix_extractor,
diff --git a/db/table_cache.h b/db/table_cache.h
index ae3fc93c376..cf3cd25c914 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -273,6 +273,7 @@ class TableCache {
   // user key to row_cache_key at offset prefix_size
   bool GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
                        size_t prefix_size, GetContext* get_context,
+                       Status* read_status,
                        SequenceNumber seq_no = kMaxSequenceNumber);
 
   const ImmutableOptions& ioptions_;
@@ -286,4 +287,4 @@ class TableCache {
   std::string db_session_id_;
 };
 
-}  // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/db/table_cache_sync_and_async.h b/db/table_cache_sync_and_async.h
index a06b7d43164..f069c8b8055 100644
--- a/db/table_cache_sync_and_async.h
+++ b/db/table_cache_sync_and_async.h
@@ -50,8 +50,14 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
 
       GetContext* get_context = miter->get_context;
 
-      if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
-                          get_context)) {
+      Status read_status;
+      bool ret =
+          GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
+                          get_context, &read_status);
+      if (!read_status.ok()) {
+        CO_RETURN read_status;
+      }
+      if (ret) {
         table_range.SkipKey(miter);
       } else {
         row_cache_entries.emplace_back();
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index e85a8bff4c3..adf5b972356 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -2339,11 +2339,18 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
             biter.key(), &parsed_key, false /* log_err_key */);  // TODO
         if (!pik_status.ok()) {
           s = pik_status;
+          break;
         }
 
-        if (!get_context->SaveValue(
-                parsed_key, biter.value(), &matched,
-                biter.IsValuePinned() ? &biter : nullptr)) {
+        Status read_status;
+        bool ret = get_context->SaveValue(
+            parsed_key, biter.value(), &matched, &read_status,
+            biter.IsValuePinned() ? &biter : nullptr);
+        if (!read_status.ok()) {
+          s = read_status;
+          break;
+        }
+        if (!ret) {
          if (get_context->State() == GetContext::GetState::kFound) {
            does_referenced_key_exist = true;
            referenced_data_size = biter.key().size() + biter.value().size();
@@ -2352,7 +2359,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
            break;
          }
        }
-        s = biter.status();
+        if (s.ok()) {
+          s = biter.status();
+        }
        if (!s.ok()) {
          break;
        }
diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h
index 98cf73dcacf..6350560c1d3 100644
--- a/table/block_based/block_based_table_reader_sync_and_async.h
+++ b/table/block_based/block_based_table_reader_sync_and_async.h
@@ -735,9 +735,17 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
              biter->key(), &parsed_key, false /* log_err_key */);  // TODO
          if (!pik_status.ok()) {
            s = pik_status;
+            break;
+          }
+          Status read_status;
+          bool ret = get_context->SaveValue(
+              parsed_key, biter->value(), &matched, &read_status,
+              value_pinner ? value_pinner : nullptr);
+          if (!read_status.ok()) {
+            s = read_status;
+            break;
          }
-          if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
-                                      value_pinner)) {
+          if (!ret) {
            if (get_context->State() == GetContext::GetState::kFound) {
              does_referenced_key_exist = true;
              referenced_data_size =
@@ -746,7 +754,9 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
              done = true;
              break;
            }
-          s = biter->status();
+          if (s.ok()) {
+            s = biter->status();
+          }
        }
        // Write the block cache access.
        // XXX: There appear to be 'break' statements above that bypass this
diff --git a/table/cuckoo/cuckoo_table_reader.cc b/table/cuckoo/cuckoo_table_reader.cc
index d74a0b041f8..5be1ebc19ec 100644
--- a/table/cuckoo/cuckoo_table_reader.cc
+++ b/table/cuckoo/cuckoo_table_reader.cc
@@ -186,7 +186,10 @@ Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
        return s;
      }
      bool dont_care __attribute__((__unused__));
-      get_context->SaveValue(found_ikey, value, &dont_care);
+      get_context->SaveValue(found_ikey, value, &dont_care, &s);
+      if (!s.ok()) {
+        return s;
+      }
    }
    // We don't support merge operations. So, we return here.
    return Status::OK();
diff --git a/table/get_context.cc b/table/get_context.cc
index 16251f93a9c..763bd8d197e 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -221,7 +221,7 @@ void GetContext::ReportCounters() {
 
 bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
                            const Slice& value, bool* matched,
-                           Cleanable* value_pinner) {
+                           Status* read_status, Cleanable* value_pinner) {
   assert(matched);
   assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
          merge_context_ != nullptr);
@@ -356,8 +356,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
         // merge_context_->operand_list
         if (type == kTypeBlobIndex) {
           PinnableSlice pin_val;
-          if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
-              false) {
+          if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
+                           read_status) == false) {
             return false;
           }
           Slice blob_value(pin_val);
@@ -383,8 +383,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
       assert(merge_operator_ != nullptr);
       if (type == kTypeBlobIndex) {
         PinnableSlice pin_val;
-        if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
-            false) {
+        if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
+                         read_status) == false) {
           return false;
         }
         Slice blob_value(pin_val);
@@ -547,14 +547,14 @@ void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
 }
 
 bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
-                              PinnableSlice* blob_value) {
+                              PinnableSlice* blob_value, Status* read_status) {
   constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
   constexpr uint64_t* bytes_read = nullptr;
 
-  Status status = blob_fetcher_->FetchBlob(
-      user_key, blob_index, prefetch_buffer, blob_value, bytes_read);
-  if (!status.ok()) {
-    if (status.IsIncomplete()) {
+  *read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
+                                          blob_value, bytes_read);
+  if (!read_status->ok()) {
+    if (read_status->IsIncomplete()) {
      // FIXME: this code is not covered by unit tests
      MarkKeyMayExist();
      return false;
@@ -577,9 +577,9 @@ void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
  }
}
 
-void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
-                         GetContext* get_context, Cleanable* value_pinner,
-                         SequenceNumber seq_no) {
+Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
+                           GetContext* get_context, Cleanable* value_pinner,
+                           SequenceNumber seq_no) {
  Slice s = replay_log;
  Slice ts;
  size_t ts_sz = get_context->TimestampSize();
@@ -610,8 +610,13 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
    assert(ret);
    (void)ret;
 
-    get_context->SaveValue(ikey, value, &dont_care, value_pinner);
+    Status read_status;
+    get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
+    if (!read_status.ok()) {
+      return read_status;
+    }
  }
+  return Status::OK();
}
 
}  // namespace ROCKSDB_NAMESPACE
diff --git a/table/get_context.h b/table/get_context.h
index da41631fc40..ada479001c9 100644
--- a/table/get_context.h
+++ b/table/get_context.h
@@ -135,7 +135,8 @@ class GetContext {
   // Returns True if more keys need to be read (due to merges) or
   // False if the complete value has been found.
   bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
-                 bool* matched, Cleanable* value_pinner = nullptr);
+                 bool* matched, Status* read_status,
+                 Cleanable* value_pinner = nullptr);
 
   // Simplified version of the previous function. Should only be used when we
   // know that the operation is a Put.
@@ -204,7 +205,7 @@ class GetContext {
   void MergeWithWideColumnBaseValue(const Slice& entity);
 
   bool GetBlobValue(const Slice& user_key, const Slice& blob_index,
-                    PinnableSlice* blob_value);
+                    PinnableSlice* blob_value, Status* read_status);
 
   void appendToReplayLog(ValueType type, Slice value, Slice ts);
 
@@ -250,9 +251,9 @@ class GetContext {
 // Call this to replay a log and bring the get_context up to date. The replay
 // log must have been created by another GetContext object, whose replay log
 // must have been set by calling GetContext::SetReplayLog().
-void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
-                         GetContext* get_context,
-                         Cleanable* value_pinner = nullptr,
-                         SequenceNumber seq_no = kMaxSequenceNumber);
+Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
+                           GetContext* get_context,
+                           Cleanable* value_pinner = nullptr,
+                           SequenceNumber seq_no = kMaxSequenceNumber);
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/table/mock_table.cc b/table/mock_table.cc
index ef12060d741..14fbb3f1d07 100644
--- a/table/mock_table.cc
+++ b/table/mock_table.cc
@@ -220,7 +220,13 @@ Status MockTableReader::Get(const ReadOptions&, const Slice& key,
    }
 
    bool dont_care __attribute__((__unused__));
-    if (!get_context->SaveValue(parsed_key, iter->value(), &dont_care)) {
+    Status read_status;
+    bool ret = get_context->SaveValue(parsed_key, iter->value(), &dont_care,
+                                      &read_status);
+    if (!read_status.ok()) {
+      return read_status;
+    }
+    if (!ret) {
      break;
    }
  }
diff --git a/table/plain/plain_table_reader.cc b/table/plain/plain_table_reader.cc
index 595dfb10d26..d3c968f73a9 100644
--- a/table/plain/plain_table_reader.cc
+++ b/table/plain/plain_table_reader.cc
@@ -614,8 +614,12 @@ Status PlainTableReader::Get(const ReadOptions& /*ro*/, const Slice& target,
      // can we enable the fast path?
      if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
        bool dont_care __attribute__((__unused__));
-        if (!get_context->SaveValue(found_key, found_value, &dont_care,
-                                    dummy_cleanable_.get())) {
+        bool ret = get_context->SaveValue(found_key, found_value, &dont_care, &s,
+                                          dummy_cleanable_.get());
+        if (!s.ok()) {
+          return s;
+        }
+        if (!ret) {
          break;
        }
      }
diff --git a/unreleased_history/bug_fixes/blockcachetier_blob_as_mergebase.md b/unreleased_history/bug_fixes/blockcachetier_blob_as_mergebase.md
new file mode 100644
index 00000000000..13ee51a827c
--- /dev/null
+++ b/unreleased_history/bug_fixes/blockcachetier_blob_as_mergebase.md
@@ -0,0 +1 @@
+* Fixed `kBlockCacheTier` reads to return `Status::Incomplete` when I/O is needed to fetch a merge chain's base value from a blob file.
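
For applications, the upshot is that a `kBlockCacheTier` read over a
blob-backed merge chain now fails cleanly and can be retried with I/O allowed.
A possible caller-side fallback (illustration only; `GetPreferCache` is a
hypothetical helper, not part of this change):

```
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Try a memory-only read first; fall back to an I/O-permitting read when the
// result would require fetching an uncached block or blob.
rocksdb::Status GetPreferCache(rocksdb::DB* db, const rocksdb::Slice& key,
                               std::string* value) {
  rocksdb::ReadOptions ro;
  ro.read_tier = rocksdb::kBlockCacheTier;  // no I/O allowed
  rocksdb::Status s = db->Get(ro, key, value);
  if (s.IsIncomplete()) {
    // With this fix, Incomplete also covers a merge chain whose base value
    // sits in a not-yet-cached blob file.
    ro.read_tier = rocksdb::kReadAllTier;  // allow I/O
    s = db->Get(ro, key, value);
  }
  return s;
}
```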