Skip to content

Commit 985ceeb

Browse files
Solve TCP disconnect while incomplete read deadlock (#6066) (#6091)
* Refs #23655. Add regression test
* Refs #23655. Solve deadlock and improve thread safety
* Refs #23655. Bonus track: protect status getter
* Refs #23655. Fix test in Windows
* Refs #23655. Remove deprecated code
* Refs #23655. Restore noexcept overloads usage
* Refs #23655. Apply suggestions

---------

(cherry picked from commit f2e173c)

Signed-off-by: Juan Lopez Fernandez <[email protected]>
Co-authored-by: juanlofer-eprosima <[email protected]>
1 parent cdd01d5 commit 985ceeb

File tree

12 files changed

+172
-148
lines changed

12 files changed

+172
-148
lines changed

src/cpp/fastdds/core/condition/StatusConditionImpl.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ const StatusMask& StatusConditionImpl::get_enabled_statuses() const
7070
return mask_;
7171
}
7272

73+
const StatusMask& StatusConditionImpl::get_raw_status() const
74+
{
75+
std::lock_guard<std::mutex> guard(mutex_);
76+
return status_;
77+
}
78+
7379
void StatusConditionImpl::set_status(
7480
const StatusMask& status,
7581
bool trigger_value)

src/cpp/fastdds/core/condition/StatusConditionImpl.hpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,7 @@ struct StatusConditionImpl
7878
* @brief Retrieves the list of communication statuses that are currently triggered.
7979
* @return Triggered status.
8080
*/
81-
const StatusMask& get_raw_status() const
82-
{
83-
return status_;
84-
}
81+
const StatusMask& get_raw_status() const;
8582

