Showing posts with label spring. Show all posts
Showing posts with label spring. Show all posts

Wednesday, October 16, 2013

Spring Batch - simple Range Partitioner

This sample range partitioner can be used in a Spring Batch application to split a range of IDs into partitions based on the grid size and to allocate each partition to an individual executor.

[java]

package com.hashfold.spring.batch;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

/**
 * Splits the half-open range {@code [start, end)} into contiguous,
 * non-overlapping sub-ranges, at most one per grid slot.
 *
 * <p>
 * The range is divided as evenly as possible: when it does not divide exactly
 * by the grid size, the trailing partitions are one element larger than the
 * leading ones. Every sub-range is itself half-open, so the {@code end} of one
 * partition equals the {@code start} of the next.
 *
 * <p>
 * Example: {@code start=1}, {@code end=11}, {@code gridSize=3} yields
 * {@code partition1=[1,4)}, {@code partition2=[4,7)} and
 * {@code partition3=[7,11)}.
 */
public class RangePartitioner implements Partitioner {

    /** Lower bound of the range to partition (inclusive). */
    private long start;

    /** Upper bound of the range to partition (exclusive). */
    private long end;

    public RangePartitioner() {
    }

    /**
     * @param start first value of the range (inclusive)
     */
    public final void setStart(long start) {
        this.start = start;
    }

    /**
     * @param end value just past the last one of the range (exclusive)
     */
    public final void setEnd(long end) {
        this.end = end;
    }

    /**
     * Builds one {@link ExecutionContext} per non-empty sub-range.
     *
     * <p>
     * Each context carries the long values {@code "start"} (inclusive) and
     * {@code "end"} (exclusive) plus the string {@code "name"}
     * ({@code "Thread-N"}). Map keys are {@code "partitionN"}, where N is the
     * 1-based grid slot. When the range holds fewer elements than
     * {@code gridSize}, the leading slots are empty and are skipped, so the
     * keys do not necessarily start at {@code partition1}.
     *
     * <p>
     * A line describing every created partition is printed to standard output.
     *
     * @param gridSize number of slots to spread the range over; must be positive
     * @return the partitions keyed by name; an empty map when
     *         {@code end <= start}
     * @throws IllegalArgumentException if the range is non-empty and
     *         {@code gridSize} is not positive, or if {@code end - start}
     *         overflows a {@code long}
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        // Compare the bounds directly: "end - start <= 0" would also be true
        // when the subtraction overflows, silently hiding a huge range.
        if (end <= start) {
            return Collections.<String, ExecutionContext> emptyMap();
        }

        long rangeSize = end - start;
        if (rangeSize < 0) {
            throw new IllegalArgumentException("range [" + start + ", " + end
                    + ") is too large to partition");
        }

        if (gridSize <= 0) {
            throw new IllegalArgumentException(
                    "gridSize must be positive, was " + gridSize);
        }

        long sizeOfSmallSublists = rangeSize / gridSize;
        long sizeOfLargeSublists = sizeOfSmallSublists + 1;
        // The remainder is handed out one element at a time to the last slots.
        long numberOfLargeSublists = rangeSize % gridSize;
        long numberOfSmallSublists = gridSize - numberOfLargeSublists;

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        // Next value not yet assigned to a partition. It starts at "start" so
        // that the generated sub-ranges cover [start, end) and not
        // [0, end - start).
        long nextId = start;

        for (int i = 0; i < gridSize; i++) {

            long size = i < numberOfSmallSublists ? sizeOfSmallSublists
                    : sizeOfLargeSublists;

            /*
             * Happens when the range is smaller than the grid size, e.g.
             * start=0, end=2 and gridSize=3: the small sublists are empty.
             */
            if (size <= 0) {
                continue;
            }

            long threadSeq = i + 1L;
            long startId = nextId;
            long endId = nextId + size;

            ExecutionContext value = new ExecutionContext();
            value.putLong("start", startId);
            value.putLong("end", endId);
            value.putString("name", "Thread-" + threadSeq);
            result.put("partition" + threadSeq, value);

            System.out.println("Starting : Thread-" + threadSeq + " : "
                    + startId + " - " + endId);

            nextId = endId;
        }

        return result;
    }

    // Used only to try out some cases by hand. This should become a JUnit test
    // and the method should then be removed.
    public static void main(String[] args) {
        RangePartitioner rp = new RangePartitioner();
        rp.setStart(1);
        rp.setEnd(11);
        rp.partition(3);
    }

}

[/java]

Batch configuration change

[xml]

<!-- partitioner job -->
<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">

<!-- master step, 10 threads (grid-size) -->
<step id="masterStep">
<partition step="slave" partitioner="transactionPartitioner">
<handler grid-size="10" task-executor="taskExecutor" />
</partition>
</step>
</job>

<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="10" />
</bean>

<bean id="transactionPartitioner" class="com.hashfold.spring.batch.RangePartitioner">
<!-- All records in the half-open range [start, end) are processed: start is included and end is excluded, so the last ID processed is end-1. -->
<property name="start" value="1" /> <!-- this number is included-->
<property name="end" value="11" /> <!-- this number is excluded-->
<!-- DB query should be executed with "WHERE ID >= :start and ID < :end" clause-->
</bean>

[/xml]