Monday, March 21, 2022

Deploy Python Application to AWS EMR


 

In this post we will review the steps to automatically deploy a Python application to Spark running on AWS EMR.


Our main function is the following:



import os
import re
import stat
from zipfile import ZipFile

import boto3
import paramiko


def main():
    aws_set_credentials()
    chmod_ssh_key()
    zip_remote_path = copy_source_zip_to_s3()
    main_remote_path = copy_file_to_s3('/git/my-repo/main.py', 's3-source-bucket', 'main.py')
    run_spark_application(zip_remote_path, main_remote_path)



The deployment starts by setting up the AWS credentials and the permissions of the SSH private key. We then upload two files containing our application sources to an AWS S3 bucket, and finally we open an SSH connection to the AWS EMR master node and run the application from there.

Let's examine each of the steps.



def aws_set_credentials():
    credentials_file = '/config/credentials'
    os.environ['AWS_SHARED_CREDENTIALS_FILE'] = credentials_file



The aws_set_credentials function sets an environment variable that points to the location of our credentials file. boto3 uses these credentials for AWS operations, such as uploading to the S3 bucket. An example of a credentials file is:



[default]
aws_access_key_id=AKIAWJPWYKUU1234567
aws_secret_access_key=rXKlsqJ2inJdxBdJk123456782345678923
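
Before pointing boto3 at the file, it can help to confirm that it parses and contains the expected profile. The following is a minimal sketch that uses only the standard library. The helper name check_credentials_file is hypothetical and is not part of the deploy script above; it inspects only the file structure and does not contact AWS.

```python
import configparser


def check_credentials_file(credentials_file, profile='default'):
    """Return True if the profile defines both AWS key fields.

    Hypothetical helper: it only inspects the INI structure, so it cannot
    tell whether the keys are actually valid.
    """
    parser = configparser.ConfigParser()
    # read() silently skips missing files, so an absent file simply
    # yields a parser without the requested section.
    parser.read(credentials_file)
    if not parser.has_section(profile):
        return False
    required = ('aws_access_key_id', 'aws_secret_access_key')
    return all(parser.has_option(profile, key) for key in required)
```

Calling check_credentials_file('/config/credentials') at the start of the deploy would let the script fail early with a clear message instead of failing later inside an S3 call.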



Next we update the SSH private key mode:



def chmod_ssh_key():
    private_key_path = '/config/ssh.pem'
    # Read and write for the owner only (mode 0600)
    os.chmod(private_key_path, stat.S_IRUSR | stat.S_IWUSR)



This is the private key of the key pair that was specified when the EMR cluster was created. We will later SSH to the EMR master node, so we make sure that the private key file is readable and writable only by its owner.
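
To confirm that the mode change took effect, the permission bits can be read back. This is a small sketch for POSIX systems; the helper name is_owner_only is hypothetical and is not part of the deploy script.

```python
import os
import stat


def is_owner_only(path):
    """Return True if the file grants no permissions to group or others."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    # S_IRWXG and S_IRWXO are the full permission masks for group and others.
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0
```

For example, is_owner_only('/config/ssh.pem') is expected to return True once chmod_ssh_key has run.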


Once the AWS setup is ready, we can copy the source zip file.



def create_zip_file(zip_file_path, add_folder, match_regex):
    pattern = re.compile(match_regex)
    with ZipFile(zip_file_path, 'w') as zip_object:
        for folder_name, sub_folders, file_names in os.walk(add_folder):
            for file_name in file_names:
                file_path = os.path.join(folder_name, file_name)
                if pattern.match(file_path):
                    # Store the file under its path relative to the source folder
                    relative_path = os.path.relpath(file_path, add_folder)
                    zip_object.write(file_path, relative_path)


def copy_file_to_s3(local_file_path, bucket_name, remote_file_path):
    remote_path = 's3://{}/{}'.format(bucket_name, remote_file_path)
    session = boto3.Session()
    s3_connection = session.client('s3')
    s3_connection.upload_file(local_file_path, bucket_name, remote_file_path)
    return remote_path


def copy_source_zip_to_s3():
    source_dir = '/git/my-repo'
    zip_file_name = "emr-application.zip"
    local_zip_file_path = os.path.join('/tmp', zip_file_name)
    # Anchor the pattern so that only paths ending in ".py" are included
    create_zip_file(local_zip_file_path, source_dir, r".*\.py$")
    remote_path = copy_file_to_s3(local_zip_file_path, 's3-source-bucket', zip_file_name)
    os.remove(local_zip_file_path)
    return remote_path



All application sources and local dependencies must be zipped and copied to S3 so that the EMR cluster can access them. Notice that the zip covers the local dependencies, while the main application Python file must be copied separately. This is why the main deploy function uploads both the sources zip file and the main Python file to the S3 bucket.
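
Because the file filter is a regular expression applied with re.match, which anchors only at the start of the path, a loose pattern such as ".*py" also accepts files like module.pyc. A quick way to catch this is to inspect the archive before uploading it. The sketch below assumes the archive should contain only .py files; the helper name non_python_entries is hypothetical.

```python
from zipfile import ZipFile


def non_python_entries(zip_file_path):
    """Return the archive entries that do not end with '.py', sorted by name."""
    with ZipFile(zip_file_path) as zip_object:
        return sorted(
            name for name in zip_object.namelist() if not name.endswith('.py')
        )
```

An empty list means the archive holds only Python sources; anything else points to a filter that is too permissive.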


The last step is to actually run the application on the EMR cluster.



def get_emr_master_id():
    client = boto3.client('emr')
    response = client.list_clusters(
        ClusterStates=[
            'RUNNING', 'WAITING',
        ],
    )

    emr_cluster_name = 'my-emr'

    for cluster in response['Clusters']:
        if cluster['Name'] == emr_cluster_name:
            return cluster['Id']

    raise Exception('emr cluster {} not located'.format(emr_cluster_name))


def get_emr_master_ip():
    cluster_id = get_emr_master_id()
    client = boto3.client('emr')
    response = client.list_instances(
        ClusterId=cluster_id,
        InstanceGroupTypes=[
            'MASTER',
        ],
        InstanceStates=[
            'RUNNING',
        ]
    )
    instances = response['Instances']
    if len(instances) != 1:
        raise Exception('emr instances count {} is invalid'.format(len(instances)))

    master_instance = instances[0]
    ip = master_instance['PublicIpAddress']
    return ip


def run_ssh_command(host, user, command):
    private_key_path = '/config/ssh.pem'
    private_key = paramiko.RSAKey.from_private_key_file(private_key_path)

    ssh = paramiko.SSHClient()
    # Accept the master node's host key on first connection
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=host, username=user, pkey=private_key)
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command)
        # Read the output before waiting for the exit status; paramiko warns
        # that waiting first can hang when the command produces a lot of output
        all_err = ssh_stderr.read().decode("utf-8")
        all_out = ssh_stdout.read().decode("utf-8")
        ssh_stdout.channel.recv_exit_status()
    finally:
        ssh.close()
    return all_err, all_out


def run_spark_application(s3_zip_file_path, main_file_path):
    host_ip = get_emr_master_ip()
    command_sections = [
        'spark-submit',
        '--deploy-mode cluster',
        '--master yarn',
        '--conf spark.yarn.submit.waitAppCompletion=true',
        '--py-files {}'.format(s3_zip_file_path),
        main_file_path,
    ]
    command = ' '.join(command_sections)
    error, output = run_ssh_command(host_ip, 'hadoop', command)
    print(error + '\n' + output)
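
The EMR list_clusters API returns its results in pages, so in an account with many clusters a single call may not include the cluster we are looking for. The following is a minimal sketch of the same lookup using a boto3 paginator. The function name find_cluster_id and its parameters are hypothetical; the emr_client argument is assumed to behave like boto3.client('emr').

```python
def find_cluster_id(emr_client, cluster_name, states=('RUNNING', 'WAITING')):
    """Return the ID of the first active cluster with the given name.

    emr_client is assumed to behave like boto3.client('emr').
    """
    paginator = emr_client.get_paginator('list_clusters')
    # paginate() yields one response dictionary per page of results.
    for page in paginator.paginate(ClusterStates=list(states)):
        for cluster in page['Clusters']:
            if cluster['Name'] == cluster_name:
                return cluster['Id']
    raise LookupError('emr cluster {} not located'.format(cluster_name))
```

Passing the client as an argument also makes the lookup easy to exercise with a stub object, without touching AWS.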



We start by locating the public IP of the EMR master node using the boto3 API. Notice that the master node must be in an AWS VPC/subnet that allows SSH access to it. After the SSH connection is established, we use the spark-submit command to run our code.

The application logs can be found in the AWS EMR GUI after about 5 minutes, since EMR refreshes this information only every 5 minutes.
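
Since the command is sent to a remote shell as a single string, it can be worth quoting each argument. The following sketch builds the same spark-submit command with shlex.join from the standard library (Python 3.8 or later). The function name build_spark_submit_command is hypothetical.

```python
import shlex


def build_spark_submit_command(s3_zip_file_path, main_file_path):
    """Build the spark-submit command line as a single shell-safe string."""
    args = [
        'spark-submit',
        '--deploy-mode', 'cluster',
        '--master', 'yarn',
        '--conf', 'spark.yarn.submit.waitAppCompletion=true',
        '--py-files', s3_zip_file_path,
        main_file_path,
    ]
    # shlex.join quotes every argument that needs it, so paths containing
    # spaces or shell metacharacters reach spark-submit unchanged.
    return shlex.join(args)
```

For the paths used in this post the result is identical to the space-joined command; the quoting only matters once a path contains characters that the shell would otherwise interpret.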







