Full Blog TOC

Full Blog Table Of Content with Keywords Available HERE

Sunday, July 21, 2024

Using NATS Key-Value from GO




In this post we will review the steps to use NATS key-value from a GO application.


Install NATS

The NATS key-value is a feature supplied by JetStream, hence we install NATS with JetStream enabled:

Create a config file:

config:
cluster:
enabled: true
replicas: 3
jetstream:
enabled: true


And install using helm:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats -f config.yaml
#check connectivity
kubectl exec -it deployment/nats-box -- nats pub test hi


Using Key-Value from GO


The following includes a key-value access with the following:

  1. Put, Get, and Delete operations
  2. Watcher to get notifications for any update of the key
  3. Parallel updates and blocking awareness


package main

import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"time"
)

func main() {

natsConnection, err := nats.Connect("nats://nats:4222")
if err != nil {
panic(err)
}

jetStreamConnection, err := jetstream.New(natsConnection)
if err != nil {
panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

keyValue, err := jetStreamConnection.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "myKeyValue",
})
if err != nil {
panic(err)
}

const key = "myKey"

// run a watcher in the background to get updates on a specific key
watcher, _ := keyValue.Watch(ctx, key)
defer watcher.Stop()
go func() {
for update := range watcher.Updates() {
if update == nil {
fmt.Printf("watcer empty event\n")
} else {
update.Operation()
fmt.Printf("watcer operation %v key %s revision %d -> value %q\n",
update.Operation().String(), update.Key(), update.Revision(), string(update.Value()))
}
}
}()

// update #1
sequence, err := keyValue.Put(ctx, key, []byte("value1"))
if err != nil {
panic(err)
}
fmt.Printf("the update sequence is %v\n", sequence)

