Skip to content

Commit fbb53d8

Browse files
authored
Switch read-only LedgerTxn to BucketList snapshots (#4431)
Resolves #4315 and #3800 This PR automatically switches overlay/herder to using BucketListDB snapshots instead of LedgerTxn. This change introduces a single "read-only ledger state snapshot" interface, which supports both SQL (via LedgerTxn) and BucketListDB snapshots. A unified interface avoids invasive changes to the rest of the codebase to support both snapshot types, and allows to have a centralized validation flow for both overlay and apply time. A notable change is the removal of nested LegderTxn in validation paths, and using loadWithoutRecord instead in newer protocols. This creates a stronger invariant that validation flow is only meant to be accessing read-only version of the ledger, and prevents it from making modifications by accident. Note that nested LedgerTxn is preserved for older buggy versions of the protocol (making the code a bit uglier than I would have preferred, I'm open to ideas on simplifying/refactoring old protocol logic out of the critical path).
2 parents 64da5e9 + 2f6c917 commit fbb53d8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1119
-699
lines changed

src/bucket/BucketListSnapshot.cpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
namespace stellar
1414
{
1515

16-
BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
17-
: mLedgerSeq(ledgerSeq)
16+
BucketListSnapshot::BucketListSnapshot(BucketList const& bl,
17+
LedgerHeader header)
18+
: mHeader(std::move(header))
1819
{
1920
releaseAssert(threadIsMain());
2021

@@ -26,7 +27,7 @@ BucketListSnapshot::BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq)
2627
}
2728

2829
BucketListSnapshot::BucketListSnapshot(BucketListSnapshot const& snapshot)
29-
: mLevels(snapshot.mLevels), mLedgerSeq(snapshot.mLedgerSeq)
30+
: mLevels(snapshot.mLevels), mHeader(snapshot.mHeader)
3031
{
3132
}
3233

@@ -39,7 +40,7 @@ BucketListSnapshot::getLevels() const
3940
uint32_t
4041
BucketListSnapshot::getLedgerSeq() const
4142
{
42-
return mLedgerSeq;
43+
return mHeader.ledgerSeq;
4344
}
4445

4546
// Loops through all buckets in the given snapshot, starting with curr at level
@@ -177,7 +178,7 @@ SearchableBucketListSnapshot::scanForEviction(
177178
}
178179

179180
std::shared_ptr<LedgerEntry>
180-
SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k)
181+
SearchableBucketListSnapshot::load(LedgerKey const& k)
181182
{
182183
ZoneScoped;
183184
mSnapshotManager.maybeUpdateSnapshot(mSnapshot, mHistoricalSnapshots);
@@ -389,4 +390,5 @@ SearchableBucketListSnapshot::SearchableBucketListSnapshot(
389390
// Initialize snapshot from SnapshotManager
390391
mSnapshotManager.maybeUpdateSnapshot(mSnapshot, mHistoricalSnapshots);
391392
}
393+
392394
}

src/bucket/BucketListSnapshot.h

+14-4
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,23 @@ class BucketListSnapshot : public NonMovable
3030
private:
3131
std::vector<BucketLevelSnapshot> mLevels;
3232

33-
// ledgerSeq that this BucketList snapshot is based off of
34-
uint32_t mLedgerSeq;
33+
// LedgerHeader associated with this ledger state snapshot
34+
LedgerHeader const mHeader;
3535

3636
public:
37-
BucketListSnapshot(BucketList const& bl, uint32_t ledgerSeq);
37+
BucketListSnapshot(BucketList const& bl, LedgerHeader hhe);
3838

3939
// Only allow copies via constructor
4040
BucketListSnapshot(BucketListSnapshot const& snapshot);
4141
BucketListSnapshot& operator=(BucketListSnapshot const&) = delete;
4242

4343
std::vector<BucketLevelSnapshot> const& getLevels() const;
4444
uint32_t getLedgerSeq() const;
45+
LedgerHeader const&
46+
getLedgerHeader() const
47+
{
48+
return mHeader;
49+
}
4550
};
4651

