Changes from all commits (42 commits)
d3796da
v1.3.0
dasch Oct 14, 2020
d20b62a
fix Kafka::TransactionManager#send_offsets_to_txn
stasCF Oct 21, 2020
8d25a79
fix rubocop offense
stasCF Oct 21, 2020
9da0459
Add kafka 2.6.0 to Circle CI
methodmissing Nov 11, 2020
fbf5273
Add Kafka 2.6 support to README
methodmissing Nov 11, 2020
d48aab8
add info to changelog
stasCF Nov 12, 2020
1f72b1f
Merge pull request #866 from stasCF/fix-send-offsets-to-txn
dasch Nov 12, 2020
8e00bea
Merge pull request #869 from Shopify/kafka-2.6.0
dasch Nov 12, 2020
3e6ea9f
Resolve RSpec::Mocks::OutsideOfExampleError
abicky Dec 12, 2020
ca33e40
Install cmake to install snappy
abicky Dec 12, 2020
6e25c4c
Resolve "Passing the keyword argument as ..." deprecation warning
abicky Dec 12, 2020
fe2fb3d
Install cmake to install snappy
abicky Dec 12, 2020
7d1c384
Make "consuming messages with a custom assignment strategy" stable
abicky Dec 12, 2020
cda7667
Install cmake to install snappy
abicky Dec 12, 2020
fc5c69d
Merge pull request #875 from abicky/resolve-deprecation-warning
dasch Dec 30, 2020
6d80b33
Merge pull request #874 from abicky/resolve-outside-of-example-error
dasch Dec 30, 2020
fe7bc77
Apply suggestions from code review
abicky Dec 30, 2020
49c6194
Merge pull request #876 from abicky/make-custom-assignment-strategy-e…
dasch Dec 30, 2020
ad8d8d9
kafka 2.7.0
vvuibert Jan 4, 2021
9028ed7
fix describe topic test
vvuibert Jan 13, 2021
0d6c451
Merge pull request #880 from Shopify/kafka-2.7.0
dasch Jan 14, 2021
b323c63
Adding murmur2_random partition assignment
Jan 25, 2021
db7f5f4
Add partitioner_klass as client param
Jan 27, 2021
0617573
Changes `digest-murmurhash` to be an optional dependency
Feb 1, 2021
4669c20
Revert client API change
Feb 1, 2021
b2f3eed
Small cleanup
Feb 1, 2021
d49e7c4
small cleanup
Feb 2, 2021
5a3374e
Add murmur2 to readme
Feb 2, 2021
c52c238
Add murmur2 to changelog
Feb 2, 2021
f3e5078
Add Partitioner doc
Feb 2, 2021
fd89412
Correcr readme doc
Feb 2, 2021
a56d16b
Merge pull request #884 from zendesk/divo/murmur2
dasch Feb 3, 2021
56a0476
Updated to ignore all control batches
rkruze Apr 7, 2021
2cf27e4
Support seed brokers' hostname with multiple addresses
abicky Dec 12, 2020
f07b4c1
Fix "@param tag has unknown parameter name"
abicky Apr 14, 2021
ef7c21c
Add resolve_seed_brokers option to changelog
abicky Apr 16, 2021
e766e2c
Merge pull request #877 from abicky/support-seed-broker-with-multiple…
dasch Apr 19, 2021
1e28e9b
Merge pull request #897 from abicky/fix-yard-annotation
dasch Apr 19, 2021
733a47d
Merge pull request #893 from rkruze/record_batch
dasch Apr 19, 2021
d9cb8d1
Handle SyncGroup responses with a non-zero error and no assignments
Apr 14, 2021
36e6a4b
Merge pull request #896 from BrentWheeldon/BrentWheeldon/group-sync-f…
dasch Apr 19, 2021
e6a2f2d
Merge remote-tracking branch 'upstream/master' into merge-upstream-ma…
May 5, 2021
78 changes: 78 additions & 0 deletions .circleci/config.yml
@@ -7,6 +7,7 @@ jobs:
LOG_LEVEL: DEBUG
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec
- run: bundle exec rubocop
@@ -40,6 +41,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -72,6 +74,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -104,6 +107,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -136,6 +140,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -168,6 +173,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -200,6 +206,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -232,6 +239,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -264,6 +272,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -296,6 +305,73 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

kafka-2.6:
docker:
- image: circleci/ruby:2.5.1-node
environment:
LOG_LEVEL: DEBUG
- image: wurstmeister/zookeeper
- image: wurstmeister/kafka:2.13-2.6.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:2.13-2.6.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:2.13-2.6.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

kafka-2.7:
docker:
- image: circleci/ruby:2.5.1-node
environment:
LOG_LEVEL: DEBUG
- image: wurstmeister/zookeeper
- image: wurstmeister/kafka:2.13-2.7.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:2.13-2.7.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:2.13-2.7.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -313,3 +389,5 @@ workflows:
- kafka-2.3
- kafka-2.4
- kafka-2.5
- kafka-2.6
- kafka-2.7
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,13 @@ Changes and additions to the library will be listed here.

## Unreleased

- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
- Add support for `murmur2`-based partitioning.
- Add `resolve_seed_brokers` option to support seed broker hostnames that resolve to multiple addresses (#877).
- Handle SyncGroup responses with a non-zero error and no assignments (#896).

## 1.3.0

- Support custom assignment strategy (#846).
- Improved Exceptions in TransactionManager (#862).

28 changes: 28 additions & 0 deletions README.md
@@ -129,6 +129,16 @@ Or install it yourself as:
<td>Limited support</td>
<td>Limited support</td>
</tr>
<tr>
<th>Kafka 2.6</th>
<td>Limited support</td>
<td>Limited support</td>
</tr>
<tr>
<th>Kafka 2.7</th>
<td>Limited support</td>
<td>Limited support</td>
</tr>
</table>

This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with the v0.5.x series. There's limited support for Kafka 0.8, and things should work with Kafka 0.11, although there may be performance issues due to changes in the protocol.
@@ -144,6 +154,8 @@ This library is targeting Kafka 0.9 with the v0.4.x series
- **Kafka 2.3:** Everything that works with Kafka 2.2 should still work, but so far no features specific to Kafka 2.3 have been added.
- **Kafka 2.4:** Everything that works with Kafka 2.3 should still work, but so far no features specific to Kafka 2.4 have been added.
- **Kafka 2.5:** Everything that works with Kafka 2.4 should still work, but so far no features specific to Kafka 2.5 have been added.
- **Kafka 2.6:** Everything that works with Kafka 2.5 should still work, but so far no features specific to Kafka 2.6 have been added.
- **Kafka 2.7:** Everything that works with Kafka 2.6 should still work, but so far no features specific to Kafka 2.7 have been added.

This library requires Ruby 2.1 or higher.

@@ -164,6 +176,12 @@ require "kafka"
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
```

You can also use a single hostname that resolves to the seed brokers' IP addresses:

```ruby
kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
```
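
When `resolve_seed_brokers` is enabled, the client resolves each seed broker's hostname and tries the resulting addresses in random order until one of them accepts a connection (see the `lib/kafka/cluster.rb` changes below). A minimal sketch of that lookup, using a hypothetical `seed-brokers` hostname:

```ruby
require "resolv"

# Every address behind the seed hostname, shuffled so that retries
# don't always start with the same broker.
Resolv.getaddresses("seed-brokers").shuffle
# => e.g. ["10.0.0.12", "10.0.0.7", "10.0.0.3"]
```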

### Producing Messages to Kafka

The simplest way to write a message to a Kafka topic is to call `#deliver_message`:
@@ -370,6 +388,16 @@ partitioner = -> (partition_count, message) { ... }
Kafka.new(partitioner: partitioner, ...)
```

##### Supported partitioning schemes

In order for semantic partitioning to work, a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. Many different algorithms can be used to calculate the hash; by default `crc32` is used, and `murmur2` is also supported for compatibility with Java-based Kafka producers.
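
As a rough sketch of that approach (simplified from the library's internals), partition selection boils down to:

```ruby
require "zlib"

# Hashing the key and taking the result modulo the partition count
# maps a given key to the same partition every time.
partition_key = "some-key"
partition_count = 16
partition = Zlib.crc32(partition_key) % partition_count
```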

To use `murmur2` hashing, pass it as an argument to `Partitioner`. For example:

```ruby
Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2))
```
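
Note that `murmur2` relies on the optional `digest-murmurhash` gem (see `lib/kafka/digest.rb` below), so using it requires adding that gem to your Gemfile.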

#### Buffering and Error Handling

The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer.
9 changes: 8 additions & 1 deletion lib/kafka/client.rb
@@ -74,16 +74,22 @@ class Client
# the SSL certificate and the signing chain of the certificate have the correct domains
# based on the CA certificate
#
# @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers.
# If a broker is resolved to multiple IP addresses, the client tries to connect to each
# of the addresses until it can connect.
#
# @return [Client]
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true,
resolve_seed_brokers: false)
@logger = TaggedLogger.new(logger)
@instrumenter = Instrumenter.new(client_id: client_id)
@seed_brokers = normalize_seed_brokers(seed_brokers)
@resolve_seed_brokers = resolve_seed_brokers

ssl_context = SslContext.build(
ca_cert_file_path: ssl_ca_cert_file_path,
@@ -811,6 +817,7 @@ def initialize_cluster
seed_brokers: @seed_brokers,
broker_pool: broker_pool,
logger: @logger,
resolve_seed_brokers: @resolve_seed_brokers,
)
end

54 changes: 30 additions & 24 deletions lib/kafka/cluster.rb
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "kafka/broker_pool"
require "resolv"
require "set"

module Kafka
@@ -18,14 +19,16 @@ class Cluster
# @param seed_brokers [Array<URI>]
# @param broker_pool [Kafka::BrokerPool]
# @param logger [Logger]
def initialize(seed_brokers:, broker_pool:, logger:)
# @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize}
def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false)
if seed_brokers.empty?
raise ArgumentError, "At least one seed broker must be configured"
end

@logger = TaggedLogger.new(logger)
@seed_brokers = seed_brokers
@broker_pool = broker_pool
@resolve_seed_brokers = resolve_seed_brokers
@cluster_info = nil
@stale = true

@@ -117,7 +120,7 @@ def get_leader(topic, partition)

# Finds the broker acting as the coordinator of the given group.
#
# @param group_id: [String]
# @param group_id [String]
# @return [Broker] the broker that's currently coordinator.
def get_group_coordinator(group_id:)
@logger.debug "Getting group coordinator for `#{group_id}`"
Expand All @@ -127,7 +130,7 @@ def get_group_coordinator(group_id:)

# Finds the broker acting as the coordinator of the given transaction.
#
# @param transactional_id: [String]
# @param transactional_id [String]
# @return [Broker] the broker that's currently coordinator.
def get_transaction_coordinator(transactional_id:)
@logger.debug "Getting transaction coordinator for `#{transactional_id}`"
@@ -418,32 +421,35 @@ def get_leader_id(topic, partition)
# @return [Protocol::MetadataResponse] the cluster metadata.
def fetch_cluster_info
errors = []

@seed_brokers.shuffle.each do |node|
@logger.info "Fetching cluster metadata from #{node}"

begin
broker = @broker_pool.connect(node.hostname, node.port)
cluster_info = broker.fetch_metadata(topics: @target_topics)

if cluster_info.brokers.empty?
@logger.error "No brokers in cluster"
else
@logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"

@stale = false

return cluster_info
(@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip|
node_info = node.to_s
node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip
@logger.info "Fetching cluster metadata from #{node_info}"

begin
broker = @broker_pool.connect(hostname_or_ip, node.port)
cluster_info = broker.fetch_metadata(topics: @target_topics)

if cluster_info.brokers.empty?
@logger.error "No brokers in cluster"
else
@logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"

@stale = false

return cluster_info
end
rescue Error => e
@logger.error "Failed to fetch metadata from #{node_info}: #{e}"
errors << [node_info, e]
ensure
broker.disconnect unless broker.nil?
end
rescue Error => e
@logger.error "Failed to fetch metadata from #{node}: #{e}"
errors << [node, e]
ensure
broker.disconnect unless broker.nil?
end
end

error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n")
error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n")

raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
end
15 changes: 15 additions & 0 deletions lib/kafka/crc32_hash.rb
@@ -0,0 +1,15 @@
# frozen_string_literal: true

require "zlib"

module Kafka
class Crc32Hash

# crc32 is supported natively
def load; end

def hash(value)
Zlib.crc32(value)
end
end
end
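
Each hash backend exposes the same two-method interface: `load` prepares any dependencies, and `hash` maps a string to an integer. A usage sketch:

```ruby
hasher = Kafka::Crc32Hash.new
hasher.load                # no-op; crc32 ships with Ruby's zlib
hasher.hash("some-key")    # => CRC-32 checksum as an Integer
```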
22 changes: 22 additions & 0 deletions lib/kafka/digest.rb
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require "kafka/crc32_hash"
require "kafka/murmur2_hash"

module Kafka
module Digest
FUNCTIONS_BY_NAME = {
:crc32 => Crc32Hash.new,
:murmur2 => Murmur2Hash.new
}.freeze

def self.find_digest(name)
digest = FUNCTIONS_BY_NAME.fetch(name) do
raise LoadError, "Unknown hash function #{name}"
end

digest.load
digest
end
end
end
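
The `Murmur2Hash` backend required above is not part of this diff excerpt; a sketch of its likely shape, assuming it wraps the optional `digest-murmurhash` gem with the same seed the Java Kafka client uses (the exact constants live in `lib/kafka/murmur2_hash.rb`):

```ruby
# frozen_string_literal: true

module Kafka
  class Murmur2Hash
    SEED = [0x9747b28c].pack("L") # seed used by the Java Kafka client

    # digest-murmurhash is optional, so load it lazily rather than
    # at require time.
    def load
      require "digest/murmurhash"
    rescue LoadError
      raise LoadError, "murmur2 hashing requires the `digest-murmurhash` gem"
    end

    def hash(value)
      # Mask to a non-negative 31-bit value, matching the Java client.
      ::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff
    end
  end
end
```

Callers then go through the `Digest` module:

```ruby
digest = Kafka::Digest.find_digest(:murmur2) # raises LoadError for unknown names
digest.hash("some-key")
```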