Using Spring Integration In Conjunction With Spring Batch

By Mark Fricke

Recently I was working on a client project built around Spring Batch. The program needed to poll an SFTP directory for an encrypted file, decrypt that file, start the Spring Batch job, and archive the file.

My first thought was to use a shell script to perform all the tasks. Then one of my colleagues suggested Spring Integration, and I thought this was a great opportunity to learn and get my hands dirty with something new.

In this blog, I will show an example of Spring Integration configuration code, break it apart, and show how each part works.

Our Example

Spring Integration enables lightweight messaging within Spring apps. It supports integration with external systems using declarative adapters. Those adapters provide a higher level of abstraction over Spring’s support for remoting, messaging, and scheduling.

In this example, we will particularly focus on how Spring Integration provides a messaging paradigm to wire together POJOs and also provides gateway adapters to connect to external sources.

Let’s start by showing the entire Spring Integration configuration code, and then explain each part.

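import java.io.File;

import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.launch.JobLaunchingMessageHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.messaging.MessageChannel;

// @Configuration is the idiomatic stereotype for a class that declares @Bean methods.
@Configuration
@EnableIntegration
public class IntegrationConfiguration {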

    @Value("${ftp.upload.dir}")
    private String ftpUploadDir;

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel pgpFileProcessor() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "30000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(ftpUploadDir));
        source.setFilter(new SimplePatternFileListFilter("*.pgp"));
        source.setScanEachPoll(true);
        source.setUseWatchService(true);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "jobChannel", outputChannel = "nullChannel")
    protected JobLaunchingMessageHandler launcher(JobLauncher jobLauncher) {
        return new JobLaunchingMessageHandler(jobLauncher);
    }
}  

In my application, I need to poll for a new file that arrives in a directory over SFTP. I use the @InboundChannelAdapter annotation to mark the bean as an inbound adapter. It is configured to poll the specified directory every 30 seconds for a PGP file.

The FileReadingMessageSource provided by Spring Integration sets up the directory to scan and the file filter to apply. When a new file is uploaded to that directory over SFTP, Spring Integration sends a message containing the file to the adapter's channel, fileInputChannel.

@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "30000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(new File(ftpUploadDir));
    source.setFilter(new SimplePatternFileListFilter("*.pgp"));
    source.setScanEachPoll(true);
    source.setUseWatchService(true);
    return source;
}
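
As a rough illustration of what one poll amounts to, here is a minimal plain-Java sketch that lists a directory and keeps only the names matching *.pgp. It uses only the JDK, not Spring Integration. The class PgpDirectoryScan and its methods are hypothetical names made up for this example; the real FileReadingMessageSource also wraps each file in a message and leaves the scheduling to the poller.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Spring Integration.
class PgpDirectoryScan {

    // Returns the regular files in dir whose names match the glob, sorted by path.
    static List<Path> scan(Path dir, String glob) {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    matches.add(p);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(matches);
        return matches;
    }

    // Demo: creates a temp directory holding one .pgp and one .txt file,
    // scans it, and returns the names of the matching files.
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("upload");
            Files.createFile(dir.resolve("orders.pgp"));
            Files.createFile(dir.resolve("notes.txt"));
            List<String> names = new ArrayList<>();
            for (Path p : scan(dir, "*.pgp")) {
                names.add(p.getFileName().toString());
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [orders.pgp]
    }
}
```

Only orders.pgp passes the pattern, which mirrors what the SimplePatternFileListFilter does in the adapter above.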

To link the inbound channel adapter to the Transformer that will decrypt the file, a Channel is used. Think of a Channel as the pipe that connects two points. The fileInputChannel bean connects the file poller to my PGP Transformer using a direct, one-to-one channel.

@Bean
public MessageChannel fileInputChannel() {
    return new DirectChannel();
}
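
To show what "direct" means here, the following is a simplified plain-Java model of such a channel: sending a payload invokes the subscriber immediately, on the sender's own thread. SimpleDirectChannel is a hypothetical class written for this post, not the Spring DirectChannel, and it assumes a single subscriber to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a direct channel; not the Spring class.
class SimpleDirectChannel<T> {

    private Consumer<T> subscriber;

    // Registers the single handler that receives every payload.
    void subscribe(Consumer<T> handler) {
        this.subscriber = handler;
    }

    // Hands the payload to the subscriber synchronously, on the caller's thread.
    void send(T payload) {
        if (subscriber == null) {
            throw new IllegalStateException("No subscriber for this channel");
        }
        subscriber.accept(payload);
    }

    // Demo: sends one file name and records whether the handler
    // ran on the same thread as the sender.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Thread sender = Thread.currentThread();
        SimpleDirectChannel<String> channel = new SimpleDirectChannel<>();
        channel.subscribe(fileName ->
                received.add(fileName + " sameThread=" + (Thread.currentThread() == sender)));
        channel.send("orders.pgp");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [orders.pgp sameThread=true]
    }
}
```

Because the handoff is synchronous, an exception thrown by the subscriber propagates straight back to the code that called send.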

A Spring Integration Transformer converts its input to another type for the output, for example String to JSON. Spring Integration provides many built-in transformers, such as XML, Stream, Object to Map, and JSON.

In this case, I convert the encrypted file to a decrypted file using a PGP Java library provided by Sloan Searman. The output channel sends the decrypted file to the next transformer, which creates a Spring Batch job launch request.

@Component
public class PGPTransformer {

    private static final Logger log = LoggerFactory.getLogger(PGPTransformer.class);

    @Value("${pgp.archive.dir}")
    private String archiveDir;

    @Autowired
    private PGPFileProcessor pgpFileProcessor;

    @Transformer(inputChannel = "fileInputChannel", outputChannel = "fileToJobProcessor")
    public File transform(File aFile) throws Exception {

        if (!pgpFileProcessor.decrypt(aFile)) {
            throw new Exception("Failed to decrypt input file.");
        }

        // Move the encrypted file to the archive directory.
        File archivedFile = new File(archiveDir, aFile.getName());
        if (aFile.renameTo(archivedFile)) {
            // Log the archive location, not the file's original path.
            log.info(String.format("%s file archived to %s", aFile.getName(), archivedFile.getAbsolutePath()));
        } else {
            log.warn(String.format("%s file could not be archived to %s", aFile.getName(), archiveDir));
        }
        return pgpFileProcessor.getDecryptFile();
    }
}
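
File.renameTo only reports failure by returning false. As one possible alternative for the archive step, here is a plain-JDK sketch using java.nio.file.Files.move, which throws an exception describing what went wrong. ArchiveMover is a hypothetical helper written for illustration; it is not part of the application above, and replacing an existing archived file of the same name is an assumption about the desired behavior.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating an NIO-based archive step.
class ArchiveMover {

    // Moves file into archiveDir (creating the directory if needed)
    // and returns the file's new location.
    static Path archive(Path file, Path archiveDir) {
        try {
            Files.createDirectories(archiveDir);
            Path target = archiveDir.resolve(file.getFileName());
            // Assumption: an older archived file with the same name may be overwritten.
            return Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo: archives a temp file, then checks that the source is gone
    // and the target exists under the same file name.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("upload");
            Path file = Files.createFile(dir.resolve("orders.pgp"));
            Path archived = archive(file, dir.resolve("archive"));
            return Files.exists(archived)
                    && Files.notExists(file)
                    && archived.getFileName().toString().equals("orders.pgp");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```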

The FileToJobTransformer will take the input file and create a Spring Batch job request with the file as a job parameter.

@Component
public class FileToJobTransformer implements ApplicationContextAware {

    private static final Logger log = LoggerFactory.getLogger(FileToJobTransformer.class);

    @Autowired
    private Job job;

    private ApplicationContext context;

    @Transformer(inputChannel = "fileToJobProcessor", outputChannel = "jobChannel")
    public JobLaunchRequest transform(File aFile) {

        String fileName = aFile.getAbsolutePath();

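        // The timestamp makes each set of parameters unique, so the same
        // file name can be launched again as a new job instance.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("fileName", fileName)
                .addDate("dateTime", new Date())
                .toJobParameters();
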
        JobLaunchRequest request = new JobLaunchRequest(job, jobParameters);

        return request;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }
}
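
Spring Batch identifies a job instance by the job name plus its identifying parameters, and it will not rerun an instance that has already completed. That is likely why the transformer adds a dateTime parameter next to fileName. The sketch below models that rule in plain Java with a set of already-launched keys. JobInstanceRegistry, the job name importJob, and the sample values are all hypothetical; this is not Spring Batch code, only an illustration of the idea.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of job-instance identity; not a Spring Batch class.
class JobInstanceRegistry {

    private final Set<String> launched = new HashSet<>();

    // Returns true if the launch is accepted, false if this exact
    // job name and parameter combination was launched before.
    boolean launch(String jobName, Map<String, String> parameters) {
        // A TreeMap gives the same key regardless of insertion order.
        String key = jobName + new TreeMap<>(parameters);
        return launched.add(key);
    }

    // Demo: the same file twice, then the same file with a timestamp added.
    static String demo() {
        JobInstanceRegistry registry = new JobInstanceRegistry();

        Map<String, String> fileOnly = new HashMap<>();
        fileOnly.put("fileName", "/data/orders.txt");

        Map<String, String> withTimestamp = new HashMap<>(fileOnly);
        withTimestamp.put("dateTime", "2024-01-01T00:00:30");

        boolean first = registry.launch("importJob", fileOnly);
        boolean second = registry.launch("importJob", fileOnly);
        boolean third = registry.launch("importJob", withTimestamp);
        return first + " " + second + " " + third;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true false true
    }
}
```

The second launch is rejected because nothing distinguishes it from the first, while the added timestamp makes the third one a new instance.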

The final piece of my Spring Integration flow is the Service Activator that starts the Spring Batch job. The incoming message, which carries the job launch request (the job and its parameters), is passed to the built-in JobLaunchingMessageHandler, which uses the Spring Batch job launcher to start the job.

Since this is the end of my integration flow, the output channel is set to nullChannel, which discards the handler's reply.

@Bean
@ServiceActivator(inputChannel = "jobChannel", outputChannel = "nullChannel")
protected JobLaunchingMessageHandler launcher(JobLauncher jobLauncher) {
    return new JobLaunchingMessageHandler(jobLauncher);
}

Final Thoughts

And there you have it! Spring Integration turned out to be a simple solution to my client’s needs. Using Spring Integration and Spring Batch with Spring Boot, I was able to have a single deployable jar that included everything needed to run the application. I no longer needed separate deployments for the shell script and the batch process, and all the code lives in one Java project.

I like to get my hands dirty and learn something new. I’ve barely scratched the surface of what is possible with Spring Integration, but I know I will have fun playing in the mud in the meantime.


About the Author
Mark Fricke

As a consultant for Keyhole Software, Mark has 20 years of experience in the software industry. He primarily works with Java and C#, but has worked with a variety of other languages such as COBOL and C/C++. He has worked on almost every type of platform, from mainframe dinosaurs to classic desktop and modern mobile applications. When he is not working, he is with his family or playing in his bands.

