Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions fdbcli/LocationMetadataCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ ACTOR Future<Void> printPhysicalShardCount(Database cx) {
return Void();
}

ACTOR Future<Void> printServerShards(Database cx, UID serverId) {
state Key begin = allKeys.begin;
ACTOR Future<Void> printServerShards(Database cx, UID serverId, KeyRange range = allKeys) {
state Key begin = range.begin;
state int numShards = 0;

while (begin < allKeys.end) {
while (begin < range.end) {
// RYW to optimize re-reading the same key ranges
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);

Expand All @@ -176,18 +176,19 @@ ACTOR Future<Void> printServerShards(Database cx, UID serverId) {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);

RangeResult serverShards =
wait(krmGetRanges(tr, serverKeysPrefixFor(serverId), KeyRangeRef(begin, allKeys.end)));
wait(krmGetRanges(tr, serverKeysPrefixFor(serverId), KeyRangeRef(begin, range.end)));

for (int i = 0; i < serverShards.size() - 1; ++i) {
KeyRangeRef currentRange(serverShards[i].key, serverShards[i + 1].key);
UID shardId;
bool assigned, emptyRange;
DataMoveType dataMoveType = DataMoveType::LOGICAL;
decodeServerKeysValue(serverShards[i].value, assigned, emptyRange, dataMoveType, shardId);
printf("Range: %s, ShardID: %s, Assigned: %s\n",
printf("Range: %s, ShardID: %s, Assigned: %s, Empty: %s\n",
Traceable<KeyRangeRef>::toString(currentRange).c_str(),
shardId.toString().c_str(),
assigned ? "true" : "false");
assigned ? "true" : "false",
emptyRange ? "true" : "false");
}

begin = serverShards.back().key;
Expand All @@ -199,6 +200,8 @@ ACTOR Future<Void> printServerShards(Database cx, UID serverId) {
}
}

printf("Found %d shards for server %s\n", numShards, serverId.toString().c_str());

return Void();
}

Expand Down Expand Up @@ -267,6 +270,12 @@ ACTOR Future<bool> locationMetadataCommandActor(Database cx, std::vector<StringR
return false;
}
wait(printServerShards(cx, UID::fromString(tokens[2].toString())));
} else if (tokencmp(tokens[1], "perftestservershards")) {
if (tokens.size() != 3) {
printUsage(tokens[0]);
return false;
}
wait(printServerShards(cx, UID::fromString(tokens[2].toString()), perfTestKeys));
} else if (tokencmp(tokens[1], "listshards")) {
if (tokens.size() == 4 && !tokencmp(tokens[3], "physical")) {
printUsage(tokens[0]);
Expand Down
3 changes: 3 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,9 @@ const KeyRef tagThrottleAutoEnabledKey = "\xff\x02/throttledTags/autoThrottlingE
const KeyRef tagThrottleLimitKey = "\xff\x02/throttledTags/manualThrottleLimit"_sr;
const KeyRef tagThrottleCountKey = "\xff\x02/throttledTags/manualThrottleCount"_sr;

const KeyRangeRef perfTestKeys =
KeyRangeRef("\x02perf\x00\x02Random\x00\x15\x01"_sr, "\x02perf\x00\x02Random\x00\x15\x02"_sr);

// Client status info prefix
const KeyRangeRef fdbClientInfoPrefixRange("\xff\x02/fdbClientInfo/"_sr, "\xff\x02/fdbClientInfo0"_sr);
// See remaining fields in GlobalConfig.actor.h
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/SystemData.h
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ extern const KeyRef tagThrottleCountKey;
extern const KeyRangeRef tagQuotaKeys;
extern const KeyRef tagQuotaPrefix;

extern const KeyRangeRef perfTestKeys;

// Log Range constant variables
// Used in the backup pipeline to track mutations
// \xff/logRanges/[16-byte UID][begin key] := serialize( make_pair([end key], [destination key prefix]),
Expand Down
22 changes: 21 additions & 1 deletion fdbrpc/FlowTransport.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ class TransportData {
countConnEstablished.init("Net2.CountConnEstablished"_sr);
countConnClosedWithError.init("Net2.CountConnClosedWithError"_sr);
countConnClosedWithoutError.init("Net2.CountConnClosedWithoutError"_sr);
countIncompatibleConnections.init("Net2.CountIncompatibleConnections"_sr);
countConnEstablishedEvent.init("Net2.CountConnEstablishedEvent"_sr);
countConnectionClosedEvent.init("Net2.CountConnectionClosedEvent"_sr);
countIncompatibleConnectionClosedEvent.init("Net2.CountIncompatibleConnectionClosedEvent"_sr);
countIncompatibleConnectionErrorThrown.init("Net2.CountIncompatibleConnectionErrorThrown"_sr);
}

Reference<struct Peer> getPeer(NetworkAddress const& address);
Expand Down Expand Up @@ -343,6 +348,11 @@ class TransportData {
Int64MetricHandle countConnEstablished;
Int64MetricHandle countConnClosedWithError;
Int64MetricHandle countConnClosedWithoutError;
Int64MetricHandle countIncompatibleConnections;
Int64MetricHandle countConnEstablishedEvent;
Int64MetricHandle countConnectionClosedEvent;
Int64MetricHandle countIncompatibleConnectionClosedEvent;
Int64MetricHandle countIncompatibleConnectionErrorThrown;

std::map<NetworkAddress, std::pair<uint64_t, double>> incompatiblePeers;
AsyncTrigger incompatiblePeersChanged;
Expand Down Expand Up @@ -934,14 +944,15 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
.suppressFor(1.0)
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);
self->transport->countConnectionClosedEvent++;
} else {
TraceEvent(
ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID())
.errorUnsuppressed(e)
.suppressFor(1.0)
.detail("PeerAddr", self->destination)
.detail("PeerAddress", self->destination);

self->transport->countIncompatibleConnectionClosedEvent++;
// Since the connection has closed, we need to check the protocol version the next time we connect
self->compatible = true;
}
Expand Down Expand Up @@ -1504,13 +1515,21 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
}
compatible = false;
transport->countIncompatibleConnections++;
TraceEvent("IncompatibleConnectionFailEstablish", conn ? conn->getDebugID() : UID())
.suppressFor(1.0)
.detail("Peer", conn->getPeerAddress())
.detail("PeerAddress", conn->getPeerAddress())
.detail("InexpensiveMultiVersionClient",
protocolVersion.hasInexpensiveMultiVersionClient());
if (!protocolVersion.hasInexpensiveMultiVersionClient()) {
if (peer) {
peer->protocolVersion->set(protocolVersion);
}

// Older versions expected us to hang up. It may work even if we don't hang up here, but
// it's safer to keep the old behavior.
transport->countIncompatibleConnectionErrorThrown++;
throw incompatible_protocol_version();
}
} else {
Expand All @@ -1520,6 +1539,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
.detail("Peer", conn->getPeerAddress())
.detail("PeerAddress", conn->getPeerAddress())
.detail("ConnectionId", connectionId);
transport->countConnEstablishedEvent++;
}

