Scaling Spring Batch processing with remote partitioning using Kafka

I haven’t really covered the topic of batch jobs so far and it happened that I needed to work with them lately and design a quite complicated batch job setup based on Spring Batch with partitioning using Kafka.

Obviously I’m not gonna describe the exact system but only some of the concepts that Spring Batch provides along with working examples.

Problem statement

Let’s talk a little bit about the problem statement. I’m not gonna go into detail about batch jobs in general but rather focus on a more specific problem – scaling batch jobs.

Imagine a process which you need to run on a regular basis, for example end of day (EOD). Let’s say the amount of data this process needs to handle is continuously increasing.

In a naive world, you could do a very simple Spring scheduling (or Quartz or what have you) that just executes a single method that loads all the data at once, processes all of it and writes back the result into the database.

A pseudo code would be similar to this:

void job() {
    read_all_data()
    for (item : data)
       process(item)
    write_result_to_database()
}

What’s the problem with this? Nothing at all until the data you read is constant in volume and reading it all at once satisfies the needs.

Let’s say now the number of rows we read (from a database for example) is 10 000 rows. Works perfectly fine but what if suddenly there’s 10 000 000 rows? The execution will probably fail due to OutOfMemoryError or takes an eternity to finish.

So how could we make this scalable and make sure the increasing data volume is not going to be an issue on the long run either?

Remote partitioning

Well, the concept of remote partitioning is simple.

You take your initial dataset, for example if we are reading transactions (or whatever domain object) from a database, we take the transaction IDs only. Divide them into partitions (not chunks; chunks have a different meaning in the Spring Batch world) and send the partitions to workers who can process them and do the actual business logic.

What I just described could be true for regular partitioning as well. The main difference between regular and remote partitioning is where the workers are. In case of regular partitioning, the process that acts as workers are local threads in the same JVM as the process that’s partitioning the data.

In case of remote partitioning though, the workers are not running in the same JVM but they are completely different JVMs. The individual workers are notified through a messaging system when there’s some work to pick up.

The image below illustrates the idea:

The messaging middleware can be different in each environment but for the sake of the article, I’ll choose Kafka (and probably later I’ll write an article with AWS SQS as well).

Limitations

Kafka operates based on topics. Topics can have partitions. The amount of consumers you can have (for the same consumer group) depends on the amount of partitions you have for the topic. Meaning that the concurrency factor for your partitioned batch jobs are directly connected with the amount of topic partitions.

The number of partitions used for a topic shall be set during the time of creating the topic. Later on, it’s possible to change the number of partitions for an existing topic however there are certain side-effects you have to be aware of.

This implies that dynamically scaling the amount of workers based on data volume is not possible with Kafka out of the box. By dynamic I mean that sometimes you need 10 workers but let’s say the data volume vastly increases during Christmas time and you’d need 50. That’s something you’ll need some custom scripts for.

After all, I think a good rule of thumb – in case of Kafka – is to oversize the number of topic partitions. Let’s say if you need 10 consumers during non-peak days and you’ll need 20 during peak days, I’d say you could go with double/triple that to make sure you’ll have room to grow without much of a headache. So I’d say 60 could be a good partition number to support at most 60 simultaneous consumers. Of course this depends on the speed your data volume grows but you get the idea.

Scaling in action

It’s coding time, let’s see all of this in action.

Before we do anything, we’ll need a project. Let’s go to start.spring.io as usual and create a new project with the following dependencies:

  • Spring Batch
  • Spring Integration
  • Spring for Apache Kafka
  • MySQL
  • Liquibase

I’m going with a Gradle project but I defer that question to your best judgment.

Let’s open up the project and add one more dependency to the build.gradle:

implementation 'org.springframework.batch:spring-batch-integration'

And the full list of dependencies for me:

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-batch'
	implementation 'org.springframework.batch:spring-batch-integration'
	implementation 'org.springframework.boot:spring-boot-starter-integration'
	implementation 'org.liquibase:liquibase-core'
	implementation 'org.springframework.integration:spring-integration-kafka'
	implementation 'org.springframework.kafka:spring-kafka'
	runtimeOnly 'mysql:mysql-connector-java'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.batch:spring-batch-test'
	testImplementation 'org.springframework.integration:spring-integration-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

