Sunday, June 26, 2022

Using AWS Batch with boto3


 

In this post we will review how to use AWS Batch with boto3.

AWS Batch enables us to submit one-time jobs to AWS. It is a simple way to get compute capacity at great scale. In my case I had to initiate several data ingestion jobs, each running for about an hour. Using AWS Batch I was able to parallelize these jobs at scale, while all the infrastructure is managed by AWS. The actual jobs are simply Docker containers that are run once per request.


To use AWS Batch, we first need to configure some entities:

  • Compute environment - sets up how and where to run the containers. This includes whether to use Fargate, Spot, or On-Demand capacity. In addition, a VPC should be selected. Notice that if the containers need to access the internet (or even S3), we have two alternatives: a private subnet with a NAT gateway, or a public subnet with a public IPv4 address allocated to each container. The first method is safer. If we choose the second method, notice that the public IP allocation should be specified in the Fargate configuration.
  • Job queue - where the jobs wait for execution
  • Job definition - configures the Docker image name, the execution role, and the CPU & memory requirements for each container

Once these entities are ready, we can start running jobs.
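These entities can be created in the AWS console, but they can also be registered from boto3. Below is a minimal sketch of building a Fargate job definition; the helper name, image, and role ARN are hypothetical placeholders, not part of the original setup.

```python
def build_job_definition(name, image, execution_role_arn, vcpu='1', memory='2048'):
    # Build the kwargs for client.register_job_definition (Fargate container job).
    # vcpu and memory are passed as strings, as resourceRequirements expects.
    return {
        'jobDefinitionName': name,
        'type': 'container',
        'platformCapabilities': ['FARGATE'],
        'containerProperties': {
            'image': image,
            'executionRoleArn': execution_role_arn,
            'resourceRequirements': [
                {'type': 'VCPU', 'value': vcpu},
                {'type': 'MEMORY', 'value': memory},
            ],
            # Public-subnet alternative: allocate a public IP per container.
            'networkConfiguration': {'assignPublicIp': 'ENABLED'},
        },
    }

# Hypothetical usage (requires boto3 and valid AWS credentials):
# client = boto3.client('batch')
# client.register_job_definition(**build_job_definition(
#     'my-job-definition', 'my-ecr-image:latest', 'arn:aws:iam::...:role/my-role'))
```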

To submit the jobs with boto3, we create two classes. The first one represents a single job.



import time

import boto3

client = boto3.client('batch')


class AwsBatchJob:
    """Wraps a single AWS Batch job: submission, status polling, and waiting."""

    def __init__(self, name, job_definition, arguments):
        self.name = name
        self.arguments = arguments
        self.job_id = None
        self.job_definition = job_definition

    def start_job(self):
        # Job names may only contain letters, numbers, hyphens, and underscores.
        job_name = self.name
        job_name = job_name.replace(' ', '_')
        job_name = job_name.replace(':', '_')
        job_name = job_name.replace('.', '_')
        response = client.submit_job(
            jobName=job_name,
            jobQueue='my-queue',
            jobDefinition=self.job_definition,
            containerOverrides={
                'command': self.arguments,
            },
        )
        self.job_id = response['jobId']

    def get_job_status(self):
        response = client.describe_jobs(jobs=[self.job_id])
        jobs = response['jobs']
        job = jobs[0]
        status = job['status']
        return status

    def wait_for_job(self):
        # Poll once a minute until the job reaches a terminal state.
        while True:
            status = self.get_job_status()
            if status == 'FAILED':
                raise Exception(f'job {self.name} id {self.job_id} failed')
            if status == 'SUCCEEDED':
                return
            time.sleep(60)
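Besides the command, submit_job accepts further per-job overrides, such as a retry strategy and a timeout. As a sketch (the helper name and the attempt and timeout values below are arbitrary examples, not part of the class above), start_job could build its kwargs like this:

```python
def build_submit_kwargs(job_name, job_queue, job_definition, command):
    # kwargs for client.submit_job; retryStrategy and timeout override
    # whatever is configured in the job definition.
    return {
        'jobName': job_name,
        'jobQueue': job_queue,
        'jobDefinition': job_definition,
        'containerOverrides': {'command': command},
        'retryStrategy': {'attempts': 2},                 # retry once on failure
        'timeout': {'attemptDurationSeconds': 2 * 3600},  # stop the attempt after ~2 hours
    }

# Hypothetical usage:
# response = client.submit_job(**build_submit_kwargs(
#     'my_job', 'my-queue', 'my-job-definition', ['python3', 'main.py']))
```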



The second class manages multiple jobs that are run in parallel.


from aws_batch_job import AwsBatchJob


class AwsBatches:
    """Manages a group of AWS Batch jobs that run in parallel."""

    def __init__(self, main_python_file, job_definition):
        self.main_python_file = main_python_file
        self.job_definition = job_definition
        self.batches = {}

    def add_batch(self, name, batch_arguments):
        self.batches[name] = batch_arguments

    def run_and_wait(self):
        # Start all the jobs first, so they run in parallel ...
        jobs = {}
        for batch_name, batch_arguments in self.batches.items():
            arguments = ['python3', self.main_python_file]
            arguments.extend(batch_arguments)
            job = AwsBatchJob(batch_name, self.job_definition, arguments)
            job.start_job()
            jobs[batch_name] = job

        # ... and only then wait for each of them to complete.
        for job in jobs.values():
            job.wait_for_job()


A simple usage example is below.


batches = AwsBatches('my-folder/my_python_main.py', 'my-job-definition')

batches.add_batch('my first job', ['arg1', 'arg2'])
batches.add_batch('my second job', ['arg3', 'arg4'])

batches.run_and_wait()
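When a job fails, its output can be found in CloudWatch Logs: describe_jobs reports the log stream name under the container key, and Batch writes to the /aws/batch/job log group by default. A minimal sketch for extracting the stream name (the helper below is an illustration, not part of the classes above):

```python
def log_stream_of(describe_jobs_response):
    # Extract the CloudWatch log stream name of the first described job,
    # or None if the job has not produced logs yet.
    jobs = describe_jobs_response.get('jobs', [])
    if not jobs:
        return None
    return jobs[0].get('container', {}).get('logStreamName')

# Hypothetical usage:
# logs_client = boto3.client('logs')
# stream = log_stream_of(client.describe_jobs(jobs=[job.job_id]))
# events = logs_client.get_log_events(
#     logGroupName='/aws/batch/job', logStreamName=stream)
```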



That's all.

This simple solution is great for small projects that don't require an online response. Notice that a job usually starts only several minutes after it is submitted, so this solution is suitable, as the name AWS Batch implies, only for batch processing.

