Skip to content

Commit 99c08e6

Browse files
authored
Prefix producer logs w/ client id and transactional id (#2591)
1 parent c5cbe84 commit 99c08e6

File tree

2 files changed

+69
-61
lines changed

2 files changed

+69
-61
lines changed

Diff for: kafka/producer/kafka.py

+27-23
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,6 @@ class KafkaProducer(object):
380380
}
381381

382382
def __init__(self, **configs):
383-
log.debug("Starting the Kafka producer") # trace
384383
self.config = copy.copy(self.DEFAULT_CONFIG)
385384
user_provided_configs = set(configs.keys())
386385
for key in self.config:
@@ -409,8 +408,10 @@ def __init__(self, **configs):
409408
self.config['api_version'] = None
410409
else:
411410
self.config['api_version'] = tuple(map(int, deprecated.split('.')))
412-
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
413-
str(self.config['api_version']), deprecated)
411+
log.warning('%s: use api_version=%s [tuple] -- "%s" as str is deprecated',
412+
self, str(self.config['api_version']), deprecated)
413+
414+
log.debug("%s: Starting Kafka producer", self)
414415

415416
# Configure metrics
416417
if self.config['metrics_enabled']:
@@ -466,26 +467,26 @@ def __init__(self, **configs):
466467
metadata=self._metadata,
467468
)
468469
if self._transaction_manager.is_transactional():
469-
log.info("Instantiated a transactional producer.")
470+
log.info("%s: Instantiated a transactional producer.", self)
470471
else:
471-
log.info("Instantiated an idempotent producer.")
472+
log.info("%s: Instantiated an idempotent producer.", self)
472473

473474
if 'retries' not in user_provided_configs:
474-
log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
475+
log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", self)
475476
self.config['retries'] = 3
476477
elif self.config['retries'] == 0:
477478
raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")
478479

479480
if 'max_in_flight_requests_per_connection' not in user_provided_configs:
480-
log.info("Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempotence is enabled.")
481+
log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempotence is enabled.", self)
481482
self.config['max_in_flight_requests_per_connection'] = 1
482483
elif self.config['max_in_flight_requests_per_connection'] != 1:
483484
raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
484485
" to use the idempotent producer."
485486
" Otherwise we cannot guarantee idempotence.")
486487

487488
if 'acks' not in user_provided_configs:
488-
log.info("Overriding the default 'acks' config to 'all' since idempotence is enabled")
489+
log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", self)
489490
self.config['acks'] = -1
490491
elif self.config['acks'] != -1:
491492
raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent"
@@ -509,7 +510,7 @@ def __init__(self, **configs):
509510

510511
self._cleanup = self._cleanup_factory()
511512
atexit.register(self._cleanup)
512-
log.debug("Kafka producer started")
513+
log.debug("%s: Kafka producer started", self)
513514

514515
def bootstrap_connected(self):
515516
"""Return True if the bootstrap is connected."""
@@ -564,7 +565,7 @@ def __getattr__(self, name):
564565
self._unregister_cleanup()
565566

566567
if not hasattr(self, '_closed') or self._closed:
567-
log.info('Kafka producer closed')
568+
log.info('%s: Kafka producer closed', self)
568569
return
569570
if timeout is None:
570571
# threading.TIMEOUT_MAX is available in Python3.3+
@@ -574,26 +575,26 @@ def __getattr__(self, name):
574575
else:
575576
assert timeout >= 0
576577

577-
log.info("Closing the Kafka producer with %s secs timeout.", timeout)
578+
log.info("%s: Closing the Kafka producer with %s secs timeout.", self, timeout)
578579
self.flush(timeout)
579580
invoked_from_callback = bool(threading.current_thread() is self._sender)
580581
if timeout > 0:
581582
if invoked_from_callback:
582-
log.warning("Overriding close timeout %s secs to 0 in order to"
583+
log.warning("%s: Overriding close timeout %s secs to 0 in order to"
583584
" prevent useless blocking due to self-join. This"
584585
" means you have incorrectly invoked close with a"
585586
" non-zero timeout from the producer call-back.",
586-
timeout)
587+
self, timeout)
587588
else:
588589
# Try to close gracefully.
589590
if self._sender is not None:
590591
self._sender.initiate_close()
591592
self._sender.join(timeout)
592593

