Monday, April 25, 2022

Update Go's Empty Interface

 

Some Go functions receive an empty interface (interface{}) and need to set the value behind it, choosing the concrete type only at runtime. In this post we will examine how to set a value through Go's empty interface.

Let's start by defining two structs:


import (
	"fmt"
	"reflect"
)

type Data1 struct {
	name string
}

type Data2 struct {
	age int
}



Next, we use Go's reflect package to set the value behind an empty interface:



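// initialize sets the value that data points to, based on kind.
// data must be a pointer to a Data1 or a Data2.
func initialize(kind string, data interface{}) {
	dataValue := reflect.ValueOf(data)

	// Build the initial value for the requested kind.
	var setValue reflect.Value
	if kind == "data1" {
		initialValue := Data1{name: "init"}
		setValue = reflect.ValueOf(initialValue)
	} else {
		initialValue := Data2{age: 120}
		setValue = reflect.ValueOf(initialValue)
	}
	// Elem() dereferences the pointer, so Set() updates the caller's variable.
	dataValue.Elem().Set(setValue)
}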



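Now we can pass a pointer to either struct to the function, and it will try to set its value. The argument must be a pointer, because Elem() dereferences it and only an addressable value can be set. For example: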



data1 := Data1{}
initialize("data1", &data1)
fmt.Printf("%+v\n", data1)


This will print:


{name:init}


We can do the same for the second type of struct:



data2 := Data2{}
initialize("data2", &data2)
fmt.Printf("%+v\n", data2)


And this time we get the following:


{age:120}



If we pass a pointer to the wrong struct type, the call panics. For example:


initialize("data2", &data1)

This panics with the following error:



panic: reflect.Set: value of type main.Data2 is not assignable to type main.Data1




Monday, April 11, 2022

Python Profiling



In this post we will review how to profile the CPU usage of Python functions.


First, let us create some CPU-intensive code:



import math
import random
import time


def calculate(value):
    return math.sqrt(value * 100)


def scan_dict(dict):
    for key in dict.keys():
        value = calculate(dict[key])
        if value > 100:
            print("big")


def main():
    start = time.time()
    dict = {}
    for i in range(10000):
        dict[i] = random.randint(0, 100)

    for i in range(1000):
        scan_dict(dict)

    passed = time.time() - start
    print('total {:.3f} seconds'.format(passed))



This code initializes a dictionary with 10K entries, and then calls, 1000 times, a function that scans the dictionary entries and calculates a square root for each element.


As you can see, we print the elapsed time for the entire main() function. On my machine, it prints:


total 1.787 seconds


So the program takes about 2 seconds. Now let's find out where the time is spent. We will use cProfile for this.



import cProfile
import pstats
import io

pr = cProfile.Profile()
pr.enable()
main()
pr.disable()
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumtime')
ps.print_stats()

with open('test.txt', 'w+') as f:
    f.write(s.getvalue())


The first thing we notice when wrapping the main() function with cProfile is the longer run time.


total 3.572 seconds


So profiling has its price. The cProfile analysis is saved to a file, sorted by the cumulative time spent in each function.



         20054767 function calls in 3.572 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.003    0.003    3.572    3.572 /home/alon/git/a.py:17(main)
     1000    1.527    0.002    3.557    0.004 /home/alon/git/a.py:10(scan_dict)
 10000000    1.455    0.000    2.030    0.000 /home/alon/git/a.py:6(calculate)
 10000000    0.575    0.000    0.575    0.000 {built-in method math.sqrt}
    10000    0.002    0.000    0.011    0.000 /usr/lib/python3.8/random.py:244(randint)
    10000    0.004    0.000    0.009    0.000 /usr/lib/python3.8/random.py:200(randrange)
    10000    0.003    0.000    0.005    0.000 /usr/lib/python3.8/random.py:250(_randbelow_with_getrandbits)
    12761    0.001    0.000    0.001    0.000 {method 'getrandbits' of '_random.Random' objects}
    10000    0.001    0.000    0.001    0.000 {method 'bit_length' of 'int' objects}
     1000    0.000    0.000    0.000    0.000 {method 'keys' of 'dict' objects}
        1    0.000    0.000    0.000    0.000 {built-in method builtins.print}
        1    0.000    0.000    0.000    0.000 {method 'format' of 'str' objects}
        2    0.000    0.000    0.000    0.000 {built-in method time.time}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}



The table contains a line per function. For each function, we have the following information.

  • ncalls - number of calls to the function
  • tottime - time spent in the function, sub-functions not included
  • percall - tottime/ncalls
  • cumtime - time spent in the function, sub-functions included
  • percall - cumtime/ncalls
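
To relate these columns to a concrete run, here is a minimal sketch that profiles a small function and prints only the five most expensive entries, sorted by tottime instead of cumtime. The busy() and helper() functions and the profile_top() wrapper are hypothetical stand-ins, not part of the program above.

```python
import cProfile
import io
import pstats


def helper(value):
    # Hypothetical stand-in for a cheap function that is called many times.
    return value * 2


def busy():
    # Hypothetical workload: 100,000 calls to helper().
    total = 0
    for i in range(100000):
        total += helper(i)
    return total


def profile_top(func, sort_key='tottime', limit=5):
    """Profile func() and return the report text, limited to `limit` rows."""
    pr = cProfile.Profile()
    pr.enable()
    func()
    pr.disable()
    s = io.StringIO()
    # strip_dirs() shortens file paths; print_stats(limit) caps the row count.
    pstats.Stats(pr, stream=s).strip_dirs().sort_stats(sort_key).print_stats(limit)
    return s.getvalue()


report = profile_top(busy)
print(report)
```

Sorting by tottime puts the functions that burn CPU in their own body at the top, which is often a quicker way to spot a hot loop than the cumulative view.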


Final Note

The results are quite surprising. I had expected the square root function to consume most of the CPU, but math.sqrt accounts for only 0.575 of the 3.572 seconds. Most of the time was spent scanning the dictionary and calling calculate(). It turns out that Python is not a very efficient tool for scanning a big dictionary. For example, the same code in Go would run ~15 times faster.
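
One possible way to act on this result is to cut the per-entry overhead: iterate over the values directly and call math.sqrt inline instead of going through a wrapper function. The following is only a sketch under that assumption. The function names are hypothetical, the values are drawn from 0..200 so that some entries exceed the threshold, and the functions count the "big" values instead of printing them so the two variants can be compared.

```python
import math
import random
import timeit


def calculate(value):
    return math.sqrt(value * 100)


def count_big_original(data):
    # Same structure as scan_dict: a key lookup plus a function call per entry.
    count = 0
    for key in data.keys():
        if calculate(data[key]) > 100:
            count += 1
    return count


def count_big_inlined(data):
    # Iterate over the values directly and inline the square root call.
    sqrt = math.sqrt
    count = 0
    for value in data.values():
        if sqrt(value * 100) > 100:
            count += 1
    return count


data = {i: random.randint(0, 200) for i in range(10000)}

# Time 100 scans of each variant; the numbers depend on the machine.
for func in (count_big_original, count_big_inlined):
    seconds = timeit.timeit(lambda: func(data), number=100)
    print('{}: {:.3f} seconds'.format(func.__name__, seconds))
```

How much this helps varies by machine and Python version, so it is worth measuring before and after rather than assuming a gain.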











Monday, April 4, 2022

Working with EMR Best Practices

 


In this post I'll describe some of the best practices I've learned while working with AWS EMR.


Auto Terminate

Running an EMR cluster has its costs. To save money, configure the cluster to terminate automatically once it has been idle for a long period of time, for example: 1 hour.
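
As an illustration, the idle timeout can also be set from Python. This is a minimal sketch, assuming the boto3 EMR client and its put_auto_termination_policy call; the helper name and the cluster ID in the usage comment are hypothetical.

```python
def set_idle_auto_termination(emr_client, cluster_id, idle_seconds=3600):
    """Ask EMR to terminate the cluster after idle_seconds of inactivity."""
    # Assumption: EMR accepts an idle timeout between 60 seconds and 7 days.
    if not 60 <= idle_seconds <= 7 * 24 * 3600:
        raise ValueError('idle_seconds must be between 60 and 604800')
    return emr_client.put_auto_termination_policy(
        ClusterId=cluster_id,
        AutoTerminationPolicy={'IdleTimeout': idle_seconds},
    )


# Usage, assuming boto3 is installed and AWS credentials are configured:
#   import boto3
#   set_idle_auto_termination(boto3.client('emr'), 'j-XXXXXXXXXXXXX')
```

Passing the client in as a parameter keeps the helper easy to test without touching a real AWS account.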

AWS CLI

Do not manually create the EMR cluster every time. Once the cluster is configured to your needs, use the AWS CLI export button to generate the CLI command that creates it. Recreating a terminated cluster is then simple, and can even be automated.



Use Bootstrap

A bootstrap script is a shell script that runs before the Spark instance starts. It is used to install whatever prerequisites you need. A common prerequisite is installing Python libraries, for example:


#!/bin/bash
sudo yum install -y unzip
sudo python3 -m pip install -U boto3 paramiko


Write Dynamic Code

When writing code we sometimes have, well... bugs...
To debug these, we can print debug messages to STDOUT and check them in the logs.
Another way to debug is to run the code locally in your development environment, using the local Spark instance that the pyspark library creates automatically. However, some parts of the code need to behave differently on a development machine. For example, you might want to redirect access to S3 files to local files on your machine. To check whether the code is running in a cluster or on a development machine, we can use the following simple method:


import os


def is_local_spark():
    return 'SPARK_PUBLIC_DNS' not in os.environ
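
For example, the check can drive the choice between a local file and an S3 object. This is a minimal sketch; resolve_input_path(), the /tmp/local-data directory and the my-example-bucket bucket are hypothetical names used only for illustration.

```python
import os


def is_local_spark():
    return 'SPARK_PUBLIC_DNS' not in os.environ


def resolve_input_path(relative_path,
                       local_root='/tmp/local-data',
                       bucket='my-example-bucket'):
    """Return a local path on a development machine, an S3 URI on the cluster."""
    if is_local_spark():
        return os.path.join(local_root, relative_path)
    return 's3://{}/{}'.format(bucket, relative_path)


print(resolve_input_path('events/2022-04-04.json'))
```

Keeping the switch in one helper means the rest of the code does not need to know where it is running.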


Spark Context

The Spark context must be created only once. If it is held in a global variable that is used by several modules, Python might reinitialize it, causing errors saying that the Spark context has already been created. To avoid this, we use a singleton class.



from pyspark import SparkContext


class SingletonMeta(type):
    # Maps each class to its single instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Create the instance only on the first call; reuse it afterwards.
        if cls not in cls._instances:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
        return cls._instances[cls]


class SparkWrapper(metaclass=SingletonMeta):
    def __init__(self):
        self.spark_context = SparkContext.getOrCreate()


print(SparkWrapper().spark_context)
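
To see the singleton behavior without a Spark installation, here is a small sketch that uses the same metaclass with a hypothetical FakeContextWrapper class in place of SparkWrapper. It counts how many times __init__ actually runs.

```python
class SingletonMeta(type):
    # Maps each class to its single instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class FakeContextWrapper(metaclass=SingletonMeta):
    # Hypothetical stand-in for SparkWrapper; holds a plain object as "context".
    init_count = 0

    def __init__(self):
        FakeContextWrapper.init_count += 1
        self.context = object()


first = FakeContextWrapper()
second = FakeContextWrapper()
print(first is second)                # True
print(FakeContextWrapper.init_count)  # 1
```

Both calls return the same object, and __init__ runs only once, which is the property the Spark wrapper relies on.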