Monday, November 29, 2021

Run a Python Server in Docker


 


In this post we will review the steps to run a python web server in a docker container.

Let start with a python code to run a simple Hello World server:


server.py

from flask import Flask
from waitress import serve

app = Flask(__name__)


@app.route("/")
def hello():
return "Hello World"


serve(app, host="0.0.0.0", port=8080)



To enable our docker image to include all the dependencies, we create a requirements.txt file using the following command:



pip3 freeze > requirements.txt 



Next, let's create the docker file:


Dockerfile

FROM python:3.9.9
WORKDIR /app

COPY src/requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY src /app

CMD [ "python3", "server.py"]



The docker file first installs all the dependencies using the requirements.txt file, and then copies the python source, and runs our web server.




Monday, November 22, 2021

Sending email using SendGrid in GO


 


In this post we'll send an email using SendGrid in a GO application.


The code is very simple:


import (
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"
)


message .:= mail.NewV3Mail()
personalization := mail.NewPersonalization()

message.SetFrom(mail.NewEmail("John Doe","from@example.com"))
personalization.AddTos(mail.NewEmail("Alice Bob", "to@example.com"))
personalization.Subject = "My email is here"

body := "This is my <b>HTML</b> body<br>The end!"
message.AddContent(mail.NewContent("text/html", body))
message.AddPersonalizations(personalization)

client := sendgrid.NewSendClient("MY_SECRET_API_KEY_FROM_SENDGRID")
_, err := client.Send(message)
if err != nil {
panic(err)
}
return nil



However, there are some important items to notice:

  • The SendGrid library does not use port 25 to send email. It is based on an HTTPS request to the SendGrid API servers. This is Great for networks with firewall blocking any non HTTP request.
  • You cannot send from any email that you want. To enable sending from a specific email, you need to add a sender, and authenticate it using the SendGrid GUI.
  • The free account in SendGrid is limited to 100 emails per day. In case you want more - you'll need to pay.

Why not just start your own SMTP server, and bypass the 100 emails per day limit?
Well that's because an SMTP server must have an matching MX entry in the DNS server. In addition, many mail servers just block any email arriving from an unknown SMTP server. Some even block the SendGrid SMTP itself.




Monday, November 15, 2021

Create a Graph in PDF from ElasticSearch data using Python

 



In this post we will review how to use python to fetch data from ElasticSearch, and create a graph in a new PDF document. 

Why do we need this? Well, sometime Kibana, which is part of the Elastic stack, cannot get the data that you need, and you want the make some multi-pass manipulations on the data, to prepare it for presentation. Another thing is that you might need to automation PDF creation ad part of a daily job, for example as an internal step before merging multiple PDF documents into a one large PDF report.


Let's start by installing the dependent libraries. We create a requirements file:


requirements.txt

elasticsearch==7.15.1
matplotlib==3.4.3
numpy==1.21.3
pandas==1.3.4
PyPDF2==1.26.0
python_dateutil==2.8.2



And install it using the command:



pip install -r requirements.txt



Now we can create a connection to the ElasticSearch. Notice that we add timeouts configuration to ensure that long running queries will not cause errors.



import datetime
import os
import elasticsearch
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dateutil import parser
from matplotlib.backends.backend_pdf import PdfPages

from account import Account
from transaction import Transaction


es = elasticsearch.Elasticsearch(hosts=['https://my-elastic-search.example.com'],
http_auth=('elastic', 'my-password'),
request_timeout=120,
timeout=120)


What about running queries? How do we replace query parameters?

I've found the easiest way is to create a json file, that includes place holders for replacing parameters. So we use a function to get the query and replace the place holders.



def replace_in_string(data, replace_name, replace_value):
return data.replace(
'___{}___'.format(replace_name),
'{}'.format(replace_value)
)


def get_query(file_name, replaces):
file_path = 'queries/{}.json'.format(file_name)
file = open(file_path, 'r')
data = file.read()
file.close()
for name, value in replaces.items():
data = replace_in_string(data, name, value)
return data



An example of a query file is the following:


query.json

