kafka -> mongoDB 저장 성능을 높이기 위해 고민하던 중 같이 작업하던 동료가 찾은 multiprocessing 이용한 컨슈머 소스에 insert_many() 을 적용해서 효과본거 간단하게 정리.
import time
from datetime import datetime
from kafka import KafkaConsumer
from ujson import loads
from pymongo import MongoClient
client = MongoClient("...:27017")
db = client.testdb
consumer = KafkaConsumer('demo',
bootstrap_servers=['localhost:9092', ...],
auto_offset_reset='earliest',
enable_auto_commit=True,
max_poll_records=2000, # default 500
group_id='my-group',
consumer_timeout_ms=1000,
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
while True:
message = consumer.poll()
if len(message) == 0:
time.sleep(1)
for topic_partition, records in message.items():
kmsgs = []
for record in records:
msg = record.value
dt_rcv = datetime.fromtimestamp(record.timestamp / 1000).strftime("%H%M%S.%f")[:-3]
dt_cre = datetime.now().strftime("%H%M%S.%f")[:-3]
msg['rcvDt'] = dt_rcv
msg['creDt'] = dt_cre
#db.demo.insert_one(msg)
kmsgs.append(msg)
#print(topic_partition, len(kmsgs))
db.demo.insert_many(kmsgs)
consumer.close()
multiprocessing 관련 부분은 빼고 컨슈밍해서 몽고에 저장 구현하는 부분만 옮겨봤다.
수신한 메시지에 메시지 수신 시간과 처리 시간 추가한 후 insert_one() 으로 건별 저장하는 대신 리스트에 담은 후에 insert_many() 로 멀티 인서트 처리.
max_poll_records 설정값을 늘려주면 insert_many() 에서 한 번에 처리하는 건수가 늘어나니까 다른 제약조건과 충돌하지 않는다면 일반적인 상황에서는 더 높은 성능을 보여줄 것이다( max_poll_records 의 기본값은 500. 테스트시에는 이 값을 2000으로 변경 적용했음)
'Lang' 카테고리의 다른 글
[Java]Spring Cloud 에서 Kafka 인증 기능 사용 (0) | 2021.06.02 |
---|---|
Spring Cloud Stream - Annotation-based, Function based 혼용 (0) | 2021.05.18 |
[python]kafka 토픽 내 메시지 개수를 구해보자 (0) | 2020.12.28 |
[java]Kafka Streams 에서 Json 처리하기 (0) | 2020.11.25 |
[Python]Hydra 의 로그 파일을 일별로 변경하는 방법 (0) | 2020.10.15 |