Skip to content

Commit 1139fa6

Browse files
Solve TCP disconnect while incomplete read deadlock (#6066) (#6092)
* Refs #23655. Add regression test
Signed-off-by: Juan Lopez Fernandez <[email protected]>
* Refs #23655. Solve deadlock and improve thread safety
Signed-off-by: Juan Lopez Fernandez <[email protected]>
* Refs #23655. Bonus track: protect status getter
Signed-off-by: Juan Lopez Fernandez <[email protected]>
* Refs #23655. Fix test in Windows
Signed-off-by: Juan Lopez Fernandez <[email protected]>
* Refs #23655. Remove deprecated code
Signed-off-by: Juan Lopez Fernandez <[email protected]>
* Refs #23655. Restore noexcept overloads usage
Signed-off-by: Juan Lopez Fernandez <[email protected]>
* Refs #23655. Apply suggestions
Signed-off-by: Juan Lopez Fernandez <[email protected]>
---------
Signed-off-by: Juan Lopez Fernandez <[email protected]>
(cherry picked from commit f2e173c)
# Conflicts:
# test/blackbox/common/BlackboxTestsTransportTCP.cpp
Co-authored-by: juanlofer-eprosima <[email protected]>
1 parent a552a33 commit 1139fa6

File tree

12 files changed

+172
-148
lines changed

12 files changed

+172
-148
lines changed

src/cpp/fastdds/core/condition/StatusConditionImpl.cpp

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,12 @@ const StatusMask& StatusConditionImpl::get_enabled_statuses() const
7070
return mask_;
7171
}
7272

73+
const StatusMask& StatusConditionImpl::get_raw_status() const
74+
{
75+
std::lock_guard<std::mutex> guard(mutex_);
76+
return status_;
77+
}
78+
7379
void StatusConditionImpl::set_status(
7480
const StatusMask& status,
7581
bool trigger_value)

src/cpp/fastdds/core/condition/StatusConditionImpl.hpp

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -80,10 +80,7 @@ struct StatusConditionImpl
8080
* @brief Retrieves the list of communication statuses that are currently triggered.
8181
* @return Triggered status.
8282
*/
83-
const StatusMask& get_raw_status() const
84-
{
85-
return status_;
86-
}
83+
const StatusMask& get_raw_status() const;
8784

