This sample range partitioner can be used in a Spring Batch application to split a range of IDs into partitions based on the grid size and to assign each partition to an individual executor thread.
[java]
package com.hashfold.spring.batch;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
/**
 * Splits the half-open range {@code [start, end)} into at most {@code gridSize}
 * contiguous, non-overlapping sub-ranges of near-equal size.
 *
 * <p>Each sub-range is published as an {@link ExecutionContext} holding:
 * <ul>
 * <li>{@code "start"} - first value of the sub-range (inclusive)</li>
 * <li>{@code "end"} - value just past the sub-range (exclusive)</li>
 * <li>{@code "name"} - a label of the form {@code Thread-N}</li>
 * </ul>
 */
public class RangePartitioner implements Partitioner {

    /** Lower bound of the overall range (inclusive). */
    private long start;

    /** Upper bound of the overall range (exclusive). */
    private long end;

    public RangePartitioner() {
    }

    /**
     * @param start lower bound of the range to partition (inclusive)
     */
    public final void setStart(long start) {
        this.start = start;
    }

    /**
     * @param end upper bound of the range to partition (exclusive)
     */
    public final void setEnd(long end) {
        this.end = end;
    }

    /**
     * Builds the partitions covering {@code [start, end)}.
     *
     * <p>The range is divided into {@code gridSize} slots. The first slots get
     * {@code rangeSize / gridSize} elements and the remaining
     * {@code rangeSize % gridSize} slots get one element more. Slots that would
     * be empty (range smaller than the grid) are skipped, so fewer than
     * {@code gridSize} entries may be returned.
     *
     * @param gridSize number of slots to divide the range into; must be positive
     *                 when the range is non-empty
     * @return map from {@code "partition" + N} to the context describing that
     *         sub-range, where N is the 1-based slot number; empty when
     *         {@code end <= start}
     * @throws IllegalArgumentException if the range is non-empty and
     *         {@code gridSize} is not positive, or if {@code end - start}
     *         does not fit in a {@code long}
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (end <= start) {
            return Collections.<String, ExecutionContext> emptyMap();
        }
        long rangeSize = end - start;
        if (rangeSize < 0) {
            // end > start but the difference wrapped around: the span exceeds Long.MAX_VALUE.
            throw new IllegalArgumentException(
                    "range [" + start + ", " + end + ") is too large to partition");
        }
        if (gridSize <= 0) {
            throw new IllegalArgumentException(
                    "gridSize must be positive, got " + gridSize);
        }

        long sizeOfSmallSublists = rangeSize / gridSize;
        long sizeOfLargeSublists = sizeOfSmallSublists + 1;
        long numberOfLargeSublists = rangeSize % gridSize;
        long numberOfSmallSublists = gridSize - numberOfLargeSublists;

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        // Running lower bound of the next sub-range. It starts at the configured
        // 'start' (not 0) so the partitions cover exactly [start, end).
        long nextStart = start;
        for (long i = 0; i < gridSize; i++) {
            long size = i < numberOfSmallSublists ? sizeOfSmallSublists
                    : sizeOfLargeSublists;
            if (size <= 0) {
                // Happens when the range is smaller than the grid size,
                // e.g. start=0, end=2 and gridSize=3: the small slots are empty.
                continue;
            }

            long threadSeq = i + 1;
            long startId = nextStart;
            long endId = nextStart + size;

            ExecutionContext value = new ExecutionContext();
            value.putLong("start", startId);
            value.putLong("end", endId);
            value.putString("name", "Thread-" + threadSeq);
            result.put("partition" + threadSeq, value);

            System.out.println("Starting : Thread-" + threadSeq + " : "
                    + startId + " - " + endId);

            nextStart = endId;
        }
        return result;
    }

    // Used only to try out some cases by hand; should be replaced by a unit test.
    public static void main(String args[]) {
        RangePartitioner rp = new RangePartitioner();
        rp.setStart(1);
        rp.setEnd(11);
        rp.partition(3);
    }
}
[/java]
Batch configuration changes (job definition, task executor and partitioner bean):
[xml]
<!-- Partitioned job: masterStep delegates to the "slave" step through transactionPartitioner. -->
<job xmlns="http://www.springframework.org/schema/batch" id="partitionJob">
    <!-- Master step: grid-size of 10, executed on the taskExecutor bean. -->
    <step id="masterStep">
        <partition partitioner="transactionPartitioner" step="slave">
            <handler task-executor="taskExecutor" grid-size="10"/>
        </partition>
    </step>
</job>
<!-- Thread pool referenced by the partition handler; core and max pool size are both 10. -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="10"/>
    <property name="maxPoolSize" value="10"/>
</bean>
<bean id="transactionPartitioner" class="com.hashfold.spring.batch.RangePartitioner">
    <!-- Records are processed over the half-open range [start, end): start is included, end is not, so the last ID processed is end - 1. -->
    <property name="start" value="1"/> <!-- inclusive lower bound -->
    <property name="end" value="11"/> <!-- exclusive upper bound -->
    <!-- The DB query should therefore filter with "WHERE ID >= :start AND ID < :end". -->
</bean>
[/xml]