I was recently tasked with summarizing the data of a several-million-row table, and the task proved to be a bit grueling at first. Eventually, I found a way to summarize the large dataset with Spring Batch, but not without a wrong turn or two at first. In this post, I’ll walk you through my process and how I overcame this problem.
Disclaimer: To protect any proprietary or private information, I have fictionalized the specific facts in this example.
Problem Statement
A large, long-term study on blood pressure has millions of participants, and each of them will have several data points stored over the course of the study. Additionally, there are several fields that need to be aggregated for further reporting. All this to say, this is a very large dataset we’re processing here.
The individual data points are stored in a BLOOD_PRESSURE
table. This table is linked to a PATIENT_INFO
table by the PATIENT_ID
. We do not have access to the PATIENT_INFO
table for HIPPA reasons, so we need to populate a table that another process can then cross-reference to find other patient data (like age, ethnicity, etc.).
The output data defines what we need to read in and what the item we will put the data into will look like. It’s a simple POJO object with readers and writers for the minimum, maximum, average, and standard deviation for each of the systolic, diastolic, and pulse fields.
First Cut
A simple JdbcCursorItemReader
should do the trick, right? A simple mapper to the POJO item, and away we go…
@Bean(name = "bpReader") @StepScope private ItemReader<MinMaxLoadItem> createBpReader(DataSource bpDatasource, BpItemMapper mapper) { JdbcCursorItemReader<MinMaxLoadItem> bpReader = new JdbcCursorItemReader<>(); bpReader.setDataSource(bpDatasource); bpReader.setRowMapper(mapper); bpReader.setSql(createReadSql()); return bpReader; } private String createReadSql() { StringBuilder readSql = new StringBuilder("select"); readSql.append(" PATIENT_ID"); readSql.append(",min(BP_SYSTOLIC) as BP_SYSTOLIC_MIN"); readSql.append(",max(BP_SYSTOLIC) as BP_SYSTOLIC_MAX"); readSql.append(",avg(BP_SYSTOLIC) as BP_SYSTOLIC_AVG"); readSql.append(",stdev(BP_SYSTOLIC) as BP_SYSTOLIC_STD_DEV"); readSql.append(",min(BP_DIASTOLIC) as BP_DIASTOLIC_MIN"); readSql.append(",max(BP_DIASTOLIC) as BP_DIASTOLIC_MAX"); readSql.append(",avg(BP_DIASTOLIC) as BP_DIASTOLIC_AVG"); readSql.append(",stdev(BP_DIASTOLIC) as BP_DIASTOLIC_STD_DEV"); readSql.append(",min(BP_PULSE) as BP_PULSE_MIN"); readSql.append(",max(BP_PULSE) as BP_PULSE_MAX"); readSql.append(",avg(BP_PULSE) as BP_PULSE_AVG"); readSql.append(",stdev(BP_PULSE) as BP_PULSE_STD_DEV"); readSql.append(" from STUDY_1069A.BLOOD_PRESSURE"); readSql.append(" group by PATIENT_ID"); return readSql.toString(); }
Sure, it will eventually work, but with millions of patients in the database (each of which with years of blood pressure data) the summarization SQL will take a very long to complete. Not only is this method incredibly slow, but it could very well overtax the database server as well. Yuck.
We need a way to perform the aggregation in bits so the database can continue to perform all its other duties in a timely fashion.
Enter the JdbcPagingItemReader
There are several other places in our application where we need to page data out of the database to display on the screen to a user, so I went looking for a Spring Batch mechanism to take advantage of that ability and to quickly summarize my large dataset.
Lo and behold, I found the JdbcPagingItemReader
. It allows us to specify how many rows of data we want to be returned, which effectively chunks up how many records get summarized in one go.
It also utilizes a PagingQueryProvider
. This serves to break the SQL into several parts, so it can more easily manipulate what it’s going to send to the database server for execution. The creation of the actual PagingQueryProvider
is handled by Spring through the factory pattern.
In this case, we want to use the SqlPagingQueryProviderFoctoryBean
. Setting this up looks like this:
@Bean(name = "bpReader") @StepScope private ItemReader<MinMaxLoadItem> createBpReader(DataSource bpDatasource, BpItemMapper mapper, PagingQueryProvider provider) throws Exception { JdbcPagingItemReader<MinMaxLoadItem> result = new JdbcPagingItemReader<>(); result.setName("minMaxLoadItemReader"); result.setDataSource(bpDatasource); result.setPageSize(1000); result.setRowMapper(mapper); result.setQueryProvider(provider); return result; } @Bean(name = "pbQueryProviderFactory") protected SqlPagingQueryProviderFactoryBean createQueryProviderFactory() { SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean(); factory.setDataSource(mccSqlDatasource); factory.setSelectClause(createReadSelect()); factory.setFromClause("from STUDY_1069A.BLOOD_PRESSURE"); factory.setGroupClause("group by PATIENT_ID"); factory.setSortKey("PATIENT_ID"); return factory; } private String createReadSelect() { StringBuilder readSql = new StringBuilder("select"); readSql.append(" PATIENT_ID"); readSql.append(",min(BP_SYSTOLIC) as BP_SYSTOLIC_MIN"); readSql.append(",max(BP_SYSTOLIC) as BP_SYSTOLIC_MAX"); readSql.append(",avg(BP_SYSTOLIC) as BP_SYSTOLIC_AVG"); readSql.append(",stdev(BP_SYSTOLIC) as BP_SYSTOLIC_STD_DEV"); readSql.append(",min(BP_DIASTOLIC) as BP_DIASTOLIC_MIN"); readSql.append(",max(BP_DIASTOLIC) as BP_DIASTOLIC_MAX"); readSql.append(",avg(BP_DIASTOLIC) as BP_DIASTOLIC_AVG"); readSql.append(",stdev(BP_DIASTOLIC) as BP_DIASTOLIC_STD_DEV"); readSql.append(",min(BP_PULSE) as BP_PULSE_MIN"); readSql.append(",max(BP_PULSE) as BP_PULSE_MAX"); readSql.append(",avg(BP_PULSE) as BP_PULSE_AVG"); readSql.append(",stdev(BP_PULSE) as BP_PULSE_STD_DEV"); return readSql.toString(); }
Basically, behind the scenes, this is getting a TOP
number of rows to return. In this example, it adds TOP (1000)
after the select
, limiting how many summarization rows will be returned.
In order to ensure it doesn’t grab the same records over and over, it uses the field you specified in the factory.setSortKey()
to limit the calls after the first by adding a WHERE
clause.
In this example, it adds something like where PATIENT_ID > :maxId
to the SQL statement, setting the parameter to the last value returned by the previous call. This allows it to start the next set in the right place without having to recalculate the preceding rows to determine what to skip.
Rather clever, actually.
Writing the Results
With the above configuration, Spring Batch will now have SQL summarize the dataset for 1,000 patients at a time. Good. However, it will still try to read all the patients into memory before writing anything out.
(Disclaimer: This example has no need of processing the read data into anything else, so we are ignoring that part of the process, no pun intended.)
Anyways, this is handled by setting the chunk size when defining the step that used the reader we created above. This allows Spring Batch to hold more than 1,000 patients’ worth of data in memory before writing it out. Thus, limiting the number of commits needed without requiring them all to be read first.
Summarization in Summary
After some experimentation, we find that a balance between chunk size and page size results in a batch process that runs in a timely fashion without burdening the database server inordinately.
While there are certainly ways to accomplish this same functionality directly on the database server, this method allows for less aggressive use of the server’s resources. It also doesn’t require permissions on that server that some DBAs are less likely to grant.
So, problem solved. To summarize a large dataset like the one in this example (depending on your goals of course) can be quickly and safely handled with Spring Batch.
Also, if you enjoyed this post, I recommend that you check out this one on Flat File processing with Apache Camel and Spring Boot, or just visit the Keyhole Dev Blog – lots of good reads on there.
Happy programming!
About the Author
Clayton Neff has been programming computers he had to throw switches to get the darn things to boot up. That is why he is contemplating retirement. He was introduced to object-oriented programming in the mid-1980s, and that has been so much fun that he still enjoys what he does.
If you have any questions or comments, please feel free to contact him at [email protected]