Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions fdbserver/networktest.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,53 @@ struct P2PNetworkTest {
}
}

/*ACTOR static Future<Void> run_impl(P2PNetworkTest* self) {
state ActorCollection actors(false);

self->startTime = now();

fmt::print("{0} listeners, {1} remotes, {2} outgoing connections\n",
self->listeners.size(),
self->remotes.size(),
self->connectionsOut);

for (auto n : self->remotes) {
printf("Remote: %s\n", n.toString().c_str());
}

for (auto el : self->listeners) {
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
actors.add(incoming(self, el));
}

printf("Request size: %s\n", self->requestBytes.toString().c_str());
printf("Response size: %s\n", self->replyBytes.toString().c_str());
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);

if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
}

if (!self->remotes.empty()) {
for (int i = 0; i < self->connectionsOut; ++i) {
actors.add(outgoing(self));
}
}

loop {
wait(delay(1.0, TaskPriority::Max));
printf("%s\n", self->statsString().c_str());
if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) {
break;
}
}
}*/

ACTOR static Future<Void> run_impl(P2PNetworkTest* self) {
state ActorCollection actors(false);

Expand All @@ -603,35 +650,28 @@ struct P2PNetworkTest {

for (auto el : self->listeners) {
printf("Listener: %s\n", el->getListenAddress().toString().c_str());
actors.add(incoming(self, el));
}

printf("Request size: %s\n", self->requestBytes.toString().c_str());
printf("Response size: %s\n", self->replyBytes.toString().c_str());
printf("Requests per outgoing session: %s\n", self->requests.toString().c_str());
printf("Delay before socket read: %s\n", self->waitReadMilliseconds.toString().c_str());
printf("Delay before socket write: %s\n", self->waitWriteMilliseconds.toString().c_str());
printf("Delay before session close: %s\n", self->idleMilliseconds.toString().c_str());
printf("Send/Recv size %d bytes\n", FLOW_KNOBS->MAX_PACKET_SEND_BYTES);

if ((self->remotes.empty() || self->connectionsOut == 0) && self->listeners.empty()) {
printf("No listeners and no remotes or connectionsOut, so there is nothing to do!\n");
ASSERT((!self->remotes.empty() && (self->connectionsOut > 0)) || !self->listeners.empty());
if (!self->listeners.empty()) {
state Reference<IConnection> conn1 = wait(self->listeners[0]->accept());
printf("Server: connected from %s\n", conn1->getPeerAddress().toString().c_str());
try {
wait(conn1->acceptHandshake());
printf("Server: connected from %s, handshake done\n", conn1->getPeerAddress().toString().c_str());
} catch (Error& e) {
printf("Server: handshake error %s\n", e.what());
}
threadSleep(11.0);
return Void();
}

if (!self->remotes.empty()) {
for (int i = 0; i < self->connectionsOut; ++i) {
actors.add(outgoing(self));
}
state Reference<IConnection> conn2 = wait(INetworkConnections::net()->connect(self->remotes[0]));
printf("Client: connected to %s\n", self->remotes[0].toString().c_str());
wait(conn2->connectHandshake());
printf("Client: connected to %s, handshake done\n", self->remotes[0].toString().c_str());
}

loop {
wait(delay(1.0, TaskPriority::Max));
printf("%s\n", self->statsString().c_str());
if (self->targetDuration > 0 && now() - self->globalStartTime > self->targetDuration) {
break;
}
}
return Void();
}

Expand Down
66 changes: 61 additions & 5 deletions flow/Net2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
#include "boost/asio/ip/address.hpp"
#include "boost/system/system_error.hpp"
#include "flow/Arena.h"
#include "flow/Buggify.h"
#include "flow/Knobs.h"
#include "flow/Platform.h"
#include "flow/Trace.h"
#include "flow/swift.h"
#include "flow/swift_concurrency_hooks.h"
#include <algorithm>
#include <cstdio>
#include <memory>
#include <string_view>
#ifndef BOOST_SYSTEM_NO_LIB
Expand Down Expand Up @@ -846,8 +848,40 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
void action(Handshake& h) {
try {
h.socket.next_layer().non_blocking(false, h.err);

timeval timeout;
timeout.tv_sec = 2;
timeout.tv_usec = 0;
int nativeSock = h.socket.next_layer().native_handle();
setsockopt(nativeSock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));
setsockopt(nativeSock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&timeout), sizeof(timeout));

if (!h.err.failed()) {
if (h.type == ssl_socket::handshake_type::client) {
printf("client handshake start\n");
} else {
printf("server handshake start\n");
}
h.socket.handshake(h.type, h.err);
if (h.type == ssl_socket::handshake_type::client) {
printf("client handshake end\n");
} else {
printf("server handshake end\n");
}
timeval timeoutZero;
timeoutZero.tv_sec = 0;
timeoutZero.tv_usec = 0;
int nativeSock = h.socket.next_layer().native_handle();
setsockopt(nativeSock,
SOL_SOCKET,
SO_RCVTIMEO,
reinterpret_cast<const char*>(&timeoutZero),
sizeof(timeoutZero));
setsockopt(nativeSock,
SOL_SOCKET,
SO_SNDTIMEO,
reinterpret_cast<const char*>(&timeoutZero),
sizeof(timeoutZero));
}
if (!h.err.failed()) {
h.socket.next_layer().non_blocking(true, h.err);
Expand Down Expand Up @@ -1028,9 +1062,11 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
}
wait(onHandshook);
wait(delay(0, TaskPriority::Handshake));
printf("server handshake success internal\n");
connected.send(Void());
} catch (...) {
self->closeSocket();
printf("server handshake error internal\n");
connected.sendError(connection_failed());
}
}
Expand Down Expand Up @@ -1063,7 +1099,6 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
static SimpleCounter<int64_t>* countServerTLSHandshakeLocked =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakeLocked");
countServerTLSHandshakeLocked->increment(1);

