본문 바로가기

Slack 채널 정리

go - db 데이터 json 형식으로 kafka publish

얼마 전 파이썬으로 만들었던 기능을 공부 차원에서 go lang 샘플 코드 주워다 만들어 봤습니다. mariaDB 데이터를 json 형식으로 kafka publish 하는거. ( 코드 아랫 부분에 코멘트해놓은거는 파일에서 읽어 처리하는거) 기능들 펑션으로 분리해놓지도 않은 날코딩이지만 go 에서 db, kafka 이용하는 기초 구문 익혀볼 수 있습니다.

 import (
    "fmt"
    "github.com/Shopify/sarama"
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
)
func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    config.Producer.Retry.Max = 5
    brokers := []string{"kafka_ip:port"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        // Should not reach here
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
    topic := "evt_job_ord_step"
    // Create the database handle, confirm driver is present
    db, _ := sql.Open("mysql", "db_id:db_pw@tcp(db_ip)/db_name")
    defer db.Close()
    // Execute the query
    query := `
            SELECT JSON_OBJECT(
            'jobOrdId',           job_ord_id,
...
            'crnId',              crn_id,
            'crnTpCd',            crn_tp_cd,
            'evntDt',             DATE_FORMAT(evnt_dt, '%Y%m%d%H%i%s')
            )
            FROM evt_job_ord
    `
    results, err := db.Query(query)
    if err != nil {
        panic(err.Error()) // proper error handling instead of panic in your app
    }
    var json_msg string
    for results.Next() {
        err = results.Scan(&json_msg)
        if err != nil {
            panic(err.Error()) // proper error handling instead of panic in your app
        }
        //fmt.Println(json_msg)
        msg := &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(json_msg),
        }
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            panic(err)
        }
        fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
    }
    
  /*
    f, _ := os.Open("D:/workspace/PycharmProjects/pyDL/kafka/evt_qc_sprd_pos_hst.txt")
    // Create a Scanner for the file
    scanner := bufio.NewScanner(f)
    // Read and print each line in the file
    for scanner.Scan() {
        line := scanner.Text()
        fmt.Println(line)
        msg := &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(line),
        }
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            panic(err)
        }
        fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
    }
  */
}