Skip to content

Commit

Permalink
[WIP] user.awsverification: new module.
Browse files Browse the repository at this point in the history
This module contains functions for authenticating a person's AWS
identity (account ID and user ID) by means of signed URLs.

Amazon S3 authenticates clients using a per-request "signature" that
incorporates the request path and headers together with a secret key
held by the client.  This means that the client can pre-compute this
signature and send it to someone else, allowing the recipient to
perform that request on that client's behalf, without revealing the
secret key itself.

We can arrange to create an S3 URL that can only be accessed by a
particular AWS identity, and then ask someone to pre-compute the
signature that they would use to access that resource (which they can
do using the AWS CLI or other S3-compatible tools and libraries).

If we then submit that signature to S3 and it succeeds, we know that
the requester holds the secret key for that identity.  In fact, the
resource in question doesn't need to actually exist, as long as we can
tell the difference between an unauthorized request (HTTP 403) and an
authorized request for something that doesn't exist (HTTP 404).
  • Loading branch information
Benjamin Moody committed Oct 30, 2023
1 parent 4a20ba9 commit ee7a7b4
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 0 deletions.
5 changes: 5 additions & 0 deletions physionet-django/physionet/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@
AWS_HEADER_VALUE2 = config('AWS_VALUE2', default=False)
AWS_CLOUD_FORMATION = config('AWS_CLOUD_FORMATION', default=False)

# User verification bucket (see user/awsverification.py)
# All three settings default to None; user/awsverification.py treats the
# feature as unavailable unless every one of them is set.
# Name of the S3 bucket that users' signed verification URLs must point to.
AWS_VERIFICATION_BUCKET_NAME = config('AWS_VERIFICATION_BUCKET_NAME', default=None)
# Region of that bucket; used in the expected S3 hostnames and passed as
# --region in the "aws s3 presign" command shown to the user.
AWS_VERIFICATION_BUCKET_REGION = config('AWS_VERIFICATION_BUCKET_REGION', default=None)
# Expected owner of the bucket; sent to S3 in the
# X-Amz-Expected-Bucket-Owner request header during verification.
AWS_VERIFICATION_BUCKET_OWNER = config('AWS_VERIFICATION_BUCKET_OWNER', default=None)

# Tags for the DataCite API used for DOI
DATACITE_API_URL = config('DATACITE_API_URL', default='https://api.test.datacite.org/dois')
DATACITE_PREFIX = config('DATACITE_PREFIX', default='')
Expand Down
302 changes: 302 additions & 0 deletions physionet-django/user/awsverification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
import json
import re
import shlex
import urllib.parse

import boto3
from django.conf import settings
import requests


def aws_verification_available():
    """
    Report whether AWS identity verification is usable on this site.

    Returns True only when the verification bucket name, region, and
    owner settings are all set to non-empty values.
    """
    # Guard clauses keep the same evaluation order as a chained "and":
    # a later setting is only consulted when the earlier ones are set.
    if not settings.AWS_VERIFICATION_BUCKET_NAME:
        return False
    if not settings.AWS_VERIFICATION_BUCKET_REGION:
        return False
    return bool(settings.AWS_VERIFICATION_BUCKET_OWNER)


def get_aws_verification_key(site_domain, user_email, aws_account, aws_userid):
    """
    Build the S3 object key that a user signs to prove their AWS identity.

    The key has the form
    ``<site_domain>-verification/account=<aws_account>/userid=<aws_userid>/email=<email>/``
    where the email address is percent-encoded (leaving '@' intact).
    The user signs a request for this key, and Amazon S3 then verifies
    that signature to confirm the user's credentials.
    """
    email_component = urllib.parse.quote(user_email, safe='@')
    components = (
        f'{site_domain}-verification',
        f'account={aws_account}',
        f'userid={aws_userid}',
        f'email={email_component}',
        '',  # empty final component yields the trailing slash
    )
    return '/'.join(components)


