Skip to content

Commit 6c260b2

Browse files
committed
reproduce tls blackhole
1 parent 185cc52 commit 6c260b2

File tree

2 files changed

+129
-27
lines changed

2 files changed

+129
-27
lines changed

fdbserver/networktest.actor.cpp

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,53 @@ struct P2PNetworkTest {
586586
}
587587
}
588588

589+
/*ACTOR static Future<Void> run_impl(P2PNetworkTest* self) {
590+
state ActorCollection actors(false);
591+
592+
self->startTime = now();
593+
594+
fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n",
595+
self->listeners.size(),
596+
self->remotes.size(),
597+
self->connectionsOut);
598+
599+
for (auto n : self->remotes) {
600+
printf("Remote: %s\n", n.toString().c_str());
601+
}
602+
603+
for (auto el : self->listeners) {
604+
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
605+
actors.add(incoming(self, el));
606+
}
607+
608+
printf("Request size: %s\n", self->requestBytes.toString().c_str());
609+
printf("Response size: %s\n", self->replyBytes.toString().c_str());
610+
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
611+
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
612+
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
613+
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
614+
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
615+
616+
if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
617+
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
618+
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
619+
}
620+
621+
if (!self->remotes.empty()) {
622+
for (int i = 0; i < self->connectionsOut; ++i) {
623+
actors.add(outgoing(self));
624+
}
625+
}
626+
627+
loop {
628+
wait(delay(1.0, TaskPriority::Max));
629+
printf("%s\n", self->statsString().c_str());
630+
if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) {
631+
break;
632+
}
633+
}
634+
}*/
635+
589636
ACTOR static Future<Void> run_impl(P2PNetworkTest* self) {
590637
state ActorCollection actors(false);
591638

@@ -603,35 +650,28 @@ struct P2PNetworkTest {
603650

604651
for (auto el : self->listeners) {
605652
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
606-
actors.add(incoming(self, el));
607653
}
608654

609-
printf("Request size: %s\n", self->requestBytes.toString().c_str());
610-
printf("Response size: %s\n", self->replyBytes.toString().c_str());
611-
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
612-
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
613-
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
614-
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
615-
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
616-
617-
if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
618-
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
619-
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
655+
if (!self->listeners.empty()) {
656+
state Reference<IConnection> conn1 = wait(self->listeners[0]->accept());
657+
printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str());
658+
try {
659+
wait(conn1->acceptHandshake());
660+
printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str());
661+
} catch (Error& e) {
662+
printf("Server: handshake error %s\n", e.what());
663+
}
664+
threadSleep(11.0);
665+
return Void();
620666
}
621667

622668
if (!self->remotes.empty()) {
623-
for (int i = 0; i < self->connectionsOut; ++i) {
624-
actors.add(outgoing(self));
625-
}
669+
state Reference<IConnection> conn2 = wait(INetworkConnections::net()->connect(self->remotes[0]));
670+
printf("Client: connected to %s\n", self->remotes[0].toString().c_str());
671+
wait(conn2->connectHandshake());
672+
printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str());
626673
}
627674

628-
loop {
629-
wait(delay(1.0, TaskPriority::Max));
630-
printf("%s\n", self->statsString().c_str());
631-
if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) {
632-
break;
633-
}
634-
}
635675
return Void();
636676
}
637677

