Monday, June 29, 2020

Secure connection to Kafka from a GoLang client



In this post we will review how to create a secure Kafka connection from Go.

When trying to connect to a secure Apache Kafka server, you will usually receive two files:
  • client.keystore.jks
  • client.truststore.jks

These files should be converted to PEM files, since the Go TLS client cannot read JKS keystores directly.
Use the following script to create the PEM files:
  • server.cer.pem
  • client.cer.pem
  • client.key.pem


# convert the truststore to PKCS12, then extract the server CA certificate
keytool -importkeystore \
  -srckeystore input/client.truststore.jks \
  -destkeystore output/server.p12 \
  -deststoretype PKCS12 \
  -srcstorepass "jks-pass" \
  -deststorepass "topsecret"

openssl pkcs12 -in output/server.p12 -nokeys -out output/server.cer.pem -password pass:topsecret

# convert the client keystore to PKCS12, then extract the client certificate and private key
keytool -importkeystore \
  -srckeystore input/client.keystore.jks \
  -destkeystore output/client.p12 \
  -deststoretype PKCS12 \
  -srcstorepass "jks-pass" \
  -deststorepass "topsecret"

openssl pkcs12 -in output/client.p12 -nokeys -out output/client.cer.pem -password pass:topsecret

openssl pkcs12 -in output/client.p12 -nodes -nocerts -out output/client.key.pem -password pass:topsecret
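
Optionally, you can sanity-check the generated PEM files from Go before wiring them into the Kafka client. This is only a minimal sketch, using the file names generated above:

package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
)

func main() {
    // verify that the client certificate and key can be parsed and match each other
    if _, err := tls.LoadX509KeyPair("client.cer.pem", "client.key.pem"); err != nil {
        log.Fatalf("client certificate/key check failed: %v", err)
    }

    // verify that the server CA certificate can be loaded into a certificate pool
    caCert, err := ioutil.ReadFile("server.cer.pem")
    if err != nil {
        log.Fatalf("reading server.cer.pem failed: %v", err)
    }
    if ok := x509.NewCertPool().AppendCertsFromPEM(caCert); !ok {
        log.Fatal("server.cer.pem does not contain a valid PEM certificate")
    }

    log.Println("PEM files look valid")
}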


To create a secure connection from a Go client using the Shopify sarama library, use the following code:


package consumer

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"

    "github.com/Shopify/sarama"
)

func connect() {
    config := sarama.NewConfig()
    config.Version = sarama.V1_1_1_0
    config.Consumer.Return.Errors = true
    config.Producer.Return.Successes = true // required when using a SyncProducer
    tlsConfig := newTLSConfig()

    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig
    config.Net.SASL.Enable = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
    config.Net.SASL.User = "my-user"
    config.Net.SASL.Password = "my-password"

    // pass the config, so that the TLS and SASL settings are actually used
    syncProducer, err := sarama.NewSyncProducer([]string{"127.0.0.1:30010"}, config)
    if err != nil {
        panic(err)
    }

    fmt.Printf("sync producer created: %v", syncProducer)
}

func newTLSConfig() *tls.Config {
    tlsConfig := tls.Config{}

    // the client certificate and key, used to authenticate the client to the broker
    cert, err := tls.LoadX509KeyPair("client.cer.pem", "client.key.pem")
    if err != nil {
        panic(err)
    }
    tlsConfig.Certificates = []tls.Certificate{cert}

    // the server CA certificate, used to verify the broker certificate
    caCert, err := ioutil.ReadFile("server.cer.pem")
    if err != nil {
        panic(err)
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)
    tlsConfig.RootCAs = caCertPool
    // skip host name verification; set to false if the broker certificate matches its host name
    tlsConfig.InsecureSkipVerify = true

    tlsConfig.BuildNameToCertificate()
    return &tlsConfig
}
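
Once the producer is created, sending a message over the secure connection works the same as over a plain connection. A minimal sketch that could be added at the end of connect() above (the topic name my-topic is a hypothetical example):

    msg := &sarama.ProducerMessage{
        Topic: "my-topic", // hypothetical topic name
        Value: sarama.StringEncoder("hello over TLS"),
    }
    partition, offset, err := syncProducer.SendMessage(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("message stored in partition %d, offset %d\n", partition, offset)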




Kafka Producer and Consumer in Go




In this post we will review how to create an Apache Kafka producer and consumer in Go.
To access Kafka we will use Shopify's sarama library.




We will use a simple application that starts a producer and a consumer:


package main

import "kafka/players"

func main() {
    go players.Producer()

    consumer := players.Consumer{}
    consumer.Consume()
}


We will define constants used by both the producer and the consumer:


package players

const KafkaServer = "127.0.0.1:30010"
const KafkaTopic = "my-topic"


The producer code is straightforward; it sends messages to Kafka in a loop:


package players

import (
    "time"

    "github.com/Shopify/sarama"
)

func Producer() {
    // a nil config makes sarama use a default configuration suitable for a SyncProducer
    syncProducer, err := sarama.NewSyncProducer([]string{KafkaServer}, nil)
    if err != nil {
        panic(err)
    }

    for {
        msg := &sarama.ProducerMessage{
            Topic: KafkaTopic,
            Value: sarama.ByteEncoder("Hello World " + time.Now().Format(time.RFC3339)),
        }

        _, _, err = syncProducer.SendMessage(msg)
        if err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}


The consumer is a bit more complex, as it needs to recover from a broker crash.


package players

import (
    "context"
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

// Consumer implements the sarama.ConsumerGroupHandler interface
type Consumer struct {
}

func (c *Consumer) Consume() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_4_0_0
    group, err := sarama.NewConsumerGroup([]string{KafkaServer}, "my-group", config)
    if err != nil {
        panic(err)
    }

    go func() {
        for err := range group.Errors() {
            panic(err)
        }
    }()

    ctx := context.Background()
    for {
        topics := []string{KafkaTopic}
        // Consume returns when the broker connection is lost or a rebalance starts,
        // so we call it again in a loop to recover
        err := group.Consume(ctx, topics, c)
        if err != nil {
            fmt.Printf("kafka consume failed: %v, sleeping and retry in a moment\n", err)
            time.Sleep(time.Second)
        }
    }
}

func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
    return nil
}

func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
    return nil
}

func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("consumed a message: %v\n", string(msg.Value))
        sess.MarkMessage(msg, "")
    }
    return nil
}


And that's all. The output of the application is:


consumed a message: Hello World 2020-06-30T08:28:10+03:00
consumed a message: Hello World 2020-06-30T08:28:18+03:00
consumed a message: Hello World 2020-06-30T08:28:14+03:00
consumed a message: Hello World 2020-06-30T08:28:19+03:00
consumed a message: Hello World 2020-06-30T08:28:16+03:00
consumed a message: Hello World 2020-06-30T08:28:15+03:00
consumed a message: Hello World 2020-06-30T08:28:20+03:00

Final Notes


In this post we have created a simple Kafka producer and consumer application in Go.

Notice that the consumed messages may appear slightly out of order; Kafka guarantees ordering only within a single partition, so with a multi-partition topic this is expected.

Notice also that the message sent by the producer is plain text, but it could just as well be JSON created by marshaling a struct, as in the sketch below.
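
A minimal sketch of sending a JSON message, assuming a hypothetical Reading struct (the producer setup is the same as above):

package players

import (
    "encoding/json"
    "time"

    "github.com/Shopify/sarama"
)

// Reading is a hypothetical payload structure used only for this example
type Reading struct {
    Sensor string    `json:"sensor"`
    Value  float64   `json:"value"`
    Time   time.Time `json:"time"`
}

func produceJSON(syncProducer sarama.SyncProducer) error {
    payload, err := json.Marshal(Reading{Sensor: "temperature", Value: 36.6, Time: time.Now()})
    if err != nil {
        return err
    }

    msg := &sarama.ProducerMessage{
        Topic: KafkaTopic,
        Value: sarama.ByteEncoder(payload),
    }
    _, _, err = syncProducer.SendMessage(msg)
    return err
}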


Wednesday, June 24, 2020

Deploy Apache Kafka on Kubernetes





In this post we will review the steps required to deploy Apache Kafka on Kubernetes.
From the Apache Kafka site:

"
Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
"

Notice:
To use Kafka on Kubernetes, start by deploying Apache ZooKeeper, which Kafka uses to manage the cluster brokers and the leader election. See my previous post: Deploy Apache Zookeeper on Kubernetes

Once ZooKeeper is deployed, we will create the following Kubernetes resources:
  1. ConfigMap
  2. Headless Service
  3. Exposed Service
  4. StatefulSet
  5. Init container


1. The ConfigMap


The ConfigMap includes two files:
  • The logger configuration
  • The Kafka server.properties


apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
data:
  log4j.properties: |-
    # Unspecified loggers and loggers with additivity=true output to server.log and stdout
    # Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
    log4j.rootLogger=INFO, stdout

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
    log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
    log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
    log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
    log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    # Change the two lines below to adjust ZK client logging
    log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
    log4j.logger.org.apache.zookeeper=INFO

    # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
    log4j.logger.kafka=INFO
    log4j.logger.org.apache.kafka=INFO

    # Change to DEBUG or TRACE to enable request logging
    log4j.logger.kafka.request.logger=WARN, requestAppender
    log4j.additivity.kafka.request.logger=false

    # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
    # related to the handling of requests
    #log4j.logger.kafka.network.Processor=TRACE, requestAppender
    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
    #log4j.additivity.kafka.server.KafkaApis=false
    log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false

    log4j.logger.kafka.controller=TRACE, controllerAppender
    log4j.additivity.kafka.controller=false

    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
    log4j.additivity.kafka.log.LogCleaner=false

    log4j.logger.state.change.logger=TRACE, stateChangeAppender
    log4j.additivity.state.change.logger=false

    # Change to DEBUG to enable audit log for the authorizer
    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false

  server.properties: |-
    # the two #init# lines below are placeholders, replaced by the init container (see the init container section)
    #init#broker.id
    #init#advertised.listeners
    log.dirs=/var/lib/kafka/data/topics
    num.partitions=12
    default.replication.factor=3
    min.insync.replicas=2
    auto.create.topics.enable=false
    broker.rack=rack1
    listeners=PLAINTEXT://:9092,OUTSIDE://:9094
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:PLAINTEXT
    inter.broker.listener.name=PLAINTEXT
    offsets.retention.minutes=10080
    log.retention.hours=-1
    zookeeper.connect=zookeeper-service:80


2. The Headless Service


The Kafka headless service gives each broker pod a stable DNS name. The init container uses this name to set the published (advertised) FQDN of the broker.


apiVersion: v1
kind: Service
metadata:
  name: kafka-internal-service
spec:
  selector:
    configid: kafka-container
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
    - port: 9092



3. The Exposed Service


The Kafka service exposes the Kafka API to the clients.
The client connection flow is as follows:

  • The client connects to the exposed service
  • The client reaches one of the Kafka brokers, selected by the Kubernetes service
  • The broker returns the published FQDNs of the brokers, in the form <POD_NAME>.<HEADLESS SERVICE NAME>
  • The client then connects directly to the relevant broker


apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    configid: kafka-container
  ports:
    - port: 9092
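
A Go client running inside the cluster only needs the exposed service name as its bootstrap address; after the metadata exchange, the advertised per-broker FQDNs are used for the direct connections. A minimal sketch, assuming the service names defined in this post:

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_4_0_0
    config.Producer.Return.Successes = true // required for a SyncProducer

    // bootstrap through the exposed service; the brokers' advertised FQDNs
    // (<pod>.kafka-internal-service) are used for the direct connections
    producer, err := sarama.NewSyncProducer([]string{"kafka-service:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()
}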


4. The StatefulSet


The StatefulSet includes the configuration of the Kafka broker nodes.
It includes an init container that updates the Kafka configuration before the broker starts.


apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-statefulset
spec:
  serviceName: kafka-internal-service
  selector:
    matchLabels:
      configid: kafka-container
  replicas: 3
  template:
    metadata:
      labels:
        configid: kafka-container
    spec:
      terminationGracePeriodSeconds: 30
      initContainers:
        - name: init
          imagePullPolicy: IfNotPresent
          image: my-registry/kafka-init/dev:latest
          volumeMounts:
            - name: configmap
              mountPath: /etc/kafka-configmap
            - name: config
              mountPath: /etc/kafka
            - name: extensions
              mountPath: /opt/kafka/libs/extensions
      containers:
        - name: broker
          image: solsson/kafka:2.4.1@sha256:79761e15919b4fe9857ec00313c9df799918ad0340b684c0163ab7035907bb5a
          env:
            - name: CLASSPATH
              value: /opt/kafka/libs/extensions/*
            - name: KAFKA_LOG4J_OPTS
              value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
            - name: JMX_PORT
              value: "5555"
          command:
            - ./bin/kafka-server-start.sh
            - /etc/kafka/server.properties
          lifecycle:
            preStop:
              exec:
                command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]
          readinessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 1
          volumeMounts:
            - name: config
              mountPath: /etc/kafka
            - name: data
              mountPath: /var/lib/kafka/data
            - name: extensions
              mountPath: /opt/kafka/libs/extensions
      volumes:
        - name: configmap
          configMap:
            name: kafka-config
        - name: config
          emptyDir: {}
        - name: extensions
          emptyDir: {}
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "hostpath"
        resources:
          requests:
            storage: 500Mi


5. The Init Container


The init container updates the server.properties file with the broker id and the advertised FQDN of the pod, replacing the #init# placeholder lines shown in the ConfigMap above.

The Dockerfile is:


FROM ubuntu:18.04
COPY files /
ENTRYPOINT /entrypoint.sh


and the entrypoint script is:


#!/bin/bash

cp /etc/kafka-configmap/* /etc/kafka/

# the broker id is the pod ordinal, taken from the StatefulSet pod name (kafka-statefulset-N)
KAFKA_BROKER_ID=${HOSTNAME##*-}

serverName="kafka-statefulset-${KAFKA_BROKER_ID}.kafka-internal-service"
sed -i "s/#init#broker.id/broker.id=${KAFKA_BROKER_ID}/" /etc/kafka/server.properties
sed -i "s/#init#advertised.listeners/advertised.listeners=PLAINTEXT:\\/\\/${serverName}:9092/" /etc/kafka/server.properties



Final Notes


In this post we have reviewed a Kafka deployment on Kubernetes.
Notice that we did not get into configuring Kafka itself for your application needs.
You will probably need to update server.properties for your needs.

For example, to run a single replica of Kafka, you will need to update the server properties with:

default.replication.factor=1
min.insync.replicas=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1




Wednesday, June 17, 2020

Deploy Apache Zookeeper on Kubernetes



In this post we'll review how to deploy Apache ZooKeeper on Kubernetes.

The ZooKeeper site describes it as follows:

"Apache ZooKeeper is an effort to develop and maintain an open-source server which enables highly reliable distributed coordination."

It can be used for cluster coordination. For example, it is used by Kafka for the cluster's broker leader election.

To deploy ZooKeeper on kubernetes, we need the following:
  • A ConfigMap with its related configuration files
  • An exposed service enabling clients to access ZooKeeper
  • A headless service enabling ZooKeeper instances coordination
  • A StatefulSet to run the ZooKeeper instances
  • An init container to update the ZooKeeper instances configuration
  • An updated ZooKeeper container to run the ZooKeeper instances


You might also find the post Deploy Apache Kafka on Kubernetes relevant.


The ConfigMap


The ConfigMap holds two files:
  • The logger configuration file
  • The ZooKeeper configuration file

Notice that the ZooKeeper configuration file is a template that will later be updated by the init container to include the list of ZooKeeper instances.


apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config
data:
  log4j.properties: |-
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

    # Suppress connection log messages, three lines per livenessProbe execution
    log4j.logger.org.apache.zookeeper.server.NIOServerCnxnFactory=WARN
    log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN
  zookeeper.properties: |-
    4lw.commands.whitelist=*
    tickTime=2000
    dataDir=/var/lib/zookeeper/data
    dataLogDir=/var/lib/zookeeper/log
    clientPort=2181
    maxClientCnxns=2
    initLimit=5
    syncLimit=2


The Exposed Service


The exposed service is the service used by the ZooKeeper clients.


apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
spec:
  selector:
    configid: zookeeper-container
  ports:
    - port: 80
      targetPort: 2181



The Headless Service


The headless service is used only by the ZooKeeper instances, and is not used by the ZooKeeper clients. Its purpose is coordination between the ZooKeeper cluster instances.


apiVersion: v1
kind: Service
metadata:
  name: zookeeper-internal-service
spec:
  selector:
    configid: zookeeper-container
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
    - port: 2888
      name: peer
    - port: 3888
      name: election


The StatefulSet


The StatefulSet creates the actual instances of the ZooKeeper cluster.
It contains an init container which updates the ZooKeeper instances configuration, and the actual ZooKeeper container.


apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper-statefulset
spec:
  serviceName: zookeeper-internal-service
  selector:
    matchLabels:
      configid: zookeeper-container
  replicas: 3
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        configid: zookeeper-container
    spec:
      terminationGracePeriodSeconds: 10
      initContainers:
        - name: init
          image: my-registry/zookeeper-init:latest
          env:
            - name: ZOO_REPLICAS
              value: "3"
          volumeMounts:
            - name: configmap
              mountPath: /etc/kafka-configmap
            - name: config
              mountPath: /etc/kafka
            - name: data
              mountPath: /var/lib/zookeeper
      containers:
        - name: zookeeper
          image: my-registry/zookeeper:latest
          env:
            - name: KAFKA_LOG4J_OPTS
              value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
          command:
            - ./bin/zookeeper-server-start.sh
            - /etc/kafka/zookeeper.properties
          lifecycle:
            preStop:
              exec:
                command:
                  - "/bin/bash"
                  - "/pre_stop.sh"
          readinessProbe:
            exec:
              command:
                - "/bin/bash"
                - "/readiness_probe.sh"
          volumeMounts:
            - name: config
              mountPath: /etc/kafka
            - name: data
              mountPath: /var/lib/zookeeper
      volumes:
        - name: configmap
          configMap:
            name: zookeeper-config
        - name: config
          emptyDir: {}
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "hostpath"
        resources:
          requests:
            storage: 500Mi


The init container


The init container's purpose is to add the list of ZooKeeper instances to the ZooKeeper configuration file.

It is based on the following Dockerfile:


FROM ubuntu:18.04
COPY files /
ENTRYPOINT /entrypoint.sh


And on the following script, which adds the list of the ZooKeeper pods to the configuration file.


#!/bin/bash
set -e

[[ -d /var/lib/zookeeper/data ]] || mkdir /var/lib/zookeeper/data
export ZOOKEEPER_SERVER_ID=${HOSTNAME##*-}
echo "my server id is ${ZOOKEEPER_SERVER_ID}"
echo "${ZOOKEEPER_SERVER_ID}" > /var/lib/zookeeper/data/myid

cp -Lur /etc/kafka-configmap/* /etc/kafka/
sed -i "/^server\\./d" /etc/kafka/zookeeper.properties

# ensure new line in file
echo "" >> /etc/kafka/zookeeper.properties

for N in $(seq ${ZOO_REPLICAS})
do
index=$(( $N - 1 ))
serverName="zookeeper-statefulset-${index}.zookeeper-internal-service"
echo "server.${index}=${serverName}:2888:3888:participant" >> /etc/kafka/zookeeper.properties
done

sed -i "s/server\.$ZOOKEEPER_SERVER_ID\=[a-z0-9.-]*/server.$ZOOKEEPER_SERVER_ID=0.0.0.0/" /etc/kafka/zookeeper.properties




The updated ZooKeeper container


The updated ZooKeeper container is created using the following Dockerfile:

FROM solsson/kafka:2.4.1@sha256:79761e15919b4fe9857ec00313c9df799918ad0340b684c0163ab7035907bb5a
RUN apt update
RUN apt install -y net-tools
RUN apt install -y curl

COPY files /
ENTRYPOINT /entrypoint.sh


And includes a cleanup script: pre_stop.sh


#!/bin/bash
kill -s TERM 1

while $(kill -0 1 2>/dev/null)
do
sleep 1
done



And also includes a readiness probe script: readiness_probe.sh


#!/bin/bash
set -e
response=$(echo ruok | nc -w 1 -q 1 127.0.0.1 2181)

if [[ "$response" == "imok" ]]
then
exit 0
fi

exit 1


Final Notes

In this post we have reviewed deploying a ZooKeeper cluster on Kubernetes.
If the ZOO_REPLICAS environment variable is set to "1", ZooKeeper will run in standalone mode.
If it is set to "3" or more, ZooKeeper will run in cluster mode (the StatefulSet replicas count should be set to match).


Report a Grafana HeatMap graph from a Go application




In the previous post, Report Prometheus metrics from a Go application, we created simple counters reporting from a Go application.
In this post we will review HeatMap reporting from a Go application.

The thing to notice is that the Prometheus convention for histogram buckets is cumulative: each bucket counts all observations up to its upper bound, while the Grafana heatmap panel used here expects each bucket to contain only its own range.

For example, assume we have the following statistics of response time per request:
  • 10 requests had a response time of 100 ms
  • 10 requests had a response time of 200 ms
  • 10 requests had a response time of 300 ms
  • 10 requests had a response time of 400 ms

Prometheus cumulative bucket counters will be:
  • response_time{le="100"} 10
  • response_time{le="200"} 20
  • response_time{le="300"} 30
  • response_time{le="400"} 40
While Grafana expects:
  • response_time{le="100"} 10
  • response_time{le="200"} 10
  • response_time{le="300"} 10
  • response_time{le="400"} 10
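
To make the difference concrete, here is a small sketch that converts cumulative (Prometheus-style) bucket counts into the per-bucket counts described above; the slice values match the example:

package main

import "fmt"

// deaccumulate converts cumulative bucket counts into per-bucket counts
func deaccumulate(cumulative []int64) []int64 {
    perBucket := make([]int64, len(cumulative))
    var previous int64
    for i, value := range cumulative {
        perBucket[i] = value - previous
        previous = value
    }
    return perBucket
}

func main() {
    fmt.Println(deaccumulate([]int64{10, 20, 30, 40})) // prints [10 10 10 10]
}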

To solve this, we implement the non-cumulative buckets on our own.
First, we configure an array of 10 buckets.
Each bucket is 500ms wide, so we cover a heatmap range of 0-5000ms in steps of 500ms.


var heatmap *prometheus.CounterVec
var buckets []int64

func main() {
    heatmap = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "response_time",
            Help: "response time for each HTTP handler",
        },
        // both labels are set below, so both must be declared here
        []string{"handler", "le"},
    )

    buckets = make([]int64, 10)
    var total int64
    for i := 0; i < len(buckets); i++ {
        total += 500
        buckets[i] = total
    }

    http.Handle("/metrics", promhttp.Handler())
    addHandlerFunc("/foo", fooHandler)
    addHandlerFunc("/bar", barHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}


Next, we assign each request to the related bucket according to its response time:


func addHandlerFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
    http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
        startTime := time.Now()
        handler(w, r)
        labels := make(map[string]string)
        labels["handler"] = pattern
        passedTime := time.Since(startTime)
        labels["le"] = getBucket(passedTime)
        heatmap.With(labels).Inc()
    })
}



and the actual bucket lookup function is:


func getBucket(passedTime time.Duration) string {
    // the buckets are defined in milliseconds
    millis := passedTime.Milliseconds()
    for i := 0; i < len(buckets); i++ {
        if millis <= buckets[i] {
            return strconv.FormatInt(buckets[i], 10)
        }
    }
    return "INF"
}
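
For reference, the native Prometheus way to collect the same data is a HistogramVec with linear buckets, as sketched below; it produces the standard cumulative le buckets, so it only helps if your Grafana setup can render Prometheus histograms directly, which is exactly the compatibility issue described above. The names in this sketch are illustrative, not part of the code above:

var responseTimes *prometheus.HistogramVec

func init() {
    responseTimes = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "response_time_ms",
            Help:    "response time for each HTTP handler, in milliseconds",
            Buckets: prometheus.LinearBuckets(500, 500, 10), // 500ms .. 5000ms
        },
        []string{"handler"},
    )
}

// observe records a single response time for the given handler
func observe(pattern string, passedTime time.Duration) {
    responseTimes.WithLabelValues(pattern).Observe(float64(passedTime.Milliseconds()))
}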


And so, we get a nice heatmap in Grafana, showing the distribution of the response times:





Final Notes


In this post we have reviewed how to bypass a compatibility issue between Prometheus and Grafana by using our own buckets implementation.

Maybe in one of the next versions this issue will be solved on either the Prometheus or the Grafana side.

Wednesday, June 10, 2020

Report Prometheus metrics from a Go application




In this post we'll review adding a simple counter metric to a Go application.
See also the related post: Report a Grafana HeatMap graph from a Go application.

Let's assume we have a Go application providing two services over HTTP: /foo and /bar.
For this example, the services implementation sleeps for a random time, and returns a string in the response.


package main

import (
    "log"
    "math/rand"
    "net/http"
    "time"
)

func main() {
    http.HandleFunc("/foo", fooHandler)
    http.HandleFunc("/bar", barHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func fooHandler(w http.ResponseWriter, _ *http.Request) {
    randomTime := time.Duration(int(rand.Float32() * 1000))
    time.Sleep(time.Millisecond * randomTime)
    _, _ = w.Write([]byte("foo is done"))
}

func barHandler(w http.ResponseWriter, _ *http.Request) {
    randomTime := time.Duration(int(rand.Float32() * 1000))
    time.Sleep(time.Millisecond * randomTime)
    _, _ = w.Write([]byte("bar is done"))
}


First, we want to integrate the /metrics URL with the Prometheus handler.
We add the following code:


package main

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    http.Handle("/metrics", promhttp.Handler())

Once this code is added, we immediately get some default Go-related counters in the /metrics response:

$ curl localhost:8080/metrics

# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0
go_gc_duration_seconds{quantile="0.25"} 0
go_gc_duration_seconds{quantile="0.5"} 0
go_gc_duration_seconds{quantile="0.75"} 0
go_gc_duration_seconds{quantile="1"} 0
go_gc_duration_seconds_sum 0
go_gc_duration_seconds_count 0
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 7
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.14.2"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 639848
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 639848
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 3836
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 112
# HELP go_memstats_gc_cpu_fraction The fraction of this program's available CPU time used by the GC since the program started.
# TYPE go_memstats_gc_cpu_fraction gauge
go_memstats_gc_cpu_fraction 0
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 3.436808e+06
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 639848
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 6.5093632e+07
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 1.589248e+06
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 2260
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 6.5093632e+07
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 6.668288e+07
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 0
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 2372
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 13888
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 16384
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 37400
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 49152
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 4.473924e+06
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 1.034244e+06
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 425984
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 425984
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 7.1649288e+07
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 7
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 0.42
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 1.048576e+06
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 9
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 8.392704e+06
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.59184909075e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.112522752e+09
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes -1
# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
# TYPE promhttp_metric_handler_requests_in_flight gauge
promhttp_metric_handler_requests_in_flight 1
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 0
promhttp_metric_handler_requests_total{code="500"} 0
promhttp_metric_handler_requests_total{code="503"} 0


Next, we want to add our own metrics.
Let's add counters for the number of invocations of each service.
To implement this, we add a vector of counters containing an entry per handler, and we wrap the handler execution with counter update code.


package main

import (
    "log"
    "math/rand"
    "net/http"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

var counters *prometheus.CounterVec

func main() {
    counters = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "invocations",
            Help: "counters for each HTTP service",
        },
        []string{"handler"},
    )

    http.Handle("/metrics", promhttp.Handler())
    addHandlerFunc("/foo", fooHandler)
    addHandlerFunc("/bar", barHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func addHandlerFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
    http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
        labels := make(map[string]string)
        labels["handler"] = pattern
        counters.With(labels).Inc()

        handler(w, r)
    })
}


Now we get the custom counters in the /metrics HTTP response:


$ curl localhost:8080/metrics | grep invocations

# HELP invocations counters for each HTTP service
# TYPE invocations counter
invocations{handler="/bar"} 2
invocations{handler="/foo"} 1


Great!
Our application is now reporting the custom metrics.
Let's run it in a Kubernetes cluster that already has Prometheus installed.
We need to specify the Prometheus annotations, to indicate that we want Prometheus to scrape the pod:
  1. prometheus.io/scrape: "true"
  2. prometheus.io/path: "/metrics"
  3. prometheus.io/port: "8080"

We also configure the deployment to run 2 replicas of our application.


apiVersion: apps/v1
kind: Deployment
metadata:
  name: prom-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      configid: prom-container
  template:
    metadata:
      labels:
        configid: prom-container
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/metrics"
        prometheus.io/port: "8080"
    spec:
      containers:
        - name: prom
          image: myregistry:5000/prom/dev:latest
          imagePullPolicy: IfNotPresent


Let's view the counters in the Prometheus GUI.
We search for the metric named invocations, and we find 4 entries, as we have 2 replicas, each reporting 2 handlers.






Using the metrics collected by Prometheus, we can easily add Grafana graphs to view the statistics over time.

For example, we can view the counters by pod:





and we can view the counters by handler:




Final Notes

In this post we have reviewed how to create an HTTP service in a Go application, and how to report metrics to Prometheus.
We have also seen how to view the reported metrics in Prometheus, and how to create graphs in Grafana.


Wednesday, June 3, 2020

Java LinkedList vs ArrayList


Last week I interviewed a senior Java developer with more than 10 years of experience in the Java domain.
I wanted to ask some "warm-up" questions to break the ice at the beginning of the interview, so I asked about the differences between LinkedList and ArrayList.
To my surprise, he failed to provide answers.
If you claim to be a Java expert, you must know the answer to this question.


What is a LinkedList?


A LinkedList, as its name implies, is a list based on a chain of linked nodes.
Java keeps a doubly linked list, which means each element holds a pointer to its previous element and to its next element.




The upside of using a LinkedList is that we can, with relatively low cost, update it.

For example, adding an element in the middle of the list is simple:
  • Create a new node, and update its pointer to the next element.
  • Update the previous element's pointer to point to the new node.

There are several downsides for using a LinkedList.

First, the memory footprint is higher, as we need to keep two pointers per list element (one for the previous element and one for the next element).
Each pointer requires an additional 4 bytes on a 32-bit JVM or 8 bytes on a 64-bit JVM.
So in most cases this means 2 x 8 bytes = 16 additional bytes per element.

Notice that each element is actually wrapped in a Java object: LinkedList.Node.
This also means garbage collector overhead proportional to the size of the list.


Second, to get the Nth element, we need to scan all elements up to the Nth position.
This means that access to the Nth element has a cost of O(N).


What is an ArrayList?


An ArrayList is a list that internally uses a dynamically sized array to store the elements.
The array size is automatically increased when its capacity is reached.
The size increase is based on this formula (simplified version; see the actual JRE source code for the full logic):

int newCapacity = oldCapacity + (oldCapacity >> 1);

This means the capacity grows by 50% each time it is reached.




There are several upsides to using an ArrayList.

First is the memory usage, which is close to the size of the element references themselves.
Notice that we actually keep an array of pointers, so there is one pointer per element, but since a pointer is needed to access an object anyway, we can ignore this overhead.

Second is access to the Nth element, which is a direct calculation:
  array start location + N * pointer size

The downside of an ArrayList is updates.
To add or remove an element in the middle of an ArrayList, we need to shift all elements after the affected position.

In addition, adding a new element will sometimes trigger growth of the array, which means allocating a new array and copying the elements, but amortized over many add operations this is still O(1).


Summary


We should use ArrayList when:
  • We have a lot of elements, and we want to reduce memory footprint
  • We do not update the list, but only add elements at the end of the list
We should use LinkedList when:
  • We do not have many elements
  • We update the list a lot


 Operation                         LinkedList   ArrayList
 add (at the end)                  O(1)         O(1) on average
 remove/add at a specific index    O(1) *       O(N)
 get a specific index              O(N)         O(1)

 * once the node at that index has been located; reaching it by index is O(N)


Some Performance Tests


I wanted to verify some of the statements made above, so I ran the following test code:


package com.alon.listing;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

public class ListPerf {
private static final int MAX_SIZE = 1_000_000;
private final List<Integer> list;

public ListPerf(List<Integer> list) {
this.list = list;
}

public void stressTests() {
addTests();
fetchTests();
deleteTests();
deleteFirstTests();
}

private void timerRun(int times, String description, Runnable runnable) {
long start = System.currentTimeMillis();
for (int i = 0; i < times; i++) {
runnable.run();
}
float singleTime = System.currentTimeMillis() - start;
// the total time is in milliseconds; this converts it to microseconds per operation
singleTime = singleTime * 1000 / times;
System.out.println(description + " timing " + String.format("%.3fus", singleTime));
}

private void addTests() {
Random random = new Random();
timerRun(1_000_000, "add", () -> {
list.add(random.nextInt());
});
}

private void fetchTests() {
Random random = new Random();
timerRun(1000, "fetch", () -> {
list.get(random.nextInt(list.size()));
});
}

private void deleteTests() {
Random random = new Random();
timerRun(1000, "delete", () -> {
list.remove(random.nextInt(list.size()));
});
}

private void deleteFirstTests() {
timerRun(1000, "delete first", () -> {
list.remove(0);
});
}

public static void main(String[] args) {
System.out.println("=== LinkedList tests ===");
new ListPerf(new LinkedList<>()).stressTests();

System.out.println("=== ArrayList tests ===");
new ListPerf(new ArrayList<>()).stressTests();
}
}


and get the following results:


=== LinkedList tests ===
add timing 0.173us
fetch timing 1534.000us
delete timing 1317.000us
delete first timing 0.000us
=== ArrayList tests ===
add timing 0.060us
fetch timing 0.000us
delete timing 133.000us
delete first timing 261.000us


Notice that the delete test has better results for the ArrayList, which is the opposite of the expected result.
The reason is that removing the Nth element requires first locating it, and that O(N) traversal dominates the cost on the LinkedList.

When removing the first element, there is no need to scan the list, and hence the LinkedList performance is much better.
When removing the first element, there is no need to scan the list, and hence the LinkedList performance is much better.