Spring Batch remote partitioning with AWS SQS

On the last article, I explained how to implement a Spring Batch job with remote partitioning using Kafka. This time I’m gonna discuss how to do Spring Batch remote partitioning with AWS SQS as the messaging layer between the manager and workers.

You can find the previous article here: Scaling Spring Batch processing with remote partitioning using Kafka

If you’re interested in the concepts of remote partitioning, check the article above; right now I’m gonna only focus on the implementation details because the concept is the very same.

Project implementation

Before doing anything, let’s create a Gradle project on start.spring.io with the following dependencies:

  • Spring Batch
  • Spring Integration
  • MySQL
  • Liquibase

This is just the initial set of dependencies, we gotta add a few more manually. Open build.gradle:

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-batch'
	implementation 'org.springframework.batch:spring-batch-integration'
	implementation 'org.springframework.integration:spring-integration-aws:2.3.5.RELEASE'
	implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.6.RELEASE'
	implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.186'
	implementation 'org.liquibase:liquibase-core'
	runtimeOnly 'mysql:mysql-connector-java'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

The following 3 needs to be added:

  • org.springframework.integration:spring-integration-aws
  • org.springframework.cloud:spring-cloud-aws-messaging
  • com.amazonaws:aws-java-sdk-sqs

These dependencies will supply the AWS SQS support with Spring Integration.

Manager

First of all, the manager configuration class:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;
}

This is very similar to the one in the other article but obviously we don’t need the KafkaTemplate here.

The @Profile annotation will ensure that this configuration class is only picked up if the manager profile is set.

Next, the channel we’ll send the partitioned requests to:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}

The partitioner is the exact same I used for the Kafka article:

public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put("data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get("data");
            data.add(i + 1);
        }
        return partitions;
    }
}

This partitioner implementation is not doing anything interesting. It creates 50 partitions and for each partition, it puts some numbers into a list that’s accessible under the key data. That means, each partition will have 20 numbers in the list.

Let’s create the job steps and create a bean from the partitioner:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }
    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}

The Constant class looks the following:

public class Constants {
    public static final String QUEUE_NAME = "work";
    public static final String WORKER_STEP_NAME = "simpleStep";
}

Then the job definition:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get("partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}

And then hook up the output channel we defined earlier with AWS SQS:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsync);
        sqsMessageHandler.setQueue(Constants.QUEUE_NAME);
        return IntegrationFlows.from(outboundRequests())
                .transform(objectToJsonTransformer())
                .log()
                .handle(sqsMessageHandler)
                .get();
    }
}

The SqsMessageHandler is the important Spring Integration class we’re using with an AmazonSQSAsync bean which is essenatially just an SQS client – we’ll define that in a second. One more thing to call out here is the objectToJsonTransformer() call which is a bean definition since we have to tell Spring Integration how to serialize the messages.

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }
}

Then on, let’s create a new class to define the SQS client:

@Configuration
public class SqsConfiguration {
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}

If you’re running a real AWS SQS queue, this will be the configuration but since we have emulators, let’s configure the SQS client to point to localhost – i.e. the emulated SQS service. It’s also important to note that the queue has to be created so the configuration above actually creates the queue on startup if it doesn’t exist.

@Configuration
public class SqsConfiguration {
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1");

        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().withCredentials(
                new AWSStaticCredentialsProvider(new BasicAWSCredentials("", ""))
        ).withEndpointConfiguration(endpointConfiguration).build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}

I’ll use localstack to create the emulated service, you’ll see it in a sec.

That concludes the configuration for the manager.

Worker

The worker is going to be very similar:

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(sqsAsync, Constants.QUEUE_NAME);
        return IntegrationFlows
                .from(adapter)
                .transform(jsonToObjectTransformer())
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public Transformer jsonToObjectTransformer() {
        return new JsonToStepExecutionRequestTransformer();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}

In this case, the SqsMessageDrivenChannelAdapter is used with the SQS client and we need to deserialize the JSON message into an Object so we’ll use a JsonToStepExecutionRequestTransformer which looks the following:

public class JsonToStepExecutionRequestTransformer extends AbstractTransformer {
    @Override
    protected Object doTransform(Message<?> message) {
        Map map = null;
        try {
            map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
            StepExecutionRequest stepExecutionRequest = new StepExecutionRequest((String) map.get("stepName"), Long.valueOf((Integer) map.get("jobExecutionId")),
                    Long.valueOf((Integer) map.get("stepExecutionId")));
            return this.getMessageBuilderFactory().withPayload(stepExecutionRequest).build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

And the remaining configuration for the worker is the steps:

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "simpleStep")
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql("INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }

    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }

                return null;
            }
        };
    }
}

The steps are essentially the same as for the Kafka article.

The Customer class:

public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

That’s it.

Bit more generic config

Liquibase changelog in src/main/resources/db/changelog/db.changelog-master.xml:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id="0001-initial" author="Arnold Galovics">
        <createTable tableName="customers">
            <column name="id" type="number">
            </column>
        </createTable>
        <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql" relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>

The application.properties:

spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml

That’s all.

Testing

For testing, we need the infrastructure, a MySQL server and a locakstack server to provide an emulated SQS service. Let’s create a docker-compose.yml in the root of the project:

version: "3"
services:
  mysql:
    image: mysql
    ports:
      - '3306:3306'
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: mysql
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs
      - DOCKER_HOST=unix:///var/run/docker.sock
      - DEFAULT_REGION=us-east-1
      - HOSTNAME_EXTERNAL=localhost
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"

That’s all. Let’s do a docker-compose up to start it.

Then start 3 worker instances by executing the below command three times:

$ ./gradlew bootRun --args='--spring.profiles.active=worker'

Then let’s start the manager with:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

The manager instance will distribute the work and then wait until the workers have completed all the partitions and eventually stops successfully with the message:

2022-04-10 12:20:35.539  INFO 26368 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 6s629ms

And then if we check the database state on the customers table, you’ll find 1000 distinct IDs which is the expected behavior.

Summary

That’s how easy it is to integrate Spring Batch with AWS SQS for remote partitioning.

The full code is available on GitHub.

If you liked it, feel free to share it with your friends, colleagues and make sure to follow me on Twitter and Facebook.

Leave a Reply

Your email address will not be published. Required fields are marked *