Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 83 additions & 8 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,17 @@ class _HostLivenessState(_EventFenceState):
_UP = "up"
_DOWN = "down"
_PENDING_UP = "pending_up"
_PENDING_DOWN = "pending_down"

__slots__ = ("up_endpoint", "pending_up_endpoint")
__slots__ = ("up_endpoint", "down_endpoint",
"pending_up_endpoint", "pending_down_endpoint")

def __init__(self):
_EventFenceState.__init__(self)
self.up_endpoint = None
self.down_endpoint = None
self.pending_up_endpoint = None
self.pending_down_endpoint = None

@property
def up_epoch(self):
Expand All @@ -365,6 +369,14 @@ def pending_up_epoch(self):
def pending_up_epoch(self, epoch):
self._set_or_clear_event(self._PENDING_UP, epoch)

@property
def pending_down_epoch(self):
return self.get_event(self._PENDING_DOWN)

@pending_down_epoch.setter
def pending_down_epoch(self, epoch):
self._set_or_clear_event(self._PENDING_DOWN, epoch)


class _PoolCreationState(_EventFenceState):
_CREATE = "create"
Expand Down Expand Up @@ -2193,7 +2205,42 @@ def _handle_pending_node_up(self, host, pending_up):

def _clear_down_handling(self, host, down_epoch=None):
state = self._get_host_liveness_state(host)
return state.clear_event(_HostLivenessState._DOWN, down_epoch)
if state.clear_event(_HostLivenessState._DOWN, down_epoch):
state.down_endpoint = None
return True
return False

def _clear_pending_down(self, state):
state.pending_down_epoch = None
state.pending_down_endpoint = None

def _pop_pending_node_down_if_ready(self, host):
state = self._get_host_liveness_state(host)
if state.pending_down_epoch is None:
state.pending_down_endpoint = None
return None
if host.is_up or state.up_epoch is not None or state.down_epoch is not None:
return None

pending_down_epoch = state.pending_down_epoch
pending_down_endpoint = state.pending_down_endpoint
if state.epoch != pending_down_epoch:
self._clear_pending_down(state)
return None

self._clear_pending_down(state)
return pending_down_epoch, pending_down_endpoint

def _handle_pending_node_down(self, host, pending_down, is_host_addition):
if pending_down is None:
return False
_pending_down_epoch, pending_down_endpoint = pending_down
log.debug("Handling queued down status of node %s", host)
self.on_down(
host, is_host_addition=is_host_addition,
expect_host_to_be_down=True,
expected_endpoint=pending_down_endpoint)
return True

def _finish_superseded_up_handling(self, host, up_epoch, expected_endpoint=None):
self._cleanup_superseded_up_handling(
Expand Down Expand Up @@ -2515,6 +2562,7 @@ def on_down_potentially_blocking(
return
down_epoch = state.epoch
state.down_epoch = down_epoch
state.down_endpoint = expected_endpoint
elif not owns_reserved_down_handling:
log.debug("Ignoring stale down handling for host %s", host)
return
Expand Down Expand Up @@ -2576,12 +2624,16 @@ def on_down_potentially_blocking(
else:
log.debug("Not starting reconnector for removed host %s", host)
finally:
pending_down = None
pending_up_epoch = None
with host.lock:
if down_epoch is not None and self._clear_down_handling(host, down_epoch):
pending_up_epoch = self._pop_pending_node_up_if_ready(host)
pending_down = self._pop_pending_node_down_if_ready(host)
if pending_down is None:
pending_up_epoch = self._pop_pending_node_up_if_ready(host)

self._handle_pending_node_up(host, pending_up_epoch)
if not self._handle_pending_node_down(host, pending_down, is_host_addition):
self._handle_pending_node_up(host, pending_up_epoch)

def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
expected_endpoint=None):
Expand All @@ -2591,7 +2643,8 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
if self.is_shutdown:
return

if (self._discount_down_events and
if (not expect_host_to_be_down and
self._discount_down_events and
self.profile_manager.distance(host) != HostDistance.IGNORED):
with host.lock:
host_endpoint = host.endpoint
Expand Down Expand Up @@ -2626,6 +2679,20 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,

was_up = host.is_up
state = self._get_host_liveness_state(host)
down_endpoint = expected_endpoint if expected_endpoint is not None else host.endpoint
pending_down_endpoint = None
if state.down_endpoint is not None:
if not self._endpoints_match(state.down_endpoint, down_endpoint):
pending_down_endpoint = down_endpoint

if pending_down_endpoint is not None:
state.advance()
state.pending_up_epoch = None
state.pending_up_endpoint = None
state.pending_down_epoch = state.epoch
state.pending_down_endpoint = pending_down_endpoint
host.set_down()
return

if not expect_host_to_be_down:
if was_up is False:
Expand All @@ -2639,6 +2706,7 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
state.advance()
state.pending_up_epoch = None
state.pending_up_endpoint = None
self._clear_pending_down(state)
host.set_down()
down_epoch = state.epoch
if state.down_epoch is not None:
Expand All @@ -2648,16 +2716,21 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
state.up_epoch is None):
return
state.down_epoch = down_epoch
state.down_endpoint = down_endpoint
log.warning("Host %s has been marked down", host)

future = self.on_down_potentially_blocking(
host, is_host_addition, down_epoch, expected_endpoint)
host, is_host_addition, down_epoch, down_endpoint)
if future is None:
pending_down = None
pending_up_epoch = None
with host.lock:
if self._clear_down_handling(host, down_epoch):
pending_up_epoch = self._pop_pending_node_up_if_ready(host)
self._handle_pending_node_up(host, pending_up_epoch)
pending_down = self._pop_pending_node_down_if_ready(host)
if pending_down is None:
pending_up_epoch = self._pop_pending_node_up_if_ready(host)
if not self._handle_pending_node_down(host, pending_down, is_host_addition):
self._handle_pending_node_up(host, pending_up_epoch)

def on_add(self, host, refresh_nodes=True):
if self.is_shutdown:
Expand Down Expand Up @@ -2742,9 +2815,11 @@ def on_remove(self, host):
state.advance()
state.pending_up_epoch = None
state.pending_up_endpoint = None
self._clear_pending_down(state)
state.up_epoch = None
state.up_endpoint = None
state.down_epoch = None
state.down_endpoint = None
host.set_down()
self._set_non_retryable_auth_failure(host, False)
self.profile_manager.on_remove(host)
Expand Down
Loading
Loading