Saturday, December 28, 2019

Helm 2 to Helm 3 Upgrade




I've recently updated a project to use Helm 3 instead of Helm 2.
The official article of how to migrate from Helm v2 to Helm v3 mostly refers to the steps of migrating installed releases from Helm 2 to Helm 3, but does not relate to the development steps.
So, I've listed the changes I made to the project setup and build as part of the migration.


Helm setup on the Kubernetes Cluster 


Tiller is out

Tiller is not longer used, so the setup of Helm is different.
You no longer need to run helm init.


Add the stable repo

Helm no longer has the helm init step, but this is a down side.
The helm init used to add the stable repo to the Helm repos.
In case you want to use charts from the stable repo, you need to manually add them:

helm repo add stable https://kubernetes-charts.storage.googleapis.com


The Release Lifecycle


Helm install

Helm v2 had auto generated the release name in case it was not specified.
Helm v3 requires (by default) the release name, hence the helm install syntax had changed.

Instead of:

helm install --name my-release-name ...

Use:

helm install my-release-name ...


Create Namespace

Helm v2 had created the namespace in case it did not exist.
In Helm v3 you will need to manually create it, for example:

kubectl create namespace my-namespace


Helm Delete

Helm v2 kept history of the uninstalled releases, and most of the Helm users used the --purge flag to avoid this behavior, as it prevents reinstalling of the same release (see this issue).
Helm v3 uses the purge flag by default. Also the delete command is an alias to uninstall.

So, instead of:

helm delete my-release-name --purge

Use:
helm uninstall my-release-name


Summary


In this post we have reviewed the development steps required to upgrade a project from using Helm version 2 to use Helm version 3. In general the upgrade was smooth, and not changes were required in the charts themselves. 

Wednesday, December 18, 2019

Helm init failure bypass

Notice:
This post is relevant to Helm version 2
This issue is NOT relevant to Helm version 3

Trying to install helm on a new kubernetes version fails.
Running helm init fails with the error message:

the server could not find the requested resource

Looking this error in the web, I've found the bypass in one of the helm issues.

Since I need to install Helm many times (I love to uninstall and reinstall my kubernetes cluster), I've automated this process:

helm init --output yaml > tiller.yaml
sed -i 's/extensions\/v1beta1/apps\/v1/g' tiller.yaml
sed -i '/strategy: {}/a \  selector:\n    matchLabels:\n      app: helm\n      name: tiller' tiller.yaml
kubectl apply -f tiller.yaml
rm -f tiller.yaml

And if you also want to include the permissions binding, run this as well:

kubectl create serviceaccount --namespace kube-system tiller
kubectl create clusterrolebinding tiller-cluster-rule --clusterrole=cluster-admin --serviceaccount=kube-system:tiller
kubectl patch deploy --namespace kube-system tiller-deploy -p '{"spec":{"template":{"spec":{"serviceAccount":"tiller"}}}}'

Simple copy and paste of these commands to save manual editing and updating of the kubernetes resources.

Access kubernetes API from GoLang


When implementing a GoLang based kuebernetes service, you will need to access the kubernetes API.
The problem I've encountered in this, is the following:

  • When running the service as part of a kubernetes pod, you need to initialize the kunernetes client using one API: InClusterConfig()
  • When debugging the service (for example using GoLand), you need to to initialize the kunernetes client using a second API: BuildConfigFromFlags()
The solution I've made is to use flags to the application indicating which API to use.
The default flags are for running as part of a kubernetes pod, while enabling changing the default for a debug session.

The service configuration implementation is:

import (
  "encoding/json"
  "github.com/namsral/flag"
  "os"
  "os/exec"
  "path/filepath"
)

type Configuration = struct {
  InCluster                  bool
  KubeConfig                 string
}

var Config Configuration

func InitConfig() {
  defaultKubeConfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")

  flag.BoolVar(&Config.InCluster, "in-cluster", true,
   "run within a kubernetes cluster pod")
  flag.StringVar(&Config.KubeConfig, "kubeconfig", defaultKubeConfig, 
   "absolute path to the kubeconfig file")

  flag.Parse()
}


And the kubernetes client configuration is:

import (
  "k8s.io/client-go/kubernetes"
  appsV1 "k8s.io/client-go/kubernetes/typed/apps/v1"
  coreV1 "k8s.io/client-go/kubernetes/typed/core/v1"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
)

type K8sClient struct {
  client *kubernetes.Clientset
}

func (client *K8sClient) Init() {
  var config *rest.Config
  var err error
  if Config.InCluster {
    Verbose("getting k8s config in cluster")
    config, err = rest.InClusterConfig()
  } else {
    Verbose("getting k8s config out of cluster, path %v", Config.KubeConfig)
    config, err = clientcmd.BuildConfigFromFlags("", Config.KubeConfig)
  }
  if err != nil {
    log.Fatalf("kubernetes client config failed: %v", err)
  }
  clientSet, err := kubernetes.NewForConfig(config)
  if err != nil {
    log.Fatalf("kubernetes client set failed: %v", err)
  }
  client.client = clientSet
}

That's all!

When running the service in debug mode, send the following argument:
--in-cluster=false

And you're good to go. The service will automatically use the kubernetes configuration from your home folder.

Thursday, December 12, 2019

Deploy Smart Contract on Ethereum using GoLang


This article presents all the steps required to deploy and use a contract using Go.

Few other posts about this exist:
So, I've decided creating this article to cover all the steps, hoping you will find it useful.
The article is setup for a private Ethereum network, but can be also used for the public Ethereum network.


We will review the following steps:
  1. Create a Smart Contract in Solidity
  2. Manually Compile the Contract to Go
  3. Create a Go Application 
  4. Create a 3 stages Dockerfile to compile the contract and the application 

1. Create Smart Contract in Solidity



Contracts are created using solidity. In this article, a simple contract with a call method and a transaction method. See this for explanation about the difference between a call and a transaction.

pragma solidity ^0.5.8;

contract Price {
  uint price = 100;

  function setPrice(uint newPrice) external payable  {
    price = newPrice;
  }

  function getPrice() external view returns (uint) {
    return price;
  }
}

The contract contains the following methods:

  • setPrice: a transaction method
  • getPrice: a call method

2. Manually Compile the Contract to Go



To use a contract in Go, we will need to generate a Go file for it.

First install the solc: the solidity compiler as explained here.

Next, install abigen, which is part of the geth-tools. The geth tools can be downloaded from the geth download site. For example, the current version is:
https://gethstore.blob.core.windows.net/builds/geth-alltools-linux-amd64-1.9.9-01744997.tar.gz

Now we can generate the go contract using the following commands:

solc --abi --output-dir Price.sol
solc --bin --output-dir Price.sol
abigen --bin Price.bin --abi Price.abi --pkg=price --out=Price.go

Copy the Price.go to the a folder named "generated" in the go application source folder.
In later stage we will automate this step as part of a Docker build.


3. Create a Go Application



The go application should handle the following:

  • Connect to the Ethereum network
  • Deploy the contract
  • Load the contract
  • Use the contract
To connect to the Ethereum network, we use the following:

url:= "ws://THE_ETHEREUM_NETWORK_IP_ADDRESS"
timedContext, _ := context.WithTimeout(context.Background(), 30*time.Second)
client, err := ethclient.DialContext(timedContext, url)
if err == nil {
  log.Fatalf("connection failed %v", err)
  return
}

