diff --git a/physionet-django/user/test_views.py b/physionet-django/user/test_views.py index 3c36d906b8..b050b78cd5 100644 --- a/physionet-django/user/test_views.py +++ b/physionet-django/user/test_views.py @@ -1,5 +1,6 @@ import contextlib import datetime +import json import logging import os import pdb @@ -16,12 +17,22 @@ from django.core import mail from django.core.management import call_command from django.core.files.uploadedfile import SimpleUploadedFile -from django.test import RequestFactory, TestCase +from django.test import RequestFactory, TestCase, override_settings from django.urls import reverse from django.utils import timezone +import requests_mock from user.enums import TrainingStatus -from user.models import AssociatedEmail, Profile, User, Training, TrainingType, Question, TrainingQuestion +from user.models import ( + AssociatedEmail, + CloudInformation, + Profile, + Question, + Training, + TrainingQuestion, + TrainingType, + User, +) from user.views import (activate_user, edit_emails, edit_profile, edit_password_complete, public_profile, register, user_settings, verify_email) @@ -666,3 +677,157 @@ def test_reject_training_invalid(self, mock_get_info_from_certificate_pdf): self.assertEqual(response.status_code, 200) self.assertEqual(self.training.status, TrainingStatus.REVIEW) + + +@override_settings( + AWS_VERIFICATION_BUCKET_NAME='example-bucket', + AWS_VERIFICATION_BUCKET_REGION='us-east-2', +) +class TestAWSVerification(TestCase): + """ + Test AWS user verification + """ + + USER_EMAIL = 'admin@mit.edu' + AWS_ACCOUNT = '314159265359' + AWS_USERID = 'AIDAAAAAAAAAAAAAAAAAA' + AWS_ARN = 'arn:aws:iam::314159265359:user/tim' + IDENTITY_JSON = json.dumps({ + 'UserId': AWS_USERID, + 'Account': AWS_ACCOUNT, + 'Arn': AWS_ARN, + }) + SIGNED_URL = ( + 'https://example-bucket.s3.amazonaws.com/' + 'localhost%3A8000-verification/' + 'account%3D' + AWS_ACCOUNT + '/' + 'userid%3D' + AWS_USERID + '/' + 'email%3D' + USER_EMAIL.replace("@", "%40") + '/' + '?X-Amz-Algorithm=AWS4-HMAC-SHA256' + '&X-Amz-Credential=AKIAZZZZZZZZZZZZZZZZ' + '%2F20231101%2Fus-east-2%2Fs3%2Faws4_request' + '&X-Amz-Date=20231101T170000Z' + '&X-Amz-Expires=3600' + '&X-Amz-SignedHeaders=host' + '&X-Amz-Signature=' + '0123456789abcdef0123456789abcdef' + '0123456789abcdef0123456780abcdef' + ) + + S3_RESPONSE_403 = ( + '' + 'AccessDenied' + 'Access Denied' + ) + S3_RESPONSE_404 = ( + '' + 'NoSuchKey' + 'The specified key does not exist.' + ) + + def get_cloud_information(self): + user = User.objects.get(email=self.USER_EMAIL) + cloud_info, _ = CloudInformation.objects.get_or_create(user=user) + return cloud_info + + @requests_mock.Mocker() + def test_verify_success(self, mocker): + """ + Test successfully adding a valid AWS identity + """ + self.client.login(username=self.USER_EMAIL, password='Tester11!') + + cloud_info = self.get_cloud_information() + cloud_info.aws_id = None + cloud_info.aws_userid = None + cloud_info.aws_verification_datetime = None + cloud_info.save() + + response = self.client.post( + reverse('edit_cloud'), + data={'save-aws': '', 'aws_identity': self.IDENTITY_JSON}, + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['location'], reverse('edit_cloud_aws')) + + # Assuming signature is correct, this URL should give a 404. + # With signature missing, it should give 403. + mocker.get( + self.SIGNED_URL, complete_qs=True, + status_code=404, text=self.S3_RESPONSE_404, + ) + mocker.get( + self.SIGNED_URL.split('?')[0], complete_qs=True, + status_code=403, text=self.S3_RESPONSE_403, + ) + response = self.client.post( + reverse('edit_cloud_aws'), + data={'signed_url': self.SIGNED_URL}, + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['location'], reverse('edit_cloud')) + + cloud_info = self.get_cloud_information() + self.assertEqual(cloud_info.aws_id, self.AWS_ACCOUNT) + self.assertEqual(cloud_info.aws_userid, self.AWS_USERID) + self.assertIsNotNone(cloud_info.aws_verification_datetime) + + @requests_mock.Mocker() + def test_verify_failure(self, mocker): + """ + Test failure to add an invalid AWS identity + """ + self.client.login(username=self.USER_EMAIL, password='Tester11!') + + cloud_info = self.get_cloud_information() + cloud_info.aws_id = None + cloud_info.aws_userid = None + cloud_info.aws_verification_datetime = None + cloud_info.save() + + response = self.client.post( + reverse('edit_cloud'), + data={'save-aws': '', 'aws_identity': self.IDENTITY_JSON}, + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(response['location'], reverse('edit_cloud_aws')) + + # Assuming signature is wrong, this URL should give a 403. + mocker.get( + self.SIGNED_URL, complete_qs=True, + status_code=403, text=self.S3_RESPONSE_403, + ) + response = self.client.post( + reverse('edit_cloud_aws'), + data={'signed_url': self.SIGNED_URL}, + ) + self.assertEqual(response.status_code, 200) + + cloud_info = self.get_cloud_information() + self.assertIsNone(cloud_info.aws_id) + self.assertIsNone(cloud_info.aws_userid) + self.assertIsNone(cloud_info.aws_verification_datetime) + + @requests_mock.Mocker() + def test_delete_info(self, mocker): + """ + Test deleting an existing AWS identity + """ + self.client.login(username=self.USER_EMAIL, password='Tester11!') + + cloud_info = self.get_cloud_information() + cloud_info.aws_id = self.AWS_ACCOUNT + cloud_info.aws_userid = self.AWS_USERID + cloud_info.aws_verification_datetime = timezone.now() + cloud_info.save() + + response = self.client.post( + reverse('edit_cloud'), + data={'delete-aws': ''}, + ) + self.assertEqual(response.status_code, 200) + + cloud_info = self.get_cloud_information() + self.assertIsNone(cloud_info.aws_id) + self.assertIsNone(cloud_info.aws_userid) + self.assertIsNone(cloud_info.aws_verification_datetime)