본문 바로가기

Lang

[python]kafka 토픽 내 메시지 개수를 구해보자

Kafka 특정 토픽 내 메시지 개수를 구하는 용도로 많이 쓰는 쉘 명령 조합을 반복 실행하도록 살짝 수정했다.

$ while true; do echo "`date +"%T.%3N"` `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic demo --time -1 | awk -F ":" '{sum += $3} END {print sum}'`"; done

GetOffsetShell  파티션별 offset 값을 구해서 그 합으로 메시지 개수를 확인하는 방식.

원리 응용해서 파이썬으로 구현해보면

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition
import time

brokers = ['localhost:9092', ...]
consumer = KafkaConsumer(bootstrap_servers=brokers, auto_offset_reset='earliest', consumer_timeout_ms=10000)

TOPIC="demo"
pre_total = 0
curr_val = 0
accum_val = 0

while True:
    partitions = []
    for partition in consumer.partitions_for_topic(TOPIC):
        partitions.append(TopicPartition(TOPIC, partition))

    partitions = consumer.end_offsets(partitions)
    offset_val = 0
    for key, val in partitions.items():
        offset_val += val

    curr_val = offset_val - pre_total
    if pre_total != 0:
        accum_val += curr_val

    print(datetime.now().strftime('%H:%M:%S.%f')[:-3], "|",  offset_val, "|", curr_val, "|", accum_val)
    pre_total = offset_val
    time.sleep(1)