Kafka Provision Spring Boot Starter enables distributed provisioning of Kafka topics and centralized management of topic configs.

Overview

Kafka Provision Spring Boot Starter supports the following features:

  • creating new topics

  • adding partitions to the existing topics

  • setting/updating topic configurations
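To make the three features concrete, here is a minimal sketch of the decision a provisioner has to make for each topic. It is not the starter's actual code: `TopicPlanSketch`, `TopicState` and the action strings are hypothetical names used only for illustration. The one Kafka rule it relies on is that a topic's partition count can be increased but never decreased.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: decides which provisioning actions a topic needs. */
final class TopicPlanSketch {

    /** Desired or observed state of a single topic (illustrative type). */
    record TopicState(String name, int numPartitions, Map<String, String> configs) {}

    /**
     * Compares the desired state with the existing one (null if the topic is absent)
     * and returns the actions to perform as human-readable strings.
     */
    static List<String> plan(TopicState desired, TopicState existing) {
        List<String> actions = new ArrayList<>();
        if (existing == null) {
            // Feature 1: the topic does not exist yet, so create it.
            actions.add("CREATE " + desired.name());
            return actions;
        }
        // Feature 2: Kafka only allows increasing the partition count.
        if (desired.numPartitions() > existing.numPartitions()) {
            actions.add("ADD_PARTITIONS " + desired.name() + " "
                    + existing.numPartitions() + "->" + desired.numPartitions());
        }
        // Feature 3: collect only the config entries whose values differ.
        Map<String, String> changed = new LinkedHashMap<>();
        desired.configs().forEach((key, value) -> {
            if (!value.equals(existing.configs().get(key))) {
                changed.put(key, value);
            }
        });
        if (!changed.isEmpty()) {
            actions.add("UPDATE_CONFIGS " + desired.name() + " " + changed);
        }
        return actions;
    }

    public static void main(String[] args) {
        TopicState desired = new TopicState("tasks", 4, Map.of("retention.ms", "3600000"));
        TopicState existing = new TopicState("tasks", 2, Map.of("retention.ms", "60000"));
        System.out.println(plan(desired, null));     // topic is missing
        System.out.println(plan(desired, existing)); // topic exists but differs
    }
}
```

When the existing state already matches the desired one, the plan is empty, so running it repeatedly is harmless.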

In this post I will describe how to create an application that uses the starter.

Demo application description

To keep the focus on the main goal, the demo application will be really simple. We will create 2 "microservices":

  • The first service will produce tasks and push them to the tasks topic

  • The second service will pull the tasks from the topic, sleep for a random time and send events to the results topic

So, everything sounds really simple - let’s overengineer it as much as possible!
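Before bringing Kafka in, the flow can be pictured with a small in-memory sketch. This is not the demo app's code: two `BlockingQueue`s stand in for the tasks and results topics, the class and method names are hypothetical, and picking the result status at random is my assumption for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

/** In-memory stand-in for the demo flow; the queues play the role of the Kafka topics. */
final class DemoFlowSketch {

    enum Action { EAT, SLEEP, WRITE_CODE }
    enum Status { SUCCESS, FAIL, SKIP_THIS_TIME }

    record Task(Action action) {}
    record Result(Action action, Status status) {}

    /** "task-consumer": takes one task, sleeps for a random time, publishes a result. */
    static void consumeOne(BlockingQueue<Task> tasks, BlockingQueue<Result> results)
            throws InterruptedException {
        Task task = tasks.take();
        Thread.sleep(ThreadLocalRandom.current().nextLong(10, 50)); // simulated work
        Status[] statuses = Status.values();
        // Assumption: the status is picked at random.
        Status status = statuses[ThreadLocalRandom.current().nextInt(statuses.length)];
        results.put(new Result(task.action(), status));
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();     // "tasks" topic
        BlockingQueue<Result> results = new LinkedBlockingQueue<>(); // "results" topic

        // "task-producer": publishes one task per action.
        for (Action action : Action.values()) {
            tasks.put(new Task(action));
        }
        // Process every task and read the result back, as the producer side would.
        for (int i = 0; i < Action.values().length; i++) {
            consumeOne(tasks, results);
            System.out.println("Received: " + results.take());
        }
    }
}
```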

Setting up basic Spring Boot services

First, let’s bootstrap a Spring Boot app with Spring Cloud Stream, Spring Kafka and Lombok support:

$ curl https://start.spring.io/starter.zip \
-d dependencies=cloud-stream,kafka,lombok \
-d type=gradle-project \
-d baseDir=task-producer \
-d groupId=com.oxymorus.kafka.producer \
-d artifactId=task-producer \
-o task-producer.zip
$ unzip task-producer.zip && rm task-producer.zip

Just a quick note: Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The core building blocks of Spring Cloud Stream are:

  • Destination Binders: Components responsible to provide integration with the external messaging systems.

  • Destination Bindings: Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).

  • Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).

For more details, see the official Spring Cloud Stream reference.

OK, let’s get back to our main course and bootstrap the task-consumer service:

$ curl https://start.spring.io/starter.zip \
-d dependencies=cloud-stream,kafka,lombok \
-d type=gradle-project \
-d baseDir=task-consumer \
-d groupId=com.oxymorus.kafka.consumer \
-d artifactId=task-consumer \
-o task-consumer.zip
$ unzip task-consumer.zip && rm task-consumer.zip

Configuring Kafka topics

As described earlier, the two services will communicate over Kafka topics: tasks and results. So, we need to create and configure these topics. This is where Kafka Provision Spring Boot Starter comes in.

We will do this in three steps:

  1. Add dependency

  2. Add @EnableTopicProvisioning

  3. Configure topics

Let’s go through these steps for the task-producer service. First, let’s add the dependency to build.gradle:

dependencies {
    implementation 'io.github.zghurskyi.kafka:kafka-provision-spring-boot-starter:0.0.1'
}

Next, let’s add @EnableTopicProvisioning to the TaskProducerApp class:

package com.oxymorus.kafka.producer;

import io.github.zghurskyi.kafka.EnableTopicProvisioning;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableTopicProvisioning
public class TaskProducerApp {

  public static void main(String[] args) {
    SpringApplication.run(TaskProducerApp.class, args);
  }
}

And finally, let’s configure the required topics:

kafka.provision:
  brokers: localhost:9092
  topics:
  - name: tasks
    numPartitions: 4
    replicationFactor: 1
    configs:
      cleanup.policy: delete
      retention.ms: 3600000
  - name: results
    numPartitions: 4
    replicationFactor: 1
    configs:
      cleanup.policy: delete
      retention.ms: 3600000

The steps above are the same for the task-consumer service.
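A quick note on the numbers in the topic configuration: retention.ms is expressed in milliseconds, so 3600000 means one hour. A small sketch with `java.time.Duration` makes such values easier to double-check (the class and helper names are hypothetical, not part of the starter):

```java
import java.time.Duration;

/** Hypothetical helper for sanity-checking retention.ms values. */
final class RetentionSketch {

    /** Converts a human-friendly duration to the millisecond value Kafka expects. */
    static long toRetentionMs(Duration retention) {
        return retention.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toRetentionMs(Duration.ofHours(1))); // prints 3600000
        System.out.println(Duration.ofMillis(3_600_000L));      // prints PT1H
    }
}
```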

The details of setting up Spring Cloud Stream and Kafka in a Spring Boot app deserve a separate blog post, so to stay on point I will skip them. You can find the completed demo app here.

Setting up Kafka infrastructure

For the purposes of this demo, we set up the infrastructure with the following docker-compose.yml:

version: '3'

services:

    kafka:
        image: confluentinc/cp-kafka:latest
        container_name: kafka
        ports:
        - "9092:9092"
        depends_on:
        - zookeeper
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_MESSAGE_MAX_BYTES: 10000000

    zookeeper:
        image: confluentinc/cp-zookeeper:latest
        container_name: zookeeper
        ports:
        - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2

Putting everything together

The time has come to start everything up:

  • Boot up Kafka with docker-compose.yml:

$ cd kafka-provision-examples/
$ docker-compose up

  • Build and start task-producer:

$ ./task-producer/gradlew -b ./task-producer/build.gradle clean build
$ java -jar task-producer/build/libs/task-producer-0.0.1-SNAPSHOT.jar

  • Build and start task-consumer:

$ ./task-consumer/gradlew -b ./task-consumer/build.gradle clean build
$ java -jar task-consumer/build/libs/task-consumer-0.0.1-SNAPSHOT.jar

After starting everything up, we will see something like this in the logs:

task_producer    | 2019-04-21 10:27:49.071  INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener       : Received: ResultMessage(action=EAT, status=SUCCESS)
task_producer    | 2019-04-21 10:27:49.191  INFO 1 --- [   scheduling-1] c.o.kafka.bindings.TasksPublisher        : Published: TaskMessage(task=Task(action=SLEEP))
task_producer    | 2019-04-21 10:27:49.413  INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener       : Received: ResultMessage(action=WRITE_CODE, status=SUCCESS)
task_producer    | 2019-04-21 10:27:50.191  INFO 1 --- [   scheduling-1] c.o.kafka.bindings.TasksPublisher        : Published: TaskMessage(task=Task(action=SLEEP))
task_producer    | 2019-04-21 10:27:50.826  INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener       : Received: ResultMessage(action=EAT, status=FAIL)
task_producer    | 2019-04-21 10:27:52.945  INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener       : Received: ResultMessage(action=SLEEP, status=SKIP_THIS_TIME)
task_producer    | 2019-04-21 10:27:53.191  INFO 1 --- [   scheduling-1] c.o.kafka.bindings.TasksPublisher        : Received: ResultMessage(action=WRITE_CODE, status=SUCCESS)

This indicates that everything works as it’s supposed to :)

Checking topics configuration

Now let’s check out the Kafka topic configs that were provisioned by the starter:

$ docker exec -ti kafka /bin/bash
root@<container-id>:/# kafka-topics --zookeeper zookeeper:2181 --list
__confluent.support.metrics
__consumer_offsets
results
tasks
root@<container-id>:/# kafka-topics --zookeeper zookeeper:2181 --describe --topic tasks
Topic:tasks    PartitionCount:4    ReplicationFactor:1    Configs:retention.ms=3600000,cleanup.policy=delete
Topic: tasks    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
Topic: tasks    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
Topic: tasks    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
Topic: tasks    Partition: 3    Leader: 1    Replicas: 1    Isr: 1
root@<container-id>:/# kafka-topics --zookeeper zookeeper:2181 --describe --topic results
Topic:results    PartitionCount:4    ReplicationFactor:1    Configs:retention.ms=3600000,cleanup.policy=delete
Topic: results    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
Topic: results    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
Topic: results    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
Topic: results    Partition: 3    Leader: 1    Replicas: 1    Isr: 1

So, as we can see, Kafka Provision Spring Boot Starter has created the required topics for us and applied the specified configs.
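Comparing this output with the configuration by eye is error-prone, so the check can be automated. The sketch below is hypothetical and not part of the starter: it parses the comma-separated key=value text printed after "Configs:" and checks that every expected entry is present. The sample string in main is only an example in the same format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: compares the "Configs:" text from kafka-topics --describe with expectations. */
final class DescribeConfigsSketch {

    /** Parses "k1=v1,k2=v2" into a map, keeping the original order. */
    static Map<String, String> parseConfigs(String configs) {
        Map<String, String> parsed = new LinkedHashMap<>();
        if (configs.isBlank()) {
            return parsed;
        }
        for (String pair : configs.split(",")) {
            String[] keyValue = pair.split("=", 2);
            parsed.put(keyValue[0].trim(), keyValue.length > 1 ? keyValue[1].trim() : "");
        }
        return parsed;
    }

    /** Returns true if every expected entry is present with exactly the same value. */
    static boolean matches(Map<String, String> expected, String describedConfigs) {
        return parseConfigs(describedConfigs).entrySet().containsAll(expected.entrySet());
    }

    public static void main(String[] args) {
        String described = "retention.ms=3600000,cleanup.policy=delete"; // sample input
        Map<String, String> expected = Map.of(
                "cleanup.policy", "delete",
                "retention.ms", "3600000");
        System.out.println(matches(expected, described)); // prints true
    }
}
```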

Oleksii Zghurskyi