@@ -166,6 +166,16 @@ def check_supported(self):
166166 """
167167 pass
168168
169+ def on_control_connection_host (self , host ):
170+ """
171+ Called when the control connection resolves the metadata host behind
172+ the endpoint it is currently using.
173+
174+ Policies that maintain a dynamic host allowlist can override this to
175+ update their internal view of the cluster.
176+ """
177+ pass
178+
169179
170180class RoundRobinPolicy (LoadBalancingPolicy ):
171181 """
@@ -540,6 +550,9 @@ def on_add(self, *args, **kwargs):
540550 def on_remove (self , * args , ** kwargs ):
541551 return self ._child_policy .on_remove (* args , ** kwargs )
542552
553+ def on_control_connection_host (self , host ):
554+ return self ._child_policy .on_control_connection_host (host )
555+
543556
544557class WhiteListRoundRobinPolicy (RoundRobinPolicy ):
545558 """
@@ -594,6 +607,58 @@ def on_add(self, host):
594607 RoundRobinPolicy .on_add (self , host )
595608
596609
610+ class DynamicWhiteListRoundRobinPolicy (RoundRobinPolicy ):
611+ """
612+ A :class:`.RoundRobinPolicy` variant whose allowlist is updated from the
613+ control connection.
614+
615+ This is intended for proxy deployments where the driver can only reach the
616+ host currently behind the control connection endpoint. The policy keeps
617+ every other discovered node at :attr:`~.HostDistance.IGNORED` until the
618+ control connection resolves a different host.
619+ """
620+
621+ def __init__ (self ):
622+ self ._allowed_host_ids = frozenset (())
623+ self ._cluster = None
624+ RoundRobinPolicy .__init__ (self )
625+
626+ def _host_is_allowed (self , host ):
627+ return getattr (host , "host_id" , None ) in self ._allowed_host_ids
628+
629+ def _refresh_live_hosts (self , hosts ):
630+ self ._live_hosts = frozenset (
631+ host for host in hosts
632+ if self ._host_is_allowed (host ) and host .is_up is not False
633+ )
634+
635+ def populate (self , cluster , hosts ):
636+ self ._cluster = cluster
637+ self ._refresh_live_hosts (hosts )
638+ if len (self ._live_hosts ) > 1 :
639+ self ._position = randint (0 , len (self ._live_hosts ) - 1 )
640+ else :
641+ self ._position = 0
642+
643+ def distance (self , host ):
644+ return HostDistance .LOCAL if self ._host_is_allowed (host ) else HostDistance .IGNORED
645+
646+ def on_up (self , host ):
647+ if self ._host_is_allowed (host ):
648+ RoundRobinPolicy .on_up (self , host )
649+
650+ def on_add (self , host ):
651+ if self ._host_is_allowed (host ):
652+ RoundRobinPolicy .on_add (self , host )
653+
654+ def on_control_connection_host (self , host ):
655+ with self ._hosts_lock :
656+ allowed_host_id = getattr (host , "host_id" , None )
657+ self ._allowed_host_ids = frozenset ((allowed_host_id ,)) if allowed_host_id is not None else frozenset (())
658+ if self ._cluster is not None :
659+ self ._refresh_live_hosts (self ._cluster .metadata .all_hosts ())
660+
661+
597662class HostFilterPolicy (LoadBalancingPolicy ):
598663 """
599664 A :class:`.LoadBalancingPolicy` subclass configured with a child policy,
@@ -654,6 +719,9 @@ def on_add(self, host, *args, **kwargs):
654719 def on_remove (self , host , * args , ** kwargs ):
655720 return self ._child_policy .on_remove (host , * args , ** kwargs )
656721
722+ def on_control_connection_host (self , host ):
723+ return self ._child_policy .on_control_connection_host (host )
724+
657725 @property
658726 def predicate (self ):
659727 """
@@ -1322,6 +1390,9 @@ def on_add(self, *args, **kwargs):
13221390 def on_remove (self , * args , ** kwargs ):
13231391 return self ._child_policy .on_remove (* args , ** kwargs )
13241392
1393+ def on_control_connection_host (self , host ):
1394+ return self ._child_policy .on_control_connection_host (host )
1395+
13251396
13261397class DefaultLoadBalancingPolicy (WrapperPolicy ):
13271398 """
0 commit comments