From f11621625c26769294fe8d4217e7358de9e7db6e Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 17 Oct 2025 10:25:41 -0500 Subject: [PATCH 1/6] fix handshake timeout mechanism --- fdbserver/networktest.actor.cpp | 41 +++++++++++++++++++++++++++++++++ flow/Knobs.cpp | 1 + flow/Net2.actor.cpp | 38 +++++++++++++++++++++++++++++- flow/include/flow/Knobs.h | 1 + 4 files changed, 80 insertions(+), 1 deletion(-) diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp index e3f27ef8527..7200e4a9057 100644 --- a/fdbserver/networktest.actor.cpp +++ b/fdbserver/networktest.actor.cpp @@ -627,6 +627,47 @@ struct P2PNetworkTest { } } + ACTOR static Future run_impl_simple(P2PNetworkTest* self) { + state ActorCollection actors(false); + + self->startTime = now(); + + fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n", + self->listeners.size(), + self->remotes.size(), + self->connectionsOut); + + for (auto n : self->remotes) { + printf("Remote: %s\n", n.toString().c_str()); + } + + for (auto el : self->listeners) { + printf("Listener: %s\n", el->getListenAddress().toString().c_str()); + } + + if (!self->listeners.empty()) { + state Reference conn1 = wait(self->listeners[0]->accept()); + printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str()); + try { + wait(conn1->acceptHandshake()); + printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str()); + } catch (Error& e) { + printf("Server: handshake error %s\n", e.what()); + } + threadSleep(11.0); + return Void(); + } + + if (!self->remotes.empty()) { + state Reference conn2 = wait(INetworkConnections::net()->connect(self->remotes[0])); + printf("Client: connected to %s\n", self->remotes[0].toString().c_str()); + wait(conn2->connectHandshake()); + printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str()); + } + + return Void(); + } + Future run() { return run_impl(this); } }; diff --git 
a/flow/Knobs.cpp b/flow/Knobs.cpp index 8cf956eaa55..e7cb61df508 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -137,6 +137,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( TLS_HANDSHAKE_LIMIT, 1000 ); init( DISABLE_MAINTHREAD_TLS_HANDSHAKE, false ); init( TLS_HANDSHAKE_FLOWLOCK_PRIORITY, static_cast(TaskPriority::DefaultYield) ); + init( TLS_HANDSHAKE_TIMEOUT_SECONDS, 2.0 ); // 0 -> no timeout init( NETWORK_TEST_CLIENT_COUNT, 30 ); init( NETWORK_TEST_REPLY_SIZE, 600e3 ); init( NETWORK_TEST_REQUEST_COUNT, 0 ); // 0 -> run forever diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 9220a753439..dad4debcd00 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -831,10 +831,20 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; } + void setTimeout(double seconds) { + timeval timeout; + timeout.tv_sec = seconds; + timeout.tv_usec = 0; + int nativeSock = socket.next_layer().native_handle(); + setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + } + ThreadReturnPromise done; ssl_socket& socket; ssl_socket::handshake_type type; boost::system::error_code err; + double timeoutSecond = 0; NetworkAddress peerAddr; }; @@ -842,7 +852,13 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { try { h.socket.next_layer().non_blocking(false, h.err); if (!h.err.failed()) { + if (h.timeoutSecond > 0) { + h.setTimeout(h.timeoutSecond); + } h.socket.handshake(h.type, h.err); + if (h.timeoutSecond > 0) { + h.setTimeout(0); // reset + } } if (!h.err.failed()) { h.socket.next_layer().non_blocking(true, h.err); @@ -959,6 +975,13 @@ class SSLConnection final : public IConnection, ReferenceCounted auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::server); 
handshake->setPeerAddr(self->getPeerAddress()); + if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { + handshake->timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, + FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + // Mutiplying by 1.5 to ensure the syscall timeout happens after the ssl shutdown + } else { + handshake->timeoutSecond = 0; + } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); } else { @@ -1012,6 +1035,9 @@ class SSLConnection final : public IConnection, ReferenceCounted } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { g_net2->countServerTLSHandshakesTimedout++; + TraceEvent("N2_AcceptHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddress", self->getPeerAddress()); throw connection_failed(); } } @@ -1058,6 +1084,13 @@ class SSLConnection final : public IConnection, ReferenceCounted auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::client); handshake->setPeerAddr(self->getPeerAddress()); + if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { + handshake->timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, + FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + // Mutiplying by 1.5 to ensure the syscall timeout happens after the ssl shutdown + } else { + handshake->timeoutSecond = 0; + } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); } else { @@ -1092,6 +1125,9 @@ class SSLConnection final : public IConnection, ReferenceCounted } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { g_net2->countClientTLSHandshakesTimedout++; + TraceEvent("N2_ConnectHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddress", self->getPeerAddress()); throw connection_failed(); } } @@ -2323,7 +2359,7 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") { return Void(); } -void net2_test(){ +void net2_test() { /* g_network = newNet2(); // for promise serialization 
below diff --git a/flow/include/flow/Knobs.h b/flow/include/flow/Knobs.h index 2043a5acc9d..c6c789d32ce 100644 --- a/flow/include/flow/Knobs.h +++ b/flow/include/flow/Knobs.h @@ -203,6 +203,7 @@ class FlowKnobs : public KnobsImpl { int TLS_HANDSHAKE_LIMIT; bool DISABLE_MAINTHREAD_TLS_HANDSHAKE; int TLS_HANDSHAKE_FLOWLOCK_PRIORITY; + double TLS_HANDSHAKE_TIMEOUT_SECONDS; int NETWORK_TEST_CLIENT_COUNT; int NETWORK_TEST_REPLY_SIZE; From 185be67d3e4e87c7f0c9071a91b3479085dded87 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 17 Oct 2025 10:27:22 -0500 Subject: [PATCH 2/6] revert knob --- flow/Knobs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index e7cb61df508..20d31ea6c91 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -137,7 +137,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( TLS_HANDSHAKE_LIMIT, 1000 ); init( DISABLE_MAINTHREAD_TLS_HANDSHAKE, false ); init( TLS_HANDSHAKE_FLOWLOCK_PRIORITY, static_cast(TaskPriority::DefaultYield) ); - init( TLS_HANDSHAKE_TIMEOUT_SECONDS, 2.0 ); // 0 -> no timeout + init( TLS_HANDSHAKE_TIMEOUT_SECONDS, 0 ); // 0 -> no timeout init( NETWORK_TEST_CLIENT_COUNT, 30 ); init( NETWORK_TEST_REPLY_SIZE, 600e3 ); init( NETWORK_TEST_REQUEST_COUNT, 0 ); // 0 -> run forever From 0d44ad1dee78d27380108aee7aa47b9e908ae7a0 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 17 Oct 2025 12:43:54 -0500 Subject: [PATCH 3/6] add debugId to background handshakes --- flow/Net2.actor.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index dad4debcd00..a230a360755 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -831,6 +831,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; } + void setDebugId(const UID& id) { debugId = id; } + void setTimeout(double seconds) { timeval timeout; timeout.tv_sec = 
seconds; @@ -846,6 +848,7 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { boost::system::error_code err; double timeoutSecond = 0; NetworkAddress peerAddr; + UID debugId; }; void action(Handshake& h) { @@ -866,7 +869,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { if (h.err.failed()) { TraceEvent(SevWarn, h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeError"_audit - : "N2_AcceptHandshakeError"_audit) + : "N2_AcceptHandshakeError"_audit, + h.debugId) .detail("PeerAddr", h.getPeerAddress()) .detail("PeerAddress", h.getPeerAddress()) .detail("PeerEndPoint", h.getPeerEndPointAddress()) @@ -880,7 +884,8 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { } catch (...) { TraceEvent(SevWarn, h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeUnknownError"_audit - : "N2_AcceptHandshakeUnknownError"_audit) + : "N2_AcceptHandshakeUnknownError"_audit, + h.debugId) .detail("PeerAddr", h.getPeerAddress()) .detail("PeerAddress", h.getPeerAddress()) .detail("PeerEndPoint", h.getPeerEndPointAddress()) @@ -975,6 +980,7 @@ class SSLConnection final : public IConnection, ReferenceCounted auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::server); handshake->setPeerAddr(self->getPeerAddress()); + handshake->setDebugId(self->id); if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { handshake->timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); @@ -1084,6 +1090,7 @@ class SSLConnection final : public IConnection, ReferenceCounted auto handshake = new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::client); handshake->setPeerAddr(self->getPeerAddress()); + handshake->setDebugId(self->id); if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { handshake->timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); From 
8934a4cda6e9787e11db977ab6516227bf29e686 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 17 Oct 2025 17:28:59 -0500 Subject: [PATCH 4/6] set handshake timeout on the main thread --- flow/Net2.actor.cpp | 56 ++++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index a230a360755..da3fbe0daef 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -833,20 +833,10 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { void setDebugId(const UID& id) { debugId = id; } - void setTimeout(double seconds) { - timeval timeout; - timeout.tv_sec = seconds; - timeout.tv_usec = 0; - int nativeSock = socket.next_layer().native_handle(); - setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); - setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); - } - ThreadReturnPromise done; ssl_socket& socket; ssl_socket::handshake_type type; boost::system::error_code err; - double timeoutSecond = 0; NetworkAddress peerAddr; UID debugId; }; @@ -855,13 +845,7 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { try { h.socket.next_layer().non_blocking(false, h.err); if (!h.err.failed()) { - if (h.timeoutSecond > 0) { - h.setTimeout(h.timeoutSecond); - } h.socket.handshake(h.type, h.err); - if (h.timeoutSecond > 0) { - h.setTimeout(0); // reset - } } if (!h.err.failed()) { h.socket.next_layer().non_blocking(true, h.err); @@ -962,8 +946,25 @@ class SSLConnection final : public IConnection, ReferenceCounted init(); } + void setHandshakeTimeout(double seconds, const UID& debugId) { + timeval timeout; + timeout.tv_sec = seconds; + timeout.tv_usec = 0; + if (!ssl_sock.next_layer().is_open()) { + TraceEvent(SevWarn, "N2_SetSSLSocketTimeoutError", debugId) + .detail("PeerAddr", peer_address) + .detail("PeerAddress", peer_address) + .detail("Message", "Invalid native socket handle"); + return; + } + int
nativeSock = ssl_sock.next_layer().native_handle(); + setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + } + ACTOR static void doAcceptHandshake(Reference self, Promise connected) { state Hold holder; + state bool hasSetHandshakeTimeout = false; try { Future onHandshook; @@ -982,11 +983,11 @@ class SSLConnection final : public IConnection, ReferenceCounted handshake->setPeerAddr(self->getPeerAddress()); handshake->setDebugId(self->id); if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { - handshake->timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, - FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + double timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, + FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + self->setHandshakeTimeout(timeoutSecond, self->id); + hasSetHandshakeTimeout = true; // Mutiplying by 1.5 to ensure the syscall timeout happens after the ssl shutdown - } else { - handshake->timeoutSecond = 0; } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); @@ -999,6 +1000,9 @@ class SSLConnection final : public IConnection, ReferenceCounted self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p)); } wait(onHandshook); + if (hasSetHandshakeTimeout) { + self->setHandshakeTimeout(0, self->id); + } wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); } catch (...) 
{ @@ -1067,6 +1071,7 @@ class SSLConnection final : public IConnection, ReferenceCounted ACTOR static void doConnectHandshake(Reference self, Promise connected) { state Hold holder; + state bool hasSetHandshakeTimeout = false; try { Future onHandshook; @@ -1092,11 +1097,11 @@ class SSLConnection final : public IConnection, ReferenceCounted handshake->setPeerAddr(self->getPeerAddress()); handshake->setDebugId(self->id); if (FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS > 0) { - handshake->timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, - FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + double timeoutSecond = std::max(FLOW_KNOBS->TLS_HANDSHAKE_TIMEOUT_SECONDS, + FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); + self->setHandshakeTimeout(timeoutSecond, self->id); + hasSetHandshakeTimeout = true; // Mutiplying by 1.5 to ensure the syscall timeout happens after the ssl shutdown - } else { - handshake->timeoutSecond = 0; } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); @@ -1109,6 +1114,9 @@ class SSLConnection final : public IConnection, ReferenceCounted self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p)); } wait(onHandshook); + if (hasSetHandshakeTimeout) { + self->setHandshakeTimeout(0, self->id); + } wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); } catch (...) 
{ From 0ee76ca22ab7123cbea2ac8adcc1ab621f5deeea Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Fri, 17 Oct 2025 17:42:26 -0500 Subject: [PATCH 5/6] nits --- flow/Net2.actor.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index da3fbe0daef..74e3683b089 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -987,7 +987,7 @@ class SSLConnection final : public IConnection, ReferenceCounted FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); self->setHandshakeTimeout(timeoutSecond, self->id); hasSetHandshakeTimeout = true; - // Mutiplying by 1.5 to ensure the syscall timeout happens after the ssl shutdown + // Multiplying by 1.5 to ensure timeout never happens before ssl shutdown } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); @@ -1001,7 +1001,7 @@ class SSLConnection final : public IConnection, ReferenceCounted } wait(onHandshook); if (hasSetHandshakeTimeout) { - self->setHandshakeTimeout(0, self->id); + self->setHandshakeTimeout(0, self->id); // reset } wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); @@ -1101,7 +1101,7 @@ class SSLConnection final : public IConnection, ReferenceCounted FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT * 1.5); self->setHandshakeTimeout(timeoutSecond, self->id); hasSetHandshakeTimeout = true; - // Mutiplying by 1.5 to ensure the syscall timeout happens after the ssl shutdown + // Multiplying by 1.5 to ensure timeout never happens before ssl shutdown } onHandshook = handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); @@ -1115,7 +1115,7 @@ class SSLConnection final : public IConnection, ReferenceCounted } wait(onHandshook); if (hasSetHandshakeTimeout) { - self->setHandshakeTimeout(0, self->id); + self->setHandshakeTimeout(0, self->id); // reset } wait(delay(0, TaskPriority::Handshake)); connected.send(Void()); From 026f36f6ec67e4b62343d7856daa6374d8ec8a1b Mon Sep 17 00:00:00 2001 From: Zhe Wang
Date: Tue, 21 Oct 2025 12:51:56 -0700 Subject: [PATCH 6/6] add a signal indicating the conn has been closed --- flow/Net2.actor.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 74e3683b089..d0f6ccf8be7 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -950,14 +950,16 @@ class SSLConnection final : public IConnection, ReferenceCounted timeval timeout; timeout.tv_sec = seconds; timeout.tv_usec = 0; - if (!ssl_sock.next_layer().is_open()) { + int nativeSock = ssl_sock.next_layer().native_handle(); + if (closed || !ssl_sock.next_layer().is_open() || nativeSock < 0) { TraceEvent(SevWarn, "N2_SetSSLSocketTimeoutError", debugId) .detail("PeerAddr", peer_address) .detail("PeerAddress", peer_address) + .detail("Closed", closed) + .detail("NativeSock", nativeSock) .detail("Message", "Invalid native socket handle"); return; } - int nativeSock = ssl_sock.next_layer().native_handle(); setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); } @@ -1261,6 +1263,7 @@ class SSLConnection final : public IConnection, ReferenceCounted NetworkAddress peer_address; Reference> sslContext; bool has_trusted_peer; + bool closed = false; void init() { // Socket settings that have to be set after connect or accept succeeds @@ -1276,6 +1279,7 @@ class SSLConnection final : public IConnection, ReferenceCounted socket.close(closeError); boost::system::error_code shutdownError; ssl_sock.shutdown(shutdownError); + closed = true; } void onReadError(const boost::system::error_code& error) {