Skip to content

Commit 1e53b11

Browse files
committed
Close ledgers in parallel when flag is enabled
1 parent e0c5ff1 commit 1e53b11

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+574
-379
lines changed

src/bucket/BucketList.cpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "crypto/SHA.h"
1212
#include "ledger/LedgerManager.h"
1313
#include "ledger/LedgerTxn.h"
14+
#include "ledger/NetworkConfig.h"
1415
#include "main/Application.h"
1516
#include "util/GlobalChecks.h"
1617
#include "util/Logging.h"
@@ -56,7 +57,6 @@ BucketLevel::getNext()
5657
void
5758
BucketLevel::setNext(FutureBucket const& fb)
5859
{
59-
releaseAssert(threadIsMain());
6060
mNextCurr = fb;
6161
}
6262

@@ -75,7 +75,6 @@ BucketLevel::getSnap() const
7575
void
7676
BucketLevel::setCurr(std::shared_ptr<Bucket> b)
7777
{
78-
releaseAssert(threadIsMain());
7978
mNextCurr.clear();
8079
mCurr = b;
8180
}
@@ -776,7 +775,8 @@ void
776775
BucketList::scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx,
777776
uint32_t ledgerSeq,
778777
EvictionCounters& counters,
779-
std::shared_ptr<EvictionStatistics> stats)
778+
std::shared_ptr<EvictionStatistics> stats,
779+
SorobanNetworkConfig& networkConfig)
780780
{
781781
releaseAssert(stats);
782782

@@ -785,8 +785,6 @@ BucketList::scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx,
785785
return iter.isCurrBucket ? level.getCurr() : level.getSnap();
786786
};
787787

788-
auto const& networkConfig =
789-
app.getLedgerManager().getSorobanNetworkConfig();
790788
auto const firstScanLevel =
791789
networkConfig.stateArchivalSettings().startingEvictionScanLevel;
792790
auto evictionIter = networkConfig.evictionIterator();

src/bucket/BucketList.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ struct BucketEntryCounters;
349349
class Config;
350350
struct EvictionCounters;
351351
struct InflationWinner;
352+
class SorobanNetworkConfig;
352353

353354
namespace testutil
354355
{
@@ -478,7 +479,8 @@ class BucketList
478479

479480
void scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx,
480481
uint32_t ledgerSeq, EvictionCounters& counters,
481-
std::shared_ptr<EvictionStatistics> stats);
482+
std::shared_ptr<EvictionStatistics> stats,
483+
SorobanNetworkConfig& networkConfig);
482484

483485
// Restart any merges that might be running on background worker threads,
484486
// merging buckets between levels. This needs to be called after forcing a

src/bucket/BucketListSnapshot.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ BucketListSnapshot::BucketListSnapshot(BucketList const& bl,
1717
LedgerHeader header)
1818
: mHeader(std::move(header))
1919
{
20-
releaseAssert(threadIsMain());
21-
2220
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
2321
{
2422
auto const& level = bl.getLevel(i);

src/bucket/BucketManager.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct HistoryArchiveState;
3535
struct InflationWinner;
3636
struct LedgerHeader;
3737
struct MergeKey;
38+
class SorobanNetworkConfig;
3839

3940
// A fine-grained merge-operation-counter structure for tracking various
4041
// events during merges. These are not medida counters because we do not
@@ -288,12 +289,15 @@ class BucketManager : NonMovableOrCopyable
288289
// pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries
289290
// have been evicted or maxEvictionScanSize bytes have been scanned.
290291
virtual void scanForEvictionLegacy(AbstractLedgerTxn& ltx,
291-
uint32_t ledgerSeq) = 0;
292+
uint32_t ledgerSeq,
293+
SorobanNetworkConfig& networkConfig) = 0;
292294

293-
virtual void startBackgroundEvictionScan(uint32_t ledgerSeq) = 0;
295+
virtual void startBackgroundEvictionScan(uint32_t ledgerSeq,
296+
bool callFromLedgerClose) = 0;
294297
virtual void
295298
resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
296-
LedgerKeySet const& modifiedKeys) = 0;
299+
LedgerKeySet const& modifiedKeys,
300+
SorobanNetworkConfig& networkConfig) = 0;
297301

