본문 바로가기

전체 글

(170)
[python]kafka -> mongoDB 처리 시 insert_many 사용하기 kafka -> mongoDB 저장 성능을 높이기 위해 고민하던 중 같이 작업하던 동료가 찾은 multiprocessing 이용한 컨슈머 소스에 insert_many() 을 적용해서 효과본거 간단하게 정리. import time from datetime import datetime from kafka import KafkaConsumer from ujson import loads from pymongo import MongoClient client = MongoClient("...:27017") db = client.testdb consumer = KafkaConsumer('demo', bootstrap_servers=['localhost:9092', ...], auto_offset_reset='earl..
[python]kafka 토픽 내 메시지 개수를 구해보자 Kafka 특정 토픽 내 메시지 개수를 구하는 용도로 많이 쓰는 쉘 명령 조합을 반복 실행하도록 살짝 수정했다. $ while true; do echo "`date +"%T.%3N"` `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic demo --time -1 | awk -F ":" '{sum += $3} END {print sum}'`"; done GetOffsetShell 로 파티션별 offset 값을 구해서 그 합으로 메시지 개수를 확인하는 방식. 원리 응용해서 파이썬으로 구현해보면 from datetime import datetime from kafka import KafkaConsumer,..
[Bash]kafka 일괄 shutdown 스크립트 리눅스 박스에 kafka broker 3개 띄우고 테스트 중에 kafka 한꺼번에 내릴 일이 많아서 오랫만에 스크립트를 만들어봤다. shutdown 하는데 조금 시간이 걸려서 이왕 만드는 김에 완전히 내려갈 때까지 상태 찍어주는 것도 추가. #!/bin/bash SIGNAL=${SIGNAL:-TERM} GREP_STR=config/k PIDS=$(jps -m | grep $GREP_STR | grep -v grep | awk '{print $1}') if [ -z "$PIDS" ]; then echo "No kafka server to stop" exit 1 else echo $SIGNAL $PIDS kill -s $SIGNAL $PIDS idx=3 while [ ${idx} -ge 1 ]; do idx..
kafkacat 이용해서 메시지 수신 시각 확인해보기 kafka 에 메시지 수신된 시점을 확인해보려고 이것저것 시도해보는 중인데 이번에는 kafkacat 을 한 번 이용해봤다. windows 10 의 wsl 에 kafkacat 을 설치했다 (wsl 로 ubuntu 설치된 상태라 간단하게 다음 명령으로 설치) $ sudo apt-get install kafkacat kafkacat 로 컨슈밍한 메시지를 특정 토픽으로 다시 전송한 후 -f 옵션으로 Timestamp 와 메시지 본문 출력을 파일로 저장. $ kafkacat -C -b 172.20.130.105:9091 -t demo | kafkacat -P -b 172.20.130.105:9091 -t demo-out $ kafkacat -C -b broker_ip:9091 -t demo-out -f '%T %..
[java]Kafka Streams 에서 Json 처리하기 여러 장비에서 전송되는 메시지의 Kafka 처리 상황을 간단히 확인해보기 위해 Kafka Streams 를 이용해보기로 했다. Kafka Streams 기본 기능 익히기 위해 아래 페이지의 샘플 소스로 우선 기능 테스트. KAFKA STREAMS 세세한 API 사용법은 아직 모르겠지만 기본 실행은 성공. 장비에서 전달되는 메시지가 JSON 타입이므로 위 샘플 코드를 응용하려면 JSON Serdes 가 필요하겠기에 구글링으로 가장 무난한 레퍼런스를 찾았다. Implementing custom SerDes for Java objects using Json Serializer and Deserializer in Kafka Streams applications 테스트해보려는 기본 메시지 형태가 아래 같다고 할 ..
Kafka mqtt connector 사용법 - RabbitMQ Kafka mqtt connector 사용법 - Mqtt Broker 글 쓴 후 mqtt broker 로 RabbitMQ 사용하는 구조에 대한 추가 검토가 필요해져서 테스트한 내용 이어서 정리해보겠습니다. Kafka mqtt connector 사용법 mqtt 와 kafka 연동할 일이 생겨서 기술 검토를 진행했다. 일반적으로 언급되는 구조가 위 이미지처럼 mqtt broker 를 이용해서 통신하는 방법이던데 온라인 상의 다수의 레퍼런스가 confluent 의 mqtt brok tzara.tistory.com MQTT Plugin MQTT Plugin — RabbitMQ MQTT Plugin RabbitMQ supports MQTT 3.1.1 via a plugin that ships in the core..
Kafka mqtt connector 사용법 - mqtt broker mqtt 와 kafka 연동할 일이 생겨서 기술 검토를 진행했다. 일반적으로 언급되는 구조가 위 이미지처럼 mqtt broker 를 이용해서 통신하는 방법이던데 온라인 상의 다수의 레퍼런스가 confluent 의 mqtt broker 사용예라서 그 중 하나 따라서 해보았다(confluent platform(community 라이센스) 사전 설치했는데 이 설치 절차는 패스). Apache Kafka / Kafka Connect / MQTT / Mosquitto Live Demo confluent platform 설정에서 무언가 빠뜨렸는지 메시지 convert 과정에서 에러가 발생해서 connector 설정에 ByteArrayConverter 추가해준 거외는 거의 똑같이 진행했고 무난하게 성공했다. "val..
[Python]Hydra 의 로그 파일을 일별로 변경하는 방법 Hydra Customizing logging 페이지에 적힌 'Hydra is configuring Python standard logging library with the dictConfig method' 란 설명 보고 Python 표준 문서의 TimedRotatingFileHandler 참고해서 적용해보았다. formatters: basic: format: '[%(asctime)s][%(name)-12s][%(levelname)s] - %(message)s' datefmt: "%Y-%m-%d %H:%M:%S" handlers: console: class: logging.StreamHandler formatter: basic stream: ext://sys.stdout file: class: loggi..