{
"sort": [
{
"timestamp": {
"order": "desc",
"unmapped_type": "boolean"
}
}
],
___SEARCH_AFTER___
"query": {
"bool": {
"must": [],
"filter": [
{
"match_all": {}
},
{
"range": {
"timestamp": {
"gte": "___FROM_TIME___",
"lte": "___TO_TIME___",
"format": "strict_date_optional_time"
}
}
},
{
"range": {
"cost": {
"gte": ___FROM_COST___,
"lt": 100
}
}
},
{
"match_phrase": {
"category": "toys"
}
}
],
"should": [],
"must_not": []
}
}
}



This query fetches payment records in a specified time range, and with some additional filters. Notice the "search after" place holder. This one will be used for pagination. Why do we need pagination? Because the ElasticSearch returns up to 10K records. After that, we need to repeatedly send a new query that asks for the next page. The following function handles the fetch including the pagination.



def es_scroll(index, query_name, replaces, page_size=10000, max_items=0):
page_number = 1
search_after = ''
fetched_items = 0
while True:
replaces['SEARCH_AFTER'] = search_after
query = get_query(query_name, replaces)
try:
page = es.search(index=index, body=query, size=page_size)
except elasticsearch.exceptions.RequestError as e:
print('query was: {}'.format(query))
raise e
hits = page['hits']['hits']
if len(hits) == 0:
break
page_number += 1
hits = page['hits']['hits']
for hit in hits:
yield hit['_source']
fetched_items += 1

if max_items > 0:
max_items -= 1
if max_items == 0:
return

search_after = '"search_after": [{}],'.format(hit['sort'][0])



Let's run the actual fetch of data. We use a function to translate python date to ElasticSearch format.



def to_elastic_iso(date):
return date.isoformat().replace("+00:00", "Z")



And we fetch the records into a list.



to_time_string = '2021-11-14T08:00:00.000Z'
to_time = parser.parse(to_time_string)
duration = datetime.timedelta(minutes=0, hours=24)

replaces = {
'FROM_TIME': to_elastic_iso(to_time-duration),
'TO_TIME': to_elastic_iso(to_time),
'FROM_COST': 50
}

headers = ['Time', 'Cost']
rows = [headers]

for item in es_scroll('transactions-*', 'query', replaces):
timestamp = item["timestamp"]
cost = item["cost"]
rows.append([timestamp,cost])



The last step is to create a time series graph in a PDF. We use the following function:



def create_graph(file_name, title, data):
headers = data[0]
time_header = headers[0]
df = pd.DataFrame(data[1:], columns=headers)
df[time_header] = pd.to_datetime(df[time_header])
df.set_index(time_header)
plot = df.plot(x=time_header, y=headers[1:])
plot.set_title(title)
figure = plot.get_figure()
figure.savefig('output/{}.pdf'.format(file_name), format='pdf')
figure.clear()
plt.close(figure)
plt.cla()
plt.clf()



Final Note


The pieces of code displayed int this post can be join forces of ElasticSearch and python to create great PDF reports. Notice that the python matplotlib can be configured to display the graphs in various presentation methods.



Monday, November 8, 2021

Create Redux App with ASync Ajax Calls




 In this post we will create a react/redux app with async ajax calls.

I'm adding this example, because I believe this app should be the base for any react/redux application with ajax calls. This includes:

  • Proxy handling for the ajax requests
  • Ajax requests wrapper
  • Notification top bar to present errors
  • An example component to show the world time


Setup and Cleanup


We start from the redux base template:



npx create-react-app my-app --template redux



Delete the counter feature by the following:

  • delete folder src/features/counter
  • in src/App.js
    • return only empty <div/>
  • in src/app/store.js
    • remove import for counter reducer
    • remove the counter from the store
  • move src/app/store.js to src/store.js


Proxy


Add middleware to allow proxy requests to other services:


npm i -s http-proxy-middleware



And add the proxy configuration:


src/setupProxy.js
const {createProxyMiddleware} = require('http-proxy-middleware')

module.exports = function (app) {
app.use(
'/api',
createProxyMiddleware({
target: 'http://worldclockapi.com/',
changeOrigin: true,
logLevel: 'debug',
}),
)
}


Requests Wrapper


Add a utility to handle sending ajax requests, and parsing the errors.


