Skip to content

Commit

Permalink
Fix potential incorrect result for duplicate key in MultiGet (facebook#12295)
Browse files Browse the repository at this point in the history

Summary:
The RocksDB correctness testing has recently discovered a possible, but very unlikely, correctness issue with MultiGet. The issue happens when all of the below conditions are met -
1. Duplicate keys in a MultiGet batch
2. Key matches the last key in a non-zero, non-bottommost level file
3. Final value is not in the file (merge operand, not snapshot visible etc)
4. Multiple entries exist for the key in the file spanning more than 1 data block. This can happen due to snapshots, which would force multiple versions of the key in the file, and they may spill over to another data block
5. Lookup attempt in the SST for the first of the duplicates fails with IO error on a data block (NOT the first data block, but the second or subsequent uncached block), but no errors for the other duplicates
6. Value or merge operand for the key is present in the very next level

The problem is, in FilePickerMultiGet, when looking up keys in a level we use FileIndexer and the overlapping file in the current level to determine the search bounds for that key in the file list in the next level. If the next level is empty, the search bounds are reset and we do a full binary search in the next non-empty level's LevelFilesBrief. However, under conditions #1 and #2 listed above, only the first of the duplicates has its next-level search bounds updated, and the remaining duplicates are skipped.

Pull Request resolved: facebook#12295

Test Plan: Add unit tests that fail an assertion or return wrong result without the fix

Reviewed By: hx235

Differential Revision: D53187634

Pulled By: anand1976

fbshipit-source-id: a5eadf4fede9bbdec784cd993b15e3341436d1ea
  • Loading branch information
anand1976 authored and facebook-github-bot committed Feb 2, 2024
1 parent 046ac91 commit 95b41ee
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 42 deletions.
250 changes: 210 additions & 40 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,60 @@
#include "util/random.h"
#include "utilities/counted_fs.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"

namespace ROCKSDB_NAMESPACE {
namespace {
// Flush block policy that cuts a new data block after a fixed number of
// keys. Used by the MultiGet tests below to force a known, small number of
// keys per data block so that versions of the same user key spill across
// block boundaries.
class MyFlushBlockPolicy : public FlushBlockPolicy {
 public:
  // num_keys_in_block: number of keys after which a block flush is requested.
  // data_block_builder: the builder for the block currently being filled;
  // only its empty() state is consulted.
  explicit MyFlushBlockPolicy(const int num_keys_in_block,
                              const BlockBuilder& data_block_builder)
      : num_keys_in_block_(num_keys_in_block),
        num_keys_(0),
        data_block_builder_(data_block_builder) {}

  // Returns true if the current block should be flushed before adding the
  // given key/value.
  bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
    if (data_block_builder_.empty()) {
      // First key in this block
      num_keys_ = 1;
      return false;
    }
    // Flush every num_keys_in_block_ keys (the original comment said "10",
    // which was stale after the threshold was parameterized)
    if (num_keys_ == num_keys_in_block_) {
      num_keys_ = 1;
      return true;
    }
    num_keys_++;
    return false;
  }

 private:
  const int num_keys_in_block_;  // flush threshold
  int num_keys_;                 // keys accumulated in the current block
  const BlockBuilder& data_block_builder_;
};

class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit MyFlushBlockPolicyFactory(const int num_keys_in_block)
: num_keys_in_block_(num_keys_in_block) {}

virtual const char* Name() const override {
return "MyFlushBlockPolicyFactory";
}

virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& data_block_builder) const override {
return new MyFlushBlockPolicy(num_keys_in_block_, data_block_builder);
}

private:
const int num_keys_in_block_;
};
} // namespace

static bool enable_io_uring = true;
extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
Expand Down Expand Up @@ -1853,7 +1903,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevel) {
}
}

