상세 컨텐츠

본문 제목

6. 예제3: Spring Batch - Read from flat file and write to DB

Spring/SpringBatch

by husks 2016. 3. 31. 15:25

본문

반응형

이번 예제에서는 Spring Batch의 ItemReader, ItemWriter를 사용하여 파일의 데이터를 읽어 데이터베이스에 저장하는 방법을 알아보도록 하겠습니다.

 

소스 코드는 첨부파일을 확인하세요.

 

예제의 각 파일에 대한 기본적인 설명은 다음과 같습니다.


1. spring-context.xml: Spring Batch Infrastructure 설정.

 

2. com.hjh,batch.bean package 

  •    PlayerBean: 파일의 Player 정보를 매핑할 오브젝트.
  •    OrderBean: 파일의 Order 정보를 매핑할 오브젝트.
3. com.hjh.batch.dao package
  • PlayerDao: Player 정보를 데이터베이스에 저장할 DAO
  • OrderDao: Player 정보를 데이터베이스에 저장할 DAO
4. com.hjh.batch.itemReader package
  • PlayerFieldSetMapper: 파일의 Player 정보를 PlayerBean 오브젝트로 매핑하는 로직이 있는 클래스.
  • OrderFieldSetMapper: 파일의 Order 정보를 OrderBean 오브젝트로 매핑하는 로직이 있는 클래스.
5. com.hjh.batch.itemWriter package
  • PlayerWriter: Player정보의 ItemWriter, PlayerDao를 이용하여 데이터베이스에 저장.
  • OrderWriter: Order 정보의 ItemWriter, OrderDao를 이용하여 데이터베이스에 저장.
6. com.hjh.batch.Launcher - main() 메소드가 있는 클래스.

7. 예제에 사용된 리소스
  • player.csv: 콤마 delimited로 정의된 Player 정보를 담고있는 파일.
  • order.csv:  fixed-length(고정 길이)로 정의된 Order 정보를 담고있는파일.
  • create_table.sql: player, order 테이블 생성 sql
  • spring_batch_table_creation.sql: 데이터베이스 repository에 필요한 테이블 및 시퀀스 생성 sql

 예제 프로세스에 대한 설명은 다음과 같습니다.


Spring Batch에 사용되는 파일은 일반적으로 세 가지 유형으로 분류됩니다.

 

, ; |로 각각 데이로 분류된 정보로 이루어져 있는 파일.

② 각각의 데이터가 일률적으로 고정 길이로 이루어져 있는 파일.

③ ①번 ②번의 유형이 혼합되어 있는 파일.

 

이번 예제에서는 ①번 ②번의 유형의 파일을 읽어 데이터베이스에 저장하는 것을 다룹니다.

③의 유형에 대해서는 다음 글에 다루도록 하겠습니다.

 

player.csv는 Player의 정보(id, lastName, firstName, position, birthYear, debutYear)가 다음과 같이 ,로 분류되어 있습니다.

 

AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996

AbduRa00,Abdullah,Rabih,rb,1975,1999

AberWa00,Abercrombie,Walter,rb,1959,1982

AbraDa00,Abramowicz,Danny,wr,1945,1967

AdamBo00,Adams,Bob,te,1946,1969

AdamCh00,Adams,Charlie,wr,1979,2003 

 

order.csv는 Order의 정보(isin, quantity, price, customer)가 다음과 같이 고정 길이로 분류되어 있습니다.

 

UK21341EAH4121131.11customer1

UK21341EAH4221232.11customer2

UK21341EAH4321333.11customer3

UK21341EAH4421434.11customer4

UK21341EAH4521535.11customer5

 

예제에서는 하나의 Job이 두 개의 Step으로 구성되어 있는데, 첫번째 Step은 player.csv를 읽어 데이터베이스에 저장하는 작업을 수행하며, 두번째 Step은 order.csv를 읽어 데이터베이스에 저장하는 작업을 수행합니다.

 

예제에서는 JobRepsository로 MySql  데이터베이스를 이용합니다. 따라서, 첨부파일 소스코드에 있는 <spring_batch_table_creation.sql>를 이용하여 필요한 테이블을 미리 생성하여야 합니다.

 

JobRepository의 데이터베이스 meta-data schema에 대한 상제 내용은 Spring Batch Documentation Appendix B를 참조하시길 바랍니다.

