[Camel in Action] 12-1. Camel 병렬 처리와 비동기 라우팅 – Threads, Parallel 완전 정복

왜 병렬 처리가 필요한가?

메시지 하나를 처리하는 데 여러 외부 시스템에 순차적으로 요청하면 시간이 오래 걸립니다. 예를 들어 주문 처리 시 재고 확인(200ms), 결제 처리(300ms), 배송 예약(150ms)을 순서대로 하면 650ms가 필요합니다. 병렬로 처리하면 가장 느린 300ms만 기다리면 됩니다.

Multicast 병렬 실행

parallelProcessing() 옵션을 추가하면 Multicast의 각 경로를 동시에 실행합니다.

from("direct:processOrder")
  .multicast()
    .parallelProcessing()
    .to("direct:checkInventory", "direct:processPayment", "direct:notifyWarehouse")
  .end()
  .log("모든 처리 완료");

기본적으로 Multicast는 모든 경로가 완료될 때까지 기다립니다. stopOnException()을 추가하면 하나라도 실패하면 전체를 중단합니다.

Thread Pool 설정

Camel의 스레드 풀을 커스터마이징해 처리량을 세밀하게 제어할 수 있습니다.

// 명시적 스레드 풀 생성
ExecutorService pool = Executors.newFixedThreadPool(20);

from("direct:heavyProcessing")
  .threads(10)           // 동시 처리 스레드 수
    .maxQueueSize(100)   // 대기 큐 크기
  .to("bean:heavyService");

// Spring XML에서 ThreadPoolProfile 설정
<threadPoolProfile id="myProfile" defaultProfile="false"
  poolSize="10" maxPoolSize="50" keepAliveTime="30"
  maxQueueSize="500" rejectedPolicy="CallerRuns"/>

비동기 처리 – async와 sync 혼합

응답을 기다리지 않고 즉시 결과를 반환한 후 백그라운드에서 처리하는 패턴입니다.

// 즉시 응답 후 비동기 처리
from("direct:submitJob")
  .process(exchange -> {
    String jobId = UUID.randomUUID().toString();
    exchange.getIn().setHeader("jobId", jobId);
    exchange.getIn().setBody("{"jobId":"" + jobId + "","status":"accepted"}");
  })
  .wireTap("direct:processAsync") // 비동기로 처리 시작
  .end(); // 즉시 응답 반환

from("direct:processAsync")
  .delay(0)
  .to("bean:longRunningService");

Splitter + 병렬 처리

대용량 메시지를 분할하고 각 조각을 병렬로 처리하면 처리 시간을 크게 줄일 수 있습니다.

from("direct:processBatch")
  .split(body()).parallelProcessing()
    .to("bean:itemProcessor")
  .end()
  .log("배치 처리 완료: ${header.CamelSplitSize}개");

분할된 결과를 다시 합쳐야 할 때는 AggregationStrategy를 지정합니다.

from("direct:processBatch")
  .split(body(), new GroupedBodyAggregationStrategy()).parallelProcessing()
    .to("bean:itemProcessor")
  .end()
  .log("처리된 결과: ${body}");

CompletableFuture와 Camel 비동기 API

Java 코드에서 Camel을 비동기로 호출할 때는 ProducerTemplate의 async 메서드를 사용합니다.

CompletableFuture<Object> future = template.asyncRequestBody(
  "direct:processOrder", orderData);

// 다른 작업 수행...

Object result = future.get(30, TimeUnit.SECONDS);

Leave a Comment