From 4d598055dab7da99e41bfcceffa8462b32931cdd Mon Sep 17 00:00:00 2001 From: Kurt McKee Date: Sun, 27 Mar 2022 00:01:45 -0700 Subject: [PATCH 01/38] Fix the link to the compatibility page (#2295) The current link leads to HTTP 404. --- docs/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.rst b/docs/index.rst index 1f2a4ce98..91e5086cc 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -2,7 +2,7 @@ kafka-python ############ .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg - :target: https://kafka-python.readthedocs.io/compatibility.html + :target: https://kafka-python.readthedocs.io/en/master/compatibility.html .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg :target: https://pypi.python.org/pypi/kafka-python .. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github From 7ac6c6e29099ccba4d50f5b842972dd7332d0e58 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Thu, 2 Mar 2023 15:25:13 -0500 Subject: [PATCH 02/38] Allow disabling thread wakeup in send_request_to_node (#2335) --- kafka/admin/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fd4d66110..8eb7504a7 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -355,13 +355,14 @@ def _find_coordinator_ids(self, group_ids): } return groups_coordinators - def _send_request_to_node(self, node_id, request): + def _send_request_to_node(self, node_id, request, wakeup=True): """Send a Kafka protocol message to a specific broker. Returns a future that may be polled for status and results. :param node_id: The broker id to which to send the message. :param request: The message to send. + :param wakeup: Optional flag to disable thread-wakeup. :return: A future object that may be polled for status and results. :exception: The exception if the message could not be sent. """ @@ -369,7 +370,7 @@ def _send_request_to_node(self, node_id, request): # poll until the connection to broker is ready, otherwise send() # will fail with NodeNotReadyError self._client.poll() - return self._client.send(node_id, request) + return self._client.send(node_id, request, wakeup) def _send_request_to_controller(self, request): """Send a Kafka protocol message to the cluster controller. From f8a7e9b8b4b6db298a43d9fe5d427e6439d5824a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 4 Aug 2023 17:57:27 -0400 Subject: [PATCH 03/38] Transition CI/CD to GitHub Workflows (#2378) * Create GH workflows to test code * Update tests for future Python versions --- .github/dependabot.yml | 7 + .github/workflows/codeql-analysis.yml | 67 ++++++++++ .github/workflows/python-package.yml | 179 ++++++++++++++++++++++++++ Makefile | 6 +- README.rst | 10 +- requirements-dev.txt | 34 ++--- setup.py | 9 +- test/test_assignors.py | 2 +- tox.ini | 21 +-- travis_java_install.sh | 0 10 files changed, 303 insertions(+), 32 deletions(-) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/codeql-analysis.yml create mode 100644 .github/workflows/python-package.yml mode change 100644 => 100755 travis_java_install.sh diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..2c7d17083 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +version: 2 +updates: + # Maintain dependencies for GitHub Actions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 000000000..43427fab9 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,67 @@ +--- +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: CodeQL +on: + push: + branches: [master] + pull_request: + # The branches below must be a subset of the branches above + branches: [master] + schedule: + - cron: 19 10 * * 6 +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + strategy: + fail-fast: false + matrix: + language: [python] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + # ℹī¸ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 000000000..9ef4846bd --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,179 @@ +name: CI/CD + +on: + push: + branches: ["master"] + pull_request: + branches: ["master"] + release: + types: [created] + branches: + - 'master' + workflow_dispatch: + +env: + FORCE_COLOR: "1" # Make tools pretty. + PIP_DISABLE_PIP_VERSION_CHECK: "1" + PIP_NO_PYTHON_VERSION_WARNING: "1" + PYTHON_LATEST: "3.11" + KAFKA_LATEST: "2.6.0" + + # For re-actors/checkout-python-sdist + sdist-artifact: python-package-distributions + +jobs: + + build-sdist: + name: đŸ“Ļ Build the source distribution + runs-on: ubuntu-latest + steps: + - name: Checkout project + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ env.PYTHON_LATEST }} + cache: pip + - run: python -m pip install build + name: Install core libraries for build and install + - name: Build artifacts + run: python -m build + - name: Upload built artifacts for testing + uses: actions/upload-artifact@v3 + with: + name: ${{ env.sdist-artifact }} + # NOTE: Exact expected file names are specified here + # NOTE: as a safety measure — if anything weird ends + # NOTE: up being in this dir or not all dists will be + # NOTE: produced, this will fail the workflow. + path: dist/${{ env.sdist-name }} + retention-days: 15 + + test-python: + name: Tests on ${{ matrix.python-version }} + needs: + - build-sdist + runs-on: ubuntu-latest + continue-on-error: ${{ matrix.experimental }} + strategy: + fail-fast: false + matrix: + python-version: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + experimental: [ false ] + include: + - python-version: "pypy3.9" + experimental: true +# - python-version: "~3.12.0-0" +# experimental: true + steps: + - name: Checkout the source code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Setup java + uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: 11 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: | + requirements-dev.txt + - name: Check Java installation + run: source travis_java_install.sh + - name: Pull Kafka releases + run: ./build_integration.sh + env: + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ env.KAFKA_LATEST }} + # TODO: Cache releases to expedite testing + - name: Install dependencies + run: | + sudo apt install -y libsnappy-dev libzstd-dev + python -m pip install --upgrade pip + python -m pip install tox tox-gh-actions + pip install . + pip install -r requirements-dev.txt + - name: Test with tox + run: tox + env: + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ env.KAFKA_LATEST }} + + test-kafka: + name: Tests for Kafka ${{ matrix.kafka-version }} + needs: + - build-sdist + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + kafka-version: + - "0.8.2.2" + - "0.9.0.1" + - "0.10.2.2" + - "0.11.0.2" + - "0.11.0.3" + - "1.1.1" + - "2.4.0" + - "2.5.0" + - "2.6.0" + steps: + - name: Checkout the source code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Setup java + uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: 8 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ env.PYTHON_LATEST }} + cache: pip + cache-dependency-path: | + requirements-dev.txt + - name: Pull Kafka releases + run: ./build_integration.sh + env: + # This is fast enough as long as you pull only one release at a time, + # no need to worry about caching + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ matrix.kafka-version }} + - name: Install dependencies + run: | + sudo apt install -y libsnappy-dev libzstd-dev + python -m pip install --upgrade pip + python -m pip install tox tox-gh-actions + pip install . + pip install -r requirements-dev.txt + - name: Test with tox + run: tox + env: + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ matrix.kafka-version }} + + check: # This job does nothing and is only used for the branch protection + name: ✅ Ensure the required checks passing + if: always() + needs: + - build-sdist + - test-python + - test-kafka + runs-on: ubuntu-latest + steps: + - name: Decide whether the needed jobs succeeded or failed + uses: re-actors/alls-green@release/v1 + with: + jobs: ${{ toJSON(needs) }} diff --git a/Makefile b/Makefile index b4dcbffc9..fc8fa5b21 100644 --- a/Makefile +++ b/Makefile @@ -20,14 +20,14 @@ test37: build-integration test27: build-integration KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) tox -e py27 -- $(FLAGS) -# Test using py.test directly if you want to use local python. Useful for other +# Test using pytest directly if you want to use local python. Useful for other # platforms that require manual installation for C libraries, ie. Windows. test-local: build-integration - KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \ + KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \ --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF $(FLAGS) kafka test cov-local: build-integration - KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \ + KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \ --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \ --cov-config=.covrc --cov-report html $(FLAGS) kafka test @echo "open file://`pwd`/htmlcov/index.html" diff --git a/README.rst b/README.rst index 5f834442c..78a92a884 100644 --- a/README.rst +++ b/README.rst @@ -7,10 +7,16 @@ Kafka Python client :target: https://pypi.python.org/pypi/kafka-python .. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github :target: https://coveralls.io/github/dpkp/kafka-python?branch=master -.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master - :target: https://travis-ci.org/dpkp/kafka-python .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE +.. image:: https://img.shields.io/pypi/dw/kafka-python.svg + :target: https://pypistats.org/packages/kafka-python +.. image:: https://img.shields.io/pypi/v/kafka-python.svg + :target: https://pypi.org/project/kafka-python +.. image:: https://img.shields.io/pypi/implementation/kafka-python + :target: https://github.com/dpkp/kafka-python/blob/master/setup.py + + Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a diff --git a/requirements-dev.txt b/requirements-dev.txt index 00ad68c22..1fa933da2 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,17 +1,17 @@ -coveralls==2.1.2 -crc32c==2.1 -docker-py==1.10.6 -flake8==3.8.3 -lz4==3.1.0 -mock==4.0.2 -py==1.9.0 -pylint==2.6.0 -pytest==6.0.2 -pytest-cov==2.10.1 -pytest-mock==3.3.1 -pytest-pylint==0.17.0 -python-snappy==0.5.4 -Sphinx==3.2.1 -sphinx-rtd-theme==0.5.0 -tox==3.20.0 -xxhash==2.0.0 +coveralls +crc32c +docker-py +flake8 +lz4 +mock +py +pylint +pytest +pytest-cov +pytest-mock +pytest-pylint +python-snappy +Sphinx +sphinx-rtd-theme +tox +xxhash diff --git a/setup.py b/setup.py index fe8a594f3..2b5ca380f 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,10 @@ def run(cls): license="Apache License 2.0", description="Pure Python client for Apache Kafka", long_description=README, - keywords="apache kafka", + keywords=[ + "apache kafka", + "kafka", + ], classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", @@ -64,6 +67,10 @@ def run(cls): "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", ] diff --git a/test/test_assignors.py b/test/test_assignors.py index 67e91e131..858ef426d 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -655,7 +655,7 @@ def test_conflicting_previous_assignments(mocker): 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)] ) def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers): - all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)]) + all_topics = sorted(['t{}'.format(i) for i in range(1, n_topics + 1)]) partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)]) cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) diff --git a/tox.ini b/tox.ini index 10e9911dc..7a38ee4a8 100644 --- a/tox.ini +++ b/tox.ini @@ -1,17 +1,25 @@ [tox] -envlist = py{26,27,34,35,36,37,38,py}, docs +envlist = py{38,39,310,311,py}, docs [pytest] testpaths = kafka test addopts = --durations=10 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s +[gh-actions] +python = + 3.8: py38 + 3.9: py39 + 3.10: py310 + 3.11: py311 + pypy-3.9: pypy + [testenv] deps = pytest pytest-cov - py{27,34,35,36,37,38,py}: pylint - py{27,34,35,36,37,38,py}: pytest-pylint + pylint + pytest-pylint pytest-mock mock python-snappy @@ -20,19 +28,16 @@ deps = xxhash crc32c commands = - py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} + pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} passenv = KAFKA_VERSION -[testenv:py26] -# pylint doesn't support python2.6 -commands = py.test {posargs:--cov=kafka --cov-config=.covrc} [testenv:pypy] # pylint is super slow on pypy... -commands = py.test {posargs:--cov=kafka --cov-config=.covrc} +commands = pytest {posargs:--cov=kafka --cov-config=.covrc} [testenv:docs] deps = diff --git a/travis_java_install.sh b/travis_java_install.sh old mode 100644 new mode 100755 From 94901bb1b3a7322c778d60edb90156c9cc27e1f9 Mon Sep 17 00:00:00 2001 From: Majeed Dourandeesh Date: Sat, 5 Aug 2023 00:58:20 +0300 Subject: [PATCH 04/38] Update usage.rst (#2334) add imort json and msgpack into consumer and producer --- docs/usage.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/usage.rst b/docs/usage.rst index 1cf1aa414..fb58509a7 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -8,6 +8,8 @@ KafkaConsumer .. code:: python from kafka import KafkaConsumer + import json + import msgpack # To consume latest messages and auto-commit offsets consumer = KafkaConsumer('my-topic', @@ -57,6 +59,8 @@ KafkaProducer from kafka import KafkaProducer from kafka.errors import KafkaError + import msgpack + import json producer = KafkaProducer(bootstrap_servers=['broker1:1234']) From 46473bacafd759bc6cd072876327c6a7a5415007 Mon Sep 17 00:00:00 2001 From: Tim Gates Date: Sat, 5 Aug 2023 07:58:37 +1000 Subject: [PATCH 05/38] docs: Fix a few typos (#2319) * docs: Fix a few typos There are small typos in: - kafka/codec.py - kafka/coordinator/base.py - kafka/record/abc.py - kafka/record/legacy_records.py Fixes: - Should read `timestamp` rather than `typestamp`. - Should read `minimum` rather than `miniumum`. - Should read `encapsulated` rather than `incapsulates`. - Should read `callback` rather than `callbak`. * Update abc.py --- kafka/codec.py | 2 +- kafka/coordinator/base.py | 2 +- kafka/record/abc.py | 2 +- kafka/record/legacy_records.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/codec.py b/kafka/codec.py index 917400e74..c740a181c 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -187,7 +187,7 @@ def _detect_xerial_stream(payload): The version is the version of this format as written by xerial, in the wild this is currently 1 as such we only support v1. - Compat is there to claim the miniumum supported version that + Compat is there to claim the minimum supported version that can read a xerial block stream, presently in the wild this is 1. """ diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5e41309df..e71984108 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -952,7 +952,7 @@ def _run_once(self): # disable here to prevent propagating an exception to this # heartbeat thread # must get client._lock, or maybe deadlock at heartbeat - # failure callbak in consumer poll + # failure callback in consumer poll self.coordinator._client.poll(timeout_ms=0) with self.coordinator._lock: diff --git a/kafka/record/abc.py b/kafka/record/abc.py index d5c172aaa..8509e23e5 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -85,7 +85,7 @@ def build(self): class ABCRecordBatch(object): - """ For v2 incapsulates a RecordBatch, for v0/v1 a single (maybe + """ For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe compressed) message. """ __metaclass__ = abc.ABCMeta diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index e2ee5490c..2f8523fcb 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -263,7 +263,7 @@ def __iter__(self): # When magic value is greater than 0, the timestamp # of a compressed message depends on the - # typestamp type of the wrapper message: + # timestamp type of the wrapper message: if timestamp_type == self.LOG_APPEND_TIME: timestamp = self._timestamp From b7a9be6c48f82a957dce0a12e4070aa612eb82f9 Mon Sep 17 00:00:00 2001 From: Atheer Abdullatif <42766508+athlatif@users.noreply.github.com> Date: Sat, 5 Aug 2023 00:59:06 +0300 Subject: [PATCH 06/38] Update usage.rst (#2308) Adding [ClusterMetadata] and [KafkaAdminClient] --- docs/usage.rst | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/docs/usage.rst b/docs/usage.rst index fb58509a7..047bbad77 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -112,3 +112,52 @@ KafkaProducer # configure multiple retries producer = KafkaProducer(retries=5) + + +ClusterMetadata +============= +.. code:: python + + from kafka.cluster import ClusterMetadata + + clusterMetadata = ClusterMetadata(bootstrap_servers=['broker1:1234']) + + # get all brokers metadata + print(clusterMetadata.brokers()) + + # get specific broker metadata + print(clusterMetadata.broker_metadata('bootstrap-0')) + + # get all partitions of a topic + print(clusterMetadata.partitions_for_topic("topic")) + + # list topics + print(clusterMetadata.topics()) + + +KafkaAdminClient +============= +.. code:: python + from kafka import KafkaAdminClient + from kafka.admin import NewTopic + + admin = KafkaAdminClient(bootstrap_servers=['broker1:1234']) + + # create a new topic + topics_list = [] + topics_list.append(NewTopic(name="testtopic", num_partitions=1, replication_factor=1)) + admin.create_topics(topics_list,timeout_ms=None, validate_only=False) + + # delete a topic + admin.delete_topics(['testtopic']) + + # list consumer groups + print(admin.list_consumer_groups()) + + # get consumer group details + print(admin.describe_consumer_groups('cft-plt-qa.connect')) + + # get consumer group offset + print(admin.list_consumer_group_offsets('cft-plt-qa.connect')) + + From 57d833820bf20c84618954108767da08ea22f853 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 4 Aug 2023 18:51:28 -0400 Subject: [PATCH 07/38] Enable testing for Python 3.12 (#2379) I don't expect this to work yet since I know 3.12 is in an incomplete state, but here goes nothing. --- .github/workflows/python-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9ef4846bd..37875fb9f 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -69,8 +69,8 @@ jobs: include: - python-version: "pypy3.9" experimental: true -# - python-version: "~3.12.0-0" -# experimental: true + - python-version: "~3.12.0-0" + experimental: true steps: - name: Checkout the source code uses: actions/checkout@v3 From 7e87a014a9e47a7da3af73c76b31e802e208e7a3 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 6 Aug 2023 15:23:04 -0400 Subject: [PATCH 08/38] Add py312 to tox.ini (#2382) --- tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 7a38ee4a8..d9b1e36d4 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{38,39,310,311,py}, docs +envlist = py{38,39,310,311,312,py}, docs [pytest] testpaths = kafka test @@ -12,6 +12,7 @@ python = 3.9: py39 3.10: py310 3.11: py311 + 3.12: py312 pypy-3.9: pypy [testenv] From f98498411caabcf60894c536ce8fc9e83bd43241 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 6 Aug 2023 18:28:52 -0400 Subject: [PATCH 09/38] Update fixtures.py to use "127.0.0.1" for local ports (#2384) --- test/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures.py b/test/fixtures.py index 26fb5e89d..d9c072b86 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -25,7 +25,7 @@ def get_open_port(): sock = socket.socket() - sock.bind(("", 0)) + sock.bind(("127.0.0.1", 0)) port = sock.getsockname()[1] sock.close() return port From d9201085f021aaa376b6ef429f9afc2cc4d29439 Mon Sep 17 00:00:00 2001 From: Felix B Date: Tue, 8 Aug 2023 15:33:53 +0200 Subject: [PATCH 10/38] use isinstance in builtin crc32 (#2329) --- kafka/record/_crc32c.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py index ecff48f5e..9b51ad8a9 100644 --- a/kafka/record/_crc32c.py +++ b/kafka/record/_crc32c.py @@ -105,7 +105,7 @@ def crc_update(crc, data): Returns: 32-bit updated CRC-32C as long. """ - if type(data) != array.array or data.itemsize != 1: + if not isinstance(data, array.array) or data.itemsize != 1: buf = array.array("B", data) else: buf = data From a33fcf4d22bdf34e9660e394a7a6f84225411325 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Wed, 9 Aug 2023 12:44:53 -0400 Subject: [PATCH 11/38] Update setup.py to install zstandard instead of python-zstandard (#2387) Closes https://github.com/dpkp/kafka-python/issues/2350, since it's a valid security concern. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2b5ca380f..483d7ab60 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ def run(cls): "crc32c": ["crc32c"], "lz4": ["lz4"], "snappy": ["python-snappy"], - "zstd": ["python-zstandard"], + "zstd": ["zstandard"], }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), From d894e9aac0f5154b62f5cd08cc769a1f955d3eb7 Mon Sep 17 00:00:00 2001 From: shifqu Date: Thu, 2 Nov 2023 04:58:38 +0100 Subject: [PATCH 12/38] build: update vendored six from 1.11.0 to 1.16.0 (#2398) In this commit, the del X is still commented out due to the fact that upstream https://github.com/benjaminp/six/pull/176 is not merged. --- kafka/vendor/six.py | 149 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 128 insertions(+), 21 deletions(-) diff --git a/kafka/vendor/six.py b/kafka/vendor/six.py index 3621a0ab4..319821353 100644 --- a/kafka/vendor/six.py +++ b/kafka/vendor/six.py @@ -1,6 +1,6 @@ # pylint: skip-file -# Copyright (c) 2010-2017 Benjamin Peterson +# Copyright (c) 2010-2020 Benjamin Peterson # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -31,7 +31,7 @@ import types __author__ = "Benjamin Peterson " -__version__ = "1.11.0" +__version__ = "1.16.0" # Useful for very coarse version differentiation. @@ -77,6 +77,11 @@ def __len__(self): # https://github.com/dpkp/kafka-python/pull/979#discussion_r100403389 # del X +if PY34: + from importlib.util import spec_from_loader +else: + spec_from_loader = None + def _add_doc(func, doc): """Add documentation to a function.""" @@ -192,6 +197,11 @@ def find_module(self, fullname, path=None): return self return None + def find_spec(self, fullname, path, target=None): + if fullname in self.known_modules: + return spec_from_loader(fullname, self) + return None + def __get_module(self, fullname): try: return self.known_modules[fullname] @@ -229,6 +239,12 @@ def get_code(self, fullname): return None get_source = get_code # same as get_code + def create_module(self, spec): + return self.load_module(spec.name) + + def exec_module(self, module): + pass + _importer = _SixMetaPathImporter(__name__) @@ -253,7 +269,7 @@ class _MovedItems(_LazyModule): MovedAttribute("reduce", "__builtin__", "functools"), MovedAttribute("shlex_quote", "pipes", "shlex", "quote"), MovedAttribute("StringIO", "StringIO", "io"), - MovedAttribute("UserDict", "UserDict", "collections"), + MovedAttribute("UserDict", "UserDict", "collections", "IterableUserDict", "UserDict"), MovedAttribute("UserList", "UserList", "collections"), MovedAttribute("UserString", "UserString", "collections"), MovedAttribute("xrange", "__builtin__", "builtins", "xrange", "range"), @@ -261,9 +277,11 @@ class _MovedItems(_LazyModule): MovedAttribute("zip_longest", "itertools", "itertools", "izip_longest", "zip_longest"), MovedModule("builtins", "__builtin__"), MovedModule("configparser", "ConfigParser"), + MovedModule("collections_abc", "collections", "collections.abc" if sys.version_info >= (3, 3) else "collections"), MovedModule("copyreg", "copy_reg"), MovedModule("dbm_gnu", "gdbm", "dbm.gnu"), - MovedModule("_dummy_thread", "dummy_thread", "_dummy_thread"), + MovedModule("dbm_ndbm", "dbm", "dbm.ndbm"), + MovedModule("_dummy_thread", "dummy_thread", "_dummy_thread" if sys.version_info < (3, 9) else "_thread"), MovedModule("http_cookiejar", "cookielib", "http.cookiejar"), MovedModule("http_cookies", "Cookie", "http.cookies"), MovedModule("html_entities", "htmlentitydefs", "html.entities"), @@ -643,13 +661,16 @@ def u(s): import io StringIO = io.StringIO BytesIO = io.BytesIO + del io _assertCountEqual = "assertCountEqual" if sys.version_info[1] <= 1: _assertRaisesRegex = "assertRaisesRegexp" _assertRegex = "assertRegexpMatches" + _assertNotRegex = "assertNotRegexpMatches" else: _assertRaisesRegex = "assertRaisesRegex" _assertRegex = "assertRegex" + _assertNotRegex = "assertNotRegex" else: def b(s): return s @@ -671,6 +692,7 @@ def indexbytes(buf, i): _assertCountEqual = "assertItemsEqual" _assertRaisesRegex = "assertRaisesRegexp" _assertRegex = "assertRegexpMatches" + _assertNotRegex = "assertNotRegexpMatches" _add_doc(b, """Byte literal""") _add_doc(u, """Text literal""") @@ -687,6 +709,10 @@ def assertRegex(self, *args, **kwargs): return getattr(self, _assertRegex)(*args, **kwargs) +def assertNotRegex(self, *args, **kwargs): + return getattr(self, _assertNotRegex)(*args, **kwargs) + + if PY3: exec_ = getattr(moves.builtins, "exec") @@ -722,16 +748,7 @@ def exec_(_code_, _globs_=None, _locs_=None): """) -if sys.version_info[:2] == (3, 2): - exec_("""def raise_from(value, from_value): - try: - if from_value is None: - raise value - raise value from from_value - finally: - value = None -""") -elif sys.version_info[:2] > (3, 2): +if sys.version_info[:2] > (3,): exec_("""def raise_from(value, from_value): try: raise value from from_value @@ -811,13 +828,33 @@ def print_(*args, **kwargs): _add_doc(reraise, """Reraise an exception.""") if sys.version_info[0:2] < (3, 4): + # This does exactly the same what the :func:`py3:functools.update_wrapper` + # function does on Python versions after 3.2. It sets the ``__wrapped__`` + # attribute on ``wrapper`` object and it doesn't raise an error if any of + # the attributes mentioned in ``assigned`` and ``updated`` are missing on + # ``wrapped`` object. + def _update_wrapper(wrapper, wrapped, + assigned=functools.WRAPPER_ASSIGNMENTS, + updated=functools.WRAPPER_UPDATES): + for attr in assigned: + try: + value = getattr(wrapped, attr) + except AttributeError: + continue + else: + setattr(wrapper, attr, value) + for attr in updated: + getattr(wrapper, attr).update(getattr(wrapped, attr, {})) + wrapper.__wrapped__ = wrapped + return wrapper + _update_wrapper.__doc__ = functools.update_wrapper.__doc__ + def wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS, updated=functools.WRAPPER_UPDATES): - def wrapper(f): - f = functools.wraps(wrapped, assigned, updated)(f) - f.__wrapped__ = wrapped - return f - return wrapper + return functools.partial(_update_wrapper, wrapped=wrapped, + assigned=assigned, updated=updated) + wraps.__doc__ = functools.wraps.__doc__ + else: wraps = functools.wraps @@ -830,7 +867,15 @@ def with_metaclass(meta, *bases): class metaclass(type): def __new__(cls, name, this_bases, d): - return meta(name, bases, d) + if sys.version_info[:2] >= (3, 7): + # This version introduced PEP 560 that requires a bit + # of extra care (we mimic what is done by __build_class__). + resolved_bases = types.resolve_bases(bases) + if resolved_bases is not bases: + d['__orig_bases__'] = bases + else: + resolved_bases = bases + return meta(name, resolved_bases, d) @classmethod def __prepare__(cls, name, this_bases): @@ -850,13 +895,75 @@ def wrapper(cls): orig_vars.pop(slots_var) orig_vars.pop('__dict__', None) orig_vars.pop('__weakref__', None) + if hasattr(cls, '__qualname__'): + orig_vars['__qualname__'] = cls.__qualname__ return metaclass(cls.__name__, cls.__bases__, orig_vars) return wrapper +def ensure_binary(s, encoding='utf-8', errors='strict'): + """Coerce **s** to six.binary_type. + + For Python 2: + - `unicode` -> encoded to `str` + - `str` -> `str` + + For Python 3: + - `str` -> encoded to `bytes` + - `bytes` -> `bytes` + """ + if isinstance(s, binary_type): + return s + if isinstance(s, text_type): + return s.encode(encoding, errors) + raise TypeError("not expecting type '%s'" % type(s)) + + +def ensure_str(s, encoding='utf-8', errors='strict'): + """Coerce *s* to `str`. + + For Python 2: + - `unicode` -> encoded to `str` + - `str` -> `str` + + For Python 3: + - `str` -> `str` + - `bytes` -> decoded to `str` + """ + # Optimization: Fast return for the common case. + if type(s) is str: + return s + if PY2 and isinstance(s, text_type): + return s.encode(encoding, errors) + elif PY3 and isinstance(s, binary_type): + return s.decode(encoding, errors) + elif not isinstance(s, (text_type, binary_type)): + raise TypeError("not expecting type '%s'" % type(s)) + return s + + +def ensure_text(s, encoding='utf-8', errors='strict'): + """Coerce *s* to six.text_type. + + For Python 2: + - `unicode` -> `unicode` + - `str` -> `unicode` + + For Python 3: + - `str` -> `str` + - `bytes` -> decoded to `str` + """ + if isinstance(s, binary_type): + return s.decode(encoding, errors) + elif isinstance(s, text_type): + return s + else: + raise TypeError("not expecting type '%s'" % type(s)) + + def python_2_unicode_compatible(klass): """ - A decorator that defines __unicode__ and __str__ methods under Python 2. + A class decorator that defines __unicode__ and __str__ methods under Python 2. Under Python 3 it does nothing. To support Python 2 and 3 with a single code base, define a __str__ method From 779a23c81755b763a5fd90194d12b997889f9f8c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 Nov 2023 00:04:55 -0400 Subject: [PATCH 13/38] Bump actions/checkout from 3 to 4 (#2392) Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/python-package.yml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 43427fab9..0d5078b39 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -36,7 +36,7 @@ jobs: # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 37875fb9f..50ade7486 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -28,7 +28,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout project - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Python @@ -73,7 +73,7 @@ jobs: experimental: true steps: - name: Checkout the source code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Setup java @@ -129,7 +129,7 @@ jobs: - "2.6.0" steps: - name: Checkout the source code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Setup java From 4861bee15458effd30e69f9ad0b6373d6f8417e0 Mon Sep 17 00:00:00 2001 From: Hirotaka Wakabayashi Date: Fri, 3 Nov 2023 11:40:34 +0900 Subject: [PATCH 14/38] Uses assert_called_with instead of called_with (#2375) --- test/test_client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_client_async.py b/test/test_client_async.py index 74da66a36..66b227aa9 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -220,12 +220,12 @@ def test_send(cli, conn): request = ProduceRequest[0](0, 0, []) assert request.expect_response() is False ret = cli.send(0, request) - assert conn.send.called_with(request) + conn.send.assert_called_with(request, blocking=False) assert isinstance(ret, Future) request = MetadataRequest[0]([]) cli.send(0, request) - assert conn.send.called_with(request) + conn.send.assert_called_with(request, blocking=False) def test_poll(mocker): From 0362b87ab47ac198b5348936e9c89f3c454e20f1 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 2 Nov 2023 23:02:08 -0400 Subject: [PATCH 15/38] Update python-package.yml to expect 3.12 tests to pass and extend experimental tests (#2406) --- .github/workflows/python-package.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 50ade7486..6f9ef58a1 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -65,11 +65,12 @@ jobs: - "3.9" - "3.10" - "3.11" + - "3.12" experimental: [ false ] include: - python-version: "pypy3.9" experimental: true - - python-version: "~3.12.0-0" + - python-version: "~3.13.0-0" experimental: true steps: - name: Checkout the source code From 0dbf74689bb51dd517b6b8c8035c2370f2b8dd3a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 2 Nov 2023 23:02:45 -0400 Subject: [PATCH 16/38] Update setup.py to indicate 3.12 support --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 483d7ab60..77043da04 100644 --- a/setup.py +++ b/setup.py @@ -70,6 +70,7 @@ def run(cls): "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", From 38e8d045e33b894bad30f55c212f8ff497a5a513 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 3 Nov 2023 00:20:15 -0400 Subject: [PATCH 17/38] Update conn.py to catch OSError in case of failed import (#2407) Closes #2399. --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index cac354875..1efb8a0a1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -78,7 +78,7 @@ class SSLWantWriteError(Exception): try: import gssapi from gssapi.raw.misc import GSSError -except ImportError: +except (ImportError, OSError): #no gssapi available, will disable gssapi mechanism gssapi = None GSSError = None From a1d268a95f34ed9d1b42b2e5dfc36dab6fbbc1e5 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 3 Nov 2023 00:30:54 -0400 Subject: [PATCH 18/38] Update PYTHON_LATEST in python-package.yml to 3.12 --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 6f9ef58a1..5829d899a 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -15,7 +15,7 @@ env: FORCE_COLOR: "1" # Make tools pretty. PIP_DISABLE_PIP_VERSION_CHECK: "1" PIP_NO_PYTHON_VERSION_WARNING: "1" - PYTHON_LATEST: "3.11" + PYTHON_LATEST: "3.12" KAFKA_LATEST: "2.6.0" # For re-actors/checkout-python-sdist From 364397c1b32ab3b8440d315516f46edfcfb7efbb Mon Sep 17 00:00:00 2001 From: rootlulu <110612150+rootlulu@users.noreply.github.com> Date: Sat, 4 Nov 2023 10:11:52 +0800 Subject: [PATCH 19/38] [FIX] suitablt for the high vresion python. (#2394) * [FIX] suitablt for the high vresion python. it won't import Mapping from collections at python3.11. tested it worked from python3.6 to 3.11.2. * Update selectors34.py to have conditional importing of Mapping from collections --------- Co-authored-by: William Barnhart --- kafka/vendor/selectors34.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/vendor/selectors34.py b/kafka/vendor/selectors34.py index ebf5d515e..787490340 100644 --- a/kafka/vendor/selectors34.py +++ b/kafka/vendor/selectors34.py @@ -15,7 +15,11 @@ from __future__ import absolute_import from abc import ABCMeta, abstractmethod -from collections import namedtuple, Mapping +from collections import namedtuple +try: + from collections.abc import Mapping +except ImportError: + from collections import Mapping from errno import EINTR import math import select From 0864817de97549ad71e7bc2432c53108c5806cf1 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Mon, 6 Nov 2023 23:20:17 -0500 Subject: [PATCH 20/38] Update python-package.yml to publish to PyPi for every release (#2381) I know that the typical release is uploaded to PyPi manually, however I figure I'd draft a PR with these changes because having the option to start doing this is worthwhile. More info can be found on https://github.com/pypa/gh-action-pypi-publish. --- .github/workflows/python-package.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 5829d899a..f60926c0e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -178,3 +178,21 @@ jobs: uses: re-actors/alls-green@release/v1 with: jobs: ${{ toJSON(needs) }} + publish: + name: đŸ“Ļ Publish to PyPI + runs-on: ubuntu-latest + needs: [build-sdist] + permissions: + id-token: write + environment: pypi + if: github.event_name == 'release' && github.event.action == 'created' + steps: + - name: Download the sdist artifact + uses: actions/download-artifact@v3 + with: + name: artifact + path: dist + - name: Publish package to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} From 43822d05749b308ae638e0485bfc24a91583411f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 13 Dec 2023 20:32:50 -0500 Subject: [PATCH 21/38] Bump github/codeql-action from 2 to 3 (#2419) Bumps [github/codeql-action](https://github.com/github/codeql-action) from 2 to 3. - [Release notes](https://github.com/github/codeql-action/releases) - [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/github/codeql-action/compare/v2...v3) --- updated-dependencies: - dependency-name: github/codeql-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/codeql-analysis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 0d5078b39..4f6360b71 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -40,7 +40,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -51,7 +51,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v2 + uses: github/codeql-action/autobuild@v3 # ℹī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -64,4 +64,4 @@ jobs: # make bootstrap # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 From e9dfaf9c48d898ea3e24538cb3d189d479898bfe Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 13 Dec 2023 20:32:59 -0500 Subject: [PATCH 22/38] Bump actions/setup-python from 4 to 5 (#2418) Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4 to 5. - [Release notes](https://github.com/actions/setup-python/releases) - [Commits](https://github.com/actions/setup-python/compare/v4...v5) --- updated-dependencies: - dependency-name: actions/setup-python dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/python-package.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f60926c0e..9e0c2007c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -32,7 +32,7 @@ jobs: with: fetch-depth: 0 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.PYTHON_LATEST }} cache: pip @@ -83,7 +83,7 @@ jobs: distribution: temurin java-version: 11 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: pip @@ -139,7 +139,7 @@ jobs: distribution: temurin java-version: 8 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.PYTHON_LATEST }} cache: pip From b68f61d49556377bf111bebb82f8f2bd360cc6f7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 13 Dec 2023 21:11:04 -0500 Subject: [PATCH 23/38] Bump actions/setup-java from 3 to 4 (#2417) Bumps [actions/setup-java](https://github.com/actions/setup-java) from 3 to 4. - [Release notes](https://github.com/actions/setup-java/releases) - [Commits](https://github.com/actions/setup-java/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/setup-java dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/python-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9e0c2007c..59ad718cf 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -78,7 +78,7 @@ jobs: with: fetch-depth: 0 - name: Setup java - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: temurin java-version: 11 @@ -134,7 +134,7 @@ jobs: with: fetch-depth: 0 - name: Setup java - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: temurin java-version: 8 From b95e46da7f22f2a226383691d5a5224e04463037 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 7 Mar 2024 10:31:28 -0500 Subject: [PATCH 24/38] Rename project from kafka-python to kafka-python-ng (#1) --- CHANGES.md | 2 +- Makefile | 2 +- README.rst | 40 ++++++++++++++++++++-------------------- docs/Makefile | 8 ++++---- docs/apidoc/modules.rst | 2 +- docs/changelog.rst | 2 +- docs/compatibility.rst | 18 +++++++++--------- docs/conf.py | 10 +++++----- docs/index.rst | 16 ++++++++-------- docs/install.rst | 20 ++++++++++---------- docs/license.rst | 6 +++--- docs/support.rst | 2 +- docs/tests.rst | 10 +++++----- kafka/version.py | 10 +++++++++- setup.py | 11 ++++++----- 15 files changed, 84 insertions(+), 75 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 097c55db6..ccec6b5c3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -413,7 +413,7 @@ Some of the major changes include: * SASL authentication is working (we think) * Removed several circular references to improve gc on close() -Thanks to all contributors -- the state of the kafka-python community is strong! +Thanks to all contributors -- the state of the kafka-python-ng community is strong! Detailed changelog are listed below: diff --git a/Makefile b/Makefile index fc8fa5b21..9d7d89f4d 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ cov-local: build-integration @echo "open file://`pwd`/htmlcov/index.html" # Check the readme for syntax errors, which can lead to invalid formatting on -# PyPi homepage (https://pypi.python.org/pypi/kafka-python) +# PyPi homepage (https://pypi.python.org/pypi/kafka-python-ng) check-readme: python setup.py check -rms diff --git a/README.rst b/README.rst index 78a92a884..f185bf690 100644 --- a/README.rst +++ b/README.rst @@ -2,27 +2,27 @@ Kafka Python client ------------------------ .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg - :target: https://kafka-python.readthedocs.io/en/master/compatibility.html -.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg - :target: https://pypi.python.org/pypi/kafka-python -.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/dpkp/kafka-python?branch=master + :target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html +.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg + :target: https://pypi.python.org/pypi/kafka-python-ng +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng-ng?branch=master .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE -.. image:: https://img.shields.io/pypi/dw/kafka-python.svg + :target: https://github.com/wbarnha/kafka-python-ng-ng/blob/master/LICENSE +.. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg :target: https://pypistats.org/packages/kafka-python .. image:: https://img.shields.io/pypi/v/kafka-python.svg :target: https://pypi.org/project/kafka-python .. image:: https://img.shields.io/pypi/implementation/kafka-python - :target: https://github.com/dpkp/kafka-python/blob/master/setup.py + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py Python client for the Apache Kafka distributed stream processing system. -kafka-python is designed to function much like the official java client, with a +kafka-python-ng is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators). -kafka-python is best used with newer brokers (0.9+), but is backwards-compatible with +kafka-python-ng is best used with newer brokers (0.9+), but is backwards-compatible with older versions (to 0.8.0). Some features will only be enabled on newer brokers. For example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- requires use of 0.9+ kafka @@ -32,13 +32,13 @@ check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. -See +See for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python +>>> pip install kafka-python-ng KafkaConsumer @@ -48,7 +48,7 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. -See +See for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples @@ -91,7 +91,7 @@ KafkaProducer KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. -See +See for more details. >>> from kafka import KafkaProducer @@ -146,7 +146,7 @@ multiprocessing is recommended. Compression *********** -kafka-python supports the following compression formats: +kafka-python-ng supports the following compression formats: - gzip - LZ4 @@ -154,23 +154,23 @@ kafka-python supports the following compression formats: - Zstandard (zstd) gzip is supported natively, the others require installing additional libraries. -See for more information. +See for more information. Optimized CRC32 Validation ************************** -Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure +Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure python implementation for compatibility. To improve performance for high-throughput -applications, kafka-python will use `crc32c` for optimized native code if installed. -See for installation instructions. +applications, kafka-python-ng will use `crc32c` for optimized native code if installed. +See for installation instructions. See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib. Protocol ******** -A secondary goal of kafka-python is to provide an easy-to-use protocol layer +A secondary goal of kafka-python-ng is to provide an easy-to-use protocol layer for interacting with kafka brokers via the python repl. This is useful for testing, probing, and general experimentation. The protocol support is leveraged to enable a KafkaClient.check_version() method that diff --git a/docs/Makefile b/docs/Makefile index b27cf7742..31e74e1aa 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -85,17 +85,17 @@ qthelp: @echo @echo "Build finished; now you can run "qcollectiongenerator" with the" \ ".qhcp project file in $(BUILDDIR)/qthelp, like this:" - @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python-ng.qhcp" @echo "To view the help file:" - @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python-ng.qhc" devhelp: $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp @echo @echo "Build finished." @echo "To view the help file:" - @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python" - @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python" + @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python-ng" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python-ng" @echo "# devhelp" epub: diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst index 066fc6523..29be3486f 100644 --- a/docs/apidoc/modules.rst +++ b/docs/apidoc/modules.rst @@ -1,4 +1,4 @@ -kafka-python API +kafka-python-ng API **************** .. toctree:: diff --git a/docs/changelog.rst b/docs/changelog.rst index 446b29021..9d3cb6512 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -484,7 +484,7 @@ Some of the major changes include: * SASL authentication is working (we think) * Removed several circular references to improve gc on close() -Thanks to all contributors -- the state of the kafka-python community is strong! +Thanks to all contributors -- the state of the kafka-python-ng community is strong! Detailed changelog are listed below: diff --git a/docs/compatibility.rst b/docs/compatibility.rst index b3ad00634..e8e1342c3 100644 --- a/docs/compatibility.rst +++ b/docs/compatibility.rst @@ -2,20 +2,20 @@ Compatibility ------------- .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg - :target: https://kafka-python.readthedocs.io/compatibility.html -.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg - :target: https://pypi.python.org/pypi/kafka-python + :target: https://kafka-python-ng.readthedocs.io/compatibility.html +.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg + :target: https://pypi.python.org/pypi/kafka-python-ng -kafka-python is compatible with (and tested against) broker versions 2.6 -through 0.8.0 . kafka-python is not compatible with the 0.8.2-beta release. +kafka-python-ng is compatible with (and tested against) broker versions 2.6 +through 0.8.0 . kafka-python-ng is not compatible with the 0.8.2-beta release. -Because the kafka server protocol is backwards compatible, kafka-python is +Because the kafka server protocol is backwards compatible, kafka-python-ng is expected to work with newer broker releases as well. -Although kafka-python is tested and expected to work on recent broker versions, +Although kafka-python-ng is tested and expected to work on recent broker versions, not all features are supported. Specifically, authentication codecs, and transactional producer/consumer support are not fully implemented. PRs welcome! -kafka-python is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7. +kafka-python-ng is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7. -Builds and tests via Travis-CI. See https://travis-ci.org/dpkp/kafka-python +Builds and tests via Travis-CI. See https://travis-ci.org/wbarnha/kafka-python-ng diff --git a/docs/conf.py b/docs/conf.py index efa8d0807..e5b013b0d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# kafka-python documentation build configuration file, created by +# kafka-python-ng documentation build configuration file, created by # sphinx-quickstart on Sun Jan 4 12:21:50 2015. # # This file is execfile()d with the current directory set to its @@ -47,7 +47,7 @@ master_doc = 'index' # General information about the project. -project = u'kafka-python' +project = u'kafka-python-ng' copyright = u'2016 -- Dana Powers, David Arthur, and Contributors' # The version info for the project you're documenting, acts as replacement for @@ -201,7 +201,7 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - ('index', 'kafka-python.tex', u'kafka-python Documentation', + ('index', 'kafka-python-ng.tex', u'kafka-python-ng Documentation', u'Dana Powers', 'manual'), ] @@ -231,7 +231,7 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - ('index', 'kafka-python', u'kafka-python Documentation', + ('index', 'kafka-python-ng', u'kafka-python-ng Documentation', [u'Dana Powers'], 1) ] @@ -245,7 +245,7 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - ('index', 'kafka-python', u'kafka-python Documentation', + ('index', 'kafka-python-ng', u'kafka-python-ng Documentation', u'Dana Powers', 'kafka-python', 'One line description of project.', 'Miscellaneous'), ] diff --git a/docs/index.rst b/docs/index.rst index 91e5086cc..92b998d92 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,16 +1,16 @@ -kafka-python +kafka-python-ng ############ .. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg :target: https://kafka-python.readthedocs.io/en/master/compatibility.html .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg - :target: https://pypi.python.org/pypi/kafka-python -.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/dpkp/kafka-python?branch=master -.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master - :target: https://travis-ci.org/dpkp/kafka-python + :target: https://pypi.python.org/pypi/kafka-python-ng +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master +.. image:: https://travis-ci.org/wbarnha/kafka-python-ng.svg?branch=master + :target: https://travis-ci.org/wbarnha/kafka-python-ng .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a @@ -31,7 +31,7 @@ failures. See `Compatibility `_ for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python +>>> pip install kafka-python-ng KafkaConsumer diff --git a/docs/install.rst b/docs/install.rst index 19901ee29..6ed917cd4 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -9,9 +9,9 @@ Pip: .. code:: bash - pip install kafka-python + pip install kafka-python-ng -Releases are also listed at https://github.com/dpkp/kafka-python/releases +Releases are also listed at https://github.com/wbarnha/kafka-python-ng/releases Bleeding-Edge @@ -19,21 +19,21 @@ Bleeding-Edge .. code:: bash - git clone https://github.com/dpkp/kafka-python - pip install ./kafka-python + git clone https://github.com/wbarnha/kafka-python-ng + pip install ./kafka-python-ng Optional crc32c install *********************** -Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python` +Highly recommended if you are using Kafka 11+ brokers. For those `kafka-python-ng` uses a new message protocol version, that requires calculation of `crc32c`, -which differs from the `zlib.crc32` hash implementation. By default `kafka-python` +which differs from the `zlib.crc32` hash implementation. By default `kafka-python-ng` calculates it in pure python, which is quite slow. To speed it up we optionally support https://pypi.python.org/pypi/crc32c package if it's installed. .. code:: bash - pip install 'kafka-python[crc32c]' + pip install 'kafka-python-ng[crc32c]' Optional ZSTD install @@ -41,7 +41,7 @@ Optional ZSTD install To enable ZSTD compression/decompression, install python-zstandard: ->>> pip install 'kafka-python[zstd]' +>>> pip install 'kafka-python-ng[zstd]' Optional LZ4 install @@ -49,7 +49,7 @@ Optional LZ4 install To enable LZ4 compression/decompression, install python-lz4: ->>> pip install 'kafka-python[lz4]' +>>> pip install 'kafka-python-ng[lz4]' Optional Snappy install @@ -90,4 +90,4 @@ Install the `python-snappy` module .. code:: bash - pip install 'kafka-python[snappy]' + pip install 'kafka-python-ng[snappy]' diff --git a/docs/license.rst b/docs/license.rst index e9d5c9adb..016a916ba 100644 --- a/docs/license.rst +++ b/docs/license.rst @@ -2,9 +2,9 @@ License ------- .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE -Apache License, v2.0. See `LICENSE `_. +Apache License, v2.0. See `LICENSE `_. Copyright 2016, Dana Powers, David Arthur, and Contributors -(See `AUTHORS `_). +(See `AUTHORS `_). diff --git a/docs/support.rst b/docs/support.rst index 63d4a86a2..25014b3fd 100644 --- a/docs/support.rst +++ b/docs/support.rst @@ -1,7 +1,7 @@ Support ------- -For support, see github issues at https://github.com/dpkp/kafka-python +For support, see github issues at https://github.com/wbarnha/kafka-python-ng Limited IRC chat at #kafka-python on freenode (general chat is #apache-kafka). diff --git a/docs/tests.rst b/docs/tests.rst index 561179ca5..763c2e54d 100644 --- a/docs/tests.rst +++ b/docs/tests.rst @@ -1,17 +1,17 @@ Tests ===== -.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/dpkp/kafka-python?branch=master -.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master - :target: https://travis-ci.org/dpkp/kafka-python +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master +.. image:: https://travis-ci.org/wbarnha/kafka-python-ng.svg?branch=master + :target: https://travis-ci.org/wbarnha/kafka-python-ng Test environments are managed via tox. The test suite is run via pytest. Linting is run via pylint, but is generally skipped on pypy due to pylint compatibility / performance issues. -For test coverage details, see https://coveralls.io/github/dpkp/kafka-python +For test coverage details, see https://coveralls.io/github/wbarnha/kafka-python-ng The test suite includes unit tests that mock network interfaces, as well as integration tests that setup and teardown kafka broker (and zookeeper) diff --git a/kafka/version.py b/kafka/version.py index 06306bd1f..8a26a1868 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1,9 @@ -__version__ = '2.0.3-dev' +import sys + +if sys.version_info < (3, 8): + from importlib_metadata import version +else: + from importlib.metadata import version + + +__version__ = version("kafka-python-ng") diff --git a/setup.py b/setup.py index 77043da04..75d2b8bcb 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,6 @@ # Pull version from source without importing # since we can't import something we haven't built yet :) -exec(open('kafka/version.py').read()) class Tox(Command): @@ -32,9 +31,9 @@ def run(cls): README = f.read() setup( - name="kafka-python", - version=__version__, - + name="kafka-python-ng", + use_scm_version=True, + setup_requires=["setuptools_scm"], tests_require=test_require, extras_require={ "crc32c": ["crc32c"], @@ -46,7 +45,9 @@ def run(cls): packages=find_packages(exclude=['test']), author="Dana Powers", author_email="dana.powers@gmail.com", - url="https://github.com/dpkp/kafka-python", + maintainer="William Barnhart", + maintainer_email="williambbarnhart@gmail.com", + url="https://github.com/wbarnha/kafka-python-ng", license="Apache License 2.0", description="Pure Python client for Apache Kafka", long_description=README, From 78c74c095172ec30df875bbf0861aeee4ed441c8 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 7 Mar 2024 11:14:36 -0500 Subject: [PATCH 25/38] Fix artifact downloads for release --- .github/workflows/python-package.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 59ad718cf..cee99ef5c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -41,7 +41,7 @@ jobs: - name: Build artifacts run: python -m build - name: Upload built artifacts for testing - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ env.sdist-artifact }} # NOTE: Exact expected file names are specified here @@ -187,11 +187,11 @@ jobs: environment: pypi if: github.event_name == 'release' && github.event.action == 'created' steps: - - name: Download the sdist artifact - uses: actions/download-artifact@v3 + - name: Download the artifacts + uses: actions/download-artifact@v4 with: - name: artifact - path: dist + name: ${{ env.sdist-artifact }} + path: dist/${{ env.sdist-name }} - name: Publish package to PyPI uses: pypa/gh-action-pypi-publish@release/v1 with: From e7960197430d80bbd9aa8ab1d4c4d8c67f0cf624 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 14:02:49 -0500 Subject: [PATCH 26/38] Fix badge links in README.rst --- README.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index f185bf690..8a5c71b38 100644 --- a/README.rst +++ b/README.rst @@ -5,15 +5,15 @@ Kafka Python client :target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html .. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg :target: https://pypi.python.org/pypi/kafka-python-ng -.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng-ng/badge.svg?branch=master&service=github - :target: https://coveralls.io/github/wbarnha/kafka-python-ng-ng?branch=master +.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg - :target: https://github.com/wbarnha/kafka-python-ng-ng/blob/master/LICENSE + :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE .. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg - :target: https://pypistats.org/packages/kafka-python + :target: https://pypistats.org/packages/kafka-python-ng .. image:: https://img.shields.io/pypi/v/kafka-python.svg - :target: https://pypi.org/project/kafka-python -.. image:: https://img.shields.io/pypi/implementation/kafka-python + :target: https://pypi.org/project/kafka-python-ng +.. image:: https://img.shields.io/pypi/implementation/kafka-python-ng :target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py From 38e159a1863dba4c3dc36f0338f6e74f4abaa7c2 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 18:06:20 -0500 Subject: [PATCH 27/38] Reconfigure tests to complete in a more timely manner and skip some iterations for Kafka 0.8.2 and Python 3.12 (#159) * skip failing tests for PyPy since they work locally * Reconfigure tests for PyPy and 3.12 * Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2 * Update test_partitioner.py * Update test_producer.py * Timeout tests after ten minutes * Set 0.8.2.2 to be experimental from hereon * Formally support PyPy 3.9 --- .github/workflows/python-package.yml | 10 +++++++--- test/test_admin_integration.py | 5 +++++ test/test_consumer_group.py | 4 ++++ test/test_partitioner.py | 1 + test/test_producer.py | 6 +++++- 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index cee99ef5c..ef49b4e58 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -66,10 +66,9 @@ jobs: - "3.10" - "3.11" - "3.12" + - "pypy3.9" experimental: [ false ] include: - - python-version: "pypy3.9" - experimental: true - python-version: "~3.13.0-0" experimental: true steps: @@ -115,11 +114,11 @@ jobs: needs: - build-sdist runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: kafka-version: - - "0.8.2.2" - "0.9.0.1" - "0.10.2.2" - "0.11.0.2" @@ -128,6 +127,11 @@ jobs: - "2.4.0" - "2.5.0" - "2.6.0" + experimental: [false] + include: + - kafka-version: '0.8.2.2' + experimental: true + continue-on-error: ${{ matrix.experimental }} steps: - name: Checkout the source code uses: actions/checkout@v4 diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 06c40a223..283023049 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,3 +1,5 @@ +import platform + import pytest from logging import info @@ -151,6 +153,9 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client): group_description = kafka_admin_client.describe_consumer_groups(['test']) +@pytest.mark.skipif( + platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline." +) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): """Tests that the describe consumer group call returns valid consumer group information diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 58dc7ebf9..4904ffeea 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -1,5 +1,6 @@ import collections import logging +import platform import threading import time @@ -40,6 +41,9 @@ def test_consumer_topics(kafka_broker, topic): consumer.close() +@pytest.mark.skipif( + platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline." +) @pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') def test_group(kafka_broker, topic): num_partitions = 4 diff --git a/test/test_partitioner.py b/test/test_partitioner.py index 853fbf69e..09fa0412a 100644 --- a/test/test_partitioner.py +++ b/test/test_partitioner.py @@ -2,6 +2,7 @@ import pytest + from kafka.partitioner import DefaultPartitioner, murmur2 diff --git a/test/test_producer.py b/test/test_producer.py index 7263130d1..15c244113 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,5 +1,6 @@ import gc import platform +import sys import time import threading @@ -10,6 +11,7 @@ from test.testutil import env_kafka_version, random_string +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") def test_buffer_pool(): pool = SimpleBufferPool(1000, 1000) @@ -21,8 +23,8 @@ def test_buffer_pool(): buf2 = pool.allocate(1000, 1000) assert buf2.read() == b'' - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): if compression == 'lz4': @@ -70,6 +72,7 @@ def test_end_to_end(kafka_broker, compression): @pytest.mark.skipif(platform.python_implementation() != 'CPython', reason='Test relies on CPython-specific gc policies') +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") def test_kafka_producer_gc_cleanup(): gc.collect() threads = threading.active_count() @@ -81,6 +84,7 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if compression == 'zstd' and env_kafka_version() < (2, 1, 0): From e76232106e4129d5ab2e279f546791446260b6b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 19:49:57 -0500 Subject: [PATCH 28/38] Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161) * Test Kafka 0.8.2.2 using Python 3.11 in the meantime * Override PYTHON_LATEST conditionally in python-package.yml * Update python-package.yml * add python annotation to kafka version test matrix * Update python-package.yml * try python 3.10 --- .github/workflows/python-package.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index ef49b4e58..d39eeccf8 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -110,7 +110,7 @@ jobs: KAFKA_VERSION: ${{ env.KAFKA_LATEST }} test-kafka: - name: Tests for Kafka ${{ matrix.kafka-version }} + name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }}) needs: - build-sdist runs-on: ubuntu-latest @@ -127,10 +127,17 @@ jobs: - "2.4.0" - "2.5.0" - "2.6.0" + python-version: ['3.12'] experimental: [false] include: - kafka-version: '0.8.2.2' experimental: true + python-version: "3.12" + - kafka-version: '0.8.2.2' + experimental: false + python-version: "3.10" + env: + PYTHON_LATEST: ${{ matrix.python-version }} continue-on-error: ${{ matrix.experimental }} steps: - name: Checkout the source code @@ -145,7 +152,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: ${{ env.PYTHON_LATEST }} + python-version: ${{ matrix.python-version }} cache: pip cache-dependency-path: | requirements-dev.txt From 00750aaa2eb7fe568e15d995d0bca12700bf94b9 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:09:44 -0500 Subject: [PATCH 29/38] Remove support for EOL'ed versions of Python (#160) * Remove support for EOL'ed versions of Python * Update setup.py --- setup.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 75d2b8bcb..dd4e5de90 100644 --- a/setup.py +++ b/setup.py @@ -32,6 +32,7 @@ def run(cls): setup( name="kafka-python-ng", + python_requires=">=3.8", use_scm_version=True, setup_requires=["setuptools_scm"], tests_require=test_require, @@ -60,13 +61,6 @@ def run(cls): "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.4", - "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", From 5bd1323d3ed4d44081592d6518435fc89724318a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:21:06 -0500 Subject: [PATCH 30/38] Stop testing Python 3.13 in python-package.yml (#162) Too many MRs to review... so little time. --- .github/workflows/python-package.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index d39eeccf8..df0e4e489 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -68,9 +68,6 @@ jobs: - "3.12" - "pypy3.9" experimental: [ false ] - include: - - python-version: "~3.13.0-0" - experimental: true steps: - name: Checkout the source code uses: actions/checkout@v4 From cda8f81ae16ee087cd039a4277be13f2004d4d09 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:44:41 -0500 Subject: [PATCH 31/38] Avoid 100% CPU usage while socket is closed (#156) After stop/start kafka service, kafka-python may use 100% CPU caused by busy-retry while the socket was closed. This fix the issue by unregister the socket if the fd is negative. Co-authored-by: Orange Kao --- kafka/client_async.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..530a1f441 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -637,6 +637,9 @@ def _poll(self, timeout): self._sensors.select_time.record((end_select - start_select) * 1000000000) for key, events in ready: + if key.fileobj.fileno() < 0: + self._selector.unregister(key.fileobj) + if key.fileobj is self._wake_r: self._clear_wake_fd() continue From c02df0899f3b6a1e71f05fd91ff506b1b0c4c3eb Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 22:45:43 -0500 Subject: [PATCH 32/38] Fix DescribeConfigsResponse_v1 config_source (#150) Co-authored-by: Ryar Nyah --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index f9d61e5cd..41b4a9576 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -719,7 +719,7 @@ class DescribeConfigsResponse_v1(Response): ('config_names', String('utf-8')), ('config_value', String('utf-8')), ('read_only', Boolean), - ('is_default', Boolean), + ('config_source', Int8), ('is_sensitive', Boolean), ('config_synonyms', Array( ('config_name', String('utf-8')), From 65eacfb2e6900d789ad501c6d5c81fcbc420ed16 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:15:36 -0500 Subject: [PATCH 33/38] Fix base class of DescribeClientQuotasResponse_v0 (#144) Co-authored-by: Denis Otkidach --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 41b4a9576..0bb1a7acc 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -925,7 +925,7 @@ class DeleteGroupsRequest_v1(Request): ] -class DescribeClientQuotasResponse_v0(Request): +class DescribeClientQuotasResponse_v0(Response): API_KEY = 48 API_VERSION = 0 SCHEMA = Schema( From e0ebe5dd3191778b35967b3a0dd22ca6e091a9b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:16:53 -0500 Subject: [PATCH 34/38] Update license_file to license_files (#131) The former has been deprecated since setuptools 56 Co-authored-by: micwoj92 <45581170+micwoj92@users.noreply.github.com> --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 5c6311daf..76daa0897 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,4 +2,4 @@ universal=1 [metadata] -license_file = LICENSE +license_files = LICENSE From 26bb3eb6534cc1aff0a2a7751de7f7dedfdd4cd5 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:22:03 -0500 Subject: [PATCH 35/38] Update some RST documentation syntax (#130) * docs: Update syntax in README.rst * docs: Update code block syntax in docs/index.rst --------- Co-authored-by: HalfSweet <60973476+HalfSweet@users.noreply.github.com> --- README.rst | 165 +++++++++++++++++++++++++++++++------------------ docs/index.rst | 114 +++++++++++++++++++++------------- 2 files changed, 174 insertions(+), 105 deletions(-) diff --git a/README.rst b/README.rst index 8a5c71b38..b7acfc8a2 100644 --- a/README.rst +++ b/README.rst @@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. -See + +See https://kafka-python.readthedocs.io/en/master/compatibility.html + for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code-block:: bash + + $ pip install kafka-python-ng + KafkaConsumer @@ -48,42 +54,56 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. -See + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html + for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code-block:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code-block:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) ->>> # Access record headers. The returned value is a list of tuples ->>> # with str, bytes for key and value ->>> for msg in consumer: -... print (msg.headers) +.. code-block:: python ->>> # Get consumer metrics ->>> metrics = consumer.metrics() + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) + +.. code-block:: python + + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) + +.. code-block:: python + + # Access record headers. The returned value is a list of tuples + # with str, bytes for key and value + for msg in consumer: + print (msg.headers) + +.. code-block:: python + + # Get consumer metrics + metrics = consumer.metrics() KafkaProducer @@ -91,46 +111,66 @@ KafkaProducer KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. -See + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html + for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code-block:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code-block:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code-block:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code-block:: python ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() +.. code-block:: python ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) +.. code-block:: python ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) +.. code-block:: python ->>> # Include record headers. The format is list of tuples with string key ->>> # and bytes value. ->>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) ->>> # Get producer performance metrics ->>> metrics = producer.metrics() +.. code-block:: python + + # Include record headers. The format is list of tuples with string key + # and bytes value. + producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + +.. code-block:: python + + # Get producer performance metrics + metrics = producer.metrics() Thread safety @@ -154,7 +194,9 @@ kafka-python-ng supports the following compression formats: - Zstandard (zstd) gzip is supported natively, the others require installing additional libraries. -See for more information. + +See https://kafka-python.readthedocs.io/en/master/install.html for more information. + Optimized CRC32 Validation @@ -162,8 +204,9 @@ Optimized CRC32 Validation Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure python implementation for compatibility. To improve performance for high-throughput -applications, kafka-python-ng will use `crc32c` for optimized native code if installed. -See for installation instructions. +applications, kafka-python will use `crc32c` for optimized native code if installed. +See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions. + See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib. diff --git a/docs/index.rst b/docs/index.rst index 92b998d92..779ad997b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,7 +31,11 @@ failures. See `Compatibility `_ for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code:: bash + + pip install kafka-python-ng + KafkaConsumer @@ -47,28 +51,36 @@ See `KafkaConsumer `_ for API and configuration detai The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code:: python + + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) + +.. code:: python + + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) + +.. code:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) KafkaProducer @@ -78,36 +90,50 @@ KafkaProducer The class is intended to operate as similarly as possible to the official java client. See `KafkaProducer `_ for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code:: python + + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) +.. code:: python ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') +.. code:: python ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') +.. code:: python ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) Thread safety From 88763da301f72759911ec4c0e4dc8b4e9f83e124 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:14:12 -0500 Subject: [PATCH 36/38] Fix crc32c's __main__ for Python 3 (#142) * Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt --- kafka/record/_crc32c.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py index 9b51ad8a9..6642b5bbe 100644 --- a/kafka/record/_crc32c.py +++ b/kafka/record/_crc32c.py @@ -139,7 +139,5 @@ def crc(data): if __name__ == "__main__": import sys - # TODO remove the pylint disable once pylint fixes - # https://github.com/PyCQA/pylint/issues/2571 - data = sys.stdin.read() # pylint: disable=assignment-from-no-return + data = sys.stdin.buffer.read() # pylint: disable=assignment-from-no-return print(hex(crc(data))) From b1a4c53cfc426118cca491a552861e9b07592629 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:59:32 -0500 Subject: [PATCH 37/38] Strip trailing dot off hostname. (#133) Co-authored-by: Dave Voutila --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1efb8a0a1..1f3bc2006 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -496,7 +496,7 @@ def _wrap_ssl(self): try: self._sock = self._ssl_context.wrap_socket( self._sock, - server_hostname=self.host, + server_hostname=self.host.rstrip("."), do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) From 18eaa2d29e7ba3d0f261e620397712b5d67c3a94 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 01:47:35 -0500 Subject: [PATCH 38/38] Handle OSError to properly recycle SSL connection, fix infinite loop (#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1f3bc2006..80f17009c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -510,7 +510,7 @@ def _try_handshake(self): # old ssl in python2.6 will swallow all SSLErrors here... except (SSLWantReadError, SSLWantWriteError): pass - except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): + except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError, ssl.SSLError, OSError) as e: log.warning('SSL connection closed by server during handshake.') self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user