
Commit 185cc52

Timeout monitor for recovery phase initializing_transaction_servers (#12396)
* Recovery init timeouts
* knob for dropping
* fixes based on 100K
* bug fix: void -> never
* bug fix: tooManyUnfinishedRecoveries
* remove buggify server knob, fix gcgen in another way by reducing exp factor multiplier
* self review
* more self review
* fmt fix
* self review
* address feedback
1 parent dd51006 commit 185cc52

File tree

8 files changed: +129 -14 lines changed


fdbclient/ServerKnobs.cpp

Lines changed: 4 additions & 0 deletions
@@ -845,6 +845,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( CC_INVALIDATE_EXCLUDED_PROCESSES, false); if (isSimulated) CC_INVALIDATE_EXCLUDED_PROCESSES = deterministicRandom()->coinflip();
 	init( CC_GRAY_FAILURE_STATUS_JSON, false); if (isSimulated) CC_GRAY_FAILURE_STATUS_JSON = true;
 	init( CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL, 0.5 );
+	init( CC_RECOVERY_INIT_REQ_TIMEOUT, 30.0 );
+	init( CC_RECOVERY_INIT_REQ_GROWTH_FACTOR, 2.0 );
+	init( CC_RECOVERY_INIT_REQ_MAX_TIMEOUT, 300.0 );
+	init( CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES, 100 );
 
 	init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
 	init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );

fdbclient/include/fdbclient/ServerKnobs.h

Lines changed: 9 additions & 0 deletions
@@ -863,6 +863,15 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
 	bool CC_GRAY_FAILURE_STATUS_JSON; // When enabled, returns gray failure information in machine readable status json.
 	double CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL; // The interval to prevent re-recruiting the same singleton if a
 	                                                 // recruiting fight between two cluster controllers occurs.
+	double CC_RECOVERY_INIT_REQ_TIMEOUT; // Base timeout (seconds) for transaction system initialization during
+	                                     // recovery. Only applies to the initializing_transaction_servers phase.
+	double CC_RECOVERY_INIT_REQ_GROWTH_FACTOR; // Base of the exponential backoff calculation. The timeout is calculated
+	                                           // as: base_timeout * (growth_factor ^ unfinished_recoveries). Must be >
+	                                           // 1 and <= 10 to prevent overflow.
+	double CC_RECOVERY_INIT_REQ_MAX_TIMEOUT; // Maximum timeout (seconds) for transaction system initialization. Only
+	                                         // applies to the initializing_transaction_servers phase.
+	int CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES; // Maximum number of unfinished recoveries after which the
+	                                                    // transaction system initialization timeouts above do not apply.
 
 	// Knobs used to select the best policy (via monte carlo)
 	int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
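
To make the interaction of these four knobs concrete, here is a minimal standalone C++ sketch (illustrative only, not FDB code; the constants simply mirror the default knob values above) of the formula base_timeout * growth_factor ^ unfinished_recoveries with the cap applied:

#include <algorithm>
#include <cmath>
#include <cstdio>

// Illustrative defaults mirroring the knob initializers in ServerKnobs.cpp above.
constexpr double kBaseTimeout = 30.0;  // CC_RECOVERY_INIT_REQ_TIMEOUT
constexpr double kGrowthFactor = 2.0;  // CC_RECOVERY_INIT_REQ_GROWTH_FACTOR
constexpr double kMaxTimeout = 300.0;  // CC_RECOVERY_INIT_REQ_MAX_TIMEOUT

// timeout = base * growth^unfinishedRecoveries, capped at the maximum.
double scaledTimeout(int unfinishedRecoveries) {
    return std::min(kBaseTimeout * std::pow(kGrowthFactor, unfinishedRecoveries), kMaxTimeout);
}

int main() {
    // Each additional unfinished recovery doubles the timeout until the 300s cap is reached.
    for (int n = 1; n <= 6; n++) {
        std::printf("unfinishedRecoveries=%d -> timeout=%.0fs\n", n, scaledTimeout(n));
    }
}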

fdbserver/ClusterRecovery.actor.cpp

Lines changed: 74 additions & 5 deletions
@@ -18,6 +18,7 @@
  * limitations under the License.
  */
 
+#include <cmath>
 #include <utility>
 
 #include "fdbclient/FDBTypes.h"
@@ -29,7 +30,9 @@
 #include "fdbserver/Knobs.h"
 #include "fdbserver/MasterInterface.h"
 #include "fdbserver/WaitFailure.h"
+#include "flow/Error.h"
 #include "flow/ProtocolVersion.h"
+#include "flow/Trace.h"
 
 #include "flow/actorcompiler.h" // This must be the last #include.
 
@@ -963,6 +966,66 @@ ACTOR Future<Standalone<CommitTransactionRef>> provisionalMaster(Reference<Clust
     }
 }
 
+// Monitors the initialization of transaction system roles (commit proxies, GRV proxies, resolvers, TLogs, LogRouters)
+// during cluster recovery and enforces a timeout if they take too long to initialize.
+//
+// The timeout uses exponential backoff based on the number of previous failed recovery attempts:
+// - By default: base timeout (30s) doubles with each unfinished recovery: 30s, 60s, 120s, 240s, up to max (300s)
+// - This prevents rapid recovery retry loops while allowing quick initial attempts
+// - All timeout values are configurable via SERVER_KNOBS
+ACTOR Future<Void> monitorInitializingTxnSystem(int unfinishedRecoveries) {
+    // Validate parameters to prevent overflow and ensure exponential backoff works correctly
+    // With growth factor <= 10 and unfinishedRecoveries <= 100, max scaling factor is 10^100
+    const bool validParameters = unfinishedRecoveries >= 1 && SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT > 0 &&
+                                 SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT > 0 &&
+                                 SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR > 1.0 &&
+                                 SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR <= 10.0;
+
+    if (!validParameters) {
+        TraceEvent(SevWarnAlways, "InitializingTxnSystemTimeoutInvalid")
+            .detail("BaseTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)
+            .detail("GrowthFactor", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR)
+            .detail("MaxTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT)
+            .detail("UnfinishedRecoveries", unfinishedRecoveries)
+            .detail("MaxUnfinishedRecoveries", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES);
+        ASSERT_WE_THINK(false); // it is expected for these parameters to always be valid so we assert/crash in
+                                // simulation if that's not the case
+        return Never();
+    }
+
+    const bool tooManyUnfinishedRecoveries =
+        unfinishedRecoveries >= SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES;
+    if (tooManyUnfinishedRecoveries) {
+        TraceEvent(SevWarnAlways, "InitializingTxnSystemTimeoutTooMany")
+            .detail("BaseTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)
+            .detail("GrowthFactor", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR)
+            .detail("MaxTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT)
+            .detail("UnfinishedRecoveries", unfinishedRecoveries)
+            .detail("MaxUnfinishedRecoveries", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_UNFINISHED_RECOVERIES);
+        return Never(); // if there have been too many recoveries, clearly something is wrong. At this point, an
+                        // operator needs to look into the issue rather than us relying on this timeout monitor.
+                        // Triggering more timeouts can make the situation worse.
+    }
+
+    // Calculate timeout with exponential backoff
+    const double scalingFactor = std::pow(SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR, unfinishedRecoveries);
+    const double scaledTimeout = std::min(SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT * scalingFactor,
+                                          SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT);
+
+    TraceEvent("InitializingTxnSystemTimeout")
+        .detail("BaseTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_TIMEOUT)
+        .detail("GrowthFactor", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_GROWTH_FACTOR)
+        .detail("MaxTimeout", SERVER_KNOBS->CC_RECOVERY_INIT_REQ_MAX_TIMEOUT)
+        .detail("UnfinishedRecoveries", unfinishedRecoveries)
+        .detail("ScalingFactor", scalingFactor)
+        .detail("ScaledTimeout", scaledTimeout);
+
+    wait(delay(scaledTimeout));
+
+    TraceEvent("InitializingTxnSystemTimeoutTriggered");
+    throw cluster_recovery_failed();
+}
+
 ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
     Reference<ClusterRecoveryData> self,
     std::vector<StorageServerInterface>* seedServers,
@@ -1061,13 +1124,19 @@ ACTOR Future<std::vector<Standalone<CommitTransactionRef>>> recruitEverything(
         .detail("RemoteDcIds", remoteDcIds)
         .trackLatest(self->clusterRecoveryStateEventHolder->trackingKey);
 
-    // Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand
-    // new database we are sort of lying that we are past the recruitment phase. In a perfect world we would split that
-    // up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
+    // Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a
+    // brand new database we are sort of lying that we are past the recruitment phase. In a perfect world we would
+    // split that up so that the recruitment part happens above (in parallel with recruiting the transaction
+    // servers?).
    wait(newSeedServers(self, recruits, seedServers));
+
    state std::vector<Standalone<CommitTransactionRef>> confChanges;
-    wait(newCommitProxies(self, recruits) && newGrvProxies(self, recruits) && newResolvers(self, recruits) &&
-         newTLogServers(self, recruits, oldLogSystem, &confChanges));
+    Future<Void> txnSystemInitialized =
+        traceAfter(newCommitProxies(self, recruits), "CommitProxiesInitialized") &&
+        traceAfter(newGrvProxies(self, recruits), "GRVProxiesInitialized") &&
+        traceAfter(newResolvers(self, recruits), "ResolversInitialized") &&
+        traceAfter(newTLogServers(self, recruits, oldLogSystem, &confChanges), "TLogServersInitialized");
+    wait(txnSystemInitialized || monitorInitializingTxnSystem(self->controllerData->db.unfinishedRecoveries));
 
     // Update recovery related information to the newly elected sequencer (master) process.
     wait(brokenPromiseToNever(
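
The last hunk above races the combined initialization future against the new monitor using Flow's operator|| composition, which completes as soon as either side finishes and propagates whichever error fires first; when the monitor wins, cluster_recovery_failed() aborts this attempt and the next recovery starts with a larger backed-off timeout. Below is a minimal Flow-style sketch of that race (illustrative pseudocode, not the actual recruitEverything code; it assumes the Flow actor compiler plus delay(), choose/when, Void(), and the cluster_recovery_failed() error used elsewhere in this commit):

// Sketch of racing a work future against a timeout monitor (Flow-style pseudocode).
ACTOR Future<Void> initWithTimeoutMonitor(Future<Void> txnSystemInitialized, double scaledTimeout) {
    choose {
        when(wait(txnSystemInitialized)) {
            // All transaction system roles replied in time; recovery proceeds.
            return Void();
        }
        when(wait(delay(scaledTimeout))) {
            // The monitor won the race: fail this recovery attempt so the cluster
            // controller starts a new one with a larger (backed-off) timeout.
            throw cluster_recovery_failed();
        }
    }
}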

fdbserver/TagPartitionedLogSystem.actor.cpp

Lines changed: 16 additions & 6 deletions
@@ -2788,6 +2788,9 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
             // Recruit log routers for old generations of the primary locality
             if (tLogs->locality == locality) {
                 logRouterInitializationReplies.emplace_back();
+                TraceEvent("LogRouterInitReqSent1")
+                    .detail("Locality", locality)
+                    .detail("LogRouterTags", self->logRouterTags);
                 for (int i = 0; i < self->logRouterTags; i++) {
                     InitializeLogRouterRequest req;
                     req.recoveryCount = recoveryCount;
@@ -2798,6 +2801,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
                     req.locality = locality;
                     req.recoverAt = self->recoverAt.get();
                     req.knownLockedTLogIds = self->knownLockedTLogIds;
+                    req.allowDropInSim = !forRemote;
                     auto reply = transformErrors(
                         throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
                             req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -2839,6 +2843,9 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
             // Recruit log routers for old generations of the primary locality
             if (tLogs->locality == locality) {
                 logRouterInitializationReplies.emplace_back();
+                TraceEvent("LogRouterInitReqSent2")
+                    .detail("Locality", locality)
+                    .detail("LogRouterTags", old.logRouterTags);
                 for (int i = 0; i < old.logRouterTags; i++) {
                     InitializeLogRouterRequest req;
                     req.recoveryCount = recoveryCount;
@@ -2848,6 +2855,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
                     req.tLogPolicy = tLogPolicy;
                     req.locality = locality;
                     req.recoverAt = old.recoverAt;
+                    req.allowDropInSim = !forRemote;
                     auto reply = transformErrors(
                         throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
                             req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -2860,7 +2868,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
         }
     }
 
-    wait(waitForAll(allReplies));
+    wait(traceAfter(waitForAll(allReplies), "AllLogRouterRepliesReceived"));
 
     int nextReplies = 0;
     lastStart = std::numeric_limits<Version>::max();
@@ -2997,13 +3005,14 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
                               logSet->startVersion,
                               localities,
                               logSet->tLogPolicy,
-                              true);
+                              /* forRemote */ true);
     }
 
     state std::vector<Future<TLogInterface>> logRouterInitializationReplies;
    const Version startVersion = oldLogSystem->logRouterTags == 0
                                     ? oldLogSystem->recoverAt.get() + 1
                                     : std::max(self->tLogs[0]->startVersion, logSet->startVersion);
+    TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", self->logRouterTags);
    for (int i = 0; i < self->logRouterTags; i++) {
        InitializeLogRouterRequest req;
        req.recoveryCount = recoveryCount;
@@ -3012,6 +3021,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
        req.tLogLocalities = localities;
        req.tLogPolicy = logSet->tLogPolicy;
        req.locality = remoteLocality;
+        req.allowDropInSim = false;
        TraceEvent("RemoteTLogRouterReplies", self->dbgid)
            .detail("WorkerID", remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].id());
        logRouterInitializationReplies.push_back(transformErrors(
@@ -3090,7 +3100,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
 
    remoteTLogInitializationReplies.reserve(remoteWorkers.remoteTLogs.size());
    for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
-        TraceEvent("RemoteTLogReplies", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
+        TraceEvent("RemoteTLogInitReqSent", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
        remoteTLogInitializationReplies.push_back(transformErrors(
            throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor(
                remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -3289,7 +3299,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
                              logSystem->tLogs[0]->startVersion,
                              localities,
                              logSystem->tLogs[0]->tLogPolicy,
-                              false);
+                              /* forRemote */ false);
    if (oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion >
        SERVER_KNOBS->MAX_RECOVERY_VERSIONS) {
        // make sure we can recover in the other DC.
@@ -3380,7 +3390,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
 
    primaryTLogReplies.reserve(recr.tLogs.size());
    for (int i = 0; i < recr.tLogs.size(); i++) {
-        TraceEvent("PrimaryTLogReqSent", logSystem->getDebugID()).detail("WorkerID", recr.tLogs[i].id());
+        TraceEvent("PrimaryTLogInitReqSent", logSystem->getDebugID()).detail("WorkerID", recr.tLogs[i].id());
        primaryTLogReplies.push_back(transformErrors(
            throwErrorOr(recr.tLogs[i].tLog.getReplyUnlessFailedFor(
                reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
@@ -3449,7 +3459,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
 
    satelliteInitializationReplies.reserve(recr.satelliteTLogs.size());
    for (int i = 0; i < recr.satelliteTLogs.size(); i++) {
-        TraceEvent("PrimarySatelliteTLogReplies", logSystem->getDebugID())
+        TraceEvent("PrimarySatelliteTLogInitReqSent", logSystem->getDebugID())
            .detail("WorkerID", recr.satelliteTLogs[i].id());
        satelliteInitializationReplies.push_back(transformErrors(
            throwErrorOr(recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor(

fdbserver/include/fdbserver/ClusterController.actor.h

Lines changed: 3 additions & 1 deletion
@@ -142,7 +142,9 @@ class ClusterControllerData {
     DatabaseConfiguration config; // Asynchronously updated via master registration
     DatabaseConfiguration fullyRecoveredConfig;
     Database db;
-    int unfinishedRecoveries;
+    int unfinishedRecoveries; // Counter tracking incomplete recovery attempts. Incremented when a new
+                              // sequencer/master is recruited, reset to 0 when recovery reaches fully_recovered.
+                              // A high value indicates multiple recovery attempts that failed to complete.
     bool cachePopulated;
     std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
     Future<Void> clientCounter;

fdbserver/include/fdbserver/WorkerInterface.actor.h

Lines changed: 5 additions & 1 deletion
@@ -674,6 +674,9 @@ struct InitializeLogRouterRequest {
     // information from the logSystem).
     Optional<std::map<uint8_t, std::vector<uint16_t>>> knownLockedTLogIds =
         Optional<std::map<uint8_t, std::vector<uint16_t>>>();
+    bool allowDropInSim; // Simulation-only field for fault injection testing. When true, allows the worker to
+                         // selectively drop responses to initialization messages to test recovery behavior under
+                         // partial failures. Must only be true in simulation.
 
     template <class Ar>
     void serialize(Ar& ar) {
@@ -686,7 +689,8 @@ struct InitializeLogRouterRequest {
                    locality,
                    reply,
                    recoverAt,
-                   knownLockedTLogIds);
+                   knownLockedTLogIds,
+                   allowDropInSim);
     }
 };

fdbserver/worker.actor.cpp

Lines changed: 13 additions & 1 deletion
@@ -26,6 +26,7 @@
 #include "fdbclient/FDBTypes.h"
 #include "fdbserver/BlobMigratorInterface.h"
 #include "flow/ApiVersion.h"
+#include "flow/Buggify.h"
 #include "flow/CodeProbe.h"
 #include "flow/IAsyncFile.h"
 #include "fdbrpc/Locality.h"
@@ -2194,6 +2195,14 @@ void cleanupStorageDisks(Reference<AsyncVar<ServerDBInfo>> dbInfo,
     }
 }
 
+bool skipInitRspInSim(const UID workerInterfID, const bool allowDropInSim) {
+    const bool skip = allowDropInSim && g_network->isSimulated() && BUGGIFY_WITH_PROB(/* 1% */ 0.01);
+    if (skip) {
+        TraceEvent("SkipInitRspInSimTrue").detail("WorkerInterfID", workerInterfID);
+    }
+    return skip;
+}
+
 ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                                 Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
                                 LocalityData locality,
@@ -3367,7 +3376,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                 errorForwarders.add(
                     zombie(recruited,
                            forwardError(errors, Role::LOG_ROUTER, recruited.id(), logRouter(recruited, req, dbInfo))));
-                req.reply.send(recruited);
+
+                if (!skipInitRspInSim(interf.id(), req.allowDropInSim)) {
+                    req.reply.send(recruited);
+                }
             }
             when(CoordinationPingMessage m = waitNext(interf.coordinationPing.getFuture())) {
                 TraceEvent("CoordinationPing", interf.id())

tests/slow/GcGenerations.toml

Lines changed: 5 additions & 0 deletions
@@ -10,6 +10,11 @@ remoteConfig = 'remote_double'
 max_write_transaction_life_versions = 5000000
 record_recover_at_in_cstate = true
 track_tlog_recovery = true
+# The GcGenerations workload intentionally builds up generations by injecting network degradation and timed
+# process restarts. How long to inject network degradation is a sensitive value. A high
+# cc_recovery_init_req_growth_factor could mean that network degradation is healed too early, which
+# we do not want in this test. Therefore, a low value like 1.01 is used.
+cc_recovery_init_req_growth_factor = 1.01
 
 [[test]]
 testTitle = 'GcGenerations'
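
A rough illustration (plain C++, not part of the test) of why 1.01 keeps the timeout nearly flat compared with the default growth factor of 2.0:

#include <algorithm>
#include <cmath>
#include <cstdio>

int main() {
    // Compare the default growth factor (2.0) against this test's 1.01 after 20
    // unfinished recoveries, both capped at the 300s CC_RECOVERY_INIT_REQ_MAX_TIMEOUT.
    for (double growth : { 2.0, 1.01 }) {
        double timeout = std::min(30.0 * std::pow(growth, 20), 300.0);
        std::printf("growth=%.2f -> timeout=%.1fs\n", growth, timeout);
    }
    // With 2.0 the timeout saturates at the 300s cap almost immediately; with 1.01 it
    // stays near the 30s base (about 36.6s), keeping recovery timeouts short relative
    // to the window in which the workload injects network degradation.
}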
