[Camel in Action] 5-1. Aggregator EIP – 메시지 집계 완전 정복

Aggregator EIP란?

여러 개의 관련된 메시지를 하나의 메시지로 합치는 패턴입니다. 예를 들어 여러 센서에서 오는 데이터를 모아 배치로 처리하거나, 분할 처리된 부분 결과들을 합칠 때 사용합니다.

기본 Aggregator 구현

from("direct:orderItems")
    .aggregate(header("orderId"), new OrderAggregator())  // 집계 키와 전략
        .completionSize(5)      // 5개 모이면 완료
    .to("direct:processFullOrder");

// 집계 전략 구현
public class OrderAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;  // 첫 번째 메시지
        }
        // 이전 집계 결과 + 새 메시지 합치기
        List<OrderItem> items = oldExchange.getIn()
            .getBody(List.class);
        OrderItem newItem = newExchange.getIn().getBody(OrderItem.class);
        items.add(newItem);
        oldExchange.getIn().setBody(items);
        return oldExchange;
    }
}

집계 완료 조건 (Completion Conditions)

from("direct:events")
    .aggregate(header("sessionId"), new SessionAggregator())
        // 크기 기반
        .completionSize(10)

        // 타임아웃 기반 (마지막 메시지 후 N초)
        .completionTimeout(30000)

        // 인터벌 기반 (N초마다 강제 완료)
        .completionInterval(60000)

        // Predicate 기반
        .completionPredicate(simple("${body.isLast} == true"))

        // 크기 표현식 기반
        .completionSizeExpression(header("expectedCount"))

        // 조건 조합 (OR)
        .completionSize(100).completionTimeout(5000)
    .to("direct:processSession");

eagerCheckCompletion – 집계 전 완료 검사

from("direct:messages")
    .aggregate(header("groupId"), new GroupAggregator())
        .eagerCheckCompletion()  // 집계 전에 완료 조건 먼저 검사
        .completionPredicate(
            simple("${in.header.lastMessage} == 'true'"))
    .to("direct:done");

집계 완료 후 처리

from("direct:parts")
    .aggregate(header("batchId"), new BatchAggregator())
        .completionSize(simple("${header.totalParts}"))
        .completionTimeout(30000)
        .discardOnCompletionTimeout()  // 타임아웃 시 버림
    .process(e -> {
        System.out.println("집계 완료: " + e.getIn().getBody());
    })
    .to("direct:processCompleteBatch");

AggregationRepository – 영속성

서버 재시작 시에도 집계 상태를 유지하려면 영속성 있는 AggregationRepository를 사용합니다.

// JDBC 기반 영속성
JdbcAggregationRepository repo = new JdbcAggregationRepository(
    dataSource, "aggregation_table");

from("direct:orders")
    .aggregate(header("batchId"), new OrderAggregator())
        .completionSize(100)
        .completionTimeout(60000)
        .aggregationRepository(repo)  // 영속성 설정
    .to("direct:processBatch");

Spring Bean으로 집계 전략 정의

@Component("myAggregator")
public class MyAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange old, Exchange neu) {
        if (old == null) {
            List<Object> list = new ArrayList<>();
            list.add(neu.getIn().getBody());
            neu.getIn().setBody(list);
            return neu;
        }
        old.getIn().getBody(List.class).add(neu.getIn().getBody());
        return old;
    }
}

// 라우트에서 Bean 이름으로 참조
from("direct:items")
    .aggregate(header("groupId")).aggregationStrategyRef("myAggregator")
        .completionSize(5)
    .to("direct:grouped");

Leave a Comment