Monday, December 25, 2023

AWS SSQ API, Implementation, and Stub in Go


 

In this post we will present a to send messages to AWS SQS. We will include an interface, an implementation, and a stub. We've previously included an example for producer and consumer in Go, and here we provide a nice API that enables us to use this code both in production and in tests.


The interface is the minimal API required to send a message. In this interface we hide the AWS session connection, as well as the AWS SQS queue name, and include only the message details.



type SqsApi interface {
SendMessage(
attributes map[string]string,
data string,
)
}



The implementation uses an AWS session. Notice that there are several methods to get an AWS session, but in this implementation we use the simplest method.


package awssqs

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

type SqsImpl struct {
sqsClient *sqs.SQS
queueUrl string
}

func ProduceAwsSqsImpl(
queueUrl string,
) *SqsImpl {
awsSession, err := session.NewSession()
if err!= nil{
panic(err)
}

return &SqsImpl{
sqsClient: sqs.New(awsSession),
queueUrl: queueUrl,
}
}

func (s *SqsImpl) SendMessage(
attributes map[string]string,
data string,
) {
input := sqs.SendMessageInput{
MessageAttributes: make(map[string]*sqs.MessageAttributeValue),
MessageBody: aws.String(data),
QueueUrl: aws.String(s.queueUrl),
}

for key, value := range attributes {
input.MessageAttributes[key] = &sqs.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(value),
}
}

_, err := s.sqsClient.SendMessage(&input)
if err!= nil{
panic(err)
}
}



We also include a stub that can be used during tests:



type MessageData struct {
Attributes map[string]string
Data string
}

type SqsStub struct {
Messages []*MessageData
}

func ProduceAwsSqsStub() *SqsStub {
return &SqsStub{}
}

func (s *SqsStub) SendMessage(
attributes map[string]string,
data string,
) {
messageData := MessageData{
Attributes: attributes,
Data: data,
}
s.Messages = append(s.Messages, &messageData)
}






Monday, December 18, 2023

List Files with Epoch Names



In many applications and solutions we create files with timestamp, and a very common method is to use epoch seconds in the name of the files. This is very useful to automated analysis of the files. But, from time to time, we might need to manually analyze the files, and then locating the file with the epoch we want is is tough task. Consider the following files list:


create_time.txt
graphs1702167500-1702171090.json
graphs1702171100-1702174690.json
graphs1702171100-a.json
graphs1702174700-1702178290.json
graphs1702178300-1702181890.json
graphs1702181900-1702185490.json
graphs1702185500-1702189090.json
graphs1702189100-1702192690.json
graphs1702192700-1702196290.json
graphs1702196300-1702199890.json
graphs1702199900-1702203490.json
graphs1702203500-1702205010.json
signatures_1702167500-1702167500.json
signatures_1702167510-1702167510.json
signatures_1702167520-1702167520.json
signatures_1702167530-1702167530.json
signatures_1702167540-1702167540.json
signatures_1702167550-1702167550.json
signatures_1702167560-1702167560.json
signatures_1702167570-1702167570.json
signatures_1702167580-1702167580.json


we can copy the epoch from the file name, and use bash's `date` command to find the actual time of the file, for example:


$ date -u -d @1702171100
Sun 10 Dec 2023 01:18:20 UTC


But doing this for each file is not fun. Instead, we can use a script to automate listing of the files. The following script does the job:


#!/usr/bin/env bash
set -e

folder=$1

