Monday, March 28, 2022

Check Certificate in GoLang

 


The following code checks a fully qualified domain name (aka FQDN), and returns a boolean indicating if the FQDN has a valid certificate. We have several steps in this.

First we check if the FQDN is an IP address. A valid certificate must be issued for a host name, and not for an IP, and hence we reject IP addresses.

Next we connect to the FQDN on port 443. To get a valid SSL certification the connection must be successful.

Now that we have an established TLS connection, we check it properties:

  • The host name in the certificate must match the FQDN
  • The SSL certificate is not expired

Once all the previous steps are done, we can set the SSL certificate as a valid one.




package certificateupdater

import (
"crypto/tls"
"fmt"
"regexp"
"time"
)

var ipRegex = regexp.MustCompile(`(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}`)

func IsIpAddress(name string) bool {
return ipRegex.MatchString(name)
}

func IsValidCertificate(fqdn string) bool {
if IsIpAddress(fqdn) {
return false
}

hostAndPort := fmt.Sprintf("%v:443", fqdn)
conn, err := tls.Dial("tcp", hostAndPort, nil)
if err != nil {
return false
}
defer func() {
closeErr := conn.Close()
if closeErr != nil {
panic(closeErr)
}
}()

err = conn.VerifyHostname(fqdn)
if err != nil {
return false
}
expiry := conn.ConnectionState().PeerCertificates[0].NotAfter
if expiry.Before(time.Now()) {
return false
}

return true
}


Monday, March 21, 2022

Deploy Python Application to AWS EMR


 

In this post we will review the steps to automatically deploy a python application to spark running on AWS EMR.


Our main function is the following:



import os
import re
import stat
from zipfile import ZipFile

import boto3
import paramiko


def main():
aws_set_credentials()
chmod_ssh_key()
zip_remote_path = copy_source_zip_to_s3()
main_remote_path = copy_file_to_s3('/git/my-repo/main.py', 's3-source-bucket', 'main.py')
run_spark_application(zip_remote_path, main_remote_path)



The deploy of the application starts with handling of AWS credentials and the AWS SSH key permissions. Then we create two files in an AWS S3 bucket, that includes our application sources, and finally we run the application by SSH and run command on the AWS EMR master node.

Let examine each of the steps.



def aws_set_credentials():
credentials_file = '/config/credentials'
os.environ['AWS_SHARED_CREDENTIALS_FILE'] = credentials_file



The AWS set credentials updates an environment variable to point to the location of our credentials. These will be used for AWS operations, such as update of the S3 bucket. An example of a credentials file is:



[default]
aws_access_key_id=AKIAWJPWYKUU1234567
aws_secret_access_key=rXKlsqJ2inJdxBdJk123456782345678923



Next we update the SSH private key mode:



def chmod_ssh_key():
private_key_path = '/config/ssh.pem'
os.chmod(private_key_path, stat.S_IRUSR | stat.S_IWUSR)



The SSH private key is the one used to create the EMR master node. We will later SSH to the EMR, hence we want to make sure that SSH private key has permissions only for the owner.


Once the AWS setup is ready, we can copy the source zip file.



def create_zip_file(zip_file_path, add_folder, match_regex):
pattern = re.compile(match_regex)
with ZipFile(zip_file_path, 'w') as zip_object:
for folder_name, sub_folders, file_names in os.walk(add_folder):
for file_name in file_names:
file_path = os.path.join(folder_name, file_name)
if pattern.match(file_path):
relative_path = file_path[len(add_folder) + 1:]
zip_object.write(file_path, relative_path)


def copy_file_to_s3(local_file_path, bucket_name, remote_file_path):
remote_path = 's3://{}/{}'.format(bucket_name, remote_file_path)
session = boto3.Session()
s3_connection = session.client('s3')
s3_connection.upload_file(local_file_path, bucket_name, remote_file_path)
return remote_path


def copy_source_zip_to_s3():
source_dir = '/git/my-repo'
zip_file_name = "emr-application.zip"
local_zip_file_path = os.path.join('tmp', zip_file_name)
create_zip_file(local_zip_file_path, source_dir, ".*py")
remote_path = copy_file_to_s3(local_zip_file_path, 's3-source-bucket', zip_file_name)
os.remove(local_zip_file_path)
return remote_path



