Handling manager failures in Spring Batch

I’ve been writing about Spring Batch lately, and one of the fault-tolerance questions I had was what happens with remote partitioning when the Spring Batch manager fails.

If you’re not familiar with remote partitioning, here’s a great article describing it: Scaling Spring Batch processing with remote partitioning using Kafka. TL;DR: you have a single, dedicated, so-called manager instance that distributes the work among multiple workers over a messaging channel. Once the manager has distributed the work, it essentially just waits and polls whether all of it has been completed by the workers.

Now, there was one use case I was really curious about. Let’s say the manager successfully distributed the work and the workers have started doing their job. The manager is now waiting for completion by periodically polling the database for state changes.

What if the manager crashes during this time, while it’s waiting? Can I resume the existing job which is still ongoing? If so, how?

I looked at potential solutions and tried to figure it out myself, but I couldn’t find a “resume” operation on any of the Spring Batch interfaces, so I asked a question on StackOverflow.

Mahmoud Ben Hassine, the lead of the Spring Batch project, helped me out by explaining how this could be done, emphasizing that it’s not something Spring Batch offers out of the box.

So, in this article I’m going to show you how I implemented this in an example project, so you don’t need to worry about Spring Batch manager failures anymore.

The idea

If you haven’t read the previous articles, I suggest you do, especially the one with Kafka, because that’s what I’m going to use to showcase the solution: Scaling Spring Batch processing with remote partitioning using Kafka.

Since Spring Batch doesn’t provide a way to resume manager jobs where they left off, we need to implement it ourselves.

The idea is to make Spring Batch believe that the manager job failed, even though the partitions it created were processed successfully. Then we can restart the original job with the same parameters, and Spring Batch will recognize: “oh well, all the partitions have completed, so there’s nothing to do here”.

In a nutshell, it’s a matter of waiting for all step executions (partitions) to complete and then setting the job’s state to FAILED. Restarting the original job with the JobOperator then transitions it to the COMPLETED state. Easier said than done.
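To make the idea a bit more concrete, here’s a toy model in plain Java. It is not Spring Batch code: the ResumeModel class, its Status enum and the partition names are made up for illustration. It captures the assumption the approach relies on, namely that a restarted partitioned job only re-dispatches partitions whose last execution isn’t COMPLETED, so if every partition has already completed, there’s nothing left to do.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Toy model of the resume idea, NOT Spring Batch code: on restart, only partitions
 * whose last execution is not COMPLETED would be sent to the workers again.
 */
class ResumeModel {

    enum Status { STARTED, COMPLETED, FAILED }

    /** Returns the (sorted) names of the partitions a restart would have to re-dispatch. */
    static List<String> partitionsToRerun(Map<String, Status> lastPartitionStatus) {
        return lastPartitionStatus.entrySet().stream()
                .filter(e -> e.getValue() != Status.COMPLETED)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    /** COMPLETED if nothing has to be re-run, otherwise the restarted job keeps running (STARTED). */
    static Status jobStatusAfterRestart(Map<String, Status> lastPartitionStatus) {
        return partitionsToRerun(lastPartitionStatus).isEmpty() ? Status.COMPLETED : Status.STARTED;
    }
}
```

In the manager-crash scenario, every partition is already COMPLETED, which is exactly the case where the restart has nothing to dispatch.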

Coding the solution

The code I’m going to use in the rest of the article can be found here on GitHub.

Let’s build the skeleton first and fill it with content afterwards. Since we’re going to write custom logic to start or resume jobs on application startup, we have to turn off the default Spring Boot behavior of launching the jobs in the application context automatically. That’s done by setting the following property in application.properties:

spring.batch.job.enabled=false

But we only want to disable the automatic startup for the manager instance, and since we’re using profiles to control whether the app runs as a manager or a worker, we can simply put this property into a new file called application-manager.properties, which is only applied when the manager profile is active.

Good. Next up, let’s add a slight delay to the worker processing so that it doesn’t complete within a matter of milliseconds. Open the WorkerConfiguration class and add a Thread.sleep to the ItemProcessor:

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) throws InterruptedException {
                Thread.sleep(50);
                return new Customer(item);
            }
        };
    }

Awesome. Let’s code the skeleton of the startup/resume logic. I’m going to create a new class called ManagerStarter that implements Spring Boot’s CommandLineRunner interface.

import java.util.Collections;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(ManagerStarter.class);

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            // start the job as usual
        }
    }

    private void handleStuckJob(Long stuckJobId) {
        try {
            waitUntilAllPartitionsFinished(stuckJobId);
            // update job state
            // restart job
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling a stuck job", e);
        }
    }

    private void waitUntilAllPartitionsFinished(Long stuckJobId) throws InterruptedException {
        while (!areAllPartitionsCompleted(stuckJobId)) {
            logger.info("Sleeping for a second to wait for the partitions to complete for job {}", stuckJobId);
            Thread.sleep(1000);
        }
    }

    private boolean areAllPartitionsCompleted(Long jobId) {
        // decide if all partitions have completed
        return true;
    }

    private List<Long> getStuckJobIds() {
        // get all stuck job ids
        return Collections.emptyList();
    }

    private boolean areThereStuckJobs() {
        // decide if there's any stuck job
        return false;
    }
}
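As written, waitUntilAllPartitionsFinished polls forever if the partitions never reach the expected state. If you’d rather give up after a deadline, a minimal bounded variant could look like the sketch below. BoundedPoller and its parameters are hypothetical; in ManagerStarter, the condition would be something like () -> areAllPartitionsCompleted(stuckJobId).

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical bounded variant of waitUntilAllPartitionsFinished. */
class BoundedPoller {

    /**
     * Evaluates the condition every pollInterval until it returns true (result: true)
     * or the timeout elapses first (result: false).
     */
    static boolean waitUntil(BooleanSupplier condition, Duration pollInterval, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison of System.nanoTime() values
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return true;
    }
}
```

A false result leaves the decision to the caller, for example logging the stuck job execution and skipping it instead of blocking the startup indefinitely.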

Let’s fill this in step by step. Starting a new job on startup is going to be easy, but it needs a tiny bit of extra preparation.

Let’s go to the ManagerConfiguration class and adjust the following things:

  • Extract the partitioning job name into a public constant
  • Extract the partitioning step name into a public constant
  • Add a JobRegistryBeanPostProcessor to the app context, so that the job bean gets registered in the JobRegistry and can be looked up by name

Here’s how the whole thing looks:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    public static final String PARTITIONING_JOB_NAME = "partitioningJob";
    public static final String PARTITIONER_STEP_NAME = "partitionerStep";

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }

    @Bean(name = PARTITIONING_JOB_NAME)
    public Job partitioningJob() {
        return jobBuilderFactory.get(PARTITIONING_JOB_NAME)
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get(PARTITIONER_STEP_NAME)
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        final JobRegistryBeanPostProcessor answer = new JobRegistryBeanPostProcessor();
        answer.setJobRegistry(jobRegistry);
        return answer;
    }
}
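The partitionIdFn lambda in outboundFlow routes each StepExecutionRequest to a Kafka partition using stepExecutionId % TOPIC_PARTITION_COUNT. The plain-Java sketch below replays that arithmetic; PartitionRouting is hypothetical, and the sample assumes the 50 worker step executions get IDs 2 to 51 and a partition count of 3 (the real value lives in Constants.TOPIC_PARTITION_COUNT).

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper mirroring partitionIdFn: stepExecutionId % partitionCount picks the Kafka partition. */
class PartitionRouting {

    static long kafkaPartitionFor(long stepExecutionId, long topicPartitionCount) {
        return stepExecutionId % topicPartitionCount;
    }

    /** Counts how many step execution requests land on each Kafka partition, for IDs firstId..lastId. */
    static Map<Long, Integer> distribution(long firstId, long lastId, long topicPartitionCount) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long id = firstId; id <= lastId; id++) {
            counts.merge(kafkaPartitionFor(id, topicPartitionCount), 1, Integer::sum);
        }
        return counts;
    }
}
```

With those assumptions the split is 17/16/17 across partitions 0/1/2, so consecutive step execution IDs spread evenly over the topic partitions and workers in the same consumer group can share the load.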

Good, let’s go back to the ManagerStarter class and implement the startup.

The implementation is going to be very similar to what Spring Boot’s default job launcher does internally:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(ManagerStarter.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    @Autowired
    private JobExplorer jobExplorer;

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            Job partitioningJob = jobLocator.getJob(PARTITIONING_JOB_NAME);
            JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                    .getNextJobParameters(partitioningJob)
                    .toJobParameters();
            jobLauncher.run(partitioningJob, jobParameters);
        }
    }

    // omitted for simplicity
}
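The reason for getNextJobParameters is the RunIdIncrementer configured on the job: every fresh start gets a run.id one higher than the previous run, which means a brand new JobInstance each time. Here’s a simplified plain-Java model of that behavior (RunIdModel is hypothetical; the real logic lives in JobParametersBuilder and RunIdIncrementer). It also shows why resuming can’t go down this path: a resume has to reuse the exact parameters of the stuck execution, otherwise Spring Batch would create a new job instance.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified, hypothetical model of getNextJobParameters + RunIdIncrementer:
 * the next run gets run.id = previous run.id + 1, or 1 if there was no previous run.
 */
class RunIdModel {

    static final String RUN_ID_KEY = "run.id"; // RunIdIncrementer's default key

    static Map<String, Long> nextParameters(Map<String, Long> previousParameters) {
        Map<String, Long> next = new HashMap<>();
        if (previousParameters != null) {
            next.putAll(previousParameters);
        }
        long previousRunId = next.getOrDefault(RUN_ID_KEY, 0L);
        next.put(RUN_ID_KEY, previousRunId + 1);
        return next;
    }
}
```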

Easy, I’d say. Let’s continue implementing the building blocks of the resume logic.

Let’s see how to check whether there’s a stuck job. But before that, let’s dig into what happens in the database when the job starts.

We’ll have:

  • 1 record in the BATCH_JOB_INSTANCE table that represents this job
  • 1 record in the BATCH_JOB_EXECUTION table that represents a single execution of the corresponding BATCH_JOB_INSTANCE
  • 1+50 records in the BATCH_STEP_EXECUTION table, because the first step of the job is the partitioning step, which spawns 50 partitions; the worker step therefore needs to be executed 50 times, hence the 51 rows

And then, if the manager fails while the workers are in the middle of processing the partitions:

  • The BATCH_JOB_EXECUTION table will contain the execution stuck in STARTED status
  • The BATCH_STEP_EXECUTION table will have the partitioning step stuck in STARTED status as well

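Here’s an abbreviated snapshot of the tables in this state (only the relevant columns, and only the first two step executions):

BATCH_JOB_INSTANCE

JOB_INSTANCE_ID | JOB_NAME        | JOB_KEY
1               | partitioningJob | 853d3449e311f40366811cbefb3d93d7

BATCH_JOB_EXECUTION

JOB_EXECUTION_ID | JOB_INSTANCE_ID | START_TIME                 | STATUS
1                | 1               | 2022-04-26 13:52:22.655000 | STARTED

BATCH_STEP_EXECUTION

STEP_EXECUTION_ID | STEP_NAME             | JOB_EXECUTION_ID | STATUS
1                 | partitionerStep       | 1                | STARTED
2                 | simpleStep:partition1 | 1                | COMPLETED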

In theory, if we resume the same job (i.e. with the same parameters), BATCH_JOB_INSTANCE will still contain 1 row, but BATCH_JOB_EXECUTION will have one more row representing the resumed execution. With that in mind, here’s how to check whether there are stuck jobs:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private boolean areThereStuckJobs() {
        Long stuckJobCount = jdbcTemplate.queryForObject("SELECT COUNT(*) as STUCK_JOB_COUNT FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN (" +
                "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS  = :completedStatus AND bji.JOB_NAME = :jobName)", Map.of(
                "statuses", List.of(STARTED.name(), FAILED.name()),
                "jobName", PARTITIONING_JOB_NAME,
                "completedStatus", COMPLETED.name()), Long.class);
        return stuckJobCount != 0L;
    }
}

The query does the following:

  1. Gets all JOB_INSTANCE_IDs of the partitioning job that have a completed job execution
  2. Gets all partitioning job executions that are in STARTED or FAILED state, and removes from this set the ones whose job instance already has a completed job execution

The removal in the second step is needed because once a job has been resumed and the new execution completes, there are going to be 2 records for the same job instance, one FAILED and one COMPLETED, and we want to make sure these are excluded.

Finally, the query counts the remaining rows in the result set. Even a single row indicates that a job is stuck.
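The same filtering logic, expressed as an in-memory model in plain Java, can make edge cases easier to reason about. JobExecutionRow and the sample IDs are made up, and the rows are assumed to belong to the partitioning job already (the SQL takes care of that with its JOB_NAME condition).

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** In-memory model of the stuck-job query; JobExecutionRow is a hypothetical stand-in for a table row. */
class StuckJobFilter {

    static final class JobExecutionRow {
        final long jobExecutionId;
        final long jobInstanceId;
        final String status;

        JobExecutionRow(long jobExecutionId, long jobInstanceId, String status) {
            this.jobExecutionId = jobExecutionId;
            this.jobInstanceId = jobInstanceId;
            this.status = status;
        }
    }

    static List<Long> stuckExecutionIds(List<JobExecutionRow> rows) {
        // Subquery: job instances that already have a COMPLETED execution
        Set<Long> completedInstances = rows.stream()
                .filter(r -> r.status.equals("COMPLETED"))
                .map(r -> r.jobInstanceId)
                .collect(Collectors.toSet());
        // Outer query: STARTED/FAILED executions whose instance is not in that set
        return rows.stream()
                .filter(r -> r.status.equals("STARTED") || r.status.equals("FAILED"))
                .filter(r -> !completedInstances.contains(r.jobInstanceId))
                .map(r -> r.jobExecutionId)
                .collect(Collectors.toList());
    }
}
```

In the test sample, job instance 1 was resumed and completed, so both of its executions are excluded, while the STARTED execution 3 of instance 2 is reported as stuck. A non-empty result is the in-memory equivalent of the count being greater than zero.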

Similarly, we need the IDs of the stuck job executions so that we can change their state to FAILED, which lets Spring Batch restart them later on. The query is almost identical to the one above:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private List<Long> getStuckJobIds() {
        return jdbcTemplate.queryForList("SELECT bje.JOB_EXECUTION_ID FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS IN (:statuses) AND bji.JOB_NAME = :jobName AND bje.JOB_INSTANCE_ID NOT IN (" +
                "SELECT bje.JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE bji " +
                "INNER JOIN BATCH_JOB_EXECUTION bje ON bji.JOB_INSTANCE_ID = bje.JOB_INSTANCE_ID " +
                "WHERE bje.STATUS = :completedStatus AND bji.JOB_NAME = :jobName)", Map.of(
                "statuses", List.of(STARTED.name(), FAILED.name()),
                "jobName", PARTITIONING_JOB_NAME,
                "completedStatus", COMPLETED.name()), Long.class);
    }
}

The only difference is that instead of using the COUNT aggregate function, I’m returning the job execution IDs.

Good. Now, let’s decide whether all partitions of our job have finished. This isn’t complicated because, as seen in the tables above, the partition states are stored in the BATCH_STEP_EXECUTION table. We count the step executions of the job execution, excluding the partitioner step itself, that aren’t COMPLETED yet:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void run(String... args) throws Exception {
        // omitted for simplicity
    }

    // omitted for simplicity

    private boolean areAllPartitionsCompleted(Long jobId) {
        Long partitionsNotCompleted = jdbcTemplate.queryForObject("SELECT COUNT(bse.STEP_EXECUTION_ID) FROM BATCH_STEP_EXECUTION bse " +
                "WHERE bse.JOB_EXECUTION_ID = :jobExecutionId AND bse.STEP_NAME <> :stepName AND bse.status <> :status", Map.of(
                "jobExecutionId", jobId,
                "stepName", PARTITIONER_STEP_NAME,
                "status", COMPLETED.name()), Long.class);
        return partitionsNotCompleted == 0L;
    }
}
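Here’s a plain-Java model of the same counting condition (StepExecutionRow is a hypothetical stand-in for a BATCH_STEP_EXECUTION row of one job execution). It makes one consequence visible: a partition that ends up FAILED also counts as not completed, so waitUntilAllPartitionsFinished would keep waiting on it.

```java
import java.util.List;

/** Plain-Java model of the areAllPartitionsCompleted query; StepExecutionRow is hypothetical. */
class PartitionCheck {

    static final class StepExecutionRow {
        final String stepName;
        final String status;

        StepExecutionRow(String stepName, String status) {
            this.stepName = stepName;
            this.status = status;
        }
    }

    /** Same condition as the SQL: STEP_NAME <> partitioner step AND STATUS <> COMPLETED. */
    static long partitionsNotCompleted(List<StepExecutionRow> rowsOfOneJobExecution, String partitionerStepName) {
        return rowsOfOneJobExecution.stream()
                .filter(r -> !r.stepName.equals(partitionerStepName))
                .filter(r -> !r.status.equals("COMPLETED"))
                .count();
    }

    static boolean areAllPartitionsCompleted(List<StepExecutionRow> rows, String partitionerStepName) {
        return partitionsNotCompleted(rows, partitionerStepName) == 0L;
    }
}
```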

Awesome. Next up, as a last step, we need to implement changing the state of the job that needs to be resumed. We’ll need a JobOperator from Spring Batch and a TransactionRunner, which we’ll create in a second, to run our UPDATE statements in a new transaction that is committed before the job gets restarted.

The TransactionRunner looks like this:

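import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Component
public class TransactionRunner {
    // REQUIRES_NEW: always runs r in a fresh transaction that commits when this method returns
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void runInTransaction(Runnable r) {
        r.run();
    }
}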
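Why a separate bean? Spring applies @Transactional through a proxy, and in the default proxy mode only calls coming in through the proxy are intercepted; a call from one method of ManagerStarter to another method of the same class would bypass it. The sketch below illustrates that general mechanism with a JDK dynamic proxy. SelfInvocationDemo and its types are made up, and Spring’s proxies do a lot more, but the self-invocation limitation is the same: only the outer call is intercepted.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo of proxy-based interception and the self-invocation limitation. */
class SelfInvocationDemo {

    interface Service {
        void outer();
        void inner();
    }

    /** Records the names of the calls that went through the proxy. */
    static final List<String> intercepted = new ArrayList<>();

    static class RealService implements Service {
        @Override
        public void outer() {
            inner(); // self-call on 'this', so it does NOT go through the proxy
        }

        @Override
        public void inner() {
        }
    }

    static Service proxied(Service target) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // this is where a transaction would be started
            return method.invoke(target, args);
        };
        return (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
    }
}
```

Calling outer() on the proxy records only "outer"; the nested inner() call is invisible to the proxy. Moving the transactional method into its own bean, as TransactionRunner does, makes sure the call always crosses a proxy boundary.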

And then the ManagerStarter:

@Component
@Profile("manager")
public class ManagerStarter implements CommandLineRunner {
    // omitted for simplicity

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private TransactionRunner txRunner;

    @Override
    public void run(String... args) throws Exception {
        if (areThereStuckJobs()) {
            List<Long> stuckJobIds = getStuckJobIds();
            stuckJobIds.forEach(this::handleStuckJob);
        } else {
            Job partitioningJob = jobLocator.getJob(PARTITIONING_JOB_NAME);
            JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
                    .getNextJobParameters(partitioningJob)
                    .toJobParameters();
            jobLauncher.run(partitioningJob, jobParameters);
        }
    }

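    private void handleStuckJob(Long stuckJobId) {
        try {
            // 1. Block until the workers have finished every partition of the stuck job
            waitUntilAllPartitionsFinished(stuckJobId);
            // 2. In a separate transaction that commits right away, mark the partitioner step and the
            //    job execution as FAILED and clear START_TIME/END_TIME so the job no longer looks running.
            //    jdbcTemplate must be a NamedParameterJdbcTemplate because the queries use named parameters.
            txRunner.runInTransaction(() -> {
                jdbcTemplate.update("UPDATE BATCH_STEP_EXECUTION SET STATUS = :status WHERE JOB_EXECUTION_ID = :jobExecutionId AND STEP_NAME = :stepName", Map.of(
                        "status", FAILED.name(),
                        "jobExecutionId", stuckJobId,
                        "stepName", PARTITIONER_STEP_NAME));
                jdbcTemplate.update("UPDATE BATCH_JOB_EXECUTION SET STATUS = :status, START_TIME = null, END_TIME = null WHERE JOB_EXECUTION_ID = :jobExecutionId", Map.of(
                        "status", FAILED.name(),
                        "jobExecutionId", stuckJobId));
            });
            // 3. Restart the job; the already completed partitions are not processed again
            jobOperator.restart(stuckJobId);
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling a stuck job", e);
        }
    }
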
    // omitted for simplicity
}
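The waitUntilAllPartitionsFinished helper is part of the omitted code. Judging by the "Sleeping for a second" log lines further down, it polls until the workers are done. Below is a hypothetical, stdlib-only sketch of such a polling loop. It is not the original implementation.

A few assumptions apply to this sketch:
- The PartitionWaiter name is made up.
- The Supplier stands in for a query returning the STATUS of the worker (partition) step executions only. The manager's own partitioner step has to be excluded, because it stays STARTED until we reset it.
- Treating STARTING and STARTED as "still in progress" is also an assumption.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch, not the original (omitted) waitUntilAllPartitionsFinished.
final class PartitionWaiter {

    // Assumption: a partition is still in progress while its step execution is STARTING or STARTED.
    private static final Set<String> IN_PROGRESS = Set.of("STARTING", "STARTED");

    private PartitionWaiter() {
    }

    /**
     * Polls the worker step statuses until none of them is in progress.
     *
     * @param workerStepStatuses returns the current STATUS of every worker step execution
     *                           (in the real app, a query against BATCH_STEP_EXECUTION)
     * @param pollIntervalMillis how long to sleep between two polls
     * @return how many times the statuses were polled
     */
    static int waitUntilAllPartitionsFinished(Supplier<List<String>> workerStepStatuses,
                                              long pollIntervalMillis) {
        int polls = 1;
        while (workerStepStatuses.get().stream().anyMatch(IN_PROGRESS::contains)) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
                throw new IllegalStateException("Interrupted while waiting for partitions", e);
            }
            polls++;
        }
        return polls;
    }
}
```

In this sketch, a FAILED partition also ends the wait. There is no timeout, so if no worker is running the loop never ends; a real version would probably want an upper bound.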

The handleStuckJob implementation updates two rows:
- the stuck BATCH_STEP_EXECUTION row of the partitioner step;
- the stuck BATCH_JOB_EXECUTION row of the partitioning job execution.

It's important to note that this is done through the TransactionRunner. We want these updates to happen in a separate transaction from the one Spring Batch starts, and to be committed before the restart. If I remember correctly, Spring Batch even detects when you try to wrap everything into the same transaction and fails.
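Here is a simplified, hypothetical in-memory model of the BATCH_STEP_EXECUTION rows of one job execution, which makes the effect of the first UPDATE concrete. The StepRow and StepReset names are made up, as are the worker step names used to try it out.

Only the row whose STEP_NAME equals the partitioner step's name is switched to FAILED. The worker partition rows keep their COMPLETED status, and that is the point: on restart, Spring Batch should find those partitions already completed.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for a BATCH_STEP_EXECUTION row.
final class StepRow {
    final String stepName;
    final String status;

    StepRow(String stepName, String status) {
        this.stepName = stepName;
        this.status = status;
    }
}

final class StepReset {

    private StepReset() {
    }

    // Same effect as "UPDATE BATCH_STEP_EXECUTION SET STATUS = 'FAILED'
    // WHERE JOB_EXECUTION_ID = ? AND STEP_NAME = ?" on the rows of a single job execution.
    // Returns new rows and leaves the input list untouched.
    static List<StepRow> failPartitionerStep(List<StepRow> rows, String partitionerStepName) {
        return rows.stream()
                .map(row -> row.stepName.equals(partitionerStepName)
                        ? new StepRow(row.stepName, "FAILED")
                        : row)
                .collect(Collectors.toList());
    }
}
```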

The new status for both rows is FAILED. What took me some time to figure out is that Spring Batch internally checks whether a job is already running. That check is sometimes based on the START_TIME and END_TIME columns of the job execution. That's why we also need to set both columns to null; otherwise Spring Batch will think the job is still running and won't allow resuming it.
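Here is a simplified, hypothetical model of that check, showing why changing only the status is not enough for a stuck row.

The model assumes the "is it running?" decision boils down to "START_TIME is set and END_TIME is not". That is a simplification for illustration, not Spring Batch's actual code. The JobExecutionRow class and its methods are made up.

```java
import java.time.LocalDateTime;

// Hypothetical, simplified stand-in for a BATCH_JOB_EXECUTION row.
final class JobExecutionRow {
    final String status;
    final LocalDateTime startTime; // null means "not started"
    final LocalDateTime endTime;   // null means "not ended"

    JobExecutionRow(String status, LocalDateTime startTime, LocalDateTime endTime) {
        this.status = status;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    // Assumed running check: only the timestamps are consulted, not the status.
    boolean looksRunning() {
        return startTime != null && endTime == null;
    }

    // Same effect as "SET STATUS = 'FAILED', START_TIME = null, END_TIME = null".
    JobExecutionRow resetForRestart() {
        return new JobExecutionRow("FAILED", null, null);
    }
}
```

In this model, consider a row left behind by the crashed manager whose status is flipped to FAILED but whose timestamps are untouched. Its START_TIME is still set and its END_TIME is still null, so looksRunning() keeps returning true. Only clearing both timestamps as well makes it return false.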

Spring Batch manager failure testing

That’s it, let’s test everything.

Start up a single worker:

$ ./gradlew bootRun --args='--spring.profiles.active=worker'

Then start up a manager:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

Wait until the messages are sent to Kafka. You'll see them in the logs:

2022-04-26 13:52:24.099  INFO 7100 --- [main] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=StepExecutionRequest: [jobExecutionId=7, stepExecutionId=163, stepName=simpleStep], headers={sequenceNumber=49, correlationId=7:simpleStep, id=02bb3c11-ae59-fd1f-60e4-4715a8ac5cec, sequenceSize=50, timestamp=1650973944099}]

Then stop the manager and notice that the worker is still processing the messages.

Start up the manager again:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

And it’ll wait until the partitions are complete:

2022-04-26 14:24:28.040  INFO 16780 --- [main] c.a.b.s.manager.ManagerStarter           : Sleeping for a second to wait for the partitions to complete for job 9
2022-04-26 14:24:29.045  INFO 16780 --- [main] c.a.b.s.manager.ManagerStarter           : Sleeping for a second to wait for the partitions to complete for job 9
2022-04-26 14:24:30.092  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobOperator      : Checking status of job execution with id=9
2022-04-26 14:24:30.220  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobOperator      : Attempting to resume job with name=partitioningJob and parameters={run.id=5}
2022-04-26 14:24:30.307  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] launched with the following parameters: [{run.id=5}]
2022-04-26 14:24:30.396  INFO 16780 --- [main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionerStep]
2022-04-26 14:24:30.702  INFO 16780 --- [main] o.s.batch.core.step.AbstractStep         : Step: [partitionerStep] executed in 306ms
2022-04-26 14:24:30.743  INFO 16780 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=5}] and the following status: [COMPLETED] in 404ms

Perfect. The job has completed successfully without reprocessing any of the partitions.

Summary

In my opinion, this could be provided out of the box by Spring Batch, but maybe it's too specific; I'm not sure. Maybe they could just provide a "smart resume" flag, configurable via properties, that does this. We'll see.

One more note on the implementation: it currently only considers a single job for restarting. If you need a full-fledged solution that restarts any partitioned job, a couple of changes are needed. Maybe I'll cover that later on.

Hope you liked it! The code is on GitHub, on the master-crash branch.

Follow me on Twitter and Facebook for more.
