Wednesday, December 4, 2019

Creating a kubernetes controller


This post explains about a kubernetes controller that I've created.
I've had reviewed several guides for kubernetes controller creation, such as Extending Kubernetes, Kubernetes Custom Controller, and this. I've found myself overwhelmed by the complexity of such a simple requirement. This post is the present the KISS gist of my blog. When you need to perform a task, ask yourself, does it really need to be so complex? Do I need all this monster for a simple task?

Many controllers are based on kubernetes Custom Resource Definition (aka CRD).
CRDs are indeed nice, but make the implementation much more complex, as you will have to use code generator tools.
If this is an internal solution, do you really need CRD? Why not use a simple ConfigMap?

In the solution presented below, I am using a ConfigMap that configures the number of StatefulSets to create. The ConfigMap also contains the template that will be used for the StatefulSet creation.
An example of the ConfigMap is:

apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    resource-type: "primary"
  name: primary-config
  namespace: default
data:
  general.yaml: |-
    count: 2
  statefulSet.yaml: "apiVersion: apps/v1\nkind: StatefulSet\nmetadata:\n  name: my-statefulset-___sequence___\n
    \ labels:\n    resource-type: \"secondary\"    \n    app.kubernetes.io/instance
    : bouncer\n    app.kubernetes.io/name : bouncer\nspec:\n  
 
 ...

The controller main code is the merge logic, which fetch the primary ConfigMap, and creates/delete StatefulSet accordingly. This is done by the following logic:

  • Get the primary ConfigMap
  • Load the existing StatefulSets
  • Loop until the required count in the primary ConfigMap
    • Create the StatefulSet if it does not exist
  • Delete all StatefulSets that were not encountered in the loop above

package mypackage

import (
  "fmt"
  "gopkg.in/yaml.v2"
  apps "k8s.io/api/apps/v1"
  core "k8s.io/api/core/v1"
  "k8s.io/client-go/kubernetes/scheme"
  "strconv"
  "strings"
  "sync"
)

type Merger struct {
  updates          chan int
  api              *K8sApi
  primaryConfig   *core.ConfigMap
  currentEntities  []apps.StatefulSet
  requiredEntities []apps.StatefulSet
}

func (merger *Merger) Init() {
  merger.updates = make(chan int, 100)
  go func() {
    for {
      select {
      case update := <-merger.updates:
        for len(merger.updates) > 0 {
          <-merger.updates
        }
        merger.merge()
      }
    }
  }()
}

func (merger *Merger) NotifyUpdate() {
  merger.updates <- MessageUpdate
}

func (merger *Merger) merge() {
  configMaps := merger.api.configMap.FetchByLabels("resource-type=primary")
  merger.primaryConfig = &configMaps[0]

  merger.currentEntities = merger.api.statefulSet.FetchByLabels("resource-type=secondary")
  merger.requiredEntities = []apps.StatefulSet{}
  chains := merger.getRequiredCount()
  for i := 0; i < chains; i++ {
    merger.mergeSequence(i)
  }
  merger.deleteNonUsedStatefulSets()
}

func (merger *Merger) deleteNonUsedStatefulSets() {
  usedNames := map[string]bool{}
  for _, s := range merger.requiredEntities {
    usedNames[s.Name] = true
  }
  for _, s := range merger.currentEntities {
    if !usedNames[s.Name] {
      merger.api.statefulSet.Delete(s.Name)
    }
  }
}

func (merger *Merger) mergeSequence(sequence int) {
  statefulSetText := merger.getStatefulSetUpdatedText(sequence, name)
  decode := scheme.Codecs.UniversalDeserializer().Decode
  obj, _, err := decode([]byte(statefulSetText), nil, nil)
  if err != nil {
    Fatal("decode statefulSetText failed, %v", err)
  }
  statefulSet, ok := obj.(*apps.StatefulSet)
  if !ok {
    Fatal("cast statefulSet failed")
  }

  merger.requiredEntities = append(merger.requiredEntities, *statefulSet)
  if merger.statefulSetExists(statefulSet.Name) {
    return
  }

  merger.api.statefulSet.Create(statefulSet)
}

func (merger *Merger) getStatefulSetUpdatedText(sequence int, name string) string {
  statefulSet := merger.primaryConfig.Data["statefulSet.yaml"]
  statefulSet = merger.templateReplace(statefulSet, "sequence", strconv.Itoa(sequence))
  statefulSet = merger.templateReplace(statefulSet, "name", name)
  replacements := merger.getGeneralConfig()[name]
  return statefulSet
}

func (merger *Merger) templateReplace(text string, from string, to string) string {
  return strings.Replace(text, fmt.Sprintf("___%v___", from), to, -1)

}

func (merger *Merger) getRequiredCount() int {
  generalConfig := merger.getGeneralConfig()
  countStr := generalConfig["count"]
  count, err := strconv.Atoi(fmt.Sprintf("%v", countStr))
  if err != nil {
    Fatal("parse of count %v failed %v", countStr, err)
  }
  return count
}

func (merger *Merger) getGeneralConfig() map[interface{}]interface{} {
  general := merger.primaryConfig.Data["general.yaml"]
  generalConfig := make(map[interface{}]interface{})
  err := yaml.Unmarshal([]byte(general), &generalConfig)
  if err != nil {
    Fatal("error parsing configMap yaml %v", err)
  }
  return generalConfig
}

func (merger *Merger) statefulSetExists(name string) bool {
  for _, statefulSet := range merger.currentEntities.statefulSets {
    if statefulSet.Name == name {
      return true
    }
  }
  return false
}

All we need now is to activate the NotifyUpdate upon change.
For this purpose we use the kubernetes watch API. We should watch for both the ConfigMap, and for StatefulSet changes. Watch for the StatefuleSet for example is done by the following:

func (api *K8sStatefulSetApi) WatchByLabels(labels string, notifier *Notifier) {
  listOptions := meta.ListOptions{
    LabelSelector: labels,
  }
  watcher, err := client.AppsV1().StatefulSets("default").Watch(listOptions)
  if err != nil {
    Fatal("watch kubernetes statefulSet failed: %v", err)
  }
  ch := watcher.ResultChan()
  go func() {
    Verbose("watcher loop for statefulSet established")
    for event := range ch {
      statefulSet, ok := event.Object.(*apps.StatefulSet)
      if !ok {
        Fatal("unexpected event in watcher channel")
      }

      Verbose("got event for statefulSet %v", statefulSet.Name)
      (*notifier)()
    }
  }()
}


Summary

In this post a simple kubernetes controller based on a ConfigMap was presented.
The primary resource is the configMap, marked by the label: "resource-type=primary"
The secondary resources are the StatefulSets, marked by the label: "resource-type=secondary"
This controller does not use CRD, hence it does not require any code generation tools, and allows KISS implementation.


No comments:

Post a Comment