본문 바로가기
SpringBoot/오류

Spring Batch Partitioning에서 Job이 끝나지 않는다면 의심해보자

by se0nghyun2 2025. 1. 5.

Spring Batch Partitioning을 통한 병렬 처리 작업 시 특정 Worker Step은 정상 종료되었으나 그 외 다른 Worker Step은 종료되지 않아 job이 끝나지 않는 케이스가 발생하였다.

또한 어떠한 Exception도 로그로 찍고 있지 않고 있어 원인을 찾는데 애를 먹었다.

 

전체 코드(필요코드 외 생략)

@Slf4j
@Configuration
@RequiredArgsConstructor
public class FreeSubNotiBatchPartitionConfig {
    private final EntityManagerFactory entityManagerFactory;
    private final int chunkSize=5;
    private final TaskExecutor taskExecutor;
    private final FreeSubNotiRepository freeSubNotiRepository;
    ....
    
    

    @Bean
    public Job FreeSubNotiBatchPartitionJob(JobRepository jobRepository, PlatformTransactionManager transactionManager){
        return new JobBuilder("freeSubNotiBatchPartitionJob",jobRepository)
                .start(step1Manager(transactionManager,jobRepository))
                .build()
                ;
    }

    @Bean
    public Step step1Manager(PlatformTransactionManager transactionManager, JobRepository jobRepository){
        return new StepBuilder("step1Manager",jobRepository)
                .partitioner("step1",partitioner(null)) //step1에 사용될 partition 구현체를 등록
                .step(step1(transactionManager,jobRepository)) //파티셔닝 될 step을 등록, step1이 등록한 Partition 로직에 따라 서로 다른 StepExecutions를 가진 여러 개로 생성
                .partitionHandler(partitionHandler(transactionManager,jobRepository)) //사용할 partitionHandler 등록
                .listener(partitionListener())
                .build()
                ;
    }

    //매니저(마스터) step이 Worker Step을 어떻게 구성할지 정의
    @Bean
    public PartitionHandler partitionHandler(PlatformTransactionManager transactionManager, JobRepository jobRepository){
        //Local환경에서 Multi Thread로 수행할 수 있도록 TaskExecutorPartitionHandler 할당
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();

        //Worker로 실행할 Step 지정
        //Partitioner가 만들어준 stepExecutions 환경에서 개별적으로 실행
        partitionHandler.setStep(step1(transactionManager,jobRepository));

        partitionHandler.setTaskExecutor(executor());

        partitionHandler.setGridSize(4);

        return partitionHandler;
    }


    @StepScope
    @Bean
    public Partitioner partitioner(@Value("#{jobParameters['startDt']}") String startDt){
        return new NewNotiNoRangePartitioner(freeSubNotiRepository,startDt);
    }

    @Bean
    public Step step1(PlatformTransactionManager transactionManager, JobRepository jobRepository){
        return new StepBuilder("step1",jobRepository)
                .<FreeSubNotiEntity, FreeSubNotiEntity>chunk(chunkSize,transactionManager)
                .reader(reader(null,null))
                .processor(processor())
                .writer(writer())
                .taskExecutor(executor())
                .listener(partitionListener())
                .listener(readListener())
                .listener(processListener())
                .faultTolerant()
                .skip(FeignException.class)
                .skipLimit(100)
                .build()
                ;
    }
    
    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("partition-thread");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }
    
    
    .........
 }

 

실행할 기본 스레드 수(CorePoolSize)는 4개, 만약 설정된 기본 스레드가 다 찼다면 큐에 대기시킬 요청의 최대 갯수(QueueCapacity)를 10개로 두고,만약 이마저도 다 찼다면 최대 스레드 개수(MaxPoolSize)인 8개로 늘려 작업을 처리하도록 도와주는 ThreadPoolExecutor를 사용한다.

 

 

java.util.concurrent.RejectedExecutionException

더보기

java.util.concurrent.RejectedExecutionException: Task org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable@409e4eee rejected from org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor$1@21ed3e72[Running, pool size = 8, active threads = 8, queued tasks = 10, completed tasks = 0]

하나하나 디버깅하여 Exception을 발견하였다.

해당 Exception은 스레드 풀의 큐와 사용가능한 스레드가 없는 경우 발생하는 Exception이다.

그러나 총 8개의 스레드와 10개의 요청을 대기시킬 정도의 작업은 분명히 내 환경에선 없다.

그렇다면 어디선가 계속계속 스레드 요청을 증가시킨다는 소리로 이해하고 해당 부분을 찾아 나섰다.

 

스레드 요청하는 부분을 찾아보자.

1. TaskExecutorPartitionHandler

 

첫번째 요청하는 부분을 발견하였다.

설정한 gridSize 수만큼 파티셔닝된 step들(patitionStepExecutions)을 for문을 통해 FutureTask로 만들어 TaskExecutor를 통해 실행시킨다. (gridSzie=4로 세팅했기에 실행될 기본 스레드 개수인 4개의 스레드로 수행) 

그리고 마지막에 FutureTask의 get을 호출하여 결과를 반환할 때까지 대기하게 된다. 

 

 

2. TaskletStep

TaskletStep.doExecute메소드에서 stepOperations을 통해 반복실행된다.

이 때, stepOperations의 구현체는 TaskExecutorRepeatTemplate 이다.

 

TaskExecutorRepeatTemplate

두번째 요청하는 부분도 발견하였다.

해당 메소드에서 taskExecutor.execute를 호출한다.

이미 corePoolSize인 4개의 파티셔닝된 step들을 실행 중인 스레드가 있기 때문에 큐에 요청을 대기시켜두게 된다. 

 

이상한 부분이 있다

첫째, 이미 실행중인 4개의 스레드 작업들은 이제 시작될 스레드 작업이 선행되어야 종료될 수 있다. 

파티셔닝된 step들을 실행 중인 4개의 스레드들은 정상적으로 처리하고 종료되기 위해선 현재 요청이 마무리 되어야 한다.

 

둘째로, 디버깅하여 보니 while반복문을 빠져나가지 못하고 지속적으로 taskExecutor.excute를 호출하게 된다.

그래서 큐에 계속 요청이 쌓이게 되고, 요청이 쌓이다 보니 큐마저 꽉 차 maxPoolSize인 8개의 처리 가능한 스레드를 늘려 특정 요청을 처리하게 된다. 그러다 늘린 스레드마저도 꽉 차게 되어 RejectedExecutionException 발생하게 된 것이다.

 

 

그런데 stepOperations 구현체가 TaskExecutorRepeatTemplate 가 맞을까??

확인해 보니 TaskExecutorRepeatTemplate가 아닌 RepeatTemplate이여야 한다.

구현체가 잘못 세팅되어 있었기에 RejectedExecutionException 발생하여 잡이 끝나지 못하게 된것이였다.

 

 

그러면 어디stepOperations 구현체를 바꿔줄 수 있을까?

AbstractTaskletStepBuilder

 

taskExecutor가 null이 아닌 경우 TaskExecutorRepeatTemplate을 세팅하기에 taskExecutor를 넣어주지 않으면 된다.

 

해결

step1 생성 시 taskExecutor는 제외해주면 된다.

    @Bean
    public Step step1(PlatformTransactionManager transactionManager, JobRepository jobRepository){
        return new StepBuilder("step1",jobRepository)
                .<FreeSubNotiEntity, FreeSubNotiEntity>chunk(chunkSize,transactionManager)
                .reader(reader(null,null))
                .processor(processor())
                .writer(writer())
//                .taskExecutor(taskExecutor)
                .listener(partitionListener())
                .listener(readListener())
                .listener(processListener())
                .faultTolerant()
                .skip(FeignException.class)
                .skipLimit(100)
                .build()
                ;
    }