[Camel in Action] 실전편 – Enterprise Integration Patterns 20가지 핵심 정리

EIP란 무엇인가?

“Enterprise Integration Patterns”(Hohpe, Woolf, 2003)은 분산 시스템 통합의 바이블입니다. 65가지 패턴을 정의했고, Apache Camel은 이 패턴들을 코드 레벨에서 직접 구현할 수 있게 해줍니다. 가장 자주 사용하는 20가지를 Camel 코드와 함께 정리합니다.

메시지 라우팅 패턴 (Routing Patterns)

// 1. Content-Based Router (CBR) - 내용 기반 라우팅
from("direct:order")
  .choice()
    .when(simple("${body.type} == 'VIP'")).to("direct:vipOrder")
    .when(simple("${body.amount} > 10000")).to("direct:largeOrder")
    .otherwise().to("direct:standardOrder")
  .end();

// 2. Message Filter - 조건 필터링
from("direct:allEvents")
  .filter(simple("${body.priority} == 'HIGH'"))
  .to("direct:urgentEvents");

// 3. Splitter - 메시지 분할
from("direct:orderBatch")
  .split(body()).to("direct:singleOrder");

// 4. Aggregator - 메시지 집계
from("direct:parts")
  .aggregate(header("correlationId"), new BodyConcatAggregationStrategy())
    .completionSize(5)
  .to("direct:assembled");

// 5. Resequencer - 순서 재정렬
from("direct:outOfOrder")
  .resequence(header("seqNo")).stream()
  .to("direct:inOrder");

메시지 변환 패턴 (Transformation Patterns)

// 6. Message Translator - 메시지 형식 변환
from("direct:xmlOrder")
  .unmarshal().jacksonXml(OrderXml.class)
  .process(exchange -> {
    OrderXml xml = exchange.getIn().getBody(OrderXml.class);
    exchange.getIn().setBody(OrderJson.from(xml));
  })
  .marshal().json();

// 7. Content Enricher - 데이터 보강
from("direct:basicOrder")
  .enrich("direct:getCustomerDetails",
    (original, enriched) -> {
      original.getIn().getBody(Order.class)
        .setCustomer(enriched.getIn().getBody(Customer.class));
      return original;
    });

// 8. Content Filter - 불필요한 데이터 제거
from("direct:fullMessage")
  .process(exchange -> {
    FullMessage msg = exchange.getIn().getBody(FullMessage.class);
    exchange.getIn().setBody(new SlimMessage(msg.getId(), msg.getEssential()));
  });

// 9. Normalizer - 여러 형식을 통일
from("direct:xmlInput").unmarshal().jacksonXml(Order.class).to("direct:normalized");
from("direct:jsonInput").unmarshal().json(Order.class).to("direct:normalized");
from("direct:csvInput").unmarshal().bindy(BindyType.Csv, OrderCsv.class)
  .process(e -> e.getIn().setBody(OrderCsv.toOrder(e.getIn().getBody(OrderCsv.class))))
  .to("direct:normalized");

메시지 엔드포인트 패턴 (Endpoint Patterns)

// 10. Polling Consumer - 주기적 폴링
from("file:inbox?delay=5000").to("direct:process");

// 11. Event-Driven Consumer - 이벤트 기반 소비
from("activemq:queue:orders").to("bean:orderService");

// 12. Idempotent Consumer - 중복 처리 방지
from("activemq:queue:payments")
  .idempotentConsumer(header("paymentId"),
    MemoryIdempotentRepository.memoryIdempotentRepository(1000))
  .to("bean:paymentService");

// 13. Selective Consumer - 선택적 소비 (JMS Message Selector)
from("activemq:queue:orders?selector=priority='HIGH'")
  .to("bean:priorityOrderService");

// 14. Dead Letter Channel - 실패 메시지 격리
errorHandler(deadLetterChannel("activemq:queue:DLQ")
  .maximumRedeliveries(3).redeliveryDelay(1000));

메시징 채널 패턴 (Channel Patterns)

// 15. Point-to-Point Channel - 일대일 전송 (큐)
template.sendBody("activemq:queue:singleReceiver", message);

// 16. Publish-Subscribe Channel - 일대다 전송 (토픽)
template.sendBody("activemq:topic:broadcast", message);

// 17. Datatype Channel - 타입별 채널
from("direct:raw").choice()
  .when(body().isInstanceOf(Order.class)).to("activemq:queue:orders")
  .when(body().isInstanceOf(Invoice.class)).to("activemq:queue:invoices");

// 18. Wire Tap - 감청/복제
from("direct:main").wireTap("direct:audit").to("direct:process");

// 19. Message Bus - 공통 메시지 버스
from("activemq:topic:enterprise-bus").to("bean:routingService");

// 20. Correlation Identifier - 연관 메시지 추적
from("direct:request")
  .setHeader("correlationId", simple("${id}"))
  .to("activemq:queue:async-process");

Leave a Comment