KafkaEx is an Elixir client for Apache Kafka with support for Kafka versions 0.8.0 and newer.
See http://hexdocs.pm/kafka_ex/ for documentation, https://github.com/kafkaex/kafka_ex/ for code.
KafkaEx supports the following Kafka features:
- Broker and Topic Metadata
- Produce Messages
- Fetch Messages
- Message Compression with Snappy and gzip
- Offset Management (fetch / commit / autocommit)
See Kafka Protocol Documentation and A Guide to the Kafka Protocol for details of these features.
KafkaEx supports consumer groups for message consumption, a feature added in Kafka 0.8.2. In practice, this means providing a consumer group name when committing offsets. In this mode of operation, it is up to the client to assign partitions to workers.
KafkaEx currently provides limited support for the Kafka ConsumerGroup API that was added in Kafka 0.9.0. Most of the protocol requests are implemented in KafkaEx, but we do not yet support automatic joining and management of consumer group membership (e.g., automatically assigning partitions to clients). We are actively working on an implementation for automatic consumer group management.
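Since partition assignment is left to the client, one simple approach is to spread partitions across workers round-robin. The sketch below is a hypothetical helper, not part of KafkaEx; the module name, function name, and worker names are assumptions made for illustration.

```elixir
defmodule PartitionAssignment do
  # Hypothetical helper (not part of KafkaEx): assigns partition ids to
  # workers round-robin and returns a map of worker => [partition ids].
  def round_robin(partitions, workers) when workers != [] do
    # Start every worker with an empty list so idle workers still appear.
    initial = Map.new(workers, fn worker -> {worker, []} end)

    partitions
    |> Enum.with_index()
    |> Enum.reduce(initial, fn {partition, index}, acc ->
      worker = Enum.at(workers, rem(index, length(workers)))
      Map.update!(acc, worker, fn assigned -> assigned ++ [partition] end)
    end)
  end
end

assignment = PartitionAssignment.round_robin([0, 1, 2, 3, 4], [:worker_a, :worker_b])
IO.inspect(assignment)
# prints %{worker_a: [0, 2, 4], worker_b: [1, 3]}
```

Each worker would then fetch or stream only from the partitions in its list, committing offsets under the shared consumer group name.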
The standard approach for adding dependencies to an Elixir application applies: add KafkaEx to the deps and applications lists in your project's mix.exs file. You may also optionally add snappy-erlang-nif (required only if you want to use snappy compression).
# mix.exs
defmodule MyApp.Mixfile do
  # ...

  def application do
    [
      mod: {MyApp, []},
      applications: [
        # add to existing apps - :logger, etc..
        :kafka_ex,
        :snappy # if using snappy compression
      ]
    ]
  end

  defp deps do
    [
      # add to your existing deps
      {:kafka_ex, "~> 0.6.5"},
      # if using snappy compression
      {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"}
    ]
  end
end
Then run mix deps.get to fetch dependencies.
See config/config.exs or KafkaEx.Config for a description of configuration variables, including the Kafka broker list and default consumer group.
You can also override options when creating a worker, see below.
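As a rough illustration, a minimal configuration might look like the following. The brokers and consumer_group keys are the ones read via Application.get_env elsewhere in this README; the host names, ports, and group name are placeholder assumptions, and newer Elixir versions use import Config in place of use Mix.Config.

```elixir
# config/config.exs (sketch; values are placeholders)
use Mix.Config

config :kafka_ex,
  # list of {host, port} tuples for the Kafka brokers
  brokers: [{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}],
  # default consumer group used when committing offsets
  consumer_group: "kafka_ex"
```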
KafkaEx worker processes manage the state of the connection to the Kafka broker.
iex> KafkaEx.create_worker(:pr) # where :pr is the process name of the created worker
{:ok, #PID<0.171.0>}
With custom options:
iex> uris = [{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}]
[{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}]
iex> KafkaEx.create_worker(:pr, [uris: uris, consumer_group: "kafka_ex", consumer_group_update_interval: 100])
{:ok, #PID<0.172.0>}
You may find you want to create many workers, say in conjunction with a poolboy pool. In this scenario you usually won't want to name these worker processes.
To create an unnamed worker with create_worker:
iex> KafkaEx.create_worker(:no_name) # indicates to the server process not to name the process
{:ok, #PID<0.171.0>}
Note that KafkaEx has a supervisor to manage its workers. If you are using Poolboy or a similar library, you will want to manually create a worker so that it is not supervised by KafkaEx.Supervisor.
To do this, you will need to call:
GenServer.start_link(KafkaEx.Config.server_impl,
  [
    [uris: Application.get_env(:kafka_ex, :brokers),
     consumer_group: Application.get_env(:kafka_ex, :consumer_group)],
    :no_name
  ]
)
For all metadata
iex> KafkaEx.metadata
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.59.103",
node_id: 49162, port: 49162, socket: nil}],
topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "LRCYFQDVWUFEIUCCTFGP"},
%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "JSIMKCLQYTWXMSIGESYL"},
%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "SCFRRXXLDFPOWSPQQMSD"},
%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
...
For a specific topic
iex> KafkaEx.metadata(topic: "foo")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.59.103",
node_id: 49162, port: 49162, socket: nil}],
topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
topic: "foo"}]}
Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.
iex> KafkaEx.offset("foo", 0, {{2015, 3, 29}, {23, 56, 40}}) # Note that the time specified should match/be ahead of time on the server that kafka runs
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [256], partition: 0}], topic: "foo"}]
iex> KafkaEx.latest_offset("foo", 0) # where 0 is the partition
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [16], partition: 0}], topic: "foo"}]
iex> KafkaEx.earliest_offset("foo", 0) # where 0 is the partition
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [0], partition: 0}], topic: "foo"}]
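To see why timestamp lookups are only accurate to segment granularity, consider a toy model in which each log segment is a {base_offset, created_at} tuple. This is an illustrative sketch only, not Kafka's or KafkaEx's implementation; the module, the tuple layout, and the integer timestamps are all assumptions.

```elixir
defmodule SegmentLookup do
  # Toy model (not Kafka's implementation). `segments` is a list of
  # {base_offset, created_at} tuples sorted by base_offset.
  # Returns the base offset of the newest segment created no later than
  # `timestamp`, or nil if every segment is newer.
  def offset_before(segments, timestamp) do
    eligible =
      Enum.filter(segments, fn {_base_offset, created_at} -> created_at <= timestamp end)

    case List.last(eligible) do
      nil -> nil
      {base_offset, _created_at} -> base_offset
    end
  end
end

segments = [{0, 100}, {256, 200}, {512, 300}]

# Any timestamp from 200 up to (but not including) 300 resolves to the same
# segment start, no matter how many messages were written in between.
IO.inspect(SegmentLookup.offset_before(segments, 250))
IO.inspect(SegmentLookup.offset_before(segments, 299))
```

In this model, larger segments mean fewer distinct answers, which is why the result gets coarser as segment size grows.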
NOTE: You must pass auto_commit: false in the options for fetch/3 when using Kafka < 0.8.2 or when using :no_consumer_group.
iex> KafkaEx.fetch("foo", 0, offset: 5) # where 0 is the partition and 5 is the offset we want to start fetching from
[%KafkaEx.Protocol.Fetch.Response{partitions: [%{error_code: :no_error,
hw_mark_offset: 115,
message_set: [
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 5, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 7, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 8, value: "hey"},
%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 9, value: "hey"}
...], partition: 0}], topic: "foo"}]
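A fetch response is a nested structure, so a small helper can make it easier to pull out the message values and work out where to fetch from next. The following is a sketch with a hypothetical module and function name; it pattern-matches only on the partitions, message_set, offset, and value fields shown in the output above, and the example uses plain maps in place of the KafkaEx structs so that it runs on its own.

```elixir
defmodule FetchHelper do
  # Hypothetical helper (not part of KafkaEx). Flattens a fetch response
  # into {values, next_offset}; next_offset falls back to start_offset
  # when no messages were returned.
  def values_and_next_offset(responses, start_offset) do
    messages =
      for %{partitions: partitions} <- responses,
          %{message_set: message_set} <- partitions,
          message <- message_set,
          do: message

    values = Enum.map(messages, fn message -> message.value end)

    next_offset =
      case List.last(messages) do
        nil -> start_offset
        last -> last.offset + 1
      end

    {values, next_offset}
  end
end

# Plain maps standing in for the response structs shown above.
response = [
  %{
    topic: "foo",
    partitions: [
      %{
        error_code: :no_error,
        partition: 0,
        message_set: [%{offset: 5, value: "hey"}, %{offset: 6, value: "hey"}]
      }
    ]
  }
]

{values, next_offset} = FetchHelper.values_and_next_offset(response, 5)
IO.inspect({values, next_offset})
# prints {["hey", "hey"], 7}
```

Because structs are maps, the same patterns should also match real response structs, provided their fields are named as in the output above.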
iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is the message
:ok
NOTE: You must pass auto_commit: false in the options for stream/3 when using Kafka < 0.8.2 or when using :no_consumer_group.
iex> KafkaEx.create_worker(:stream, [uris: [{"localhost", 9092}]])
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", worker_name: :stream)
:ok
iex> KafkaEx.produce("foo", 0, "hi", worker_name: :stream)
:ok
iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
%{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
As mentioned, for Kafka < 0.8.2, stream/3 requires auto_commit: false:
iex> KafkaEx.stream("foo", 0, offset: 0, auto_commit: false) |> Enum.take(2)
Both snappy and gzip compression are supported. Example usage for producing compressed messages:
message1 = %KafkaEx.Protocol.Produce.Message{value: "value 1"}
message2 = %KafkaEx.Protocol.Produce.Message{key: "key 2", value: "value 2"}
messages = [message1, message2]
# snappy
produce_request = %KafkaEx.Protocol.Produce.Request{
  topic: "test_topic",
  partition: 0,
  required_acks: 1,
  compression: :snappy,
  messages: messages}
KafkaEx.produce(produce_request)

# gzip
produce_request = %KafkaEx.Protocol.Produce.Request{
  topic: "test_topic",
  partition: 0,
  required_acks: 1,
  compression: :gzip,
  messages: messages}
KafkaEx.produce(produce_request)
Compression is handled automatically on the consuming/fetching end.
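For intuition about what the gzip option involves, the round trip can be reproduced with Erlang's built-in :zlib module. This is only an illustration of the compression step itself; whether KafkaEx calls :zlib in exactly this way internally is an assumption.

```elixir
payload = "value 1"

# Compress the payload into the gzip format (binary output).
compressed = :zlib.gzip(payload)

# Decompressing restores the original bytes, which is what a consumer
# ultimately sees after a compressed message set is unpacked.
decompressed = :zlib.gunzip(compressed)

IO.inspect(decompressed == payload)
# prints true
```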
It is strongly recommended to test using the Dockerized test cluster described below. This is required for contributions to KafkaEx.
NOTE You may have to run the test suite twice to get tests to pass. Due to asynchronous issues, the test suite sometimes fails on the first try.
Testing KafkaEx requires a local SSL-enabled Kafka cluster with 3 nodes: one node listening on each of ports 9092, 9093, and 9094. The easiest way to do this is using the scripts in this repository that utilize Docker and Docker Compose (both of which are freely available). This is the method we use for our CI testing of KafkaEx.
To launch the included test cluster, run
./scripts/docker_up.sh
The docker_up.sh script will attempt to determine an IP address for your computer on an active network interface. If it has trouble with this, you can try manually specifying a network interface in the IP_IFACE environment variable:
IP_IFACE=eth0 ./scripts/docker_up.sh
The test cluster runs Kafka 0.9.2.
The KafkaEx tests are split up using tags to handle testing multiple scenarios and Kafka versions.
These tests do not require a Kafka cluster to be running.
mix test --no-start
If you are not using the Docker test cluster, you may need to modify config/config.exs for your setup.
The full test suite requires Kafka 0.9+.
The 0.9 client includes functionality that cannot be tested with older clusters.
mix test --include integration --include consumer_group --include server_0_p_9_p_0
Kafka 0.8.2 introduced the consumer group API.
mix test --include consumer_group --include integration
If your test cluster is older, the consumer group tests must be omitted.
mix test --include integration
Static analysis with Dialyzer requires Elixir 1.3.2+.
mix dialyzer
All contributions are managed through the kafkaex GitHub repo.
If you find a bug or would like to contribute, please open an issue or submit a pull request. Please refer to CONTRIBUTING.md for our contribution process.
KafkaEx has a Slack channel: #kafkaex on elixir-lang.slack.com. You can request an invite via http://bit.ly/slackelixir. The Slack channel is appropriate for quick questions or general design discussions. The Slack discussion is archived at http://slack.elixirhq.com/kafkaex.