I am having issues with restart of local partitioning batch. I am throwing RuntimeException
on 101st processed item. The job fails, but something is going wrong, because on restart, the job continues from 150th item (and not from the 100th item that it should).
Here is the xml-conf
:
<bean id="taskExecutor" class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor" >
<property name="workManagerName" value="springWorkManagers" />
</bean>
<bean id="transactionManager" class="org.springframework.transaction.jta.WebSphereUowTransactionManager"/>
<batch:job id="LocalPartitioningJob">
<batch:step id="masterStep">
<batch:partition step="slaveStep" partitioner="splitPartitioner">
<batch:handler grid-size="5" task-executor="taskExecutor" />
</batch:partition>
</batch:step>
</batch:job>
<batch:step id="slaveStep">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="partitionReader" processor="compositeItemProcessor" writer="sqlWriter" commit-interval="50" />
<batch:transaction-attributes isolation="SERIALIZABLE" propagation="REQUIRE" timeout="600" />
<batch:listeners>
<batch:listener ref="Processor1" />
<batch:listener ref="Processor2" />
<batch:listener ref="Processor3" />
</batch:listeners>
</batch:tasklet>
</batch:step>
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
<property name="tablePrefix" value="${sb.db.tableprefix}" />
<property name="dataSource" ref="ds" />
<property name="maxVarCharLength" value="1000"/>
</bean>
<bean id="transactionManager" class="org.springframework.transaction.jta.WebSphereUowTransactionManager"/>
<jee:jndi-lookup id="ds" jndi-name="${sb.db.jndi}" cache="true" expected-type="javax.sql.DataSource" />
The splitPartitioner
implements Partitioner
and splits the initial data and saves it to the executionContexts
as lists. The processors call remote ejb's to fetch additional data and the sqlWriter
is just a org.spring...JdbcBatchItemWriter
. PartitionReader code below:
public class PartitionReader implements ItemStreamReader<TransferObjectTO> {
private List<TransferObjectTO> partitionItems;
public PartitionReader() {
}
public synchronized TransferObjectTO read() {
if(partitionItems.size() > 0) {
return partitionItems.remove(0);
} else {
return null;
}
}
@SuppressWarnings("unchecked")
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
partitionItems = (List<TransferObjectTO>) executionContext.get("partitionItems");
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.put("partitionItems", partitionItems);
}
@Override
public void close() throws ItemStreamException {
}
}
See Question&Answers more detail:os