Handling manager failures in Spring Batch

I’ve been writing about Spring Batch lately and one of the questions I had in terms of fault tolerance is what happens with remote partitioning in case of a Spring Batch manager failure.

If you’re not familiar with remote partitioning, here’s a great article describing it: Scaling Spring Batch processing with remote partitioning using Kafka. TL;DR: you have a single, dedicated, so-called manager instance that distributes the work among multiple workers using a messaging channel. After the manager has distributed the work, it essentially just waits and polls until all the work has been completed by the workers.

Now, there was one use case I was really curious about. Let’s say the manager successfully distributed the work and the workers have started to do their job. The manager is now waiting for completion by periodically polling the database for state changes.

What if the manager crashes during this time, while it’s waiting? Can I resume the existing job which is still ongoing? If so, how?

I tried to look at the potential solutions and figure it out myself but I didn’t find a “resume” operation on any of the Spring Batch interfaces so I raised a question on StackOverflow.

Mahmoud Ben Hassine, the lead of the Spring Batch project, helped me out by explaining how this could be done, emphasizing that it’s not something Spring Batch offers out of the box.

So, in this article I’m going to show you how I implemented this using an example project so you don’t need to worry about Spring Batch manager failures anymore.

The idea

If you haven’t read the previous articles, I suggest you do it; especially the one with Kafka because that’s what I’m going to use to showcase the solution: Scaling Spring Batch processing with remote partitioning using Kafka.

So, since resuming manager jobs where they left off is not a functionality provided by Spring Batch, we need to implement it.

The idea is to make Spring Batch believe that the manager job failed even though the partitions that it created were successfully processed. And then, we can restart the original job with the same parameters and Spring Batch will recognize “oh well, all the partitions have completed so nothing to do here”.

In a nutshell, it’s just waiting for all step executions (partitions) to complete and then setting the job’s state to FAILED. After that, restarting the original job using the JobOperator will transition it to the COMPLETED state. Easier said than done.

Coding the solution

The code I’m gonna use further in the article can be found here on GitHub.

Let’s build the skeleton and then we can fill it up with content. Since there’s gonna be some custom implementation to resume/start jobs upon application startup, we gotta get rid of the default Spring Batch behavior that starts up the present jobs automatically. That’s done by setting the following property in the application.properties:

spring.batch.job.enabled=false

But the thing is, we only want to disable the automatic startup on the manager instance. Since we’re using profiles to control whether the app is a manager or a worker, we can simply put the property into a new file called application-manager.properties, which is only applied when the manager profile is active.

Good. Next up, let’s add a slight delay into the worker processing so it’s not gonna be completed within a matter of milliseconds. Open the WorkerConfiguration class and add a Thread.sleep to the ItemProcessor:

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) throws InterruptedException {
                Thread.sleep(50);
                return new Customer(item);
            }
        };
    }

Awesome. Let’s code the skeleton of the startup/resume logic. I’m gonna create a new class called ManagerStarter that’ll implement the CommandLineRunner Spring interface.

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(ManagerStarter.class);

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            // start the job as usual
        }
    }

    private void handleStuckJob(Long stuckJobId) {
        try {
            waitUntilAllPartitionsFinished(stuckJobId);
            // update job state
            // restart job
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling a stuck job", e);
        }
    }

    private void waitUntilAllPartitionsFinished(Long stuckJobId) throws InterruptedException {
        while (!areAllPartitionsCompleted(stuckJobId)) {
            logger.info("Sleeping for a second to wait for the partitions to complete for job {}", stuckJobId);
            Thread.sleep(1000);
        }
    }

    private boolean areAllPartitionsCompleted(Long jobId) {
        // decide if all partitions have completed
        return true;
    }

    private List<Long> getStuckJobIds() {
        // get all stuck job ids
        return Collections.emptyList();
    }

    private boolean areThereStuckJobs() {
        // decide if there's any stuck job
        return false;
    }
}
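One thing to keep in mind about the skeleton: waitUntilAllPartitionsFinished loops forever if the partitions never finish (for example because all the workers are down). Here’s a minimal sketch of a bounded variant of the same polling idea. BoundedWait and waitUntil are made-up names for illustration and are not part of the example project; the condition passed in stands in for areAllPartitionsCompleted.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class BoundedWait {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the caller decides what to do with the stuck job
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // Stand-in for areAllPartitionsCompleted: becomes true on the third poll.
        boolean finished = waitUntil(() -> ++polls[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("finished=" + finished + " after " + polls[0] + " polls");
    }
}
```

Whether a timeout is the right call depends on your setup. For the rest of the article I’m sticking with the simple endless loop.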

Let’s fill this up step by step. The implementation for starting a new job upon startup is gonna be easy but needs a tiny bit of extra prep.

Let’s go to the ManagerConfiguration and adjust the following things:

  • Extract the partitioning job name into a public constant
  • Extract the partitioning step name into a public constant
  • Add a JobRegistryBeanPostProcessor to the app context

Here’s how the whole thing looks:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    public static final String PARTITIONING_JOB_NAME = "partitioningJob";
    public static final String PARTITIONER_STEP_NAME = "partitionerStep";

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }

    @Bean(name = PARTITIONING_JOB_NAME)
    public Job partitioningJob() {
        return jobBuilderFactory.get(PARTITIONING_JOB_NAME)
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get(PARTITIONER_STEP_NAME)
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        final JobRegistryBeanPostProcessor answer = new JobRegistryBeanPostProcessor();
        answer.setJobRegistry(jobRegistry);
        return answer;
    }
}

Good, let’s go back to the ManagerStarter class and implement the startup.

The implementation is going to be very similar to what Spring Batch does internally:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(ManagerStarter.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    @Autowired
    private JobExplorer jobExplorer;

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            Job partitioningJob = jobLocator.getJob(PARTITIONING_JOB_NAME);
            JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                    .getNextJobParameters(partitioningJob)
                    .toJobParameters();
            jobLauncher.run(partitioningJob, jobParameters);
        }
    }

    // omitted for simplicity
}

Easy I’d say. Let’s continue implementing the building blocks of the resume logic.

Let’s see how to check whether there’s a stuck job. But before that let’s dig into what happens to the database in case the job starts.

We’ll have:

  • 1 record in the BATCH_JOB_INSTANCE table that represents this job
  • 1 record in the BATCH_JOB_EXECUTION table that represents a single execution for the corresponding BATCH_JOB_INSTANCE
  • 1+50 records in BATCH_STEP_EXECUTION table because the first step of the job is the partitioning step; that’ll spawn 50 partitions which means the worker step needs to be executed 50 times, hence the 51 rows.

And then if the manager fails in the middle of the workers processing the partitions:

  • The BATCH_JOB_EXECUTION table will contain the execution stuck in STARTED status
  • The BATCH_STEP_EXECUTION table will have the partitioning step stuck in STARTED status as well

BATCH_JOB_INSTANCE

JOB_INSTANCE_ID | JOB_NAME        | JOB_KEY
1               | partitioningJob | 853d3449e311f40366811cbefb3d93d7

BATCH_JOB_EXECUTION

JOB_EXECUTION_ID | JOB_INSTANCE_ID | START_TIME                 | STATUS
1                | 1               | 2022-04-26 13:52:22.655000 | STARTED

BATCH_STEP_EXECUTION

STEP_EXECUTION_ID | STEP_NAME             | JOB_EXECUTION_ID | STATUS
1                 | partitionerStep       | 1                | STARTED
2                 | simpleStep:partition1 | 1                | COMPLETED

And in theory, if we resume the same job (i.e. same parameters), the BATCH_JOB_INSTANCE table will still contain 1 row but the BATCH_JOB_EXECUTION table will have one more row that represents the resumed job. With that in mind, here’s the check for stuck jobs:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

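    // STARTED, FAILED and COMPLETED are static imports from org.springframework.batch.core.BatchStatus,
    // PARTITIONING_JOB_NAME is a static import from ManagerConfiguration
    private boolean areThereStuckJobs() {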
        Long stuckJobCount = jdbcTemplate.queryForObject("SELECT COUNT(*) as STUCK_JOB_COUNT FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN (" +
                "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS  = :completedStatus AND bji.JOB_NAME = :jobName)", Map.of(
                "statuses", List.of(STARTED.name(), FAILED.name()),
                "jobName", PARTITIONING_JOB_NAME,
                "completedStatus", COMPLETED.name()), Long.class);
        return stuckJobCount != 0L;
    }
}

The query does the following:

  1. Gets all JOB_INSTANCE_IDs for the partitioning job which have a completed job execution
  2. Gets all partitioning job executions which are in STARTED or FAILED state and from this set, it removes the jobs that already have a completed job execution

The last step is needed because if the job is resumed and the execution finally completes, there’s gonna be 2 records for the same job instance, one FAILED and one COMPLETED so we wanna make sure these are excluded.

The last thing in the query is to count the remaining rows in the resultset. If there’s even a single row, it indicates there’s a job stuck.
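If the nested SQL is hard to follow, here’s the same filtering logic as a minimal in-memory sketch. StuckJobFilter and Execution are made-up names for illustration (they’re not in the example project), and the Execution class is just a stand-in for one BATCH_JOB_EXECUTION row of the partitioning job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class StuckJobFilter {
    /** Minimal stand-in for one BATCH_JOB_EXECUTION row. */
    static final class Execution {
        final long executionId;
        final long instanceId;
        final String status;

        Execution(long executionId, long instanceId, String status) {
            this.executionId = executionId;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    static List<Long> stuckExecutionIds(List<Execution> executions) {
        // Step 1 (the subquery): job instances that already have a COMPLETED execution.
        Set<Long> completedInstances = new HashSet<>();
        for (Execution e : executions) {
            if ("COMPLETED".equals(e.status)) {
                completedInstances.add(e.instanceId);
            }
        }
        // Step 2 (the outer query): STARTED or FAILED executions whose instance is not in that set.
        List<Long> stuck = new ArrayList<>();
        for (Execution e : executions) {
            boolean candidate = "STARTED".equals(e.status) || "FAILED".equals(e.status);
            if (candidate && !completedInstances.contains(e.instanceId)) {
                stuck.add(e.executionId);
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        List<Execution> rows = Arrays.asList(
                new Execution(1, 1, "FAILED"),     // instance 1 was resumed...
                new Execution(2, 1, "COMPLETED"),  // ...and finished, so it is excluded
                new Execution(3, 2, "STARTED"));   // instance 2 is stuck
        System.out.println(stuckExecutionIds(rows)); // prints [3]
    }
}
```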

Similarly to this, we’ll need the stuck job IDs to modify their state to FAILED so that Spring Batch can resume later on. The query is almost identical to the one above:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private List<Long> getStuckJobIds() {
        return jdbcTemplate.queryForList("SELECT bje.JOB_EXECUTION_ID FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN (" +
                "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)", Map.of(
                "statuses", List.of(STARTED.name(), FAILED.name()),
                "jobName", PARTITIONING_JOB_NAME,
                "completedStatus", COMPLETED.name()), Long.class);
    }
}

The difference is that instead of the count aggregate function, I’m just returning the job execution IDs.

Good. Now, let’s decide if all partitions have finished for our job. This is not gonna be complicated because, as seen in one of the tables above, the status of each partition is stored in the BATCH_STEP_EXECUTION table.

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private boolean areAllPartitionsCompleted(Long jobId) {
        Long partitionsNotCompleted = jdbcTemplate.queryForObject("SELECT COUNT(bse.STEP_EXECUTION_ID) FROM BATCH_STEP_EXECUTION bse " +
                "WHERE bse.JOB_EXECUTION_ID = :jobExecutionId AND bse.STEP_NAME <> :stepName AND bse.status <> :status", Map.of(
                "jobExecutionId", jobId,
                "stepName", PARTITIONER_STEP_NAME,
                "status", COMPLETED.name()), Long.class);
        return partitionsNotCompleted == 0L;
    }
}
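One caveat with this check: it treats every status other than COMPLETED as “not done yet”. If a partition ends up FAILED, the count never reaches zero and the wait loop keeps spinning. Here’s a minimal sketch of how the partition statuses could be classified instead. PartitionProgress and Outcome are made-up names for illustration, the input is assumed to be the STATUS values of the worker step executions (without the partitioner step), and treating FAILED, STOPPED, ABANDONED and UNKNOWN as “won’t complete on its own” is my assumption.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionProgress {
    enum Outcome { ALL_COMPLETED, STILL_RUNNING, HAS_FAILURES }

    /** Classifies the statuses of the worker step executions of one job execution. */
    static Outcome classify(List<String> partitionStatuses) {
        boolean running = false;
        for (String status : partitionStatuses) {
            switch (status) {
                case "COMPLETED":
                    break;
                case "FAILED":
                case "STOPPED":
                case "ABANDONED":
                case "UNKNOWN":
                    // Assumption: these will not turn into COMPLETED by themselves,
                    // so waiting any longer is pointless.
                    return Outcome.HAS_FAILURES;
                default:
                    // STARTING, STARTED, STOPPING: a worker is still busy with it.
                    running = true;
            }
        }
        // An empty list counts as completed, same as the SQL check above.
        return running ? Outcome.STILL_RUNNING : Outcome.ALL_COMPLETED;
    }

    public static void main(String[] args) {
        System.out.println(classify(Arrays.asList("COMPLETED", "STARTED")));
        System.out.println(classify(Arrays.asList("COMPLETED", "COMPLETED")));
        System.out.println(classify(Arrays.asList("STARTED", "FAILED")));
    }
}
```

With something like this, the wait loop could stop on HAS_FAILURES and let the restart deal with the failed partitions, instead of sleeping forever.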

Awesome. Next up, as a last step, we need to modify the state of the job that needs to be resumed. We’ll need a JobOperator from Spring Batch and a TransactionRunner that we’ll create in a second to ensure transactionality for our statements.

The TransactionRunner looks like the following:

@Component
public class TransactionRunner {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void runInTransaction(Runnable r) {
        r.run();
    }
}

And then the ManagerStarter:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private TransactionRunner txRunner;

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            Job partitioningJob = jobLocator.getJob(PARTITIONING_JOB_NAME);
            JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                    .getNextJobParameters(partitioningJob)
                    .toJobParameters();
            jobLauncher.run(partitioningJob, jobParameters);
        }
    }

    private void handleStuckJob(Long stuckJobId) {
        try {
            waitUntilAllPartitionsFinished(stuckJobId);
            txRunner.runInTransaction(() -> {
                jdbcTemplate.update("UPDATE BATCH_STEP_EXECUTION SET STATUS = :status WHERE JOB_EXECUTION_ID = :jobExecutionId AND STEP_NAME = :stepName", Map.of(
                        "status", FAILED.name(),
                        "jobExecutionId", stuckJobId,
                        "stepName", PARTITIONER_STEP_NAME));
                jdbcTemplate.update("UPDATE BATCH_JOB_EXECUTION SET STATUS = :status, START_TIME = null, END_TIME = null WHERE JOB_EXECUTION_ID = :jobExecutionId", Map.of(
                        "status", FAILED.name(),
                        "jobExecutionId", stuckJobId));
            });
            jobOperator.restart(stuckJobId);
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling a stuck job", e);
        }
    }
    
    // omitted for simplicity
}

The implementation updates the stuck BATCH_STEP_EXECUTION row for the partitioning step and the stuck BATCH_JOB_EXECUTION row for the partitioning job execution. It’s important to note that this is done through the TransactionRunner because we wanna make sure it happens in a separate transaction from the one Spring Batch starts (if I remember correctly, Spring Batch even recognizes if you try to wrap everything into the same transaction and fails).

The new status for both rows is FAILED. Plus, what took me some time to figure out is that Spring Batch does some internal checks to see whether a job is already running, and that’s sometimes decided based on the START_TIME and END_TIME columns of the job execution. So we need to set those to null, otherwise Spring Batch will think the job is still running and won’t allow resuming.
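To make the START_TIME/END_TIME part a bit more tangible, here’s a minimal sketch of the kind of check involved. It assumes the rule is “start time set and end time missing means running”, which is how the check appears to work in the Spring Batch 4.x line; treat that as an assumption and verify it against the version you’re using. RunningCheck and looksRunning are made-up names, not Spring Batch API.

```java
import java.time.LocalDateTime;

final class RunningCheck {
    /**
     * Assumed rule: an execution counts as running when it has a start time
     * but no end time. This is a model for illustration, not library code.
     */
    static boolean looksRunning(LocalDateTime startTime, LocalDateTime endTime) {
        return startTime != null && endTime == null;
    }

    public static void main(String[] args) {
        // The crashed manager left START_TIME set and END_TIME empty.
        LocalDateTime start = LocalDateTime.of(2022, 4, 26, 13, 52, 22);
        System.out.println(looksRunning(start, null)); // prints true

        // After the UPDATE above, both columns are null.
        System.out.println(looksRunning(null, null)); // prints false
    }
}
```

Under the same assumption, setting END_TIME to an actual timestamp instead of nulling both columns should also stop the execution from looking like it’s running, but I haven’t tried that here.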

Spring Batch manager failure testing

That’s it, let’s test everything.

Start up a single worker:

$ ./gradlew bootRun --args='--spring.profiles.active=worker'

Then start up a manager:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

Wait until the messages are sent to Kafka. You’ll see it in the logs:

2022-04-26 13:52:24.099  INFO 7100 --- [main] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=StepExecutionRequest: [jobExecutionId=7, stepExecutionId=163, stepName=simpleStep], headers={sequenceNumber=49, correlationId=7:simpleStep, id=02bb3c11-ae59-fd1f-60e4-4715a8ac5cec, sequenceSize=50, timestamp=1650973944099}]

Then stop the manager and notice that the worker is still processing the messages.

Start up the manager again:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

And it’ll wait until the partitions are complete:

2022-04-26 14:24:28.040  INFO 16780 --- [main] c.a.b.s.manager.ManagerStarter           : Sleeping for a second to wait for the partitions to complete for job 9
2022-04-26 14:24:29.045  INFO 16780 --- [main] c.a.b.s.manager.ManagerStarter           : Sleeping for a second to wait for the partitions to complete for job 9
2022-04-26 14:24:30.092  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobOperator      : Checking status of job execution with id=9
2022-04-26 14:24:30.220  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobOperator      : Attempting to resume job with name=partitioningJob and parameters={run.id=5}
2022-04-26 14:24:30.307  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] launched with the following parameters: [{run.id=5}]
2022-04-26 14:24:30.396  INFO 16780 --- [main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionerStep]
2022-04-26 14:24:30.702  INFO 16780 --- [main] o.s.batch.core.step.AbstractStep         : Step: [partitionerStep] executed in 306ms
2022-04-26 14:24:30.743  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=5}] and the following status: [COMPLETED] in 404ms

Perfect. The job has completed successfully without reprocessing any of the partitions.

Summary

In my opinion, this could be a solution provided out of the box by Spring Batch but maybe it’s too specific. I’m not sure. Maybe they could just provide a “smart resume” flag configured via properties that does this. We’ll see.

One more note on the implementation. It’s currently considering only a single job to restart. If you need a full-fledged solution that’ll restart any partitioned job, there are a couple of changes that need to be made. Maybe I’ll bring that as well later on.

Hope you liked it, here’s the GitHub code on the master-crash branch.

