Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 214 additions & 3 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@

import atexit
import datetime
from enum import Enum
from binascii import hexlify
from collections import defaultdict
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
from concurrent.futures import Future, ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
from copy import copy
from functools import partial, reduce, wraps
from itertools import groupby, count, chain
import json
import logging
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, Tuple
from warnings import warn
from random import random
import re
Expand Down Expand Up @@ -214,6 +215,14 @@ def __init__(self, message, errors):
self.errors = errors


class SchemaAgreementScope(str, Enum):
    """Selects which connected hosts take part in a schema agreement check
    (see :meth:`.Session.wait_for_schema_agreement`).
    """

    RACK = "rack"        # only connected hosts in the local rack
    DC = "dc"            # connected hosts in the local datacenter
    CLUSTER = "cluster"  # every connected host across all datacenters

def _future_completed(future):
""" Helper for run_in_executor() """
exc = future.exception()
Expand Down Expand Up @@ -3374,6 +3383,185 @@ def pool_finished_setting_keyspace(pool, host_errors):
for pool in tuple(self._pools.values()):
pool._set_keyspace_for_all_conns(keyspace, pool_finished_setting_keyspace)

def wait_for_schema_agreement(self, wait_time: Optional[float] = None,
                              scope: SchemaAgreementScope = SchemaAgreementScope.CLUSTER) -> bool:
    """
    Wait for connected hosts in the selected scope to report the same
    schema version from ``system.local``.

    By default, the timeout for this operation is governed by
    :attr:`~.Cluster.max_schema_agreement_wait` and
    :attr:`~.Cluster.control_connection_timeout`.

    Passing ``wait_time`` here overrides
    :attr:`~.Cluster.max_schema_agreement_wait`. If provided, ``wait_time``
    must be greater than 0.

    ``scope`` determines which connected hosts participate in the check.
    Pass :attr:`SchemaAgreementScope.RACK`, :attr:`SchemaAgreementScope.DC`,
    or :attr:`SchemaAgreementScope.CLUSTER`.
    The default is :attr:`SchemaAgreementScope.CLUSTER`. ``RACK`` narrows
    the check to connected hosts in the local rack only. ``DC`` checks
    connected hosts in the local datacenter. ``CLUSTER`` queries every
    connected host across all datacenters.

    :param wait_time: Override for
        :attr:`~.Cluster.max_schema_agreement_wait`, should be a positive
        number.
    :param scope: Restricts the check to connected hosts in the local rack,
        local datacenter, or whole connected cluster.
    :returns: ``True`` when the selected connected hosts agree on schema,
        otherwise ``False``.
    :raises ValueError: If ``wait_time`` is provided and is not greater
        than 0.
    :raises ValueError: If ``scope`` is not one of the schema agreement
        scope values.
    """

    if wait_time is not None and wait_time <= 0:
        raise ValueError("wait_time must be greater than 0")

    # The docstring promises a ValueError for unknown scopes; coercing
    # through the Enum enforces that, and also accepts the plain string
    # values ('rack', 'dc', 'cluster') since SchemaAgreementScope is a
    # str-based Enum.
    scope = SchemaAgreementScope(scope)

    total_timeout = wait_time if wait_time is not None else self.cluster.max_schema_agreement_wait
    if total_timeout <= 0:
        raise ValueError("total_timeout must be greater than 0")

    deadline = time.time() + total_timeout
    schema_mismatches = None
    if scope is SchemaAgreementScope.RACK:
        scope_label = 'local rack'
    elif scope is SchemaAgreementScope.DC:
        scope_label = 'local datacenter'
    else:
        scope_label = 'cluster'

    while time.time() < deadline:
        schema_mismatches = self._get_schema_mismatches_for_scope(deadline, scope)
        if schema_mismatches is None:
            return True

        log.debug("[session] Connected hosts in the %s still disagree on schema, trying again", scope_label)
        # Poll again after a short pause, clipped so we never sleep past
        # the overall deadline.
        remaining = deadline - time.time()
        if remaining > 0:
            time.sleep(min(0.2, remaining))

    log.warning("[session] Connected hosts in the %s are reporting a schema disagreement: %s",
                scope_label, schema_mismatches)
    return False

