Fault-tolerant and reliable messaging with Kafka and Spring Boot

  1. Fault-tolerant and reliable messaging with Kafka
  2. Fault-tolerant and reliable messaging with Kafka and Spring Boot

This is the second article in the Reliable Kafka messaging series. If you haven’t seen the previous article, make sure you read it before continuing. It’s important that you are familiar with the concepts before trying to apply them.

Quick recap. We want to achieve a fault-tolerant and reliable Kafka messaging based communication between components. I covered 2 main scenarios in the previous article:

  • Crashing the service when a message is under processing
  • The message cannot be processed due to the unavailability of an external system, e.g. database

These 2 use-cases were discussed in the context of the at-most-once delivery guarantee that comes with the Kafka auto-commit feature. The first goal was to get messages redelivered when the consumer crashes in the middle of processing. The second was to have a way to retry the message processing when a recoverable error happens, e.g. the database is unavailable for a short amount of time.

In this article, I’d like to show an example solution for those 2 problems using Spring Boot. Within the example implementation, I treat Kafka as a managed service, i.e. I assume the broker itself stays available. If your system needs to be prepared for cases when Kafka is down, you need to add other measures as well to keep the at-least-once delivery guarantee.

Initial setup

Let’s jump right into it. I’m gonna start off with a standard Spring Boot project that I generated at start.spring.io with the Kafka dependency added. I’m naming the project normal-topic-consumer so I can refer to it later in the article.

First off, we need to set up the infrastructure a.k.a. Kafka and Zookeeper. I’m using the following docker-compose.yml on Docker for Windows:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    hostname: zookeeper
  kafka:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    ports:
      - "9092:9092"
    hostname: kafka
    environment:
      KAFKA_CREATE_TOPICS: "normal-topic:1:1"
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - "zookeeper"

We’ll use one topic for now, normal-topic, with a single partition and a replication factor of 1 (the KAFKA_CREATE_TOPICS format is name:partitions:replicas).

After executing a docker-compose up, we have the infrastructure ready to go.

Now let’s create a consumer in the Spring Boot application. It’s gonna be very simple, trust me. We’ll need one new class, I’m calling it NormalTopicConsumer. The only thing it will do for now is log the messages it reads. One more thing here, I’m using Lombok because I’m too lazy to write out the loggers.

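import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component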
@Slf4j
public class NormalTopicConsumer {
    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord) {
        String json = consumerRecord.value().toString();
        log.info("Consuming normal message {}", json);
    }
}

application.properties:

spring.kafka.consumer.bootstrap-servers=localhost:9092

If we start up the application with the Kafka infra running behind it, we can see the messages being consumed. Of course, first we have to send some messages to the topic. To do that, I’m gonna go with the easiest solution: get a terminal into the Kafka container and use kafka-console-producer.sh to send messages.

As the first step, we need to find out the container id; you can use docker ps for that. Then exec into the container:

$ docker exec -it a0a7 bash

Then get into producer mode and send an example JSON message:

$ sh /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic normal-topic
>{"data":"test"}

On the application side, we should see the following log:

[-consumer-0-C-1] c.a.b.n.NormalTopicConsumer : Consuming normal message {"data":"test"}

Awesome, let’s continue.

Implementing manual commit

The implementation is fairly simple for manual commit mode. There is a little configuration that needs to change to disable auto-commit and enable manual commits, and then a little tweaking of the code.

application.properties:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

The available ack modes are listed in the Spring for Apache Kafka reference documentation. 2 of them are relevant here: MANUAL and MANUAL_IMMEDIATE. With MANUAL, the acknowledgments are queued and the offsets are committed once all the records returned by the last poll have been processed. With MANUAL_IMMEDIATE, the commit happens right away when acknowledge is called on the listener thread. For the sake of the article, I’m gonna go with MANUAL_IMMEDIATE.

The code change is very simple. The only thing we need to do is extend the parameter list of the consumer method with an Acknowledgment parameter (org.springframework.kafka.support.Acknowledgment). Spring will automatically populate it. The object provides an acknowledge method that manually commits the read offset. The MANUAL_IMMEDIATE ack mode sets up the consumer so that as soon as the acknowledge method is called, it immediately tells the broker that the consumer has successfully processed the message.

@Component
@Slf4j
public class NormalTopicConsumer {
    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        log.info("Consuming normal message {}", json);
        ack.acknowledge();
    }
}

Start up the application in debug mode with a breakpoint before the acknowledge call, then send a message via the console producer. If you kill the application while the execution is suspended at the breakpoint, the offset is not yet committed. Starting the application again results in the same message being redelivered and reprocessed. That’s the behavior we wanted to achieve.
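To make the redelivery behavior easier to reason about, here is a minimal in-memory sketch. It is not the Kafka API: CommitSimulation, produce and runConsumer are hypothetical names, the list stands in for a single partition and the integer for the committed offset of the consumer group.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one partition plus a committed offset; not the Kafka API. */
final class CommitSimulation {
    private final List<String> log = new ArrayList<>();
    private int committedOffset = 0; // next offset the group reads after a restart

    void produce(String message) {
        log.add(message);
    }

    /**
     * Reads from the committed offset, as a restarted consumer would.
     * If crashBeforeAck is true, the first message is processed but never acknowledged.
     */
    List<String> runConsumer(boolean crashBeforeAck) {
        List<String> processed = new ArrayList<>();
        for (int offset = committedOffset; offset < log.size(); offset++) {
            processed.add(log.get(offset));   // the "processing" step
            if (crashBeforeAck) {
                return processed;             // simulated kill: the offset stays uncommitted
            }
            committedOffset = offset + 1;     // the effect of ack.acknowledge()
        }
        return processed;
    }

    public static void main(String[] args) {
        CommitSimulation partition = new CommitSimulation();
        partition.produce("{\"data\":\"test\"}");
        System.out.println("crashed run:   " + partition.runConsumer(true));
        System.out.println("restarted run: " + partition.runConsumer(false));
        System.out.println("next run:      " + partition.runConsumer(false));
    }
}
```

The crashed run and the restarted run both see the message, which is exactly the at-least-once trade-off: nothing is lost, but the processing logic has to tolerate seeing the same message twice.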

Implementing the DLQ

As I already covered in the previous article, there are several ways to pass along data to the DLQ. It can be a custom message format, it can be shared via Kafka headers, etc. I’m gonna use Kafka headers here because I think it’s a much cleaner implementation when you don’t pollute your actual message payload with this level of detail.

In the docker-compose.yml, I’ll add the new dlq-topic with 1 partition and a replication factor of 1.

KAFKA_CREATE_TOPICS: "normal-topic:1:1,dlq-topic:1:1"

The purpose of the DLQ now is to have a place to send failed messages. The NormalTopicConsumer class needs to be changed a little bit:

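import static java.nio.charset.StandardCharsets.UTF_8;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component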
@Slf4j
public class NormalTopicConsumer {
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    public static final String DLQ_TOPIC = "dlq-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming normal message {}", json);
            // Simulating an error case
            // throw new RuntimeException();
        } catch (Exception e) {
            log.info("Message consumption failed for message {}", json);
            String originalTopic = consumerRecord.topic();
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, json);
            record.headers().add(ORIGINAL_TOPIC_HEADER_KEY, originalTopic.getBytes(UTF_8));
            kafkaTemplate.send(record);
        } finally {
            ack.acknowledge();
        }
    }
}


There are a few things I’ve done here. One is, I’ve put the processing logic into a try clause so that any error coming out of the logic will be caught and we can take the necessary steps to recover. Obviously for now it’s only logging, which is not going to fail anytime soon; that’s why I’ve put the commented-out line there that throws a new RuntimeException. Just uncomment it and you’ll get the same situation as when the processing logic fails.

Another thing is the catch clause. I’m creating a ProducerRecord that holds the payload and stores which topic it is supposed to be sent to (dlq-topic). Last but not least, I’m adding the name of the currently processed topic as a header on the message under the originalTopic key. Since the Kafka client API only accepts byte arrays as header values, the String value must be converted first. I know it’s not pretty but nothing we can’t live with.
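The byte conversion can be tucked away in a tiny helper so both services encode and decode header values the same way. This is only a sketch: HeaderCodec is a hypothetical class name, and it assumes UTF-8 on both sides, matching the UTF_8 constant used in the listings.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for turning header values into bytes and back. */
final class HeaderCodec {
    private HeaderCodec() {
    }

    /** Encodes a header value the way the producing side stores it. */
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a header value on the consuming side, using the same charset. */
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = encode("normal-topic");
        System.out.println(decode(raw));
    }
}
```

Naming the charset explicitly on both sides matters because the platform default charset can differ between the machines running the two services.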

And I’ve added a finally clause as well to always commit the offset, even if it failed. If the processing has successfully completed, then commit because of that. If the processing has failed, we still need to commit it after we’ve sent it to the DLQ, otherwise the consumer will receive the same message from the broker and will keep failing. That’s not what we want.
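One caveat with committing in the finally clause: KafkaTemplate.send is asynchronous and returns a future, so the offset can be committed before the DLQ write is confirmed. A stricter variant waits for the send result and only acknowledges on success. The sketch below shows the idea in plain Java; ConfirmedDlqHandOff and sendAndConfirm are hypothetical names, and the dlqSender function stands in for a call that wraps kafkaTemplate.send (whose exact future type depends on the Spring Kafka version).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical helper: wait for the DLQ send before deciding to commit the offset. */
final class ConfirmedDlqHandOff {
    private ConfirmedDlqHandOff() {
    }

    /**
     * Returns true only if the send completed successfully within the timeout,
     * meaning it is safe to acknowledge the original message.
     */
    static boolean sendAndConfirm(String payload,
                                  Function<String, CompletableFuture<Void>> dlqSender,
                                  long timeoutMillis) {
        try {
            dlqSender.apply(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        } catch (Exception e) {
            // The send failed or timed out: the message is not safely in the DLQ
            return false;
        }
    }
}
```

In the listener, the idea would be to call ack.acknowledge() only when this returns true and otherwise leave the offset uncommitted, so the message is redelivered after a restart or rebalance. The price is that the listener thread blocks for the duration of each DLQ send.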

Now the other side, the DLQ. In normal circumstances it would be enough to just send the failed messages to the DLQ but since I want to use it as a way of retry, we need to have some logic there. I’m gonna use another service for that. Same deal, generated project just like the normal-topic-consumer service. I’m calling it dlq-topic-consumer.

The configuration is the same as for the normal-topic-consumer.

application.properties:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

One Spring bean for the consumption logic, DlqTopicConsumer:

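import static java.nio.charset.StandardCharsets.UTF_8;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component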
@Slf4j
public class DlqTopicConsumer {
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                log.info("Consuming DLQ message {} from originalTopic {}", json, originalTopic);
            } else {
                log.error("Unable to read DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}


It does a single thing. In case the message has the originalTopic header available, it just logs out the message along with the original topic. For now this is sufficient but we’re gonna build a more sophisticated solution when we reach the retry logic.

Start up the 2 services with the RuntimeException in place within the normal-topic-consumer and send a message to the normal-topic. The normal-topic-consumer will pick it up, the processing will fail, and it will send the same message to the dlq-topic and commit the offset. When the message arrives on the DLQ topic, the dlq-topic-consumer service kicks in: it picks up the message and logs it. Awesome.

Implementing the retry logic

Hope you are still with me. The last thing we want to do is the retry mechanism.

The way it’s going to work is this. The DLQ consumer will set a retryCount header on the message when it’s resending it to the original topic. If the retryCount header is not present yet in the incoming DLQ message, it treats the count as zero. In both cases it increments the count and sets the new value on the outgoing message, so the first resend carries retryCount 1. On the other side, the normal topic consumer will copy the retryCount header to the DLQ message in case it’s available.

When the DLQ consumer is about to resend the message, it checks the retry count against a threshold. If the count is below the threshold, the message is resent; otherwise an error is logged and the message is dropped. I’ll use 5 as the threshold here but you can use whatever value suits you.

The code looks like the following:
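The counting rule on its own, detached from Kafka, can be sketched like this. RetryPolicy and its method names are hypothetical; the header value is assumed to be the decimal retry count encoded as a UTF-8 string, which is what the listings in this section do.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that isolates the retryCount header rules. */
final class RetryPolicy {
    static final int MAX_RETRIES = 5;

    private RetryPolicy() {
    }

    /** Reads the count from the raw header value; a missing header counts as zero. */
    static int currentRetryCount(byte[] headerValue) {
        if (headerValue == null) {
            return 0;
        }
        return Integer.parseInt(new String(headerValue, StandardCharsets.UTF_8));
    }

    /** A message is resent only while its count is below the threshold. */
    static boolean shouldResend(int currentRetryCount) {
        return currentRetryCount < MAX_RETRIES;
    }

    /** Header value to put on the outgoing message: the incremented count. */
    static byte[] nextHeaderValue(int currentRetryCount) {
        return Integer.toString(currentRetryCount + 1).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] header = null; // the first failure arrives without a retryCount header
        while (shouldResend(currentRetryCount(header))) {
            header = nextHeaderValue(currentRetryCount(header));
            System.out.println("Resending attempt " + currentRetryCount(header));
        }
        System.out.println("Retry limit exceeded");
    }
}
```

Keeping the rule in pure functions like these makes the threshold easy to unit test without a broker.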

NormalTopicConsumer:

@Component
@Slf4j
public class NormalTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    public static final String DLQ_TOPIC = "dlq-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group", topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming normal message {}", json);
            throw new RuntimeException();
        } catch (Exception e) {
            log.info("Message consumption failed for message {}", json);
            String originalTopic = consumerRecord.topic();
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, json);
            record.headers().add(ORIGINAL_TOPIC_HEADER_KEY, originalTopic.getBytes(UTF_8));

            Header retryCount = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
            if (retryCount != null) {
                record.headers().add(retryCount);
            }
            kafkaTemplate.send(record);
        } finally {
            ack.acknowledge();
        }
    }
}

DlqTopicConsumer:

@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info("Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    kafkaTemplate.send(record);
                } else {
                    log.error("Retry limit exceeded for message {}", json);
                }
            } else {
                log.error("Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}

One thing is left to make the retry logic a little smarter: introducing some delay into the resending. That can be achieved with an AsyncTaskExecutor. We’ll do the message sending on another thread that sleeps for some time before sending; I’m using a 5 second delay here. The final code looks like the following:

@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private AsyncTaskExecutor asyncTaskExecutor;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info("Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    asyncTaskExecutor.execute(() -> {
                        try {
                            log.info("Waiting for 5 seconds until resend");
                            Thread.sleep(5000);
                            kafkaTemplate.send(record);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } else {
                    log.error("Retry limit exceeded for message {}", json);
                }
            } else {
                log.error("Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}


If you run the same test as before and send a message to the normal-topic, the normal-topic-consumer will forward it to the DLQ topic, and the dlq-topic-consumer will resend it up to 5 times, waiting 5 seconds before each resend.

UPDATE: Jakub in the comments rightly pointed out that if the DLQ service crashes while the async thread is waiting to resend the message, it’s going to lose the message. The quickest solution is to not use an async resending model but do the waiting in the original thread. However, that affects the overall throughput of the DLQ service.
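As a variation on the sleeping thread, the delay can also be expressed with a ScheduledExecutorService from the JDK, which does not keep a thread blocked while waiting. This is a sketch under assumptions, not what the example project uses: DelayedResender and resendAfter are hypothetical names, and the Runnable stands in for the kafkaTemplate.send call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that runs a resend action after a delay. */
final class DelayedResender implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Schedules the resend; the returned future completes once the action has run. */
    ScheduledFuture<?> resendAfter(long delayMillis, Runnable resend) {
        return scheduler.schedule(resend, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops accepting new tasks; already scheduled delayed tasks are still executed by default. */
    @Override
    public void close() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (DelayedResender resender = new DelayedResender()) {
            // In the DLQ consumer the action would be the send of the prepared record
            resender.resendAfter(5000, () -> System.out.println("resending now")).get();
        }
    }
}
```

Keep in mind that the scheduled tasks live only in memory, so anything still waiting is gone if the process dies.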

@Component
@Slf4j
public class DlqTopicConsumer {
    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info("Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    log.info("Waiting for 5 seconds until resend");
                    Thread.sleep(5000);
                    kafkaTemplate.send(record);
                } else {
                    log.error("Retry limit exceeded for message {}", json);
                }
            } else {
                log.error("Unable to resend DLQ message because it's missing the originalTopic header");
            }
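        } catch (InterruptedException e) {
            // Restore the interrupt flag so the listener container can shut down cleanly
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting to resend DLQ message {}", json);
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);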
        } finally {
            ack.acknowledge();
        }
    }
}

Another option is to have a persistent store for the delayed messages. If the service crashes, it can pick up the pending resends from the store after the restart.
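To give a rough idea of what such a store could look like, here is a minimal file-backed sketch. Everything in it is an assumption for illustration: PendingResendStore, its methods and the tab-separated file format are hypothetical, and a real service would more likely use a database table. The payload is Base64 encoded so tabs or line breaks in the JSON cannot break the format.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Hypothetical file-backed store for messages that are waiting to be resent. */
final class PendingResendStore {
    /** One delayed message: when it is due, where it goes, and its payload. */
    static final class Pending {
        final long dueAtMillis;
        final String topic;
        final String payload;

        Pending(long dueAtMillis, String topic, String payload) {
            this.dueAtMillis = dueAtMillis;
            this.topic = topic;
            this.payload = payload;
        }
    }

    private final Path file;

    PendingResendStore(Path file) {
        this.file = file;
    }

    /** Appends the message to disk; meant to be called before acknowledging the DLQ offset. */
    void save(long dueAtMillis, String topic, String payload) throws IOException {
        String encoded = Base64.getEncoder()
                .encodeToString(payload.getBytes(StandardCharsets.UTF_8));
        String line = dueAtMillis + "\t" + topic + "\t" + encoded + "\n";
        Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** Removes and returns every message that is due; the rest stay on disk. */
    List<Pending> takeDue(long nowMillis) throws IOException {
        List<Pending> due = new ArrayList<>();
        if (!Files.exists(file)) {
            return due;
        }
        List<String> keep = new ArrayList<>();
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            String[] parts = line.split("\t", 3);
            long dueAt = Long.parseLong(parts[0]);
            if (dueAt <= nowMillis) {
                byte[] raw = Base64.getDecoder().decode(parts[2]);
                due.add(new Pending(dueAt, parts[1], new String(raw, StandardCharsets.UTF_8)));
            } else {
                keep.add(line);
            }
        }
        Files.write(file, keep, StandardCharsets.UTF_8); // rewrite the file without the due entries
        return due;
    }
}
```

A poller would call takeDue periodically (and once at startup) and resend whatever comes back. Note that this sketch removes entries before the resend is confirmed and is not safe for concurrent writers, so it narrows the loss window rather than closing it completely.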

Conclusion

In this article, I wanted to show you how fault-tolerant and reliable messaging can be achieved using Spring Boot. This only covers the basics, but you can build on it to handle message ordering during the resends, multiple partitions, and so on.

I hope you liked it and if you did, give it a thumbs up and share it. If you are interested in more, make sure you follow me on Twitter. And as usual, the code is available on GitHub.
