본문 바로가기

OpenSource

Kafka mqtt connector 사용법 - mqtt broker

mqtt 와 kafka 연동할 일이 생겨서 기술 검토를 진행했다.

일반적으로 언급되는 구조가 위 이미지처럼 mqtt broker 를 이용해서 통신하는 방법이던데 온라인 상의 다수의 레퍼런스가 confluent 의 mqtt broker 사용예라서 그 중 하나 따라서 해보았다(confluent platform(community 라이센스) 사전 설치했는데 이 설치 절차는 패스).

confluent platform 설정에서 무언가 빠뜨렸는지 메시지 convert 과정에서 에러가 발생해서 connector 설정에  ByteArrayConverter 추가해준 거외는 거의 똑같이 진행했고 무난하게 성공했다.

"value.converter" : "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",

순서가 뒤바뀐 듯 싶긴 했지만 테스트 후 라이센스를 확인해봤는데 아뿔싸!

confluent mqtt broker는 트라이얼 버전으로 30일 후에 엔터프라이즈 라이센스 적용된다는 안내가 딱 박혀있다.

오픈 소스 kafka 와 confluent platform 의 기본은 큰 차이가 없다보니 오픈 소스 kafka 에도 connect 가 기본 포함되어 있음을 확인하고 오픈 소스 코넥터를 다시 찾아보기 시작했다.

문서보니 기능 상 약간 부족한 부분이 있긴 하지만 confluent mqtt broker 와 큰 차이는 없어 보여서 적용해보기로 했다.

역시나 confluent 에 비해 적용 방법이 불편하다. 기본 내장된 sink/source connector 는 별도 작업 없이 간단히 실행 가능했는데 그 외 connector 를 추가하려면 손이 꽤 간다. 그래도 오픈 소스 사용하는데 이 정도야 감내해야지.

  1. 사용하려는 connect 소스 빌드.
  2. kafka 설치된 컴의 적당한 위치에 connector 모아놓을 디렉토리 생성(ex : $KAFKA_HOME/plugins)
  3. $KAFKA_HOME/config/connect-standalone.properties 에 plugin.path=/kafka설치경로/plugins/ 추가 ( 2에서 추가한 디렉토리 경로 )
  4. $KAFKA_HOME/plugins 에 사용할 connector 디렉토리 생성하고 여기에 1에서 생성한 jar 파일들 복사.
  5. connector 용 properties 파일 생성 : $KAFKA_HOME/config/connect-file-*.properties 파일 복사해서 적당히 편집.
  6. 앞에서 생성한 파일명이 connect-mqtt-source.properties 이라고 한다면 $KAFKA_HOME으로 이동 후 다음과 같이 connect 실행.
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-mqtt-source.properties

실행 명령 보면 알 수 있겠지만 처음 confluent platform 테스트했었던 리눅스 박스 건드리기 귀찮아서 윈도우에서 테스트한 건데 이때문에 고난의 행군 시작됨.

실행해보면 무수한  'WARN could not get type for name ...' 와 'WARN The configuration ... was supplied but isn't a known config.' 그리고, NoClassDefFoundError 가 나온다.

처음에는 실행조차 안되더니 여차저차해서 실행까지는 해냈지만 앞의 저 무수한 warn 과 error 는 여전하고 mqtt 메시지 발행해도 kafka consumer 는 묵묵부답.

plugin.path 에 jar 경로 추가해보기도 하고 경고난 라이브러리들 다운받아서 경로에 넣어보기도 하고 CLASSPATH 에 경로 추가해보기도 했지만, 그리고 온라인에서 검색된 문서들 참고해서 잡다한 시도해봤지만 문제가 해결나지 않았다.

혹시나 해서 리눅스 박스에서 confluent 내리고 kafka 올려서 다시 위 순서대로 진행.

connect 실행했는데 윈도우에서 떴던 warn 과 에러가 안 나온다. mqtt 에서 메시지 발행.

에러가 나긴 했지만 살펴보니 convert 관련 에러라서 confluent connector 테스트 시에 적용했던 ByteArrayConveter 대신 StringConverter 로 설정 바꾼 뒤 다시 테스트.

kafka consumer 에 깔끔하게 메시지 찍힌다!!!

### connect-mqtt-source.properties
name=mqtt-source
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
tasks.max=1
mqtt.topic=temperature
kafka.topic= mqtt.temperature
mqtt.clientID=my_client_id
mqtt.broker=tcp://MQTT_BROKER:1883
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
#value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true
#value.converter.schemas.enable=false
converter.encoding=UTF-8

자잘한 설정 절차 더 기록해두어야 겠지만 일단 여기까지.

참고.