Contents
see ListSpring Batch 대용량 데이터 처리
Spring Batch는 대용량 데이터의 배치 처리를 위한 프레임워크입니다. 정산, 마이그레이션, 리포트 생성 등 대량 데이터 처리 작업을 안정적으로 수행합니다.
언제 사용하나요?
- 일일/월별 정산 처리
- 대용량 데이터 마이그레이션
- 외부 시스템 데이터 연동
- 정기 리포트 생성
의존성 추가
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>기본 구조
Job (작업)
└── Step (단계)
├── ItemReader (읽기)
├── ItemProcessor (처리)
└── ItemWriter (쓰기)
// Chunk 기반 처리
// - 일정 개수(chunk size)씩 묶어서 처리
// - 트랜잭션 단위 = chunk size기본 Job 설정
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job userMigrationJob() {
return jobBuilderFactory.get("userMigrationJob")
.start(userMigrationStep())
.build();
}
@Bean
public Step userMigrationStep() {
return stepBuilderFactory.get("userMigrationStep")
.<User, UserDto>chunk(100) // 100건씩 처리
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.faultTolerant()
.skipLimit(10) // 최대 10건 스킵
.skip(Exception.class)
.build();
}
}ItemReader 구현
// JPA 기반 Reader
@Bean
public JpaPagingItemReader<User> userReader() {
return new JpaPagingItemReaderBuilder<User>()
.name("userReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT u FROM User u WHERE u.status = 1")
.pageSize(100)
.build();
}
// JDBC 기반 Reader
@Bean
public JdbcCursorItemReader<User> userJdbcReader() {
return new JdbcCursorItemReaderBuilder<User>()
.name("userJdbcReader")
.dataSource(dataSource)
.sql("SELECT * FROM users WHERE status = 1")
.rowMapper(new UserRowMapper())
.build();
}
// 파일 Reader
@Bean
public FlatFileItemReader<User> fileReader() {
return new FlatFileItemReaderBuilder<User>()
.name("fileReader")
.resource(new ClassPathResource("users.csv"))
.delimited()
.names("id", "name", "email")
.targetType(User.class)
.build();
}ItemProcessor 구현
@Bean
public ItemProcessor<User, UserDto> userProcessor() {
return user -> {
// 변환 로직
return UserDto.builder()
.id(user.getId())
.name(user.getName().toUpperCase())
.email(user.getEmail())
.processedAt(LocalDateTime.now())
.build();
};
}
// 또는 별도 클래스
@Component
public class UserProcessor implements ItemProcessor<User, UserDto> {
@Override
public UserDto process(User user) throws Exception {
// null 반환 시 해당 아이템 스킵
if (!user.isActive()) {
return null;
}
return convertToDto(user);
}
}ItemWriter 구현
// JPA Writer
@Bean
public JpaItemWriter<UserDto> userWriter() {
return new JpaItemWriterBuilder<UserDto>()
.entityManagerFactory(entityManagerFactory)
.build();
}
// JDBC Writer
@Bean
public JdbcBatchItemWriter<UserDto> jdbcWriter() {
return new JdbcBatchItemWriterBuilder<UserDto>()
.dataSource(dataSource)
.sql("INSERT INTO user_backup (id, name, email) VALUES (:id, :name, :email)")
.beanMapped()
.build();
}
// 파일 Writer
@Bean
public FlatFileItemWriter<UserDto> fileWriter() {
return new FlatFileItemWriterBuilder<UserDto>()
.name("fileWriter")
.resource(new FileSystemResource("output/users.csv"))
.delimited()
.names("id", "name", "email")
.build();
}Job 실행
// 스케줄러로 실행
@Component
public class BatchScheduler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job userMigrationJob;
@Scheduled(cron = "0 0 2 * * ?") // 매일 새벽 2시
public void runJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("date", LocalDate.now().toString())
.toJobParameters();
jobLauncher.run(userMigrationJob, params);
}
}
// REST API로 실행
@RestController
public class BatchController {
@PostMapping("/batch/run")
public String runBatch() {
// Job 실행
return "Batch started";
}
}멀티 스레드 처리
@Bean
public Step parallelStep() {
return stepBuilderFactory.get("parallelStep")
.<User, UserDto>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.taskExecutor(taskExecutor())
.throttleLimit(4) // 동시 스레드 수
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
return executor;
}재시작과 스킵
@Bean
public Step faultTolerantStep() {
return stepBuilderFactory.get("faultTolerantStep")
.<User, UserDto>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.faultTolerant()
.skipLimit(100)
.skip(ValidationException.class)
.retryLimit(3)
.retry(TransientException.class)
.build();
}