긴급하게 Kafka 이용한 대량 데이터 처리 테스트 지원 요청이 있어서 예전에 go 로 짰던 프로그램을 요건에 맞추어 좀 수정해봤습니다.
여러 장비에서 대량 데이터를 짧은 시간 내에 produce 하는 척 하기 위해 AsyncProducer 와 고루틴 등을 이용해보았는데 아직 목표치만큼 송신하지는 못하고 있어서 개선 부분 고민 중입니다.
온라인에서 참고한 샘플 소스랑 요건 조합하다 보니 go 초보 수준이라 네이밍이나 이런게 엉망이지만 혹시라도 조언 주실 분 있을지 몰라 올려봅니다.
package main
import (
"bufio"
"container/list"
"flag"
"fmt"
"github.com/Shopify/sarama"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
func check(e error) {
if e != nil {
panic(e)
}
}
type Config struct {
DeviceCount int `yaml:"device-count"`
DataDir string `yaml:"data-path"`
Kafka struct {
Brokers []string `yaml:"brokers,flow"`
Topic string `yaml:"topic"`
} `yaml:"kafka"`
}
func main() {
config := getConfig()
dataFile, deviceIndex := parseFlag()
topic := config.Kafka.Topic
dataDir := config.DataDir
deviceNum := config.DeviceCount
arrLists := makeMsgList(topic, dataDir, dataFile, deviceNum, deviceIndex)
brokers := config.Kafka.Brokers
producer := getAsyncProducer(brokers)
for i := 0; i < deviceNum; i++ {
go func(i int) {
for temp := arrLists[i].Front(); temp != nil; temp = temp.Next() {
msg := temp.Value.(*sarama.ProducerMessage)
fmt.Printf("%s %s\n", time.Unix(0, time.Now().UnixNano()), msg.Value)
producer.Input() <- msg
}
}(i)
}
//fmt.Scanln()
timeoutChan := time.After(10 * time.Second)
select {
case <-timeoutChan:
println("Close Async")
}
}
func getConfig () Config {
filename, _ := filepath.Abs("./config.yml")
yamlFile, err := ioutil.ReadFile(filename)
check(err)
var config Config
err = yaml.Unmarshal(yamlFile, &config)
check(err)
return config
}
func parseFlag() (*string, *int) {
dataFile := flag.String("data", "data.txt", "test data")
deviceIndex := flag.Int("didx", 2, "device flag")
flag.Parse()
return dataFile, deviceIndex
}
func makeMsgList(topic string, dataDir string, dataFile *string, deviceNum int, deviceIndex *int) []list.List {
arrLists := make([]list.List, deviceNum, 10)
f, err := os.Open(dataDir + *dataFile)
check(err)
scanner := bufio.NewScanner(f)
for j := 0; scanner.Scan(); j++ {
key := j % deviceNum
line := strings.Replace(scanner.Text(), "Q1C05", "QC_"+strconv.Itoa(key+deviceNum**deviceIndex), 1)
//fmt.Println(line)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(line),
}
arrLists[key].PushBack(msg)
}
return arrLists
}
func getAsyncProducer(brokerList []string) sarama.AsyncProducer {
config := sarama.NewConfig()
//config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.RequiredAcks = sarama.NoResponse
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = 1 * time.Millisecond
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
go func() {
for err := range producer.Errors() {
log.Println("Failed to write access log entry:", err)
}
}()
return producer
}