Monday, December 11, 2023

Using NATS in Go

 


In the following post we review an example of using NATS in Go. This includes an interface API, implementation, and stub that can be used for tests.


We start with an interface:


// SubscribeHandler is the callback type registered through SubscribeQueue.
// It is called with the raw payload bytes of a message.
type SubscribeHandler func(data []byte)

// ServerSessionApi is the publish-subscribe contract used by the rest of the
// code. None of the methods return a value, so failures are not reported
// through this interface.
type ServerSessionApi interface {
	// PublishMessage sends data to the queue identified by queueName.
	PublishMessage(queueName string, data []byte)

	// SubscribeQueue registers handler to be called with the payload of
	// messages published to queueName.
	SubscribeQueue(queueName string, handler SubscribeHandler)

	// StopQueueSubscriptions stops the subscriptions registered for queueName.
	StopQueueSubscriptions(queueName string)
}


This small interface is all we need for a simple publish-subscribe pattern. Now for the actual implementation.


package nats

import (
"fmt"
"github.com/nats-io/nats.go"
"sync"
)

// subscriptionData groups the NATS subscriptions created for one queue name.
type subscriptionData struct {
// subscribers holds the subscriptions created for the queue, in the order
// they were appended by SubscribeQueue.
subscribers []*nats.Subscription
}

// ServerSessionImpl is the NATS-backed session. Its methods publish to,
// subscribe to, and stop subscriptions on a NATS connection.
type ServerSessionImpl struct {
// mutex guards access to the subscriptions map.
mutex sync.Mutex
// connection is the NATS connection. It stays nil when
// Config.NatsConnectionEnabled is false, in which case publishing and
// subscribing return without doing anything.
connection *nats.Conn
// subscriptions maps a queue name to the subscriptions created for it.
subscriptions map[string]*subscriptionData
}

// CreateServerSessionImpl builds a NATS-backed session.
//
// When Config.NatsConnectionEnabled is false no connection is opened and the
// returned session has a nil connection. Otherwise it connects to
// Config.NatsUrl and panics if the connection cannot be established.
func CreateServerSessionImpl() *ServerSessionImpl {
	session := &ServerSessionImpl{
		subscriptions: make(map[string]*subscriptionData),
	}
	if !Config.NatsConnectionEnabled {
		return session
	}

	// Connection-state callbacks only log; they do not change session state.
	onDisconnect := func(nc *nats.Conn, err error) {
		fmt.Println("nats server connection is lost")
	}
	onReconnect := func(nc *nats.Conn) {
		fmt.Println("nats server connection is back")
	}
	onError := func(conn *nats.Conn, subscription *nats.Subscription, err error) {
		fmt.Printf("nats issue: %v\n", err)
	}

	connection, err := nats.Connect(
		Config.NatsUrl,
		nats.ReconnectBufSize(-1), // do not buffer data while the connection is lost
		nats.MaxReconnects(-1),    // always keep retrying to reconnect
		nats.DisconnectErrHandler(onDisconnect),
		nats.ReconnectHandler(onReconnect),
		nats.ErrorHandler(onError),
	)
	if err != nil {
		panic(err)
	}

	session.connection = connection
	return session
}

// PublishMessage publishes data to queueName on the NATS connection.
//
// It returns without doing anything when the session has no connection, and
// panics if the publish call returns an error.
func (s *ServerSessionImpl) PublishMessage(
	queueName string,
	data []byte,
) {
	connection := s.connection
	if connection == nil {
		return
	}

	if publishErr := connection.Publish(queueName, data); publishErr != nil {
		panic(publishErr)
	}
}

// StopQueueSubscriptions unsubscribes every subscription that was registered
// for queueName and stops tracking them.
//
// The queue's entry is removed from the bookkeeping map before the
// subscriptions are unsubscribed. Without that, a second call for the same
// queue (or a call after re-subscribing) would try to unsubscribe the same
// subscriptions again, which NATS reports as an error.
//
// It does nothing when no subscriptions are tracked for queueName, and panics
// if unsubscribing returns an error.
func (s *ServerSessionImpl) StopQueueSubscriptions(
	queueName string,
) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	subscriptions := s.subscriptions[queueName]
	if subscriptions == nil {
		return
	}
	// Forget the entry first so stale subscriptions are never revisited.
	delete(s.subscriptions, queueName)

	for _, subscription := range subscriptions.subscribers {
		if err := subscription.Unsubscribe(); err != nil {
			panic(err)
		}
	}
}

