TIL(Today I Learned)

[Today I Learned - 12] Spring Batch의 Tasklet Transaction

lazy man 2024. 1. 25. 16:32

1. 어떤 문제가 있었나?


스프링 배치를 이용하여 배치성 프로그램을 개발하고 있는데 tasklet에서 작업 시간이 오래되면서 connection timeout이 발생했다. timeout은 60초 정도 잡아두었는데 개발 환경에서 적은 데이터로도 timeout이 발생해서 트랜잭션 단위를 더 짧게 가져가기 위해 Spring Batch 에서 트랜잭션 관리를 어떻게 하는지 공부가 필요했다.

 

 

2. 문제의 원인


 먼저 Spring Batch는 Tasklet 기반과 Chunk 기반으로 구분된다. Chunk 기반의 경우 대용량의 데이터를 Chunk 단위로 구분하고 Chunk 단위로 트랜잭션을 관리한다. 하지만 Tasklet은 Step 단위(?)로 트랜잭션을 관리하는데 Tasklet 을 수행하는데 오랜 시간이 걸리면서 timeout이 발생한 것으로 보인다. 

// Step 생성
@Bean
public Step testStep() {
    return new StepBuilder("testStep", jobRepository)
          .tasklet(testTasklet() transactionManager) // <-- 트랜잭션 매니저, timeout 60초 설정
          .build();
}
// Tasklet 생성
@Bean
public Tasklet testTasklet() {
    return (contribution, chunkContext) -> {
        // 생략...
        // for ( ) {
        // 
        // }
        return RepeatStatus.FINISHED;
    }
}

 

 먼저 Spring Batch은 JobLauncher의 run 메서드를 통해 통해 Job을 실행한다. Tasklet 기반의 경우 Job은 여러 개의 Step을 가지고, 각 Step은 하나의 Tasklet을 가지며 트랜잭션을 각각 관리한다. 

// JobLauncher
@Override
public JobExecution run(final Job job, final JobParameters jobParameters) throws ...
{	
    // ...
    try {
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // jobRepository에서 jobExecution 생성 후 job 실행
                job.execute(jobExcution); 
            }
        });
    } catch(TaskRejectedException e) {
        // JobExecution 실패 업데이트
    }
}

 

 execute 메서드는 내부적으로 doExecute 메서드를 호출하며 doExecute 메서드는 아래와 같이 Job에 등록된 Step을 Loop돌면서 StepHandler의 handleStep 메서드를 호출하고 있었다. 

// Job 구현체
@Override
protected void doExecute(JobExecution execution) throws ... {
    // ...
    StepExecution stepExecution = null;
    for(Step step: steps) {
        stepExecution = handleStep(step, execution);
    }
    // ...
}
// StepHandler 구현체
@Override
public StepExecution handleStep(Step step, JobExecution execution) throws ... {
    // 생략..
    step.execute(currentStepExecution); // JobExecution에서 StepExecution 생성
    // 생략..
}

 

 자 그럼 Step의 execution 메서드를 살펴보자. 거의 다 온 것 같다. 세마포어를 통해 Step 간 병렬 처리가 가능하는 소리인건가..? 아무튼 찾고자 하는 부분은 TransactionTemplate의 execute 를 호출하는 부분이다. 여기서 transactionManager를 전달하는데 transactionManager는 Step을 생성할 때 주입해주고 있다.

// TaskletStep.class
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    // Shared semaphore per step execution, so other step executions can run
    // in parallel without needing the lock
    final Semaphore semaphore = createSemaphore();
    
    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
        @Override
        public RepeatStatus doInChunckContext(RepeatContext repeatContext, ChunkContext chunckContext)
            throws Exception {
            // 생략...
            RepeatStatus result = new TransactionTemplate(transactionManager, transactionAttibute)
                .execute(new ChunkTransactionCallback(chunkContext, semaphore));
                
            // 생략...
        }
    })
}

 

 TransactionTemplate의 execute 메서드를 살펴보자. 트랜잭션 매니저로부터 트랜잭션을 얻은 후 TransactionCallback 클래스의 doInTransaction() 메서드를 호출하고 있다. 여기서 TransactionCallback  클래스는 TaskletStep에 정의된 내부 클래스이다. 이 때 정상적으로 처리되면 commit, 예외가 발생하면 rollback 처리를 한다. 어디서 많이 본 코드라고 생각했는데 코드를 추적하다보니 스프링에서 트랜잭션을 관리하기 위한 AOP 코드까지 찾게 된 것이다 ㅋㅋ.

// TransactionTemplate.class
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    // 생략...
    Transaction status = this.transactionManager.getTransaction(this);
    T result;
    try {
        result = action.doInTransaction(status)
    } catch(...) {
        // rollback
        throw ...
    } 
    // commit
    // 생략...
}

 

 진짜 마지막이다.. TransactionCallback 클래스의 doInTransaction 메서드를 살펴보자. 드디어 내가 정의한 tasklet의 코드가 호출되고 있다. 정리하자면 step에서 정의한 transactionManager가 tasklet을 실행하기 직전에 transaction을 생성하고 이 트랜잭션 안에서 tasklet의 코드가 실행되는 것이다. 

// TaskletStep.ChunkTransactionCallback.class
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
    // 생략...
    RepeatStatus result = tasklet.execute(contribution, chunckContext);
    
    // 생략..
    return result;
}

 

3. 해결방법


위와 같이 Tasklet 기반에서 트랜잭션이 어떻게 관리되는지 살펴보았다. Step 단위로 관리되는 트랜잭션을 좀 더 작은 단위로 관리하기 위해서는 어떻게 해야할까? 

  1. Tasklet 내부에서 새로운 트랜잭션 생성
  2. Chunk 단위로 트랜잭션 관리(Tasklet 기반에서 Chunk 기반으로 변경)

1번의 경우 JobRepository에서 스프링 배치 관련 테이블의 상태를 관리하는데 이 부분에 영향이 있을 것 같다. 왜냐하면 tasklet에서 반복문을 수행하면서 새로운 트랜잭션(REQUIRED_NEW)를 실행하게 되면 JOB의 트랜잭션과 별도로 실행된다. 즉 tasklet의 내부 작업은 모두 성공해서 개별로 commit 이 완료됐지만 작업 이후 특수한 상황에 의해 예외가 발생하면서 step 범위의 transaction은 롤백될 수 있는 상황이 발생할 수 있을 것 같다.

 

즉 1번의 경우는 리스크가 상당해보이기 때문에 이와 같은 문제를 해결하기 위해서는 Chunk 단위로 배치를 변경해야 할 것 같다.

 

4. 배운점


  • Spring Batch 에서 Tasklet 기반의 트랜잭션 관리

 

5. 출처