In this post we show a performance test for a Kafka batch consumer written in Go.
First, let's review a basic wrapper around the confluent-kafka-go library.
package kafka

import (
    "fmt"
    "sync"

    kafkaApi "github.com/confluentinc/confluent-kafka-go/kafka"
)

type Producer struct {
    kafkaTopic  string
    producer    *kafkaApi.Producer
    errorsCount int
    mutex       sync.Mutex
}

type Consumer struct {
    consumer    *kafkaApi.Consumer
    lastMessage *kafkaApi.Message
}

func CreateKafkaProducer(
    kafkaBroker string,
    kafkaTopic string,
) *Producer {
    config := make(kafkaApi.ConfigMap)
    config["bootstrap.servers"] = kafkaBroker
    producer, err := kafkaApi.NewProducer(&config)
    if err != nil {
        panic(err)
    }
    // Drain delivery reports so the events channel never fills up.
    go func() {
        for range producer.Events() {
        }
    }()
    return &Producer{
        kafkaTopic: kafkaTopic,
        producer:   producer,
    }
}

func (p *Producer) ProduceMessage(
    key string,
    messageData []byte,
) error {
    message := kafkaApi.Message{
        Key:   []byte(key),
        Value: messageData,
        TopicPartition: kafkaApi.TopicPartition{
            Topic:     &p.kafkaTopic,
            Partition: kafkaApi.PartitionAny,
        },
    }
    // Produce is asynchronous; the error returned here only covers
    // enqueueing (e.g. a full local queue), not broker delivery.
    return p.producer.Produce(&message, nil)
}

func (p *Producer) Close() {
    p.producer.Close()
}

func CreateKafkaConsumer(
    kafkaBroker string,
    kafkaTopic string,
    consumerGroup string,
) *Consumer {
    config := make(kafkaApi.ConfigMap)
    config["bootstrap.servers"] = kafkaBroker
    config["group.id"] = consumerGroup
    //config["fetch.max.bytes"] = 50 * 1024 * 1024
    //config["max.partition.fetch.bytes"] = 50 * 1024 * 1024
    //config["auto.offset.reset"] = "earliest"
    //config["api.version.request"] = false
    //config["debug"] = "all"
    consumer, err := kafkaApi.NewConsumer(&config)
    if err != nil {
        panic(err)
    }
    go func() {
        for event := range consumer.Events() {
            fmt.Printf("kafka consumer event: %v\n", event)
        }
    }()
    err = consumer.Subscribe(kafkaTopic, nil)
    if err != nil {
        panic(err)
    }
    return &Consumer{
        consumer: consumer,
    }
}

func (c *Consumer) ReadMessage() []byte {
    // Block indefinitely until a message arrives.
    msg, err := c.consumer.ReadMessage(-1)
    if err != nil {
        panic(err)
    }
    c.lastMessage = msg
    return msg.Value
}

func (c *Consumer) CommitLastMessage() {
    _, err := c.consumer.CommitMessage(c.lastMessage)
    if err != nil {
        panic(err)
    }
}

func (c *Consumer) CommitBulk() {
    // Committing offset+1 of the last message acknowledges everything
    // read so far on that partition in a single request.
    lastPartition := c.lastMessage.TopicPartition
    partitions := []kafkaApi.TopicPartition{
        {
            Topic:     lastPartition.Topic,
            Partition: lastPartition.Partition,
            Offset:    lastPartition.Offset + 1,
        },
    }
    _, err := c.consumer.CommitOffsets(partitions)
    if err != nil {
        panic(err)
    }
}
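The difference between the two commit methods is the request rate: CommitLastMessage issues one commit per message, while CommitBulk issues one per batch. Since a Kafka offset commit acknowledges everything up to the committed offset on that partition, committing only the offset of the last message read is enough. A minimal usage sketch of the batch pattern (the module path is hypothetical; broker, topic, and batch size are placeholders):

package main

import "example.com/demo/kafka" // hypothetical path to the wrapper above

func main() {
    consumer := kafka.CreateKafkaConsumer("localhost:9092", "my-topic", "my-group")
    for {
        // Read a fixed-size batch; the wrapper remembers the last message.
        for i := 0; i < 1000; i++ {
            _ = consumer.ReadMessage() // process the payload here
        }
        // A single commit acknowledges the whole batch.
        consumer.CommitBulk()
    }
}

Note that CommitBulk only commits the partition of the last message read, so on a multi-partition topic offsets would have to be tracked per partition.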
To test this, we run a consumer and a producer in parallel:
const broker = "localhost:9092"
const topic = "my-topic"

type Data struct {
    MyId int64
    A00  string
    A01  string
    A02  string
    A03  string
    A04  string
    A05  string
    A06  string
    A07  string
    A08  string
    A09  string
}

type Test struct {
    stubs.Stubs
    id int64
}

func TestValidation(t *testing.T) {
    test := Test{
        Stubs: stubs.ProduceStubs(t),
    }
    defer test.TestCleanup()
    test.check()
}

func (t *Test) check() {
    go t.runProducer()
    t.runConsumer()
}

func (t *Test) runConsumer() {
    consumer := kafka.CreateKafkaConsumer(broker, topic, "my-group")
    consumeLogger := progress.ProduceProgress(0, "consume")
    consumeLogger.OnlyDelta = true
    bulk := 0
    startTime := time.Now()
    lastLog := time.Now()
    var totalConsume int64
    for {
        bytes := consumer.ReadMessage()
        totalConsume++
        var data Data
        err := json.Unmarshal(bytes, &data)
        kiterr.RaiseIfError(err)
        consumeLogger.Increment()
        bulk++
        if bulk > 1000 {
            bulk = 0
            consumer.CommitBulk()
            if time.Since(lastLog) > time.Second*10 {
                lastLog = time.Now()
                passed := time.Since(startTime)
                perSecond := totalConsume / int64(passed.Seconds())
                t.Logf("average consume %v messages/sec", perSecond)
            }
        }
    }
}

func (t *Test) runProducer() {
    producer := kafka.CreateKafkaProducer(broker, topic)
    produceLogger := progress.ProduceProgress(0, "produce")
    produceLogger.OnlyDelta = true
    data := Data{
        A00: kitstring.GetRandomString(100),
        A01: kitstring.GetRandomString(100),
        A02: kitstring.GetRandomString(100),
        A03: kitstring.GetRandomString(100),
        A04: kitstring.GetRandomString(100),
        A05: kitstring.GetRandomString(100),
        A06: kitstring.GetRandomString(100),
        A07: kitstring.GetRandomString(100),
        A08: kitstring.GetRandomString(100),
        A09: kitstring.GetRandomString(100),
    }
    bytes := kitjson.ObjectToBytes(data)
    for {
        err := producer.ProduceMessage("", bytes)
        if err != nil {
            // The local queue can fill up when the producer outpaces
            // the broker; back off and retry.
            t.Logf("ignoring error: %v", err)
            time.Sleep(5 * time.Second)
        }
        produceLogger.Increment()
    }
}
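The test is started with the standard Go tooling from the package directory:

go test -run TestValidation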
Kafka can be run as a Docker container:
docker run -d --name=kafka -p 9092:9092 apache/kafka
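With default broker settings the topic is usually auto-created on the first produce; if not, it can be created with the CLI bundled in the image (the script path below is for the apache/kafka image and may vary between versions):

docker exec kafka /opt/kafka/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092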
With this setup the consumer reaches a rate of about 120K messages per second on a single core.