62
62
BucketManager::initialize ()
63
63
{
64
64
ZoneScoped;
65
+ releaseAssert (threadIsMain ());
65
66
std::string d = mConfig .BUCKET_DIR_PATH ;
66
67
67
68
if (!fs::exists (d))
@@ -729,7 +730,7 @@ BucketManager::getBucketListReferencedBuckets() const
729
730
}
730
731
731
732
std::set<Hash>
732
- BucketManager::getAllReferencedBuckets () const
733
+ BucketManager::getAllReferencedBuckets (HistoryArchiveState const & has ) const
733
734
{
734
735
ZoneScoped;
735
736
auto referenced = getBucketListReferencedBuckets ();
@@ -740,8 +741,7 @@ BucketManager::getAllReferencedBuckets() const
740
741
741
742
// retain any bucket referenced by the last closed ledger as recorded in the
742
743
// database (as merges complete, the bucket list drifts from that state)
743
- auto lclHas = mApp .getLedgerManager ().getLastClosedLedgerHAS ();
744
- auto lclBuckets = lclHas.allBuckets ();
744
+ auto lclBuckets = has.allBuckets ();
745
745
for (auto const & h : lclBuckets)
746
746
{
747
747
auto rit = referenced.emplace (hexToBin256 (h));
@@ -752,39 +752,38 @@ BucketManager::getAllReferencedBuckets() const
752
752
}
753
753
754
754
// retain buckets that are referenced by a state in the publish queue.
755
- auto pub = mApp .getHistoryManager ().getBucketsReferencedByPublishQueue ();
755
+ for (auto const & h :
756
+ HistoryManager::getBucketsReferencedByPublishQueue (mApp .getConfig ()))
756
757
{
757
- for (auto const & h : pub)
758
+ auto rhash = hexToBin256 (h);
759
+ auto rit = referenced.emplace (rhash);
760
+ if (rit.second )
758
761
{
759
- auto rhash = hexToBin256 (h);
760
- auto rit = referenced.emplace (rhash);
761
- if (rit.second )
762
- {
763
- CLOG_TRACE (Bucket, " {} referenced by publish queue" , h);
764
-
765
- // Project referenced bucket `rhash` -- which might be a merge
766
- // input captured before a merge finished -- through our weak
767
- // map of merge input/output relationships, to find any outputs
768
- // we'll want to retain in order to resynthesize the merge in
769
- // the future, rather than re-run it.
770
- mFinishedMerges .getOutputsUsingInput (rhash, referenced);
771
- }
762
+ CLOG_TRACE (Bucket, " {} referenced by publish queue" , h);
763
+
764
+ // Project referenced bucket `rhash` -- which might be a merge
765
+ // input captured before a merge finished -- through our weak
766
+ // map of merge input/output relationships, to find any outputs
767
+ // we'll want to retain in order to resynthesize the merge in
768
+ // the future, rather than re-run it.
769
+ mFinishedMerges .getOutputsUsingInput (rhash, referenced);
772
770
}
773
771
}
774
772
return referenced;
775
773
}
776
774
777
775
void
778
- BucketManager::cleanupStaleFiles ()
776
+ BucketManager::cleanupStaleFiles (HistoryArchiveState const & has )
779
777
{
780
778
ZoneScoped;
779
+ releaseAssert (threadIsMain ());
781
780
if (mConfig .DISABLE_BUCKET_GC )
782
781
{
783
782
return ;
784
783
}
785
784
786
785
std::lock_guard<std::recursive_mutex> lock (mBucketMutex );
787
- auto referenced = getAllReferencedBuckets ();
786
+ auto referenced = getAllReferencedBuckets (has );
788
787
std::transform (std::begin (mSharedLiveBuckets ), std::end (mSharedLiveBuckets ),
789
788
std::inserter (referenced, std::end (referenced)),
790
789
[](std::pair<Hash, std::shared_ptr<LiveBucket>> const & p) {
@@ -818,11 +817,11 @@ BucketManager::cleanupStaleFiles()
818
817
}
819
818
820
819
void
821
- BucketManager::forgetUnreferencedBuckets ()
820
+ BucketManager::forgetUnreferencedBuckets (HistoryArchiveState const & has )
822
821
{
823
822
ZoneScoped;
824
823
std::lock_guard<std::recursive_mutex> lock (mBucketMutex );
825
- auto referenced = getAllReferencedBuckets ();
824
+ auto referenced = getAllReferencedBuckets (has );
826
825
auto blReferenced = getBucketListReferencedBuckets ();
827
826
828
827
auto bucketMapLoop = [&](auto & bucketMap, auto & futureMap) {
@@ -867,7 +866,7 @@ BucketManager::forgetUnreferencedBuckets()
867
866
Bucket,
868
867
" BucketManager::forgetUnreferencedBuckets dropping {}" ,
869
868
filename);
870
- if (!filename.empty () && !mApp . getConfig () .DISABLE_BUCKET_GC )
869
+ if (!filename.empty () && !mConfig .DISABLE_BUCKET_GC )
871
870
{
872
871
CLOG_TRACE (Bucket, " removing bucket file: {}" , filename);
873
872
std::filesystem::remove (filename);
@@ -1076,12 +1075,11 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq)
1076
1075
}
1077
1076
1078
1077
void
1079
- BucketManager::resolveBackgroundEvictionScan (AbstractLedgerTxn& ltx,
1080
- uint32_t ledgerSeq,
1081
- LedgerKeySet const & modifiedKeys)
1078
+ BucketManager::resolveBackgroundEvictionScan (
1079
+ AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
1080
+ LedgerKeySet const & modifiedKeys, SorobanNetworkConfig& networkConfig )
1082
1081
{
1083
1082
ZoneScoped;
1084
- releaseAssert (threadIsMain ());
1085
1083
releaseAssert (mEvictionStatistics );
1086
1084
1087
1085
if (!mEvictionFuture .valid ())
@@ -1091,9 +1089,6 @@ BucketManager::resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
1091
1089
1092
1090
auto evictionCandidates = mEvictionFuture .get ();
1093
1091
1094
- auto const & networkConfig =
1095
- mApp .getLedgerManager ().getSorobanNetworkConfig ();
1096
-
1097
1092
// If eviction related settings changed during the ledger, we have to
1098
1093
// restart the scan
1099
1094
if (!evictionCandidates.isValid (ledgerSeq,
@@ -1209,6 +1204,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
1209
1204
uint32_t maxProtocolVersion, bool restartMerges)
1210
1205
{
1211
1206
ZoneScoped;
1207
+ releaseAssert (threadIsMain ());
1212
1208
releaseAssertOrThrow (mConfig .MODE_ENABLES_BUCKETLIST );
1213
1209
1214
1210
// TODO: Assume archival bucket state
@@ -1257,7 +1253,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
1257
1253
mLiveBucketList ->restartMerges (mApp , maxProtocolVersion,
1258
1254
has.currentLedger );
1259
1255
}
1260
- cleanupStaleFiles ();
1256
+ cleanupStaleFiles (has );
1261
1257
}
1262
1258
1263
1259
void
@@ -1358,7 +1354,7 @@ std::shared_ptr<LiveBucket>
1358
1354
BucketManager::mergeBuckets (HistoryArchiveState const & has)
1359
1355
{
1360
1356
ZoneScoped;
1361
-
1357
+ releaseAssert ( threadIsMain ());
1362
1358
std::map<LedgerKey, LedgerEntry> ledgerMap = loadCompleteLedgerState (has);
1363
1359
BucketMetadata meta;
1364
1360
MergeCounters mc;
@@ -1548,9 +1544,11 @@ BucketManager::visitLedgerEntries(
1548
1544
}
1549
1545
1550
1546
std::shared_ptr<BasicWork>
1551
- BucketManager::scheduleVerifyReferencedBucketsWork ()
1547
+ BucketManager::scheduleVerifyReferencedBucketsWork (
1548
+ HistoryArchiveState const & has)
1552
1549
{
1553
- std::set<Hash> hashes = getAllReferencedBuckets ();
1550
+ releaseAssert (threadIsMain ());
1551
+ std::set<Hash> hashes = getAllReferencedBuckets (has);
1554
1552
std::vector<std::shared_ptr<BasicWork>> seq;
1555
1553
for (auto const & h : hashes)
1556
1554
{
0 commit comments