Spring Batch partitioning - Part One
Hello everybody!
In this two-part series, I want to show you my small extension library for the Spring Batch framework, made up of extensions to the ResourcePartitioner and the FileItemReader.
In this first part, we'll take a look at how partitioning is done in Spring Batch and when it can be useful. I'll be using Kotlin for this project.
Why should I use partitioning?
Partitioning lets us repeat the same batch logic on an arbitrary number of different sources. It can be dynamic, and it can easily be parallelized. For example, I'll show you how to run the same batch operation on N files.
Project set up
Let's create a bare-bones Spring project, with the only dependencies being Spring Batch and H2, using Spring Initializr (link to my configuration).
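If you'd rather wire the build by hand, the generated project boils down to a few dependencies. Here's a rough sketch of the build.gradle.kts dependencies block (assuming the Gradle Kotlin DSL; the plugin and version setup Initializr adds is omitted):
// build.gradle.kts (dependencies only) - sketch of what Initializr generates
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    runtimeOnly("com.h2database:h2")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")
}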
Now let's define what our batch job should do:
Read the data from a .csv file
Perform some computation on the rows of the file
Write the output to a text file
Defining the batch operation
I'm going to use the "CO2 Emissions per person" data found on https://www.gapminder.org/data/ (Gapminder is a fantastic organization with the mission of spreading knowledge and fighting misconceptions; go take a look at their work).
Let's say that we want to process the data, sum up the total tonnes of emissions of each country and save the results in a text file.
Of course, this specific task would not require the firepower that Spring Batch offers, but we'll use it as a simple example.
Let's get coding
Creating the Job
The first thing we need to do is create our Configuration class and Job bean, in the standard Spring Batch way
@EnableBatchProcessing
@Configuration
class SpringBatchPartitioningConfiguration(
private val jobBuilder: JobBuilderFactory,
@Qualifier("dataProcessingStep")
private val step: Step,
) {
@Bean
fun job(): Job =
jobBuilder
.get("dataProcessing")
.incrementer(RunIdIncrementer())
.start(step)
.build()
}
Creating the Step
Now we will implement the single step that makes up our job, starting by defining some domain objects.
Let's take a look at the structure of the CSV by looking at the header and a random row:
country | ... | 2002 | 2003 | ... |
Italy   | ... | 8.36 | 8.53 | ... |
where the years go from 1800 to 2018, and we don't always have data for every year.
So the classes I decided to write are the following:
data class CountryRawData(
val countryName: String,
val yearlyData: List<BigDecimal>,
)
data class CountrySummedData(
val countryName: String,
val totalAmount: BigDecimal,
)
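The reader we'll write below maps each line with a toDomain() extension. The real parsing code is in the repo; assuming a comma-separated file where missing years are simply blank cells, a minimal sketch could look like this:
// Sketch of the line-to-domain mapping (assumes comma-separated cells,
// with blank cells for years that have no data; the repo has the real version)
fun String.toDomain(): CountryRawData {
    val cells = split(",")
    return CountryRawData(
        countryName = cells.first(),
        yearlyData = cells.drop(1)
            .filter { it.isNotBlank() } // skip years without data
            .map { it.toBigDecimal() },
    )
}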
And now we can define what our step is going to look like:
@Configuration
class DataProcessingStep(
private val stepBuilder: StepBuilderFactory,
) {
@Qualifier("dataProcessingStep")
@Bean
fun dataProcessingStep(): Step =
stepBuilder
.get("dataProcessingSingleFile")
.chunk<CountryRawData, CountrySummedData>(10)
.reader(reader(FileSystemResource("path/to/input/file")))
.processor(processor())
.writer(writer(FileSystemResource("path/to/output/file")))
.build()
fun reader(file: Resource): ItemReader<CountryRawData> = TODO()
fun processor(): ItemProcessor<CountryRawData, CountrySummedData> = TODO()
fun writer(file: Resource): ItemWriter<CountrySummedData> = TODO()
}
For the scope of this article, it's not important to go into detail about the implementation of the processor and writer. If you're interested, you'll find all the code and commits in the GitHub repo for this project. We'll focus our attention on the ItemReader:
fun reader(file: Resource): ItemReader<CountryRawData> =
FlatFileItemReader<CountryRawData>()
.apply {
setResource(file)
setLinesToSkip(1)
setLineMapper { line, _ ->
Thread.sleep(50) // Simulating a heavier task
line.toDomain()
}
}
We're using the built-in FlatFileItemReader, which needs a few basic settings to work:
The Resource to read
The mapping function for each line
(optional) The number of lines to skip (in our case we want to skip the header)
That's it, super easy. The FlatFileItemReader takes care of opening the file, reading each line as a stream, applying our mapping function, and closing the file at the end.
Running it
[main] INFO o.s.b.c.l.s.SimpleJobLauncher - Job: [SimpleJob: [name=dataProcessing]] launched with the following parameters: [{run.id=1}]
[main] INFO o.s.b.c.j.SimpleStepHandler - Executing step: [dataProcessingSingleFile]
[main] INFO ChunkListener - Chunk finished - ReadCount=10, WriteCount=10
[main] INFO ChunkListener - Chunk finished - ReadCount=20, WriteCount=20
[...]
[main] INFO ChunkListener - Chunk finished - ReadCount=180, WriteCount=180
[main] INFO ChunkListener - Chunk finished - ReadCount=190, WriteCount=190
[main] INFO ChunkListener - Chunk finished - ReadCount=194, WriteCount=194
[main] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingSingleFile] executed in 11s927ms
[main] INFO o.s.b.c.l.s.SimpleJobLauncher - Job: [SimpleJob: [name=dataProcessing]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 11s937ms
Great! Our job runs and takes ~12s (with some sleeping added in the app to simulate a heavier task).
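As a side note, the "Chunk finished" lines come from a small custom ChunkListener registered on the step via .listener(...). It's not central to the article, but a minimal sketch could look like this (hypothetical; the repo has the actual listener):
// Minimal listener behind the "Chunk finished" log lines (sketch)
class LoggingChunkListener : ChunkListener {

    private val log = LoggerFactory.getLogger("ChunkListener")

    override fun beforeChunk(context: ChunkContext) {}

    override fun afterChunk(context: ChunkContext) {
        val stepExecution = context.stepContext.stepExecution
        log.info("Chunk finished - ReadCount=${stepExecution.readCount}, WriteCount=${stepExecution.writeCount}")
    }

    override fun afterChunkError(context: ChunkContext) {}
}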
Multiple sources
Now our 'boss' comes to us and tells us that he likes our job, and that he wants us to run it on more than one file at a time, all in a single run. So now we also have to process the data in the file "Life Expectancy (years)" (we won't question the use case for summing life expectancies from the year 1800).
The structure of the .csv file is the same, so we can reuse all of our code. But how do we run a step for each file?
We could simply add a new step to our job and extract the file as a parameter to pass to the step, but there's a better way.
Using a Partitioner, we can run the same step an arbitrary number of times on different sources without having to change the job each time. Another important point is that it can be dynamic: we don't need to know in advance how many times the step will run, because the partitions are determined at runtime.
Partitioner
Spring Batch offers a way to run the same 'sub-step' multiple times via the Partitioner API. So let's define our partitioning step:
@Configuration
class DataProcessingPartitioningStepConfiguration(
private val stepBuilder: StepBuilderFactory,
@Qualifier("dataProcessingStep")
private val dataProcessingStep: Step,
) {
private val resources: List<Resource> = TODO()
@Qualifier("dataProcessingPartitioningStep")
@Bean
fun dataProcessingPartitioningStep(): Step =
stepBuilder
.get("dataProcessingSingleFile")
.partitioner("fileProcessingStep", partitioner(resources))
.step(dataProcessingStep)
.build()
private fun partitioner(resources: List<Resource>): Partitioner = TODO()
}
This 'wrapper step' is made up of the 'child step' to be executed for each partition, and the Partitioner, which handles the partition creation. But what is a 'partition' exactly? Well, it's nothing more than a context: an ExecutionContext, to be precise, which in its simplest interpretation is just a map containing the parameters for the step.
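To make that concrete, a hand-rolled Partitioner that builds one context per file could look like the sketch below (purely illustrative; we'll use the built-in one in a moment):
// Illustrative only: one ExecutionContext per resource, keyed by partition name
class FilePartitioner(private val resources: List<Resource>) : Partitioner {

    override fun partition(gridSize: Int): Map<String, ExecutionContext> =
        resources
            .mapIndexed { index, resource ->
                "partition$index" to ExecutionContext().apply {
                    putString("fileName", resource.url.toExternalForm())
                }
            }
            .toMap()
}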
Again, Spring Batch offers a built-in partitioner to manage files, the MultiResourcePartitioner, which creates a context for each resource:
private fun partitioner(resources: List<Resource>): Partitioner =
MultiResourcePartitioner().apply {
setResources(resources.toTypedArray())
}
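The resources list is left as a TODO in the configuration above. One way to populate it, assuming the input CSVs live under a (hypothetical) data/ folder on the classpath, is with Spring's pattern resolver:
// Sketch: collect every CSV under a hypothetical classpath:data/ folder
private val resources: List<Resource> =
    PathMatchingResourcePatternResolver()
        .getResources("classpath:data/*.csv")
        .toList()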
The context will contain a parameter holding the file name, which our data processing step should then use. To do this, we inject the value into the reader as a parameter, taken from the step execution context.
@Bean
@StepScope
fun reader(@Value("#{stepExecutionContext[fileName]}") pathToFile: String): FlatFileItemReader<CountryRawData>
We have to add the @StepScope annotation, which makes Spring instantiate the bean at the moment the step is executed. This is needed because the fileName only becomes available at runtime, after the other beans have been created.
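Putting the pieces together, the step-scoped reader could look roughly like this (a sketch reusing the earlier FlatFileItemReader set-up; the MultiResourcePartitioner stores the resource URL under the fileName key, so we wrap it in a UrlResource):
@Bean
@StepScope
fun reader(
    @Value("#{stepExecutionContext[fileName]}") pathToFile: String,
): FlatFileItemReader<CountryRawData> =
    FlatFileItemReader<CountryRawData>().apply {
        setResource(UrlResource(pathToFile)) // fileName holds the resource URL
        setLinesToSkip(1)
        setLineMapper { line, _ ->
            Thread.sleep(50) // Simulating a heavier task
            line.toDomain()
        }
    }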
Running it
[main] INFO o.s.b.c.l.s.SimpleJobLauncher - Job: [SimpleJob: [name=dataProcessing]] launched with the following parameters: [{run.id=1}]
[main] INFO o.s.b.c.j.SimpleStepHandler - Executing step: [dataProcessingMultipleFiles]
[main] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=10, WriteCount=10
[main] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=20, WriteCount=20
[...]
[main] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=190, WriteCount=190
[main] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=194, WriteCount=194
[main] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingSingleFile:partition0] executed in 11s989ms
[main] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=10, WriteCount=10
[main] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=20, WriteCount=20
[...]
[main] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=190, WriteCount=190
[main] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=195, WriteCount=195
[main] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingSingleFile:partition1] executed in 12s71ms
[main] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingMultipleFiles] executed in 24s69ms
[main] INFO o.s.b.c.l.s.SimpleJobLauncher - Job: [SimpleJob: [name=dataProcessing]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 24s80ms
Now we're processing both files and creating two separate output files containing the processed data. Awesome!
We see that the partitions are created and then executed one after the other. The total time roughly doubled, as we're processing two (almost) identical files.
Now we'll make use of threads to parallelize our steps.
Parallelization
Since our processing doesn't depend on the order in which we process the files, we can make good use of parallelization. To do this, we'll assign a TaskExecutor to the partitioner, which will be in charge of dispatching the tasks to separate threads.
@Qualifier("dataProcessingPartitioningStep")
@Bean
fun dataProcessingPartitioningStep(): Step =
stepBuilder
.get("dataProcessingMultipleFiles")
.partitioner("fileProcessingStep", partitioner(resources))
.taskExecutor(taskExecutor())
.step(dataProcessingStep)
.build()
private fun partitioner(resources: List<Resource>): Partitioner =
MultiResourcePartitioner().apply {
setResources(resources.toTypedArray())
}
private fun taskExecutor(): TaskExecutor =
SimpleAsyncTaskExecutor().apply {
concurrencyLimit = 2
}
For this example, we'll use the SimpleAsyncTaskExecutor, which is super straightforward to use. For real business applications, I'd suggest using the ThreadPoolTaskExecutor.
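For reference, a pooled executor for this same set-up might be configured along these lines (a sketch; the pool sizes here are arbitrary):
// Sketch of a pooled alternative to SimpleAsyncTaskExecutor
private fun taskExecutor(): TaskExecutor =
    ThreadPoolTaskExecutor().apply {
        corePoolSize = 2
        maxPoolSize = 4
        threadNamePrefix = "partition-"
        initialize()
    }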
Running it
[main] INFO o.s.b.c.l.s.SimpleJobLauncher - Job: [SimpleJob: [name=dataProcessing]] launched with the following parameters: [{run.id=1}]
[main] INFO o.s.b.c.j.SimpleStepHandler - Executing step: [dataProcessingMultipleFiles]
[Thread-1] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=10, WriteCount=10
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=10, WriteCount=10
[Thread-1] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=20, WriteCount=20
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=20, WriteCount=20
[Thread-1] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=30, WriteCount=30
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=30, WriteCount=30
[...]
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=170, WriteCount=170
[Thread-1] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=180, WriteCount=180
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=180, WriteCount=180
[Thread-1] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=190, WriteCount=190
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=190, WriteCount=190
[Thread-1] INFO ChunkListener - Chunk finished - File=co2_emissions_tonnes_per_person.csv ReadCount=194, WriteCount=194
[Thread-1] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingSingleFile:partition0] executed in 12s47ms
[Thread-2] INFO ChunkListener - Chunk finished - File=life_expectancy_years.csv ReadCount=195, WriteCount=195
[Thread-2] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingSingleFile:partition1] executed in 12s263ms
[main] INFO o.s.b.c.s.AbstractStep - Step: [dataProcessingMultipleFiles] executed in 12s274ms
[main] INFO o.s.b.c.l.s.SimpleJobLauncher - Job: [SimpleJob: [name=dataProcessing]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 12s287ms
The two files are read and processed at the same time now, and the job takes about half the time!
Conclusions
In this first part, I explained how to make use of partitioners in Spring Batch, which can greatly improve the performance of our batch jobs while keeping the architecture clean, easily debuggable and maintainable.
In the next one, I'll share my small library which extends the partitioning functionality for some specific use cases.
Links
GitHub repository for this project with follow-along commits:
https://github.com/lorenzo-milicia/hashnode-spring-batch-partitioning