Wednesday, June 24, 2020

Deploy Apache Kafka on Kubernetes





In this post, we will review the steps required to deploy Apache Kafka on Kubernetes.
From the Apache Kafka site:

"
Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
"

Notice:
To use Kafka on Kubernetes, start by deploying Apache ZooKeeper, which Kafka uses to manage the cluster brokers and for leader election. See my previous post: Deploy Apache Zookeeper on Kubernetes

Once ZooKeeper is deployed, we will create the following Kubernetes resources:
  1. ConfigMap
  2. Headless Service
  3. Exposed Service
  4. StatefulSet
  5. Init container


1. The ConfigMap


The ConfigMap includes two files:
  • The logger configuration (log4j.properties)
  • The Kafka server.properties, which contains two #init# placeholders that the init container replaces at pod startup


apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
data:
  log4j.properties: |-
    # Unspecified loggers and loggers with additivity=true output to server.log and stdout
    # Note that INFO only applies to unspecified loggers; the log level of the child logger is used otherwise
    log4j.rootLogger=INFO, stdout

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
    log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
    log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
    log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
    log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    # Change the two lines below to adjust ZK client logging
    log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
    log4j.logger.org.apache.zookeeper=INFO

    # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
    log4j.logger.kafka=INFO
    log4j.logger.org.apache.kafka=INFO

    # Change to DEBUG or TRACE to enable request logging
    log4j.logger.kafka.request.logger=WARN, requestAppender
    log4j.additivity.kafka.request.logger=false

    # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
    # related to the handling of requests
    #log4j.logger.kafka.network.Processor=TRACE, requestAppender
    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
    #log4j.additivity.kafka.server.KafkaApis=false
    log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false

    log4j.logger.kafka.controller=TRACE, controllerAppender
    log4j.additivity.kafka.controller=false

    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
    log4j.additivity.kafka.log.LogCleaner=false

    log4j.logger.state.change.logger=TRACE, stateChangeAppender
    log4j.additivity.state.change.logger=false

    # Change to DEBUG to enable audit log for the authorizer
    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false

  server.properties: |-
    # The two #init# placeholders below are replaced by the init container
    # with the broker id and the advertised listeners of the pod
    #init#broker.id
    #init#advertised.listeners
    log.dirs=/var/lib/kafka/data/topics
    num.partitions=12
    default.replication.factor=3
    min.insync.replicas=2
    auto.create.topics.enable=false
    broker.rack=rack1
    listeners=PLAINTEXT://:9092,OUTSIDE://:9094
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:PLAINTEXT
    inter.broker.listener.name=PLAINTEXT
    offsets.retention.minutes=10080
    log.retention.hours=-1
    zookeeper.connect=zookeeper-exposed-service:80
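
Assuming the manifest above is saved as kafka-configmap.yaml (the file name is an arbitrary choice), apply it and verify that both files are listed:


kubectl apply -f kafka-configmap.yaml
kubectl describe configmap kafka-config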


2. The Headless Service


The Kafka headless service gives each broker pod a stable DNS name. The init container uses this name as the published FQDN of the broker.


apiVersion: v1
kind: Service
metadata:
  name: kafka-internal-service
spec:
  selector:
    configid: kafka-container
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: true
  ports:
  - port: 9092
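
To see the per-pod DNS records the headless service creates, resolve one of the broker names (pods are named <STATEFULSET_NAME>-<ORDINAL>) from a throwaway pod in the same namespace:


kubectl run -it --rm dns-test --image=busybox --restart=Never -- \
  nslookup kafka-statefulset-0.kafka-internal-service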



3. The Exposed Service


The Kafka service exposes the broker API to the clients.
The client connection flow is as follows:

  • The client connects to the exposed service, using it as the bootstrap server
  • The Kubernetes service routes the connection to a random Kafka broker
  • The broker returns metadata with the published FQDN of each broker, in the form: <POD_NAME>.<HEADLESS SERVICE NAME>
  • The client then connects directly to the relevant broker


apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    configid: kafka-container
  ports:
  - port: 9092
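
Once the brokers are running (see the StatefulSet in the next section), this flow can be verified end to end: create a topic on one of the brokers, then produce to it through the exposed service. The topic name test-topic is an arbitrary example; note that auto.create.topics.enable is false in our server.properties, so the topic must be created explicitly:


kubectl exec kafka-statefulset-0 -- ./bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 3 --replication-factor 3

kubectl run -it --rm kafka-client --image=solsson/kafka:2.4.1 \
  --restart=Never --command -- ./bin/kafka-console-producer.sh \
  --broker-list kafka-service:9092 --topic test-topic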


4. The StatefulSet


The StatefulSet defines the Kafka broker pods.
It includes an init container that updates the Kafka configuration before each broker starts.


apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-statefulset
spec:
  serviceName: kafka-internal-service
  selector:
    matchLabels:
      configid: kafka-container
  replicas: 3
  template:
    metadata:
      labels:
        configid: kafka-container
    spec:
      terminationGracePeriodSeconds: 30
      initContainers:
      - name: init
        imagePullPolicy: IfNotPresent
        image: my-registry/kafka-init/dev:latest
        volumeMounts:
        - name: configmap
          mountPath: /etc/kafka-configmap
        - name: config
          mountPath: /etc/kafka
        - name: extensions
          mountPath: /opt/kafka/libs/extensions
      containers:
      - name: broker
        image: solsson/kafka:2.4.1@sha256:79761e15919b4fe9857ec00313c9df799918ad0340b684c0163ab7035907bb5a
        env:
        - name: CLASSPATH
          value: /opt/kafka/libs/extensions/*
        - name: KAFKA_LOG4J_OPTS
          value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
        - name: JMX_PORT
          value: "5555"
        command:
        - ./bin/kafka-server-start.sh
        - /etc/kafka/server.properties
        lifecycle:
          preStop:
            exec:
              command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
        volumeMounts:
        - name: config
          mountPath: /etc/kafka
        - name: data
          mountPath: /var/lib/kafka/data
        - name: extensions
          mountPath: /opt/kafka/libs/extensions
      volumes:
      - name: configmap
        configMap:
          name: kafka-config
      - name: config
        emptyDir: {}
      - name: extensions
        emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: "hostpath"
      resources:
        requests:
          storage: 500Mi
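
Once all the resources are applied, the brokers start in order (kafka-statefulset-0, then 1, then 2), each pod waiting for the previous one to become ready. Assuming the manifest above is saved as kafka-statefulset.yaml, the startup can be watched with:


kubectl apply -f kafka-statefulset.yaml
kubectl get pods -l configid=kafka-container -w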


5. The Init Container


The init container updates the server.properties file with the broker id and the published FQDN of the pod.

The Dockerfile is:


FROM ubuntu:18.04
COPY files /
ENTRYPOINT ["/entrypoint.sh"]
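
Assuming the files directory next to the Dockerfile contains the entrypoint script shown next (COPY files / copies its content to the image root), build and push the image under the name referenced by the StatefulSet, where my-registry is a placeholder for your own registry:


chmod +x files/entrypoint.sh
docker build -t my-registry/kafka-init/dev:latest .
docker push my-registry/kafka-init/dev:latest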


The entrypoint script (files/entrypoint.sh) is:


#!/bin/bash

# Copy the ConfigMap files (log4j.properties, server.properties)
# to the config volume shared with the broker container
cp /etc/kafka-configmap/* /etc/kafka/

# Derive the broker id from the StatefulSet pod ordinal,
# e.g. kafka-statefulset-2 -> 2
KAFKA_BROKER_ID=${HOSTNAME##*-}

# The pod's stable FQDN, based on the headless service
serverName="kafka-statefulset-${KAFKA_BROKER_ID}.kafka-internal-service"

# Replace the #init# placeholders in server.properties
sed -i "s/#init#broker.id/broker.id=${KAFKA_BROKER_ID}/" /etc/kafka/server.properties
sed -i "s/#init#advertised.listeners/advertised.listeners=PLAINTEXT:\\/\\/${serverName}:9092/" /etc/kafka/server.properties



Final Notes


In this post we have reviewed Kafka deployment on Kubernetes.
Notice that we did not get into configuring Kafka itself for your application's needs.
You will probably need to update the server.properties for your own requirements.

For example, to run a single replica of Kafka, you will need to update the server properties with:

default.replication.factor=1
min.insync.replicas=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
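
Since the brokers read server.properties only at startup, such a change requires re-applying the ConfigMap and then restarting the pods, for example (kubectl 1.15 or later):


kubectl apply -f kafka-configmap.yaml
kubectl rollout restart statefulset kafka-statefulset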


Liked this post? Leave a comment...

