Skip to content

Commit 7aee63e

Browse files
committed
Restore timeout diagnostics and libev cleanup
1 parent bd4b77a commit 7aee63e

7 files changed

Lines changed: 67 additions & 10 deletions

File tree

cassandra/__init__.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,10 +687,29 @@ class OperationTimedOut(DriverException):
687687
The last :class:`~.Host` this operation was attempted against.
688688
"""
689689

690-
def __init__(self, errors=None, last_host=None):
690+
timeout = None
691+
"""
692+
The timeout value (in seconds) that was in effect when the operation
693+
timed out, or ``None`` if not applicable.
694+
"""
695+
696+
in_flight = None
697+
"""
698+
The number of in-flight requests on the connection at the time of
699+
the timeout (includes orphaned requests), or ``None`` if not applicable.
700+
"""
701+
702+
def __init__(self, errors=None, last_host=None, timeout=None, in_flight=None):
691703
self.errors = errors
692704
self.last_host = last_host
705+
self.timeout = timeout
706+
self.in_flight = in_flight
693707
message = "errors=%s, last_host=%s" % (self.errors, self.last_host)
708+
if self.timeout is not None:
709+
message += " (timeout=%ss" % self.timeout
710+
if self.in_flight is not None:
711+
message += ", in_flight=%d" % self.in_flight
712+
message += ")"
694713
Exception.__init__(self, message)
695714

696715

cassandra/cluster.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1694,7 +1694,9 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
16941694
futures.update(session.update_created_pools())
16951695
_, not_done = wait_futures(futures, pool_wait_timeout)
16961696
if not_done:
1697-
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.")
1697+
raise OperationTimedOut(
1698+
"Failed to create all new connection pools in the %ss timeout." % pool_wait_timeout,
1699+
timeout=pool_wait_timeout)
16981700

16991701
def connection_factory(self, endpoint, host_conn = None, *args, **kwargs):
17001702
"""
@@ -4794,6 +4796,7 @@ def _on_timeout(self, _attempts=0):
47944796
)
47954797
return
47964798

4799+
conn_in_flight = None
47974800
if self._connection is not None:
47984801
try:
47994802
self._connection._requests.pop(self._req_id)
@@ -4804,9 +4807,13 @@ def _on_timeout(self, _attempts=0):
48044807
except KeyError:
48054808
key = "Connection defunct by heartbeat"
48064809
errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
4807-
self._set_final_exception(OperationTimedOut(errors, self._current_host))
4810+
self._set_final_exception(OperationTimedOut(errors, self._current_host,
4811+
timeout=self.timeout,
4812+
in_flight=self._connection.in_flight))
48084813
return
48094814

4815+
# Capture connection stats before pool.return_connection() can alter state.
4816+
conn_in_flight = self._connection.in_flight
48104817
pool = self.session._pools.get(self._current_host)
48114818
if pool and not pool.is_shutdown:
48124819
# Do not return the stream ID to the pool yet. We cannot reuse it
@@ -4831,7 +4838,9 @@ def _on_timeout(self, _attempts=0):
48314838
host = str(connection.endpoint) if connection else 'unknown'
48324839
errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."}
48334840

4834-
self._set_final_exception(OperationTimedOut(errors, self._current_host))
4841+
self._set_final_exception(OperationTimedOut(errors, self._current_host,
4842+
timeout=self.timeout,
4843+
in_flight=conn_in_flight))
48354844

48364845
def _on_speculative_execute(self):
48374846
self._timer = None

cassandra/connection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1827,7 +1827,10 @@ def wait(self, timeout):
18271827
if self._exception:
18281828
raise self._exception
18291829
else:
1830-
raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,), self.connection.endpoint)
1830+
raise OperationTimedOut("Connection heartbeat timeout after %s seconds" % (timeout,),
1831+
self.connection.endpoint,
1832+
timeout=timeout,
1833+
in_flight=self.connection.in_flight)
18311834

18321835
def _options_callback(self, response):
18331836
if isinstance(response, SupportedMessage):