http://static.springsource.org/spring-batch/reference/html/metaDataSchema.html

 

 

1. spring-context.xml 설정


 <?xml version="1.0" encoding="UTF-8"?>

 

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance

xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx

xmlns:batch="http://www.springframework.org/schema/batch"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:util="http://www.springframework.org/schema/util"

xsi:schemaLocation="

    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd    

    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd

    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd

    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd

    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xs">

    

    <tx:annotation-driven transaction-manager="transactionManager" />

    

    <!-- 1. Define MySQL DataSource  -->

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">

     <property name="driverClassName" value="com.mysql.jdbc.Driver" />

     <property name="url" value="jdbc:mysql://127.0.0.1:3306/test" />

     <property name="username" value="root" />

     <property name="password" value="will" />    

    </bean>

    

    <!-- 2. Define DataSource TransactionManager -->

    <bean id="transactionManager"

class="org.springframework.jdbc.datasource.DataSourceTransactionManager">

<property name="dataSource" ref="dataSource" />

</bean>

    <!-- 3. Job Repository - MySQL Database Repository with MySQL DataSource-->

    <bean id="jobRepository"

class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">

<property name="transactionManager" ref="transactionManager" />

<property name="dataSource" ref="dataSource" />

<property name="tablePrefix" value="BATCH_" />

<property name="isolationLevelForCreate" value="ISOLATION_SERIALIZABLE"/>

<property name="databaseType" value="MySql" />

</bean>

    

    <!-- 4. Launch Job from a Repository -->

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">

     <property name="jobRepository" ref="jobRepository" />

    </bean>

    

    <!-- 5. Define JdbcTemplate to access Database -->

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">

     <property name="dataSource" ref="dataSource" />

    </bean> 

    

    <!-- 6. Define FlatFileItemReader to read input from delimited csv file -->

    

 <bean id="delimitedFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">

     <property name="resource" value="classpath:player.csv" />

    

     <property name="lineMapper">

     <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

     <property name="lineTokenizer">

     <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

     <property name="names" value="id, lastName, firstName, position, birthYear, debutYear" />

     <property name="delimiter" value="," /> <!-- default -->    

     </bean>

     </property>

     <property name="fieldSetMapper">

     <bean class="com.hjh.batch.itemReader.PlayerFieldSetMapper" />

     </property>

     </bean>

     </property>

    </bean>    

    

    

    <!-- 7. Define FlatFileItemReader to read input from fixed-length csv file -->

 <bean id="fixedFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">

     <property name="resource" value="classpath:order.csv" />

    

     <property name="lineMapper">

     <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

     <property name="lineTokenizer">

     <bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">

     <property name="names" value="isbn, quantity, price, customer" />

     <property name="columns" value="1-12, 13-15, 16-20, 21-29" />   

     </bean>

     </property>

     <property name="fieldSetMapper">

     <bean class="com.hjh.batch.itemReader.OrderFieldSetMapper" />

     </property>

     </bean>

     </property>    

    </bean>

   

    <!-- 8. define database dao for writer -->

    <bean id="playerDao" class="com.hjh.batch.dao.PlayerDao">

     <property name="jdbcTemplate" ref="jdbcTemplate" />

    </bean>

    

    <bean id="orderDao" class="com.hjh.batch.dao.OrderDao">

     <property name="jdbcTemplate" ref="jdbcTemplate" />

    </bean>

    

    <!-- 9. define ItemWriter for PlayerBean and GameBean -->

    <bean id="dbPlayerWriter" class="com.hjh.batch.itemWriter.PlayerWriter">

     <property name="dao" ref="playerDao" />

    </bean>

    

    <bean id="dbOrderWriter" class="com.hjh.batch.itemWriter.OrderWriter">

     <property name="dao" ref="orderDao" />

    </bean>       

       

    <!-- 10. Finally Job Defined -->

    <batch:job id="simpleJob">    

     <batch:step id="delimtedFileToDBStep" next="fiexedFileToDBStep">

     <batch:tasklet>

     <batch:chunk reader="delimitedFileItemReader" writer="dbPlayerWriter" commit-interval="2"/>

     </batch:tasklet>

     </batch:step>

     <batch:step id="fiexedFileToDBStep">

     <batch:tasklet>

     <batch:chunk reader="fixedFileItemReader" writer="dbOrderWriter" commit-interval="2"/>

     </batch:tasklet>

     </batch:step>

    </batch:job>       

    