Notice that we use a 30 seconds timeout based connection. Choose whatever timeout that fit your need. Also, replace the URL with your Ethereum  network address.
Notice that once all Ethereum actions are done, the ethclient should be close, so eventually call to:

client.Close()

To deploy the contract, we use the Price.go that we've generate before.

import (
  "crypto/ecdsa"
  "github.com/ethereum/go-ethereum/common"
  "github.com/ethereum/go-ethereum/crypto"
  "github.com/ethereum/go-ethereum/accounts/abi/bind"
  "github.com/ethereum/go-ethereum/common"
  "github.com/ethereum/go-ethereum/core/types"
  "github.com/ethereum/go-ethereum/ethclient"
  contract "sample.com/contract/generated"
)

func getAccountPrivateKey() *ecdsa.PrivateKey {
  // replace the private key here
  accountPrivateKey := "211dbaa6ca5e3fe1141eef3b00a0dd6d630a8d8e5bfbb7a7516865f1c746a3a0"
  privateKey, err := crypto.HexToECDSA(accountPrivateKey)
  if err != nil {
    log.Fatalf("private key to ECDSA failed: %v", err)
  }
  return privateKey
}

func getAccountPublicKey() common.Address {
  publicKey := getAccountPrivateKey().Public()
  publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
  if !ok {
    log.Fatalf("cannot assert type: publicKey is not of type *ecdsa.PublicKey")
  }

  address := crypto.PubkeyToAddress(*publicKeyECDSA)
  return address
}

func getTransactionOptions(client *ethclient.Client) *bind.TransactOpts {
  nonce, err := client.PendingNonceAt(context.Background(), getAccountPublicKey())
  if err != nil {
    log.Fatalf("get pending failed: %v", err)
  }

  gasPrice, err := client.SuggestGasPrice(context.Background())
  if err != nil {
    log.Fatalf("suggest gas price failed: %v", err)
  }

  transactOpts := bind.NewKeyedTransactor(getSystemAccountPrivateKey())
  transactOpts.Nonce = big.NewInt(int64(nonce))
  transactOpts.Value = big.NewInt(0)
  // you might need to update gas price if you have extremly large contract
  transactOpts.GasLimit = uint64(3000000)
  transactOpts.GasPrice = gasPrice
  return transactOpts
}

func Deploy(client *ethclient.Client) string {
  transactOptions := getTransactionOptions(client)

  address, transaction, _, err := contract.DeployPriceContract(transactOpts, client)
  if err != nil {
    log.Fatalf("deploy contract failed: %v", err)
  }
  
  track(transaction)

  return address.Hex()
}

To use this, update the private key to your owner used Ethereum account private key.
This account must have enough ETH balance to pay for the contract deploy.

This code also includes a call to a track(transaction) function.
We will review transaction tracking later.

Once the contract is deployed we get an address of the contract. This address should be saved out of the scope of the Ethereum, so that it can be reused by other remote/distributed clients.

Notice that the Deploy function returns an instance of the contract, and so we can use it to run the contract method. However I prefer not to use it, and instead to show how to load the contract using the contract address, since you will probably need this.

func LoadContract(client *ethclient.Client, contractAddress string) *contract.PriceContract {
  address := common.HexToAddress(contractAddress)
  instance, err := contract.NewPriceContract(address, client)
  if err != nil {
    log.Fatalf("could not load contract: %v", err)
  }
  return instance
}


Using a contract "call" method is very simple:

callOptions := bind.CallOpts{From: getSystemAccountPublicKey()}
price, err := contract.GetPrice(&callOptions)
if err != nil {
  log.Fatalf("get price failed: %v", err)
}

Using a "transaction" method is similar:

transactionOptions := getTransactionOptions(client)
transaction, err := contract.SetPrice(transactionOptions, big.NewInt(777))

if err != nil {
  Fatal("set price failed: %v", err)
}
track(transaction)

This code also includes a call to a track(transaction) function.

So, why do we need to track the transaction?
The transaction might fail. It also might not be mined at all due to low gas price.
If we want to know that the transaction is success, we need to check the status of it whenever a new block is mined.

This code section tracks the transaction:

func track(transaction *types.Transaction) {
  headers = make(chan *types.Header)
  var err error
  subscription, err = client.SubscribeNewHead(context.Background(), headers)
  if err != nil {
    log.Fatalf("subscribe to read blocks failed: %v", err)
  }
  defer subscription.Unsubscribe()

  for {
    select {
    case err := <-subscription.Err():
      log.Fatalf("subscription failed: %v", err)
    case <-headers:
      // got block, checking transaction
      transactionLocated := checkTransactionOnce(transaction)
      if transactionLocated {
        return 
      }
    case <-time.After(60*time.Second):
      log.Fatalf("timeout waiting for transaction")
    }
  }
}

func (wrapper *EthereumTransactionWrapper) checkTransactionOnce(transaction *types.Transaction) bool {
  _, pending, err := Client.TransactionByHash(context.Background(), transaction.Hash())
  if err != nil {
    log.Fatalf("get transaction failed: %v", err)
  }
  if pending {
    return false
  }

  receipt, err := client.TransactionReceipt(context.Background(), transaction.Hash())
  if err != nil {
    log.Fatalf("transaction receipt failed: %v", err)
  }
  if receipt.Status == 1 {
    return true
  }
  log.Fatalf("transaction failed with logs %v", receipt.Logs)
  // dead code
  return true
}

4. Create a Dockerfile



The docker file is a multi-stage docker file.
You might want to review: Use cache in a docker multi stage build for faster builds.

The Dockerfile includes 3 steps:

  1. compile the contracts
  2. compile the go application
  3. package the compiled go application

#==================
# Stage1: contracts
#==================

FROM ubuntu:18.04 as contracts-compiler

