A handy tool for quickly producing and consuming Kafka messages, and more.

1. What is kafkacat?

Citing official README:

kafkacat is a generic non-JVM producer and consumer for Apache Kafka >=0.8, think of it as a netcat for Kafka.

Simply put, it’s a very handy tool for working with Kafka.

To understand what exactly the tool does, one should try it out. So, below is a simple walkthrough that demonstrates how to set up a cluster with one broker and produce/consume messages with kafkacat. Without further ado, let’s start.

2. Setting up a local Kafka cluster with one broker

Let’s quickly bootstrap a local Kafka cluster with docker-compose:

version: "3"

services:

  kafka-broker:
    image: confluentinc/cp-kafka:latest
    container_name: kafka-broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_MESSAGE_MAX_BYTES: 10000000

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2

Note: KAFKA_AUTO_CREATE_TOPICS_ENABLE is set to "true" deliberately, so we can experiment with producing/consuming without setting up topics beforehand.

3. Using kafkacat

A kafkacat docker image is available on Docker Hub. So, if you have Docker installed, you can skip installation entirely and start using it right away.

3.1. Metadata mode

List brokers and topics in the cluster

This will print cluster metadata:

$ docker run --tty --rm --interactive \
             --network=host \
             confluentinc/cp-kafkacat \
             kafkacat -b localhost:9092 -L

-b — broker host:port
-L — metadata mode (will list brokers and topics in the cluster)

The above command outputs:

Metadata for all topics (from broker 1: localhost:9092/1):
 1 brokers:
  broker 1 at localhost:9092
 1 topics:
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
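
Since the metadata listing is plain text, it can be post-processed with standard shell tools. For example, a minimal sketch that extracts just the topic names (assuming the output format shown above, which may vary between kafkacat versions):

```shell
# Extract topic names from kafkacat -L output.
# The sample output from above is piped in via a here-doc;
# in practice you would pipe the kafkacat command itself.
sed -n 's/.*topic "\([^"]*\)".*/\1/p' <<'EOF'
Metadata for all topics (from broker 1: localhost:9092/1):
 1 brokers:
  broker 1 at localhost:9092
 1 topics:
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
EOF
```

This prints only __confluent.support.metrics, which is convenient when scripting against a cluster with many topics.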

3.2. Producer mode

Producing messages from a file

Let’s create a file with messages:

$ cat >> /tmp/orders.txt <<EOF
> 1:{"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
> 2:{"order_id":2,"order_ts":1534772605276,"total_amount":3.32,"customer_name":"Sarah Black"}
> 3:{"order_id":3,"order_ts":1534772742276,"total_amount":21.00,"customer_name":"Emma Turner"}
> EOF

Next, let’s send the messages from the file to the orders topic:

$ docker run --tty --rm --interactive \
             --network=host \
             --volume /tmp/orders.txt:/data/orders.txt \
             confluentinc/cp-kafkacat \
             kafkacat -b localhost:9092 -t orders -P -l /data/orders.txt

-b — broker host:port
-t — topic to produce to
-P — produce mode
-l — send messages from a file (only one file allowed)

Producing messages inline

This will send three messages with the given key:value pairs:

$ docker run --interactive --rm \
             --network=host \
             confluentinc/cp-kafkacat \
             kafkacat -b localhost:9092 -t orders -K: -P <<EOF
4:{"order_id":4,"order_ts":1534772801276,"total_amount":11.50,"customer_name":"Alina Smith"}
5:{"order_id":5,"order_ts":1534772905276,"total_amount":13.32,"customer_name":"Alex Black"}
6:{"order_id":6,"order_ts":1534773042276,"total_amount":31.00,"customer_name":"Emma Watson"}
EOF

-b — broker host:port
-t — topic to produce to
-K: — use : as the key delimiter (the part of each line before the first : becomes the message key)
-P — produce mode
<<EOF … EOF — a here document that redirects the messages to be produced
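
To see what -K: does to each line, here is how the split at the first : plays out, sketched with plain shell parameter expansion (kafkacat performs this splitting itself; the snippet is purely illustrative):

```shell
# One of the lines fed to kafkacat above:
line='4:{"order_id":4,"order_ts":1534772801276,"total_amount":11.50,"customer_name":"Alina Smith"}'

# -K: splits each line at the first ':':
key="${line%%:*}"    # message key: everything before the first ':'
value="${line#*:}"   # message value: everything after the first ':'

echo "key:   $key"
echo "value: $value"
```

Here the key is 4 and the value is the JSON document, which is exactly how the message lands in the topic.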

3.3. Consumer mode

Consuming messages from a topic

$ docker run --tty --rm --interactive \
             --network=host \
             confluentinc/cp-kafkacat \
             kafkacat -C -b localhost:9092 -K: \
                      -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\n--\n' \
                      -t orders -c 1

-C — consume mode
-b — broker host:port
-K: — print message keys prefixing the message with :
-f — output formatting string
-t — topic to consume from
-c — exit after consuming 1 message

The above command will consume a single message from the orders topic:

Key (-1 bytes):
Value (90 bytes): 1:{"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
Partition: 0	Offset: 0
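
To make the -f template easier to read, here is a rough preview of what it renders to, sketched with printf and the sample values from the output above (kafkacat substitutes %K, %k, %S, %s, %p and %o itself; this is only an illustration):

```shell
# %K = key length, %k = key, %S = value length, %s = value,
# %p = partition, %o = offset (as in the kafkacat -f template above)
printf '\nKey (%s bytes): %s\t\nValue (%s bytes): %s\nPartition: %s\tOffset: %s\n--\n' \
       '-1' '' '90' '<json payload>' '0' '0'
```

A key length of -1 means the message was produced without a key, which is the case here because the file-based produce command did not use -K.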

3.4. Query mode

Query mode lets you look up the offset for a given timestamp, using the following format:

kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>
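
The <timestamp> is a Unix epoch timestamp in milliseconds; the special values -1 and -2 are commonly used for the latest and earliest offsets, respectively. A quick way to turn a human-readable date into the millisecond form (a sketch assuming GNU date is available):

```shell
# Convert a UTC date to the millisecond epoch timestamp that -Q expects
ts_ms=$(( $(date -u -d '2018-08-20 13:41:41' +%s) * 1000 ))
echo "$ts_ms"   # close to the order_ts of the first order, without its millisecond part
```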

Querying an offset from a topic

$ docker run --tty --rm --interactive \
             --network=host \
             confluentinc/cp-kafkacat \
             kafkacat -Q -b localhost:9092 -t orders:0:-1

-Q — query mode
-b — broker host:port
-t — topic to consume from

The above command will output:

orders [0] offset 6
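
The reported offset is easy to capture in a script; a minimal sketch parsing the sample line above (assuming the topic [partition] offset N format stays stable across kafkacat versions):

```shell
# Pull the numeric offset out of kafkacat -Q output
echo 'orders [0] offset 6' | awk '{print $NF}'
```

The extracted number could then be fed straight into -o when consuming.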

Now that we have the latest offset, let’s consume messages starting from a specific offset:

$ docker run --tty --rm --interactive \
             --network=host \
             confluentinc/cp-kafkacat \
             kafkacat -C -q -b localhost:9092 -t orders -p 0 -o 5

-q — be quiet (verbosity set to 0)
-p — partition
-o — offset to start consuming from

4. Conclusion

That’s it for now. Hopefully, you learned something interesting or useful ;)

Oleksii Zghurskyi