diff --git a/pykafka/broker.py b/pykafka/broker.py index bfcdfd15c..e06e01fcb 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -294,7 +294,16 @@ def _get_unique_req_handler(self, connection_id): handler = RequestHandler(self._handler, conn) handler.start() self._req_handlers[connection_id] = handler - return self._req_handlers[connection_id] + + handler = self._req_handlers[connection_id] + + # Ensure that we're returning a handler with a connected connection. + # If the connection is disconnected, it will raise SocketDisconnectedError + if not handler.shared.connection.connected: + log.warn('Attempting to reconnect for connection id %s..', connection_id) + handler.shared.connection.connect(self._socket_timeout_ms) + + return handler @_check_handler def fetch_messages(self, diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py index fb4613ec8..c2081ee95 100644 --- a/pykafka/managedbalancedconsumer.py +++ b/pykafka/managedbalancedconsumer.py @@ -324,10 +324,10 @@ def _update_member_assignment(self): log.debug("Successfully rebalanced consumer '%s'", self._consumer_id) break except Exception as ex: + log.exception(ex) if i == self._rebalance_max_retries - 1: log.warning('Failed to rebalance s after %d retries.', i) raise - log.exception(ex) log.info('Unable to complete rebalancing. Retrying') self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000)) self._raise_worker_exceptions()