To access Kafka we will use Shopify's Sarama library.
See also the post Deploy Apache Kafka on Kubernetes, as well as the post Secure connection to Kafka from a GoLang client.
We will use a simple application that starts a producer and a consumer:
package main

import "kafka/players"

func main() {
	go players.Producer()
	consumer := players.Consumer{}
	consumer.Consume()
}
The producer and the consumer share the following constants:
package players
const KafkaServer = "127.0.0.1:30010"
const KafkaTopic = "my-topic"
The producer code is very straightforward: it sends messages to Kafka in a loop, one per second:
package players

import (
	"time"

	"github.com/Shopify/sarama"
)

func Producer() {
	// A nil config makes Sarama use its defaults for a sync producer.
	syncProducer, err := sarama.NewSyncProducer([]string{KafkaServer}, nil)
	if err != nil {
		panic(err)
	}

	for {
		msg := &sarama.ProducerMessage{
			Topic: KafkaTopic,
			Value: sarama.ByteEncoder("Hello World " + time.Now().Format(time.RFC3339)),
		}

		// SendMessage blocks until the broker acknowledges the message.
		_, _, err = syncProducer.SendMessage(msg)
		if err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}
The consumer is a bit more complex, as it needs to recover from a broker crash.
package players

import (
	"context"
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

type Consumer struct {
}

func (c *Consumer) Consume() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_4_0_0

	group, err := sarama.NewConsumerGroup([]string{KafkaServer}, "my-group", config)
	if err != nil {
		panic(err)
	}

	// Errors are only delivered on this channel when
	// config.Consumer.Return.Errors is true; by default Sarama logs them instead.
	go func() {
		for err := range group.Errors() {
			panic(err)
		}
	}()

	ctx := context.Background()
	topics := []string{KafkaTopic}
	for {
		// Consume returns on every rebalance or error, so it is called in a loop.
		err := group.Consume(ctx, topics, c)
		if err != nil {
			fmt.Printf("kafka consume failed: %v, sleeping and retry in a moment\n", err)
			time.Sleep(time.Second)
		}
	}
}

// Setup runs at the beginning of a new session, before ConsumeClaim.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup runs at the end of a session, after all ConsumeClaim goroutines have exited.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim prints every message of the claim and marks it as consumed.
func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("consumed a message: %v\n", string(msg.Value))
		sess.MarkMessage(msg, "")
	}
	return nil
}
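The consumer above retries forever with a fixed one-second sleep. As a variation, here is a minimal sketch of a bounded retry with exponential backoff, using only the standard library. The names `retryWithBackoff`, `nextBackoff` and `errBrokerDown` are hypothetical helpers invented for this illustration and are not part of Sarama; the failing function stands in for the call to `group.Consume`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBrokerDown is a hypothetical error standing in for a failed Consume call.
var errBrokerDown = errors.New("broker unavailable")

// nextBackoff doubles the current delay, capped at max.
func nextBackoff(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

// retryWithBackoff calls op until it succeeds or maxAttempts is reached,
// sleeping between failed attempts. It returns nil on success,
// otherwise the error of the last attempt.
func retryWithBackoff(maxAttempts int, initial, max time.Duration, op func() error) error {
	delay := initial
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v\n", attempt, err)
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay = nextBackoff(delay, max)
		}
	}
	return err
}

func main() {
	calls := 0
	// Stand-in for group.Consume: fails twice, then succeeds.
	err := retryWithBackoff(5, 10*time.Millisecond, 100*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errBrokerDown
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

Whether to give up after a number of attempts or keep retrying forever, as the consumer in this post does, depends on how the application is supervised.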
That's all. The output from the application is:
consumed a message: Hello World 2020-06-30T08:28:10+03:00
consumed a message: Hello World 2020-06-30T08:28:18+03:00
consumed a message: Hello World 2020-06-30T08:28:14+03:00
consumed a message: Hello World 2020-06-30T08:28:19+03:00
consumed a message: Hello World 2020-06-30T08:28:16+03:00
consumed a message: Hello World 2020-06-30T08:28:15+03:00
consumed a message: Hello World 2020-06-30T08:28:20+03:00
Final Notes
In this post we have created a simple Kafka producer and consumer application in Go.
Notice that the message sent by the producer is plain text, but it could also be JSON produced by marshaling a struct.
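As a minimal sketch of the JSON option, the following standard-library example marshals a struct into bytes and decodes it back. The `PlayerEvent` type and the `encodeEvent` and `decodeEvent` helpers are hypothetical names invented for this illustration; they are not part of the application above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PlayerEvent is a hypothetical payload type used only for illustration.
type PlayerEvent struct {
	Name  string `json:"name"`
	Score int    `json:"score"`
}

// encodeEvent marshals the event into the JSON bytes
// that would be sent as the message value.
func encodeEvent(e PlayerEvent) ([]byte, error) {
	return json.Marshal(e)
}

// decodeEvent is the consumer-side counterpart: it parses
// the raw message value back into a PlayerEvent.
func decodeEvent(data []byte) (PlayerEvent, error) {
	var e PlayerEvent
	err := json.Unmarshal(data, &e)
	return e, err
}

func main() {
	payload, err := encodeEvent(PlayerEvent{Name: "alice", Score: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload)) // prints {"name":"alice","score":42}

	event, err := decodeEvent(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", event) // prints {Name:alice Score:42}
}
```

In the producer, the resulting bytes could be passed as `Value: sarama.ByteEncoder(payload)`, and in `ConsumeClaim` the consumer could call `decodeEvent(msg.Value)` instead of printing the raw string.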