@@ -370,18 +370,26 @@ def _maybe_connect(self, node_id):
370
370
conn = self ._conns .get (node_id )
371
371
372
372
if conn is None :
373
- broker = self .cluster .broker_metadata (node_id )
374
- assert broker , 'Broker id %s not in current metadata' % (node_id ,)
375
-
376
- log .debug ("Initiating connection to node %s at %s:%s" ,
377
- node_id , broker .host , broker .port )
378
- host , port , afi = get_ip_port_afi (broker .host )
379
- cb = WeakMethod (self ._conn_state_change )
380
- conn = BrokerConnection (host , broker .port , afi ,
381
- state_change_callback = cb ,
382
- node_id = node_id ,
383
- ** self .config )
384
- self ._conns [node_id ] = conn
373
+ broker_metadata = self .cluster .broker_metadata (node_id )
374
+
375
+ # The broker may have been removed from the cluster after the
376
+ # call to `maybe_connect`. At this point there is no way to
377
+ # recover, so just ignore the connection
378
+ if broker_metadata is None :
379
+ log .debug ("Node %s is not available anymore, discarding connection" , node_id )
380
+ if node_id in self ._connecting :
381
+ self ._connecting .remove (node_id )
382
+ return False
383
+ else :
384
+ log .debug ("Initiating connection to node %s at %s:%s" ,
385
+ node_id , broker_metadata .host , broker_metadata .port )
386
+ host , port , afi = get_ip_port_afi (broker_metadata .host )
387
+ cb = WeakMethod (self ._conn_state_change )
388
+ conn = BrokerConnection (host , broker_metadata .port , afi ,
389
+ state_change_callback = cb ,
390
+ node_id = node_id ,
391
+ ** self .config )
392
+ self ._conns [node_id ] = conn
385
393
386
394
# Check if existing connection should be recreated because host/port changed
387
395
elif self ._should_recycle_connection (conn ):
0 commit comments