ProcessLine() {
file=$1
timeDescription="missing epoch"
epoch1=$(echo "$file" | sed 's/[a-zA-Z_-]*\([0-9]\{10\}\).*/\1/p' | head -1)
epoch1Length=${#epoch1}
if [[ "${epoch1Length}" == "10" ]]; then
date1=$(date +%Y-%m-%dT%H:%M:%S -u -d @${epoch1})
epoch2=$(echo "$file" | sed 's/.*\([0-9]\{10\}\).*\([0-9]\{10\}\).*/\2/p' | head -1)
epoch2Length=${#epoch2}
if [[ "${epoch2Length}" == "10" ]]; then
date2=$(date +%Y-%m-%dT%H:%M:%S -u -d @${epoch2})
timeDescription="${date1} to ${date2}"
else
timeDescription="${date1}"
fi
fi

sizeDescription=$(ls -lh ${file} | awk '{print $5}')
sizeDescription=$(printf "%8s %s" ${sizeDescription})
fileDescription=$(printf "%-50s %s" ${file})

echo "${fileDescription} ${sizeDescription} ${timeDescription}"
}

ls $1 | while read line; do ProcessLine "$line"; done


The output from the script lists all files in the current working directory, including the file name, size, and user friendly times. It supports files without epoch in the name, files with a single epoch in the name, and files with epoch range in their names. An example output is:


create_time.txt                                           51  missing epoch
graphs1702167500-1702171090.json 193K 2023-12-10T00:18:20 to 2023-12-10T01:18:10
graphs1702171100-1702174690.json 193K 2023-12-10T01:18:20 to 2023-12-10T02:18:10
graphs1702171100-a.json 0 2023-12-10T01:18:20
graphs1702174700-1702178290.json 193K 2023-12-10T02:18:20 to 2023-12-10T03:18:10
graphs1702178300-1702181890.json 181K 2023-12-10T03:18:20 to 2023-12-10T04:18:10
graphs1702181900-1702185490.json 184K 2023-12-10T04:18:20 to 2023-12-10T05:18:10
graphs1702185500-1702189090.json 194K 2023-12-10T05:18:20 to 2023-12-10T06:18:10
graphs1702189100-1702192690.json 195K 2023-12-10T06:18:20 to 2023-12-10T07:18:10
graphs1702192700-1702196290.json 195K 2023-12-10T07:18:20 to 2023-12-10T08:18:10
graphs1702196300-1702199890.json 178K 2023-12-10T08:18:20 to 2023-12-10T09:18:10
graphs1702199900-1702203490.json 190K 2023-12-10T09:18:20 to 2023-12-10T10:18:10
graphs1702203500-1702205010.json 76K 2023-12-10T10:18:20 to 2023-12-10T10:43:30
signatures_1702167500-1702167500.json 29 2023-12-10T00:18:20 to 2023-12-10T00:18:20
signatures_1702167510-1702167510.json 29 2023-12-10T00:18:30 to 2023-12-10T00:18:30
signatures_1702167520-1702167520.json 29 2023-12-10T00:18:40 to 2023-12-10T00:18:40
signatures_1702167530-1702167530.json 29 2023-12-10T00:18:50 to 2023-12-10T00:18:50
signatures_1702167540-1702167540.json 29 2023-12-10T00:19:00 to 2023-12-10T00:19:00
signatures_1702167550-1702167550.json 29 2023-12-10T00:19:10 to 2023-12-10T00:19:10







Monday, December 11, 2023

Using NATS in go

 


In the following post we review an example of using NATS in Go. This includes an interface API, implementation, and stub that can be used for tests.


We start with an interface:


type SubscribeHandler func(data []byte)

type ServerSessionApi interface {
PublishMessage(
queueName string,
data []byte,
)

SubscribeQueue(
queueName string,
handler SubscribeHandler,
)

StopQueueSubscriptions(
queueName string,
)
}


This simple interface enables usage of a simple publish-subscribe pattern. Now for the actual implementation.


package nats

import (
"fmt"
"github.com/nats-io/nats.go"
"sync"
)

type subscriptionData struct {
subscribers []*nats.Subscription
}

type ServerSessionImpl struct {
mutex sync.Mutex
connection *nats.Conn
subscriptions map[string]*subscriptionData
}

func CreateServerSessionImpl() *ServerSessionImpl {
s := ServerSessionImpl{
subscriptions: make(map[string]*subscriptionData),
}
if Config.NatsConnectionEnabled {
connection, err := nats.Connect(
Config.NatsUrl,
nats.ReconnectBufSize(-1), // do not buffer data in case connection is lost
nats.MaxReconnects(-1), // always retry the to reconnect
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
fmt.Println("nats server connection is lost")
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
fmt.Println("nats server connection is back")
}),
nats.ErrorHandler(func(conn *nats.Conn, subscription *nats.Subscription, err error) {
fmt.Printf("nats issue: %v\n", err)
}),
)
if err != nil {
panic(err)
}
s.connection = connection
}
return &s
}

func (s *ServerSessionImpl) PublishMessage(
queueName string,
data []byte,
) {
if s.connection == nil {
return
}
err := s.connection.Publish(queueName, data)
if err != nil {
panic(err)
}
}

func (s *ServerSessionImpl) StopQueueSubscriptions(
queueName string,
) {
s.mutex.Lock()
defer s.mutex.Unlock()
subscriptions := s.subscriptions[queueName]
if subscriptions == nil {
return
}

for _, subscription := range subscriptions.subscribers {
err := subscription.Unsubscribe()
if err != nil {
panic(err)
}
}
}