def parse_aws_verification_key(site_domain, user_email, key):
    """
    Recover the AWS account ID and user ID from a verification key.

    The key is percent-decoded and must have exactly the layout
    ``<site>-verification/account=<account>/userid=<userid>/email=<email>/``,
    with ``<site>`` equal to site_domain and ``<email>`` equal to
    user_email.

    Returns a dictionary with the keys 'account' and 'userid'.
    Raises InvalidVerificationKey if the key does not have the expected
    layout or names a different site or email address.
    """
    pattern = (r'(?P<site>[^/]+)-verification/'
               r'account=(?P<account>[^/]+)/'
               r'userid=(?P<userid>[^/]+)/'
               r'email=(?P<email>[^/]+)/')
    parsed = re.fullmatch(pattern, urllib.parse.unquote(key))
    if parsed is None:
        raise InvalidVerificationKey(key)

    fields = parsed.groupdict()
    if fields['site'] != site_domain or fields['email'] != user_email:
        raise InvalidVerificationKey(key)

    return {
        'account': fields['account'],
        'userid': fields['userid'],
    }


def get_aws_verification_command(site_domain, user_email,
                                 aws_account, aws_userid):
    """
    Generate a shell command used to authenticate an AWS user.

    After the user enters their account ID and user ID, they will be
    asked to run this command and copy its output into the form.  The
    output of the command is a signed URL: it proves that the person
    who generated it has appropriate AWS credentials, without
    revealing the person's secret key.

    Returns the command line as a string.  The S3 URI and the region
    are shell-quoted, so a value containing shell metacharacters cannot
    change the meaning of the command the user is asked to run.

    Raises AWSVerificationNotConfigured if the verification bucket name
    or region is not set.
    """
    bucket = settings.AWS_VERIFICATION_BUCKET_NAME
    region = settings.AWS_VERIFICATION_BUCKET_REGION
    if not bucket or not region:
        raise AWSVerificationNotConfigured

    key = get_aws_verification_key(site_domain, user_email,
                                   aws_account, aws_userid)

    # The account ID and user ID come from user input and are embedded
    # verbatim in the key.  shlex.quote() leaves ordinary values
    # untouched and wraps anything unusual in single quotes.
    s3_uri = shlex.quote(f's3://{bucket}/{key}')
    quoted_region = shlex.quote(str(region))
    return f'aws s3 presign {s3_uri} --region {quoted_region}'


def check_aws_verification_url(site_domain, user_email, signed_url,
                               timeout=30):
    """
    Verify a signed URL to determine a user's AWS identity.

    To verify their AWS identity, the user is asked to generate a
    specific signed URL.  If the URL is correct and valid, this
    function returns a dictionary containing the person's verified
    account ID ('account') and user ID ('userid').

    For this to work, the verification bucket must be configured by
    calling configure_aws_verification_bucket().

    The timeout argument (in seconds) is applied to each HTTP request
    sent to S3, so that a stalled connection cannot block the caller
    indefinitely.

    Raises:
        AWSVerificationNotConfigured: required settings are missing.
        InvalidSignedURL: signed_url is not of the form "<url>?<query>"
            or lacks the query parameters of an AWS signed URL.
        InvalidS3Hostname: signed_url does not point to the
            verification bucket.
        InvalidVerificationKey: the path does not match the expected
            key for site_domain and user_email.
        InvalidAWSSignature: S3 did not answer the signed request with
            a 404 response.
        BadBucketPolicy: S3 did not answer the unsigned request with a
            403 response.

    Network errors raised by the requests library (including timeouts)
    are not caught here.
    """
    bucket = settings.AWS_VERIFICATION_BUCKET_NAME
    region = settings.AWS_VERIFICATION_BUCKET_REGION
    owner = settings.AWS_VERIFICATION_BUCKET_OWNER
    if not bucket or not region or not owner:
        raise AWSVerificationNotConfigured

    # The URL must contain exactly one '?'; anything else fails to
    # unpack and is rejected.
    try:
        unsigned_url, query = signed_url.split('?')
        query_dict = urllib.parse.parse_qs(query)
    except ValueError as err:
        raise InvalidSignedURL(signed_url) from err

    # Check whether this appears to be an AWS signed URL (either old
    # or new format).
    new_format_params = {'X-Amz-Algorithm', 'X-Amz-Credential',
                         'X-Amz-Date', 'X-Amz-Expires',
                         'X-Amz-SignedHeaders', 'X-Amz-Signature'}
    old_format_params = {'AWSAccessKeyId', 'Signature', 'Expires'}
    query_keys = set(query_dict)
    if not (query_keys >= new_format_params
            or query_keys >= old_format_params):
        raise InvalidSignedURL(signed_url)

    # Check whether the URL corresponds to the correct bucket name.
    # Any of these base URLs might be used depending on the region and
    # the client configuration.  Each ends with '/', so a matching URL
    # cannot refer to a different host or bucket.
    base_urls = [
        f'https://{bucket}.s3.{region}.amazonaws.com/',
        f'https://s3.{region}.amazonaws.com/{bucket}/',
        f'https://{bucket}.s3.amazonaws.com/',
        f'https://s3.amazonaws.com/{bucket}/',
    ]
    for base_url in base_urls:
        if unsigned_url.startswith(base_url):
            key = unsigned_url[len(base_url):]
            break
    else:
        raise InvalidS3Hostname(signed_url)

    # Parse the path and extract account info.
    account_info = parse_aws_verification_key(site_domain, user_email, key)

    # Finally, verify the signature.
    with requests.Session() as session:
        headers = {'X-Amz-Expected-Bucket-Owner': owner}

        # If the signature is correct, and the account and userid are
        # correct as determined by the bucket policy, then S3 should
        # return a 404 response (because the resource doesn't, in
        # fact, exist.)
        response = session.get(signed_url, headers=headers,
                               timeout=timeout)
        if response.status_code != 404:
            raise InvalidAWSSignature(signed_url, response)

        # As a sanity check, verify that S3 returns a 403 response if
        # the AWS signature is missing.
        response = session.get(unsigned_url, headers=headers,
                               timeout=timeout)
        if response.status_code != 403:
            raise BadBucketPolicy(unsigned_url, response)

    return account_info


