Spring Batch Job Flow

Spring Batch Job Flow Using a JobExecutionDecider

Jonny Hackett Development Technologies, Java, Spring, Spring Batch, Tutorial Leave a Comment

In this tutorial for Spring Batch, we’re going to take a look at Programmatic Flow decisions in a Spring Batch job using Spring’s JobExecutionDecider. Spring Batch is a pretty powerful framework and this is another useful tool to have in your Spring Batch toolbox.

To demonstrate, I’ll use a scenario that came up recently while working on my client’s project. After explaining the situation and my goals, I’ll jump into a detailed, step-by-step guide. Let’s get started!

The Scenario

As I mentioned above, I’ll be using a use case I came across while helping my current client. This client needed to standardize some data extracts into one extract format that could be provided to multiple clients in an easily maintained process.

For most jobs, I would configure the job to take some parameters that would tell the job which client it’s running for and then look up the configuration for the client. Then, I would create the extract using the configuration that was looked up. However, for every new client we set up in this manner, we also have to take into account a new scheduling setup along with creating its configuration.

For this new scenario, I wanted to be able to create an extract format once, setup the schedule for it, and then simply update the client configurations that utilized that particular extract and schedule.

To keep things simple, we stored the array of client configurations for this extract in a JSON file, which eventually ended up being stored as a JSON string in a database table. When this new extract job runs, it expects a parameter to be passed in as the key to look up the correct extract configuration.

The structure of the JSON-based extract configuration is an array of client configurations that contains information for each client. That info contains values such as the company ID, company name, output location, and where to send it. When the job executes, it looks up that extract configuration, parses the JSON, loops through all of the clients configured, and then generates a data extract for each client.

Since we’ll be focusing on how the JobExecutionDecider helps us with managing programmatic flow, the steps for reading the configuration and producing the extracts will be simplified to simple “task-lets,” showing that they’ve been executed.

JobExecutionDecider to the Rescue

What the JobExecutionDecider allows us to do is direct how the job flow executes based upon whatever conditions we determine. It can be used in many different ways, and you can think of it as the “Yes/No” part of a flow diagram.

In this particular case, I’ll be looping through the array of clients and executing the same step for each client. Here’s a simple outline of how the job will execute:

JobExecutionDecider for Spring Batch Job Flow

Now, let’s get started with our first task-let.

Loading Client Configurations

First, let’s create the task-let that will load the client configurations.

In the real scenario, this would take the extract configuration key from the job parameters and look up the configuration in the database. We’ll be coding the simplified version for time’s sake in this example.

The code, which is included below, involves three parts: ClientConfig, JobDeciderExampleConfigTasklet, and ClientConfigurations. Let’s begin.

ClientConfig

ClientConfig is a bean representing the specific client data). This class is a simple POJO that contains the information for each client configuration.

package com.example.demo.batch.jobflow;

public class ClientConfig {
	private int clientId;
	private String clientName;
	private String clientDir;
	private String clientEmail;
	public int getClientId() {
		return clientId;
	}
	public void setClientId(int clientId) {
		this.clientId = clientId;
	}
	public String getClientName() {
		return clientName;
	}
	public void setClientName(String clientName) {
		this.clientName = clientName;
	}
	public String getClientDir() {
		return clientDir;
	}
	public void setClientDir(String clientDir) {
		this.clientDir = clientDir;
	}
	public String getClientEmail() {
		return clientEmail;
	}
	public void setClientEmail(String clientEmail) {
		this.clientEmail = clientEmail;
	}
	@Override
	public String toString() {
		return "ClientConfig [clientId=" + clientId + ", clientName=" + clientName + ", clientDir=" + clientDir
				+ ", clientEmail=" + clientEmail + "]";
	}
	
}

JobDeciderExampleConfigTasklet

JobDeciderExampleConfigTasklet is a configuration tasklet. As stated earlier, this is hardcoded to some values to create a simple scenario for our example.

package com.example.demo.batch.jobflow;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;

public class JobDeciderExampleConfigTasklet implements Tasklet {
	