8683
/**
8784
* @brief Set the trigger value of a specific status

src/cpp/rtps/transport/TCPAcceptorSecure.cpp

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ void TCPAcceptorSecure::accept(
5555

5656
try
5757
{
58-
#if ASIO_VERSION >= 101200
5958
acceptor_.async_accept(
6059
[locator, parent, &ssl_context](const std::error_code& error, tcp::socket socket)
6160
{
@@ -82,33 +81,6 @@ void TCPAcceptorSecure::accept(
8281
parent->SecureSocketAccepted(nullptr, locator, error); // This method manages errors too.
8382
}
8483
});
85-
#else
86-
auto secure_socket = std::make_shared<asio::ssl::stream<asio::ip::tcp::socket>>(*io_context_, ssl_context);
87-
88-
acceptor_.async_accept(secure_socket->lowest_layer(),
89-
[locator, parent, secure_socket](const std::error_code& error)
90-
{
91-
if (!error)
92-
{
93-
ssl::stream_base::handshake_type role = ssl::stream_base::server;
94-
if (parent->configuration()->tls_config.handshake_role == TLSHSRole::CLIENT)
95-
{
96-
role = ssl::stream_base::client;
97-
}
98-
99-
secure_socket->async_handshake(role,
100-
[secure_socket, locator, parent](const std::error_code& error)
101-
{
102-
//EPROSIMA_LOG_ERROR(RTCP_TLS, "Handshake: " << error.message());
103-
parent->SecureSocketAccepted(secure_socket, locator, error);
104-
});
105-
}
106-
else
107-
{
108-
parent->SecureSocketAccepted(nullptr, locator, error); // This method manages errors too.
109-
}
110-
});
111-
#endif // if ASIO_VERSION >= 101200
11284
}
11385
catch (std::error_code& error)
11486
{

src/cpp/rtps/transport/TCPChannelResource.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ TCPChannelResource::TCPChannelResource(
6666
, parent_(parent)
6767
, locator_()
6868
, waiting_for_keep_alive_(false)
69-
, connection_status_(eConnectionStatus::eConnected)
69+
, connection_status_(eConnectionStatus::eDisconnected)
7070
, tcp_connection_type_(TCPConnectionType::TCP_ACCEPT_TYPE)
7171
{
7272
}
@@ -376,6 +376,9 @@ void TCPChannelResource::set_socket_options(
376376
asio::basic_socket<asio::ip::tcp>& socket,
377377
const TCPTransportDescriptor* options)
378378
{
379+
// Options setting should be done before connection is established
380+
assert(!connected());
381+
379382
uint32_t minimum_value = options->maxMessageSize;
380383

381384
// Set the send buffer size

src/cpp/rtps/transport/TCPChannelResource.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ class TCPChannelResource : public ChannelResource
5353

5454
enum eConnectionStatus
5555
{
56-
eDisconnected = 0,
56+
eDisconnecting = -1, // Transition state to disconnected, used to protect from concurrent (dis)connects.
57+
eDisconnected = 0, // Initial state.
5758
eConnecting, // Output -> Trying connection.
5859
eConnected, // Output -> Send bind message.
5960
eWaitingForBind, // Input -> Waiting for the bind message.
@@ -71,7 +72,6 @@ class TCPChannelResource : public ChannelResource
7172
std::vector<uint16_t> pending_logical_output_ports_; // Must be accessed after lock pending_logical_mutex_
7273
std::vector<uint16_t> logical_output_ports_;
7374
std::condition_variable_any logical_output_ports_updated_cv;
74-
std::mutex read_mutex_;
7575
std::recursive_mutex pending_logical_mutex_;
7676
std::atomic<eConnectionStatus> connection_status_;
7777

@@ -113,6 +113,17 @@ class TCPChannelResource : public ChannelResource
113113
return connection_status_ == eConnectionStatus::eEstablished;
114114
}
115115

116+
bool connected()
117+
{
118+
return connection_status_ >= eConnectionStatus::eConnected;
119+
}
120+
121+
bool disconnected()
122+
{
123+
// NOTE: covers both eDisconnected and eDisconnecting states
124+
return connection_status_ <= eConnectionStatus::eDisconnected;
125+
}
126+
116127
eConnectionStatus connection_status()
117128
{
118129
return connection_status_;
@@ -275,7 +286,7 @@ class TCPChannelResource : public ChannelResource
275286
* @param socket Socket on which to set the options.
276287
* @param options Descriptor with the options to set.
277288
*/
278-
static void set_socket_options(
289+
void set_socket_options(
279290
asio::basic_socket<asio::ip::tcp>& socket,
280291
const TCPTransportDescriptor* options);
281292

src/cpp/rtps/transport/TCPChannelResourceBasic.cpp

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414

1515
#include <rtps/transport/TCPChannelResourceBasic.h>
1616

17-
#include <future>
1817
#include <array>
18+
#include <future>
19+
#include <mutex>
1920

2021
#include <asio.hpp>
2122
#include <fastdds/utils/IPLocator.hpp>
@@ -77,13 +78,7 @@ void TCPChannelResourceBasic::connect(
7778
asio::async_connect(
7879
*socket_,
7980
endpoints,
80-
[this, channel_weak_ptr](std::error_code ec
81-
#if ASIO_VERSION >= 101200
82-
, ip::tcp::endpoint
83-
#else
84-
, ip::tcp::resolver::iterator
85-
#endif // if ASIO_VERSION >= 101200
86-
)
81+
[this, channel_weak_ptr](std::error_code ec, ip::tcp::endpoint)
8782
{
8883
if (!channel_weak_ptr.expired())
8984
{
@@ -101,25 +96,22 @@ void TCPChannelResourceBasic::connect(
10196

10297
void TCPChannelResourceBasic::disconnect()
10398
{
104-
if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive())
99+
// Go to disconnecting state to protect from concurrent connects and disconnects
100+
auto prev_status = change_status(eConnectionStatus::eDisconnecting);
101+
if (eConnecting < prev_status && alive())
105102
{
106-
std::lock_guard<std::mutex> read_lock(read_mutex_);
107-
auto socket = socket_;
103+
// Shutdown the socket to abort any ongoing read and write operations
104+
shutdown(asio::ip::tcp::socket::shutdown_both);
108105

109-
std::error_code ec;
110-
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
106+
cancel();
107+
close(); // Blocks until all read and write operations have finished
111108

112-
asio::post(context_, [&, socket]()
113-
{
114-
try
115-
{
116-
socket->cancel();
117-
socket->close();
118-
}
119-
catch (std::exception&)
120-
{
121-
}
122-
});
109+
// Change to disconnected state as the last step
110+
change_status(eConnectionStatus::eDisconnected);
111+
}
112+
else if (eConnectionStatus::eDisconnecting != prev_status || !alive())
113+
{
114+
change_status(eConnectionStatus::eDisconnected);
123115
}
124116
}
125117

@@ -128,10 +120,9 @@ uint32_t TCPChannelResourceBasic::read(
128120
std::size_t size,
129121
asio::error_code& ec)
130122
{
131-
std::unique_lock<std::mutex> read_lock(read_mutex_);
132-
133-
if (eConnecting < connection_status_)
123+
if (connected())
134124
{
125+
std::unique_lock<std::mutex> read_lock(read_mutex_);
135126
return static_cast<uint32_t>(asio::read(*socket_, asio::buffer(buffer, size), transfer_exactly(size), ec));
136127
}
137128

@@ -147,7 +138,7 @@ size_t TCPChannelResourceBasic::send(
147138
{
148139
size_t bytes_sent = 0;
149140

150-
if (eConnecting < connection_status_)
141+
if (connected())
151142
{
152143
std::lock_guard<std::mutex> send_guard(send_mutex_);
153144

@@ -195,23 +186,32 @@ asio::ip::tcp::endpoint TCPChannelResourceBasic::local_endpoint(
195186
void TCPChannelResourceBasic::set_options(
196187
const TCPTransportDescriptor* options)
197188
{
198-
TCPChannelResource::set_socket_options(*socket_, options);
189+
set_socket_options(*socket_, options);
199190
}
200191

201192
void TCPChannelResourceBasic::cancel()
202193
{
203-
socket_->cancel();
194+
std::error_code ec;
195+
socket_->cancel(ec); // thread safe with respect to asio's read and write methods
204196
}
205197

206198
void TCPChannelResourceBasic::close()
207199
{
208-
socket_->close();
200+
// Wait for read and write operations to finish before closing the socket (otherwise not thread safe)
201+
// NOTE: shutdown should have been called before closing to abort any ongoing operation
202+
std::unique_lock<std::mutex> send_lk(send_mutex_, std::defer_lock);
203+
std::unique_lock<std::mutex> read_lk(read_mutex_, std::defer_lock);
204+
std::lock(send_lk, read_lk); // Pre C++17 alternative to std::scoped_lock
205+
206+
std::error_code ec;
207+
socket_->close(ec);
209208
}
210209

211210
void TCPChannelResourceBasic::shutdown(
212211
asio::socket_base::shutdown_type what)
213212
{
214-
socket_->shutdown(what);
213+
std::error_code ec;
214+
socket_->shutdown(what, ec); // thread safe with respect to asio's read and write methods
215215
}
216216

217217
} // namespace rtps

src/cpp/rtps/transport/TCPChannelResourceBasic.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class TCPChannelResourceBasic : public TCPChannelResource
2828
asio::io_context& context_;
2929

3030
std::mutex send_mutex_;
31+
std::mutex read_mutex_;
3132
std::shared_ptr<asio::ip::tcp::socket> socket_;
3233

3334
public:

src/cpp/rtps/transport/TCPChannelResourceSecure.cpp

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,7 @@ void TCPChannelResourceSecure::connect(
9191
const auto secure_socket = secure_socket_;
9292

9393
asio::async_connect(secure_socket_->lowest_layer(), endpoints,
94-
[secure_socket, channel_weak_ptr, parent](const std::error_code& error
95-
#if ASIO_VERSION >= 101200
96-
, ip::tcp::endpoint
97-
#else
98-
, const tcp::resolver::iterator& /*endpoint*/
99-
#endif // if ASIO_VERSION >= 101200
100-
)
94+
[secure_socket, channel_weak_ptr, parent](const std::error_code& error, ip::tcp::endpoint)
10195
{
10296
if (!error)
10397
{
@@ -138,19 +132,33 @@ void TCPChannelResourceSecure::connect(
138132

139133
void TCPChannelResourceSecure::disconnect()
140134
{
141-
if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive())
135+
// Go to disconnecting state to protect from concurrent connects and disconnects
136+
auto prev_status = change_status(eConnectionStatus::eDisconnecting);
137+
if (eConnecting < prev_status && alive())
142138
{
143139
auto socket = secure_socket_;
144140

145141
post(context_, [&, socket]()
146142
{
147143
std::error_code ec;
148-
socket->lowest_layer().close(ec);
149144
socket->async_shutdown([&, socket](const std::error_code&)
150145
{
151146
});
147+
148+
// Close the underlying socket after SSL shutdown
149+
// NOTE: the (async) SSL shutdown may not complete before the socket is closed
150+
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
151+
socket->lowest_layer().cancel(ec);
152+
socket->lowest_layer().close(ec);
153+
154+
// Change to disconnected state as the last step
155+
this->change_status(eConnectionStatus::eDisconnected);
152156
});
153157
}
158+
else if (eConnectionStatus::eDisconnecting != prev_status || !alive())
159+
{
160+
change_status(eConnectionStatus::eDisconnected);
161+
}
154162
}
155163

156164
uint32_t TCPChannelResourceSecure::read(
@@ -160,7 +168,7 @@ uint32_t TCPChannelResourceSecure::read(
160168
{
161169
size_t bytes_read = 0;
162170

163-
if (eConnecting < connection_status_)
171+
if (connected())
164172
{
165173
std::promise<size_t> read_bytes_promise;
166174
auto bytes_future = read_bytes_promise.get_future();
@@ -205,7 +213,7 @@ size_t TCPChannelResourceSecure::send(
205213
{
206214
size_t bytes_sent = 0;
207215

208-
if (eConnecting < connection_status_)
216+
if (connected())
209217
{
210218
if (parent_->configuration()->non_blocking_send &&
211219
!check_socket_send_buffer(header_size + total_bytes,
@@ -278,7 +286,7 @@ asio::ip::tcp::endpoint TCPChannelResourceSecure::local_endpoint(
278286
void TCPChannelResourceSecure::set_options(
279287
const TCPTransportDescriptor* options)
280288
{
281-
TCPChannelResource::set_socket_options(secure_socket_->lowest_layer(), options);
289+
set_socket_options(secure_socket_->lowest_layer(), options);
282290
}
283291

284292
void TCPChannelResourceSecure::set_tls_verify_mode(
@@ -339,6 +347,7 @@ void TCPChannelResourceSecure::close()
339347
void TCPChannelResourceSecure::shutdown(
340348
asio::socket_base::shutdown_type)
341349
{
350+
// WARNING: This function blocks until receiving the peer’s close_notify (or an error occurs).
342351
secure_socket_->shutdown();
343352
}
344353

0 commit comments

Comments
 (0)