class AWSVerificationFailed(Exception):
    """
    Generic exception used if AWS user cannot be verified.

    Base class of every verification error defined in this module, so
    callers can catch this one type to handle any of them.
    """


class AWSVerificationNotConfigured(AWSVerificationFailed):
    """
    Required settings for AWS verification are not defined.

    Raised when one of the AWS_VERIFICATION_BUCKET_* settings needed
    by the called function is empty.
    """


class InvalidSignedURL(AWSVerificationFailed):
    """
    Client-supplied URL does not appear to be an AWS signed URL.

    Raised with the offending URL as its argument.
    """


class InvalidS3Hostname(AWSVerificationFailed):
    """
    Client-supplied URL does not match expected S3 hostname.

    Raised with the offending URL as its argument when the URL does not
    start with any of the accepted base URLs of the verification bucket.
    """


class InvalidVerificationKey(AWSVerificationFailed):
    """
    Client-supplied URL does not match expected verification key.

    Raised with the key (the URL path inside the bucket) as its
    argument.
    """


class InvalidAWSSignature(AWSVerificationFailed):
    """
    Client-supplied URL cannot be verified by AWS.

    Raised with the signed URL and the HTTP response as arguments when
    S3 does not answer the signed request with a 404 status.
    """


class BadBucketPolicy(AWSVerificationFailed):
    """
    Verification bucket is not correctly configured.

    Raised with the unsigned URL and the HTTP response as arguments
    when S3 does not answer an unsigned request with a 403 status.
    """


def configure_aws_verification_bucket(bucket_name, bucket_region):
    """
    Configure an S3 bucket to be used for identity verification.

    Creates the bucket if it does not already exist, disables the
    bucket's public access blocks, and installs a bucket policy under
    which any AWS principal may request only the verification keys that
    embed that principal's own account ID and user ID.

    bucket_name is the name of the bucket to create or reconfigure;
    bucket_region is the region passed to the S3 client.  The caller's
    default boto3 credentials are used.
    """
    s3 = boto3.client('s3', region_name=bucket_region)

    # Outside us-east-1, S3 rejects a CreateBucket request that does
    # not name the region as its location constraint; us-east-1 itself
    # must not be given as a constraint.
    create_args = {'Bucket': bucket_name}
    client_region = s3.meta.region_name
    if client_region and client_region != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {
            'LocationConstraint': client_region,
        }
    try:
        s3.create_bucket(**create_args)
    except s3.exceptions.BucketAlreadyOwnedByYou:
        # The bucket already exists in this account; just (re)apply
        # the configuration below.
        pass

    # The policy below grants access to Principal "*", which S3 would
    # refuse or ignore while the public access blocks are enabled.
    s3.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            "BlockPublicAcls": False,
            "IgnorePublicAcls": False,
            "BlockPublicPolicy": False,
            "RestrictPublicBuckets": False,
        },
    )

    # Key pattern shared by both statements.  The ${aws:...} policy
    # variables are substituted by AWS with the identity of whoever
    # signed the request, so each principal matches only its own keys.
    key_pattern = ("*-verification/"
                   "account=${aws:PrincipalAccount}/"
                   "userid=${aws:userid}/"
                   "email=*/")

    policy = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": "*",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket_name}/" + key_pattern,
            },
            {
                # With s3:ListBucket permission, a request for a
                # missing object yields 404 rather than 403, which is
                # what check_aws_verification_url() relies on.
                "Effect": "Allow",
                "Principal": "*",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket_name}",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": key_pattern,
                    },
                },
            },
        ],
    })

    s3.put_bucket_policy(Bucket=bucket_name, Policy=policy)