593594
if self._sender is not None and self._sender.is_alive():
594-
log.info("Proceeding to force close the producer since pending"
595+
log.info("%s: Proceeding to force close the producer since pending"
595596
" requests could not be completed within timeout %s.",
596-
timeout)
597+
self, timeout)
597598
self._sender.force_close()
598599

599600
if self._metrics:
@@ -607,7 +608,7 @@ def __getattr__(self, name):
607608
except AttributeError:
608609
pass
609610
self._closed = True
610-
log.debug("The Kafka producer has closed.")
611+
log.debug("%s: The Kafka producer has closed.", self)
611612

612613
def partitions_for(self, topic):
613614
"""Returns set of all known partitions for the topic."""
@@ -816,7 +817,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
816817
self._ensure_valid_record_size(message_size)
817818

818819
tp = TopicPartition(topic, partition)
819-
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
820+
log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", self, key, value, headers, tp)
820821

821822
if self._transaction_manager and self._transaction_manager.is_transactional():
822823
self._transaction_manager.maybe_add_partition_to_transaction(tp)
@@ -825,16 +826,16 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
825826
key_bytes, value_bytes, headers)
826827
future, batch_is_full, new_batch_created = result
827828
if batch_is_full or new_batch_created:
828-
log.debug("Waking up the sender since %s is either full or"
829-
" getting a new batch", tp)
829+
log.debug("%s: Waking up the sender since %s is either full or"
830+
" getting a new batch", self, tp)
830831
self._sender.wakeup()
831832

832833
return future
833834
# handling exceptions and record the errors;
834835
# for API exceptions return them in the future,
835836
# for other exceptions raise directly
836837
except Errors.BrokerResponseError as e:
837-
log.error("Exception occurred during message send: %s", e)
838+
log.error("%s: Exception occurred during message send: %s", self, e)
838839
return FutureRecordMetadata(
839840
FutureProduceResult(TopicPartition(topic, partition)),
840841
-1, None, None,
@@ -865,7 +866,7 @@ def flush(self, timeout=None):
865866
KafkaTimeoutError: failure to flush buffered records within the
866867
provided timeout
867868
"""
868-
log.debug("Flushing accumulated records in producer.") # trace
869+
log.debug("%s: Flushing accumulated records in producer.", self)
869870
self._accumulator.begin_flush()
870871
self._sender.wakeup()
871872
self._accumulator.await_flush_completion(timeout=timeout)
@@ -911,7 +912,7 @@ def _wait_on_metadata(self, topic, max_wait):
911912
if not metadata_event:
912913
metadata_event = threading.Event()
913914

914-
log.debug("Requesting metadata update for topic %s", topic)
915+
log.debug("%s: Requesting metadata update for topic %s", self, topic)
915916

916917
metadata_event.clear()
917918
future = self._metadata.request_update()
@@ -925,7 +926,7 @@ def _wait_on_metadata(self, topic, max_wait):
925926
raise Errors.TopicAuthorizationFailedError(set([topic]))
926927
else:
927928
elapsed = time.time() - begin
928-
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
929+
log.debug("%s: _wait_on_metadata woke after %s secs.", self, elapsed)
929930

930931
def _serialize(self, f, topic, data):
931932
if not f:
@@ -972,3 +973,6 @@ def metrics(self, raw=False):
972973
metrics[k.group][k.name] = {}
973974
metrics[k.group][k.name] = v.value()
974975
return metrics
976+
977+
def __str__(self):
978+
return "<KafkaProducer client_id=%s transactional_id=%s>" % (self.config['client_id'], self.config['transactional_id'])

0 commit comments

Comments
 (0)