Java 23에서 프리뷰로 도입된 Gatherers API(JEP 473)는 Stream API의 가장 큰 한계였던 '중간 연산 커스터마이징'을 해결합니다. 기존에는 map, filter, flatMap 등 미리 정의된 중간 연산만 사용할 수 있었지만, Gatherers를 사용하면 윈도우, 스캔, 그룹화 같은 복잡한 중간 연산을 직접 정의할 수 있습니다.

Gatherer의 구조 이해

Gatherer는 4가지 선택적 컴포넌트로 구성됩니다.

  • initializer: 상태 객체 초기화 (선택)
  • integrator: 각 입력 요소를 처리하는 핵심 로직 (필수)
  • combiner: 병렬 스트림에서 상태 병합 (선택)
  • finisher: 스트림 종료 시 최종 처리 (선택)
// Gatherer 인터페이스 시그니처
public interface Gatherer<T, A, R> {
    Supplier<A> initializer();           // 상태 생성
    Integrator<A, T, R> integrator();    // 요소 처리
    BinaryOperator<A> combiner();        // 병렬 병합
    BiConsumer<A, Downstream<R>> finisher(); // 최종 처리
}

내장 Gatherers 사용하기

Java 23은 자주 사용되는 Gatherer를 내장으로 제공합니다.

Gatherers.windowFixed(n) - 고정 크기 윈도우

import java.util.stream.Gatherers;

List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(Gatherers.windowFixed(3))
    .toList();
// [[1, 2, 3], [4, 5, 6], [7, 8]]

// 실전 활용: 배치 처리
List<User> allUsers = userRepository.findAll();
allUsers.stream()
    .gather(Gatherers.windowFixed(100))
    .forEach(batch -> {
        emailService.sendBulk(batch);
        log.info("{}명에게 메일 발송 완료", batch.size());
    });

Gatherers.windowSliding(n) - 슬라이딩 윈도우

List<List<Integer>> sliding = Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.windowSliding(3))
    .toList();
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

// 실전 활용: 이동 평균 계산
List<Double> prices = List.of(100.0, 102.5, 98.3, 105.7, 110.2, 108.1);
List<Double> movingAvg = prices.stream()
    .gather(Gatherers.windowSliding(3))
    .map(w -> w.stream()
        .mapToDouble(Double::doubleValue)
        .average()
        .orElse(0))
    .toList();
// [100.27, 102.17, 104.73, 108.0]

Gatherers.fold(init, fn) - 누적 중간 연산

// 누적 합계를 중간 결과로 사용
Optional<String> combined = Stream.of("Hello", " ", "World")
    .gather(Gatherers.fold(() -> "", String::concat))
    .findFirst();
// Optional["Hello World"]

Gatherers.scan(init, fn) - 누적 스캔

// 누적 합계의 각 단계를 모두 출력
List<Integer> runningSums = Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .toList();
// [1, 3, 6, 10, 15]

Gatherers.mapConcurrent(maxConcurrency, fn)

// Virtual Thread를 활용한 동시 매핑
List<String> responses = urlList.stream()
    .gather(Gatherers.mapConcurrent(10, url -> {
        return httpClient.send(
            HttpRequest.newBuilder(URI.create(url)).build(),
            HttpResponse.BodyHandlers.ofString()
        ).body();
    }))
    .toList();
// 최대 10개의 Virtual Thread로 동시 HTTP 요청

커스텀 Gatherer 만들기

중복 제거 Gatherer (연속 중복만)

static <T> Gatherer<T, ?, T> distinctConsecutive() {
    return Gatherer.ofSequential(
        () -> new Object() { T last = null; boolean hasLast = false; },
        (state, element, downstream) -> {
            if (!state.hasLast || !Objects.equals(state.last, element)) {
                state.last = element;
                state.hasLast = true;
                return downstream.push(element);
            }
            return true;
        }
    );
}

// 사용
List<Integer> result = Stream.of(1, 1, 2, 2, 2, 3, 1, 1)
    .gather(distinctConsecutive())
    .toList();
// [1, 2, 3, 1]

청크 분할 Gatherer (조건 기반)

static <T> Gatherer<T, ?, List<T>> chunkBy(BiPredicate<T, T> sameGroup) {
    return Gatherer.ofSequential(
        () -> new Object() {
            List<T> current = new ArrayList<>();
        },
        (state, element, downstream) -> {
            if (state.current.isEmpty() ||
                sameGroup.test(state.current.getLast(), element)) {
                state.current.add(element);
            } else {
                downstream.push(List.copyOf(state.current));
                state.current.clear();
                state.current.add(element);
            }
            return true;
        },
        (state, downstream) -> {
            if (!state.current.isEmpty()) {
                downstream.push(List.copyOf(state.current));
            }
        }
    );
}

// 사용: 같은 부호끼리 그룹화
List<List<Integer>> groups = Stream.of(1, 3, -2, -5, 4, 7, -1)
    .gather(chunkBy((a, b) -> (a >= 0) == (b >= 0)))
    .toList();
// [[1, 3], [-2, -5], [4, 7], [-1]]

Gatherer 체이닝

여러 Gatherer를 andThen으로 연결할 수 있습니다.

Gatherer<Integer, ?, Double> pipeline = 
    Gatherers.<Integer>windowSliding(3)
        .andThen(Gatherers.mapConcurrent(4, window ->
            window.stream().mapToInt(Integer::intValue).average().orElse(0)));

List<Double> result = IntStream.rangeClosed(1, 100)
    .boxed()
    .gather(pipeline)
    .toList();

Virtual Threads와의 시너지

Gatherers의 mapConcurrent는 내부적으로 Virtual Thread를 사용합니다. Java 21에서 정식 도입된 Virtual Thread와 결합하면 I/O 중심 작업에서 극적인 성능 향상을 얻을 수 있습니다.

// 1000개 URL을 50개씩 동시 크롤링
List<CrawlResult> results = urls.stream()
    .gather(Gatherers.mapConcurrent(50, url -> {
        var response = httpClient.send(
            HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(10))
                .build(),
            HttpResponse.BodyHandlers.ofString());
        return new CrawlResult(url, response.statusCode(), response.body());
    }))
    .filter(r -> r.statusCode() == 200)
    .toList();

컴파일 및 실행

Gatherers는 Java 23에서 프리뷰 기능이므로 --enable-preview 플래그가 필요합니다.

# 컴파일
javac --enable-preview --release 23 GathererDemo.java

# 실행  
java --enable-preview GathererDemo

# Maven pom.xml 설정
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <release>23</release>
        <compilerArgs>
            <arg>--enable-preview</arg>
        </compilerArgs>
    </configuration>
</plugin>