193 changes: 136 additions & 57 deletions cassandra/cluster.py
@@ -1916,8 +1916,9 @@

log.debug("Waiting to acquire lock for handling up status of node %s", host)
with host.lock:
if host._currently_handling_node_up:
log.debug("Another thread is already handling up status of node %s", host)
if (host._currently_handling_node_up or
getattr(host, "_currently_handling_node_addition", False)):
log.debug("Another thread is already handling up/add status of node %s", host)
return

if host.is_up:
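The widened guard above is a check-and-set-under-lock pattern: on_up now also backs off while an on_add for the same host is in flight. A minimal sketch of the idea, assuming only a stand-in Host with the two flags (the driver's real Host carries much more state):

import threading

class Host:
    # Stand-in with just the guard state used above.
    def __init__(self):
        self.lock = threading.Lock()
        self._currently_handling_node_up = False
        self._currently_handling_node_addition = False

def try_begin_up_handling(host):
    # Check both flags while holding the lock so two threads can never
    # both conclude they own the up-transition for this host.
    with host.lock:
        if (host._currently_handling_node_up or
                getattr(host, "_currently_handling_node_addition", False)):
            return False  # another thread owns this host right now
        host._currently_handling_node_up = True
        return True

host = Host()
assert try_begin_up_handling(host) is True
assert try_begin_up_handling(host) is False  # second caller backs off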
@@ -1958,8 +1959,10 @@
future = session.add_or_renew_pool(host, is_host_addition=False)
if future is not None:
have_future = True
future.add_done_callback(callback)
futures.add(future)

for future in tuple(futures):
future.add_done_callback(callback)
except Exception:
log.exception("Unexpected failure handling node %s being marked up:", host)
for future in futures:
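Attaching the done-callbacks in a second pass, only after every pool future is already tracked in futures, closes a race: with the old in-loop add_done_callback, a fast pool future could run callback (and its emptiness check on futures) before the remaining futures were added, signalling completion too early. A minimal sketch of the two-pass pattern, using plain concurrent.futures primitives rather than the driver's session objects:

import threading
from concurrent.futures import ThreadPoolExecutor

futures = set()
futures_lock = threading.Lock()
all_done = threading.Event()

def callback(future):
    with futures_lock:
        futures.discard(future)
        if futures:        # other pool futures still outstanding
            return
    all_done.set()         # fires only once the set fully drains

with ThreadPoolExecutor() as pool:
    # Pass 1: collect every future first ...
    for _ in range(4):
        futures.add(pool.submit(lambda: True))
    # Pass 2: ... then attach callbacks, so an early completion can
    # never observe a half-built set.
    for future in tuple(futures):
        future.add_done_callback(callback)

assert all_done.wait(timeout=5)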
@@ -2050,69 +2053,98 @@

log.debug("Handling new host %r and notifying listeners", host)

self.profile_manager.on_add(host)
self.control_connection.on_add(host, refresh_nodes)
# Keep refresh-time pool rebuilds from racing this host's pool creation.
with host.lock:
if getattr(host, "_currently_handling_node_addition", False):
log.debug("Another thread is already handling add status of node %s", host)
return
host._currently_handling_node_addition = True

distance = self.profile_manager.distance(host)
if distance != HostDistance.IGNORED:
self._prepare_all_queries(host)
log.debug("Done preparing queries for new host %r", host)
have_future = False
add_aborted = False
futures = set()
try:
self.profile_manager.on_add(host)
self.control_connection.on_add(host, refresh_nodes)

if distance == HostDistance.IGNORED:
log.debug("Not adding connection pool for new host %r because the "
"load balancing policy has marked it as IGNORED", host)
self._finalize_add(host, set_up=False)
return
distance = self.profile_manager.distance(host)
if distance != HostDistance.IGNORED:
self._prepare_all_queries(host)
log.debug("Done preparing queries for new host %r", host)

futures_lock = Lock()
futures_results = []
futures = set()
if distance == HostDistance.IGNORED:
log.debug("Not adding connection pool for new host %r because the "
"load balancing policy has marked it as IGNORED", host)
self._finalize_add(host, set_up=False)
return

def future_completed(future):
with futures_lock:
futures.discard(future)
futures_lock = Lock()
futures_results = []

try:
futures_results.append(future.result())
except Exception as exc:
futures_results.append(exc)
def future_completed(future):
with futures_lock:
futures.discard(future)

if futures:
return
if add_aborted:
return

log.debug('All futures have completed for added host %s', host)
try:
futures_results.append(future.result())
except Exception as exc:
futures_results.append(exc)

for exc in [f for f in futures_results if isinstance(f, Exception)]:
log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
return
if futures:
return

if not all(futures_results):
log.warning("Connection pool could not be created, not marking node %s up", host)
return
log.debug('All futures have completed for added host %s', host)

self._finalize_add(host)
for exc in [f for f in futures_results if isinstance(f, Exception)]:
log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
with host.lock:
host._currently_handling_node_addition = False
return

have_future = False
for session in tuple(self.sessions):
future = session.add_or_renew_pool(host, is_host_addition=True)
if future is not None:
have_future = True
futures.add(future)
if not all(futures_results):
log.warning("Connection pool could not be created, not marking node %s up", host)
with host.lock:
host._currently_handling_node_addition = False
return

self._finalize_add(host)

for session in tuple(self.sessions):
future = session.add_or_renew_pool(host, is_host_addition=True)
if future is not None:
have_future = True
futures.add(future)

for future in tuple(futures):
future.add_done_callback(future_completed)

if not have_future:
self._finalize_add(host)
if not have_future:
self._finalize_add(host)
except Exception:
add_aborted = True
for future in tuple(futures):
future.cancel()
with host.lock:
host._currently_handling_node_addition = False
raise

def _finalize_add(self, host, set_up=True):
if set_up:
host.set_up()
try:
if set_up:
host.set_up()

for listener in self.listeners:
listener.on_add(host)
for listener in self.listeners:
listener.on_add(host)

# see if there are any pools to add or remove now that the host is marked up
for session in tuple(self.sessions):
session.update_created_pools()
# see if there are any pools to add or remove now that the host is marked up
for session in tuple(self.sessions):
session.update_created_pools()
finally:
with host.lock:
host._currently_handling_node_addition = False
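_finalize_add now pairs the flag set in on_add with a finally, so a raising listener or pool update cannot leave _currently_handling_node_addition stuck at True (which would permanently block future on_up/on_add handling for that host). A compressed sketch of the lifecycle with a bare Host stand-in; the real code also clears the flag on each early-return path and in on_add's except branch:

import threading

class Host:
    def __init__(self):
        self.lock = threading.Lock()
        self._currently_handling_node_addition = False

def add_host(host, do_work):
    with host.lock:
        if host._currently_handling_node_addition:
            return              # another thread owns this addition
        host._currently_handling_node_addition = True
    try:
        do_work(host)           # listeners / pool updates may raise
    finally:
        # Always release ownership, even on failure.
        with host.lock:
            host._currently_handling_node_addition = False

host = Host()
try:
    add_host(host, lambda h: 1 / 0)
except ZeroDivisionError:
    pass
assert host._currently_handling_node_addition is False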

def on_remove(self, host):
if self.is_shutdown:
@@ -2137,7 +2169,8 @@
self.on_down(host, is_host_addition, expect_host_to_be_down)
return is_down

def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True, host_id=None):
def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True, host_id=None,
is_zero_token=None):
"""
Called when adding initial contact points and when the control
connection subsequently discovers a new node.
@@ -2147,8 +2180,16 @@
"""
with self.metadata._hosts_lock:
if endpoint in self.metadata._host_id_by_endpoint:
return self.metadata._hosts[self.metadata._host_id_by_endpoint[endpoint]], False
host, new = self.metadata.add_or_return_host(Host(endpoint, self.conviction_policy_factory, datacenter, rack, host_id=host_id))
host = self.metadata._hosts[self.metadata._host_id_by_endpoint[endpoint]]
if is_zero_token is not None:
host.is_zero_token = is_zero_token
return host, False
host = Host(endpoint, self.conviction_policy_factory, datacenter, rack, host_id=host_id)
if is_zero_token is not None:
host.is_zero_token = is_zero_token
host, new = self.metadata.add_or_return_host(host)
if not new and is_zero_token is not None:
host.is_zero_token = is_zero_token
if new and signal:
log.info("New Cassandra host %r discovered", host)
self.on_add(host, refresh_nodes)
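add_host treats is_zero_token as tri-state: None means the caller learned nothing about tokens, so the stored flag is left untouched, while True/False overwrite it on both the existing-host and new-host paths. A small sketch of that update rule on a hypothetical record type:

class HostRecord:
    # Hypothetical stand-in; the driver's Host carries many more fields.
    def __init__(self):
        self.is_zero_token = False

def apply_token_status(host, is_zero_token):
    # None = this refresh carried no usable token information,
    # so keep whatever was known before.
    if is_zero_token is not None:
        host.is_zero_token = is_zero_token
    return host.is_zero_token

h = HostRecord()
assert apply_token_status(h, None) is False   # unknown: unchanged
assert apply_token_status(h, True) is True    # reported zero-token
assert apply_token_status(h, None) is True    # still unchanged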
@@ -3315,7 +3356,10 @@
# we don't eagerly set is_up on previously ignored hosts. None is included here
# to allow us to attempt connections to hosts that have gone from ignored to something
# else.
if distance != HostDistance.IGNORED and host.is_up in (True, None):
# on_up() and on_add() already own pool creation for hosts in flight.
if (distance != HostDistance.IGNORED and host.is_up in (True, None) and
not host._currently_handling_node_up and
not getattr(host, "_currently_handling_node_addition", False)):
future = self.add_or_renew_pool(host, False)
elif distance != pool.host_distance:
# the distance has changed
@@ -3864,6 +3908,8 @@
# every node in the cluster was in the contact points, we won't discover
# any new nodes, so we need this additional check. (See PYTHON-90)
should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None
zero_token_status_changed = False
promoted_zero_token_hosts = []
for row in peers_result:
if not self._is_valid_peer(row):
continue
@@ -3884,10 +3930,20 @@
host = self._cluster.metadata.get_host(endpoint)
datacenter = row.get("data_center")
rack = row.get("rack")
tokens = row.get("tokens", None)
has_token_status = "tokens" in row
is_zero_token = has_token_status and not tokens
token_status = is_zero_token if has_token_status else None

if host is None:
host = self._cluster.metadata.get_host_by_host_id(host_id)
if host and host.endpoint != endpoint:
if has_token_status:
status_changed = self._update_zero_token_info(host, is_zero_token)
zero_token_status_changed |= status_changed
should_rebuild_token_map |= status_changed
if status_changed and not is_zero_token and host.is_up is not True:
promoted_zero_token_hosts.append(host)
log.debug("[control connection] Updating host ip from %s to %s for (%s)", host.endpoint, endpoint, host_id)
reconnector = host.get_and_set_reconnection_handler(None)
if reconnector:
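The three derived values above separate "the row has no tokens column" from "the row has an empty tokens value": has_token_status gates whether anything is updated at all, and token_status degrades to None in the no-information case. A quick check of the mapping, assuming row behaves like a plain dict:

def derive_token_status(row):
    tokens = row.get("tokens", None)
    has_token_status = "tokens" in row
    is_zero_token = has_token_status and not tokens
    return is_zero_token if has_token_status else None

assert derive_token_status({}) is None                    # column absent: unknown
assert derive_token_status({"tokens": None}) is True      # present, empty: zero-token
assert derive_token_status({"tokens": set()}) is True
assert derive_token_status({"tokens": {"-9223372036854775808"}}) is False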
@@ -3901,11 +3957,20 @@

if host is None:
log.debug("[control connection] Found new host to connect to: %s", endpoint)
host, _ = self._cluster.add_host(endpoint, datacenter=datacenter, rack=rack, signal=True, refresh_nodes=False, host_id=host_id)
host, _ = self._cluster.add_host(endpoint, datacenter=datacenter, rack=rack, signal=True,
refresh_nodes=False, host_id=host_id,
is_zero_token=token_status)
should_rebuild_token_map = True
else:
should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)

if has_token_status:
status_changed = self._update_zero_token_info(host, is_zero_token)
zero_token_status_changed |= status_changed
should_rebuild_token_map |= status_changed
if status_changed and not is_zero_token and host.is_up is not True:
promoted_zero_token_hosts.append(host)

host.host_id = host_id
host.broadcast_address = _NodeInfo.get_broadcast_address(row)
host.broadcast_port = _NodeInfo.get_broadcast_port(row)
@@ -3916,7 +3981,6 @@
host.dse_workload = row.get("workload")
host.dse_workloads = row.get("workloads")

tokens = row.get("tokens", None)
if partitioner and tokens and self._token_meta_enabled:
token_map[host] = tokens
self._cluster.metadata.update_host(host, old_endpoint=endpoint)
@@ -3932,6 +3996,22 @@
log.debug("[control connection] Rebuilding token map due to topology changes")
self._cluster.metadata.rebuild_token_map(partitioner, token_map)

for host in promoted_zero_token_hosts:
self._cluster.on_up(host)

if zero_token_status_changed:
for session in tuple(getattr(self._cluster, "sessions", ())):
session.update_created_pools()

@staticmethod
def _update_zero_token_info(host, is_zero_token):
is_zero_token = bool(is_zero_token)
if host.is_zero_token == is_zero_token:
return False

host.is_zero_token = is_zero_token
return True
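_update_zero_token_info doubles as change detection: its boolean return feeds should_rebuild_token_map, zero_token_status_changed, and the promotion list, so rebuild work happens only when the flag actually flips. A usage sketch with a minimal host object:

class FakeHost:
    is_zero_token = False

def update(host, value):
    value = bool(value)
    if host.is_zero_token == value:
        return False            # no change: nothing to rebuild
    host.is_zero_token = value
    return True

h = FakeHost()
assert update(h, False) is False   # already False
assert update(h, True) is True     # flipped: triggers a rebuild
assert update(h, True) is False    # idempotent on repeat refreshes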

@staticmethod
def _is_valid_peer(row):
broadcast_rpc = _NodeInfo.get_broadcast_rpc_address(row)
@@ -3963,9 +4043,8 @@

if "tokens" in row and not row.get("tokens"):
log.debug(
"Found a zero-token node - tokens is None (broadcast_rpc: %s, host_id: %s). Ignoring host." %
"Found a zero-token node - tokens are empty (broadcast_rpc: %s, host_id: %s). Adding host without tokens." %
(broadcast_rpc, host_id))
return False

return True
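With the early return False removed, _is_valid_peer now admits zero-token peers instead of filtering them out; the condition only logs. A condensed sketch of the changed tail of the predicate (the identity checks hidden by the collapsed diff are assumed to have passed already):

def is_valid_peer_tail(row):
    # Zero-token node: previously rejected here with `return False`,
    # now merely logged and admitted without token ownership.
    if "tokens" in row and not row.get("tokens"):
        pass
    return True

assert is_valid_peer_tail({"tokens": None}) is True   # no longer filtered out
assert is_valid_peer_tail({"tokens": {"42"}}) is True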

@@ -4340,7 +4419,7 @@
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)
future.add_done_callback(self._log_if_failed)

Check failure on line 4422 in cassandra/cluster.py (GitHub Actions / test asyncore (3.11) and test asyncio (3.12)): cannot schedule new futures after shutdown
else:
self._queue.put_nowait((run_at, i, task))
break
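The two CI failures annotated above point at self._executor.submit() racing executor shutdown. One common guard, shown here only as a sketch (swallowing the error during teardown is an assumption, not what this diff does), is to treat the RuntimeError a shut-down concurrent.futures executor raises as a benign stop signal:

from concurrent.futures import ThreadPoolExecutor

def submit_safely(executor, fn, *args, **kwargs):
    # concurrent.futures raises RuntimeError("cannot schedule new futures
    # after shutdown") once shutdown() has begun; during teardown that is
    # expected, so report failure to the caller instead of propagating.
    try:
        return executor.submit(fn, *args, **kwargs)
    except RuntimeError:
        return None

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()
assert submit_safely(executor, print, "late task") is None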