src/features/request/request.js

export async function sendRequest(method, url, body) {
let fetchOptions = {
method: method,
headers: {
'Content-type': 'application/json',
},
}
if (body) {fetchOptions.body = JSON.stringify(body)}
const response = await fetch(url, fetchOptions)

if (response.status === 200) {
const json = await response.json()
return {
ok: true,
response: json,
}
}

let error = await response.text()
try {
const parsed = JSON.parse(error)
if (parsed.message) {
error = parsed.message
}
} catch (e) {
// ignore - send the actual error text
}

return {
ok: false,
response: error,
}
}



Notification/Errors Top Bar


Add a top bar notification with ability to show info/error notifications


src/features/notifications/component.js

import React from 'react'
import {useSelector} from 'react-redux'
import {selectState} from './slice'
import styles from './component.module.css'

export function Notifications() {
const state = useSelector(selectState)

let key = 0
const items = state.items.map(item =>
(
<div
key={key++}
className={styles.notification}
>
{item.text}
</div>
))

return (
<div>
{items}
</div>
)
}


src/features/component.module.css

.notification {
font-weight: bold;
color: red;
}



src/features/slice.js

import {createSlice} from '@reduxjs/toolkit'


const initialState = {
items: [],
}


export const slice = createSlice({
name: 'notifications',
initialState,
reducers: {
add: (state, action) => {
state.items.push(action.payload)
},
cleanup: (state, action) => {
const expire = Date.now() - 5000
state.items = state.items.filter(item => item.time > expire)
},
},
})

const {add, cleanup} = slice.actions


export const tick = () => (dispatch, getState) => {
dispatch(cleanup())
const state = getState().notifications
if (state.items.length > 0) {
setTimeout(() => {
dispatch(tick())
}, 1000)
}
}

export const addNotification = (isError, text) => (dispatch) => {
const item = {
text,
isError,
time: Date.now(),
}
dispatch(add(item))
dispatch(tick())
}

export const selectState = (state) => state.notifications

export default slice.reducer



World Time Component Example


Create our own feature to display the current time from worldclockapi.com.


src/features/worldtime/component.js

import React from 'react'
import {useDispatch, useSelector} from 'react-redux'
import {errorAjax, load, selectState} from './slice'
import styles from './component.module.css'

export function WorldTime() {
const dispatch = useDispatch()
const state = useSelector(selectState)
if (state.loading) {
return (
<div className={styles.loading}>
Loading
</div>
)
}

return (
<div>
<div className={styles.queries}>
{state.queries} queries
</div>
<div className={styles.time}>
{state.time}
</div>
<div
className={styles.button}
onClick={
() => {
dispatch(load())
}
}>
Reload
</div>
<div
className={styles.button}
onClick={
() => {
dispatch(errorAjax())
}
}>
Make request error
</div>
</div>
)

}



src/features/worldtime/component.module.css

.loading {
position: absolute;
left: 0;
width: 100%;
height: 100%;
background-color: rgba(128, 128, 128, 0.75);
}

.time {
width: 100%;
text-align: center;
}

.queries {
width: 100%;
text-align: center;
}

.button {
background-color: blue;
border-radius: 10px;
cursor: pointer;
width: 200px;
padding: 20px;
margin: 20px;
}



src/features/worldtime/slice.js

import {createAsyncThunk, createSlice} from '@reduxjs/toolkit'
import {sendRequest} from '../request/request'
import {addNotification} from '../notifications/slice'


const initialState = {
time: 'Unknown',
queries: 0,
loading: false,
}

export const load = createAsyncThunk(
'worldtime/load',
async (arg, thunkApi) => {
const {ok, response} = await sendRequest('get', '/api/json/est/now')

if (ok) {
return response
}

thunkApi.dispatch(addNotification(true, response))
},
)

export const errorAjax = createAsyncThunk(
'worldtime/load',
async (arg, thunkApi) => {
const {ok, response} = await sendRequest('post', '/api/not-here')

if (ok) {
return response
}

thunkApi.dispatch(addNotification(true, response))
},
)

