
Thursday, December 30, 2021

Remove Cilium from AWS EKS

 

Cilium is a great tool, but removing it can be tricky.

To remove cilium from AWS EKS, do the following:


1. Uninstall cilium chart:

helm delete cilium   --namespace kube-system 


2. Use node-shell to remove cilium CNI on each node:

curl -LO https://github.com/kvaps/kubectl-node-shell/raw/master/kubectl-node_shell
chmod +x ./kubectl-node_shell
sudo mv ./kubectl-node_shell /usr/local/bin/kubectl-node_shell
kubectl get nodes --no-headers | awk '{print "kubectl node-shell "  $1 " -- rm -f /etc/cni/net.d/05-cilium.conf&" }' >x
chmod +x x; ./x; rm x
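
If you prefer to review the generated commands before running them, the awk step above can be sketched in Python. This is only an illustration: the function name and the sample node names are made up, and the CNI file path is the one used in the command above. The sketch also skips the header row that kubectl prints.

```python
import shlex
from typing import List

# Path taken from the command above.
CNI_CONF = "/etc/cni/net.d/05-cilium.conf"


def build_cleanup_commands(kubectl_output: str) -> List[str]:
    """Turn `kubectl get nodes` output into one node-shell command per node."""
    commands = []
    for line in kubectl_output.splitlines():
        fields = line.split()
        # Skip blank lines and the header row that kubectl prints first.
        if not fields or fields[0] == "NAME":
            continue
        node = shlex.quote(fields[0])
        commands.append("kubectl node-shell {} -- rm -f {}".format(node, CNI_CONF))
    return commands


if __name__ == "__main__":
    # Hypothetical output, for illustration only.
    sample = (
        "NAME          STATUS   ROLES    AGE   VERSION\n"
        "ip-10-0-1-5   Ready    <none>   12d   v1.21.5\n"
        "ip-10-0-2-7   Ready    <none>   12d   v1.21.5\n"
    )
    for command in build_cleanup_commands(sample):
        print(command)
```

Printing the commands first lets you check the node list before anything is deleted.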


3. Reinstall AWS node CNI

kubectl apply -n kube-system -f aws-node.yaml


The aws-node is located here:
The list of aws-node is available here:



Wednesday, December 22, 2021

Using Random Forest in Python

 

image from https://en.wikipedia.org/wiki/Random_forest



In this post we will review usage of a random forest classifier in python.


We use a very simple CSV as input. In real life you will have many columns, and complex data.



height,weight,person
80,40,child
70,30,child
50,10,child
180,80,adult
170,80,adult
185,80,adult



First we load the CSV to a data frame, and print its head.



import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

df = pd.read_csv("input.csv")
print(df.head(5))



The random forest works with floats, both on features, and on labels. Hence we convert the person column to an int label:



def convert_to_int(row):
    if row['person'] == 'adult':
        return 1
    return 0


df['is_adult'] = df.apply(lambda row: convert_to_int(row), axis=1)
df.drop(labels=['person'], axis=1, inplace=True)



Next we split the data to training and testing segments:



labels = np.array(df['is_adult'])
features = df.drop('is_adult', axis=1)
feature_list = list(features.columns)
features = np.array(features)
train_features, test_features, train_labels, test_labels = \
    train_test_split(features,
                     labels,
                     test_size=0.25,
                     random_state=42,
                     )
print('features shape {} labels shape {}'.format(
    train_features.shape, train_labels.shape))
print('features shape {} labels shape {}'.format(
    test_features.shape, test_labels.shape))

with np.printoptions(threshold=np.inf):
    print(train_features)
    print(train_labels)



Let's examine a dummy model as a baseline. This model always guesses that we have a child, and not an adult.



baseline_predictions = np.full(test_labels.shape, 0)
baseline_errors = abs(baseline_predictions - test_labels)

with np.printoptions(threshold=np.inf):
    print("baseline predictions", baseline_predictions)
    print("baseline errors", baseline_errors)

print('error baseline {}'.format(
    round(np.mean(baseline_errors), 3)))



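Now let's create the random forest, and check its error rate. Notice that the code uses a regressor, and turns it into a classifier by thresholding the predictions at 0.5.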



forest = RandomForestRegressor(n_estimators=1000, random_state=42)
forest.fit(train_features, train_labels)

predictions = forest.predict(test_features)

prediction_threshold = 0.5
predictions[predictions < prediction_threshold] = 0
predictions[predictions >= prediction_threshold] = 1
with np.printoptions(threshold=np.inf):
    print(predictions)

prediction_errors = predictions - test_labels
print('error for test {}'.format(
    round(np.mean(abs(prediction_errors)), 3)))



We can check the importance of each feature in the model:



importances = list(forest.feature_importances_)
feature_importances = [(feature, round(importance, 2)) for feature, importance in
                       zip(feature_list, importances)]
feature_importances = sorted(feature_importances, key=lambda x: x[1], reverse=True)
for pair in feature_importances:
    print('variable: {} Importance: {}'.format(*pair))



Lastly, we can examine true/false positive/negative rate:



joined = np.stack((predictions, test_labels), axis=1)
tp = joined[np.where(
(joined[:, 0] == 1) *
(joined[:, 1] == 1)
)]
tn = joined[np.where(
(joined[:, 0] == 0) *
(joined[:, 1] == 0)
)]
fp = joined[np.where(
(joined[:, 0] == 1) *
(joined[:, 1] == 0)
)]
fn = joined[np.where(
(joined[:, 0] == 0) *
(joined[:, 1] == 1)
)]
print('true positive {}'.format(np.shape(tp)[0]))
print('true negative {}'.format(np.shape(tn)[0]))
print('false positive {}'.format(np.shape(fp)[0]))
print('false negative {}'.format(np.shape(fn)[0]))
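
The same four counters can be computed without numpy. The following is a minimal sketch in plain python, assuming the predictions and the labels are sequences of 0/1 values; the function name and the sample values are made up for illustration.

```python
def confusion_counts(predictions, labels):
    """Count tp/tn/fp/fn for binary 0/1 predictions against 0/1 labels."""
    names = {(1, 1): "tp", (0, 0): "tn", (1, 0): "fp", (0, 1): "fn"}
    counts = {"tp": 0, "tn": 0, "fp": 0, "fn": 0}
    for predicted, actual in zip(predictions, labels):
        # int() also accepts the 0.0/1.0 floats produced by the thresholding.
        counts[names[(int(predicted), int(actual))]] += 1
    return counts


if __name__ == "__main__":
    # Made-up values, not the output of the model above.
    print(confusion_counts([1, 0, 1, 0], [1, 0, 0, 1]))
```

Returning a dictionary keeps the four numbers together, which is handy if you later want to derive precision or recall from them.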





Monday, December 13, 2021

Create your Bash Completion

 


In this post we will review how to create a bash completion for your project scripts.

As our project grows we add more and more scripts that a developer can manually execute for various development operations. Some of these scripts require arguments, and a bash completion can greatly assist the developer, especially when the scripts are used often, and when the arguments are long.

My recommendation for this is to add a single bash completion script as part of the project GIT repository. This script should be sourced from the bash RC file: ~/.bashrc , for example:

source /home/foo/git/my-project/bash_completion.sh


To provide a static auto-complete, we use a list of values. The following example is a completion for a script that gets the environment name as an argument. 


complete -W "dev staging prod" ./examine-env.sh


In other cases the argument values are dynamic. For example, we have a parent folder under which we have a folder for each microservice, and we have many scripts that receive the name of a microservice. So we want a dynamic completion with the service name. This is done using the following:


function service_completion(){
  if [ "${#COMP_WORDS[@]}" != "2" ]; then
    return
  fi

  local suggestions=($(compgen -W "$(ls ./services-parent-folder | sed 's/\t/ /')" -- "${COMP_WORDS[1]}"))

  if [ "${#suggestions[@]}" == "1" ]; then
    local value=$(echo ${suggestions[0]/%\ */})
    COMPREPLY=("$value")
  else
    COMPREPLY=("${suggestions[@]}")
  fi
}


complete -F service_completion ./build-micro-service.sh
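
To make the logic of the function above easier to follow, here is a rough python model of it. This is only a sketch of the idea and not how bash implements completion: comp_words plays the role of COMP_WORDS, the prefix filter stands in for compgen -W, and the service names are made up.

```python
def complete_word(candidates, typed):
    """Return the candidates that start with what the user typed so far."""
    return sorted(name for name in candidates if name.startswith(typed))


def service_completion(comp_words, services):
    """Mimic the bash function: complete only the first argument."""
    # comp_words holds the command itself plus the words typed after it.
    if len(comp_words) != 2:
        return []
    return complete_word(services, comp_words[1])


if __name__ == "__main__":
    # Hypothetical service folders, for illustration only.
    services = ["auth", "audit", "billing"]
    print(service_completion(["./build-micro-service.sh", "au"], services))
```

As in the bash version, nothing is suggested once the user has moved past the first argument.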



Notice that the auto completion is registered for the exact command as it is typed, so it works only when running from the script's own folder. In case you run from another folder, and instead of running ./examine-env.sh you use ./foo/examine-env.sh , the auto completion is not invoked.




Wednesday, December 8, 2021

Locate Origin of Kubernetes Pods using Go

 


In this post we will find, for each running pod, the source deployment or statefulset that caused it to run. This is required if we want to show this information in a nice table, or if we want to get additional information about the pod from the source deployment or statefulset.


We start by initiating the kubernetes client. See this post for information about methods to create the kubernetes client.



package main

import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"os"
"path/filepath"
)

func main() {
configPath := filepath.Join(os.Getenv("HOME"), ".kube", "config")
restConfig, err := clientcmd.BuildConfigFromFlags("", configPath)
if err != nil {
panic(err)
}

k8sClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
panic(err)
}



Next we want to fill up a map of owners. This is a map of statefulsets and deployments that caused a pod to start. Notice that deployments are actually starting a replicaset, so we add the replicaset name to point to the original deployment.



owners := make(map[string]string)

listOptions := metav1.ListOptions{}
namespace := "default"

statefulsets, err := k8sClient.AppsV1().StatefulSets(namespace).List(context.Background(), listOptions)
if err != nil {
panic(err)
}
for _, statefulSet := range statefulsets.Items {
owners[statefulSet.Name] = fmt.Sprintf("statefulset %v", statefulSet.Name)
}

deployments, err := k8sClient.AppsV1().Deployments(namespace).List(context.Background(), listOptions)
if err != nil {
panic(err)
}

for _, deployment := range deployments.Items {
owners[deployment.Name] = fmt.Sprintf("deployment %v", deployment.Name)
}

replicasets, err := k8sClient.AppsV1().ReplicaSets(namespace).List(context.Background(), listOptions)
if err != nil {
panic(err)
}

for _, replica := range replicasets.Items {
for _, owner := range replica.OwnerReferences {
deployment := owners[owner.Name]
owners[replica.Name] = deployment
}
}



Having the owners map populated, we can now scan the pods, and print the owner for each pod.



pods, err := k8sClient.CoreV1().Pods(namespace).List(context.Background(), listOptions)
if err != nil {
panic(err)
}

for _, pod := range pods.Items {
for _, owner := range pod.OwnerReferences {
parent := owners[owner.Name]
fmt.Printf("pod %v owner %v\n", pod.Name, parent)
}
}
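
The lookup logic itself is independent of the kubernetes client. The following python sketch shows the same two-step resolution on plain dictionaries. The input shapes are my own assumption, and unlike the Go code above it prints "unknown" for an owner that is not in the map.

```python
def build_owners(statefulsets, deployments, replicasets):
    """Map each controller name to a description of its top-level owner.

    statefulsets and deployments are lists of names; replicasets maps a
    replicaset name to the list of its owner names.
    """
    owners = {}
    for name in statefulsets:
        owners[name] = "statefulset {}".format(name)
    for name in deployments:
        owners[name] = "deployment {}".format(name)
    for replicaset, owner_names in replicasets.items():
        for owner_name in owner_names:
            # A replicaset created by a deployment points back to that deployment.
            if owner_name in owners:
                owners[replicaset] = owners[owner_name]
    return owners


def describe_pods(pods, owners):
    """Return one line per pod; pods maps a pod name to its owner names."""
    lines = []
    for pod, owner_names in pods.items():
        for owner_name in owner_names:
            parent = owners.get(owner_name, "unknown")
            lines.append("pod {} owner {}".format(pod, parent))
    return lines


if __name__ == "__main__":
    # Hypothetical cluster content, for illustration only.
    owners = build_owners(
        ["db"], ["web"], {"web-546f47b8cb": ["web"]},
    )
    pods = {"web-546f47b8cb-j4j7x": ["web-546f47b8cb"], "db-0": ["db"]}
    print("\n".join(describe_pods(pods, owners)))
```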



And an example output is:



pod sample-hackazon-deployment-546f47b8cb-j4j7x owner deployment sample-hackazon-deployment

pod sample-hackazon-mysql-deployment-bd6465f75-m4sgc owner deployment sample-hackazon-mysql-deployment

pod sample-onepro-deployment-7669d59cc4-k8g8v owner deployment sample-onepro-deployment

pod sample-onepro-nginx-deployment-7669dd8d46-8fxlw owner deployment sample-onepro-nginx-deployment

pod udger-statefulset-0 owner statefulset udger-statefulset









Monday, November 29, 2021

Run a Python Server in Docker


 


In this post we will review the steps to run a python web server in a docker container.

Let's start with a python code to run a simple Hello World server:


server.py

from flask import Flask
from waitress import serve

app = Flask(__name__)


@app.route("/")
def hello():
    return "Hello World"


serve(app, host="0.0.0.0", port=8080)



To enable our docker image to include all the dependencies, we create a requirements.txt file using the following command:



pip3 freeze > requirements.txt 



Next, let's create the docker file:


Dockerfile

FROM python:3.9.9
WORKDIR /app

COPY src/requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY src /app

CMD [ "python3", "server.py"]



The docker file first installs all the dependencies using the requirements.txt file, and then copies the python source, and runs our web server.
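
Once the container is running, a small smoke test can confirm that the server answers. The sketch below uses only the python standard library. It assumes that port 8080 of the container was published on the local host (for example with docker run -p 8080:8080), and the function names are made up for illustration.

```python
import urllib.request

# The body returned by the hello() route in server.py above.
EXPECTED_BODY = "Hello World"


def is_expected_response(status, body):
    """Check a status code and a decoded body against the Hello World server."""
    return status == 200 and body == EXPECTED_BODY


def check_hello(base_url, timeout=5.0):
    """Send GET <base_url>/ and report whether the answer is the expected one."""
    with urllib.request.urlopen(base_url + "/", timeout=timeout) as response:
        body = response.read().decode("utf-8")
        return is_expected_response(response.status, body)
```

Calling check_hello("http://localhost:8080") should return True when the container is up; if the server is not reachable, urlopen raises an error instead.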




Monday, November 22, 2021

Sending email using SendGrid in GO


 


In this post we'll send an email using SendGrid in a GO application.


The code is very simple:


import (
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"
)


message := mail.NewV3Mail()
personalization := mail.NewPersonalization()

message.SetFrom(mail.NewEmail("John Doe","from@example.com"))
personalization.AddTos(mail.NewEmail("Alice Bob", "to@example.com"))
personalization.Subject = "My email is here"

body := "This is my <b>HTML</b> body<br>The end!"
message.AddContent(mail.NewContent("text/html", body))
message.AddPersonalizations(personalization)

client := sendgrid.NewSendClient("MY_SECRET_API_KEY_FROM_SENDGRID")
_, err := client.Send(message)
if err != nil {
panic(err)
}
return nil



However, there are some important items to notice:

  • The SendGrid library does not use port 25 to send email. It sends an HTTPS request to the SendGrid API servers, which is great for networks with a firewall blocking any non-HTTP traffic.
  • You cannot send from any email that you want. To enable sending from a specific email, you need to add a sender, and authenticate it using the SendGrid GUI.
  • The free account in SendGrid is limited to 100 emails per day. In case you want more - you'll need to pay.
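
To illustrate the first item, here is a python sketch of the JSON document that such an HTTPS request carries. The field names reflect my understanding of the SendGrid v3 mail send API, so treat them as an assumption and verify them against the SendGrid documentation. The sketch only builds the body and does not send anything.

```python
import json


def build_mail_payload(from_name, from_email, to_name, to_email, subject, html_body):
    """Build the JSON body of a mail send request (assumed v3 layout)."""
    return {
        "personalizations": [
            {
                "to": [{"email": to_email, "name": to_name}],
                "subject": subject,
            }
        ],
        "from": {"email": from_email, "name": from_name},
        "content": [{"type": "text/html", "value": html_body}],
    }


if __name__ == "__main__":
    payload = build_mail_payload(
        "John Doe", "from@example.com",
        "Alice Bob", "to@example.com",
        "My email is here",
        "This is my <b>HTML</b> body<br>The end!",
    )
    print(json.dumps(payload, indent=2))
```

The Go helpers in the code above (NewV3Mail, NewPersonalization, NewContent) build the same kind of structure for you.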

Why not just start your own SMTP server, and bypass the 100 emails per day limit?
Well, that's because an SMTP server must have a matching MX entry in the DNS server. In addition, many mail servers just block any email arriving from an unknown SMTP server. Some even block the SendGrid SMTP itself.




Monday, November 15, 2021

Create a Graph in PDF from ElasticSearch data using Python

 



In this post we will review how to use python to fetch data from ElasticSearch, and create a graph in a new PDF document. 

Why do we need this? Well, sometimes Kibana, which is part of the Elastic stack, cannot get the data that you need, and you want to make some multi-pass manipulations on the data to prepare it for presentation. You might also need to automate the PDF creation as part of a daily job, for example as an internal step before merging multiple PDF documents into one large PDF report.


Let's start by installing the dependent libraries. We create a requirements file:


requirements.txt

elasticsearch==7.15.1
matplotlib==3.4.3
numpy==1.21.3
pandas==1.3.4
PyPDF2==1.26.0
python_dateutil==2.8.2



And install it using the command:



pip install -r requirements.txt



Now we can create a connection to the ElasticSearch. Notice that we add timeouts configuration to ensure that long running queries will not cause errors.



import datetime
import os
import elasticsearch
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dateutil import parser
from matplotlib.backends.backend_pdf import PdfPages

from account import Account
from transaction import Transaction


es = elasticsearch.Elasticsearch(hosts=['https://my-elastic-search.example.com'],
http_auth=('elastic', 'my-password'),
request_timeout=120,
timeout=120)


What about running queries? How do we replace query parameters?

I've found the easiest way is to create a json file, that includes place holders for replacing parameters. So we use a function to get the query and replace the place holders.



def replace_in_string(data, replace_name, replace_value):
    return data.replace(
        '___{}___'.format(replace_name),
        '{}'.format(replace_value)
    )


def get_query(file_name, replaces):
    file_path = 'queries/{}.json'.format(file_name)
    file = open(file_path, 'r')
    data = file.read()
    file.close()
    for name, value in replaces.items():
        data = replace_in_string(data, name, value)
    return data



An example of a query file is the following:


query.json

{
"sort": [
{
"timestamp": {
"order": "desc",
"unmapped_type": "boolean"
}
}
],
___SEARCH_AFTER___
"query": {
"bool": {
"must": [],
"filter": [
{
"match_all": {}
},
{
"range": {
"timestamp": {
"gte": "___FROM_TIME___",
"lte": "___TO_TIME___",
"format": "strict_date_optional_time"
}
}
},
{
"range": {
"cost": {
"gte": ___FROM_COST___,
"lt": 100
}
}
},
{
"match_phrase": {
"category": "toys"
}
}
],
"should": [],
"must_not": []
}
}
}
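
A quick way to convince yourself that the place holders produce valid JSON is to render a template and parse the result. The sketch below uses a shortened, made-up template in the same style as the file above, and a render function that mirrors get_query without reading a file. The sort value is an arbitrary example number.

```python
import json

# Shortened template in the style of query.json, for illustration only.
TEMPLATE = """{
  "sort": [{"timestamp": {"order": "desc"}}],
  ___SEARCH_AFTER___
  "query": {"range": {"cost": {"gte": ___FROM_COST___, "lt": 100}}}
}"""


def render(template, replaces):
    """Replace every ___NAME___ place holder with its value."""
    for name, value in replaces.items():
        template = template.replace("___{}___".format(name), str(value))
    return template


# First page: the place holder is replaced by an empty string.
first_page = json.loads(render(TEMPLATE, {"SEARCH_AFTER": "", "FROM_COST": 50}))

# Following pages: the place holder becomes a complete key, including the comma.
next_page = json.loads(render(TEMPLATE, {
    "SEARCH_AFTER": '"search_after": [1636876800000],',
    "FROM_COST": 50,
}))

print(sorted(first_page.keys()))
print(sorted(next_page.keys()))
```

The place holder sits on its own line and carries its own trailing comma, so both the empty and the filled variants parse cleanly.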



This query fetches payment records in a specified time range, and with some additional filters. Notice the "search after" place holder. This one is used for pagination. Why do we need pagination? Because ElasticSearch returns up to 10K records per query. After that, we need to repeatedly send a new query that asks for the next page. The following function handles the fetch including the pagination.



def es_scroll(index, query_name, replaces, page_size=10000, max_items=0):
    page_number = 1
    search_after = ''
    fetched_items = 0
    while True:
        # the first page has no search_after, the next pages continue from the last hit
        replaces['SEARCH_AFTER'] = search_after
        query = get_query(query_name, replaces)
        try:
            page = es.search(index=index, body=query, size=page_size)
        except elasticsearch.exceptions.RequestError as e:
            print('query was: {}'.format(query))
            raise e
        hits = page['hits']['hits']
        if len(hits) == 0:
            break
        page_number += 1
        for hit in hits:
            yield hit['_source']
            fetched_items += 1

            # max_items=0 means no limit
            if max_items > 0:
                max_items -= 1
                if max_items == 0:
                    return

        # the sort value of the last hit in the page is the start of the next page
        search_after = '"search_after": [{}],'.format(hit['sort'][0])
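
The pagination idea can be tried without an ElasticSearch server. In the following sketch the search callable is a hypothetical in-memory stand-in for es.search, and the paginate function keeps only the "continue after the last sort value" logic. Both names are made up for illustration.

```python
def paginate(search, page_size):
    """Yield every hit by repeatedly asking for the page after the last sort value.

    search(search_after, size) is a hypothetical callable standing in for
    es.search; it returns a list of hits, each one holding a 'sort' list.
    """
    search_after = None
    while True:
        hits = search(search_after, page_size)
        if not hits:
            return
        for hit in hits:
            yield hit
        # The sort value of the last hit tells the server where to resume.
        search_after = hits[-1]["sort"][0]


def fake_search_factory(values):
    """Build an in-memory stand-in that serves values in descending order."""
    ordered = sorted(values, reverse=True)

    def search(search_after, size):
        remaining = [v for v in ordered if search_after is None or v < search_after]
        return [{"_source": {"timestamp": v}, "sort": [v]} for v in remaining[:size]]

    return search


if __name__ == "__main__":
    search = fake_search_factory(range(10))
    print([hit["_source"]["timestamp"] for hit in paginate(search, 3)])
```

Notice that with a single sort field, records sharing the same sort value as the last hit of a page can be skipped. Adding a second, unique sort field as a tiebreaker avoids this.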



Let's run the actual fetch of data. We use a function to translate python date to ElasticSearch format.



def to_elastic_iso(date):
    return date.isoformat().replace("+00:00", "Z")
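
For example, the function can be used on a timezone aware date as follows. The function is repeated here only to keep the snippet self-contained, and the dates are arbitrary.

```python
import datetime


def to_elastic_iso(date):
    return date.isoformat().replace("+00:00", "Z")


moment = datetime.datetime(2021, 11, 14, 8, 0, 0, tzinfo=datetime.timezone.utc)
day_before = moment - datetime.timedelta(hours=24)

print(to_elastic_iso(moment))      # 2021-11-14T08:00:00Z
print(to_elastic_iso(day_before))  # 2021-11-13T08:00:00Z
```

A date without timezone information has no "+00:00" suffix, so it is returned without the trailing Z.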



And we fetch the records into a list.



to_time_string = '2021-11-14T08:00:00.000Z'
to_time = parser.parse(to_time_string)
duration = datetime.timedelta(minutes=0, hours=24)

replaces = {
'FROM_TIME': to_elastic_iso(to_time-duration),
'TO_TIME': to_elastic_iso(to_time),
'FROM_COST': 50
}

headers = ['Time', 'Cost']
rows = [headers]

for item in es_scroll('transactions-*', 'query', replaces):
    timestamp = item["timestamp"]
    cost = item["cost"]
    rows.append([timestamp, cost])



The last step is to create a time series graph in a PDF. We use the following function:



def create_graph(file_name, title, data):
    headers = data[0]
    time_header = headers[0]
    df = pd.DataFrame(data[1:], columns=headers)
    df[time_header] = pd.to_datetime(df[time_header])
    df.set_index(time_header)
    plot = df.plot(x=time_header, y=headers[1:])
    plot.set_title(title)
    figure = plot.get_figure()
    figure.savefig('output/{}.pdf'.format(file_name), format='pdf')
    figure.clear()
    plt.close(figure)
    plt.cla()
    plt.clf()



Final Note


The pieces of code displayed in this post join the forces of ElasticSearch and python to create great PDF reports. Notice that the python matplotlib can be configured to display the graphs in various presentation methods.



Monday, November 8, 2021

Create Redux App with ASync Ajax Calls




 In this post we will create a react/redux app with async ajax calls.

I'm adding this example, because I believe this app should be the base for any react/redux application with ajax calls. This includes:

  • Proxy handling for the ajax requests
  • Ajax requests wrapper
  • Notification top bar to present errors
  • An example component to show the world time


Setup and Cleanup


We start from the redux base template:



npx create-react-app my-app --template redux



Delete the counter feature by the following:

  • delete folder src/features/counter
  • in src/App.js
    • return only empty <div/>
  • in src/app/store.js
    • remove import for counter reducer
    • remove the counter from the store
  • move src/app/store.js to src/store.js


Proxy


Add middleware to allow proxy requests to other services:


npm i -s http-proxy-middleware



And add the proxy configuration:


src/setupProxy.js
const {createProxyMiddleware} = require('http-proxy-middleware')

module.exports = function (app) {
app.use(
'/api',
createProxyMiddleware({
target: 'http://worldclockapi.com/',
changeOrigin: true,
logLevel: 'debug',
}),
)
}


Requests Wrapper


Add a utility to handle sending ajax requests, and parsing the errors.


src/features/request/request.js

export async function sendRequest(method, url, body) {
let fetchOptions = {
method: method,
headers: {
'Content-type': 'application/json',
},
}
if (body) {fetchOptions.body = JSON.stringify(body)}
const response = await fetch(url, fetchOptions)

if (response.status === 200) {
const json = await response.json()
return {
ok: true,
response: json,
}
}

let error = await response.text()
try {
const parsed = JSON.parse(error)
if (parsed.message) {
error = parsed.message
}
} catch (e) {
// ignore - send the actual error text
}

return {
ok: false,
response: error,
}
}
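
The error parsing in the wrapper follows a simple rule: prefer the message field of a JSON error body, and otherwise keep the raw text. Here is the same rule as a language-neutral sketch in python; the function name is made up for illustration.

```python
import json


def extract_error(text):
    """Prefer the 'message' field of a JSON error body, else the raw text."""
    try:
        parsed = json.loads(text)
    except ValueError:
        # Not JSON at all: show the text as it was received.
        return text
    if isinstance(parsed, dict) and parsed.get("message"):
        return parsed["message"]
    return text


if __name__ == "__main__":
    print(extract_error('{"message": "not found"}'))
    print(extract_error("Internal Server Error"))
```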



Notification/Errors Top Bar


Add a top bar notification with ability to show info/error notifications


src/features/notifications/component.js

import React from 'react'
import {useSelector} from 'react-redux'
import {selectState} from './slice'
import styles from './component.module.css'

export function Notifications() {
const state = useSelector(selectState)

let key = 0
const items = state.items.map(item =>
(
<div
key={key++}
className={styles.notification}
>
{item.text}
</div>
))

return (
<div>
{items}
</div>
)
}


src/features/notifications/component.module.css

.notification {
font-weight: bold;
color: red;
}



src/features/notifications/slice.js

import {createSlice} from '@reduxjs/toolkit'


const initialState = {
items: [],
}


export const slice = createSlice({
name: 'notifications',
initialState,
reducers: {
add: (state, action) => {
state.items.push(action.payload)
},
cleanup: (state, action) => {
const expire = Date.now() - 5000
state.items = state.items.filter(item => item.time > expire)
},
},
})

const {add, cleanup} = slice.actions


export const tick = () => (dispatch, getState) => {
dispatch(cleanup())
const state = getState().notifications
if (state.items.length > 0) {
setTimeout(() => {
dispatch(tick())
}, 1000)
}
}

export const addNotification = (isError, text) => (dispatch) => {
const item = {
text,
isError,
time: Date.now(),
}
dispatch(add(item))
dispatch(tick())
}

export const selectState = (state) => state.notifications

export default slice.reducer



World Time Component Example


Create our own feature to display the current time from worldclockapi.com.


src/features/worldtime/component.js

import React from 'react'
import {useDispatch, useSelector} from 'react-redux'
import {errorAjax, load, selectState} from './slice'
import styles from './component.module.css'

export function WorldTime() {
const dispatch = useDispatch()
const state = useSelector(selectState)
if (state.loading) {
return (
<div className={styles.loading}>
Loading
</div>
)
}

return (
<div>
<div className={styles.queries}>
{state.queries} queries
</div>
<div className={styles.time}>
{state.time}
</div>
<div
className={styles.button}
onClick={
() => {
dispatch(load())
}
}>
Reload
</div>
<div
className={styles.button}
onClick={
() => {
dispatch(errorAjax())
}
}>
Make request error
</div>
</div>
)

}



src/features/worldtime/component.module.css

.loading {
position: absolute;
left: 0;
width: 100%;
height: 100%;
background-color: rgba(128, 128, 128, 0.75);
}

.time {
width: 100%;
text-align: center;
}

.queries {
width: 100%;
text-align: center;
}

.button {
background-color: blue;
border-radius: 10px;
cursor: pointer;
width: 200px;
padding: 20px;
margin: 20px;
}



src/features/worldtime/slice.js

import {createAsyncThunk, createSlice} from '@reduxjs/toolkit'
import {sendRequest} from '../request/request'
import {addNotification} from '../notifications/slice'


const initialState = {
time: 'Unknown',
queries: 0,
loading: false,
}

export const load = createAsyncThunk(
'worldtime/load',
async (arg, thunkApi) => {
const {ok, response} = await sendRequest('get', '/api/json/est/now')

if (ok) {
return response
}

thunkApi.dispatch(addNotification(true, response))
},
)

export const errorAjax = createAsyncThunk(
'worldtime/load',
async (arg, thunkApi) => {
const {ok, response} = await sendRequest('post', '/api/not-here')

if (ok) {
return response
}

thunkApi.dispatch(addNotification(true, response))
},
)

export const slice = createSlice({
name: 'worldtime',
initialState,
reducers: {},
extraReducers: (builder) => {
builder
.addCase(load.pending, (state) => {
state.loading = true
})
.addCase(load.fulfilled, (state, action) => {
state.loading = false
if (!action.payload) {
// request had failed, do not update
return
}
state.queries++
state.time = action.payload.currentDateTime
})
},
})

export const selectState = (state) => state.worldtime

export default slice.reducer



Integrate into the Application


Now we register the reducers in the main store:


src/store.js

import {configureStore} from '@reduxjs/toolkit'
import worldtime from './features/worldtime/slice'
import notifications from './features/notifications/slice'

export const store = configureStore({
reducer: {
worldtime,
notifications,
},
})



and include the components from the main app:


src/App.js

import React from 'react'
import './App.css'
import {WorldTime} from './features/worldtime/component'
import {Notifications} from './features/notifications/component'

function App() {
return (
<div>
<Notifications/>
<WorldTime/>
</div>
)
}

export default App



To load the time upon application start, dispatch the load action from the index:



index.js (partial)

import {load} from './features/worldtime/slice'

store.dispatch(load())

ReactDOM.render(



Final Note


As you might have noticed, I've spent ~zero time in making this application beautiful. The goal of this post is to handle the functional aspects of react/redux + ajax calls. I believe that the create-react-app template might have been much better if it included these as part of the basic template.


Monday, November 1, 2021

GoLang Integration with Cilium and Hubble in a Kubernetes Cluster


 


In this post we will install Cilium and Hubble, and use a GO application to integrate with their API to monitor the traffic between pods, and to apply a new Cilium Network Policy.

Cilium and Hubble provide visibility and security for the network activity in a kubernetes cluster. Cilium, installed on each kubernetes node, utilizes eBPF for high performance enforcement of network policies, while Hubble relay connects to all of the cilium agents, and collects network traffic statistics.



Installation


To install cilium on a bare metal kubernetes, use the following:



curl -L --remote-name-all https://github.com/cilium/cilium-cli/releases/latest/download/cilium-linux-amd64.tar.gz{,.sha256sum}
sha256sum --check cilium-linux-amd64.tar.gz.sha256sum
sudo tar xzvfC cilium-linux-amd64.tar.gz /usr/local/bin
rm cilium-linux-amd64.tar.gz{,.sha256sum}
cilium install



Then, to install Hubble relay on the kubernetes cluster, use the following:



cilium hubble enable
cilium status --wait



To install the Hubble client, which communicates with the Hubble relay, use the following:



export HUBBLE_VERSION=$(curl -s https://raw.githubusercontent.com/cilium/hubble/master/stable.txt)
curl -L --remote-name-all https://github.com/cilium/hubble/releases/download/$HUBBLE_VERSION/hubble-linux-amd64.tar.gz{,.sha256sum}
sha256sum --check hubble-linux-amd64.tar.gz.sha256sum
sudo tar xzvfC hubble-linux-amd64.tar.gz /usr/local/bin
rm hubble-linux-amd64.tar.gz{,.sha256sum}



Monitoring


Let's create a GO application to print the network traffic flows captured by Hubble. We start by initiating a gRPC connection to the Hubble relay.



package main

import (
"context"
"fmt"
"github.com/cilium/cilium/api/v1/flow"
"github.com/cilium/cilium/api/v1/observer"
"google.golang.org/grpc"
"io"
)

func main() {
var grpcOptions []grpc.DialOption
grpcOptions = append(grpcOptions, grpc.WithBlock())
grpcOptions = append(grpcOptions, grpc.FailOnNonTempDialError(true))
grpcOptions = append(grpcOptions, grpc.WithInsecure())

grpcConnection, err := grpc.DialContext(context.Background(), "hubble-relay.kube-system.svc.cluster.local:80", grpcOptions...)
if err != nil {
panic(err)
}



Next we wrap the gRPC connection with cilium observer, and loop on the flows.



client := observer.NewObserverClient(grpcConnection)
request := observer.GetFlowsRequest{
Follow: true,
}
flows, err := client.GetFlows(context.Background(), &request)
if err != nil {
panic(err)
}

for {
response, err := flows.Recv()
if err != nil {
if err == io.EOF || err == context.Canceled {
return
}

panic(err)
}
analyzeFlowsResponse(response)
}



In our case we will print only the ingress permitted flows.



func analyzeFlowsResponse(response *observer.GetFlowsResponse) {
switch response.GetResponseTypes().(type) {
case *observer.GetFlowsResponse_Flow:
capturedFlow := response.GetFlow()

if capturedFlow.Verdict != flow.Verdict_FORWARDED {
return
}

if capturedFlow.TrafficDirection != flow.TrafficDirection_INGRESS {
return
}

fmt.Printf("%+v", capturedFlow)
}
}



An example printed flow is the following:



time:{seconds:1635760734  nanos:916929131}  verdict:FORWARDED  ethernet:{source:"02:dd:e6:6c:a0:db"  destination:"7e:a9:d2:3f:0b:2c"}  IP:{source:"10.0.0.126"  destination:"10.0.0.147"  ipVersion:IPv4}  l4:{TCP:{source_port:51320  destination_port:4245  flags:{SYN:true}}}  source:{identity:1  labels:"reserved:host"}  destination:{ID:1010  identity:4240  namespace:"kube-system"  labels:"k8s:io.cilium.k8s.policy.cluster=kubernetes"  labels:"k8s:io.cilium.k8s.policy.serviceaccount=hubble-relay"  labels:"k8s:io.kubernetes.pod.namespace=kube-system"  labels:"k8s:k8s-app=hubble-relay"  pod_name:"hubble-relay-5f55dc4987-vh79m"}  Type:L3_L4  node_name:"alon-laptop"  event_type:{type:4}  traffic_direction:INGRESS  trace_observation_point:TO_ENDPOINT  is_reply:{}  interface:{index:36  name:"lxcfee2d5062d36"}  Summary:"TCP Flags: SYN"


Enforcement


To create a cilium policy, we start by connecting to the kubernetes API:



package main

import (
"context"
"fmt"
ciliumApiPolicy "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumApiClient "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2"
ciliumApiLabels "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
ciliumApiRules "github.com/cilium/cilium/pkg/policy/api"
k8sApiMachinery "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
)

func main() {
restConfig, err := rest.InClusterConfig()
if err != nil {
panic(err)
}

client, err := ciliumApiClient.NewForConfig(restConfig)
if err != nil {
panic(err)
}



Next we create the policy. In this case we limit ingress access to pods with the label app=my-target, so that only pods with the label app=my-source can reach them on TCP port 80.



namespace := "default"
policyName := "my-policy"
sourceLabels := map[string]string{"app": "my-source"}
targetLabels := map[string]string{"app": "my-target"}
port := 80
policy := ciliumApiPolicy.CiliumNetworkPolicy{
TypeMeta: k8sApiMachinery.TypeMeta{
Kind: "CiliumNetworkPolicy",
APIVersion: "cilium.io/v2",
},
ObjectMeta: k8sApiMachinery.ObjectMeta{
Name: policyName,
Namespace: namespace,
},
Spec: &ciliumApiRules.Rule{
EndpointSelector: ciliumApiRules.EndpointSelector{
LabelSelector: &ciliumApiLabels.LabelSelector{
MatchLabels: targetLabels,
},
},
Ingress: []ciliumApiRules.IngressRule{
{
IngressCommonRule: ciliumApiRules.IngressCommonRule{
FromEndpoints: []ciliumApiRules.EndpointSelector{
{
LabelSelector: &ciliumApiLabels.LabelSelector{
MatchLabels: sourceLabels,
},
},
},
},
ToPorts: []ciliumApiRules.PortRule{
{
Ports: []ciliumApiRules.PortProtocol{
{
Port: fmt.Sprintf("%v", port),
Protocol: "TCP",
},
},
},
},
},
},
},
}

options := k8sApiMachinery.CreateOptions{}
_, err = client.CiliumNetworkPolicies(namespace).Create(context.Background(), &policy, options)
if err != nil {
panic(err)
}
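
For reference, the Go struct above corresponds to a CiliumNetworkPolicy manifest. The following python sketch builds what I believe is the equivalent manifest as a dictionary and prints it as JSON, which is handy for comparing with the output of kubectl get ciliumnetworkpolicy -o json. The function name is made up, and the field names should be verified against the Cilium documentation.

```python
import json


def build_policy(name, namespace, source_labels, target_labels, port):
    """Build a CiliumNetworkPolicy manifest allowing source -> target on a TCP port."""
    return {
        "apiVersion": "cilium.io/v2",
        "kind": "CiliumNetworkPolicy",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            # The pods that the policy protects.
            "endpointSelector": {"matchLabels": target_labels},
            "ingress": [
                {
                    # The pods that are allowed to connect.
                    "fromEndpoints": [{"matchLabels": source_labels}],
                    "toPorts": [
                        # The port is a string, as in the Go code above.
                        {"ports": [{"port": str(port), "protocol": "TCP"}]}
                    ],
                }
            ],
        },
    }


if __name__ == "__main__":
    policy = build_policy(
        "my-policy", "default", {"app": "my-source"}, {"app": "my-target"}, 80,
    )
    print(json.dumps(policy, indent=2))
```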



Final Note


Cilium and Hubble provide a great method to enforce network traffic policies, while maintaining high performance of the network. We can use these tools manually, or as described in this post in an automatic manner.