func (s *ServerSessionImpl) SubscribeQueue(
queueName string,
handler SubscribeHandler,
) {
if s.connection == nil {
return
}
subscription, err := s.connection.Subscribe(queueName, func(message *nats.Msg) {
err := s.processMessage(handler, message.Data)
if err != nil {
panic(err)
}
})
if err != nil {
panic(err)
}

s.mutex.Lock()
defer s.mutex.Unlock()

subscriptions := s.subscriptions[queueName]
if subscriptions == nil {
subscriptions = &subscriptionData{}
s.subscriptions[queueName] = subscriptions
}

subscriptions.subscribers = append(subscriptions.subscribers, subscription)
}

func (s *ServerSessionImpl) processMessage(
handler SubscribeHandler,
data []byte,
) (wrappedError error) {
defer func(myError *error) {
panicError := recover()
if panicError == nil {
myError = nil
} else {
*myError = fmt.Errorf("%v", panicError)
}
}(&wrappedError)
handler(data)
return wrappedError
}


The implementation keeps the created subscriptions so these can be later used for the stop of the subscription. The actual receiving of the message is handled in the processMessage function which wraps the handler for errors handling.

Notice that we use options in the NATS connect function to make a more stable connection, as well as print some log data in case of connection issues.


To run this in tests, we can use a stub:


package nats

import "sync"

type queueData struct {
subscribers []SubscribeHandler
mutex sync.Mutex
}

type ServerSessionStub struct {
queues map[string]*queueData
}

func CreateServerSessionStub() *ServerSessionStub {
return &ServerSessionStub{
queues: make(map[string]*queueData),
}
}

func (s *ServerSessionStub) StopQueueSubscriptions(
queueName string,
) {
mutex.Lock()
defer mutex.Unlock()

s.queues[queueName] = nil
}

func (s *ServerSessionStub) PublishMessage(
queueName string,
data []byte,
) {
mutex.Lock()
defer mutex.Unlock()

subscribers := s.queues[queueName]
if subscribers == nil {
return
}
for _, subscriber := range subscribers.subscribers {
subscriber(data)
}
}

func (s *ServerSessionStub) SubscribeQueue(
queueName string,
handler SubscribeHandler,
) {
mutex.Lock()
defer mutex.Unlock()

subscribers := s.queues[queueName]
if subscribers == nil {
subscribers = &queueData{}
s.queues[queueName] = subscribers
}
subscribers.subscribers = append(subscribers.subscribers, handler)
}


Both the stub and the impl can be used as the interface, hence our code can be run with the real NATS API, as well as with the stub.






Monday, December 4, 2023

NATS cluster setup in a kubernetes environment




In this post we will review the steps to "nationalize" a NATS cluster installation as part of our own kubernetes solution.


Using a 3rd-party technology as part of a kubernetes solution usually requires that this technology is installed as part of the solution itself, using a single helm chart for the entire solution. This means we need to create our own docker images to wrap the 3rd-party docker images, and integrate the 3rd-party helm chart into our solution helm charts. Let's review the steps to accomplish this for a NATS cluster.


Docker Images

First, start by wrapping up the NATS docker images. The core NATS server uses 2 images. In addition we will wrap the nats-box images which supplies CLI utilities.


The required docker files are:

NATS reloader Dockerfile:

ARG dockerCacheRepo
FROM ${dockerCacheRepo}natsio/nats-server-config-reloader:0.14.0


NATS container Dockerfile:

ARG dockerCacheRepo
FROM ${dockerCacheRepo}nats:2.10.5-alpine


NATS box Dockerfile:

ARG dockerCacheRepo
FROM ${dockerCacheRepo}natsio/nats-box:0.14.1


Notice that we use a build argument: dockerCacheRepo, which should be sent to the docker build command using the argument:

--build-arg dockerCacheRepo=https://my.docker.proxy.com

This enables us to use a docker proxy server and avoid the docker hub download limit.


The docker images should be tagged using our own solution naming, for example:

my-project-repo.com/solution1/nats-container:0.0.1


Helm Chart

To create templates for the NATS deployment, we using the official NATS chart:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm template -f ./helm-values.yaml nats nats/nats --output-dir ./target


and the helm-values.yaml file is:

