diff --git a/fdbcli/LocationMetadataCommand.actor.cpp b/fdbcli/LocationMetadataCommand.actor.cpp index bf7d63e450a..9340f5446cf 100644 --- a/fdbcli/LocationMetadataCommand.actor.cpp +++ b/fdbcli/LocationMetadataCommand.actor.cpp @@ -161,11 +161,11 @@ ACTOR Future printPhysicalShardCount(Database cx) { return Void(); } -ACTOR Future printServerShards(Database cx, UID serverId) { - state Key begin = allKeys.begin; +ACTOR Future printServerShards(Database cx, UID serverId, KeyRange range = allKeys) { + state Key begin = range.begin; state int numShards = 0; - while (begin < allKeys.end) { + while (begin < range.end) { // RYW to optimize re-reading the same key ranges state Reference tr = makeReference(cx); @@ -176,7 +176,7 @@ ACTOR Future printServerShards(Database cx, UID serverId) { tr->setOption(FDBTransactionOptions::LOCK_AWARE); RangeResult serverShards = - wait(krmGetRanges(tr, serverKeysPrefixFor(serverId), KeyRangeRef(begin, allKeys.end))); + wait(krmGetRanges(tr, serverKeysPrefixFor(serverId), KeyRangeRef(begin, range.end))); for (int i = 0; i < serverShards.size() - 1; ++i) { KeyRangeRef currentRange(serverShards[i].key, serverShards[i + 1].key); @@ -184,10 +184,11 @@ ACTOR Future printServerShards(Database cx, UID serverId) { bool assigned, emptyRange; DataMoveType dataMoveType = DataMoveType::LOGICAL; decodeServerKeysValue(serverShards[i].value, assigned, emptyRange, dataMoveType, shardId); - printf("Range: %s, ShardID: %s, Assigned: %s\n", + printf("Range: %s, ShardID: %s, Assigned: %s, Empty: %s\n", Traceable::toString(currentRange).c_str(), shardId.toString().c_str(), - assigned ? "true" : "false"); + assigned ? "true" : "false", + emptyRange ? "true" : "false"); } begin = serverShards.back().key; @@ -199,6 +200,8 @@ ACTOR Future printServerShards(Database cx, UID serverId) { } } + printf("Found %d shards for server %s\n", numShards, serverId.toString().c_str()); + return Void(); } @@ -267,6 +270,12 @@ ACTOR Future locationMetadataCommandActor(Database cx, std::vector getPeer(NetworkAddress const& address); @@ -343,6 +348,11 @@ class TransportData { Int64MetricHandle countConnEstablished; Int64MetricHandle countConnClosedWithError; Int64MetricHandle countConnClosedWithoutError; + Int64MetricHandle countIncompatibleConnections; + Int64MetricHandle countConnEstablishedEvent; + Int64MetricHandle countConnectionClosedEvent; + Int64MetricHandle countIncompatibleConnectionClosedEvent; + Int64MetricHandle countIncompatibleConnectionErrorThrown; std::map> incompatiblePeers; AsyncTrigger incompatiblePeersChanged; @@ -934,6 +944,7 @@ ACTOR Future connectionKeeper(Reference self, .suppressFor(1.0) .detail("PeerAddr", self->destination) .detail("PeerAddress", self->destination); + self->transport->countConnectionClosedEvent++; } else { TraceEvent( ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()) @@ -941,7 +952,7 @@ ACTOR Future connectionKeeper(Reference self, .suppressFor(1.0) .detail("PeerAddr", self->destination) .detail("PeerAddress", self->destination); - + self->transport->countIncompatibleConnectionClosedEvent++; // Since the connection has closed, we need to check the protocol version the next time we connect self->compatible = true; } @@ -1504,6 +1515,13 @@ ACTOR static Future connectionReader(TransportData* transport, now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT; } compatible = false; + transport->countIncompatibleConnections++; + TraceEvent("IncompatibleConnectionFailEstablish", conn ? conn->getDebugID() : UID()) + .suppressFor(1.0) + .detail("Peer", conn->getPeerAddress()) + .detail("PeerAddress", conn->getPeerAddress()) + .detail("InexpensiveMultiVersionClient", + protocolVersion.hasInexpensiveMultiVersionClient()); if (!protocolVersion.hasInexpensiveMultiVersionClient()) { if (peer) { peer->protocolVersion->set(protocolVersion); @@ -1511,6 +1529,7 @@ ACTOR static Future connectionReader(TransportData* transport, // Older versions expected us to hang up. It may work even if we don't hang up here, but // it's safer to keep the old behavior. + transport->countIncompatibleConnectionErrorThrown++; throw incompatible_protocol_version(); } } else { @@ -1520,6 +1539,7 @@ ACTOR static Future connectionReader(TransportData* transport, .detail("Peer", conn->getPeerAddress()) .detail("PeerAddress", conn->getPeerAddress()) .detail("ConnectionId", connectionId); + transport->countConnEstablishedEvent++; } if (connectionId > 1) { diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index ff74ba34d4d..f182b86512e 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -19,6 +19,8 @@ */ #include +#include +#include #include "fdbclient/ConfigTransactionInterface.h" #include "fdbserver/CoordinationInterface.h" @@ -29,6 +31,7 @@ #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/Status.actor.h" #include "flow/ActorCollection.h" +#include "flow/Error.h" #include "flow/ProtocolVersion.h" #include "flow/UnitTest.h" #include "flow/IndexedSet.h" @@ -118,12 +121,91 @@ ServerCoordinators::ServerCoordinators(Reference ccr, } } -ACTOR Future localGenerationReg(GenerationRegInterface interf, OnDemandStore* pstore) { +struct CoordinatorRequestCounter { + enum class RequestType { + leaderServer_CheckDescriptorMutableRequest, + leaderServer_OpenDatabaseCoordRequest, + leaderServer_ElectionResultRequest, + leaderServer_GetLeaderRequest, + leaderServer_CandidacyRequest, + leaderServer_LeaderHeartbeatRequest, + leaderServer_ForwardRequest, + localGenerationReg_GenerationRegReadRequest, + localGenerationReg_GenerationRegWriteRequest, + }; + + std::string typeToString(RequestType type) const { + switch (type) { + case RequestType::leaderServer_CheckDescriptorMutableRequest: + return "CheckDescriptorMutableRequest"; + case RequestType::leaderServer_OpenDatabaseCoordRequest: + return "OpenDatabaseCoordRequest"; + case RequestType::leaderServer_ElectionResultRequest: + return "ElectionResultRequest"; + case RequestType::leaderServer_GetLeaderRequest: + return "GetLeaderRequest"; + case RequestType::leaderServer_CandidacyRequest: + return "CandidacyRequest"; + case RequestType::leaderServer_LeaderHeartbeatRequest: + return "LeaderHeartbeatRequest"; + case RequestType::leaderServer_ForwardRequest: + return "ForwardRequest"; + case RequestType::localGenerationReg_GenerationRegReadRequest: + return "GenerationRegReadRequest"; + case RequestType::localGenerationReg_GenerationRegWriteRequest: + return "GenerationRegWriteRequest"; + default: + return "Unknown"; + } + } + + void init() { + requestCounts.clear(); + constexpr RequestType allTypes[] = { + RequestType::leaderServer_CheckDescriptorMutableRequest, + RequestType::leaderServer_OpenDatabaseCoordRequest, + RequestType::leaderServer_ElectionResultRequest, + RequestType::leaderServer_GetLeaderRequest, + RequestType::leaderServer_CandidacyRequest, + RequestType::leaderServer_LeaderHeartbeatRequest, + RequestType::leaderServer_ForwardRequest, + RequestType::localGenerationReg_GenerationRegReadRequest, + RequestType::localGenerationReg_GenerationRegWriteRequest, + }; + for (RequestType t : allTypes) { + requestCounts[t] = 0; + } + } + + std::unordered_map requestCounts; + + void addRequest(RequestType type) { + auto it = requestCounts.find(type); + if (it == requestCounts.end()) { + requestCounts[type] = 1; + } else { + it->second++; + } + } + + void logging() { + TraceEvent e("CoordinatorRequestCounter"); + for (const auto& [type, count] : requestCounts) { + e.detail(typeToString(type), count); + } + } +}; + +ACTOR Future localGenerationReg(GenerationRegInterface interf, + OnDemandStore* pstore, + std::shared_ptr requestCounter) { state GenerationRegVal v; state OnDemandStore& store = *pstore; // SOMEDAY: concurrent access to different keys? loop choose { when(GenerationRegReadRequest _req = waitNext(interf.read.getFuture())) { + requestCounter->addRequest( + CoordinatorRequestCounter::RequestType::localGenerationReg_GenerationRegReadRequest); TraceEvent("GenerationRegReadRequest") .detail("From", _req.reply.getEndpoint().getPrimaryAddress()) .detail("K", _req.key); @@ -143,6 +225,8 @@ ACTOR Future localGenerationReg(GenerationRegInterface interf, OnDemandSto req.reply.send(GenerationRegReadReply(v.val, v.writeGen, v.readGen)); } when(GenerationRegWriteRequest _wrq = waitNext(interf.write.getFuture())) { + requestCounter->addRequest( + CoordinatorRequestCounter::RequestType::localGenerationReg_GenerationRegWriteRequest); state GenerationRegWriteRequest wrq = _wrq; Optional rawV = wait(store->readValue(wrq.kv.key)); v = rawV.present() ? BinaryReader::fromStringRef(rawV.get(), IncludeVersion()) @@ -175,7 +259,7 @@ ACTOR Future localGenerationReg(GenerationRegInterface interf, OnDemandSto TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") { state GenerationRegInterface reg; state OnDemandStore store(params.getDataDir(), deterministicRandom()->randomUniqueID(), "coordination-"); - state Future actor = localGenerationReg(reg, &store); + state Future actor = localGenerationReg(reg, &store, std::make_shared()); state Key the_key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, 10))); state UniqueGeneration firstGen(0, deterministicRandom()->randomUniqueID()); @@ -624,7 +708,9 @@ StringRef getClusterDescriptor(Key key) { ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore* pStore, UID id, - Reference ccr) { + Reference ccr, + std::shared_ptr requestCounter) { + state LeaderRegisterCollection regs(pStore); state ActorCollection forwarders(false); @@ -632,12 +718,15 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, loop choose { when(CheckDescriptorMutableRequest req = waitNext(interf.checkDescriptorMutable.getFuture())) { + requestCounter->addRequest( + CoordinatorRequestCounter::RequestType::leaderServer_CheckDescriptorMutableRequest); // Note the response returns the value of a knob enforced by checking only one coordinator. It is not // quorum based. CheckDescriptorMutableReply rep(SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT); req.reply.send(rep); } when(OpenDatabaseCoordRequest req = waitNext(interf.openDatabase.getFuture())) { + requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_OpenDatabaseCoordRequest); Optional forward = regs.getForward(req.clusterKey); if (forward.present()) { ClientDBInfo info; @@ -660,6 +749,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, } } when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) { + requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_ElectionResultRequest); Optional forward = regs.getForward(req.key); if (forward.present()) { req.reply.send(forward.get()); @@ -679,6 +769,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, } } when(GetLeaderRequest req = waitNext(interf.getLeader.getFuture())) { + requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_GetLeaderRequest); Optional forward = regs.getForward(req.key); if (forward.present()) req.reply.send(forward.get()); @@ -697,6 +788,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, } } when(CandidacyRequest req = waitNext(interf.candidacy.getFuture())) { + requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_CandidacyRequest); Optional forward = regs.getForward(req.key); if (forward.present()) req.reply.send(forward.get()); @@ -714,6 +806,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, } } when(LeaderHeartbeatRequest req = waitNext(interf.leaderHeartbeat.getFuture())) { + requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_LeaderHeartbeatRequest); Optional forward = regs.getForward(req.key); if (forward.present()) req.reply.send(LeaderHeartbeatReply{ false }); @@ -731,6 +824,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, } } when(ForwardRequest req = waitNext(interf.forward.getFuture())) { + requestCounter->addRequest(CoordinatorRequestCounter::RequestType::leaderServer_ForwardRequest); Optional forward = regs.getForward(req.key); if (forward.present()) { req.reply.send(Void()); @@ -755,6 +849,14 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, } } +ACTOR Future coordinatorRequestCounterLogger(std::shared_ptr requestCounter) { + loop { + requestCounter->init(); + wait(delay(30.0)); + requestCounter->logging(); + } +} + ACTOR Future coordinationServer(std::string dataFolder, Reference ccr, Reference configNode, @@ -766,6 +868,10 @@ ACTOR Future coordinationServer(std::string dataFolder, state ConfigTransactionInterface configTransactionInterface; state ConfigFollowerInterface configFollowerInterface; state Future configDatabaseServer = Never(); + + state std::shared_ptr requestCounter = std::make_shared(); + state Future requestCounterLogger = coordinatorRequestCounterLogger(requestCounter); + TraceEvent("CoordinationServer", myID) .detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()) .detail("Folder", dataFolder) @@ -779,8 +885,9 @@ ACTOR Future coordinationServer(std::string dataFolder, } try { - wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID, ccr) || - store.getError() || configDatabaseServer); + wait(localGenerationReg(myInterface, &store, requestCounter) || + leaderServer(myLeaderInterface, &store, myID, ccr, requestCounter) || store.getError() || + configDatabaseServer); throw internal_error(); } catch (Error& e) { TraceEvent("CoordinationServerError", myID).errorUnsuppressed(e); diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp index e3f27ef8527..79ef0c30511 100644 --- a/fdbserver/networktest.actor.cpp +++ b/fdbserver/networktest.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include "flow/IRandom.h" #include "fmt/format.h" #include "fdbserver/NetworkTest.h" #include "flow/Knobs.h" @@ -358,34 +359,55 @@ struct P2PNetworkTest { // Random delay before socket writes RandomIntRange waitWriteMilliseconds; + double randomCloseMaxDelay = -1; // Maximum delay before closing a connection + + double probabilityNotCloseConn = 0; + double startTime; - int64_t bytesSent; - int64_t bytesReceived; - int sessionsIn; - int sessionsOut; - int connectErrors; - int acceptErrors; - int sessionErrors; + int64_t bytesSent = 0; + int64_t bytesReceived = 0; + int sessionsIn = 0; + int sessionsOut = 0; + int connectErrors = 0; + int acceptErrors = 0; + int sessionErrors = 0; + int handshakeReqIn = 0; + int handshakeReqOut = 0; + int handshakeDoneIn = 0; + int handshakeDoneOut = 0; Standalone msgBuffer; std::string statsString() { double elapsed = now() - startTime; - std::string s = format( - "%.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out ", - bytesReceived / elapsed / 1e6, - bytesSent / elapsed / 1e6, - sessionsIn / elapsed, - sessionsOut / elapsed); - s += format("Total Errors %d connect=%d accept=%d session=%d", - connectErrors + acceptErrors + sessionErrors, - connectErrors, - acceptErrors, - sessionErrors); + std::string s = format("%.2f MB/s in; %.2f MB/s out\nArrive: %.2f/s in HS req; %.2f/s " + "out HS req\nProcessed %.2f/s in HS req; %.2f/s out HS req\n" + "Completed Session %.2f/s in sessions; %.2f/s " + "out sessions\n", + bytesReceived / elapsed / 1e6, + bytesSent / elapsed / 1e6, + handshakeReqIn / elapsed, + handshakeReqOut / elapsed, + handshakeDoneIn / elapsed, + handshakeDoneOut / elapsed, + sessionsIn / elapsed, + sessionsOut / elapsed); + s += format("Total Errors %.2f/s ConnectError=%.2f/s AcceptError=%.2f/s SessionError=%.2f/s", + (connectErrors + acceptErrors + sessionErrors) / elapsed, + connectErrors / elapsed, + acceptErrors / elapsed, + sessionErrors / elapsed); bytesSent = 0; bytesReceived = 0; sessionsIn = 0; sessionsOut = 0; + connectErrors = 0; + acceptErrors = 0; + sessionErrors = 0; + handshakeReqIn = 0; + handshakeReqOut = 0; + handshakeDoneIn = 0; + handshakeDoneOut = 0; startTime = now(); return s; } @@ -400,10 +422,13 @@ struct P2PNetworkTest { RandomIntRange requests, RandomIntRange idleMilliseconds, RandomIntRange waitReadMilliseconds, - RandomIntRange waitWriteMilliseconds) + RandomIntRange waitWriteMilliseconds, + double randomCloseMaxDelay, + double probabilityNotCloseConn) : connectionsOut(connectionsOut), requestBytes(sendMsgBytes), replyBytes(recvMsgBytes), requests(requests), idleMilliseconds(idleMilliseconds), waitReadMilliseconds(waitReadMilliseconds), - waitWriteMilliseconds(waitWriteMilliseconds) { + waitWriteMilliseconds(waitWriteMilliseconds), randomCloseMaxDelay(randomCloseMaxDelay), + probabilityNotCloseConn(probabilityNotCloseConn) { bytesSent = 0; bytesReceived = 0; sessionsIn = 0; @@ -411,6 +436,10 @@ struct P2PNetworkTest { connectErrors = 0; acceptErrors = 0; sessionErrors = 0; + handshakeReqIn = 0; + handshakeReqOut = 0; + handshakeDoneIn = 0; + handshakeDoneOut = 0; msgBuffer = makeString(std::max(sendMsgBytes.max, recvMsgBytes.max)); if (!remoteAddresses.empty()) { @@ -494,19 +523,35 @@ struct P2PNetworkTest { return Void(); } + ACTOR static Future randomDelayedConnClose(P2PNetworkTest* self, Reference conn) { + wait(delay(deterministicRandom()->random01() * self->randomCloseMaxDelay / 1000.0)); + conn->close(); + return Void(); + } + ACTOR static Future doSession(P2PNetworkTest* self, Reference conn, bool incoming) { state int numRequests; + state Future randomClose; try { if (incoming) { + self->handshakeReqIn++; + if (self->randomCloseMaxDelay > -1) { + randomClose = randomDelayedConnClose(self, conn); + } wait(conn->acceptHandshake()); - + self->handshakeDoneIn++; // Read the number of requests for the session Standalone buf = wait(readMsg(self, conn)); ASSERT(buf.size() == sizeof(int)); numRequests = *(int*)buf.begin(); } else { + self->handshakeReqOut++; + if (self->randomCloseMaxDelay > -1) { + randomClose = randomDelayedConnClose(self, conn); + } wait(conn->connectHandshake()); + self->handshakeDoneOut++; // Pick the number of requests for the session and send it to remote numRequests = self->requests.get(); @@ -532,7 +577,9 @@ struct P2PNetworkTest { } wait(delay(self->idleMilliseconds.get() / 1e3)); - conn->close(); + if (deterministicRandom()->random01() > self->probabilityNotCloseConn) { + conn->close(); + } if (incoming) { ++self->sessionsIn; @@ -568,10 +615,8 @@ struct P2PNetworkTest { ACTOR static Future incoming(P2PNetworkTest* self, Reference listener) { state ActorCollection sessions(false); - + state uint64_t connectionCount = 0; loop { - wait(delay(0, TaskPriority::AcceptSocket)); - try { state Reference conn = wait(listener->accept()); // printf("Connected from %s\n", conn->getPeerAddress().toString().c_str()); @@ -580,6 +625,10 @@ struct P2PNetworkTest { ++self->acceptErrors; TraceEvent(SevError, "P2PIncomingError").error(e).detail("Listener", listener->getListenAddress()); } + connectionCount++; + if (connectionCount % (FLOW_KNOBS->ACCEPT_BATCH_SIZE) == 0) { + wait(delay(0, TaskPriority::AcceptSocket)); + } } } @@ -653,7 +702,9 @@ TEST_CASE(":/network/p2ptest") { params.get("requests").orDefault("10:10000"), params.get("idleMilliseconds").orDefault("0"), params.get("waitReadMilliseconds").orDefault("0"), - params.get("waitWriteMilliseconds").orDefault("0")); + params.get("waitWriteMilliseconds").orDefault("0"), + params.getDouble("randomCloseMaxDelay").orDefault(-1), + params.getDouble("probabilityNotCloseConn").orDefault(0.0)); wait(p2p.run()); return Void(); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 0f1b5618f1f..5ed2c3bf9b8 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -105,6 +105,12 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( CONNECTION_MONITOR_INCOMING_IDLE_MULTIPLIER, 1.2 ); init( CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY, 2.0 ); + init( TLS_HANDSHAKE_ALWAYS_BACKGROUND, false ); + init( INJECT_TLS_HANDSHAKE_BUSYNESS_SEC, 0.0 ); + init( INJECT_TLS_HANDSHAKE_VERIFY_SEC, 0.0 ); + init( TLS_CONNECTION_EARLY_CLOSE, false ); + init( TLS_FLOW_LOCK_HIGH_PRIORITY, false ); + //FlowTransport init( CONNECTION_REJECTED_MESSAGE_DELAY, 1.0 ); init( CONNECTION_ID_TIMEOUT, 600.0 ); if( randomize && BUGGIFY ) CONNECTION_ID_TIMEOUT = 60.0; diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index c4fba885cea..ef9aef8fe5e 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -22,11 +22,16 @@ #include "boost/asio/ip/address.hpp" #include "boost/system/system_error.hpp" #include "flow/Arena.h" +#include "flow/Knobs.h" #include "flow/Platform.h" +#include "flow/TaskPriority.h" #include "flow/Trace.h" #include #include +#include #include +#include +#include #ifndef BOOST_SYSTEM_NO_LIB #define BOOST_SYSTEM_NO_LIB #endif @@ -77,6 +82,7 @@ extern "C" intptr_t g_stackYieldLimit; intptr_t g_stackYieldLimit = 0; using namespace boost::asio::ip; +using std::chrono::high_resolution_clock; #if defined(__linux__) || defined(__FreeBSD__) #include @@ -113,6 +119,110 @@ DESCR struct SlowTask { int64_t numYields; // count }; +struct HandShakeMetrics { + std::chrono::nanoseconds maxDuration; + std::chrono::nanoseconds minDuration; + std::chrono::nanoseconds totalDuration; + int64_t count = 0; + + HandShakeMetrics() + : maxDuration(std::chrono::nanoseconds::zero()), minDuration(std::chrono::nanoseconds::zero()), + totalDuration(std::chrono::nanoseconds::zero()), count(0) {} + + void addMetrics(const std::chrono::nanoseconds& duration) { + if (count == 0 || maxDuration < duration) { + maxDuration = duration; + } + if (count == 0 || minDuration > duration) { + minDuration = duration; + } + if (count == 0) { + totalDuration = duration; + } else { + totalDuration += duration; + } + count++; + } + + void clear() { + maxDuration = std::chrono::nanoseconds::zero(); + minDuration = std::chrono::nanoseconds::zero(); + totalDuration = std::chrono::nanoseconds::zero(); + count = 0; + } +}; + +struct HandShakeMetricsOverview { + HandShakeMetrics clientSucceed; + HandShakeMetrics clientFailed; + HandShakeMetrics serverSucceed; + HandShakeMetrics serverFailed; + double lastLogTime = 0.0; + + void addMetrics(bool isClient, bool isSucceed, const std::chrono::nanoseconds& duration) { + if (isClient) { + if (isSucceed) { + clientSucceed.addMetrics(duration); + } else { + clientFailed.addMetrics(duration); + } + } else { + if (isSucceed) { + serverSucceed.addMetrics(duration); + } else { + serverFailed.addMetrics(duration); + } + } + } + + void clear() { + clientSucceed.clear(); + clientFailed.clear(); + serverSucceed.clear(); + serverFailed.clear(); + } + + std::string convertToString(const std::chrono::nanoseconds& duration) { + return std::to_string(std::chrono::duration_cast>(duration).count()); + } + + void logging() { + if (lastLogTime != 0 && now() - lastLogTime < 1.0) { + return; + } + TraceEvent(SevInfo, "N2_HandShakeMetricsOverview") + .detail("ClientSucceedCount", clientSucceed.count) + .detail("ClientSucceedMaxDuration", convertToString(clientSucceed.maxDuration)) + .detail("ClientSucceedMinDuration", convertToString(clientSucceed.minDuration)) + .detail("ClientSucceedTotalDuration", convertToString(clientSucceed.totalDuration)) + .detail("ClientFailedCount", clientFailed.count) + .detail("ClientFailedMaxDuration", convertToString(clientFailed.maxDuration)) + .detail("ClientFailedMinDuration", convertToString(clientFailed.minDuration)) + .detail("ClientFailedTotalDuration", convertToString(clientFailed.totalDuration)) + .detail("ServerSucceedCount", serverSucceed.count) + .detail("ServerSucceedMaxDuration", convertToString(serverSucceed.maxDuration)) + .detail("ServerSucceedMinDuration", convertToString(serverSucceed.minDuration)) + .detail("ServerSucceedTotalDuration", convertToString(serverSucceed.totalDuration)) + .detail("ServerFailedCount", serverFailed.count) + .detail("ServerFailedMaxDuration", convertToString(serverFailed.maxDuration)) + .detail("ServerFailedMinDuration", convertToString(serverFailed.minDuration)) + .detail("ServerFailedTotalDuration", convertToString(serverFailed.totalDuration)); + clear(); + return; + } +}; + +struct HandShakeMetricsOutput { + Optional duration; + bool isClient = false; + bool isSucceed = false; + + HandShakeMetricsOutput(std::chrono::nanoseconds duration, bool isClient, bool isSucceed) + : duration(duration), isClient(isClient), isSucceed(isSucceed) {} + + HandShakeMetricsOutput() = default; +}; + namespace N2 { // No indent, it's the whole file class Net2; @@ -138,6 +248,8 @@ class Net2 final : public INetwork, public INetworkConnections { void run() override; void initMetrics() override; + std::shared_ptr handShakeMetricsOverview; + // INetworkConnections interface Future> connect(NetworkAddress toAddr, tcp::socket* existingSocket = nullptr) override; Future> connectExternal(NetworkAddress toAddr) override; @@ -220,7 +332,6 @@ class Net2 final : public INetwork, public INetworkConnections { #endif bool useThreadPool; - // private: ASIOReactor reactor; @@ -241,8 +352,8 @@ class Net2 final : public INetwork, public INetworkConnections { TDMetricCollection tdmetrics; MetricCollection metrics; ChaosMetrics chaosMetrics; - // we read now() from a different thread. On Intel, reading a double is atomic anyways, but on other platforms it's - // not. For portability this should be atomic + // we read now() from a different thread. On Intel, reading a double is atomic anyways, but on other platforms + // it's not. For portability this should be atomic std::atomic currentTime; // May be accessed off the network thread, e.g. by onMainThread std::atomic stopped; @@ -308,6 +419,10 @@ class Net2 final : public INetwork, public INetworkConnections { DoubleMetricHandle countLaunchTime; DoubleMetricHandle countReactTime; BoolMetricHandle awakeMetric; + Int64MetricHandle countClientTLSHandshakesOnSideThreads; + Int64MetricHandle countClientTLSHandshakesOnMainThread; + Int64MetricHandle countServerTLSHandshakesOnSideThreads; + Int64MetricHandle countServerTLSHandshakesOnMainThread; EventMetricHandle slowTaskMetric; @@ -344,19 +459,43 @@ class BindPromise { std::variant errContext; UID errID; NetworkAddress peerAddr; + double requestTime = 0; + double startTime = 0; + double acceptTime = 0; + double acquireLockTime = 0; + double lockTime = 0; + bool hasSet = false; public: BindPromise(const char* errContext, UID errID) : errContext(errContext), errID(errID) {} BindPromise(AuditedEvent auditedEvent, UID errID) : errContext(auditedEvent), errID(errID) {} - BindPromise(BindPromise const& r) : p(r.p), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {} + BindPromise(BindPromise const& r) + : p(r.p), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr), requestTime(r.requestTime), + startTime(r.startTime), acceptTime(r.acceptTime), acquireLockTime(r.acquireLockTime), lockTime(r.lockTime), + hasSet(r.hasSet) {} BindPromise(BindPromise&& r) noexcept - : p(std::move(r.p)), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {} + : p(std::move(r.p)), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr), requestTime(r.requestTime), + startTime(r.startTime), acceptTime(r.acceptTime), acquireLockTime(r.acquireLockTime), lockTime(r.lockTime), + hasSet(r.hasSet) {} Future getFuture() const { return p.getFuture(); } NetworkAddress getPeerAddr() const { return peerAddr; } - void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; } + void setPeerAddr(const NetworkAddress& addr, + double reqTime, + double sTime, + double aTime, + double alTime, + double lTime) { + peerAddr = addr; + requestTime = reqTime; + startTime = sTime; + acceptTime = aTime; + acquireLockTime = alTime; + lockTime = lTime; + hasSet = true; + } void operator()(const boost::system::error_code& error, size_t bytesWritten = 0) { try { @@ -381,6 +520,12 @@ class BindPromise { evt.detail("PeerAddr", peerAddr); evt.detail("PeerAddress", peerAddr); } + evt.detail("Duration", now() - requestTime); + evt.detail("ProcessTime", now() - startTime); + evt.detail("AcceptToFinishTime", now() - acceptTime); + evt.detail("AcquireLockToFinishTime", now() - acquireLockTime); + evt.detail("LockToFinishTime", now() - lockTime); + evt.detail("HasSet", hasSet); } p.sendError(connection_failed()); @@ -453,8 +598,8 @@ class Connection final : public IConnection, ReferenceCounted { return f; } - // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might - // be 0) + // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read + // (might be 0) int read(uint8_t* begin, uint8_t* end) override { boost::system::error_code err; ++g_net2->countReads; @@ -475,8 +620,8 @@ class Connection final : public IConnection, ReferenceCounted { return size; } - // Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of - // bytes written (might be 0) + // Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number + // of bytes written (might be 0) int write(SendBuffer const* data, int limit) override { boost::system::error_code err; ++g_net2->countWrites; @@ -485,8 +630,8 @@ class Connection final : public IConnection, ReferenceCounted { boost::iterator_range(SendBufferIterator(data, limit), SendBufferIterator()), err); if (err) { - // Since there was an error, sent's value can't be used to infer that the buffer has data and the limit is - // positive so check explicitly. + // Since there was an error, sent's value can't be used to infer that the buffer has data and the limit + // is positive so check explicitly. ASSERT(limit > 0); bool notEmpty = false; for (auto p = data; p; p = p->next) @@ -813,14 +958,35 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { return std::move(o).str(); } + void setRequestTime(double reqTime) { requestTime = reqTime; } + + void setStartTime(double time) { startTime = time; } + + void setAcceptTime(double time) { acceptTime = time; } + + void setAcquireLockTime(double time) { acquireLockTime = time; } + + void setLockTime(double time) { lockTime = time; } + ThreadReturnPromise done; + ThreadReturnPromise metricsOutput; ssl_socket& socket; ssl_socket::handshake_type type; boost::system::error_code err; + double requestTime = 0.0; + double startTime = 0.0; + double acceptTime = 0.0; + double acquireLockTime = 0.0; + double lockTime = 0.0; }; void action(Handshake& h) { + auto start = high_resolution_clock::now(); try { + if (FLOW_KNOBS->INJECT_TLS_HANDSHAKE_BUSYNESS_SEC > 0) { + std::this_thread::sleep_for( + std::chrono::duration(FLOW_KNOBS->INJECT_TLS_HANDSHAKE_BUSYNESS_SEC)); + } h.socket.next_layer().non_blocking(false, h.err); if (!h.err.failed()) { h.socket.handshake(h.type, h.err); @@ -836,9 +1002,22 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { .detail("PeerAddress", h.getPeerAddress()) .detail("ErrorCode", h.err.value()) .detail("ErrorMsg", h.err.message().c_str()) - .detail("BackgroundThread", true); + .detail("BackgroundThread", true) + .detail("Duration", now() - h.requestTime) + .detail("ProcessTime", now() - h.startTime) + .detail("AcceptToFinishTime", now() - h.acceptTime) + .detail("AcquireLockToFinishTime", now() - h.acquireLockTime) + .detail("LockTimeToFinish", now() - h.lockTime); + h.metricsOutput.send(HandShakeMetricsOutput( + std::chrono::duration_cast(high_resolution_clock::now() - start), + (h.type == ssl_socket::handshake_type::client), + false)); h.done.sendError(connection_failed()); } else { + h.metricsOutput.send(HandShakeMetricsOutput( + std::chrono::duration_cast(high_resolution_clock::now() - start), + (h.type == ssl_socket::handshake_type::client), + true)); h.done.send(Void()); } } catch (...) { @@ -847,7 +1026,16 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { : "N2_AcceptHandshakeUnknownError"_audit) .detail("PeerAddr", h.getPeerAddress()) .detail("PeerAddress", h.getPeerAddress()) - .detail("BackgroundThread", true); + .detail("BackgroundThread", true) + .detail("Duration", now() - h.requestTime) + .detail("ProcessTime", now() - h.startTime) + .detail("AcceptToFinishTime", now() - h.acceptTime) + .detail("AcquireLockToFinishTime", now() - h.acquireLockTime) + .detail("LockTimeToFinish", now() - h.lockTime); + h.metricsOutput.send(HandShakeMetricsOutput( + std::chrono::duration_cast(high_resolution_clock::now() - start), + (h.type == ssl_socket::handshake_type::client), + false)); h.done.sendError(connection_failed()); } } @@ -863,11 +1051,15 @@ class SSLConnection final : public IConnection, ReferenceCounted explicit SSLConnection(boost::asio::io_service& io_service, Reference> context) : id(nondeterministicRandom()->randomUniqueID()), socket(io_service), ssl_sock(socket, context->mutate()), - sslContext(context), has_trusted_peer(false) {} + sslContext(context), has_trusted_peer(false) { + requestTime = now(); + } explicit SSLConnection(Reference> context, tcp::socket* existingSocket) : id(nondeterministicRandom()->randomUniqueID()), socket(std::move(*existingSocket)), - ssl_sock(socket, context->mutate()), sslContext(context) {} + ssl_sock(socket, context->mutate()), sslContext(context) { + requestTime = now(); + } // This is not part of the IConnection interface, because it is wrapped by INetwork::connect() ACTOR static Future> connect(boost::asio::io_service* ios, @@ -922,26 +1114,60 @@ class SSLConnection final : public IConnection, ReferenceCounted ACTOR static void doAcceptHandshake(Reference self, Promise connected) { state Hold holder; + state bool executeInBackground = false; + state HandShakeMetricsOutput handshakeMetricsOutput; + state Future onHandShakeMetrics; + state Future onHandshook; + state double startTime = now(); try { - Future onHandshook; ConfigureSSLStream(N2::g_net2->activeTlsPolicy, self->ssl_sock, [conn = self.getPtr()](bool verifyOk) { conn->has_trusted_peer = verifyOk; }); // If the background handshakers are not all busy, use one - if (N2::g_net2->sslPoolHandshakesInProgress < N2::g_net2->sslHandshakerThreadsStarted) { + + // FIXME: see comment elsewhere about making this the only path. + if ((FLOW_KNOBS->TLS_HANDSHAKE_ALWAYS_BACKGROUND && N2::g_net2->sslHandshakerThreadsStarted > 0) || + N2::g_net2->sslPoolHandshakesInProgress < N2::g_net2->sslHandshakerThreadsStarted) { + g_net2->countServerTLSHandshakesOnSideThreads++; holder = Hold(&N2::g_net2->sslPoolHandshakesInProgress); auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::server); + handshake->setRequestTime(self->requestTime); + handshake->setStartTime(startTime); + handshake->setAcceptTime(self->acceptTime); + handshake->setAcquireLockTime(self->acquireLockTime); + handshake->setLockTime(self->lockTime); onHandshook = handshake->done.getFuture(); + onHandShakeMetrics = handshake->metricsOutput.getFuture(); + executeInBackground = true; N2::g_net2->sslHandshakerPool->post(handshake); } else { // Otherwise use flow network thread + g_net2->countServerTLSHandshakesOnMainThread++; BindPromise p("N2_AcceptHandshakeError"_audit, self->id); - p.setPeerAddr(self->getPeerAddress()); + p.setPeerAddr(self->getPeerAddress(), + self->requestTime, + startTime, + self->acceptTime, + self->acquireLockTime, + self->lockTime); onHandshook = p.getFuture(); self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p)); } + if (executeInBackground) { + wait(store(handshakeMetricsOutput, onHandShakeMetrics)); + if (handshakeMetricsOutput.duration.present()) { + N2::g_net2->handShakeMetricsOverview->addMetrics(handshakeMetricsOutput.isClient, + handshakeMetricsOutput.isSucceed, + handshakeMetricsOutput.duration.get()); + N2::g_net2->handShakeMetricsOverview->logging(); + } else { + TraceEvent(SevWarn, "N2_HandshakeMetricsError", self->id) + .suppressFor(5.0) + .detail("Message", "Handshake metrics duration not set"); + } + } wait(onHandshook); wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); @@ -970,7 +1196,18 @@ class SSLConnection final : public IConnection, ReferenceCounted } } - wait(g_network->networkInfo.handshakeLock->take()); + self->acquireLockTime = now(); + + wait(g_network->networkInfo.handshakeLock->take( + FLOW_KNOBS->TLS_FLOW_LOCK_HIGH_PRIORITY ? TaskPriority::HandshakeFlowLock : TaskPriority::DefaultYield)); + + self->lockTime = now(); + + if (self->lockTime - self->acquireLockTime > 2) { + self->closeSocket(); + throw connection_failed(); + } + state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock); Promise connected; @@ -981,6 +1218,10 @@ class SSLConnection final : public IConnection, ReferenceCounted return Void(); } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { + TraceEvent("N2_AcceptHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddr", self->getPeerAddress()) + .detail("PeerAddress", self->getPeerAddress()); throw connection_failed(); } } @@ -1005,26 +1246,68 @@ class SSLConnection final : public IConnection, ReferenceCounted ACTOR static void doConnectHandshake(Reference self, Promise connected) { state Hold holder; + state bool executeInBackground = false; + state HandShakeMetricsOutput handshakeMetricsOutput; + state Future onHandShakeMetrics; + state Future onHandshook; + state double startTime = now(); + try { - Future onHandshook; ConfigureSSLStream(N2::g_net2->activeTlsPolicy, self->ssl_sock, [conn = self.getPtr()](bool verifyOk) { conn->has_trusted_peer = verifyOk; }); // If the background handshakers are not all busy, use one - if (N2::g_net2->sslPoolHandshakesInProgress < N2::g_net2->sslHandshakerThreadsStarted) { + + // FIXME: this should probably be changed never to use the + // main thread, as waiting for potentially high-RTT TLS + // handshakes can delay execution of everything else that + // runs on the main thread. The cost of that (in terms of + // unpredictable system performance and reliability) is + // much, much higher than the cost a few hundred or + // thousand incremental threads. + + if ((FLOW_KNOBS->TLS_HANDSHAKE_ALWAYS_BACKGROUND && N2::g_net2->sslHandshakerThreadsStarted > 0) || + N2::g_net2->sslPoolHandshakesInProgress < N2::g_net2->sslHandshakerThreadsStarted) { + g_net2->countClientTLSHandshakesOnSideThreads++; holder = Hold(&N2::g_net2->sslPoolHandshakesInProgress); auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::client); + handshake->setRequestTime(self->requestTime); + handshake->setStartTime(startTime); + handshake->setAcceptTime(self->acceptTime); + handshake->setAcquireLockTime(self->acquireLockTime); + handshake->setLockTime(self->lockTime); onHandshook = handshake->done.getFuture(); + onHandShakeMetrics = handshake->metricsOutput.getFuture(); + executeInBackground = true; N2::g_net2->sslHandshakerPool->post(handshake); } else { // Otherwise use flow network thread + g_net2->countClientTLSHandshakesOnMainThread++; BindPromise p("N2_ConnectHandshakeError"_audit, self->id); - p.setPeerAddr(self->getPeerAddress()); + p.setPeerAddr(self->getPeerAddress(), + self->requestTime, + startTime, + self->acceptTime, + self->acquireLockTime, + self->lockTime); onHandshook = p.getFuture(); self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p)); } + if (executeInBackground) { + wait(store(handshakeMetricsOutput, onHandShakeMetrics)); + if (handshakeMetricsOutput.duration.present()) { + N2::g_net2->handShakeMetricsOverview->addMetrics(handshakeMetricsOutput.isClient, + handshakeMetricsOutput.isSucceed, + handshakeMetricsOutput.duration.get()); + N2::g_net2->handShakeMetricsOverview->logging(); + } else { + TraceEvent(SevWarn, "N2_HandshakeMetricsError", self->id) + .suppressFor(5.0) + .detail("Message", "Connect handshake metrics duration not set"); + } + } wait(onHandshook); wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); @@ -1035,7 +1318,13 @@ class SSLConnection final : public IConnection, ReferenceCounted } ACTOR static Future connectHandshakeWrapper(Reference self) { - wait(g_network->networkInfo.handshakeLock->take()); + self->acquireLockTime = now(); + + wait(g_network->networkInfo.handshakeLock->take( + FLOW_KNOBS->TLS_FLOW_LOCK_HIGH_PRIORITY ? TaskPriority::HandshakeFlowLock : TaskPriority::DefaultYield)); + + self->lockTime = now(); + state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock); Promise connected; @@ -1046,6 +1335,10 @@ class SSLConnection final : public IConnection, ReferenceCounted return Void(); } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { + TraceEvent("N2_ConnectHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddr", self->getPeerAddress()) + .detail("PeerAddress", self->getPeerAddress()); throw connection_failed(); } } @@ -1086,8 +1379,8 @@ class SSLConnection final : public IConnection, ReferenceCounted return f; } - // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might - // be 0) + // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read + // (might be 0) int read(uint8_t* begin, uint8_t* end) override { boost::system::error_code err; ++g_net2->countReads; @@ -1108,12 +1401,12 @@ class SSLConnection final : public IConnection, ReferenceCounted return size; } - // Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of - // bytes written (might be 0) + // Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number + // of bytes written (might be 0) int write(SendBuffer const* data, int limit) override { #ifdef __APPLE__ - // For some reason, writing ssl_sock with more than 2016 bytes when socket is writeable sometimes results in a - // broken pipe error. + // For some reason, writing ssl_sock with more than 2016 bytes when socket is writeable sometimes results in + // a broken pipe error. limit = std::min(limit, 2016); #endif boost::system::error_code err; @@ -1123,8 +1416,8 @@ class SSLConnection final : public IConnection, ReferenceCounted boost::iterator_range(SendBufferIterator(data, limit), SendBufferIterator()), err); if (err) { - // Since there was an error, sent's value can't be used to infer that the buffer has data and the limit is - // positive so check explicitly. + // Since there was an error, sent's value can't be used to infer that the buffer has data and the limit + // is positive so check explicitly. ASSERT(limit > 0); bool notEmpty = false; for (auto p = data; p; p = p->next) @@ -1157,6 +1450,14 @@ class SSLConnection final : public IConnection, ReferenceCounted ssl_socket& getSSLSocket() { return ssl_sock; } + double requestTime = 0.0; + + double acceptTime = 0.0; + + double acquireLockTime = 0.0; + + double lockTime = 0.0; + private: UID id; tcp::socket socket; @@ -1243,6 +1544,8 @@ class SSLListener final : public IListener, ReferenceCounted { auto f = p.getFuture(); self->acceptor.async_accept(conn->getSocket(), peer_endpoint, std::move(p)); wait(f); + + conn->acceptTime = now(); auto peer_address = peer_endpoint.address().is_v6() ? IPAddress(peer_endpoint.address().to_v6().to_bytes()) : IPAddress(peer_endpoint.address().to_v4().to_ulong()); @@ -1269,6 +1572,8 @@ Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics) // Until run() is called, yield() will always yield TraceEvent("Net2Starting").log(); + handShakeMetricsOverview = std::make_shared(); + // Set the global members if (useMetrics) { setGlobal(INetwork::enTDMetrics, (flowGlobalType)&tdmetrics); @@ -1433,6 +1738,10 @@ void Net2::initMetrics() { slowTaskMetric.init("Net2.SlowTask"_sr); countLaunchTime.init("Net2.CountLaunchTime"_sr); countReactTime.init("Net2.CountReactTime"_sr); + countClientTLSHandshakesOnSideThreads.init("Net2.CountClientTLSHandshakesOnSideThreads"_sr); + countClientTLSHandshakesOnMainThread.init("Net2.CountClientTLSHandshakesOnMainThread"_sr); + countServerTLSHandshakesOnSideThreads.init("Net2.CountServerTLSHandshakesOnSideThreads"_sr); + countServerTLSHandshakesOnMainThread.init("Net2.CountServerTLSHandshakesOnMainThread"_sr); taskQueue.initMetrics(); } @@ -2267,7 +2576,7 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") { return Void(); } -void net2_test(){ +void net2_test() { /* g_network = newNet2(); // for promise serialization below diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp index a7a3a5042b1..2141427aa59 100644 --- a/flow/SystemMonitor.cpp +++ b/flow/SystemMonitor.cpp @@ -172,6 +172,14 @@ SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsSta .detail("ConnectionsEstablished", (double)(netData.countConnEstablished - statState->networkState.countConnEstablished) / currentStats.elapsed) + .detail("ConnectionsEstablishedCount", + netData.countConnEstablished - statState->networkState.countConnEstablished) + .detail( + "ConnectionEstablishedEvent", + (double)(netData.countConnEstablishedEvent - statState->networkState.countConnEstablishedEvent) / + currentStats.elapsed) + .detail("ConnectionEstablishedEventCount", + netData.countConnEstablishedEvent - statState->networkState.countConnEstablishedEvent) .detail("ConnectionsClosed", ((netData.countConnClosedWithError - statState->networkState.countConnClosedWithError) + (netData.countConnClosedWithoutError - statState->networkState.countConnClosedWithoutError)) / @@ -182,6 +190,49 @@ SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsSta .detail("TLSPolicyFailures", (netData.countTLSPolicyFailures - statState->networkState.countTLSPolicyFailures) / currentStats.elapsed) + .detail("ClientTLSHandshakesOnSideThreads", + (netData.countClientTLSHandshakesOnSideThreads - + statState->networkState.countClientTLSHandshakesOnSideThreads) / + currentStats.elapsed) + .detail("ClientTLSHandshakesOnMainThread", + (netData.countClientTLSHandshakesOnMainThread - + statState->networkState.countClientTLSHandshakesOnMainThread) / + currentStats.elapsed) + .detail("ServerTLSHandshakesOnSideThreads", + (netData.countServerTLSHandshakesOnSideThreads - + statState->networkState.countServerTLSHandshakesOnSideThreads) / + currentStats.elapsed) + .detail("ServerTLSHandshakesOnMainThread", + (netData.countServerTLSHandshakesOnMainThread - + statState->networkState.countServerTLSHandshakesOnMainThread) / + currentStats.elapsed) + .detail("IncompatibleConnections", + (double)(netData.countIncompatibleConnections - + statState->networkState.countIncompatibleConnections) / + currentStats.elapsed) + .detail("IncompatibleConnectionsCount", + netData.countIncompatibleConnections - statState->networkState.countIncompatibleConnections) + .detail( + "ConnectionClosedEvent", + (double)(netData.countConnectionClosedEvent - statState->networkState.countConnectionClosedEvent) / + currentStats.elapsed) + .detail("ConnectionClosedEventCount", + netData.countConnectionClosedEvent - statState->networkState.countConnectionClosedEvent) + .detail("IncompatibleConnectionClosedEvent", + (double)(netData.countIncompatibleConnectionClosedEvent - + statState->networkState.countIncompatibleConnectionClosedEvent) / + currentStats.elapsed) + .detail("IncompatibleConnectionClosedEventCount", + netData.countIncompatibleConnectionClosedEvent - + statState->networkState.countIncompatibleConnectionClosedEvent) + .detail("IncompatibleConnectionErrorThrown", + (double)(netData.countIncompatibleConnectionErrorThrown - + statState->networkState.countIncompatibleConnectionErrorThrown) / + currentStats.elapsed) + .detail("IncompatibleConnectionErrorThrownCount", + netData.countIncompatibleConnectionErrorThrown - + statState->networkState.countIncompatibleConnectionErrorThrown) + .trackLatest(eventName); TraceEvent("MemoryMetrics") diff --git a/flow/TLSConfig.actor.cpp b/flow/TLSConfig.actor.cpp index c23685bdffa..8e59093b206 100644 --- a/flow/TLSConfig.actor.cpp +++ b/flow/TLSConfig.actor.cpp @@ -42,6 +42,7 @@ TLSPolicy::~TLSPolicy() {} #include #include #include +#include #include "flow/Platform.h" #include "flow/IAsyncFile.h" @@ -147,6 +148,9 @@ void ConfigureSSLStream(Reference policy, std::function callback) { try { stream.set_verify_callback([policy, callback](bool preverified, boost::asio::ssl::verify_context& ctx) { + if (FLOW_KNOBS->INJECT_TLS_HANDSHAKE_VERIFY_SEC > 0) { + std::this_thread::sleep_for(std::chrono::duration(FLOW_KNOBS->INJECT_TLS_HANDSHAKE_VERIFY_SEC)); + } bool success = policy->verify_peer(preverified, ctx.native_handle()); if (!success) { if (policy->on_failure) diff --git a/flow/include/flow/Knobs.h b/flow/include/flow/Knobs.h index 816135236c5..4154da0780e 100644 --- a/flow/include/flow/Knobs.h +++ b/flow/include/flow/Knobs.h @@ -126,6 +126,12 @@ class FlowKnobs : public KnobsImpl { double CLIENT_REQUEST_INTERVAL; double SERVER_REQUEST_INTERVAL; + bool TLS_HANDSHAKE_ALWAYS_BACKGROUND; + double INJECT_TLS_HANDSHAKE_BUSYNESS_SEC; + double INJECT_TLS_HANDSHAKE_VERIFY_SEC; + bool TLS_CONNECTION_EARLY_CLOSE; + bool TLS_FLOW_LOCK_HIGH_PRIORITY; + int DISABLE_ASSERTS; double QUEUE_MODEL_SMOOTHING_AMOUNT; diff --git a/flow/include/flow/SystemMonitor.h b/flow/include/flow/SystemMonitor.h index 01ab39477b7..a3047b2ef1f 100644 --- a/flow/include/flow/SystemMonitor.h +++ b/flow/include/flow/SystemMonitor.h @@ -90,11 +90,20 @@ struct NetworkData { int64_t countFilePageCacheMisses; int64_t countFilePageCacheEvictions; int64_t countConnEstablished; + int64_t countConnEstablishedEvent; int64_t countConnClosedWithError; int64_t countConnClosedWithoutError; int64_t countTLSPolicyFailures; double countLaunchTime; double countReactTime; + int64_t countClientTLSHandshakesOnSideThreads; + int64_t countClientTLSHandshakesOnMainThread; + int64_t countServerTLSHandshakesOnSideThreads; + int64_t countServerTLSHandshakesOnMainThread; + int64_t countIncompatibleConnections; + int64_t countConnectionClosedEvent; + int64_t countIncompatibleConnectionClosedEvent; + int64_t countIncompatibleConnectionErrorThrown; void init() { bytesSent = Int64Metric::getValueOrDefault("Net2.BytesSent"_sr); @@ -118,6 +127,7 @@ struct NetworkData { countYieldCallsTrue = Int64Metric::getValueOrDefault("Net2.CountYieldCallsTrue"_sr); countRunLoopProfilingSignals = Int64Metric::getValueOrDefault("Net2.CountRunLoopProfilingSignals"_sr); countConnEstablished = Int64Metric::getValueOrDefault("Net2.CountConnEstablished"_sr); + countConnEstablishedEvent = Int64Metric::getValueOrDefault("Net2.CountConnEstablishedEvent"_sr); countConnClosedWithError = Int64Metric::getValueOrDefault("Net2.CountConnClosedWithError"_sr); countConnClosedWithoutError = Int64Metric::getValueOrDefault("Net2.CountConnClosedWithoutError"_sr); countTLSPolicyFailures = Int64Metric::getValueOrDefault("Net2.CountTLSPolicyFailures"_sr); @@ -137,6 +147,20 @@ struct NetworkData { countFilePageCacheHits = Int64Metric::getValueOrDefault("AsyncFile.CountCachePageReadsHit"_sr); countFilePageCacheMisses = Int64Metric::getValueOrDefault("AsyncFile.CountCachePageReadsMissed"_sr); countFilePageCacheEvictions = Int64Metric::getValueOrDefault("EvictablePageCache.CacheEvictions"_sr); + countClientTLSHandshakesOnSideThreads = + Int64Metric::getValueOrDefault("Net2.CountClientTLSHandshakesOnSideThreads"_sr); + countClientTLSHandshakesOnMainThread = + Int64Metric::getValueOrDefault("Net2.CountClientTLSHandshakesOnMainThread"_sr); + countServerTLSHandshakesOnSideThreads = + Int64Metric::getValueOrDefault("Net2.CountServerTLSHandshakesOnSideThreads"_sr); + countServerTLSHandshakesOnMainThread = + Int64Metric::getValueOrDefault("Net2.CountServerTLSHandshakesOnMainThread"_sr); + countIncompatibleConnections = Int64Metric::getValueOrDefault("Net2.CountIncompatibleConnections"_sr); + countConnectionClosedEvent = Int64Metric::getValueOrDefault("Net2.CountConnectionClosedEvent"_sr); + countIncompatibleConnectionClosedEvent = + Int64Metric::getValueOrDefault("Net2.CountIncompatibleConnectionClosedEvent"_sr); + countIncompatibleConnectionErrorThrown = + Int64Metric::getValueOrDefault("Net2.CountIncompatibleConnectionErrorThrown"_sr); } }; diff --git a/flow/include/flow/TaskPriority.h b/flow/include/flow/TaskPriority.h index e7d53e6fd31..809fef9a6c0 100644 --- a/flow/include/flow/TaskPriority.h +++ b/flow/include/flow/TaskPriority.h @@ -27,6 +27,7 @@ enum class TaskPriority { ASIOReactor = 20001, RunCycleFunction = 20000, FlushTrace = 10500, + HandshakeFlowLock = 10400, WriteSocket = 10000, PollEIO = 9900, DiskIOComplete = 9150,