Skip to content

Commit b6ec587

Browse files
committed
reproduce tls blackhole
1 parent 185cc52 commit b6ec587

File tree

2 files changed

+123
-27
lines changed

2 files changed

+123
-27
lines changed

fdbserver/networktest.actor.cpp

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,53 @@ struct P2PNetworkTest {
586586
}
587587
}
588588

589+
/*ACTOR static Future<Void> run_impl(P2PNetworkTest* self) {
590+
state ActorCollection actors(false);
591+
592+
self->startTime = now();
593+
594+
fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n",
595+
self->listeners.size(),
596+
self->remotes.size(),
597+
self->connectionsOut);
598+
599+
for (auto n : self->remotes) {
600+
printf("Remote: %s\n", n.toString().c_str());
601+
}
602+
603+
for (auto el : self->listeners) {
604+
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
605+
actors.add(incoming(self, el));
606+
}
607+
608+
printf("Request size: %s\n", self->requestBytes.toString().c_str());
609+
printf("Response size: %s\n", self->replyBytes.toString().c_str());
610+
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
611+
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
612+
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
613+
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
614+
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
615+
616+
if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
617+
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
618+
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
619+
}
620+
621+
if (!self->remotes.empty()) {
622+
for (int i = 0; i < self->connectionsOut; ++i) {
623+
actors.add(outgoing(self));
624+
}
625+
}
626+
627+
loop {
628+
wait(delay(1.0, TaskPriority::Max));
629+
printf("%s\n", self->statsString().c_str());
630+
if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) {
631+
break;
632+
}
633+
}
634+
}*/
635+
589636
ACTOR static Future<Void> run_impl(P2PNetworkTest* self) {
590637
state ActorCollection actors(false);
591638

@@ -603,35 +650,28 @@ struct P2PNetworkTest {
603650

604651
for (auto el : self->listeners) {
605652
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
606-
actors.add(incoming(self, el));
607653
}
608654

609-
printf("Request size: %s\n", self->requestBytes.toString().c_str());
610-
printf("Response size: %s\n", self->replyBytes.toString().c_str());
611-
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
612-
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
613-
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
614-
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
615-
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
616-
617-
if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
618-
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
619-
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
655+
if (!self->listeners.empty()) {
656+
state Reference<IConnection> conn1 = wait(self->listeners[0]->accept());
657+
printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str());
658+
try {
659+
wait(conn1->acceptHandshake());
660+
printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str());
661+
} catch (Error& e) {
662+
printf("Server: handshake error %s\n", e.what());
663+
}
664+
threadSleep(11.0);
665+
return Void();
620666
}
621667

622668
if (!self->remotes.empty()) {
623-
for (int i = 0; i < self->connectionsOut; ++i) {
624-
actors.add(outgoing(self));
625-
}
669+
state Reference<IConnection> conn2 = wait(INetworkConnections::net()->connect(self->remotes[0]));
670+
printf("Client: connected to %s\n", self->remotes[0].toString().c_str());
671+
wait(conn2->connectHandshake());
672+
printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str());
626673
}
627674

628-
loop {
629-
wait(delay(1.0, TaskPriority::Max));
630-
printf("%s\n", self->statsString().c_str());
631-
if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) {
632-
break;
633-
}
634-
}
635675
return Void();
636676
}
637677

