본문 바로가기

OpenSource

kafkacat 이용해서 메시지 수신 시각 확인해보기

kafka 에 메시지 수신된 시점을 확인해보려고 이것저것 시도해보는 중인데 이번에는 kafkacat 을 한 번 이용해봤다.

windows 10 의 wsl 에 kafkacat 을 설치했다 (wsl 로 ubuntu 설치된 상태라 간단하게 다음 명령으로 설치) 

$ sudo apt-get install kafkacat

kafkacat 로 컨슈밍한 메시지를 특정 토픽으로 다시 전송한 후 -f 옵션으로 Timestamp 와 메시지 본문 출력을 파일로 저장.

$ kafkacat -C -b 172.20.130.105:9091 -t demo | kafkacat -P -b 172.20.130.105:9091 -t demo-out
kafkacat -C -b broker_ip:9091 -t demo-out -f '%T %s\n' | tee raw.out

Timestamp 를 쉽게 읽기 위해 파이썬으로 포맷 변경.

from datetime import datetime


def readable_time(str_ts):
  date_time = datetime.fromtimestamp(int(str_ts) / 1000)
  return date_time.strftime("%H:%M:%S.%f")[:-3]


with open("raw.out", 'r') as f, open("result.out", "w") as f1:
  for line in f:
    ts = readable_time(line.split()[0])
    f1.write(f"{ts} {line[14:]}")
송신 메시지 : {"machId": "AB1",...}
raw.out       : 1607408610929 {"machId": "AB1",...}
result.out     : 15:23:30.929 {"machId": "AB1",...}

추가(2020.12.14).

너무 간단해서 오히려 뒤늦게 알게된건데 kafka broker 에서 'log.message.timestamp.type' 옵션 설정을 바꾸어서 메시지 수신 시점을 알아내는 방법이 있었다. 설정 기본값은 'CreateTime'으로 메시지 생성 시점에 timestamp 이 생성된다. 이 설정값을 'LogAppendTime'으로 바꾸면 파티션에 로그 저장하는 시점에 timestamp이 생성된다.

아래는 메시지 timestamp 를 확인하는 간단한 파이썬 스크립트.

consumer = KafkaConsumer('demo',
     bootstrap_servers=[...],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     consumer_timeout_ms=1000,
     value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for message in consumer:
    print(datetime.fromtimestamp(message.timestamp / 1000).strftime("%H:%M:%S.%f")[:-3])

ps.

1. 수신한 시간과 다시 pub 처리한 시간 사이에 차가 있을테니 이걸로 정확한 수신 시간을 확인할 수는 없을거고 일단 참조용.

2. 같은 장비인데도 windows 시스템 시간과 wsl 의 시스템 시간이 ms 단위에서 차이가 있다. kafkacat 으로 처리한 시점과 윈도우에서 produce 한 시간 비교해서 분석하려다가 이것때문에 지금 '헉' 하고 있는 상태.