Skip to content

Commit 16fc3d3

Browse files
committed
BucketList cache
1 parent fdd833d commit 16fc3d3

File tree

5 files changed

+177
-44
lines changed

5 files changed

+177
-44
lines changed

src/bucket/BucketIndex.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ class BucketIndex : public NonMovableOrCopyable
8080
IndividualIndex::const_iterator>;
8181

8282
inline static const std::string DB_BACKEND_STATE = "bl";
83-
inline static const uint32_t BUCKET_INDEX_VERSION = 4;
83+
inline static const uint32_t BUCKET_INDEX_VERSION = 5;
84+
inline static const uint32_t CACHE_SIZE = 1'000'000;
85+
inline static const uint64_t INDIVIDUAL_CACHE_CUTOFF_SIZE = 100'000'000'000;
8486

8587
// Returns true if LedgerEntryType not supported by BucketListDB
8688
static bool typeNotSupported(LedgerEntryType t);
@@ -130,13 +132,21 @@ class BucketIndex : public NonMovableOrCopyable
130132
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
131133
getOfferRange() const = 0;
132134

135+
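// Returns {entry, hit}. hit is true if a cache hit occurred; on a hit, entry may still be null when a fully cached index does not contain the key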
136+
virtual std::pair<std::shared_ptr<BucketEntry>, bool>
137+
getFromCache(LedgerKey const& k) const = 0;
138+
139+
virtual void addToCache(std::shared_ptr<BucketEntry> be) const = 0;
140+
133141
// Returns page size for index. IndividualIndex returns 0 for page size
134142
virtual std::streamoff getPageSize() const = 0;
135143

136144
virtual Iterator begin() const = 0;
137145

138146
virtual Iterator end() const = 0;
139147

148+
virtual bool isFullyCached() const = 0;
149+
140150
virtual void markBloomMiss() const = 0;
141151
virtual void markBloomLookup() const = 0;
142152
virtual BucketEntryCounters const& getBucketEntryCounters() const = 0;

src/bucket/BucketIndexImpl.cpp

+75-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "util/LogSlowExecution.h"
1919
#include "util/Logging.h"
2020
#include "util/XDRStream.h"
21+
#include "xdr/Stellar-ledger-entries.h"
2122

2223
#include <Tracy.hpp>
2324
#include <cereal/archives/binary.hpp>
@@ -28,6 +29,8 @@
2829
#include <fmt/format.h>
2930

3031
#include <memory>
32+
#include <mutex>
33+
#include <shared_mutex>
3134
#include <thread>
3235
#include <type_traits>
3336
#include <xdrpp/marshal.h>
@@ -91,12 +94,15 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
9194
auto timer = LogSlowExecution("Indexing bucket");
9295
mData.pageSize = pageSize;
9396

97+
auto fileSize = std::filesystem::file_size(filename);
98+
bool inMemoryMap = fileSize < INDIVIDUAL_CACHE_CUTOFF_SIZE &&
99+
std::is_same_v<BucketEntryT, BucketEntry>;
100+
94101
// We don't have a good way of estimating IndividualIndex size since
95102
// keys are variable size, so only reserve range indexes since we know
96103
// the page size ahead of time
97-
if constexpr (std::is_same<IndexT, RangeIndex>::value)
104+
if (std::is_same_v<IndexT, RangeIndex>)
98105
{
99-
auto fileSize = std::filesystem::file_size(filename);
100106
auto estimatedIndexEntries = fileSize / mData.pageSize;
101107
mData.keysToOffset.reserve(estimatedIndexEntries);
102108
}
@@ -187,6 +193,15 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
187193
}
188194
}
189195

196+
if constexpr (std::is_same_v<BucketEntryT, LiveBucket::EntryT>)
197+
{
198+
if (inMemoryMap)
199+
{
200+
mData.inMemoryMap[key] =
201+
std::make_shared<BucketEntry>(be);
202+
}
203+
}
204+
190205
if constexpr (std::is_same_v<IndexT, RangeIndex>)
191206
{
192207
auto keyBuf = xdr::xdr_to_opaque(key);
@@ -299,7 +314,8 @@ template <class IndexT>
299314
template <class Archive>
300315
BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager const& bm, Archive& ar,
301316
std::streamoff pageSize)
302-
: mBloomMissMeter(bm.getBloomMissMeter())
317+
: mData()
318+
, mBloomMissMeter(bm.getBloomMissMeter())
303319
, mBloomLookupMeter(bm.getBloomLookupMeter())
304320
{
305321
mData.pageSize = pageSize;
@@ -572,6 +588,48 @@ BucketIndexImpl<IndexT>::getOfferRange() const
572588
return getOffsetBounds(lowerBound, upperBound);
573589
}
574590

591+
// Looks up the BucketEntry for key `k` in this index's in-memory storage.
//
// Returns a pair {entry, hit}:
//  - {entry, true}    the key was found (in either the map or the cache).
//  - {nullptr, true}  inMemoryMap is in use and does not contain the key.
//                     This is still reported as a hit.
//  - {nullptr, false} inMemoryMap is empty and the eviction cache returned
//                     no entry for the key.
template <class IndexT>
592+
std::pair<std::shared_ptr<BucketEntry>, bool>
593+
BucketIndexImpl<IndexT>::getFromCache(LedgerKey const& k) const
594+
{
595+
// An empty inMemoryMap selects the partial (eviction) cache path.
if (mData.inMemoryMap.empty())
596+
{
597+
// lock_guard on a shared_mutex takes the lock exclusively.
// NOTE(review): presumably required because maybeGet may update cache
// bookkeeping -- confirm against RandomEvictionCache.
std::lock_guard<std::shared_mutex> lock(mData.cacheLock);
598+
auto* ptr = mData.inMemoryCache.maybeGet(k);
599+
if (ptr)
600+
{
601+
// Copy the shared_ptr out while the lock is still held.
return {*ptr, true};
602+
}
603+
else
604+
{
605+
return {nullptr, false};
606+
}
607+
}
608+
else
609+
{
610+
// Map path: no lock is taken here.
// NOTE(review): assumes inMemoryMap is not mutated after construction
// or load -- confirm.
auto iter = mData.inMemoryMap.find(k);
611+
if (iter == mData.inMemoryMap.end())
612+
{
613+
// Key absent from the map: report a hit with a null entry.
return {nullptr, true};
614+
}
615+
else
616+
{
617+
return {iter->second, true};
618+
}
619+
}
620+
}
621+
622+
// Inserts `be` into inMemoryCache, keyed by getBucketLedgerKey(*be).
//
// Does nothing when inMemoryMap is non-empty. `be` is dereferenced
// unconditionally on the insert path, so it must not be null.
template <class IndexT>
623+
void
624+
BucketIndexImpl<IndexT>::addToCache(std::shared_ptr<BucketEntry> be) const
625+
{
626+
// Only populate the eviction cache when the in-memory map is not in use.
if (mData.inMemoryMap.empty())
627+
{
628+
// Exclusive lock on cacheLock for the duration of the insert.
std::unique_lock<std::shared_mutex> lock(mData.cacheLock);
629+
mData.inMemoryCache.put(getBucketLedgerKey(*be), be);
630+
}
631+
}
632+
575633
#ifdef BUILD_TESTS
576634
template <class IndexT>
577635
bool
@@ -620,6 +678,20 @@ BucketIndexImpl<IndexT>::operator==(BucketIndex const& inRaw) const
620678
return false;
621679
}
622680

681+
if (mData.inMemoryMap.size() != in.mData.inMemoryMap.size())
682+
{
683+
return false;
684+
}
685+
686+
for (auto const& [key, entry] : mData.inMemoryMap)
687+
{
688+
auto iter = in.mData.inMemoryMap.find(key);
689+
if (iter == in.mData.inMemoryMap.end() || !(*entry == *iter->second))
690+
{
691+
return false;
692+
}
693+
}
694+
623695
return true;
624696
}
625697
#endif

src/bucket/BucketIndexImpl.h

+26-3
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@
66

77
#include "bucket/BucketIndex.h"
88
#include "bucket/LiveBucket.h"
9+
#include "ledger/LedgerHashUtils.h"
910
#include "medida/meter.h"
1011
#include "util/BinaryFuseFilter.h"
12+
#include "util/RandomEvictionCache.h"
13+
#include "xdr/Stellar-ledger-entries.h"
1114
#include "xdr/Stellar-types.h"
1215

1316
#include "util/BufferedAsioCerealOutputArchive.h"
1417
#include <cereal/types/map.hpp>
1518
#include <map>
1619
#include <memory>
20+
#include <shared_mutex>
1721

1822
namespace stellar
1923
{
@@ -34,15 +38,19 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
3438
std::streamoff pageSize{};
3539
std::unique_ptr<BinaryFuseFilter16> filter{};
3640
std::map<Asset, std::vector<PoolID>> assetToPoolID{};
41+
std::map<LedgerKey, std::shared_ptr<BucketEntry>> inMemoryMap;
42+
mutable RandomEvictionCache<LedgerKey, std::shared_ptr<BucketEntry>>
43+
inMemoryCache;
44+
mutable std::shared_mutex cacheLock;
3745
BucketEntryCounters counters{};
3846

3947
template <class Archive>
4048
void
4149
save(Archive& ar) const
4250
{
4351
auto version = BUCKET_INDEX_VERSION;
44-
ar(version, pageSize, assetToPoolID, keysToOffset, filter,
45-
counters);
52+
ar(version, pageSize, assetToPoolID, keysToOffset, filter, counters,
53+
inMemoryMap);
4654
}
4755

4856
// Note: version and pageSize must be loaded before this function is
@@ -53,7 +61,11 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
5361
void
5462
load(Archive& ar)
5563
{
56-
ar(assetToPoolID, keysToOffset, filter, counters);
64+
ar(assetToPoolID, keysToOffset, filter, counters, inMemoryMap);
65+
}
66+
67+
// Default constructor: initializes inMemoryCache with CACHE_SIZE.
// NOTE(review): CACHE_SIZE is presumably the maximum number of cached
// entries -- confirm against RandomEvictionCache.
SerializableBucketIndex() : inMemoryCache(CACHE_SIZE)
68+
{
5769
}
5870
} mData;
5971

@@ -100,6 +112,11 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
100112
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
101113
getOfferRange() const override;
102114

115+
virtual std::pair<std::shared_ptr<BucketEntry>, bool>
116+
getFromCache(LedgerKey const& k) const override;
117+
118+
virtual void addToCache(std::shared_ptr<BucketEntry> be) const override;
119+
103120
virtual std::streamoff
104121
getPageSize() const override
105122
{
@@ -118,6 +135,12 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
118135
return mData.keysToOffset.end();
119136
}
120137

138+
// Returns true when inMemoryMap holds at least one entry.
// NOTE(review): an index whose in-memory map is empty reports false here
// even if it was meant to be fully cached -- confirm this is intended.
virtual bool
139+
isFullyCached() const override
140+
{
141+
return !mData.inMemoryMap.empty();
142+
}
143+
121144
virtual void markBloomMiss() const override;
122145
virtual void markBloomLookup() const override;
123146
virtual BucketEntryCounters const& getBucketEntryCounters() const override;

src/bucket/BucketSnapshot.cpp

+64-36
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ BucketSnapshotBase<BucketT>::getEntryAtOffset(LedgerKey const& k,
6363
}
6464
else if (stream.readPage(be, k, pageSize))
6565
{
66-
return {std::make_shared<typename BucketT::EntryT>(be), false};
66+
auto ret = std::make_shared<typename BucketT::EntryT>(be);
67+
if constexpr (std::is_same_v<BucketT, LiveBucket>)
68+
{
69+
mBucket->getIndex().addToCache(ret);
70+
}
71+
return {ret, false};
6772
}
6873

6974
// Mark entry miss for metrics
@@ -81,6 +86,15 @@ BucketSnapshotBase<BucketT>::getBucketEntry(LedgerKey const& k) const
8186
return {nullptr, false};
8287
}
8388

89+
if constexpr (std::is_same_v<BucketT, LiveBucket>)
90+
{
91+
auto [entryOp, hit] = mBucket->getIndex().getFromCache(k);
92+
if (hit)
93+
{
94+
return {entryOp, false};
95+
}
96+
}
97+
8498
auto pos = mBucket->getIndex().lookup(k);
8599
if (pos.has_value())
86100
{
@@ -111,53 +125,67 @@ BucketSnapshotBase<BucketT>::loadKeys(
111125
auto currKeyIt = keys.begin();
112126
auto const& index = mBucket->getIndex();
113127
auto indexIter = index.begin();
114-
while (currKeyIt != keys.end() && indexIter != index.end())
128+
129+
while (currKeyIt != keys.end() &&
130+
(indexIter != index.end() || index.isFullyCached()))
115131
{
116-
auto [offOp, newIndexIter] = index.scan(indexIter, *currKeyIt);
117-
indexIter = newIndexIter;
118-
if (offOp)
132+
std::shared_ptr<typename BucketT::EntryT> entryOp{};
133+
bool cacheHit = false;
134+
if constexpr (std::is_same_v<BucketT, LiveBucket>)
135+
{
136+
std::tie(entryOp, cacheHit) = index.getFromCache(*currKeyIt);
137+
}
138+
139+
if (!cacheHit)
119140
{
120-
auto [entryOp, bloomMiss] = getEntryAtOffset(
141+
std::optional<std::streamoff> offOp{};
142+
auto bloomMiss = false;
143+
std::tie(offOp, indexIter) = index.scan(indexIter, *currKeyIt);
144+
if (!offOp)
145+
{
146+
++currKeyIt;
147+
continue;
148+
}
149+
150+
std::tie(entryOp, bloomMiss) = getEntryAtOffset(
121151
*currKeyIt, *offOp, mBucket->getIndex().getPageSize());
152+
}
122153

123-
if (entryOp)
154+
if (entryOp)
155+
{
156+
// Don't return tombstone entries, as these do not exist wrt
157+
// ledger state
158+
if (!BucketT::isTombstoneEntry(*entryOp))
124159
{
125-
// Don't return tombstone entries, as these do not exist wrt
126-
// ledger state
127-
if (!BucketT::isTombstoneEntry(*entryOp))
160+
// Only live bucket loads can be metered
161+
if constexpr (std::is_same_v<BucketT, LiveBucket>)
128162
{
129-
// Only live bucket loads can be metered
130-
if constexpr (std::is_same_v<BucketT, LiveBucket>)
163+
bool addEntry = true;
164+
if (lkMeter)
131165
{
132-
bool addEntry = true;
133-
if (lkMeter)
134-
{
135-
// Here, we are metering after the entry has been
136-
// loaded. This is because we need to know the size
137-
// of the entry to meter it. Future work will add
138-
// metering at the xdr level.
139-
auto entrySize =
140-
xdr::xdr_size(entryOp->liveEntry());
141-
addEntry = lkMeter->canLoad(*currKeyIt, entrySize);
142-
lkMeter->updateReadQuotasForKey(*currKeyIt,
143-
entrySize);
144-
}
145-
if (addEntry)
146-
{
147-
result.push_back(entryOp->liveEntry());
148-
}
166+
// Here, we are metering after the entry has been
167+
// loaded. This is because we need to know the size
168+
// of the entry to meter it. Future work will add
169+
// metering at the xdr level.
170+
auto entrySize = xdr::xdr_size(entryOp->liveEntry());
171+
addEntry = lkMeter->canLoad(*currKeyIt, entrySize);
172+
lkMeter->updateReadQuotasForKey(*currKeyIt, entrySize);
149173
}
150-
else
174+
if (addEntry)
151175
{
152-
static_assert(std::is_same_v<BucketT, HotArchiveBucket>,
153-
"unexpected bucket type");
154-
result.push_back(*entryOp);
176+
result.push_back(entryOp->liveEntry());
155177
}
156178
}
157-
158-
currKeyIt = keys.erase(currKeyIt);
159-
continue;
179+
else
180+
{
181+
static_assert(std::is_same_v<BucketT, HotArchiveBucket>,
182+
"unexpected bucket type");
183+
result.push_back(*entryOp);
184+
}
160185
}
186+
187+
currKeyIt = keys.erase(currKeyIt);
188+
continue;
161189
}
162190

163191
++currKeyIt;

src/main/Config.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
162162
DEPRECATED_SQL_LEDGER_STATE = false;
163163
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14; // 2^14 == 16 kb
164164
BUCKETLIST_DB_INDEX_CUTOFF = 20; // 20 mb
165-
BUCKETLIST_DB_PERSIST_INDEX = true;
165+
BUCKETLIST_DB_PERSIST_INDEX = false;
166166
BACKGROUND_EVICTION_SCAN = true;
167167
PUBLISH_TO_ARCHIVE_DELAY = std::chrono::seconds{0};
168168
// automatic maintenance settings:

0 commit comments

Comments
 (0)