1. 무슨 문제가 있었는지?
Chunk 기반의 스프링 배치를 개발하고 있는데 JpaPagingItemReader를 사용했을 때 PageSize를 설정해도 limit과 offset이 설정되지 않는 문제가 발생해서 원인을 분석중이다. 지난 번 포스팅에서는 Chunk 기반의 트랜잭션이 어떻게 설정되고 ItemReader, ItemProcessor, ItemWriter 가 어떻게 사용되는지 살펴보았다. 이번 포스팅에서는 문제의 원인일 가능성이 높은 ItemReader에 대해 좀 더 자세히 분석해보도록 하자.
https://lazy-man.tistory.com/86
[Today I Learned - 13] JpaPagingItemReader Limit, Offset not working - (1)
1. 무슨 문제가 있었는지? 스프링 배치를 이용하여 Chunk 기반으로 개발하고 있었는데 ItemReader를 구현하는 부분에서 Limit, Offset이 적용되지 않아 전체 테이블을 조회하는 문제가 발생했는데 ItemRead
lazy-man.tistory.com
2. 문제의 원인은?
먼저 ChunkOrientedTasklet 에서 ChunkProvider를 이용하여 데이터를 읽어보는 부분을 자세히 살펴보자. ChunkOrientedTasklet 의 execute 메서드에서 ChunkProvider의 provide 메서드를 호출하고 있다. provide 메서드에서는 내부 반복자를 통해 Item을 1건씩 가져오고 있다. 반복문은 모든 아이템을 가져올 때까지(item == null) 수행하면서 Chunk<I> inputs에 데이터를 추가한다.
//ChunkOrientedTasklet.class
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) ... {
Chunk<I> inputs = ...;
inputs = chunkProvider.provide(contribution); // <-- 요기
chunkProcessor.process(contribution, inputs);
// 생략...
}
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
final Chunk<I> inputs = new Chunk<>();
repeatOperations.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
// 생략...
item = read(contribution, inputs);
// ...
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}
})
}
read 메서드를 따라가다보면 아래와 같이 ItemReader의 read 메서드를 호출하는 것을 볼 수 있다. 여기서 ItemReader는 Step을 생성할 때 ItemReader로 설정했던 JpaPagingItemReader 가 될 것이다.
// ItemReader.class
@Bean
public ItemReader<TaxInvoiceReserveEntity> exportItemReader() {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("param1", {{param1}});
parameters.put("param2", {{param2}});
return new JpaPagingItemReaderBuilder<TestEntity>()
.entityManagerFactory(entityManagerFactory)
.name("JpaPagingItemReader")
.pageSize(CHUNK_SIZE)
.queryString("""
SELECT tiReserve
FROM TestEntity t1
LEFT JOIN FETCH t1.items
WHERE t1.param1 =:param1
AND t1.param2 = :param2
ORDER BY t1.id ASC
""")
.parameterValues(parameters)
.build();
}
protected final I doRead() throws Exception {
// 생략...
I item = itemReader.read();
}
ItemReader의 read 메서드를 따라가보면 AbstractItemCountingItemStreamItemReader 의 read() 메서드를 호출되고 read 메서드 안에서 AbstractPagingItemReader 클래스의 doRead() 메서드를 호출한다.
(JpaPagingItemReader는 AbstractPagingItemReader 클래스를 상속하고 있고, AbstractPagingItemReader는 AbstractItemCountingItemStreamItemReader 클래스를 상속하고 있다.)
// AbstractItemCountingItemStreamItemReader.class
@Override
public T read() throws Exception, UnexpectedInputException, ParseException {
// 생략...
T item = doRead();
// ...
return item;
}
다음은 AbstractPagingItemReader의 doRead 메서드이다. doRead 메서드에서는 doReadPage 메서드를 호출하는데 이 메서드는 JpaPagingItemReader 클래스에서 구현하고 있다. JpaPagingItemReader 의 doReadPage 메서드에서는 쿼리 실행 결과를 results 변수에 add 하도록 되어있다.
즉 아래의 메서드는 쿼리 실행 결과(result)를 인덱스(next) 참조를 통해 하나씩 리턴하도록 되어있다. 그러다가 모든 실행 결과를 리턴하면 null을 리턴하게 되고 null이 리턴되면 ChunkProvider의 provide 메서드가 종료되는 것이다.
// AbstractPagingItemReader.class
@Override
protected T doRead() throws Exception {
synchronized(lock) {
// 생략...
doReadPage();
page ++;
if(current >= pageSize) {
curent = 0;
}
}
int next = current ++;
if(next < result.size()) {
return result.get(next);
}
else {
return null;
}
}
그럼 JpaPagingItemReader 의 doReadPage 메서드를 살펴보자. createQuery 메서드를 통해 Query 객체를 생성하는데 createQuery 메서드를 살펴보면 JpaPagingItemReader를 만들 때 내가 작성했던 QueryString을 기반으로 Query 객체를 생성하는 것을 확인할 수 있다. QueryString 에는 offset, limit 을 작성하지 않았기 때문에 createQuery 메서드에서 offset, limit을 설정해주는 것은 아닌 것을 알 수 있다.
// JpaPagingItemReader.class
protected void doReadPage() {
// 트랜잭션 시작 ...
Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
// ...
results.addAll(query.getResultList());
// 트랜잭션 커밋 ...
}
// JpaPagingItemReader.class
private Query createQuery() {
if(queryProvider == null) {
return entityManager.createQuery(queryString);
}
else {
return queryProvider.createQuery();
}
}
그렇다면 Query의 getResultList 메서드를 살펴보자. 해당 메서드를 추적하다보면 AbstractSelectionQuery 의 list 메서드를 확인할 수 있다. doList() 메서드를 실행하고 있는데 이 메서드는 QuerySqmImpl에서 구현하고 있다.
// AbstractSelectionQuery.class
@Override
public List<R> list() {
// 생략...
final List<R> result = doList();
return result;
}
QuerySqmImpl 의 doList() 메서드를 살펴보자. 해당 메서드에서는 SelectQueryPlan의 performList 메서드를 통해 쿼리 실행 결과를 취득하고 있다. performList에는 executionContextFordoList 메서드를 통해 생성한 DomainQueryExecutionContext 객체를 전달하고 있다.
// QuerySqmImpl.class
protected List<R> doList() {
final SqmSelectStatement<?> sqmStatement = (SqmSelectStatement<?>) getSqmStatement();
final boolean containsCollectionFetches = sqmStatement.containsCollectionFetches()
|| AppliedGraphs.containsCollectionFetches(getQueryOptions());
final boolean hasLimit = hasLimit(sqmStatement, getQueryOptions());
// ...
// ★★ 기억 ★★
final List<R> list = resolveSelectQueryPlan()
.performList(executionContextFordoList(containsCollectionFetches, hasLimit, needsDistinct);
// ...
return list;
}
먼저 SelectQueryPlan(ConcreteSqmSelectQueryPlan)의 performList 메서드부터 따라가보자. performList 메서드를 따라가다보면 buildCacheableSqmInterpretation 메서드에 DomainQueryExecutionContext 객체를 전달하여 실행하고 있다.
// ConcreteSqmSelectQueryPlan.class
@Override
public List<R> performList(DomainQueryExecutionContext executionContext) {
// ...
return withCacheableSqmInterpretation(executionContext, null, listInterpreter);
}
// ConcreteSqmSelectQueryPlan.class
private <T, X> T withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext..){
CacheableSqmInterpretation localCopy = cacheableSqmInterpretation;
// 생략...
localCopy = buildCacheableSqmInterpretation(...executionContext);
// ...
}
SelectQueryPlan(ConcreteSqmSelectQueryPlan)의 buildCacheableSqmInterpretation 메서드를 살펴보면 SqlAstTranslator의 translate 메서드를 통해 JdbcOperationQuerySelect 객체를 생성하고 있다. 클래스의 이름만 보았을 때 Select 문을 만드는 부분인 것 같다.
private static CacheableSqmInterpretation buildCacheableSqmInterpretation(..,
DomainQueryExecutionContext executionContext) {
// 생략...
final SqlAstTranslator<JdbcOperationQuerySelect> selectTranslator = sqlAstTranslatorFactory
.buildSelectTranslator(sessionFactory, sqmInterpretation.getSqlAst());
// ...
final JdbcOperationQuerySelect jdbcSelect = selectTranslator.translate(jdbcParameterBindings,
executionContext.getQueryOptions()); // <-- 핵심
return ...;
}
SqlAstTranslator의 translate 메서드를 살펴보면 아래와 같다. translate 메서드 내부에서 limit에 대한 값을 설정해주고 있었다. queryOptions의 상태를 디버깅해서 살펴봤는데 "Limit 정보가 비어있다".
JpaPagingItemReader를 생성할 때 PaseSize 값을 설정하면 Limit 값이 저절로 생성될텐데 왜 비어있는 것일까? 포스팅이 생각보다 길어져서 남은 부분은 다음 포스팅에서 이어서 정리해보도록 하겠다.
// SqlAstTranslator.class
@Override
public T translate(JdbcParameterBinding jdbcParameterBindings, QueryOptions queryOptions) {
// 디버깅
this.limit = queryOptions.getLimit() == null ? null : queryOptions.getLimit().makeCopy();
// ...
return (T) jdbcOpertaion;
}
3. 해결 방법
4. 배운점
- ChunkOrientedTasklet 에서는 ChunkProvider 의 내부 반복자를 통해 데이터를 가져오는데, 내부 반복자를 통해 읽어온 데이터의 수가 ItemReader를 통해 읽어온 결과의 수와 같을때까지 반복한다.
5. 출처
'TIL(Today I Learned)' 카테고리의 다른 글
[Today I Learned - 16] JpaAuditing 적용으로 인한 단위 테스트 실패 (0) | 2024.02.23 |
---|---|
[Today I Learned - 15] JpaPagingItemReader Limit, Offset not working - (3) (1) | 2024.02.08 |
[Today I Learned - 13] JpaPagingItemReader Limit, Offset not working - (1) (0) | 2024.01.31 |
[Today I Learned - 12] Spring Batch의 Tasklet Transaction (0) | 2024.01.25 |
[Today I Learned - 11] JpaTransactionManager의 Timeout (1) | 2024.01.23 |