def _get_schema_mismatches_for_scope(self, deadline: float,
                                     scope: SchemaAgreementScope) -> Optional[Dict[Any, Any]]:
    """Query every in-scope connected host for its local schema version.

    Returns ``None`` when all hosts report one (non-null) version with no
    errors; otherwise a dict mapping each observed schema version to the
    endpoints that reported it, with per-host errors collected under the
    ``'unavailable'`` key.
    """
    scope_hosts = self._get_schema_agreement_hosts(scope)
    version_groups = defaultdict(list)
    failures = {}
    if scope is SchemaAgreementScope.RACK:
        scope_label = 'local rack'
    elif scope is SchemaAgreementScope.DC:
        scope_label = 'local datacenter'
    else:
        scope_label = 'cluster'

    if not scope_hosts:
        failures[scope.value] = ConnectionException(
            "No connected hosts available in the %s" % (scope_label,)
        )
        return {'unavailable': failures}

    request_timeout = self.cluster.control_connection._metadata_request_timeout
    query = maybe_add_timeout_to_query(ControlConnection._SELECT_SCHEMA_LOCAL, request_timeout)

    pending = []
    for scope_host in scope_hosts:
        try:
            pending.append((scope_host, self._query_local_schema_version(scope_host, query, deadline)))
        except Exception as exc:
            failures[scope_host.endpoint] = exc

    if pending:
        # Start all host queries first, then wait for the whole batch.
        budget = max(0.0, deadline - time.time())
        if budget > 0:
            wait_futures([fut for _, fut in pending], timeout=budget)

    for scope_host, fut in pending:
        if not fut.done():
            failures[scope_host.endpoint] = OperationTimedOut(
                last_host=scope_host, timeout=max(0.0, deadline - time.time()))
            continue
        try:
            rows = fut.result()
        except Exception as exc:
            failures[scope_host.endpoint] = exc
            continue
        row = rows.one()
        version = getattr(row, "schema_version", None) if row is not None else None
        version_groups[version].append(scope_host.endpoint)

    if not failures and len(version_groups) == 1 and None not in version_groups:
        log.debug("[session] Connected hosts in the %s agree on schema", scope_label)
        return None

    if failures:
        version_groups['unavailable'] = failures
    return dict(version_groups)

Comment thread
Lorak-mmk marked this conversation as resolved.
def _get_schema_agreement_hosts(self, scope: SchemaAgreementScope) -> Tuple[Host, ...]:
    """Return the live, connected hosts whose distance falls within *scope*."""
    if scope is SchemaAgreementScope.RACK:
        permitted = (HostDistance.LOCAL_RACK,)
    elif scope is SchemaAgreementScope.DC:
        permitted = (HostDistance.LOCAL_RACK, HostDistance.LOCAL)
    else:
        permitted = (HostDistance.LOCAL_RACK, HostDistance.LOCAL, HostDistance.REMOTE)

    distance = self._profile_manager.distance
    selected = []
    # Snapshot the pool mapping so concurrent pool changes cannot affect iteration.
    for candidate, pool in tuple(self._pools.items()):
        if candidate.is_up and not pool.is_shutdown and distance(candidate) in permitted:
            selected.append(candidate)
    return tuple(selected)

