스프링 배치 소개
- 스프링 배치는 Job 안에 여러개의 Step이 있는 구조다
@Bean
public Job myJob(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("myJob", jobRepository)
.start(step1) // Step1 실행
.next(step2) // Step2 실행
.build();
- JobRepository
Job이 실행될 때마다 실행한 기록(성공/실패)과 관련된 상태 정보가 JobRepository에 저장된다.
저장되는 위치는 기본적으로 인모메리 DB인 HSQL 메모리에 저장된다.
하지만 mysql 의존성을 어떻게든 추가하면 거기다가 BATCH 관련 테이블을 저장한다.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch
spring.datasource.username=root
spring.datasource.password=password
- Step은 Tasklet 혹은 Chunk(Chunk Oriented Processing) 기반으로 나뉘며 Chunk는 대량 데이터를 처리하고 Tasklet은 단순한 로직을 처리한다.
- Chuck 구성 요소
- ItemReader - DB, CSV, JDBC, JPA 등을 이용해 데이터를 읽는 역할을 한다.
- ItemProcessor - 가져온 데이터를 가공한다.
- ItemWriter - 가공된 데이터를 저장한다.
여기서 알 수 있는 점은 스프링 배치는 크게 Read(읽기), Process(처리), Write(쓰기) 작업으로 나뉜다.
ItemReader, ItemProcessor, ItemWriter
이 3개는 말 그대로 Item에 관한 처리를 한다는 의미다.
쉽게 표현하자면 Items에 대한 처리가 아니라 Item이다.
무슨 말이냐면 Reader에서 데이터를 읽어올 때 한번에 읽어오는 것이 아니라 하나씩 읽어서 Processor로 던져서 처리한다.
그러므로 요구 조건에 따라 어떻게 처리해야할 지 결정해야하는 것인데
가령 데이터를 읽어서 중복을 제거하고 변형하는 작업은 위 3가지 흐름을 이용하고
단순히 리스트에 대한 총합 갯수는 Tasklet을 이용하면 된다.
기존 Java를 알던 사람이라면 조금 당황스러운 부분은 Java 자료구조형인 List, Set 등을 이용하면 중복을 거르는 로직을 Spring Batch에서는 그렇게 이용하면 안된다. 기본적으로 한 저장 공간에 모든 것을 담아두고 사용하면 메모리 부하가 생기기 때문이다.
스프링 배치는 빠르게 처리하는 것에도 목적이 있지만 메모리 부하를 관리하면서 통계 처리하는 데에도 의의가 있다.
그러므로 속도 뿐만 아니라 읽기 단위를 몇개로 할 지에 대한 균형을 잘 맞춰서 개발하는 것이 중요하다.
Tasklet
@Component
public class SimpleTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// StepContribution에서 JobParameters 바로 가져오기
JobParameters jobParameters = contribution.getStepExecution()
.getJobExecution()
.getJobParameters();
// JobParameters jobParameters = chunkContext.getStepContext().getJobParameters();
// chunkContext 방식은 복잡함
String name = jobParameters.getString("name");
System.out.println("Hello, " + name + " with Tasklet!");
return RepeatStatus.FINISHED;
}
}
tasklet에선 간단한 처리를 위해 사용하며 읽기, 처리, 쓰기를 모두 하나의 execute 메소드 안에서 구현한다.
StepContribution과 ChunkContext
StepContribution은 현재 실행 중인 Step의 실행 상태를 관리하는 객체다.
ChunkContext는 현재 실행 중인 Chunk의 실행 상태를 관리하는 객체다.
tasklet과 chunk는 서로 다른데 tasklet에서 ChunkContext를 쓰는 이유는 Step이 항상 Chunk 기반으로 설계되어 있기 때문이다.
JobParamter
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(batchExecutionLogJob, jobParameters);
위와 같이 job을 실행하기전에 Job Parameter를 전달할 수 있다.
Job Parameter란 스프링 배치의 Job에 동적 변수를 전달하는 행위를 말한다.
tasklet에서 구현된 execute에서 StepContribution, ChunkContext에서 모두 jobParameter를 가져올 수 있다.
Chunk
Chunk는 위에 설명했듯이 Read(읽기), Process(처리), Write(쓰기) 작업으로 나뉜다
그래서 메소드도 3개로 만든다.
Job 내에서 3개를 순서대로 호출하는 형태다.
StepScope 어노테이션
@StepScope
public StepScopeItemReader(@Value("#{jobParameters['filePath']}") String filePath) {
System.out.println("Reading file from path: " + filePath); // JobParameter 출력
List<String> items = Arrays.asList("Line1", "Line2", "Line3");
this.data = items.iterator();
}
우선 위 코드는 Reader 메소드의 구현인데, 추가적으로 @StepScope를 붙이면 JobParameter를 넘겨받을 수 있다.
@StepScope가 붙으면 Step이 실행되는 시점에 Bean을 생성한다.
Chunk 방식에서 JobParameter를 사용하기 위해서는 이렇게 사용한다.
물론 이 Reader는 Job 내에서 호출 시키는 형태인데, 거기서 위에서 설명한 StepContribution과 ChunkContext가 존재해서 거기서 사용해도 되지만 그건 tasklet에서 가져다 쓰는 방법이고 이건 Reader에서 갖다 쓰는 방법이다.
StepScope의 프록시 객체

