In this post we will review the steps to use NATS key-value from a GO application.
Install NATS
The NATS key-value is a feature supplied by JetStream, hence we install NATS with JetStream enabled:
Create a config file:
config:
cluster:
enabled: true
replicas: 3
jetstream:
enabled: true
And install using helm:
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats -f config.yaml
#check connectivity
kubectl exec -it deployment/nats-box -- nats pub test hi
Using Key-Value from GO
The following includes a key-value access with the following:
- Put, Get, and Delete operations
- Watcher to get notifications for any update of the key
- Parallel updates and blocking awareness
package main
import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"time"
)
func main() {
natsConnection, err := nats.Connect("nats://nats:4222")
if err != nil {
panic(err)
}
jetStreamConnection, err := jetstream.New(natsConnection)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
keyValue, err := jetStreamConnection.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "myKeyValue",
})
if err != nil {
panic(err)
}
const key = "myKey"
// run a watcher in the background to get updates on a specific key
watcher, _ := keyValue.Watch(ctx, key)
defer watcher.Stop()
go func() {
for update := range watcher.Updates() {
if update == nil {
fmt.Printf("watcer empty event\n")
} else {
update.Operation()
fmt.Printf("watcer operation %v key %s revision %d -> value %q\n",
update.Operation().String(), update.Key(), update.Revision(), string(update.Value()))
}
}
}()
// update #1
sequence, err := keyValue.Put(ctx, key, []byte("value1"))
if err != nil {
panic(err)
}
fmt.Printf("the update sequence is %v\n", sequence)
entry, err := keyValue.Get(ctx, key)
if err != nil {
panic(err)
}
fmt.Printf("key %s revision %d -> value %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
// update #2
sequence, err = keyValue.Put(ctx, key, []byte("value2"))
if err != nil {
panic(err)
}
fmt.Printf("the update sequence is %v\n", sequence)
entry, err = keyValue.Get(ctx, key)
if err != nil {
panic(err)
}
fmt.Printf("key %s revision %d -> value %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
// here we see that we can block parallel updates by specifying the revision we expect to update
sequence, err = keyValue.Update(ctx, key, []byte("parallelValue"), 1)
fmt.Printf("expected error: %s\n", err)
// ony when providing the correct revision, we are allowed to update
sequence, err = keyValue.Update(ctx, key, []byte("parallelValue"), entry.Revision())
if err != nil {
panic(err)
}
err = keyValue.Delete(ctx, key)
if err != nil {
panic(err)
}
// wait for watcher actions
time.Sleep(5 * time.Second)
}
And the output is:
watcer empty event
the update sequence is 1
watcer operation KeyValuePutOp key myKey revision 1 -> value "value1"
key myKey revision 1 -> value "value1"
the update sequence is 2
watcer operation KeyValuePutOp key myKey revision 2 -> value "value2"
key myKey revision 2 -> value "value2"
expected error: nats: nats: API error: code=400 err_code=10071 description=wrong last sequence: 2
watcer operation KeyValuePutOp key myKey revision 3 -> value "parallelValue"
watcer operation KeyValueDeleteOp key myKey revision 4 -> value ""