Kafka 특정 토픽 내 메시지 개수를 구하는 용도로 많이 쓰는 쉘 명령 조합을 반복 실행하도록 살짝 수정했다.
$ while true; do echo "`date +"%T.%3N"` `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic demo --time -1 | awk -F ":" '{sum += $3} END {print sum}'`"; done
GetOffsetShell 로 파티션별 offset 값을 구해서 그 합으로 메시지 개수를 확인하는 방식.
원리 응용해서 파이썬으로 구현해보면
from datetime import datetime
from kafka import KafkaConsumer, TopicPartition
import time
brokers = ['localhost:9092', ...]
consumer = KafkaConsumer(bootstrap_servers=brokers, auto_offset_reset='earliest', consumer_timeout_ms=10000)
TOPIC="demo"
pre_total = 0
curr_val = 0
accum_val = 0
while True:
partitions = []
for partition in consumer.partitions_for_topic(TOPIC):
partitions.append(TopicPartition(TOPIC, partition))
partitions = consumer.end_offsets(partitions)
offset_val = 0
for key, val in partitions.items():
offset_val += val
curr_val = offset_val - pre_total
if pre_total != 0:
accum_val += curr_val
print(datetime.now().strftime('%H:%M:%S.%f')[:-3], "|", offset_val, "|", curr_val, "|", accum_val)
pre_total = offset_val
time.sleep(1)
'Lang' 카테고리의 다른 글
Spring Cloud Stream - Annotation-based, Function based 혼용 (0) | 2021.05.18 |
---|---|
[python]kafka -> mongoDB 처리 시 insert_many 사용하기 (0) | 2020.12.28 |
[java]Kafka Streams 에서 Json 처리하기 (0) | 2020.11.25 |
[Python]Hydra 의 로그 파일을 일별로 변경하는 방법 (0) | 2020.10.15 |
[Python] jsonbender를 이용한 json 포맷 변환 (0) | 2020.10.15 |