Sunday, May 17, 2015

[Solved]: Spring Batch StAX XML reading job does not end when out of input

Spring Batch StAX XML reading job does not end when out of input

I'm using Spring Batch to set up a job that will process a potentially very large XML file. I think I've set it up appropriately, but at runtime the job runs, processes its input, and then just hangs in an executing state (I can confirm this by checking the JobExecution's status in the JobRepository).
I've read through the Batch documentation several times but I don't see any obvious "make the job stop when out of input" configuration that I'm missing.
Here's the relevant portion of my application context:
<job id="pickupDataExtractionJob" xmlns="http://www.springframework.org/schema/batch">

    <!-- Step "read": chunk step using jdbcItemReader as reader and xmlItemWriter as writer,
         committing every 5 items and retrying up to 3 times on the listed exceptions.
         NOTE(review): java.lang.Exception already covers the two more specific classes below. -->
    <step id="read" next="write">
        <tasklet>
            <chunk reader="jdbcItemReader" writer="xmlItemWriter" commit-interval="5" retry-limit="3">
                <retryable-exception-classes>
                    <include class="java.lang.Exception"/>
                    <include class="java.sql.SQLException"/>
                    <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </retryable-exception-classes>
            </chunk>
        </tasklet>
    </step>

    <!-- Step "write": chunk step using xmlItemReader as reader and jdbcItemWriter as writer. -->
    <step id="write" next="rename">
        <tasklet>
            <chunk reader="xmlItemReader" writer="jdbcItemWriter" commit-interval="5" retry-limit="3">
                <retryable-exception-classes>
                    <include class="java.io.FileNotFoundException"/>
                    <include class="java.sql.SQLException"/>
                    <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </retryable-exception-classes>
            </chunk>
        </tasklet>
    </step>

    <step id="rename" next="deleteDir">
        <tasklet ref="renameTaskLet"/>
    </step>

    <!-- Disabled e-mail step:
    <step id="email" next="deleteDir">
        <tasklet ref="emailTaskLet" />
    </step> -->

    <step id="deleteDir">
        <tasklet ref="fileDeletingTasklet"/>
    </step>

    <listeners>
        <listener ref="processShutdownListener"/>
    </listeners>
</job>
    <bean id="renameTaskLet"
        class="com.kewill.bluedart.pos.batch.tasklet.SimpleRenameFileTaskletStep" />
    <bean id="emailTaskLet" class="com.kewill.bluedart.pos.batch.tasklet.SendEMailTasklet" />
    <bean id="fileDeletingTasklet" class="com.kewill.bluedart.pos.batch.tasklet.FileDeletingTasklet">
        <property name="directory" value="file:${batch.path}" />
    </bean>

Solution:

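I implemented my own "ShipmentXmlItemReader", based on Spring's "StaxEventItemReader", and changed the logic in its moveCursorToNextFragment() method as shown below (the changed lines were originally highlighted in green). The change consists of the END_DOCUMENT checks: as soon as the reader peeks an END_DOCUMENT event, moveCursorToNextFragment() returns false. doRead() then returns null, which tells Spring Batch that the input is exhausted, so the step can finish.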



ShipmentXmlItemReader.java
import java.io.InputStream;
import java.util.NoSuchElementException;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.EndElement;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.StaxUtils;
import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader;
import org.springframework.batch.item.xml.stax.FragmentEventReader;
import org.springframework.core.io.Resource;
import org.springframework.oxm.Unmarshaller;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * A {@link StaxEventItemReader} variant that reads XML fragments whose root element matches
 * {@code fragmentRootElementName} and unmarshals each one into an item.
 *
 * <p>The cursor logic stops as soon as the underlying event reader reports
 * {@link XMLStreamConstants#END_DOCUMENT} (or has no more events), so {@link #doRead()} returns
 * {@code null} at the end of the input.
 *
 * @param <T> type of the unmarshalled items
 */
public class ShipmentXmlItemReader<T> extends StaxEventItemReader<T> {

    private static final Log logger = LogFactory.getLog(ShipmentXmlItemReader.class);

    private FragmentEventReader fragmentReader;

    private XMLEventReader eventReader;

    private Unmarshaller unmarshaller;

    private Resource resource;

    private InputStream inputStream;

    private String fragmentRootElementName;

    /** True when there is nothing (more) to read: missing/unreadable input or a fatal cursor error. */
    private boolean noInput;

    private boolean strict = true;

    /** Namespace URI extracted from a "{ns}name" fragment root, or null when none was given. */
    private String fragmentRootElementNameSpace;

    public ShipmentXmlItemReader() {
        setName(ClassUtils.getShortName(ShipmentXmlItemReader.class));
    }

    /**
     * In strict mode the reader throws an {@link IllegalStateException} on
     * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not
     * exist or is not readable; otherwise it logs a warning and reads no items.
     *
     * @param strict {@code true} by default
     */
    public void setStrict(boolean strict) {
        this.strict = strict;
    }

    /**
     * @param resource the XML input to read
     */
    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }

    /**
     * @param unmarshaller maps xml fragments corresponding to records to objects
     */
    public void setUnmarshaller(Unmarshaller unmarshaller) {
        this.unmarshaller = unmarshaller;
    }

    /**
     * @param fragmentRootElementName name of the root element of the fragment, optionally prefixed
     *        with a namespace URI in braces, e.g. {@code {urn:ns}shipment}
     */
    public void setFragmentRootElementName(String fragmentRootElementName) {
        this.fragmentRootElementName = fragmentRootElementName;
    }

    /**
     * Ensure that all required dependencies for the ItemReader to run are provided after all
     * properties have been set, and split a "{namespace}localName" root element into its parts.
     *
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     * @throws IllegalArgumentException if the Unmarshaller is null or the FragmentRootElementName is
     *         null or empty.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(unmarshaller, "The Unmarshaller must not be null.");
        Assert.hasLength(fragmentRootElementName, "The FragmentRootElementName must not be null");
        if (fragmentRootElementName.contains("{")) {
            fragmentRootElementNameSpace = fragmentRootElementName.replaceAll("\\{(.*)\\}.*", "$1");
            fragmentRootElementName = fragmentRootElementName.replaceAll("\\{.*\\}(.*)", "$1");
        }
    }

    /**
     * Responsible for moving the cursor before the StartElement of the fragment root.
     *
     * This implementation simply looks for the next corresponding element, it does not care about
     * element nesting. You will need to override this method to correctly handle composite fragments.
     *
     * @return <code>true</code> if next fragment was found, <code>false</code> when the end of the
     *         document (END_DOCUMENT event or no more events) is reached first.
     *
     * @throws NonTransientResourceException if the cursor could not be moved. This will be treated as
     *         fatal and subsequent calls to read will return null.
     */
    protected boolean moveCursorToNextFragment(XMLEventReader reader) throws NonTransientResourceException {
        try {
            while (true) {
                // peek() returns null once the reader has no more events, so never dereference it
                // unchecked.
                XMLEvent next = reader.peek();
                while (next != null && !next.isStartElement()) {
                    // Stop explicitly at END_DOCUMENT so the step sees end of input.
                    if (next.getEventType() == XMLStreamConstants.END_DOCUMENT) {
                        return false;
                    }
                    reader.nextEvent();
                    next = reader.peek();
                }
                if (next == null) {
                    return false;
                }
                if (isFragmentRoot(next.asStartElement().getName())) {
                    return true;
                }
                // A start element that is not the fragment root: consume it and keep scanning.
                reader.nextEvent();
            }
        }
        catch (XMLStreamException e) {
            throw new NonTransientResourceException("Error while reading from event reader", e);
        }
    }

    /**
     * Closes the fragment reader and the input stream. The stream is closed even if closing the
     * fragment reader fails, and all reader state is cleared afterwards.
     */
    @Override
    protected void doClose() throws Exception {
        try {
            if (fragmentReader != null) {
                fragmentReader.close();
            }
        }
        finally {
            try {
                if (inputStream != null) {
                    inputStream.close();
                }
            }
            finally {
                fragmentReader = null;
                eventReader = null;
                inputStream = null;
            }
        }
    }

    /**
     * Opens the resource and creates the StAX event readers. A missing or unreadable resource
     * throws in strict mode and otherwise marks the reader as having no input.
     *
     * @throws IllegalStateException in strict mode if the resource does not exist or is not readable
     */
    @Override
    protected void doOpen() throws Exception {
        Assert.notNull(resource, "The Resource must not be null.");

        noInput = true;
        if (!resource.exists()) {
            if (strict) {
                throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode)");
            }
            logger.warn("Input resource does not exist " + resource.getDescription());
            return;
        }
        if (!resource.isReadable()) {
            if (strict) {
                throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode)");
            }
            logger.warn("Input resource is not readable " + resource.getDescription());
            return;
        }

        XMLInputFactory inputFactory = XMLInputFactory.newInstance();
        // Guard against XXE: do not resolve external entities from the input document.
        // NOTE(review): re-enable only if the input files genuinely rely on external entities.
        inputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);

        inputStream = resource.getInputStream();
        eventReader = inputFactory.createXMLEventReader(inputStream);
        fragmentReader = new DefaultFragmentEventReader(eventReader);
        noInput = false;
    }

    /**
     * Move to next fragment and map it to item.
     *
     * @return the next unmarshalled item, or {@code null} when no further fragment exists, when the
     *         input was missing/unreadable in non-strict mode, or after a fatal cursor error
     */
    @Override
    protected T doRead() throws Exception {
        if (noInput) {
            return null;
        }

        boolean found;
        try {
            found = moveCursorToNextFragment(fragmentReader);
        }
        catch (NonTransientResourceException e) {
            // Prevent caller from retrying indefinitely since this is fatal
            noInput = true;
            throw e;
        }
        if (!found) {
            return null;
        }

        fragmentReader.markStartFragment();
        try {
            @SuppressWarnings("unchecked") // the Unmarshaller is configured to produce T
            T mappedFragment = (T) unmarshaller.unmarshal(StaxUtils.getSource(fragmentReader));
            return mappedFragment;
        }
        finally {
            fragmentReader.markFragmentProcessed();
        }
    }

    /*
     * jumpToItem is overridden because reading in and attempting to bind an entire fragment is
     * unacceptable in a restart scenario, and may cause exceptions to be thrown that were already
     * skipped in previous runs.
     */
    @Override
    protected void jumpToItem(int itemIndex) throws Exception {
        if (noInput) {
            // No event reader was created in doOpen(), so there is nothing to skip.
            return;
        }
        for (int i = 0; i < itemIndex; i++) {
            try {
                readToStartFragment();
                readToEndFragment();
            } catch (NoSuchElementException e) {
                if (itemIndex == (i + 1)) {
                    // we can presume a NoSuchElementException on the last item means the EOF was
                    // reached on the last run
                    return;
                } else {
                    // if NoSuchElementException occurs on an item other than the last one, this
                    // indicates a problem
                    throw e;
                }
            }
        }
    }

    /*
     * Read until the first StartElement tag that matches the fragment root (local name and, when
     * configured, namespace). Because there may be any number of tags in between where the reader
     * is now and the fragment start, this is done in a loop until the element type and name match.
     */
    private void readToStartFragment() throws XMLStreamException {
        while (true) {
            XMLEvent nextEvent = eventReader.nextEvent();
            if (nextEvent.isStartElement() && isFragmentRoot(nextEvent.asStartElement().getName())) {
                return;
            }
        }
    }

    /*
     * Read until the first EndElement tag that matches the fragment root (local name and, when
     * configured, namespace). Because there may be any number of tags in between where the reader
     * is now and the fragment end tag, this is done in a loop until the element type and name match.
     */
    private void readToEndFragment() throws XMLStreamException {
        while (true) {
            XMLEvent nextEvent = eventReader.nextEvent();
            if (nextEvent.isEndElement() && isFragmentRoot(nextEvent.asEndElement().getName())) {
                return;
            }
        }
    }

    /*
     * True when the element name matches the configured fragment root local name and, if a
     * namespace was configured, its namespace URI.
     */
    private boolean isFragmentRoot(QName name) {
        return name.getLocalPart().equals(fragmentRootElementName)
                && (fragmentRootElementNameSpace == null
                        || name.getNamespaceURI().equals(fragmentRootElementNameSpace));
    }

}


No comments:

Post a Comment