Monday, June 29, 2020

Kafka Producer and Consumer in Go




In this post we will review how to create an Apache Kafka producer and consumer in Go.
To access Kafka we will use Shopify's Sarama library.




We will use a simple application that starts a producer and a consumer:


package main

import "kafka/players"

func main() {
	// Run the producer in the background.
	go players.Producer()

	// Consume blocks forever, which keeps the process alive.
	consumer := players.Consumer{}
	consumer.Consume()
}


The producer and the consumer share the following constants:


package players

const KafkaServer = "127.0.0.1:30010"
const KafkaTopic = "my-topic"


The producer code is straightforward. It sends a message to Kafka once a second in a loop:


package players

import (
	"time"

	"github.com/Shopify/sarama"
)

func Producer() {
	// A nil config makes Sarama fall back to its default configuration.
	syncProducer, err := sarama.NewSyncProducer([]string{KafkaServer}, nil)
	if err != nil {
		panic(err)
	}

	for {
		msg := &sarama.ProducerMessage{
			Topic: KafkaTopic,
			Value: sarama.ByteEncoder("Hello World " + time.Now().Format(time.RFC3339)),
		}

		// SendMessage blocks until the broker acknowledges the message.
		_, _, err = syncProducer.SendMessage(msg)
		if err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}


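The consumer is a bit more complex, as it needs to recover from a broker crash. It calls Consume in an endless loop, and whenever the call fails it sleeps for a second and tries again.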


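package players

import (
	"context"
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

type Consumer struct {
}

func (c *Consumer) Consume() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_4_0_0
	group, err := sarama.NewConsumerGroup([]string{KafkaServer}, "my-group", config)
	if err != nil {
		panic(err)
	}

	// Errors are delivered on this channel only when
	// config.Consumer.Return.Errors is set to true (the default is false).
	go func() {
		for err := range group.Errors() {
			panic(err)
		}
	}()

	ctx := context.Background()
	topics := []string{KafkaTopic}
	for {
		// Consume returns when the session ends, for example after a
		// rebalance or a broker failure, so it is called again in a loop.
		err := group.Consume(ctx, topics, c)
		if err != nil {
			fmt.Printf("kafka consume failed: %v, sleeping and retry in a moment\n", err)
			time.Sleep(time.Second)
		}
	}
}

// Setup runs at the beginning of a new session, before ConsumeClaim.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup runs at the end of a session, once all ConsumeClaim goroutines have exited.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim handles the messages of a single claimed partition.
func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("consumed a message: %v\n", string(msg.Value))
		// Mark the message as processed so its offset can be committed.
		sess.MarkMessage(msg, "")
	}
	return nil
}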


And that's all. The output from the application is:


consumed a message: Hello World 2020-06-30T08:28:10+03:00
consumed a message: Hello World 2020-06-30T08:28:18+03:00
consumed a message: Hello World 2020-06-30T08:28:14+03:00
consumed a message: Hello World 2020-06-30T08:28:19+03:00
consumed a message: Hello World 2020-06-30T08:28:16+03:00
consumed a message: Hello World 2020-06-30T08:28:15+03:00
consumed a message: Hello World 2020-06-30T08:28:20+03:00

Final Notes


In this post we have created a simple Kafka producer and consumer application in Go.

Notice that the message sent by the producer is simple text, but it could also be JSON produced by marshaling a struct.