	@Autowired
	private ClientConfigurations clientConfigurations;

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

		
		List<ClientConfig> configurations = new ArrayList<>();		
		configurations.add(buildConfig(100, "Acme, LLC","/clients/acme","coyote@acme.com"));
		configurations.add(buildConfig(101, "MTV","/clients/mtv","beavis@mtv.com"));
		configurations.add(buildConfig(102, "Nasa","/clients/nasa","armstrong@nasa.gov"));
		clientConfigurations.setConfigProcessCount(0);
		clientConfigurations.setConfigurationsToProcess(configurations);
		return RepeatStatus.FINISHED;
	}

	private ClientConfig buildConfig(int clientId, String clientName, String clientDir, String clientEmail) {
		ClientConfig config = new ClientConfig();
		config.setClientId(clientId);
		config.setClientName(clientName);
		config.setClientDir(clientDir);
		config.setClientEmail(clientEmail);
		return config;
	}

}

ClientConfigurations

ClientConfigurations is a bean holder for client configurations. This particular class is where we hold the list of client configurations for the job to process and track the configuration to process. More on this later.

package com.example.demo.batch.jobflow;

import java.util.List;

public class ClientConfigurations {

	private List<ClientConfig> configurationsToProcess;
	private Integer configProcessCount;

	public ClientConfig getCurrentConfig() {
		return this.configurationsToProcess.get(this.getConfigProcessCount());
	}

	public List<ClientConfig> getConfigurationsToProcess() {
		return configurationsToProcess;
	}

	public void setConfigurationsToProcess(List<ClientConfig> configurationsToProcess) {
		this.configurationsToProcess = configurationsToProcess;
	}

	public Integer getConfigProcessCount() {
		return configProcessCount;
	}

	public void setConfigProcessCount(Integer configProcessCount) {
		this.configProcessCount = configProcessCount;
	}
}

Generating the Data Extract

Next, we need to create the task-let for generating the data extract.

Again, for the sake of keeping things simple in this example, we will just produce some logging that the step is being executed. In the real job, this would have a reader, optionally a processor, and a writer for producing the data extract.

Notice how it calls the auto-wired ClientConfigurations.getCurrentConfig() method. More on that later.

ClientReportTasklet

ClientReportTasklet is used to process the extract.

package com.example.demo.batch.jobflow;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;

public class ClientReportTasklet implements Tasklet {

	private static final Logger LOG = LoggerFactory.getLogger(ClientReportTasklet.class);

	@Autowired
	private ClientConfigurations clientConfigurations;

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		ClientConfig config = this.clientConfigurations.getCurrentConfig();
		LOG.info("Processing Report for Client: {}", config);

		return RepeatStatus.FINISHED;
	}

}

JobExecutionDecider

Now that we have our configuration task-let and report task-let defined, let’s talk a little bit about the JobExecutionDecider and how it will help us with the programmatic flow of the job.

The JobExecutionDecider is an interface provided by Spring Batch that, when implemented, gives you the ability to examine a condition during job execution and return a FlowExecutionStatus value to determine the next step in the job.

This can be used in various ways, but in our scenario, we need to determine if there is another client configuration to process. Here’s what our code looks like for this.

ProcessNextClientConfig

This bit is the implmented JobExecutionDecider.

package com.example.demo.batch.jobflow;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.beans.factory.annotation.Autowired;

public class ProcessNextClientConfig implements JobExecutionDecider {

	public static final String NEXT_CLIENT_CONFIG = "NextClientConfig";

	@Autowired
	private ClientConfigurations clientConfigurations;

	@Override
	public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
		if (this.clientConfigurations
				.getConfigProcessCount() < this.clientConfigurations.getConfigurationsToProcess().size() - 1) {
			this.clientConfigurations.setConfigProcessCount(this.clientConfigurations.getConfigProcessCount() + 1);
			return new FlowExecutionStatus(NEXT_CLIENT_CONFIG);
		}
		return FlowExecutionStatus.COMPLETED;
	}

}

In the “decide” method above, we have access to the JobExecution and StepExeuction. For our example, however, we are checking the auto-wired bean ClientConfigurations to see if there are any more clients configured to process.

If you look back at the ClientConfiguratons class we created, it has a list of client configurations and a process count. There’s also a method called ClientConfiguratons.getCurrentConfig() that uses the process count to grab the current configuration to process (we used it in the ClientReportTasklet).

What we’re doing is comparing the size of the client configuration list to the current value of the process count. If the count is less than the size of the list, then we return a FlowExecutionStatus of NextClientConfig. At the same time, it increments the process count value so that the next time, the correct configuration is pulled when calling ClientConfigurations.getCurrentConfig(). If there are no more extracts to create, then it returns a FlowExeuctionStatus of COMPLETED.

