본문 바로가기

Lang

[Java]Spring Cloud Stream 에서 Kafka 메시지 헤더 이용하는 방법 외

Kafka 메시지 처리를 위해 Spring Cloud Stream 을 이용하고 있고 Spring Cloud Stream 3.* 대로 올리면서 functional style 로 기존 소스 수정했었던 얘기 예전에 간단하게 적은 적 있었었다.

메시지 추가된게 있어서 간만에 코딩하게 되었는데 기존 메시지 포맷과 다른 몇 가지 점(json 배열 타입, nested 구조 등) 있어서 관련해서 메모해본다.

기존 메시지 포맷
{ "key1": "val1", "key2": "val2", ... }

추가된 메시지 포맷
[
  { "key1": "val1", "key2": "val2", "group1" : { ... }, "group2": { ... }, ... },
  { "key1": "val1", "key2": "val2", "group1" : { ... }, "group2": { ... }, ... }
]

1. json nested 처리.

nested 한 구조 처리하는 방법은 여러가지가 있을텐데 이번 경우는 같은 성격의 데이터끼리 그룹핑해놓은 것일 뿐이고 한 테이블에 단일 레코드로 납작하게(flatten) 처리하면 되는거라 기존 Entity 클래스 구조에 nested 된 json 을 unpack*** 메서드로 풀어놓도록 다음과 같은 식으로 간단히 구현했다.

    ...
    private String jobStsCd;
    private String jobTpCd;

    @SuppressWarnings("unchecked")
    @JsonProperty("jobdetailinfo")
    private void unpackJobDtl(JobDtl jobDtl) {
        this.jobStsCd = jobDtl.getJobStsCd();
        this.jobTpCd = jobDtl.getJobTpCd();
    }
    
    ...

2. Json 배열 처리

List 타입 이용해서 for 루핑 처리하면 되지 않을까 싶었는데 문제없이 너무 쉽게 해결 됨.

    @Bean
    public Consumer<JobOrd> handleJobOrd() {
        return jobOrd -> {
            log.info(jobOrd);
        };
    }
    
    // JSON Array 처리
    @Bean
    public Consumer<List<JobOrd>> handleJobOrds() {
        return jobOrds -> {
            jobOrds.forEach(jobOrd -> {
                log.info(jobOrd);
            });
        };
    }

 

3. Kafka Topic 에 메시지 수신된 시간.

토픽에 메시지 들어온 시간 정보를 추가 저장해달라는 요청이 있어서 이거 해결하느라 좀 고생.

이리저리 찾아보니 Message<T> 식으로 받으면 Header 정보를 이용할 수 있는 듯 싶어서 msg.getHeaders()  로 헤더 정보를 먼저 확인해보았다.

{ kafka_offset=241, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b87b1f8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, id=c2d35089-e028-f6fc-b443-40f1a1ac73f0, kafka_receivedPartitionId=0, kafka_receivedTopic=JobOrder, kafka_receivedTimestamp=1637299928389, contentType=application/json, kafka_groupId=jo-group, timestamp=1637300723810 }

헤더 정보 중 kafka_receivedTimestamp 이 있는 것 확인했고, 이거 이용하기 위해 아래와 같은 식으로 소스 수정. 

    @Bean
    public Consumer<Message<List<JobOrd>>> handleJobOrds() {
        return msg -> {
//            log.info(String.valueOf(msg.getHeaders()));
//            log.info(String.valueOf(msg.getPayload()));
            long rcvTs = (long) msg.getHeaders().get("kafka_receivedTimestamp");
            LocalDateTime kfDt = LocalDateTime.ofInstant(Instant.ofEpochMilli(rcvTs), TimeZone.getDefault().toZoneId());
            List<EvtJobOrd> jobOrds = msg.getPayload();
            jobOrds.forEach(jobOrd -> {
                jobOrd.setKafkaDt(kfDt);
                log.info(jobOrd);
            });
        };
    }

header 와 payload 사용하는 방법 알았으니 앞으로 응용할 수 있는 부분 많이 있을 듯.