This post describes a Kubernetes controller that I've created.
I've reviewed several guides for creating Kubernetes controllers, such as
Extending Kubernetes,
Kubernetes Custom Controller, and
this. I found myself overwhelmed by how complex these guides make such a simple requirement. This post presents the KISS gist of my blog: when you need to perform a task, ask yourself whether it really needs to be so complex. Do you need this whole monster for a simple task?
Many controllers are based on a Kubernetes Custom Resource Definition (CRD).
CRDs are indeed nice, but they make the implementation much more complex, because you typically have to use code generation tools.
If this is an internal solution, do you really need a CRD? Why not use a simple ConfigMap?
In the solution presented below, I use a ConfigMap that configures the number of StatefulSets to create. The ConfigMap also contains the template used to create each StatefulSet.
An example of the ConfigMap:
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    resource-type: "primary"
  name: primary-config
  namespace: default
data:
  general.yaml: |-
    count: 2
  statefulSet.yaml: "apiVersion: apps/v1\nkind: StatefulSet\nmetadata:\n name: my-statefulset-___sequence___\n
    \ labels:\n resource-type: \"secondary\" \n app.kubernetes.io/instance
    : bouncer\n app.kubernetes.io/name : bouncer\nspec:\n
    ...
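To make the role of the ___sequence___ placeholder concrete, here is a minimal sketch of how a template name could be expanded once per sequence number. The function name renderNames is an assumption made for illustration, and the sketch assumes that sequence numbers start at 0, as in the merge loop of the controller code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// renderNames expands a name template once per sequence number.
// The ___sequence___ placeholder comes from the ConfigMap example;
// the function itself is a hypothetical helper, not the controller's code.
func renderNames(template string, count int) []string {
	names := make([]string, 0, count)
	for i := 0; i < count; i++ {
		name := strings.ReplaceAll(template, "___sequence___", strconv.Itoa(i))
		names = append(names, name)
	}
	return names
}

func main() {
	// With count: 2 this prints my-statefulset-0 and my-statefulset-1.
	for _, name := range renderNames("my-statefulset-___sequence___", 2) {
		fmt.Println(name)
	}
}
```

The same substitution applies to the whole StatefulSet text, so every occurrence of the placeholder in the template receives the same sequence number.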
The controller's main code is the merge logic, which fetches the primary ConfigMap and creates or deletes StatefulSets accordingly. It works as follows:
- Get the primary ConfigMap
- Load the existing StatefulSets
- Loop over the sequence numbers, from 0 up to the required count in the primary ConfigMap
- For each sequence number, create the StatefulSet if it does not exist
- Delete all StatefulSets that were not encountered in the loop above
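The steps above boil down to a set difference between the required names and the existing names. The following is a minimal sketch of that idea using plain strings instead of Kubernetes objects; the function name plan and its signature are assumptions made for illustration.

```go
package main

import "fmt"

// plan computes which StatefulSets to create and which to delete.
// It works on names only; a hypothetical simplification of the merge logic.
func plan(required, existing []string) (toCreate, toDelete []string) {
	existingSet := make(map[string]bool, len(existing))
	for _, name := range existing {
		existingSet[name] = true
	}
	requiredSet := make(map[string]bool, len(required))
	for _, name := range required {
		requiredSet[name] = true
		if !existingSet[name] {
			toCreate = append(toCreate, name) // required but missing
		}
	}
	for _, name := range existing {
		if !requiredSet[name] {
			toDelete = append(toDelete, name) // exists but no longer required
		}
	}
	return toCreate, toDelete
}

func main() {
	required := []string{"my-statefulset-0", "my-statefulset-1"}
	existing := []string{"my-statefulset-1", "my-statefulset-2"}
	toCreate, toDelete := plan(required, existing)
	fmt.Println("create:", toCreate)
	fmt.Println("delete:", toDelete)
}
```

In this example my-statefulset-0 is created, my-statefulset-1 is left untouched, and my-statefulset-2 is deleted.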
package mypackage

// K8sApi, MessageUpdate, Fatal and Verbose are project helpers that are not shown in this post.
import (
	"fmt"
	"strconv"
	"strings"

	"gopkg.in/yaml.v2"
	apps "k8s.io/api/apps/v1"
	core "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
)
type Merger struct {
	updates          chan int
	api              *K8sApi
	primaryConfig    *core.ConfigMap
	currentEntities  []apps.StatefulSet
	requiredEntities []apps.StatefulSet
}
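// Init starts a goroutine that runs a merge whenever an update is signaled.
func (merger *Merger) Init() {
	merger.updates = make(chan int, 100)
	go func() {
		for range merger.updates {
			// Drain queued notifications so that a burst of events
			// results in a single merge.
			for len(merger.updates) > 0 {
				<-merger.updates
			}
			merger.merge()
		}
	}()
}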
// NotifyUpdate signals that the cluster state changed and a merge is needed.
func (merger *Merger) NotifyUpdate() {
	merger.updates <- MessageUpdate
}
// merge reconciles the existing StatefulSets with the count in the primary ConfigMap.
func (merger *Merger) merge() {
	configMaps := merger.api.configMap.FetchByLabels("resource-type=primary")
	if len(configMaps) == 0 {
		// Nothing to reconcile until the primary ConfigMap exists.
		return
	}
	merger.primaryConfig = &configMaps[0]
	merger.currentEntities = merger.api.statefulSet.FetchByLabels("resource-type=secondary")
	merger.requiredEntities = []apps.StatefulSet{}
	count := merger.getRequiredCount()
	for i := 0; i < count; i++ {
		merger.mergeSequence(i)
	}
	merger.deleteNonUsedStatefulSets()
}
// deleteNonUsedStatefulSets deletes existing StatefulSets that are no longer required.
func (merger *Merger) deleteNonUsedStatefulSets() {
	usedNames := map[string]bool{}
	for _, s := range merger.requiredEntities {
		usedNames[s.Name] = true
	}
	for _, s := range merger.currentEntities {
		if !usedNames[s.Name] {
			merger.api.statefulSet.Delete(s.Name)
		}
	}
}
// mergeSequence renders the StatefulSet for one sequence number and creates it if it is missing.
func (merger *Merger) mergeSequence(sequence int) {
	statefulSetText := merger.getStatefulSetUpdatedText(sequence)
	decode := scheme.Codecs.UniversalDeserializer().Decode
	obj, _, err := decode([]byte(statefulSetText), nil, nil)
	if err != nil {
		Fatal("decode statefulSetText failed, %v", err)
	}
	statefulSet, ok := obj.(*apps.StatefulSet)
	if !ok {
		Fatal("cast statefulSet failed")
	}
	merger.requiredEntities = append(merger.requiredEntities, *statefulSet)
	if merger.statefulSetExists(statefulSet.Name) {
		return
	}
	merger.api.statefulSet.Create(statefulSet)
}

// getStatefulSetUpdatedText returns the template with ___sequence___ replaced.
func (merger *Merger) getStatefulSetUpdatedText(sequence int) string {
	statefulSet := merger.primaryConfig.Data["statefulSet.yaml"]
	return merger.templateReplace(statefulSet, "sequence", strconv.Itoa(sequence))
}
// templateReplace replaces every ___from___ placeholder in text with to.
func (merger *Merger) templateReplace(text string, from string, to string) string {
	return strings.Replace(text, fmt.Sprintf("___%v___", from), to, -1)
}

// getRequiredCount reads the number of StatefulSets from general.yaml.
func (merger *Merger) getRequiredCount() int {
	generalConfig := merger.getGeneralConfig()
	countStr := generalConfig["count"]
	count, err := strconv.Atoi(fmt.Sprintf("%v", countStr))
	if err != nil {
		Fatal("parse of count %v failed %v", countStr, err)
	}
	return count
}

// getGeneralConfig parses the general.yaml entry of the primary ConfigMap.
func (merger *Merger) getGeneralConfig() map[interface{}]interface{} {
	general := merger.primaryConfig.Data["general.yaml"]
	generalConfig := make(map[interface{}]interface{})
	err := yaml.Unmarshal([]byte(general), &generalConfig)
	if err != nil {
		Fatal("error parsing configMap yaml %v", err)
	}
	return generalConfig
}
// statefulSetExists reports whether a StatefulSet with the given name already exists.
func (merger *Merger) statefulSetExists(name string) bool {
	for _, statefulSet := range merger.currentEntities {
		if statefulSet.Name == name {
			return true
		}
	}
	return false
}
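The Init loop above drains pending notifications so that a burst of events triggers one merge, while NotifyUpdate blocks once the 100-slot buffer is full. As an alternative, here is a minimal sketch of the same coalescing idea with a channel of capacity 1 and a non-blocking send. The Coalescer type and its methods are hypothetical and are not part of the controller code.

```go
package main

import "fmt"

// Coalescer is a hypothetical helper that collapses bursts of
// notifications into a single pending "merge needed" signal.
type Coalescer struct {
	updates chan struct{}
}

func NewCoalescer() *Coalescer {
	return &Coalescer{updates: make(chan struct{}, 1)}
}

// Notify records that a merge is needed. It never blocks.
func (c *Coalescer) Notify() {
	select {
	case c.updates <- struct{}{}:
	default: // a merge is already pending, nothing to add
	}
}

// Run calls merge once per pending signal until stop is closed.
func (c *Coalescer) Run(stop <-chan struct{}, merge func()) {
	for {
		select {
		case <-c.updates:
			merge()
		case <-stop:
			return
		}
	}
}

func main() {
	c := NewCoalescer()
	for i := 0; i < 5; i++ {
		c.Notify() // five events arrive in a burst
	}
	merged := make(chan struct{})
	stop := make(chan struct{})
	go c.Run(stop, func() { merged <- struct{}{} })
	<-merged // the burst produced exactly one pending merge
	close(stop)
	fmt.Println("burst of 5 notifications handled by one merge")
}
```

A notification that arrives while a merge is running is kept as the single pending signal, so the next merge still observes the latest state.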
All we need now is to call NotifyUpdate whenever something changes.
For this purpose we use the Kubernetes watch API. We should watch both the ConfigMap and the StatefulSets for changes. For example, watching the StatefulSets is done as follows:
// WatchByLabels calls the notifier for every event on StatefulSets matching the label selector.
// "meta" is assumed to be k8s.io/apimachinery/pkg/apis/meta/v1; "client" is the clientset, created elsewhere.
// Note: since client-go v0.18, Watch takes a context.Context as its first argument.
func (api *K8sStatefulSetApi) WatchByLabels(labels string, notifier *Notifier) {
	listOptions := meta.ListOptions{
		LabelSelector: labels,
	}
	watcher, err := client.AppsV1().StatefulSets("default").Watch(listOptions)
	if err != nil {
		Fatal("watch kubernetes statefulSet failed: %v", err)
	}
	ch := watcher.ResultChan()
	go func() {
		Verbose("watcher loop for statefulSet established")
		for event := range ch {
			statefulSet, ok := event.Object.(*apps.StatefulSet)
			if !ok {
				Fatal("unexpected event in watcher channel")
			}
			Verbose("got event for statefulSet %v", statefulSet.Name)
			(*notifier)()
		}
	}()
}
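One caveat with the loop above: the goroutine exits when the result channel closes, and a watch may end, for example on a server-side timeout. The following is a minimal sketch of a loop that re-opens the watch in that case. It uses a stand-in Event type instead of the real client-go types, and the names Event, errStop and watchLoop are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for a Kubernetes watch event.
type Event struct {
	Type string // for example "ADDED", "MODIFIED", "DELETED" or "ERROR"
	Name string
}

// errStop is returned by the fake open function to end the demo.
var errStop = errors.New("stop watching")

// watchLoop opens a watch, notifies on every non-error event, and
// re-opens the watch when its channel closes. It returns the first
// error reported by open.
func watchLoop(open func() (<-chan Event, error), notify func()) error {
	for {
		ch, err := open()
		if err != nil {
			return err
		}
		for event := range ch {
			if event.Type == "ERROR" {
				continue // skip instead of aborting the process
			}
			notify()
		}
		// The channel was closed: loop around and open a new watch.
	}
}

func main() {
	opens := 0
	open := func() (<-chan Event, error) {
		opens++
		if opens > 2 {
			return nil, errStop
		}
		ch := make(chan Event, 1)
		ch <- Event{Type: "MODIFIED", Name: "my-statefulset-0"}
		close(ch) // simulate the server ending the watch
		return ch, nil
	}
	notified := 0
	err := watchLoop(open, func() { notified++ })
	fmt.Println(notified, err)
}
```

In a real controller the open function would wrap the Watch call, and it is worth running a merge after each re-open, since events may have been missed while no watch was active.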
Summary
This post presented a simple Kubernetes controller based on a ConfigMap.
The primary resource is the ConfigMap, marked by the label "resource-type=primary".
The secondary resources are the StatefulSets, marked by the label "resource-type=secondary".
This controller does not use a CRD, so it does not require any code generation tools, which allows a KISS implementation.