entry, err := keyValue.Get(ctx, key)
if err != nil {
panic(err)
}
fmt.Printf("key %s revision %d -> value %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// update #2
sequence, err = keyValue.Put(ctx, key, []byte("value2"))
if err != nil {
panic(err)
}
fmt.Printf("the update sequence is %v\n", sequence)
entry, err = keyValue.Get(ctx, key)
if err != nil {
panic(err)
}
fmt.Printf("key %s revision %d -> value %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// here we see that we can block parallel updates by specifying the revision we expect to update
sequence, err = keyValue.Update(ctx, key, []byte("parallelValue"), 1)
fmt.Printf("expected error: %s\n", err)

// ony when providing the correct revision, we are allowed to update
sequence, err = keyValue.Update(ctx, key, []byte("parallelValue"), entry.Revision())
if err != nil {
panic(err)
}

err = keyValue.Delete(ctx, key)
if err != nil {
panic(err)
}

// wait for watcher actions
time.Sleep(5 * time.Second)
}


And the output is:

watcer empty event
the update sequence is 1
watcer operation KeyValuePutOp key myKey revision 1 -> value "value1"
key myKey revision 1 -> value "value1"
the update sequence is 2
watcer operation KeyValuePutOp key myKey revision 2 -> value "value2"
key myKey revision 2 -> value "value2"
expected error: nats: nats: API error: code=400 err_code=10071 description=wrong last sequence: 2
watcer operation KeyValuePutOp key myKey revision 3 -> value "parallelValue"
watcer operation KeyValueDeleteOp key myKey revision 4 -> value ""



 


Sunday, July 14, 2024

Embedding NATS server in a GO application

 

In this post we examine two methods of embedding a NATS server as part of a GO application. This post is base on this video.

First we we use a NATS server that includes a listener so we can have NATS connections from the external world, and then we run NATS server without a listener.


Embedded NATS server with a Listener

This can be used when we want the application to run NATS server embedded within the GO process, and we still want to preserve the ability to get messages from external clients through the network. We can even run this NATS servers as part of a cluster.


package main

import (
"errors"
"fmt"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"time"
)

func main() {
options := server.Options{}
natsServer, err := server.NewServer(&options)
if err != nil {
panic(err)
}

natsServer.ConfigureLogger()

go natsServer.Start()

if !natsServer.ReadyForConnections(time.Minute) {
panic(errors.New("nats server not ready"))
}

natsConnection, err := nats.Connect(natsServer.ClientURL())
if err != nil {
panic(err)
}

natsSubscription, err := natsConnection.Subscribe("queue1", func(message *nats.Msg) {
response := fmt.Sprintf("got your message: %s", string(message.Data))
err := message.Respond([]byte(response))
if err != nil {
panic(err)
}
})

if err != nil {
panic(err)
}

for i := range 10 {
message := fmt.Sprintf("My %v message", i)
response, err := natsConnection.Request("queue1", []byte(message), time.Second)
if err != nil {
panic(err)
}
fmt.Printf("got response: %v\n", string(response.Data))
}

err = natsSubscription.Unsubscribe()
if err != nil {
panic(err)
}

natsServer.WaitForShutdown()
}

And the output is:

[19234] [INF] Starting nats-server
[19234] [INF] Version: 2.10.17
[19234] [INF] Git: [not set]
[19234] [INF] Name: NCD4DRH4CVNVY44VGEPOTKKJMGM524KCUN26GQNQTXYYKCGNEXUNIZ6P
[19234] [INF] ID: NCD4DRH4CVNVY44VGEPOTKKJMGM524KCUN26GQNQTXYYKCGNEXUNIZ6P
[19234] [INF] Listening for client connections on 0.0.0.0:4222
[19234] [INF] Server is ready
got response: got your message: My 0 message
got response: got your message: My 1 message
got response: got your message: My 2 message
got response: got your message: My 3 message
got response: got your message: My 4 message
got response: got your message: My 5 message
got response: got your message: My 6 message
got response: got your message: My 7 message
got response: got your message: My 8 message
got response: got your message: My 9 message
^C[19234] [INF] Initiating Shutdown...
[19234] [INF] Server Exiting..




Embedded NATS server with NO Listener

This can be used for an application that requires NATS only for its internal message distribution, and separation between internal modules. Using in-process API instead of localhost communication is much faster.

To do this, we configure the NATS server to "DontListen", and use a in-process option for the client connection.


package main

import (
"errors"
"fmt"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"time"
)

func main() {
serverOptions := server.Options{
DontListen: true,
}
natsServer, err := server.NewServer(&serverOptions)
if err != nil {
panic(err)
}

natsServer.ConfigureLogger()

go natsServer.Start()

if !natsServer.ReadyForConnections(time.Minute) {
panic(errors.New("nats server not ready"))
}

option := nats.InProcessServer(natsServer)
natsConnection, err := nats.Connect(natsServer.ClientURL(), option)
if err != nil {
panic(err)
}

natsSubscription, err := natsConnection.Subscribe("queue1", func(message *nats.Msg) {
response := fmt.Sprintf("got your message: %s", string(message.Data))
err := message.Respond([]byte(response))
if err != nil {
panic(err)
}
})

if err != nil {
panic(err)
}

for i := range 10 {
message := fmt.Sprintf("My %v message", i)
response, err := natsConnection.Request("queue1", []byte(message), time.Second)
if err != nil {
panic(err)
}
fmt.Printf("got response: %v\n", string(response.Data))
}

err = natsSubscription.Unsubscribe()
if err != nil {
panic(err)
}

natsServer.WaitForShutdown()
}


and the output is:

[20802] [INF] Starting nats-server
[20802] [INF] Version: 2.10.17
[20802] [INF] Git: [not set]
[20802] [INF] Name: NB2IB6QLEWN2XNPJUAVIWKRICFJUIQJ2CRQWGOC2XKV7IFO36HOK7WXM
[20802] [INF] ID: NB2IB6QLEWN2XNPJUAVIWKRICFJUIQJ2CRQWGOC2XKV7IFO36HOK7WXM
[20802] [INF] Server is ready
got response: got your message: My 0 message
got response: got your message: My 1 message
got response: got your message: My 2 message
got response: got your message: My 3 message
got response: got your message: My 4 message
got response: got your message: My 5 message
got response: got your message: My 6 message
got response: got your message: My 7 message
got response: got your message: My 8 message
got response: got your message: My 9 message
^C[20802] [INF] Initiating Shutdown...
[20802] [INF] Server Exiting..