Fault-tolerant and reliable messaging with Kafka

Lately there is a trend to use a messaging system for practically everything, much as there is with microservices. When you adopt messaging, you have to weigh the different properties each messaging system provides. Over the last few years, Kafka has increased its market share considerably. Alongside microservices and messaging, another architectural pattern has started to spread as well: event sourcing.

Kafka is a good fit for event sourcing because it provides the properties the pattern requires. One of the key concepts in event sourcing is to store an immutable sequence of events (think of it as an audit log) that captures the state of the system. This makes it possible to recreate the system state at any given time just by replaying the events up to a certain point. Of course, like every pattern, it has drawbacks. It introduces a whole set of problems that you never have to think about when writing a standard application with a simple database behind it.

One difficulty with a system that uses messaging under the hood is the asynchronous nature of the communication. On the one hand, it requires a different way of thinking. On the other hand, it makes error handling hard.

When choosing the architecture of a system, people often look only at the brochure and forget that everything has pros and cons. Once it’s decided that messaging will be used and the implementation starts, the need for proper error handling is simply forgotten.

Let’s look at a simple scenario:

  1. A user registers on a website triggering an API call
  2. The API is a synchronous one and goes through an API Gateway
  3. The API Gateway synchronously forwards the call to a downstream microservice, which saves the new user into a MySQL database
  4. The MySQL database is inaccessible for 10 seconds
  5. The error ripples back to the user

In this case, the user is immediately notified that the registration was unsuccessful and that the procedure has to be restarted. Let’s see what happens if we transform this to use a messaging system:

  1. A user registers on a website triggering an API call
  2. The API is a synchronous one and goes through an API Gateway
  3. The API Gateway puts a message into a message queue (Kafka) and returns an OK status to the user
  4. In the meantime, the downstream service asynchronously picks up the registration message and starts processing it
  5. The MySQL database is inaccessible for 10 seconds when the registration event is processed
  6. The error is hidden: the user has already received an OK

In the latter case with messaging, we just lost a user registration. How do we deal with this situation?

Crashing while a Kafka message is in flight

The scenario above is not the only problem. Assume that the service is processing the registration message normally and MySQL is fully operational, but the service crashes in the middle of processing because it runs out of memory. If the developer has not invested time in handling this scenario and the service uses auto-commit mode for reading messages from Kafka, the message might get lost.

What is this auto-commit? Kafka has the notion of an offset. An offset records which messages a consumer has already seen on a particular topic (more precisely, on each partition of that topic). It’s a very simple concept. If you have 3 messages in a Kafka topic, then as soon as the consumer has read the first message, it marks it as successfully read. When the next message is read successfully, it marks the offset again, and so on. This way, when the consumer is down, messages can simply pile up, and once it is back it retrieves only the messages it has not seen, starting from the last marked offset. Marking an offset as seen is called a commit in the Kafka ecosystem.
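To make the offset idea concrete, here is a minimal in-memory sketch. It does not use a real Kafka client; `TopicLog` and its methods are hypothetical names invented for this illustration, and the "topic" has a single partition. Plain Python is used only to keep the sketch short.

```python
class TopicLog:
    """A toy, single-partition stand-in for a Kafka topic (illustration only)."""

    def __init__(self):
        self.messages = []
        self.committed = 0  # offset of the next message the consumer should read

    def append(self, message):
        self.messages.append(message)

    def read_uncommitted(self):
        """Return (offset, message) pairs the consumer has not committed yet."""
        return list(enumerate(self.messages))[self.committed:]

    def commit(self, offset):
        """Mark everything up to and including `offset` as seen."""
        self.committed = offset + 1


log = TopicLog()
for m in ["msg-0", "msg-1", "msg-2"]:
    log.append(m)

# The consumer reads and commits the first message, then goes down.
offset, _ = log.read_uncommitted()[0]
log.commit(offset)

# Messages pile up while the consumer is down.
log.append("msg-3")

# After a restart, only the unseen messages are returned.
unseen = [m for _, m in log.read_uncommitted()]
print(unseen)
```

The committed value here points at the next message to read, which is why the restart picks up at `msg-1` without any extra bookkeeping.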

Several frameworks that provide an abstraction over Kafka consumption use Kafka’s auto-commit feature by default (the enable.auto.commit consumer setting). As the name suggests, it is an automatic commit. In a nutshell, some time after a message is read from the Kafka topic, its offset is committed without the application asking for it. Going deeper, several other factors influence the behavior, such as the auto-commit interval (auto.commit.interval.ms). Why is this a problem?

Well, imagine that your application is processing the message: contacting external systems, reading from the database, and so on. The message was successfully retrieved and its offset has been committed. However, the processing is still ongoing, and if the service crashes at that point, we lose the message. Since the offset is already committed, the consumer will resume after the failed message on restart and never see it again.
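The loss can be reproduced with a small simulation. This is only a sketch under simplifying assumptions: there is no real Kafka client, the offset is committed the moment a message is read, and `run_consumer` and the message strings are made up for the example.

```python
def run_consumer(messages, committed, processed, crash_on=None):
    """Simulate a consumer that commits on read. Returns the committed offset."""
    while committed < len(messages):
        message = messages[committed]
        committed += 1                 # offset committed before processing
        if message == crash_on:
            return committed           # the service crashes mid-processing
        processed.append(message)      # processing finished successfully
    return committed


messages = ["register:alice", "register:bob"]
processed = []  # stands in for the rows saved to the database

# First run: the service crashes while handling bob's registration.
committed = run_consumer(messages, 0, processed, crash_on="register:bob")

# Restart: the consumer resumes from the committed offset.
committed = run_consumer(messages, committed, processed)

print(committed, processed)
```

After the restart there is nothing left to read, yet bob's registration never reached `processed`.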

Delivery guarantees

I began the article with the importance of choosing the messaging system that best fits your application. If you go with Kafka, one crucial attribute is the delivery guarantee it provides. Practically speaking, the delivery guarantee depends mostly on how the clients are written and configured, not on the Kafka installation settings.

There are 3 types of delivery guarantees:

  • At-most once delivery
  • Exactly once delivery
  • At-least once delivery

I think the names speak for themselves, but let me cover them at a high level.

At-most once delivery means that when a new message appears on a topic, the consumer either receives it and processes it successfully, or the message is lost; it is never delivered a second time. This is effectively what you get from many frameworks and consumers out of the box: when auto-commit mode is enabled and the offset is committed before processing finishes, the at-most once guarantee is what is in place.

Exactly once delivery means that each message is delivered to the consumer exactly once: no message is lost and no already processed message is redelivered.

At-least once delivery means that a message is never lost but may be delivered more than once. It is discussed in the next section.

The idea of manual commit

As always, how you solve a specific problem depends on the use case. I’ll cover the general principle for moving away from auto-commit mode and guaranteeing at-least once message delivery.

The idea is very simple. When the consumption of a message starts, don’t commit the read offset immediately; wait until the processing is complete and then commit the offset manually. With this approach, messages are only considered processed once they have made it through the application logic that handles them. This solves the problem of losing in-flight messages when the application crashes.
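A minimal sketch of the same idea, again as an in-memory simulation rather than a real Kafka consumer. `run_consumer` and the message strings are hypothetical; the only point is the order of the two steps, process first and commit second.

```python
def run_consumer(messages, committed, processed, crash_on=None):
    """Simulate a consumer that commits only after processing succeeds."""
    while committed < len(messages):
        message = messages[committed]
        if message == crash_on:
            return committed           # crash: this offset was never committed
        processed.append(message)      # processing finished successfully
        committed += 1                 # manual commit after processing
    return committed


messages = ["register:alice", "register:bob"]
processed = []  # stands in for the rows saved to the database

# First run: the service crashes while handling bob's registration.
committed = run_consumer(messages, 0, processed, crash_on="register:bob")
offset_after_crash = committed

# Restart: bob's message is read again because its offset was not committed.
committed = run_consumer(messages, committed, processed)

print(offset_after_crash, committed, processed)
```

With a real client this typically means disabling auto-commit and calling the client's commit operation once the handler returns; the exact call depends on the client or framework you use.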

However, there is one thing to consider. When you restart the application after the crash, it must be able to reprocess the message. In other words, the message processing should be idempotent. Imagine that a user registers and, right after inserting the new row into the database, the application fails, so the offset is not committed. When the application restarts, it picks up the same message and inserts another row into the database with the same data. Obviously, this is a glitch in the system. This is why it’s important for your message processing logic to be idempotent.

With manual commit, there’s another class of cases you have to prepare for: errors. Let’s go with the example of the database being down. With manual commit, the offset is never committed while processing fails, so the application keeps trying to reprocess the same message. How do we recover from this?

Well, there are multiple approaches. One option is to have a generic error handler in the processing that catches any error the application code is not prepared for and simply logs it. That works, in the sense that the consumer keeps moving, but we can do this much smarter.
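One common way to get idempotent processing is to key the write on something unique in the message, so that a redelivered message becomes a no-op. The sketch below assumes the email address is that unique key and uses a dictionary in place of a database table; both are assumptions made for illustration.

```python
users = {}  # stands in for a table with a unique key on email


def register_user(message):
    """Insert the user only if the email is not present yet (idempotent)."""
    email = message["email"]
    if email in users:
        return False  # already processed, so the redelivery changes nothing
    users[email] = {"name": message["name"]}
    return True


message = {"email": "alice@example.com", "name": "Alice"}
first = register_user(message)   # original delivery
second = register_user(message)  # redelivery after a crash before the commit

print(first, second, len(users))
```

In a real database the same effect usually comes from a unique constraint or an upsert, so the check and the insert cannot race with each other.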

Recovering from errors

I only mentioned one example of an error, but today’s complex systems, with cloud architectures and many different cloud components, have plenty of ways to fail. Not preparing for errors is simply irresponsible. Trust me, they are going to happen.

Back to the example. Recovering from this type of error is easy to imagine: just wait some time before you try to use the database again. You can prepare your application code to do smart retries, but on a generic level you could utilize Kafka for this purpose. Imagine that you have another topic, to which failed messages are sent. There is a consumer on this topic that does only one thing: it resends the message to the original topic.

There you go, we just created a retry mechanism without any explicit retry logic polluting the application code (although I’m not saying it’s a bad idea to protect your code from failing). Yet this is still a very primitive approach, because it will resend the message infinitely in case of a failure, even if the error is non-recoverable. And although it works, it gets much more complex if you need ordered message delivery.

There is an industry-standard way to handle failing messages, called a dead-letter queue (DLQ). This is practically what the new topic and its consumer are; however, instead of just dropping failed messages onto the topic to inspect them later, we enhanced it into a retry mechanism.

The DLQ consumer is able to resend messages, which is good, but we should move away from infinite retries. So we could introduce the notion of a retryCount that tells the DLQ consumer how many times a particular message has been retried. If it reaches a threshold, say 5, the consumer simply stops retrying the message. When the threshold is reached, the message should be inspected closely, either in an automated manner or manually. You could simply log the message and use Kibana, Splunk, etc. to detect those retried but failed messages. You could also put them into dedicated storage, for instance S3, but this is a question of your architecture and what is suitable for your case.

With this addition, the system will not retry infinitely, which is awesome. But think about the retry mechanism again: say a message comes in to the normal consumer and fails because it cannot reach the MySQL database, which is down for 10 seconds. If the application retries immediately, it might use up all its retries before the database is back online. For slightly smarter retry logic, one can introduce a delay before resending the messages. It could be a constant delay, like 5 seconds, or an exponential delay, say starting with 1 second and increasing with every retry.
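The decision the DLQ consumer makes for each message can be sketched as follows. Topics are modelled as in-memory queues, and the topic name, the `originalTopic` field, and the `parked` list are hypothetical stand-ins for whatever your architecture uses (a log line, S3, and so on).

```python
from collections import deque

MAX_RETRIES = 5  # the threshold from the text; tune it for your system

topics = {"user-registrations": deque()}  # in-memory stand-in for Kafka topics
parked = []  # messages that exhausted their retries, kept for inspection


def handle_dlq_message(message):
    """Resend a failed message to its original topic, or park it."""
    if message["retryCount"] >= MAX_RETRIES:
        parked.append(message)
        return "parked"
    retried = dict(message, retryCount=message["retryCount"] + 1)
    topics[message["originalTopic"]].append(retried)
    return "resent"


fresh = {"originalTopic": "user-registrations", "retryCount": 0, "payload": "alice"}
worn_out = {"originalTopic": "user-registrations", "retryCount": 5, "payload": "bob"}

print(handle_dlq_message(fresh), handle_dlq_message(worn_out))
```

The retry count is incremented on a copy of the message, so the failed original stays untouched for logging.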
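The two delay strategies can be expressed as small functions of the retry count. The doubling factor and the 60-second cap are assumptions chosen for the example, not values from any particular library.

```python
def constant_delay(retry_count, delay_seconds=5.0):
    """Wait the same amount of time before every retry."""
    return delay_seconds


def exponential_delay(retry_count, base_seconds=1.0, factor=2.0, max_seconds=60.0):
    """Start at base_seconds and multiply by factor on each retry, up to a cap."""
    return min(base_seconds * factor ** retry_count, max_seconds)


delays = [exponential_delay(n) for n in range(5)]
print(delays)
```

With these defaults, five retries span 31 seconds in total, enough to outlast the 10-second outage in the example, while the cap keeps later retries from drifting arbitrarily far apart.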

A practical implementation of storing and passing the retry count and the original topic within messages could take different forms. Again, it depends on the architecture you have. For instance, the retry count could be stored in a wrapper around your message: you introduce a special type of message, the retryable message, and convert the original JSON message into it. The retry information can then be stored right next to the original payload.
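As an illustration of the wrapper approach, here is one possible shape for a retryable message. The field names `retryCount`, `originalTopic`, and `payload`, as well as the sample registration message, are assumptions made for this sketch and not a fixed format.

```python
import json


def to_retryable(original_json, original_topic):
    """Wrap an original JSON message in a hypothetical retryable envelope."""
    envelope = {
        "retryCount": 0,
        "originalTopic": original_topic,
        "payload": json.loads(original_json),
    }
    return json.dumps(envelope)


def increment_retry(retryable_json):
    """Return the envelope with its retry count increased by one."""
    envelope = json.loads(retryable_json)
    envelope["retryCount"] += 1
    return json.dumps(envelope)


original = '{"name": "Alice", "email": "alice@example.com"}'
retryable = increment_retry(to_retryable(original, "user-registrations"))
print(retryable)
```

The original payload travels unchanged inside the envelope, so the normal consumer only has to unwrap it before processing.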

Another possibility is to store the retry count in a message header, if the Kafka version you are using supports headers (they were introduced in Kafka 0.11). In that case it’s fully transparent to the message payload.
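Kafka record headers are key/value pairs whose values are raw bytes. The sketch below models them as a list of `(str, bytes)` tuples, which is how Python clients commonly expose them; the header name `retry-count` is a made-up convention for this example.

```python
RETRY_HEADER = "retry-count"  # hypothetical header name


def get_retry_count(headers):
    """Read the retry count from the headers, defaulting to 0 if absent."""
    for key, value in headers:
        if key == RETRY_HEADER:
            return int(value.decode("utf-8"))
    return 0


def with_incremented_retry(headers):
    """Return new headers with the retry count increased by one."""
    count = get_retry_count(headers) + 1
    others = [(k, v) for k, v in headers if k != RETRY_HEADER]
    return others + [(RETRY_HEADER, str(count).encode("utf-8"))]


headers = [("trace-id", b"abc123")]
headers = with_incremented_retry(headers)
headers = with_incremented_retry(headers)
print(headers)
```

Because the count lives outside the payload, the message body is resent byte for byte.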

When a message needs to be resent, the DLQ consumer must figure out which topic to resend it to. Again, there are multiple possibilities, but let me mention two of them.

One approach is to store the original topic within the message, just like the retry count, or to utilize a Kafka header.

Another approach is to encode the original topic into the DLQ topic name. Imagine that the DLQ consumer can listen on a number of topics based on a regular expression. The rule could be very simple: if a topic name ends with -dlq, the consumer listens on it. With this approach, if the normal topic is called normal-topic and a failed message is sent to normal-topic-dlq, the DLQ consumer can easily deduce which topic it needs to resend to.
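The naming rule translates directly into a regular expression. A minimal sketch, assuming the `-dlq` suffix convention from the text; the function name is hypothetical.

```python
import re

# Everything before a trailing "-dlq" is treated as the original topic name.
DLQ_PATTERN = re.compile(r"(?P<original>.+)-dlq")


def original_topic(dlq_topic):
    """Return the topic to resend to, or None if this is not a DLQ topic."""
    match = DLQ_PATTERN.fullmatch(dlq_topic)
    return match.group("original") if match else None


print(original_topic("normal-topic-dlq"), original_topic("normal-topic"))
```

The same pattern can serve both purposes: deciding which topics the DLQ consumer subscribes to, and working out where each failed message came from.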

Conclusion

We’ve seen what problems can be encountered when using asynchronous message processing in a system. The focus was on the cases where messages are either lost or cannot be processed. Any architecture that uses a messaging system should be prepared for cases where application components simply fail. We’ve looked at using manual commit mode to achieve at-least once delivery, and we’ve seen how a DLQ can be set up to implement smart retries along with inspectable message failures.

In an upcoming post, I’m going to show an implementation of these concepts based on Java and Spring Boot.

If you liked the article, give it a thumbs up and share it. If you are interested in more, make sure you follow me on Twitter.
