🍃 Spring

[Spring Batch] 개인 정리

loose 2025. 2. 14. 22:46
반응형

스프링 배치 소개

- 스프링 배치는 Job 안에 여러개의 Step이 있는 구조다

@Bean
public Job myJob(JobRepository jobRepository, Step step1, Step step2) {
    return new JobBuilder("myJob", jobRepository)
            .start(step1)  // Step1 실행
            .next(step2)   // Step2 실행
            .build();

 

- JobRepository
Job이 실행될 때마다 실행한 기록(성공/실패)과 관련된 상태 정보가 JobRepository에 저장된다.

저장되는 위치는 기본적으로 인모메리 DB인 HSQL 메모리에 저장된다.

하지만 mysql 의존성을 어떻게든 추가하면 거기다가 BATCH 관련 테이블을 저장한다.

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch
spring.datasource.username=root
spring.datasource.password=password

 

 

- Step은 Tasklet 혹은 Chunk(Chunk Oriented Processing) 기반으로 나뉘며 Chunk는 대량 데이터를 처리하고 Tasklet은 단순한 로직을 처리한다.

- Chuck 구성 요소

  • ItemReader - DB, CSV, JDBC, JPA 등을 이용해 데이터를 읽는 역할을 한다.
  • ItemProcessor - 가져온 데이터를 가공한다.
  • ItemWriter - 가공된 데이터를 저장한다.

여기서 알 수 있는 점은 스프링 배치는 크게 Read(읽기), Process(처리), Write(쓰기) 작업으로 나뉜다.

Tasklet

@Component
public class SimpleTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // StepContribution에서 JobParameters 바로 가져오기
        JobParameters jobParameters = contribution.getStepExecution()
                                                  .getJobExecution()
                                                  .getJobParameters();
                                                  
	// JobParameters jobParameters = chunkContext.getStepContext().getJobParameters();
        // chunkContext 방식은 복잡함
        String name = jobParameters.getString("name");

        System.out.println("Hello, " + name + " with Tasklet!");

        return RepeatStatus.FINISHED;
    }
}

 

tasklet에선 간단한 처리를 위해 사용하며 읽기, 처리, 쓰기를 모두 하나의 execute 메소드 안에서 구현한다.

StepContribution과 ChunkContext

StepContribution은 현재 실행 중인 Step의 실행 상태를 관리하는 객체다.

ChunkContext는 현재 실행 중인 Chunk의 실행 상태를 관리하는 객체다.

 

tasklet과 chunk는 서로 다른데 tasklet에서 ChunkContext를 쓰는 이유는 Step이 항상 Chunk 기반으로 설계되어 있기 때문이다.

JobParamter

 JobParameters jobParameters = new JobParametersBuilder()
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();

      jobLauncher.run(batchExecutionLogJob, jobParameters);

위와 같이 job을 실행하기전에 Job Parameter를 전달할 수 있다.

Job Parameter란 스프링 배치의 Job에 동적 변수를 전달하는 행위를 말한다.

tasklet에서 구현된 execute에서 StepContribution, ChunkContext에서 모두 jobParameter를 가져올 수 있다.

Chunk

Chunk는 위에 설명했듯이 Read(읽기), Process(처리), Write(쓰기) 작업으로 나뉜다

그래서 메소드도 3개로 만든다.

Job 내에서 3개를 순서대로 호출하는 형태다.

StepScope 어노테이션

@StepScope
public StepScopeItemReader(@Value("#{jobParameters['filePath']}") String filePath) {
        System.out.println("Reading file from path: " + filePath); // JobParameter 출력
        List<String> items = Arrays.asList("Line1", "Line2", "Line3");
        this.data = items.iterator();
    }

 

우선 위 코드는 Reader 메소드의 구현인데, 추가적으로 @StepScope를 붙이면 JobParameter를 넘겨받을 수 있다.

@StepScope가 붙으면 Step이 실행되는 시점에 Bean을 생성한다.

Chunk 방식에서 JobParameter를 사용하기 위해서는 이렇게 사용한다.

물론 이 Reader는 Job 내에서 호출 시키는 형태인데, 거기서 위에서 설명한 StepContribution과 ChunkContext가 존재해서 거기서 사용해도 되지만 그건 tasklet에서 가져다 쓰는 방법이고 이건 Reader에서 갖다 쓰는 방법이다.

 

StepScope의 프록시 객체

일단 결론만 말하자면 위에 코드에서 @StepScope를 붙이는 경우 Return 타입을 ItemReader로 하면 안되고 구현체는 JpaPagingItemReader를 해야한다고 한다.

 

그냥 테스트 코드 돌릴 때 ItemReader 타입을 리턴할 경우

@StepScope의 proxyMode = ScopedProxyMode.TARGET_CLASS로 인해서 ItemReader 인터페이스의 프록시 객체를 생성하여 리턴하게 된다.

그 프록시 객체에는 Listener Annotation을 사용할 수 없기 때문에(예를 들어 @AfterStep, @BeforeStep) 구현 클래스로 리턴해줘야 한다고 함.

 

안하면 o.s.b.c.l.AbstractListenerFactoryBean    : org.springframework.batch.item.ItemReader is an interface.  The implementing class will not be queried for annotation based listener configurations.  If using @StepScope on a @Bean method, be sure to return the implementing class so listner annotations can be used. 이 에러 발생.

 

Chunk 구조 파헤치기

   @Bean
    public Step dailyEventLogStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("dailyEventLogStep", jobRepository)
                .<SearchResponse, Long>chunk(1, transactionManager)
                .reader(eventLogReader(null))
                .processor(eventLogProcessor())
                .writer(eventLogWriter())
                .faultTolerant()
                .retry(Exception.class)
                .retryLimit(3)
                .build();
    }

 

- chunk(1, 이라고 선언하면 reader에서 데이터 조회해온 결과를 몇개씩 처리할 지 결정하는 것을 말한다.

reader에서 100건을 가져오면 1건씩 처리하겠다는 의미다. 

- transactionManager는 reader, processor, writer 기능을 서로 분리해서 트랜잭션 관리하는 기능을 말한다.

 

 @Bean
    @StepScope
    public ItemReader<SearchResponse> eventLogReader(
            @Value("#{jobParameters[date]}") String dateStr) {
        return new ItemReader<>() {

이건 Reader의 코드인데, 오픈서치의 결과를 ItemReader로 묶어서 리턴하게 되면 자동으로 Processor에 결과를 던지게 된다.

 

테스트 코드

@ExtendWith(MockitoExtension.class)
class DailyEventLogJobConfigTest {

    @Mock
    private RestHighLevelClient openSearchClient;

    @InjectMocks
    private DailyEventLogJobConfig jobConfig;

 

InjectMocks는 실제 테스트 대상이고 거기서 필요한 openSearchClient가 있으니까 그걸 가짜로 만든다.

@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyEventLogJobConfig {

    private final RestHighLevelClient openSearchClient;
    private final StatisticsRepository statisticsRepository;

    @Value("${opensearch.index-prefix}")
    private String indexPrefix;

실제로 보면 openSearchClient가 필요하다.

 

그리고 여기서 indexPrefix는 private인데 테스트 코드에서 직접 접근할 수 없ekrh gksek.

  @BeforeEach
    void setUp() {
        ReflectionTestUtils.setField(jobConfig, "indexPrefix", "test-index");
    }

그래서 위와 같이 ReflectionTestUtils라는 Junit에 포함된 기능을 이용해 임의로 설정한다고 함.

 

728x90