All the related source and dependencies should be zipped and copied to the S3, so the EMR can access it. Notice that this includes the local dependencies, but the main application python file should be copied separately, and hence the main deploy function copies both the sources zip file and the main python file to the S3 bucket.


The last step is the actual run of the application on the EMR.



def get_emr_master_id():
client = boto3.client('emr')
response = client.list_clusters(
ClusterStates=[
'RUNNING', 'WAITING',
],
)

emr_cluster_name = 'my-emr'

for cluster in response['Clusters']:
if cluster['Name'] == emr_cluster_name:
return cluster['Id']

raise Exception('emr cluster {} not located'.format(emr_cluster_name))


def get_emr_master_ip():
cluster_id = get_emr_master_id()
client = boto3.client('emr')
response = client.list_instances(
ClusterId=cluster_id,
InstanceGroupTypes=[
'MASTER',
],
InstanceStates=[
'RUNNING',
]
)
instances = response['Instances']
if len(instances) != 1:
raise Exception('emr instances count {} is invalid'.format(len(instances)))

master_instance = instances[0]
ip = master_instance['PublicIpAddress']
return ip


def run_ssh_command(host, user, command):
private_key_path = '/config/ssh.pem'
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=host, username=user, pkey=private_key)
ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command)
ssh_stderr.channel.recv_exit_status()
ssh_stdout.channel.recv_exit_status()
all_err = ssh_stderr.read().decode("utf-8")
all_out = ssh_stdout.read().decode("utf-8")
ssh.close()
return all_err, all_out


def run_spark_application(s3_zip_file_path, main_file_path):
host_ip = get_emr_master_ip()
command_sections = [
'spark-submit',
'--deploy-mode cluster',
'--master yarn',
'--conf spark.yarn.submit.waitAppCompletion=true',
'--py-files {}'.format(s3_zip_file_path),
main_file_path,
]
command = ' '.join(command_sections)
error, output = run_ssh_command(host_ip, 'hadoop', command)
print(error + '\n' + output)



We start by located the EMR master node public IP using boto3 API. Notice that the master must be in a AWS VPC/subnet that allows SSH to it. After the SSH connection is established, we use the spark submit command to run our code.

The logs of the application can be located in AWS EMR GUI after about 5 minutes, as the EMR periodically updates the status every 5 minutes.








Monday, March 14, 2022

React-Vis Graph with Million Points and Zoom


 


In this post we will review how to display a react-vis graph with million points, including zoom support.

React-Vis is a great library that can display many times of charts. In this post we will use a simple line chart with react and redux. We also include a zoom support. 

Another important issue in this post is handling millions of points. If we try to display millions of points, the react-vis library starts slowing down, hence we add special handling in transforming the original huge amount of points to up to 1000 visible points. We use a transformation to join several points into one average point to get to this requirement.


First, let review the reduce slice.


slice.js

import {createSlice} from '@reduxjs/toolkit'


const maxPoints = 1000

const initialState = {
selections: {},
points: {},
series: {},
}


function getSelectedPoints(points,selection) {
if (!selection) {
return points
}
const selectedPoints = []

points.forEach(point => {
if (point.x >= selection.left && point.x <= selection.right) {
selectedPoints.push(point)
}
})
return selectedPoints
}

function reducePoints(points, ratio) {
if (ratio <= 1) {
return points
}

const reduced = []
let slice = []
points.forEach(point => {
slice.push(point)
if (slice.length === ratio) {
let x = 0
let y = 0
slice.forEach(slicePoint => {
x += slicePoint.x
y += slicePoint.y
})


const averagePoint = {
x: x / slice.length,
y: y / slice.length,
}

reduced.push(averagePoint)

slice = []
}
})
return reduced
}

function convertPoints(state,graphId) {
const selection = state.selections[graphId]
const points = state.points[graphId]
const xWidth = points[points.length - 1].x - points[0].x
const selectedPoints = getSelectedPoints(points,selection)

let allowedPoints
if (selection) {
const selectedWidth = Math.trunc(selection.right - selection.left)
allowedPoints = Math.trunc(maxPoints * xWidth / selectedWidth)
} else {
allowedPoints = maxPoints
}

const ratio = Math.trunc(points.length / allowedPoints)
return reducePoints(selectedPoints, ratio)
}

