얼마 전 파이썬으로 만들었던 기능을 공부 차원에서 go lang 샘플 코드 주워다 만들어 봤습니다. mariaDB 데이터를 json 형식으로 kafka publish 하는거. ( 코드 아랫 부분에 코멘트해놓은거는 파일에서 읽어 처리하는거) 기능들 펑션으로 분리해놓지도 않은 날코딩이지만 go 에서 db, kafka 이용하는 기초 구문 익혀볼 수 있습니다.
import (
"fmt"
"github.com/Shopify/sarama"
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 5
brokers := []string{"kafka_ip:port"}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
// Should not reach here
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
panic(err)
}
}()
topic := "evt_job_ord_step"
// Create the database handle, confirm driver is present
db, _ := sql.Open("mysql", "db_id:db_pw@tcp(db_ip)/db_name")
defer db.Close()
// Execute the query
query := `
SELECT JSON_OBJECT(
'jobOrdId', job_ord_id,
...
'crnId', crn_id,
'crnTpCd', crn_tp_cd,
'evntDt', DATE_FORMAT(evnt_dt, '%Y%m%d%H%i%s')
)
FROM evt_job_ord
`
results, err := db.Query(query)
if err != nil {
panic(err.Error()) // proper error handling instead of panic in your app
}
var json_msg string
for results.Next() {
err = results.Scan(&json_msg)
if err != nil {
panic(err.Error()) // proper error handling instead of panic in your app
}
//fmt.Println(json_msg)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(json_msg),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
}
/*
f, _ := os.Open("D:/workspace/PycharmProjects/pyDL/kafka/evt_qc_sprd_pos_hst.txt")
// Create a Scanner for the file
scanner := bufio.NewScanner(f)
// Read and print each line in the file
for scanner.Scan() {
line := scanner.Text()
fmt.Println(line)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(line),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
}
*/
}
'Slack 채널 정리' 카테고리의 다른 글
mariadb 에서 Point 타입으로 위,경도 값 넣어 이동 거리 계산 (0) | 2019.11.29 |
---|---|
[JPA]Point 타입 DB 저장 처리 (0) | 2019.11.29 |
실패한 브라우저 탭 갯수 제한 스크립트 (0) | 2019.11.28 |
empty cathc block 처리 하기위한 python 스크립트 (0) | 2019.11.28 |
바탕화면에 바로가기 버튼 만드는 스크립트 (0) | 2019.11.28 |