Good. Let’s jump into the coding piece.

Manager

We’ll start with the manager and its configuration. Let’s have a ManagerConfiguration class. We’ll need a couple of dependencies for the configuration and a 2 annotations:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;
}

The @Profile annotation is vital because we only want this configuration to kick off when we’re trying to run the manager and we’ll control that with Spring’s profiles.

The JobBuilderFactory will be used to create our partitioning job. The RemotePartitioningManagerStepBuilderFactory is going to be used to create the steps for our job and it’s very important to use this class and not the regular StepBuilderFactory. Also, note that there’s a very similar StepBuilderFactory called RemotePartitioningWorkerStepBuilderFactory which is intended to be used by the workers and not the manager. We’ll get there soon.

The KafkaTemplate is automatically configured for us and we’ll need it to configure the channel between the manager and Kafka.

Now, let’s add a channel to the picture that we’ll use as an output channel from the application to Kafka:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}

DirectChannel is just an abstraction over messaging channels in Spring Integration.

Next up, let’s create the Partitioner that’ll partition our dataset. It’ll be a new class, I’m calling it ExamplePartitioner.

public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put("data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get("data");
            data.add(i + 1);
        }
        return partitions;
    }
}

This partitioner implementation is not doing anything interesting. It creates 50 partitions and for each partition, it puts some numbers into a list that’s accessible under the key data. That means, each partition will have 20 numbers in the list.

This is where you can imagine acquiring the transaction or whatever IDs you want to process and later down the line, the worker will load the corresponding rows from the database.

Good, let’s create the job steps and create a bean from the partitioner:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}

Nothing special, we create the step that’ll invoke our partitioner and the output channel where we want to send the partitions to. Also, there’s a reference here to the Constants class, let me show you the content of it.

public class Constants {
    public static final String TOPIC_NAME = "work";
    public static final String WORKER_STEP_NAME = "simpleStep";
    public static final int TOPIC_PARTITION_COUNT = 3;
}

That’s all. We’ll call the Kafka topic work and it’ll have 3 topic partitions and the worker step that we want to invoke upon the partitioned dataset is called simpleStep.

Good, now let’s create the partitioner job:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get("partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}

Again, nothing special, just referencing the previous partitioner step we created and adding a RunIdIncrementer to the job so that we can rerun the job easily.

Good. Now, I’d say the most complicated stuff, how to wire the channel into Kafka and make sure that the topic partitions are utilized properly.

We’ll use Spring Integration for this as well:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }
}

First of all, we’ll need a KafkaProducerMessageHandler that’ll take the incoming messages and publish them into a Kafka topic. The topic is marked by the setTopicExpression method call and at the end, we just hook everything up as an integration flow.

This will however not utilize the topic partitions and the messages will be published to the very same partition. Let’s add a custom expression for that as well via the setPartitionIdExpression method:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }
}

We’ll supply a FunctionExpression that will dynamically unwrap the message and gets the stepExecutionId attribute and combines it with the modulo operator. The current value for the partition count is 3. This will mean that the partition ID expression will return a value from the [0, 1, 2] range which will denote the targeted topic partition. This is kind of provides an equal distribution among the partitions but not 100%. If you need a sophisticated partition ID decider, you can definitely adjust the implementation.

Also, you can similarly use the setMessageKeyExpression method to supply a similar FunctionExpression to calculate the message key instead of directly telling Kafka which partition to use.

One more thing to note is that I added log() to the integration flow so the messages sent out are logged; just for debugging purposes.

That’s the manager configuration.

Worker

The worker configuration is going to be similar. Let’s create a WorkerConfiguration class.

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(cf, Constants.TOPIC_NAME))
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}

Couple of dependencies, a message channel for inbound messages and wiring it together with Spring Integration.

Let’s create the worker step:

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }
}

This will create the step definition, connects it with the inbound message channel and references the ItemReader, ItemProcessor and ItemWriter instances. Those look the following:

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity
    
    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }

                return null;
            }
        };
    }
}

