1. 무슨 문제가 있었는지?
스프링 배치를 이용하여 Chunk 기반으로 개발하고 있었는데 ItemReader를 구현하는 부분에서 Limit, Offset이 적용되지 않아 전체 테이블을 조회하는 문제가 발생했는데 ItemReader는 JpaPagingItemReader를 이용했고 아래와 같이 PageSize를 설정하여 사용하고 있다.
실제 서비스에서 사용하는 코드를 올릴 수 없어서 임의의 엔티티 클래스(TestEntity)와 파라미터(param1, param2)를 설정했다. 스프링 배치를 주로 사용하신 분이라면 JPQL 만 보고 문제의 원인을 바로 알아보는 분들이 계실수도 있지만 나는 배치를 처음 사용하는 것이기 때문에 2~3일 정도를 오류 분석에 시간을 투자했다.. ㅠㅠ
우선 삽질한 내용이 길기 때문에 이번 포스팅에서는 Chunk 기반에서 트랜잭션과 ItemReader, ItemProcessor, ItemWriter가 어떻게 사용되는지 알아보려고 한다.
@Bean
public ItemReader<TaxInvoiceReserveEntity> exportItemReader() {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("param1", {{param1}});
parameters.put("param2", {{param2}});
return new JpaPagingItemReaderBuilder<TestEntity>()
.entityManagerFactory(entityManagerFactory)
.name("JpaPagingItemReader")
.pageSize(CHUNK_SIZE)
.queryString("""
SELECT tiReserve
FROM TestEntity t1
LEFT JOIN FETCH t1.items
WHERE t1.param1 =:param1
AND t1.param2 = :param2
ORDER BY t1.id ASC
""")
.parameterValues(parameters)
.build();
}
2. 문제의 원인은?
우선 ItemReader 가 어디서 호출되는지 코드를 추적해보았다. 처음 과정은 약간 생략하고 트랜잭션 관리를 확인하기 위해 Step에서 tasklet을 실행하는 코드부터 살펴보도록 하자.
아래 코드를 보면 Step(TaskletStep)에서 stepOperations를 통해 Loop 돌면서 TransactionTemplate의 execute 메서드의 인자로 ChunkTransactionCallback을 전달하고 있다. execute 메서드는 스프링에서 트랜잭션 관리를 위해 AOP를 적용한 코드이고 해당 메서드 내에서 ChunkTransactionCallback의 doInTransaction(TransactionStatus status) 메서드가 호출된다.
ChunkTransactionCallback의 doInTransaction 메서드는 결국 ChunkOrientedTasklet 의 execute 메서드를 실행하게 된다. execute 메서드 안에서는 read, process, write 가 동작하는데 이 말은 즉 청크 단위(read+process+write)로 트랜잭션을 관리된다는 의미이다.
(참고로 ChunkTransactionCallback 클래스는 TaskletStep 클래스의 내부 클래스이다)
// TaskletStep.class
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
// 생략...
stepOperations.iterate(new StepContextRepeatableCallback(stepExecution) {
@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunckContext)
throws Exception {
// 생략...
RepeatStatus result;
try {
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
} catch(...) {}
}
});
}
ChunkTransactionCallback 클래스의 doInTransaction 메서드는 아래와 같이 동작한다. 참고로 Chunk 방식에서는 tasklet은 ChunkOrientedTasklet 구현체를 사용하게 된다.
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
// 생략..
result = tasklet.execute(contiribution, chunkContext);
// ...
}
ChunkOrientedTasklet 의 execute 메서드는 다음과 같다. ChunkProvider의 provide 메서드를 통해 Chunk<I> inputs 를 취득하고 그 데이터를 ChunkProcessor의 process 메서드로 전달하고 있다.
// ChunkOrientedTasklet.class
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// 생략...
Chunk<I> inputs = ...;
inputs = chunkProvider.provide(contribution);
chunkProcessor.process(contribution, inputs);
// ...
}
먼저 ChunkProvider의 provide 메서드를 살펴보면 Provider 내부의 repeatOperations를 통해 read 메서드를 통해 반복적으로 호출하도록 되어있다. read 메서드를 따라가다보면 doRead() 메서드에서 ItemReader의 read 메서드를 호출하는 것을 확인할 수 있다.
이렇게 read 메서드를 통해 item 을 하나씩 가져오는데 item이 null이 아닐 때까지 반복하면서 Chunk<I> inputs에 데이터를 추가하고 있다. item이 null이 되면 반복을 종료하면서 Chunk<I> inputs를 리턴한다.
// SimpleChunkProvider.class
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
final Chunk<I> inputs = new Chunk<>();
repeatOperations.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
// ...
item = read(contribution, inputs); // <== 여기가 핵심인듯!!
// ...
if(item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}
});
return inputs;
}
protected final I doRead() throws Exception {
// 생략...
I item = itemReader.read(); // <== ItemReader 사용부분
return item;
}
자 그럼 다시 ChunkOrientedTasklet의 execute 메서드로 돌아와서 ChunkProvider의 provide 메서드를 통해 리턴받은 input값을 ChunkProcessor의 process 메서드로 전달하는 부분을 살펴보자.
먼저 ChunkProcessor의 process 메서드에서 transform 메서드를 호출하는데 transform 메서드는 Chunk<I> inputs를 전달받고 Chunk<O> outputs 을 리턴한다. transform 메서드 내부에서는 doProcess 메서드를 호출하고 doProcess 메서드에서 ItemProcessor의 process 메서드를 호출하는 것을 확인할 수 있다.
ChunkProcessor의 transform 메서드 이후 write 메서드를 호출하고 있다. write 메서드를 따라가다보면 writeItems 메서드에서 ItemWriter의 write 메서드를 실행하는 것을 확인할 수 있다. 여기서 확인해야 하는 부분은 writeItems 메서드에서는 Chunk<O> items 라는 청크 단위를 전달받아서 처리하고 있다.
// SimpleChunkProcessor.class
@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
// 생략...
Chunk<O> outputs = transform(contribution, inputs);
// ..
write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
}
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
Chunk<O> outputs = new Chunk<>();
for(Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item = iterator.next();
O output;
// 생략...
output = doProcess(item);
// ...
if (output != null) {
outputs.add(output);
}
// ...
}
return outputs;
}
protected final O doProcess(I item) throws Exception {
// 생략...
O result = itemProcessor.process(item); // <== ItemProcessor 사용부분
return result;
}
protected final void doWrite(Chunk<O> items) throws Exception {
// 생략...
writeItems(items);
// ...
}
protected void writeItems(Chunk<O> items) throws Exception {
// ...
itemWriter.write(items) // <== ItemWriter 사용부분
}
3. 해결 방법
오늘은 우선 Chunk 기반에서 스프링 배치의 트랜잭션은 어떻게 관리되고 ItemReader, ItemProcessor, ItemWriter 가 어디서 사용되는지 알아보았는데 아직 페이징하여 JPQL이 실행되지 않는 문제는 접근도 못했다.
아마 ItemReader의 사용 부분에서 확인할 수 있을 것 같은데 다음 포스팅에서 문제의 원인을 찾아봐야겠다.
4. 배운 점
- Chunk 기반의 스프링 배치는 Chunk 단위로 트랜잭션을 관리한다.
- Chunk 기반의 스프링 배치에서 사용하는 ItemReader, ItemProcessor, ItemWriter가 어디서 사용되는가
5. 출처
'TIL(Today I Learned)' 카테고리의 다른 글
[Today I Learned - 15] JpaPagingItemReader Limit, Offset not working - (3) (1) | 2024.02.08 |
---|---|
[Today I Learned - 14] JpaPagingItemReader Limit, Offset not working - (2) (0) | 2024.02.01 |
[Today I Learned - 12] Spring Batch의 Tasklet Transaction (0) | 2024.01.25 |
[Today I Learned - 11] JpaTransactionManager의 Timeout (1) | 2024.01.23 |
[Today I Learned - 10] 400 Bad Request With Spring (1) | 2024.01.22 |