Skip to content

Commit

Permalink
[WIP] Add test cases for AWS identity verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Moody committed Nov 7, 2023
1 parent 51b4b91 commit cab730e
Showing 1 changed file with 167 additions and 2 deletions.
169 changes: 167 additions & 2 deletions physionet-django/user/test_views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
import datetime
import json
import logging
import os
import pdb
Expand All @@ -16,12 +17,22 @@
from django.core import mail
from django.core.management import call_command
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import RequestFactory, TestCase
from django.test import RequestFactory, TestCase, override_settings
from django.urls import reverse
from django.utils import timezone
import requests_mock

from user.enums import TrainingStatus
from user.models import AssociatedEmail, Profile, User, Training, TrainingType, Question, TrainingQuestion
from user.models import (
AssociatedEmail,
CloudInformation,
Profile,
Question,
Training,
TrainingQuestion,
TrainingType,
User,
)
from user.views import (activate_user, edit_emails, edit_profile,
edit_password_complete, public_profile, register, user_settings,
verify_email)
Expand Down Expand Up @@ -666,3 +677,157 @@ def test_reject_training_invalid(self, mock_get_info_from_certificate_pdf):

self.assertEqual(response.status_code, 200)
self.assertEqual(self.training.status, TrainingStatus.REVIEW)


@override_settings(
AWS_VERIFICATION_BUCKET_NAME='example-bucket',
AWS_VERIFICATION_BUCKET_REGION='us-east-2',
)
class TestAWSVerification(TestCase):
"""
Test AWS user verification
"""

USER_EMAIL = '[email protected]'
AWS_ACCOUNT = '314159265359'
AWS_USERID = 'AIDAAAAAAAAAAAAAAAAAA'
AWS_ARN = 'arn:aws:iam::314159265359:user/tim'
IDENTITY_JSON = json.dumps({
'UserId': AWS_USERID,
'Account': AWS_ACCOUNT,
'Arn': AWS_ARN,
})
SIGNED_URL = (
'https://example-bucket.s3.amazonaws.com/'
'localhost%3A8000-verification/'
'account%3D' + AWS_ACCOUNT + '/'
'userid%3D' + AWS_USERID + '/'
'email%3D' + USER_EMAIL.replace("@", "%40") + '/'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=AKIAZZZZZZZZZZZZZZZZ'
'%2F20231101%2Fus-east-2%2Fs3%2Faws4_request'
'&X-Amz-Date=20231101T170000Z'
'&X-Amz-Expires=3600'
'&X-Amz-SignedHeaders=host'
'&X-Amz-Signature='
'0123456789abcdef0123456789abcdef'
'0123456789abcdef0123456780abcdef'
)

S3_RESPONSE_403 = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Error><Code>AccessDenied</Code>'
'<Message>Access Denied</Message></Error>'
)
S3_RESPONSE_404 = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Error><Code>NoSuchKey</Code>'
'<Message>The specified key does not exist.</Message></Error>'
)

def get_cloud_information(self):
user = User.objects.get(email=self.USER_EMAIL)
cloud_info, _ = CloudInformation.objects.get_or_create(user=user)
return cloud_info

@requests_mock.Mocker()
def test_verify_success(self, mocker):
"""
Test successfully adding a valid AWS identity
"""
self.client.login(username=self.USER_EMAIL, password='Tester11!')

cloud_info = self.get_cloud_information()
cloud_info.aws_id = None
cloud_info.aws_userid = None
cloud_info.aws_verification_datetime = None
cloud_info.save()

response = self.client.post(
reverse('edit_cloud'),
data={'save-aws': '', 'aws_identity': self.IDENTITY_JSON},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], reverse('edit_cloud_aws'))

# Assuming signature is correct, this URL should give a 404.
# With signature missing, it should give 403.
mocker.get(
self.SIGNED_URL, complete_qs=True,
status_code=404, text=self.S3_RESPONSE_404,
)
mocker.get(
self.SIGNED_URL.split('?')[0], complete_qs=True,
status_code=403, text=self.S3_RESPONSE_403,
)
response = self.client.post(
reverse('edit_cloud_aws'),
data={'signed_url': self.SIGNED_URL},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], reverse('edit_cloud'))

cloud_info = self.get_cloud_information()
self.assertEqual(cloud_info.aws_id, self.AWS_ACCOUNT)
self.assertEqual(cloud_info.aws_userid, self.AWS_USERID)
self.assertIsNotNone(cloud_info.aws_verification_datetime)

@requests_mock.Mocker()
def test_verify_failure(self, mocker):
"""
Test failure to add an invalid AWS identity
"""
self.client.login(username=self.USER_EMAIL, password='Tester11!')

cloud_info = self.get_cloud_information()
cloud_info.aws_id = None
cloud_info.aws_userid = None
cloud_info.aws_verification_datetime = None
cloud_info.save()

response = self.client.post(
reverse('edit_cloud'),
data={'save-aws': '', 'aws_identity': self.IDENTITY_JSON},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], reverse('edit_cloud_aws'))

# Assuming signature is wrong, this URL should give a 403.
mocker.get(
self.SIGNED_URL, complete_qs=True,
status_code=403, text=self.S3_RESPONSE_403,
)
response = self.client.post(
reverse('edit_cloud_aws'),
data={'signed_url': self.SIGNED_URL},
)
self.assertEqual(response.status_code, 200)

cloud_info = self.get_cloud_information()
self.assertIsNone(cloud_info.aws_id)
self.assertIsNone(cloud_info.aws_userid)
self.assertIsNone(cloud_info.aws_verification_datetime)

@requests_mock.Mocker()
def test_delete_info(self, mocker):
"""
Test deleting an existing AWS identity
"""
self.client.login(username=self.USER_EMAIL, password='Tester11!')

cloud_info = self.get_cloud_information()
cloud_info.aws_id = self.AWS_ACCOUNT
cloud_info.aws_userid = self.AWS_USERID
cloud_info.aws_verification_datetime = timezone.now()
cloud_info.save()

response = self.client.post(
reverse('edit_cloud'),
data={'delete-aws': ''},
)
self.assertEqual(response.status_code, 200)

cloud_info = self.get_cloud_information()
self.assertIsNone(cloud_info.aws_id)
self.assertIsNone(cloud_info.aws_userid)
self.assertIsNone(cloud_info.aws_verification_datetime)

0 comments on commit cab730e

Please sign in to comment.