298302
virtual medida::Meter& getBloomMissMeter() const = 0;
299303
virtual medida::Meter& getBloomLookupMeter() const = 0;

src/bucket/BucketManagerImpl.cpp

+42-41
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void
9191
BucketManagerImpl::initialize()
9292
{
9393
ZoneScoped;
94-
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
94+
std::string d = mConfig.BUCKET_DIR_PATH;
9595

9696
if (!fs::exists(d))
9797
{
@@ -122,17 +122,17 @@ BucketManagerImpl::initialize()
122122
mLockedBucketDir = std::make_unique<std::string>(d);
123123
mTmpDirManager = std::make_unique<TmpDirManager>(d + "/tmp");
124124

125-
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
125+
if (mConfig.MODE_ENABLES_BUCKETLIST)
126126
{
127127
mBucketList = std::make_unique<BucketList>();
128128

129-
if (mApp.getConfig().isUsingBucketListDB())
129+
if (mConfig.isUsingBucketListDB())
130130
{
131131
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
132132
mApp,
133133
std::make_unique<BucketListSnapshot>(*mBucketList,
134134
LedgerHeader()),
135-
mApp.getConfig().QUERY_SNAPSHOT_LEDGERS);
135+
mConfig.QUERY_SNAPSHOT_LEDGERS);
136136
}
137137
}
138138

@@ -141,11 +141,10 @@ BucketManagerImpl::initialize()
141141
// BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database
142142
// so create the remaining ledger header, transactions and results
143143
// directories
144-
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig());
145-
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
146-
mApp.getConfig());
147-
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig());
148-
HistoryManager::createPublishQueueDir(mApp.getConfig());
144+
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mConfig);
145+
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mConfig);
146+
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mConfig);
147+
HistoryManager::createPublishQueueDir(mConfig);
149148
}
150149

151150
void
@@ -178,6 +177,7 @@ EvictionCounters::EvictionCounters(Application& app)
178177

