[Camel in Action] 실전편 – MQTT와 IoT 기기 연동, 센서 데이터 파이프라인 구축

IoT와 Camel의 조합

IoT 기기는 MQTT라는 경량 메시지 프로토콜로 데이터를 주고받습니다. 수천 개의 센서가 보내는 데이터를 수집, 가공, 저장하는 파이프라인이 필요합니다. Camel의 MQTT 컴포넌트는 이런 IoT 백엔드를 쉽게 구축할 수 있게 해줍니다.

MQTT 기본 개념

MQTT는 발행/구독(Pub/Sub) 모델을 사용합니다. 기기는 특정 토픽에 데이터를 발행하고, 서버는 그 토픽을 구독해 데이터를 받습니다.

  • 토픽: 데이터 채널. 계층 구조로 표현 (예: factory/line1/sensor/temperature)
  • QoS 0: 최대 1회 전송 (유실 가능, 빠름)
  • QoS 1: 최소 1회 전송 (중복 가능, 신뢰성 있음)
  • QoS 2: 정확히 1회 전송 (가장 신뢰성 높지만 느림)

Camel MQTT 컴포넌트로 센서 데이터 수신

// 모든 센서 데이터 수신 (와일드카드 토픽)
from("mqtt:sensorData?host=tcp://mqtt-broker:1883"
  + "&subscribeTopicName=factory/+/sensor/#"
  + "&qualityOfServiceEnum=AtLeastOnce")
  .process(exchange -> {
    String topic = exchange.getIn().getHeader(MqttMessage.MQTT_TOPIC, String.class);
    byte[] payload = exchange.getIn().getBody(byte[].class);
    // 토픽에서 라인과 센서 타입 파싱
    String[] parts = topic.split("/");
    exchange.getIn().setHeader("line", parts[1]);
    exchange.getIn().setHeader("sensorType", parts[3]);
    exchange.getIn().setBody(new String(payload));
  })
  .to("direct:processSensorData");

센서 데이터 처리 파이프라인

from("direct:processSensorData")
  .unmarshal().json(SensorReading.class)
  // 이상값 필터링
  .filter(simple("${body.value} >= 0 && ${body.value} <= 1000"))
  // 실시간 DB 저장
  .wireTap("direct:saveToTimeSeries")
  // 임계값 초과 시 알림
  .choice()
    .when(simple("${body.value} > 800"))
      .to("direct:sendAlert")
  .end();

from("direct:saveToTimeSeries")
  .to("influxdb:mydb?retentionPolicy=autogen");

Camel로 MQTT 명령 전송

// 기기에 제어 명령 발송
from("rest:post:api/devices/{deviceId}/command")
  .process(exchange -> {
    String deviceId = exchange.getIn().getHeader("deviceId", String.class);
    exchange.getIn().setHeader(MqttMessage.MQTT_TOPIC,
      "factory/devices/" + deviceId + "/command");
  })
  .to("mqtt:commands?host=tcp://mqtt-broker:1883"
    + "&qualityOfServiceEnum=ExactlyOnce");

대규모 IoT 데이터 배치 집계

// 1분간 센서 데이터를 집계해 평균값 계산
from("mqtt:sensorData?host=tcp://mqtt-broker:1883&subscribeTopicName=sensor/#")
  .aggregate(header("sensorType"), new AverageAggregationStrategy())
    .completionTimeout(60000) // 1분
    .completionPredicate(exchangeProperty(AGGREGATED_SIZE).isGreaterThan(100))
  .process(exchange -> {
    double avg = exchange.getIn().getBody(Double.class);
    exchange.getIn().setHeader("average", avg);
  })
  .to("influxdb:aggregated-metrics");

Leave a Comment