카테고리 없음

스프링 배치(작성 중..)

se0nghyun2 2024. 3. 22. 18:23

 

들어가기 앞서...

현재 프로젝트를 진행 도중 배치 부분이 필요하여 정리한다.

회원가입 시 이메일 발송을 비동기 방식으로 처리하고 있으며, 메일 발송 내역을 저장하고 있다.

메일 발송 내역 테이블 내엔 해당 메일 발송 성공 유무를 포함하고 있다.

따라서, 실패한 메일 발송 데이터를 대상으로 매일 한 번씩 추출하여 재발송 진행하도록 개발해보려 한다.


Spring Batch

스프링 배치는 대용량 레코드 처리에 필수적인 기능을 제공한다.

Spring Batch에서 배치가 실패하여 작업을 재시작 하게 된다면 처음이 아닌 실패지점부터 실행한다.

또한 중복 실행을 막기 위하여 성공 이력 있는 batch는 동일한 parameters로 실행 시 Exception이 발생하게 된다.

 

용어 정리

Job

배치 처리 과정을 하나의 단위로 만들어 놓은 객체이다.

Job은 여러 가지 Step으로 구성될 수 있으며, 순차적으로 Step을 수행하며 처리한다. 

 

JobInstance

Job의 실행단위를 나타낸다. Job 실행 시 하나의 JobInstance 생성되게 된다.

 

JobParameter

각각의 JobInstance 은 이 JobParameter 객체로 구분한다.

구별 외에도 JobInstance에 전달되는 매개변수 역할도 하고 있다.

또한 String,Double,Long,Date 4가지 형식만 지원하고 있다.

 

JobExecution

JobInstance의 '실행 시도'에 대한 객체이다.

예를 들어, 금일 실행한 JobInstance가 실패하여 재실행하여도 동일한 JobInstance를 실행시키지만

이 2번의 실행에 대한 JobExecution은 개별로 생기게 된다.

 

Step

Job의 배치처리를 정의하고 순차적인 단계를 캡슐화한다.

실제 처리하는 로직을 담고 있다.

 

StepExecution

Step 실행 시도에 대한 객체를 나타낸다.

 

ExecutionContext

Job에서 데이터를 공유할 수 있는 데이터 저장소이다.

 

JobRepository

위에서 언급한 모든 배치 처리 정보를 담고 있는 매커니즘이다.

Job 실행 시 JobRepository에 JobExecution과 StepExecution을 생성하게 되며,

Execution 정보들을 저장하고 조회하며 사용하게 된다.

 

JobLauncher

Job과 JobParameters를 사용하여 Job을 실행하는 객체이다.


Spring Batch 간단 사용

 

Tasklet

1. 단일 Step

@Slf4j
@Configuration
@RequiredArgsConstructor
public class JobConfig {

    @Bean
    public Job newTestJob(PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new JobBuilder("newTestJob", jobRepository)
                .start(newTestStep(transactionManager, jobRepository))
                .build();
    }

    @Bean
    public Step newTestStep
            (PlatformTransactionManager transactionManager, JobRepository jobRepository) {

        return new StepBuilder("newTestStep", jobRepository)
                .tasklet(newTestTasklet(), transactionManager)
                .build();
    }

    public Tasklet newTestTasklet() {
        return (contribution, chunkContext) -> {
            for(int i=0; i<10;i++){
                log.info(String.valueOf(i));
            }
            return RepeatStatus.FINISHED;
        };
    }
}

 

2. 다중 Step

@Slf4j
@Configuration
@RequiredArgsConstructor
public class JobConfig {
    @Bean
    public Job newTestJobMultiStep(PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new JobBuilder("newTestJobMultiStep", jobRepository)
                .start(newTestStep1(transactionManager, jobRepository))
                .next(newTestStep2(transactionManager, jobRepository))
                .build();
    }

    @Bean
    public Step newTestStep1
            (PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new StepBuilder("newTestStep1", jobRepository)
                .tasklet(newTestTasklet1(), transactionManager)
                .build();
    }

