Aggregator EIP란?
여러 개의 관련된 메시지를 하나의 메시지로 합치는 패턴입니다. 예를 들어 여러 센서에서 오는 데이터를 모아 배치로 처리하거나, 분할 처리된 부분 결과들을 합칠 때 사용합니다.
기본 Aggregator 구현
from("direct:orderItems")
.aggregate(header("orderId"), new OrderAggregator()) // 집계 키와 전략
.completionSize(5) // 5개 모이면 완료
.to("direct:processFullOrder");
// 집계 전략 구현
public class OrderAggregator implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange; // 첫 번째 메시지
}
// 이전 집계 결과 + 새 메시지 합치기
List<OrderItem> items = oldExchange.getIn()
.getBody(List.class);
OrderItem newItem = newExchange.getIn().getBody(OrderItem.class);
items.add(newItem);
oldExchange.getIn().setBody(items);
return oldExchange;
}
}
집계 완료 조건 (Completion Conditions)
from("direct:events")
.aggregate(header("sessionId"), new SessionAggregator())
// 크기 기반
.completionSize(10)
// 타임아웃 기반 (마지막 메시지 후 N초)
.completionTimeout(30000)
// 인터벌 기반 (N초마다 강제 완료)
.completionInterval(60000)
// Predicate 기반
.completionPredicate(simple("${body.isLast} == true"))
// 크기 표현식 기반
.completionSizeExpression(header("expectedCount"))
// 조건 조합 (OR)
.completionSize(100).completionTimeout(5000)
.to("direct:processSession");
eagerCheckCompletion – 집계 전 완료 검사
from("direct:messages")
.aggregate(header("groupId"), new GroupAggregator())
.eagerCheckCompletion() // 집계 전에 완료 조건 먼저 검사
.completionPredicate(
simple("${in.header.lastMessage} == 'true'"))
.to("direct:done");
집계 완료 후 처리
from("direct:parts")
.aggregate(header("batchId"), new BatchAggregator())
.completionSize(simple("${header.totalParts}"))
.completionTimeout(30000)
.discardOnCompletionTimeout() // 타임아웃 시 버림
.process(e -> {
System.out.println("집계 완료: " + e.getIn().getBody());
})
.to("direct:processCompleteBatch");
AggregationRepository – 영속성
서버 재시작 시에도 집계 상태를 유지하려면 영속성 있는 AggregationRepository를 사용합니다.
// JDBC 기반 영속성
JdbcAggregationRepository repo = new JdbcAggregationRepository(
dataSource, "aggregation_table");
from("direct:orders")
.aggregate(header("batchId"), new OrderAggregator())
.completionSize(100)
.completionTimeout(60000)
.aggregationRepository(repo) // 영속성 설정
.to("direct:processBatch");
Spring Bean으로 집계 전략 정의
@Component("myAggregator")
public class MyAggregator implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange old, Exchange neu) {
if (old == null) {
List<Object> list = new ArrayList<>();
list.add(neu.getIn().getBody());
neu.getIn().setBody(list);
return neu;
}
old.getIn().getBody(List.class).add(neu.getIn().getBody());
return old;
}
}
// 라우트에서 Bean 이름으로 참조
from("direct:items")
.aggregate(header("groupId")).aggregationStrategyRef("myAggregator")
.completionSize(5)
.to("direct:grouped");