# install wget
RUN apt-get update && \
    apt-get install -y software-properties-common wget && \
    rm -rf /var/lib/apt/lists/*

# install solc
RUN add-apt-repository ppa:ethereum/ethereum && \
    apt-get update && \
    apt-get install -y solc

# install geth tools
RUN mkdir /geth_extract
ARG SOLIDITY_ALL_TOOLS
RUN wget --progress=dot:giga https://gethstore.blob.core.windows.net/builds/${SOLIDITY_ALL_TOOLS} -O /geth_extract/tools.tar.gz
RUN tar xvzf /geth_extract/tools.tar.gz -C /geth_extract
RUN rm /geth_extract/tools.tar.gz
RUN mv /geth_extract/* /geth_extract/extracted
RUN mv -v /geth_extract/extracted/* /usr/local/bin
RUN rm -rf /geth_extract

ADD ./src/PriceContract.sol /contracts/PriceContract.sol
WORKDIR /contracts
RUN solc --abi --output-dir /compiled PriceContract.sol
RUN solc --bin --output-dir /compiled PriceContract.sol

WORKDIR /compiled
RUN abigen --bin PriceContract.bin --abi PriceContract.abi --pkg=bouncer_contract --out=PriceContract.go

#================
# Stage2: compile
#================

FROM golang:1.12 AS go-compiler

RUN apt-get update && \
    apt-get install -y git

WORKDIR /src
ENV GOPATH=/go
ENV GOBIN=/go/bin

# get dependencies
COPY ["./src/go.mod", "./src/go.sum", "/src/"]
RUN GOARCH=amd64 GOOS=linux CGO_ENABLED=0 go mod download

# compile source
ADD ./src /src

# copy generated contract
RUN mkdir -p /src/internal/generated
COPY --from=contracts-compiler /compiled/PriceContract.go /src/generated/PriceContract.go

RUN GOARCH=amd64 GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o my-go-application

#================
# Stage3: package
#================

FROM ubuntu:18.04
COPY --from=go-compiler /src/my-go-application /my-go-application
WORKDIR /
ENTRYPOINT ["/my-go-application"]


Summary

In this article we have reviewed how to use Ethereum contract in a Go based application, include deploy of the contract, using the contract methods, and building the application.
We've included a method of tracking the contract transactions.

Wednesday, December 4, 2019

Creating a kubernetes controller


This post explains about a kubernetes controller that I've created.
I've had reviewed several guides for kubernetes controller creation, such as Extending Kubernetes, Kubernetes Custom Controller, and this. I've found myself overwhelmed by the complexity of such a simple requirement. This post is the present the KISS gist of my blog. When you need to perform a task, ask yourself, does it really need to be so complex? Do I need all this monster for a simple task?

Many controllers are based on kubernetes Custom Resource Definition (aka CRD).
CRDs are indeed nice, but make the implementation much more complex, as you will have to use code generator tools.
If this is an internal solution, do you really need CRD? Why not use a simple ConfigMap?

In the solution presented below, I am using a ConfigMap that configures the number of StatefulSets to create. The ConfigMap also contains the template that will be used for the StatefulSet creation.
An example of the ConfigMap is:

apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    resource-type: "primary"
  name: primary-config
  namespace: default
data:
  general.yaml: |-
    count: 2
  statefulSet.yaml: "apiVersion: apps/v1\nkind: StatefulSet\nmetadata:\n  name: my-statefulset-___sequence___\n
    \ labels:\n    resource-type: \"secondary\"    \n    app.kubernetes.io/instance
    : bouncer\n    app.kubernetes.io/name : bouncer\nspec:\n  
 
 ...

The controller main code is the merge logic, which fetch the primary ConfigMap, and creates/delete StatefulSet accordingly. This is done by the following logic:

  • Get the primary ConfigMap
  • Load the existing StatefulSets
  • Loop until the required count in the primary ConfigMap
    • Create the StatefulSet if it does not exist
  • Delete all StatefulSets that were not encountered in the loop above

package mypackage

import (
  "fmt"
  "gopkg.in/yaml.v2"
  apps "k8s.io/api/apps/v1"
  core "k8s.io/api/core/v1"
  "k8s.io/client-go/kubernetes/scheme"
  "strconv"
  "strings"
  "sync"
)

type Merger struct {
  updates          chan int
  api              *K8sApi
  primaryConfig   *core.ConfigMap
  currentEntities  []apps.StatefulSet
  requiredEntities []apps.StatefulSet
}

func (merger *Merger) Init() {
  merger.updates = make(chan int, 100)
  go func() {
    for {
      select {
      case update := <-merger.updates:
        for len(merger.updates) > 0 {
          <-merger.updates
        }
        merger.merge()
      }
    }
  }()
}

func (merger *Merger) NotifyUpdate() {
  merger.updates <- MessageUpdate
}

func (merger *Merger) merge() {
  configMaps := merger.api.configMap.FetchByLabels("resource-type=primary")
  merger.primaryConfig = &configMaps[0]

  merger.currentEntities = merger.api.statefulSet.FetchByLabels("resource-type=secondary")
  merger.requiredEntities = []apps.StatefulSet{}
  chains := merger.getRequiredCount()
  for i := 0; i < chains; i++ {
    merger.mergeSequence(i)
  }
  merger.deleteNonUsedStatefulSets()
}

func (merger *Merger) deleteNonUsedStatefulSets() {
  usedNames := map[string]bool{}
  for _, s := range merger.requiredEntities {
    usedNames[s.Name] = true
  }
  for _, s := range merger.currentEntities {
    if !usedNames[s.Name] {
      merger.api.statefulSet.Delete(s.Name)
    }
  }
}

func (merger *Merger) mergeSequence(sequence int) {
  statefulSetText := merger.getStatefulSetUpdatedText(sequence, name)
  decode := scheme.Codecs.UniversalDeserializer().Decode
  obj, _, err := decode([]byte(statefulSetText), nil, nil)
  if err != nil {
    Fatal("decode statefulSetText failed, %v", err)
  }
  statefulSet, ok := obj.(*apps.StatefulSet)
  if !ok {
    Fatal("cast statefulSet failed")
  }

  merger.requiredEntities = append(merger.requiredEntities, *statefulSet)
  if merger.statefulSetExists(statefulSet.Name) {
    return
  }

  merger.api.statefulSet.Create(statefulSet)
}

func (merger *Merger) getStatefulSetUpdatedText(sequence int, name string) string {
  statefulSet := merger.primaryConfig.Data["statefulSet.yaml"]
  statefulSet = merger.templateReplace(statefulSet, "sequence", strconv.Itoa(sequence))
  statefulSet = merger.templateReplace(statefulSet, "name", name)
  replacements := merger.getGeneralConfig()[name]
  return statefulSet
}

func (merger *Merger) templateReplace(text string, from string, to string) string {
  return strings.Replace(text, fmt.Sprintf("___%v___", from), to, -1)

}

func (merger *Merger) getRequiredCount() int {
  generalConfig := merger.getGeneralConfig()
  countStr := generalConfig["count"]
  count, err := strconv.Atoi(fmt.Sprintf("%v", countStr))
  if err != nil {
    Fatal("parse of count %v failed %v", countStr, err)
  }
  return count
}

func (merger *Merger) getGeneralConfig() map[interface{}]interface{} {
  general := merger.primaryConfig.Data["general.yaml"]
  generalConfig := make(map[interface{}]interface{})
  err := yaml.Unmarshal([]byte(general), &generalConfig)
  if err != nil {
    Fatal("error parsing configMap yaml %v", err)
  }
  return generalConfig
}

func (merger *Merger) statefulSetExists(name string) bool {
  for _, statefulSet := range merger.currentEntities.statefulSets {
    if statefulSet.Name == name {
      return true
    }
  }
  return false
}

All we need now is to activate the NotifyUpdate upon change.
For this purpose we use the kubernetes watch API. We should watch for both the ConfigMap, and for StatefulSet changes. Watch for the StatefuleSet for example is done by the following:

func (api *K8sStatefulSetApi) WatchByLabels(labels string, notifier *Notifier) {
  listOptions := meta.ListOptions{
    LabelSelector: labels,
  }
  watcher, err := client.AppsV1().StatefulSets("default").Watch(listOptions)
  if err != nil {
    Fatal("watch kubernetes statefulSet failed: %v", err)
  }
  ch := watcher.ResultChan()
  go func() {
    Verbose("watcher loop for statefulSet established")
    for event := range ch {
      statefulSet, ok := event.Object.(*apps.StatefulSet)
      if !ok {
        Fatal("unexpected event in watcher channel")
      }

      Verbose("got event for statefulSet %v", statefulSet.Name)
      (*notifier)()
    }
  }()
}


Summary

In this post a simple kubernetes controller based on a ConfigMap was presented.
The primary resource is the configMap, marked by the label: "resource-type=primary"
The secondary resources are the StatefulSets, marked by the label: "resource-type=secondary"
This controller does not use CRD, hence it does not require any code generation tools, and allows KISS implementation.


Use cache in a docker multi stage build


Docker multi stage build (see docker official documentation) provides ability to both compile and create final image in a single docker file.
The logic is as follows:

Stage 1:
  • Use the related compiler docker image
  • Compile the sources
  • Create the final executable binary file

Stage 2:
  • Use the production related docker image (e.g. ubuntu/alpine)
  • Copy the executable binary file from stage 1
  • Run any additional required final image preparation steps


For example, a two-stages build for a GO image is:
# Stage1: compile
FROM golang:1.12 AS build-env
RUN apt-get update && apt-get install -y git

WORKDIR /src
ENV GOPATH=/go
ENV GOBIN=/go/bin
COPY ["./src/go.mod", "./src/go.sum", "/src/"]
RUN GOARCH=amd64 GOOS=linux CGO_ENABLED=0 go mod download
ADD ./src /src
RUN GOARCH=amd64 GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o final-binary

# Stage2: package
FROM ubuntu:18.04
COPY --from=build-env /src/my-final-binary /final-binary
WORKDIR /
ENTRYPOINT ["/final-binary"]

However, when running the docker build, I've noticed that the first stage keeps running every time for a long time, even that I did not change anything in the source.
It is due to the fact that docker multi stage build does not keep cache for the intermediate steps, as described here.

The solution I've used is to split the docker build into two builds.
The first build docker image is not used, but exists only for caching purpose.
So the actual final build contains the following files:

  • Dockerfile_stage1
  • Dockerfile_stage2
  • build.sh
Let's review the files. First, the Dockerfile_stage1, which includes the first section of the original Dockerfile.

# Stage1: compile
FROM golang:1.12 AS build-env
RUN apt-get update && apt-get install -y git

WORKDIR /src
ENV GOPATH=/go
ENV GOBIN=/go/bin
COPY ["./src/go.mod", "./src/go.sum", "/src/"]
RUN GOARCH=amd64 GOOS=linux CGO_ENABLED=0 go mod download
ADD ./src /src
RUN GOARCH=amd64 GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o final-binary

Next, the Dockerfile_stage2, which includes the second section of the original Dockerfile.

# Stage2: package
FROM ubuntu:18.04
COPY --from=build-env /src/my-final-binary /final-binary
WORKDIR /
ENTRYPOINT ["/final-binary"]

And last, the build.sh that does all the magic.

#!/usr/bin/env bash
docker build -d Dockerfile_stage1 -t "local/myimage-stage1:latest"

cat Dockerfile_stage1 > Dockerfile_full
cat Dockerfile_stage2 >> Dockerfile_full

docker build -d Dockerfile_full -t "local/myimage:latest"

rm -f Dockerfile_full


Summary

The docker not caching multi stage builds issue can be simply bypassed easily using the method described above. The method also avoids using complicated steps as described in other solutions, and it prevents from using duplicate code among files.



Wednesday, November 27, 2019

MongoDB ReplicaSet on kubernetes



Recently I wanted to use MongoDB ReplicaSet on kubernetes.
I did not want to use MongoDB cluster, as it is too complex to configure and maintain in a kubernetes environment. I also did not need high MongoDB performance, so sharding was also not required.

Gladly, I've found a Helm chart in the official helm chart github (see here).
It even included special handling for kubernetes pod initialization, in a dedicated script: on-init.sh

However, I've found this is not working.



There were several issues. For example:

  1. Once a MongoDB pod was added to a ReplicaSet configuration, it was never removed. This causes several problems. For example, inability to get quorum once a pod had been restarted and got a different IP.
  2. In case a MongoDB pod had started, but was not able to communicate with the previous pods, it started its own new ReplicaSet, instead of failing.
  3. A single MongoDB pod, which was restarted, and got a new IP, was not able to start the ReplicaSet, and got error that it is not part of its own replica.



I've created my own init container to manage the ReplicaSet configuration. 
It was inspired by the on-init.sh script.
I've decided to use nodeJS, as the logic was too much for a simple shell script.

First, I've created a StatefulSet:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mongodb-statefulset
spec:
  replicas: 3
  selector:
    matchLabels:
      configid: mongodb-container
  template:
    metadata:
      labels:
        configid: mongodb-container
    spec:
      serviceAccountName: mongodb-service-account
      initContainers:
        - name: init
          image: LOCACL_REGISTRY/mongo-init/dev:latest
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            - name: persist-data
              mountPath: /data
              readOnly: false
      containers:
        - name: mongodb
          image: mongo:4.2.1
          imagePullPolicy: Always
          command:
            - mongod
          args:
            - --config=/mongo-config/mongo.conf
            - --dbpath=/data
            - --replSet=mongoreplica
            - --port=27017
            - --bind_ip=0.0.0.0
          volumeMounts:
            - name: persist-data
              mountPath: /data
              readOnly: false
            - name: mongo-config
              mountPath: /mongo-config
      volumes:
        - name: mongo-config
          configMap:
            name: mongo-config
  volumeClaimTemplates:
    - metadata:
        name: persist-data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "hostpath"

The StatefulSet includes an init container which runs the nodeJS code to configure the ReplicaSet (this will be covered later in this article).
In addition, it includes a volumeClaimTemplate that allocates the same storage for the related pod even it is restarted.
Also, a config map with mongo.conf file is included. I've currently used an empty file, as I require only the default configuration.

Notice that the StatefulSet includes a serviceAccountName. This account should be granted with permissions to list and get pods.

Now, let review the mongo-init container.
This includes a nodeJS application to configure the MongoDB ReplicaSet.




The following general logic is implemented:

  • Start Mongo DB
  • Run kubectl to get all  related pod IPs
  • Check each of the pods, to located the Mongo DB primary
  • If primary is located, and it is myself, we're done
  • If primary is located, and it is another pod, find my configuration index according to the pod name, e.g. pod name mongodo-statefulset-4 is index#4. If this index exists in the configuration, update the IP in the existing replica. Otherwise, add new secondary.
  • If primary is not located, start a new ReplicaSet.
  • Stop Mongo DB, and wait for shutdown, allowing it to save the updated configuratoin


const podIp = process.env['POD_IP']
const podName = process.env['HOSTNAME']
const mongoReplicaSetName = 'mongoreplica'
const mongoPort = 27017'
const addSecondaryRetries = 20

init()

async function init() {
  await startMongo()
  await configureReplicaSet()
  await stopMongo()
}

async function startMongo() {
  const command = `mongod --config=files/mongo.conf --dbpath=files/data --replSet=${mongoReplicaSetName} --port=${mongoPort} --bind_ip=0.0.0.0`
  const options = {
    checkError: false,
    waitForCompletion: false,
    streamOutputCallback: mongoOutputCallback,
  }
  const promiseWrapper = await runCommand(command, options)
  const commandPromise = promiseWrapper.promise

  const startedPromise = waitForPing()

  const result = await Promise.race([commandPromise, startedPromise])
  if (!result.mongoResponse) {
    throw new Error(`mongo start failed`)
  }
}

function mongoOutputCallback(data) {
  data.split('\n').forEach(line => {
    line = line.trim()
    if (line.length > 0) {
      console.log(`[MONGO] ${line}`)
    }
  })
}

async function stopMongo() {
  const mongoCommand = `db.shutdownServer({force: true})`
  await runMongoAdmin(false, '127.0.0.1', mongoCommand)
  await waitForMongoStop()
}

async function waitForMongoStop() {
  while (true) {
    const processes = await runCommand('ps -ef | grep mongo | grep -v grep', {checkError: false})
    if (processes.trim().length === 0) {
      return
    }

    await sleep(1000)
  }
}

async function configureReplicaSet() {
  const primary = await findPrimaryNode()
  if (primary) {
    await configureReplicaWithExistingPrimary(primary)
  } else {
    if (await isSecondaryPodLocated()) {
      throw new Error('secondary pod located, unable to start until primary located')
    }
    if (!await isReplicaConfigured()) {
      await createNewReplicaSet()
    }
  }
}

async function configureReplicaWithExistingPrimary(primary) {
  if (primary === podIp) {
    return
  }

  const memberIndex = await findMemberIndex(primary)
  if (memberIndex === null) {
    await addAsSecondary(primary)
  } else {
    await updateMemberAddress(primary, memberIndex)
  }
  await waitUntilSecondaryReady()
}

async function updateMemberAddress(primary, memberIndex) {
  const mongoCommand = `c=rs.conf(); c.members[${memberIndex}].host='${podIp}'; rs.reconfig(c)`
  await runMongoAdmin(true, primary, mongoCommand)
}

async function findMemberIndex(primary) {
  const configuration = await getReplicaConfiguration(primary)
  const members = configuration.members
  const podSuffix = parseInt(podName.substring(podName.lastIndexOf('-') + 1))

  for (let i = 0; i < members.length; i++) {
    const member = members[i]
    const memberId = parseInt(member['_id'])
    if (memberId === podSuffix) {
      return i
    }
  }
  return null
}

async function getReplicaConfiguration(primary) {
  let configurationText = await runMongoAdmin(true, primary, `rs.conf()`)
  configurationText = configurationText.replace(/NumberLong\((\d+)\)/g, '$1')
  configurationText = configurationText.replace(/ObjectId\("(\S+)"\)/g, '"$1"')
  return JSON.parse(configurationText)
}

async function isReplicaConfigured() {
  const result = await runMongoAdmin(false, '127.0.0.1', 'rs.status()')
  const configured = !result.includes('no replset config has been received')
  return configured
}

async function addAsSecondary(primary) {
  for (let i = 1; i <= addSecondaryRetries; i++) {
    try {
      await addAsSecondaryOnce(primary)
      return
    } catch (e) {
      console.log(`add secondary node failed: ${e.stack}`)
      if (i < addSecondaryRetries ) {
        console.log(`retry #${i} in a moment`)
        await sleep(10000)
      }
    }
  }
  throw new Error(`add node as secondary failed after ${addSecondaryRetries} retries`)
}

async function addAsSecondaryOnce(primary) {
  const mongoCommand = `rs.add('${podIp}:${mongoPort}')`
  const result = await runMongoAdmin(true, primary, mongoCommand)
  if (result.includes(`Quorum check failed`)) {
    throw new Error('add node as secondary failed')
  }
}

async function createNewReplicaSet() {
  const mongoCommand = `rs.initiate({'_id': '${mongoReplicaSetName}', 'members': [{'_id': 0, 'host': '${podIp}'}]})`
  try {
    await runMongoAdmin(true, '127.0.0.1', mongoCommand)
  } catch (e) {
    // replica set configuration might popup ony now, so we recheck the replica set status
    console.log(`create replica failed: ${e.stack}`)
    if (!await reconfigureReplicaSetIfPossible()) {
      throw e
    }
  }
  await waitForMasterReady('127.0.0.1')
}

async function reconfigureReplicaSetIfPossible() {
  const result = await runMongoAdmin(true, '127.0.0.1', `rs.status()`)
  if (!result.includes('we are not a member of it')) {
    return false
  }

  await reconfigureReplicaSet()
  return true
}

async function reconfigureReplicaSet() {
  const mongoCommand = `\
    c=rs.conf(); \
    c.members.splice(1); \
    c.members[0].host='${podIp}'; \
    rs.reconfig(c, {force: true}) \
  `

  await runMongoAdmin(true, '127.0.0.1', mongoCommand)
}

async function findPrimaryNode() {
  const ips = await getPodsIps()
  for (let i = 0; i < ips.length; i++) {
    const ip = ips[i]
    if (await isPrimary(ip)) {
      return ip
    }
  }
}

async function isSecondaryPodLocated() {
  const ips = await getPodsIps()
  for (let i = 0; i < ips.length; i++) {
    const ip = ips[i]
    if (await isSecondary(ip)) {
      return true
    }
  }
  return false
}

async function isSecondary(ip) {
  const state = await runMongoAdmin(false, ip, 'rs.status().myState')
  return state === '2'
}

async function isPrimary(ip) {
  const state = await runMongoAdmin(false, ip, 'rs.status().myState')
  if (state !== '1') {
    return false
  }

  await waitForMasterReady(ip)
  return true
}

async function getPodsIps() {
  let args = `get pods -l configid=mongodb-container -o jsonpath='{range.items[*]}{.status.podIP} '`
  const stdout = await kubectl.runKubectl(true, args)
  const ips = []
  stdout.trim().split(' ').forEach(ip => {
    if (ip.trim().length > 0) {
      ips.push(ip)
    }
  })
  return ips
}

async function waitForPing() {
  return await runMongoAdminUntilResponse('127.0.0.1', `db.adminCommand('ping').ok`, '1')
}

async function waitForMasterReady(host) {
  return await runMongoAdminUntilResponse(host, `db.isMaster().ismaster`, 'true')
}

async function waitUntilSecondaryReady() {
  return await runMongoAdminUntilResponse('127.0.0.1', `rs.status().myState`, '2')
}

async function runMongoAdminUntilResponse(host, mongoCommand, expectedResult) {
  const startTime = new Date().getTime()
  while (true) {
    const result = await runMongoAdmin(false, host, mongoCommand)
    if (result === expectedResult) {
      break
    }

    const passedTime = new Date().getTime() - startTime
    if (passedTime > 120000) {
      throw new Error(`timeout waiting for good response from command: ${mongoCommand}\nLast response was: ${result}`)
    }
    await sleep(1000)
  }
  return {
    mongoResponse: true,
  }
}

async function runMongoAdmin(checkError, host, mongoCommand) {
  const commandLine = `mongo admin --host ${host} --quiet --eval "${mongoCommand}"`
  let result = await runCommand(commandLine, {checkError: checkError})
  result = result.trim()
  return result.trim()
}


The code uses the following helpers:


const {exec} = require('child_process')
async function runCommand(commandLine, options) {
  if (options === undefined) {
    options = {}
  }
  if (options.checkError === undefined) {
    options.checkError = true
  }
  if (options.waitForCompletion === undefined) {
    options.waitForCompletion = true
  }
  const promise = new Promise((resolve, reject) => {

    const execHandler = exec(commandLine, (err, stdout, stderr) => {
      let result = ''
      if (stdout) {
        result += stdout
      }
      if (stderr) {
        result += stderr
      }

      if (err) {
        if (options.checkError) {
          reject(err)
        } else {
          resolve(result)
        }
        return
      }

      resolve(result)
    })

    if (options.streamOutputCallback) {
      execHandler.stdout.on('data', (data) => {
        options.streamOutputCallback(data)
      })
    }
  })

  if (options.waitForCompletion) {
    return await promise
  }

  return {promise: promise}
}



async function sleep(time) {
  await new Promise((resolve) => {
    setTimeout(() => {
      resolve()
    }, time)
  })
}

Summary

I've successfully run MongoDB ReplicaSet on kubernetes.
The ReplicaSet has proven to recover from both partial and full restart of the pods.




Wednesday, November 20, 2019

redis cluster survivability in kubernetes

A week ago, I've published an article of how to deploy redis cluster on kubernetes.
In the article, I've added an entrypoint.sh, which handles the IP change of the redis pod in case it is restarted.

But, that's not enough.

In case the entire redis cluster pods are restarted, the redis pods will no longer be able to communicate with each other.

Why?

Each redis pods holds a nodes.conf file, which includes list of the redis nodes. Each line in the file contains the redis node ID, and the redis node IP. If we restart all of the redis cluster nodes, all nodes IPs are changing, hence the redis node cannot re-establish connection with then according to the out of date IPs in the nodes.conf.

How can we solve this?



The general solution is as follows:

  1. Create a kubernetes config map holding mapping of resdis node ID to kubernetes pod name.
    The kubernetes pod name is assured not to change, since we are using a kubernetes StatefulSet. An example of such config is:

    79315a4ceef00496afc8fa7a97874e5b71dc547b  redis-statefulset-1
    b4d9be9e397d19c63bce602a8661b85ccc6e2d1d redis-statefulset-2
  2. Upon each redis pod startup read the pods config map, and find the new IP by the pod name, and then replace it in the nodes.conf. This can be done as a nodejs initContainer running in the redis node before the redis container. This container should be have the pods config map mapped a volume, for example under /nodes.txt.
The redis config update init container code is below.
const fs = require('fs')

init()

async function init() {
  const nodesConfPath = '/data/nodes.conf'
  if (!fs.existsSync(nodesConfPath)) {
    return
  }

  const configuration = fs.readFileSync(nodesConfPath, 'utf8')
  const updatedConfiguration = await updateConfiguration(configuration)
  fs.writeFileSync(nodesConfOutputPath, updatedConfiguration)
}

async function updateConfiguration(configuration) {
  const lines = []
  const configLines = configuration.split('\n')
  for (let i=0; i 0) {
      lines.push(await updateConfigurationLine(line))
    }
  })
  return lines.join('\n')
}

async function updateConfigurationLine(line) {
  const sections = line.match(/(\S+) (\S+)(:.*)/)
  if (sections == null) {
    return line
  }
  const nodeId = sections[1]
  const nodeIp = sections[2]
  const other = sections[3]
  const currentNodeIp = await getCurrentNodeIp(nodeId, nodeIp)
  return `${nodeId} ${currentNodeIp}${other}`
}

async function getCurrentNodeIp(nodeId, nodeIp) {
  const nodesPods = fs.readFileSync(nodesPath, 'utf8')
  const nodesPodsLines = nodesPods.split('\n')
  for (let i=0; i< nodesPodsLines.length; i++) {
    const line = nodesPodsLines[i].trim()
    if (line.length > 0) {
      const sections = line.split(' ')
      const configuredNodeId = sections[0]
      const configuredPodName = sections[1]
      if (configuredNodeId === nodeId) {
        const existingNodeIp = await fetchPodIpByName(configuredPodName)
        if (existingNodeIp != null) {
          nodeIp = existingNodeIp
        }
      }
    }
  })

  return nodeIp
}

async function fetchPodIpByName(podName) {
  const jsonParse = '{.status.podIP}'
  const args = `get pods ${podName} -o jsonpath='${jsonParse}'`
  const stdout = await kubectl(args)
  const ip = stdout.match(/(\d+\.\d+\.\d+\.\d+)/)
  if (ip) {
    return ip[1]
  }

  return null
}


async function kubectl(args) {
  return await new Promise((resolve, reject) => {
    const commandLine = `kubectl ${args}`
    exec(commandLine, (err, stdout, stderr) => {
      if (err) {
        reject(err)
        return
      }
      resolve(stdout)
    })
  })
}



Summary

Using the IPs updater init container, in combination with the config map, allows redis cluster to fully recover from both full and partial restarts. Notice that the init container should be granted with permissions to execute list and get for the pods resource.


Don't use ethereum bootnode



Ethereum documentation recommends using bootnode for private ethereum networks.
Once the boot node is installed, any geth based miner specifies the boot node ip and key, for example:

geth --bootnodes "enode:// BOOT_NOT_KEY @ BOOT_NODE_IP :30301"

This way, the boot node is notified of any new miner that starts, and update the new miner, using the gossip protocol, of any other existing miners, hence allowing the new miner to join the ethereum network.

Sounds fine, right?

Well... Not perfect.

This design works only as long as the boot node is alive.
Once the boot node crashes, new nodes cannot connect to the ethereum network.
The boot node is a single point of failure.

You can state: that's fine. Run the boot node as part of kubernetes, or docker swarm, and let it automatically restart the boot node if it crashes.

But let's examine this:
The boot node will start, from scratch. No miner peers to talk with, so all the existing miners will be part of the old ethereum network, while new miners will establish a new ethereum network, separated from the old one.

What is the solution?

The solution involves storing a set of all miners in an external database, such as MongoDB, or in case running in kubernetes, the kubernetes ConfigMap can be used.
Wrap each miner startup with the following logic:

  1. Generate node key using the bootnode -genkey command
  2. Store the current miner enode address in the external set of miners. The enode address is in format: enode://NODE_KEY@NODE_IP:30303
  3. Fetch all enodes addresses from the external set of miners, and run geth using all the addresses separated by comma. For example:
    geth --bootnodes "enode://BOOT_NOT_KEY1@BOOT_NODE_IP1 :30301,enode://BOOT_NOT_KEY2@BOOT_NODE_IP2 :30301"

Summary

Using the external set of miners has multiple advantages:
  1. There is no single point of failure
  2. Miners always join the same ethereum network, and do not create a separate one
  3. The bootnode process is no longer required (though we do need an external location to save the set of miners)


Wednesday, November 13, 2019

Deploy redis cluster on kubernetes








Notice: See an update for this issue in the next article: redis survivability in kubernetes




Redis is an in-memory data store that can be used as cache and message broker.
It is very useful in a kubernetes environment where multiple replicas of one deployment might need to save/pass information that will later be used by replicas of another deployment.

To deploy redis on kubernetes, I've used a simple working implementation.
This article presents the steps I've made:

  • Create a docker image to wrap the redis, and add some files
  • Create a kubernetes statefulset
  • Create a kubernetes service




The Docker Image 


A docker image is required to wrap the original redis docker images with some extensions.
Notice that you can do this even without creating your own image, but instead by updating the changes using the kubernetes deployment. However, I've found it more clear to create a dedicated image.

Dockerfile:

FROM redis:5.0.6
COPY files /
ENTRYPOINT /entrypoint.sh

The 'files' folder contains two files:

redis.conf:

  • configuring the instance as part of cluster enabled 
  • specifying path of the redis configuration files

port 6379
cluster-enabled yes
cluster-require-full-coverage yes
cluster-node-timeout 5000
appendonly yes
cluster-config-file /data/HOSTNAME/nodes.conf
entrypoint.sh:

  • The entrypoint script replaces the IP in the nodes.conf. This is required to allow redis to identify itself in case of a pod restart. The script itself was originated from this issue.
  • Once the IP address is handled, start the redis server

#!/usr/bin/env bash
set -e
HOST_FOLDER=/data/${HOSTNAME}
CLUSTER_CONFIG="${HOST_FOLDER}/nodes.conf"
mkdir -p ${HOST_FOLDER}

if [[ -f ${CLUSTER_CONFIG} ]]; then
  if [ -z "${POD_IP}" ]; then
    echo "Unable to determine Pod IP address!"
    exit 1
  fi
  echo "Updating my IP to ${POD_IP} in ${CLUSTER_CONFIG}"
  sed -i.bak -e "/myself/ s/[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}/${POD_IP}/" ${CLUSTER_CONFIG}
fi

sed -i "s/HOSTNAME/${HOSTNAME}/g" /redis.conf
exec /usr/local/bin/redis-server /redis.conf


The Kubernetes StatefulSet







A kubernetes StatefulSet (unlike deployment) is required to ensure that the pod host name remains the same between restarts. This allows the entrypoint.sh script (mentioned above) to replace the IP based on the (Pod) host name.

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-statefulset
spec:
  serviceName: redis-service
  podManagementPolicy: "Parallel"
  replicas: 6
  selector:
    matchLabels:
      configid: redis-container
  template:
    metadata:
      labels:
        configid: redis-container        
    spec:
      containers:
      - name: redis
        image: my-registry/my-redis:latest
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        volumeMounts:
        - name: redis-data
          mountPath: /data
          readOnly: false
        livenessProbe:
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 3
          initialDelaySeconds: 5
          periodSeconds: 10
          exec:
            command:
              - /usr/local/bin/redis-cli
              - ping
        readinessProbe:
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 1
          initialDelaySeconds: 5
          periodSeconds: 10
          exec:
            command:
              - /usr/local/bin/redis-cli
              - ping
      volumes:
      - name: redis-data
        hostPath:
          path: /opt/redis


In this case 6 replicas are used, so we will have 3 masters and 3 slaves.
We also include liveness and readiness probes. These only check that the redis instance is alive, but do not check the cluster health (out of scope for this article).
The image specified as my-registry/my-redis should point to the docker image, which was created at the previous step.




The Kubernetes Service









The service exposes 2 ports:

  • 6379 is used by clients connecting to the redis cluster
  • 16379 is used by the redis instances for the cluster management
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  labels:    
spec:
  selector:
    configid: redis-container
  type: NodePort
  ports:
      - port: 6379
        targetPort: 6379
        name: clients
        protocol: TCP
        nodePort: 30002
      - port: 16379
        targetPort: 16379
        name: gossip
        protocol: TCP

Redis Cluster Initialization

The last step is to tell redis to create cluster.
This can be done, for example, as part of a helm post install script.
I've used a javascript to wait for all of the redis instances to start, and then configure the cluster using kubectl exec on the first redis pod.

const {exec} = require('child_process')

const redisPods = 6


async function kubectl(args) {
  return await new Promise((resolve, reject) => {
    const commandLine = `kubectl ${args}`
    exec(commandLine, (err, stdout, stderr) => {
      if (err) {
        reject(err)
        return
      }
      resolve(stdout)
    })
  })
}

async function getRedisPods() {
  const args = `get pods -l configid=redis-container -o jsonpath='{range.items[*]}{.status.podIP} '`
  const stdout = await kubectl(args)
  return stdout.trim().split(' ')
}

async function executeClusterCreate(pods) {
  let redisNodes = ''
  pods.forEach(p => {
    redisNodes += ` ${p}:6379`
  })

  const command = 'exec redis-statefulset-0 ' +
    '-- redis-cli --cluster create --cluster-replicas 1 --cluster-yes ' +
    redisNodes

  const createResult = await kubectl(command)
  if (createResult.includes('[OK] All 16384 slots covered')) {
    return true
  }
  return false
}

async function configureRedisCluster() {
  while (true) {
    const pods = await getRedisPods()

    if (pods.length !== redisPods) {
      continue
    }

    if (!await executeClusterCreate(pods)) {
      console.warn(`create cluster failed, retrying in a moment`)
      await timeUtil.sleep(1000)
      continue
    }

    return
  }
}

configureRedisCluster()


If you run this script as part of a helm post install hook job, you will need to add permissions for the job to get and list pods, and to execute on pods.

Summary

After all these steps, we have a running redis cluster, BUT the redis cluster is not transparent to it users. A user addressing one redis instance might get a MOVED redirection (see Moved Redirection at redis cluster specification documentation), which means that there is another instance which handles the related information, and it is up to the client to address the relevant instance.

To make this actually transparent, use a redis client library that handles theses issues.
I have used the ioredis javascript library, which handles the MOVED redirection.

Monday, November 11, 2019

Using websocket on client and on server side

For many web applications, REST calls are the standard for client-server communication.
However, REST calls do not suit a more complex interactions, for example:

  • The server should notify the client when something changes
  • Using sticky sessions in a load balanced environment (Notice that you can use ingress to enforce stickiness, but forces usage of ingress, see this article)

A standard method to handle this requirement is using a web sockets.


The client creates a socket to the server, and leaves it open. Then, the server and the client can send frames over the socket whenever they need. This has performance implications, as the server must maintain an open socket for each running client.

Implementing web sockets in javascript is easy. Let review the implementation on the client side and on the server side. First, install the web socket library:


npm install --save socket.io-client


The Client Side

On the main client page, import the web socket library

const socketIo = require('socket.io-client')

Next, upon page load, or any other event that require the web socket establishment, connect to the server

const webSocket = socketIo.connect('http://my-server', {transports: ['websocket']})

To send a frame from the client over the web socket to the server, we specify the frame type, and the data

webSocket.emit('type1',{name:'Alice'})

And the last thing to do, is to handle the events triggered by the web socket. These include both error handling, and frames received from the server

webSocket.on('error', (e) => {
 console.error(e)
})
webSocket.on('disconnect', () => {
 console.log('web socket is disconnected')
})
webSocket.on('connect', () => {
 console.log('web socket is connected')
})
webSocket.on('type2', (dataFromServer) => {
 console.log('got type2 frame from the server', dataFromServer)
})


The Server Side

Create web socket listener on the server side

import socketIo from 'socket.io'
import express from 'express'
import http from 'http'

const app = express()
const server = http.Server(app)
const webSocketServer = socketIo(server, {transports: ['websocket']})
server.listen(8080)

Next, handle the socket events:

webSocketServer.on('connection', socket => {
  console.log(`A user connected with socket ${socket.id}`)

  socket.on('type1', dataFromClient => {
    logger.debug('got type1 frame from the client', dataFromClient)
  })
  
  socket.on('disconnect', () => {
    logger.debug('user socket disconnected')
  })
  
})

Summary

Javascript provides libraries that makes websocket implementation easy, and it take only few minutes to use it. Notice that to view the actual frame, you can use the chrome debugger network tab, see the Debugging WebSocket answer.

To view more details about the web socket library, check the socket-io home page.

Sunday, November 3, 2019

Create GO based server using GoLand on ubuntu


GO lang is one of the more successful programming languages, allowing performance oriented programming to be part of the code, while handling garbage collection (Unlike C++).
In this post we'll create a new GO based http server on ubuntu 18.04, and use JenBrains' GoLand IDE for the project development. Following are the steps for creating the project.

Install GoLang

Start by updating ubuntu packages:

sudo apt-get update

Then ,find the latest GoLang binary release in https://golang.org/dl/
and download it:

wget https://dl.google.com/go/go1.13.4.linux-amd64.tar.gz

Extract and move it to /usr/local

tar -xvf go1.13.4.linux-amd64.tar.gz
sudo mv go /usr/local

Install JetBrains GoLand



Download the lastest GoLand from: https://www.jetbrains.com/go/download/#section=linux

Unzip, and run the extracted folder the GoLand
./bin/goland.sh

Note:
If yo're working on ubuntu, use the GoLand to create desktop entry. This can be done only after the "Create New project" step. To create desktop entry, use the GoLand menu:
Tools, Create Desktop Entry...

Create New Project

Click on File, New project, and select Go Modules.
Type the location for the project, for example:

/home/my-user/projects/my-server

Click on the plus sign next to the GOROOT, select local, and select the go installation:

/usr/local/go

And click on the Create button.

The project explorer is displayed, and the go.mod file is created.
The go.mod includes the project name, and the go version. It is actually the GoLand running for us the `go mod init` command that had created this file.

Create the Server

Create new file named main.go, and enter the following code:

package main
import (
   "k8s.io/klog"   "net/http"   "strconv")

func main() {
   klog.Info("Server starting")

   mux := http.NewServeMux()
   mux.HandleFunc("/", helloHandler)
   mux.HandleFunc("/square", squareHandler)

   server := &http.Server{
      Addr:    ":8080",
      Handler: mux,
   }

   err := server.ListenAndServe()
   if err != nil {
      klog.Fatalf("Failed to start, error: %v", err)
   }
}

func squareHandler(w http.ResponseWriter, r *http.Request) {
   n, err := strconv.Atoi(r.URL.Query().Get("n"))
   if err != nil {
      w.WriteHeader(http.StatusInternalServerError)
      w.Write([]byte("conversion error"))
      return   }

   w.Write([]byte("square is " + strconv.Itoa(n*n)))
} 
func helloHandler(w http.ResponseWriter, r *http.Request) {
   w.Write([]byte("The server is up"))
}

This server is starting on port 8080, and has two handler.
The / path reply with "the server is up".
the /square path gets a query parameter named n, and returns the square of the number.

Run the server

Right click on the main.go file, and select:
Run `go build main.go`

This will both handle the `go get` command to update the dependencies in the go.mod file, as well as running the actual server.

Summary

This post includes the first steps in creation of a go based server, and running it using GoLand.
In future post we'll examine dockerization of the build, and tests.

Wednesday, October 30, 2019

Using RxJS


Recently I've used RxJS as part of our reverse proxy code. The learning curve to start using the RxJS was very fast for a startup project. However once you have a problem in the RxJS usage, you might spend a lot of time trying to understand the problem. This is mostly due to poor errors description, and no useful error stack in case of problems.



RxJS is based on an implementation and usage of Observable objects.
One example for this is the redux-observable library, which is a middleware that enables redux actions to be handled as Observable objects as part of epics.

For example, for example, to handle action of type TYPE1, and send action of TYPE2, you can use the following:

import {ofType} from 'redux-observable'
import {flatMap} from 'rxjs/operators'

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return [
         { type: TYPE2}
      ]
    })
  )

Notice the following:

  • The action$ is the Observable of the redux action
  • We configure the epic to handle only actions of type TYPE1
  • We return an array of actions that will be handled, so the flatMap converts each of the returned array elements into a new observable.

What if we want to examine the action parameters, and decide which is the next action to be handled accordingly?

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap((action) => {
      return [
         { type: action.myParameter===2 ? 'TYPE2' : ' TYPE3' }
      ]
    })
  )

We can see that we must not access the action$ variable, as it is not the action, but instead it is the observable wrapping the action. However, once we get the action into the flatMap, we get the actual action and its values.


RxJS handles can be combined with Promises. Instead of using await to wait for a promise completion, we convert the promise to observable:

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        })
      )
    })
  )


Notice:

  • We have converted the Promise to observable using the from keyword
  • We must use return from to return the observable created by the from
  • The observable created by the from is handled using a new pipeline

What is something fails? Here is when RxJS power come into the play:
export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        }),
        retryWhen(errors => retryFunction(errors)),
      )
    })
  )


function retryFunction(errors) {
  return errors.pipe(
    flatMap((error, count) => {
      if (count > 3) return throwError(error)
      logger.error(`retrying attempt ${count}, error: ${error}`)
      return timer(100)
    }),
  )
}


The retryWhen will rerun the entire from observable upon our decision.
In this example, we retry 3 times with 100ms delay, and if it still fails, we throw an error.


What if we want a central location to handle errors?
We can produce a new redux action to and handle it in a central errors handling epic:

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        }),
        retryWhen(errors => retryFunction(errors)),
        catchError(error => [{type: 'ERROR', action, error}]),
      )
    })
  )

export const errorEpic = (action$) =>
  action$.pipe(
    ofType('ERROR'),
    flatMap(action => {
      logger.error(`error at action ${JSON.stringify(action.action)}: ${action.error}`)
      return []
    }),
  )

To sum:
RxJS could realy accelerate the development of new applications, due to its powerful builtin capabilities. Maintaining RxJS might be more complex than standard Promises based code.
I still recommend using RxJS instead of standard redux code.


Wednesday, October 23, 2019

Docker cleanup using system prune


Running docker build usually involves creating new docker image, and tagging it with the current build number. But this leads to leaving images leftovers on the build machine, that would eventually cause out of disk space issues. Let's examine the reason for that, and how can we address this issue.


The Build Process

The build process usually includes the following steps:
  • Run docker build using tag according to the Jenkins build number
  • Push the build number tagged docker image
  • Tag the image using :latest tag as well
  • Push the latest tagged docker image
For example (assuming 123 is the current build number):

1. docker build -t my-registry/my-image:123 .
2. docker push my-registry/my-image:123
3. docker tag my-registry/my-image:123 my-registry/my-image:latest
4. docker push my-registry/my-image:latest
5. docker rmi my-registry/my-image:123

The problem starts when we run the next build.
Let examine the docker images after running build #123:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  latest  8193a6ec2439  17 seconds ago      91.4MB

And now let run build #124:

1. docker build -t my-registry/my-image:124 .
2. docker push my-registry/my-image:124

So the docker images are now:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  124     bfef88428b40  17 seconds ago      91.4MB
my-registry/my-image  latest  8193a6ec2439  55 seconds ago      91.4MB

And after the tag to latest command:

3. docker tag my-registry/my-image:124 my-registry/my-image:latest
4. docker push my-registry/my-image:latest

The images are now:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  124     bfef88428b40  17 seconds ago      91.4MB
my-registry/my-image  latest  bfef88428b40  55 seconds ago      91.4MB
<none>                <none>  8193a6ec2439  2 minutes ago       91.4MB

So now we have leftover "zombie" image marked with <none>.
This is even we have removed the previous build image, and marked the new build as "latest".
Even after removing the build 124:

5. docker rmi my-registry/my-image:124

We still have the zombie image:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  latest  bfef88428b40  55 seconds ago      91.4MB
<none>                <none>  8193a6ec2439  2 minutes ago       91.4MB

The Solution

The solution is to use the command:

docker system prune -f


This would remove any zombie images.
Notice this only works in combination with `docker rmi`  for the current build image, and leave only the 'latest' tag for the image. This ensures that the latest tag is replaced to the new build, and the onld image remains as <none>, hence allowing the docker system prune command to remove it.

From the docker documentation page, we can find the the system prune command removes all unused containers (and more).


See also Delete images from a Private Docker Registry.