Skip to content

Commit 4bfcd27

Browse files
committed
Fix AsyncioConnection race conditions causing EBADF errors (#614)
Fix race conditions in AsyncioConnection that cause "[Errno 9] Bad file descriptor" errors during node restarts, especially with TLS:

1. close() now waits for _close() to complete when called from outside the event loop thread, eliminating the window where is_closed=True but the socket fd is still open.
2. handle_read() sets last_error on server EOF so factory() detects dead connections instead of returning them to callers.
3. handle_write() treats peer disconnections as clean close instead of defuncting, and both I/O handlers skip defunct() if the connection is already shutting down.

Peer-disconnect detection is extracted into the _is_peer_disconnect() helper, which covers platform-specific behaviors:

- Windows: ProactorEventLoop raises plain OSError with winerror 10054 (WSAECONNRESET) or 10053 (WSAECONNABORTED) instead of ConnectionResetError. Detection uses the ConnectionError base class plus a winerror check.
- macOS: Raises OSError(57) ENOTCONN when writing to a peer-disconnected socket, which is not a ConnectionError subclass. Detection uses errno-based checks for ENOTCONN, ESHUTDOWN, ECONNRESET, and ECONNABORTED.
- Windows _close(): ProactorEventLoop does not support remove_reader/remove_writer (raises NotImplementedError). These calls are wrapped so the socket is always closed regardless, and try/finally ensures connected_event is always set even if cleanup fails.
1 parent 9c53d78 commit 4bfcd27

2 files changed

Lines changed: 408 additions & 39 deletions

File tree

cassandra/io/asyncioreactor.py

Lines changed: 110 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import errno
12
import threading
23

34
from cassandra.connection import Connection, ConnectionShutdown
@@ -12,6 +13,24 @@
1213

1314
log = logging.getLogger(__name__)
1415

16+
# Errno values treated as a dropped connection: the remote peer has disconnected, or (EBADF) the local socket descriptor is no longer valid.
17+
_PEER_DISCONNECT_ERRNOS = frozenset((
18+
errno.ENOTCONN, errno.ESHUTDOWN,
19+
errno.ECONNRESET, errno.ECONNABORTED,
20+
errno.EBADF,
21+
))
22+
23+
# Windows winerror codes for the same conditions:
24+
# 10053 = WSAECONNABORTED, 10054 = WSAECONNRESET
25+
_PEER_DISCONNECT_WINERRORS = frozenset((10053, 10054))
26+
27+
28+
def _is_peer_disconnect(err):
29+
"""Return True if *err* indicates the remote peer closed the connection."""
30+
return (isinstance(err, ConnectionError)
31+
or getattr(err, 'winerror', None) in _PEER_DISCONNECT_WINERRORS
32+
or getattr(err, 'errno', None) in _PEER_DISCONNECT_ERRNOS)
33+
1534

1635
# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
1736
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
@@ -140,8 +159,7 @@ def close(self):
140159
return
141160
self.is_closed = True
142161

143-
# close from the loop thread to avoid races when removing file
144-
# descriptors
162+
# Schedule async cleanup (cancel watchers, error pending requests)
145163
asyncio.run_coroutine_threadsafe(
146164
self._close(), loop=self._loop
147165
)
@@ -153,11 +171,46 @@ async def _close(self):
153171
if self._read_watcher:
154172
self._read_watcher.cancel()
155173
if self._socket:
156-
self._loop.remove_writer(self._socket.fileno())
157-
self._loop.remove_reader(self._socket.fileno())
158-
self._socket.close()
159-
160-
log.debug("Closed socket to %s" % (self.endpoint,))
174+
fd = self._socket.fileno()
175+
if fd >= 0:
176+
try:
177+
self._loop.remove_writer(fd)
178+
except NotImplementedError:
179+
# NotImplementedError: remove_reader/remove_writer are not
180+
# supported on Windows ProactorEventLoop (default since
181+
# Python 3.8). ProactorEventLoop uses completion-based
182+
# IOCP, which has no concept of "watching a fd for
183+
# readiness" to remove.
184+
pass
185+
except Exception:
186+
# It is not critical if it fails, driver can keep working,
187+
# but it should not happen, so it is logged as an error
188+
log.error("Unexpected error removing writer for %s",
189+
self.endpoint, exc_info=True)
190+
try:
191+
self._loop.remove_reader(fd)
192+
except NotImplementedError:
193+
# NotImplementedError: remove_reader/remove_writer are not
194+
# supported on Windows ProactorEventLoop (default since
195+
# Python 3.8). ProactorEventLoop uses completion-based
196+
# IOCP, which has no concept of "watching a fd for
197+
# readiness" to remove.
198+
pass
199+
except Exception:
200+
# It is not critical if it fails, driver can keep working,
201+
# but it should not happen, so it is logged as an error
202+
log.error("Unexpected error removing reader for %s",
203+
self.endpoint, exc_info=True)
204+
205+
try:
206+
self._socket.close()
207+
except OSError:
208+
# close() can fail with EIO/EBADF in rare OS-level edge cases
209+
pass
210+
except Exception:
211+
log.debug("Unexpected error closing socket to %s",
212+
self.endpoint, exc_info=True)
213+
log.debug("Closed socket to %s" % (self.endpoint,))
161214

162215
if not self.is_defunct:
163216
msg = "Connection to %s was closed" % self.endpoint
@@ -196,43 +249,61 @@ async def _push_msg(self, chunks):
196249

197250

198251
async def handle_write(self):
199-
while True:
200-
try:
252+
exc = None
253+
try:
254+
while True:
201255
next_msg = await self._write_queue.get()
202256
if next_msg:
203257
await self._loop.sock_sendall(self._socket, next_msg)
204-
except socket.error as err:
258+
except asyncio.CancelledError:
259+
pass
260+
except Exception as err:
261+
if _is_peer_disconnect(err):
262+
log.debug("Connection %s closed by peer during write: %s",
263+
self, err)
264+
else:
265+
exc = err
205266
log.debug("Exception in send for %s: %s", self, err)
206-
self.defunct(err)
207-
return
208-
except asyncio.CancelledError:
209-
return
267+
finally:
268+
self.defunct(exc or ConnectionShutdown(
269+
"Connection to %s was closed" % self.endpoint))
210270

211271
async def handle_read(self):
212-
while True:
213-
try:
214-
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
215-
self._iobuf.write(buf)
216-
# sock_recv expects EWOULDBLOCK if socket provides no data, but
217-
# nonblocking ssl sockets raise these instead, so we handle them
218-
# ourselves by yielding to the event loop, where the socket will
219-
# get the reading/writing it "wants" before retrying
220-
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
221-
# Apparently the preferred way to yield to the event loop from within
222-
# a native coroutine based on https://github.com/python/asyncio/issues/284
223-
await asyncio.sleep(0)
224-
continue
225-
except socket.error as err:
226-
log.debug("Exception during socket recv for %s: %s",
272+
exc = None
273+
try:
274+
while True:
275+
try:
276+
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
277+
self._iobuf.write(buf)
278+
# sock_recv expects EWOULDBLOCK if socket provides no data, but
279+
# nonblocking ssl sockets raise these instead, so we handle them
280+
# ourselves by yielding to the event loop, where the socket will
281+
# get the reading/writing it "wants" before retrying
282+
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
283+
# Apparently the preferred way to yield to the event loop from within
284+
# a native coroutine based on https://github.com/python/asyncio/issues/284
285+
await asyncio.sleep(0)
286+
continue
287+
288+
if buf and self._iobuf.tell():
289+
self.process_io_buffer()
290+
else:
291+
log.debug("Connection %s closed by server", self)
292+
exc = ConnectionShutdown(
293+
"Connection to %s was closed by server" % self.endpoint)
294+
return
295+
except asyncio.CancelledError:
296+
# Task cancellation is treated as a normal connection shutdown;
297+
# cleanup and marking the connection as defunct are handled in finally.
298+
pass
299+
except Exception as err:
300+
if _is_peer_disconnect(err):
301+
log.debug("Connection %s closed by peer during read: %s",
227302
self, err)
228-
self.defunct(err)
229-
return # leave the read loop
230-
except asyncio.CancelledError:
231-
return
232-
233-
if buf and self._iobuf.tell():
234-
self.process_io_buffer()
235303
else:
236-
log.debug("Connection %s closed by server", self)
237-
self.close()
238-
return
304+
exc = err
305+
log.debug("Exception during socket recv for %s: %s",
306+
self, err)
307+
finally:
308+
self.defunct(exc or ConnectionShutdown(
309+
"Connection to %s was closed" % self.endpoint))

0 commit comments

Comments
 (0)