Skip to content

Commit 69cb5cb

Browse files
authoredMar 1, 2025··
Cleanups and fixes to maintenance (#4657)
Only the last commit has functional changes, which are fixes to maintenance introduced by #4543. Changes are as follows, by commit: - Resolves #4586 - Remove an old history mode which became obsolete after `in-memory` got removed - Cleanup various interfaces, there were several unused variables - Two main changes are: (1) restore maintenance of SCP history, which got removed by accident, and (2) run ledger header maintenance on the ledger close thread to avoid potential serialization conflicts during ledger close.
2 parents a299bf4 + e9fa98b commit 69cb5cb

23 files changed

+160
-306
lines changed
 

‎src/catchup/CatchupConfiguration.cpp

-30
Original file line numberDiff line numberDiff line change
@@ -10,51 +10,21 @@
1010
namespace stellar
1111
{
1212

13-
void
14-
CatchupConfiguration::checkInvariants() const
15-
{
16-
if (mMode == CatchupConfiguration::Mode::LOCAL_BUCKETS_ONLY)
17-
{
18-
releaseAssert(mHAS && mHistoryEntry);
19-
releaseAssert(toLedger() != CatchupConfiguration::CURRENT);
20-
releaseAssert(count() == 0);
21-
}
22-
else
23-
{
24-
releaseAssert(!mHAS && !mHistoryEntry);
25-
}
26-
}
27-
2813
CatchupConfiguration::CatchupConfiguration(LedgerNumHashPair ledgerHashPair,
2914
uint32_t count, Mode mode)
3015
: mCount{count}, mLedgerHashPair{ledgerHashPair}, mMode{mode}
3116
{
32-
checkInvariants();
33-
}
34-
35-
CatchupConfiguration::CatchupConfiguration(HistoryArchiveState has,
36-
LedgerHeaderHistoryEntry lhhe)
37-
: mCount(0)
38-
, mLedgerHashPair(LedgerNumHashPair(lhhe.header.ledgerSeq,
39-
std::make_optional(lhhe.hash)))
40-
, mMode(CatchupConfiguration::Mode::LOCAL_BUCKETS_ONLY)
41-
, mHAS(std::make_optional(has))
42-
, mHistoryEntry(std::make_optional(lhhe))
43-
{
44-
checkInvariants();
4517
}
4618

4719
CatchupConfiguration::CatchupConfiguration(uint32_t toLedger, uint32_t count,
4820
Mode mode)
4921
: mCount{count}, mLedgerHashPair{toLedger, std::nullopt}, mMode{mode}
5022
{
51-
checkInvariants();
5223
}
5324

5425
CatchupConfiguration
5526
CatchupConfiguration::resolve(uint32_t remoteCheckpoint) const
5627
{
57-
checkInvariants();
5828
auto cfg = *this;
5929
if (toLedger() == CatchupConfiguration::CURRENT)
6030
{

‎src/catchup/CatchupConfiguration.h

+1-28
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,13 @@ class CatchupConfiguration
4848
// Do validity checks on all history archive file types for a given
4949
// range, regardless of whether files are used or not
5050
OFFLINE_COMPLETE,
51-
ONLINE,
52-
// Similar to online catchup, except in this mode there is no archive
53-
// lookup and validation, rebuild local state present on disk, while
54-
// buffering ledgers
55-
LOCAL_BUCKETS_ONLY
51+
ONLINE
5652
};
5753
static const uint32_t CURRENT = 0;
5854

5955
CatchupConfiguration(uint32_t toLedger, uint32_t count, Mode mode);
6056
CatchupConfiguration(LedgerNumHashPair ledgerHashPair, uint32_t count,
6157
Mode mode);
62-
CatchupConfiguration(HistoryArchiveState has,
63-
LedgerHeaderHistoryEntry lhhe);
6458

6559
/**
6660
* If toLedger() == CatchupConfiguration::CURRENT it replaces it with
@@ -104,31 +98,10 @@ class CatchupConfiguration
10498
return mMode == Mode::ONLINE;
10599
}
106100

107-
bool
108-
localBucketsOnly() const
109-
{
110-
return mMode == Mode::LOCAL_BUCKETS_ONLY;
111-
}
112-
113-
std::optional<HistoryArchiveState>
114-
getHAS() const
115-
{
116-
return mHAS;
117-
}
118-
119-
std::optional<LedgerHeaderHistoryEntry>
120-
getHistoryEntry() const
121-
{
122-
return mHistoryEntry;
123-
}
124-
125101
private:
126102
uint32_t mCount;
127103
LedgerNumHashPair mLedgerHashPair;
128104
Mode mMode;
129-
std::optional<HistoryArchiveState> mHAS;
130-
std::optional<LedgerHeaderHistoryEntry> mHistoryEntry;
131-
void checkInvariants() const;
132105
};
133106

134107
uint32_t parseLedger(std::string const& str);

‎src/catchup/CatchupRange.cpp

+3-9
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ void
1717
checkCatchupPreconditions(uint32_t lastClosedLedger,
1818
CatchupConfiguration const& configuration)
1919
{
20-
if (lastClosedLedger < LedgerManager::GENESIS_LEDGER_SEQ &&
21-
!configuration.localBucketsOnly())
20+
if (lastClosedLedger < LedgerManager::GENESIS_LEDGER_SEQ)
2221
{
2322
throw std::invalid_argument{"lastClosedLedger == 0"};
2423
}
@@ -58,7 +57,7 @@ calculateCatchupRange(uint32_t lcl, CatchupConfiguration const& cfg,
5857
}
5958

6059
// All remaining cases have LCL == genesis.
61-
releaseAssert(lcl == init || cfg.localBucketsOnly());
60+
releaseAssert(lcl == init);
6261
LedgerRange fullReplay(init + 1, fullReplayCount);
6362

6463
// Case 2: full replay because count >= target - init.
@@ -70,8 +69,7 @@ calculateCatchupRange(uint32_t lcl, CatchupConfiguration const& cfg,
7069
// Case 3: special case of buckets only, no replay; only
7170
// possible when targeting the exact end of a checkpoint.
7271
if (cfg.count() == 0 && (HistoryManager::isLastLedgerInCheckpoint(
73-
cfg.toLedger(), hm.getConfig()) ||
74-
cfg.localBucketsOnly()))
72+
cfg.toLedger(), hm.getConfig())))
7573
{
7674
return CatchupRange(cfg.toLedger());
7775
}
@@ -128,10 +126,6 @@ CatchupRange::CatchupRange(uint32_t lastClosedLedger,
128126
: CatchupRange(calculateCatchupRange(lastClosedLedger, configuration,
129127
historyManager))
130128
{
131-
if (configuration.localBucketsOnly())
132-
{
133-
releaseAssert(applyBuckets());
134-
}
135129
checkInvariants();
136130
}
137131

‎src/catchup/CatchupWork.cpp

+48-93
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ void
150150
CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
151151
LedgerNumHashPair rangeEnd)
152152
{
153-
releaseAssert(!mCatchupConfiguration.localBucketsOnly());
154-
155153
ZoneScoped;
156154
auto verifyRange = catchupRange.getFullRangeIncludingBucketApply();
157155
releaseAssert(verifyRange.mCount != 0);
@@ -194,9 +192,8 @@ CatchupWork::downloadVerifyTxResults(CatchupRange const& catchupRange)
194192
bool
195193
CatchupWork::alreadyHaveBucketsHistoryArchiveState(uint32_t atCheckpoint) const
196194
{
197-
return mCatchupConfiguration.localBucketsOnly() ||
198-
atCheckpoint == mGetHistoryArchiveStateWork->getHistoryArchiveState()
199-
.currentLedger;
195+
return atCheckpoint ==
196+
mGetHistoryArchiveStateWork->getHistoryArchiveState().currentLedger;
200197
}
201198

202199
WorkSeqPtr
@@ -213,28 +210,23 @@ CatchupWork::downloadApplyBuckets()
213210
std::vector<std::shared_ptr<BasicWork>> seq;
214211
auto version = mApp.getConfig().LEDGER_PROTOCOL_VERSION;
215212

216-
// Download buckets, or skip if catchup is local
217-
if (!mCatchupConfiguration.localBucketsOnly())
218-
{
219-
std::vector<std::string> hashes =
220-
mBucketHAS->differingBuckets(mLocalState);
221-
auto getBuckets = std::make_shared<DownloadBucketsWork>(
222-
mApp, mBuckets, hashes, *mDownloadDir, mArchive);
223-
seq.push_back(getBuckets);
224-
225-
auto verifyHASCallback = [has = *mBucketHAS](Application& app) {
226-
if (!has.containsValidBuckets(app))
227-
{
228-
CLOG_ERROR(History, "Malformed HAS: invalid buckets");
229-
return false;
230-
}
231-
return true;
232-
};
233-
auto verifyHAS = std::make_shared<WorkWithCallback>(mApp, "verify-has",
234-
verifyHASCallback);
235-
seq.push_back(verifyHAS);
236-
version = mVerifiedLedgerRangeStart.header.ledgerVersion;
237-
}
213+
std::vector<std::string> hashes = mBucketHAS->differingBuckets(mLocalState);
214+
auto getBuckets = std::make_shared<DownloadBucketsWork>(
215+
mApp, mBuckets, hashes, *mDownloadDir, mArchive);
216+
seq.push_back(getBuckets);
217+
218+
auto verifyHASCallback = [has = *mBucketHAS](Application& app) {
219+
if (!has.containsValidBuckets(app))
220+
{
221+
CLOG_ERROR(History, "Malformed HAS: invalid buckets");
222+
return false;
223+
}
224+
return true;
225+
};
226+
auto verifyHAS = std::make_shared<WorkWithCallback>(mApp, "verify-has",
227+
verifyHASCallback);
228+
seq.push_back(verifyHAS);
229+
version = mVerifiedLedgerRangeStart.header.ledgerVersion;
238230

239231
auto applyBuckets = std::make_shared<ApplyBucketsWork>(
240232
mApp, mBuckets, *mBucketHAS, version);
@@ -291,52 +283,37 @@ BasicWork::State
291283
CatchupWork::getAndMaybeSetHistoryArchiveState()
292284
{
293285
// First, retrieve the HAS
294-
295-
// If we're just doing local catchup, set HAS right away
296-
if (mCatchupConfiguration.localBucketsOnly())
286+
if (!mGetHistoryArchiveStateWork)
287+
{
288+
auto toLedger = mCatchupConfiguration.toLedger() == 0
289+
? "CURRENT"
290+
: std::to_string(mCatchupConfiguration.toLedger());
291+
CLOG_INFO(History,
292+
"Starting catchup with configuration:\n lastClosedLedger: "
293+
"{}\n toLedger: {}\n count: {}",
294+
mApp.getLedgerManager().getLastClosedLedgerNum(), toLedger,
295+
mCatchupConfiguration.count());
296+
297+
auto toCheckpoint =
298+
mCatchupConfiguration.toLedger() == CatchupConfiguration::CURRENT
299+
? CatchupConfiguration::CURRENT
300+
: HistoryManager::checkpointContainingLedger(
301+
mCatchupConfiguration.toLedger(), mApp.getConfig());
302+
// Set retries to 10 to ensure we retry enough in case current
303+
// checkpoint isn't published yet
304+
mGetHistoryArchiveStateWork = addWork<GetHistoryArchiveStateWork>(
305+
toCheckpoint, mArchive, true, 10);
306+
mCurrentWork = mGetHistoryArchiveStateWork;
307+
return State::WORK_RUNNING;
308+
}
309+
else if (mGetHistoryArchiveStateWork->getState() != State::WORK_SUCCESS)
297310
{
298-
mHAS = getCatchupConfiguration().getHAS();
299-
releaseAssert(mHAS.has_value());
311+
return mGetHistoryArchiveStateWork->getState();
300312
}
301313
else
302314
{
303-
// Otherwise, continue with the normal catchup flow: download and verify
304-
// HAS from history archive
305-
if (!mGetHistoryArchiveStateWork)
306-
{
307-
auto toLedger =
308-
mCatchupConfiguration.toLedger() == 0
309-
? "CURRENT"
310-
: std::to_string(mCatchupConfiguration.toLedger());
311-
CLOG_INFO(
312-
History,
313-
"Starting catchup with configuration:\n lastClosedLedger: "
314-
"{}\n toLedger: {}\n count: {}",
315-
mApp.getLedgerManager().getLastClosedLedgerNum(), toLedger,
316-
mCatchupConfiguration.count());
317-
318-
auto toCheckpoint =
319-
mCatchupConfiguration.toLedger() ==
320-
CatchupConfiguration::CURRENT
321-
? CatchupConfiguration::CURRENT
322-
: HistoryManager::checkpointContainingLedger(
323-
mCatchupConfiguration.toLedger(), mApp.getConfig());
324-
// Set retries to 10 to ensure we retry enough in case current
325-
// checkpoint isn't published yet
326-
mGetHistoryArchiveStateWork = addWork<GetHistoryArchiveStateWork>(
327-
toCheckpoint, mArchive, true, 10);
328-
mCurrentWork = mGetHistoryArchiveStateWork;
329-
return State::WORK_RUNNING;
330-
}
331-
else if (mGetHistoryArchiveStateWork->getState() != State::WORK_SUCCESS)
332-
{
333-
return mGetHistoryArchiveStateWork->getState();
334-
}
335-
else
336-
{
337-
mHAS = std::make_optional<HistoryArchiveState>(
338-
mGetHistoryArchiveStateWork->getHistoryArchiveState());
339-
}
315+
mHAS = std::make_optional<HistoryArchiveState>(
316+
mGetHistoryArchiveStateWork->getHistoryArchiveState());
340317
}
341318

342319
// Second, perform some validation
@@ -448,8 +425,7 @@ CatchupWork::runCatchupStep()
448425
// Bucket and transaction processing has started
449426
if (mCatchupSeq)
450427
{
451-
releaseAssert(mDownloadVerifyLedgersSeq ||
452-
mCatchupConfiguration.localBucketsOnly());
428+
releaseAssert(mDownloadVerifyLedgersSeq);
453429
releaseAssert(mTransactionsVerifyApplySeq ||
454430
!catchupRange.replayLedgers());
455431

@@ -494,8 +470,7 @@ CatchupWork::runCatchupStep()
494470
// the node will have to catch up again and it will clear the
495471
// ledger because clearRebuildForType has not been called yet.
496472
mApp.getLedgerManager().setLastClosedLedger(
497-
mVerifiedLedgerRangeStart,
498-
!mCatchupConfiguration.localBucketsOnly());
473+
mVerifiedLedgerRangeStart);
499474
mBucketsAppliedEmitted = true;
500475
mBuckets.clear();
501476
mLastApplied =
@@ -534,22 +509,6 @@ CatchupWork::runCatchupStep()
534509
return mCatchupSeq->getState();
535510
}
536511

537-
// If we're just doing local catchup, setup bucket application and exit
538-
if (mCatchupConfiguration.localBucketsOnly())
539-
{
540-
releaseAssert(catchupRange.applyBuckets());
541-
auto lhhe = mCatchupConfiguration.getHistoryEntry();
542-
releaseAssert(lhhe);
543-
mVerifiedLedgerRangeStart = *lhhe;
544-
545-
mBucketVerifyApplySeq = downloadApplyBuckets();
546-
std::vector<std::shared_ptr<BasicWork>> seq{mBucketVerifyApplySeq};
547-
548-
mCatchupSeq = addWork<WorkSequence>("catchup-seq", seq, RETRY_NEVER);
549-
mCurrentWork = mCatchupSeq;
550-
return State::WORK_RUNNING;
551-
}
552-
553512
// Otherwise, proceed with normal flow. Still waiting for ledger headers
554513
if (mDownloadVerifyLedgersSeq)
555514
{
@@ -649,10 +608,6 @@ CatchupWork::onFailureRaise()
649608
{
650609
CLOG_WARNING(History, "Catchup failed");
651610
Work::onFailureRaise();
652-
if (mCatchupConfiguration.localBucketsOnly())
653-
{
654-
throw std::runtime_error("Unable to rebuild local state");
655-
}
656611
}
657612

658613
void

‎src/catchup/LedgerApplyManagerImpl.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -283,10 +283,8 @@ LedgerApplyManagerImpl::startCatchup(
283283
configuration.toLedger(), lastClosedLedger));
284284
}
285285

286-
// Offline and local catchup types aren't triggered by buffered ledgers
287-
auto offlineCatchup =
288-
configuration.offline() || configuration.localBucketsOnly();
289-
releaseAssert(offlineCatchup == mSyncingLedgers.empty());
286+
// Offline catchup isn't triggered by buffered ledgers
287+
releaseAssert(configuration.offline() == mSyncingLedgers.empty());
290288

291289
releaseAssert(!mCatchupWork);
292290

‎src/herder/HerderPersistence.h

+5-5
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,18 @@ class HerderPersistence
3636
std::vector<SCPEnvelope> const& envs,
3737
QuorumTracker::QuorumMap const& qmap) = 0;
3838

39-
static size_t copySCPHistoryToStream(Database& db, soci::session& sess,
39+
static size_t copySCPHistoryToStream(soci::session& sess,
4040
uint32_t ledgerSeq,
4141
uint32_t ledgerCount,
4242
XDROutputFileStream& scpHistory);
4343
// quorum information lookup
44-
static std::optional<Hash>
45-
getNodeQuorumSet(Database& db, soci::session& sess, NodeID const& nodeID);
46-
static SCPQuorumSetPtr getQuorumSet(Database& db, soci::session& sess,
44+
static std::optional<Hash> getNodeQuorumSet(soci::session& sess,
45+
NodeID const& nodeID);
46+
static SCPQuorumSetPtr getQuorumSet(soci::session& sess,
4747
Hash const& qSetHash);
4848

4949
static void dropAll(Database& db);
50-
static void deleteOldEntries(Database& db, uint32_t ledgerSeq,
50+
static void deleteOldEntries(soci::session& sess, uint32_t ledgerSeq,
5151
uint32_t count);
5252
};
5353
}

0 commit comments

Comments
 (0)