</beans>


 

1. 데이터베이스에 CRUD 작업을 위한 'DriverManagerDataSource'를 설정합니다. 예제에서는 MySql 데이터베이스를 사용하였습니다.

 

2. DatasourceTransactionManager를 설정합니다.

 

3. Spring Batch JobRepsitory를 설정합니다. 예제에서는 비즈니스 데이터(PlayerBean, OrderBean) CRUD 수행을 위한 DataSource와 TransactionManager를 사용하고 있습니다.

JobRepsitory를 위한 별도의 DataSource와 TransactionManager를 사용할 수도 있습니다.

 

4. 실제로 Job을 실행하기 위한 JobLauncher를 설정합니다. Job을 실행하기 위해 유효한 JobExecution 정보을 업데이트하기 위한 JobRepository를 셋팅합니다.

 

5. PlayerDao와 OrderDao에서 사용할 JdbcTemplate을 설정합니다.

 

6. 이번 예제에서 핵심내용인 파일을 읽기 위한 ItemReader를 설정합니다. player.csv 파일을 읽기 위하여 Spring Batch에 내장되어있는 FlatFileItemReader를 ItemReader로 사용합니다. 각각의 설정에 대한 설명은 다음과 같습니다.


6.1. 주어진 input 파일을 읽고 파싱하기 위해 FlatFileReader를 설정합니다.

FlatFileReader에는 input 파일의 위치, 파싱한 String line을 오프젝트로 변환할 LineMapper을 설정합니다.

 

 <bean id="delimitedFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">

<property name="resource" value="classpath:player.csv" />


<property name="lineMapper">

. . . . 

</property>


 

6.2. LineMapper로 Spring Batch이 내장 구현체인 DefaultLineMapper를 설정합니다.

DefaultLineMapper에 LineToTokenizer와 FieldSetMapper를 설정합니다.

 

LineToTokenizer: String line을  LineTokenizer#tokenize() method에 전달하여 FieldSet 오브젝트로 변환하는 역할을 합니다.

FieldSetMapper: LineToTokenizer에 의해 리턴된 FieldSet 오브젝트를 PlayerBean 오브젝트로 변환하는 역할을 합니다.

 

 <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

<property name="lineTokenizer">

       . . . . .

</property>

<property name="fieldSetMapper">

      . . . . . .

</property>

 </bean>


6.3. player.csv를 delimiter를 구분된 데이터의 집합체임으로, LineToTokenizer로 DelimitedLineTokenizer을 사용합니다.

DelimitedLineTokenizer에 파싱된 각각의 데이터에 대한 meta 정보 및 delimiter를 설정합니다.


 <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

  <property name="names" value="id, lastName, firstName, position, birthYear, debutYear" />

  <property name="delimiter" value="," /> <!-- default -->    

 </bean>


6.4. 마지막으로 FieldSet 을 PlayerBean 오브젝트로 매핑할 FieldSetMapper를 DefaultLineMapper에 다음과 같이 설정합니다.

예제에서 사용된 PlayerFieldSetMapper는 FieldSetMapper 인터페이스 구현체입니다.


 <property name="fieldSetMapper">

  <bean class="com.hjh.batch.itemReader.PlayerFieldSetMapper" />

 </property>


PlayerFieldSetMapper는 FieldSet으로 부터 파싱된 데이터를 읽기 위해, DelimitedLineToTokenizer의 names 엘리먼트에 정의된 값을 사용하는 것을 알 수 있습니다.


 package com.hjh.batch.itemReader;

 

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import org.springframework.batch.item.file.transform.FieldSet;

import org.springframework.validation.BindException;

 

import com.hjh.batch.bean.PlayerBean;

 

public class PlayerFieldSetMapper implements FieldSetMapper<PlayerBean>{

 

@Override

public PlayerBean mapFieldSet(FieldSet fs) throws BindException {

if(fs == null){

return null;

}

 

PlayerBean player = new PlayerBean();

player.setId(fs.readString("id"));

player.setLastName(fs.readString("lastName"));

player.setFirstName(fs.readString("firstName"));

player.setPsoition(fs.readString("position"));

player.setBirthYear(fs.readInt("birthYear"));

player.setDebutYear(fs.readInt("debutYear"));

return player;

}

 



7. order.csv파일을 읽기 위하여 Spring Batch에 내장되어있는 FlatFileItemReader를 ItemReader로 사용합니다. 6번의 player.csv를 일기 위한 FlatFileItemReader 설정과 같으면, 단지 LineToTokenizer설정과 FieldSetMapper의 정의만 틀립니다.


order.csv는 고정길이 데이터의 집합체임으로 LineTokenizer로 FiexedLengthTokenizer을 사용합니다.