Our example is pretty simple, but you could do a number of things here to determine the next step in the flow of the job execution.

Spring Batch Job Configuration

That leads us to the Spring Batch Job configuration and piecing this all together! For now, I’ll just share the important code snippets for the example, but don’t worry, I’ll share the full job configuration at the end of the article.

Back to ClientConfigurations

First, let’s step back to the ClientConfigurations bean. You’ve probably noticed (and I’ve mentioned it a couple of times) that it’s being auto-wired in wherever it’s used. I chose to create a simple POJO bean that is context aware to hold the client configurations. It’s easy to use and equally easy to reference.

In the job configuration, it looks like this:

	@Bean
	@JobScope
	public ClientConfigurations clientConfigurations() {
		return new ClientConfigurations();
	}

You’ll notice I added the @JobScope annotation on there. That’s to ensure that a new bean is created every time there is a new execution of the job. When that is instantiated, the bean is stored in the context for that specific job execution, which makes it easily available to be used via dependency injection (i.e. @Autowired).

ProcessNextClientConfig

Next is the code snippet from the Job configuration that defines the flow and how we use our JobExecutionDecider that we implemented as the ProcessNextConfig bean.

	@Bean
	public Job jobExecution() {
		return this.jobBuilderFactory.get(JOB_NAME).start(jobConfigStep())
				.next(startClientConfigStep())
				.next(clientReportStep())
				.next(processNextClientConfig()).on(ProcessNextClientConfig.NEXT_CLIENT_CONFIG).to(startClientConfigStep())
				.from(processNextClientConfig()).on(FlowExecutionStatus.COMPLETED.getName()).end()
				.build().build();
	}

Here’s the explanation for the first few pieces of this job flow.

  • jobConfigStep: This is the configuration step that looks up and builds the list of ClientConfig objects that make up the ClientConfigurations.
  • startClientConfigStep: This is a simple task-let that just gives us a starting point for the loop process as it loops through each configuration.
  • clientReportStep: This is the step that produces the data extract based on the current client configuration.

Next in the flow is where we use our ProcessNextClientConfig JobExecutionDecider to conditionally determine the next steps.

next(processNextClientConfig()).on(ProcessNextClientConfig.NEXT_CLIENT_CONFIG).to(startClientConfigStep())

This first line establishes that we’re starting a new flow point and states that if the status returned from the ProcessNextClientConfig JobExecutionDecider is NextClientConfig, it should run the startClientConfigStep next. This is a simple task-let that we use as a pointer on the next iteration of the client configuration loop.

The next line establishes another flow definition starting from the already established flow point that we defined in the previous line.

.from(processNextClientConfig()).on(FlowExecutionStatus.COMPLETED.getName()).end()

This states that if the status returned from the ProcessNextClientConfig JobExecutionDecider is COMPLETED, then the job should end.

When executing the job, the output in the logs should look something like this:

