Spring Batch 대용량 데이터 처리


Spring Batch는 대용량 데이터의 배치 처리를 위한 프레임워크입니다. 정산, 마이그레이션, 리포트 생성 등 대량 데이터 처리 작업을 안정적으로 수행합니다.



언제 사용하나요?



  • 일일/월별 정산 처리

  • 대용량 데이터 마이그레이션

  • 외부 시스템 데이터 연동

  • 정기 리포트 생성



의존성 추가


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>


기본 구조


Job (작업)
└── Step (단계)
├── ItemReader (읽기)
├── ItemProcessor (처리)
└── ItemWriter (쓰기)

// Chunk 기반 처리
// - 일정 개수(chunk size)씩 묶어서 처리
// - 트랜잭션 단위 = chunk size


기본 Job 설정


@Configuration
@EnableBatchProcessing
public class BatchConfig {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job userMigrationJob() {
return jobBuilderFactory.get("userMigrationJob")
.start(userMigrationStep())
.build();
}

@Bean
public Step userMigrationStep() {
return stepBuilderFactory.get("userMigrationStep")
.<User, UserDto>chunk(100) // 100건씩 처리
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.faultTolerant()
.skipLimit(10) // 최대 10건 스킵
.skip(Exception.class)
.build();
}
}


ItemReader 구현


// JPA 기반 Reader
@Bean
public JpaPagingItemReader<User> userReader() {
return new JpaPagingItemReaderBuilder<User>()
.name("userReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT u FROM User u WHERE u.status = 1")
.pageSize(100)
.build();
}

// JDBC 기반 Reader
@Bean
public JdbcCursorItemReader<User> userJdbcReader() {
return new JdbcCursorItemReaderBuilder<User>()
.name("userJdbcReader")
.dataSource(dataSource)
.sql("SELECT * FROM users WHERE status = 1")
.rowMapper(new UserRowMapper())
.build();
}

// 파일 Reader
@Bean
public FlatFileItemReader<User> fileReader() {
return new FlatFileItemReaderBuilder<User>()
.name("fileReader")
.resource(new ClassPathResource("users.csv"))
.delimited()
.names("id", "name", "email")
.targetType(User.class)
.build();
}


ItemProcessor 구현


@Bean
public ItemProcessor<User, UserDto> userProcessor() {
return user -> {
// 변환 로직
return UserDto.builder()
.id(user.getId())
.name(user.getName().toUpperCase())
.email(user.getEmail())
.processedAt(LocalDateTime.now())
.build();
};
}

// 또는 별도 클래스
@Component
public class UserProcessor implements ItemProcessor<User, UserDto> {

@Override
public UserDto process(User user) throws Exception {
// null 반환 시 해당 아이템 스킵
if (!user.isActive()) {
return null;
}
return convertToDto(user);
}
}


ItemWriter 구현


// JPA Writer
@Bean
public JpaItemWriter<UserDto> userWriter() {
return new JpaItemWriterBuilder<UserDto>()
.entityManagerFactory(entityManagerFactory)
.build();
}

// JDBC Writer
@Bean
public JdbcBatchItemWriter<UserDto> jdbcWriter() {
return new JdbcBatchItemWriterBuilder<UserDto>()
.dataSource(dataSource)
.sql("INSERT INTO user_backup (id, name, email) VALUES (:id, :name, :email)")
.beanMapped()
.build();
}

// 파일 Writer
@Bean
public FlatFileItemWriter<UserDto> fileWriter() {
return new FlatFileItemWriterBuilder<UserDto>()
.name("fileWriter")
.resource(new FileSystemResource("output/users.csv"))
.delimited()
.names("id", "name", "email")
.build();
}


Job 실행


// 스케줄러로 실행
@Component
public class BatchScheduler {

@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job userMigrationJob;

@Scheduled(cron = "0 0 2 * * ?") // 매일 새벽 2시
public void runJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("date", LocalDate.now().toString())
.toJobParameters();

jobLauncher.run(userMigrationJob, params);
}
}

// REST API로 실행
@RestController
public class BatchController {

@PostMapping("/batch/run")
public String runBatch() {
// Job 실행
return "Batch started";
}
}


멀티 스레드 처리


@Bean
public Step parallelStep() {
return stepBuilderFactory.get("parallelStep")
.<User, UserDto>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.taskExecutor(taskExecutor())
.throttleLimit(4) // 동시 스레드 수
.build();
}

@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
return executor;
}


재시작과 스킵


@Bean
public Step faultTolerantStep() {
return stepBuilderFactory.get("faultTolerantStep")
.<User, UserDto>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.faultTolerant()
.skipLimit(100)
.skip(ValidationException.class)
.retryLimit(3)
.retry(TransientException.class)
.build();
}