 <property name="lineTokenizer">

  <bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">

     <property name="names" value="isbn, quantity, price, customer" />

     <property name="columns" value="1-12, 13-15, 16-20, 21-29" />   

  </bean>

 </property>


<OrderFieldSetMapper>

 package com.hjh.batch.itemReader;

 

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import org.springframework.batch.item.file.transform.FieldSet;

import org.springframework.validation.BindException;

 

import com.hjh.batch.bean.OrderBean;

 

public class OrderFieldSetMapper implements FieldSetMapper<OrderBean>{

 

@Override

public OrderBean mapFieldSet(FieldSet fs) throws BindException {

if(fs == null){

return null;

}

OrderBean order = new OrderBean();

order.setIsbn(fs.readString("isbn"));

order.setQuantity(fs.readInt("quantity"));

order.setPrice(fs.readDouble("price"));

order.setCustomer(fs.readString("customer"));

return order;

}

 

}


8. PlayerDao와 OrderDao를 설정합니다.


9. PlayerWrtier와 OrderWriter를 설정합니다.


10. 마지막으로 실행할 Job을 설정합니다.



2. PlayerDAO와 OrderDao를 작성

 

 package com.hjh.batch.dao;

 

import java.sql.PreparedStatement;

import java.sql.SQLException;

 

import org.springframework.jdbc.core.JdbcTemplate;

import org.springframework.jdbc.core.PreparedStatementSetter;

import org.springframework.transaction.annotation.Propagation;

import org.springframework.transaction.annotation.Transactional;

 

import com.hjh.batch.bean.PlayerBean;

 

public class PlayerDao {

 

private JdbcTemplate jdbcTemplate;

public PlayerDao(){

}

public void setJdbcTemplate(JdbcTemplate jdbcTemplate){

this.jdbcTemplate = jdbcTemplate;

}

@Transactional(propagation = Propagation.REQUIRED)

public void save(final PlayerBean player){

String sql = "insert into test.player(player_id, first_name, last_name, position, birth_year, debut_year) values(?,?,?,?,?,?)";

jdbcTemplate.update(sql, new PreparedStatementSetter(){

public void setValues(PreparedStatement stmt) throws SQLException{

stmt.setString(1,  player.getId());

stmt.setString(2,  player.getFirstName());

stmt.setString(3,  player.getLastName());

stmt.setString(4, player.getPsoition());

stmt.setInt(5,  player.getBirthYear());

stmt.setInt(6,  player.getDebutYear());

}

});

}

}

 

 

 package com.hjh.batch.dao;

 

import java.sql.PreparedStatement;

import java.sql.SQLException;

 

import org.springframework.jdbc.core.JdbcTemplate;

import org.springframework.jdbc.core.PreparedStatementSetter;

import org.springframework.transaction.annotation.Propagation;

import org.springframework.transaction.annotation.Transactional;

 

import com.hjh.batch.bean.OrderBean;

 

public class OrderDao {

 

private JdbcTemplate jdbcTemplate;

public OrderDao(){

}

public void setJdbcTemplate(JdbcTemplate jdbcTemplate){

this.jdbcTemplate = jdbcTemplate;

}

@Transactional(propagation = Propagation.REQUIRED)

public void save(final OrderBean order){

String sql = "insert into test.order(isbn, quantity, price, customer) values(?, ?, ?, ?)";

jdbcTemplate.update(sql, new PreparedStatementSetter(){

public void setValues(PreparedStatement stmt) throws SQLException{

stmt.setString(1,  order.getIsbn());

stmt.setInt(2,  order.getQuantity());

stmt.setDouble(3,  order.getPrice());

stmt.setString(4, order.getCustomer());

}

});

}

}

 

3. PlayerWriter와 OrderWriter를 작성

 

