In the last article, I explained how to implement a Spring Batch job with remote partitioning using Kafka. This time I’m gonna discuss how to do Spring Batch remote partitioning with AWS SQS as the messaging layer between the manager and the workers.
You can find the previous article here: Scaling Spring Batch processing with remote partitioning using Kafka
If you’re interested in the concepts of remote partitioning, check the article above; here I’m only gonna focus on the implementation details because the concept is exactly the same.
Project implementation
Before doing anything, let’s create a Gradle project on start.spring.io with the following dependencies:
- Spring Batch
- Spring Integration
- MySQL
- Liquibase
This is just the initial set of dependencies; we gotta add a few more manually. Open build.gradle:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.batch:spring-batch-integration'
    implementation 'org.springframework.integration:spring-integration-aws:2.3.5.RELEASE'
    implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.6.RELEASE'
    implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.186'
    implementation 'org.liquibase:liquibase-core'
    runtimeOnly 'mysql:mysql-connector-java'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
The following three need to be added:
org.springframework.integration:spring-integration-aws
org.springframework.cloud:spring-cloud-aws-messaging
com.amazonaws:aws-java-sdk-sqs
These dependencies provide the AWS SQS support for Spring Integration.
Manager
First of all, the manager configuration class:
@Configuration
@Profile("manager")
public class ManagerConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;
}
This is very similar to the one in the other article but obviously we don’t need the KafkaTemplate here.
The @Profile annotation will ensure that this configuration class is only picked up if the manager profile is set.
Next, the channel we’ll send the partitioned requests to:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}
The partitioner is the exact same one I used in the Kafka article:
public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put("data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get("data");
            data.add(i + 1);
        }
        return partitions;
    }
}
This partitioner implementation is not doing anything interesting. It creates 50 partitions and, for each partition, it puts some numbers into a list that’s accessible under the key data. That means each partition will have 20 numbers in its list.
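Just to double-check the math, here’s a quick sanity test for the partitioner – it’s not part of the original project, merely a sketch that assumes JUnit 5 is available through spring-boot-starter-test:

import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ExecutionContext;

import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical test, not from the article: verifies that the 1000 numbers
// are spread evenly across the 50 partitions, 20 numbers each.
class ExamplePartitionerTest {

    @Test
    @SuppressWarnings("unchecked")
    void distributesNumbersEvenly() {
        // gridSize is ignored by ExamplePartitioner, so any value will do
        Map<String, ExecutionContext> partitions = new ExamplePartitioner().partition(0);

        assertEquals(50, partitions.size());
        for (ExecutionContext executionContext : partitions.values()) {
            List<Integer> data = (List<Integer>) executionContext.get("data");
            assertEquals(20, data.size());
        }
    }
}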
Let’s create a bean from the partitioner and define the partitioner step:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}
The Constants class looks like the following:
public class Constants {
    public static final String QUEUE_NAME = "work";
    public static final String WORKER_STEP_NAME = "simpleStep";
}
Then the job definition:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get("partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}
And then hook up the output channel we defined earlier with AWS SQS:
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsync);
        sqsMessageHandler.setQueue(Constants.QUEUE_NAME);
        return IntegrationFlows.from(outboundRequests())
                .transform(objectToJsonTransformer())
                .log()
                .handle(sqsMessageHandler)
                .get();
    }
}
The SqsMessageHandler is the important Spring Integration class we’re using, together with an AmazonSQSAsync bean which is essentially just an SQS client – we’ll define that in a second. One more thing to call out here is the objectToJsonTransformer() call, which is a bean definition since we have to tell Spring Integration how to serialize the messages.
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }
}
Next, let’s create a new class to define the SQS client:
@Configuration
public class SqsConfiguration {

    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}
If you’re running against a real AWS SQS queue, this is the configuration you need, but since we’re using an emulator, let’s point the SQS client to localhost – i.e. the emulated SQS service. It’s also important to note that the queue has to exist, so the configuration above actually creates it on startup if it doesn’t.
@Configuration
public class SqsConfiguration {

    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1");
        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
                .withEndpointConfiguration(endpointConfiguration)
                .build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}
I’ll use localstack to create the emulated service; you’ll see it in a sec.
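If you’d rather keep both SQS client variants around instead of swapping the code back and forth, one option – not something from the article, just a sketch – is to separate them with an extra Spring profile (I’m calling it local here, the name is my own choice):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Configuration
public class SqsConfiguration {

    // Real AWS client, used whenever the local profile is not active
    @Bean
    @Profile("!local")
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().build();
    }

    // Localstack client, used when running with --spring.profiles.active=manager,local (or worker,local)
    @Bean
    @Profile("local")
    public AmazonSQSAsync localstackAmazonSQSAsync() {
        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1");
        return AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
                .withEndpointConfiguration(endpointConfiguration)
                .build();
    }
}

The queue-creation check from the original bean would still be needed in both variants; it’s left out here only to keep the sketch short.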
That concludes the configuration for the manager.
Worker
The worker is going to be very similar:
@Configuration
@Profile("worker")
public class WorkerConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(sqsAsync, Constants.QUEUE_NAME);
        return IntegrationFlows
                .from(adapter)
                .transform(jsonToObjectTransformer())
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public Transformer jsonToObjectTransformer() {
        return new JsonToStepExecutionRequestTransformer();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}
In this case, the SqsMessageDrivenChannelAdapter is used with the SQS client, and we need to deserialize the JSON message into an object, so we’ll use a JsonToStepExecutionRequestTransformer which looks like the following:
public class JsonToStepExecutionRequestTransformer extends AbstractTransformer {

    @Override
    protected Object doTransform(Message<?> message) {
        try {
            Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
            StepExecutionRequest stepExecutionRequest = new StepExecutionRequest(
                    (String) map.get("stepName"),
                    Long.valueOf((Integer) map.get("jobExecutionId")),
                    Long.valueOf((Integer) map.get("stepExecutionId")));
            return this.getMessageBuilderFactory().withPayload(stepExecutionRequest).build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
And the remaining worker configuration is the step definition:
@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "simpleStep")
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql("INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }

    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }
                return null;
            }
        };
    }
}
The steps are essentially the same as for the Kafka article.
The Customer class:
public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}
That’s it.
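One piece not shown above is the Spring Boot entry point. Here’s a minimal sketch of what it could look like – the class name is my own assumption – with the two annotations that make the rest work: @EnableBatchProcessing sets up the regular Spring Batch infrastructure and @EnableBatchIntegration provides the RemotePartitioningManagerStepBuilderFactory and RemotePartitioningWorkerStepBuilderFactory beans used earlier:

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Minimal application class (class name assumed, not taken from the article)
@SpringBootApplication
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningApplication {

    public static void main(String[] args) {
        SpringApplication.run(RemotePartitioningApplication.class, args);
    }
}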
Bit more generic config
Liquibase changelog in src/main/resources/db/changelog/db.changelog-master.xml:
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                       http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id="0001-initial" author="Arnold Galovics">
        <createTable tableName="customers">
            <column name="id" type="number">
            </column>
        </createTable>
        <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql"
                 relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>
The application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml
That’s all.
Testing
For testing, we need the infrastructure: a MySQL server and a localstack server to provide an emulated SQS service. Let’s create a docker-compose.yml in the root of the project:
version: "3"
services:
  mysql:
    image: mysql
    ports:
      - '3306:3306'
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: mysql
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs
      - DOCKER_HOST=unix:///var/run/docker.sock
      - DEFAULT_REGION=us-east-1
      - HOSTNAME_EXTERNAL=localhost
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
That’s all. Let’s do a docker-compose up to start it.
Then start 3 worker instances by executing the below command three times:
$ ./gradlew bootRun --args='--spring.profiles.active=worker'
Then let’s start the manager with:
$ ./gradlew bootRun --args='--spring.profiles.active=manager'
The manager instance will distribute the work, wait until the workers have completed all the partitions, and eventually stop successfully with a message like:
2022-04-10 12:20:35.539 INFO 26368 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 6s629ms
And then if we check the customers table in the database, we’ll find 1000 distinct IDs, which is the expected behavior.
Summary
That’s how easy it is to integrate Spring Batch with AWS SQS for remote partitioning.
The full code is available on GitHub.
If you liked it, feel free to share it with your friends and colleagues, and make sure to follow me on Twitter and Facebook.