Skip to content

Commit

Permalink
Make garbage collection thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Dec 16, 2024
1 parent a41c6d4 commit 95cab12
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 29 deletions.
22 changes: 11 additions & 11 deletions src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ BucketManager::getBucketListReferencedBuckets() const
}

std::set<Hash>
BucketManager::getAllReferencedBuckets() const
BucketManager::getAllReferencedBuckets(HistoryArchiveState const& has) const
{
ZoneScoped;
auto referenced = getBucketListReferencedBuckets();
Expand All @@ -748,8 +748,7 @@ BucketManager::getAllReferencedBuckets() const

// retain any bucket referenced by the last closed ledger as recorded in the
// database (as merges complete, the bucket list drifts from that state)
auto lclHas = mApp.getLedgerManager().getLastClosedLedgerHAS();
auto lclBuckets = lclHas.allBuckets();
auto lclBuckets = has.allBuckets();
for (auto const& h : lclBuckets)
{
auto rit = referenced.emplace(hexToBin256(h));
Expand Down Expand Up @@ -781,7 +780,7 @@ BucketManager::getAllReferencedBuckets() const
}

void
BucketManager::cleanupStaleFiles()
BucketManager::cleanupStaleFiles(HistoryArchiveState const& has)
{
ZoneScoped;
releaseAssert(threadIsMain());
Expand All @@ -791,7 +790,7 @@ BucketManager::cleanupStaleFiles()
}

std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
std::transform(std::begin(mSharedLiveBuckets), std::end(mSharedLiveBuckets),
std::inserter(referenced, std::end(referenced)),
[](std::pair<Hash, std::shared_ptr<LiveBucket>> const& p) {
Expand Down Expand Up @@ -825,11 +824,11 @@ BucketManager::cleanupStaleFiles()
}

void
BucketManager::forgetUnreferencedBuckets()
BucketManager::forgetUnreferencedBuckets(HistoryArchiveState const& has)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
auto blReferenced = getBucketListReferencedBuckets();

auto bucketMapLoop = [&](auto& bucketMap, auto& futureMap) {
Expand Down Expand Up @@ -945,7 +944,7 @@ BucketManager::addLiveBatch(Application& app, LedgerHeader header,
initEntries, liveEntries, deadEntries);
mLiveBucketListSizeCounter.set_count(mLiveBucketList->getSize());

if (app.getConfig().isUsingBucketListDB())
if (mConfig.isUsingBucketListDB())
{
reportBucketEntryCountMetrics();
}
Expand Down Expand Up @@ -1280,7 +1279,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
}
cleanupStaleFiles();
cleanupStaleFiles(has);
}

void
Expand Down Expand Up @@ -1571,10 +1570,11 @@ BucketManager::visitLedgerEntries(
}

std::shared_ptr<BasicWork>
BucketManager::scheduleVerifyReferencedBucketsWork()
BucketManager::scheduleVerifyReferencedBucketsWork(
HistoryArchiveState const& has)
{
releaseAssert(threadIsMain());
std::set<Hash> hashes = getAllReferencedBuckets();
std::set<Hash> hashes = getAllReferencedBuckets(has);
std::vector<std::shared_ptr<BasicWork>> seq;
for (auto const& h : hashes)
{
Expand Down
10 changes: 6 additions & 4 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class BucketManager : NonMovableOrCopyable

std::atomic<bool> mIsShutdown{false};

void cleanupStaleFiles();
void cleanupStaleFiles(HistoryArchiveState const& has);
void deleteTmpDirAndUnlockBucketDir();
void deleteEntireBucketDir();

Expand Down Expand Up @@ -268,7 +268,7 @@ class BucketManager : NonMovableOrCopyable
// not immediately cause the buckets to delete themselves, if someone else
// is using them via a shared_ptr<>, but the BucketManager will no longer
// independently keep them alive.
void forgetUnreferencedBuckets();
void forgetUnreferencedBuckets(HistoryArchiveState const& has);

// Feed a new batch of entries to the bucket list. This interface expects to
// be given separate init (created) and live (updated) entry vectors. The
Expand Down Expand Up @@ -329,7 +329,8 @@ class BucketManager : NonMovableOrCopyable

// Return the set of buckets referenced by the BucketList, LCL HAS,
// and publish queue.
std::set<Hash> getAllReferencedBuckets() const;
std::set<Hash>
getAllReferencedBuckets(HistoryArchiveState const& has) const;

// Check for missing bucket files that would prevent `assumeState` from
// succeeding
Expand Down Expand Up @@ -386,7 +387,8 @@ class BucketManager : NonMovableOrCopyable

// Schedule a Work class that verifies the hashes of all referenced buckets
// on background threads.
std::shared_ptr<BasicWork> scheduleVerifyReferencedBucketsWork();
std::shared_ptr<BasicWork>
scheduleVerifyReferencedBucketsWork(HistoryArchiveState const& has);

Config const& getConfig() const;

Expand Down
21 changes: 14 additions & 7 deletions src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ TEST_CASE_VERSIONS("bucketmanager ownership", "[bucket][bucketmanager]")
}

b.reset();
app->getBucketManager().forgetUnreferencedBuckets();
app->getBucketManager().forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
CHECK(!fs::exists(filename));
CHECK(!fs::exists(indexFilename));
};
Expand All @@ -276,7 +277,8 @@ TEST_CASE_VERSIONS("bucketmanager ownership", "[bucket][bucketmanager]")

// This shouldn't change if we forget unreferenced buckets since
// it's referenced by bucketlist.
app->getBucketManager().forgetUnreferencedBuckets();
app->getBucketManager().forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
CHECK(b1.use_count() == 3);

// But if we mutate the curr bucket of the bucketlist, it should.
Expand Down Expand Up @@ -370,7 +372,8 @@ TEST_CASE_VERSIONS("bucketmanager reattach to finished merge",
LedgerTestUtils::generateValidLedgerEntriesWithExclusions(
{CONFIG_SETTING}, 10),
{});
bm.forgetUnreferencedBuckets();
bm.forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
} while (!LiveBucketList::levelShouldSpill(ledger, level - 1));

