Skip to content

Commit 4eb62ce

Browse files
committed
cluster: add control-connection query fallback
Add an opt-in control-connection fallback for application queries when the driver cannot populate normal node pools. This happens in deployments that expose the cluster through a non-broadcast IP address, such as a TCP proxy or a node's public IP address.

In that mode the driver can still execute queries over the single control connection, but throughput is poor and connection churn increases the chance of request errors. This option is intentionally disabled by default and should not be used in production.

Also propagate keyspace updates on the fallback path so USE keeps the control connection in sync.

Tests:
- tests/unit/test_cluster.py::ClusterTest::test_set_keyspace_for_all_pools_reports_all_errors
- tests/unit/test_response_future.py::ResponseFutureTests::test_control_connection_fallback_updates_connection_keyspace
1 parent e6f9e9f commit 4eb62ce

4 files changed

Lines changed: 431 additions & 5 deletions

File tree

cassandra/cluster.py

Lines changed: 145 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,21 @@ def default_retry_policy(self, policy):
930930
If set to :const:`None`, there will be no timeout for these queries.
931931
"""
932932

933+
allow_control_connection_query_fallback = False
934+
"""
935+
Enables an opt-in degraded path for application queries.
936+
937+
When :const:`True`, a request may be sent on the control connection if
938+
the session has no usable node connection pools. This is intended for
939+
deployments that expose the cluster through a non-broadcast IP address,
940+
such as a TCP proxy or a node's public IP address, where the driver
941+
cannot fill the normal pool set. Queries can still execute over the single
942+
control connection, but throughput is poor and connection churn raises the
943+
chance of request errors. Do not enable this in production.
944+
945+
This fallback is not used for requests targeted to an explicit host.
946+
"""
947+
933948
idle_heartbeat_interval = 30
934949
"""
935950
Interval, in seconds, on which to heartbeat idle connections. This helps
@@ -1216,13 +1231,18 @@ def __init__(self,
12161231
metadata_request_timeout: Optional[float] = None,
12171232
column_encryption_policy=None,
12181233
application_info:Optional[ApplicationInfoBase]=None,
1219-
client_routes_config:Optional[ClientRoutesConfig]=None
1234+
client_routes_config:Optional[ClientRoutesConfig]=None,
1235+
allow_control_connection_query_fallback:Optional[bool]=False
12201236
):
12211237
"""
12221238
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
12231239
establishing connection pools or refreshing metadata.
12241240
12251241
Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
1242+
``allow_control_connection_query_fallback`` is a degraded-availability
1243+
setting for cases where the driver reaches the cluster through a
1244+
non-broadcast IP address and cannot populate the normal pools. It
1245+
should not be enabled in production.
12261246
"""
12271247