The ItemReader is a bean and will receive the partitioned data as a parameter in the Spring Batch execution context under the data key. Note that it’s mandatory to use @StepScope on the bean definition to enable late binding for the step.

The implementation is simple. We’ll store the received IDs in a local List and during each ItemReader invocation, we’ll remove one item from the list until there’s nothing left.

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity
    
    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql("INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }
}

The ItemProcessor and ItemWriter are even easier. The ItemProcessor just converts the ID into a Customer object, simulating some type of processing on a DTO and the ItemWriter just writes the Customers into the database.

The Customer class is a simple POJO, nothing special:

public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

Last config steps

The next thing we need to do is to create the Kafka topic with the desired partition count so let’s create a new KafkaConfiguration class:

@Configuration
public class KafkaConfiguration {
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(Constants.TOPIC_NAME)
                .partitions(Constants.TOPIC_PARTITION_COUNT)
                .build();
    }
}

This will automatically create the topic with the partition count if it doesn’t exist yet.

Next, we’ll need to create the database structure for storing our Customers and for allowing Spring to manage its state. Let’s create a db.changelog-master.xml file under the src/main/resources/db/changelog folder with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id="0001-initial" author="Arnold Galovics">
        <createTable tableName="customers">
            <column name="id" type="number">
            </column>
        </createTable>
        <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql" relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>

The createTable is simple, the SQL file import is something provided by Spring Batch’s core module.

Let’s add some configuration into the application.properties:

spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml

DataSource config with Liquibase. And then the Kafka producer configuration:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.group-id=producer-g

The most important thing here is to use the JsonSerializer so that the message Spring Batch is sending is encoded into a JSON.

Similarly, the consumer:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=consumer-g

And one more thing:

spring.kafka.consumer.properties.spring.json.trusted.packages=*

That concludes the configuration.

Showtime

Let’s create the infra for starting up the app. I’ll create a docker-compose.yml:

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
    depends_on:
      - kafka
  mysql:
    image: mysql
    ports:
      - '3306:3306'
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: mysql

I’m not gonna go into detail. It starts up a Kafka broker, a Kafka UI instance on port 8080 to view the state of the topic if you want to and a MySQL server.

Let’s do a quick docker-compose up to start up everything.

And then let’s use Gradle to start up the apps on the corresponding profiles.

$ ./gradlew bootRun --args='--spring.profiles.active=worker'

Do this in 3 different terminal windows and you should have your 3 workers. The log message you should see at the end of the logs is similar to this:

2022-03-29 21:19:11.475  INFO 3756 --- [ntainer#0-0-C-1] s.k.l.ConcurrentMessageListenerContainer : consumer-g: partitions assigned: [work-2]
2022-03-29 21:19:11.487  INFO 7260 --- [ntainer#0-0-C-1] s.k.l.ConcurrentMessageListenerContainer : consumer-g: partitions assigned: [work-1]
2022-03-29 21:19:11.475  INFO 23756 --- [ntainer#0-0-C-1] s.k.l.ConcurrentMessageListenerContainer : consumer-g: partitions assigned: [work-0]

The 3 different topic partitions are assigned to the 3 different worker instances.

Then let’s start the manager with:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

The manager instance will distribute the work, wait until the workers have completed all the partitions and it stops successfully with the message:

2022-03-29 21:21:41.475  INFO 11996 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 11s998ms

Then, if you check the customers table in your MySQL, it will contain the expected 1000 records.

Takeaway

The reason I decided to write this article because I barely found other sources on how exactly I should wire Kafka into Spring Batch as a messaging middleware but I eventually sorted it out and wanted to help others.

I know it’s a bit long but hope it helps others. I’m planning to cover a similar piece with a tiny bit of difference that the messaging system is AWS SQS; cause again, lack of resources available.

The full code is available on GitHub.

If you liked it, feel free to share it with your friends, colleagues, whoever and make sure to follow me on Twitter and Facebook.

One Reply to “Scaling Spring Batch processing with remote partitioning using Kafka”

Leave a Reply

Your email address will not be published. Required fields are marked *