function updateSeries(state, graphId) {
state.series[graphId] = [
{
title: 'Apples',
disabled: false,
data: convertPoints(state,graphId),
},
]
}


export const slice = createSlice({
name: 'graph',
initialState,
reducers: {
setPoints: (state, action) => {
const {graphId, points} = action.payload
state.points[graphId] = points
updateSeries(state,graphId)
},
setSelection: (state, action) => {
const {graphId, selection} = action.payload
state.selections[graphId] = selection
updateSeries(state,graphId)
},
clearSelection: (state, action) => {
const {graphId} = action.payload
delete state.selections[graphId]
updateSeries(state,graphId)
},
},
})

export const {setSelection, clearSelection, setPoints} = slice.actions


export function selectState(state) {
return state.graph
}

export default slice.reducer


 We supply 3 actions: set points, set selection (for zoom), and clear selection (for zoom removal).

The convertPoints function, first selects only the relevant points according to the zoom selection area, and the reduce the points according to the amount of points. For example, if we have 1,000,000 points, and we zoom on scale 400,000-500,000, then we're left with 100,000 points. Then we convert each 100 points to a single average point so we only use 1000 points for the actual display.


The following class controls the way the axis labels are displayed:



custom-axis-label.js

import React, { PureComponent } from 'react';
import './index.css'

class CustomAxisLabel extends PureComponent {

render() {

const yLabelOffset = {
y: this.props.marginTop + this.props.innerHeight / 2 + this.props.title.length*2,
x: 10
};

const xLabelOffset = {
x: this.props.marginLeft + (this.props.innerWidth)/2 - this.props.title.length*2,
y: 1.2 * this.props.innerHeight
};

const transform = this.props.xAxis
? `translate(${xLabelOffset.x}, ${xLabelOffset.y})`
: `translate(${yLabelOffset.x}, ${yLabelOffset.y}) rotate(-90)`;

return (
<g transform={transform}>
<text className= 'unselectable axis-labels'>
{this.props.title}
</text>
</g>
);
}
}

CustomAxisLabel.displayName = 'CustomAxisLabel';
CustomAxisLabel.requiresSVG = true;
export default CustomAxisLabel;


We set the labels orientation, and the margins for the labels.

The highlight class handles the zoom in the graph by sending and event with the selection zoom rectangle.


highlight.js

import React from "react";
import { ScaleUtils, AbstractSeries } from "react-vis";