12281248
# Handle port passed as string
@@ -1464,6 +1484,7 @@ def __init__(self,
14641484
self.cql_version = cql_version
14651485
self.max_schema_agreement_wait = max_schema_agreement_wait
14661486
self.control_connection_timeout = control_connection_timeout
1487+
self.allow_control_connection_query_fallback = bool(allow_control_connection_query_fallback)
14671488
self.metadata_request_timeout = self.control_connection_timeout if metadata_request_timeout is None else metadata_request_timeout
14681489
self.idle_heartbeat_interval = idle_heartbeat_interval
14691490
self.idle_heartbeat_timeout = idle_heartbeat_timeout
@@ -3369,7 +3390,7 @@ def pool_finished_setting_keyspace(pool, host_errors):
33693390
errors[pool.host] = host_errors
33703391

33713392
if not remaining_callbacks:
3372-
callback(host_errors)
3393+
callback(errors)
33733394

33743395
for pool in tuple(self._pools.values()):
33753396
pool._set_keyspace_for_all_conns(keyspace, pool_finished_setting_keyspace)
@@ -4439,6 +4460,7 @@ class ResponseFuture(object):
44394460
_spec_execution_plan = NoSpeculativeExecutionPlan()
44404461
_continuous_paging_session = None
44414462
_host = None
4463+
_control_connection_query_attempted = False
44424464
_TABLET_ROUTING_CTYPE = None
44434465

44444466
_warned_timeout = False
@@ -4459,6 +4481,7 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
44594481
self._callback_lock = Lock()
44604482
self._start_time = start_time or time.time()
44614483
self._host = host
4484+
self._control_connection_query_attempted = False
44624485
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
44634486
self._make_query_plan()
44644487
self._event = Event()
@@ -4537,11 +4560,16 @@ def _on_timeout(self, _attempts=0):
45374560
self._connection.orphaned_threshold_reached = True
45384561

45394562
pool.return_connection(self._connection, stream_was_orphaned=True)
4563+
elif getattr(self._connection, 'is_control_connection', False):
4564+
with self._connection.lock:
4565+
self._connection.orphaned_request_ids.add(self._req_id)
4566+
if len(self._connection.orphaned_request_ids) >= self._connection.orphaned_threshold:
4567+
self._connection.orphaned_threshold_reached = True
45404568

45414569
errors = self._errors
45424570
if not errors:
45434571
if self.is_schema_agreed:
4544-
key = str(self._current_host.endpoint) if self._current_host else 'no host queried before timeout'
4572+
key = str(self._get_host_endpoint(self._current_host)) if self._current_host else 'no host queried before timeout'
45454573
errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
45464574
else:
45474575
connection = self.session.cluster.control_connection._connection
@@ -4599,14 +4627,111 @@ def send_request(self, error_no_hosts=True):
45994627
self._on_timeout()
46004628
return True
46014629
if error_no_hosts:
4630+
if self._fallback_to_control_connection():
4631+
req_id = self._query_control_connection()
4632+
if req_id is not None:
4633+
self._req_id = req_id
4634+
return True
4635+
46024636
self._set_final_exception(NoHostAvailable(
46034637
"Unable to complete the operation against any hosts", self._errors))
46044638
return False
46054639

4640+
@staticmethod
4641+
def _get_host_endpoint(host):
4642+
return getattr(host, 'endpoint', host)
4643+
4644+
def _has_usable_node_pool(self):
4645+
try:
4646+
pools = tuple(self.session._pools.values())
4647+
except (AttributeError, TypeError):
4648+
return False
4649+
4650+
return any(pool and not pool.is_shutdown for pool in pools)
4651+
4652+
def _fallback_to_control_connection(self):
4653+
if getattr(self.session.cluster, 'allow_control_connection_query_fallback', False) is not True:
4654+
return False
4655+
if self._host or self._control_connection_query_attempted:
4656+
return False
4657+
return not self._has_usable_node_pool()
4658+
4659+
def _borrow_control_connection(self, connection):
4660+
with connection.lock:
4661+
if connection.in_flight >= connection.max_request_id:
4662+
raise NoConnectionsAvailable("All request IDs are currently in use")
4663+
connection.in_flight += 1
4664+
return connection.get_request_id()
4665+
4666+
def _release_control_connection_request(self, connection, request_id):
4667+
with connection.lock:
4668+
connection.in_flight -= 1
4669+
connection.request_ids.append(request_id)
4670+
connection._requests.pop(request_id, None)
4671+
4672+
def _handle_control_connection_response(self, connection, cb, response):
4673+
with connection.lock:
4674+
connection.in_flight -= 1
4675+
cb(response)
4676+
4677+
def _query_control_connection(self, message=None, cb=None, connection=None, host=None):
    """
    Send a request on the control connection instead of a node pool.

    :param message: message to send; defaults to ``self.message``.
    :param cb: response callback; defaults to ``self._set_result`` bound to
        the host and connection with no pool. Either way it is wrapped in
        ``_handle_control_connection_response``.
    :param connection: connection to use; defaults to the cluster control
        connection's current ``_connection``.
    :param host: host recorded for this attempt; defaults to the cluster's
        control connection host, or the connection endpoint if that is
        falsy.
    :return: the request id on success, or :const:`None` when the request
        could not be sent (the failure is recorded in ``self._errors``).
    """
    self._control_connection_query_attempted = True

    outgoing = self.message if message is None else message

    if connection is None:
        control = self.session.cluster.control_connection
        connection = control._connection if control else None
    if not connection:
        self._errors['control connection'] = ConnectionException("Control connection is not connected")
        return None

    if host is None:
        host = self.session.cluster.get_control_connection_host() or connection.endpoint
    self._current_host = host

    stream_id = None
    sent = False
    try:
        stream_id = self._borrow_control_connection(connection)
        self._connection = connection
        if self.prepared_statement:
            result_meta = self.prepared_statement.result_metadata
        else:
            result_meta = []
        inner_cb = cb if cb is not None else partial(self._set_result, host, connection, None)
        # wrap so the in-flight count is decremented before the real callback runs
        wrapped_cb = partial(self._handle_control_connection_response, connection, inner_cb)

        log.debug("No usable node pools; falling back to control connection for host %s", host)
        self.request_encoded_size = connection.send_msg(
            outgoing, stream_id, cb=wrapped_cb,
            encoder=self._protocol_handler.encode_message,
            decoder=self._protocol_handler.decode_message,
            result_metadata=result_meta)
        sent = True
        self.attempted_hosts.append(host)
        return stream_id
    except NoConnectionsAvailable as exc:
        log.debug("Control connection is at capacity")
        self._errors[host] = exc
    except ConnectionBusy as exc:
        log.debug("Control connection is busy")
        self._errors[host] = exc
    except Exception as exc:
        log.debug("Error querying control connection", exc_info=True)
        self._errors[host] = exc
        if self._metrics is not None:
            self._metrics.on_connection_error()
    finally:
        # a borrowed id that never made it onto the wire must be given back
        if stream_id is not None and not sent:
            self._release_control_connection_request(connection, stream_id)

    return None
4728+
46064729
def _query(self, host, message=None, cb=None):
46074730
if message is None:
46084731
message = self.message
46094732

4733+
self._control_connection_query_attempted = False
4734+
46104735
pool = self.session._pools.get(host)
46114736
if not pool:
46124737
self._errors[host] = ConnectionException("Host has been marked down or removed")
@@ -4717,12 +4842,17 @@ def start_fetching_next_page(self):
47174842
self._event.clear()
47184843
self._final_result = _NOT_SET
47194844
self._final_exception = None
4845+
self._control_connection_query_attempted = False
47204846
self._start_timer()
47214847
self.send_request()
47224848

47234849
def _reprepare(self, prepare_message, host, connection, pool):
47244850
cb = partial(self.session.submit, self._execute_after_prepare, host, connection, pool)
4725-
request_id = self._query(host, prepare_message, cb=cb)
4851+
if pool is None and getattr(connection, 'is_control_connection', False):
4852+
request_id = self._query_control_connection(prepare_message, cb=cb,
4853+
connection=connection, host=host)
4854+
else:
4855+
request_id = self._query(host, prepare_message, cb=cb)
47264856
if request_id is None:
47274857
# try to submit the original prepared statement on some other host
47284858
self.send_request()
@@ -4761,6 +4891,8 @@ def _set_result(self, host, connection, pool, response):
47614891
if isinstance(response, ResultMessage):
47624892
if response.kind == RESULT_KIND_SET_KEYSPACE:
47634893
session = getattr(self, 'session', None)
4894+
if connection is not None:
4895+
connection.keyspace = response.new_keyspace
47644896
# since we're running on the event loop thread, we need to
47654897
# use a non-blocking method for setting the keyspace on
47664898
# all connections in this session, otherwise the event
@@ -4940,7 +5072,10 @@ def _execute_after_prepare(self, host, connection, pool, response):
49405072

49415073
# use self._query to re-use the same host and
49425074
# at the same time properly borrow the connection
4943-
request_id = self._query(host)
5075+
if pool is None and getattr(connection, 'is_control_connection', False):
5076+
request_id = self._query_control_connection(connection=connection, host=host)
5077+
else:
5078+
request_id = self._query(host)
49445079
if request_id is None:
49455080
# this host errored out, move on to the next
49465081
self.send_request()
@@ -5053,6 +5188,11 @@ def _retry_task(self, reuse_connection, host):
50535188
# to retry the operation
50545189
return
50555190

5191+
if self._control_connection_query_attempted:
5192+
self._control_connection_query_attempted = False
5193+
self.send_request()
5194+
return
5195+
50565196
if reuse_connection and self._query(host) is not None:
50575197
return
50585198

docs/api/cassandra/cluster.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ Clusters and Sessions
4848

4949
.. autoattribute:: control_connection_timeout
5050

51+
.. autoattribute:: allow_control_connection_query_fallback
52+
5153
.. autoattribute:: idle_heartbeat_interval
5254

5355
.. autoattribute:: idle_heartbeat_timeout

tests/unit/test_cluster.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
InvalidRequest, Unauthorized, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, RequestValidationException, ConfigurationException, ProtocolVersion
2424
from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \
2525
ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT
26+
from cassandra.connection import ConnectionException
2627
from cassandra.pool import Host
2728
from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy
2829
from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory
@@ -184,6 +185,10 @@ def test_port_range(self):
184185
with pytest.raises(ValueError):
185186
cluster = Cluster(contact_points=['127.0.0.1'], port=invalid_port)
186187

188+
def test_control_connection_query_fallback_flag(self):
    """The fallback flag is off by default and can be enabled via the constructor."""
    default_cluster = Cluster()
    assert default_cluster.allow_control_connection_query_fallback is False

    opted_in = Cluster(allow_control_connection_query_fallback=True)
    assert opted_in.allow_control_connection_query_fallback is True
191+
187192
def test_compression_autodisabled_without_libraries(self):
188193
with patch.dict('cassandra.cluster.locally_supported_compressions', {}, clear=True):
189194
with patch('cassandra.cluster.log') as patched_logger:
@@ -339,6 +344,32 @@ def test_set_keyspace_escapes_quotes(self, *_):
339344
assert query == 'USE simple_ks', (
340345
"Simple keyspace names should not be quoted, got: %r" % query)
341346

347+
@mock_session_pools
def test_set_keyspace_for_all_pools_reports_all_errors(self, *_):
    """
    The final callback receives the per-host error mapping, containing
    only the hosts whose pools reported errors.
    """
    def make_pool(name, host_errors):
        # the pool reports its result synchronously through the callback
        pool = Mock(host=name)
        pool._set_keyspace_for_all_conns.side_effect = (
            lambda keyspace, callback: callback(pool, host_errors)
        )
        return pool

    host = Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())
    session = Session(Cluster(), [host])

    keyspace_error = ConnectionException("boom")
    session._pools = {
        'host1': make_pool('host1', [keyspace_error]),
        'host2': make_pool('host2', []),
    }

    done = Mock()
    session._set_keyspace_for_all_pools('ks', done)

    done.assert_called_once()
    reported = done.call_args.args[0]
    assert reported == {'host1': [keyspace_error]}
372+
342373
class ProtocolVersionTests(unittest.TestCase):
343374

344375
def test_protocol_downgrade_test(self):

0 commit comments

Comments
 (0)