diff --git a/.circleci/config.yml b/.circleci/config.yml index 9b78abdd1..3cc9f5d0e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,6 +7,7 @@ jobs: LOG_LEVEL: DEBUG steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec - run: bundle exec rubocop @@ -40,6 +41,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -72,6 +74,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -104,6 +107,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -136,6 +140,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -168,6 +173,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -200,6 +206,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag 
functional spec/functional @@ -232,6 +239,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -264,6 +272,7 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -296,6 +305,73 @@ jobs: KAFKA_DELETE_TOPIC_ENABLE: true steps: - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy + - run: bundle install --path vendor/bundle + - run: bundle exec rspec --profile --tag functional spec/functional + + kafka-2.6: + docker: + - image: circleci/ruby:2.5.1-node + environment: + LOG_LEVEL: DEBUG + - image: wurstmeister/zookeeper + - image: wurstmeister/kafka:2.13-2.6.0 + environment: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9092 + KAFKA_PORT: 9092 + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_DELETE_TOPIC_ENABLE: true + - image: wurstmeister/kafka:2.13-2.6.0 + environment: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9093 + KAFKA_PORT: 9093 + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_DELETE_TOPIC_ENABLE: true + - image: wurstmeister/kafka:2.13-2.6.0 + environment: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9094 + KAFKA_PORT: 9094 + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_DELETE_TOPIC_ENABLE: true + steps: + - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy + - run: bundle install --path vendor/bundle + - run: bundle exec rspec --profile --tag functional spec/functional + + kafka-2.7: + docker: + - image: circleci/ruby:2.5.1-node + environment: + LOG_LEVEL: DEBUG + - image: wurstmeister/zookeeper + - 
image: wurstmeister/kafka:2.13-2.7.0 + environment: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9092 + KAFKA_PORT: 9092 + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_DELETE_TOPIC_ENABLE: true + - image: wurstmeister/kafka:2.13-2.7.0 + environment: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9093 + KAFKA_PORT: 9093 + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_DELETE_TOPIC_ENABLE: true + - image: wurstmeister/kafka:2.13-2.7.0 + environment: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ADVERTISED_PORT: 9094 + KAFKA_PORT: 9094 + KAFKA_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_DELETE_TOPIC_ENABLE: true + steps: + - checkout + - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy - run: bundle install --path vendor/bundle - run: bundle exec rspec --profile --tag functional spec/functional @@ -313,3 +389,5 @@ workflows: - kafka-2.3 - kafka-2.4 - kafka-2.5 + - kafka-2.6 + - kafka-2.7 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1084c6e1d..04ff2d26e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ Changes and additions to the library will be listed here. ## Unreleased +- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). +- Add support for `murmur2` based partitioning. +- Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877). +- Handle SyncGroup responses with a non-zero error and no assignments (#896). + +## 1.3.0 + - Support custom assignment strategy (#846). - Improved Exceptions in TransactionManager (#862). diff --git a/README.md b/README.md index e91f5874d..3a6541726 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,16 @@ Or install it yourself as: Limited support Limited support + + Kafka 2.6 + Limited support + Limited support + + + Kafka 2.7 + Limited support + Limited support + This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with the v0.5.x series. 
There's limited support for Kafka 0.8, and things should work with Kafka 0.11, although there may be performance issues due to changes in the protocol. @@ -144,6 +154,8 @@ This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with t - **Kafka 2.3:** Everything that works with Kafka 2.2 should still work, but so far no features specific to Kafka 2.3 have been added. - **Kafka 2.4:** Everything that works with Kafka 2.3 should still work, but so far no features specific to Kafka 2.4 have been added. - **Kafka 2.5:** Everything that works with Kafka 2.4 should still work, but so far no features specific to Kafka 2.5 have been added. +- **Kafka 2.6:** Everything that works with Kafka 2.5 should still work, but so far no features specific to Kafka 2.6 have been added. +- **Kafka 2.7:** Everything that works with Kafka 2.6 should still work, but so far no features specific to Kafka 2.7 have been added. This library requires Ruby 2.1 or higher. @@ -164,6 +176,12 @@ require "kafka" kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application") ``` +You can also pass a single hostname that resolves to the IP addresses of multiple seed brokers; with `resolve_seed_brokers: true`, the client tries each resolved address until it can connect: + +```ruby +kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true) +``` + ### Producing Messages to Kafka The simplest way to write a message to a Kafka topic is to call `#deliver_message`: @@ -370,6 +388,16 @@ partitioner = -> (partition_count, message) { ... } Kafka.new(partitioner: partitioner, ...) ``` +##### Supported partitioning schemes + +In order for semantic partitioning to work, a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and take the result modulo the number of partitions. There are many different algorithms that can be used to calculate a hash. By default, `crc32` is used. `murmur2` is also supported for compatibility with Java-based Kafka producers. 
+ +To use `murmur2` hashing, pass `hash_function: :murmur2` to `Kafka::Partitioner.new`. For example: + +```ruby +Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2)) +``` + #### Buffering and Error Handling The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer. diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 23829e0ed..97d5ef3ce 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -74,16 +74,22 @@ class Client # the SSL certificate and the signing chain of the certificate have the correct domains # based on the CA certificate # + # @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers. + # If a broker is resolved to multiple IP addresses, the client tries to connect to each + # of the addresses until it can connect. 
+ # # @return [Client] def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, - sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true) + sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true, + resolve_seed_brokers: false) @logger = TaggedLogger.new(logger) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) + @resolve_seed_brokers = resolve_seed_brokers ssl_context = SslContext.build( ca_cert_file_path: ssl_ca_cert_file_path, @@ -811,6 +817,7 @@ def initialize_cluster seed_brokers: @seed_brokers, broker_pool: broker_pool, logger: @logger, + resolve_seed_brokers: @resolve_seed_brokers, ) end diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index bc3717e8c..8aaf19154 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "kafka/broker_pool" +require "resolv" require "set" module Kafka @@ -18,7 +19,8 @@ class Cluster # @param seed_brokers [Array] # @param broker_pool [Kafka::BrokerPool] # @param logger [Logger] - def initialize(seed_brokers:, broker_pool:, logger:) + # @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize} + def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false) if seed_brokers.empty? 
raise ArgumentError, "At least one seed broker must be configured" end @@ -26,6 +28,7 @@ def initialize(seed_brokers:, broker_pool:, logger:) @logger = TaggedLogger.new(logger) @seed_brokers = seed_brokers @broker_pool = broker_pool + @resolve_seed_brokers = resolve_seed_brokers @cluster_info = nil @stale = true @@ -117,7 +120,7 @@ def get_leader(topic, partition) # Finds the broker acting as the coordinator of the given group. # - # @param group_id: [String] + # @param group_id [String] # @return [Broker] the broker that's currently coordinator. def get_group_coordinator(group_id:) @logger.debug "Getting group coordinator for `#{group_id}`" @@ -127,7 +130,7 @@ def get_group_coordinator(group_id:) # Finds the broker acting as the coordinator of the given transaction. # - # @param transactional_id: [String] + # @param transactional_id [String] # @return [Broker] the broker that's currently coordinator. def get_transaction_coordinator(transactional_id:) @logger.debug "Getting transaction coordinator for `#{transactional_id}`" @@ -418,32 +421,35 @@ def get_leader_id(topic, partition) # @return [Protocol::MetadataResponse] the cluster metadata. def fetch_cluster_info errors = [] - @seed_brokers.shuffle.each do |node| - @logger.info "Fetching cluster metadata from #{node}" - - begin - broker = @broker_pool.connect(node.hostname, node.port) - cluster_info = broker.fetch_metadata(topics: @target_topics) - - if cluster_info.brokers.empty? - @logger.error "No brokers in cluster" - else - @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" - - @stale = false - - return cluster_info + (@resolve_seed_brokers ? 
Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip| + node_info = node.to_s + node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip + @logger.info "Fetching cluster metadata from #{node_info}" + + begin + broker = @broker_pool.connect(hostname_or_ip, node.port) + cluster_info = broker.fetch_metadata(topics: @target_topics) + + if cluster_info.brokers.empty? + @logger.error "No brokers in cluster" + else + @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" + + @stale = false + + return cluster_info + end + rescue Error => e + @logger.error "Failed to fetch metadata from #{node_info}: #{e}" + errors << [node_info, e] + ensure + broker.disconnect unless broker.nil? end - rescue Error => e - @logger.error "Failed to fetch metadata from #{node}: #{e}" - errors << [node, e] - ensure - broker.disconnect unless broker.nil? end end - error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n") + error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n") raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}" end diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb new file mode 100644 index 000000000..1849008a6 --- /dev/null +++ b/lib/kafka/crc32_hash.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "zlib" + +module Kafka + class Crc32Hash + + # crc32 is supported natively + def load; end + + def hash(value) + Zlib.crc32(value) + end + end +end diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb new file mode 100644 index 000000000..8ba4cc206 --- /dev/null +++ b/lib/kafka/digest.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require "kafka/crc32_hash" +require "kafka/murmur2_hash" + +module Kafka + module Digest + FUNCTIONS_BY_NAME = { + :crc32 => Crc32Hash.new, + :murmur2 => Murmur2Hash.new + }.freeze + + def self.find_digest(name) + digest = 
FUNCTIONS_BY_NAME.fetch(name) do + raise LoadError, "Unknown hash function #{name}" + end + + digest.load + digest + end + end +end diff --git a/lib/kafka/murmur2_hash.rb b/lib/kafka/murmur2_hash.rb new file mode 100644 index 000000000..a6223b0d6 --- /dev/null +++ b/lib/kafka/murmur2_hash.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Kafka + class Murmur2Hash + SEED = [0x9747b28c].pack('L') + + def load + require 'digest/murmurhash' + rescue LoadError + raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile." + end + + def hash(value) + ::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff + end + end +end diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb index f4fcd2882..e11052442 100644 --- a/lib/kafka/partitioner.rb +++ b/lib/kafka/partitioner.rb @@ -1,11 +1,16 @@ # frozen_string_literal: true -require "zlib" +require "kafka/digest" module Kafka # Assigns partitions to messages. class Partitioner + # @param hash_function [Symbol, nil] the algorithm used to compute a messages + # destination partition. Default is :crc32 + def initialize(hash_function: nil) + @digest = Digest.find_digest(hash_function || :crc32) + end # Assigns a partition number based on a partition key. If no explicit # partition key is provided, the message key will be used instead. @@ -28,7 +33,7 @@ def call(partition_count, message) if key.nil? 
rand(partition_count) else - Zlib.crc32(key) % partition_count + @digest.hash(key) % partition_count end end end diff --git a/lib/kafka/protocol/add_offsets_to_txn_response.rb b/lib/kafka/protocol/add_offsets_to_txn_response.rb index 830613dfb..7ac824cd7 100644 --- a/lib/kafka/protocol/add_offsets_to_txn_response.rb +++ b/lib/kafka/protocol/add_offsets_to_txn_response.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module Kafka module Protocol class AddOffsetsToTxnResponse diff --git a/lib/kafka/protocol/encoder.rb b/lib/kafka/protocol/encoder.rb index d4bc7a391..56a584a01 100644 --- a/lib/kafka/protocol/encoder.rb +++ b/lib/kafka/protocol/encoder.rb @@ -126,7 +126,7 @@ def write_varint_string(string) # Writes an integer under varints serializing to the IO object. # https://developers.google.com/protocol-buffers/docs/encoding#varints # - # @param string [Integer] + # @param int [Integer] # @return [nil] def write_varint(int) int = int << 1 diff --git a/lib/kafka/protocol/record_batch.rb b/lib/kafka/protocol/record_batch.rb index 4201cc737..75c861234 100644 --- a/lib/kafka/protocol/record_batch.rb +++ b/lib/kafka/protocol/record_batch.rb @@ -77,7 +77,7 @@ def encode(encoder) record_batch_encoder.write_int8(MAGIC_BYTE) body = encode_record_batch_body - crc = Digest::CRC32c.checksum(body) + crc = ::Digest::CRC32c.checksum(body) record_batch_encoder.write_int32(crc) record_batch_encoder.write(body) @@ -213,7 +213,7 @@ def self.decode(decoder) end def mark_control_record - if in_transaction && is_control_batch + if is_control_batch record = @records.first record.is_control_record = true unless record.nil? 
end diff --git a/lib/kafka/protocol/sync_group_response.rb b/lib/kafka/protocol/sync_group_response.rb index a1e4aab83..148945095 100644 --- a/lib/kafka/protocol/sync_group_response.rb +++ b/lib/kafka/protocol/sync_group_response.rb @@ -13,9 +13,12 @@ def initialize(error_code:, member_assignment:) end def self.decode(decoder) + error_code = decoder.int16 + member_assignment_bytes = decoder.bytes + new( - error_code: decoder.int16, - member_assignment: MemberAssignment.decode(Decoder.from_string(decoder.bytes)), + error_code: error_code, + member_assignment: member_assignment_bytes ? MemberAssignment.decode(Decoder.from_string(member_assignment_bytes)) : nil ) end end diff --git a/lib/kafka/protocol/txn_offset_commit_response.rb b/lib/kafka/protocol/txn_offset_commit_response.rb index 5bd3363fc..628af7d66 100644 --- a/lib/kafka/protocol/txn_offset_commit_response.rb +++ b/lib/kafka/protocol/txn_offset_commit_response.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module Kafka module Protocol class TxnOffsetCommitResponse diff --git a/lib/kafka/version.rb b/lib/kafka/version.rb index 22636a095..523fd37e0 100644 --- a/lib/kafka/version.rb +++ b/lib/kafka/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Kafka - VERSION = "1.2.0" + VERSION = "1.3.0" end diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec index 2589e97d5..d50092dc5 100644 --- a/ruby-kafka.gemspec +++ b/ruby-kafka.gemspec @@ -33,6 +33,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rspec" spec.add_development_dependency "pry" + spec.add_development_dependency "digest-murmurhash" spec.add_development_dependency "dotenv" spec.add_development_dependency "docker-api" spec.add_development_dependency "rspec-benchmark" diff --git a/spec/async_producer_spec.rb b/spec/async_producer_spec.rb index 7a0ed1351..ff91e9c8a 100644 --- a/spec/async_producer_spec.rb +++ b/spec/async_producer_spec.rb @@ -76,6 +76,8 @@ def 
instrument(name, payload = {}) sleep 0.2 # wait for worker to call produce expect(sync_producer).to have_received(:produce) + + async_producer.shutdown end it "retries until configured max_retries" do @@ -89,6 +91,8 @@ def instrument(name, payload = {}) metric = instrumenter.metrics_for("error.async_producer").first expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow) expect(sync_producer).to have_received(:produce).exactly(3).times + + async_producer.shutdown end it "requires `topic` to be a String" do diff --git a/spec/cluster_spec.rb b/spec/cluster_spec.rb index acefa1f62..9d1d7d32d 100644 --- a/spec/cluster_spec.rb +++ b/spec/cluster_spec.rb @@ -103,4 +103,47 @@ }.to raise_exception(ArgumentError) end end + + describe "#cluster_info" do + let(:cluster) { + Kafka::Cluster.new( + seed_brokers: [URI("kafka://test1:9092")], + broker_pool: broker_pool, + logger: LOGGER, + resolve_seed_brokers: resolve_seed_brokers, + ) + } + + before do + allow(broker).to receive(:fetch_metadata) { raise Kafka::ConnectionError, "Operation timed out" } + allow(broker).to receive(:disconnect) + end + + context "when resolve_seed_brokers is false" do + let(:resolve_seed_brokers) { false } + + it "tries the seed broker hostnames as is" do + expect(broker_pool).to receive(:connect).with("test1", 9092) { broker } + expect { + cluster.cluster_info + }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out}) + end + end + + context "when resolve_seed_brokers is true" do + let(:resolve_seed_brokers) { true } + + before do + allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] } + end + + it "tries all the resolved IP addresses" do + expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker } + expect(broker_pool).to receive(:connect).with("::1", 9092) { broker } + expect { + cluster.cluster_info + }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out}) + end + end + end end diff --git 
a/spec/digest_spec.rb b/spec/digest_spec.rb new file mode 100644 index 000000000..c4b98a8bf --- /dev/null +++ b/spec/digest_spec.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +describe Kafka::Digest do + describe "crc32" do + let(:digest) { Kafka::Digest.find_digest(:crc32) } + + it "is supported" do + expect(digest).to be_truthy + end + + it "produces hash for value" do + expect(digest.hash("yolo")).to eq(1623057525) + end + end + + describe "murmur2" do + let(:digest) { Kafka::Digest.find_digest(:murmur2) } + + it "is supported" do + expect(digest).to be_truthy + end + + it "produces hash for value" do + expect(digest.hash("yolo")).to eq(1633766415) + end + end + + describe "unknown hash function" do + it "raises" do + expect { Kafka::Digest.find_digest(:yolo) }.to raise_error + end + end +end diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb index 23cc1525e..322ab9e63 100644 --- a/spec/functional/consumer_group_spec.rb +++ b/spec/functional/consumer_group_spec.rb @@ -476,12 +476,21 @@ def call(cluster:, members:, partitions:) end end + joined_consumers = [] consumers = 2.times.map do |i| assignment_strategy = assignment_strategy_class.new(i + 1) kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger) consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy) consumer.subscribe(topic) + + allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args| + joined_consumers |= [consumer] + # Wait until all the consumers try to join to prevent one consumer from processing all messages + raise Kafka::HeartbeatError if joined_consumers.size < consumers.size + m.call(*args) + end + consumer end diff --git a/spec/functional/topic_management_spec.rb b/spec/functional/topic_management_spec.rb index d10b5fdd6..3dbdcaec3 100644 --- a/spec/functional/topic_management_spec.rb +++ b/spec/functional/topic_management_spec.rb @@ 
-70,11 +70,10 @@ def with_retry(opts = {}, &block) topic = generate_topic_name kafka.create_topic(topic, num_partitions: 3) - configs = kafka.describe_topic(topic, %w(retention.ms retention.bytes non_exists)) + configs = kafka.describe_topic(topic, %w(retention.ms)) - expect(configs.keys).to eql(%w(retention.ms retention.bytes)) + expect(configs.keys).to eql(%w(retention.ms)) expect(configs['retention.ms']).to be_a(String) - expect(configs['retention.bytes']).to be_a(String) end example "alter a topic configuration" do @@ -90,8 +89,9 @@ def with_retry(opts = {}, &block) 'max.message.bytes' => '987654' ) - configs = kafka.describe_topic(topic, %w(retention.ms max.message.bytes)) - expect(configs['retention.ms']).to eql('1234567') - expect(configs['max.message.bytes']).to eql('987654') + retention_configs = kafka.describe_topic(topic, %w(retention.ms)) + expect(retention_configs['retention.ms']).to eql('1234567') + max_msg_bytes_configs = kafka.describe_topic(topic, %w(max.message.bytes)) + expect(max_msg_bytes_configs['max.message.bytes']).to eql('987654') end end diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb index dafe73557..547dccd13 100644 --- a/spec/partitioner_spec.rb +++ b/spec/partitioner_spec.rb @@ -1,29 +1,81 @@ # frozen_string_literal: true describe Kafka::Partitioner, "#call" do - let(:partitioner) { Kafka::Partitioner.new } let(:message) { double(:message, key: nil, partition_key: "yolo") } - it "deterministically returns a partition number for a partition key and partition count" do - partition = partitioner.call(3, message) - expect(partition).to eq 0 - end + describe "default partitioner" do + let(:partitioner) { Kafka::Partitioner.new } + + it "deterministically returns a partition number for a partition key and partition count" do + partition = partitioner.call(3, message) + expect(partition).to eq 0 + end + + it "falls back to the message key if no partition key is available" do + allow(message).to receive(:partition_key) { nil } 
+ allow(message).to receive(:key) { "hey" } - it "falls back to the message key if no partition key is available" do - allow(message).to receive(:partition_key) { nil } - allow(message).to receive(:key) { "hey" } + partition = partitioner.call(3, message) - partition = partitioner.call(3, message) + expect(partition).to eq 2 + end - expect(partition).to eq 2 + it "randomly picks a partition if the key is nil" do + allow(message).to receive(:key) { nil } + allow(message).to receive(:partition_key) { nil } + + partitions = 30.times.map { partitioner.call(3, message) } + + expect(partitions.uniq).to contain_exactly(0, 1, 2) + end end - it "randomly picks a partition if the key is nil" do - allow(message).to receive(:key) { nil } - allow(message).to receive(:partition_key) { nil } + describe "murmur2 partitioner" do + let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) } + let(:message) { double(:message, key: nil, partition_key: "yolo") } + + it "deterministically returns a partition number for a partition key and partition count" do + partition = partitioner.call(3, message) + expect(partition).to eq 0 + end + + it "falls back to the message key if no partition key is available" do + allow(message).to receive(:partition_key) { nil } + allow(message).to receive(:key) { "hey" } + + partition = partitioner.call(3, message) + + expect(partition).to eq 1 + end + + it "randomly picks a partition if the key is nil" do + allow(message).to receive(:key) { nil } + allow(message).to receive(:partition_key) { nil } + + partitions = 30.times.map { partitioner.call(3, message) } - partitions = 30.times.map { partitioner.call(3, message) } + expect(partitions.uniq).to contain_exactly(0, 1, 2) + end - expect(partitions.uniq).to contain_exactly(0, 1, 2) + it "picks a Java Kafka compatible partition" do + partition_count = 100 + { + # librdkafka test cases taken from tests/0048-partitioner.c + "" => 0x106e08d9 % partition_count, + "this is another string with more 
length to it perhaps" => 0x4f7703da % partition_count, + "hejsan" => 0x5ec19395 % partition_count, + # Java Kafka test cases taken from UtilsTest.java. + # The Java tests check the result of murmur2 directly, + # so have been ANDd with 0x7fffffff to work here + "21" => (-973932308 & 0x7fffffff) % partition_count, + "foobar" => (-790332482 & 0x7fffffff) % partition_count, + "a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count, + "a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count, + "lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count + }.each do |key, partition| + allow(message).to receive(:partition_key) { key } + expect(partitioner.call(partition_count, message)).to eq partition + end + end end end diff --git a/spec/protocol/sync_group_response_spec.rb b/spec/protocol/sync_group_response_spec.rb new file mode 100644 index 000000000..8979f244c --- /dev/null +++ b/spec/protocol/sync_group_response_spec.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +describe Kafka::Protocol::SyncGroupResponse do + describe ".decode" do + subject(:response) { Kafka::Protocol::SyncGroupResponse.decode(decoder) } + + let(:decoder) { Kafka::Protocol::Decoder.new(buffer) } + let(:buffer) { StringIO.new(response_bytes) } + + context "the response is successful" do + let(:response_bytes) { "\x00\x00\x00\x00\x007\x00\x00\x00\x00\x00\x01\x00\x1Fsome-topic-f064d6897583eb395896\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF\xFF\xFF\xFF" } + + it "decodes the response including the member assignment" do + expect(response.error_code).to eq 0 + expect(response.member_assignment.topics).to eq({ "some-topic-f064d6897583eb395896" => [0, 1] }) + end + end + + context "the response is not successful" do + let(:response_bytes) { "\x00\x19\xFF\xFF\xFF\xFF" } + + it "decodes the response including the member assignment" do + expect(response.error_code).to eq 25 + expect(response.member_assignment).to 
be_nil + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 32563ab04..1e8a44202 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -61,9 +61,9 @@ def generate_topic_name "#{RUN_ID}-topic-#{SecureRandom.uuid}" end - def create_random_topic(*args) + def create_random_topic(**args) topic = generate_topic_name - create_topic(topic, *args) + create_topic(topic, **args) topic end diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb index 9895fd1d5..494a02265 100644 --- a/spec/transaction_manager_spec.rb +++ b/spec/transaction_manager_spec.rb @@ -591,10 +591,10 @@ ) ) allow(group_coordinator).to receive(:txn_offset_commit).and_return( - txn_offset_commit_response( + txn_offset_commit_response({ 'hello' => [1], 'world' => [2] - ) + }) ) end @@ -680,12 +680,12 @@ def success_add_partitions_to_txn_response(topics) end def txn_offset_commit_response(topics, error_code: 0) - Kafka::Protocol::AddPartitionsToTxnResponse.new( + Kafka::Protocol::TxnOffsetCommitResponse.new( errors: topics.map do |topic, partitions| - Kafka::Protocol::AddPartitionsToTxnResponse::TopicPartitionsError.new( + Kafka::Protocol::TxnOffsetCommitResponse::TopicPartitionsError.new( topic: topic, partitions: partitions.map do |partition| - Kafka::Protocol::AddPartitionsToTxnResponse::PartitionError.new( + Kafka::Protocol::TxnOffsetCommitResponse::PartitionError.new( partition: partition, error_code: error_code )