Categories: Camel프레임워크

[Camel in Action] 12-1. Camel 병렬 처리와 비동기 라우팅 – Threads, Parallel 완전 정복

왜 병렬 처리가 필요한가?

메시지 하나를 처리하는 데 여러 외부 시스템에 순차적으로 요청하면 시간이 오래 걸립니다. 예를 들어 주문 처리 시 재고 확인(200ms), 결제 처리(300ms), 배송 예약(150ms)을 순서대로 하면 650ms가 필요합니다. 병렬로 처리하면 가장 느린 300ms만 기다리면 됩니다.

Multicast 병렬 실행

parallelProcessing() 옵션을 추가하면 Multicast의 각 경로를 동시에 실행합니다.

from("direct:processOrder")
  .multicast()
    .parallelProcessing()
    .to("direct:checkInventory", "direct:processPayment", "direct:notifyWarehouse")
  .end()
  .log("모든 처리 완료");

기본적으로 Multicast는 모든 경로가 완료될 때까지 기다립니다. stopOnException()을 추가하면 하나라도 실패하면 전체를 중단합니다.

Thread Pool 설정

Camel의 스레드 풀을 커스터마이징해 처리량을 세밀하게 제어할 수 있습니다.

// 명시적 스레드 풀 생성
ExecutorService pool = Executors.newFixedThreadPool(20);

from("direct:heavyProcessing")
  .threads(10)           // 동시 처리 스레드 수
    .maxQueueSize(100)   // 대기 큐 크기
  .to("bean:heavyService");

// Spring XML에서 ThreadPoolProfile 설정
<threadPoolProfile id="myProfile" defaultProfile="false"
  poolSize="10" maxPoolSize="50" keepAliveTime="30"
  maxQueueSize="500" rejectedPolicy="CallerRuns"/>

비동기 처리 – async와 sync 혼합

응답을 기다리지 않고 즉시 결과를 반환한 후 백그라운드에서 처리하는 패턴입니다.

// 즉시 응답 후 비동기 처리
from("direct:submitJob")
  .process(exchange -> {
    String jobId = UUID.randomUUID().toString();
    exchange.getIn().setHeader("jobId", jobId);
    exchange.getIn().setBody("{"jobId":"" + jobId + "","status":"accepted"}");
  })
  .wireTap("direct:processAsync") // 비동기로 처리 시작
  .end(); // 즉시 응답 반환

from("direct:processAsync")
  .delay(0)
  .to("bean:longRunningService");

Splitter + 병렬 처리

대용량 메시지를 분할하고 각 조각을 병렬로 처리하면 처리 시간을 크게 줄일 수 있습니다.

from("direct:processBatch")
  .split(body()).parallelProcessing()
    .to("bean:itemProcessor")
  .end()
  .log("배치 처리 완료: ${header.CamelSplitSize}개");

분할된 결과를 다시 합쳐야 할 때는 AggregationStrategy를 지정합니다.

from("direct:processBatch")
  .split(body(), new GroupedBodyAggregationStrategy()).parallelProcessing()
    .to("bean:itemProcessor")
  .end()
  .log("처리된 결과: ${body}");

CompletableFuture와 Camel 비동기 API

Java 코드에서 Camel을 비동기로 호출할 때는 ProducerTemplate의 async 메서드를 사용합니다.

CompletableFuture<Object> future = template.asyncRequestBody(
  "direct:processOrder", orderData);

// 다른 작업 수행...

Object result = future.get(30, TimeUnit.SECONDS);

zerg96

Share
Published by
zerg96

Recent Posts

[Apache Camel] 2025년 최신 트렌드 – AI 통합과 서버리스 Camel의 미래

2025년 Apache Camel의 최신 트렌드를 분석합니다. AI/LLM 통합 컴포넌트, 서버리스 배포, Camel K 진화, WebAssembly…

9시간 ago

[Camel in Action] 완결편 – Apache Camel 전체 여정 회고와 다음 단계

Camel in Action을 완독한 후 Apache Camel의 전체 그림을 다시 정리합니다. 핵심 철학, 학습 경로,…

9시간 ago

[Camel in Action] 실전편 – Camel 마이그레이션 가이드 2.x에서 4.x까지

Apache Camel 2.x에서 3.x, 4.x로 마이그레이션하는 단계별 가이드입니다. 주요 API 변경사항, 제거된 컴포넌트, 자동화 도구…

9시간 ago

[Camel in Action] 실전편 – Camel 라우트 디버깅 기법과 문제 해결 가이드

Apache Camel 라우트에서 발생하는 문제를 디버깅하고 해결하는 실전 기법을 설명합니다. 로그 분석, breakpoint 디버깅, Tracer,…

9시간 ago

[Camel in Action] 실전편 – Camel 도입 전 반드시 알아야 할 것들

Apache Camel을 프로젝트에 도입하기 전 알아야 할 핵심 사항을 정리합니다. 학습 곡선, 도입 비용, 적합한…

9시간 ago

[Camel in Action] 실전편 – Enterprise Integration Patterns 20가지 핵심 정리

엔터프라이즈 통합 패턴(EIP) 20가지를 Apache Camel 코드와 함께 한 번에 정리합니다. 메시징 채널, 메시지 라우팅,…

9시간 ago