Eliminate unnecessary (slow) block cache Ref()ing in MultiGet (facebook#9899)

Summary:
When MultiGet() determines that multiple query keys can be
served by examining the same data block in block cache (one Lookup()),
each PinnableSlice referring to data in that data block needs to hold
on to the block in cache so that the slices can be released at
arbitrary times by the API user. Historically this was accomplished
with extra calls to Ref() on the Handle from Lookup(), with each
PinnableSlice cleanup calling Release() on the Handle, but this creates
extra contention on the block cache for the extra Ref()s and Release()s,
especially because they hit the same cache shard repeatedly.

In the case of merge operands (possibly more cases?), the problem was
compounded by doing an extra Ref()+eventual Release() for each merge
operand for a key reusing a block (which could be the same key!), rather
than one Ref() per key. (Note: the non-shared case with `biter` was
already one per key.)

This change optimizes MultiGet not to rely on these extra, contentious
Ref()+Release() calls: in the shared-block case, the cache Release()
cleanup is wrapped in a refcounted object referenced by the
PinnableSlices, so that the cache entry is Release()d only after the
last wrapped reference is released. Relaxed atomic refcounts should be
much faster than mutex-guarded Ref() and Release(), and much less prone
to a performance cliff when MultiGet() does a lot of block sharing.

Note that I did not use std::shared_ptr, because that would require an
extra indirection object (shared_ptr itself new/delete) in order to
associate a ref increment/decrement with a Cleanable cleanup entry. (If
I assumed it was the size of two pointers, I could do some hackery to
make it work without the extra indirection, but that's too fragile.)
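
To illustrate the intended pattern, here is a minimal sketch (not the
actual RocksDB internals; the `ReleaseCacheEntry` helper and the
`PinSharedBlock` wrapper are made up for the example, and only
SharedCleanablePtr, Cache, and PinnableSlice are real APIs):

```cpp
#include <vector>

#include "rocksdb/cache.h"
#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h"

// Cleanup that performs the single Release() on the block cache handle.
void ReleaseCacheEntry(void* arg1, void* arg2) {
  auto* cache = static_cast<ROCKSDB_NAMESPACE::Cache*>(arg1);
  auto* handle = static_cast<ROCKSDB_NAMESPACE::Cache::Handle*>(arg2);
  cache->Release(handle);
}

// Hypothetical: called once per data block that serves several keys in a
// MultiGet batch.
void PinSharedBlock(
    ROCKSDB_NAMESPACE::Cache* cache, ROCKSDB_NAMESPACE::Cache::Handle* handle,
    const std::vector<ROCKSDB_NAMESPACE::PinnableSlice*>& values) {
  ROCKSDB_NAMESPACE::SharedCleanablePtr shared;
  shared.Allocate();
  // The one-and-only cache Release() is owned by the shared Cleanable...
  shared->RegisterCleanup(&ReleaseCacheEntry, cache, handle);
  // ...and each PinnableSlice (itself a Cleanable) only takes a relaxed
  // atomic reference on it, instead of its own cache Ref()/Release().
  for (ROCKSDB_NAMESPACE::PinnableSlice* value : values) {
    shared.RegisterCopyWith(value);
  }
  // When the last referencing PinnableSlice is Reset() or destroyed, the
  // refcount reaches zero and ReleaseCacheEntry runs exactly once.
}
```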

Some details:
* Fixed (removed) extra block cache tracing entries in cases of cache
entry reuse in MultiGet, but it's likely that in some other cases traces
are missing (XXX comment inserted)
* Moved existing implementations for cleanable.h from iterator.cc to
new cleanable.cc
* Improved API comments on Cleanable
* Added a public SharedCleanablePtr class to cleanable.h in case others
could benefit from the same pattern (potentially many Cleanables and/or
smart pointers referencing a shared Cleanable)
* Added a typedef for MultiGetContext::Mask
* Some variable renaming for clarity

Pull Request resolved: facebook#9899

Test Plan:
Added unit tests for SharedCleanablePtr.
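
Roughly, what those tests check looks like the following sketch (not the
actual test code; the `IncrementIntPointer` helper is made up for
illustration): the registered cleanup must run exactly once, and only
after the last virtual copy of the pointer goes away.

```cpp
#include <cassert>

#include "rocksdb/cleanable.h"

using ROCKSDB_NAMESPACE::Cleanable;
using ROCKSDB_NAMESPACE::SharedCleanablePtr;

void IncrementIntPointer(void* arg1, void* /*arg2*/) {
  ++(*static_cast<int*>(arg1));
}

int main() {
  int cleanup_count = 0;
  Cleanable outer1, outer2;
  {
    SharedCleanablePtr shared;
    shared.Allocate();
    shared->RegisterCleanup(&IncrementIntPointer, &cleanup_count, nullptr);
    // Two "virtual copies" now also keep the shared Cleanable alive.
    shared.RegisterCopyWith(&outer1);
    shared.RegisterCopyWith(&outer2);
  }  // shared goes out of scope: two references remain, no cleanup yet
  assert(cleanup_count == 0);
  outer1.Reset();  // drops one reference
  assert(cleanup_count == 0);
  outer2.Reset();  // last reference: the shared cleanup runs exactly once
  assert(cleanup_count == 1);
  return 0;
}
```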

Greatly enhanced ability of existing tests to detect cache use-after-free.
* Release PinnableSlices from MultiGet as they are read rather than in
bulk (in db_test_util wrapper).
* In ASAN builds, default to using a trivially small LRUCache for block_cache
so that entries are immediately erased when unreferenced. (Updated two
tests that depend on caching.) New ASAN test suite running time seems
OK to me.

If I introduce a bug into my implementation where we skip the shared
cleanups on block reuse, ASAN detects the bug in
`db_basic_test *MultiGet*`. If I remove either of the above testing
enhancements, the bug is not detected.

Consider for follow-up work: manipulate or randomize ordering of
PinnableSlice use and release from MultiGet db_test_util wrapper. But in
typical cases, natural ordering gives pretty good functional coverage.

Performance test:
In the extreme (but possible) case of MultiGetting the same or adjacent keys
in a batch, throughput can improve by an order of magnitude.
`./db_bench -benchmarks=multireadrandom -db=/dev/shm/testdb -readonly -num=5 -duration=10 -threads=20 -multiread_batched -batch_size=200`
Before ops/sec, num=5: 1,384,394
After ops/sec, num=5: 16,027,257
Before ops/sec, num=500: 6,423,720
After ops/sec, num=500: 10,658,794

Also note that previously, with high parallelism, having query keys
concentrated in a single block was worse than spreading them out a bit. Now
keys concentrated in a single block are faster than keys spread out, which is
hopefully consistent with natural expectation.

Random query performance: with num=1000000, over 999 x 10s runs running before & after simultaneously (each -threads=12):
Before: multireadrandom [AVG    999 runs] : 1088699 (± 7344) ops/sec;  120.4 (± 0.8 ) MB/sec
After: multireadrandom [AVG    999 runs] : 1090402 (± 7230) ops/sec;  120.6 (± 0.8 ) MB/sec
Possibly better, possibly in the noise.

Reviewed By: anand1976

Differential Revision: D35907003

Pulled By: pdillinger

fbshipit-source-id: bbd244d703649a8ca12d476f2d03853ed9d1a17e
pdillinger authored and facebook-github-bot committed Apr 27, 2022
1 parent ce2d8a4 commit 9d0cae7
Showing 14 changed files with 472 additions and 136 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -796,6 +796,7 @@ set(SOURCES
trace_replay/trace_record_result.cc
trace_replay/trace_record.cc
trace_replay/trace_replay.cc
util/cleanable.cc
util/coding.cc
util/compaction_job_stats_impl.cc
util/comparator.cc
1 change: 1 addition & 0 deletions HISTORY.md
@@ -14,6 +14,7 @@

### Bug Fixes
* RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.
* Fixed unnecessary block cache contention when queries within a MultiGet batch and across parallel batches access the same data block, which previously could cause severely degraded performance in this unusual case. (In more typical MultiGet cases, this fix is expected to yield a small or negligible performance improvement.)

## 7.2.0 (04/15/2022)
### Bug Fixes
2 changes: 2 additions & 0 deletions TARGETS
@@ -224,6 +224,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"trace_replay/trace_record_result.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/cleanable.cc",
"util/coding.cc",
"util/compaction_job_stats_impl.cc",
"util/comparator.cc",
@@ -543,6 +544,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"trace_replay/trace_record_result.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/cleanable.cc",
"util/coding.cc",
"util/compaction_job_stats_impl.cc",
"util/comparator.cc",
1 change: 1 addition & 0 deletions db/db_bloom_filter_test.cc
@@ -111,6 +111,7 @@ TEST_P(DBBloomFilterTestDefFormatVersion, KeyMayExist) {
options_override.filter_policy = Create(20, bfp_impl_);
options_override.partition_filters = partition_filters_;
options_override.metadata_block_size = 32;
options_override.full_block_cache = true;
Options options = CurrentOptions(options_override);
if (partition_filters_) {
auto* table_options =
5 changes: 4 additions & 1 deletion db/db_iterator_test.cc
@@ -111,9 +111,12 @@ TEST_P(DBIteratorTest, PersistedTierOnIterator) {
TEST_P(DBIteratorTest, NonBlockingIteration) {
do {
ReadOptions non_blocking_opts, regular_opts;
Options options = CurrentOptions();
anon::OptionsOverride options_override;
options_override.full_block_cache = true;
Options options = CurrentOptions(options_override);
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
non_blocking_opts.read_tier = kBlockCacheTier;

CreateAndReopenWithCF({"pikachu"}, options);
// write one kv to the database.
ASSERT_OK(Put(1, "a", "b"));
38 changes: 28 additions & 10 deletions db/db_test_util.cc
@@ -11,6 +11,8 @@

#include "db/forward_iterator.h"
#include "env/mock_env.h"
#include "port/lang.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env_encryption.h"
#include "rocksdb/unique_id.h"
@@ -360,6 +362,17 @@ Options DBTestBase::GetOptions(
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"NewWritableFile:O_DIRECT");
#endif
// kMustFreeHeapAllocations -> indicates ASAN build
if (kMustFreeHeapAllocations && !options_override.full_block_cache) {
// Detecting block cache use-after-free is normally difficult in unit
// tests, because as a cache, it tends to keep unreferenced entries in
// memory, and we normally want unit tests to take advantage of block
// cache for speed. However, we also want a strong chance of detecting
// block cache use-after-free in unit tests in ASAN builds, so for ASAN
// builds we use a trivially small block cache to which entries can be
// added but are immediately freed on no more references.
table_options.block_cache = NewLRUCache(/* too small */ 1);
}

bool can_allow_mmap = IsMemoryMappedAccessSupported();
switch (option_config) {
@@ -831,7 +844,7 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
std::vector<Status> s;
if (!batched) {
s = db_->MultiGet(options, handles, keys, &result);
for (unsigned int i = 0; i < s.size(); ++i) {
for (size_t i = 0; i < s.size(); ++i) {
if (s[i].IsNotFound()) {
result[i] = "NOT_FOUND";
} else if (!s[i].ok()) {
@@ -844,13 +857,16 @@
s.resize(cfs.size());
db_->MultiGet(options, cfs.size(), handles.data(), keys.data(),
pin_values.data(), s.data());
for (unsigned int i = 0; i < s.size(); ++i) {
for (size_t i = 0; i < s.size(); ++i) {
if (s[i].IsNotFound()) {
result[i] = "NOT_FOUND";
} else if (!s[i].ok()) {
result[i] = s[i].ToString();
} else {
result[i].assign(pin_values[i].data(), pin_values[i].size());
// Increase likelihood of detecting potential use-after-free bugs with
// PinnableSlices tracking the same resource
pin_values[i].Reset();
}
}
}
Expand All @@ -863,23 +879,25 @@ std::vector<std::string> DBTestBase::MultiGet(const std::vector<std::string>& k,
options.verify_checksums = true;
options.snapshot = snapshot;
std::vector<Slice> keys;
std::vector<std::string> result;
std::vector<std::string> result(k.size());
std::vector<Status> statuses(k.size());
std::vector<PinnableSlice> pin_values(k.size());

for (unsigned int i = 0; i < k.size(); ++i) {
for (size_t i = 0; i < k.size(); ++i) {
keys.push_back(k[i]);
}
db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), pin_values.data(), statuses.data());
result.resize(k.size());
for (auto iter = result.begin(); iter != result.end(); ++iter) {
iter->assign(pin_values[iter - result.begin()].data(),
pin_values[iter - result.begin()].size());
}
for (unsigned int i = 0; i < statuses.size(); ++i) {
for (size_t i = 0; i < statuses.size(); ++i) {
if (statuses[i].IsNotFound()) {
result[i] = "NOT_FOUND";
} else if (!statuses[i].ok()) {
result[i] = statuses[i].ToString();
} else {
result[i].assign(pin_values[i].data(), pin_values[i].size());
// Increase likelihood of detecting potential use-after-free bugs with
// PinnableSlices tracking the same resource
pin_values[i].Reset();
}
}
return result;
3 changes: 3 additions & 0 deletions db/db_test_util.h
@@ -104,6 +104,9 @@ struct OptionsOverride {
std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
// These will be used only if filter_policy is set
bool partition_filters = false;
// Force using a default block cache. (Setting to false allows ASAN build
// use a trivially small block cache for better UAF error detection.)
bool full_block_cache = false;
uint64_t metadata_block_size = 1024;

// Used as a bit mask of individual enums in which to skip an XF test point
Expand Down
61 changes: 59 additions & 2 deletions include/rocksdb/cleanable.h
@@ -19,27 +19,36 @@ class Cleanable {
Cleanable(Cleanable&) = delete;
Cleanable& operator=(Cleanable&) = delete;

// Executes all the registered cleanups
~Cleanable();

// Move constructor and move assignment is allowed.
Cleanable(Cleanable&&);
Cleanable& operator=(Cleanable&&);
Cleanable(Cleanable&&) noexcept;
Cleanable& operator=(Cleanable&&) noexcept;

// Clients are allowed to register function/arg1/arg2 triples that
// will be invoked when this iterator is destroyed.
//
// Note that unlike all of the preceding methods, this method is
// not abstract and therefore clients should not override it.
using CleanupFunction = void (*)(void* arg1, void* arg2);

// Add another Cleanup to the list
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

// Move the cleanups owned by this Cleanable to another Cleanable, adding to
// any existing cleanups it has
void DelegateCleanupsTo(Cleanable* other);

// DoCleanup and also resets the pointers for reuse
inline void Reset() {
DoCleanup();
cleanup_.function = nullptr;
cleanup_.next = nullptr;
}

inline bool HasCleanups() { return cleanup_.function != nullptr; }

protected:
struct Cleanup {
CleanupFunction function;
@@ -68,4 +77,52 @@ class Cleanable {
}
};

// A copyable, reference-counted pointer to a simple Cleanable that only
// performs registered cleanups after all copies are destroyed. This is like
// shared_ptr<Cleanable> but works more efficiently when wrapping the pointer
// in an outer Cleanable (see RegisterCopyWith() and MoveAsCleanupTo()).
// WARNING: if you create a reference cycle, for example:
// SharedCleanablePtr scp;
// scp.Allocate();
// scp.RegisterCopyWith(&*scp);
// It will prevent cleanups from ever happening!
class SharedCleanablePtr {
public:
// Empty/null pointer
SharedCleanablePtr() {}
// Copy and move constructors and assignment
SharedCleanablePtr(const SharedCleanablePtr& from);
SharedCleanablePtr(SharedCleanablePtr&& from) noexcept;
SharedCleanablePtr& operator=(const SharedCleanablePtr& from);
SharedCleanablePtr& operator=(SharedCleanablePtr&& from) noexcept;
// Destructor (decrement refcount if non-null)
~SharedCleanablePtr();
// Create a new simple Cleanable and assign this pointer to it.
// (Reset()s first if necessary.)
void Allocate();
// Reset to empty/null (decrement refcount if previously non-null)
void Reset();
// Dereference to pointed-to Cleanable
Cleanable& operator*();
Cleanable* operator->();
// Get as raw pointer to Cleanable
Cleanable* get();

// Creates a (virtual) copy of this SharedCleanablePtr and registers its
// destruction with target, so that the cleanups registered with the
// Cleanable pointed to by this can only happen after the cleanups in the
// target Cleanable are run.
// No-op if this is empty (nullptr).
void RegisterCopyWith(Cleanable* target);

// Moves (virtually) this shared pointer to a new cleanup in the target.
// This is essentially a move-semantics version of RegisterCopyWith(), for
// performance optimization. No-op if this is empty (nullptr).
void MoveAsCleanupTo(Cleanable* target);

private:
struct Impl;
Impl* ptr_ = nullptr;
};

} // namespace ROCKSDB_NAMESPACE
1 change: 1 addition & 0 deletions src.mk
@@ -211,6 +211,7 @@ LIB_SOURCES = \
trace_replay/block_cache_tracer.cc \
trace_replay/io_tracer.cc \
util/build_version.cc \
util/cleanable.cc \
util/coding.cc \
util/compaction_job_stats_impl.cc \
util/comparator.cc \