본문 바로가기

OpenSource

Kafka mqtt connector 사용법 - RabbitMQ

Kafka mqtt connector 사용법 - Mqtt Broker 글 쓴 후 mqtt broker 로 RabbitMQ 사용하는 구조에 대한 추가 검토가 필요해져서 테스트한 내용 이어서 정리해보겠습니다.

 

 

Kafka mqtt connector 사용법

mqtt 와 kafka 연동할 일이 생겨서 기술 검토를 진행했다. 일반적으로 언급되는 구조가 위 이미지처럼 mqtt broker 를 이용해서 통신하는 방법이던데 온라인 상의 다수의 레퍼런스가 confluent 의 mqtt brok

tzara.tistory.com

 

MQTT Plugin — RabbitMQ

MQTT Plugin RabbitMQ supports MQTT 3.1.1 via a plugin that ships in the core distribution. Key covered topics are: and more. The Quorum Requirement As of 3.8, the plugin requires a quorum of cluster nodes to be present. This means two nodes out of three, t

www.rabbitmq.com

How it Works

RabbitMQ MQTT plugin targets MQTT 3.1.1 and supports a broad range of MQTT clients. It also makes it possible for MQTT clients to interoperate with AMQP 0-9-1, AMQP 1.0, and STOMP clients. There is also support for multi-tenancy.
The plugin builds on top of RabbitMQ core protocol's entities: exchanges and queues. Messages published to MQTT topics use a topic exchange (amq.topic by default) internally. Subscribers consume from RabbitMQ queues bound to the topic exchange. This both enables interoperability with other protocols and makes it possible to use the Management plugin to inspect queue sizes, message rates, and so on.

이 부분만 읽어보면 mqtt topic 을 AMQP 로 변환해서 RabbitMQ 의 큐로 바인딩하는 식으로 처리되는 듯. 그렇다면 mqtt connector 써서 mqtt broker(ex : mosquitto)와 통신했던 앞 글 방식과 달리 RabbitMQ connector 를 사용해야 하지 않을까 싶었는데 당장은 RabbitMQ, kafka connect 양 쪽 익숙치 않은 상황이라 우선 설정 편리한 Confluent connect 로 테스트를 해보기로 했다.

 

RabbitMQ Source Connector for Confluent Platform — Confluent Documentation 6.0.0

RabbitMQ Source Connector for Confluent Platform The Kafka Connect RabbitMQ Source connector integrates with RabbitMQ servers, using the AMQP protocol. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in an Apac

docs.confluent.io

  1. ERLang 설치, RabbitMQ windows 버전 zip 파일 다운로드 후 압축 해제.
  2. mqtt 플럭인 사용 설정 : rabbitmq-plugins enable rabbitmq_mqtt
  3. Queue 생성 후 topic 과 바인딩 설정.
  4. 원격 접속 가능하도록 계정 추가 및 권한 설정.
  5. confluent rabbitmq connector 설치 : confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

RabbitMQ 잘 모르니 Queue 생성 후 mqtt topic 과 어떻게 바인딩 설정해야 하는지 헷갈렸던 것과 기본 계정인 guest 사용하면 원격에서는 접속 안되므로 계정 추가해야 되었던 것 외에는 mqtt connector 설치해서 테스트해보며 한차례 시행착오 겪어봤던 터라 크게 어렵지 않게 성공.

아래는 confluent rabbitmq connector 설치 후 인스턴스 설정 한 내용.

curl -s -X POST -H 'Content-Type: application/json' http://KAFKA:8083/connectors -d '
{
   "name" : "RabbitMQSourceConnector",
   "config" : {
    "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "mqtt.temperature",
    "rabbitmq.queue" : "temperature",
    "rabbitmq.host" : "RABBITMQ_IP",
    "rabbitmq.username" : "user",
    "rabbitmq.password" : "pw",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license":"",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
}'

RabbitMQ -> rabbitmq source connector -> Kafka 기본 동작 무난히 되는 것 확인되었으므로 이제 오픈 소스 connector 구해서 적용 테스트해보면 되겠다 싶어 구글링.

가장 추천받는 connector 가 어떤건지 자세히 조사해보진 않았지만 그래도 ibm 에서 공개한거고 소스 업데이트도 비교적 근래라 위 문서에서 사용한 connector 를 적용해봤다.

기본 적용법은 앞 글에서 mqtt connector 적용했던 것과 큰 차이없이 소스 내려받아서 빌드 후 내려받은 소스에 포함된 rabbitmq-source.properties 복사해서 약간 수정하고 거기에 rabbitmq.host, rabbitmq.useranme, rabbitmq.password 추가해준 후 connect 실행.

간단한 메시지 테스트다 보니 앞서 mqtt connector 삽질하며 얻은 경험도 있고해서인지 단번에 성공.

좀 더 상세한 설정 및 테스트 방법은 이 문서 조금씩 수정하며 보충하도록 하겠습니다.