Job: [FlowJob: [name=JobExecutionDeciderExample-Job]] launched with the following parameters: [{}]
Executing step: [jobConfigStep]
Step: [jobConfigStep] executed in 13ms
Executing step: [startClientConfig]
Begin process for client: ClientConfig [clientId=100, clientName=Acme, LLC, clientDir=/clients/acme, clientEmail=coyote@acme.com]
Step: [startClientConfig] executed in 5ms
Executing step: [clientReportStep]
Processing Report for Client: ClientConfig [clientId=100, clientName=Acme, LLC, clientDir=/clients/acme, clientEmail=coyote@acme.com]
Step: [clientReportStep] executed in 4ms
Duplicate step [startClientConfig] detected in execution of job=[JobExecutionDeciderExample-Job]. If either step fails, both will be executed again on restart.
Executing step: [startClientConfig]
Begin process for client: ClientConfig [clientId=101, clientName=MTV, clientDir=/clients/mtv, clientEmail=beavis@mtv.com]
Step: [startClientConfig] executed in 2ms
Duplicate step [clientReportStep] detected in execution of job=[JobExecutionDeciderExample-Job]. If either step fails, both will be executed again on restart.
Executing step: [clientReportStep]
Processing Report for Client: ClientConfig [clientId=101, clientName=MTV, clientDir=/clients/mtv, clientEmail=beavis@mtv.com]
Step: [clientReportStep] executed in 3ms
Duplicate step [startClientConfig] detected in execution of job=[JobExecutionDeciderExample-Job]. If either step fails, both will be executed again on restart.
Executing step: [startClientConfig]
Begin process for client: ClientConfig [clientId=102, clientName=Nasa, clientDir=/clients/nasa, clientEmail=armstrong@nasa.gov]
Step: [startClientConfig] executed in 3ms
Duplicate step [clientReportStep] detected in execution of job=[JobExecutionDeciderExample-Job]. If either step fails, both will be executed again on restart.
Executing step: [clientReportStep]
Processing Report for Client: ClientConfig [clientId=102, clientName=Nasa, clientDir=/clients/nasa, clientEmail=armstrong@nasa.gov]
Step: [clientReportStep] executed in 4ms
Job: [FlowJob: [name=JobExecutionDeciderExample-Job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 131ms

Important Note:

Before you get too excited, here’s the one drawback of this specific example and use case. If you look closely, you’ll see some log entries like this:

Duplicate step [clientReportStep] detected in execution of job=[JobExecutionDeciderExample-Job]. If either step fails, both will be executed again on restart.

Because we are reusing steps and executing them multiple times, if the job were to fail, it wouldn’t know where to pick up and continue executing on a restart. It’s something to be aware of, especially if you are manipulating data.

For this particular case it’s not a problem. If any extracts were executed a second time restarting after a failure, it would just overwrite the existing files.

That’s one thing you have to keep in mind when designing job flows! You should think about how you will handle failures and any reprocessing that might need to be done.

Wrapping Up

As you can see, it’s a pretty straightforward process to implement and use a JobExecutionDecider to control the flow of your job execution in Spring Batch.

This is a pretty simple example, however, you can create some fairly complicated job flows if you needed to. Just remember, the more complicated you make the batch jobs, the more issues you could potentially create in supporting the batch jobs.

If you have any questions or comments, let me know below, and if you enjoyed this post check out some of the other Spring-related posts I’ve written on the Keyhole Dev Blog.

As promised, here’s the full listing for the job configuration.

JobExecutionDeciderExampleJobConfig (job configuration)

package com.example.demo.batch.xml.read;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.example.demo.batch.jobflow.ClientConfigurations;
import com.example.demo.batch.jobflow.ClientReportTasklet;
import com.example.demo.batch.jobflow.JobDeciderExampleConfigTasklet;
import com.example.demo.batch.jobflow.ProcessNextClientConfig;
import com.example.demo.batch.jobflow.StartClientConfigTasklet;

@Configuration
@EnableBatchProcessing
public class JobExecutionDeciderExampleJobConfig {

	public static final String JOB_NAME = "JobExecutionDeciderExample-Job";

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;
	

	@Bean 
	@JobScope
	public ClientConfigurations clientConfigurations() {
		return new ClientConfigurations();
	}

	@Bean
	public Step jobConfigStep() {
		return this.stepBuilderFactory.get("jobConfigStep").tasklet(jobConfigTasklet()).build();
	}
	
	@Bean
	public Step clientReportStep() {
		String stepName = "clientReportStep";
		return this.stepBuilderFactory.get(stepName).tasklet(clientReportTasklet()).build();
	}

	@Bean
	public Tasklet jobConfigTasklet() {
		return new JobDeciderExampleConfigTasklet();
	}
	
	@Bean
	public Tasklet clientReportTasklet() {
		return new ClientReportTasklet();
	}
	
	@Bean
	public Tasklet startClientConfigTasklet() {
		return new StartClientConfigTasklet();
	}
	
	@Bean
	public Step startClientConfigStep() {
		return this.stepBuilderFactory.get("startClientConfig").tasklet(startClientConfigTasklet()).build();
	}

	@Bean
	public Job jobExecution() {
		return this.jobBuilderFactory.get(JOB_NAME).start(jobConfigStep())
				.next(startClientConfigStep())
				.next(clientReportStep())
				.next(processNextClientConfig()).on(ProcessNextClientConfig.NEXT_CLIENT_CONFIG).to(startClientConfigStep())
				.from(processNextClientConfig()).on(FlowExecutionStatus.COMPLETED.getName()).end()
				.build().build();
	}

	@Bean
	public ProcessNextClientConfig processNextClientConfig() 
	{
		return new ProcessNextClientConfig();
	}
}
0 0 votes
Article Rating
Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments