Spring Batch Multi Thread를 통하여 작업 처리 도중 이상하게 특정 item들을 ItemProcessor단계에서 여러 번 호출하여 처리하는 이슈가 발생하였다. 이에 대한 원인과 해결책을 찾아보도록 하였다.
우선 전체 코드를 확인해본다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class FreeSubNotiRegBatchMultiThreadConfig {
private final EntityManagerFactory entityManagerFactory;
private final int chunkSize=4;
private final FootBallOpenFeignClient footBallOpenFeignClient;
private final NotiProcessor notiProcessor;
private final TaskExecutor taskExecutor;
@Bean
public Job freeSubNotiRegMultiThreadJob(JobRepository jobRepository, PlatformTransactionManager transactionManager){
return new JobBuilder("freeSubNotiRegMultiThreadJob",jobRepository)
.start(freeSubNotiRegMultiThreadStep(transactionManager,jobRepository))
.build()
;
}
@Bean
@JobScope
public Step freeSubNotiRegMultiThreadStep(PlatformTransactionManager transactionManager,JobRepository jobRepository){
return new StepBuilder("freeSubNotiRegMultiThreadStep",jobRepository)
.<FreeSubNotiEntity, FreeSubNotiEntity>chunk(chunkSize,transactionManager)
.reader(freeSubNotiRegMultiThreadReader(null))
.processor(freeSubNotiRegMultiThreadProcessor())
.writer(freeSubNotiRegMultiThreadWriter())
.taskExecutor(taskExecutor)
.throttleLimit(chunkSize)
.listener(readListener())
.listener(processListener())
.faultTolerant()
.skip(FeignException.class)
.skipLimit(100)
.build()
;
}
@Bean
@StepScope
public AbstractPagingItemReader<FreeSubNotiEntity> freeSubNotiRegMultiThreadReader(@Value("#{jobParameters[nowDt]}") String nowDt){
Map<String,Object> parameterValues = new HashMap<>();
parameterValues.put("nowDt",nowDt);
CustomJpaPagingItemReader<FreeSubNotiEntity> reader = new CustomJpaPagingItemReader<>();
reader.setParameterValues(parameterValues);
reader.setQueryString("SELECT m FROM FreeSubNotiEntity m " +
"where startDt>=:nowDt " +
"order by " +
"startDt desc," +
"startTm desc," +
"notiNo desc"
);
reader.setPageSize(chunkSize);
reader.setEntityManagerFactory(entityManagerFactory);
reader.setName("customPagingReader");
reader.setSaveState(false);
return reader;
}
@Bean
public ItemProcessor<FreeSubNotiEntity, FreeSubNotiEntity> freeSubNotiRegMultiThreadProcessor(){
return item->{
if(MatchDateUtils.hasAlreadyPassedOfMatch(item.getStartDt(),item.getStartTm())){
log.info("노티 요청번호 [{}] 매치명[{}] 처리 패스",item.getNotiNo(),item.getMatchName());
return item;
}
MatchInfoResDto response = footBallOpenFeignClient.getMatch(item.getMatchNo());
boolean isManagerSubFree = Boolean.parseBoolean(response.getIs_manager_free());
boolean isSuperSubFree = Boolean.parseBoolean(response.getIs_super_sub());
notiProcessor.doNotiProcess(item,isManagerSubFree,isSuperSubFree);
return item;
};
}
@Bean
public ItemWriter<FreeSubNotiEntity> freeSubNotiRegMultiThreadWriter(){
JpaItemWriter<FreeSubNotiEntity> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
}
원인 추측
1. JpaPagingItemReader 인스턴스를 모든 스레드에서 공유
JpaPagingItemReader를 모든 스레드에서 공유하기에 다른 Thread에서 동일한 아이템을 읽으면서 발생한 문제인가 싶었지만 ItemProcessor단계에서 여러 번 처리하여도 Thread 번호는 변하지 않는 것을 확인하고 이는 원인이 아니라 넘어갔다.
2. Spring Batch의 skip/rollback
@Bean
@JobScope
public Step freeSubNotiRegMultiThreadStep(PlatformTransactionManager transactionManager,JobRepository jobRepository){
return new StepBuilder("freeSubNotiRegMultiThreadStep",jobRepository)
.<FreeSubNotiEntity, FreeSubNotiEntity>chunk(chunkSize,transactionManager)
.reader(freeSubNotiRegMultiThreadReader(null))
.processor(freeSubNotiRegMultiThreadProcessor())
.writer(freeSubNotiRegMultiThreadWriter())
.taskExecutor(taskExecutor)
.throttleLimit(chunkSize)
.faultTolerant()
.skip(FeignException.class)
.skipLimit(100)
.build()
;
}
ItemProcessor단에서 FeignException 관련 에러가 발생할 경우 처리하지 않고 건너뛰고 다음 아이템을 처리하도록 구현하기 위해 skip옵션을 적용하였다.
현재 위의 코드대로라면 ItemProcessor단에서 FeignException 발생할 경우 현재 Chunk 전체를 롤백하고, Skip 예외 대상을 건너뛰고, 건너뛴 대상 외에 아이템들을 처음 Chunk 순서대로 재처리한다.
ex.) 1,2,3,4 대상 조회 -> 1,2,3(처리 도중 예외 발생) -> 1,2,4 재처리
따라서, 이 과정에서 아이템은 이미 한 번 처리되었음에도 불구하고 다시 ItemProcessor 처리 대상에 들어가 여러 번 처리되어지는 것이다.
https://dgjinsu.tistory.com/57
해결
그렇다면 예외 발생 시 Chunk의 처음부터 다시 처리하는 게 아닌 예외 발생 대상 이후 아이템들을 바로 처리하도록 개선하면 된다.
1. Processor에서 예외 잡기(→null 리턴)
@Bean
@JobScope
public Step freeSubNotiRegMultiThreadStep(PlatformTransactionManager transactionManager,JobRepository jobRepository){
return new StepBuilder("freeSubNotiRegMultiThreadStep",jobRepository)
.<FreeSubNotiEntity, FreeSubNotiEntity>chunk(chunkSize,transactionManager)
.reader(freeSubNotiRegMultiThreadReader(null))
.processor(freeSubNotiRegMultiThreadProcessor())
.writer(freeSubNotiRegMultiThreadWriter())
.taskExecutor(taskExecutor)
.throttleLimit(chunkSize)
// .faultTolerant()
// .skip(FeignException.class)
// .skipLimit(100)
.build()
;
}
@Bean
public ItemProcessor<FreeSubNotiEntity, FreeSubNotiEntity> freeSubNotiRegMultiThreadProcessor(){
return item->{
if(MatchDateUtils.hasAlreadyPassedOfMatch(item.getStartDt(),item.getStartTm())){
log.info("노티 요청번호 [{}] 매치명[{}] 처리 패스",item.getNotiNo(),item.getMatchName());
return item;
}
try{
MatchInfoResDto response = FootBallOpenFeignClient.getMatch(item.getMatchNo());
boolean isManagerSubFree = Boolean.parseBoolean(response.getIs_manager_free());
boolean isSuperSubFree = Boolean.parseBoolean(response.getIs_super_sub());
notiProcessor.doNotiProcess(item,isManagerSubFree,isSuperSubFree);
return item;
}catch (FeignException e){
log.warn("Feign 실패, 해당 아이템 스킵: {}", item.getNotiNo());
return null; // null을 리턴하면 이 아이템은 Writer로 넘어가지 않고 버려짐
}
};
}
비즈니스 로직 안에 try catch 구문을 두어 직접 처리하는 방식이다.
예외처리 정책이 비즈니스 코드 안에 섞이긴 하나 구현은 간단하다. 또한, skipCount 등 스킵 통계에서 빠져버리고 SkipListener 호출이 불가하다.
2. skip + noRollback 옵션 사용(해당 방식 사용)
@Bean
@JobScope
public Step freeSubNotiRegMultiThreadStep(PlatformTransactionManager transactionManager,JobRepository jobRepository){
return new StepBuilder("freeSubNotiRegMultiThreadStep",jobRepository)
.<FreeSubNotiEntity, FreeSubNotiEntity>chunk(chunkSize,transactionManager)
.reader(freeSubNotiRegMultiThreadReader(null))
.processor(freeSubNotiRegMultiThreadProcessor())
.writer(freeSubNotiRegMultiThreadWriter())
.taskExecutor(taskExecutor)
.throttleLimit(chunkSize)
.faultTolerant()
.skip(FeignException.class)
.skipLimit(100)
.noRollback(FeignException.class) //noRollback 옵션 추가
.build()
;
}
예외처리 정책이 배치 구성 코드에 존재하기에 비즈니스 코드와도 분리되어있다.
faultTolerant 관련 옵션을 사용하는 방식으로 스킵 통계에 기록 및 SkipListener 사용 가능하다.
'SpringBoot > 오류' 카테고리의 다른 글
Spring Batch Partitioning에서 Job이 끝나지 않는다면 의심해보자 (1) | 2025.01.05 |
---|---|
Spring Batch MultiThread 병렬 처리 시 LazyInitializationException 발생 원인과 해결 과정 (0) | 2024.12.19 |
SpringSecurity 순환 참조(circular references) 발생 (1) | 2024.09.11 |
@Valid MethodArgumentNotValidException 처리 (0) | 2024.08.23 |
SpringBatch JpaPagingReader 조건을 통한 조회 시 문제점과 해결방안 (0) | 2024.05.03 |