// SubscribeQueue subscribes handler to queueName and records the resulting
// subscription so that StopQueueSubscriptions can find it later.
//
// It returns without doing anything when the session has no connection, and
// panics if the subscribe call returns an error. The message callback also
// panics when processMessage reports an error for the handler.
func (s *ServerSessionImpl) SubscribeQueue(
	queueName string,
	handler SubscribeHandler,
) {
	if s.connection == nil {
		return
	}

	onMessage := func(message *nats.Msg) {
		if handlerErr := s.processMessage(handler, message.Data); handlerErr != nil {
			panic(handlerErr)
		}
	}

	created, err := s.connection.Subscribe(queueName, onMessage)
	if err != nil {
		panic(err)
	}

	// The subscription already exists; only the bookkeeping needs the lock.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	entry := s.subscriptions[queueName]
	if entry == nil {
		entry = &subscriptionData{}
		s.subscriptions[queueName] = entry
	}
	entry.subscribers = append(entry.subscribers, created)
}

// processMessage calls handler with data and converts a panic raised by the
// handler into a returned error.
//
// It returns nil when the handler returns normally. When the handler panics
// with an error value, that error is wrapped, so errors.Is and errors.As
// still work on the result. Any other panic value is formatted with %v.
func (s *ServerSessionImpl) processMessage(
	handler SubscribeHandler,
	data []byte,
) (wrappedError error) {
	// The named result lets the deferred function set the return value after
	// a panic has unwound handler.
	defer func() {
		switch recovered := recover().(type) {
		case nil:
			// The handler returned normally; keep the nil result.
		case error:
			wrappedError = fmt.Errorf("%w", recovered)
		default:
			wrappedError = fmt.Errorf("%v", recovered)
		}
	}()

	handler(data)
	return nil
}


The implementation keeps track of the subscriptions it creates so that they can later be used to stop the subscriptions of a queue. Incoming messages are handled in the processMessage function, which wraps the handler call and converts a panic raised by the handler into an error.

Notice that we use options in the NATS connect function to make a more stable connection, as well as print some log data in case of connection issues.


To run this in tests, we can use a stub:


package nats

import "sync"

// queueData holds the handlers registered for a single queue name.
type queueData struct {
	subscribers []SubscribeHandler
}

// ServerSessionStub is an in-memory stand-in for the NATS-backed session,
// intended for tests. Published messages are delivered synchronously, on the
// caller's goroutine, to the handlers subscribed to the same queue name.
type ServerSessionStub struct {
	// mutex guards queues. It belongs to the stub itself so that every
	// method locks the same, per-instance mutex.
	mutex  sync.Mutex
	queues map[string]*queueData
}

// CreateServerSessionStub returns an empty stub with no subscriptions.
func CreateServerSessionStub() *ServerSessionStub {
	return &ServerSessionStub{
		queues: make(map[string]*queueData),
	}
}

// StopQueueSubscriptions drops every handler registered for queueName.
// It does nothing when the queue has no handlers.
func (s *ServerSessionStub) StopQueueSubscriptions(
	queueName string,
) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	delete(s.queues, queueName)
}

// PublishMessage calls every handler currently subscribed to queueName with
// data, in subscription order. It does nothing when the queue has no handlers.
//
// The handler list is copied under the lock and the handlers are called
// after the lock is released, so a handler may itself publish, subscribe or
// stop subscriptions on the stub without deadlocking.
func (s *ServerSessionStub) PublishMessage(
	queueName string,
	data []byte,
) {
	s.mutex.Lock()
	var handlers []SubscribeHandler
	if queue := s.queues[queueName]; queue != nil {
		handlers = append(handlers, queue.subscribers...)
	}
	s.mutex.Unlock()

	for _, handler := range handlers {
		handler(data)
	}
}

// SubscribeQueue registers handler for queueName, creating the queue entry
// on first use.
func (s *ServerSessionStub) SubscribeQueue(
	queueName string,
	handler SubscribeHandler,
) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	queue := s.queues[queueName]
	if queue == nil {
		queue = &queueData{}
		s.queues[queueName] = queue
	}
	queue.subscribers = append(queue.subscribers, handler)
}


Both the stub and the implementation satisfy the interface, so our code can run against the real NATS API as well as against the stub in tests.






No comments:

Post a Comment