    public Tasklet newTestTasklet1() {
        return (contribution, chunkContext) -> {
            log.info("Step1 Do!!");
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public Step newTestStep2
            (PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new StepBuilder("newTestStep1", jobRepository)
                .tasklet(newTestTasklet2(), transactionManager)
                .build();
    }

    public Tasklet newTestTasklet2() {
        return (contribution, chunkContext) -> {
            log.info("Step2 Do!!");

            return RepeatStatus.FINISHED;
        };
    }
}

 

 

3. Flow를 통한 Step 구성

    @Bean
    public Job newTestJobFlow(PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new JobBuilder("newTestJobFlow", jobRepository)
                .start(startStep(transactionManager,jobRepository))
                .on(ExitStatus.FAILED.getExitCode()) //startStep FAILED인 경우
                .to(failStep(transactionManager,jobRepository)) //failStep 실행
                .on("*") //결과상관없이
                .end() //종료
                .build()
                .build()
                ;
    }

    @Bean
    public Step startStep(PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new StepBuilder("startStep",jobRepository)
                .tasklet(startTasklet(),transactionManager)
                .build()
                ;
    }

    public Tasklet startTasklet(){
        return (contribution, chunkContext) -> {
            log.info("startTasklet Do!");
            contribution.setExitStatus(ExitStatus.FAILED);
            return  RepeatStatus.FINISHED;
        };
    }

    @Bean
    public Step failStep(PlatformTransactionManager transactionManager, JobRepository jobRepository) {
        return new StepBuilder("failStep",jobRepository)
                .tasklet(failTasklet(),transactionManager)
                .build()
                ;
    }

    public Tasklet failTasklet(){
        return (contribution, chunkContext) -> {
            log.info("failTasklet Do!");
            contribution.setExitStatus(ExitStatus.COMPLETED);
            return RepeatStatus.FINISHED;
        };
    }

 

 

Chunk

Spring Batch에서 Chunk는 데이터 덩어리로 작업 시 각 커밋마다 처리되는 row 수를 의미한다.

즉, 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리로 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미한다. 

또한, Chunk단위로 Transaction 수행하기에 실패 시 Chunk단위만큼 rollback된다.

 

Chunk 시나리오

1. 읽기(Reader) - DB에서 배치 처리할 Data 읽는다.

2. 처리(Processer) - Data를 처리한다.

3. 쓰기(Writer) - 처리한 Data를 DB에 저장한다.

 

1. With JPA

PagingSize와 ChunkSize

이 두 개는 서로 의미하는 바가 다르다.

PagingSize는 한 번에 조회할 item 수를 의미하며, ChunkSize는 앞서 말했듯이 트랜잭션 단위를 말한다.

예를 들어 PagingSize=4, ChunkSize=8인 경우는 2번의 read(4(pagingSize)x2) 후 1번의 커밋이 이루어진다.

1번의 트랜잭션에 대해서 2번의 쿼리 수행이 이루어지는 셈이다.

적절한 페이징 사이즈와 청크 사이즈에 관한 내용을 다음과 같이 가져왔다.

Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
페이지 크기를 상당히 크게 설정하고 페이지 크기와 일치하는 커밋 간격을 사용하면 성능이 향상됩니다.

 

 

도서의 isbn의 값을 모두 0000으로 바꾸는 배치를 예시로 구현하였다.

위의 페이징과 청크 사이즈를 참고하여 둘의 사이즈를 동일하게 맞췄다.

@Bean
    public Job chunkJob(PlatformTransactionManager transactionManager, JobRepository jobRepository){
        return new JobBuilder("chunkJob",jobRepository)
                .start(chunkStep(transactionManager,jobRepository))
                .build()
                ;
    }

    public Step chunkStep(PlatformTransactionManager transactionManager,JobRepository jobRepository){
        return new StepBuilder("chunkStep",jobRepository)
                .<BookEntity,BookEntity>chunk(5,transactionManager)
                .reader(bookReader())
                .processor(bookProcessor())
                .writer(bookWriter())
                .build()
                ;
    }

    @Bean
    public JpaPagingItemReader<BookEntity> bookReader(){
        log.info("ItemReader 실행됨");
        JpaPagingItemReader<BookEntity> reader = new JpaPagingItemReader<>();
        reader.setPageSize(5);
        reader.setName("reader");
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setQueryString("SELECT c FROM BookEntity c");
        return reader;
    }

    @Bean
    public ItemProcessor<BookEntity,BookEntity> bookProcessor(){
        log.info("bookProcessor 실행됨");
        return item -> {
            log.info("process 처리!!");
            item.setIsbn("000000");
            return item;
        };

    }

    @Bean
    public JpaItemWriter<BookEntity> bookWriter(){
        log.info("bookWriter 실행됨");
        JpaItemWriter<BookEntity> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }

 

 

 

 

 

참고)

https://khj93.tistory.com/entry/Spring-Batch%EB%9E%80-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B3%A0-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0