Contents
see ListJava 23에서 프리뷰로 도입된 Gatherers API(JEP 473)는 Stream API의 가장 큰 한계였던 '중간 연산 커스터마이징'을 해결합니다. 기존에는 map, filter, flatMap 등 미리 정의된 중간 연산만 사용할 수 있었지만, Gatherers를 사용하면 윈도우, 스캔, 그룹화 같은 복잡한 중간 연산을 직접 정의할 수 있습니다.
Gatherer의 구조 이해
Gatherer는 4가지 선택적 컴포넌트로 구성됩니다.
- initializer: 상태 객체 초기화 (선택)
- integrator: 각 입력 요소를 처리하는 핵심 로직 (필수)
- combiner: 병렬 스트림에서 상태 병합 (선택)
- finisher: 스트림 종료 시 최종 처리 (선택)
// Gatherer 인터페이스 시그니처
public interface Gatherer<T, A, R> {
Supplier<A> initializer(); // 상태 생성
Integrator<A, T, R> integrator(); // 요소 처리
BinaryOperator<A> combiner(); // 병렬 병합
BiConsumer<A, Downstream<R>> finisher(); // 최종 처리
}
내장 Gatherers 사용하기
Java 23은 자주 사용되는 Gatherer를 내장으로 제공합니다.
Gatherers.windowFixed(n) - 고정 크기 윈도우
import java.util.stream.Gatherers;
List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
.gather(Gatherers.windowFixed(3))
.toList();
// [[1, 2, 3], [4, 5, 6], [7, 8]]
// 실전 활용: 배치 처리
List<User> allUsers = userRepository.findAll();
allUsers.stream()
.gather(Gatherers.windowFixed(100))
.forEach(batch -> {
emailService.sendBulk(batch);
log.info("{}명에게 메일 발송 완료", batch.size());
});
Gatherers.windowSliding(n) - 슬라이딩 윈도우
List<List<Integer>> sliding = Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.windowSliding(3))
.toList();
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
// 실전 활용: 이동 평균 계산
List<Double> prices = List.of(100.0, 102.5, 98.3, 105.7, 110.2, 108.1);
List<Double> movingAvg = prices.stream()
.gather(Gatherers.windowSliding(3))
.map(w -> w.stream()
.mapToDouble(Double::doubleValue)
.average()
.orElse(0))
.toList();
// [100.27, 102.17, 104.73, 108.0]
Gatherers.fold(init, fn) - 누적 중간 연산
// 누적 합계를 중간 결과로 사용
Optional<String> combined = Stream.of("Hello", " ", "World")
.gather(Gatherers.fold(() -> "", String::concat))
.findFirst();
// Optional["Hello World"]
Gatherers.scan(init, fn) - 누적 스캔
// 누적 합계의 각 단계를 모두 출력
List<Integer> runningSums = Stream.of(1, 2, 3, 4, 5)
.gather(Gatherers.scan(() -> 0, Integer::sum))
.toList();
// [1, 3, 6, 10, 15]
Gatherers.mapConcurrent(maxConcurrency, fn)
// Virtual Thread를 활용한 동시 매핑
List<String> responses = urlList.stream()
.gather(Gatherers.mapConcurrent(10, url -> {
return httpClient.send(
HttpRequest.newBuilder(URI.create(url)).build(),
HttpResponse.BodyHandlers.ofString()
).body();
}))
.toList();
// 최대 10개의 Virtual Thread로 동시 HTTP 요청
커스텀 Gatherer 만들기
중복 제거 Gatherer (연속 중복만)
static <T> Gatherer<T, ?, T> distinctConsecutive() {
return Gatherer.ofSequential(
() -> new Object() { T last = null; boolean hasLast = false; },
(state, element, downstream) -> {
if (!state.hasLast || !Objects.equals(state.last, element)) {
state.last = element;
state.hasLast = true;
return downstream.push(element);
}
return true;
}
);
}
// 사용
List<Integer> result = Stream.of(1, 1, 2, 2, 2, 3, 1, 1)
.gather(distinctConsecutive())
.toList();
// [1, 2, 3, 1]
청크 분할 Gatherer (조건 기반)
static <T> Gatherer<T, ?, List<T>> chunkBy(BiPredicate<T, T> sameGroup) {
return Gatherer.ofSequential(
() -> new Object() {
List<T> current = new ArrayList<>();
},
(state, element, downstream) -> {
if (state.current.isEmpty() ||
sameGroup.test(state.current.getLast(), element)) {
state.current.add(element);
} else {
downstream.push(List.copyOf(state.current));
state.current.clear();
state.current.add(element);
}
return true;
},
(state, downstream) -> {
if (!state.current.isEmpty()) {
downstream.push(List.copyOf(state.current));
}
}
);
}
// 사용: 같은 부호끼리 그룹화
List<List<Integer>> groups = Stream.of(1, 3, -2, -5, 4, 7, -1)
.gather(chunkBy((a, b) -> (a >= 0) == (b >= 0)))
.toList();
// [[1, 3], [-2, -5], [4, 7], [-1]]
Gatherer 체이닝
여러 Gatherer를 andThen으로 연결할 수 있습니다.
Gatherer<Integer, ?, Double> pipeline =
Gatherers.<Integer>windowSliding(3)
.andThen(Gatherers.mapConcurrent(4, window ->
window.stream().mapToInt(Integer::intValue).average().orElse(0)));
List<Double> result = IntStream.rangeClosed(1, 100)
.boxed()
.gather(pipeline)
.toList();
Virtual Threads와의 시너지
Gatherers의 mapConcurrent는 내부적으로 Virtual Thread를 사용합니다. Java 21에서 정식 도입된 Virtual Thread와 결합하면 I/O 중심 작업에서 극적인 성능 향상을 얻을 수 있습니다.
// 1000개 URL을 50개씩 동시 크롤링
List<CrawlResult> results = urls.stream()
.gather(Gatherers.mapConcurrent(50, url -> {
var response = httpClient.send(
HttpRequest.newBuilder(URI.create(url))
.timeout(Duration.ofSeconds(10))
.build(),
HttpResponse.BodyHandlers.ofString());
return new CrawlResult(url, response.statusCode(), response.body());
}))
.filter(r -> r.statusCode() == 200)
.toList();
컴파일 및 실행
Gatherers는 Java 23에서 프리뷰 기능이므로 --enable-preview 플래그가 필요합니다.
# 컴파일
javac --enable-preview --release 23 GathererDemo.java
# 실행
java --enable-preview GathererDemo
# Maven pom.xml 설정
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<release>23</release>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
</configuration>
</plugin>