fullnameOverride: solution1-nats
reloader:
image:
repository: my-project-repo.com/solution1/nats-reloader/dev
tag: latest
pullPolicy: IfNotPresent
container:
image:
repository: my-project-repo.com/solution1/nats-container/dev
tag: latest
pullPolicy: IfNotPresent
natsBox:
container:
image:
pullPolicy: IfNotPresent
repository: my-project-repo.com/solution1/nats-box/dev
tag: latest
config:
cluster:
enabled: true
port: 6222
replicas: 2
nats:
port: 4222
monitor:
port: 8222

 

This creates multiple files under the target folder. This files are the templates that should be integrated into our solution, probably as one of the sub charts. An example of one of these files containing the NATS deployment is below.


---
# Source: nats/templates/nats-box/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app.kubernetes.io/component: nats-box
app.kubernetes.io/instance: nats
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: nats
app.kubernetes.io/version: 2.10.5
helm.sh/chart: nats-1.1.5
name: solution1-nats-box
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/component: nats-box
app.kubernetes.io/instance: nats
app.kubernetes.io/name: nats
template:
metadata:
labels:
app.kubernetes.io/component: nats-box
app.kubernetes.io/instance: nats
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: nats
app.kubernetes.io/version: 2.10.5
helm.sh/chart: nats-1.1.5
spec:
containers:
- args:
- trap true INT TERM; sleep infinity & wait
command:
- sh
- -ec
- |
work_dir="$(pwd)"
mkdir -p "$XDG_CONFIG_HOME/nats"
cd "$XDG_CONFIG_HOME/nats"
if ! [ -s context ]; then
ln -s /etc/nats-contexts context
fi
if ! [ -f context.txt ]; then
echo -n "default" > context.txt
fi
cd "$work_dir"
exec sh -ec "$0"
image: my-project-repo.com/solution1/nats-box/dev:latest
imagePullPolicy: IfNotPresent
name: nats-box
volumeMounts:
- mountPath: /etc/nats-contexts
name: contexts
enableServiceLinks: false
volumes:
- name: contexts
secret:
secretName: solution1-nats-box-contexts











Sunday, November 26, 2023

Connect and traverse On AWS Neptune using GoLang

 

This is an example for using the gremlingo library to connect to the AWS Neptune graph database.

The documentation for the library is very poor, so I though adding this example might be required.

Notice that to run it, you need to run the GO binary in the same VPC as the AWS Neptune.


In the example we connect to the database, add some nodes, and traverse the added nodes.


package main

import (
"fmt"
gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver"
)

func main() {
// Creating the connection to the server.
connection, err := gremlingo.NewDriverRemoteConnection("wss://db-neptune-2.cluster-cby3ysf1nfyw.us-east-1.neptune.amazonaws.com:8182/gremlin",
func(settings *gremlingo.DriverRemoteConnectionSettings) {
settings.TraversalSource = "g"
})
if err != nil {
panic(err)
}
defer connection.Close()

useGraphDb(connection)
}

func useGraphDb(connection *gremlingo.DriverRemoteConnection) {
traversalSource := gremlingo.Traversal_().WithRemote(connection)

alice := addVertex(traversalSource, "buyer", "alice")
bob := addVertex(traversalSource, "buyer", "bob")
charlie := addVertex(traversalSource, "buyer", "charlie")
bread := addVertex(traversalSource, "product", "bread")
butter := addVertex(traversalSource, "product", "butter")

_, err := traversalSource.AddE("buy").From(alice).To(bread).Next()
if err != nil {
panic(err)
}
_, err = traversalSource.AddE("buy").From(bob).To(bread).Next()
if err != nil {
panic(err)
}
_, err = traversalSource.AddE("buy").From(charlie).To(butter).Next()
if err != nil {
panic(err)
}

printList(traversalSource.V())
printList(traversalSource.E())
printList(traversalSource.E().Property("type", "buy").V())
}

func printList(traversalSource *gremlingo.GraphTraversal) {
results, err := traversalSource.Limit(100).ToList()
if err != nil {
panic(err)
}
fmt.Printf("the list has %v items\n", len(results))

for _, r := range results {
fmt.Printf("%+v\n", r.String())
}
}

func addVertex(
traversalSource *gremlingo.GraphTraversalSource,
vertexType string,
vertexName string,
) *gremlingo.Vertex {
result, err := traversalSource.AddV(vertexType).Property("name", vertexName).Next()
if err != nil {
panic(err)
}
vertex, err := result.GetVertex()
if err != nil {
panic(err)
}
return vertex
}



Monday, November 20, 2023

