Contents
see ListJava 24에서 정식 도입된 Stream Gatherers(JEP 485)는 Stream API에 커스텀 중간 연산을 추가할 수 있는 혁신적인 기능입니다. 기존 Stream API의 map, filter, flatMap만으로는 구현하기 어려웠던 슬라이딩 윈도우, 청크 분할, 상태 유지 변환 등을 깔끔하게 구현할 수 있습니다.
Stream Gatherers가 필요한 이유
기존 Stream API는 map, filter, reduce 등의 내장 연산만 제공하며, 커스텀 중간 연산을 추가할 방법이 없었습니다. Collectors로 커스텀 종단 연산은 가능하지만, 중간 연산은 불가능했습니다.
// 기존 방식: 3개씩 묶기 위한 복잡한 우회
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
// Stream만으로는 불가능, 인덱스 기반 수동 처리 필요
List<List<Integer>> chunks = new ArrayList<>();
for (int i = 0; i < numbers.size(); i += 3) {
chunks.add(numbers.subList(i, Math.min(i + 3, numbers.size())));
}
// [[1,2,3], [4,5,6], [7,8,9]]gather() 메서드와 기본 사용법
Stream에 추가된 gather() 메서드에 Gatherer를 전달하여 커스텀 중간 연산을 적용합니다.
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
// 내장 Gatherer: windowFixed - 고정 크기 윈도우
List<List<Integer>> chunks = numbers.stream()
.gather(Gatherers.windowFixed(3))
.toList();
// [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
// 내장 Gatherer: windowSliding - 슬라이딩 윈도우
List<List<Integer>> sliding = numbers.stream()
.gather(Gatherers.windowSliding(3))
.toList();
// [[1,2,3], [2,3,4], [3,4,5], [4,5,6], [5,6,7], [6,7,8], [7,8,9]]내장 Gatherers 전체 목록
Gatherers.fold - 누적 결과를 스트림으로
// 누적 합계를 스트림으로 출력
List<Integer> runningSums = List.of(1, 2, 3, 4, 5).stream()
.gather(Gatherers.fold(
() -> 0, // 초기값
(sum, element) -> sum + element // 누적 함수
))
.toList();
// [15] - 최종 누적 결과Gatherers.scan - 중간 누적 결과 모두 출력
// 러닝 합계 (중간 결과 모두 포함)
List<Integer> runningSums = List.of(1, 2, 3, 4, 5).stream()
.gather(Gatherers.scan(
() -> 0,
(sum, element) -> sum + element
))
.toList();
// [1, 3, 6, 10, 15]Gatherers.mapConcurrent - 병렬 매핑
// 최대 10개 가상 스레드로 동시 처리
List<String> results = urls.stream()
.gather(Gatherers.mapConcurrent(10, url -> fetchContent(url)))
.toList();
// 순서 보장하면서 최대 10개 동시 HTTP 요청커스텀 Gatherer 만들기
Gatherer 인터페이스를 구현하여 완전히 새로운 중간 연산을 만들 수 있습니다.
중복 제거 Gatherer (연속 중복만)
// 연속된 중복 요소만 제거 (distinct와 다름)
static <T> Gatherer<T, ?, T> distinctConsecutive() {
return Gatherer.ofSequential(
() -> new Object() { T last = null; boolean hasLast = false; },
(state, element, downstream) -> {
if (!state.hasLast || !Objects.equals(state.last, element)) {
state.last = element;
state.hasLast = true;
return downstream.push(element);
}
return true; // 중복이면 건너뛰기
}
);
}
// 사용
List<Integer> result = List.of(1, 1, 2, 2, 3, 1, 1).stream()
.gather(distinctConsecutive())
.toList();
// [1, 2, 3, 1] - 연속 중복만 제거, 비연속 중복은 유지조건부 청크 분할 Gatherer
// 조건이 참인 동안 같은 그룹으로 묶기
static <T> Gatherer<T, ?, List<T>> chunkWhile(BiPredicate<T, T> shouldGroup) {
return Gatherer.ofSequential(
() -> new Object() { List<T> current = new ArrayList<>(); },
(state, element, downstream) -> {
if (state.current.isEmpty() ||
shouldGroup.test(state.current.getLast(), element)) {
state.current.add(element);
} else {
downstream.push(List.copyOf(state.current));
state.current.clear();
state.current.add(element);
}
return true;
},
(state, downstream) -> {
if (!state.current.isEmpty()) {
downstream.push(List.copyOf(state.current));
}
}
);
}
// 오름차순으로 연속되는 동안 묶기
List<List<Integer>> groups = List.of(1, 2, 3, 1, 2, 5, 6).stream()
.gather(chunkWhile((a, b) -> b > a))
.toList();
// [[1, 2, 3], [1, 2, 5, 6]]레이트 리미터 Gatherer
// 초당 N개로 처리 속도 제한
static <T> Gatherer<T, ?, T> rateLimited(int maxPerSecond) {
return Gatherer.ofSequential(
() -> new Object() {
long lastTime = System.nanoTime();
int count = 0;
},
(state, element, downstream) -> {
long now = System.nanoTime();
if (now - state.lastTime >= 1_000_000_000L) {
state.lastTime = now;
state.count = 0;
}
if (state.count >= maxPerSecond) {
try {
Thread.sleep(1000 - (now - state.lastTime) / 1_000_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
state.lastTime = System.nanoTime();
state.count = 0;
}
state.count++;
return downstream.push(element);
}
);
}
// API 호출을 초당 5개로 제한
urls.stream()
.gather(rateLimited(5))
.gather(Gatherers.mapConcurrent(5, url -> callApi(url)))
.forEach(System.out::println);Gatherer 체이닝
여러 Gatherer를 체이닝하여 복잡한 데이터 변환 파이프라인을 구성할 수 있습니다.
// 로그 분석 파이프라인
List<LogSummary> summaries = logEntries.stream()
.gather(distinctConsecutive()) // 연속 중복 로그 제거
.gather(Gatherers.windowFixed(100)) // 100개씩 배치
.gather(Gatherers.mapConcurrent(4, // 4개 스레드로 병렬 분석
batch -> analyzeBatch(batch)))
.toList();Stream Gatherers는 Java Stream API의 표현력을 한 단계 끌어올리는 기능입니다. 기존에는 스트림을 중간에 collect한 뒤 다시 stream하거나, 복잡한 우회 로직이 필요했던 작업들을 Gatherer 하나로 깔끔하게 해결할 수 있습니다. Collectors가 종단 연산을 확장했듯, Gatherers는 중간 연산의 무한한 확장 가능성을 열어주었습니다.