본문 바로가기

카테고리 없음

[Go Lang] Kafka Async Producer

긴급하게 Kafka 이용한 대량 데이터 처리 테스트 지원 요청이 있어서 예전에 go 로 짰던 프로그램을 요건에 맞추어 좀 수정해봤습니다.

여러 장비에서 대량 데이터를 짧은 시간 내에 produce 하는 척 하기 위해 AsyncProducer 와 고루틴 등을 이용해보았는데 아직 목표치만큼 송신하지는 못하고 있어서 개선 부분 고민 중입니다. 

온라인에서 참고한 샘플 소스랑 요건 조합하다 보니 go 초보 수준이라 네이밍이나 이런게 엉망이지만 혹시라도 조언 주실 분 있을지 몰라 올려봅니다.

package main

import (
	"bufio"
	"container/list"
	"flag"
	"fmt"
	"github.com/Shopify/sarama"
	"gopkg.in/yaml.v2"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"
)

func check(e error) {
	if e != nil {
		panic(e)
	}
}

type Config struct {
	DeviceCount int `yaml:"device-count"`
	DataDir string `yaml:"data-path"`
	Kafka struct {
		Brokers []string `yaml:"brokers,flow"`
		Topic string `yaml:"topic"`
	} `yaml:"kafka"`
}

func main() {
	config := getConfig()

	dataFile, deviceIndex := parseFlag()
	topic := config.Kafka.Topic
	dataDir := config.DataDir
	deviceNum := config.DeviceCount
	arrLists := makeMsgList(topic, dataDir, dataFile, deviceNum, deviceIndex)

	brokers := config.Kafka.Brokers
	producer := getAsyncProducer(brokers)
	for i := 0; i < deviceNum; i++ {
		go func(i int) {
			for temp := arrLists[i].Front(); temp != nil; temp = temp.Next() {
				msg := temp.Value.(*sarama.ProducerMessage)
				fmt.Printf("%s %s\n", time.Unix(0, time.Now().UnixNano()), msg.Value)
				producer.Input() <- msg
			}
		}(i)
	}
	//fmt.Scanln()

	timeoutChan := time.After(10 * time.Second)
	select {
	case <-timeoutChan:
		println("Close Async")
	}
}

func getConfig () Config {
	filename, _ := filepath.Abs("./config.yml")
	yamlFile, err := ioutil.ReadFile(filename)
	check(err)
	var config Config
	err = yaml.Unmarshal(yamlFile, &config)
	check(err)
	return config
}

func parseFlag() (*string, *int) {
	dataFile := flag.String("data", "data.txt", "test data")
	deviceIndex := flag.Int("didx", 2, "device flag")
	flag.Parse()
	return dataFile, deviceIndex
}

func makeMsgList(topic string, dataDir string, dataFile *string, deviceNum int, deviceIndex *int) []list.List {
	arrLists := make([]list.List, deviceNum, 10)

	f, err := os.Open(dataDir + *dataFile)
	check(err)
	scanner := bufio.NewScanner(f)
	for j := 0; scanner.Scan(); j++ {
		key := j % deviceNum
		line := strings.Replace(scanner.Text(), "Q1C05", "QC_"+strconv.Itoa(key+deviceNum**deviceIndex), 1)
		//fmt.Println(line)
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(line),
		}
		arrLists[key].PushBack(msg)
	}
	return arrLists
}

func getAsyncProducer(brokerList []string) sarama.AsyncProducer {
	config := sarama.NewConfig()
	//config.Producer.RequiredAcks = sarama.WaitForLocal
	config.Producer.RequiredAcks = sarama.NoResponse
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 1 * time.Millisecond

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}