Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1816,7 +1816,19 @@ def __init__(self, connection, owner):
with connection.lock:
if connection.in_flight < connection.max_request_id:
connection.in_flight += 1
connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
request_id = connection.get_request_id()
try:
connection.send_msg(OptionsMessage(), request_id, self._options_callback)
except Exception as exc:
if connection.is_control_connection:
connection.in_flight -= 1
# send_msg() registers the callback before writing to the socket,
# so a write failure must unwind that registration here.
connection._requests.pop(request_id, None)
if request_id not in connection.request_ids:
Comment thread
dkropachev marked this conversation as resolved.
connection.request_ids.append(request_id)
self._exception = exc
self._event.set()
else:
Comment thread
dkropachev marked this conversation as resolved.
self._exception = Exception("Failed to send heartbeat because connection 'in_flight' exceeds threshold")
self._event.set()
Expand Down
27 changes: 26 additions & 1 deletion tests/unit/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cassandra import OperationTimedOut
from cassandra.cluster import Cluster
from cassandra.connection import (Connection, HEADER_DIRECTION_TO_CLIENT, ProtocolError,
locally_supported_compressions, ConnectionHeartbeat, _Frame, Timer, TimerManager,
locally_supported_compressions, ConnectionHeartbeat, HeartbeatFuture, _Frame, Timer, TimerManager,
ConnectionException, ConnectionShutdown, DefaultEndPoint, ShardAwarePortGenerator)
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
Expand Down Expand Up @@ -463,6 +463,31 @@ def test_no_req_ids(self, *args):
holder.return_connection.assert_has_calls(
[call(max_connection)] * get_holders.call_count)

def test_heartbeat_future_releases_request_id_when_send_fails(self, *args):
connection = Connection(DefaultEndPoint('1.2.3.4'))
connection.push = Mock(side_effect=ConnectionException("write failed"))
owner = Mock()
initial_in_flight = connection.in_flight
initial_request_ids = len(connection.request_ids)

# HostConnection.return_connection releases the heartbeat's in-flight slot.
def return_connection(conn):
with conn.lock:
conn.in_flight -= 1

owner.return_connection.side_effect = return_connection

future = HeartbeatFuture(connection, owner)

with pytest.raises(ConnectionException):
future.wait(0)

owner.return_connection(connection)

assert connection.in_flight == initial_in_flight
assert len(connection.request_ids) == initial_request_ids
assert not connection._requests

def test_unexpected_response(self, *args):
request_id = 999

Expand Down
Loading