Wednesday, October 25, 2023

Using AWS SQS in Go


In this post we will review how to use AWS SQS from a Go application.

Let's have a look at the general structure of the example:

package main

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"strings"
"sync"
"time"
)

const queueUrl = "https://sqs.us-east-1.amazonaws.com/YOUR_ACCOUNT_ID/YOUR_QUEUE_NAME"

func main() {
purge()
go sendLoop()
go receiveLoop(1)
go receiveLoop(2)
var waitGroup sync.WaitGroup
waitGroup.Add(1)
waitGroup.Wait()
}


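We start by purging the queue, so that every run starts with an empty queue and is not affected by messages left over from previous runs.

Notice that SQS allows only one purge of a queue every 60 seconds, so restarting the example within a minute of the previous run makes PurgeQueue return an error.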


func purge() {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)
queueInput := &sqs.PurgeQueueInput{
QueueUrl: aws.String(queueUrl),
}
_, err := svc.PurgeQueue(queueInput)
if err != nil {
panic(err)
}
}


We run a goroutine that produces a message roughly every 2 seconds.

Each message is sent with DelaySeconds set to 3, so it becomes visible to consumers only 3 seconds after it was sent. This is useful when processing cannot start immediately, for example because some other background work must be prepared first.


func sendLoop() {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)

i := 0
for {
i++
messageInput := sqs.SendMessageInput{
DelaySeconds: aws.Int64(3),
MessageBody: aws.String(fmt.Sprintf("%v", i)),
QueueUrl: aws.String(queueUrl),
}

_, err := svc.SendMessage(&messageInput)
if err != nil {
panic(err)
}
fmt.Printf("sent message #%v\n", i)

time.Sleep(2 * time.Second)
}
}


The last thing we do is run two consumers, each as a goroutine.

Notice that WaitTimeSeconds is set to 1, meaning each receive call is willing to wait up to 1 second for messages to arrive. This reduces the number of AWS API calls and allows larger batches of messages per call, up to MaxNumberOfMessages.

We also use a VisibilityTimeout of 10 seconds, which hides a received message from other consumers for that period. This means the consumer has up to 10 seconds to process the message and delete it from the queue.


func receiveLoop(
receiverId int,
) {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)

for {
messageInput := sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(queueUrl),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(10),
WaitTimeSeconds: aws.Int64(1),
}

msgResult, err := svc.ReceiveMessage(&messageInput)
if err != nil {
panic(err)
}

if len(msgResult.Messages) == 0 {
fmt.Printf("receiver %v, no messages\n", receiverId)
} else {
items := make([]string, 0)
for _, message := range msgResult.Messages {
items = append(items, *message.Body)
}
fmt.Printf("receiver %v, messages: %v\n",
receiverId, strings.Join(items, ","))

for _, message := range msgResult.Messages {
deleteMessageInput := sqs.DeleteMessageInput{
QueueUrl: aws.String(queueUrl),
ReceiptHandle: message.ReceiptHandle,
}
_, err = svc.DeleteMessage(&deleteMessageInput)
if err != nil {
panic(err)
}
}
}
}
}


An example of the output is below.

Notice that a message is indeed consumed only about 3 seconds after it was sent.

Also, once one receiver gets a message, it is not available to the other consumer.


sent message #1
receiver 2, no messages
receiver 1, no messages
receiver 2, no messages
sent message #2
receiver 1, no messages
receiver 2, no messages
receiver 1, messages: 1
receiver 2, no messages
sent message #3
receiver 1, no messages
receiver 1, messages: 2
receiver 2, no messages
receiver 2, no messages
sent message #4
receiver 1, no messages
receiver 1, messages: 3
receiver 2, no messages
receiver 1, no messages
receiver 2, no messages
sent message #5
receiver 2, messages: 4
receiver 1, no messages
receiver 2, no messages
receiver 1, no messages
sent message #6
receiver 2, messages: 5
receiver 1, no messages
receiver 2, no messages
sent message #7
receiver 1, no messages
receiver 2, messages: 6
receiver 1, no messages
sent message #8
receiver 2, no messages
receiver 1, no messages
receiver 2, messages: 7
receiver 1, no messages
sent message #9
receiver 2, no messages


Let's make our consumer fail to process the messages in a timely fashion in some cases.

We do this by changing the VisibilityTimeout to 5 seconds and adding a random sleep of 0 to 9 seconds (rand.Int63n(10) never returns 10) before deleting the messages from the queue.


func receiveLoop(
receiverId int,
) {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)

for {
messageInput := sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(queueUrl),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(5),
WaitTimeSeconds: aws.Int64(1),
}

msgResult, err := svc.ReceiveMessage(&messageInput)
if err != nil {
panic(err)
}

if len(msgResult.Messages) == 0 {
fmt.Printf("receiver %v, no mesages\n", receiverId)
} else {
items := make([]string, 0)
for _, message := range msgResult.Messages {
items = append(items, *message.Body)
}
fmt.Printf("receiver %v, messages: %v\n",
receiverId, strings.Join(items, ","))

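// rand comes from "math/rand", which must be added to the import block.
// Int63n(10) yields 0..9, so the sleep can exceed the 5 second visibility timeout.
sleepTime := time.Second * time.Duration(rand.Int63n(10))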
fmt.Printf("receiver %v, sleeping %v\n", receiverId, sleepTime)
time.Sleep(sleepTime)

for _, message := range msgResult.Messages {
deleteMessageInput := sqs.DeleteMessageInput{
QueueUrl: aws.String(queueUrl),
ReceiptHandle: message.ReceiptHandle,
}
_, err = svc.DeleteMessage(&deleteMessageInput)
if err != nil {
panic(err)
}
}
}
}
}


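The result is below.

We can see that messages that took longer than the visibility timeout to process are consumed again, possibly by the other consumer. For example, receiver 1 gets message #4 and sleeps for 8 seconds, and in the meantime receiver 2 receives message #4 as well.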


sent message #1
receiver 2, no mesages
receiver 1, no mesages
sent message #2
receiver 2, no mesages
receiver 1, no mesages
receiver 2, messages: 1
receiver 2, sleeping 9s
receiver 1, no mesages
sent message #3
receiver 1, messages: 2
receiver 1, sleeping 2s
sent message #4
receiver 1, messages: 3
receiver 1, sleeping 4s
sent message #5
sent message #6
receiver 1, messages: 4
receiver 1, sleeping 8s
receiver 2, messages: 5
receiver 2, sleeping 7s
sent message #7
sent message #8
sent message #9
sent message #10
receiver 2, messages: 4,6,7
receiver 2, sleeping 5s
receiver 1, messages: 8
receiver 1, sleeping 6s
sent message #11
sent message #12
receiver 2, messages: 10
receiver 2, sleeping 8s
sent message #13
receiver 1, messages: 9,11
receiver 1, sleeping 3s
sent message #14
receiver 1, messages: 12
receiver 1, sleeping 3s
sent message #15
sent message #16
receiver 1, messages: 14
receiver 1, sleeping 7s
receiver 2, messages: 13,15
receiver 2, sleeping 2s
sent message #17
receiver 2, messages: 16
receiver 2, sleeping 8s
sent message #18
sent message #19
receiver 1, messages: 18
receiver 1, sleeping 5s




Sunday, October 22, 2023

Go-Redis Pipeline for HGETALL



In this post we will show an example of using a Redis pipeline with the HGETALL command.

I've found some examples on the web, but most of them cover only the update commands and say little about the read commands.


Let's start by creating a Redis connection:


options := redis.Options{
Addr: "127.0.0.1:6379",
}
client := redis.NewClient(&options)


To handle the pipeline commands, we need to keep, for each HGETALL command, the pointer to the command object, and access its result only after the pipeline exec. Hence we use a wrapper struct for this.


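// Assumed definition, inferred from the usage below:
// the field/value pairs that HGETALL returns for one key.
type HGetAllResult map[string]string

// Assumed definition, inferred from the constructor below:
// queues HGETALL commands and collects their results after Exec.
type PipelineWrapper struct {
context context.Context
origin redis.Pipeliner
hGetAllCommands map[string]*redis.MapStringStringCmd
hGetAllResults map[string]HGetAllResult
}

func ProducePipelineWrapper(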
context context.Context,
origin redis.Pipeliner,
) *PipelineWrapper {
return &PipelineWrapper{
context: context,
origin: origin,
hGetAllCommands: make(map[string]*redis.MapStringStringCmd),
hGetAllResults: make(map[string]HGetAllResult),
}
}

func (p *PipelineWrapper) HGetAll(key string) {
p.hGetAllCommands[key] = p.origin.HGetAll(p.context, key)
}

func (p *PipelineWrapper) Exec() {
_, err := p.origin.Exec(p.context)
if err != nil {
panic(err)
}

for key, cmd := range p.hGetAllCommands {
result, err := cmd.Result()
if err != nil {
panic(err)
}
p.hGetAllResults[key] = result
}
}

func (p *PipelineWrapper) ResultsForHGetAll() map[string]HGetAllResult {
return p.hGetAllResults
}


To use this wrapper, we call HGETALL for a list of keys, and after the exec, we check the results.


wrapper := ProducePipelineWrapper(context.Background(), client.Pipeline())
keys := []string{"k1", "k2"}
for _, key := range keys {
wrapper.HGetAll(key)
}

wrapper.Exec()

for _, key := range keys {
values := wrapper.ResultsForHGetAll()[key]
fmt.Printf("key: %v, values: %v\n", key, values)
}


That's all folks!




Monday, October 16, 2023

Lazy Update - Reduce Redis Load Method

 



In this post we will review a method to reduce the load on Redis. I have used this method in several projects, and it made the difference between a non-working project with excessive costs and a functioning project with reasonable costs.

When working with Redis, there are usually several pods processing work tasks and updating Redis with the results. As a product matures, it typically goes through several gradual steps until it settles on the best way to use Redis.

The Steps

The first and naive step is to update Redis for each work task. A work task might be a user action, a web transaction, or a system event, so we expect a huge number of work tasks, and therefore a huge number of Redis updates.

Trying to reduce the Redis updates usually leads to an in-memory state. We have multiple Kubernetes pods that are part of the same Kubernetes deployment. Each pod keeps its own in-memory state, where all updates are made, and once in a short period (for example, every 5 seconds) the state is saved back to Redis. Why is this better? We usually increment the same counters and set values for the same Redis keys for each work task. Instead, we can do this in memory and update Redis only once per period, converting multiple increment operations into a single operation. This reduces the number of Redis updates from O(N), where N is the number of work tasks, to roughly O(T), where T is the number of save periods that have elapsed, regardless of how many work tasks arrived in each period.

The next step is reducing the updates even more. Part of the in-memory state that we keep in the pods is saved only for the case where the pod is terminated and we need to reload the state. Do we really need to save it every 5 seconds? How critical would it be if we lost some of the updates? If some updates are not critical, we can save them at a longer interval, for example once every 10 minutes. Notice that it is important to randomize the interval, to prevent all the pods from saving at exactly the same time, which would load Redis and slow the system. An example of a time interval calculation is:

10 minutes + random(10 minutes)

Final Note

In this post we've reviewed methods to reduce the load on Redis.
We have reviewed methods for saving state. The same methods can also be used for loading state.
Using these methods should be one of the first attempts to solve a Redis stress issue, before jumping to the conclusion that the Redis cluster should be upgraded with more CPU and memory resources. This reduces costs in an effective way.