32 changes: 32 additions & 0 deletions kafka-mesh/example.rst
@@ -146,6 +146,38 @@ produce and fetch request counts:
kafka.kafka_mesh.request.metadata_request: 8


Step 8: Test high-volume producing with batched records
*******************************************************

In production, Kafka producers often batch multiple records into a single
``ProduceRequest`` to improve throughput. The mesh filter must correctly handle
these batched requests and route records to the appropriate upstream cluster.

Send multiple messages rapidly to the ``apricots`` topic (which routes to
``cluster1`` based on the ``a`` prefix). The producer will automatically batch
these into fewer requests. Idempotence is disabled in the command below, as
the mesh filter does not support idempotent producers:

.. code-block:: console

$ docker compose run --rm kafka-client /bin/bash -c " \
for i in {1..20}; do \
echo \"apricot message \$i\"; \
done | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic apricots"

Now verify that all 20 messages arrived at ``cluster1`` by consuming directly
from the upstream:

.. code-block:: console

$ docker compose run --rm kafka-client \
kafka-console-consumer --bootstrap-server kafka-cluster1:9092 --topic apricots --from-beginning --max-messages 20 | wc -l
20

This confirms that even though the producer may have batched the records into
multiple ``ProduceRequest`` messages, the mesh filter correctly routed all of
them to the appropriate cluster. This is critical for high-throughput
production workloads.
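
To see the batching directly, compare the ``produce_request`` counter with the
20 records sent. A minimal check, assuming the Envoy admin interface is
exposed on port 8001 as in the other sandboxes:

.. code-block:: console

   $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_mesh.request.produce_request" | grep "produce_request:"

A count lower than 20 indicates that the producer combined several records
into each ``ProduceRequest``.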


.. seealso::

:ref:`Envoy Kafka mesh filter <config_network_filters_kafka_mesh>`
34 changes: 34 additions & 0 deletions kafka-mesh/verify.sh
@@ -36,3 +36,37 @@ stats_output=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=kafka")
echo "$stats_output" | grep "produce_request" | grep -v ": 0"
echo "$stats_output" | grep "fetch_request" | grep -v ": 0"
echo "$stats_output" | grep "metadata_request" | grep -v ": 0"

run_log "Test high-volume producing with batched records"
# Send 20 messages rapidly to trigger producer batching
kafka_client /bin/bash -c " \
for i in {1..20}; do \
echo \"apricot message \$i\"; \
done | kafka-console-producer --request-required-acks 1 --producer-property enable.idempotence=false --broker-list proxy:10000 --topic apricots"

run_log "Verify all 20 messages arrived at cluster1"
# Consume all messages and count them
message_count=$(kafka_client kafka-console-consumer --bootstrap-server kafka-cluster1:9092 --topic apricots --from-beginning --max-messages 20 2>/dev/null | wc -l)
message_count=${message_count:-0}
run_log "Received $message_count messages from apricots topic"

if [[ "$message_count" -eq 20 ]]; then
run_log "SUCCESS: All 20 messages arrived at cluster1"
else
echo "ERROR: Expected 20 messages but received $message_count" >&2
exit 1
fi

run_log "Verify produce metrics reflect the batched requests"
# Get the produce_request count - it should be greater than 0 and likely less than 20 (due to batching)
stats_output=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=kafka.kafka_mesh.request.produce_request")
produce_count=$(echo "$stats_output" | grep "produce_request:" | cut -f2 -d':' | tr -d ' ')
produce_count=${produce_count:-0}
run_log "Total produce_request count: $produce_count"

if [[ "$produce_count" -gt 0 ]]; then
run_log "SUCCESS: Produce requests tracked correctly (count: $produce_count)"
else
echo "ERROR: No produce requests tracked" >&2
exit 1
fi
51 changes: 49 additions & 2 deletions kafka/example.rst
@@ -186,8 +186,55 @@ Check the admin operation metrics:
kafka.kafka_broker.request.create_partitions_request: 1


Step 10: Delete the topic
*************************
Step 10: Test consumer behavior with empty topic
************************************************

Kafka consumers can "long-poll" for messages: they connect and wait for data
to arrive even when none is available yet. This is normal Kafka behavior, and
Envoy correctly proxies these fetch requests even when they return no data.

Create a new empty topic to test this:

.. code-block:: console

$ export EMPTY_TOPIC="empty-topic-test"
$ docker compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --create --topic $EMPTY_TOPIC

Check the current fetch request count before consuming:

.. code-block:: console

$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker.request.fetch_request" | grep "fetch_request:"
kafka.kafka_broker.request.fetch_request: 2

Now try to consume from the empty topic. The consumer will poll the broker
multiple times waiting for messages, then time out after 5 seconds:

.. code-block:: console

$ docker compose run --rm kafka-client kafka-console-consumer --bootstrap-server proxy:10000 --topic $EMPTY_TOPIC --timeout-ms 5000

Even though no messages were received, Envoy proxied the fetch requests.
Check that the fetch metric increased:

.. code-block:: console

$ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker.request.fetch_request" | grep "fetch_request:"
kafka.kafka_broker.request.fetch_request: 12

The increased count confirms that Envoy correctly handles long-polling fetch
requests, even when the topic is empty. This matters for applications that
wait for data to arrive.
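
How many fetch requests accumulate during the wait depends on the consumer's
polling settings. As an illustrative sketch (the property value below is an
assumption, not part of the sandbox), shortening the broker-side wait per
fetch with ``fetch.max.wait.ms`` makes the counter climb faster:

.. code-block:: console

   $ docker compose run --rm kafka-client kafka-console-consumer --bootstrap-server proxy:10000 --topic $EMPTY_TOPIC --consumer-property fetch.max.wait.ms=100 --timeout-ms 5000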

Clean up the empty topic:

.. code-block:: console

$ docker compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --delete --topic $EMPTY_TOPIC


Step 11: Delete the main topic
******************************

Clean up by deleting the test topic:

30 changes: 30 additions & 0 deletions kafka/verify.sh
@@ -111,6 +111,36 @@ kafka_client kafka-topics --bootstrap-server proxy:10000 \
run_log "Check create_partitions metric"
has_metric_with_at_least_1 "kafka.kafka_broker.request.create_partitions_request"

run_log "Test consumer with empty topic (long-polling behavior)"
EMPTY_TOPIC="empty-topic-test"

# Create a new empty topic
kafka_client kafka-topics --bootstrap-server proxy:10000 --create --topic $EMPTY_TOPIC

# Get the current fetch_request count
initial_fetch_count=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=kafka.kafka_broker.request.fetch_request" | grep "fetch_request:" | cut -f2 -d':' | tr -d ' ')
initial_fetch_count=${initial_fetch_count:-0}
run_log "Initial fetch_request count: $initial_fetch_count"

# Try to consume from the empty topic (will timeout after 5s with no messages, which is expected)
kafka_client kafka-console-consumer --bootstrap-server proxy:10000 --topic $EMPTY_TOPIC --timeout-ms 5000 || true

# Get the updated fetch_request count
updated_fetch_count=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=kafka.kafka_broker.request.fetch_request" | grep "fetch_request:" | cut -f2 -d':' | tr -d ' ')
updated_fetch_count=${updated_fetch_count:-0}
run_log "Updated fetch_request count: $updated_fetch_count"

# Verify that fetch requests increased (proving Envoy proxied the requests even though no data was returned)
if [[ "$updated_fetch_count" -gt "$initial_fetch_count" ]]; then
run_log "SUCCESS: Fetch requests increased from $initial_fetch_count to $updated_fetch_count"
else
echo "ERROR: Fetch requests did not increase (initial: $initial_fetch_count, updated: $updated_fetch_count)" >&2
exit 1
fi

# Clean up the empty topic
kafka_client kafka-topics --bootstrap-server proxy:10000 --delete --topic $EMPTY_TOPIC

run_log "Test delete topic"
kafka_client kafka-topics --bootstrap-server proxy:10000 --delete --topic $TOPIC
