Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 191 additions & 17 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3531,6 +3531,7 @@
# shutdown) since implementing __del__ disables the cycle detector
self._cluster = weakref.proxy(cluster)
self._connection = None
self._last_connection_endpoint = None
self._timeout = timeout

self._schema_event_refresh_window = schema_event_refresh_window
Expand Down Expand Up @@ -3564,6 +3565,7 @@
with self._lock:
old = self._connection
self._connection = conn
self._last_connection_endpoint = conn.endpoint

if old:
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
Expand Down Expand Up @@ -3917,8 +3919,10 @@
host.dse_workloads = row.get("workloads")

tokens = row.get("tokens", None)
if partitioner and tokens and self._token_meta_enabled:
token_map[host] = tokens
if partitioner and self._token_meta_enabled:
should_rebuild_token_map |= self._tokens_changed(host, tokens)
if tokens:
token_map[host] = tokens
self._cluster.metadata.update_host(host, old_endpoint=endpoint)

for old_host_id, old_host in self._cluster.metadata.all_hosts_items():
Expand Down Expand Up @@ -3963,12 +3967,41 @@

if "tokens" in row and not row.get("tokens"):
log.debug(
"Found a zero-token node - tokens is None (broadcast_rpc: %s, host_id: %s). Ignoring host." %
"Found a zero-token node (broadcast_rpc: %s, host_id: %s). "
"Keeping host for load balancing, but omitting it from the token map." %
(broadcast_rpc, host_id))
return False

return True

def _tokens_changed(self, host, tokens):
current_token_map = self._cluster.metadata.token_map
if current_token_map is None:
return False

if isinstance(current_token_map, dict):
current_tokens = current_token_map.get(host)
return set(current_tokens or ()) != set(tokens or ())

token_to_host_owner = getattr(current_token_map, 'token_to_host_owner', None)
token_class = getattr(current_token_map, 'token_class', None)
if token_to_host_owner is None or token_class is None:
return False

current_tokens = set(
token for token, owner in token_to_host_owner.items()
if owner == host)
if not tokens:
return bool(current_tokens)

try:
refreshed_tokens = set(token_class.from_string(token) for token in tokens)
except Exception:
log.debug("[control connection] Unable to compare refreshed tokens for %s",
host, exc_info=True)
return True

return current_tokens != refreshed_tokens

def _update_location_info(self, host, datacenter, rack):
if host.datacenter == datacenter and host.rack == rack:
return False
Expand Down Expand Up @@ -4094,33 +4127,28 @@

if not connection:
connection = self._connection
local_address = self._schema_agreement_endpoint(connection)

if preloaded_results:
log.debug("[control connection] Attempting to use preloaded results for schema agreement")

peers_result = preloaded_results[0]
local_result = preloaded_results[1]
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, local_address)
if schema_mismatches is None:
return True

log.debug("[control connection] Waiting for schema agreement")
start = self._time.time()
elapsed = 0
cl = ConsistencyLevel.ONE
schema_mismatches = None
select_peers_query = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)

while elapsed < total_timeout:
peers_query = QueryMessage(query=maybe_add_timeout_to_query(select_peers_query, self._metadata_request_timeout),
consistency_level=cl)
local_query = QueryMessage(query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
consistency_level=cl)
try:
remaining = total_timeout - elapsed
timeout = min(self._timeout, remaining) if self._timeout is not None else remaining
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=timeout)
peers_result, local_result, local_address = self._get_schema_agreement_results(
connection, timeout)
except OperationTimedOut as timeout:
log.debug("[control connection] Timed out waiting for "
"response during schema agreement check: %s", timeout)
Expand All @@ -4131,9 +4159,13 @@
log.debug("[control connection] Aborting wait for schema match due to shutdown")
return None
else:
raise
log.debug("[control connection] Connection lost during schema agreement check")
return False
except (ConnectionBusy, ConnectionException, NoConnectionsAvailable) as exc:
log.debug("[control connection] Unable to check schema agreement: %s", exc)
return False

schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.endpoint)
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, local_address)
if schema_mismatches is None:
return True

Expand All @@ -4142,9 +4174,150 @@
elapsed = self._time.time() - start

log.warning("Node %s is reporting a schema disagreement: %s",
connection.endpoint, schema_mismatches)
local_address, schema_mismatches)
return False

def _get_schema_agreement_results(self, connection, timeout):
    """Return ``(peers_result, local_result, local_address)`` for the
    schema-agreement check.

    Prefers querying over the control connection itself; when that
    connection is unusable, or fails with a connection-level error while
    we are not shutting down, falls back to a borrowed session-pool
    connection.  ``OperationTimedOut`` always propagates to the caller's
    retry loop.
    """
    if self._is_connection_unusable(connection):
        # Control connection is gone/closed/defunct -- go straight to a
        # session pool.
        return self._get_schema_agreement_results_from_session(connection, timeout)

    try:
        return self._get_schema_agreement_results_from_connection(connection, timeout)
    except OperationTimedOut:
        # Timeouts are handled by the caller's wait loop, not here.
        raise
    except (ConnectionBusy, ConnectionException):
        if self._is_shutdown:
            # Shutting down: no point retrying over a session connection.
            raise
        # Hand the broken control connection back (may schedule a
        # reconnect) and retry the check over a pooled session connection.
        self.return_connection(connection)
        log.debug("[control connection] Falling back to session connection for schema agreement")
        return self._get_schema_agreement_results_from_session(connection, timeout)

def _get_schema_agreement_results_from_connection(self, connection, timeout):
peers_query, local_query = self._schema_agreement_queries(connection)
peers_result, local_result = connection.wait_for_responses(peers_query, local_query, timeout=timeout)
return peers_result, local_result, connection.endpoint

