From 9089e1ef0c1eefa22f0c9724750ce9ebd4bbf556 Mon Sep 17 00:00:00 2001
From: Sam Gillingham
Date: Wed, 9 Aug 2023 09:21:12 +1000
Subject: [PATCH] comments

---
 bin/rios_subproc_awsbatch.py               | 19 ++++-
 rios/parallel/aws/README.md                | 94 ++++++++++++++++++++++
 rios/parallel/aws/batch.py                 | 37 +++++++--
 rios/parallel/aws/templates/Dockerfile     |  8 +-
 rios/parallel/aws/templates/Makefile       | 10 ++-
 rios/parallel/aws/templates/batch.yaml     | 45 ++++++++++-
 rios/parallel/aws/templates/createbatch.py | 10 +++
 7 files changed, 209 insertions(+), 14 deletions(-)
 create mode 100644 rios/parallel/aws/README.md

diff --git a/bin/rios_subproc_awsbatch.py b/bin/rios_subproc_awsbatch.py
index 618df59d..7991bc76 100755
--- a/bin/rios_subproc_awsbatch.py
+++ b/bin/rios_subproc_awsbatch.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 """
-Main program for RIOS subprocesses invocked via AWS Batch.
+Main program for RIOS subprocesses invoked via AWS Batch.
 """

 # This file is part of RIOS - Raster I/O Simplification
@@ -27,6 +27,8 @@

 from rios.parallel import subproc

+# These vars are set in the container environment
+# by CloudFormation
 BUCKET = os.getenv("RIOSBucket")
 INQUEUE = os.getenv("RIOSInQueue")
 OUTQUEUE = os.getenv("RIOSOutQueue")
@@ -34,6 +36,9 @@
 NOMSG_TIMEOUT_SECS = int(os.getenv('RIOS_NOMSG_TIMEOUT',
     default=DFLT_NOMSG_TIMEOUT_SECS))

+# keep track of the last time we got a message
+# - if it has been too long we can assume the main script
+# has exited, and exit ourselves
 LAST_MESSAGE_TIME = time.time()

@@ -48,39 +53,48 @@ def main():
     sqsClient = boto3.client('sqs', region_name=region)

     while True:
+        # get a message from the queue
         resp = sqsClient.receive_message(QueueUrl=INQUEUE,
             MaxNumberOfMessages=1, WaitTimeSeconds=20)  # must be <= 20
         if 'Messages' in resp and len(resp['Messages']) > 0:
+            # we got something
             LAST_MESSAGE_TIME = time.time()
-            # just look at the first one
+            # just look at the first one (we only asked for 1)
             msg = resp['Messages'][0]
             body = msg['Body']
             receiptHandle = msg['ReceiptHandle']
             sqsClient.delete_message(
                 QueueUrl=INQUEUE, ReceiptHandle=receiptHandle)

+            # message from the main script telling us to exit
             if body == 'Stop':
                 print('Job Exiting')
                 break

             print('Started', body)
+            # get the info out of the pkl filename
             bl, x, y, o = body.split('_')
             outfile = 'block_{}_{}_out.pkl'.format(x, y)
+            # read the input pkl
             inPkl = io.BytesIO()
             s3Client.download_fileobj(BUCKET, body, inPkl)
             inPkl.seek(0)

+            # delete it from S3
             s3Client.delete_object(Bucket=BUCKET, Key=body)

+            # run the job
             outPkl = io.BytesIO()
             subproc.runJob(inPkl, outPkl)

+            # upload the result
             outPkl.seek(0)
             s3Client.upload_fileobj(outPkl, BUCKET, outfile)

+            # send message back to main script
             sqsClient.send_message(QueueUrl=OUTQUEUE, MessageBody=outfile)
@@ -90,6 +104,7 @@ def main():
             print('No message received within timeout. Exiting')
             break
         else:
+            # sleep for a bit before checking again
             time.sleep(30)

diff --git a/rios/parallel/aws/README.md b/rios/parallel/aws/README.md
new file mode 100644
index 00000000..d4a2f2e7
--- /dev/null
+++ b/rios/parallel/aws/README.md
@@ -0,0 +1,94 @@
+# Using AWS Services for parallel processing in RIOS
+
+This directory holds implementations of per-tile parallel processing
+using AWS services. Currently only AWS Batch is supported, but it is
+intended that other services will be added in the future.
+
+Refer to jobmanager.py for an overview of how RIOS handles parallel processing.
+
+## AWS Batch
+
+### Creating the infrastructure
+
+This implementation comes with a CloudFormation script (`templates/batch.yaml`)
+to create a separate VPC with all the required infrastructure. It is recommended
+to use the script `templates/createbatch.py` for the creation or modification (via the `--modify`
+command line option) of this CloudFormation stack. There are also options for
+overriding some of the input parameters - see the output of `createbatch.py --help`
+for more information.
+
+When you have completed processing you can run `templates/deletebatch.py` to delete
+all the resources so you aren't paying for them.
+
+Note that both `createbatch.py` and `deletebatch.py` have a `--wait` option that causes the
+script to keep running until creation/deletion is complete.
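+
+For example, a typical session might look like the sketch below (run from the
+`templates` directory, assuming the default stack name and region; the comments
+mark where your own steps go):
+
+```
+./createbatch.py --wait     # create the stack, block until it is ready
+# ... build the Docker image and run your processing (see below) ...
+./deletebatch.py --wait     # tear the stack down, block until it is gone
+```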
+
+### Creating the Docker image
+
+AWS Batch requires you to provide a Docker image with the required software installed.
+A `Dockerfile` is provided for this, but it is recommended that you use the `Makefile`
+to build the image, as this handles the details of pulling the resource names out of the
+CloudFormation stack and creating a tar file of RIOS for copying into the Docker image.
+To build and push to ECR simply run:
+```
+make
+```
+
+By default this image includes GDAL, boto3 and RIOS.
+
+Normally your script will need extra packages to run. You can specify the names of Ubuntu packages
+to also install with the environment variable `EXTRA_PACKAGES` like this:
+```
+EXTRA_PACKAGES="python3-sklearn python3-skimage" make
+```
+
+You can also use the `PIP_PACKAGES` environment variable to set the name of any pip packages like this:
+```
+PIP_PACKAGES="pydantic python-dateutil" make
+```
+
+You can also specify both if needed:
+```
+EXTRA_PACKAGES="python3-sklearn python3-skimage" PIP_PACKAGES="pydantic python-dateutil" make
+```
+
+### Setting up your main script
+
+To enable parallel processing using AWS Batch in your RIOS script you must import the batch module:
+```
+from rios.parallel.aws import batch
+```
+
+Secondly, you must set up an [ApplierControls](https://www.rioshome.org/en/latest/rios_applier.html#rios.applier.ApplierControls)
+object and pass it to [apply](https://www.rioshome.org/en/latest/rios_applier.html#rios.applier.apply). On this
+object, you must make the following calls:
+```
+controls.setNumThreads(4) # or whatever number you want
+controls.setJobManagerType('AWSBatch')
+```
+
+Note that the number of AWS Batch jobs started will be (numThreads - 1), as one job is done by the main RIOS script.
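+
+Putting this together, a minimal main script might look like the sketch below.
+The filenames and the `doSomething` function are hypothetical - substitute your
+own. The `batch` import is required so that RIOS can find the 'AWSBatch' job
+manager type, even though the module is not referenced directly:
+
+```
+from rios import applier
+# importing this module makes the 'AWSBatch' job manager type available
+from rios.parallel.aws import batch
+
+def doSomething(info, inputs, outputs):
+    # called for each block of data, possibly on an AWS Batch worker
+    outputs.out = inputs.inp * 2
+
+infiles = applier.FilenameAssociations()
+infiles.inp = 'input.img'
+outfiles = applier.FilenameAssociations()
+outfiles.out = 'output.img'
+
+controls = applier.ApplierControls()
+controls.setNumThreads(4)  # 3 Batch jobs, plus the main script
+controls.setJobManagerType('AWSBatch')
+
+applier.apply(doSomething, infiles, outfiles, controls=controls)
+```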
+
+It is recommended that you run this main script within a container based on the one above. This reduces the likelihood
+of version mismatches (in Python or the other packages your script needs) between the main RIOS
+script and the AWS Batch workers.
+
+To do this, create a `Dockerfile` like the one below (replacing `myscript.py` with the name of your script):
+
+```
+# Created by make command above
+FROM rios:latest
+
+COPY myscript.py /usr/local/bin
+RUN chmod +x /usr/local/bin/myscript.py
+
+ENTRYPOINT ["/usr/bin/python3", "/usr/local/bin/myscript.py"]
+```
+
+Don't forget to pass your `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables to this
+container when it runs (these variables are set automatically when running as an AWS Batch job, but you'll
+need to set them otherwise).
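+
+For example, assuming the image above was built and tagged as `myscript`
+(a hypothetical tag), it could be run with something like:
+
+```
+docker run --rm \
+    -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \
+    -e AWS_DEFAULT_REGION=ap-southeast-2 \
+    myscript
+```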
+
+Needless to say, the account that this "main" script runs as should have sufficient permissions on the resources
+created by CloudFormation.
+

diff --git a/rios/parallel/aws/batch.py b/rios/parallel/aws/batch.py
index ec3f3c25..c76e9cb7 100644
--- a/rios/parallel/aws/batch.py
+++ b/rios/parallel/aws/batch.py
@@ -1,5 +1,7 @@
 """
-Module for implementing parallel with AWS Batch
+Module for implementing parallel processing with AWS Batch.
+
+See the class AWSBatch for the implementation.
 """

 import io
@@ -44,25 +46,35 @@ def getStackOutputs(stackName=DFLT_STACK_NAME, region=DFLT_REGION):

 class AWSBatch(jobmanager.JobManager):
+    """
+    Implementation of parallelisation via AWS Batch.
+    This uses 2 SQS queues for communication between the
+    'main' RIOS script and the subprocesses (which run on Batch)
+    and an S3 bucket to hold the pickled data (which the SQS
+    messages refer to).
+    """
     jobMgrType = 'AWSBatch'

     def __init__(self, numSubJobs, stackName=DFLT_STACK_NAME, region=DFLT_REGION):
         super().__init__(numSubJobs)
+        # get the outputs of the CloudFormation stack so we know
+        # what the resources are called.
         self.stackOutputs = getStackOutputs(stackName, region)
         self.batchClient = boto3.client('batch', region_name=region)
         self.s3Client = boto3.client('s3', region_name=region)
         self.sqsClient = boto3.client('sqs', region_name=region)

         # check they haven't asked for more jobs than we have batch instances
+        # minus one as one job is always done in this thread
         if numSubJobs - 1 > int(self.stackOutputs['BatchMaxJobs']):
             print('Number of threads greater than number of MaxJobs input to ' +
                 'CloudFormation. Consider increasing this number.')

         # start the required number of batch jobs running now
+        # minus one as one job is always done in this thread
         for n in range(numSubJobs - 1):
             self.batchClient.submit_job(jobName='RIOS_{}'.format(n),
-                #containerOverrides={'command': ['/usr/bin/python3', '/usr/local/bin/rios_subproc_awsbatch.py']},
                 jobQueue=self.stackOutputs['BatchProcessingJobQueueName'],
                 jobDefinition=self.stackOutputs['BatchProcessingJobDefinitionName'])
@@ -74,27 +86,35 @@ def startOneJob(self, userFunc, jobInfo):
         allInputs = (userFunc, jobInfo)
         allInputsPickled = cloudpickle.dumps(allInputs)
+        # save the pickled data in a file-like BytesIO object
         fileObj = io.BytesIO(allInputsPickled)

+        # create a unique filename based on the coords of the
+        # current block.
         s3Key = 'block_{}_{}_in.pkl'.format(jobInfo.info.xblock, jobInfo.info.yblock)
+        # upload this file to S3
         self.s3Client.upload_fileobj(fileObj, self.stackOutputs['BatchBucket'], s3Key)

+        # send the filename as a message to the Batch workers
         self.sqsClient.send_message(QueueUrl=self.stackOutputs['BatchInQueue'],
             MessageBody=s3Key)
-        print('sent', s3Key)
-
+        # return the block coords so we can look for this message later
         return (jobInfo.info.xblock, jobInfo.info.yblock)

     def waitOnJobs(self, jobIDlist):
         """
         Wait on all the jobs. Do nothing.
         """
-        print('wait on jobs')
+        pass

     def gatherAllOutputs(self, jobIDlist):
+        """
+        Gather all the results. Checks the output SQS Queue.
+        """
+        # the first one is done in this thread
         outputBlocksList = [jobIDlist[0]]
         # convert to a set so we can easily search for which blocks
         # are 'ours'. They should all be, but I'm just being paranoid
@@ -103,6 +123,7 @@ def gatherAllOutputs(self, jobIDlist):
             inBlocks.add((xblock, yblock))

         outputBlocksDict = {}
+        # look for all the blocks
         while len(outputBlocksDict) < len(jobIDlist) - 1:
             resp = self.sqsClient.receive_message(
                 QueueUrl=self.stackOutputs['BatchOutQueue'],
@@ -113,13 +134,13 @@
             for msg in resp['Messages']:
                 body = msg['Body']
                 receiptHandle = msg['ReceiptHandle']
+                # get the info out of the filename
                 bl, x, y, o = body.split('_')
-                print('received', body)
                 x = int(x)
                 y = int(y)
                 # one of ours?
                 if (x, y) in inBlocks:
-                    print('processing', body)
+                    # delete it so we don't see it again
                     self.sqsClient.delete_message(
                         QueueUrl=self.stackOutputs['BatchOutQueue'],
                         ReceiptHandle=receiptHandle)
@@ -130,8 +151,10 @@
                         self.stackOutputs['BatchBucket'], body, pickledOutput)
                     pickledOutput.seek(0)
                     outputObj = pickle.load(pickledOutput)
+                    # save the result
                     outputBlocksDict[(x, y)] = outputObj

+                    # delete the pkl from S3
                     self.s3Client.delete_object(
                         Bucket=self.stackOutputs['BatchBucket'], Key=body)

diff --git a/rios/parallel/aws/templates/Dockerfile b/rios/parallel/aws/templates/Dockerfile
index 3565850a..505f6368 100644
--- a/rios/parallel/aws/templates/Dockerfile
+++ b/rios/parallel/aws/templates/Dockerfile
@@ -1,6 +1,8 @@
+# Based on Ubuntu
 FROM ubuntu:22.04

+# These need to be passed in as --build-args
 ARG EXTRA_PACKAGES
 ARG PIP_PACKAGES
 ARG RIOS_VER
@@ -8,19 +10,23 @@ ARG RIOS_VER
 ARG DEBIAN_FRONTEND=noninteractive
 ENV PYTHONUNBUFFERED=1

-# Use Aussie mirrors
+# Use Aussie mirrors (TODO: a way of setting these?)
 RUN sed -i 's/http:\/\/archive./http:\/\/ap-southeast-2.ec2.archive./g' /etc/apt/sources.list
 RUN apt-get update
 RUN apt-get upgrade -y
+# install our prereqs, plus anything else the user has asked for
 RUN apt-get install -y python3-gdal python3-boto3 python3-pip $EXTRA_PACKAGES

+# Copy the RIOS source archive created by the Makefile
 COPY rios-$RIOS_VER.tar.gz /tmp
+# install RIOS
 RUN cd /tmp && tar xf rios-$RIOS_VER.tar.gz \
     && cd rios-$RIOS_VER \
     && pip install . \
     && cd .. && rm -rf rios-$RIOS_VER rios-$RIOS_VER.tar.gz

+# Set our subproc script for AWS Batch as the entrypoint.
 ENTRYPOINT ["/usr/bin/python3", "/usr/local/bin/rios_subproc_awsbatch.py"]

 # any pip packages?

diff --git a/rios/parallel/aws/templates/Makefile b/rios/parallel/aws/templates/Makefile
index 87423590..7a1e14da 100644
--- a/rios/parallel/aws/templates/Makefile
+++ b/rios/parallel/aws/templates/Makefile
@@ -1,4 +1,9 @@
-
+# A Makefile to create and push a Docker image with RIOS and any other
+# required packages to ECR.
+# To request other packages be installed into this image, set either or both of:
+# EXTRA_PACKAGES environment variable (for Ubuntu packages)
+# PIP_PACKAGES environment variable (for pip packages)
+# These accept a space separated list of package names
 ECR_URL := $(shell ./print_ecr_path.py --base)
 REPO := $(shell ./print_ecr_path.py):latest
@@ -6,10 +11,13 @@ RIOS_VER := $(shell python3 -c 'import rios;print(rios.RIOS_VERSION)')

 default: all

+# grab the current RIOS source tree and make it available to the
+# docker COPY command
 dist:
	cd ../../../..;python3 setup.py sdist --formats=gztar
	cp ../../../../dist/rios-$(RIOS_VER).tar.gz .
+# Login to ECR, build the image and push it to ECR
 all: dist
	aws ecr get-login-password --region ap-southeast-2 | docker login --username AWS --password-stdin $(ECR_URL)
	docker build --build-arg EXTRA_PACKAGES=${EXTRA_PACKAGES} --build-arg=PIP_PACKAGES=${PIP_PACKAGES} --build-arg RIOS_VER=$(RIOS_VER) -t rios .

diff --git a/rios/parallel/aws/templates/batch.yaml b/rios/parallel/aws/templates/batch.yaml
index ebea1361..72af0e56 100644
--- a/rios/parallel/aws/templates/batch.yaml
+++ b/rios/parallel/aws/templates/batch.yaml
@@ -1,6 +1,14 @@
 ---
+# Create all the infrastructure for AWS Batch with RIOS.
+# This includes a temporary S3 bucket for storing the pickled data files,
+# an ECR repository for the Docker image that is used for processing, and
+# SQS queues for communicating between the main script and the workers.
+# Use the script createbatch.py for creation and modification,
+# and deletebatch.py for deletion.
 AWSTemplateFormatVersion: '2010-09-09'
 Description: 'RIOS with AWS Batch using CloudFormation'
+# These values can be altered here or overridden with the params
+# for createbatch.py.
 Parameters:
   ServiceName:
     Type: String
@@ -25,6 +33,10 @@ Parameters:
     Default: 32

 Resources:
+  # Create our own VPC for resources so we are separate
+  # from whatever else the running account has,
+  # and we can easily determine the output names of resources
+  # from CloudFormation.
   BatchVPC:
     Type: AWS::EC2::VPC
     Properties:
@@ -32,6 +44,7 @@
       # Below needed for Batch it seems
       EnableDnsSupport: 'true'
       EnableDnsHostnames: 'true'
+  # Create a subnet for each availability zone
   BatchSubnet1:
     Type: AWS::EC2::Subnet
     Properties:
@@ -53,6 +66,7 @@
         Ref: BatchVPC
       CidrBlock: 10.0.2.0/24
       AvailabilityZone: !Ref AZ3
+  # A security group for everything to run as
   BatchSecurityGroup:
     Type: AWS::EC2::SecurityGroup
     Properties:
@@ -61,6 +75,10 @@
       VpcId: !Ref BatchVPC

   # Jobs must have internet connectivity otherwise they won't run...
+  # They need to talk to ECS/S3/CloudWatch. This could possibly be
+  # accomplished with VPC Endpoints instead...
+  # Leaving this using an internet gateway so functions can access
+  # any other resources that are on the internet.
   RouteTable:
     Type: "AWS::EC2::RouteTable"
     Properties:
@@ -78,6 +96,7 @@
       DestinationCidrBlock: "0.0.0.0/0"
       GatewayId: !Ref InternetGateway
       RouteTableId: !Ref RouteTable
+  # Associate the route table with each subnet
   Subnet1RouteTableAssociation:
     Type: "AWS::EC2::SubnetRouteTableAssociation"
     Properties:
@@ -94,11 +113,13 @@
       RouteTableId: !Ref RouteTable
       SubnetId: !Ref BatchSubnet3

+  # Create an ECR repository to hold the image that contains RIOS
+  # (and any other packages the function needs).
+  # Expires older untagged images.
   BatchRepository:
     Type: AWS::ECR::Repository
     Properties:
       RepositoryName: !Join ['', [!Ref ServiceName, "ecr"]]
-
       LifecyclePolicy:
         LifecyclePolicyText: |
           {
@@ -118,22 +139,29 @@
             }
           ]
           }
-
+
+  # An input queue for passing information from the main RIOS
+  # script to the Batch workers
   BatchInQueue:
     Type: AWS::SQS::Queue
     Properties:
       QueueName: !Join ['', [!Ref ServiceName, InQueue]]
+  # An output queue for passing information from the Batch
+  # workers back to the main RIOS script.
   BatchOutQueue:
     Type: AWS::SQS::Queue
     Properties:
       QueueName: !Join ['', [!Ref ServiceName, OutQueue]]
+  # A bucket for holding the pickled information for each tile
+  # that is referred to in the SQS messages.
   BatchBucket:
     Type: AWS::S3::Bucket
     Properties:
       BucketName: !Join ['', [!Ref ServiceName, bucket]]

+  # Ensure the workers have enough access to the S3 bucket
   AccessS3ManagedPolicy:
     Type: AWS::IAM::ManagedPolicy
     Properties:
@@ -148,10 +176,12 @@
             - !Join ['/', [!GetAtt BatchBucket.Arn, '*']]
           - Effect: Allow
             Action:
+              # we use GetBucketLocation to determine which region we are in
               - 's3:GetBucketLocation'
             Resource:
               - !GetAtt BatchBucket.Arn
-
+
+  # Ensure workers can access the queues
   AccessQueuesManagedPolicy:
     Type: AWS::IAM::ManagedPolicy
     Properties:
@@ -169,6 +199,7 @@
             - !GetAtt BatchInQueue.Arn
             - !GetAtt BatchOutQueue.Arn

+  # Needed by AWS Batch.
   BatchServiceRole:
     Type: AWS::IAM::Role
     Properties:
@@ -186,6 +217,8 @@
     Properties:
       Roles:
         - Ref: EcsInstanceRole
+  # This is the user that the batch workers run as.
+  # Ensure we attach all the permissions it will need.
   EcsInstanceRole:
     Type: AWS::IAM::Role
     Properties:
@@ -203,6 +236,8 @@
         - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
         - !Ref AccessS3ManagedPolicy
        - !Ref AccessQueuesManagedPolicy
+  # The worker job. Set the S3 bucket and SQS queue info in the
+  # environment.
   BatchProcessingJobDefinition:
     Type: AWS::Batch::JobDefinition
     Properties:
@@ -219,6 +254,7 @@
             Value: !Ref BatchInQueue
           - Name: "RIOSOutQueue"
             Value: !Ref BatchOutQueue
+  # Our job queue
   BatchProcessingJobQueue:
     Type: AWS::Batch::JobQueue
     Properties:
@@ -228,6 +264,7 @@
         - Order: 1
           ComputeEnvironment:
             Ref: ComputeEnvironment
+  # Compute Environment - set subnets and security group etc.
   ComputeEnvironment:
     Type: AWS::Batch::ComputeEnvironment
     Properties:
@@ -249,6 +286,8 @@
       ServiceRole:
         Ref: BatchServiceRole

+# Outputs that the main script can query to find
+# the names and paths of things.
 Outputs:
   VPC:
     Value:

diff --git a/rios/parallel/aws/templates/createbatch.py b/rios/parallel/aws/templates/createbatch.py
index bba1f375..f455b609 100755
--- a/rios/parallel/aws/templates/createbatch.py
+++ b/rios/parallel/aws/templates/createbatch.py
@@ -3,6 +3,12 @@
 """
 Helper script to create the RIOS Batch CloudFormation.
 Optionally wait until stack is created before returning.
+
+The --modify command line argument allows modification of
+an existing stack.
+
+All the other parameters for the CloudFormation stack can
+be altered with the command line parameters.
 """

 import time
@@ -77,6 +83,7 @@ def createBatch(stackname, region, azs, ecrName, vCPUs, maxMem, maxJobs,
     Do the work of creating the CloudFormation Stack
     """

+    # set overridden stack parameters
     params = []
     if azs is not None:
         addParam(params, 'AZ1', azs[0])
@@ -100,12 +107,14 @@ def createBatch(stackname, region, azs, ecrName, vCPUs, maxMem, maxJobs,
     client = boto3.client('cloudformation', region_name=region)
     if modify:
+        # modify the existing stack
         resp = client.update_stack(StackName=stackname,
             TemplateBody=body, Capabilities=['CAPABILITY_IAM'],
             Parameters=params)
         inProgressStatus = 'UPDATE_IN_PROGRESS'
     else:
+        # create a new stack
         resp = client.create_stack(StackName=stackname,
             TemplateBody=body, Capabilities=['CAPABILITY_IAM'],
             Parameters=params)
@@ -115,6 +124,7 @@
     print('StackId: {}'.format(stackId))

     status = None
+    # If they asked to wait, loop calling describe_stacks
     if wait:
         while True:
             time.sleep(30)