Using OpenAI in Go


 


In this post we will review how to use OpenAI API in Go. We will download an HTML file, and let OpenAI analyze it.


Let's start with the basic, we need to send HTTP requests both for downloading the HTML file, and for accessing OpenAI API, hence the function to handle HTTP requests is:


func sendHttpRequest(
method string,
fullUrl string,
headers map[string]string,
body interface{},
) string {
httpClient := http.DefaultClient
var bodyReader io.Reader
if body != nil {
bodyBytes, err := json.Marshal(body)
if err != nil {
panic(err)
}
bodyReader = bytes.NewReader(bodyBytes)
}

httpRequest, err := http.NewRequest(method, fullUrl, bodyReader)
for key, value := range headers {
httpRequest.Header.Set(key, value)
}
httpResponse, err := httpClient.Do(httpRequest)
if err != nil {
panic(err)
}
bodyBytes, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
panic(err)
}

bodyString := string(bodyBytes)

if httpResponse.StatusCode != 200 {
panic(bodyString)
}
return bodyString
}


Let's analyze a specific HTML file:


analyzeHtml("https://assets.bounceexchange.com/assets/bounce/local_storage_frame17.min.html")



and the analyze function is:


func analyzeHtml(
fullUrl string,
) {
htmlData := sendHttpRequest("GET", fullUrl, nil, nil)

openAiListModels()

guide := "provide the following in JSON format:\n" +
"1. length, int, the characters amount in the HTML\n" +
"2. scripts, boolean, does the HTML include javascripts"
openAiCompletions(&completionRequest{
Model: "gpt-3.5-turbo-1106",
MaxTokens: 100,
Messages: []*completionMessage{
{
Role: "system",
Content: guide,
},
{
Role: "user",
Content: htmlData,
},
},
Temperature: 0,
})
}


We download the HTML file, and then use OpenAI API to analyze it.

As a side note, we can list the models available for us:

func openAiListModels() {
headers := map[string]string{
"Authorization": "Bearer " + Config.OpenAiKey,
}
result := sendHttpRequest("GET", "https://api.openai.com/v1/models", headers, nil)
print(result)
}


The structure of the completion request is:

type completionMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}

type completionRequest struct {
Model string `json:"model"`
Messages []*completionMessage `json:"messages"`
Temperature float32 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
}



and an example output is:


{
"id": "chatcmpl-8MyaJSYyAu4CVheOnHeQaA4Z7ThRQ",
"object": "chat.completion",
"created": 1700486795,
"model": "gpt-3.5-turbo-1106",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "```json\n{\n \"length\": 2283,\n \"scripts\": true\n}\n```"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 705,
"completion_tokens": 20,
"total_tokens": 725
},
"system_fingerprint": "fp_eeff13170a"
}


This output can now be parsed by another code, and act automatically as required.



Sunday, November 12, 2023

AWS Kinesis vs. AWS SQS


 


In previous post we've demonstrated Using AWS Kinesis in Go, and Using AWS SQS in Go. In this post we'll review when should we use each of this technologies.

Before going into details, there is one rule of thumb that let you know which technology to use. This might not be always correct, and we should check the details below for the final decision, but this is a very good start point. 

Use AWS SQS if the following is required:

  • High rate and transparent dynamic scalability of messages processing
  • Ordering is not important
Use AWS Kinesis if the following is required:
  • Reading of the same message by several consumers
  • Replay of messages 
  • Ordering is important


Let's dive into the details. We will review each of the characteristics of the technologies, and compare where it better to use one over the other.


Scale

AWS SQS automatically transparently handles dynamic load changes. This means the application automatically adjusts to load changes. This is a huge advantage for AWS SQS.

AWS Kinesis can handle high load but the AWS Kinesis shards and the application needs to be preconfigured with the expected load. Change of the shards amount requires both change of application behavior and a resharding operation, which is definitely not a fun game.


Multiple Consumers

AWS SQS supports message level operations. We can read a message, and then we should delete it within a preconfigured time limit. Once a message is read and deleted it is no longer available to other consumers. If we require multiple consumers reading the same messages, we cannot do it by AWS SQS itself, but we can use AWS SNS in combination with AWS SQS to create a fanout pattern. This fanout is preconfigured, and dynamic changes to the fanout configuration require both application changes and SQS+SNS changes.

AWS Kinesis works on stream level, and the messages are kept in the stream until the end of the retention period. This enable us using multiple consumers reading the same message, each using its own iterator. More than that, the consumers can be created on the fly without any preconfiguration on the AWS Kinesis side.