179178
BucketManagerImpl::BucketManagerImpl(Application& app)
180179
: mApp(app)
180+
, mConfig(app.getConfig())
181181
, mBucketList(nullptr)
182182
, mSnapshotManager(nullptr)
183183
, mTmpDirManager(nullptr)
@@ -299,7 +299,7 @@ void
299299
BucketManagerImpl::deleteEntireBucketDir()
300300
{
301301
ZoneScoped;
302-
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
302+
std::string d = mConfig.BUCKET_DIR_PATH;
303303
if (fs::exists(d))
304304
{
305305
// First clean out the contents of the tmpdir, as usual.
@@ -332,7 +332,7 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir()
332332
// Then delete the lockfile $BUCKET_DIR_PATH/stellar-core.lock
333333
if (mLockedBucketDir)
334334
{
335-
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
335+
std::string d = mConfig.BUCKET_DIR_PATH;
336336
std::string lock = d + "/" + kLockFilename;
337337
releaseAssert(fs::exists(lock));
338338
fs::unlockFile(lock);
@@ -343,14 +343,14 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir()
343343
BucketList&
344344
BucketManagerImpl::getBucketList()
345345
{
346-
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
346+
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
347347
return *mBucketList;
348348
}
349349

350350
BucketSnapshotManager&
351351
BucketManagerImpl::getBucketSnapshotManager() const
352352
{
353-
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
353+
releaseAssertOrThrow(mConfig.isUsingBucketListDB());
354354
releaseAssert(mSnapshotManager);
355355
return *mSnapshotManager;
356356
}
@@ -476,7 +476,7 @@ BucketManagerImpl::renameBucketDirFile(std::filesystem::path const& src,
476476
std::filesystem::path const& dst)
477477
{
478478
ZoneScoped;
479-
if (mApp.getConfig().DISABLE_XDR_FSYNC)
479+
if (mConfig.DISABLE_XDR_FSYNC)
480480
{
481481
return rename(src.string().c_str(), dst.string().c_str()) == 0;
482482
}
@@ -492,7 +492,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
492492
std::unique_ptr<BucketIndex const> index)
493493
{
494494
ZoneScoped;
495-
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
495+
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
496496
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
497497

498498
if (mergeKey)
@@ -566,7 +566,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
566566
void
567567
BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey)
568568
{
569-
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
569+
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
570570

571571
// We _do_ want to remove the mergeKey from mLiveFutures, both so that that
572572
// map does not grow without bound and more importantly so that we drop the
@@ -681,7 +681,7 @@ BucketManagerImpl::putMergeFuture(
681681
MergeKey const& key, std::shared_future<std::shared_ptr<Bucket>> wp)
682682
{
683683
ZoneScoped;
684-
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
684+
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
685685
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
686686
CLOG_TRACE(
687687
Bucket,
@@ -704,7 +704,7 @@ BucketManagerImpl::getBucketListReferencedBuckets() const
704704
{
705705
ZoneScoped;
706706
std::set<Hash> referenced;
707-
if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST)
707+
if (!mConfig.MODE_ENABLES_BUCKETLIST)
708708
{
709709
return referenced;
710710
}
@@ -743,7 +743,7 @@ BucketManagerImpl::getAllReferencedBuckets() const
743743
{
744744
ZoneScoped;
745745
auto referenced = getBucketListReferencedBuckets();
746-
if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST)
746+
if (!mConfig.MODE_ENABLES_BUCKETLIST)
747747
{
748748
return referenced;
749749
}
@@ -788,7 +788,7 @@ void
788788
BucketManagerImpl::cleanupStaleFiles()
789789
{
790790
ZoneScoped;
791-
if (mApp.getConfig().DISABLE_BUCKET_GC)
791+
if (mConfig.DISABLE_BUCKET_GC)
792792
{
793793
return;
794794
}
@@ -867,7 +867,7 @@ BucketManagerImpl::forgetUnreferencedBuckets()
867867
CLOG_TRACE(Bucket,
868868
"BucketManager::forgetUnreferencedBuckets dropping {}",
869869
filename);
870-
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
870+
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
871871
{
872872
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
873873
std::filesystem::remove(filename);
@@ -974,7 +974,7 @@ BucketManagerImpl::snapshotLedger(LedgerHeader& currentHeader)
974974
{
975975
ZoneScoped;
976976
Hash hash;
977-
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
977+
if (mConfig.MODE_ENABLES_BUCKETLIST)
978978
{
979979
hash = mBucketList->getHash();
980980
}
@@ -1005,25 +1005,29 @@ BucketManagerImpl::maybeSetIndex(std::shared_ptr<Bucket> b,
10051005

10061006
void
10071007
BucketManagerImpl::scanForEvictionLegacy(AbstractLedgerTxn& ltx,
1008-
uint32_t ledgerSeq)
1008+
uint32_t ledgerSeq,
1009+
SorobanNetworkConfig& networkConfig)
10091010
{
10101011
ZoneScoped;
10111012
releaseAssert(protocolVersionStartsFrom(ltx.getHeader().ledgerVersion,
10121013
SOROBAN_PROTOCOL_VERSION));
1013-
mBucketList->scanForEvictionLegacy(
1014-
mApp, ltx, ledgerSeq, mBucketListEvictionCounters, mEvictionStatistics);
1014+
mBucketList->scanForEvictionLegacy(mApp, ltx, ledgerSeq,
1015+
mBucketListEvictionCounters,
1016+
mEvictionStatistics, networkConfig);
10151017
}
10161018

10171019
void
1018-
BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
1020+
BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq,
1021+
bool callFromLedgerClose)
10191022
{
1020-
releaseAssert(mApp.getConfig().isUsingBucketListDB());
1023+
releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose());
1024+
releaseAssert(mConfig.isUsingBucketListDB());
10211025
releaseAssert(mSnapshotManager);
10221026
releaseAssert(!mEvictionFuture.valid());
10231027
releaseAssert(mEvictionStatistics);
10241028

10251029
auto searchableBL = mSnapshotManager->copySearchableBucketListSnapshot();
1026-
auto const& cfg = mApp.getLedgerManager().getSorobanNetworkConfig();
1030+
auto cfg = mApp.getLedgerManager().getSorobanNetworkConfig();
10271031
auto const& sas = cfg.stateArchivalSettings();
10281032

10291033
using task_t = std::packaged_task<EvictionResult()>;
@@ -1045,28 +1049,25 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
10451049
void
10461050
BucketManagerImpl::resolveBackgroundEvictionScan(
10471051
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
1048-
LedgerKeySet const& modifiedKeys)
1052+
LedgerKeySet const& modifiedKeys, SorobanNetworkConfig& networkConfig)
10491053
{
10501054
ZoneScoped;
1051-
releaseAssert(threadIsMain());
1055+
releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose());
10521056
releaseAssert(mEvictionStatistics);
10531057

10541058
if (!mEvictionFuture.valid())
10551059
{
1056-
startBackgroundEvictionScan(ledgerSeq);
1060+
startBackgroundEvictionScan(ledgerSeq, false);
10571061
}
10581062

10591063
auto evictionCandidates = mEvictionFuture.get();
10601064

1061-
auto const& networkConfig =
1062-
mApp.getLedgerManager().getSorobanNetworkConfig();
1063-
10641065
// If eviction related settings changed during the ledger, we have to
10651066
// restart the scan
10661067
if (!evictionCandidates.isValid(ledgerSeq,
10671068
networkConfig.stateArchivalSettings()))
10681069
{
1069-
startBackgroundEvictionScan(ledgerSeq);
1070+
startBackgroundEvictionScan(ledgerSeq, false);
10701071
evictionCandidates = mEvictionFuture.get();
10711072
}
10721073

@@ -1176,7 +1177,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
11761177
uint32_t maxProtocolVersion, bool restartMerges)
11771178
{
11781179
ZoneScoped;
1179-
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
1180+
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
11801181

11811182
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
11821183
{
@@ -1203,7 +1204,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
12031204

12041205
// Buckets on the BucketList should always be indexed when
12051206
// BucketListDB enabled
1206-
if (mApp.getConfig().isUsingBucketListDB())
1207+
if (mConfig.isUsingBucketListDB())
12071208
{
12081209
releaseAssert(curr->isEmpty() || curr->isIndexed());
12091210
releaseAssert(snap->isEmpty() || snap->isIndexed());
@@ -1328,7 +1329,7 @@ BucketManagerImpl::mergeBuckets(HistoryArchiveState const& has)
13281329
BucketMetadata meta;
13291330
MergeCounters mc;
13301331
auto& ctx = mApp.getClock().getIOContext();
1331-
meta.ledgerVersion = mApp.getConfig().LEDGER_PROTOCOL_VERSION;
1332+
meta.ledgerVersion = mConfig.LEDGER_PROTOCOL_VERSION;
13321333
BucketOutputIterator out(getTmpDir(), /*keepDeadEntries=*/false, meta, mc,
13331334
ctx, /*doFsync=*/true);
13341335
for (auto const& pair : ledgerMap)
@@ -1539,13 +1540,13 @@ BucketManagerImpl::scheduleVerifyReferencedBucketsWork()
15391540
Config const&
15401541
BucketManagerImpl::getConfig() const
15411542
{
1542-
return mApp.getConfig();
1543+
return mConfig;
15431544
}
15441545

15451546
std::shared_ptr<SearchableBucketListSnapshot>
15461547
BucketManagerImpl::getSearchableBucketListSnapshot()
15471548
{
1548-
releaseAssert(mApp.getConfig().isUsingBucketListDB());
1549+
releaseAssert(mConfig.isUsingBucketListDB());
15491550
// Any other threads must maintain their own snapshot
15501551
releaseAssert(threadIsMain());
15511552
if (!mSearchableBucketListSnapshot)
@@ -1560,7 +1561,7 @@ BucketManagerImpl::getSearchableBucketListSnapshot()
15601561
void
15611562
BucketManagerImpl::reportBucketEntryCountMetrics()
15621563
{
1563-
if (!mApp.getConfig().isUsingBucketListDB())
1564+
if (!mConfig.isUsingBucketListDB())
15641565
{
15651566
return;
15661567
}

0 commit comments

Comments
 (0)