Promise<Void> connected;
doAcceptHandshake(self, connected);
try {
Expand All @@ -1072,12 +1107,17 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
static SimpleCounter<int64_t>* countServerTLSHandshakesSucceed =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed");
countServerTLSHandshakesSucceed->increment(1);
printf("server handshake done\n");
return Void();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
static SimpleCounter<int64_t>* countServerTLSHandshakesTimedout =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesTimedout");
countServerTLSHandshakesTimedout->increment(1);
static SimpleCounter<int64_t>* countServerTLSHandshakesSucceed =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ServerTLSHandshakesSucceed");
countServerTLSHandshakesSucceed->increment(1);
TraceEvent("N2_AcceptHandshakeTimeout", self->id)
.suppressFor(1.0)
.detail("PeerAddress", self->getPeerAddress());
printf("server handshake timeout\n");
throw connection_failed();
}
}
Expand Down Expand Up @@ -1138,6 +1178,10 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
handshake->setPeerAddr(self->getPeerAddress());
onHandshook = handshake->done.getFuture();
N2::g_net2->sslHandshakerPool->post(handshake);
printf("main thread sleep\n");
g_network->stop();
threadSleep(10.0);
printf("main thread awake\n");
} else {
// Otherwise use flow network thread
static SimpleCounter<int64_t>* countClientTLSHandshakesOnMainThread =
Expand All @@ -1147,6 +1191,10 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
p.setPeerAddr(self->getPeerAddress());
onHandshook = p.getFuture();
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p));
printf("main thread sleep\n");
g_network->stop();
threadSleep(10.0);
printf("main thread awake\n");
}
wait(onHandshook);
wait(delay(0, TaskPriority::Handshake));
Expand All @@ -1164,7 +1212,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
static SimpleCounter<int64_t>* countClientTLSHandshakeLocked =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakeLocked");
countClientTLSHandshakeLocked->increment(1);

printf("client handshake locked\n");
Promise<Void> connected;
doConnectHandshake(self, connected);
try {
Expand All @@ -1173,12 +1221,17 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
static SimpleCounter<int64_t>* countClientTLSHandshakesSucceed =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakesSucceed");
countClientTLSHandshakesSucceed->increment(1);
printf("client handshake done\n");
return Void();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
static SimpleCounter<int64_t>* countClientTLSHandshakesTimedout =
SimpleCounter<int64_t>::makeCounter("/Net2/TLS/ClientTLSHandshakesTimedout");
countClientTLSHandshakesTimedout->increment(1);
TraceEvent("N2_ConnectHandshakeTimeout", self->id)
.suppressFor(1.0)
.detail("PeerAddress", self->getPeerAddress());
printf("client handshake timeout\n");
throw connection_failed();
}
}
Expand Down Expand Up @@ -1309,10 +1362,13 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
void closeSocket() {
boost::system::error_code cancelError;
socket.cancel(cancelError);
printf("Socket cancelled\n");
boost::system::error_code closeError;
socket.close(closeError);
printf("Socket closed\n");
boost::system::error_code shutdownError;
ssl_sock.shutdown(shutdownError);
printf("SSL shutdown\n");
}

void onReadError(const boost::system::error_code& error) {
Expand Down