if (connectionId > 1) {
Expand Down
117 changes: 112 additions & 5 deletions fdbserver/Coordination.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/

#include <cstdint>
#include <memory>
#include <unordered_map>

#include "fdbclient/ConfigTransactionInterface.h"
#include "fdbserver/CoordinationInterface.h"
Expand All @@ -29,6 +31,7 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/Status.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Error.h"
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/IndexedSet.h"
Expand Down Expand Up @@ -118,12 +121,91 @@ ServerCoordinators::ServerCoordinators(Reference<IClusterConnectionRecord> ccr,
}
}

ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf, OnDemandStore* pstore) {
struct CoordinatorRequestCounter {
enum class RequestType {
leaderServer_CheckDescriptorMutableRequest,
leaderServer_OpenDatabaseCoordRequest,
leaderServer_ElectionResultRequest,
leaderServer_GetLeaderRequest,
leaderServer_CandidacyRequest,
leaderServer_LeaderHeartbeatRequest,
leaderServer_ForwardRequest,
localGenerationReg_GenerationRegReadRequest,
localGenerationReg_GenerationRegWriteRequest,
};

std::string typeToString(RequestType type) const {
switch (type) {
case RequestType::leaderServer_CheckDescriptorMutableRequest:
return "CheckDescriptorMutableRequest";
case RequestType::leaderServer_OpenDatabaseCoordRequest:
return "OpenDatabaseCoordRequest";
case RequestType::leaderServer_ElectionResultRequest:
return "ElectionResultRequest";
case RequestType::leaderServer_GetLeaderRequest:
return "GetLeaderRequest";
case RequestType::leaderServer_CandidacyRequest:
return "CandidacyRequest";
case RequestType::leaderServer_LeaderHeartbeatRequest:
return "LeaderHeartbeatRequest";
case RequestType::leaderServer_ForwardRequest:
return "ForwardRequest";
case RequestType::localGenerationReg_GenerationRegReadRequest:
return "GenerationRegReadRequest";
case RequestType::localGenerationReg_GenerationRegWriteRequest:
return "GenerationRegWriteRequest";
default:
return "Unknown";
}
}

void init() {
requestCounts.clear();
constexpr RequestType allTypes[] = {
RequestType::leaderServer_CheckDescriptorMutableRequest,
RequestType::leaderServer_OpenDatabaseCoordRequest,
RequestType::leaderServer_ElectionResultRequest,
RequestType::leaderServer_GetLeaderRequest,
RequestType::leaderServer_CandidacyRequest,
RequestType::leaderServer_LeaderHeartbeatRequest,
RequestType::leaderServer_ForwardRequest,
RequestType::localGenerationReg_GenerationRegReadRequest,
RequestType::localGenerationReg_GenerationRegWriteRequest,
};
for (RequestType t : allTypes) {
requestCounts[t] = 0;
}
}

std::unordered_map<RequestType, uint64_t> requestCounts;

void addRequest(RequestType type) {
auto it = requestCounts.find(type);
if (it == requestCounts.end()) {
requestCounts[type] = 1;
} else {
it->second++;
}
}

void logging() {
TraceEvent e("CoordinatorRequestCounter");
for (const auto& [type, count] : requestCounts) {
e.detail(typeToString(type), count);
}
}
};

ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf,
OnDemandStore* pstore,
std::shared_ptr<CoordinatorRequestCounter> requestCounter) {
state GenerationRegVal v;
state OnDemandStore& store = *pstore;
// SOMEDAY: concurrent access to different keys?
loop choose {
when(GenerationRegReadRequest _req = waitNext(interf.read.getFuture())) {
requestCounter->addRequest(
CoordinatorRequestCounter::RequestType::localGenerationReg_GenerationRegReadRequest);
TraceEvent("GenerationRegReadRequest")
.detail("From", _req.reply.getEndpoint().getPrimaryAddress())
.detail("K", _req.key);
Expand All @@ -143,6 +225,8 @@ ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf, OnDemandSto
req.reply.send(GenerationRegReadReply(v.val, v.writeGen, v.readGen));
}
when(GenerationRegWriteRequest _wrq = waitNext(interf.write.getFuture())) {
requestCounter->addRequest(
CoordinatorRequestCounter::RequestType::localGenerationReg_GenerationRegWriteRequest);
state GenerationRegWriteRequest wrq = _wrq;
Optional<Value> rawV = wait(store->readValue(wrq.kv.key));
v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>(rawV.get(), IncludeVersion())
Expand Down Expand Up @@ -175,7 +259,7 @@ ACTOR Future<Void> localGenerationReg(GenerationRegInterface interf, OnDemandSto
TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
state GenerationRegInterface reg;
state OnDemandStore store(params.getDataDir(), deterministicRandom()->randomUniqueID(), "coordination-");
state Future<Void> actor = localGenerationReg(reg, &store);
state Future<Void> actor = localGenerationReg(reg, &store, std::make_shared<CoordinatorRequestCounter>());
state Key the_key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, 10)));

state UniqueGeneration firstGen(0, deterministicRandom()->randomUniqueID());
Expand Down Expand Up @@ -624,20 +708,25 @@ StringRef getClusterDescriptor(Key key) {
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
OnDemandStore* pStore,
UID id,
Reference<IClusterConnectionRecord> ccr) {
Reference<IClusterConnectionRecord> ccr,
std::shared_ptr<CoordinatorRequestCounter> requestCounter) {

state LeaderRegisterCollection regs(pStore);
state ActorCollection forwarders(false);

wait(LeaderRegisterCollection::init(&regs));

loop choose {
when(CheckDescriptorMutableRequest req = waitNext(interf.checkDescriptorMutable.getFuture())) {
requestCounter->addRequest(
CoordinatorRequestCounter::RequestType::leaderServer_CheckDescriptorMutableRequest);
// Note the response returns the value of a knob enforced by checking only one coordinator. It is not
// quorum based.
CheckDescriptorMutableReply rep(SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT);
req.reply.send(rep);
}
when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) {
requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_OpenDatabaseCoordRequest);
Optional<LeaderInfo> forward = regs.getForward(req.clusterKey);
if (forward.present()) {
ClientDBInfo info;
Expand All @@ -660,6 +749,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
}
when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {
requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_ElectionResultRequest);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present()) {
req.reply.send(forward.get());
Expand All @@ -679,6 +769,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
}
when(GetLeaderRequest req = waitNext(interf.getLeader.getFuture())) {
requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_GetLeaderRequest);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(forward.get());
Expand All @@ -697,6 +788,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
}
when(CandidacyRequest req = waitNext(interf.candidacy.getFuture())) {
requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_CandidacyRequest);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(forward.get());
Expand All @@ -714,6 +806,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
}
when(LeaderHeartbeatRequest req = waitNext(interf.leaderHeartbeat.getFuture())) {
requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_LeaderHeartbeatRequest);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(LeaderHeartbeatReply{ false });
Expand All @@ -731,6 +824,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
}
when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_ForwardRequest);
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present()) {
req.reply.send(Void());
Expand All @@ -755,6 +849,14 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf,
}
}