def test_aws_verification_bucket(bucket_name, bucket_region, bucket_owner):
    """
    Test functionality of an identity verification bucket.

    Using the caller's own default AWS credentials, this generates a
    series of signed and unsigned URLs for the bucket and checks that
    S3 answers each one with the status code that
    check_aws_verification_url() depends on: 404 for a correctly signed
    request for the caller's own verification key, and 403 for every
    variation that should be rejected.

    bucket_name and bucket_region identify the bucket; bucket_owner is
    the value sent in the X-Amz-Expected-Bucket-Owner header.

    Raises Exception, describing the request and the response, at the
    first check that returns an unexpected status code.
    """
    s3 = boto3.client('s3', region_name=bucket_region)
    sts = boto3.client('sts')

    identity = sts.get_caller_identity()
    aws_account = identity['Account']
    aws_userid = identity['UserId']

    def assert_response(url, expected_owner, expected_status):
        headers = {'X-Amz-Expected-Bucket-Owner': expected_owner}
        # A timeout keeps a stalled connection from hanging the test.
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code != expected_status:
            raise Exception(
                f"Expected {expected_status} for {url}, got instead:\n"
                f"  {response.status_code} {response.reason}\n\n"
                f"  {response.content}\n"
            )

    # Swap each character with its neighbour (0<->1, A<->B, +<->/, ...)
    # so that every digit and letter of the input changes while the
    # length and character class stay the same.
    tweak_table = str.maketrans(
        '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+/',
        '1032547698BADCFEHGJILKNMPORQTSVUXWZYbadcfehgjilknmporqtsvuxwzy/+',
    )

    def tweak(string):
        return string.translate(tweak_table)

    # Placeholder values: they only need to produce a well-formed key.
    site_domain = 'physionet.org'
    user_email = 'user@example.com'

    # Correct signed URL should give a 404
    signed_url = s3.generate_presigned_url('get_object', Params={
        'Bucket': bucket_name,
        'Key': get_aws_verification_key(site_domain, user_email,
                                        aws_account, aws_userid),
    })
    assert_response(signed_url, bucket_owner, 404)

    # URL without signature should give a 403
    unsigned_url, query = signed_url.split('?')
    assert_response(unsigned_url, bucket_owner, 403)

    # Wrong bucket owner should give a 403
    wrong_owner = tweak(bucket_owner)
    assert_response(signed_url, wrong_owner, 403)

    # Wrong signature should give a 403
    query_dict = dict(urllib.parse.parse_qsl(query))
    for key in ('Signature', 'X-Amz-Signature'):
        if key in query_dict:
            query_dict[key] = tweak(query_dict[key])
    # urlencode lives in urllib.parse; there is no urllib.urlencode in
    # Python 3.
    wrong_url = unsigned_url + '?' + urllib.parse.urlencode(query_dict)
    assert_response(wrong_url, bucket_owner, 403)

    # Signed URL with wrong account ID should give a 403
    wrong_account = tweak(aws_account)
    wrong_url = s3.generate_presigned_url('get_object', Params={
        'Bucket': bucket_name,
        'Key': get_aws_verification_key(site_domain, user_email,
                                        wrong_account, aws_userid),
    })
    assert_response(wrong_url, bucket_owner, 403)

    # Signed URL with wrong user ID should give a 403
    wrong_userid = tweak(aws_userid)
    wrong_url = s3.generate_presigned_url('get_object', Params={
        'Bucket': bucket_name,
        'Key': get_aws_verification_key(site_domain, user_email,
                                        aws_account, wrong_userid),
    })
    assert_response(wrong_url, bucket_owner, 403)

0 comments on commit ee7a7b4

Please sign in to comment.