From 46ad8f23b902265a404da2c0ab866de3738ecf77 Mon Sep 17 00:00:00 2001 From: Benjamin Moody Date: Mon, 23 Oct 2023 11:59:29 -0400 Subject: [PATCH] [WIP] user.awsverification: new module. This module contains functions for authenticating a person's AWS identity (account ID and user ID) by means of signed URLs. Amazon S3 authenticates clients using a per-request "signature" that incorporates the request path and headers together with a secret key held by the client. This means that the client can pre-compute this signature and send it to someone else, allowing the recipient to perform that request on that client's behalf, without revealing the secret key itself. We can arrange to create an S3 URL that can only be accessed by a particular AWS identity, and then ask someone to pre-compute the signature that they would use to access that resource (which they can do using the AWS CLI or other S3-compatible tools and libraries.) If we then submit that signature to S3 and it succeeds, we know that the requester holds the secret key for that identity. In fact, the resource in question doesn't need to actually exist, as long as we can tell the difference between an unauthorized request (HTTP 403) and an authorized request for something that doesn't exist (HTTP 404). --- physionet-django/physionet/settings/base.py | 4 + physionet-django/user/awsverification.py | 299 ++++++++++++++++++++ 2 files changed, 303 insertions(+) create mode 100644 physionet-django/user/awsverification.py diff --git a/physionet-django/physionet/settings/base.py b/physionet-django/physionet/settings/base.py index ca7cb457f7..65d387d714 100644 --- a/physionet-django/physionet/settings/base.py +++ b/physionet-django/physionet/settings/base.py @@ -232,6 +232,10 @@ AWS_HEADER_VALUE2 = config('AWS_VALUE2', default=False) AWS_CLOUD_FORMATION = config('AWS_CLOUD_FORMATION', default=False) +# User verification bucket (see user/awsverification.py) +AWS_VERIFICATION_BUCKET_NAME = config('AWS_VERIFICATION_BUCKET_NAME', default=None) +AWS_VERIFICATION_BUCKET_REGION = config('AWS_VERIFICATION_BUCKET_REGION', default=None) + # Tags for the DataCite API used for DOI DATACITE_API_URL = config('DATACITE_API_URL', default='https://api.test.datacite.org/dois') DATACITE_PREFIX = config('DATACITE_PREFIX', default='') diff --git a/physionet-django/user/awsverification.py b/physionet-django/user/awsverification.py new file mode 100644 index 0000000000..716165430d --- /dev/null +++ b/physionet-django/user/awsverification.py @@ -0,0 +1,299 @@ +import json +import re +import urllib.parse + +import boto3 +from django.conf import settings +import requests + + +def aws_verification_available(): + """ + Check whether the site is configured for AWS account authentication. + """ + return bool(settings.AWS_VERIFICATION_BUCKET_NAME + and settings.AWS_VERIFICATION_BUCKET_REGION) + + +def get_aws_verification_key(site_domain, user_email, aws_account, aws_userid): + """ + Generate an S3 key used to authenticate an AWS user. + + This is a string that must be signed by the user, and then + verified by Amazon S3, to verify the user's credentials. + """ + quoted_email = urllib.parse.quote(user_email, safe='@') + return (f'{site_domain}-verification/' + f'account={aws_account}/' + f'userid={aws_userid}/' + f'email={quoted_email}/') + + +def parse_aws_verification_key(site_domain, user_email, key): + """ + Extract user information from a verification key. + """ + unquoted_key = urllib.parse.unquote(key) + match = re.fullmatch(r'(?P[^/]+)-verification/' + r'account=(?P[^/]+)/' + r'userid=(?P[^/]+)/' + r'email=(?P[^/]+)/', unquoted_key) + if (not match + or match.group('site') != site_domain + or match.group('email') != user_email): + raise InvalidVerificationKey(key) + return { + 'account': match.group('account'), + 'userid': match.group('userid'), + } + + +def get_aws_verification_command(site_domain, user_email, + aws_account, aws_userid): + """ + Generate a shell command used to authenticate an AWS user. + + After the user enters their account ID and user ID, they will be + asked to run this command and copy its output into the form. The + output of the command is a signed URL: it proves that the person + who generated it has appropriate AWS credentials, without + revealing the person's secret key. + """ + bucket = settings.AWS_VERIFICATION_BUCKET_NAME + region = settings.AWS_VERIFICATION_BUCKET_REGION + if not bucket or not region: + raise AWSVerificationNotConfigured + + key = get_aws_verification_key(site_domain, user_email, + aws_account, aws_userid) + return f'aws s3 presign s3://{bucket}/{key} --region {region}' + + +def check_aws_verification_url(site_domain, user_email, signed_url): + """ + Verify a signed URL to determine a user's AWS identity. + + To verify their AWS identity, the user is asked to generate a + specific signed URL. If the URL is correct and valid, this + function returns a dictionary containing the person's verified + account ID and user ID. + + For this to work, the verification bucket must be configured by + calling configure_aws_verification_bucket(). + """ + bucket = settings.AWS_VERIFICATION_BUCKET_NAME + region = settings.AWS_VERIFICATION_BUCKET_REGION + if not bucket or not region: + raise AWSVerificationNotConfigured + + try: + unsigned_url, query = signed_url.split('?') + query_dict = urllib.parse.parse_qs(query) + except ValueError: + raise InvalidSignedURL(signed_url) + + # Check whether this appears to be an AWS signed URL (either old + # or new format.) + query_keys = set(query_dict.keys()) + if query_keys >= {'X-Amz-Algorithm', 'X-Amz-Credential', + 'X-Amz-Date', 'X-Amz-Expires', + 'X-Amz-SignedHeaders', 'X-Amz-Signature'}: + pass + elif query_keys >= {'AWSAccessKeyId', 'Signature', 'Expires'}: + pass + else: + raise InvalidSignedURL(signed_url) + + # Check whether the URL corresponds to the correct bucket name. + # Any of these base URLs might be used depending on the region and + # the client configuration. + base_urls = [ + f'https://{bucket}.s3.{region}.amazonaws.com/', + f'https://s3.{region}.amazonaws.com/{bucket}/', + f'https://{bucket}.s3.amazonaws.com/', + f'https://s3.amazonaws.com/{bucket}/', + ] + for base_url in base_urls: + if unsigned_url.startswith(base_url): + key = unsigned_url[len(base_url):] + break + else: + raise InvalidS3Hostname(signed_url) + + # Parse the path and extract account info. + + account_info = parse_aws_verification_key(site_domain, user_email, key) + + # Finally, verify the signature. + + with requests.Session() as session: + # If the signature is correct, and the account and userid are + # correct as determined by the bucket policy, then S3 should + # return a 404 response (because the resource doesn't, in + # fact, exist.) + response = session.get(signed_url) + if response.status_code != 404: + raise InvalidAWSSignature(signed_url, response) + + # As a sanity check, verify that S3 returns a 403 response if + # the AWS signature is missing. + response = session.get(unsigned_url) + if response.status_code != 403: + raise BadBucketPolicy(unsigned_url, response) + + # FIXME: sanity-check the bucket ownership and/or access + # policy. Note we can't attach an X-Amz-Expected-Bucket-Owner + # header to a pre-signed URL (that header would have to be + # included in the signature, and I don't think there's a way + # to do that with awscli.) + + return account_info + + +class AWSVerificationFailed(Exception): + """Generic exception used if AWS user cannot be verified.""" + + +class AWSVerificationNotConfigured(AWSVerificationFailed): + """Required settings for AWS verification are not defined.""" + + +class InvalidSignedURL(AWSVerificationFailed): + """Client-supplied URL does not appear to be an AWS signed URL.""" + + +class InvalidS3Hostname(AWSVerificationFailed): + """Client-supplied URL does not match expected S3 hostname.""" + + +class InvalidVerificationKey(AWSVerificationFailed): + """Client-supplied URL does not match expected verification key.""" + + +class InvalidAWSSignature(AWSVerificationFailed): + """Client-supplied URL cannot be verified by AWS.""" + + +class BadBucketPolicy(AWSVerificationFailed): + """Verification bucket is not correctly configured.""" + + +def configure_aws_verification_bucket(bucket_name, bucket_region): + """ + Configure an S3 bucket to be used for identity verification. + """ + s3 = boto3.client('s3', region_name=bucket_region) + try: + s3.create_bucket(Bucket=bucket_name) + except s3.exceptions.BucketAlreadyOwnedByYou: + pass + + s3.put_public_access_block( + Bucket=bucket_name, + PublicAccessBlockConfiguration={ + "BlockPublicAcls": False, + "IgnorePublicAcls": False, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": False, + }, + ) + + policy = json.dumps({ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": "*", + "Action": "s3:GetObject", + "Resource": (f"arn:aws:s3:::{bucket_name}/" + + "*-verification/" + + "account=${aws:PrincipalAccount}/" + + "userid=${aws:userid}/" + + "email=*/"), + }, + { + "Effect": "Allow", + "Principal": "*", + "Action": "s3:ListBucket", + "Resource": f"arn:aws:s3:::{bucket_name}", + "Condition": { + "StringLike": { + "s3:prefix": ("*-verification/" + + "account=${aws:PrincipalAccount}/" + + "userid=${aws:userid}/" + + "email=*/"), + }, + }, + }, + ], + }) + + s3.put_bucket_policy(Bucket=bucket_name, Policy=policy) + + +def test_aws_verification_bucket(bucket_name, bucket_region): + """ + Test functionality of an identity verification bucket. + """ + s3 = boto3.client('s3', region_name=bucket_region) + sts = boto3.client('sts') + + identity = sts.get_caller_identity() + aws_account = identity['Account'] + aws_userid = identity['UserId'] + + def assert_response(url, expected_status): + response = requests.get(url, headers=headers) + if response.status_code != expected_status: + raise Exception( + f"Expected {expected_status} for {url}, got instead:\n" + f" {response.status_code} {response.reason}\n\n" + f" {response.content}\n" + ) + + def tweak(string): + return string.translate({ord(i): ord(j) for i, j in zip( + '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+/', + '1032547698BADCFEHGJILKNMPORQTSVUXWZYbadcfehgjilknmporqtsvuxwzy/+' + )}) + + site_domain = 'physionet.org' + user_email = 'root@example.com' + + # Correct signed URL should give a 404 + signed_url = s3.generate_presigned_url('get_object', Params={ + 'Bucket': bucket_name, + 'Key': get_aws_verification_key(site_domain, user_email, + aws_account, aws_userid), + }) + assert_response(signed_url, 404) + + # URL without signature should give a 403 + unsigned_url, query = signed_url.split('?') + assert_response(unsigned_url, 403) + + # Wrong signature should give a 403 + query_dict = dict(urllib.parse.parse_qsl(query)) + for key in ('Signature', 'X-Amz-Signature'): + if key in query_dict: + query_dict[key] = tweak(query_dict[key]) + wrong_url = unsigned_url + '?' + urllib.urlencode(query_dict) + assert_response(wrong_url, 403) + + # Signed URL with wrong account ID should give a 403 + wrong_account = tweak(aws_account) + wrong_url = s3.generate_presigned_url('get_object', Params={ + 'Bucket': bucket_name, + 'Key': get_aws_verification_key(site_domain, user_email, + wrong_account, aws_userid), + }) + assert_response(wrong_url, 403) + + # Signed URL with wrong user ID should give a 403 + wrong_userid = tweak(aws_userid) + wrong_url = s3.generate_presigned_url('get_object', Params={ + 'Bucket': bucket_name, + 'Key': get_aws_verification_key(site_domain, user_email, + aws_account, wrong_userid) + }) + assert_response(wrong_url, 403)