// Check that the merge on level isn't committed (we're in
Expand Down Expand Up @@ -460,7 +463,8 @@ TEST_CASE_VERSIONS("bucketmanager reattach to running merge",
{CONFIG_SETTING}, 100),
{});

bm.forgetUnreferencedBuckets();
bm.forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());

HistoryArchiveState has(ledger, bl,
app->getConfig().NETWORK_PASSPHRASE);
Expand Down Expand Up @@ -544,8 +548,10 @@ TEST_CASE("bucketmanager do not leak empty-merge futures",
bl.resolveAnyReadyFutures();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
bm.forgetUnreferencedBuckets();
auto bmRefBuckets = bm.getAllReferencedBuckets();
bm.forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
auto bmRefBuckets = bm.getAllReferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
auto bmDirBuckets = bm.getBucketHashesInBucketDirForTesting();

// Remove the 0 bucket in case it's "referenced"; it's never a file.
Expand Down Expand Up @@ -601,7 +607,8 @@ TEST_CASE_VERSIONS(
{CONFIG_SETTING}, 100),
{});
clock.crank(false);
bm.forgetUnreferencedBuckets();
bm.forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
}
// We should have published nothing and have the first
// checkpoint still queued.
Expand Down
12 changes: 9 additions & 3 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1118,8 +1118,13 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,
hm.logAndUpdatePublishStatus();

// Step 6. Clean up unreferecned buckets post-apply
// TODO: thread-safety: this races with addBatch
mApp.getBucketManager().forgetUnreferencedBuckets();
{
// Ledger state might be updated at the same time, so protect GC
// call with state mutex
std::lock_guard<std::recursive_mutex> guard(mLedgerStateMutex);
mApp.getBucketManager().forgetUnreferencedBuckets(
getLastClosedLedgerHAS());
}

// Step 7. Maybe set LedgerManager into synced state, maybe let
// Herder trigger next ledger
Expand Down Expand Up @@ -1752,6 +1757,7 @@ LedgerManagerImpl::transferLedgerEntriesToBucketList(
LedgerHeader lh, uint32_t initialLedgerVers)
{
ZoneScoped;
// `ledgerClosed` protects this call with a mutex
std::vector<LedgerEntry> initEntries, liveEntries;
std::vector<LedgerKey> deadEntries;
auto blEnabled = mApp.getConfig().MODE_ENABLES_BUCKETLIST;
Expand All @@ -1764,7 +1770,6 @@ LedgerManagerImpl::transferLedgerEntriesToBucketList(
if (blEnabled &&
protocolVersionStartsFrom(initialLedgerVers, SOROBAN_PROTOCOL_VERSION))
{
std::lock_guard<std::recursive_mutex> guard(mLedgerStateMutex);
{
auto keys = ltx.getAllTTLKeysWithoutSealing();
LedgerTxn ltxEvictions(ltx);
Expand Down Expand Up @@ -1807,6 +1812,7 @@ LedgerManagerImpl::ledgerClosed(
uint32_t initialLedgerVers)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> guard(mLedgerStateMutex);
auto ledgerSeq = ltx.loadHeader().current().ledgerSeq;
auto currLedgerVers = ltx.loadHeader().current().ledgerVersion;
CLOG_TRACE(Ledger,
Expand Down
3 changes: 2 additions & 1 deletion src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,8 @@ ApplicationImpl::gracefulStop()
// This call happens in shutdown -- before destruction -- so that we can
// be sure other subsystems (ledger etc.) are still alive and we can
// call into them to figure out which buckets _are_ referenced.
mBucketManager->forgetUnreferencedBuckets();
mBucketManager->forgetUnreferencedBuckets(
mLedgerManager->getLastClosedLedgerHAS());
mBucketManager->shutdown();
}
if (mHerder)
Expand Down
6 changes: 4 additions & 2 deletions src/main/ApplicationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ selfCheck(Config cfg)

// Then we scan all the buckets to check they have expected hashes.
LOG_INFO(DEFAULT_LOG, "Self-check phase 2: bucket hash verification");
auto seq2 = app->getBucketManager().scheduleVerifyReferencedBucketsWork();
auto seq2 = app->getBucketManager().scheduleVerifyReferencedBucketsWork(
app->getLedgerManager().getLastClosedLedgerHAS());
while (clock.crank(true) && !seq2->isDone())
;

Expand Down Expand Up @@ -983,7 +984,8 @@ publish(Application::pointer app)
}

// Cleanup buckets not referenced by publish queue anymore
app->getBucketManager().forgetUnreferencedBuckets();
app->getBucketManager().forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());

LOG_INFO(DEFAULT_LOG, "*");
LOG_INFO(DEFAULT_LOG, "* Publish finished.");
Expand Down
3 changes: 2 additions & 1 deletion src/simulation/CoreTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ TEST_CASE("Bucket list entries vs write throughput", "[scalability][!hide]")
batch.GetSnapshot().get99thPercentile(), batch.max(),
(double)merges.count(), merges.max(), merges.mean()});

app->getBucketManager().forgetUnreferencedBuckets();
app->getBucketManager().forgetUnreferencedBuckets(
app->getLedgerManager().getLastClosedLedgerHAS());
}
}
}

0 comments on commit 95cab12

Please sign in to comment.