Kafka와 Camel의 조합이 강력한 이유
Kafka는 초당 수백만 건의 이벤트를 처리할 수 있는 분산 스트리밍 플랫폼입니다. Camel은 Kafka에서 데이터를 받아 가공하고 여러 시스템으로 분배하는 복잡한 로직을 쉽게 표현할 수 있습니다. 둘의 조합은 현대 데이터 파이프라인의 핵심입니다.
Kafka 컴포넌트 기본 설정
camel-kafka 의존성을 추가하고 브로커 주소만 설정하면 바로 사용할 수 있습니다.
// 토픽에 메시지 발행 (Producer)
from("timer:event?period=1000")
.setBody(simple("이벤트 발생: ${date:now:HH:mm:ss}"))
.to("kafka:my-topic?brokers=localhost:9092");
// 토픽에서 메시지 소비 (Consumer)
from("kafka:my-topic?brokers=localhost:9092&groupId=my-group")
.log("수신: ${body}")
.to("bean:eventProcessor");
파티션과 컨슈머 그룹 이해
Kafka의 확장성은 파티션에서 나옵니다. 하나의 토픽을 여러 파티션으로 나누고, 컨슈머 그룹의 각 인스턴스가 서로 다른 파티션을 담당합니다.
- 파티션 수 = 최대 병렬 처리 수: 파티션이 3개면 컨슈머를 3개까지 병렬 확장 가능
- 같은 키는 같은 파티션: 특정 사용자의 이벤트를 순서대로 처리하려면 사용자 ID를 파티션 키로
- 컨슈머 그룹: 같은 그룹 ID를 가진 컨슈머들은 메시지를 나눠 처리
// 파티션 키 지정
from("direct:send")
.setHeader(KafkaConstants.KEY, simple("${body.userId}"))
.to("kafka:orders?brokers=localhost:9092");
오프셋 관리와 메시지 보장
Kafka는 메시지를 디스크에 보관하고 소비자가 읽은 위치(오프셋)를 추적합니다. 이를 통해 처리 실패 시 재처리가 가능합니다.
// 수동 오프셋 커밋 (at-least-once 보장)
from("kafka:orders?brokers=localhost:9092&autoCommitEnable=false")
.process(exchange -> {
// 처리 성공 후에만 오프셋 커밋
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT,
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT));
})
.to("bean:orderService")
.process(exchange -> {
KafkaManualCommit commit = exchange.getIn()
.getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
commit.commitSync();
});
실시간 ETL 파이프라인 구축
Kafka + Camel로 실시간 ETL 파이프라인을 구축하는 전형적인 패턴입니다.
// 원시 로그 수신 → 파싱 → 필터링 → DB 저장 + 알림
from("kafka:raw-logs?brokers=localhost:9092&groupId=etl")
.unmarshal().json(LogEvent.class)
.filter(simple("${body.level} == 'ERROR'"))
.multicast()
.to("bean:logRepository?method=save")
.to("kafka:error-alerts?brokers=localhost:9092")
.end();
이 파이프라인은 로그를 실시간으로 수신해 에러 로그만 걸러내어 DB에 저장하고 동시에 알림 토픽으로 발행합니다. Camel의 multicast와 filter EIP가 복잡한 분기 로직을 깔끔하게 표현합니다.