kafka 에 메시지 수신된 시점을 확인해보려고 이것저것 시도해보는 중인데 이번에는 kafkacat 을 한 번 이용해봤다.
windows 10 의 wsl 에 kafkacat 을 설치했다 (wsl 로 ubuntu 설치된 상태라 간단하게 다음 명령으로 설치)
$ sudo apt-get install kafkacat
kafkacat 로 컨슈밍한 메시지를 특정 토픽으로 다시 전송한 후 -f 옵션으로 Timestamp 와 메시지 본문 출력을 파일로 저장.
$ kafkacat -C -b 172.20.130.105:9091 -t demo | kafkacat -P -b 172.20.130.105:9091 -t demo-out
$ kafkacat -C -b broker_ip:9091 -t demo-out -f '%T %s\n' | tee raw.out
Timestamp 를 쉽게 읽기 위해 파이썬으로 포맷 변경.
from datetime import datetime
def readable_time(str_ts):
date_time = datetime.fromtimestamp(int(str_ts) / 1000)
return date_time.strftime("%H:%M:%S.%f")[:-3]
with open("raw.out", 'r') as f, open("result.out", "w") as f1:
for line in f:
ts = readable_time(line.split()[0])
f1.write(f"{ts} {line[14:]}")
송신 메시지 : {"machId": "AB1",...}
raw.out : 1607408610929 {"machId": "AB1",...}
result.out : 15:23:30.929 {"machId": "AB1",...}
추가(2020.12.14).
너무 간단해서 오히려 뒤늦게 알게된건데 kafka broker 에서 'log.message.timestamp.type' 옵션 설정을 바꾸어서 메시지 수신 시점을 알아내는 방법이 있었다. 설정 기본값은 'CreateTime'으로 메시지 생성 시점에 timestamp 이 생성된다. 이 설정값을 'LogAppendTime'으로 바꾸면 파티션에 로그 저장하는 시점에 timestamp이 생성된다.
아래는 메시지 timestamp 를 확인하는 간단한 파이썬 스크립트.
consumer = KafkaConsumer('demo',
bootstrap_servers=[...],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
consumer_timeout_ms=1000,
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for message in consumer:
print(datetime.fromtimestamp(message.timestamp / 1000).strftime("%H:%M:%S.%f")[:-3])
ps.
1. 수신한 시간과 다시 pub 처리한 시간 사이에 차가 있을테니 이걸로 정확한 수신 시간을 확인할 수는 없을거고 일단 참조용.
2. 같은 장비인데도 windows 시스템 시간과 wsl 의 시스템 시간이 ms 단위에서 차이가 있다. kafkacat 으로 처리한 시점과 윈도우에서 produce 한 시간 비교해서 분석하려다가 이것때문에 지금 '헉' 하고 있는 상태.
'OpenSource' 카테고리의 다른 글
Helm chart repository deprecation 관련 메모 (0) | 2021.02.26 |
---|---|
[Bash]kafka 일괄 shutdown 스크립트 (0) | 2020.12.09 |
Kafka mqtt connector 사용법 - RabbitMQ (0) | 2020.10.28 |
Kafka mqtt connector 사용법 - mqtt broker (1) | 2020.10.27 |
Kafka 모니터링 - burrow 설치 (0) | 2020.07.28 |