Java 24에서 정식 도입된 Stream Gatherers(JEP 485)는 Stream API에 커스텀 중간 연산을 추가할 수 있는 혁신적인 기능입니다. 기존 Stream API의 map, filter, flatMap만으로는 구현하기 어려웠던 슬라이딩 윈도우, 청크 분할, 상태 유지 변환 등을 깔끔하게 구현할 수 있습니다.

Stream Gatherers가 필요한 이유

기존 Stream API는 map, filter, reduce 등의 내장 연산만 제공하며, 커스텀 중간 연산을 추가할 방법이 없었습니다. Collectors로 커스텀 종단 연산은 가능하지만, 중간 연산은 불가능했습니다.

// 기존 방식: 3개씩 묶기 위한 복잡한 우회
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

// Stream만으로는 불가능, 인덱스 기반 수동 처리 필요
List<List<Integer>> chunks = new ArrayList<>();
for (int i = 0; i < numbers.size(); i += 3) {
    chunks.add(numbers.subList(i, Math.min(i + 3, numbers.size())));
}
// [[1,2,3], [4,5,6], [7,8,9]]

gather() 메서드와 기본 사용법

Stream에 추가된 gather() 메서드에 Gatherer를 전달하여 커스텀 중간 연산을 적용합니다.

import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

// 내장 Gatherer: windowFixed - 고정 크기 윈도우
List<List<Integer>> chunks = numbers.stream()
    .gather(Gatherers.windowFixed(3))
    .toList();
// [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

// 내장 Gatherer: windowSliding - 슬라이딩 윈도우
List<List<Integer>> sliding = numbers.stream()
    .gather(Gatherers.windowSliding(3))
    .toList();
// [[1,2,3], [2,3,4], [3,4,5], [4,5,6], [5,6,7], [6,7,8], [7,8,9]]

내장 Gatherers 전체 목록

Gatherers.fold - 누적 결과를 스트림으로

// 누적 합계를 스트림으로 출력
List<Integer> runningSums = List.of(1, 2, 3, 4, 5).stream()
    .gather(Gatherers.fold(
        () -> 0,                    // 초기값
        (sum, element) -> sum + element  // 누적 함수
    ))
    .toList();
// [15] - 최종 누적 결과

Gatherers.scan - 중간 누적 결과 모두 출력

// 러닝 합계 (중간 결과 모두 포함)
List<Integer> runningSums = List.of(1, 2, 3, 4, 5).stream()
    .gather(Gatherers.scan(
        () -> 0,
        (sum, element) -> sum + element
    ))
    .toList();
// [1, 3, 6, 10, 15]

Gatherers.mapConcurrent - 병렬 매핑

// 최대 10개 가상 스레드로 동시 처리
List<String> results = urls.stream()
    .gather(Gatherers.mapConcurrent(10, url -> fetchContent(url)))
    .toList();
// 순서 보장하면서 최대 10개 동시 HTTP 요청

커스텀 Gatherer 만들기

Gatherer 인터페이스를 구현하여 완전히 새로운 중간 연산을 만들 수 있습니다.

중복 제거 Gatherer (연속 중복만)

// 연속된 중복 요소만 제거 (distinct와 다름)
static <T> Gatherer<T, ?, T> distinctConsecutive() {
    return Gatherer.ofSequential(
        () -> new Object() { T last = null; boolean hasLast = false; },
        (state, element, downstream) -> {
            if (!state.hasLast || !Objects.equals(state.last, element)) {
                state.last = element;
                state.hasLast = true;
                return downstream.push(element);
            }
            return true; // 중복이면 건너뛰기
        }
    );
}

// 사용
List<Integer> result = List.of(1, 1, 2, 2, 3, 1, 1).stream()
    .gather(distinctConsecutive())
    .toList();
// [1, 2, 3, 1] - 연속 중복만 제거, 비연속 중복은 유지

조건부 청크 분할 Gatherer

// 조건이 참인 동안 같은 그룹으로 묶기
static <T> Gatherer<T, ?, List<T>> chunkWhile(BiPredicate<T, T> shouldGroup) {
    return Gatherer.ofSequential(
        () -> new Object() { List<T> current = new ArrayList<>(); },
        (state, element, downstream) -> {
            if (state.current.isEmpty() ||
                shouldGroup.test(state.current.getLast(), element)) {
                state.current.add(element);
            } else {
                downstream.push(List.copyOf(state.current));
                state.current.clear();
                state.current.add(element);
            }
            return true;
        },
        (state, downstream) -> {
            if (!state.current.isEmpty()) {
                downstream.push(List.copyOf(state.current));
            }
        }
    );
}

// 오름차순으로 연속되는 동안 묶기
List<List<Integer>> groups = List.of(1, 2, 3, 1, 2, 5, 6).stream()
    .gather(chunkWhile((a, b) -> b > a))
    .toList();
// [[1, 2, 3], [1, 2, 5, 6]]

레이트 리미터 Gatherer

// 초당 N개로 처리 속도 제한
static <T> Gatherer<T, ?, T> rateLimited(int maxPerSecond) {
    return Gatherer.ofSequential(
        () -> new Object() {
            long lastTime = System.nanoTime();
            int count = 0;
        },
        (state, element, downstream) -> {
            long now = System.nanoTime();
            if (now - state.lastTime >= 1_000_000_000L) {
                state.lastTime = now;
                state.count = 0;
            }
            if (state.count >= maxPerSecond) {
                try {
                    Thread.sleep(1000 - (now - state.lastTime) / 1_000_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                state.lastTime = System.nanoTime();
                state.count = 0;
            }
            state.count++;
            return downstream.push(element);
        }
    );
}

// API 호출을 초당 5개로 제한
urls.stream()
    .gather(rateLimited(5))
    .gather(Gatherers.mapConcurrent(5, url -> callApi(url)))
    .forEach(System.out::println);

Gatherer 체이닝

여러 Gatherer를 체이닝하여 복잡한 데이터 변환 파이프라인을 구성할 수 있습니다.

// 로그 분석 파이프라인
List<LogSummary> summaries = logEntries.stream()
    .gather(distinctConsecutive())           // 연속 중복 로그 제거
    .gather(Gatherers.windowFixed(100))      // 100개씩 배치
    .gather(Gatherers.mapConcurrent(4,       // 4개 스레드로 병렬 분석
        batch -> analyzeBatch(batch)))
    .toList();

Stream Gatherers는 Java Stream API의 표현력을 한 단계 끌어올리는 기능입니다. 기존에는 스트림을 중간에 collect한 뒤 다시 stream하거나, 복잡한 우회 로직이 필요했던 작업들을 Gatherer 하나로 깔끔하게 해결할 수 있습니다. Collectors가 종단 연산을 확장했듯, Gatherers는 중간 연산의 무한한 확장 가능성을 열어주었습니다.