Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4095,24 +4095,39 @@
metadata_host = self.cluster.metadata.get_host_by_host_id(host.host_id)

target_host = metadata_host if metadata_host is not None else host
target_host_matches = False
for pool_host in tuple(retained_pools):
if pool_host is target_host:
target_host_matches = True
elif pool_host == target_host:
previous_pools.append(retained_pools.pop(pool_host))

if target_host_matches:
reuse_existing_pool = True
target_endpoint_changed = False
if target_host is not host:
with target_host.lock:
target_endpoint_changed = not self._endpoints_match(
target_host.endpoint, creation_endpoint)

if target_endpoint_changed:
log.debug(
"Discarding stale connection pool for host %s; "
"metadata host endpoint changed from %s",
host, creation_endpoint)
self._invalidate_pool_creation(
host, expected_endpoint=creation_endpoint)
discard_pool = True
else:
source_host = new_pool.host
if (source_host is not target_host and
target_host.sharding_info is None):
target_host.sharding_info = source_host.sharding_info
new_pool.host = target_host
retained_pools[target_host] = new_pool
self._pools = retained_pools
self._clear_pool_creation(host, creation_epoch)
target_host_matches = False
for pool_host in tuple(retained_pools):
if pool_host is target_host:
target_host_matches = True
elif pool_host == target_host:
previous_pools.append(retained_pools.pop(pool_host))

if target_host_matches:
reuse_existing_pool = True
else:
source_host = new_pool.host
if (source_host is not target_host and
target_host.sharding_info is None):
target_host.sharding_info = source_host.sharding_info
new_pool.host = target_host
retained_pools[target_host] = new_pool
self._pools = retained_pools
self._clear_pool_creation(host, creation_epoch)

if reuse_existing_pool:
log.debug("Reusing existing connection pool for host %s", host)
Expand Down Expand Up @@ -5409,7 +5424,7 @@
self._scheduled_tasks.discard(task_key)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 5427 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncio (3.11)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task_key, task))
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \
ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT
from cassandra.connection import ClientRoutesEndPoint, ConnectionException, DefaultEndPoint, SniEndPoint
from cassandra.metadata import Metadata
from cassandra.pool import Host, HostConnection, _HostReconnectionHandler
from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy
from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory
Expand Down Expand Up @@ -664,6 +665,33 @@ def make_pool(host, distance, pool_session, endpoint=None):
assert session._pools == {}
created_pools[0].shutdown.assert_called_once_with()

def test_stale_host_pool_creation_does_not_publish_to_replacement_host(self):
host_id = uuid.uuid4()
stale_host = Host(DefaultEndPoint("127.0.0.1"), SimpleConvictionPolicy,
host_id=host_id)
replacement_host = Host(DefaultEndPoint("127.0.0.2"),
SimpleConvictionPolicy, host_id=host_id)
cluster, session, executor = self._make_cluster_and_session(
[replacement_host])
cluster.metadata = Metadata()
cluster.metadata.add_or_return_host(replacement_host)
created_pools = []

def make_pool(host, distance, pool_session, endpoint=None):
pool = self._make_pool(host, distance, pool_session, endpoint)
created_pools.append(pool)
return pool

with patch("cassandra.cluster.HostConnection", side_effect=make_pool):
future = session.add_or_renew_pool(
stale_host, is_host_addition=False)

executor.run_next()

assert future.result() is False
assert session._pools == {}
created_pools[0].shutdown.assert_called_once_with()

def test_remove_pool_expected_host_mismatch_invalidates_stale_creation(self):
stale_host = self._make_host("127.0.0.1")
replacement_host = self._make_host("127.0.0.1")
Expand Down
Loading