cassandra/io/libevreactor.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414
import atexit
1515
from collections import deque
16-
from functools import partial
1716
import logging
1817
import os
1918
import socket
@@ -45,6 +44,12 @@ def _cleanup(loop):
4544
loop._cleanup()
4645

4746

47+
def _atexit_cleanup():
48+
global _global_loop
49+
if _global_loop is not None:
50+
_cleanup(_global_loop)
51+
52+
4853
class LibevLoop(object):
4954

5055
def __init__(self):
@@ -233,7 +238,7 @@ def _loop_will_run(self, prepare):
233238

234239

235240
_global_loop = None
236-
atexit.register(partial(_cleanup, _global_loop))
241+
atexit.register(_atexit_cleanup)
237242

238243

239244
class LibevConnection(Connection):

tests/unit/io/test_libevreactor.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,19 @@ def test_watchers_are_finished(self):
8888

8989
_global_loop._shutdown = False
9090

91+
def test_atexit_cleanup_uses_current_global_loop(self):
92+
from cassandra.io import libevreactor
93+
94+
old_loop = libevreactor._global_loop
95+
current_loop = Mock()
96+
libevreactor._global_loop = current_loop
97+
self.addCleanup(setattr, libevreactor, '_global_loop', old_loop)
98+
99+
with patch.object(libevreactor, '_cleanup') as cleanup:
100+
libevreactor._atexit_cleanup()
101+
102+
cleanup.assert_called_once_with(current_loop)
103+
91104

92105
class LibevTimerPatcher(unittest.TestCase):
93106

tests/unit/test_connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,8 @@ def send_msg(msg, req_id, msg_callback):
520520
assert isinstance(exc, OperationTimedOut)
521521
assert exc.errors == 'Connection heartbeat timeout after 0.05 seconds'
522522
assert exc.last_host == DefaultEndPoint('localhost')
523+
assert exc.timeout == 0.05
524+
assert isinstance(exc.in_flight, int)
523525
holder.return_connection.assert_has_calls(
524526
[call(connection)] * get_holders.call_count)
525527

tests/unit/test_response_future.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ def test_heartbeat_defunct_deadlock(self):
142142

143143
connection = MagicMock(spec=Connection)
144144
connection._requests = {}
145+
connection.in_flight = 5
146+
connection.orphaned_request_ids = set()
145147

146148
pool = Mock()
147149
pool.is_shutdown = False
@@ -162,8 +164,10 @@ def test_heartbeat_defunct_deadlock(self):
162164

163165
# Simulate ResponseFuture timing out
164166
rf._on_timeout()
165-
with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat"):
167+
with pytest.raises(OperationTimedOut, match="Connection defunct by heartbeat") as exc_info:
166168
rf.result()
169+
assert exc_info.value.timeout == 1
170+
assert exc_info.value.in_flight == 5
167171

168172
def test_read_timeout_error_message(self):
169173
session = self.make_session()
@@ -653,7 +657,7 @@ def test_timeout_does_not_release_stream_id(self):
653657
pool = self.make_pool()
654658
session._pools.get.return_value = pool
655659
connection = Mock(spec=Connection, lock=RLock(), _requests={}, request_ids=deque(),
656-
orphaned_request_ids=set(), orphaned_threshold=256)
660+
orphaned_request_ids=set(), orphaned_threshold=256, in_flight=3)
657661
pool.borrow_connection.return_value = (connection, 1)
658662

659663
rf = self.make_response_future(session)
@@ -663,8 +667,10 @@ def test_timeout_does_not_release_stream_id(self):
663667

664668
rf._on_timeout()
665669
pool.return_connection.assert_called_once_with(connection, stream_was_orphaned=True)
666-
with pytest.raises(OperationTimedOut, match="Client request timeout"):
670+
with pytest.raises(OperationTimedOut, match="Client request timeout") as exc_info:
667671
rf.result()
672+
assert exc_info.value.timeout == 1
673+
assert exc_info.value.in_flight == 3
668674

669675
assert len(connection.request_ids) == 0, \
670676
"Request IDs should be empty but it's not: {}".format(connection.request_ids)

0 commit comments

Comments
 (0)