본문 바로가기

Lang

(36)
[Java]Pair, Pair.of() 메서드에서 2개의 값을 리턴하는거 깔끔하게 구현할 방법 찾아보다가 발견한 글. Using Pairs in Java 1. 작업 중이던 프로젝트에서는 마침 springframework 에 Pair Class 란게 있길래 사용해보았다. import org.springframework.data.util.Pair Pair pair = Pair.of(2, "Two"); pair.getFirst(); pair.getSecond(); 2. 'Apache Commons' 의 Pair Class Doc 보니 getKey/Value 외에 getLeft/Right 도 있다. 테스트해보지 않아 뭔 차이 있으려나는 아직 잘 모르...
[Java]Spring Cloud 에서 Kafka 인증 기능 사용 사내 내부망에서만 쓰는지라 Kafka에 인증 기능 적용하지 않고 설치, 사용해왔는데 이번에 외부 업체에서 제공하는 인증 기능 적용된 Kafka 에서 메시지 수신할 일 생겨서 작업한 거 메모. 사내 kafka(spring.cloud.stream.binders.kafka0), 외부 kafka(spring.cloud.stream.binders.kafka1) 각각 설정. 인증 방식으로 SASL/SCRAM 적용되어 있다고 해서 그것에 맞추에 configuration 과 jaas 설정. spring: cloud: stream: default-binder: kafka0 binders: kafka0: type: kafka environment: spring.cloud.stream.kafka.binder.brokers:..
Spring Cloud Stream - Annotation-based, Function based 혼용 현재까지의 결론은 '안된다'. 테스트해본 바로는 두 방식을 함께 적용하면 annotation 만 동작하고 functional 한 방식은 동작하지 않는다. 해법 찾으려고 시도해봤지만 공식 문서에서의 언급은 못 찾았고 아래 글 보고 일단 잠정적으로 혼용 시도는 중단. 'You cannot mix EnableBinding and functional model in the same application.' https://stackoverflow.com/questions/63862797/spring-cloud-stream-function-support-does-not-work 작년이었나 Spring Cloud Stream 3.* 대 올라가면서 functional style 도입되었다는 기사 보고 이전 짰던 소스 ..
[python]kafka -> mongoDB 처리 시 insert_many 사용하기 kafka -> mongoDB 저장 성능을 높이기 위해 고민하던 중 같이 작업하던 동료가 찾은 multiprocessing 이용한 컨슈머 소스에 insert_many() 을 적용해서 효과본거 간단하게 정리. import time from datetime import datetime from kafka import KafkaConsumer from ujson import loads from pymongo import MongoClient client = MongoClient("...:27017") db = client.testdb consumer = KafkaConsumer('demo', bootstrap_servers=['localhost:9092', ...], auto_offset_reset='earl..
[python]kafka 토픽 내 메시지 개수를 구해보자 Kafka 특정 토픽 내 메시지 개수를 구하는 용도로 많이 쓰는 쉘 명령 조합을 반복 실행하도록 살짝 수정했다. $ while true; do echo "`date +"%T.%3N"` `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic demo --time -1 | awk -F ":" '{sum += $3} END {print sum}'`"; done GetOffsetShell 로 파티션별 offset 값을 구해서 그 합으로 메시지 개수를 확인하는 방식. 원리 응용해서 파이썬으로 구현해보면 from datetime import datetime from kafka import KafkaConsumer,..
[java]Kafka Streams 에서 Json 처리하기 여러 장비에서 전송되는 메시지의 Kafka 처리 상황을 간단히 확인해보기 위해 Kafka Streams 를 이용해보기로 했다. Kafka Streams 기본 기능 익히기 위해 아래 페이지의 샘플 소스로 우선 기능 테스트. KAFKA STREAMS 세세한 API 사용법은 아직 모르겠지만 기본 실행은 성공. 장비에서 전달되는 메시지가 JSON 타입이므로 위 샘플 코드를 응용하려면 JSON Serdes 가 필요하겠기에 구글링으로 가장 무난한 레퍼런스를 찾았다. Implementing custom SerDes for Java objects using Json Serializer and Deserializer in Kafka Streams applications 테스트해보려는 기본 메시지 형태가 아래 같다고 할 ..
[Python]Hydra 의 로그 파일을 일별로 변경하는 방법 Hydra Customizing logging 페이지에 적힌 'Hydra is configuring Python standard logging library with the dictConfig method' 란 설명 보고 Python 표준 문서의 TimedRotatingFileHandler 참고해서 적용해보았다. formatters: basic: format: '[%(asctime)s][%(name)-12s][%(levelname)s] - %(message)s' datefmt: "%Y-%m-%d %H:%M:%S" handlers: console: class: logging.StreamHandler formatter: basic stream: ext://sys.stdout file: class: loggi..
[Python] jsonbender를 이용한 json 포맷 변환 DB 데이터를 Json 포맷으로 전환할 일이 생겨서 pandas 라이브러리로 간단하게 처리해봤다. df = pd.read_sql_query(...) if df.empty: return '' else: return df.to_json(orient='records') 회사 일이란게 이렇게 쉽게 끝나는 경우가 있던가. 역시나 json 포맷이 플랫한 형태가 아니라 서브 노드가 있는 형태로 변경 되었다. 가령 SQL 로 추출한 DataFrame 을 to_json 으로 직접 변환하면 아래와 같은 형태로 변환된다. [{"No": 1, "MakerCd": "abc", "Lat"": 10, "Lng": 20, "Contact": null}, ...] 이걸 아래처럼 변경(null 로 표시된 건 ""로 변환까지 포함)해달라..