export const slice = createSlice({
name: 'worldtime',
initialState,
reducers: {},
extraReducers: (builder) => {
builder
.addCase(load.pending, (state) => {
state.loading = true
})
.addCase(load.fulfilled, (state, action) => {
state.loading = false
if (!action.payload) {
// request had failed, do not update
return
}
state.queries++
state.time = action.payload.currentDateTime
})
},
})

export const selectState = (state) => state.worldtime

export default slice.reducer



Integrate into the Application


Now we call to the reducers from the main reducer store:


src/store.js

import {configureStore} from '@reduxjs/toolkit'
import worldtime from './features/worldtime/slice'
import notifications from './features/notifications/slice'

export const store = configureStore({
reducer: {
worldtime,
notifications,
},
})



and include the components from the main app:


src/App.js

import React from 'react'
import './App.css'
import {WorldTime} from './features/worldtime/component'
import {Notifications} from './features/notifications/component'

function App() {
return (
<div>
<Notifications/>
<WorldTime/>
</div>
)
}

export default App



To make the load time run upon application start, call it from the index



index.js (partial)

import {load} from './features/worldtime/slice'

store.dispatch(load())

ReactDOM.render(



Final Note


As you might have noticed, I've spent ~zero time in making this application beautiful. The goal of this post is to handle the functional aspects of react/redux + ajax calls. I believe that the create-react-app template might had been much better if it included these as part of the basic template.


Monday, November 1, 2021

GoLang Integration with Cilium and Hubble in a Kubernetes Cluster


 


In this post we will install Cilium and Hubble, and us a GO application to integrate with their API to monitor the traffic between pods, and to apply a new Cilium Network Policy.

Cilium and Hubble provide visibility and security for the network activity in a kubernetes cluster. Cilium, installed on each kubernetes node, utilizes eBPF for high performance enforcement of network policies, while Hubble relay connects to all of the cilium, and collects network traffic statistics.



Installation


To install cilium on a bare metal kubernetes, use the following:



curl -L --remote-name-all https://github.com/cilium/cilium-cli/releases/latest/download/cilium-linux-amd64.tar.gz{,.sha256sum}
sha256sum --check cilium-linux-amd64.tar.gz.sha256sum
sudo sudo tar xzvfC cilium-linux-amd64.tar.gz /usr/local/bin
rm cilium-linux-amd64.tar.gz{,.sha256sum}
cilium install



Then, to install Hubble relay on the kubernetes cluster, use the following:



cilium hubble enable
cilium status --wait



To install the Hubble client, which communicates with the Hubble relay, use the following:



export HUBBLE_VERSION=$(curl -s https://raw.githubusercontent.com/cilium/hubble/master/stable.txt)
curl -L --remote-name-all https://github.com/cilium/hubble/releases/download/$HUBBLE_VERSION/hubble-linux-amd64.tar.gz{,.sha256sum}
sha256sum --check hubble-linux-amd64.tar.gz.sha256sum
sudo tar xzvfC hubble-linux-amd64.tar.gz /usr/local/bin
rm hubble-linux-amd64.tar.gz{,.sha256sum}



Monitoring


Let's create a GO application to print the network traffic flows captured by Hubble. We start by initiating a gRPC connection to the Hubble relay.



package main

import (
"context"
"fmt"
"github.com/cilium/cilium/api/v1/flow"
"github.com/cilium/cilium/api/v1/observer"
"google.golang.org/grpc"
"io"
)

func main() {
var grpcOptions []grpc.DialOption
grpcOptions = append(grpcOptions, grpc.WithBlock())
grpcOptions = append(grpcOptions, grpc.FailOnNonTempDialError(true))
grpcOptions = append(grpcOptions, grpc.WithInsecure())

grpcConnection, err := grpc.DialContext(context.Background(), "hubble-relay.kube-system.svc.cluster.local:80", grpcOptions...)
if err != nil {
panic(err)
}



Next we wrap the gRPC connection with cilium observer, and loop on the flows.



client := observer.NewObserverClient(grpcConnection)
request := observer.GetFlowsRequest{
Follow: true,
}
flows, err := client.GetFlows(context.Background(), &request)
if err != nil {
panic(err)
}

for {
response, err := flows.Recv()
if err != nil {
if err == io.EOF || err == context.Canceled {
return
}

panic(err)
}
analyzeFlowsResponse(response)
}



In our case we will print only the ingress permitted flows.



func analyzeFlowsResponse(response *observer.GetFlowsResponse) {
switch response.GetResponseTypes().(type) {
case *observer.GetFlowsResponse_Flow:
capturedFlow := response.GetFlow()

if capturedFlow.Verdict != flow.Verdict_FORWARDED {
return
}

if capturedFlow.TrafficDirection != flow.TrafficDirection_INGRESS {
return
}

fmt.Printf("%+v", capturedFlow)
}
}



An example printed flow is the following:



time:{seconds:1635760734  nanos:916929131}  verdict:FORWARDED  ethernet:{source:"02:dd:e6:6c:a0:db"  destination:"7e:a9:d2:3f:0b:2c"}  IP:{source:"10.0.0.126"  destination:"10.0.0.147"  ipVersion:IPv4}  l4:{TCP:{source_port:51320  destination_port:4245  flags:{SYN:true}}}  source:{identity:1  labels:"reserved:host"}  destination:{ID:1010  identity:4240  namespace:"kube-system"  labels:"k8s:io.cilium.k8s.policy.cluster=kubernetes"  labels:"k8s:io.cilium.k8s.policy.serviceaccount=hubble-relay"  labels:"k8s:io.kubernetes.pod.namespace=kube-system"  labels:"k8s:k8s-app=hubble-relay"  pod_name:"hubble-relay-5f55dc4987-vh79m"}  Type:L3_L4  node_name:"alon-laptop"  event_type:{type:4}  traffic_direction:INGRESS  trace_observation_point:TO_ENDPOINT  is_reply:{}  interface:{index:36  name:"lxcfee2d5062d36"}  Summary:"TCP Flags: SYN"


Enforcement


To create a cilium policy, we start by connecting to the kubernetes API:



package main

import (
"context"
"fmt"
ciliumApiPolicy "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
ciliumApiClient "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned/typed/cilium.io/v2"
ciliumApiLabels "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
ciliumApiRules "github.com/cilium/cilium/pkg/policy/api"
k8sApiMachinery "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
)

func main() {
restConfig, err := rest.InClusterConfig()
if err != nil {
panic(err)
}

client, err := ciliumApiClient.NewForConfig(restConfig)
if err != nil {
panic(err)
}



Next we create the policy. In this case we limit access to a pod with label app=my-target only for pod with label app=my-source.



namespace := "default"
policyName := "my-policy"
sourceLabels := map[string]string{"app": "my-source"}
targetLabels := map[string]string{"app": "my-target"}
port := 80
policy := ciliumApiPolicy.CiliumNetworkPolicy{
TypeMeta: k8sApiMachinery.TypeMeta{
Kind: "CiliumNetworkPolicy",
APIVersion: "cilium.io/v2",
},
ObjectMeta: k8sApiMachinery.ObjectMeta{
Name: policyName,
Namespace: namespace,
},
Spec: &ciliumApiRules.Rule{
EndpointSelector: ciliumApiRules.EndpointSelector{
LabelSelector: &ciliumApiLabels.LabelSelector{
MatchLabels: targetLabels,
},
},
Ingress: []ciliumApiRules.IngressRule{
{
IngressCommonRule: ciliumApiRules.IngressCommonRule{
FromEndpoints: []ciliumApiRules.EndpointSelector{
{
LabelSelector: &ciliumApiLabels.LabelSelector{
MatchLabels: sourceLabels,
},
},
},
},
ToPorts: []ciliumApiRules.PortRule{
{
Ports: []ciliumApiRules.PortProtocol{
{
Port: fmt.Sprintf("%v", port),
Protocol: "TCP",
},
},
},
},
},
},
},
}

options := k8sApiMachinery.CreateOptions{}
_, err = client.CiliumNetworkPolicies(namespace).Create(context.Background(), &policy, options)
if err != nil {
panic(err)
}



Final Note


Cilium and Hubble provide a great method to enforce network traffic policies, while maintaining high performance of the network. We can use these tools manually, or as described in this post in an automatic manner.