사내 내부망에서만 쓰는지라 Kafka에 인증 기능 적용하지 않고 설치, 사용해왔는데 이번에 외부 업체에서 제공하는 인증 기능 적용된 Kafka 에서 메시지 수신할 일 생겨서 작업한 거 메모.
사내 kafka(spring.cloud.stream.binders.kafka0), 외부 kafka(spring.cloud.stream.binders.kafka1) 각각 설정.
인증 방식으로 SASL/SCRAM 적용되어 있다고 해서 그것에 맞추에 configuration 과 jaas 설정.
spring:
cloud:
stream:
default-binder: kafka0
binders:
kafka0:
type: kafka
environment:
spring.cloud.stream.kafka.binder.brokers: ****:9092
kafka1:
type: kafka
environment:
spring.cloud.stream.kafka:
binder:
brokers: ****:9093
configuration:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: SCRAM-SHA-256
jaas:
loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
options:
username: ***
password: ***
functional 방식 사용하기 위해 function 설정하고 binder 에 kafka1(외부 Kafka) 지정.
spring:
cloud:
function:
definition: handleOutside;...;
stream:
bindings:
handleOutside-in-0:
consumer:
max-attempts: 1
back-off-initial-interval: 100
retryable-exceptions:
javax:
validation:
ValidationException: false
destination: test_topic
binder: kafka1
group: test-group
concurrency: 2
설정한 이름에 맞추어 구현.
@Bean
public Consumer<Msg0101> handleOutside() {
return msg0101 -> {
try {
Data0101 data0101 = msg0101.getData0101();
log.info("receive: {}", mapper.writeValueAsString(data0101));
...
} catch (JsonProcessingException e) {
log.error(e.getMessage());
}
};
}
인증 기능은 처음 써보는거라 조금 긴장했는데 걱정에 비해서는 수월하게 해결.
'Lang' 카테고리의 다른 글
[Java] FeignClient, no suitable HttpMessageConverter 에러 (1) | 2021.06.11 |
---|---|
[Java]Pair, Pair.of() (0) | 2021.06.09 |
Spring Cloud Stream - Annotation-based, Function based 혼용 (0) | 2021.05.18 |
[python]kafka -> mongoDB 처리 시 insert_many 사용하기 (0) | 2020.12.28 |
[python]kafka 토픽 내 메시지 개수를 구해보자 (0) | 2020.12.28 |