diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 5e75a13da2..c59f0164a8 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -333,13 +333,17 @@ class _HostLivenessState(_EventFenceState): _UP = "up" _DOWN = "down" _PENDING_UP = "pending_up" + _PENDING_DOWN = "pending_down" - __slots__ = ("up_endpoint", "pending_up_endpoint") + __slots__ = ("up_endpoint", "down_endpoint", + "pending_up_endpoint", "pending_down_endpoint") def __init__(self): _EventFenceState.__init__(self) self.up_endpoint = None + self.down_endpoint = None self.pending_up_endpoint = None + self.pending_down_endpoint = None @property def up_epoch(self): @@ -365,6 +369,14 @@ def pending_up_epoch(self): def pending_up_epoch(self, epoch): self._set_or_clear_event(self._PENDING_UP, epoch) + @property + def pending_down_epoch(self): + return self.get_event(self._PENDING_DOWN) + + @pending_down_epoch.setter + def pending_down_epoch(self, epoch): + self._set_or_clear_event(self._PENDING_DOWN, epoch) + class _PoolCreationState(_EventFenceState): _CREATE = "create" @@ -2193,7 +2205,42 @@ def _handle_pending_node_up(self, host, pending_up): def _clear_down_handling(self, host, down_epoch=None): state = self._get_host_liveness_state(host) - return state.clear_event(_HostLivenessState._DOWN, down_epoch) + if state.clear_event(_HostLivenessState._DOWN, down_epoch): + state.down_endpoint = None + return True + return False + + def _clear_pending_down(self, state): + state.pending_down_epoch = None + state.pending_down_endpoint = None + + def _pop_pending_node_down_if_ready(self, host): + state = self._get_host_liveness_state(host) + if state.pending_down_epoch is None: + state.pending_down_endpoint = None + return None + if host.is_up or state.up_epoch is not None or state.down_epoch is not None: + return None + + pending_down_epoch = state.pending_down_epoch + pending_down_endpoint = state.pending_down_endpoint + if state.epoch != pending_down_epoch: + self._clear_pending_down(state) + return None + + self._clear_pending_down(state) + return pending_down_epoch, pending_down_endpoint + + def _handle_pending_node_down(self, host, pending_down, is_host_addition): + if pending_down is None: + return False + _pending_down_epoch, pending_down_endpoint = pending_down + log.debug("Handling queued down status of node %s", host) + self.on_down( + host, is_host_addition=is_host_addition, + expect_host_to_be_down=True, + expected_endpoint=pending_down_endpoint) + return True def _finish_superseded_up_handling(self, host, up_epoch, expected_endpoint=None): self._cleanup_superseded_up_handling( @@ -2515,6 +2562,7 @@ def on_down_potentially_blocking( return down_epoch = state.epoch state.down_epoch = down_epoch + state.down_endpoint = expected_endpoint elif not owns_reserved_down_handling: log.debug("Ignoring stale down handling for host %s", host) return @@ -2576,12 +2624,16 @@ def on_down_potentially_blocking( else: log.debug("Not starting reconnector for removed host %s", host) finally: + pending_down = None pending_up_epoch = None with host.lock: if down_epoch is not None and self._clear_down_handling(host, down_epoch): - pending_up_epoch = self._pop_pending_node_up_if_ready(host) + pending_down = self._pop_pending_node_down_if_ready(host) + if pending_down is None: + pending_up_epoch = self._pop_pending_node_up_if_ready(host) - self._handle_pending_node_up(host, pending_up_epoch) + if not self._handle_pending_node_down(host, pending_down, is_host_addition): + self._handle_pending_node_up(host, pending_up_epoch) def on_down(self, host, is_host_addition, expect_host_to_be_down=False, expected_endpoint=None): @@ -2591,7 +2643,8 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False, if self.is_shutdown: return - if (self._discount_down_events and + if (not expect_host_to_be_down and + self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED): with host.lock: host_endpoint = host.endpoint @@ -2626,6 +2679,20 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False, was_up = host.is_up state = self._get_host_liveness_state(host) + down_endpoint = expected_endpoint if expected_endpoint is not None else host.endpoint + pending_down_endpoint = None + if state.down_endpoint is not None: + if not self._endpoints_match(state.down_endpoint, down_endpoint): + pending_down_endpoint = down_endpoint + + if pending_down_endpoint is not None: + state.advance() + state.pending_up_epoch = None + state.pending_up_endpoint = None + state.pending_down_epoch = state.epoch + state.pending_down_endpoint = pending_down_endpoint + host.set_down() + return if not expect_host_to_be_down: if was_up is False: @@ -2639,6 +2706,7 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False, state.advance() state.pending_up_epoch = None state.pending_up_endpoint = None + self._clear_pending_down(state) host.set_down() down_epoch = state.epoch if state.down_epoch is not None: @@ -2648,16 +2716,21 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False, state.up_epoch is None): return state.down_epoch = down_epoch + state.down_endpoint = down_endpoint log.warning("Host %s has been marked down", host) future = self.on_down_potentially_blocking( - host, is_host_addition, down_epoch, expected_endpoint) + host, is_host_addition, down_epoch, down_endpoint) if future is None: + pending_down = None pending_up_epoch = None with host.lock: if self._clear_down_handling(host, down_epoch): - pending_up_epoch = self._pop_pending_node_up_if_ready(host) - self._handle_pending_node_up(host, pending_up_epoch) + pending_down = self._pop_pending_node_down_if_ready(host) + if pending_down is None: + pending_up_epoch = self._pop_pending_node_up_if_ready(host) + if not self._handle_pending_node_down(host, pending_down, is_host_addition): + self._handle_pending_node_up(host, pending_up_epoch) def on_add(self, host, refresh_nodes=True): if self.is_shutdown: @@ -2742,9 +2815,11 @@ def on_remove(self, host): state.advance() state.pending_up_epoch = None state.pending_up_endpoint = None + self._clear_pending_down(state) state.up_epoch = None state.up_endpoint = None state.down_epoch = None + state.down_endpoint = None host.set_down() self._set_non_retryable_auth_failure(host, False) self.profile_manager.on_remove(host) diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 7576725f2a..4337c3496a 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -1150,6 +1150,29 @@ def test_discount_down_event_applies_to_current_expected_endpoint(self): assert host.is_up cluster.on_down_potentially_blocking.assert_not_called() + def test_forced_down_is_not_discounted_by_connected_pool(self): + host = self._make_host() + host.set_up() + endpoint = host.endpoint + pool = Mock() + pool.host = host + pool.endpoint = endpoint + pool.open_count = 1 + session = self._make_session_with_pool(host, pool) + cluster = self._make_cluster(session=session) + cluster._discount_down_events = True + cluster.profile_manager.distance.return_value = HostDistance.LOCAL + cluster.on_down_potentially_blocking = Mock(return_value=None) + session.cluster = cluster + + Cluster.on_down( + cluster, host, is_host_addition=False, + expect_host_to_be_down=True, expected_endpoint=endpoint) + + assert not host.is_up + cluster.on_down_potentially_blocking.assert_called_once_with( + host, False, ANY, endpoint) + @staticmethod def _state(cluster, host): return cluster._get_host_liveness_state(host) @@ -1186,6 +1209,31 @@ def test_stale_down_handling_is_ignored_after_host_is_up(self): listener.on_down.assert_not_called() cluster._start_reconnector.assert_not_called() + def test_stale_generic_down_handling_uses_original_endpoint_after_endpoint_swap(self): + executor = _QueuedExecutor() + session = Mock() + listener = Mock() + cluster = self._make_cluster(session=session, listener=listener) + cluster.executor = executor + cluster.profile_manager.distance.return_value = HostDistance.LOCAL + host = self._make_host() + host.set_up() + old_endpoint = host.endpoint + new_endpoint = DefaultEndPoint("127.0.0.2") + + Cluster.on_down(cluster, host, is_host_addition=False) + host.endpoint = new_endpoint + + executor.run_next() + + cluster.profile_manager.on_down.assert_not_called() + cluster.control_connection.on_down.assert_not_called() + session.on_down.assert_called_once_with( + host, expected_endpoint=old_endpoint) + listener.on_down.assert_not_called() + cluster._start_reconnector.assert_not_called() + assert self._state(cluster, host).down_epoch is None + def test_unreserved_down_handling_is_ignored_during_host_up_handling(self): session = Mock() cluster = self._make_cluster(session=session) @@ -1354,7 +1402,9 @@ def test_newer_forced_down_during_up_handling_is_preserved(self): session.on_down.assert_called_once_with( host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) assert state.epoch > first_up_epoch assert state.up_epoch == first_up_epoch assert not host.is_up @@ -1388,7 +1438,9 @@ def test_stale_failed_up_callback_does_not_cleanup_newer_down(self): session.on_down.assert_called_once_with( host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) listener.on_up.assert_not_called() assert not host.is_up assert self._state(cluster, host).up_epoch is None @@ -1422,7 +1474,8 @@ def force_down_before_cleanup(message, *args, **kwargs): host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) cluster._start_reconnector.assert_called_once_with( - host, False, expected_down_epoch=ANY) + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) assert session.remove_pool.call_count == 1 listener.on_up.assert_not_called() assert not host.is_up @@ -1480,7 +1533,9 @@ def force_down_before_reconnector_is_cleared(h, up_epoch, **kwargs): session.on_down.assert_called_once_with( host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) cluster.profile_manager.on_up.assert_not_called() cluster.control_connection.on_up.assert_not_called() old_reconnector.cancel.assert_called_once_with() @@ -1504,7 +1559,9 @@ def test_forced_down_while_reconnecting_runs_new_down_handling(self): session.on_down.assert_called_once_with( host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) assert self._state(cluster, host).down_epoch is None def test_newer_down_before_up_side_effects_suppresses_stale_up(self): @@ -1531,7 +1588,9 @@ def force_down_before_first_superseded_check(h, up_epoch): cluster.control_connection.on_down.assert_called_once_with(host) cluster.profile_manager.on_up.assert_not_called() cluster.control_connection.on_up.assert_not_called() - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) assert not host.is_up assert self._state(cluster, host).up_epoch is None assert self._state(cluster, host).down_epoch is None @@ -1739,7 +1798,9 @@ def test_down_during_up_listener_is_handled(self): session.on_down.assert_called_once_with( host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) assert not host.is_up assert self._state(cluster, host).up_epoch is None assert self._state(cluster, host).down_epoch is None @@ -1863,7 +1924,9 @@ def test_on_up_queues_after_down_is_submitted_before_worker_runs(self): session.on_down.assert_called_once_with( host, expected_endpoint=host.endpoint) listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) cluster.profile_manager.on_up.assert_called_once_with(host) cluster.control_connection.on_up.assert_called_once_with(host) assert host.is_up @@ -1885,6 +1948,7 @@ def test_on_up_stays_queued_after_endpoint_update_before_down_worker_runs(self): Cluster.on_down( cluster, host, is_host_addition=False, expect_host_to_be_down=True) state = self._state(cluster, host) + old_endpoint = host.endpoint host.endpoint = DefaultEndPoint("127.0.0.2") @@ -1899,12 +1963,12 @@ def test_on_up_stays_queued_after_endpoint_update_before_down_worker_runs(self): executor.run_next() - cluster.profile_manager.on_down.assert_called_once_with(host) - cluster.control_connection.on_down.assert_called_once_with(host) + cluster.profile_manager.on_down.assert_not_called() + cluster.control_connection.on_down.assert_not_called() session.on_down.assert_called_once_with( - host, expected_endpoint=host.endpoint) - listener.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + host, expected_endpoint=old_endpoint) + listener.on_down.assert_not_called() + cluster._start_reconnector.assert_not_called() cluster.profile_manager.on_up.assert_called_once_with(host) cluster.control_connection.on_up.assert_called_once_with(host) assert host.is_up @@ -1912,6 +1976,93 @@ def test_on_up_stays_queued_after_endpoint_update_before_down_worker_runs(self): assert state.up_epoch is None assert state.pending_up_epoch is None + def test_down_for_replacement_endpoint_during_pending_old_down_is_handled(self): + executor = _QueuedExecutor() + session = Mock() + listener = Mock() + cluster = self._make_cluster(session=session, listener=listener) + cluster.executor = executor + cluster.profile_manager.distance.return_value = HostDistance.LOCAL + host = self._make_host() + host.set_up() + old_endpoint = host.endpoint + new_endpoint = DefaultEndPoint("127.0.0.2") + + Cluster.on_down( + cluster, host, is_host_addition=False, + expected_endpoint=old_endpoint) + state = self._state(cluster, host) + assert state.down_epoch == state.epoch + + host.endpoint = new_endpoint + Cluster.on_up(cluster, host) + assert state.pending_up_epoch == state.epoch + + Cluster.on_down( + cluster, host, is_host_addition=False, + expected_endpoint=new_endpoint) + + executor.run_next() + + assert len(executor.submissions) == 1 + executor.run_next() + + session.on_down.assert_any_call( + host, expected_endpoint=old_endpoint) + session.on_down.assert_any_call( + host, expected_endpoint=new_endpoint) + listener.on_down.assert_called_once_with(host) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=new_endpoint) + cluster.profile_manager.on_up.assert_not_called() + cluster.control_connection.on_up.assert_not_called() + assert not host.is_up + assert state.down_epoch is None + assert state.up_epoch is None + assert state.pending_up_epoch is None + + def test_forced_down_for_replacement_endpoint_during_old_down_is_handled(self): + executor = _QueuedExecutor() + session = Mock() + listener = Mock() + cluster = self._make_cluster(session=session, listener=listener) + cluster.executor = executor + cluster.profile_manager.distance.return_value = HostDistance.LOCAL + host = self._make_host() + host.set_up() + old_endpoint = host.endpoint + new_endpoint = DefaultEndPoint("127.0.0.2") + + Cluster.on_down( + cluster, host, is_host_addition=False, + expected_endpoint=old_endpoint) + state = self._state(cluster, host) + assert state.down_epoch == state.epoch + + host.endpoint = new_endpoint + Cluster.on_down( + cluster, host, is_host_addition=False, + expect_host_to_be_down=True, expected_endpoint=new_endpoint) + + executor.run_next() + + assert len(executor.submissions) == 1 + executor.run_next() + + session.on_down.assert_any_call( + host, expected_endpoint=old_endpoint) + session.on_down.assert_any_call( + host, expected_endpoint=new_endpoint) + listener.on_down.assert_called_once_with(host) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=new_endpoint) + assert not host.is_up + assert state.down_epoch is None + assert state.up_epoch is None + assert state.pending_up_epoch is None + def test_later_down_before_worker_runs_does_not_skip_pool_cleanup(self): executor = _QueuedExecutor() host = self._make_host() @@ -2229,7 +2380,9 @@ def test_real_down_for_unknown_host_marks_host_down(self): assert host.is_up is False cluster.profile_manager.on_down.assert_called_once_with(host) cluster.control_connection.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) def test_expected_down_for_unknown_host_marks_host_down(self): cluster = self._make_cluster() @@ -2241,7 +2394,9 @@ def test_expected_down_for_unknown_host_marks_host_down(self): assert host.is_up is False cluster.profile_manager.on_down.assert_called_once_with(host) cluster.control_connection.on_down.assert_called_once_with(host) - cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY) + cluster._start_reconnector.assert_called_once_with( + host, False, expected_down_epoch=ANY, + expected_endpoint=host.endpoint) class SessionTest(unittest.TestCase):