4752
// A lightweight wrapper around BucketListSnapshot for thread safe BucketListDB
@@ -79,7 +84,7 @@ class SearchableBucketListSnapshot : public NonMovableOrCopyable
7984
std::vector<InflationWinner> loadInflationWinners(size_t maxWinners,
8085
int64_t minBalance);
8186

82-
std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k);
87+
std::shared_ptr<LedgerEntry> load(LedgerKey const& k);
8388

8489
// Loads inKeys from the specified historical snapshot. Returns
8590
// <load_result_vec, true> if the snapshot for the given ledger is
@@ -97,5 +102,10 @@ class SearchableBucketListSnapshot : public NonMovableOrCopyable
97102
std::shared_ptr<EvictionStatistics> stats,
98103
StateArchivalSettings const& sas);
99104
uint32_t getLedgerSeq() const;
105+
LedgerHeader const&
106+
getLedgerHeader() const
107+
{
108+
return mSnapshot->getLedgerHeader();
109+
}
100110
};
101111
}

src/bucket/BucketManager.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,9 @@ class BucketManager : NonMovableOrCopyable
265265

266266
// Feed a new batch of entries to the bucket list. This interface expects to
267267
// be given separate init (created) and live (updated) entry vectors. The
268-
// `currLedger` and `currProtocolVersion` values should be taken from the
269-
// ledger at which this batch is being added.
270-
virtual void addBatch(Application& app, uint32_t currLedger,
271-
uint32_t currLedgerProtocol,
268+
// `header` value should be taken from the ledger at which this batch is
269+
// being added.
270+
virtual void addBatch(Application& app, LedgerHeader header,
272271
std::vector<LedgerEntry> const& initEntries,
273272
std::vector<LedgerEntry> const& liveEntries,
274273
std::vector<LedgerKey> const& deadEntries) = 0;

src/bucket/BucketManagerImpl.cpp

+7-19
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ BucketManagerImpl::initialize()
127127
if (mApp.getConfig().isUsingBucketListDB())
128128
{
129129
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
130-
mApp, std::make_unique<BucketListSnapshot>(*mBucketList, 0),
130+
mApp,
131+
std::make_unique<BucketListSnapshot>(*mBucketList,
132+
LedgerHeader()),
131133
mApp.getConfig().QUERY_SNAPSHOT_LEDGERS);
132134
}
133135
}
@@ -884,8 +886,7 @@ BucketManagerImpl::forgetUnreferencedBuckets()
884886
}
885887

886888
void
887-
BucketManagerImpl::addBatch(Application& app, uint32_t currLedger,
888-
uint32_t currLedgerProtocol,
889+
BucketManagerImpl::addBatch(Application& app, LedgerHeader header,
889890
std::vector<LedgerEntry> const& initEntries,
890891
std::vector<LedgerEntry> const& liveEntries,
891892
std::vector<LedgerKey> const& deadEntries)
@@ -895,21 +896,15 @@ BucketManagerImpl::addBatch(Application& app, uint32_t currLedger,
895896
#ifdef BUILD_TESTS
896897
if (mUseFakeTestValuesForNextClose)
897898
{
898-
currLedgerProtocol = mFakeTestProtocolVersion;
899+
header.ledgerVersion = mFakeTestProtocolVersion;
899900
}
900901
#endif
901902
auto timer = mBucketAddBatch.TimeScope();
902903
mBucketObjectInsertBatch.Mark(initEntries.size() + liveEntries.size() +
903904
deadEntries.size());
904-
mBucketList->addBatch(app, currLedger, currLedgerProtocol, initEntries,
905-
liveEntries, deadEntries);
905+
mBucketList->addBatch(app, header.ledgerSeq, header.ledgerVersion,
906+
initEntries, liveEntries, deadEntries);
906907
mBucketListSizeCounter.set_count(mBucketList->getSize());
907-
908-
if (app.getConfig().isUsingBucketListDB())
909-
{
910-
mSnapshotManager->updateCurrentSnapshot(
911-
std::make_unique<BucketListSnapshot>(*mBucketList, currLedger));
912-
}
913908
}
914909

915910
#ifdef BUILD_TESTS
@@ -1195,13 +1190,6 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
11951190
{
11961191
mBucketList->restartMerges(mApp, maxProtocolVersion, has.currentLedger);
11971192
}
1198-
1199-
if (mApp.getConfig().isUsingBucketListDB())
1200-
{
1201-
mSnapshotManager->updateCurrentSnapshot(
1202-
std::make_unique<BucketListSnapshot>(*mBucketList,
1203-
has.currentLedger));
1204-
}
12051193
cleanupStaleFiles();
12061194
}
12071195