export default class Highlight extends AbstractSeries {
static displayName = "HighlightOverlay";
static defaultProps = {
allow: "x",
color: "rgb(77, 182, 172)",
opacity: 0.3
};

state = {
drawing: false,
drawArea: { top: 0, right: 0, bottom: 0, left: 0 },
x_start: 0,
y_start: 0,
x_mode: false,
y_mode: false,
xy_mode: false
};

constructor(props){
super(props);
document.addEventListener("mouseup", function (e) {
this.stopDrawing()
}.bind(this));
}

_cropDimension(loc, startLoc, minLoc, maxLoc) {
if (loc < startLoc) {
return {
start: Math.max(loc, minLoc),
stop: startLoc
};
}

return {
stop: Math.min(loc, maxLoc),
start: startLoc
};
}

_getDrawArea(loc) {
const { innerWidth, innerHeight } = this.props;
const { x_mode, y_mode, xy_mode } = this.state;
const { drawArea, x_start, y_start } = this.state;
const { x, y } = loc;
let out = drawArea;

if (x_mode | xy_mode) {
// X mode or XY mode
const { start, stop } = this._cropDimension(x, x_start, 0, innerWidth);
out = {
...out,
left: start,
right: stop
}
}
if (y_mode | xy_mode) {
// Y mode or XY mode
const { start, stop } = this._cropDimension(y, y_start, 0, innerHeight);
out = {
...out,
top: innerHeight - start,
bottom: innerHeight - stop
}
}
return out
}

onParentMouseDown(e) {
const { innerHeight, innerWidth, onBrushStart } = this.props;
const { x, y } = this._getMousePosition(e);
const y_rect = innerHeight - y;

// Define zoom mode
if (x < 0 & y >= 0) {
// Y mode
this.setState({
y_mode: true,
drawing: true,
drawArea: {
top: y_rect,
right: innerWidth,
bottom: y_rect,
left: 0
},
y_start: y
});

} else if (x >= 0 & y < 0) {
// X mode
this.setState({
x_mode: true,
drawing: true,
drawArea: {
top: innerHeight,
right: x,
bottom: 0,
left: x
},
x_start: x
});

} else if (x >= 0 & y >= 0) {
// XY mode
this.setState({
xy_mode: true,
drawing: true,
drawArea: {
top: y_rect,
right: x,
bottom: y_rect,
left: x
},
x_start: x,
y_start: y
});
}

// onBrushStart callback
if (onBrushStart) {
onBrushStart(e);
}

}

stopDrawing() {
// Reset zoom state
this.setState({
x_mode: false,
y_mode: false,
xy_mode: false
});

// Quickly short-circuit if the user isn't drawing in our component
if (!this.state.drawing) {
return;
}

const { onBrushEnd } = this.props;
const { drawArea } = this.state;
const xScale = ScaleUtils.getAttributeScale(this.props, "x");
const yScale = ScaleUtils.getAttributeScale(this.props, "y");

// Clear the draw area
this.setState({
drawing: false,
drawArea: { top: 0, right: 0, bottom: 0, left: 0 },
x_start: 0,
y_start: 0
});

// Invoke the callback with null if the selected area was < 5px
if (Math.abs(drawArea.right - drawArea.left) < 5) {
onBrushEnd(null);
return;
}

// Compute the corresponding domain drawn
const domainArea = {
bottom: yScale.invert(drawArea.top),
right: xScale.invert(drawArea.right),
top: yScale.invert(drawArea.bottom),
left: xScale.invert(drawArea.left)
};

if (onBrushEnd) {
onBrushEnd(domainArea);
}
}

_getMousePosition(e) {
// Get graph size
const { marginLeft, marginTop, innerHeight } = this.props;

// Compute position in pixels relative to axis
const loc_x = e.nativeEvent.offsetX - marginLeft;
const loc_y = innerHeight + marginTop - e.nativeEvent.offsetY;

// Return (x, y) coordinates
return {
x: loc_x,
y: loc_y
}

}

onParentMouseMove(e) {
const { drawing } = this.state;

if (drawing) {
const pos = this._getMousePosition(e);
const newDrawArea = this._getDrawArea(pos);
this.setState({ drawArea: newDrawArea });
}

}

render() {
const {
marginLeft,
marginTop,
innerWidth,
innerHeight,
color,
opacity
} = this.props;
const { drawArea: { left, right, top, bottom } } = this.state;
return (
<g
transform={`translate(${marginLeft}, ${marginTop})`}
className="highlight-container">
<rect
className="mouse-target"
fill="black"
opacity="0"
x={0}
y={0}
width={innerWidth}
height={innerHeight}
/>
<rect
className="highlight"
pointerEvents="none"
opacity={opacity}
fill={color}
x={left}
y={bottom}
width={right - left}
height={top - bottom}
/>
</g>
);
}
}



Last one is the graph class with uses all of the above.


component.js

import React from 'react'
import '../../node_modules/react-vis/dist/style.css'

import {
Borders,
DiscreteColorLegend,
HorizontalGridLines,
LineSeries,
VerticalGridLines,
XAxis,
XYPlot,
YAxis,
} from 'react-vis'
import Highlight from './highlight'
import {useDispatch, useSelector} from 'react-redux'
import {selectState, setSelection} from './slice'

function Graph(props) {
const {graphId} = props
const dispatch = useDispatch()
const state = useSelector(selectState)
const selection = state.selections[graphId]
const series = state.series[graphId]

if (!series) {
return null
}

const width = 1000

function highlightArea(area) {
dispatch(setSelection({
graphId,
selection: area,
}))
}

return (
<div>
<div className="legend">
<DiscreteColorLegend
width={180}
items={series}/>
</div>

<div className="chart no-select" onDragStart={function (e) {
e.preventDefault()
}}>
<XYPlot
xDomain={selection && [selection.left, selection.right]}
yDomain={selection && [selection.bottom, selection.top]}
height={500}
width={width}
margin={{left: 45, right: 20, top: 10, bottom: 200}}>

<HorizontalGridLines/>
<VerticalGridLines/>

{series.map(entry => (
<LineSeries
key={entry.title}
data={entry.data}
/>
))}

<Highlight
onBrushEnd={highlightArea}
/>
<Borders style={{all: {fill: '#fff'}}}/>
<XAxis tickFormat={(v) => new Date(v * 3600 * 1000).toISOString()} tickLabelAngle={-60}/>
<YAxis tickFormat={(v) => (<tspan className="unselectable"> {v} </tspan>)}/>
</XYPlot>
</div>
</div>
)
}

