In the last article, I explained how to implement a Spring Batch job with remote partitioning using Kafka. This time I'm going to discuss how to do Spring Batch remote partitioning with AWS SQS as the messaging layer between the manager and the workers.
You can find the previous article here: Scaling Spring Batch processing with remote partitioning using Kafka
If you're interested in the concepts of remote partitioning, check the article above; here I'll focus only on the implementation details, because the concept is exactly the same.
Project implementation
Before doing anything, let’s create a Gradle project on start.spring.io with the following dependencies:
- Spring Batch
- Spring Integration
- MySQL
- Liquibase
This is just the initial set of dependencies; we need to add a few more manually. Open build.gradle:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.batch:spring-batch-integration'
    implementation 'org.springframework.integration:spring-integration-aws:2.3.5.RELEASE'
    implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.6.RELEASE'
    implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.186'
    implementation 'org.liquibase:liquibase-core'
    runtimeOnly 'mysql:mysql-connector-java'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
The following three need to be added:
- org.springframework.integration:spring-integration-aws
- org.springframework.cloud:spring-cloud-aws-messaging
- com.amazonaws:aws-java-sdk-sqs
These dependencies provide the AWS SQS support for Spring Integration.
Manager
First of all, the manager configuration class:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;
}
This is very similar to the one in the other article but obviously we don’t need the KafkaTemplate here.
The @Profile annotation will ensure that this configuration class is only picked up if the manager profile is set.
Next, the channel we’ll send the partitioned requests to:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}
The partitioner is the exact same one I used in the Kafka article:
public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put("data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get("data");
            data.add(i + 1);
        }
        return partitions;
    }
}
This partitioner implementation is not doing anything interesting. It creates 50 partitions and, for each partition, puts some numbers into a list that's accessible under the key data. Since the 1000 numbers are distributed round-robin, each partition will have 20 numbers in its list; for example, partition0 ends up with data = [1, 51, 101, ..., 951].
Let’s create the job steps and create a bean from the partitioner:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}
The Constants class looks like this:
public class Constants {
    public static final String QUEUE_NAME = "work";
    public static final String WORKER_STEP_NAME = "simpleStep";
}
Then the job definition:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get("partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}
Then let's hook up the output channel we defined earlier to AWS SQS:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsync);
        sqsMessageHandler.setQueue(Constants.QUEUE_NAME);
        return IntegrationFlows.from(outboundRequests())
                .transform(objectToJsonTransformer())
                .log()
                .handle(sqsMessageHandler)
                .get();
    }
}
The SqsMessageHandler is the important Spring Integration class we're using here, together with an AmazonSQSAsync bean, which is essentially just an SQS client – we'll define that in a second. One more thing to call out is the objectToJsonTransformer() call, which refers to another bean definition, since we have to tell Spring Integration how to serialize the messages.
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }
}
Next, let's create a new class to define the SQS client:
@Configuration
public class SqsConfiguration {
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}
If you're running against a real AWS SQS queue, this is the configuration you'd use, but since we're working with an emulator, let's configure the SQS client to point to localhost – i.e. the emulated SQS service. It's also important to note that the queue has to exist, so the configuration above actually creates it on startup if it doesn't.
@Configuration
public class SqsConfiguration {
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1");
        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().withCredentials(
                new AWSStaticCredentialsProvider(new BasicAWSCredentials("", ""))
        ).withEndpointConfiguration(endpointConfiguration).build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}
I'll use LocalStack to provide the emulated service; you'll see it in a moment.
That concludes the configuration for the manager.
Worker
The worker is going to be very similar:
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(sqsAsync, Constants.QUEUE_NAME);
        return IntegrationFlows
                .from(adapter)
                .transform(jsonToObjectTransformer())
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public Transformer jsonToObjectTransformer() {
        return new JsonToStepExecutionRequestTransformer();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}
In this case, the SqsMessageDrivenChannelAdapter is used with the SQS client, and we need to deserialize the JSON message into an object, so we use a JsonToStepExecutionRequestTransformer, which looks like this:
public class JsonToStepExecutionRequestTransformer extends AbstractTransformer {
    @Override
    protected Object doTransform(Message<?> message) {
        Map map = null;
        try {
            map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
            StepExecutionRequest stepExecutionRequest = new StepExecutionRequest((String) map.get("stepName"),
                    Long.valueOf((Integer) map.get("jobExecutionId")),
                    Long.valueOf((Integer) map.get("stepExecutionId")));
            return this.getMessageBuilderFactory().withPayload(stepExecutionRequest).build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
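Just for illustration, a message coming off the queue carries the serialized StepExecutionRequest as a JSON payload roughly like the one below; the IDs are made-up example values, but the field names are exactly the ones the transformer above reads:
{"stepName": "simpleStep", "jobExecutionId": 1, "stepExecutionId": 7}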
And the remaining configuration for the worker is the step definition:
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "simpleStep")
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql("INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }

    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }
                return null;
            }
        };
    }
}
The steps are essentially the same as for the Kafka article.
The Customer class:
public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}
That’s it.
A bit more generic config
Liquibase changelog in src/main/resources/db/changelog/db.changelog-master.xml:
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id="0001-initial" author="Arnold Galovics">
        <createTable tableName="customers">
            <column name="id" type="number"/>
        </createTable>
        <!-- schema-mysql.sql ships with Spring Batch and creates the batch metadata tables shared by the manager and the workers -->
        <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql" relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>
The application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml
That’s all.
Testing
For testing, we need the infrastructure: a MySQL server and a LocalStack container to provide the emulated SQS service. Let's create a docker-compose.yml in the root of the project:
version: "3"
services:
mysql:
image: mysql
ports:
- '3306:3306'
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_ROOT_PASSWORD: mysql
localstack:
image: localstack/localstack
ports:
- "4566:4566"
environment:
- SERVICES=sqs
- DOCKER_HOST=unix:///var/run/docker.sock
- DEFAULT_REGION=us-east-1
- HOSTNAME_EXTERNAL=localhost
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
That’s all. Let’s do a docker-compose up to start it.
Then start three worker instances by executing the command below three times:
$ ./gradlew bootRun --args='--spring.profiles.active=worker'
Then let’s start the manager with:
$ ./gradlew bootRun --args='--spring.profiles.active=manager'
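At this point the application has already created the work queue on LocalStack. If you want to verify that (an optional step, assuming the AWS CLI is installed; dummy credentials are enough for the emulator), you can point the CLI at the emulated endpoint:
$ AWS_ACCESS_KEY_ID=test AWS_SECRET_ACCESS_KEY=test aws --endpoint-url=http://localhost:4566 --region us-east-1 sqs list-queues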
The manager instance will distribute the work and then wait until the workers have completed all the partitions (since we didn't configure a reply channel, it polls the job repository to see when the workers are done), eventually stopping successfully with the following message:
2022-04-10 12:20:35.539 INFO 26368 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 6s629ms
And if we then check the customers table in the database, we'll find 1000 distinct IDs, which is the expected behavior.
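A quick way to verify this is a count query against MySQL; the table and column names are the ones created by the Liquibase changelog above:
SELECT COUNT(DISTINCT id) FROM customers;
-- expected result: 1000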
Summary
That’s how easy it is to integrate Spring Batch with AWS SQS for remote partitioning.
The full code is available on GitHub.
If you liked it, feel free to share it with your friends and colleagues, and make sure to follow me on Twitter and Facebook.