개발 환경에서 대량으로 카프카에 메시지 publish 할 일이 생겨서 python 으로 producer 프로그램을 온라인 매뉴얼 등을 참고해서 급조해보았습니다.
간단한 코딩이지만 몇가지 신경 쓸, 눈여겨 볼만한 것만 꼽아봤습니다.
1. 파이썬 3.7 이상인 경우 pip instll kafka 해서 사용하면 에러. pip install kafka-python 해서 사용해야 함.
2. json 스트링 값으로 보내기 위해 KafkaProducer에서 value_serializer에 람다 이용해서 dictionary 타입을 json 으로 변환하는 것 정도 눈여겨 보시면 될 듯.
3. pip install PyMySQL 해서 DB 라이브러리 땡겨주셔야 하고 sql 문에서는 JSON_OBJECT('key이름', 컬럼이름, ...) 식으로 해주면 json 스트링 형태로 데이터 추출 가능함.
import pymysql
from mysql.connector import Error
import json
from kafka import KafkaProducer
from kafka.client import log
kfk_servers = ['kafka_server_ip:9092']
def on_send_error(excp):
log.error('Errback', exc_info=excp)
def main():
producer = KafkaProducer(bootstrap_servers=kfk_servers, value_serializer=lambda m: json.dumps(m).encode('ascii'))
try:
con = pymysql.connect(host="db_ip", user="db_user", password="db_pwd",
db='db_name', charset='utf8')
cur = con.cursor()
sql = '''SELECT JSON_OBJECT('machId', mach_id, 'posVal', pos_val,
'evntDt', DATE_FORMAT(evnt_dt, '%Y%m%d%H%i%s')) FROM tc_sprd_pos_hst;
'''
cur.execute(sql)
result = cur.fetchall()
topic_name = "토픽명"
for row in result:
json_msg = json.loads(row[0])
producer.send(topic_name, json_msg).add_errback(on_send_error)
except Error as e:
print("Error reading data from MySQL table", e)
finally:
cur.close()
con.close()
print("MariaDB connection is closed")
producer.flush()
print("produce end")
if __name__ == '__main__':
main()
'Slack 채널 정리' 카테고리의 다른 글
Native Query 사용 시 Space is not allowed after parameter prefix ':' 에러 처리 방법 (2) | 2019.11.27 |
---|---|
sqlite 데이터, mariaDB로 이전 (0) | 2019.11.27 |
java - snake to camel (0) | 2019.11.26 |
groovy - snake to camel (0) | 2019.11.26 |
vim 에서 snake, camel 변환 (0) | 2019.11.26 |