여러 장비에서 전송되는 메시지의 Kafka 처리 상황을 간단히 확인해보기 위해 Kafka Streams 를 이용해보기로 했다.
Kafka Streams 기본 기능 익히기 위해 아래 페이지의 샘플 소스로 우선 기능 테스트.
세세한 API 사용법은 아직 모르겠지만 기본 실행은 성공.
장비에서 전달되는 메시지가 JSON 타입이므로 위 샘플 코드를 응용하려면 JSON Serdes 가 필요하겠기에 구글링으로 가장 무난한 레퍼런스를 찾았다.
테스트해보려는 기본 메시지 형태가 아래 같다고 할 때
{"machId": "A01", "lat": 10, "lon": 992, "pubDt": "20201125100233"}
{"machId": "A02", "lat": 11, "lon": 892, "pubDt": "20201125100233"}
테스트를 위해 소스용 토픽으로 "demo", 스트림 처리 결과를 받을 "demo-out", 이렇게 두 개의 토픽을 생성해주었고, 메시지에 맞추어 소스 토픽용 Java Object(Machine) 과 스트림 처리 결과용 Java Object 를 만들어주었다.
스트림 처리 결과용 Java Object 는 Lombok 이용해서 코딩을 단순화해줬고 Kafka Stream 처리 시점을 추가해주었다.
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class Mach {
private String machId;
private String lat;
private String lon;
private String pubDt;
private LocalDateTime rcvDt = LocalDateTime.now();
}
레퍼런스의 코드들에서 새로 만든 Java Object 명에 맞추어 클래스명과 메서드명만 일부 바꿔주고, 예제 이용해서 간단한 스트림 애플리케이션을 아래와 같이 코딩했다.
public class Pipe1 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "...:9092,...:9093,...:9094");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("demo", Consumed.with(Serdes.String(), CustomSerdes.Mach())).to("demo-out");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("sd-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
실행해보면 유명한 개발자 유머처럼 '어 이게 왜 되지?!' 싶게 잘 동작한다. 다만, 예상했던대로 DateTime 처리 부분에 개선할 부분이 보인다.
$ ./bin/kafka-console-consumer.sh --bootstrap-server ... --topic demo
{"machId": "A01", "lat": 10, "lon": 992,"pubDt": "20201125100233"}
$ ./bin/kafka-console-consumer.sh --bootstrap-server ... --topic demo-out
{"machId":"A01","lat":"10","lon":"992","pubDt":"20201125100233","rcvtDt":{"month":"NOVEMBER","dayOfWeek":"WEDNESDAY","dayOfYear":330,"nano":706899400,"year":2020,"monthValue":11,"dayOfMonth":25,"hour":10,"minute":2,"second":31,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
JsonSerializer 구현해놓은 코드 보니 jackson 의 ObjectMapper 를 사용하고 있어서 가장 기본적인 방법을 적용해보기로 했다.
JavaTimeModule 사용하기 위해 라이브러리 추가하고 JsonSerializer 를 아래와 같이 수정했다.
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.11.3</version>
</dependency>
public class JsonSerializer<T> implements Serializer<T> {
// private ObjectMapper objectMapper = new ObjectMapper();
private final ObjectMapper objectMapper = JsonMapper.builder()
.addModule(new JavaTimeModule())
.build();
...
그리고, 원하는 포맷으로 출력하기 위해 Java Object 에는 @JsonFormat 을 추가.
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyyMMddHHmmss")
private LocalDateTime countDt = LocalDateTime.now();
의도한대로 날짜 포맷 적용되어 메시지 처리가 된다.
{"machId":"A01","lat":"10","lon":"992","pubDt":"20201125131050","rcvDt":"20201125131048"}
* pubDt 에 비해 rcvDt 가 빠른건 pub 클라이언트 실행한 머쉰보다 스트림 처리한 머쉰의 시스템 시간이 좀 빠른 탓.
자, 그럼 이제 Word Count 예제를 응용해서 장비별 카운트를 구현해보자.
우선 Count 용 Java Object 코딩.
@Data
public class MachCount {
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyyMMddHHmmss")
private LocalDateTime countDt = LocalDateTime.now();
private String machId;
private long count;
public MachCount(String machId, long count) {
this.machId = machId;
this.count = count;
}
}
집계 시점을 확인하기 위해 countDt 를 넣어줬고, machId, count 필드 이용한 생성자를 만들었다.
CustomSerdes 에 Serdes 를 추가해주고
static public final class MachCountSerde
extends Serdes.WrapperSerde<MachCount> {
public MachCountSerde() {
super(new JsonSerializer<>(),
new JsonDeserializer<>(MachCount.class));
}
}
public static Serde<MachCount> MachCount() {
return new MachCountSerde();
}
Pipe1 소스에서 스트림 처리 로직 부분만 Word Count 예 응용해서 고쳐주었다.
final KStream<String, Machine> source = builder
.stream("demo", Consumed.with(Serdes.String(), CustomSerdes.Machine()));
final KTable<String, MachCount> counts = source
.groupBy((key, value) -> value.getMachId())
.count()
.mapValues(MachCount::new);
counts
.toStream()
.to("demo-out", Produced.with(Serdes.String(), CustomSerdes.MachCount()));
go 로 짜놓은 pub client 로 대량 메시지 발송해보니 잘 나온다.
{"countDt":"20201125132339","machId":"A08","count":956}
{"countDt":"20201125132339","machId":"A09","count":807}
...
KTable 에 카운트가 계속 누적되는데 주기별로 초기화해서 주기별 카운트 표시해줄 수 있으면 더 유용할거 같은데 이건 아직 고민 중.
'Lang' 카테고리의 다른 글
[python]kafka -> mongoDB 처리 시 insert_many 사용하기 (0) | 2020.12.28 |
---|---|
[python]kafka 토픽 내 메시지 개수를 구해보자 (0) | 2020.12.28 |
[Python]Hydra 의 로그 파일을 일별로 변경하는 방법 (0) | 2020.10.15 |
[Python] jsonbender를 이용한 json 포맷 변환 (0) | 2020.10.15 |
Pandas DataFrame 의 특정 컬럼 값 변환하는 방법 (0) | 2020.10.14 |