If you're interested in functional programming, you might also want to check out my second blog, which I'm actively working on!

Thursday, September 12, 2013

Spring batch demo

In this article I will show you how easy it is to create a batch job by configuring out-of-the-box components. Today we will read a [CSV file] and convert it to an [XML file], and while we are at it, we will filter out all persons who don't live in New York. I also configure the complete application context using annotations only. Unfortunately, this code currently runs into a NullPointerException, which I expect is caused by [this issue].
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build for the Spring Batch CSV-to-XML demo: Spring Batch core, Spring
     modules pinned to ${spring.version}, Guava and an in-memory HSQLDB. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.pelssers</groupId>
<artifactId>batchdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<!-- Single version for every org.springframework artifact declared below. -->
<spring.version>3.2.4.RELEASE</spring.version>
</properties>
<dependencies>
<!-- NOTE(review): presumably here for CGLIB class-based proxies; confirm it is
     still required with the spring.version in use. -->
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>2.2</version>
<scope>compile</scope>
</dependency>
<!-- spring dependencies -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>2.2.1.RELEASE</version>
<!-- Drop Spring Batch's transitive spring-core/spring-beans; both are declared
     explicitly below at ${spring.version} instead. -->
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
<scope>compile</scope>
</dependency>
<!-- Object/XML mapping support (Jaxb2Marshaller). -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- JDBC support, including EmbeddedDatabaseBuilder. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- functional programming -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>
<!-- use in-memory database for batch -->
<dependency>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>1.8.0.10</version>
</dependency>
</dependencies>
</project>
view raw gistfile1.xml hosted with ❤ by GitHub
package com.pelssers.domain;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
/**
 * JavaBean describing a single person, marshalled as an XML root element by JAXB.
 *
 * <p>The {@code state} property is annotated with {@link XmlTransient} (on its
 * setter), so JAXB leaves it out of the XML even though it can still be set and read.
 */
@XmlRootElement
public class Person {

    private String firstName;
    private String lastName;
    private String email;
    private String state;

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getState() {
        return state;
    }

    // Excluded from the XML output; the value is only kept on the bean.
    @XmlTransient
    public void setState(String state) {
        this.state = state;
    }