src/bucket/BucketManagerImpl.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,7 @@ class BucketManagerImpl : public BucketManager
133133
#endif
134134

135135
void forgetUnreferencedBuckets() override;
136-
void addBatch(Application& app, uint32_t currLedger,
137-
uint32_t currLedgerProtocol,
136+
void addBatch(Application& app, LedgerHeader header,
138137
std::vector<LedgerEntry> const& initEntries,
139138
std::vector<LedgerEntry> const& liveEntries,
140139
std::vector<LedgerKey> const& deadEntries) override;

src/bucket/BucketSnapshotManager.h

+1-12
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,11 @@ class BucketSnapshotManager : NonMovableOrCopyable
5858

5959
mutable std::optional<VirtualClock::time_point> mTimerStart;
6060

61+
public:
6162
// Called by main thread to update mCurrentSnapshot whenever the BucketList
6263
// is updated
6364
void updateCurrentSnapshot(
6465
std::unique_ptr<BucketListSnapshot const>&& newSnapshot);
65-
66-
friend void
67-
BucketManagerImpl::addBatch(Application& app, uint32_t currLedger,
68-
uint32_t currLedgerProtocol,
69-
std::vector<LedgerEntry> const& initEntries,
70-
std::vector<LedgerEntry> const& liveEntries,
71-
std::vector<LedgerKey> const& deadEntries);
72-
friend void BucketManagerImpl::assumeState(HistoryArchiveState const& has,
73-
uint32_t maxProtocolVersion,
74-
bool restartMerges);
75-
76-
public:
7766
// numHistoricalLedgers is the number of historical snapshots that the
7867
// snapshot manager will maintain. If numHistoricalLedgers is 5, snapshots
7968
// will be capable of querying state from ledger [lcl, lcl - 5].

src/bucket/test/BucketIndexTests.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class BucketIndexTest
132132
.copySearchableBucketListSnapshot();
133133
auto lk = LedgerEntryKey(canonicalEntry);
134134

135-
auto currentLoadedEntry = searchableBL->getLedgerEntry(lk);
135+
auto currentLoadedEntry = searchableBL->load(lk);
136136
REQUIRE(currentLoadedEntry);
137137

138138
// Note: The definition of "historical snapshot" ledger is that the
@@ -261,7 +261,7 @@ class BucketIndexTest
261261
loadResult.clear();
262262
for (auto const& key : mKeysToSearch)
263263
{
264-
auto entryPtr = searchableBL->getLedgerEntry(key);
264+
auto entryPtr = searchableBL->load(key);
265265
if (entryPtr)
266266
{
267267
loadResult.emplace_back(*entryPtr);
@@ -332,7 +332,7 @@ class BucketIndexTest
332332
// Test individual load
333333
for (auto const& key : invalidKeys)
334334
{
335-
auto entryPtr = searchableBL->getLedgerEntry(key);
335+
auto entryPtr = searchableBL->load(key);
336336
REQUIRE(!entryPtr);
337337
}
338338
}

src/bucket/test/BucketListTests.cpp

+35-16
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,11 @@ TEST_CASE_VERSIONS("bucket list", "[bucket][bucketlist]")
143143
!app->getClock().getIOContext().stopped() && i < 130; ++i)
144144
{
145145
app->getClock().crank(false);
146-
bl.addBatch(
147-
*app, i, getAppLedgerVersion(app), {},
146+
auto lh =
147+
app->getLedgerManager().getLastClosedLedgerHeader().header;
148+
lh.ledgerSeq = i;
149+
addBatchAndUpdateSnapshot(
150+
bl, *app, lh, {},
148151
LedgerTestUtils::generateValidUniqueLedgerEntries(8),
149152
LedgerTestUtils::generateValidLedgerEntryKeysWithExclusions(
150153
{CONFIG_SETTING}, 5));
@@ -268,8 +271,11 @@ TEST_CASE_VERSIONS("bucket list shadowing pre/post proto 12",
268271
BucketEntryBob.liveEntry().data.account() = bob;
269272
liveBatch.push_back(BucketEntryBob.liveEntry());
270273

271-
bl.addBatch(
272-
*app, i, getAppLedgerVersion(app), {}, liveBatch,
274+
auto lh =
275+
app->getLedgerManager().getLastClosedLedgerHeader().header;
276+
lh.ledgerSeq = i;
277+
addBatchAndUpdateSnapshot(
278+
bl, *app, lh, {}, liveBatch,
273279
LedgerTestUtils::generateValidLedgerEntryKeysWithExclusions(
274280
{CONFIG_SETTING}, 5));
275281
if (i % 100 == 0)
@@ -366,8 +372,11 @@ TEST_CASE_VERSIONS("bucket tombstones expire at bottom level",
366372
for (auto j : ledgers)
367373
{
368374
auto n = mergeTimer.count();
369-
bl.addBatch(
370-
*app, j, getAppLedgerVersion(app), {},
375+
auto lh =
376+
app->getLedgerManager().getLastClosedLedgerHeader().header;
377+
lh.ledgerSeq = j;
378+
addBatchAndUpdateSnapshot(
379+
bl, *app, lh, {},
371380
LedgerTestUtils::generateValidUniqueLedgerEntries(8),
372381
LedgerTestUtils::generateValidLedgerEntryKeysWithExclusions(
373382
{CONFIG_SETTING}, 5));
@@ -407,7 +416,6 @@ TEST_CASE_VERSIONS("bucket tombstones mutually-annihilate init entries",
407416
for_versions_with_differing_bucket_logic(cfg, [&](Config const& cfg) {
408417
Application::pointer app = createTestApplication(clock, cfg);
409418
BucketList bl;
410-
auto vers = getAppLedgerVersion(app);
411419
autocheck::generator<bool> flip;
412420
std::deque<LedgerEntry> entriesToModify;
413421
for (uint32_t i = 1; i < 512; ++i)
@@ -443,7 +451,11 @@ TEST_CASE_VERSIONS("bucket tombstones mutually-annihilate init entries",
443451
deadEntries.push_back(LedgerEntryKey(e));
444452
}
445453
}
446-
bl.addBatch(*app, i, vers, initEntries, liveEntries, deadEntries);
454+
auto lh =
455+
app->getLedgerManager().getLastClosedLedgerHeader().header;
456+
lh.ledgerSeq = i;
457+
addBatchAndUpdateSnapshot(bl, *app, lh, initEntries, liveEntries,
458+
deadEntries);
447459
app->getClock().crank(false);
448460
for (uint32_t k = 0u; k < BucketList::kNumLevels; ++k)
449461
{
@@ -487,17 +499,21 @@ TEST_CASE_VERSIONS("single entry bubbling up",
487499
std::vector<stellar::LedgerEntry> emptySetEntry;
488500

489501
CLOG_DEBUG(Bucket, "Adding single entry in lowest level");
490-
bl.addBatch(*app, 1, getAppLedgerVersion(app), {},
491-
LedgerTestUtils::generateValidLedgerEntries(1),
492-
emptySet);
502+
addBatchAndUpdateSnapshot(
503+
bl, *app,
504+
app->getLedgerManager().getLastClosedLedgerHeader().header, {},
505+
LedgerTestUtils::generateValidLedgerEntries(1), emptySet);
493506

494507
CLOG_DEBUG(Bucket, "Adding empty batches to bucket list");
495508
for (uint32_t i = 2;
496509
!app->getClock().getIOContext().stopped() && i < 300; ++i)
497510
{
498511
app->getClock().crank(false);
499-
bl.addBatch(*app, i, getAppLedgerVersion(app), {},
500-
emptySetEntry, emptySet);
512+
auto lh =
513+
app->getLedgerManager().getLastClosedLedgerHeader().header;
514+
lh.ledgerSeq = i;
515+
addBatchAndUpdateSnapshot(bl, *app, lh, {}, emptySetEntry,
516+
emptySet);
501517
if (i % 10 == 0)
502518
CLOG_DEBUG(Bucket, "Added batch {}, hash={}", i,
503519
binToHex(bl.getHash()));
@@ -655,8 +671,11 @@ TEST_CASE("BucketList check bucket sizes", "[bucket][bucketlist][count]")
655671
{
656672
app->getClock().crank(false);
657673
ledgers[ledgerSeq - 1].lastModifiedLedgerSeq = ledgerSeq;
658-
bl.addBatch(*app, ledgerSeq, getAppLedgerVersion(app), {},
659-
{ledgers[ledgerSeq - 1]}, emptySet);
674+
auto lh =
675+
app->getLedgerManager().getLastClosedLedgerHeader().header;
676+
lh.ledgerSeq = ledgerSeq;
677+
addBatchAndUpdateSnapshot(bl, *app, lh, {},
678+
{ledgers[ledgerSeq - 1]}, emptySet);
660679
}
661680
for (uint32_t level = 0; level < BucketList::kNumLevels; ++level)
662681
{
@@ -1262,7 +1281,7 @@ TEST_CASE_VERSIONS("Searchable BucketListDB snapshots", "[bucketlist]")
12621281
closeLedger(*app);
12631282

12641283
// Snapshot should automatically update with latest version
1265-
auto loadedEntry = searchableBL->getLedgerEntry(LedgerEntryKey(entry));
1284+
auto loadedEntry = searchableBL->load(LedgerEntryKey(entry));
12661285
REQUIRE((loadedEntry && *loadedEntry == entry));
12671286
}
12681287
}

src/bucket/test/BucketManagerTests.cpp

+18-9
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,12 @@ TEST_CASE_VERSIONS("bucketmanager reattach to finished merge",
358358
do
359359
{
360360
++ledger;
361-
bl.addBatch(*app, ledger, vers, {},
362-
LedgerTestUtils::generateValidUniqueLedgerEntries(10),
363-
{});
361+
auto lh =
362+
app->getLedgerManager().getLastClosedLedgerHeader().header;
363+
lh.ledgerSeq = ledger;
364+
addBatchAndUpdateSnapshot(
365+
bl, *app, lh, {},
366+
LedgerTestUtils::generateValidUniqueLedgerEntries(10), {});
364367
bm.forgetUnreferencedBuckets();
365368
} while (!BucketList::levelShouldSpill(ledger, level - 1));
366369

@@ -442,9 +445,12 @@ TEST_CASE_VERSIONS("bucketmanager reattach to running merge",
442445
// Merges will start on one or more levels here, starting a race
443446
// between the main thread here and the background workers doing
444447
// the merges.
445-
bl.addBatch(*app, ledger, vers, {},
446-
LedgerTestUtils::generateValidUniqueLedgerEntries(100),
447-
{});
448+
auto lh =
449+
app->getLedgerManager().getLastClosedLedgerHeader().header;
450+
lh.ledgerSeq = ledger;
451+
addBatchAndUpdateSnapshot(
452+
bl, *app, lh, {},
453+
LedgerTestUtils::generateValidUniqueLedgerEntries(100), {});
448454

449455
bm.forgetUnreferencedBuckets();
450456

@@ -577,9 +583,12 @@ TEST_CASE_VERSIONS(
577583
auto ra = bm.readMergeCounters().mFinishedMergeReattachments;
578584
CLOG_INFO(Bucket, "finished-merge reattachments while queueing: {}",
579585
ra);
580-
bl.addBatch(*app, lm.getLastClosedLedgerNum() + 1, vers, {},
581-
LedgerTestUtils::generateValidUniqueLedgerEntries(100),
582-
{});
586+
auto lh =
587+
app->getLedgerManager().getLastClosedLedgerHeader().header;
588+
lh.ledgerSeq++;
589+
addBatchAndUpdateSnapshot(
590+
bl, *app, lh, {},
591+
LedgerTestUtils::generateValidUniqueLedgerEntries(100), {});
583592
clock.crank(false);
584593
bm.forgetUnreferencedBuckets();
585594
}

0 commit comments

Comments
 (0)