8885
/**
8986
* @brief Set the trigger value of a specific status

src/cpp/rtps/transport/TCPAcceptorSecure.cpp

Lines changed: 0 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,6 @@ void TCPAcceptorSecure::accept(
5555

5656
try
5757
{
58-
#if ASIO_VERSION >= 101200
5958
acceptor_.async_accept(
6059
[locator, parent, &ssl_context](const std::error_code& error, tcp::socket socket)
6160
{
@@ -82,33 +81,6 @@ void TCPAcceptorSecure::accept(
8281
parent->SecureSocketAccepted(nullptr, locator, error); // This method manages errors too.
8382
}
8483
});
85-
#else
86-
auto secure_socket = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(*io_context_, ssl_context);
87-
88-
acceptor_.async_accept(secure_socket->lowest_layer(),
89-
[locator, parent, secure_socket](const std::error_code& error)
90-
{
91-
if (!error)
92-
{
93-
ssl::stream_base::handshake_type role = ssl::stream_base::server;
94-
if (parent->configuration()->tls_config.handshake_role == TLSHSRole::CLIENT)
95-
{
96-
role = ssl::stream_base::client;
97-
}
98-
99-
secure_socket->async_handshake(role,
100-
[secure_socket, locator, parent](const std::error_code& error)
101-
{
102-
//EPROSIMA_LOG_ERROR(RTCP_TLS, "Handshake: " << error.message());
103-
parent->SecureSocketAccepted(secure_socket, locator, error);
104-
});
105-
}
106-
else
107-
{
108-
parent->SecureSocketAccepted(nullptr, locator, error); // This method manages errors too.
109-
}
110-
});
111-
#endif // if ASIO_VERSION >= 101200
11284
}
11385
catch (std::error_code& error)
11486
{

src/cpp/rtps/transport/TCPChannelResource.cpp

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ TCPChannelResource::TCPChannelResource(
6666
, parent_(parent)
6767
, locator_()
6868
, waiting_for_keep_alive_(false)
69-
, connection_status_(eConnectionStatus::eConnected)
69+
, connection_status_(eConnectionStatus::eDisconnected)
7070
, tcp_connection_type_(TCPConnectionType::TCP_ACCEPT_TYPE)
7171
{
7272
}
@@ -376,6 +376,9 @@ void TCPChannelResource::set_socket_options(
376376
asio::basic_socket<asio::ip::tcp>& socket,
377377
const TCPTransportDescriptor* options)
378378
{
379+
// Options setting should be done before connection is established
380+
assert(!connected());
381+
379382
uint32_t minimum_value = options->maxMessageSize;
380383

381384
// Set the send buffer size

src/cpp/rtps/transport/TCPChannelResource.h

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,8 @@ class TCPChannelResource : public ChannelResource
5353

5454
enum eConnectionStatus
5555
{
56-
eDisconnected = 0,
56+
eDisconnecting = -1, // Transition state to disconnected, used to protect from concurrent (dis)connects.
57+
eDisconnected = 0, // Initial state.
5758
eConnecting, // Output -> Trying connection.
5859
eConnected, // Output -> Send bind message.
5960
eWaitingForBind, // Input -> Waiting for the bind message.
@@ -71,7 +72,6 @@ class TCPChannelResource : public ChannelResource
7172
std::vector<uint16_t> pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_
7273
std::vector<uint16_t> logical_output_ports_;
7374
std::condition_variable_any logical_output_ports_updated_cv;
74-
std::mutex read_mutex_;
7575
std::recursive_mutex pending_logical_mutex_;
7676
std::atomic<eConnectionStatus> connection_status_;
7777

@@ -113,6 +113,17 @@ class TCPChannelResource : public ChannelResource
113113
return connection_status_ == eConnectionStatus::eEstablished;
114114
}
115115

116+
bool connected()
117+
{
118+
return connection_status_ >= eConnectionStatus::eConnected;
119+
}
120+
121+
bool disconnected()
122+
{
123+
// NOTE: covers both eDisconnected and eDisconnecting states
124+
return connection_status_ <= eConnectionStatus::eDisconnected;
125+
}
126+
116127
eConnectionStatus connection_status()
117128
{
118129
return connection_status_;
@@ -240,7 +251,7 @@ class TCPChannelResource : public ChannelResource
240251
* @param socket Socket on which to set the options.
241252
* @param options Descriptor with the options to set.
242253
*/
243-
static void set_socket_options(
254+
void set_socket_options(
244255
asio::basic_socket<asio::ip::tcp>& socket,
245256
const TCPTransportDescriptor* options);
246257

src/cpp/rtps/transport/TCPChannelResourceBasic.cpp

Lines changed: 32 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,9 @@
1414

1515
#include <rtps/transport/TCPChannelResourceBasic.h>
1616

17-
#include <future>
1817
#include <array>
18+
#include <future>
19+
#include <mutex>
1920

2021
#include <asio.hpp>
2122
#include <fastrtps/utils/IPLocator.h>
@@ -79,13 +80,7 @@ void TCPChannelResourceBasic::connect(
7980
asio::async_connect(
8081
*socket_,
8182
endpoints,
82-
[this, channel_weak_ptr](std::error_code ec
83-
#if ASIO_VERSION >= 101200
84-
, ip::tcp::endpoint
85-
#else
86-
, ip::tcp::resolver::iterator
87-
#endif // if ASIO_VERSION >= 101200
88-
)
83+
[this, channel_weak_ptr](std::error_code ec, ip::tcp::endpoint)
8984
{
9085
if (!channel_weak_ptr.expired())
9186
{
@@ -103,25 +98,22 @@ void TCPChannelResourceBasic::connect(
10398

10499
void TCPChannelResourceBasic::disconnect()
105100
{
106-
if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive())
101+
// Go to disconnecting state to protect from concurrent connects and disconnects
102+
auto prev_status = change_status(eConnectionStatus::eDisconnecting);
103+
if (eConnecting < prev_status && alive())
107104
{
108-
std::lock_guard<std::mutex> read_lock(read_mutex_);
109-
auto socket = socket_;
105+
// Shutdown the socket to abort any ongoing read and write operations
106+
shutdown(asio::ip::tcp::socket::shutdown_both);
110107

111-
std::error_code ec;
112-
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
108+
cancel();
109+
close(); // Blocks until all read and write operations have finished
113110

114-
asio::post(context_, [&, socket]()
115-
{
116-
try
117-
{
118-
socket->cancel();
119-
socket->close();
120-
}
121-
catch (std::exception&)
122-
{
123-
}
124-
});
111+
// Change to disconnected state as the last step
112+
change_status(eConnectionStatus::eDisconnected);
113+
}
114+
else if (eConnectionStatus::eDisconnecting != prev_status || !alive())
115+
{
116+
change_status(eConnectionStatus::eDisconnected);
125117
}
126118
}
127119

@@ -130,10 +122,9 @@ uint32_t TCPChannelResourceBasic::read(
130122
std::size_t size,
131123
asio::error_code& ec)
132124
{
133-
std::unique_lock<std::mutex> read_lock(read_mutex_);
134-
135-
if (eConnecting < connection_status_)
125+
if (connected())
136126
{
127+
std::unique_lock<std::mutex> read_lock(read_mutex_);
137128
return static_cast<uint32_t>(asio::read(*socket_, asio::buffer(buffer, size), transfer_exactly(size), ec));
138129
}
139130

@@ -149,7 +140,7 @@ size_t TCPChannelResourceBasic::send(
149140
{
150141
size_t bytes_sent = 0;
151142

152-
if (eConnecting < connection_status_)
143+
if (connected())
153144
{
154145
std::lock_guard<std::mutex> send_guard(send_mutex_);
155146

@@ -200,23 +191,32 @@ asio::ip::tcp::endpoint TCPChannelResourceBasic::local_endpoint(
200191
void TCPChannelResourceBasic::set_options(
201192
const TCPTransportDescriptor* options)
202193
{
203-
TCPChannelResource::set_socket_options(*socket_, options);
194+
set_socket_options(*socket_, options);
204195
}
205196

206197
void TCPChannelResourceBasic::cancel()
207198
{
208-
socket_->cancel();
199+
std::error_code ec;
200+
socket_->cancel(ec); // thread safe with respect to asio's read and write methods
209201
}
210202

211203
void TCPChannelResourceBasic::close()
212204
{
213-
socket_->close();
205+
// Wait for read and write operations to finish before closing the socket (otherwise not thread safe)
206+
// NOTE: shutdown should have been called before closing to abort any ongoing operation
207+
std::unique_lock<std::mutex> send_lk(send_mutex_, std::defer_lock);
208+
std::unique_lock<std::mutex> read_lk(read_mutex_, std::defer_lock);
209+
std::lock(send_lk, read_lk); // Pre C++17 alternative to std::scoped_lock
210+
211+
std::error_code ec;
212+
socket_->close(ec);
214213
}
215214

216215
void TCPChannelResourceBasic::shutdown(
217216
asio::socket_base::shutdown_type what)
218217
{
219-
socket_->shutdown(what);
218+
std::error_code ec;
219+
socket_->shutdown(what, ec); // thread safe with respect to asio's read and write methods
220220
}
221221

222222
} // namespace rtps

src/cpp/rtps/transport/TCPChannelResourceBasic.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ class TCPChannelResourceBasic : public TCPChannelResource
2828
asio::io_context& context_;
2929

3030
std::mutex send_mutex_;
31+
std::mutex read_mutex_;
3132
std::shared_ptr<asio::ip::tcp::socket> socket_;
3233

3334
public:

src/cpp/rtps/transport/TCPChannelResourceSecure.cpp

Lines changed: 21 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -94,13 +94,7 @@ void TCPChannelResourceSecure::connect(
9494
const auto secure_socket = secure_socket_;
9595

9696
asio::async_connect(secure_socket_->lowest_layer(), endpoints,
97-
[secure_socket, channel_weak_ptr, parent](const std::error_code& error
98-
#if ASIO_VERSION >= 101200
99-
, ip::tcp::endpoint
100-
#else
101-
, const tcp::resolver::iterator& /*endpoint*/
102-
#endif // if ASIO_VERSION >= 101200
103-
)
97+
[secure_socket, channel_weak_ptr, parent](const std::error_code& error, ip::tcp::endpoint)
10498
{
10599
if (!error)
106100
{
@@ -141,19 +135,33 @@ void TCPChannelResourceSecure::connect(
141135

142136
void TCPChannelResourceSecure::disconnect()
143137
{
144-
if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive())
138+
// Go to disconnecting state to protect from concurrent connects and disconnects
139+
auto prev_status = change_status(eConnectionStatus::eDisconnecting);
140+
if (eConnecting < prev_status && alive())
145141
{
146142
auto socket = secure_socket_;
147143

148144
post(context_, [&, socket]()
149145
{
150146
std::error_code ec;
151-
socket->lowest_layer().close(ec);
152147
socket->async_shutdown([&, socket](const std::error_code&)
153148
{
154149
});
150+
151+
// Close the underlying socket after SSL shutdown
152+
// NOTE: the (async) SSL shutdown may not complete before the socket is closed
153+
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
154+
socket->lowest_layer().cancel(ec);
155+
socket->lowest_layer().close(ec);
156+
157+
// Change to disconnected state as the last step
158+
this->change_status(eConnectionStatus::eDisconnected);
155159
});
156160
}
161+
else if (eConnectionStatus::eDisconnecting != prev_status || !alive())
162+
{
163+
change_status(eConnectionStatus::eDisconnected);
164+
}
157165
}
158166

159167
uint32_t TCPChannelResourceSecure::read(
@@ -163,7 +171,7 @@ uint32_t TCPChannelResourceSecure::read(
163171
{
164172
size_t bytes_read = 0;
165173

166-
if (eConnecting < connection_status_)
174+
if (connected())
167175
{
168176
std::promise<size_t> read_bytes_promise;
169177
auto bytes_future = read_bytes_promise.get_future();
@@ -208,7 +216,7 @@ size_t TCPChannelResourceSecure::send(
208216
{
209217
size_t bytes_sent = 0;
210218

211-
if (eConnecting < connection_status_)
219+
if (connected())
212220
{
213221
if (parent_->configuration()->non_blocking_send &&
214222
!check_socket_send_buffer(header_size + size,
@@ -280,7 +288,7 @@ asio::ip::tcp::endpoint TCPChannelResourceSecure::local_endpoint(
280288
void TCPChannelResourceSecure::set_options(
281289
const TCPTransportDescriptor* options)
282290
{
283-
TCPChannelResource::set_socket_options(secure_socket_->lowest_layer(), options);
291+
set_socket_options(secure_socket_->lowest_layer(), options);
284292
}
285293

286294
void TCPChannelResourceSecure::set_tls_verify_mode(
@@ -341,6 +349,7 @@ void TCPChannelResourceSecure::close()
341349
void TCPChannelResourceSecure::shutdown(
342350
asio::socket_base::shutdown_type)
343351
{
352+
// WARNING: This function blocks until receiving the peer’s close_notify (or an error occurs).
344353
secure_socket_->shutdown();
345354
}
346355

0 commit comments

Comments
 (0)