일단 결론만 말하자면 위에 코드에서 @StepScope를 붙이는 경우 Return 타입을 ItemReader로 하면 안되고 구현체는 JpaPagingItemReader를 해야한다고 한다.
그냥 테스트 코드 돌릴 때 ItemReader 타입을 리턴할 경우
@StepScope의 proxyMode = ScopedProxyMode.TARGET_CLASS로 인해서 ItemReader 인터페이스의 프록시 객체를 생성하여 리턴하게 된다.
그 프록시 객체에는 Listener Annotation을 사용할 수 없기 때문에(예를 들어 @AfterStep, @BeforeStep) 구현 클래스로 리턴해줘야 한다고 함.
안하면 o.s.b.c.l.AbstractListenerFactoryBean : org.springframework.batch.item.ItemReader is an interface. The implementing class will not be queried for annotation based listener configurations. If using @StepScope on a @Bean method, be sure to return the implementing class so listner annotations can be used. 이 에러 발생.
Chunk 구조 파헤치기
@Bean
public Step dailyEventLogStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("dailyEventLogStep", jobRepository)
.<SearchResponse, Long>chunk(1, transactionManager)
.reader(eventLogReader(null))
.processor(eventLogProcessor())
.writer(eventLogWriter())
.faultTolerant()
.retry(Exception.class)
.retryLimit(3)
.build();
}
- chunk(1, 이라고 선언하면 reader에서 데이터 조회해온 결과를 몇개씩 처리할 지 결정하는 것을 말한다.
reader에서 100건을 가져오면 1건씩 처리하겠다는 의미다.
- transactionManager는 reader, processor, writer 기능을 서로 분리해서 트랜잭션 관리하는 기능을 말한다.
@Bean
@StepScope
public ItemReader<SearchResponse> eventLogReader(
@Value("#{jobParameters[date]}") String dateStr) {
return new ItemReader<>() {
이건 Reader의 코드인데, 오픈서치의 결과를 ItemReader로 묶어서 리턴하게 되면 자동으로 Processor에 결과를 던지게 된다.
테스트 코드
@ExtendWith(MockitoExtension.class)
class DailyEventLogJobConfigTest {
@Mock
private RestHighLevelClient openSearchClient;
@InjectMocks
private DailyEventLogJobConfig jobConfig;
InjectMocks는 실제 테스트 대상이고 거기서 필요한 openSearchClient가 있으니까 그걸 가짜로 만든다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyEventLogJobConfig {
private final RestHighLevelClient openSearchClient;
private final StatisticsRepository statisticsRepository;
@Value("${opensearch.index-prefix}")
private String indexPrefix;
실제로 보면 openSearchClient가 필요하다.
그리고 여기서 indexPrefix는 private인데 테스트 코드에서 직접 접근할 수 없ekrh gksek.
@BeforeEach
void setUp() {
ReflectionTestUtils.setField(jobConfig, "indexPrefix", "test-index");
}
그래서 위와 같이 ReflectionTestUtils라는 Junit에 포함된 기능을 이용해 임의로 설정한다고 함.
afterStep과 afterJob
말그대로 Step과 Job이 끝날 때 실행된다.
검색하다보면 step의 요소는 ItemReader, Processor, Writer가 있다고 설명한다.
하지만 이게 한 사이클이 돌았을 때를 "하나의 step"이라고 명명하진 않는다.
무슨 말이냐면 가령 하나의 step 내에서 청크 단위 10개로 9번 회전하는 로직이 있다고 가정하자.
그러니까 ItemReader, Processor, Writer가 하나의 사이클을 9번 돈다는건데, 이게 그럼 하나의 Step이란 것은 9번이 아니라 Step안에서 처리되는 모든 과정을 하나의 Step이라고 한다.
그러므로 Step과 Job이 1개씩밖에없다면 afterStep과 afterJob은 실행시점이 동일하다.
@Bean
public Job commandFilteringJob() throws Exception {
return new JobBuilder("commandFilteringJob", jobRepository)
.start(step1()) // 엑셀 파일 읽기 Step
.next(step2()) // 데이터 검증 Step
.next(step3()) // 결과 저장 Step
.listener(jobCompletionListener())
.build();
}
'🍃 Spring' 카테고리의 다른 글
[Spring] RFC 7232 - Conditional Requests로 비용 및 부하 최적화 하기 (0) | 2025.01.21 |
---|---|
[Spring] gRPC 사용법 (0) | 2025.01.20 |
[Spring] Redis 연결 관리 및 성능 최적화 (0) | 2025.01.06 |
[Spring] 동시성(Concurrency) 이슈 - 그 외(3) (0) | 2024.11.05 |
[Spring] 동시성(Concurrency) 이슈 - Database(2) (3) | 2024.10.25 |