I’ve been writing about Spring Batch lately, and one of the questions I had about fault tolerance is what happens with remote partitioning in case of a Spring Batch manager failure.
If you’re not familiar with remote partitioning, here’s a great article describing it: Scaling Spring Batch processing with remote partitioning using Kafka. TL;DR: you have a single, dedicated, so-called manager instance that distributes the work among multiple workers using a messaging channel. After the manager has distributed the work, it essentially just waits and polls until all the work has been completed by the workers.
Now, there was one use-case I was really curious about. Let’s say the manager successfully distributed the work and the workers have started doing their job. The manager is now waiting for completion by periodically polling the database for state changes.
What if the manager crashes during this time, while it’s waiting? Can I resume the existing job which is still ongoing? If so, how?
I tried to look at the potential solutions and figure it out myself, but I didn’t find a “resume” operation on any of the Spring Batch interfaces, so I raised a question on StackOverflow.
Mahmoud Ben Hassine, the lead of the Spring Batch project, helped me out by explaining how this could be done, emphasizing that it’s not something Spring Batch offers out of the box.
So, in this article I’m going to show you how I implemented this using an example project so you don’t need to worry about Spring Batch manager failures anymore.
The idea
If you haven’t read the previous articles, I suggest you do so, especially the one about Kafka, because that’s what I’m going to use to showcase the solution: Scaling Spring Batch processing with remote partitioning using Kafka.
So, since resuming manager jobs where they left off is not a functionality provided by Spring Batch, we need to implement it.
The idea is to make Spring Batch believe that the manager job failed even though the partitions it created were successfully processed. Then we can restart the original job with the same parameters, and Spring Batch will recognize: “oh well, all the partitions have completed, so there’s nothing to do here”.
In a nutshell, it’s just setting the job’s state to `FAILED` and then waiting for all step executions (partitions) to complete. Then, using the `JobOperator`, restarting the original job will transition it to the `COMPLETED` state. Easier said than done.
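Condensed into a rough sketch, this is the flow we’ll build (the method names here are placeholders I made up; the real implementations follow below):

```java
// A rough sketch of the recovery flow; method names are placeholders for what we build below.
private void recoverStuckJobs() throws Exception {
    for (Long stuckJobExecutionId : findJobExecutionsStuckInStartedState()) {
        // the workers are unaffected by the manager crash, so we simply let them finish
        waitUntilAllPartitionStepExecutionsComplete(stuckJobExecutionId);
        // make Spring Batch believe the execution failed by updating its metadata tables
        markJobAndPartitionerStepExecutionsAsFailed(stuckJobExecutionId);
        // on restart, Spring Batch sees only completed partitions and marks the job COMPLETED
        jobOperator.restart(stuckJobExecutionId);
    }
}
```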
Coding the solution
The code I’m gonna use further in the article can be found here on GitHub.
Let’s build the skeleton and then we can fill it up with content. Since there’s gonna be some custom implementation to resume/start jobs upon application startup, we have to get rid of the default Spring Batch behavior that automatically starts the configured jobs. That’s done by setting the following property in `application.properties`:

```properties
spring.batch.job.enabled=false
```
But we only want to disable this automatic startup on the manager instance, and since we’re using profiles to control whether the app is a manager or a worker, we can simply create a new file called `application-manager.properties` which will only be applied when the `manager` profile is active.
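That way, the property from above moves into the profile-specific file, so only the manager skips the automatic job startup:

```properties
# application-manager.properties - only picked up when the "manager" profile is active
spring.batch.job.enabled=false
```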
Good. Next up, let’s add a slight delay to the worker processing so it’s not going to complete within a matter of milliseconds. Open the `WorkerConfiguration` class and add a `Thread.sleep` to the `ItemProcessor`:
```java
@Bean
public ItemProcessor<Integer, Customer> itemProcessor() {
    return new ItemProcessor<>() {
        @Override
        public Customer process(Integer item) throws InterruptedException {
            Thread.sleep(50);
            return new Customer(item);
        }
    };
}
```
Awesome. Let’s code the skeleton of the startup/resume logic. I’m gonna create a new class called `ManagerStarter` that implements the `CommandLineRunner` Spring interface.
```java
@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(ManagerStarter.class);

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            // start the job as usual
        }
    }

    private void handleStuckJob(Long stuckJobId) {
        try {
            waitUntilAllPartitionsFinished(stuckJobId);
            // update job state
            // restart job
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling a stuck job", e);
        }
    }

    private void waitUntilAllPartitionsFinished(Long stuckJobId) throws InterruptedException {
        while (!areAllPartitionsCompleted(stuckJobId)) {
            logger.info("Sleeping for a second to wait for the partitions to complete for job {}", stuckJobId);
            Thread.sleep(1000);
        }
    }

    private boolean areAllPartitionsCompleted(Long jobId) {
        // decide if all partitions have completed
        return true;
    }

    private List<Long> getStuckJobIds() {
        // get all stuck job ids
        return Collections.emptyList();
    }

    private boolean areThereStuckJobs() {
        // decide if there's any stuck job
        return false;
    }
}
```
Let’s fill this up step by step. The implementation for starting a new job upon startup is gonna be easy but needs a tiny bit of extra prep.
Let’s go to the `ManagerConfiguration` class and adjust the following things:

- Extract the partitioning job name into a public constant
- Extract the partitioning step name into a public constant
- Add a `JobRegistryBeanPostProcessor` to the app context

Here’s how the whole thing looks:
```java
@Configuration
@Profile("manager")
public class ManagerConfiguration {

    public static final String PARTITIONING_JOB_NAME = "partitioningJob";
    public static final String PARTITIONER_STEP_NAME = "partitionerStep";

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }

    @Bean(name = PARTITIONING_JOB_NAME)
    public Job partitioningJob() {
        return jobBuilderFactory.get(PARTITIONING_JOB_NAME)
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get(PARTITIONER_STEP_NAME)
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        final JobRegistryBeanPostProcessor answer = new JobRegistryBeanPostProcessor();
        answer.setJobRegistry(jobRegistry);
        return answer;
    }
}
```
Good, let’s go back to the `ManagerStarter` class and implement the startup. The `JobRegistryBeanPostProcessor` we just added registers every `Job` bean in the `JobRegistry`, which is what allows looking the job up by name via the `JobLocator`. The implementation is going to be very similar to what Spring Batch does internally:
```java
@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(ManagerStarter.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    @Autowired
    private JobExplorer jobExplorer;

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            Job partitioningJob = jobLocator.getJob(PARTITIONING_JOB_NAME);
            JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                    .getNextJobParameters(partitioningJob)
                    .toJobParameters();
            jobLauncher.run(partitioningJob, jobParameters);
        }
    }

    // omitted for simplicity
}
```
Easy I’d say. Let’s continue implementing the building blocks of the resume logic.
Let’s see how to check whether there’s a stuck job. But before that, let’s dig into what happens in the database when the job starts.
We’ll have:

- 1 record in the `BATCH_JOB_INSTANCE` table that represents this job
- 1 record in the `BATCH_JOB_EXECUTION` table that represents a single execution for the corresponding `BATCH_JOB_INSTANCE`
- 1+50 records in the `BATCH_STEP_EXECUTION` table, because the first step of the job is the partitioning step; that’ll spawn 50 partitions, which means the worker step needs to be executed 50 times, hence the 51 rows
And then if the manager fails in the middle of the workers processing the partitions:

- The `BATCH_JOB_EXECUTION` table will contain the execution stuck in `STARTED` status
- The `BATCH_STEP_EXECUTION` table will have the partitioning step stuck in `STARTED` status as well
`BATCH_JOB_INSTANCE`

| JOB_INSTANCE_ID | JOB_NAME | JOB_KEY |
| --- | --- | --- |
| 1 | partitioningJob | 853d3449e311f40366811cbefb3d93d7 |

`BATCH_JOB_EXECUTION`

| JOB_EXECUTION_ID | JOB_INSTANCE_ID | START_TIME | STATUS |
| --- | --- | --- | --- |
| 1 | 1 | 2022-04-26 13:52:22.655000 | STARTED |

`BATCH_STEP_EXECUTION`

| STEP_EXECUTION_ID | STEP_NAME | JOB_EXECUTION_ID | STATUS |
| --- | --- | --- | --- |
| 1 | partitionerStep | 1 | STARTED |
| 2 | simpleStep:partition1 | 1 | COMPLETED |
And in theory, if we resume the same job (i.e. with the same parameters), the `BATCH_JOB_INSTANCE` table will still contain 1 row, but the `BATCH_JOB_EXECUTION` table will have one more row that represents the resumed execution.
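To illustrate, here’s what the `BATCH_JOB_EXECUTION` table could look like for that instance after a crash and a successful resume (hypothetical data, columns trimmed):

| JOB_EXECUTION_ID | JOB_INSTANCE_ID | STATUS |
| --- | --- | --- |
| 1 | 1 | FAILED |
| 2 | 1 | COMPLETED |

With that in mind, here’s the stuck-job check: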
```java
@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {

    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private boolean areThereStuckJobs() {
        Long stuckJobCount = jdbcTemplate.queryForObject("SELECT COUNT(*) as STUCK_JOB_COUNT FROM BATCH_JOB_INSTANCE bji " +
                        "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                        "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN (" +
                        "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji " +
                        "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                        "WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)",
                Map.of(
                        "statuses", List.of(STARTED.name(), FAILED.name()),
                        "jobName", PARTITIONING_JOB_NAME,
                        "completedStatus", COMPLETED.name()),
                Long.class);
        return stuckJobCount != 0L;
    }
}
```
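The query is a bit hard to read as concatenated Java strings, so here it is once more as plain SQL (named parameters left as placeholders):

```sql
SELECT COUNT(*) AS STUCK_JOB_COUNT
FROM BATCH_JOB_INSTANCE bji
INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID
WHERE bje.STATUS IN (:statuses)            -- STARTED, FAILED
  AND bji.JOB_NAME = :jobName              -- partitioningJob
  AND bje.JOB_INSTANCE_ID NOT IN (
      SELECT bje.JOB_INSTANCE_ID
      FROM BATCH_JOB_INSTANCE bji
      INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID
      WHERE bje.STATUS = :completedStatus  -- COMPLETED
        AND bji.JOB_NAME = :jobName
  )
```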
The query does the following:

- Gets all `JOB_INSTANCE_ID`s for the partitioning job which have a completed job execution
- Gets all partitioning job executions which are in `STARTED` or `FAILED` state and, from this set, removes the jobs that already have a completed job execution
The last step is needed because if the job is resumed and the execution finally completes, there are going to be 2 records for the same job instance, one `FAILED` and one `COMPLETED` (just like in the hypothetical table above), so we want to make sure these are excluded. The last thing the query does is count the remaining rows in the result set; if there’s even a single row, there’s a stuck job.
Similarly to this, we’ll need the stuck job IDs so we can modify their state to `FAILED` and Spring Batch can resume them later on. The query is almost identical to the one above:
```java
@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {

    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private List<Long> getStuckJobIds() {
        return jdbcTemplate.queryForList("SELECT bje.JOB_EXECUTION_ID FROM BATCH_JOB_INSTANCE bji " +
                        "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                        "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN (" +
                        "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji " +
                        "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                        "WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)",
                Map.of(
                        "statuses", List.of(STARTED.name(), FAILED.name()),
                        "jobName", PARTITIONING_JOB_NAME,
                        "completedStatus", COMPLETED.name()),
                Long.class);
    }
}
```
The difference is that instead of the count aggregate function, I’m just returning the job execution IDs.
Good. Now, let’s decide whether all partitions have finished for our job. This isn’t going to be complicated because, as seen in one of the tables above, this information is stored in the `BATCH_STEP_EXECUTION` table.
```java
@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {

    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private boolean areAllPartitionsCompleted(Long jobId) {
        Long partitionsNotCompleted = jdbcTemplate.queryForObject("SELECT COUNT(bse.STEP_EXECUTION_ID) FROM BATCH_STEP_EXECUTION bse " +
                        "WHERE bse.JOB_EXECUTION_ID = :jobExecutionId AND bse.STEP_NAME <> :stepName AND bse.status <> :status",
                Map.of(
                        "jobExecutionId", jobId,
                        "stepName", PARTITIONER_STEP_NAME,
                        "status", COMPLETED.name()),
                Long.class);
        return partitionsNotCompleted == 0L;
    }
}
```
Awesome. Next up, as a last step, we need to implement modifying the state of the job that needs to be resumed. We’ll need a `JobOperator` from Spring Batch and a `TransactionRunner` that we’ll create in a second to ensure our statements run in their own transaction. The `TransactionRunner` looks like the following:
```java
@Component
public class TransactionRunner {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void runInTransaction(Runnable r) {
        r.run();
    }
}
```
And then the `ManagerStarter`:
```java
@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {

    // omitted for simplicity

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private TransactionRunner txRunner;

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            Job partitioningJob = jobLocator.getJob(PARTITIONING_JOB_NAME);
            JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                    .getNextJobParameters(partitioningJob)
                    .toJobParameters();
            jobLauncher.run(partitioningJob, jobParameters);
        }
    }

    private void handleStuckJob(Long stuckJobId) {
        try {
            waitUntilAllPartitionsFinished(stuckJobId);
            txRunner.runInTransaction(() -> {
                jdbcTemplate.update("UPDATE BATCH_STEP_EXECUTION SET STATUS = :status WHERE JOB_EXECUTION_ID = :jobExecutionId AND STEP_NAME = :stepName",
                        Map.of(
                                "status", FAILED.name(),
                                "jobExecutionId", stuckJobId,
                                "stepName", PARTITIONER_STEP_NAME));
                jdbcTemplate.update("UPDATE BATCH_JOB_EXECUTION SET STATUS = :status, START_TIME = null, END_TIME = null WHERE JOB_EXECUTION_ID = :jobExecutionId",
                        Map.of(
                                "status", FAILED.name(),
                                "jobExecutionId", stuckJobId));
            });
            jobOperator.restart(stuckJobId);
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling a stuck job", e);
        }
    }

    // omitted for simplicity
}
```
The implementation updates the stuck `BATCH_STEP_EXECUTION` row for the partitioning step and the stuck `BATCH_JOB_EXECUTION` row for the partitioning job execution. It’s important to note that this is done within the `TransactionRunner`, because we want to make sure it happens in a separate transaction from the one Spring Batch starts (if I remember correctly, Spring Batch even recognizes if you try to wrap everything into the same transaction and fails).
The new status we need to set on the rows is `FAILED`. Plus, what took me some time to figure out is that Spring Batch does some internal checks on whether a job is already running, and that’s sometimes decided based on the `START_TIME` and `END_TIME` columns of a particular job execution, so we need to set those to `null`; otherwise Spring Batch will think the job is still running and won’t allow resuming.
Spring Batch manager failure testing
That’s it, let’s test everything.
Start up a single worker:
```shell
$ ./gradlew bootRun --args='--spring.profiles.active=worker'
```
Then start up a manager:
```shell
$ ./gradlew bootRun --args='--spring.profiles.active=manager'
```
Wait until the messages are sent to Kafka. You’ll see it in the logs:
```
2022-04-26 13:52:24.099  INFO 7100 --- [main] o.s.integration.handler.LoggingHandler : GenericMessage [payload=StepExecutionRequest: [jobExecutionId=7, stepExecutionId=163, stepName=simpleStep], headers={sequenceNumber=49, correlationId=7:simpleStep, id=02bb3c11-ae59-fd1f-60e4-4715a8ac5cec, sequenceSize=50, timestamp=1650973944099}]
```
Then stop the manager and notice that the worker is still processing the messages.
Start up the manager again:
```shell
$ ./gradlew bootRun --args='--spring.profiles.active=manager'
```
And it’ll wait until the partitions are complete:
```
2022-04-26 14:24:28.040  INFO 16780 --- [main] c.a.b.s.manager.ManagerStarter : Sleeping for a second to wait for the partitions to complete for job 9
2022-04-26 14:24:29.045  INFO 16780 --- [main] c.a.b.s.manager.ManagerStarter : Sleeping for a second to wait for the partitions to complete for job 9
2022-04-26 14:24:30.092  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobOperator : Checking status of job execution with id=9
2022-04-26 14:24:30.220  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobOperator : Attempting to resume job with name=partitioningJob and parameters={run.id=5}
2022-04-26 14:24:30.307  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitioningJob]] launched with the following parameters: [{run.id=5}]
2022-04-26 14:24:30.396  INFO 16780 --- [main] o.s.batch.core.job.SimpleStepHandler : Executing step: [partitionerStep]
2022-04-26 14:24:30.702  INFO 16780 --- [main] o.s.batch.core.step.AbstractStep : Step: [partitionerStep] executed in 306ms
2022-04-26 14:24:30.743  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=5}] and the following status: [COMPLETED] in 404ms
```
Perfect. The job has completed successfully without reprocessing any of the partitions.
Summary
In my opinion, this could be a solution provided out of the box by Spring Batch, but maybe it’s too specific; I’m not sure. Maybe they could just provide a “smart resume” flag configured via properties that does this. We’ll see.
One more note on the implementation: it currently only considers a single job to restart. If you need a full-fledged solution that’ll restart any partitioned job, there are a couple of changes that need to be made. Maybe I’ll bring that as well later on.
Hope you liked it, here’s the GitHub code on the master-crash branch.