In this post we will review how to use AWS Batch with boto3.

AWS Batch enables us to submit one-time jobs to AWS, giving us simple compute capacity at great scale. In my case I had to initiate several data ingestion jobs, each running for about an hour. Using AWS Batch I was able to parallelize these jobs at scale, while all the infrastructure is managed by AWS. The actual jobs are simply Docker containers that are run once per request.
To use AWS Batch, we first need to configure a few entities (a boto3 sketch for creating them follows the list):
- Compute environment - this sets up how and where the containers run, including whether to use Fargate, Spot, or On-Demand capacity. In addition, a VPC must be selected. Note that if the containers need internet access (or even access to S3), we have two alternatives: a private subnet with a NAT gateway, or a public subnet with a public IPv4 address assigned to each container. The first method is safer. If we choose the second, note that the public IP assignment must be enabled in the Fargate configuration.
- Job queue - where submitted jobs wait until the compute environment has capacity to run them.
- Job definition - configures the Docker image, the execution role, and the CPU and memory requirements for each container.
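
These entities can be created in the AWS console, but they can also be created with boto3. Below is a minimal sketch, assuming Fargate; the names, subnet and security group IDs, image URI, and role ARN are all placeholders and must be replaced with your own values:

```python
import boto3

client = boto3.client('batch')

# 1. Compute environment: a managed Fargate environment inside our VPC.
#    The subnet and security group IDs below are placeholders.
client.create_compute_environment(
    computeEnvironmentName='my-compute-env',
    type='MANAGED',
    computeResources={
        'type': 'FARGATE',
        'maxvCpus': 64,
        'subnets': ['subnet-0123456789abcdef0'],
        'securityGroupIds': ['sg-0123456789abcdef0'],
    },
)

# 2. Job queue: jobs submitted here wait for capacity in the environment.
client.create_job_queue(
    jobQueueName='my-queue',
    priority=1,
    computeEnvironmentOrder=[
        {'order': 1, 'computeEnvironment': 'my-compute-env'},
    ],
)

# 3. Job definition: the Docker image, execution role, and per-container
#    CPU/memory. assignPublicIp is only needed on the public-subnet route.
client.register_job_definition(
    jobDefinitionName='my-job-definition',
    type='container',
    platformCapabilities=['FARGATE'],
    containerProperties={
        'image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/my-image:latest',
        'executionRoleArn': 'arn:aws:iam::123456789012:role/my-batch-execution-role',
        'resourceRequirements': [
            {'type': 'VCPU', 'value': '1'},
            {'type': 'MEMORY', 'value': '2048'},
        ],
        'networkConfiguration': {'assignPublicIp': 'ENABLED'},
    },
)
```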
The AwsBatchJob class wraps a single job: it submits the job to the queue and polls its status until it reaches a terminal state. It lives in aws_batch_job.py, which the orchestration class imports next.

```python
import time

import boto3

client = boto3.client('batch')


class AwsBatchJob:
    def __init__(self, name, job_definition, arguments):
        self.name = name
        self.arguments = arguments
        self.job_id = None
        self.job_definition = job_definition

    def start_job(self):
        # AWS Batch job names only allow letters, numbers, hyphens,
        # and underscores, so sanitize the human-readable name.
        job_name = self.name
        job_name = job_name.replace(' ', '_')
        job_name = job_name.replace(':', '_')
        job_name = job_name.replace('.', '_')
        response = client.submit_job(
            jobName=job_name,
            # The queue name is hardcoded to the job queue created earlier.
            jobQueue='my-queue',
            jobDefinition=self.job_definition,
            containerOverrides={
                'command': self.arguments,
            },
        )
        self.job_id = response['jobId']

    def get_job_status(self):
        response = client.describe_jobs(jobs=[self.job_id])
        jobs = response['jobs']
        job = jobs[0]
        status = job['status']
        return status

    def wait_for_job(self):
        # Poll once a minute until the job either fails or succeeds.
        while True:
            status = self.get_job_status()
            if status == 'FAILED':
                raise Exception(f'job {self.name} id {self.job_id} failed')
            if status == 'SUCCEEDED':
                return
            time.sleep(60)
```
The AwsBatches class collects the batches, starts all of them, and then waits for each job to finish. Since every job is started before any wait begins, the jobs run in parallel, and the total wall time is roughly that of the slowest job.

```python
from aws_batch_job import AwsBatchJob


class AwsBatches:
    def __init__(self, main_python_file, job_definition):
        self.main_python_file = main_python_file
        self.job_definition = job_definition
        self.batches = {}

    def add_batch(self, name, batch_arguments):
        self.batches[name] = batch_arguments

    def run_and_wait(self):
        # Start all the jobs first, then wait for each one;
        # the containers run in parallel on AWS Batch.
        jobs = {}
        for batch_name, batch_arguments in self.batches.items():
            # Each container runs the main python file with its own arguments.
            arguments = ['python3', self.main_python_file]
            arguments.extend(batch_arguments)
            job = AwsBatchJob(batch_name, self.job_definition, arguments)
            job.start_job()
            jobs[batch_name] = job
        for job in jobs.values():
            job.wait_for_job()
```
Putting it all together, we configure the batches and run them:

```python
batches = AwsBatches('my-folder/my_python_main.py', 'my-job-definition')
batches.add_batch('my first job', ['arg1', 'arg2'])
batches.add_batch('my second job', ['arg3', 'arg4'])
batches.run_and_wait()
```
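
For completeness, here is what the container entry point might look like. This is a hypothetical sketch of my_python_main.py: since the job definition's command is overridden with ['python3', main_python_file, *batch_arguments], the batch arguments arrive as ordinary command-line arguments.

```python
# my-folder/my_python_main.py - a hypothetical entry point for the container.
import sys


def main():
    # For 'my first job' this receives ['arg1', 'arg2'].
    arguments = sys.argv[1:]
    print(f'running ingestion with arguments: {arguments}')


if __name__ == '__main__':
    main()
```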