flow/Net2.actor.cpp

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
#include "boost/asio/ip/address.hpp"
2323
#include "boost/system/system_error.hpp"
2424
#include "flow/Arena.h"
25+
#include "flow/Buggify.h"
2526
#include "flow/Knobs.h"
2627
#include "flow/Platform.h"
2728
#include "flow/Trace.h"
2829
#include "flow/swift.h"
2930
#include "flow/swift_concurrency_hooks.h"
3031
#include <algorithm>
32+
#include <cstdio>
3133
#include <memory>
3234
#include <string_view>
3335
#ifndef BOOST_SYSTEM_NO_LIB
@@ -846,8 +848,40 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
846848
void action(Handshake& h) {
847849
try {
848850
h.socket.next_layer().non_blocking(false, h.err);
851+
852+
timeval timeout;
853+
timeout.tv_sec = 2;
854+
timeout.tv_usec = 0;
855+
int nativeSock = h.socket.next_layer().native_handle();
856+
setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
857+
setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
858+
849859
if (!h.err.failed()) {
860+
if (h.type == ssl_socket::handshake_type::client) {
861+
printf("client handshake start\n");
862+
} else {
863+
printf("server handshake start\n");
864+
}
850865
h.socket.handshake(h.type, h.err);
866+
if (h.type == ssl_socket::handshake_type::client) {
867+
printf("client handshake end\n");
868+
} else {
869+
printf("server handshake end\n");
870+
}
871+
timeval timeoutZero;
872+
timeoutZero.tv_sec = 0;
873+
timeoutZero.tv_usec = 0;
874+
int nativeSock = h.socket.next_layer().native_handle();
875+
setsockopt(nativeSock,
876+
SOL_SOCKET,
877+
SO_RCVTIMEO,
878+
reinterpret_cast<const char*>(&timeoutZero),
879+
sizeof(timeoutZero));
880+
setsockopt(nativeSock,
881+
SOL_SOCKET,
882+
SO_SNDTIMEO,
883+
reinterpret_cast<const char*>(&timeoutZero),
884+
sizeof(timeoutZero));
851885
}
852886
if (!h.err.failed()) {
853887
h.socket.next_layer().non_blocking(true, h.err);
@@ -1028,9 +1062,11 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
10281062
}
10291063
wait(onHandshook);
10301064
wait(delay(0, TaskPriority::Handshake));
1065+
printf("server handshake success internal\n");
10311066
connected.send(Void());
10321067
} catch (...) {
10331068
self->closeSocket();
1069+
printf("server handshake error internal\n");
10341070
connected.sendError(connection_failed());
10351071
}
10361072
}
@@ -1063,7 +1099,6 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
10631099
static SimpleCounter<int64_t>* countServerTLSHandshakeLocked =
10641100
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakeLocked");
10651101
countServerTLSHandshakeLocked->increment(1);
1066-
10671102
Promise<Void> connected;
10681103
doAcceptHandshake(self, connected);
10691104
try {
@@ -1072,12 +1107,17 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
10721107
static SimpleCounter<int64_t>* countServerTLSHandshakesSucceed =
10731108
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed");
10741109
countServerTLSHandshakesSucceed->increment(1);
1110+
printf("server handshake done\n");
10751111
return Void();
10761112
}
10771113
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
1078-
static SimpleCounter<int64_t>* countServerTLSHandshakesTimedout =
1079-
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesTimedout");
1080-
countServerTLSHandshakesTimedout->increment(1);
1114+
static SimpleCounter<int64_t>* countServerTLSHandshakesSucceed =
1115+
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed");
1116+
countServerTLSHandshakesSucceed->increment(1);
1117+
TraceEvent("N2_AcceptHandshakeTimeout", self->id)
1118+
.suppressFor(1.0)
1119+
.detail("PeerAddress", self->getPeerAddress());
1120+
printf("server handshake timeout\n");
10811121
throw connection_failed();
10821122
}
10831123
}
@@ -1138,6 +1178,10 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11381178
handshake->setPeerAddr(self->getPeerAddress());
11391179
onHandshook = handshake->done.getFuture();
11401180
N2::g_net2->sslHandshakerPool->post(handshake);
1181+
printf("main thread sleep\n");
1182+
g_network->stop();
1183+
threadSleep(10.0);
1184+
printf("main thread awake\n");
11411185
} else {
11421186
// Otherwise use flow network thread
11431187
static SimpleCounter<int64_t>* countClientTLSHandshakesOnMainThread =
@@ -1147,6 +1191,10 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11471191
p.setPeerAddr(self->getPeerAddress());
11481192
onHandshook = p.getFuture();
11491193
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p));
1194+
printf("main thread sleep\n");
1195+
g_network->stop();
1196+
threadSleep(10.0);
1197+
printf("main thread awake\n");
11501198
}
11511199
wait(onHandshook);
11521200
wait(delay(0, TaskPriority::Handshake));
@@ -1164,7 +1212,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11641212
static SimpleCounter<int64_t>* countClientTLSHandshakeLocked =
11651213
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakeLocked");
11661214
countClientTLSHandshakeLocked->increment(1);
1167-
1215+
printf("client handshake locked\n");
11681216
Promise<Void> connected;
11691217
doConnectHandshake(self, connected);
11701218
try {
@@ -1173,12 +1221,17 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
11731221
static SimpleCounter<int64_t>* countClientTLSHandshakesSucceed =
11741222
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakesSucceed");
11751223
countClientTLSHandshakesSucceed->increment(1);
1224+
printf("client handshake done\n");
11761225
return Void();
11771226
}
11781227
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
11791228
static SimpleCounter<int64_t>* countClientTLSHandshakesTimedout =
11801229
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakesTimedout");
11811230
countClientTLSHandshakesTimedout->increment(1);
1231+
TraceEvent("N2_ConnectHandshakeTimeout", self->id)
1232+
.suppressFor(1.0)
1233+
.detail("PeerAddress", self->getPeerAddress());
1234+
printf("client handshake timeout\n");
11821235
throw connection_failed();
11831236
}
11841237
}
@@ -1309,10 +1362,13 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
13091362
void closeSocket() {
13101363
boost::system::error_code cancelError;
13111364
socket.cancel(cancelError);
1365+
printf("Socket cancelled\n");
13121366
boost::system::error_code closeError;
13131367
socket.close(closeError);
1368+
printf("Socket closed\n");
13141369
boost::system::error_code shutdownError;
13151370
ssl_sock.shutdown(shutdownError);
1371+
printf("SSL shutdown\n");
13161372
}
13171373

13181374
void onReadError(const boost::system::error_code& error) {

0 commit comments

Comments
 (0)