From b6ec58714ba663162fa6b5d60a8e138f1fccc569 Mon Sep 17 00:00:00 2001 From: Zhe Wang Date: Thu, 9 Oct 2025 20:29:31 -0700 Subject: [PATCH] reproduce tls blackhole --- fdbserver/networktest.actor.cpp | 84 ++++++++++++++++++++++++--------- flow/Net2.actor.cpp | 66 ++++++++++++++++++++++++-- 2 files changed, 123 insertions(+), 27 deletions(-) diff --git a/fdbserver/networktest.actor.cpp b/fdbserver/networktest.actor.cpp index d7c20cca8a5..82c53ca49fe 100644 --- a/fdbserver/networktest.actor.cpp +++ b/fdbserver/networktest.actor.cpp @@ -586,6 +586,53 @@ struct P2PNetworkTest { } } + /*ACTOR static Future run_impl(P2PNetworkTest* self) { + state ActorCollection actors(false); + + self->startTime = now(); + + fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n", + self->listeners.size(), + self->remotes.size(), + self->connectionsOut); + + for (auto n : self->remotes) { + printf("Remote: %s\n", n.toString().c_str()); + } + + for (auto el : self->listeners) { + printf("Listener: %s\n", el->getListenAddress().toString().c_str()); + actors.add(incoming(self, el)); + } + + printf("Request size: %s\n", self->requestBytes.toString().c_str()); + printf("Response size: %s\n", self->replyBytes.toString().c_str()); + printf("Requests per outgoing session: %s\n", self->requests.toString().c_str()); + printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str()); + printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str()); + printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str()); + printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES); + + if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) { + printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n"); + ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty()); + } + + if (!self->remotes.empty()) { + for (int i = 0; i < 
self->connectionsOut; ++i) { + actors.add(outgoing(self)); + } + } + + loop { + wait(delay(1.0, TaskPriority::Max)); + printf("%s\n", self->statsString().c_str()); + if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) { + break; + } + } + }*/ + ACTOR static Future run_impl(P2PNetworkTest* self) { state ActorCollection actors(false); @@ -603,35 +650,28 @@ struct P2PNetworkTest { for (auto el : self->listeners) { printf("Listener: %s\n", el->getListenAddress().toString().c_str()); - actors.add(incoming(self, el)); } - printf("Request size: %s\n", self->requestBytes.toString().c_str()); - printf("Response size: %s\n", self->replyBytes.toString().c_str()); - printf("Requests per outgoing session: %s\n", self->requests.toString().c_str()); - printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str()); - printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str()); - printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str()); - printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES); - - if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) { - printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n"); - ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty()); + if (!self->listeners.empty()) { + state Reference conn1 = wait(self->listeners[0]->accept()); + printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str()); + try { + wait(conn1->acceptHandshake()); + printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str()); + } catch (Error& e) { + printf("Server: handshake error %s\n", e.what()); + } + threadSleep(11.0); + return Void(); } if (!self->remotes.empty()) { - for (int i = 0; i < self->connectionsOut; ++i) { - actors.add(outgoing(self)); - } + state Reference conn2 = 
wait(INetworkConnections::net()->connect(self->remotes[0])); + printf("Client: connected to %s\n", self->remotes[0].toString().c_str()); + wait(conn2->connectHandshake()); + printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str()); } - loop { - wait(delay(1.0, TaskPriority::Max)); - printf("%s\n", self->statsString().c_str()); - if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) { - break; - } - } return Void(); } diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 508e9e6dff0..0c7cc942f3c 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -22,12 +22,14 @@ #include "boost/asio/ip/address.hpp" #include "boost/system/system_error.hpp" #include "flow/Arena.h" +#include "flow/Buggify.h" #include "flow/Knobs.h" #include "flow/Platform.h" #include "flow/Trace.h" #include "flow/swift.h" #include "flow/swift_concurrency_hooks.h" #include +#include #include #include #ifndef BOOST_SYSTEM_NO_LIB @@ -846,8 +848,40 @@ struct SSLHandshakerThread final : IThreadPoolReceiver { void action(Handshake& h) { try { h.socket.next_layer().non_blocking(false, h.err); + + timeval timeout; + timeout.tv_sec = 2; + timeout.tv_usec = 0; + int nativeSock = h.socket.next_layer().native_handle(); + setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(timeout)); + if (!h.err.failed()) { + if (h.type == ssl_socket::handshake_type::client) { + printf("client handshake start\n"); + } else { + printf("server handshake start\n"); + } h.socket.handshake(h.type, h.err); + if (h.type == ssl_socket::handshake_type::client) { + printf("client handshake end\n"); + } else { + printf("server handshake end\n"); + } + timeval timeoutZero; + timeoutZero.tv_sec = 0; + timeoutZero.tv_usec = 0; + int nativeSock = h.socket.next_layer().native_handle(); + setsockopt(nativeSock, + SOL_SOCKET, + SO_RCVTIMEO, + 
reinterpret_cast(&timeoutZero), + sizeof(timeoutZero)); + setsockopt(nativeSock, + SOL_SOCKET, + SO_SNDTIMEO, + reinterpret_cast(&timeoutZero), + sizeof(timeoutZero)); } if (!h.err.failed()) { h.socket.next_layer().non_blocking(true, h.err); @@ -1028,9 +1062,11 @@ class SSLConnection final : public IConnection, ReferenceCounted } wait(onHandshook); wait(delay(0, TaskPriority::Handshake)); + printf("server handshake success internal\n"); connected.send(Void()); } catch (...) { self->closeSocket(); + printf("server handshake error internal\n"); connected.sendError(connection_failed()); } } @@ -1063,7 +1099,6 @@ class SSLConnection final : public IConnection, ReferenceCounted static SimpleCounter* countServerTLSHandshakeLocked = SimpleCounter::makeCounter("/Net2/TLS/ServerTLSHandshakeLocked"); countServerTLSHandshakeLocked->increment(1); - Promise connected; doAcceptHandshake(self, connected); try { @@ -1072,12 +1107,17 @@ class SSLConnection final : public IConnection, ReferenceCounted static SimpleCounter* countServerTLSHandshakesSucceed = SimpleCounter::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed"); countServerTLSHandshakesSucceed->increment(1); + printf("server handshake done\n"); return Void(); } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { static SimpleCounter* countServerTLSHandshakesTimedout = SimpleCounter::makeCounter("/Net2/TLS/ServerTLSHandshakesTimedout"); countServerTLSHandshakesTimedout->increment(1); + TraceEvent("N2_AcceptHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddress", self->getPeerAddress()); + printf("server handshake timeout\n"); throw connection_failed(); } } @@ -1138,6 +1178,10 @@ class SSLConnection final : public IConnection, ReferenceCounted handshake->setPeerAddr(self->getPeerAddress()); onHandshook =
handshake->done.getFuture(); N2::g_net2->sslHandshakerPool->post(handshake); + printf("main thread sleep\n"); + g_network->stop(); + threadSleep(10.0); + printf("main thread awake\n"); } else { // Otherwise use flow network thread static SimpleCounter* countClientTLSHandshakesOnMainThread = @@ -1147,6 +1191,10 @@ class SSLConnection final : public IConnection, ReferenceCounted p.setPeerAddr(self->getPeerAddress()); onHandshook = p.getFuture(); self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p)); + printf("main thread sleep\n"); + g_network->stop(); + threadSleep(10.0); + printf("main thread awake\n"); } wait(onHandshook); wait(delay(0, TaskPriority::Handshake)); @@ -1164,7 +1212,7 @@ class SSLConnection final : public IConnection, ReferenceCounted static SimpleCounter* countClientTLSHandshakeLocked = SimpleCounter::makeCounter("/Net2/TLS/ClientTLSHandshakeLocked"); countClientTLSHandshakeLocked->increment(1); - + printf("client handshake locked\n"); Promise connected; doConnectHandshake(self, connected); try { @@ -1173,12 +1221,17 @@ class SSLConnection final : public IConnection, ReferenceCounted static SimpleCounter* countClientTLSHandshakesSucceed = SimpleCounter::makeCounter("/Net2/TLS/ClientTLSHandshakesSucceed"); countClientTLSHandshakesSucceed->increment(1); + printf("client handshake done\n"); return Void(); } when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { static SimpleCounter* countClientTLSHandshakesTimedout = SimpleCounter::makeCounter("/Net2/TLS/ClientTLSHandshakesTimedout"); countClientTLSHandshakesTimedout->increment(1); + TraceEvent("N2_ConnectHandshakeTimeout", self->id) + .suppressFor(1.0) + .detail("PeerAddress", self->getPeerAddress()); + printf("client handshake timeout\n"); throw connection_failed(); } } @@ -1309,10 +1362,13 @@ class SSLConnection final : public IConnection, ReferenceCounted void closeSocket() { boost::system::error_code cancelError; socket.cancel(cancelError); + printf("Socket 
cancelled\n"); boost::system::error_code closeError; socket.close(closeError); + printf("Socket closed\n"); boost::system::error_code shutdownError; ssl_sock.shutdown(shutdownError); + printf("SSL shutdown\n"); } void onReadError(const boost::system::error_code& error) {