상세 컨텐츠

본문 제목

5. 예제2: Spring Batch Job HelloWorld - ItemReader,ItemProcessor,ItemWriter

Spring/SpringBatch

by husks 2016. 3. 31. 15:24

본문

반응형

이번 글에서는 Spring Batch에서 Step의 구성 요소인 ItemReader, ItemProcessor, ItemWriter가 어떻게 작동하는지 알아보도록 하겠습니다.

 

예제의 각 파일에 대한 기본적인 설명은 다음과 같습니다 (소스 코드는 첨부 파일을 다운로드 받으세요)


1. spring-context.xml: Spring Batch Infrastructure 설정.


2. com.hjh.batch.CustomItemReader: ItemReader 인터페이스의 커스텀 구현체


    예제에서 4개의 데이터가 설정으로 입력됩니다.

 

       <property name="itemList" >

            <list>

                <value>***Steve</value>

                <value>***Mike</value>

                <value>Joseph</value>

                <value>+++Juily</value>

            </list>

        </property>


3. com.hjh.batch.CustomItemProcessor: ItemProcessor 인터페이스의   커스텀 구현체


    예제에서 CutomItemProcessor는 다음의 작업을 합니다.


    이름앞에 *** 특수 기호로 시작하는 문자는 *** 대신에  'Mr."로 변경합니다.

    이름앞에 +++ 특수 기호로 시작하는 문자는 +++ 대신네 'Mrs.'로 변경합니다.

    그리고, 아무 특수 기호가 없는 문자는 (Joseph) 유효하지 않은 데이터로 가정하고 필터링 합니다.


4. com.hjh.batch.CustomItemWriter: ItemWriter 인터페이스의   커스텀 구현체

 

    CustomItemReader, CustomItemProcessor에 의해 처리된 최종 아이템이 콜솔에 출력됩니다.

 

5. com.hjh.batch.JobRuntime - 커스텀 Job 실행 클래스.


6. com.hjh.batch.Launcher - main() 메소드가 있는 클래스.

 

  

 

1. spring-context.xml 설정


 <?xml version="1.0" encoding="UTF-8"?>

 

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance

xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx

xmlns:batch="http://www.springframework.org/schema/batch"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="

    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd

    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd

    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    

    <bean id="transactionManager"

class="org.springframework.batch.support.transaction.ResourcelessTransactionManager">

</bean>

<!-- 1. Job Repository - IN-MEMORY Repository -->

    <bean id="jobRepository"

class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">

<property name="transactionManager" ref="transactionManager" />

</bean>

    

    <!-- 2. Launch Job from a Repository -->

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">

     <property name="jobRepository" ref="jobRepository" />

    </bean>

    

    <!-- 3. Define Custom ItemReader -->

    <bean id="customReader" class="com.hjh.batch.CustomItemReader">

        <property name="itemList" >

            <list>

                <value>***Steve</value>

                <value>***Mike</value>

                <value>Joseph</value>

                <value>+++Juily</value>

            </list>

        </property>

    </bean>

 

    <!-- 4. Define Custom ItemProcessor -->

    <bean id="customProcessor" class="com.hjh.batch.CustomItemProcessor" />

 

    <!-- 5. Define Custom ItemWriter -->

    <bean id="customWriter" class="com.hjh.batch.CustomItemWriter" />

        

    <!-- 6. Finally Job Defined -->

    <batch:job id="simpleJob">    

     <batch:step id="readWrtieStep">

     <batch:tasklet>

     <batch:chunk reader="customReader" processor="customProcessor" writer="customWriter" commit-interval="2"/>

     </batch:tasklet>

     </batch:step>

    </batch:job>

    

    <!-- 7. Custom Job Launcher Class -->

    <bean id="runtime" class="com.hjh.batch.JobRuntime">

     <property name="jobLauncher" ref="jobLauncher" />

     <property name="job" ref="simpleJob" />

    </bean>    

    

    

</beans>


1. JobRepsitory를 설정합니다. 예제에서는 In-Memory persistent store을 사용합니다. 이때 TransactionManager로 ResourcelessTransactionManager를 사용합니다. 실제 프로덕션 확경에서는 jdbc persistent store 사용이 바람직합니다.

물론 비즈니스 환경에 문제가 없다면 in-momory persistent store를 사용하셔도 무방합니다.

 

2. 실제로 Job을 실행하기 위한 JobLauncher를 설정합니다. Job을 실행하기 위해 유효한 JobExecution 정보을 업데이트하기 위한 JobRepository를 셋팅합니다.

 

3. CustomItemReader를 정의합니다. 예제에서는 실제 파일 또는 데이터베이스에서 데이터를 읽는 대신, 리스트 형식으로 데이터를 설정하였습니다.

 

4. CustomItemProcessor를 정의합니다.

 

5. CustumItemWriter를 정의합니다.

 

6. JobLauncher에 의해 실행될 Job을 정의합니다.

   Job에는 하나의 Step이 설정되고, Step에는 3,4,5번에서 정의된 CustomItemReader, CustomItemProcessor, CustomItemWriter를 설정      합니다.

 

   'commint-interval=2'로 설정함으로써, ItemReader가 2개의 아이템을 읽을 때마다, ItemWriter에 전달되고 저당되게 됩니다.(예제에서는       단순히 콜솔 출력).

 

7. 최종적으로 Job을 메인메소드에서 실행하기 위한 커스텀 launcher를 설정합니다.


2. CustomItemReader


 package com.hjh.batch;

 

