IoT와 Camel의 조합
IoT 기기는 MQTT라는 경량 메시지 프로토콜로 데이터를 주고받습니다. 수천 개의 센서가 보내는 데이터를 수집, 가공, 저장하는 파이프라인이 필요합니다. Camel의 MQTT 컴포넌트는 이런 IoT 백엔드를 쉽게 구축할 수 있게 해줍니다.
MQTT 기본 개념
MQTT는 발행/구독(Pub/Sub) 모델을 사용합니다. 기기는 특정 토픽에 데이터를 발행하고, 서버는 그 토픽을 구독해 데이터를 받습니다.
- 토픽: 데이터 채널. 계층 구조로 표현 (예:
factory/line1/sensor/temperature) - QoS 0: 최대 1회 전송 (유실 가능, 빠름)
- QoS 1: 최소 1회 전송 (중복 가능, 신뢰성 있음)
- QoS 2: 정확히 1회 전송 (가장 신뢰성 높지만 느림)
Camel MQTT 컴포넌트로 센서 데이터 수신
// 모든 센서 데이터 수신 (와일드카드 토픽)
from("mqtt:sensorData?host=tcp://mqtt-broker:1883"
+ "&subscribeTopicName=factory/+/sensor/#"
+ "&qualityOfServiceEnum=AtLeastOnce")
.process(exchange -> {
String topic = exchange.getIn().getHeader(MqttMessage.MQTT_TOPIC, String.class);
byte[] payload = exchange.getIn().getBody(byte[].class);
// 토픽에서 라인과 센서 타입 파싱
String[] parts = topic.split("/");
exchange.getIn().setHeader("line", parts[1]);
exchange.getIn().setHeader("sensorType", parts[3]);
exchange.getIn().setBody(new String(payload));
})
.to("direct:processSensorData");
센서 데이터 처리 파이프라인
from("direct:processSensorData")
.unmarshal().json(SensorReading.class)
// 이상값 필터링
.filter(simple("${body.value} >= 0 && ${body.value} <= 1000"))
// 실시간 DB 저장
.wireTap("direct:saveToTimeSeries")
// 임계값 초과 시 알림
.choice()
.when(simple("${body.value} > 800"))
.to("direct:sendAlert")
.end();
from("direct:saveToTimeSeries")
.to("influxdb:mydb?retentionPolicy=autogen");
Camel로 MQTT 명령 전송
// 기기에 제어 명령 발송
from("rest:post:api/devices/{deviceId}/command")
.process(exchange -> {
String deviceId = exchange.getIn().getHeader("deviceId", String.class);
exchange.getIn().setHeader(MqttMessage.MQTT_TOPIC,
"factory/devices/" + deviceId + "/command");
})
.to("mqtt:commands?host=tcp://mqtt-broker:1883"
+ "&qualityOfServiceEnum=ExactlyOnce");
대규모 IoT 데이터 배치 집계
// 1분간 센서 데이터를 집계해 평균값 계산
from("mqtt:sensorData?host=tcp://mqtt-broker:1883&subscribeTopicName=sensor/#")
.aggregate(header("sensorType"), new AverageAggregationStrategy())
.completionTimeout(60000) // 1분
.completionPredicate(exchangeProperty(AGGREGATED_SIZE).isGreaterThan(100))
.process(exchange -> {
double avg = exchange.getIn().getBody(Double.class);
exchange.getIn().setHeader("average", avg);
})
.to("influxdb:aggregated-metrics");