def _query_local_schema_version(self, host: Host, query: str, deadline: float) -> Future:
    """Asynchronously query *host* for its ``system.local`` schema version.

    :param host: The connected host to query.
    :param query: The (possibly timeout-annotated) schema-version query.
    :param deadline: Absolute ``time.time()``-based deadline bounding the
        request; the per-query timeout is clamped to the time remaining.
    :returns: A :class:`concurrent.futures.Future` that resolves to a
        :class:`ResultSet`, or fails with the underlying request error.
    :raises OperationTimedOut: If the request cannot be started in time.
    """
    remaining = max(0.0, deadline - time.time())
    try:
        response_future = self.execute_async(
            query,
            timeout=self._schema_agreement_query_timeout(remaining),
            host=host,
        )
    except OperationTimedOut as timeout:
        log.debug("[session] Timed out waiting for schema version from %s: %s", host, timeout)
        raise
    except Exception as exc:
        log.debug("[session] Error querying schema version from %s: %s", host, exc)
        raise

    # execute_async returns a cassandra.cluster.ResponseFuture, which has no
    # bulk-waiting support.  Bridge it to a concurrent.futures.Future so the
    # schema agreement logic can wait on a whole batch of hosts at once with
    # concurrent.futures.wait.
    schema_version_future = Future()

    def _set_result(result, result_future=schema_version_future, response_future=response_future):
        if result_future.done():
            return
        try:
            result_future.set_result(ResultSet(response_future, result))
        except Exception as exc:
            # ResultSet construction failed; surface the error to the waiter.
            result_future.set_exception(exc)

    def _set_exception(exc, result_future=schema_version_future):
        if result_future.done():
            return
        result_future.set_exception(exc)

    try:
        response_future.add_callbacks(_set_result, _set_exception)
    except Exception as exc:
        log.debug("[session] Error registering schema version callback from %s: %s", host, exc)
        raise

    return schema_version_future

def _schema_agreement_query_timeout(self, remaining: float) -> float:
control_timeout = self.cluster.control_connection._timeout
if control_timeout is None:
return max(0.0, remaining)
return max(0.0, min(control_timeout, remaining))

def user_type_registered(self, keyspace, user_type, klass):
"""
Called by the parent Cluster instance when the user registers a new
Expand Down Expand Up @@ -3786,7 +3974,7 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
if self._cluster.is_shutdown:
return False

agreed = self.wait_for_schema_agreement(connection,
agreed = self._wait_for_schema_agreement(connection=connection,
preloaded_results=preloaded_results,
wait_time=schema_agreement_wait)

Expand Down Expand Up @@ -4079,7 +4267,30 @@ def _handle_schema_change(self, event):
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, **event)

def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
    """
    Wait for schema agreement from the control connection's metadata view.

    Intended only for internal metadata refresh flows; external callers
    should prefer :meth:`.Session.wait_for_schema_agreement`.

    The control connection judges agreement from its own vantage point:
    it may include hosts the session does not use, and it can fail while
    the control connection itself is transiently unhealthy — producing
    false positives or failures that say nothing about whether a session
    can safely proceed.

    .. deprecated:: 3.30.0
        Use :meth:`.Session.wait_for_schema_agreement` instead.
    """
    deprecation_message = (
        "ControlConnection.wait_for_schema_agreement is deprecated and will be removed in 4.0. "
        "Use Session.wait_for_schema_agreement instead. "
        "This method is for internal metadata refresh use only.")
    warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return self._wait_for_schema_agreement(
        connection=connection,
        preloaded_results=preloaded_results,
        wait_time=wait_time)

def _wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):
total_timeout = wait_time if wait_time is not None else self._cluster.max_schema_agreement_wait
if total_timeout <= 0:
return True
Expand Down
2 changes: 2 additions & 0 deletions docs/api/cassandra/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ Clusters and Sessions

.. automethod:: set_keyspace(keyspace)

.. automethod:: wait_for_schema_agreement

.. automethod:: get_execution_profile

.. automethod:: execution_profile_clone_update
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/long/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,4 @@ def check_and_wait_for_agreement(self, session, rs, exepected):
time.sleep(1)
assert rs.response_future.is_schema_agreed == exepected
if not rs.response_future.is_schema_agreed:
session.cluster.control_connection.wait_for_schema_agreement(wait_time=1000)
session.wait_for_schema_agreement(wait_time=1000)
2 changes: 1 addition & 1 deletion tests/integration/standard/test_udts.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def test_can_register_udt_before_connecting(self):
c.register_user_type("udt_test_register_before_connecting2", "user", User2)

s = c.connect(wait_for_all_pools=True)
c.control_connection.wait_for_schema_agreement()
s.wait_for_schema_agreement()

s.execute("INSERT INTO udt_test_register_before_connecting.mytable (a, b) VALUES (%s, %s)", (0, User1(42, 'bob')))
result = s.execute("SELECT b FROM udt_test_register_before_connecting.mytable WHERE a=0")
Expand Down
Loading
Loading