    /**
     * Returns a readable one-line description of this person.
     * All fields use the same {@code ", name: value"} separator.
     */
    @Override
    public String toString() {
        return "Person {firstName: " + firstName
                + ", lastName: " + lastName
                + ", state: " + state
                + ", email: " + email + "}";
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.processor;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.batch.item.ItemProcessor;
import com.google.common.base.Predicate;
/**
 * Spring Batch {@link ItemProcessor} that keeps items accepted by a predicate and
 * filters out the rest.
 *
 * <p>Accepted items are returned unchanged. Rejected items are logged at INFO level,
 * and {@code null} is returned, which is how an ItemProcessor tells Spring Batch to
 * drop an item.
 *
 * @param <S> the item type, used as both input and output
 */
public class FilterProcessor<S> implements ItemProcessor<S,S> {

    public static Logger logger = Logger.getLogger(FilterProcessor.class.getName());

    // Immutable after construction; "? super S" lets a predicate over a supertype be reused.
    private final Predicate<? super S> predicate;

    /**
     * @param predicate decides which items are kept; must not be null
     * @throws NullPointerException if {@code predicate} is null (fail fast here rather
     *         than on the first processed item)
     */
    public FilterProcessor(Predicate<? super S> predicate) {
        if (predicate == null) {
            throw new NullPointerException("predicate must not be null");
        }
        this.predicate = predicate;
    }

    /**
     * @param item the item to test
     * @return {@code item} if the predicate accepts it, otherwise {@code null}
     */
    public S process(S item) throws Exception {
        if (predicate.apply(item)) {
            return item;
        }
        // Parameterized message: the string is only built when the record is published,
        // and a null item is rendered as "null" instead of throwing.
        logger.log(Level.INFO, "Skipping {0}", item);
        return null;
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.configuration;
import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
 * Infrastructure configuration for the batch demo.
 *
 * <p>Enables Spring Batch's annotation-driven infrastructure and supplies the
 * {@link DataSource} it runs against.
 */
@Configuration
@EnableBatchProcessing
public class TestConfiguration {

    /**
     * In-memory HSQL database for the Spring Batch metadata tables.
     *
     * <p>Spring Batch's bundled HSQLDB drop script runs first, then its create script.
     */
    @Bean
    public DataSource dataSource() {
        final String dropSchema = "classpath:org/springframework/batch/core/schema-drop-hsqldb.sql";
        final String createSchema = "classpath:org/springframework/batch/core/schema-hsqldb.sql";
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.HSQL)
                .addScript(dropSchema)
                .addScript(createSchema)
                .build();
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.configuration.jobs;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import com.google.common.base.Predicate;
import com.pelssers.configuration.TestConfiguration;
import com.pelssers.domain.Person;
import com.pelssers.processor.FilterProcessor;
/**
 * Java configuration of {@code csvToXmlJob}.
 *
 * <p>The job has a single chunk-oriented step. It reads persons from a CSV classpath
 * resource, keeps only persons whose state is {@code "NY"}, and writes them to an
 * XML file.
 *
 * <p>The input resource and output path come from the job parameters
 * {@link #JOBPARAM_CLASSPATH_RESOURCE} and {@link #JOBPARAM_XML_FILEPATH}. They are
 * bound late through the step-scoped reader and writer beans.
 */
@Configuration
@Import(TestConfiguration.class)
public class CsvToXMLJobConfiguration {

    // Dummy arguments for the step-scoped factory methods called from step(). The real
    // values are injected from job parameters by the @Value expressions on reader/writer.
    private static final String XML_FILEPATH_PLACEHOLDER = null;
    private static final String CLASSPATH_RESOURCE_PLACEHOLDER = null;

    /** Job parameter name for the file-system path of the XML output file. */
    public static final String JOBPARAM_XML_FILEPATH = "xmlFilePath";
    /** Job parameter name for the classpath location of the CSV input file. */
    public static final String JOBPARAM_CLASSPATH_RESOURCE = "classPathResource";

    /*
     * Attention:
     * @EnableBatchProcessing registers these factories as beans named jobBuilders and
     * stepBuilders; the fields below use those same names.
     * See http://docs.spring.io/spring-batch/reference/html/configureJob.html
     */
    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    /** The job: a single run of {@link #step()}. */
    @Bean
    public Job csvToXmlJob() {
        return jobBuilders.get("csvToXmlJob")
                .start(step())
                .build();
    }

    /** Read / filter / write step, committing in chunks of 50 items. */
    @Bean
    public Step step() {
        // CLASSPATH_RESOURCE_PLACEHOLDER and XML_FILEPATH_PLACEHOLDER will be overridden
        // by the injected job parameters using late binding (step scope).
        return stepBuilders.get("step")
                .<Person, Person>chunk(50)
                .reader(reader(CLASSPATH_RESOURCE_PLACEHOLDER))
                .processor(processor())
                .writer(writer(XML_FILEPATH_PLACEHOLDER))
                .build();
    }

    /**
     * CSV reader for the classpath resource named by the {@code classPathResource} job
     * parameter. The first line is skipped (presumably a header row).
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters[classPathResource]}") String classPathResource) {
        FlatFileItemReader<Person> itemReader = new FlatFileItemReader<Person>();
        itemReader.setLinesToSkip(1);
        itemReader.setLineMapper(lineMapper());
        itemReader.setResource(new ClassPathResource(classPathResource));
        return itemReader;
    }

    /**
     * Maps delimited columns 0, 1, 6 and 10 of a line onto the Person properties
     * firstName, lastName, state and email respectively.
     */
    @Bean
    public LineMapper<Person> lineMapper() {
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        // We are only interested in the 4 fields below from this CSV file.
        lineTokenizer.setNames(new String[]{"firstName", "lastName", "state", "email"});
        lineTokenizer.setIncludedFields(new int[]{0, 1, 6, 10});
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
        fieldSetMapper.setTargetType(Person.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    /** Keeps only persons accepted by {@link IsNewYorkerPredicate}. */
    @Bean
    @StepScope
    public ItemProcessor<Person, Person> processor() {
        return new FilterProcessor<Person>(new IsNewYorkerPredicate());
    }

    /**
     * StAX XML writer for the path named by the {@code xmlFilePath} job parameter.
     * It uses the root tag {@code result} and UTF-8 encoding, and overwrites any
     * existing file.
     */
    @Bean
    @StepScope
    public ResourceAwareItemWriterItemStream<Person> writer(@Value("#{jobParameters[xmlFilePath]}") String xmlFilePath) {
        StaxEventItemWriter<Person> writer = new StaxEventItemWriter<Person>();
        writer.setResource(new FileSystemResource(xmlFilePath));
        writer.setRootTagName("result");
        writer.setMarshaller(marshaller());
        writer.setEncoding("UTF-8");
        writer.setOverwriteOutput(true);
        return writer;
    }

    /** JAXB marshaller bound to {@link Person}. */
    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(Person.class);
        return marshaller;
    }

    /**
     * Accepts persons whose state is exactly {@code "NY"}; a null state is rejected.
     * This is a static nested class, so instances do not capture the enclosing
     * configuration object.
     */
    public static class IsNewYorkerPredicate implements Predicate<Person> {
        public boolean apply(Person person) {
            return "NY".equals(person.getState());
        }
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.main;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import com.pelssers.configuration.jobs.CsvToXMLJobConfiguration;
/**
 * Command-line entry point that runs {@code csvToXmlJob} once.
 */
public class CsvToXmlJobLauncher {

    /** Output path used when no first program argument is given. */
    private static final String DEFAULT_XML_FILEPATH = "c:/tmp/persons.xml";

    /** Classpath CSV resource used when no second program argument is given. */
    private static final String DEFAULT_CLASSPATH_RESOURCE = "500.csv";

    /**
     * Boots the Spring context, launches the job, and always closes the context.
     *
     * @param args optional overrides: {@code args[0]} is the XML output path and
     *             {@code args[1]} is the CSV classpath resource. Missing arguments
     *             fall back to the defaults above.
     * @throws JobParametersInvalidException
     * @throws JobInstanceAlreadyCompleteException
     * @throws JobRestartException
     * @throws JobExecutionAlreadyRunningException
     */
    public static void main(String[] args) throws JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        String xmlFilePath = (args != null && args.length > 0) ? args[0] : DEFAULT_XML_FILEPATH;
        String classPathResource = (args != null && args.length > 1) ? args[1] : DEFAULT_CLASSPATH_RESOURCE;

        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(CsvToXMLJobConfiguration.class);
        try {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(
                    job,
                    new JobParametersBuilder()
                            .addString(CsvToXMLJobConfiguration.JOBPARAM_XML_FILEPATH, xmlFilePath)
                            .addString(CsvToXMLJobConfiguration.JOBPARAM_CLASSPATH_RESOURCE, classPathResource)
                            .toJobParameters());
        } finally {
            // Close the context so its singleton beans are destroyed even if the launch fails.
            context.close();
        }
    }
}
view raw gistfile1.java hosted with ❤ by GitHub

1 comment: