Skip to content

Commit d88c58f

Browse files
committed
Parallel ledger close implementation
1 parent 4ea77f4 commit d88c58f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1198
-698
lines changed

src/bucket/BucketListBase.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "bucket/LiveBucket.h"
1010
#include "crypto/SHA.h"
1111
#include "ledger/LedgerTxn.h"
12+
#include "ledger/NetworkConfig.h"
1213
#include "main/Application.h"
1314
#include "util/GlobalChecks.h"
1415
#include "util/Logging.h"
@@ -57,7 +58,6 @@ template <typename BucketT>
5758
void
5859
BucketLevel<BucketT>::setNext(FutureBucket<BucketT> const& fb)
5960
{
60-
releaseAssert(threadIsMain());
6161
mNextCurr = fb;
6262
}
6363

@@ -79,7 +79,6 @@ template <typename BucketT>
7979
void
8080
BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b)
8181
{
82-
releaseAssert(threadIsMain());
8382
mNextCurr.clear();
8483
mCurr = b;
8584
}

src/bucket/BucketListSnapshotBase.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ BucketListSnapshot<BucketT>::BucketListSnapshot(
1919
BucketListBase<BucketT> const& bl, LedgerHeader header)
2020
: mHeader(std::move(header))
2121
{
22-
releaseAssert(threadIsMain());
23-
2422
for (uint32_t i = 0; i < BucketListBase<BucketT>::kNumLevels; ++i)
2523
{
2624
auto const& level = bl.getLevel(i);

src/bucket/BucketManager.cpp

+32-34
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ void
6262
BucketManager::initialize()
6363
{
6464
ZoneScoped;
65+
releaseAssert(threadIsMain());
6566
std::string d = mConfig.BUCKET_DIR_PATH;
6667

6768
if (!fs::exists(d))
@@ -729,7 +730,7 @@ BucketManager::getBucketListReferencedBuckets() const
729730
}
730731

731732
std::set<Hash>
732-
BucketManager::getAllReferencedBuckets() const
733+
BucketManager::getAllReferencedBuckets(HistoryArchiveState const& has) const
733734
{
734735
ZoneScoped;
735736
auto referenced = getBucketListReferencedBuckets();
@@ -740,8 +741,7 @@ BucketManager::getAllReferencedBuckets() const
740741

741742
// retain any bucket referenced by the last closed ledger as recorded in the
742743
// database (as merges complete, the bucket list drifts from that state)
743-
auto lclHas = mApp.getLedgerManager().getLastClosedLedgerHAS();
744-
auto lclBuckets = lclHas.allBuckets();
744+
auto lclBuckets = has.allBuckets();
745745
for (auto const& h : lclBuckets)
746746
{
747747
auto rit = referenced.emplace(hexToBin256(h));
@@ -752,39 +752,38 @@ BucketManager::getAllReferencedBuckets() const
752752
}
753753

754754
// retain buckets that are referenced by a state in the publish queue.
755-
auto pub = mApp.getHistoryManager().getBucketsReferencedByPublishQueue();
755+
for (auto const& h :
756+
HistoryManager::getBucketsReferencedByPublishQueue(mApp.getConfig()))
756757
{
757-
for (auto const& h : pub)
758+
auto rhash = hexToBin256(h);
759+
auto rit = referenced.emplace(rhash);
760+
if (rit.second)
758761
{
759-
auto rhash = hexToBin256(h);
760-
auto rit = referenced.emplace(rhash);
761-
if (rit.second)
762-
{
763-
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);
764-
765-
// Project referenced bucket `rhash` -- which might be a merge
766-
// input captured before a merge finished -- through our weak
767-
// map of merge input/output relationships, to find any outputs
768-
// we'll want to retain in order to resynthesize the merge in
769-
// the future, rather than re-run it.
770-
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
771-
}
762+
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);
763+
764+
// Project referenced bucket `rhash` -- which might be a merge
765+
// input captured before a merge finished -- through our weak
766+
// map of merge input/output relationships, to find any outputs
767+
// we'll want to retain in order to resynthesize the merge in
768+
// the future, rather than re-run it.
769+
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
772770
}
773771
}
774772
return referenced;
775773
}
776774

