Commit 9089e1e

gillins committed Aug 8, 2023
1 parent fa9836a commit 9089e1e

Showing 7 changed files with 209 additions and 14 deletions.
19 changes: 17 additions & 2 deletions bin/rios_subproc_awsbatch.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
"""
-Main program for RIOS subprocesses invocked via AWS Batch.
+Main program for RIOS subprocesses invoked via AWS Batch.
"""
# This file is part of RIOS - Raster I/O Simplification
@@ -27,13 +27,18 @@

from rios.parallel import subproc

# These vars are set in the container environment
# by CloudFormation
BUCKET = os.getenv("RIOSBucket")
INQUEUE = os.getenv("RIOSInQueue")
OUTQUEUE = os.getenv("RIOSOutQueue")
DFLT_NOMSG_TIMEOUT_SECS = 60 * 60 # 1 hour
NOMSG_TIMEOUT_SECS = int(os.getenv('RIOS_NOMSG_TIMEOUT',
default=DFLT_NOMSG_TIMEOUT_SECS))

# keep track of the last time we got a message
# - if it's been too long we can assume the main script exited
# and exit ourselves
LAST_MESSAGE_TIME = time.time()


@@ -48,39 +53,48 @@ def main():
sqsClient = boto3.client('sqs', region_name=region)

while True:
# get a message from the queue
resp = sqsClient.receive_message(QueueUrl=INQUEUE,
MaxNumberOfMessages=1, WaitTimeSeconds=20) # must be <= 20
if 'Messages' in resp and len(resp['Messages']) > 0:
# we got something
LAST_MESSAGE_TIME = time.time()

-# just look at the first one
+# just look at the first one (we only asked for 1)
msg = resp['Messages'][0]
body = msg['Body']
receiptHandle = msg['ReceiptHandle']
sqsClient.delete_message(
QueueUrl=INQUEUE, ReceiptHandle=receiptHandle)

# message from the main script to exit
if body == 'Stop':
print('Job Exiting')
break

print('Started', body)

# get the info out of the pkl filename
bl, x, y, o = body.split('_')
outfile = 'block_{}_{}_out.pkl'.format(x, y)

# read the input pkl
inPkl = io.BytesIO()
s3Client.download_fileobj(BUCKET, body, inPkl)
inPkl.seek(0)

# delete it
s3Client.delete_object(Bucket=BUCKET, Key=body)

# run the job
outPkl = io.BytesIO()
subproc.runJob(inPkl, outPkl)

# upload the result
outPkl.seek(0)
s3Client.upload_fileobj(outPkl, BUCKET, outfile)

# send message back to main script
sqsClient.send_message(QueueUrl=OUTQUEUE,
MessageBody=outfile)

@@ -90,6 +104,7 @@ def main():
print('No message received within timeout. Exiting')
break
else:
# sleep for a bit before checking again
time.sleep(30)


94 changes: 94 additions & 0 deletions rios/parallel/aws/README.md
@@ -0,0 +1,94 @@
# Using AWS Services for parallel processing in RIOS

This directory holds implementations of per-tile parallel processing
using AWS services. Currently only AWS Batch is supported, but it is
intended that other services will be added in the future.

Refer to jobmanager.py for an overview of how RIOS handles parallel processing.

## AWS Batch

### Creating the infrastructure

This implementation comes with a CloudFormation template (`templates/batch.yaml`)
that creates a separate VPC with all the required infrastructure. It is recommended
to use the script `templates/createbatch.py` to create or modify (via the `--modify`
command-line option) this CloudFormation stack. There are also options for
overriding some of the input parameters; see the output of `createbatch.py --help`
for more information.

When you have completed processing you can run `templates/deletebatch.py` to delete
all the resources so you aren't paying for them.

Note that both `createbatch.py` and `deletebatch.py` have a `--wait` option that causes the
script to keep running until creation/deletion is complete.
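
For example, a typical create/use/delete cycle might look like the sketch below (run from the
`templates` directory; apart from `--wait` and `--modify`, check `createbatch.py --help` for the
options available in your version):
```
# create the stack and wait until it is ready
python3 createbatch.py --wait

# ... run your processing ...

# tear everything down when you have finished
python3 deletebatch.py --wait
```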

### Creating the Docker image

AWS Batch requires you to provide a Docker image with the required software installed.
A `Dockerfile` is provided for this, but it is recommended that you use the `Makefile`
to build the image as this handles the details of pulling the names out of the CloudFormation
stack and creating a tar file of RIOS for copying into the Docker image. To build and push to
ECR simply run:
```
make
```

By default this image includes GDAL, boto3 and RIOS.

Normally your script will need extra packages to run. You can specify additional Ubuntu packages
to install with the `EXTRA_PACKAGES` environment variable, like this:
```
EXTRA_PACKAGES="python3-sklearn python3-skimage" make
```

You can also use the `PIP_PACKAGES` environment variable to list any pip packages to install, like this:
```
PIP_PACKAGES="pydantic python-dateutil" make
```

You can also specify both if needed:
```
EXTRA_PACKAGES="python3-sklearn python3-skimage" PIP_PACKAGES="pydantic python-dateutil" make
```

### Setting up your main script

To enable parallel processing using AWS Batch in your RIOS script you must import the batch module:
```
from rios.parallel.aws import batch
```

Secondly, you must set up an [Applier Controls](https://www.rioshome.org/en/latest/rios_applier.html#rios.applier.ApplierControls)
object and pass it to [apply](https://www.rioshome.org/en/latest/rios_applier.html#rios.applier.apply). On this
object, you must make the following calls:
```
controls.setNumThreads(4) # or whatever number you want
controls.setJobManagerType('AWSBatch')
```

Note that the number of AWS Batch jobs started will be (numThreads - 1) as one job is done by the main RIOS script.
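
Putting this together, a minimal main script might look like the sketch below (the file names and
the user function are placeholders, not part of RIOS):
```
from rios import applier
# importing this module makes the 'AWSBatch' job manager available
from rios.parallel.aws import batch

def userFunc(info, inputs, outputs):
    # example per-block processing: double the input block
    outputs.result = inputs.img * 2

infiles = applier.FilenameAssociations()
infiles.img = 'input.img'
outfiles = applier.FilenameAssociations()
outfiles.result = 'output.img'

controls = applier.ApplierControls()
controls.setNumThreads(4)              # 3 AWS Batch jobs plus this process
controls.setJobManagerType('AWSBatch')

applier.apply(userFunc, infiles, outfiles, controls=controls)
```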

It is recommended that you run this main script within a container based on the one above. This reduces the likelihood
of problems caused by mismatched versions of Python, or of other packages your script needs, between the main RIOS
script and the AWS Batch workers.

To do this, create a `Dockerfile` like the one below (replacing `myscript.py` with the name of your script):

```
# Created by make command above
FROM rios:latest
COPY myscript.py /usr/local/bin
RUN chmod +x /usr/local/bin/myscript.py
ENTRYPOINT ["/usr/bin/python3", "/usr/local/bin/myscript.py"]
```
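
The image can then be built in the usual way (the `myscript` tag is just an example):
```
docker build -t myscript .
```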

Don't forget to pass your `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables into this
container when it runs (these variables are set automatically when running as an AWS Batch job, but you'll
need to set them yourself otherwise).
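
For example, when running the container locally you might forward the credentials from your host
environment (a sketch only; adapt to however you normally manage credentials):
```
docker run --rm \
    -e AWS_ACCESS_KEY_ID \
    -e AWS_SECRET_ACCESS_KEY \
    myscript
```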

Needless to say, the account that this "main" script runs as should have sufficient permissions on the resources
created by CloudFormation.

37 changes: 30 additions & 7 deletions rios/parallel/aws/batch.py
@@ -1,5 +1,7 @@
"""
-Module for implementing parallel with AWS Batch
+Module for implementing parallel processing with AWS Batch.
+See the class AWSBatch for the implementation.
"""

import io
@@ -44,25 +46,35 @@ def getStackOutputs(stackName=DFLT_STACK_NAME, region=DFLT_REGION):


class AWSBatch(jobmanager.JobManager):
"""
Implementation of parallelisation via AWS Batch.
This uses 2 SQS queues for communication between the
'main' RIOS script and the subprocesses (which run on Batch)
and an S3 bucket to hold the pickled data (which the SQS
messages refer to).
"""
jobMgrType = 'AWSBatch'

def __init__(self, numSubJobs, stackName=DFLT_STACK_NAME,
region=DFLT_REGION):
super().__init__(numSubJobs)
# get the outputs of the CloudFormation stack so we know
# what the resources are called.
self.stackOutputs = getStackOutputs(stackName, region)
self.batchClient = boto3.client('batch', region_name=region)
self.s3Client = boto3.client('s3', region_name=region)
self.sqsClient = boto3.client('sqs', region_name=region)

# check they haven't asked for more jobs than we have batch instances
# minus one as one job is always done in this thread
if numSubJobs - 1 > int(self.stackOutputs['BatchMaxJobs']):
print('Number of threads greater than number of MaxJobs input to ' +
'CloudFormation. Consider increasing this number.')

# start the required number of batch jobs running now
# minus one as one job is always done in this thread
for n in range(numSubJobs - 1):
self.batchClient.submit_job(jobName='RIOS_{}'.format(n),
#containerOverrides={'command': ['/usr/bin/python3', '/usr/local/bin/rios_subproc_awsbatch.py']},
jobQueue=self.stackOutputs['BatchProcessingJobQueueName'],
jobDefinition=self.stackOutputs['BatchProcessingJobDefinitionName'])

Expand All @@ -74,27 +86,35 @@ def startOneJob(self, userFunc, jobInfo):

allInputs = (userFunc, jobInfo)
allInputsPickled = cloudpickle.dumps(allInputs)
# save pickled data in file like BytesIO
fileObj = io.BytesIO(allInputsPickled)

# create a unique filename based on the coords of the
# current block.
s3Key = 'block_{}_{}_in.pkl'.format(jobInfo.info.xblock, jobInfo.info.yblock)

# upload this file to S3
self.s3Client.upload_fileobj(fileObj,
self.stackOutputs['BatchBucket'], s3Key)

# send the filename as a message to the Batch workers
self.sqsClient.send_message(QueueUrl=self.stackOutputs['BatchInQueue'],
MessageBody=s3Key)

print('sent', s3Key)

# return the block coords so we can look for this message later
return (jobInfo.info.xblock, jobInfo.info.yblock)

def waitOnJobs(self, jobIDlist):
"""
Wait on all the jobs. Do nothing.
"""
print('wait on jobs')
pass

def gatherAllOutputs(self, jobIDlist):
"""
Gather all the results. Checks the output SQS Queue
"""
# first one that is done in this thread
outputBlocksList = [jobIDlist[0]]
# convert to a set so we can easily search for which blocks
# are 'ours'. They should all be, but I'm just being paranoid
@@ -103,6 +123,7 @@ def gatherAllOutputs(self, jobIDlist):
inBlocks.add((xblock, yblock))
outputBlocksDict = {}

# look for all the blocks
while len(outputBlocksDict) < len(jobIDlist) - 1:
resp = self.sqsClient.receive_message(
QueueUrl=self.stackOutputs['BatchOutQueue'],
@@ -113,13 +134,13 @@
for msg in resp['Messages']:
body = msg['Body']
receiptHandle = msg['ReceiptHandle']
# get the info out of the filename
bl, x, y, o = body.split('_')
print('received', body)
x = int(x)
y = int(y)
# one of ours?
if (x, y) in inBlocks:
print('processing', body)
# delete it so we don't see it again
self.sqsClient.delete_message(
QueueUrl=self.stackOutputs['BatchOutQueue'],
ReceiptHandle=receiptHandle)
@@ -130,8 +151,10 @@
self.stackOutputs['BatchBucket'], body, pickledOutput)
pickledOutput.seek(0)
outputObj = pickle.load(pickledOutput)
# save
outputBlocksDict[(x, y)] = outputObj

# delete pkl
self.s3Client.delete_object(
Bucket=self.stackOutputs['BatchBucket'], Key=body)

8 changes: 7 additions & 1 deletion rios/parallel/aws/templates/Dockerfile
@@ -1,26 +1,32 @@

# Based off Ubuntu
FROM ubuntu:22.04

# These need to be passed in as --build-args
ARG EXTRA_PACKAGES
ARG PIP_PACKAGES
ARG RIOS_VER

ARG DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1

-# Use Aussie mirrors
+# Use Aussie mirrors (TODO: a way of setting these?)
RUN sed -i 's/http:\/\/archive./http:\/\/ap-southeast-2.ec2.archive./g' /etc/apt/sources.list
RUN apt-get update
RUN apt-get upgrade -y

# install our prereqs, plus anything else the user has asked for
RUN apt-get install -y python3-gdal python3-boto3 python3-pip $EXTRA_PACKAGES

# Copy the source archive created by the Makefile
COPY rios-$RIOS_VER.tar.gz /tmp
# install RIOS
RUN cd /tmp && tar xf rios-$RIOS_VER.tar.gz \
&& cd rios-$RIOS_VER \
&& pip install . \
&& cd .. && rm -rf rios-$RIOS_VER rios-$RIOS_VER.tar.gz

# Set our subproc script for AWS Batch as the entrypoint.
ENTRYPOINT ["/usr/bin/python3", "/usr/local/bin/rios_subproc_awsbatch.py"]

# any pip packages?
10 changes: 9 additions & 1 deletion rios/parallel/aws/templates/Makefile
@@ -1,15 +1,23 @@

# A Makefile to create and push a Docker image with RIOS and any other required packages
# to ECR.
# To request other packages be installed into this Docker image, set either or both of:
# EXTRA_PACKAGES environment variable (for Ubuntu packages)
# PIP_PACKAGES environment variable (for pip packages)
# These accept a space-separated list of package names

ECR_URL := $(shell ./print_ecr_path.py --base)
REPO := $(shell ./print_ecr_path.py):latest
RIOS_VER := $(shell python3 -c 'import rios;print(rios.RIOS_VERSION)')

default: all

# grab the current RIOS source tree and make it available to the
# docker COPY command
dist:
cd ../../../..;python3 setup.py sdist --formats=gztar
cp ../../../../dist/rios-$(RIOS_VER).tar.gz .

# Login to ECR, build package and push to ECR
all: dist
aws ecr get-login-password --region ap-southeast-2 | docker login --username AWS --password-stdin $(ECR_URL)
docker build --build-arg EXTRA_PACKAGES=${EXTRA_PACKAGES} --build-arg=PIP_PACKAGES=${PIP_PACKAGES} --build-arg RIOS_VER=$(RIOS_VER) -t rios .