Wednesday, April 28, 2021

Reading from AWS Kinesis using GoLang


 


In this post we will review reading from AWS Kinesis using GoLang. 

AWS Kinesis is used by many AWS services. For example, CloudFront can deliver its real-time log records to a Kinesis data stream, and we can read them from there. See this blog for setting up real-time logging on CloudFront.


To access AWS services, we first authenticate to AWS. See this blog for the alternative authentication methods. Once authentication is configured, we can connect to the Kinesis data stream. Records are read from a specific shard; in this example, we select the first shard returned for the stream.



import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
)

func consume() {
	// Load credentials and settings from the default configuration chain.
	awsConfig, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"))
	if err != nil {
		panic(err)
	}

	client := kinesis.NewFromConfig(awsConfig)

	// Look up the stream to find its shards.
	streamName := "my-kinesis-data-stream"
	describeInput := kinesis.DescribeStreamInput{
		StreamName: aws.String(streamName),
	}
	describeOutput, err := client.DescribeStream(context.Background(), &describeInput)
	if err != nil {
		panic(err)
	}
	if len(describeOutput.StreamDescription.Shards) == 0 {
		panic("stream has no shards")
	}
	// Read from the first shard only.
	shard := describeOutput.StreamDescription.Shards[0]


Records are read in an infinite polling loop, using a shard iterator. In this example, we use the TRIM_HORIZON iterator type, which starts at the oldest record still retained in the shard, and then we keep polling for new records. Each GetRecords call returns the iterator to use for the next call.



	iteratorInput := kinesis.GetShardIteratorInput{
		ShardId:           shard.ShardId,
		ShardIteratorType: "TRIM_HORIZON",
		StreamName:        aws.String(streamName),
	}
	iteratorOutput, err := client.GetShardIterator(context.Background(), &iteratorInput)
	if err != nil {
		panic(err)
	}

	// NextShardIterator is nil once the shard is closed and fully read, so stop there.
	iterator := iteratorOutput.ShardIterator
	for iterator != nil {
		getInput := kinesis.GetRecordsInput{
			ShardIterator: iterator,
		}
		getOutput, err := client.GetRecords(context.Background(), &getInput)
		if err != nil {
			panic(err)
		}
		for _, record := range getOutput.Records {
			data := string(record.Data)
			fmt.Printf("data: %v\n", data)
		}

		iterator = getOutput.NextShardIterator
	}
}





