Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-345 Static membership implementation #2333

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
62b2a83
KIP-345 Add static consumer membership support
Oct 21, 2022
6242c03
KIP-345 Add examples to docs
Oct 21, 2022
0d75205
KIP-345 Add leave_group_on_close flag
Oct 23, 2022
4e962e2
KIP-345 Add tests for static membership
Oct 23, 2022
7e474ab
KIP-345 Update docs for leave_group_on_close option
KazakovDenis Oct 24, 2022
b95e46d
Rename project from kafka-python to kafka-python-ng (#1)
wbarnha Mar 7, 2024
78c74c0
Fix artifact downloads for release
wbarnha Mar 7, 2024
e796019
Fix badge links in README.rst
wbarnha Mar 8, 2024
38e159a
Reconfigure tests to complete in a more timely manner and skip some i…
wbarnha Mar 8, 2024
c5b9918
Merge branch 'master' into KIP-345
wbarnha Mar 8, 2024
e762321
Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161)
wbarnha Mar 9, 2024
00750aa
Remove support for EOL'ed versions of Python (#160)
wbarnha Mar 9, 2024
5bd1323
Stop testing Python 3.13 in python-package.yml (#162)
wbarnha Mar 9, 2024
cda8f81
Avoid 100% CPU usage while socket is closed (#156)
wbarnha Mar 9, 2024
c02df08
Fix DescribeConfigsResponse_v1 config_source (#150)
wbarnha Mar 9, 2024
cc35d9d
Merge branch 'master' into KIP-345
wbarnha Mar 9, 2024
a9d5e8b
Update changelog.rst
wbarnha Mar 10, 2024
65eacfb
Fix base class of DescribeClientQuotasResponse_v0 (#144)
wbarnha Mar 10, 2024
e0ebe5d
Update license_file to license_files (#131)
wbarnha Mar 10, 2024
26bb3eb
Update some RST documentation syntax (#130)
wbarnha Mar 10, 2024
88763da
Fix crc32c's __main__ for Python 3 (#142)
wbarnha Mar 10, 2024
96f6054
Merge branch 'master' into KIP-345
wbarnha Mar 10, 2024
b1a4c53
Strip trailing dot off hostname. (#133)
wbarnha Mar 10, 2024
18eaa2d
Handle OSError to properly recycle SSL connection, fix infinite loop …
wbarnha Mar 10, 2024
54cbd63
client_async: Allow throwing an exception upon socket error during (#…
wbarnha Mar 10, 2024
eb6fd9b
Log connection errors at ERROR level (#139)
wbarnha Mar 12, 2024
6ad79a4
Support custom SASL mechanisms including AWS MSK (#170)
wbarnha Mar 18, 2024
deeccfa
Update python-package.yml to have 15m as timeout
wbarnha Mar 18, 2024
fcca556
Run pyupgrade on everything. (#171)
wbarnha Mar 18, 2024
a856dc4
Remove all vendoring (#169)
s-t-e-v-e-n-k Mar 19, 2024
2f2ccb1
Support Describe log dirs (#145)
wbarnha Mar 19, 2024
d4327f5
Merge branch 'master' into KIP-345
wbarnha Mar 19, 2024
33b2754
remove six from base.py
wbarnha Mar 19, 2024
52c0d20
Update base.py
wbarnha Mar 19, 2024
984c996
Update base.py
wbarnha Mar 19, 2024
ecdabd6
Update base.py
wbarnha Mar 19, 2024
f65921b
Update changelog.rst
wbarnha Mar 20, 2024
0259502
Update conftest.py to use request.node.originalname instead for legal…
wbarnha Mar 20, 2024
83f629c
Merge branch 'master' into KIP-345
wbarnha Mar 20, 2024
1b5a10e
Update README.rst
wbarnha Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Run pyupgrade on everything. (#171)
wbarnha authored Mar 18, 2024
commit fcca556619bad504e38f974844091a475af04e6e
2 changes: 0 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

__title__ = 'kafka'
from kafka.version import __version__
__author__ = 'Dana Powers'
2 changes: 0 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
11 changes: 5 additions & 6 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from __future__ import absolute_import
from kafka.errors import IllegalArgumentError

# enum in stdlib as of py3.4
@@ -69,7 +68,7 @@ class ACLResourcePatternType(IntEnum):
PREFIXED = 4


class ACLFilter(object):
class ACLFilter:
"""Represents a filter to use with describing and deleting ACLs

The difference between this class and the ACL class is mainly that
@@ -161,7 +160,7 @@ def __init__(
permission_type,
resource_pattern
):
super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
super().__init__(principal, host, operation, permission_type, resource_pattern)
self.validate()

def validate(self):
@@ -173,7 +172,7 @@ def validate(self):
raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")


class ResourcePatternFilter(object):
class ResourcePatternFilter:
def __init__(
self,
resource_type,
@@ -232,13 +231,13 @@ def __init__(
resource_name,
pattern_type=ACLResourcePatternType.LITERAL
):
super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
super().__init__(resource_type, resource_name, pattern_type)
self.validate()

def validate(self):
if self.resource_type == ResourceType.ANY:
raise IllegalArgumentError("resource_type cannot be ANY")
if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
raise IllegalArgumentError(
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
)
10 changes: 4 additions & 6 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from collections import defaultdict
import copy
import logging
@@ -32,7 +30,7 @@
log = logging.getLogger(__name__)


class KafkaAdminClient(object):
class KafkaAdminClient:
"""A class for administering the Kafka cluster.

Warning:
@@ -194,7 +192,7 @@ def __init__(self, **configs):
log.debug("Starting KafkaAdminClient with configuration: %s", configs)
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
@@ -874,7 +872,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
))
else:
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")

self._wait_for_futures(futures)
return [f.value for f in futures]
@@ -1197,7 +1195,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
topics_partitions = list(topics_partitions_dict.items())
request = OffsetFetchRequest[version](group_id, topics_partitions)
else:
raise NotImplementedError(
4 changes: 1 addition & 3 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
@@ -15,7 +13,7 @@ class ConfigResourceType(IntEnum):
TOPIC = 2


class ConfigResource(object):
class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
5 changes: 1 addition & 4 deletions kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from __future__ import absolute_import


class NewPartitions(object):
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
4 changes: 1 addition & 3 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
22 changes: 8 additions & 14 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import, division

import collections
import copy
import logging
@@ -32,14 +30,10 @@
from kafka.vendor import socketpair
from kafka.version import __version__

if six.PY2:
ConnectionError = None


log = logging.getLogger('kafka.client')


class KafkaClient(object):
class KafkaClient:
"""
A network client for asynchronous request/response network I/O.

@@ -374,7 +368,7 @@ def _maybe_connect(self, node_id):

if conn is None:
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
assert broker, 'Broker id {} not in current metadata'.format(node_id)

log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
@@ -686,7 +680,7 @@ def _poll(self, timeout):
unexpected_data = key.fileobj.recv(1)
if unexpected_data: # anything other than a 0-byte read means protocol issues
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
except OSError:
pass
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
@@ -701,7 +695,7 @@ def _poll(self, timeout):
if conn not in processed and conn.connected() and conn._sock.pending():
self._pending_completion.extend(conn.recv())

for conn in six.itervalues(self._conns):
for conn in self._conns.values():
if conn.requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
conn, conn.config['request_timeout_ms'])
@@ -941,7 +935,7 @@ def wakeup(self):
except socket.timeout:
log.warning('Timeout to send to wakeup socket!')
raise Errors.KafkaTimeoutError()
except socket.error as e:
except OSError as e:
log.warning('Unable to send to wakeup socket!')
if self._raise_upon_socket_err_during_wakeup:
raise e
@@ -951,7 +945,7 @@ def _clear_wake_fd(self):
while True:
try:
self._wake_r.recv(1024)
except socket.error:
except OSError:
break

def _maybe_close_oldest_connection(self):
@@ -981,7 +975,7 @@ def bootstrap_connected(self):
OrderedDict = dict


class IdleConnectionManager(object):
class IdleConnectionManager:
def __init__(self, connections_max_idle_ms):
if connections_max_idle_ms > 0:
self.connections_max_idle = connections_max_idle_ms / 1000
@@ -1043,7 +1037,7 @@ def poll_expired_connection(self):
return None


class KafkaClientMetrics(object):
class KafkaClientMetrics:
def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
self.metric_group_name = metric_group_prefix + '-metrics'
12 changes: 5 additions & 7 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import collections
import copy
import logging
@@ -16,7 +14,7 @@
log = logging.getLogger(__name__)


class ClusterMetadata(object):
class ClusterMetadata:
"""
A class to manage kafka cluster metadata.

@@ -128,9 +126,9 @@ def available_partitions_for_topic(self, topic):
"""
if topic not in self._partitions:
return None
return set([partition for partition, metadata
in six.iteritems(self._partitions[topic])
if metadata.leader != -1])
return {partition for partition, metadata
in self._partitions[topic].items()
if metadata.leader != -1}

def leader_for_partition(self, partition):
"""Return node_id of leader, -1 unavailable, None if unknown."""
@@ -361,7 +359,7 @@ def add_group_coordinator(self, group, response):

# Use a coordinator-specific node id so that group requests
# get a dedicated connection
node_id = 'coordinator-{}'.format(response.coordinator_id)
node_id = f'coordinator-{response.coordinator_id}'
coordinator = BrokerMetadata(
node_id,
response.host,
6 changes: 0 additions & 6 deletions kafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import gzip
import io
import platform
@@ -149,10 +147,6 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
# buffer... likely a python-snappy bug, so just use a slice copy
chunker = lambda payload, i, size: payload[i:size+i]

elif six.PY2:
# Sliced buffer avoids additional copies
# pylint: disable-msg=undefined-variable
chunker = lambda payload, i, size: buffer(payload, i, size)
else:
# snappy.compress does not like raw memoryviews, so we have to convert
# tobytes, which is a copy... oh well. it's the thought that counts.
Loading