export default Graph



Notice that the graph supports multiple instance by using the graphId property. In this case we treat the x-axis as hours so we multiply it we 3600 seconds. The graph display a list of points, each including the x and y properties.


Final Note

While the react-vis response time is good, redux slows the GUI down. To prevent this, configure redux to skip the data of the reducer containing the huge amount of data. For example, to ignore the graph reducer data, use:

import {configureStore} from '@reduxjs/toolkit'

import notification from './notification/slice'
import dashboard from './dashboard/slice'
import graph from './graph/slice'
import histogram from './histogram/slice'

export const store = configureStore({
middleware: (getDefaultMiddleware) => getDefaultMiddleware({
immutableCheck: {ignoredPaths: ['graph']},
serializableCheck: {ignoredPaths: ['graph']},
}),
reducer: {
dashboard,
histogram,
graph,
notification,
},
})


Sunday, March 6, 2022

Custom Marshaling in GoLang


 


In this post we will review how to handle custom marshaling in GoLang.

I've recently had to marshal a structure containing a map with float as key. Then I got this error:


json: unsupported type: map[float64]int


Reading the documents I've found that since JSON does not support float as keys, the GoLang json marshaling does not automatically convert the float to string, and instead is returning an error. The solution in this case is to implement a custom marshaling. Let's examine the structures.


type MainStruct struct {
Name string
InnerData InnerStruct
}

type InnerStruct struct {
Exists bool
Count int
Values map[float64]int
}


We have a main struct, and an inner struct. I've included two structures to emphasis that the marshaling is done on the main struct, but still we will add our custom marshaling on the inner struct, and it will be used even that the marshaling is not done directly on it. Let's examine the marshal example:


func TestJson(t *testing.T) {
data := MainStruct{
Name: "john",
InnerData: InnerStruct{
Exists: true,
Count: 72,
Values: map[float64]int{
1.2: 42,
3.4: 56,
},
},
}

jsonBytes, err := json.Marshal(data)
if err != nil {
t.Fatal(err)
}

t.Log(string(jsonBytes))

var loadedData MainStruct
err = json.Unmarshal(jsonBytes, &loadedData)
if err != nil {
t.Fatal(err)
}

t.Log(loadedData)
}


If we will run the test now, we will get the unsupported type error displayed above. To solve the issue we add 2 methods to handle the custom marshaling and unmarshaling.


func (i InnerStruct) MarshalJSON() ([]byte, error) {
newStruct := struct {
Exists bool
Count int
Values map[string]int
}{
Exists: i.Exists,
Count: i.Count,
}

if i.Values != nil {
newStruct.Values = make(map[string]int)
for key, value := range i.Values {
keyString := fmt.Sprintf("%v", key)
newStruct.Values[keyString] = value
}
}
return json.Marshal(&newStruct)
}

func (i *InnerStruct) UnmarshalJSON(data []byte) error {
newStruct := struct {
Exists bool
Count int
Values map[string]int
}{}

err := json.Unmarshal(data, &newStruct)
if err != nil {
return fmt.Errorf("custom unmarshal failed: %v", err)
}

i.Exists = newStruct.Exists
i.Count = newStruct.Count

if newStruct.Values != nil {
i.Values = make(map[float64]int)
for key, value := range newStruct.Values {
valueFloat, err := strconv.ParseFloat(key, 64)
if err != nil {
return fmt.Errorf("parse float failed: %v", err)
}
i.Values[valueFloat] = value
}
}
return nil
}



The methods convert the map of floats keys to map of strings keys, and hence bypass the GoLang non-supporting the float as key. Both of the methods are using a temporary structure to covert the float to string and vise versa.


Final Note

In this post we have reviewed how to overcome the unsupported type error for GoLang marshaling. Note that other languages, such as javascript automatically handle this conversion.