777775
void
778-
BucketManager::cleanupStaleFiles()
776+
BucketManager::cleanupStaleFiles(HistoryArchiveState const& has)
779777
{
780778
ZoneScoped;
779+
releaseAssert(threadIsMain());
781780
if (mConfig.DISABLE_BUCKET_GC)
782781
{
783782
return;
784783
}
785784

786785
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
787-
auto referenced = getAllReferencedBuckets();
786+
auto referenced = getAllReferencedBuckets(has);
788787
std::transform(std::begin(mSharedLiveBuckets), std::end(mSharedLiveBuckets),
789788
std::inserter(referenced, std::end(referenced)),
790789
[](std::pair<Hash, std::shared_ptr<LiveBucket>> const& p) {
@@ -818,11 +817,11 @@ BucketManager::cleanupStaleFiles()
818817
}
819818

820819
void
821-
BucketManager::forgetUnreferencedBuckets()
820+
BucketManager::forgetUnreferencedBuckets(HistoryArchiveState const& has)
822821
{
823822
ZoneScoped;
824823
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
825-
auto referenced = getAllReferencedBuckets();
824+
auto referenced = getAllReferencedBuckets(has);
826825
auto blReferenced = getBucketListReferencedBuckets();
827826

828827
auto bucketMapLoop = [&](auto& bucketMap, auto& futureMap) {
@@ -867,7 +866,7 @@ BucketManager::forgetUnreferencedBuckets()
867866
Bucket,
868867
"BucketManager::forgetUnreferencedBuckets dropping {}",
869868
filename);
870-
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
869+
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
871870
{
872871
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
873872
std::filesystem::remove(filename);
@@ -1076,12 +1075,11 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq)
10761075
}
10771076

10781077
void
1079-
BucketManager::resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
1080-
uint32_t ledgerSeq,
1081-
LedgerKeySet const& modifiedKeys)
1078+
BucketManager::resolveBackgroundEvictionScan(
1079+
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
1080+
LedgerKeySet const& modifiedKeys, SorobanNetworkConfig& networkConfig)
10821081
{
10831082
ZoneScoped;
1084-
releaseAssert(threadIsMain());
10851083
releaseAssert(mEvictionStatistics);
10861084

10871085
if (!mEvictionFuture.valid())
@@ -1091,9 +1089,6 @@ BucketManager::resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
10911089

10921090
auto evictionCandidates = mEvictionFuture.get();
10931091

1094-
auto const& networkConfig =
1095-
mApp.getLedgerManager().getSorobanNetworkConfig();
1096-
10971092
// If eviction related settings changed during the ledger, we have to
10981093
// restart the scan
10991094
if (!evictionCandidates.isValid(ledgerSeq,
@@ -1209,6 +1204,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
12091204
uint32_t maxProtocolVersion, bool restartMerges)
12101205
{
12111206
ZoneScoped;
1207+
releaseAssert(threadIsMain());
12121208
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
12131209

12141210
// TODO: Assume archival bucket state
@@ -1257,7 +1253,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
12571253
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
12581254
has.currentLedger);
12591255
}
1260-
cleanupStaleFiles();
1256+
cleanupStaleFiles(has);
12611257
}
12621258

12631259
void
@@ -1358,7 +1354,7 @@ std::shared_ptr<LiveBucket>
13581354
BucketManager::mergeBuckets(HistoryArchiveState const& has)
13591355
{
13601356
ZoneScoped;
1361-
1357+
releaseAssert(threadIsMain());
13621358
std::map<LedgerKey, LedgerEntry> ledgerMap = loadCompleteLedgerState(has);
13631359
BucketMetadata meta;
13641360
MergeCounters mc;
@@ -1548,9 +1544,11 @@ BucketManager::visitLedgerEntries(
15481544
}
15491545

15501546
std::shared_ptr<BasicWork>
1551-
BucketManager::scheduleVerifyReferencedBucketsWork()
1547+
BucketManager::scheduleVerifyReferencedBucketsWork(
1548+
HistoryArchiveState const& has)
15521549
{
1553-
std::set<Hash> hashes = getAllReferencedBuckets();
1550+
releaseAssert(threadIsMain());
1551+
std::set<Hash> hashes = getAllReferencedBuckets(has);
15541552
std::vector<std::shared_ptr<BasicWork>> seq;
15551553
for (auto const& h : hashes)
15561554
{

src/bucket/BucketManager.h

+14-5
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ class BucketManager : NonMovableOrCopyable
7070

7171
static std::string const kLockFilename;
7272

73+
// NB: ideally, BucketManager should have no access to mApp, as it's too
74+
// dangerous in the context of parallel application. BucketManager is quite
75+
// bloated, with lots of legacy code, so to ensure safety, annotate all
76+
// functions using mApp with `releaseAssert(threadIsMain())` and avoid
77+
// accessing mApp in the background. Safety invariant: lock acquisition must
78+
// always be LCL lock -> BucketManager lock, and never the other direction
7379
Application& mApp;
7480
std::unique_ptr<LiveBucketList> mLiveBucketList;
7581
std::unique_ptr<HotArchiveBucketList> mHotArchiveBucketList;
@@ -126,7 +132,7 @@ class BucketManager : NonMovableOrCopyable
126132

127133
std::atomic<bool> mIsShutdown{false};
128134

129-
void cleanupStaleFiles();
135+
void cleanupStaleFiles(HistoryArchiveState const& has);
130136
void deleteTmpDirAndUnlockBucketDir();
131137
void deleteEntireBucketDir();
132138

@@ -262,7 +268,7 @@ class BucketManager : NonMovableOrCopyable
262268
// not immediately cause the buckets to delete themselves, if someone else
263269
// is using them via a shared_ptr<>, but the BucketManager will no longer
264270
// independently keep them alive.
265-
void forgetUnreferencedBuckets();
271+
void forgetUnreferencedBuckets(HistoryArchiveState const& has);
266272

267273
// Feed a new batch of entries to the bucket list. This interface expects to
268274
// be given separate init (created) and live (updated) entry vectors. The
@@ -295,7 +301,8 @@ class BucketManager : NonMovableOrCopyable
295301
void startBackgroundEvictionScan(uint32_t ledgerSeq);
296302
void resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
297303
uint32_t ledgerSeq,
298-
LedgerKeySet const& modifiedKeys);
304+
LedgerKeySet const& modifiedKeys,
305+
SorobanNetworkConfig& networkConfig);
299306

300307
medida::Meter& getBloomMissMeter() const;
301308
medida::Meter& getBloomLookupMeter() const;
@@ -320,7 +327,8 @@ class BucketManager : NonMovableOrCopyable
320327

321328
// Return the set of buckets referenced by the BucketList, LCL HAS,
322329
// and publish queue.
323-
std::set<Hash> getAllReferencedBuckets() const;
330+
std::set<Hash>
331+
getAllReferencedBuckets(HistoryArchiveState const& has) const;
324332

325333
// Check for missing bucket files that would prevent `assumeState` from
326334
// succeeding
@@ -377,7 +385,8 @@ class BucketManager : NonMovableOrCopyable
377385

378386
// Schedule a Work class that verifies the hashes of all referenced buckets
379387
// on background threads.
380-
std::shared_ptr<BasicWork> scheduleVerifyReferencedBucketsWork();
388+
std::shared_ptr<BasicWork>
389+
scheduleVerifyReferencedBucketsWork(HistoryArchiveState const& has);
381390

382391
Config const& getConfig() const;
383392

src/bucket/BucketSnapshotManager.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label,
7575
{
7676
// For now, only keep metrics for the main thread. We can decide on what
7777
// metrics make sense when more background services are added later.
78-
releaseAssert(threadIsMain());
7978

8079
if (numEntries != 0)
8180
{
@@ -170,8 +169,6 @@ BucketSnapshotManager::updateCurrentSnapshot(
170169
SnapshotPtrT<LiveBucket>&& liveSnapshot,
171170
SnapshotPtrT<HotArchiveBucket>&& hotArchiveSnapshot)
172171
{
173-
releaseAssert(threadIsMain());
174-
175172
auto updateSnapshot = [numHistoricalSnapshots = mNumHistoricalSnapshots](
176173
auto& currentSnapshot, auto& historicalSnapshots,
177174
auto&& newSnapshot) {

src/bucket/LiveBucketList.h

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
namespace stellar
1111
{
12+
13+
class SorobanNetworkConfig;
14+
1215
// The LiveBucketList stores the current canonical state of the ledger. It is
1316
// made up of LiveBucket buckets, which in turn store individual entries of type
1417
// BucketEntry. When an entry is "evicted" from the ledger, it is removed from

src/bucket/SearchableBucketList.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ SearchableLiveBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
110110
ZoneScoped;
111111

112112
// This query should only be called during TX apply
113-
releaseAssert(threadIsMain());
114113
mSnapshotManager.maybeUpdateSnapshot(mSnapshot, mHistoricalSnapshots);
115114
releaseAssert(mSnapshot);
116115

@@ -157,7 +156,6 @@ SearchableLiveBucketListSnapshot::loadInflationWinners(size_t maxWinners,
157156

158157
// This is a legacy query, should only be called by main thread during
159158
// catchup
160-
releaseAssert(threadIsMain());
161159
auto timer = mSnapshotManager.recordBulkLoadMetrics("inflationWinners", 0)
162160
.TimeScope();
163161

src/bucket/test/BucketListTests.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,8 @@ TEST_CASE_VERSIONS("network config snapshots BucketList size", "[bucketlist]")
868868
for_versions_from(20, *app, [&] {
869869
LedgerManagerForBucketTests& lm = app->getLedgerManager();
870870

871-
auto& networkConfig = app->getLedgerManager().getSorobanNetworkConfig();
871+
auto& networkConfig =
872+
app->getLedgerManager().getMutableSorobanNetworkConfig();
872873

873874
uint32_t windowSize = networkConfig.stateArchivalSettings()
874875
.bucketListSizeWindowSampleSize;

src/bucket/test/BucketManagerTests.cpp

+17-9
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,8 @@ TEST_CASE_VERSIONS("bucketmanager ownership", "[bucket][bucketmanager]")
237237
CHECK(fs::exists(indexFilename));
238238

239239
b.reset();
240-
app->getBucketManager().forgetUnreferencedBuckets();
240+
app->getBucketManager().forgetUnreferencedBuckets(
241+
app->getLedgerManager().getLastClosedLedgerHAS());
241242
CHECK(!fs::exists(filename));
242243
CHECK(!fs::exists(indexFilename));
243244
};
@@ -260,7 +261,8 @@ TEST_CASE_VERSIONS("bucketmanager ownership", "[bucket][bucketmanager]")
260261

261262
// This shouldn't change if we forget unreferenced buckets since
262263
// it's referenced by bucketlist.
263-
app->getBucketManager().forgetUnreferencedBuckets();
264+
app->getBucketManager().forgetUnreferencedBuckets(
265+
app->getLedgerManager().getLastClosedLedgerHAS());
264266
CHECK(b1.use_count() == 3);
265267

266268
// But if we mutate the curr bucket of the bucketlist, it should.
@@ -343,7 +345,8 @@ TEST_CASE_VERSIONS("bucketmanager reattach to finished merge",
343345
LedgerTestUtils::generateValidLedgerEntriesWithExclusions(
344346
{CONFIG_SETTING}, 10),
345347
{});
346-
bm.forgetUnreferencedBuckets();
348+
bm.forgetUnreferencedBuckets(
349+
app->getLedgerManager().getLastClosedLedgerHAS());
347350
} while (!LiveBucketList::levelShouldSpill(ledger, level - 1));
348351

349352
// Check that the merge on level isn't committed (we're in
@@ -433,7 +436,8 @@ TEST_CASE_VERSIONS("bucketmanager reattach to running merge",
433436
{CONFIG_SETTING}, 100),
434437
{});
435438

436-
bm.forgetUnreferencedBuckets();
439+
bm.forgetUnreferencedBuckets(
440+
app->getLedgerManager().getLastClosedLedgerHAS());
437441

438442
HistoryArchiveState has(ledger, bl,
439443
app->getConfig().NETWORK_PASSPHRASE);
@@ -517,8 +521,10 @@ TEST_CASE("bucketmanager do not leak empty-merge futures",
517521
bl.resolveAnyReadyFutures();
518522
std::this_thread::sleep_for(std::chrono::seconds(1));
519523
}
520-
bm.forgetUnreferencedBuckets();
521-
auto bmRefBuckets = bm.getAllReferencedBuckets();
524+
bm.forgetUnreferencedBuckets(
525+
app->getLedgerManager().getLastClosedLedgerHAS());
526+
auto bmRefBuckets = bm.getAllReferencedBuckets(
527+
app->getLedgerManager().getLastClosedLedgerHAS());
522528
auto bmDirBuckets = bm.getBucketHashesInBucketDirForTesting();
523529

524530
// Remove the 0 bucket in case it's "referenced"; it's never a file.
@@ -574,16 +580,18 @@ TEST_CASE_VERSIONS(
574580
{CONFIG_SETTING}, 100),
575581
{});
576582
clock.crank(false);
577-
bm.forgetUnreferencedBuckets();
583+
bm.forgetUnreferencedBuckets(
584+
app->getLedgerManager().getLastClosedLedgerHAS());
578585
}
579586
// We should have published nothing and have the first
580587
// checkpoint still queued.
581588
REQUIRE(hm.getPublishSuccessCount() == 0);
582-
REQUIRE(hm.getMinLedgerQueuedToPublish() == 7);
589+
REQUIRE(HistoryManager::getMinLedgerQueuedToPublish(app->getConfig()) ==
590+
7);
583591

584592
auto oldReattachments =
585593
bm.readMergeCounters().mFinishedMergeReattachments;
586-
auto HASs = hm.getPublishQueueStates();
594+
auto HASs = HistoryManager::getPublishQueueStates(app->getConfig());
587595
REQUIRE(HASs.size() == 5);
588596
for (auto& has : HASs)
589597
{

0 commit comments

Comments
 (0)