ACTOR Future<Void> coordinatorRequestCounterLogger(std::shared_ptr<CoordinatorRequestCounter> requestCounter) {
loop {
requestCounter->init();
wait(delay(30.0));
requestCounter->logging();
}
}

ACTOR Future<Void> coordinationServer(std::string dataFolder,
Reference<IClusterConnectionRecord> ccr,
Reference<ConfigNode> configNode,
Expand All @@ -766,6 +868,10 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder,
state ConfigTransactionInterface configTransactionInterface;
state ConfigFollowerInterface configFollowerInterface;
state Future<Void> configDatabaseServer = Never();

state std::shared_ptr<CoordinatorRequestCounter> requestCounter = std::make_shared<CoordinatorRequestCounter>();
state Future<Void> requestCounterLogger = coordinatorRequestCounterLogger(requestCounter);

TraceEvent("CoordinationServer", myID)
.detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress())
.detail("Folder", dataFolder)
Expand All @@ -779,8 +885,9 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder,
}

try {
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID, ccr) ||
store.getError() || configDatabaseServer);
wait(localGenerationReg(myInterface, &store, requestCounter) ||
leaderServer(myLeaderInterface, &store, myID, ccr, requestCounter) || store.getError() ||
configDatabaseServer);
throw internal_error();
} catch (Error& e) {
TraceEvent("CoordinationServerError", myID).errorUnsuppressed(e);
Expand Down
Loading