Wednesday, October 25, 2023

Using AWS SQS in Go


In this post we will review how to use AWS SQS from a Go application.

Let's have a look at the general structure of the example:

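package main

import (
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"strings"
	"sync"
	"time"
)

// Replace with the URL of your own queue.
const queueUrl = "https://sqs.us-east-1.amazonaws.com/YOUR_ACCOUNT_ID/YOUR_QUEUE_NAME"

func main() {
	// Start from an empty queue, then run one producer and two consumers.
	purge()
	go sendLoop()
	go receiveLoop(1)
	go receiveLoop(2)

	// Block forever: the counter is never decremented,
	// so main does not return while the goroutines run.
	var waitGroup sync.WaitGroup
	waitGroup.Add(1)
	waitGroup.Wait()
}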


We start by purging the queue, so that each run begins with an empty queue and is not affected by messages left over from previous runs.

Notice that SQS allows a queue to be purged only once every 60 seconds.


func purge() {
	awSession := session.Must(session.NewSession())
	svc := sqs.New(awSession)
	queueInput := &sqs.PurgeQueueInput{
		QueueUrl: aws.String(queueUrl),
	}
	_, err := svc.PurgeQueue(queueInput)
	if err != nil {
		panic(err)
	}
}


We run a goroutine that produces a message roughly every 2 seconds.

We use DelaySeconds of 3 seconds, so each message becomes visible to consumers only 3 seconds after it is sent. A delay like this can be useful when processing cannot start immediately, for example because something else must first be prepared in the background.


func sendLoop() {
	awSession := session.Must(session.NewSession())
	svc := sqs.New(awSession)

	i := 0
	for {
		i++
		// The message body is the running counter, delivered after a 3 second delay.
		messageInput := sqs.SendMessageInput{
			DelaySeconds: aws.Int64(3),
			MessageBody:  aws.String(fmt.Sprintf("%v", i)),
			QueueUrl:     aws.String(queueUrl),
		}

		_, err := svc.SendMessage(&messageInput)
		if err != nil {
			panic(err)
		}
		fmt.Printf("sent message #%v\n", i)

		time.Sleep(2 * time.Second)
	}
}


The last thing we do is run two consumers as goroutines.

Notice that we set WaitTimeSeconds to 1, which means each receive call waits up to 1 second for messages to arrive. This reduces the number of AWS API calls and allows larger batches of messages, up to MaxNumberOfMessages, to be returned by a single call.

We also use a VisibilityTimeout of 10 seconds, which hides a received message from other consumers for that period. This means the consumer has up to 10 seconds to process the message and delete it from the queue.


func receiveLoop(
	receiverId int,
) {
	awSession := session.Must(session.NewSession())
	svc := sqs.New(awSession)

	for {
		messageInput := sqs.ReceiveMessageInput{
			AttributeNames: []*string{
				aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
			},
			// QueueAttributeNameAll has the value "All",
			// which requests every message attribute.
			MessageAttributeNames: []*string{
				aws.String(sqs.QueueAttributeNameAll),
			},
			QueueUrl:            aws.String(queueUrl),
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   aws.Int64(10),
			WaitTimeSeconds:     aws.Int64(1),
		}

		msgResult, err := svc.ReceiveMessage(&messageInput)
		if err != nil {
			panic(err)
		}

		if len(msgResult.Messages) == 0 {
			fmt.Printf("receiver %v, no messages\n", receiverId)
		} else {
			items := make([]string, 0)
			for _, message := range msgResult.Messages {
				items = append(items, *message.Body)
			}
			fmt.Printf("receiver %v, messages: %v\n",
				receiverId, strings.Join(items, ","))

			// Delete each message so it is not delivered again
			// once the visibility timeout expires.
			for _, message := range msgResult.Messages {
				deleteMessageInput := sqs.DeleteMessageInput{
					QueueUrl:      aws.String(queueUrl),
					ReceiptHandle: message.ReceiptHandle,
				}
				_, err = svc.DeleteMessage(&deleteMessageInput)
				if err != nil {
					panic(err)
				}
			}
		}
	}
}


An example of the output is below.

Notice that each message is indeed consumed only about 3 seconds after it was sent, because of DelaySeconds.

Also, once one receiver gets a message, it is not available to the other consumer.


sent message #1
receiver 2, no messages
receiver 1, no messages
receiver 2, no messages
sent message #2
receiver 1, no messages
receiver 2, no messages
receiver 1, messages: 1
receiver 2, no messages
sent message #3
receiver 1, no messages
receiver 1, messages: 2
receiver 2, no messages
receiver 2, no messages
sent message #4
receiver 1, no messages
receiver 1, messages: 3
receiver 2, no messages
receiver 1, no messages
receiver 2, no messages
sent message #5
receiver 2, messages: 4
receiver 1, no messages
receiver 2, no messages
receiver 1, no messages
sent message #6
receiver 2, messages: 5
receiver 1, no messages
receiver 2, no messages
sent message #7
receiver 1, no messages
receiver 2, messages: 6
receiver 1, no messages
sent message #8
receiver 2, no messages
receiver 1, no messages
receiver 2, messages: 7
receiver 1, no messages
sent message #9
receiver 2, no messages


Let's make our consumer fail to process the messages in a timely fashion in some cases.

We do this by changing the VisibilityTimeout to 5 seconds and adding a random sleep of 0 to 9 seconds before deleting the messages from the queue. The sleep uses rand.Int63n, so "math/rand" must be added to the imports.


func receiveLoop(
	receiverId int,
) {
	awSession := session.Must(session.NewSession())
	svc := sqs.New(awSession)

	for {
		messageInput := sqs.ReceiveMessageInput{
			AttributeNames: []*string{
				aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
			},
			MessageAttributeNames: []*string{
				aws.String(sqs.QueueAttributeNameAll),
			},
			QueueUrl:            aws.String(queueUrl),
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   aws.Int64(5),
			WaitTimeSeconds:     aws.Int64(1),
		}

		msgResult, err := svc.ReceiveMessage(&messageInput)
		if err != nil {
			panic(err)
		}

		if len(msgResult.Messages) == 0 {
			fmt.Printf("receiver %v, no mesages\n", receiverId)
		} else {
			items := make([]string, 0)
			for _, message := range msgResult.Messages {
				items = append(items, *message.Body)
			}
			fmt.Printf("receiver %v, messages: %v\n",
				receiverId, strings.Join(items, ","))

			// Simulate slow processing: sleep 0 to 9 seconds,
			// which can exceed the 5 second visibility timeout.
			sleepTime := time.Second * time.Duration(rand.Int63n(10))
			fmt.Printf("receiver %v, sleeping %v\n", receiverId, sleepTime)
			time.Sleep(sleepTime)

			for _, message := range msgResult.Messages {
				deleteMessageInput := sqs.DeleteMessageInput{
					QueueUrl:      aws.String(queueUrl),
					ReceiptHandle: message.ReceiptHandle,
				}
				_, err = svc.DeleteMessage(&deleteMessageInput)
				if err != nil {
					panic(err)
				}
			}
		}
	}
}


The result is below.

We can see that some messages that took too long to process become visible again and are received a second time, possibly by the other consumer (see message #4 for example).


sent message #1
receiver 2, no mesages
receiver 1, no mesages
sent message #2
receiver 2, no mesages
receiver 1, no mesages
receiver 2, messages: 1
receiver 2, sleeping 9s
receiver 1, no mesages
sent message #3
receiver 1, messages: 2
receiver 1, sleeping 2s
sent message #4
receiver 1, messages: 3
receiver 1, sleeping 4s
sent message #5
sent message #6
receiver 1, messages: 4
receiver 1, sleeping 8s
receiver 2, messages: 5
receiver 2, sleeping 7s
sent message #7
sent message #8
sent message #9
sent message #10
receiver 2, messages: 4,6,7
receiver 2, sleeping 5s
receiver 1, messages: 8
receiver 1, sleeping 6s
sent message #11
sent message #12
receiver 2, messages: 10
receiver 2, sleeping 8s
sent message #13
receiver 1, messages: 9,11
receiver 1, sleeping 3s
sent message #14
receiver 1, messages: 12
receiver 1, sleeping 3s
sent message #15
sent message #16
receiver 1, messages: 14
receiver 1, sleeping 7s
receiver 2, messages: 13,15
receiver 2, sleeping 2s
sent message #17
receiver 2, messages: 16
receiver 2, sleeping 8s
sent message #18
sent message #19
receiver 1, messages: 18
receiver 1, sleeping 5s