def _get_schema_agreement_results_from_session(self, connection, timeout):
endpoint = self._schema_agreement_endpoint(connection)
deadline = None if timeout is None else time.time() + timeout

pool, borrowed_connection, request_id = self._borrow_schema_agreement_connection(endpoint, timeout)
peers_query, _ = self._schema_agreement_queries(borrowed_connection)
local_address = borrowed_connection.endpoint
peers_result = self._wait_for_borrowed_schema_response(
pool, borrowed_connection, request_id, peers_query, self._remaining_timeout(deadline))

pool, borrowed_connection, request_id = self._borrow_schema_agreement_connection(
endpoint, self._remaining_timeout(deadline))
_, local_query = self._schema_agreement_queries(borrowed_connection)
local_result = self._wait_for_borrowed_schema_response(
pool, borrowed_connection, request_id, local_query, self._remaining_timeout(deadline))

return peers_result, local_result, local_address

def _schema_agreement_queries(self, connection):
    """Build the ``(peers_query, local_query)`` schema-version
    QueryMessages, both at consistency level ONE."""
    consistency = ConsistencyLevel.ONE
    peers_stmt = self._get_peers_query(self.PeersQueryType.PEERS_SCHEMA, connection)
    peers_query = QueryMessage(
        query=maybe_add_timeout_to_query(peers_stmt, self._metadata_request_timeout),
        consistency_level=consistency)
    local_query = QueryMessage(
        query=maybe_add_timeout_to_query(self._SELECT_SCHEMA_LOCAL, self._metadata_request_timeout),
        consistency_level=consistency)
    return peers_query, local_query

def _remaining_timeout(self, deadline):
if deadline is None:
return None
return max(0, deadline - time.time())

def _borrow_schema_agreement_connection(self, endpoint, timeout):
last_error = None
for pool in self._schema_agreement_pools(endpoint):
try:
return (pool,) + pool.borrow_connection(timeout=timeout)
except (NoConnectionsAvailable, ConnectionBusy, ConnectionException) as exc:
last_error = exc
log.debug("[control connection] Unable to borrow connection for schema agreement: %s", exc)

if last_error:
raise last_error
raise NoConnectionsAvailable("No session connection available for schema agreement on %s" % (endpoint,))

def _schema_agreement_pools(self, endpoint):
if endpoint is None:
return

host = self._cluster.metadata.get_host(endpoint)
for session in tuple(getattr(self._cluster, 'sessions', ())):
pools = getattr(session, '_pools', {})
pool = pools.get(host) if host else None
if pool is None:
for pool_host, candidate_pool in tuple(pools.items()):
if getattr(pool_host, 'endpoint', None) == endpoint:
pool = candidate_pool
break
if pool and not pool.is_shutdown:
yield pool

def _wait_for_borrowed_schema_response(self, pool, connection, request_id, message, timeout):
event = Event()
responses = []

def callback(response):
responses.append(response)
event.set()

sent = False
orphaned = False
try:
connection.send_msg(message, request_id, callback)
sent = True
if not event.wait(timeout):
orphaned = self._orphan_borrowed_request(pool, connection, request_id)
raise OperationTimedOut(timeout=timeout, in_flight=getattr(connection, 'in_flight', None))

response = responses[0]
if isinstance(response, Exception):
if hasattr(response, 'to_exception'):
response = response.to_exception()
raise response
return response
except Exception:
if not sent:
self._return_request_id_if_unused(connection, request_id)
raise
finally:
if not orphaned:
pool.return_connection(connection)

def _orphan_borrowed_request(self, pool, connection, request_id):
try:
connection._requests.pop(request_id)
except KeyError:
return False

with connection.lock:
connection.orphaned_request_ids.add(request_id)
if len(connection.orphaned_request_ids) >= connection.orphaned_threshold:
connection.orphaned_threshold_reached = True

pool.return_connection(connection, stream_was_orphaned=True)
return True

def _return_request_id_if_unused(self, connection, request_id):
with connection.lock:
if request_id in connection._requests or request_id in connection.orphaned_request_ids:
return
if request_id not in connection.request_ids:
connection.request_ids.append(request_id)

def _schema_agreement_endpoint(self, connection):
return getattr(connection, 'endpoint', None) or self._last_connection_endpoint

def _is_connection_unusable(self, connection):
return (connection is None or
getattr(connection, 'is_closed', False) or
getattr(connection, 'is_defunct', False))

def _get_schema_mismatches(self, peers_result, local_result, local_address):
peers_result = dict_factory(peers_result.column_names, peers_result.parsed_rows)

Expand Down Expand Up @@ -4265,7 +4438,8 @@
return [c] if c else []

def return_connection(self, connection):
    """Accept a connection back; reconnect when the control connection
    itself is handed back dead (closed or defunct)."""
    if connection is not self._connection:
        return
    if getattr(connection, 'is_defunct', False) or getattr(connection, 'is_closed', False):
        self.reconnect()


Expand Down Expand Up @@ -4340,7 +4514,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4517 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.12)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down
5 changes: 5 additions & 0 deletions cassandra/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ class TokenAwarePolicy(LoadBalancingPolicy):

If no :attr:`~.Statement.routing_key` is set on the query, the child
policy's query plan will be used as is.

Token awareness only applies to token-owning nodes. Zero-token CQL
proxy or front-layer nodes are not replicas. When those nodes are the
intended query front layer, configure a round-robin policy such as
:class:`.RoundRobinPolicy` or :class:`.DCAwareRoundRobinPolicy` directly.
"""

_child_policy = None
Expand Down
Loading
Loading