[Camel in Action] 5-2. Splitter EIP – 대용량 메시지 분할 처리

Splitter EIP – 쪼개서 정복하기

Splitter 패턴은 하나의 큰 메시지를 여러 개의 작은 메시지로 분할해 각각 처리하는 패턴입니다. Aggregator의 반대 역할이며, 보통 함께 사용됩니다.

기본 Splitter 사용

// List 본문을 각 요소로 분할
from("direct:orderBatch")
    .split(body())              // List의 각 요소로 분할
        .log("처리 중: ${body}")
        .to("direct:processItem")
    .end()
    .log("모든 항목 처리 완료");

// String 분할 (구분자 기반)
from("direct:csvLine")
    .split(body().tokenize(","))  // 콤마로 분할
        .log("필드: ${body}")
    .end();

// XML 노드 분할
from("direct:xmlBatch")
    .split(xpath("//order"))  // 각 order 요소로 분할
        .log("주문: ${body}")
    .end();

Bean으로 분할 로직 정의

@Component
public class OrderSplitter {
    // Iterator 반환 - 메모리 효율적
    public Iterator<OrderItem> split(@Body Order order) {
        return order.getItems().iterator();
    }

    // 커스텀 분할 로직
    public List<String> splitByChunk(@Body String bigText) {
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < bigText.length(); i += 1000) {
            chunks.add(bigText.substring(i, Math.min(i + 1000, bigText.length())));
        }
        return chunks;
    }
}

from("direct:bigOrder")
    .split().method(OrderSplitter.class, "split")
        .to("direct:processItem")
    .end();

병렬 처리

from("direct:orders")
    .split(body())
        .parallelProcessing()         // 병렬 처리 활성화
        .executorService(myExecutor)  // 커스텀 스레드 풀 지정
        .timeout(10000)               // 10초 타임아웃
        .stopOnException()            // 오류 시 중단
        .log("병렬 처리: ${body}")
        .to("direct:process")
    .end();

대용량 파일 스트리밍 분할

// TokenizeLanguage로 대용량 XML 스트리밍 처리
from("file:big-files?noop=true")
    .split()
        .tokenizeXML("order")  // <order> 태그 단위로 스트리밍 분할
        .streaming()           // 스트리밍 모드 (메모리 절약)
        .log("처리: ${body}")
        .to("jms:queue:orders")
    .end();

// 줄 단위 분할
from("file:csv-files?noop=true")
    .split()
        .tokenize("
")        // 개행 기준 분할
        .streaming()
        .filter(simple("${body} != ''"))  // 빈 줄 제거
        .to("direct:processLine")
    .end();

분할 후 집계 (Splitter + Aggregator)

from("direct:processOrder")
    .split(simple("${body.items}"), new OrderResultAggregator())
        .parallelProcessing()
        .process(e -> {
            OrderItem item = e.getIn().getBody(OrderItem.class);
            item.setProcessed(true);
        })
    .end()  // 집계된 결과가 본문으로 설정됨
    .log("전체 처리 결과: ${body}");

오류 발생 시 동작

from("direct:items")
    .split(body())
        .stopOnException()     // 오류 시 즉시 중단
        // 또는
        .shareUnitOfWork()     // 하나의 트랜잭션 단위로 처리
        .log("처리: ${body}")
        .to("direct:process")
    .end();

Splitter 메타데이터 헤더

  • CamelSplitIndex: 현재 인덱스 (0부터 시작)
  • CamelSplitSize: 전체 분할 수 (알 수 있는 경우)
  • CamelSplitComplete: 마지막 항목 여부 (true/false)
from("direct:items")
    .split(body())
        .log("${header.CamelSplitIndex + 1}번째 / 총 ${header.CamelSplitSize}개")
    .end();

Leave a Comment