본문 바로가기

Lang

[python]kafka -> mongoDB 처리 시 insert_many 사용하기

kafka -> mongoDB 저장 성능을 높이기 위해 고민하던 중 같이 작업하던 동료가 찾은 multiprocessing 이용한 컨슈머 소스에 insert_many() 을 적용해서 효과본거 간단하게 정리.

import time
from datetime import datetime

from kafka import KafkaConsumer
from ujson import loads
from pymongo import MongoClient

client = MongoClient("...:27017")
db = client.testdb            
			
consumer = KafkaConsumer('demo',
	bootstrap_servers=['localhost:9092', ...],
	auto_offset_reset='earliest',
	enable_auto_commit=True,
	max_poll_records=2000, # default 500
	group_id='my-group',
	consumer_timeout_ms=1000,
	value_deserializer=lambda x: loads(x.decode('utf-8'))
)

while True:
	message = consumer.poll()
	if len(message) == 0: 
		time.sleep(1)
	for topic_partition, records in message.items():
		kmsgs = []
		for record in records:
			msg = record.value
			dt_rcv = datetime.fromtimestamp(record.timestamp / 1000).strftime("%H%M%S.%f")[:-3]
			dt_cre = datetime.now().strftime("%H%M%S.%f")[:-3]
			msg['rcvDt'] = dt_rcv
			msg['creDt'] = dt_cre
			#db.demo.insert_one(msg)
			kmsgs.append(msg)

		#print(topic_partition, len(kmsgs))
		db.demo.insert_many(kmsgs)

consumer.close()

multiprocessing 관련 부분은 빼고 컨슈밍해서 몽고에 저장 구현하는 부분만 옮겨봤다.

수신한 메시지에 메시지 수신 시간과 처리 시간 추가한 후 insert_one() 으로 건별 저장하는 대신 리스트에 담은 후에  insert_many() 로 멀티 인서트 처리.

max_poll_records 설정값을 늘려주면 insert_many() 에서 한 번에 처리하는 건수가 늘어나니까 다른 제약조건과 충돌하지 않는다면 일반적인 상황에서는 더 높은 성능을 보여줄 것이다( max_poll_records 의 기본값은 500. 테스트시에는 이 값을 2000으로 변경 적용했음)