From 65eacfb2e6900d789ad501c6d5c81fcbc420ed16 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:15:36 -0500 Subject: [PATCH 1/8] Fix base class of DescribeClientQuotasResponse_v0 (#144) Co-authored-by: Denis Otkidach --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 41b4a9576..0bb1a7acc 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -925,7 +925,7 @@ class DeleteGroupsRequest_v1(Request): ] -class DescribeClientQuotasResponse_v0(Request): +class DescribeClientQuotasResponse_v0(Response): API_KEY = 48 API_VERSION = 0 SCHEMA = Schema( From e0ebe5dd3191778b35967b3a0dd22ca6e091a9b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:16:53 -0500 Subject: [PATCH 2/8] Update license_file to license_files (#131) The former has been deprecated since setuptools 56 Co-authored-by: micwoj92 <45581170+micwoj92@users.noreply.github.com> --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 5c6311daf..76daa0897 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,4 +2,4 @@ universal=1 [metadata] -license_file = LICENSE +license_files = LICENSE From 26bb3eb6534cc1aff0a2a7751de7f7dedfdd4cd5 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:22:03 -0500 Subject: [PATCH 3/8] Update some RST documentation syntax (#130) * docs: Update syntax in README.rst * docs: Update code block syntax in docs/index.rst --------- Co-authored-by: HalfSweet <60973476+HalfSweet@users.noreply.github.com> --- README.rst | 165 +++++++++++++++++++++++++++++++------------------ docs/index.rst | 114 +++++++++++++++++++++------------- 2 files changed, 174 insertions(+), 105 deletions(-) diff --git a/README.rst b/README.rst index 8a5c71b38..b7acfc8a2 100644 --- a/README.rst +++ b/README.rst @@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. -See + +See https://kafka-python.readthedocs.io/en/master/compatibility.html + for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code-block:: bash + + $ pip install kafka-python-ng + KafkaConsumer @@ -48,42 +54,56 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. -See + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html + for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code-block:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code-block:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) ->>> # Access record headers. The returned value is a list of tuples ->>> # with str, bytes for key and value ->>> for msg in consumer: -... print (msg.headers) +.. code-block:: python ->>> # Get consumer metrics ->>> metrics = consumer.metrics() + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) + +.. code-block:: python + + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) + +.. code-block:: python + + # Access record headers. The returned value is a list of tuples + # with str, bytes for key and value + for msg in consumer: + print (msg.headers) + +.. code-block:: python + + # Get consumer metrics + metrics = consumer.metrics() KafkaProducer @@ -91,46 +111,66 @@ KafkaProducer KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. -See + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html + for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code-block:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code-block:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code-block:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code-block:: python ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() +.. code-block:: python ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) +.. code-block:: python ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) +.. code-block:: python ->>> # Include record headers. The format is list of tuples with string key ->>> # and bytes value. ->>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) ->>> # Get producer performance metrics ->>> metrics = producer.metrics() +.. code-block:: python + + # Include record headers. The format is list of tuples with string key + # and bytes value. + producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + +.. code-block:: python + + # Get producer performance metrics + metrics = producer.metrics() Thread safety @@ -154,7 +194,9 @@ kafka-python-ng supports the following compression formats: - Zstandard (zstd) gzip is supported natively, the others require installing additional libraries. -See for more information. + +See https://kafka-python.readthedocs.io/en/master/install.html for more information. + Optimized CRC32 Validation @@ -162,8 +204,9 @@ Optimized CRC32 Validation Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure python implementation for compatibility. To improve performance for high-throughput -applications, kafka-python-ng will use `crc32c` for optimized native code if installed. -See for installation instructions. +applications, kafka-python will use `crc32c` for optimized native code if installed. +See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions. + See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib. diff --git a/docs/index.rst b/docs/index.rst index 92b998d92..779ad997b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,7 +31,11 @@ failures. See `Compatibility `_ for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code:: bash + + pip install kafka-python-ng + KafkaConsumer @@ -47,28 +51,36 @@ See `KafkaConsumer `_ for API and configuration detai The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code:: python + + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) + +.. code:: python + + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) + +.. code:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) KafkaProducer @@ -78,36 +90,50 @@ KafkaProducer The class is intended to operate as similarly as possible to the official java client. See `KafkaProducer `_ for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code:: python + + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) +.. code:: python ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') +.. code:: python ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') +.. code:: python ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) Thread safety From 88763da301f72759911ec4c0e4dc8b4e9f83e124 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:14:12 -0500 Subject: [PATCH 4/8] Fix crc32c's __main__ for Python 3 (#142) * Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt --- kafka/record/_crc32c.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py index 9b51ad8a9..6642b5bbe 100644 --- a/kafka/record/_crc32c.py +++ b/kafka/record/_crc32c.py @@ -139,7 +139,5 @@ def crc(data): if __name__ == "__main__": import sys - # TODO remove the pylint disable once pylint fixes - # https://github.com/PyCQA/pylint/issues/2571 - data = sys.stdin.read() # pylint: disable=assignment-from-no-return + data = sys.stdin.buffer.read() # pylint: disable=assignment-from-no-return print(hex(crc(data))) From b1a4c53cfc426118cca491a552861e9b07592629 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:59:32 -0500 Subject: [PATCH 5/8] Strip trailing dot off hostname. (#133) Co-authored-by: Dave Voutila --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1efb8a0a1..1f3bc2006 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -496,7 +496,7 @@ def _wrap_ssl(self): try: self._sock = self._ssl_context.wrap_socket( self._sock, - server_hostname=self.host, + server_hostname=self.host.rstrip("."), do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) From 18eaa2d29e7ba3d0f261e620397712b5d67c3a94 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 01:47:35 -0500 Subject: [PATCH 6/8] Handle OSError to properly recycle SSL connection, fix infinite loop (#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1f3bc2006..80f17009c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -510,7 +510,7 @@ def _try_handshake(self): # old ssl in python2.6 will swallow all SSLErrors here... except (SSLWantReadError, SSLWantWriteError): pass - except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): + except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError, ssl.SSLError, OSError) as e: log.warning('SSL connection closed by server during handshake.') self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user From 54cbd63a275e781c9db98b57d6daa36a15eaa0ff Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 13:05:12 -0400 Subject: [PATCH 7/8] client_async: Allow throwing an exception upon socket error during (#134) wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis Co-authored-by: shimon-armis --- kafka/client_async.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 530a1f441..3076c4ba0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception + upon socket error during wakeup(). Default: False """ DEFAULT_CONFIG = { @@ -192,7 +194,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'raise_upon_socket_err_during_wakeup': False } def __init__(self, **configs): @@ -243,6 +246,8 @@ def __init__(self, **configs): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup'] + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -936,8 +941,10 @@ def wakeup(self): except socket.timeout: log.warning('Timeout to send to wakeup socket!') raise Errors.KafkaTimeoutError() - except socket.error: + except socket.error as e: log.warning('Unable to send to wakeup socket!') + if self._raise_upon_socket_err_during_wakeup: + raise e def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread From eb6fd9b46a1b4fa79a2e52da60ac809d0b0ddb8f Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 12 Mar 2024 17:05:12 -0400 Subject: [PATCH 8/8] Log connection errors at ERROR level (#139) Co-authored-by: drewdogg --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 80f17009c..d04acce3e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -916,7 +916,7 @@ def close(self, error=None): with self._lock: if self.state is ConnectionStates.DISCONNECTED: return - log.info('%s: Closing connection. %s', self, error or '') + log.log(logging.ERROR if error else logging.INFO, '%s: Closing connection. %s', self, error or '') self._update_reconnect_backoff() self._sasl_auth_future = None self._protocol = KafkaProtocol(