Ordering

AWS SQS does not guarantee message ordering. We can how ever use an AWS SQS FIFO queue that guarantees order as well as exactly-once processing. This however comes with a cost both in price and in performance.

AWS Kinesis does guarantee message ordering, but only on the shard level. This is usually fine, as implementation with AWS Kinesis tends to create consumer for each application logical entity on the shard level.


Message Size

AWS SQS supports messages up to 256KiB.
AWS Kinesis supports messages up to 1MiB.


Throughput

AWS SQS supports nearly unlimited number of transactions per second (!!!).

AWS Kinesis supports for each shard:
  • write 1MB/1000 transactions per second
  • read 2MB/2000 transactions per second

AWS Services Integration

AWS SQS messages can be configured to be processed by AWS Lambda.

AWS Kinesis messages can be configured to be processes by many AWS services such as AWS S3, AWS EkasticSearch, AWS Lambda, and more.


Pricing

In general first look on the costs seems to be similar, but the details here matter especially if we plan to process huge amount of data. The costs should be compared based of the amount of shards we might need for AWS Kinesis. There is not guideline here (sorry...), we will need to plan the entire solution to get an estimation of the cost.




Sunday, November 5, 2023

Using AWS Kinesis in Go


 

In this post we will review a simple Go implementation of AWS kinesis producer and consumer.


The main function starts the producer and the consumer, and then waits forever.


import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"sync"
"time"
)

func main() {
go producer()
go consumer()

waitGroup := sync.WaitGroup{}
waitGroup.Add(1)
waitGroup.Wait()
}


The producer writes a record to AWS kinesis stream every second.


func producer() {
awSession := session.Must(session.NewSession())
kinesisClient := kinesis.New(awSession)
for i := 0; ; i++ {
input := kinesis.PutRecordInput{
Data: []byte(fmt.Sprintf("message data %v", i)),
PartitionKey: aws.String(fmt.Sprintf("key%v", i)),
StreamName: aws.String("s1"),
}
output, err := kinesisClient.PutRecord(&input)
if err != nil {
panic(err)
}

fmt.Printf("produced record to shard %v\n", *output.ShardId)
time.Sleep(time.Second)
}
}


The consumer is more complex. First we need to list the stream's shards, and start a consumer for each of the shards.


func consumer() {
awSession := session.Must(session.NewSession())
kinesisClient := kinesis.New(awSession)

input := kinesis.DescribeStreamInput{
StreamName: aws.String("s1"),
}
output, err := kinesisClient.DescribeStream(&input)
if err != nil {
panic(err)
}

fmt.Printf("%v shards loacted\n", len(output.StreamDescription.Shards))

for i := 0; i < len(output.StreamDescription.Shards); i++ {
shardId := output.StreamDescription.Shards[i].ShardId
go consumerShard(kinesisClient, shardId)
}
}


Each shard consumer has a small trick. First we use iterator to get all the new records. Once we got a record, we modify the iterator to read all records after the last received record.


func consumerShard(
kinesisClient *kinesis.Kinesis,
shardId *string,
) {
iteratorInput := kinesis.GetShardIteratorInput{
ShardId: shardId,
ShardIteratorType: aws.String("LATEST"),
StreamName: aws.String("s1"),
}
shardIteratorOutput, err := kinesisClient.GetShardIterator(&iteratorInput)
if err != nil {
panic(err)
}

var lastRecord *string
for {
recordsInput := kinesis.GetRecordsInput{
ShardIterator: shardIteratorOutput.ShardIterator,
}
records, err := kinesisClient.GetRecords(&recordsInput)
if err != nil {
panic(err)
}

fmt.Printf("shard %v got %v records\n", *shardId, len(records.Records))

for i := 0; i < len(records.Records); i++ {
record := records.Records[i]
fmt.Printf("shard %v data: %v\n", *shardId, string(record.Data))
lastRecord = record.SequenceNumber
}

time.Sleep(5 * time.Second)

if lastRecord != nil {
iteratorInput.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
iteratorInput.StartingSequenceNumber = lastRecord
shardIteratorOutput, err = kinesisClient.GetShardIterator(&iteratorInput)
if err != nil {
panic(err)
}
}
}
}


An example output for running this is below.


