Full Blog TOC

Full Blog Table Of Content with Keywords Available HERE

Sunday, July 21, 2024

Using NATS Key-Value from Go




In this post we will review the steps for using the NATS key-value store from a Go application.


Install NATS

The NATS key-value store is a feature provided by JetStream, so we install NATS with JetStream enabled:

Create a config file named config.yaml:

# Helm values for the nats chart: run a 3-replica cluster with JetStream
# enabled (JetStream is what provides the key-value feature).
config:
  cluster:
    enabled: true
    replicas: 3
  jetstream:
    enabled: true


And install using helm:

# register the NATS helm chart repository under the name "nats"
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
# install the chart as release "nats", using the values from config.yaml
helm install nats nats/nats -f config.yaml
# check connectivity: publish the message "hi" to subject "test" from the nats-box deployment
kubectl exec -it deployment/nats-box -- nats pub test hi


Using Key-Value from Go


The following example demonstrates key-value access, including:

  1. Put, Get, and Delete operations
  2. Watcher to get notifications for any update of the key
  3. Revision-checked updates that reject conflicting parallel writes


package main

import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"time"
)

// main demonstrates the NATS JetStream key-value API end to end:
// it connects to NATS, creates (or binds to) the "myKeyValue" bucket,
// watches a single key in the background, and then runs Put, Get,
// revision-checked Update, and Delete against that key.
//
// Any unexpected failure panics, which is acceptable for a demo program.
func main() {
	natsConnection, err := nats.Connect("nats://nats:4222")
	if err != nil {
		panic(err)
	}
	// release the connection when the demo ends
	defer natsConnection.Close()

	jetStreamConnection, err := jetstream.New(natsConnection)
	if err != nil {
		panic(err)
	}

	// a single deadline bounds every JetStream call below, including the watcher
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	keyValue, err := jetStreamConnection.CreateKeyValue(ctx, jetstream.KeyValueConfig{
		Bucket: "myKeyValue",
	})
	if err != nil {
		panic(err)
	}

	const key = "myKey"

	// run a watcher in the background to get updates on a specific key;
	// the error must be checked, otherwise a failed Watch leaves watcher nil
	// and the deferred Stop below would panic
	watcher, err := keyValue.Watch(ctx, key)
	if err != nil {
		panic(err)
	}
	defer watcher.Stop()
	go func() {
		// the loop ends when the updates channel is closed
		for update := range watcher.Updates() {
			if update == nil {
				// NOTE(review): per the nats.go docs a nil entry marks the end
				// of the initial values - confirm against the library version in use
				fmt.Printf("watcer empty event\n")
				continue
			}
			fmt.Printf("watcer operation %v key %s revision %d -> value %q\n",
				update.Operation().String(), update.Key(), update.Revision(), string(update.Value()))
		}
	}()

	// update #1
	sequence, err := keyValue.Put(ctx, key, []byte("value1"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("the update sequence is %v\n", sequence)

	entry, err := keyValue.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	fmt.Printf("key %s revision %d -> value %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// update #2
	sequence, err = keyValue.Put(ctx, key, []byte("value2"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("the update sequence is %v\n", sequence)
	entry, err = keyValue.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	fmt.Printf("key %s revision %d -> value %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

	// here we see that we can block parallel updates by specifying the revision we expect to update:
	// revision 1 is stale after the second Put, so this call is expected to fail
	_, err = keyValue.Update(ctx, key, []byte("parallelValue"), 1)
	fmt.Printf("expected error: %s\n", err)

	// only when providing the correct revision are we allowed to update
	_, err = keyValue.Update(ctx, key, []byte("parallelValue"), entry.Revision())
	if err != nil {
		panic(err)
	}

	err = keyValue.Delete(ctx, key)
	if err != nil {
		panic(err)
	}

	// wait for watcher actions: give the background goroutine time to print
	// the last events before main returns
	time.Sleep(5 * time.Second)
}


And the output is:

watcer empty event
the update sequence is 1
watcer operation KeyValuePutOp key myKey revision 1 -> value "value1"
key myKey revision 1 -> value "value1"
the update sequence is 2
watcer operation KeyValuePutOp key myKey revision 2 -> value "value2"
key myKey revision 2 -> value "value2"
expected error: nats: nats: API error: code=400 err_code=10071 description=wrong last sequence: 2
watcer operation KeyValuePutOp key myKey revision 3 -> value "parallelValue"
watcer operation KeyValueDeleteOp key myKey revision 4 -> value ""



 


No comments:

Post a Comment