Kafka 메시지 처리를 위해 Spring Cloud Stream 을 이용하고 있고 Spring Cloud Stream 3.* 대로 올리면서 functional style 로 기존 소스 수정했었던 얘기 예전에 간단하게 적은 적 있었었다.
메시지 추가된게 있어서 간만에 코딩하게 되었는데 기존 메시지 포맷과 다른 몇 가지 점(json 배열 타입, nested 구조 등) 있어서 관련해서 메모해본다.
기존 메시지 포맷
{ "key1": "val1", "key2": "val2", ... }
추가된 메시지 포맷
[
{ "key1": "val1", "key2": "val2", "group1" : { ... }, "group2": { ... }, ... },
{ "key1": "val1", "key2": "val2", "group1" : { ... }, "group2": { ... }, ... }
]
1. json nested 처리.
nested 한 구조 처리하는 방법은 여러가지가 있을텐데 이번 경우는 같은 성격의 데이터끼리 그룹핑해놓은 것일 뿐이고 한 테이블에 단일 레코드로 납작하게(flatten) 처리하면 되는거라 기존 Entity 클래스 구조에 nested 된 json 을 unpack*** 메서드로 풀어놓도록 다음과 같은 식으로 간단히 구현했다.
...
private String jobStsCd;
private String jobTpCd;
@SuppressWarnings("unchecked")
@JsonProperty("jobdetailinfo")
private void unpackJobDtl(JobDtl jobDtl) {
this.jobStsCd = jobDtl.getJobStsCd();
this.jobTpCd = jobDtl.getJobTpCd();
}
...
2. Json 배열 처리
List 타입 이용해서 for 루핑 처리하면 되지 않을까 싶었는데 문제없이 너무 쉽게 해결 됨.
@Bean
public Consumer<JobOrd> handleJobOrd() {
return jobOrd -> {
log.info(jobOrd);
};
}
// JSON Array 처리
@Bean
public Consumer<List<JobOrd>> handleJobOrds() {
return jobOrds -> {
jobOrds.forEach(jobOrd -> {
log.info(jobOrd);
});
};
}
3. Kafka Topic 에 메시지 수신된 시간.
토픽에 메시지 들어온 시간 정보를 추가 저장해달라는 요청이 있어서 이거 해결하느라 좀 고생.
이리저리 찾아보니 Message<T> 식으로 받으면 Header 정보를 이용할 수 있는 듯 싶어서 msg.getHeaders() 로 헤더 정보를 먼저 확인해보았다.
{ kafka_offset=241, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b87b1f8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, id=c2d35089-e028-f6fc-b443-40f1a1ac73f0, kafka_receivedPartitionId=0, kafka_receivedTopic=JobOrder, kafka_receivedTimestamp=1637299928389, contentType=application/json, kafka_groupId=jo-group, timestamp=1637300723810 }
헤더 정보 중 kafka_receivedTimestamp 이 있는 것 확인했고, 이거 이용하기 위해 아래와 같은 식으로 소스 수정.
@Bean
public Consumer<Message<List<JobOrd>>> handleJobOrds() {
return msg -> {
// log.info(String.valueOf(msg.getHeaders()));
// log.info(String.valueOf(msg.getPayload()));
long rcvTs = (long) msg.getHeaders().get("kafka_receivedTimestamp");
LocalDateTime kfDt = LocalDateTime.ofInstant(Instant.ofEpochMilli(rcvTs), TimeZone.getDefault().toZoneId());
List<EvtJobOrd> jobOrds = msg.getPayload();
jobOrds.forEach(jobOrd -> {
jobOrd.setKafkaDt(kfDt);
log.info(jobOrd);
});
};
}
header 와 payload 사용하는 방법 알았으니 앞으로 응용할 수 있는 부분 많이 있을 듯.
'Lang' 카테고리의 다른 글
[python]엑셀 파일에서 데이터 추출해서 소스 생성하기 (0) | 2022.04.14 |
---|---|
[kotlin]애기 걸음, reactive 구구단 (0) | 2022.04.04 |
[Julia]DataFrame 이용한 간단한 데이터 조작 예 (0) | 2021.10.15 |
[Julia]첫 공부는 이런 식으로 어떨런지 (0) | 2021.09.27 |
[julia]Decimal to Binary conversion (0) | 2021.09.23 |