Sunday, November 5, 2023

Using AWS Kinesis in Go


 

In this post we will review a simple Go implementation of an AWS Kinesis producer and consumer.


The main function starts the producer and the consumer, and then waits forever.


package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	go producer()
	go consumer()

	// Block forever: the counter is never decremented, so Wait never returns.
	waitGroup := sync.WaitGroup{}
	waitGroup.Add(1)
	waitGroup.Wait()
}


The producer writes one record per second to an AWS Kinesis stream named s1.


func producer() {
	awSession := session.Must(session.NewSession())
	kinesisClient := kinesis.New(awSession)
	for i := 0; ; i++ {
		// Each record gets its own partition key, so records spread across the shards.
		input := kinesis.PutRecordInput{
			Data:         []byte(fmt.Sprintf("message data %v", i)),
			PartitionKey: aws.String(fmt.Sprintf("key%v", i)),
			StreamName:   aws.String("s1"),
		}
		output, err := kinesisClient.PutRecord(&input)
		if err != nil {
			panic(err)
		}

		fmt.Printf("produced record to shard %v\n", *output.ShardId)
		time.Sleep(time.Second)
	}
}
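
The producer above panics on the first failed PutRecord call. A long-running producer would more likely retry transient failures such as throttling. The following is a minimal sketch of a retry helper with exponential backoff; the retry function, its parameters, and errThrottled are hypothetical names for illustration, and flakyPut stands in for the real kinesisClient.PutRecord call. The SDK may already retry some failures internally, so this only shows the general pattern.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errThrottled is a simulated error used only by this example.
var errThrottled = errors.New("simulated throttling")

// retry calls op up to attempts times (attempts should be at least 1),
// doubling the delay after each failure. It returns nil on the first
// success, or the last error wrapped with the attempt count.
func retry(attempts int, delay time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// flakyPut stands in for a PutRecord call: it fails twice, then succeeds.
	flakyPut := func() error {
		calls++
		if calls < 3 {
			return errThrottled
		}
		return nil
	}
	err := retry(5, 10*time.Millisecond, flakyPut)
	fmt.Printf("calls=%d err=%v\n", calls, err) // prints "calls=3 err=<nil>"
}
```

In the producer loop, the call to PutRecord could be wrapped in such a helper, logging and skipping the record if every attempt fails, instead of calling panic.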


The consumer is more complex. First we list the stream's shards, and then we start a consumer goroutine for each shard.


func consumer() {
	awSession := session.Must(session.NewSession())
	kinesisClient := kinesis.New(awSession)

	// Describe the stream to discover its shards.
	input := kinesis.DescribeStreamInput{
		StreamName: aws.String("s1"),
	}
	output, err := kinesisClient.DescribeStream(&input)
	if err != nil {
		panic(err)
	}

	fmt.Printf("%v shards located\n", len(output.StreamDescription.Shards))

	// Start one consumer goroutine per shard.
	for i := 0; i < len(output.StreamDescription.Shards); i++ {
		shardId := output.StreamDescription.Shards[i].ShardId
		go consumerShard(kinesisClient, shardId)
	}
}


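Each shard consumer uses a small trick. It starts with a LATEST iterator, which returns only records written after the iterator was created. Once it has received a record, it requests a new AFTER_SEQUENCE_NUMBER iterator, so that the next read starts right after the last received record.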


func consumerShard(
	kinesisClient *kinesis.Kinesis,
	shardId *string,
) {
	// Start with a LATEST iterator: only records written from now on are returned.
	iteratorInput := kinesis.GetShardIteratorInput{
		ShardId:           shardId,
		ShardIteratorType: aws.String("LATEST"),
		StreamName:        aws.String("s1"),
	}
	shardIteratorOutput, err := kinesisClient.GetShardIterator(&iteratorInput)
	if err != nil {
		panic(err)
	}

	// Sequence number of the last record received from this shard, if any.
	var lastRecord *string
	for {
		recordsInput := kinesis.GetRecordsInput{
			ShardIterator: shardIteratorOutput.ShardIterator,
		}
		records, err := kinesisClient.GetRecords(&recordsInput)
		if err != nil {
			panic(err)
		}

		fmt.Printf("shard %v got %v records\n", *shardId, len(records.Records))

		for i := 0; i < len(records.Records); i++ {
			record := records.Records[i]
			fmt.Printf("shard %v data: %v\n", *shardId, string(record.Data))
			lastRecord = record.SequenceNumber
		}

		time.Sleep(5 * time.Second)

		// Request a fresh iterator positioned right after the last received record.
		// Until the first record arrives, the initial LATEST iterator is reused;
		// note that a shard iterator expires 5 minutes after it is issued.
		if lastRecord != nil {
			iteratorInput.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
			iteratorInput.StartingSequenceNumber = lastRecord
			shardIteratorOutput, err = kinesisClient.GetShardIterator(&iteratorInput)
			if err != nil {
				panic(err)
			}
		}
	}
}
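
A different way to advance through a shard is to follow the iterator that each read returns: the GetRecords response carries a NextShardIterator field, and a nil value there means the shard has been closed. The sketch below shows only that control flow, using an in-memory stand-in so that it runs without AWS access. The page and fakeShard types and the drain function are hypothetical; in the real consumer the equivalent step would be to pass records.NextShardIterator to the next GetRecords call, with a sleep between polls.

```go
package main

import (
	"fmt"
	"strconv"
)

// page mimics the two fields of a GetRecords response that matter here.
type page struct {
	Records      []string
	NextIterator *string // nil means the shard is closed
}

// fakeShard is an in-memory stand-in for one shard. Its iterators are
// simply offsets into data, encoded as strings.
type fakeShard struct {
	data   []string
	closed bool
}

// getRecords returns up to limit records starting at the iterator's offset,
// together with the iterator to use for the next call.
func (s *fakeShard) getRecords(iterator string, limit int) (page, error) {
	start, err := strconv.Atoi(iterator)
	if err != nil || start < 0 || start > len(s.data) {
		return page{}, fmt.Errorf("bad iterator %q", iterator)
	}
	end := start + limit
	if end > len(s.data) {
		end = len(s.data)
	}
	if s.closed && end == len(s.data) {
		// Last page of a closed shard: no next iterator.
		return page{Records: s.data[start:end]}, nil
	}
	next := strconv.Itoa(end)
	return page{Records: s.data[start:end], NextIterator: &next}, nil
}

// drain reads until the shard reports it is closed, always continuing
// from the iterator returned by the previous call.
func drain(s *fakeShard, iterator string, limit int) ([]string, error) {
	var all []string
	for {
		p, err := s.getRecords(iterator, limit)
		if err != nil {
			return nil, err
		}
		all = append(all, p.Records...)
		if p.NextIterator == nil {
			return all, nil
		}
		iterator = *p.NextIterator
	}
}

func main() {
	shard := &fakeShard{data: []string{"a", "b", "c", "d", "e"}, closed: true}
	records, err := drain(shard, "0", 2)
	fmt.Println(records, err) // prints "[a b c d e] <nil>"
}
```

Following the returned iterator avoids one GetShardIterator request per poll, and the iterator is refreshed on every read even when no records arrive.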


Example output from running this program is shown below.


produced record to shard shardId-000000000000
4 shards located
shard shardId-000000000000 got 0 records
shard shardId-000000000003 got 0 records
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 0 records
produced record to shard shardId-000000000003
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000
produced record to shard shardId-000000000003
shard shardId-000000000000 got 1 records
shard shardId-000000000000 data: message data 3
shard shardId-000000000003 got 2 records
shard shardId-000000000003 data: message data 1
shard shardId-000000000003 data: message data 4
shard shardId-000000000001 got 1 records
shard shardId-000000000001 data: message data 2
shard shardId-000000000002 got 0 records
produced record to shard shardId-000000000000
produced record to shard shardId-000000000001
produced record to shard shardId-000000000003
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000
shard shardId-000000000000 got 2 records
shard shardId-000000000000 data: message data 5
shard shardId-000000000000 data: message data 9
shard shardId-000000000003 got 1 records
shard shardId-000000000003 data: message data 7
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 2 records
shard shardId-000000000001 data: message data 6
shard shardId-000000000001 data: message data 8
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
shard shardId-000000000000 got 0 records
shard shardId-000000000003 got 4 records
shard shardId-000000000003 data: message data 10
shard shardId-000000000003 data: message data 11
shard shardId-000000000003 data: message data 12
shard shardId-000000000003 data: message data 13
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 0 records
produced record to shard shardId-000000000002
produced record to shard shardId-000000000001
produced record to shard shardId-000000000003
produced record to shard shardId-000000000002
produced record to shard shardId-000000000000
shard shardId-000000000000 got 1 records
shard shardId-000000000000 data: message data 18
shard shardId-000000000002 got 2 records
shard shardId-000000000002 data: message data 14
shard shardId-000000000002 data: message data 17
shard shardId-000000000003 got 1 records
shard shardId-000000000003 data: message data 16
shard shardId-000000000001 got 1 records
shard shardId-000000000001 data: message data 15
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000


This is only a starting point for using AWS Kinesis, as there are many details that affect the stability of a production-grade solution. Still, it is a good place to begin.

