본문 바로가기

Lang

[java]Kafka Streams 에서 Json 처리하기

여러 장비에서 전송되는 메시지의 Kafka 처리 상황을 간단히 확인해보기 위해 Kafka Streams 를 이용해보기로 했다.

Kafka Streams 기본 기능 익히기 위해 아래 페이지의 샘플 소스로 우선 기능 테스트.

세세한 API 사용법은 아직 모르겠지만 기본 실행은 성공.

장비에서 전달되는 메시지가 JSON 타입이므로 위 샘플 코드를 응용하려면 JSON Serdes 가 필요하겠기에 구글링으로 가장 무난한 레퍼런스를 찾았다.

테스트해보려는 기본 메시지 형태가 아래 같다고 할 때

{"machId": "A01", "lat": 10, "lon": 992, "pubDt": "20201125100233"}
{"machId": "A02", "lat": 11, "lon": 892, "pubDt": "20201125100233"}

테스트를 위해 소스용 토픽으로 "demo", 스트림 처리 결과를 받을 "demo-out", 이렇게 두 개의 토픽을 생성해주었고, 메시지에 맞추어 소스 토픽용 Java Object(Machine) 과 스트림 처리 결과용 Java Object 를 만들어주었다.

스트림 처리 결과용 Java ObjectLombok 이용해서 코딩을 단순화해줬고 Kafka Stream 처리 시점을 추가해주었다.

import lombok.Data;
import java.time.LocalDateTime;

@Data
public class Mach {

    private String machId;
    private String lat;
    private String lon;
    private String pubDt;

    private LocalDateTime rcvDt = LocalDateTime.now();
}

레퍼런스의 코드들에서 새로 만든 Java Object 명에 맞추어 클래스명과 메서드명만 일부 바꿔주고, 예제 이용해서 간단한 스트림 애플리케이션을 아래와 같이 코딩했다.

public class Pipe1 {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "...:9092,...:9093,...:9094");

        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("demo", Consumed.with(Serdes.String(), CustomSerdes.Mach())).to("demo-out");

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

          Runtime.getRuntime().addShutdownHook(new Thread("sd-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

실행해보면 유명한 개발자 유머처럼 '어 이게 왜 되지?!' 싶게 잘 동작한다. 다만, 예상했던대로 DateTime 처리 부분에 개선할 부분이 보인다.

$ ./bin/kafka-console-consumer.sh --bootstrap-server ... --topic demo

{"machId": "A01", "lat": 10, "lon": 992,"pubDt": "20201125100233"}


$ ./bin/kafka-console-consumer.sh --bootstrap-server ... --topic demo-out

{"machId":"A01","lat":"10","lon":"992","pubDt":"20201125100233","rcvtDt":{"month":"NOVEMBER","dayOfWeek":"WEDNESDAY","dayOfYear":330,"nano":706899400,"year":2020,"monthValue":11,"dayOfMonth":25,"hour":10,"minute":2,"second":31,"chronology":{"calendarType":"iso8601","id":"ISO"}}}

JsonSerializer 구현해놓은 코드 보니 jackson 의 ObjectMapper 를 사용하고 있어서 가장 기본적인 방법을 적용해보기로 했다.

JavaTimeModule 사용하기 위해 라이브러리 추가하고 JsonSerializer 를 아래와 같이 수정했다.

    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>2.11.3</version>
    </dependency>
public class JsonSerializer<T> implements Serializer<T> {
//    private ObjectMapper objectMapper = new ObjectMapper();
    private final ObjectMapper objectMapper = JsonMapper.builder()
        .addModule(new JavaTimeModule())
        .build();
...

그리고, 원하는 포맷으로 출력하기 위해 Java Object 에는 @JsonFormat 을 추가.

    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyyMMddHHmmss")
    private LocalDateTime countDt = LocalDateTime.now();

의도한대로 날짜 포맷 적용되어 메시지 처리가 된다.

{"machId":"A01","lat":"10","lon":"992","pubDt":"20201125131050","rcvDt":"20201125131048"}

* pubDt 에 비해 rcvDt 가 빠른건 pub 클라이언트 실행한 머쉰보다 스트림 처리한 머쉰의 시스템 시간이 좀 빠른 탓.

자, 그럼 이제 Word Count 예제를 응용해서 장비별 카운트를 구현해보자.

우선 Count 용 Java Object 코딩.

@Data
public class MachCount {
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyyMMddHHmmss")
    private LocalDateTime countDt = LocalDateTime.now();

    private String machId;
    private long count;

    public MachCount(String machId, long count) {
        this.machId = machId;
        this.count = count;
    }
}

집계 시점을 확인하기 위해 countDt 를 넣어줬고, machId, count 필드 이용한 생성자를 만들었다.

CustomSerdes 에 Serdes 를 추가해주고

    static public final class MachCountSerde
            extends Serdes.WrapperSerde<MachCount> {
        public MachCountSerde() {
            super(new JsonSerializer<>(),
                    new JsonDeserializer<>(MachCount.class));
        }
    }

    public static Serde<MachCount> MachCount() {
        return new MachCountSerde();
    }

Pipe1 소스에서 스트림 처리 로직 부분만 Word Count 예 응용해서 고쳐주었다.

        final KStream<String, Machine> source = builder
                .stream("demo", Consumed.with(Serdes.String(), CustomSerdes.Machine()));

        final KTable<String, MachCount> counts = source
                .groupBy((key, value) -> value.getMachId())
                .count()
                .mapValues(MachCount::new);

        counts
                .toStream()
                .to("demo-out", Produced.with(Serdes.String(), CustomSerdes.MachCount()));

go 로 짜놓은 pub client 로 대량 메시지 발송해보니 잘 나온다.

{"countDt":"20201125132339","machId":"A08","count":956}
{"countDt":"20201125132339","machId":"A09","count":807}
...

KTable 에 카운트가 계속 누적되는데 주기별로 초기화해서 주기별 카운트 표시해줄 수 있으면 더 유용할거 같은데 이건 아직 고민 중.