From e26c4b13c6d0afbfc7e994176656c6f67c833b68 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 5 May 2026 21:39:46 -0400 Subject: [PATCH] session: discard stale replacement-host pools --- cassandra/cluster.py | 49 +++++++++++++++++++++++++------------- tests/unit/test_cluster.py | 28 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index a879e23417..1fce1f0749 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -4095,24 +4095,39 @@ def callback(pool, errors): metadata_host = self.cluster.metadata.get_host_by_host_id(host.host_id) target_host = metadata_host if metadata_host is not None else host - target_host_matches = False - for pool_host in tuple(retained_pools): - if pool_host is target_host: - target_host_matches = True - elif pool_host == target_host: - previous_pools.append(retained_pools.pop(pool_host)) - - if target_host_matches: - reuse_existing_pool = True + target_endpoint_changed = False + if target_host is not host: + with target_host.lock: + target_endpoint_changed = not self._endpoints_match( + target_host.endpoint, creation_endpoint) + + if target_endpoint_changed: + log.debug( + "Discarding stale connection pool for host %s; " + "metadata host endpoint changed from %s", + host, creation_endpoint) + self._invalidate_pool_creation( + host, expected_endpoint=creation_endpoint) + discard_pool = True else: - source_host = new_pool.host - if (source_host is not target_host and - target_host.sharding_info is None): - target_host.sharding_info = source_host.sharding_info - new_pool.host = target_host - retained_pools[target_host] = new_pool - self._pools = retained_pools - self._clear_pool_creation(host, creation_epoch) + target_host_matches = False + for pool_host in tuple(retained_pools): + if pool_host is target_host: + target_host_matches = True + elif pool_host == target_host: + previous_pools.append(retained_pools.pop(pool_host)) + + if target_host_matches: + reuse_existing_pool = True + else: + source_host = new_pool.host + if (source_host is not target_host and + target_host.sharding_info is None): + target_host.sharding_info = source_host.sharding_info + new_pool.host = target_host + retained_pools[target_host] = new_pool + self._pools = retained_pools + self._clear_pool_creation(host, creation_epoch) if reuse_existing_pool: log.debug("Reusing existing connection pool for host %s", host) diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index ed5934833c..f906f3e5f2 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -26,6 +26,7 @@ from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \ ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT from cassandra.connection import ClientRoutesEndPoint, ConnectionException, DefaultEndPoint, SniEndPoint +from cassandra.metadata import Metadata from cassandra.pool import Host, HostConnection, _HostReconnectionHandler from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory @@ -664,6 +665,33 @@ def make_pool(host, distance, pool_session, endpoint=None): assert session._pools == {} created_pools[0].shutdown.assert_called_once_with() + def test_stale_host_pool_creation_does_not_publish_to_replacement_host(self): + host_id = uuid.uuid4() + stale_host = Host(DefaultEndPoint("127.0.0.1"), SimpleConvictionPolicy, + host_id=host_id) + replacement_host = Host(DefaultEndPoint("127.0.0.2"), + SimpleConvictionPolicy, host_id=host_id) + cluster, session, executor = self._make_cluster_and_session( + [replacement_host]) + cluster.metadata = Metadata() + cluster.metadata.add_or_return_host(replacement_host) + created_pools = [] + + def make_pool(host, distance, pool_session, endpoint=None): + pool = self._make_pool(host, distance, pool_session, endpoint) + created_pools.append(pool) + return pool + + with patch("cassandra.cluster.HostConnection", side_effect=make_pool): + future = session.add_or_renew_pool( + stale_host, is_host_addition=False) + + executor.run_next() + + assert future.result() is False + assert session._pools == {} + created_pools[0].shutdown.assert_called_once_with() + def test_remove_pool_expected_host_mismatch_invalidates_stale_creation(self): stale_host = self._make_host("127.0.0.1") replacement_host = self._make_host("127.0.0.1")