produced record to shard shardId-000000000000
4 shards loacted
shard shardId-000000000000 got 0 records
shard shardId-000000000003 got 0 records
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 0 records
produced record to shard shardId-000000000003
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000
produced record to shard shardId-000000000003
shard shardId-000000000000 got 1 records
shard shardId-000000000000 data: message data 3
shard shardId-000000000003 got 2 records
shard shardId-000000000003 data: message data 1
shard shardId-000000000003 data: message data 4
shard shardId-000000000001 got 1 records
shard shardId-000000000001 data: message data 2
shard shardId-000000000002 got 0 records
produced record to shard shardId-000000000000
produced record to shard shardId-000000000001
produced record to shard shardId-000000000003
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000
shard shardId-000000000000 got 2 records
shard shardId-000000000000 data: message data 5
shard shardId-000000000000 data: message data 9
shard shardId-000000000003 got 1 records
shard shardId-000000000003 data: message data 7
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 2 records
shard shardId-000000000001 data: message data 6
shard shardId-000000000001 data: message data 8
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
produced record to shard shardId-000000000003
shard shardId-000000000000 got 0 records
shard shardId-000000000003 got 4 records
shard shardId-000000000003 data: message data 10
shard shardId-000000000003 data: message data 11
shard shardId-000000000003 data: message data 12
shard shardId-000000000003 data: message data 13
shard shardId-000000000002 got 0 records
shard shardId-000000000001 got 0 records
produced record to shard shardId-000000000002
produced record to shard shardId-000000000001
produced record to shard shardId-000000000003
produced record to shard shardId-000000000002
produced record to shard shardId-000000000000
shard shardId-000000000000 got 1 records
shard shardId-000000000000 data: message data 18
shard shardId-000000000002 got 2 records
shard shardId-000000000002 data: message data 14
shard shardId-000000000002 data: message data 17
shard shardId-000000000003 got 1 records
shard shardId-000000000003 data: message data 16
shard shardId-000000000001 got 1 records
shard shardId-000000000001 data: message data 15
produced record to shard shardId-000000000001
produced record to shard shardId-000000000000


This is just a starting point for using AWS kinesis, as they are many details that affect the stability of a production grade solution. Still, this is a good starting point.


Wednesday, October 25, 2023

Using AWS SQS in Go


In this post we will review usage of AWS SQS with a Go Application.

Let's have a look at the general structure of the example:

package main

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"strings"
"sync"
"time"
)

const queueUrl = "https://sqs.us-east-1.amazonaws.com/YOUR_ACCOUNT_ID/YOUR_QUEUE_NAME"

func main() {
purge()
go sendLoop()
go receiveLoop(1)
go receiveLoop(2)
var waitGroup sync.WaitGroup
waitGroup.Add(1)
waitGroup.Wait()
}


We start by purging the queue messages, assuring that we start with a clear queue, and not affected by previous runs. 

Notice we can purge queue messages only once in a minute.


func purge() {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)
queueInput := &sqs.PurgeQueueInput{
QueueUrl: aws.String(queueUrl),
}
_, err := svc.PurgeQueue(queueInput)
if err != nil {
panic(err)
}
}


We run a go routine to produce messages every ~2 seconds. 

We use DelaySeconds of 3 seconds. This might be required in case the processing cannot start immediately due to some other processing that should be prepared in the background.


func sendLoop() {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)

i := 0
for {
i++
messageInput := sqs.SendMessageInput{
DelaySeconds: aws.Int64(3),
MessageBody: aws.String(fmt.Sprintf("%v", i)),
QueueUrl: aws.String(queueUrl),
}

_, err := svc.SendMessage(&messageInput)
if err != nil {
panic(err)
}
fmt.Printf("sent message #%v\n", i)

time.Sleep(2 * time.Second)
}
}


The last thing we do is to run two consumers as go routine. 

Notice that we configure we're willing to wait up to 1 second for messages, which is good to reduce amount of AWS API calls and enables getting larger bulks of messages, up to MaxNumberOfMessages. 

We also use VisibilityTimeout of 10 seconds that marks the message as not available to other consumers for that period. This means the consumer has up to 10 seconds to process the message and to delete if from the queue.


func receiveLoop(
receiverId int,
) {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)

for {
messageInput := sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(queueUrl),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(10),
WaitTimeSeconds: aws.Int64(1),
}

msgResult, err := svc.ReceiveMessage(&messageInput)
if err != nil {
panic(err)
}