import java.util.List;

 

import org.springframework.batch.item.ItemReader;

import org.springframework.batch.item.NonTransientResourceException;

import org.springframework.batch.item.ParseException;

import org.springframework.batch.item.UnexpectedInputException;

 

public class CustomItemReader implements ItemReader<String>{

private int index = 0;

private List<String> itemList;

 

@Override

public String read() throws Exception, UnexpectedInputException,

ParseException, NonTransientResourceException {

if(index < itemList.size()){

String str = itemList.get(index++);

System.out.println("Read[ " + index + " ] = " + str);

return str;

}else{

return null;

}

}

 

public List<String> getItemList() {

return itemList;

}

 

public void setItemList(List<String> itemList) {

this.itemList = itemList;

}

 

}


Step에 의해 read() 메소드는 한번씩 호출됩니다. 예제에서는 호출이 발생할 때마다, itemList의 다음 값을 읽어 리턴합니다.

더이상 값이 없는 경우(index >= itemList.size() 인 경우)에는 null 값을 리턴합니다.

예제에서 변수 index가 static하지 않은 이유는, ItemReader, ItemWriter가 Step에 의해 stateful 한 상태로 유지되기 때문입니다.



3. CustomItemProcessor


 package com.hjh.batch;

 

 

import org.springframework.batch.item.ItemProcessor;

 

public class CustomItemProcessor implements ItemProcessor<String, String>{

 

@Override

public String process(String input) throws Exception {

if (input.contains("***")) {

            input = input.substring(3, input.length());

            input = "Mr. " + input;

        } else if (input.contains("+++")) {

            input = input.substring(3, input.length());

            input = "Mrs. " + input;

        } else

            return null;

        System.out.println("Process : " + input);

        

        return input;

}

 

}


ItemReader에 의해 Step으로 리턴된 값은 ItemProcessor로 전달됩니다.

예제에서는 넘어온 데이터를 변경하고, 필터링하는 로직이 적용되어 있습니다.




4. CustomItemWriter


 package com.hjh.batch;

 

import java.util.List;

 

import org.springframework.batch.item.ItemWriter;

 

public class CustomItemWriter implements ItemWriter<String>{

 

@Override

public void write(List<? extends String> input) throws Exception {

System.out.println("Write: " + input + "\n");

}

 

}


Step에서 넘어온 결과값을 단순히 콘솔에 출력합니다. 실제 프로덕션 환경에서는 파일 또는 데이터베이스에 저장하게 됩니다.




5. JobRuntime


 package com.hjh.batch;

 

import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import org.springframework.batch.core.JobParameters;

import org.springframework.batch.core.JobParametersBuilder;

import org.springframework.batch.core.launch.JobLauncher;

 

public class JobRuntime {

private JobLauncher jobLauncher;

private Job job;

public JobRuntime(){

}

public void setJobLauncher(JobLauncher jobLauncher){

this.jobLauncher = jobLauncher;

}

public void setJob(Job job){

this.job = job;

}

public void start() throws Exception{

        

        JobParameters jobParameters = new JobParametersBuilder().addLong("batch-date",

                System.currentTimeMillis()).toJobParameters();

        JobExecution exec = jobLauncher.run(job, jobParameters);

        

        System.out.println("Exit Status: " + exec.getStatus());

}

}


메인 메소드에서 start() 메소드를 호출하면 Job 이 실행됩니다.

 

 

6. 실행


다음과 같이 메인 메소드에서 커스텀 Job Launcher를 실행합니다.


  public static void main(String[] args)

    {

 

        try {        

          Launcher.appContext = new ClassPathXmlApplicationContext(

          SPRING_CONTEXT_CONFIG_FILE_PATH);

 

          JobRuntime runtime = (JobRuntime) appContext.getBean("runtime");

        

          runtime.start();

           

        }

        catch (Exception e) {

            System.out.print("Launcher has failed: " + e.getMessage());

            System.exit(-1);

        }

    }

 

 

7. 결과


결과를 확인하면, ItemReader, ItemProcessor에 의해 하나의 데이터가 처리되고, commit-interval=2가 되었을때 ItemWrtier에 의해 2개의 데이터가 출력되었음을 알 수 있습니다.

 

그리고 3, 4번째 아이템 중 하나가 ItemProcessor에 의해 필터링 되었음을 알 수 있습니다.(ItemWriter에 하나의 아이템만 전달됨).

 

Read[ 1 ] = ***Steve

Read[ 2 ] = ***Mike

Process : Mr. Steve

Process : Mr. Mike

Write: [Mr. Steve, Mr. Mike]

 

Read[ 3 ] = Joseph

Read[ 4 ] = +++Juily

Process : Mrs. Juily

Write: [Mrs. Juily]



Chunk-Oriented Processing 개념

 

Chunk-Oriented Processing 은 Spring Batch 2.0 부터 지원되는 개념으로 ItemReader가 하나의 아이템을 읽고, ItemProcessor가 전달된 그 하나의 아이템을 처리합니다. 이렇게 ItemReader, ItemProcessor가 처리한 아이템이 'chunk' 로 묶여져 ItemWriter로 전달되게 됩니다.

이때, ItemWriter에 전달될 묶음의 데이터 수는 commit-interval 이라는 엘리먼트에 의해 설정됩니다.

 

<Chunk-Oriented Processing Sequence Diagram  Illustration>



 

 

<Chunk-Oriented Processing Code Illustration>

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);


반응형

관련글 더보기

댓글 영역