TIL(Today I Learned)

[Today I Learned - 14] JpaPagingItemReader Limit, Offset not working - (2)

lazy man 2024. 2. 1. 12:00

1. 무슨 문제가 있었는지?


 Chunk 기반의 스프링 배치를 개발하고 있는데 JpaPagingItemReader를 사용했을 때 PageSize를 설정해도 limit과 offset이 설정되지 않는 문제가 발생해서 원인을 분석중이다. 지난 번 포스팅에서는 Chunk 기반의 트랜잭션이 어떻게 설정되고 ItemReader, ItemProcessor, ItemWriter 가 어떻게 사용되는지 살펴보았다. 이번 포스팅에서는 문제의 원인일 가능성이 높은 ItemReader에 대해 좀 더 자세히 분석해보도록 하자.

 

https://lazy-man.tistory.com/86

 

[Today I Learned - 13] JpaPagingItemReader Limit, Offset not working - (1)

1. 무슨 문제가 있었는지? 스프링 배치를 이용하여 Chunk 기반으로 개발하고 있었는데 ItemReader를 구현하는 부분에서 Limit, Offset이 적용되지 않아 전체 테이블을 조회하는 문제가 발생했는데 ItemRead

lazy-man.tistory.com

 

 

2. 문제의 원인은?


 먼저 ChunkOrientedTasklet 에서 ChunkProvider를 이용하여 데이터를 읽어보는 부분을 자세히 살펴보자. ChunkOrientedTasklet 의 execute 메서드에서 ChunkProvider의 provide 메서드를 호출하고 있다. provide 메서드에서는  내부 반복자를 통해 Item을 1건씩 가져오고 있다. 반복문은 모든 아이템을 가져올 때까지(item == null) 수행하면서 Chunk<I> inputs에 데이터를 추가한다. 

//ChunkOrientedTasklet.class
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) ... {
    Chunk<I> inputs = ...;
    inputs = chunkProvider.provide(contribution); // <-- 요기
    
    chunkProcessor.process(contribution, inputs);
    // 생략...
}
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
    final Chunk<I> inputs = new Chunk<>();
    repeatOperations.iterate(new RepeatCallback() {
        @Override
        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
            I item = null;
            // 생략...
            item = read(contribution, inputs);
            // ...
            if (item == null) {
                inputs.setEnd();
                return RepeatStatus.FINISHED;
            }
            inputs.add(item);
            contribution.incrementReadCount();
            return RepeatStatus.CONTINUABLE;
        }
    })
}

 

 read 메서드를 따라가다보면 아래와 같이 ItemReader의 read 메서드를 호출하는 것을 볼 수 있다. 여기서 ItemReader는 Step을 생성할 때 ItemReader로 설정했던 JpaPagingItemReader 가 될 것이다. 

// ItemReader.class
@Bean
public ItemReader<TaxInvoiceReserveEntity> exportItemReader() {
    HashMap<String, Object> parameters = new HashMap<>();
    parameters.put("param1", {{param1}});
    parameters.put("param2", {{param2}});

    return new JpaPagingItemReaderBuilder<TestEntity>()
            .entityManagerFactory(entityManagerFactory)
            .name("JpaPagingItemReader")
            .pageSize(CHUNK_SIZE)
            .queryString("""
                    SELECT tiReserve
                        FROM TestEntity t1
                            LEFT JOIN FETCH t1.items
                        WHERE t1.param1 =:param1
                            AND t1.param2 = :param2
                    ORDER BY t1.id ASC
                    """)
            .parameterValues(parameters)
            .build();
}
protected final I doRead() throws Exception {
    // 생략...
    I item = itemReader.read();
}

 

 ItemReader의 read 메서드를 따라가보면 AbstractItemCountingItemStreamItemReader 의 read() 메서드를 호출되고 read 메서드 안에서 AbstractPagingItemReader 클래스의 doRead() 메서드를 호출한다.

(JpaPagingItemReader는 AbstractPagingItemReader 클래스를 상속하고 있고, AbstractPagingItemReader는 AbstractItemCountingItemStreamItemReader 클래스를 상속하고 있다.)

// AbstractItemCountingItemStreamItemReader.class
@Override
public T read() throws Exception, UnexpectedInputException, ParseException {
    // 생략...
    T item = doRead();
    // ...
    return item;
}

 

 다음은 AbstractPagingItemReader의 doRead 메서드이다. doRead 메서드에서는 doReadPage 메서드를 호출하는데 이 메서드는 JpaPagingItemReader 클래스에서 구현하고 있다. JpaPagingItemReader 의 doReadPage 메서드에서는 쿼리 실행 결과를 results 변수에 add 하도록 되어있다. 

 

 즉 아래의 메서드는 쿼리 실행 결과(result)를 인덱스(next) 참조를 통해 하나씩 리턴하도록 되어있다. 그러다가 모든 실행 결과를 리턴하면 null을 리턴하게 되고 null이 리턴되면 ChunkProvider의 provide 메서드가 종료되는 것이다.

