
Scaling Spring Batch – Step Partitioning

December 9, 2013



We have talked about how to get up and running with Spring Batch in a few previous articles. Now, we’re going to start discussing some of the strategies available for scaling Spring Batch.

This article will focus on partitioning a step so that the step has several threads that are each processing a chunk of data in parallel. This is beneficial if you have a large chunk of data that can be logically split up into smaller chunks that can be processed in parallel. The way this works is that you will define a master step that is responsible for determining the basis of the chunks, and then farming all of those chunks out to a set of slave steps to process each chunk.

Partitioning

If I could go back in time to a previous experience, a perfect example of this would be processing all of the daily invoices for each company within a large procurement system. The data to be processed could be split up logically by company. Let’s say that there are 250 companies that participate in this procurement system and our partitioned step has been configured with 15 threads.

Our Partitioner would likely run a query to find all of the companies that have invoices waiting to be processed for that day. The Partitioner’s responsibility at that point would be to create an ExecutionContext for each of those companies and add it to a map under a unique key. That ExecutionContext should contain any information required to process invoices for that company, such as the company ID.

When the Partitioner returns the map of ExecutionContexts, Spring Batch creates a new step execution for each entry in the map and uses the key as part of the step name. It then hands those step executions to a thread pool, sized by configuration at 15 threads here, and runs them in parallel 15 at a time. For example, if 85 of the companies had invoices that day, there would be 85 steps: Spring Batch would start executing 15 of them, and as each thread finished a step it would pick up the next one until all steps had completed.
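The scheduling behavior described above can be imitated with the plain JDK. The following is a minimal sketch, not Spring Batch's actual implementation: it assumes 85 partitions and a fixed pool of 15 threads, and the class name PartitionPoolSketch and the simulated work are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the partition handler: plain JDK, no Spring Batch classes.
class PartitionPoolSketch {

	// Runs `partitions` simulated worker steps on `threads` threads and
	// returns {completed count, peak number of steps running at once}.
	static int[] run(int partitions, int threads) {
		ExecutorService pool = Executors.newFixedThreadPool(threads);
		AtomicInteger running = new AtomicInteger();
		AtomicInteger peak = new AtomicInteger();
		AtomicInteger completed = new AtomicInteger();
		List<Future<?>> futures = new ArrayList<>();
		try {
			for (int i = 0; i < partitions; i++) {
				futures.add(pool.submit(() -> {
					int now = running.incrementAndGet();
					peak.accumulateAndGet(now, Math::max);
					try {
						Thread.sleep(5); // simulated work for one partition
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
					}
					running.decrementAndGet();
					completed.incrementAndGet();
				}));
			}
			// Wait until every worker has finished before moving on.
			for (Future<?> future : futures) {
				try {
					future.get();
				} catch (Exception e) {
					throw new IllegalStateException("partition failed", e);
				}
			}
		} finally {
			pool.shutdown();
		}
		return new int[] { completed.get(), peak.get() };
	}

	public static void main(String[] args) {
		int[] result = run(85, 15);
		System.out.println("completed=" + result[0] + ", peak concurrency=" + result[1]);
	}
}
```

No more than 15 of the simulated steps are ever in flight at once, and the caller only continues after all 85 have finished, which is the same shape as a partitioned step waiting on its worker steps.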

An Example

So now that we have a basic understanding of how partitioning works, let’s take a look at a simple example. For our use case, we’re going to be inspecting an inbound directory where incoming vendor catalog files will be dumped. To create a Spring Batch partitioner, we’ll need to create a class that implements Spring Batch’s Partitioner interface. Since this is something generic that can be made reusable, we’re going to call this class MultiFileResourcePartitioner. It is a simple POJO with only one field, named “inboundDir”, representing the path to the directory that contains the files to be processed. The Partitioner interface specifies that the class should implement a method named “partition” that takes an int parameter representing the grid size and returns a Map of ExecutionContexts keyed by partition name.

Here’s the class listing for MultiFileResourcePartitioner:

package com.keyhole.example.partition;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.io.FileSystemResource;

public class MultiFileResourcePartitioner implements Partitioner {

	private String inboundDir;

	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {

		Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
		File dir = new File(inboundDir);
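		if (dir.isDirectory()) {
			File[] files = dir.listFiles();
			// listFiles() returns null when the directory cannot be read
			if (files != null) {
				for (File file : files) {
					ExecutionContext context = new ExecutionContext();
					// Store the file location as a URI string for late binding in the reader
					context.putString("fileResource", file.toURI().toString());
					partitionMap.put(file.getName(), context);
				}
			}
		}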
		return partitionMap;
	}

	public String getInboundDir() {
		return inboundDir;
	}

	public void setInboundDir(String inboundDir) {
		this.inboundDir = inboundDir;
	}

}

The configuration for this bean in our application context will look like this:

<bean id="inventoryFilePartitioner" class="com.keyhole.example.partition.MultiFileResourcePartitioner" scope="step">
	<property name="inboundDir" value="/data/inbound" />
</bean>

Looking at the partition method that is implemented, we’re simply going to list all of the files that are found in the specified inbound directory, create an ExecutionContext for each file that is found, and add it to the map that is returned. The unique key that is used to place each ExecutionContext into the map will also become part of the step name that is created for each entry in the map. Spring Batch will use that partition map to create a slave step from each of the keys that are found in the map.
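To see what the partition map would contain for a given directory, the same listing logic can be tried without Spring Batch on the classpath. This is a rough sketch under that assumption: the class PartitionPreview and the file names are hypothetical, and a plain Map<String, String> stands in for the ExecutionContext entries that the real Partitioner returns.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: previews the keys and "fileResource" values that
// a file-per-partition Partitioner would produce for a directory.
class PartitionPreview {

	static Map<String, String> preview(File dir) {
		Map<String, String> partitions = new TreeMap<>();
		File[] files = dir.listFiles(); // null if dir is not a readable directory
		if (files != null) {
			for (File file : files) {
				// key = file name, value = URI string
				partitions.put(file.getName(), file.toURI().toString());
			}
		}
		return partitions;
	}

	// Creates a throwaway directory with two empty files and previews it.
	static Map<String, String> demo() {
		try {
			File dir = Files.createTempDirectory("inbound").toFile();
			new File(dir, "vendorA.csv").createNewFile();
			new File(dir, "vendorB.csv").createNewFile();
			return preview(dir);
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	public static void main(String[] args) {
		demo().forEach((key, uri) -> System.out.println(key + " -> " + uri));
	}
}
```

Each entry becomes one partition, so two files yield two worker steps. The files themselves are not split into smaller pieces.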

To partition a step, you first need to create the step that will be referenced by the partition configuration. This step should be configured just like any other step within a Spring Batch job, and for this example we’ll define a FlatFileItemReader and a simple ItemWriter that will just call the toString() method and log it to the console.

Here are the configuration details for the step and its associated spring beans. The important thing to note here is that we have the ItemReader scoped at the step level so that we don’t run into any issues with multiple threads using the same beans to process the data. We also need them scoped that way so that we can use the Spring late binding to specify the value holding the file resource in the Step’s ExecutionContext.

<batch:step id="inventoryLoadStep" xmlns="http://www.springframework.org/schema/batch">
	<batch:tasklet transaction-manager="transactionManager">
		<batch:chunk reader="inventoryLoadReader" writer="logItemWriter" commit-interval="5000" />
	</batch:tasklet>
</batch:step>
<bean name="inventoryLoadReader" scope="step" class="org.springframework.batch.item.file.FlatFileItemReader">
	<property name="resource" value="#{stepExecutionContext['fileResource']}" />
	<property name="lineMapper" ref="inventoryLineMapper" />
	<property name="linesToSkip" value="1" />
</bean>

<bean name="inventoryLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
	<property name="fieldSetMapper" ref="inventoryFieldMapper" />
	<property name="lineTokenizer" ref="inventoryLineTokenizer" />
</bean>

<bean name="inventoryLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

Since we are reading and processing comma-delimited text files in this example, we have very little code to write for this step configuration. The only code we’ll need to implement is the FieldSetMapper required to map the contents of each line to the object representing the file record. Each line in the file will contain the fields “category”, “sub category”, “description”, “catalog num”, “color”, “size”, “price” and “qty”. So our object will contain those fields and our FieldSetMapper code listing will look like this:

package com.keyhole.example.partition;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;

@Component("inventoryFieldMapper")
public class InventoryItemFieldSetMapper implements FieldSetMapper<InventoryItem> {

	@Override
	public InventoryItem mapFieldSet(FieldSet fieldSet) throws BindException {
		InventoryItem item = new InventoryItem();
		item.setCategory(fieldSet.readString(0));
		item.setSubCategory(fieldSet.readString(1));
		item.setDescription(fieldSet.readString(2));
		item.setCatalogNum(fieldSet.readString(3));
		item.setColor(fieldSet.readString(4));
		item.setSize(fieldSet.readString(5));
		item.setPrice(fieldSet.readDouble(6));
		item.setQty(fieldSet.readInt(7));
		return item;
	}

}
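The InventoryItem class itself is not listed in this article. A minimal sketch of what it might look like is below. The field types are assumptions inferred from the readString, readDouble and readInt calls in the mapper, the toString() format is purely illustrative (the logging writer relies on toString()), and getters are omitted for brevity.

```java
// Assumed shape of the record object; the actual class is in the example repository.
class InventoryItem {

	private String category;
	private String subCategory;
	private String description;
	private String catalogNum;
	private String color;
	private String size;
	private double price; // assumption: double, to match fieldSet.readDouble(6)
	private int qty;      // assumption: int, to match fieldSet.readInt(7)

	public void setCategory(String category) {
		this.category = category;
	}

	public void setSubCategory(String subCategory) {
		this.subCategory = subCategory;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	public void setCatalogNum(String catalogNum) {
		this.catalogNum = catalogNum;
	}

	public void setColor(String color) {
		this.color = color;
	}

	public void setSize(String size) {
		this.size = size;
	}

	public void setPrice(double price) {
		this.price = price;
	}

	public void setQty(int qty) {
		this.qty = qty;
	}

	// Used by a writer that simply logs each item.
	@Override
	public String toString() {
		return "InventoryItem[category=" + category + ", subCategory=" + subCategory
				+ ", description=" + description + ", catalogNum=" + catalogNum
				+ ", color=" + color + ", size=" + size
				+ ", price=" + price + ", qty=" + qty + "]";
	}
}
```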

Now that we have the Partitioner and Step created and configured, all that remains is to configure the partitioned step itself! And it’s as easy as this:

<batch:job id="InventoryLoader">
	<batch:step id="partitionedInventoryLoadStep">
		<batch:partition step="inventoryLoadStep" partitioner="inventoryFilePartitioner">
			<batch:handler grid-size="10" task-executor="inventoryLoadTaskExecutor" />
		</batch:partition>
	</batch:step>
</batch:job>

When configuring the partitioned step, you define it just as you would any other step, giving it an ID and, if required, a next step value. Instead of defining the contents of the step as a normal chunk or tasklet, Spring Batch provides a partition tag that requires you to specify the step to be partitioned and the Partitioner that will be used to determine the chunks of data. You will also need to define the partition handler that will execute those steps. The handler’s grid-size value is what gets passed to the Partitioner’s partition method; our MultiFileResourcePartitioner ignores it and creates one partition per file. For the task executor, we’ll be using a ThreadPoolTaskExecutor with a thread pool size of 10 that allows its threads to time out if they aren’t being used.

<bean id="inventoryLoadTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="10" />
	<property name="maxPoolSize" value="10" />
	<property name="allowCoreThreadTimeOut" value="true" />
</bean>
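Spring’s ThreadPoolTaskExecutor is a wrapper around the JDK’s ThreadPoolExecutor, so the bean above corresponds roughly to the following plain-Java setup. This is a sketch for illustration only: the 60-second keep-alive and the unbounded queue are assumed to match ThreadPoolTaskExecutor’s defaults, and the class name InventoryPoolSketch is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK counterpart of the inventoryLoadTaskExecutor bean.
class InventoryPoolSketch {

	static ThreadPoolExecutor create() {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(
				10,                           // corePoolSize
				10,                           // maxPoolSize
				60, TimeUnit.SECONDS,         // keep-alive for idle threads (assumed default)
				new LinkedBlockingQueue<>()); // unbounded queue (assumed default)
		// Lets even core threads exit once idle past the keep-alive time.
		executor.allowCoreThreadTimeOut(true);
		return executor;
	}

	public static void main(String[] args) {
		ThreadPoolExecutor executor = create();
		System.out.println("core=" + executor.getCorePoolSize()
				+ ", max=" + executor.getMaximumPoolSize()
				+ ", coreTimeout=" + executor.allowsCoreThreadTimeOut());
		executor.shutdown();
	}
}
```

With an unbounded queue, at most 10 partitions run at once and the rest wait in the queue until a thread frees up.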

So, if you have a Spring Batch process with a step that processes a large number of records and you’re interested in improving its performance, consider giving step partitioning a try. It should be easy to implement and can help speed up processing times when the data splits cleanly into independent partitions.

Additional Resources

For the example code associated with this article, I have uploaded the source code to a GitHub repository at https://github.com/jonny-hackett/batch-example. To execute the example code related to this article, there is a JUnit test named InventoryLoadTest. The data files are located under src/test/resources/data/inbound and need to be placed into a local directory that matches the Partitioner inbound directory. Also, check out http://docs.spring.io/spring-batch/reference/html/scalability.html.

– Jonny Hackett

Spring Batch Blog Series

Part One: Introducing Spring Batch

Part Two: Getting Started With Spring Batch

Part Three: Generating Large Excel Files Using Spring Batch

Scaling Spring Batch – Step Partitioning

Spring Batch Unit Testing and Mockito

Spring Batch – Replacing XML Job Configuration With JavaConfig

Spring Batch Testing & Mocking Revisited with Spring Boot


Discuss This Article

David
10 years ago

I have a Spring Batch which currently uses this implementation of partitioning, the corePoolSize and maxPoolSize are set in a property file. However, because I have some external dependencies, I cannot run the ideal max pool size all day. The system can handle a much higher load at night than during the day, and each day/night could have external factors changing the number of threads I would be able to call at a given time.

My question, is there a way to control the number of threads in the thread pool at run time? I would like the pool size to change depending on the average response time, or some factor like that.

Jonny Hackett
10 years ago
Reply to  David

Hi David, I’m not sure if there is a way to dynamically set the pool size during runtime. You might pose this question over at the Spring Batch forums (http://forum.spring.io/forum/spring-projects/batch) or perhaps on Stack Overflow (http://stackoverflow.com/questions/tagged/spring-batch).

Thanks,
Jonny

Hemant
10 years ago

Hi Jonny

Nice article. I have one scenario where i have to add one more level of partitioning because there can be multiple folder for each customer within inbound directory and each folder can have multiple files. I want to create a separate thread for each customer. Can i use nested partitioning? The job should be restartable.

Akila
9 years ago

Hi Jonny, I am using Spring Batch and finally writing the contents to a text file. The issue is, the information available in the threads are randomly written to that text file (Thread1 content, Thread2, Thread 3 then again Thread1 like that). How can I write as ordered. Any idea / suggestion required to handle

Jonny Hackett (@jhackett01)

You might have to write each thread to a separate file and then have a final step collate the files into the proper order needed for the file that will be used. Give that a try.

Dinesh
9 years ago

I have requirement to read multiple files(large size) to read from SFTP/FTP and put in Database. Files can be fixed or delimited. How can I achieve using Spring Batch. File Configuration is stored in Database

Adrian
9 years ago

Hi Jonny,
I took your model into consideration in my project that has to process several files and if success to archive/delete them.
Could you please give me a hint what is the best way to archive/delete successfully processed files ?
I tried several options but with no success
-in a afterStep method in StepExecutionListener for slave steps, but the writer holds lock on the file
-in a separate step, after master step, but how cat I share file names successfully processed by slave steps,to delete only those files ? Also thread-safety must by take into consideration.
Thanks in advance,
Adrian

Adrian
9 years ago

Hi Johnny,
Thanks for the tutorial, I used it in my application and it works.
Now I have a requirement to delete/archive successfully processed files.
What is the best approach for this, I have tried several ways but no success:
-in an afterStep method on slave step StepExecutionListener but the writer is holding lock on the file
-in a new step after master step, but I don’t know how to get successfully processed file names, I guess this is an information that has to be shared with slave steps, also thread safety must be taken into consideration. This approach also has the disadvantage that it waits for all the slave steps to end.
requirement
Thank you in advance,
Adrian

Jonny Hackett
9 years ago
Reply to  Adrian

Hi Adrian, just a quick thought off the top of my head but you might try storing a list of the successful files in the job’s execution context. Then have a cleanup or archive tasklet after that step. The only problem there is that the files aren’t archived immediately after processing, so if the job is restartable you might need to add some kind of filter on the partitioner that looks for that completed file list.

Paul
9 years ago

Hi Jonny,

How are the job restarts handled? Say that there are 10 files and the grid size is 10. If the job runs for a few minutes and fails, when it is restarted, does it each file item reader pick up from where it is left off?

Thanks for the nice article.

Paul

Jonny Hackett
9 years ago
Reply to  Paul

Hi Paul, each partition should have its own execution context which keeps track of the reads, writes, and commits for that partition. You should be able to test that fairly easily by inducing an error in the file data, correcting the file data and restarting the job.

alekya
9 years ago

Hi Jonny,

I wrote a partitioner to read multiple files from the folder. I verified and the partition map has 3 filenames as input with this executioncontext info , but only the third file runs and not the other files. Am i missing anything?? when calling this itemreader step?
please let me know.

Good Article

dhruva
8 years ago

very sweet!
I will give this a trial for my use case 🙂
Thanks a ton, Jonny!!

SpringBatch Learner
8 years ago

How does master step knows that the slaves created by partitioner have completed execution before moving to the next step?

SpringBatch Learner
8 years ago

How does “partitionedInventoryLoadStep” knows that (slave threads)”InventoryLoadStep” are completed before next step if any after “partitionedInventoryLoadStep” to be executed ?

Ravi
7 years ago

I’m new to spring batch. I don’t see the file being partioned to small files. Can you please how it is being done?