KafkaEx is an Elixir client for Apache Kafka with support for Kafka versions 0.10.0 and newer. KafkaEx requires Elixir 1.14+ and Erlang OTP 24+.
- HexDocs: http://hexdocs.pm/kafka_ex/
- GitHub: https://github.com/kafkaex/kafka_ex/
- Authentication: AUTH.md
- Contributing: CONTRIBUTING.md
- Features
- Quick Start
- Configuration
- Usage
- Authentication (SASL)
- Telemetry & Observability
- Error Handling & Resilience
- Testing
- Contributing
KafkaEx v1.0 uses Kayrock for Kafka protocol serialization with automatic API version negotiation—no manual version configuration needed.
- ✅ Producer - Single and batch message production with timestamps and headers
- ✅ Consumer - Message fetching with offset management
- ✅ Consumer Groups - Coordinated consumption with automatic partition assignment
- ✅ Compression - Gzip, Snappy, LZ4, and Zstd
- ✅ Authentication - SASL/PLAIN, SASL/SCRAM, OAUTHBEARER, AWS MSK IAM
- ✅ SSL/TLS - Secure connections with certificate-based authentication
- ✅ Topic Management - Create and delete topics programmatically
- ✅ Metadata API - Discover brokers, topics, and partitions
- ✅ Offset Management - Commit, fetch, and reset offsets
- ✅ Telemetry - Built-in observability with telemetry events
- ✅ Automatic Retries - Smart retry logic with exponential backoff
- Minimum: Kafka 0.10.0+
- Recommended: Kafka 0.11.0+ (for RecordBatch format, headers, timestamps)
- Tested with: Kafka 2.1.0 through 3.x
Add KafkaEx to your mix.exs dependencies:
def deps do
[
# For release candidate:
{:kafka_ex, "~> 1.0.0-rc.1"}
# For stable release (when available):
# {:kafka_ex, "~> 1.0"}
]
end
Then run:
mix deps.get
# Start a client
{:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])
# Produce a message
{:ok, _metadata} = KafkaEx.API.produce_one(client, "my-topic", 0, "hello")
# Fetch messages from offset 0
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 0)
For production applications, define a module with the KafkaEx.API behaviour:
defmodule MyApp.Kafka do
use KafkaEx.API, client: MyApp.KafkaClient
end
# In your application.ex supervision tree:
children = [
{KafkaEx.Client, name: MyApp.KafkaClient, brokers: [{"localhost", 9092}]}
]
# Now call without passing client:
MyApp.Kafka.produce_one("my-topic", 0, "hello")
{:ok, messages} = MyApp.Kafka.fetch("my-topic", 0, 0)
See KafkaEx.API documentation for the complete API reference.
KafkaEx can be configured via config.exs or by passing options directly to KafkaEx.API.start_client/1.
# config/config.exs
config :kafka_ex,
# List of Kafka brokers
brokers: [{"localhost", 9092}, {"localhost", 9093}],
# Client identifier
client_id: "my-app",
# Default consumer group
default_consumer_group: "my-consumer-group",
# Request timeout (milliseconds)
  sync_timeout: 10_000
For SSL/TLS connections:
config :kafka_ex,
brokers: [{"kafka.example.com", 9093}],
use_ssl: true,
ssl_options: [
cacertfile: "/path/to/ca-cert.pem",
certfile: "/path/to/client-cert.pem",
keyfile: "/path/to/client-key.pem",
verify: :verify_peer
  ]
For consumer group defaults:
config :kafka_ex,
default_consumer_group: "my-group",
# Metadata refresh interval (milliseconds)
consumer_group_update_interval: 30_000,
# Auto-commit settings
commit_interval: 5_000, # Commit every 5 seconds
commit_threshold: 100, # Or every 100 messages
# What to do when no offset exists
  auto_offset_reset: :earliest # or :latest
Compression is set per-request, not globally:
# Produce with gzip compression
KafkaEx.API.produce(client, "topic", 0, messages, compression: :gzip)
# Supported: :none (default), :gzip, :snappy, :lz4, :zstd
For Snappy compression, add to mix.exs:
{:snappyer, "~> 1.2"}
You can use an MFA tuple or an anonymous function to resolve brokers dynamically:
# Using MFA tuple
config :kafka_ex,
brokers: {MyApp.Config, :get_kafka_brokers, []}
# Using anonymous function
config :kafka_ex,
  brokers: fn -> Application.get_env(:my_app, :kafka_brokers) end
See KafkaEx.Config for all available options.
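As an illustration, the MyApp.Config resolver referenced above might read brokers from an environment variable at runtime. This is a hypothetical sketch; the module, function, and KAFKA_BROKERS variable are not part of KafkaEx:
defmodule MyApp.Config do
  # Hypothetical resolver, called when the client starts.
  # Assumes KAFKA_BROKERS looks like "host1:9092,host2:9092".
  def get_kafka_brokers do
    System.get_env("KAFKA_BROKERS", "localhost:9092")
    |> String.split(",")
    |> Enum.map(fn entry ->
      [host, port] = String.split(entry, ":")
      {host, String.to_integer(port)}
    end)
  end
end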
# Single message
{:ok, metadata} = KafkaEx.API.produce_one(
client,
"my-topic",
0, # partition
"hello world" # message value
)
# With message key (for partition routing)
{:ok, metadata} = KafkaEx.API.produce_one(
client,
"my-topic",
0,
"message value",
key: "user-123"
)
# Batch produce
messages = [
%{value: "message 1", key: "key1"},
%{value: "message 2", key: "key2"},
%{value: "message 3", key: "key3"}
]
{:ok, metadata} = KafkaEx.API.produce(client, "my-topic", 0, messages)
# Fetch from specific offset
{:ok, result} = KafkaEx.API.fetch(client, "my-topic", 0, 100)
result.records
|> Enum.each(fn record ->
IO.puts("Offset: #{record.offset}, Value: #{record.value}")
end)
# Fetch all messages (earliest to high watermark)
{:ok, result} = KafkaEx.API.fetch_all(client, "my-topic", 0)
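To make manual offset tracking concrete, here is a hypothetical polling loop built on fetch/4; MyApp.SimplePoller is illustrative only, and consumer groups (below) are the production approach.
defmodule MyApp.SimplePoller do
  # Fetch repeatedly, resuming after the last record seen.
  def poll(client, topic, partition, offset) do
    {:ok, result} = KafkaEx.API.fetch(client, topic, partition, offset)

    case result.records do
      [] ->
        # Nothing new yet; back off briefly and re-poll the same offset
        Process.sleep(500)
        poll(client, topic, partition, offset)

      records ->
        Enum.each(records, fn record -> IO.puts(record.value) end)
        poll(client, topic, partition, List.last(records).offset + 1)
    end
  end
end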
Consumer groups provide coordinated consumption with automatic partition assignment and offset management.
defmodule MyApp.MessageConsumer do
use KafkaEx.Consumer.GenConsumer
require Logger
# Messages are delivered in batches
def handle_message_set(message_set, state) do
Enum.each(message_set, fn record ->
Logger.info("Processing: #{inspect(record.value)}")
# Process your message here
end)
# Commit offsets asynchronously
{:async_commit, state}
end
end
Available commit strategies:
- {:async_commit, state} - Commit in background (recommended)
- {:sync_commit, state} - Wait for commit to complete
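For comparison, a minimal sketch of the same callback using synchronous commits; the trade-off is throughput for a commit that is confirmed before the next batch is fetched.
defmodule MyApp.SyncConsumer do
  use KafkaEx.Consumer.GenConsumer

  def handle_message_set(message_set, state) do
    Enum.each(message_set, fn record -> IO.inspect(record.value) end)
    # Block until the broker acknowledges the offset commit
    {:sync_commit, state}
  end
end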
# In your application.ex
def start(_type, _args) do
children = [
# Start the consumer group
%{
id: MyApp.MessageConsumer,
start: {
KafkaEx.Consumer.ConsumerGroup,
:start_link,
[
MyApp.MessageConsumer, # Your consumer module
"my-consumer-group", # Consumer group ID
["topic1", "topic2"], # Topics to consume
[
# Optional configuration
commit_interval: 5_000,
commit_threshold: 100,
auto_offset_reset: :earliest
]
]
}
}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
See KafkaEx.Consumer.GenConsumer for details.
# Get all topics
{:ok, metadata} = KafkaEx.API.metadata(client)
# Get specific topics
{:ok, metadata} = KafkaEx.API.metadata(client, ["topic1", "topic2"])
# Inspect partitions
metadata.topics
|> Enum.each(fn topic ->
IO.puts("Topic: #{topic.name}, Partitions: #{length(topic.partitions)}")
end)
# Get latest offset for a partition
{:ok, offset} = KafkaEx.API.latest_offset(client, "my-topic", 0)
# Get earliest offset
{:ok, offset} = KafkaEx.API.earliest_offset(client, "my-topic", 0)
# List offsets by timestamp
timestamp = DateTime.utc_now() |> DateTime.add(-3600, :second) |> DateTime.to_unix(:millisecond)
partition_request = %{partition_num: 0, timestamp: timestamp}
{:ok, offsets} = KafkaEx.API.list_offsets(client, [{"my-topic", [partition_request]}])
# Fetch committed offset for consumer group
partitions = [%{partition_num: 0}]
{:ok, offsets} = KafkaEx.API.fetch_committed_offset(
client,
"my-consumer-group",
"my-topic",
partitions
)
# Commit offset for consumer group
partitions = [%{partition_num: 0, offset: 100}]
{:ok, result} = KafkaEx.API.commit_offset(
client,
"my-consumer-group",
"my-topic",
partitions
)
# Create a topic
{:ok, result} = KafkaEx.API.create_topic(
client,
"new-topic",
num_partitions: 3,
replication_factor: 2,
config_entries: %{
"retention.ms" => "86400000",
"compression.type" => "gzip"
}
)
# Delete a topic
{:ok, result} = KafkaEx.API.delete_topic(client, "old-topic")
KafkaEx supports multiple compression formats. Compression is applied per-request:
# Gzip compression (built-in)
{:ok, _} = KafkaEx.API.produce(
client,
"my-topic",
0,
messages,
compression: :gzip
)
# Snappy compression (requires snappyer package)
{:ok, _} = KafkaEx.API.produce(
client,
"my-topic",
0,
messages,
compression: :snappy
)
# LZ4 compression (built-in, Kafka 0.9.0+)
{:ok, _} = KafkaEx.API.produce(
client,
"my-topic",
0,
messages,
compression: :lz4
)
# Zstd compression (built-in, Kafka 2.1.0+)
{:ok, _} = KafkaEx.API.produce(
client,
"my-topic",
0,
messages,
compression: :zstd
)
Supported Formats:
| Format | Kafka Version | Dependency Required |
|---|---|---|
| :gzip | 0.7.0+ | None (built-in) |
| :snappy | 0.8.0+ | {:snappyer, "~> 1.2"} |
| :lz4 | 0.9.0+ | None (built-in) |
| :zstd | 2.1.0+ | None (built-in) |
Decompression is handled automatically when consuming messages.
KafkaEx supports multiple SASL authentication mechanisms for secure connections to Kafka clusters.
Simple username/password authentication. Always use with SSL/TLS to protect credentials.
config :kafka_ex,
brokers: [{"kafka.example.com", 9092}],
use_ssl: true,
ssl_options: [verify: :verify_peer, cacertfile: "/path/to/ca.pem"],
sasl: %{
mechanism: :plain,
username: "alice",
password: "secret123"
  }
Challenge-response authentication (more secure than PLAIN).
config :kafka_ex,
brokers: [{"kafka.example.com", 9092}],
use_ssl: true,
sasl: %{
mechanism: :scram,
username: "alice",
password: "secret123",
mechanism_opts: %{algo: :sha256} # or :sha512
  }
OAuth 2.0 token-based authentication.
config :kafka_ex,
brokers: [{"kafka.example.com", 9092}],
use_ssl: true,
sasl: %{
mechanism: :oauthbearer,
mechanism_opts: %{
token_provider: &MyApp.get_oauth_token/0,
extensions: %{"traceId" => "optional-data"}
}
  }
AWS IAM authentication for Amazon Managed Streaming for Apache Kafka (MSK).
config :kafka_ex,
brokers: [{"msk-cluster.region.amazonaws.com", 9098}],
use_ssl: true,
sasl: %{
mechanism: :msk_iam,
mechanism_opts: %{
region: "us-east-1"
# Credentials automatically resolved from environment
}
  }
Authentication Requirements:
| Mechanism | Minimum Kafka | SSL Required | Notes |
|---|---|---|---|
| PLAIN | 0.9.0+ | ✅ Yes | Never use without SSL/TLS |
| SCRAM | 0.10.2+ | Recommended | Challenge-response, more secure |
| OAUTHBEARER | 2.0+ | Recommended | Requires token provider |
| MSK_IAM | MSK 2.7.1+ | ✅ Yes | AWS-specific |
See AUTH.md for detailed authentication setup and troubleshooting.
KafkaEx emits telemetry events for monitoring connections, requests, and consumer operations.
| Category | Events | Description |
|---|---|---|
| Connection | 4 | Connect, disconnect, reconnect, close |
| Request | 4 | Request start/stop/exception, retry |
| Produce | 4 | Produce start/stop/exception, batch metrics |
| Fetch | 4 | Fetch start/stop/exception, messages received |
| Offset | 4 | Commit/fetch offset operations |
| Consumer | 8 | Group join, sync, heartbeat, rebalance, message processing |
| Metadata | 4 | Cluster metadata updates |
| SASL Auth | 6 | PLAIN/SCRAM authentication spans |
defmodule MyApp.KafkaTelemetry do
require Logger
def attach do
:telemetry.attach_many(
"my-kafka-handler",
[
[:kafka_ex, :connection, :stop],
[:kafka_ex, :request, :stop],
[:kafka_ex, :produce, :stop],
[:kafka_ex, :fetch, :stop]
],
&handle_event/4,
nil
)
end
def handle_event([:kafka_ex, :connection, :stop], measurements, metadata, _config) do
Logger.info("Connected to #{metadata.host}:#{metadata.port} in #{measurements.duration / 1_000_000}ms")
end
def handle_event([:kafka_ex, :request, :stop], measurements, metadata, _config) do
Logger.debug("Request #{metadata.api_key} took #{measurements.duration / 1_000_000}ms")
end
def handle_event([:kafka_ex, :produce, :stop], measurements, metadata, _config) do
Logger.info("Produced #{measurements.message_count} messages to #{metadata.topic}")
end
def handle_event([:kafka_ex, :fetch, :stop], measurements, metadata, _config) do
Logger.info("Fetched #{measurements.message_count} messages from #{metadata.topic}")
end
end
Then in your application startup:
# application.ex
def start(_type, _args) do
MyApp.KafkaTelemetry.attach()
# ...
end
See KafkaEx.Telemetry for the complete event reference.
KafkaEx v1.0 includes smart error handling and retry logic for production resilience.
- Producer requests - Automatically retry on leadership-related errors (not_leader_for_partition, leader_not_available) with metadata refresh
- Offset commits - Retry transient errors (timeout, coordinator not available) with exponential backoff
- API version negotiation - Retry parse errors during initial connection
Consumer groups handle transient errors gracefully following the Java client pattern (KAFKA-6829):
- unknown_topic_or_partition triggers retry instead of crash
- Heartbeat errors trigger rejoin for recoverable errors
- Exponential backoff for join retries (1s→10s, up to 6 attempts)
Important: Produce requests only retry on leadership errors where we know the message wasn't written. Timeout errors are NOT retried to prevent potential duplicate messages.
For truly idempotent produces, enable the idempotent producer setting (enable.idempotence=true); this requires Kafka 0.11+ brokers.
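Because timeouts are not retried automatically, the caller decides how to react. A minimal sketch of caller-side handling, assuming errors are returned as {:error, reason} tuples (the exact reason terms here are illustrative):
require Logger

case KafkaEx.API.produce_one(client, "my-topic", 0, "payload") do
  {:ok, _metadata} ->
    :ok

  {:error, :timeout} ->
    # The write may or may not have landed; retrying here risks a duplicate
    Logger.warning("produce timed out; not retrying to avoid duplicates")

  {:error, reason} ->
    Logger.error("produce failed: #{inspect(reason)}")
end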
When using certain versions of OTP, random timeouts can occur with SSL.
Impacted versions:
- OTP 21.3.8.1 → 21.3.8.14
- OTP 22.1 → 22.3.1
Solution: Upgrade to OTP 21.3.8.15 or 22.3.2+.
Run tests that don't require a live Kafka cluster:
mix test.unit
KafkaEx includes a Dockerized test cluster with 3 Kafka brokers configured with different authentication mechanisms:
Ports:
- 9092-9094: No authentication (SSL)
- 9192-9194: SASL/PLAIN (SSL)
- 9292-9294: SASL/SCRAM (SSL)
- 9392-9394: SASL/OAUTHBEARER (SSL)
Start the test cluster:
./scripts/docker_up.sh
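For example, an integration test can point a client directly at one of the SASL/PLAIN listeners. A sketch, assuming start_client/1 accepts the same keys as config.exs; the credentials are placeholders, so substitute the ones from the Docker setup:
{:ok, client} =
  KafkaEx.API.start_client(
    brokers: [{"localhost", 9192}],
    use_ssl: true,
    # Test cluster only; verify certificates properly in production
    ssl_options: [verify: :verify_none],
    sasl: %{mechanism: :plain, username: "placeholder", password: "placeholder"}
  )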
Run all tests:
# Unit tests
mix test.unit
# Integration tests
mix test.integration
# All tests together
mix test
Run specific test categories:
mix test --only consumer_group
mix test --only produce
mix test --only consume
mix test --only auth
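The --only filters above select ExUnit tags, so a test opts into a category by tagging itself. A minimal sketch (module and topic names are hypothetical):
defmodule MyApp.TaggedTest do
  use ExUnit.Case

  # Selected by `mix test --only produce`
  @tag :produce
  test "round-trips a message" do
    {:ok, client} = KafkaEx.API.start_client(brokers: [{"localhost", 9092}])
    assert {:ok, _} = KafkaEx.API.produce_one(client, "test-topic", 0, "hello")
  end
end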
Run SASL tests:
MIX_ENV=test mix test --include sasl
Before submitting a change, run the code quality checks:
mix format # Format code
mix format --check-formatted
mix credo --strict # Linting
mix dialyzer # Type checking
All contributions are managed through the KafkaEx GitHub repo.
- Issues: github.com/kafkaex/kafka_ex/issues
- Pull Requests: See CONTRIBUTING.md for our contribution process
- Slack: #kafkaex on elixir-lang.slack.com (request invite)
- Slack Archive: slack.elixirhq.com/kafkaex
KafkaEx is released under the MIT License. See LICENSE for details.