// AbstractPagingItemReader.class
@Override
protected T doRead() throws Exception {
    synchronized(lock) {
      // 생략...
      doReadPage();
      page ++;
      if(current >= pageSize) {
          curent = 0;
      }
    }
    
    int next = current ++;
    if(next < result.size()) {
        return result.get(next);
    }
    else {
        return null;
    }
}

 

 그럼 JpaPagingItemReader 의 doReadPage 메서드를 살펴보자. createQuery 메서드를 통해 Query 객체를 생성하는데 createQuery 메서드를 살펴보면 JpaPagingItemReader를 만들 때 내가 작성했던 QueryString을 기반으로 Query 객체를 생성하는 것을 확인할 수 있다. QueryString 에는 offset, limit 을 작성하지 않았기 때문에 createQuery 메서드에서 offset, limit을 설정해주는 것은 아닌 것을 알 수 있다.

// JpaPagingItemReader.class
protected void doReadPage() {
    // 트랜잭션 시작 ...
    Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
    
    // ...
    results.addAll(query.getResultList());
    // 트랜잭션 커밋 ...
}

// JpaPagingItemReader.class
private Query createQuery() {
    if(queryProvider == null) {
        return entityManager.createQuery(queryString);
    }
    else {
        return queryProvider.createQuery();
    }
}

 

 그렇다면 Query의 getResultList 메서드를 살펴보자. 해당 메서드를 추적하다보면 AbstractSelectionQuery 의 list 메서드를 확인할 수 있다. doList() 메서드를 실행하고 있는데 이 메서드는 QuerySqmImpl에서 구현하고 있다.

// AbstractSelectionQuery.class
@Override
public List<R> list() {
    // 생략...
    final List<R> result = doList();
    return result;
}

 

 QuerySqmImpl 의 doList() 메서드를 살펴보자. 해당 메서드에서는 SelectQueryPlan의 performList 메서드를 통해 쿼리 실행 결과를 취득하고 있다. performList에는 executionContextFordoList 메서드를 통해 생성한 DomainQueryExecutionContext 객체를 전달하고 있다.

// QuerySqmImpl.class
protected List<R> doList() {
    final SqmSelectStatement<?> sqmStatement = (SqmSelectStatement<?>) getSqmStatement();
    final boolean containsCollectionFetches = sqmStatement.containsCollectionFetches()
        || AppliedGraphs.containsCollectionFetches(getQueryOptions());
    final boolean hasLimit = hasLimit(sqmStatement, getQueryOptions());
    // ...
    // ★★ 기억 ★★
    final List<R> list = resolveSelectQueryPlan()
        .performList(executionContextFordoList(containsCollectionFetches, hasLimit, needsDistinct);
    // ...
    return list;
}

 

 먼저 SelectQueryPlan(ConcreteSqmSelectQueryPlan)의 performList 메서드부터 따라가보자. performList 메서드를 따라가다보면 buildCacheableSqmInterpretation 메서드에 DomainQueryExecutionContext 객체를 전달하여 실행하고 있다.

// ConcreteSqmSelectQueryPlan.class
@Override
public List<R> performList(DomainQueryExecutionContext executionContext) {
    // ...
    return withCacheableSqmInterpretation(executionContext, null, listInterpreter);
}

// ConcreteSqmSelectQueryPlan.class
private <T, X> T withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext..){
    CacheableSqmInterpretation localCopy = cacheableSqmInterpretation;
    // 생략...
    localCopy = buildCacheableSqmInterpretation(...executionContext);
    // ...
}

 

 SelectQueryPlan(ConcreteSqmSelectQueryPlan)의 buildCacheableSqmInterpretation 메서드를 살펴보면 SqlAstTranslator의 translate 메서드를 통해 JdbcOperationQuerySelect 객체를 생성하고 있다. 래스의 이름만 보았을 때 Select 문을 만드는 부분인 것 같다. 

private static CacheableSqmInterpretation buildCacheableSqmInterpretation(.., 
    DomainQueryExecutionContext executionContext) {
    // 생략...
    final SqlAstTranslator<JdbcOperationQuerySelect> selectTranslator = sqlAstTranslatorFactory
        .buildSelectTranslator(sessionFactory, sqmInterpretation.getSqlAst());
    // ...
    final JdbcOperationQuerySelect jdbcSelect = selectTranslator.translate(jdbcParameterBindings,
        executionContext.getQueryOptions()); // <-- 핵심
    
    return ...;
}

 

 SqlAstTranslator의 translate 메서드를 살펴보면 아래와 같다. translate 메서드 내부에서 limit에 대한 값을 설정해주고 있었다. queryOptions의 상태를 디버깅해서 살펴봤는데 "Limit 정보가 비어있다". 

 

JpaPagingItemReader를 생성할 때 PaseSize 값을 설정하면 Limit 값이 저절로 생성될텐데 왜 비어있는 것일까? 포스팅이 생각보다 길어져서 남은 부분은 다음 포스팅에서 이어서 정리해보도록 하겠다.

// SqlAstTranslator.class
@Override
public T translate(JdbcParameterBinding jdbcParameterBindings, QueryOptions queryOptions) {
    // 디버깅
    this.limit = queryOptions.getLimit() == null ? null : queryOptions.getLimit().makeCopy();
    // ...
    
    return (T) jdbcOpertaion;
}

queryOptions의 limit 조건이 비어있다?

 

3. 해결 방법


 

 

4. 배운점


  • ChunkOrientedTasklet 에서는 ChunkProvider 의 내부 반복자 통해 데이터를 가져오는데, 내부 반복자를 통해 읽어온 데이터의 수가 ItemReader를 통해 읽어온 결과의 수와 같을때까지 반복한다.

 

 

5. 출처