In this post we will review the steps to automatically deploy a Python application to Spark running on AWS EMR.
Our main function is the following:
import os
import re
import stat
from zipfile import ZipFile

import boto3
import paramiko


def main():
    aws_set_credentials()
    chmod_ssh_key()
    zip_remote_path = copy_source_zip_to_s3()
    main_remote_path = copy_file_to_s3('/git/my-repo/main.py', 's3-source-bucket', 'main.py')
    run_spark_application(zip_remote_path, main_remote_path)
The deployment starts by setting up the AWS credentials and the SSH private key permissions. Next we upload two files to an AWS S3 bucket: a zip with our application sources and the main Python file. Finally we run the application by connecting over SSH to the AWS EMR master node and executing the spark-submit command.
Let's examine each of the steps.
def aws_set_credentials():
    credentials_file = '/config/credentials'
    os.environ['AWS_SHARED_CREDENTIALS_FILE'] = credentials_file
The aws_set_credentials function points the AWS_SHARED_CREDENTIALS_FILE environment variable to the location of our credentials file. These credentials are used by boto3 for the AWS operations below, such as uploading to the S3 bucket. An example of a credentials file is:
[default]
aws_access_key_id=AKIAWJPWYKUU1234567
aws_secret_access_key=rXKlsqJ2inJdxBdJk123456782345678923
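To confirm that boto3 actually picks up the credentials from this file, a quick sanity check can be run right after aws_set_credentials (a minimal sketch reusing the imports at the top of the post; it assumes the AWS region is configured as well, e.g. via AWS_DEFAULT_REGION, and only prints the account ID):

def verify_aws_credentials():
    # assumes aws_set_credentials() was already called, so boto3 reads
    # the credentials from AWS_SHARED_CREDENTIALS_FILE
    sts = boto3.client('sts')
    identity = sts.get_caller_identity()
    print('using AWS account {}'.format(identity['Account']))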
Next we update the SSH private key mode:
def chmod_ssh_key():
    private_key_path = '/config/ssh.pem'
    os.chmod(private_key_path, stat.S_IRUSR | stat.S_IWUSR)
The SSH private key is the one used to create the EMR master node. We will later SSH into the EMR master, hence we want to make sure that the SSH private key is accessible only by its owner.
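To double check the result, the key's mode can be printed (a small sketch; 0o600 means read and write for the owner only):

def print_ssh_key_mode():
    private_key_path = '/config/ssh.pem'
    # expected to print 0o600 after chmod_ssh_key() has run
    print(oct(stat.S_IMODE(os.stat(private_key_path).st_mode)))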
Once the AWS setup is ready, we can copy the source zip file.
def create_zip_file(zip_file_path, add_folder, match_regex):
    pattern = re.compile(match_regex)
    with ZipFile(zip_file_path, 'w') as zip_object:
        for folder_name, sub_folders, file_names in os.walk(add_folder):
            for file_name in file_names:
                file_path = os.path.join(folder_name, file_name)
                if pattern.match(file_path):
                    relative_path = file_path[len(add_folder) + 1:]
                    zip_object.write(file_path, relative_path)


def copy_file_to_s3(local_file_path, bucket_name, remote_file_path):
    remote_path = 's3://{}/{}'.format(bucket_name, remote_file_path)
    session = boto3.Session()
    s3_connection = session.client('s3')
    s3_connection.upload_file(local_file_path, bucket_name, remote_file_path)
    return remote_path


def copy_source_zip_to_s3():
    source_dir = '/git/my-repo'
    zip_file_name = "emr-application.zip"
    local_zip_file_path = os.path.join('tmp', zip_file_name)
    create_zip_file(local_zip_file_path, source_dir, ".*py")
    remote_path = copy_file_to_s3(local_zip_file_path, 's3-source-bucket', zip_file_name)
    os.remove(local_zip_file_path)
    return remote_path
All the related sources and dependencies should be zipped and copied to S3 so that the EMR cluster can access them. Notice that this includes the local dependencies, but the main application Python file must be copied separately; hence the main deploy function copies both the sources zip file and the main Python file to the S3 bucket.
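Before uploading, it can help to verify what actually ended up in the archive. A short sketch that lists the archive entries:

def print_zip_contents(zip_file_path):
    # prints the relative paths stored by create_zip_file
    with ZipFile(zip_file_path, 'r') as zip_object:
        for name in zip_object.namelist():
            print(name)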
The last step is the actual run of the application on the EMR.
def get_emr_master_id():
    client = boto3.client('emr')
    response = client.list_clusters(
        ClusterStates=[
            'RUNNING', 'WAITING',
        ],
    )
    emr_cluster_name = 'my-emr'
    for cluster in response['Clusters']:
        if cluster['Name'] == emr_cluster_name:
            return cluster['Id']
    raise Exception('emr cluster {} not located'.format(emr_cluster_name))


def get_emr_master_ip():
    cluster_id = get_emr_master_id()
    client = boto3.client('emr')
    response = client.list_instances(
        ClusterId=cluster_id,
        InstanceGroupTypes=[
            'MASTER',
        ],
        InstanceStates=[
            'RUNNING',
        ],
    )
    instances = response['Instances']
    if len(instances) != 1:
        raise Exception('emr instances count {} is invalid'.format(len(instances)))
    master_instance = instances[0]
    ip = master_instance['PublicIpAddress']
    return ip
def run_ssh_command(host, user, command):
    private_key_path = '/config/ssh.pem'
    private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=host, username=user, pkey=private_key)
    ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command)
    # block until the remote command has finished before reading its output
    ssh_stderr.channel.recv_exit_status()
    ssh_stdout.channel.recv_exit_status()
    all_err = ssh_stderr.read().decode("utf-8")
    all_out = ssh_stdout.read().decode("utf-8")
    ssh.close()
    return all_err, all_out
def run_spark_application(s3_zip_file_path, main_file_path):
    host_ip = get_emr_master_ip()
    command_sections = [
        'spark-submit',
        '--deploy-mode cluster',
        '--master yarn',
        '--conf spark.yarn.submit.waitAppCompletion=true',
        '--py-files {}'.format(s3_zip_file_path),
        main_file_path,
    ]
    command = ' '.join(command_sections)
    error, output = run_ssh_command(host_ip, 'hadoop', command)
    print(error + '\n' + output)
We start by locating the EMR master node's public IP using the boto3 API. Notice that the master must be in an AWS VPC/subnet that allows SSH access to it. After the SSH connection is established, we use the spark-submit command to run our code.
The logs of the application can be found in the AWS EMR GUI after about 5 minutes, as EMR refreshes the application status roughly every 5 minutes.
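Until the console catches up, the application state can be checked immediately by reusing run_ssh_command to query YARN on the master node (a sketch; it assumes the yarn CLI is available on the master, as it is on EMR clusters with Spark installed):

def print_yarn_applications():
    # lists the YARN applications and their current states directly on the
    # master, without waiting for the EMR console to refresh
    host_ip = get_emr_master_ip()
    error, output = run_ssh_command(host_ip, 'hadoop', 'yarn application -list')
    print(error + '\n' + output)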