TEST_P(DBMultiGetTestWithParam, MultiGetBatchedEmptyLevel) {
TEST_P(DBMultiGetTestWithParam, MultiGetDuplicatesEmptyLevel) {
#ifndef USE_COROUTINES
if (std::get<1>(GetParam())) {
ROCKSDB_GTEST_BYPASS("This test requires coroutine support");
Expand All @@ -1865,12 +1915,37 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedEmptyLevel) {
ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
return;
}
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(env_->GetFileSystem()));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
Options options = CurrentOptions();
options.env = env.get();
options.disable_auto_compactions = true;
options.merge_operator = MergeOperators::CreateStringAppendOperator();

LRUCacheOptions cache_opts;
cache_opts.capacity = 1 << 20;

BlockBasedTableOptions table_opts;
table_opts.metadata_cache_options.top_level_index_pinning = PinningTier::kAll;
table_opts.metadata_cache_options.partition_pinning = PinningTier::kNone;
table_opts.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_opts.cache_index_and_filter_blocks = true;
table_opts.block_cache = cache_opts.MakeSharedCache();
table_opts.flush_block_policy_factory.reset(new MyFlushBlockPolicyFactory(1));

options.table_factory.reset(new BlockBasedTableFactory(table_opts));
Reopen(options);
int key;

// Setup the LSM so that the following search bounds are generated for
// key 9 for each level -
// Level 1 - lb = 0, rb = max
// Level 2 - lb = 0, rb = 0
// Level 3 - lb = 0, rb = -1
// Level 4 - lb = 0, rb = 0

key = 9;
ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
ASSERT_OK(Flush());
Expand All @@ -1881,6 +1956,9 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedEmptyLevel) {
key = 9;
ASSERT_OK(
Merge("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
const Snapshot* snap = dbfull()->GetSnapshot();
ASSERT_OK(
Merge("key_" + std::to_string(key), "val_l2_ext_" + std::to_string(key)));
ASSERT_OK(Flush());
// Leave level 3 empty
MoveFilesToLevel(2);
Expand All @@ -1899,8 +1977,138 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedEmptyLevel) {
keys.push_back("key_" + std::to_string(9));
keys.push_back("key_" + std::to_string(9));

int num_reads = 0;
SyncPoint::GetInstance()->SetCallBack(
"FaultInjectionTestFS::RandomRead", [&](void*) {
++num_reads;
// Fail on the 2nd read. First read is index partition,
// second read is data block in level 1
if (num_reads == 2) {
fault_fs->SetFilesystemActive(false);
} else {
fault_fs->SetFilesystemActive(true);
}
});
SyncPoint::GetInstance()->EnableProcessing();
size_t capacity = table_opts.block_cache->GetCapacity();
table_opts.block_cache->SetCapacity(0);
table_opts.block_cache->SetCapacity(capacity);

values = MultiGet(keys, nullptr, std::get<1>(GetParam()));
ASSERT_EQ(values.size(), 2);

SyncPoint::GetInstance()->DisableProcessing();
dbfull()->ReleaseSnapshot(snap);
Destroy(options);
}

// Regression test for facebook#12295: a MultiGet batch containing duplicate
// keys could return an incorrect result when an IO error on one duplicate's
// data block caused the remaining duplicates to skip the next-level search
// bound update. This variant exercises the case where the next level is
// non-empty.
TEST_P(DBMultiGetTestWithParam, MultiGetDuplicatesNonEmptyLevel) {
#ifndef USE_COROUTINES
if (std::get<1>(GetParam())) {
ROCKSDB_GTEST_BYPASS("This test requires coroutine support");
return;
}
#endif // USE_COROUTINES
// Skip for unbatched MultiGet
if (!std::get<0>(GetParam())) {
ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
return;
}
// Wrap the env's file system with a fault-injecting FS so a specific
// data-block read can be failed on demand.
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(env_->GetFileSystem()));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
Options options = CurrentOptions();
options.env = env.get();
options.disable_auto_compactions = true;
options.merge_operator = MergeOperators::CreateStringAppendOperator();

LRUCacheOptions cache_opts;
cache_opts.capacity = 1 << 20;

// Partitioned index with only the top level pinned: lower index partitions
// and data blocks must be read from storage, so the fault injection above
// can intercept them.
BlockBasedTableOptions table_opts;
table_opts.metadata_cache_options.top_level_index_pinning = PinningTier::kAll;
table_opts.metadata_cache_options.partition_pinning = PinningTier::kNone;
table_opts.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_opts.cache_index_and_filter_blocks = true;
table_opts.block_cache = cache_opts.MakeSharedCache();
// One key per data block, so multiple versions of the same user key are
// guaranteed to span data blocks (condition 4 of the bug).
table_opts.flush_block_policy_factory.reset(new MyFlushBlockPolicyFactory(1));

options.table_factory.reset(new BlockBasedTableFactory(table_opts));
Reopen(options);
int key;

// Setup the LSM so that the following search bounds are generated for
// key 9 for each level -
// Level 1 - lb = 0, rb = max
// Level 2 - lb = 0, rb = 0
// Level 3 - lb = 0, rb = 1
// Level 4 - N/A

key = 8;
ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
ASSERT_OK(Flush());
MoveFilesToLevel(4);

key = 7;
ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
ASSERT_OK(Flush());

key = 9;
ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
ASSERT_OK(Flush());
MoveFilesToLevel(3);

key = 5;
ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
key = 9;
ASSERT_OK(
Merge("key_" + std::to_string(key), "merge1_l2_" + std::to_string(key)));
// The snapshot between the two merges forces both merge operands to be
// retained in the L2 file; with one key per block they land in separate
// data blocks.
const Snapshot* snap = dbfull()->GetSnapshot();
ASSERT_OK(
Merge("key_" + std::to_string(key), "merge2_l2_" + std::to_string(key)));
ASSERT_OK(Flush());
MoveFilesToLevel(2);

key = 2;
ASSERT_OK(Put("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
key = 6;
ASSERT_OK(
Merge("key_" + std::to_string(key), "val_l2_" + std::to_string(key)));
ASSERT_OK(Flush());
MoveFilesToLevel(1);

std::vector<std::string> keys;
std::vector<std::string> values;

// Duplicate keys in the batch (condition 1 of the bug).
keys.push_back("key_" + std::to_string(9));
keys.push_back("key_" + std::to_string(9));

int num_reads = 0;
SyncPoint::GetInstance()->SetCallBack(
"FaultInjectionTestFS::RandomRead", [&](void*) {
++num_reads;
// Fail on the 2nd read. First read is index partition,
// second read is data block in level 1
if (num_reads == 2) {
fault_fs->SetFilesystemActive(false);
} else {
fault_fs->SetFilesystemActive(true);
}
});
SyncPoint::GetInstance()->EnableProcessing();
// Evict all cached blocks by bouncing the cache capacity through zero, so
// the reads counted above actually hit the file system.
size_t capacity = table_opts.block_cache->GetCapacity();
table_opts.block_cache->SetCapacity(0);
table_opts.block_cache->SetCapacity(capacity);

values = MultiGet(keys, nullptr, std::get<1>(GetParam()));
ASSERT_EQ(values.size(), 2);
// First duplicate hits the injected IO error; the second must still find
// the correct merged value (base value from L3 plus both L2 operands).
ASSERT_EQ(values[0], "Corruption: Not active");
ASSERT_EQ(values[1], "val_l2_9,merge1_l2_9,merge2_l2_9");

SyncPoint::GetInstance()->DisableProcessing();
dbfull()->ReleaseSnapshot(snap);
Destroy(options);
}

TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevelMerge) {
Expand Down Expand Up @@ -3566,7 +3774,7 @@ class DBBasicTestMultiGet : public DBTestBase {
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
}
table_options.flush_block_policy_factory.reset(
new MyFlushBlockPolicyFactory());
new MyFlushBlockPolicyFactory(10));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (!compression_enabled_) {
options.compression = kNoCompression;
Expand Down Expand Up @@ -3654,44 +3862,6 @@ class DBBasicTestMultiGet : public DBTestBase {
static void TearDownTestCase() {}

protected:
class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
MyFlushBlockPolicyFactory() {}

const char* Name() const override { return "MyFlushBlockPolicyFactory"; }

FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& data_block_builder) const override {
return new MyFlushBlockPolicy(data_block_builder);
}
};

class MyFlushBlockPolicy : public FlushBlockPolicy {
public:
explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
: num_keys_(0), data_block_builder_(data_block_builder) {}

bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
if (data_block_builder_.empty()) {
// First key in this block
num_keys_ = 1;
return false;
}
// Flush every 10 keys
if (num_keys_ == 10) {
num_keys_ = 1;
return true;
}
num_keys_++;
return false;
}

private:
int num_keys_;
const BlockBuilder& data_block_builder_;
};

class MyBlockCache : public CacheWrapper {
public:
explicit MyBlockCache(std::shared_ptr<Cache> target)
Expand Down
9 changes: 8 additions & 1 deletion db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ class FilePickerMultiGet {
FdWithKeyRange* f = nullptr;
bool file_hit = false;
int cmp_largest = -1;
int cmp_smallest = -1;
if (curr_file_index >= curr_file_level_->num_files) {
// In the unlikely case the next key is a duplicate of the current key,
// and the current key is the last in the level and the internal key
Expand Down Expand Up @@ -642,7 +643,7 @@ class FilePickerMultiGet {
// Check if key is within a file's range. If search left bound and
// right bound point to the same find, we are sure key falls in
// range.
int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
cmp_smallest = user_comparator_->CompareWithoutTimestamp(
user_key, false, ExtractUserKey(f->smallest_key), true);

assert(curr_level_ == 0 ||
Expand Down Expand Up @@ -688,6 +689,12 @@ class FilePickerMultiGet {
user_comparator_->CompareWithoutTimestamp(
batch_iter_->ukey_without_ts, false,
upper_key_->ukey_without_ts, false) == 0) {
if (curr_level_ > 0) {
struct FilePickerContext& ctx = fp_ctx_array_[upper_key_.index()];
file_indexer_->GetNextLevelIndex(
curr_level_, ctx.curr_index_in_curr_level, cmp_smallest,
cmp_largest, &ctx.search_left_bound, &ctx.search_right_bound);
}
++upper_key_;
}
break;
Expand Down
3 changes: 2 additions & 1 deletion include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ class DB {
// (*values) will always be resized to be the same size as (keys).
// Similarly, the number of returned statuses will be the number of keys.
// Note: keys will not be "de-duplicated". Duplicate keys will return
// duplicate values in order.
// duplicate values in order, and may return different status values
// in case there are errors.
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/bug_fixes/mget_duplicates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug that can, under rare circumstances, cause MultiGet to return an incorrect result for a duplicate key in a MultiGet batch.
1 change: 1 addition & 0 deletions utilities/fault_injection_fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
const IOOptions& options, Slice* result,
char* scratch,
IODebugContext* dbg) const {
TEST_SYNC_POINT("FaultInjectionTestFS::RandomRead");
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
Expand Down

0 comments on commit 95b41ee

Please sign in to comment.