if len(msgResult.Messages) == 0 {
fmt.Printf("receiver %v, no messages\n", receiverId)
} else {
items := make([]string, 0)
for _, message := range msgResult.Messages {
items = append(items, *message.Body)
}
fmt.Printf("receiver %v, messages: %v\n",
receiverId, strings.Join(items, ","))

for _, message := range msgResult.Messages {
deleteMessageInput := sqs.DeleteMessageInput{
QueueUrl: aws.String(queueUrl),
ReceiptHandle: message.ReceiptHandle,
}
_, err = svc.DeleteMessage(&deleteMessageInput)
if err != nil {
panic(err)
}
}
}
}
}


An example of output is below. 

Notice that a message is indeed consumed only 3 seconds after it was sent.

Also, once one receiver got a message, it is not available for the other consumers.


sent message #1
receiver 2, no messages
receiver 1, no messages
receiver 2, no messages
sent message #2
receiver 1, no messages
receiver 2, no messages
receiver 1, messages: 1
receiver 2, no messages
sent message #3
receiver 1, no messages
receiver 1, messages: 2
receiver 2, no messages
receiver 2, no messages
sent message #4
receiver 1, no messages
receiver 1, messages: 3
receiver 2, no messages
receiver 1, no messages
receiver 2, no messages
sent message #5
receiver 2, messages: 4
receiver 1, no messages
receiver 2, no messages
receiver 1, no messages
sent message #6
receiver 2, messages: 5
receiver 1, no messages
receiver 2, no messages
sent message #7
receiver 1, no messages
receiver 2, messages: 6
receiver 1, no messages
sent message #8
receiver 2, no messages
receiver 1, no messages
receiver 2, messages: 7
receiver 1, no messages
sent message #9
receiver 2, no messages


Let's make our consumer fail to process the messages in a timely fashion in some cases.

We do this by changing the VisibilityTimeout to 5 seconds, and add a random sleep of up to 10 seconds before delete of the message from the queue.


func receiveLoop(
receiverId int,
) {
awSession := session.Must(session.NewSession())
svc := sqs.New(awSession)

for {
messageInput := sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(queueUrl),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(5),
WaitTimeSeconds: aws.Int64(1),
}

msgResult, err := svc.ReceiveMessage(&messageInput)
if err != nil {
panic(err)
}

if len(msgResult.Messages) == 0 {
fmt.Printf("receiver %v, no mesages\n", receiverId)
} else {
items := make([]string, 0)
for _, message := range msgResult.Messages {
items = append(items, *message.Body)
}
fmt.Printf("receiver %v, messages: %v\n",
receiverId, strings.Join(items, ","))

sleepTime := time.Second * time.Duration(rand.Int63n(10))
fmt.Printf("receiver %v, sleeping %v\n", receiverId, sleepTime)
time.Sleep(sleepTime)

for _, message := range msgResult.Messages {
deleteMessageInput := sqs.DeleteMessageInput{
QueueUrl: aws.String(queueUrl),
ReceiptHandle: message.ReceiptHandle,
}
_, err = svc.DeleteMessage(&deleteMessageInput)
if err != nil {
panic(err)
}
}
}
}
}


The result is below.

We can see that some messages that took too long to process are re-consumed by another consumer (see message #4 for example).


sent message #1
receiver 2, no mesages
receiver 1, no mesages
sent message #2
receiver 2, no mesages
receiver 1, no mesages
receiver 2, messages: 1
receiver 2, sleeping 9s
receiver 1, no mesages
sent message #3
receiver 1, messages: 2
receiver 1, sleeping 2s
sent message #4
receiver 1, messages: 3
receiver 1, sleeping 4s
sent message #5
sent message #6
receiver 1, messages: 4
receiver 1, sleeping 8s
receiver 2, messages: 5
receiver 2, sleeping 7s
sent message #7
sent message #8
sent message #9
sent message #10
receiver 2, messages: 4,6,7
receiver 2, sleeping 5s
receiver 1, messages: 8
receiver 1, sleeping 6s
sent message #11
sent message #12
receiver 2, messages: 10
receiver 2, sleeping 8s
sent message #13
receiver 1, messages: 9,11
receiver 1, sleeping 3s
sent message #14
receiver 1, messages: 12
receiver 1, sleeping 3s
sent message #15
sent message #16
receiver 1, messages: 14
receiver 1, sleeping 7s
receiver 2, messages: 13,15
receiver 2, sleeping 2s
sent message #17
receiver 2, messages: 16
receiver 2, sleeping 8s
sent message #18
sent message #19
receiver 1, messages: 18
receiver 1, sleeping 5s