 package com.hjh.batch.itemWriter;

 

import java.util.List;

 

import org.springframework.batch.item.ItemWriter;

 

import com.hjh.batch.bean.PlayerBean;

import com.hjh.batch.dao.PlayerDao;

 

public class PlayerWriter implements ItemWriter<PlayerBean>{

 

private PlayerDao dao;

public void setDao(PlayerDao dao){

this.dao = dao;

}

@Override

public void write(List<? extends PlayerBean> players) throws Exception {

for(PlayerBean player : players){

dao.save(player);

}

System.out.println("Players Writed To DB: " + players + "\n");

}

 

}

 

 package com.hjh.batch.itemWriter;

 

import java.util.List;

 

import org.springframework.batch.item.ItemWriter;

 

import com.hjh.batch.bean.OrderBean;

import com.hjh.batch.dao.OrderDao;

 

public class OrderWriter implements ItemWriter<OrderBean>{

 

private OrderDao dao;

public void setDao(OrderDao dao){

this.dao = dao;

}

@Override

public void write(List<? extends OrderBean> orders) throws Exception {

for(OrderBean order : orders){

dao.save(order);

}

System.out.println("Orders Writed To DB: " + orders + "\n");

}

}

 

4. PlayerBean과 OrderBean 작성


 package com.hjh.batch.bean;

 

import java.io.Serializable;

 

public class PlayerBean implements Serializable{

/**

 * 

 */

private static final long serialVersionUID = -5330794101303450135L;

 

private String id;

private String lastName;

private String firstName;

private String psoition;

private int birthYear;

private int debutYear;

public PlayerBean(){

}

 

// setter/getter 메소드


 package com.hjh.batch.bean;

 

import java.io.Serializable;

 

public class OrderBean implements Serializable{

 

/**

 * 

 */

private static final long serialVersionUID = -8382772480142873117L;

private String isbn;

private int quantity;

private double price;

private String customer;

public OrderBean(){

}

 

// setter/getter 메소드


 

5. 실행.

 

 package com.hjh.batch;

 

import org.springframework.batch.core.launch.support.CommandLineJobRunner;

 

public final class Launcher {

public static void main(String[] agrs){

try{

CommandLineJobRunner.main(new String[]{"spring-context.xml", "simpleJob"}); 

}catch(Exception e){

 System.out.print("Launcher has failed: " + e.getMessage());

         System.exit(-1);

}

}

 

}

 

 

6. 결과.


결과를 확인하면, Spring Batch의 Chunk-Oriented Processing의 의해 ItemReader, ItemProcessor에 의해 하나의 데이터가 처리되고, commit-interval=2가 되었을때 ItemWrtier에 의해 2개의 데이터가 데이터베이스에 저장되었음을 알 수 있습니다.

 

 Players Writed To DB: [PlayerBean [id=AbduKa00, lastName=Abdul-Jabbar, firstName=Karim, psoition=rb, birthYear=1974, debutYear=1996], PlayerBean [id=AbduRa00, lastName=Abdullah, firstName=Rabih, psoition=rb, birthYear=1975, debutYear=1999]]

 

Players Writed To DB: [PlayerBean [id=AberWa00, lastName=Abercrombie, firstName=Walter, psoition=rb, birthYear=1959, debutYear=1982], PlayerBean [id=AbraDa00, lastName=Abramowicz, firstName=Danny, psoition=wr, birthYear=1945, debutYear=1967]]

 

Players Writed To DB: [PlayerBean [id=AdamBo00, lastName=Adams, firstName=Bob, psoition=te, birthYear=1946, debutYear=1969], PlayerBean [id=AdamCh00, lastName=Adams, firstName=Charlie, psoition=wr, birthYear=1979, debutYear=2003]]

 

 

Orders Writed To DB: [OrderBean [isbn=UK21341EAH41, quantity=211, price=31.11, customer=customer1], OrderBean [isbn=UK21341EAH42, quantity=212, price=32.11, customer=customer2]]

 

Orders Writed To DB: [OrderBean [isbn=UK21341EAH43, quantity=213, price=33.11, customer=customer3], OrderBean [isbn=UK21341EAH44, quantity=214, price=34.11, customer=customer4]]

 

Orders Writed To DB: [OrderBean [isbn=UK21341EAH45, quantity=215, price=35.11, customer=customer5]]


출처: http://willygwu2003.blog.me/130171676403

반응형

관련글 더보기

댓글 영역