flow/Net2.actor.cpp

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
#include "boost/asio/ip/address.hpp"
2323
#include "boost/system/system_error.hpp"
2424
#include "flow/Arena.h"
25+
#include "flow/Buggify.h"
2526
#include "flow/Knobs.h"
2627
#include "flow/Platform.h"
2728
#include "flow/Trace.h"
2829
#include "flow/swift.h"
2930
#include "flow/swift_concurrency_hooks.h"
3031
#include <algorithm>
32+
#include <cstdio>
3133
#include <memory>
3234
#include <string_view>
3335
#ifndef BOOST_SYSTEM_NO_LIB
@@ -846,8 +848,46 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
846848
void action(Handshake& h) {
847849
try {
848850
h.socket.next_layer().non_blocking(false, h.err);
851+
852+
timeval timeout;
853+
timeout.tv_sec = 2;
854+
timeout.tv_usec = 0;
855+
int nativeSock = h.socket.next_layer().native_handle();
856+
setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
857+
setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
858+
849859
if (!h.err.failed()) {
860+
if (h.type == ssl_socket::handshake_type::client) {
861+
// std::this_thread::sleep_for(std::chrono::milliseconds(
862+
// FLOW_KNOBS->TLS_CLIENT_HANDSHAKE_DELAY_MS)); // artificial delay for testing
863+
printf("client handshake abort\n");
864+
// std::abort();
865+
}
866+
if (h.type == ssl_socket::handshake_type::client) {
867+
printf("client handshake start\n");
868+
} else {
869+
printf("server handshake start\n");
870+
}
850871
h.socket.handshake(h.type, h.err);
872+
if (h.type == ssl_socket::handshake_type::client) {
873+
printf("client handshake end\n");
874+
} else {
875+
printf("server handshake end\n");
876+
}
877+
timeval timeoutZero;
878+
timeoutZero.tv_sec = 0;
879+
timeoutZero.tv_usec = 0;
880+
int nativeSock = h.socket.next_layer().native_handle();
881+
setsockopt(nativeSock,
882+
SOL_SOCKET,
883+
SO_RCVTIMEO,
884+
reinterpret_cast<const char*>(&timeoutZero),
885+
sizeof(timeoutZero));
886+
setsockopt(nativeSock,
887+
SOL_SOCKET,
888+
SO_SNDTIMEO,
889+
reinterpret_cast<const char*>(&timeoutZero),
890+
sizeof(timeoutZero));
851891
}
852892
if (!h.err.failed()) {
853893
h.socket.next_layer().non_blocking(true, h.err);
@@ -1028,9 +1068,11 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
10281068
}
10291069
wait(onHandshook);
10301070
wait(delay(0, TaskPriority::Handshake));
1071+
printf("server handshake success internal\n");
10311072
connected.send(Void());
10321073
} catch (...) {
10331074
self->closeSocket();
1075+
printf("server handshake error internal\n");
10341076
connected.sendError(connection_failed());
10351077
}
10361078
}
@@ -1063,7 +1105,6 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
10631105
static SimpleCounter<int64_t>* countServerTLSHandshakeLocked =
10641106
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakeLocked");
10651107
countServerTLSHandshakeLocked->increment(1);
1066-
10671108
Promise<Void> connected;
10681109
doAcceptHandshake(self, connected);
10691110
try {
@@ -1072,12 +1113,17 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
10721113
static SimpleCounter<int64_t>* countServerTLSHandshakesSucceed =
10731114
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed");
10741115
countServerTLSHandshakesSucceed->increment(1);
1116+
printf("server handshake done\n");
10751117
return Void();
10761118
}
10771119
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
1078-
static SimpleCounter<int64_t>* countServerTLSHandshakesTimedout =
1079-
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesTimedout");
1080-
countServerTLSHandshakesTimedout->increment(1);
1120+
static SimpleCounter<int64_t>* countServerTLSHandshakesSucceed =
1121+
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed");
1122+
countServerTLSHandshakesSucceed->increment(1);
1123+
TraceEvent("N2_AcceptHandshakeTimeout", self->id)
1124+
.suppressFor(1.0)
1125+
.detail("PeerAddress", self->getPeerAddress());
1126+
printf("server handshake timeout\n");
10811127
throw connection_failed();
10821128
}
10831129
}
@@ -1138,6 +1184,10 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11381184
handshake->setPeerAddr(self->getPeerAddress());
11391185
onHandshook = handshake->done.getFuture();
11401186
N2::g_net2->sslHandshakerPool->post(handshake);
1187+
printf("main thread sleep\n");
1188+
g_network->stop();
1189+
threadSleep(10.0);
1190+
printf("main thread awake\n");
11411191
} else {
11421192
// Otherwise use flow network thread
11431193
static SimpleCounter<int64_t>* countClientTLSHandshakesOnMainThread =
@@ -1147,6 +1197,10 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11471197
p.setPeerAddr(self->getPeerAddress());
11481198
onHandshook = p.getFuture();
11491199
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p));
1200+
printf("main thread sleep\n");
1201+
g_network->stop();
1202+
threadSleep(10.0);
1203+
printf("main thread awake\n");
11501204
}
11511205
wait(onHandshook);
11521206
wait(delay(0, TaskPriority::Handshake));
@@ -1164,7 +1218,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11641218
static SimpleCounter<int64_t>* countClientTLSHandshakeLocked =
11651219
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakeLocked");
11661220
countClientTLSHandshakeLocked->increment(1);
1167-
1221+
printf("client handshake locked\n");
11681222
Promise<Void> connected;
11691223
doConnectHandshake(self, connected);
11701224
try {
@@ -1173,12 +1227,17 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11731227
static SimpleCounter<int64_t>* countClientTLSHandshakesSucceed =
11741228
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakesSucceed");
11751229
countClientTLSHandshakesSucceed->increment(1);
1230+
printf("client handshake done\n");
11761231
return Void();
11771232
}
11781233
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
11791234
static SimpleCounter<int64_t>* countClientTLSHandshakesTimedout =
11801235
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakesTimedout");
11811236
countClientTLSHandshakesTimedout->increment(1);
1237+
TraceEvent("N2_ConnectHandshakeTimeout", self->id)
1238+
.suppressFor(1.0)
1239+
.detail("PeerAddress", self->getPeerAddress());
1240+
printf("client handshake timeout\n");
11821241
throw connection_failed();
11831242
}
11841243
}
@@ -1309,10 +1368,13 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
13091368
void closeSocket() {
13101369
boost::system::error_code cancelError;
13111370
socket.cancel(cancelError);
1371+
printf("Socket cancelled\n");
13121372
boost::system::error_code closeError;
13131373
socket.close(closeError);
1374+
printf("Socket closed\n");
13141375
boost::system::error_code shutdownError;
13151376
ssl_sock.shutdown(shutdownError);
1377+
printf("SSL shutdown\n");
13161378
}
13171379

13181380
void onReadError(const boost::system::error_code& error) {

0 commit comments

Comments
 (0)