Monday, December 25, 2023

AWS SQS API, Implementation, and Stub in Go


 

In this post we present an API to send messages to AWS SQS. We will include an interface, an implementation, and a stub. We've previously shown an example of an SQS producer and consumer in Go, and here we provide a clean API that enables us to use this code both in production and in tests.


The interface is the minimal API required to send a message. In this interface we hide the AWS session connection, as well as the AWS SQS queue name, and include only the message details.



type SqsApi interface {
    SendMessage(
        attributes map[string]string,
        data string,
    )
}



The implementation uses an AWS session. Notice that there are several ways to create an AWS session; in this implementation we use the simplest one.


package awssqs

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

type SqsImpl struct {
    sqsClient *sqs.SQS
    queueUrl  string
}

func ProduceAwsSqsImpl(
    queueUrl string,
) *SqsImpl {
    awsSession, err := session.NewSession()
    if err != nil {
        panic(err)
    }

    return &SqsImpl{
        sqsClient: sqs.New(awsSession),
        queueUrl:  queueUrl,
    }
}

func (s *SqsImpl) SendMessage(
    attributes map[string]string,
    data string,
) {
    input := sqs.SendMessageInput{
        MessageAttributes: make(map[string]*sqs.MessageAttributeValue),
        MessageBody:       aws.String(data),
        QueueUrl:          aws.String(s.queueUrl),
    }

    for key, value := range attributes {
        input.MessageAttributes[key] = &sqs.MessageAttributeValue{
            DataType:    aws.String("String"),
            StringValue: aws.String(value),
        }
    }

    _, err := s.sqsClient.SendMessage(&input)
    if err != nil {
        panic(err)
    }
}



We also include a stub that can be used during tests:



type MessageData struct {
    Attributes map[string]string
    Data       string
}

type SqsStub struct {
    Messages []*MessageData
}

func ProduceAwsSqsStub() *SqsStub {
    return &SqsStub{}
}

func (s *SqsStub) SendMessage(
    attributes map[string]string,
    data string,
) {
    messageData := MessageData{
        Attributes: attributes,
        Data:       data,
    }
    s.Messages = append(s.Messages, &messageData)
}
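As a sketch of how the stub drives a test, the snippet below wires a hypothetical notifyUser function (invented for this example) to the SqsApi interface, and then asserts on the messages the stub recorded. The type definitions mirror the ones above so the example is self-contained:

```go
package main

import "fmt"

// The interface and stub below mirror the definitions above;
// notifyUser is a hypothetical function under test.
type SqsApi interface {
	SendMessage(attributes map[string]string, data string)
}

type MessageData struct {
	Attributes map[string]string
	Data       string
}

type SqsStub struct {
	Messages []*MessageData
}

func (s *SqsStub) SendMessage(attributes map[string]string, data string) {
	s.Messages = append(s.Messages, &MessageData{Attributes: attributes, Data: data})
}

// notifyUser depends only on the SqsApi interface, so a test can pass the stub
// while production code passes the real implementation.
func notifyUser(queue SqsApi, userId string) {
	queue.SendMessage(map[string]string{"userId": userId}, "user notified")
}

func main() {
	stub := &SqsStub{}
	notifyUser(stub, "u-1")
	fmt.Println(len(stub.Messages), stub.Messages[0].Data) // prints: 1 user notified
}
```

Since notifyUser only sees the interface, swapping the stub for the real SqsImpl requires no change to the tested code.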






Monday, December 18, 2023

List Files with Epoch Names



In many applications and solutions we create files with timestamps, and a very common method is to use epoch seconds in the names of the files. This is very useful for automated analysis of the files. But, from time to time, we might need to manually analyze the files, and then locating the file with the epoch we want is a tough task. Consider the following files list:


create_time.txt
graphs1702167500-1702171090.json
graphs1702171100-1702174690.json
graphs1702171100-a.json
graphs1702174700-1702178290.json
graphs1702178300-1702181890.json
graphs1702181900-1702185490.json
graphs1702185500-1702189090.json
graphs1702189100-1702192690.json
graphs1702192700-1702196290.json
graphs1702196300-1702199890.json
graphs1702199900-1702203490.json
graphs1702203500-1702205010.json
signatures_1702167500-1702167500.json
signatures_1702167510-1702167510.json
signatures_1702167520-1702167520.json
signatures_1702167530-1702167530.json
signatures_1702167540-1702167540.json
signatures_1702167550-1702167550.json
signatures_1702167560-1702167560.json
signatures_1702167570-1702167570.json
signatures_1702167580-1702167580.json


We can copy the epoch from the file name and use the `date` command to find the actual time of the file, for example:


$ date -u -d @1702171100
Sun 10 Dec 2023 01:18:20 UTC
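The reverse direction is just as handy when we know the time and want to locate the covering file; `date` with the `+%s` output format prints the epoch (this assumes GNU date, as on most Linux systems):

```shell
# epoch -> human-readable time, in UTC
date -u -d @1702171100 +%Y-%m-%dT%H:%M:%S
# human-readable time -> epoch, useful for locating the matching file
date -u -d "2023-12-10 01:18:20" +%s
```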


But doing this for each file is not fun. Instead, we can use a script to automate listing of the files. The following script does the job:


#!/usr/bin/env bash
set -e

folder=$1

ProcessLine() {
    file=$1
    timeDescription="missing epoch"
    epoch1=$(echo "$file" | sed -n 's/[a-zA-Z_-]*\([0-9]\{10\}\).*/\1/p' | head -1)
    epoch1Length=${#epoch1}
    if [[ "${epoch1Length}" == "10" ]]; then
        date1=$(date +%Y-%m-%dT%H:%M:%S -u -d "@${epoch1}")
        epoch2=$(echo "$file" | sed -n 's/.*\([0-9]\{10\}\).*\([0-9]\{10\}\).*/\2/p' | head -1)
        epoch2Length=${#epoch2}
        if [[ "${epoch2Length}" == "10" ]]; then
            date2=$(date +%Y-%m-%dT%H:%M:%S -u -d "@${epoch2}")
            timeDescription="${date1} to ${date2}"
        else
            timeDescription="${date1}"
        fi
    fi

    sizeDescription=$(ls -lh "${folder}/${file}" | awk '{print $5}')
    sizeDescription=$(printf "%8s" "${sizeDescription}")
    fileDescription=$(printf "%-50s" "${file}")

    echo "${fileDescription} ${sizeDescription} ${timeDescription}"
}

ls "${folder}" | while read -r line; do ProcessLine "$line"; done


The script lists all files in the given folder, including the file name, size, and user-friendly times. It supports files without an epoch in the name, files with a single epoch in the name, and files with an epoch range in their names. An example output is:


create_time.txt                                          51 missing epoch
graphs1702167500-1702171090.json                       193K 2023-12-10T00:18:20 to 2023-12-10T01:18:10
graphs1702171100-1702174690.json                       193K 2023-12-10T01:18:20 to 2023-12-10T02:18:10
graphs1702171100-a.json                                   0 2023-12-10T01:18:20
graphs1702174700-1702178290.json                       193K 2023-12-10T02:18:20 to 2023-12-10T03:18:10
graphs1702178300-1702181890.json                       181K 2023-12-10T03:18:20 to 2023-12-10T04:18:10
graphs1702181900-1702185490.json                       184K 2023-12-10T04:18:20 to 2023-12-10T05:18:10
graphs1702185500-1702189090.json                       194K 2023-12-10T05:18:20 to 2023-12-10T06:18:10
graphs1702189100-1702192690.json                       195K 2023-12-10T06:18:20 to 2023-12-10T07:18:10
graphs1702192700-1702196290.json                       195K 2023-12-10T07:18:20 to 2023-12-10T08:18:10
graphs1702196300-1702199890.json                       178K 2023-12-10T08:18:20 to 2023-12-10T09:18:10
graphs1702199900-1702203490.json                       190K 2023-12-10T09:18:20 to 2023-12-10T10:18:10
graphs1702203500-1702205010.json                        76K 2023-12-10T10:18:20 to 2023-12-10T10:43:30
signatures_1702167500-1702167500.json                    29 2023-12-10T00:18:20 to 2023-12-10T00:18:20
signatures_1702167510-1702167510.json                    29 2023-12-10T00:18:30 to 2023-12-10T00:18:30
signatures_1702167520-1702167520.json                    29 2023-12-10T00:18:40 to 2023-12-10T00:18:40
signatures_1702167530-1702167530.json                    29 2023-12-10T00:18:50 to 2023-12-10T00:18:50
signatures_1702167540-1702167540.json                    29 2023-12-10T00:19:00 to 2023-12-10T00:19:00
signatures_1702167550-1702167550.json                    29 2023-12-10T00:19:10 to 2023-12-10T00:19:10







Monday, December 11, 2023

Using NATS in Go

 


In the following post we review an example of using NATS in Go. This includes an interface API, implementation, and stub that can be used for tests.


We start with an interface:


type SubscribeHandler func(data []byte)

type ServerSessionApi interface {
    PublishMessage(
        queueName string,
        data []byte,
    )

    SubscribeQueue(
        queueName string,
        handler SubscribeHandler,
    )

    StopQueueSubscriptions(
        queueName string,
    )
}


This minimal interface enables usage of a simple publish-subscribe pattern. Now for the actual implementation.


package nats

import (
    "fmt"
    "github.com/nats-io/nats.go"
    "sync"
)

type subscriptionData struct {
    subscribers []*nats.Subscription
}

type ServerSessionImpl struct {
    mutex         sync.Mutex
    connection    *nats.Conn
    subscriptions map[string]*subscriptionData
}

func CreateServerSessionImpl() *ServerSessionImpl {
    s := ServerSessionImpl{
        subscriptions: make(map[string]*subscriptionData),
    }
    if Config.NatsConnectionEnabled {
        connection, err := nats.Connect(
            Config.NatsUrl,
            nats.ReconnectBufSize(-1), // do not buffer data in case connection is lost
            nats.MaxReconnects(-1),    // always retry reconnecting
            nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
                fmt.Println("nats server connection is lost")
            }),
            nats.ReconnectHandler(func(nc *nats.Conn) {
                fmt.Println("nats server connection is back")
            }),
            nats.ErrorHandler(func(conn *nats.Conn, subscription *nats.Subscription, err error) {
                fmt.Printf("nats issue: %v\n", err)
            }),
        )
        if err != nil {
            panic(err)
        }
        s.connection = connection
    }
    return &s
}

func (s *ServerSessionImpl) PublishMessage(
    queueName string,
    data []byte,
) {
    if s.connection == nil {
        return
    }
    err := s.connection.Publish(queueName, data)
    if err != nil {
        panic(err)
    }
}

func (s *ServerSessionImpl) StopQueueSubscriptions(
    queueName string,
) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    subscriptions := s.subscriptions[queueName]
    if subscriptions == nil {
        return
    }

    for _, subscription := range subscriptions.subscribers {
        err := subscription.Unsubscribe()
        if err != nil {
            panic(err)
        }
    }
    // forget the stopped subscriptions, so a later subscribe starts fresh
    delete(s.subscriptions, queueName)
}

func (s *ServerSessionImpl) SubscribeQueue(
    queueName string,
    handler SubscribeHandler,
) {
    if s.connection == nil {
        return
    }
    subscription, err := s.connection.Subscribe(queueName, func(message *nats.Msg) {
        err := s.processMessage(handler, message.Data)
        if err != nil {
            panic(err)
        }
    })
    if err != nil {
        panic(err)
    }

    s.mutex.Lock()
    defer s.mutex.Unlock()

    subscriptions := s.subscriptions[queueName]
    if subscriptions == nil {
        subscriptions = &subscriptionData{}
        s.subscriptions[queueName] = subscriptions
    }

    subscriptions.subscribers = append(subscriptions.subscribers, subscription)
}

func (s *ServerSessionImpl) processMessage(
    handler SubscribeHandler,
    data []byte,
) (wrappedError error) {
    defer func() {
        if panicError := recover(); panicError != nil {
            wrappedError = fmt.Errorf("%v", panicError)
        }
    }()
    handler(data)
    return nil
}


The implementation keeps the created subscriptions so they can later be used to stop a subscription. The actual receiving of messages is handled in the processMessage function, which wraps the handler for error handling.

Notice that we use options in the NATS connect function to make the connection more stable, as well as to print some log data in case of connection issues.
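The recover pattern used by processMessage can be shown in isolation. In this sketch, safeCall (a name invented here) turns a panic raised inside the handler into a returned error, so a misbehaving handler does not crash the subscriber goroutine:

```go
package main

import "fmt"

// safeCall converts a panic raised by handler into a returned error,
// the same recover pattern used by processMessage above.
func safeCall(handler func()) (wrappedError error) {
	defer func() {
		if panicError := recover(); panicError != nil {
			wrappedError = fmt.Errorf("%v", panicError)
		}
	}()
	handler()
	return nil
}

func main() {
	fmt.Println(safeCall(func() { panic("boom") })) // prints: boom
	fmt.Println(safeCall(func() {}))                // prints: <nil>
}
```

Note that the named return value is required: the deferred function must be able to overwrite the error after the panic unwinds the stack.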


To run this in tests, we can use a stub:


package nats

import "sync"

type queueData struct {
    subscribers []SubscribeHandler
}

type ServerSessionStub struct {
    mutex  sync.Mutex
    queues map[string]*queueData
}

func CreateServerSessionStub() *ServerSessionStub {
    return &ServerSessionStub{
        queues: make(map[string]*queueData),
    }
}

func (s *ServerSessionStub) StopQueueSubscriptions(
    queueName string,
) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    delete(s.queues, queueName)
}

func (s *ServerSessionStub) PublishMessage(
    queueName string,
    data []byte,
) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    subscribers := s.queues[queueName]
    if subscribers == nil {
        return
    }
    for _, subscriber := range subscribers.subscribers {
        subscriber(data)
    }
}

func (s *ServerSessionStub) SubscribeQueue(
    queueName string,
    handler SubscribeHandler,
) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    subscribers := s.queues[queueName]
    if subscribers == nil {
        subscribers = &queueData{}
        s.queues[queueName] = subscribers
    }
    subscribers.subscribers = append(subscribers.subscribers, handler)
}


Both the stub and the implementation satisfy the interface, hence our code can run with the real NATS API as well as with the stub.
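To illustrate the round trip through the interface, here is a condensed, lock-free version of the stub (simplified for the example; the names mirror the definitions above) driving a publish-subscribe-stop sequence:

```go
package main

import "fmt"

// Condensed mirrors of the interface and stub defined above.
type SubscribeHandler func(data []byte)

type ServerSessionApi interface {
	PublishMessage(queueName string, data []byte)
	SubscribeQueue(queueName string, handler SubscribeHandler)
	StopQueueSubscriptions(queueName string)
}

type ServerSessionStub struct {
	queues map[string][]SubscribeHandler
}

func (s *ServerSessionStub) PublishMessage(queueName string, data []byte) {
	// deliver synchronously to every subscriber of the queue
	for _, handler := range s.queues[queueName] {
		handler(data)
	}
}

func (s *ServerSessionStub) SubscribeQueue(queueName string, handler SubscribeHandler) {
	s.queues[queueName] = append(s.queues[queueName], handler)
}

func (s *ServerSessionStub) StopQueueSubscriptions(queueName string) {
	delete(s.queues, queueName)
}

// runDemo publishes through the interface and returns what the subscriber
// received; a message published after the stop call is dropped.
func runDemo() []string {
	var session ServerSessionApi = &ServerSessionStub{queues: map[string][]SubscribeHandler{}}
	var received []string
	session.SubscribeQueue("jobs", func(data []byte) {
		received = append(received, string(data))
	})
	session.PublishMessage("jobs", []byte("hello"))
	session.StopQueueSubscriptions("jobs")
	session.PublishMessage("jobs", []byte("dropped"))
	return received
}

func main() {
	fmt.Println(runDemo()) // prints: [hello]
}
```

Because the code under test holds only a ServerSessionApi, the same sequence runs unchanged against the real NATS-backed implementation.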






Monday, December 4, 2023

NATS cluster setup in a kubernetes environment




In this post we will review the steps to internalize a NATS cluster installation as part of our own kubernetes solution.


Using a 3rd-party technology as part of a kubernetes solution usually requires that the technology be installed as part of the solution itself, using a single helm chart for the entire solution. This means we need to create our own docker images that wrap the 3rd-party docker images, and to integrate the 3rd-party helm chart into our solution helm charts. Let's review the steps to accomplish this for a NATS cluster.


Docker Images

First we wrap the NATS docker images. The core NATS server uses two images: the server itself and a config reloader. In addition we wrap the nats-box image, which supplies CLI utilities.


The required docker files are:

NATS reloader Dockerfile:

ARG dockerCacheRepo
FROM ${dockerCacheRepo}natsio/nats-server-config-reloader:0.14.0


NATS container Dockerfile:

ARG dockerCacheRepo
FROM ${dockerCacheRepo}nats:2.10.5-alpine


NATS box Dockerfile:

ARG dockerCacheRepo
FROM ${dockerCacheRepo}natsio/nats-box:0.14.1


Notice that we use a build argument, dockerCacheRepo, which should be passed to the docker build command:

--build-arg dockerCacheRepo=my.docker.proxy.com/

Since the value is prepended to the image name in the FROM instruction, it must be a registry host prefix ending with a slash, without an https:// scheme. This enables us to use a docker proxy server and avoid the docker hub download limit.


The docker images should be tagged using our own solution naming, for example:

my-project-repo.com/solution1/nats-container:0.0.1
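Putting the proxy argument and the tagging together, a hypothetical build of the wrapped NATS container image looks like this (the registry names are the placeholders from the examples above):

```shell
# build the wrapped NATS server image through the docker proxy,
# tagging it with our own solution naming
docker build \
  --build-arg dockerCacheRepo=my.docker.proxy.com/ \
  -t my-project-repo.com/solution1/nats-container:0.0.1 \
  .
```

The same pattern applies to the reloader and nats-box Dockerfiles, each with its own target tag.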


Helm Chart

To create templates for the NATS deployment, we use the official NATS chart:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm template -f ./helm-values.yaml nats nats/nats --output-dir ./target


and the helm-values.yaml file is:

fullnameOverride: solution1-nats
reloader:
  image:
    repository: my-project-repo.com/solution1/nats-reloader/dev
    tag: latest
    pullPolicy: IfNotPresent
container:
  image:
    repository: my-project-repo.com/solution1/nats-container/dev
    tag: latest
    pullPolicy: IfNotPresent
natsBox:
  container:
    image:
      pullPolicy: IfNotPresent
      repository: my-project-repo.com/solution1/nats-box/dev
      tag: latest
config:
  cluster:
    enabled: true
    port: 6222
    replicas: 2
  nats:
    port: 4222
  monitor:
    port: 8222
 

This creates multiple files under the target folder. These files are the templates that should be integrated into our solution, probably as one of the sub-charts. An example of one of these files, containing the nats-box deployment, is below.


---
# Source: nats/templates/nats-box/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app.kubernetes.io/component: nats-box
    app.kubernetes.io/instance: nats
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: nats
    app.kubernetes.io/version: 2.10.5
    helm.sh/chart: nats-1.1.5
  name: solution1-nats-box
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/component: nats-box
      app.kubernetes.io/instance: nats
      app.kubernetes.io/name: nats
  template:
    metadata:
      labels:
        app.kubernetes.io/component: nats-box
        app.kubernetes.io/instance: nats
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/name: nats
        app.kubernetes.io/version: 2.10.5
        helm.sh/chart: nats-1.1.5
    spec:
      containers:
        - args:
            - trap true INT TERM; sleep infinity & wait
          command:
            - sh
            - -ec
            - |
              work_dir="$(pwd)"
              mkdir -p "$XDG_CONFIG_HOME/nats"
              cd "$XDG_CONFIG_HOME/nats"
              if ! [ -s context ]; then
                ln -s /etc/nats-contexts context
              fi
              if ! [ -f context.txt ]; then
                echo -n "default" > context.txt
              fi
              cd "$work_dir"
              exec sh -ec "$0"
          image: my-project-repo.com/solution1/nats-box/dev:latest
          imagePullPolicy: IfNotPresent
          name: nats-box
          volumeMounts:
            - mountPath: /etc/nats-contexts
              name: contexts
      enableServiceLinks: false
      volumes:
        - name: contexts
          secret:
            secretName: solution1-nats-box-contexts