I haven’t really covered batch jobs so far, but it happened that I needed to work with them lately and design a fairly complicated batch job setup based on Spring Batch, with partitioning over Kafka.
Obviously I’m not gonna describe the exact system but only some of the concepts that Spring Batch provides along with working examples.
Problem statement
Let’s talk a little bit about the problem statement. I’m not gonna go into detail about batch jobs in general but rather focus on a more specific problem – scaling batch jobs.
Imagine a process which you need to run on a regular basis, for example end of day (EOD). Let’s say the amount of data this process needs to handle is continuously increasing.
In a naive world, you could use very simple Spring scheduling (or Quartz or what have you) to execute a single method that loads all the data at once, processes all of it and writes the result back into the database.
A pseudo code would be similar to this:
void job() {
    data = read_all_data()
    for (item : data) {
        process(item)
    }
    write_result_to_database()
}
What’s the problem with this? Nothing at all, as long as the data you read is constant in volume and reading it all at once satisfies your needs.
Let’s say the number of rows we read (from a database, for example) is 10 000. That works perfectly fine, but what if suddenly there are 10 000 000 rows? The execution will probably fail with an OutOfMemoryError or take an eternity to finish.
So how could we make this scalable and make sure the increasing data volume is not going to be an issue in the long run either?
Remote partitioning
Well, the concept of remote partitioning is simple.
You take your initial dataset, for example if we’re reading transactions (or whatever domain object) from a database, we take only the transaction IDs. You divide them into partitions (not chunks; chunks have a different meaning in the Spring Batch world) and send the partitions to workers that process them and do the actual business logic.
What I just described could be true for regular partitioning as well. The main difference between regular and remote partitioning is where the workers are. In the case of regular partitioning, the workers are local threads in the same JVM as the process that partitions the data.
In the case of remote partitioning though, the workers don’t run in the same JVM; they are completely separate JVMs. The individual workers are notified through a messaging middleware when there’s work to pick up.
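To make the contrast concrete, here’s a minimal, hypothetical sketch of the local flavor, using the regular StepBuilderFactory plus a TaskExecutor; workerStep() and partitioner() are placeholder beans and are not part of the remote setup we’ll build below:

// Hypothetical sketch of *local* partitioning with the regular StepBuilderFactory:
// the partitions are handled by threads of a TaskExecutor inside the same JVM,
// no messaging middleware involved. workerStep() and partitioner() are placeholders.
@Bean
public Step localPartitioningStep(StepBuilderFactory stepBuilderFactory) {
    return stepBuilderFactory.get("localPartitioningStep")
            .partitioner("workerStep", partitioner())
            .step(workerStep())
            .gridSize(4)
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .build();
}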
The image below illustrates the idea:
The messaging middleware can be different in each environment but for the sake of the article, I’ll choose Kafka (and probably later I’ll write an article with AWS SQS as well).
Limitations
Kafka operates based on topics. Topics can have partitions. The number of consumers you can have (within the same consumer group) depends on the number of partitions the topic has, meaning the concurrency factor of your partitioned batch job is directly tied to the number of topic partitions.
The number of partitions for a topic should be set at topic-creation time. It’s possible to change the number of partitions of an existing topic later on, but there are certain side effects you have to be aware of.
This implies that dynamically scaling the number of workers based on data volume is not possible with Kafka out of the box. By dynamic I mean that sometimes you need 10 workers, but let’s say the data volume vastly increases during Christmas time and you’d need 50. That’s something you’ll need custom scripts for.
Overall, I think a good rule of thumb in case of Kafka is to oversize the number of topic partitions. If you need 10 consumers during non-peak days and 20 during peak days, I’d go with double or triple that to make sure you have room to grow without much of a headache. So 60 could be a good partition count to support at most 60 simultaneous consumers. Of course this depends on how fast your data volume grows, but you get the idea.
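If you do end up having to grow an existing topic, the partition count can be increased with the Kafka admin client. Here’s a minimal sketch where the bootstrap address, topic name and target count are purely illustrative (and keep the side effects mentioned above in mind):

// Hypothetical sketch: increase the partition count of an existing topic using
// org.apache.kafka.clients.admin.AdminClient. Values below are illustrative only.
void increasePartitions() throws Exception {
    Map<String, Object> adminProps =
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(adminProps)) {
        admin.createPartitions(Map.of("work", NewPartitions.increaseTo(60)))
                .all()
                .get();
    }
}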
Scaling in action
It’s coding time, let’s see all of this in action.
Before we do anything, we’ll need a project. Let’s go to start.spring.io as usual and create a new project with the following dependencies:
- Spring Batch
- Spring Integration
- Spring for Apache Kafka
- MySQL
- Liquibase
I’m going with a Gradle project but I defer that question to your best judgment.
Let’s open up the project and add one more dependency to the build.gradle:
implementation 'org.springframework.batch:spring-batch-integration'
And the full list of dependencies for me:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.batch:spring-batch-integration'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.liquibase:liquibase-core'
    implementation 'org.springframework.integration:spring-integration-kafka'
    implementation 'org.springframework.kafka:spring-kafka'
    runtimeOnly 'mysql:mysql-connector-java'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
    testImplementation 'org.springframework.integration:spring-integration-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
Good. Let’s jump into the coding piece.
Manager
We’ll start with the manager and its configuration. Let’s have a ManagerConfiguration class. We’ll need a couple of dependencies for the configuration and 2 annotations:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;
}
The @Profile annotation is vital because we only want this configuration to kick in when we’re trying to run the manager, and we’ll control that with Spring’s profiles.
The JobBuilderFactory will be used to create our partitioning job. The RemotePartitioningManagerStepBuilderFactory is going to be used to create the steps for our job, and it’s very important to use this class and not the regular StepBuilderFactory. Also, note that there’s a very similar factory called RemotePartitioningWorkerStepBuilderFactory which is intended to be used by the workers and not the manager. We’ll get there soon.
The KafkaTemplate is automatically configured for us and we’ll need it to configure the channel between the manager and Kafka.
Now, let’s add a channel to the picture that we’ll use as an output channel from the application to Kafka:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}
DirectChannel is just an abstraction over messaging channels in Spring Integration.
Next up, let’s create the Partitioner that’ll partition our dataset. It’ll be a new class; I’m calling it ExamplePartitioner.
public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put("data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get("data");
            data.add(i + 1);
        }
        return partitions;
    }
}
This partitioner implementation is not doing anything interesting. It creates 50 partitions and, for each partition, puts some numbers into a list that’s accessible under the key data. That means each partition will have 20 numbers in its list.
This is where you can imagine acquiring the transaction (or whatever) IDs you want to process; later down the line, the workers will load the corresponding rows from the database.
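Just for illustration, a hypothetical variant of the second loop could distribute IDs fetched from the database instead of generated numbers; the transactions table, its status column and the jdbcTemplate used here are assumptions, not part of the example project:

// Hypothetical sketch: spread IDs selected from the database across the partitions.
// Table/column names and the injected JdbcTemplate are illustrative assumptions.
List<Integer> ids = jdbcTemplate.queryForList(
        "SELECT id FROM transactions WHERE status = 'PENDING'", Integer.class);
for (int i = 0; i < ids.size(); i++) {
    ExecutionContext executionContext = partitions.get(PARTITION_PREFIX + (i % partitionCount));
    ((List<Integer>) executionContext.get("data")).add(ids.get(i));
}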
Good, let’s create the job step and a bean from the partitioner:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}
Nothing special, we create the step that’ll invoke our partitioner and set the output channel where we want to send the partitions. Also, there’s a reference here to the Constants class; let me show you its content.
public class Constants {
    public static final String TOPIC_NAME = "work";
    public static final String WORKER_STEP_NAME = "simpleStep";
    public static final int TOPIC_PARTITION_COUNT = 3;
}
That’s all. We’ll call the Kafka topic work, it’ll have 3 topic partitions, and the worker step that we want to invoke on the partitioned dataset is called simpleStep.
Good, now let’s create the partitioner job:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get("partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}
Again, nothing special, just referencing the partitioner step we created previously and adding a RunIdIncrementer to the job so that we can rerun the job easily.
Good. Now comes what I’d say is the most complicated part: how to wire the channel into Kafka and make sure that the topic partitions are utilized properly.
We’ll use Spring Integration for this as well:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }
}
First of all, we’ll need a KafkaProducerMessageHandler that’ll take the incoming messages and publish them into a Kafka topic. The topic is set by the setTopicExpression method call, and at the end we just hook everything up as an integration flow.
However, this will not utilize the topic partitions; all messages will be published to the very same partition. Let’s add a custom expression for that as well, via the setPartitionIdExpression method:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }
}
We supply a FunctionExpression that dynamically unwraps the message, gets the stepExecutionId attribute and combines it with the modulo operator. The current value for the partition count is 3, so the partition ID expression will return a value from the [0, 1, 2] range, which denotes the targeted topic partition. This provides a roughly equal, though not perfect, distribution among the partitions. If you need a more sophisticated partition ID decider, you can definitely adjust the implementation.
Also, you can similarly use the setMessageKeyExpression method to supply a FunctionExpression that calculates the message key instead of directly telling Kafka which partition to use.
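A minimal sketch of that alternative, reusing the messageHandler from the outboundFlow bean above (returning a Long key matches the key serializer we’ll configure later):

// Hypothetical alternative: derive a message key from the step execution ID and let
// Kafka's default partitioner pick the target partition by hashing that key.
Function<Message<?>, Long> messageKeyFn = (m) -> {
    StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
    return executionRequest.getStepExecutionId();
};
messageHandler.setMessageKeyExpression(new FunctionExpression<>(messageKeyFn));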
One more thing to note is that I added log() to the integration flow so the messages sent out are logged, just for debugging purposes.
That’s the manager configuration.
Worker
The worker configuration is going to be similar. Let’s create a WorkerConfiguration class.
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(cf, Constants.TOPIC_NAME))
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}
A couple of dependencies, a message channel for inbound messages and the Spring Integration flow that wires it all together.
Let’s create the worker step:
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }
}
This creates the step definition, connects it with the inbound message channel and references the ItemReader, ItemProcessor and ItemWriter instances. Those look like the following:
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }
                return null;
            }
        };
    }
}
The ItemReader is a bean and will receive the partitioned data as a parameter from the Spring Batch step execution context, under the data key. Note that it’s mandatory to use @StepScope on the bean definition to enable late binding for the step.
The implementation is simple. We store the received IDs in a local List and, during each ItemReader invocation, remove one item from the list until there’s nothing left.
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql("INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }
}
The ItemProcessor and ItemWriter are even easier. The ItemProcessor just converts the ID into a Customer object, simulating some type of processing on a DTO, and the ItemWriter writes the Customers into the database.
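In a real job, the processor is a natural place to load the full row behind each received ID. A purely hypothetical sketch, assuming a transactions table and an available JdbcTemplate bean (neither is part of this example project):

// Hypothetical sketch: load the actual row behind each partitioned ID.
// The table/column names and the injected JdbcTemplate are illustrative assumptions.
@Bean
public ItemProcessor<Integer, Customer> loadingItemProcessor(JdbcTemplate jdbcTemplate) {
    return id -> jdbcTemplate.queryForObject(
            "SELECT id FROM transactions WHERE id = ?",
            (rs, rowNum) -> new Customer(rs.getInt("id")),
            id);
}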
The Customer class is a simple POJO, nothing special:
public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}
Last config steps
The next thing we need to do is create the Kafka topic with the desired partition count, so let’s create a new KafkaConfiguration class:
@Configuration
public class KafkaConfiguration {
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(Constants.TOPIC_NAME)
                .partitions(Constants.TOPIC_PARTITION_COUNT)
                .build();
    }
}
This will automatically create the topic with the partition count if it doesn’t exist yet.
Next, we’ll need to create the database structure for storing our Customers and for allowing Spring to manage its state. Let’s create a db.changelog-master.xml file under the src/main/resources/db/changelog folder with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                                       http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id="0001-initial" author="Arnold Galovics">
        <createTable tableName="customers">
            <column name="id" type="number"/>
        </createTable>
        <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql"
                 relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>
The createTable is simple; the SQL file import is something provided by Spring Batch’s core module.
Let’s add some configuration into the application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml
DataSource config with Liquibase. And then the Kafka producer configuration:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.group-id=producer-g
The most important thing here is to use the JsonSerializer so that the messages Spring Batch sends are encoded as JSON.
Similarly, the consumer:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=consumer-g
And one more thing:
spring.kafka.consumer.properties.spring.json.trusted.packages=*
That concludes the configuration.
Showtime
Let’s create the infra for starting up the app. I’ll create a docker-compose.yml:
version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
    depends_on:
      - kafka
  mysql:
    image: mysql
    ports:
      - '3306:3306'
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: mysql
I’m not gonna go into detail. It starts up a Zookeeper node, a Kafka broker, a Kafka UI instance on port 8080 (to view the state of the topic if you want to) and a MySQL server.
Let’s do a quick docker-compose up to start everything.
And then let’s use Gradle to start up the apps on the corresponding profiles.
$ ./gradlew bootRun --args='--spring.profiles.active=worker'
Do this in 3 different terminal windows and you should have your 3 workers. The log message you should see at the end of the logs is similar to this:
2022-03-29 21:19:11.475 INFO 3756 --- [ntainer#0-0-C-1] s.k.l.ConcurrentMessageListenerContainer : consumer-g: partitions assigned: [work-2]
2022-03-29 21:19:11.487 INFO 7260 --- [ntainer#0-0-C-1] s.k.l.ConcurrentMessageListenerContainer : consumer-g: partitions assigned: [work-1]
2022-03-29 21:19:11.475 INFO 23756 --- [ntainer#0-0-C-1] s.k.l.ConcurrentMessageListenerContainer : consumer-g: partitions assigned: [work-0]
The 3 different topic partitions are assigned to the 3 different worker instances.
Then let’s start the manager with:
$ ./gradlew bootRun --args='--spring.profiles.active=manager'
The manager instance will distribute the work, wait until the workers have completed all the partitions, and then stop successfully with the following message:
2022-03-29 21:21:41.475 INFO 11996 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 11s998ms
Then, if you check the customers table in your MySQL database, it will contain the expected 1000 records.
Takeaway
The reason I decided to write this article is that I barely found any other sources on how exactly to wire Kafka into Spring Batch as the messaging middleware, but I eventually sorted it out and wanted to help others.
I know it’s a bit long but I hope it helps. I’m planning to cover a similar piece with a tiny difference: the messaging system will be AWS SQS, because again, there’s a lack of resources available on that.
The full code is available on GitHub.
If you liked it, feel free to share it with your friends, colleagues, whoever and make sure to follow me on Twitter and Facebook.