From 91319461fc2d5ba293c52760e77fbcfefea5a1e9 Mon Sep 17 00:00:00 2001 From: Imran Hossain Shaon Date: Mon, 2 May 2016 12:14:37 -0700 Subject: [PATCH] add bucket versioning test --- nephoria/testcases/s3/bucket_tests.py | 172 +++++++++++++++++++++++++- nephoria/testcontroller.py | 2 +- nephoria/usercontext.py | 16 ++- 3 files changed, 176 insertions(+), 14 deletions(-) diff --git a/nephoria/testcases/s3/bucket_tests.py b/nephoria/testcases/s3/bucket_tests.py index 3ffb34fc..11cc0382 100644 --- a/nephoria/testcases/s3/bucket_tests.py +++ b/nephoria/testcases/s3/bucket_tests.py @@ -1,6 +1,9 @@ #!/usr/bin/env python import re -from boto.exception import S3ResponseError + +import boto +from boto.exception import S3ResponseError, S3CreateError +from boto.s3.lifecycle import Lifecycle from nephoria.testcase_utils.cli_test_runner import CliTestRunner, SkipTestException from nephoria.testcontroller import TestController @@ -67,8 +70,8 @@ def test_negative_basic(self): raise("Should have caught exception for creating bucket with empty-string name.") except S3ResponseError as e: assert (e.status == 405), 'Expected response status code to be 405, actual status code is ' + str(e.status) - assert ( - re.search("MethodNotAllowed", e.code)), "Incorrect exception returned when creating bucket with null name." + assert (re.search("MethodNotAllowed", e.code)),\ + "Incorrect exception returned when creating bucket with null name." self.log.debug("Testing an invalid bucket names, calls should fail.") @@ -96,8 +99,169 @@ def test_creating_bucket_invalid_names(bad_bucket): for bad_bucket in ["bucket&123", "bucket*123"]: test_creating_bucket_invalid_names(self.bucket_prefix + bad_bucket) + # def test_bucket_acl(self): + # ''' + # Tests bucket ACL management and adding/removing from the ACL with both valid and invalid usernames + # ''' + # + # test_bucket = self.bucket_prefix + 'acl_bucket_test' + # + # test_user_id = self.tc.user.s3.connection.get_canonical_user_id() + # self.log.debug('Starting ACL test with bucket name: ' + test_bucket + ' and userid ' + test_user_id) + # + # try: + # acl_bucket = self.tc.user.s3.create_bucket(test_bucket) + # self.tc.user.test_user_resources['_bucket'].append(acl_bucket) + # self.log.debug('Created bucket: ' + test_bucket) + # except S3CreateError: + # self.log.debug("Can't create the bucket, already exists. Deleting it an trying again") + # try: + # self.tc.user.s3.delete_bucket(test_bucket) + # acl_bucket = self.tc.s3.create_bucket(test_bucket) + # except: + # self.log.debug("Couldn't delete and create new bucket. Failing test") + # raise("Couldn't make the test bucket: " + test_bucket) + # + # policy = acl_bucket.get_acl() + # + # if policy == None: + # raise('No acl returned') + # + # self.log.debug(policy) + # # Check that the acl is correct: owner full control. + # if len(policy.acl.grants) > 1: + # self.tc.s3.delete_bucket(test_bucket) + # raise('Expected only 1 grant in acl. Found: ' + policy.acl.grants.grants.__len__()) + # + # if policy.acl.grants[0].id != test_user_id or policy.acl.grants[0].permission != 'FULL_CONTROL': + # self.tc.s3.delete_bucket(test_bucket) + # raise('Unexpected grant encountered: ' + policy.acl.grants[0].display_name + ' ' + policy.acl.grants[ + # 0].permission + ' ' + policy.acl.grants[0].id) + # + # # Get the info on the owner from the ACL returned + # owner_display_name = policy.acl.grants[0].display_name + # owner_id = policy.acl.grants[0].id + # + # # upload a new acl for the bucket + # new_acl = policy + # new_user_display_name = owner_display_name + # new_user_id = owner_id + # new_acl.acl.add_user_grant(permission="READ", user_id=new_user_id, display_name=new_user_display_name) + # try: + # acl_bucket.set_acl(new_acl) + # acl_check = acl_bucket.get_acl() + # except S3ResponseError: + # raise ("Failed to set or get new acl") + # + # self.log.info("Got ACL: " + acl_check.acl.to_xml()) + # + # # expected_result_base=' + # # + # # ' + owner_id + ''+ owner_display_name + 'FULL_CONTROL + # # EXPECTED_IDEXPECTED_NAMEREAD + # # ' + # + # if acl_check == None and not self.tc.user.s3.check_acl_equivalence(acl1=acl_check.acl, acl2=new_acl.acl): + # self.tc.user.s3.delete_bucket(test_bucket) + # raise ("Incorrect acl length or acl not found\n. Got bucket ACL:\n" + acl_check.acl.to_xml() + + # "\nExpected:" + new_acl.acl.to_xml()) + # else: + # self.log.info("Got expected basic ACL addition") + # + # self.log.info( + # "Grants 0 and 1: " + acl_check.acl.grants[0].to_xml() + " -- " + acl_check.acl.grants[1].to_xml()) + # + # # Check each canned ACL string in boto to make sure Walrus does it right + # for acl in boto.s3.acl.CannedACLStrings: + # if acl == "authenticated-read": + # continue + # self.log.info('Testing canned acl: ' + acl) + # try: + # acl_bucket.set_acl(acl) + # acl_check = acl_bucket.get_acl() + # except Exception as e: + # self.tc.user.s3.delete_bucket(test_bucket) + # raise("Got exception trying to set acl to " + acl + ": " + str(e)) + # + # self.log.info("Set canned-ACL: " + acl + " -- Got ACL from service: " + acl_check.acl.to_xml()) + # expected_acl = self.tc.user.get_canned_acl(bucket_owner_id=owner_id, canned_acl=acl, + # bucket_owner_display_name=owner_display_name) + # + # if expected_acl == None: + # self.tc.user.s3.delete_bucket(test_bucket) + # raise("Got None when trying to generate expected acl for canned acl string: " + acl) + # + # if not self.tc.user.check_acl_equivalence(acl1=expected_acl, acl2=acl_check.acl): + # self.tc.user.s3.delete_bucket(test_bucket) + # raise ( + # "Invalid " + acl + " acl returned from Walrus:\n" + acl_check.acl.to_xml() + "\nExpected\n" + expected_acl.to_xml()) + # else: + # self.log.debug("Got correct acl for: " + acl) + # + # try: + # acl_bucket.set_acl('invalid-acl') + # raise ('Did not catch expected exception for invalid canned-acl') + # except: + # self.log.debug("Caught expected exception from invalid canned-acl") + # + # self.tc.user.s3.delete_bucket(test_bucket) + # self.log.debug("Bucket ACL: PASSED") + + def test_bucket_versioning(self): + test_bucket = self.bucket_prefix + "versioning_test_bucket" + self.log.info('Testing bucket versioning using bucket:' + test_bucket) + version_bucket = self.tc.user.s3.create_bucket(test_bucket) + self.tc.user.test_user_resources['_bucket'].append(version_bucket) + version_status = version_bucket.get_versioning_status().get("Versioning") + + # Test the default setup after bucket creation. Should be disabled. + if version_status != None: + version_bucket.delete() + raise ("Expected versioning disabled (empty), found: " + str(version_status)) + elif version_status == None: + self.log.info("Null version status returned, may be correct since it should be disabled") + + # Turn on versioning, confirm that it is 'Enabled' + version_bucket.configure_versioning(True) + version_status = version_bucket.get_versioning_status().get("Versioning") + if version_status == None or version_status != "Enabled": + version_bucket.delete() + raise("Expected versioning enabled, found: " + str(version_status)) + elif version_status == None: + version_bucket.delete() + raise("Null version status returned") + self.log.info("Versioning of bucket is set to: " + version_status) + + # Turn off/suspend versioning, confirm. + version_bucket.configure_versioning(False) + version_status = version_bucket.get_versioning_status().get("Versioning") + if version_status == None or version_status != "Suspended": + version_bucket.delete() + raise("Expected versioning suspended, found: " + str(version_status)) + elif version_status == None: + version_bucket.delete() + raise("Null version status returned") + + self.log.info("Versioning of bucket is set to: " + version_status) + + version_bucket.configure_versioning(True) + version_status = version_bucket.get_versioning_status().get("Versioning") + if version_status == None or version_status != "Enabled": + version_bucket.delete() + raise("Expected versioning enabled, found: " + str(version_status)) + elif version_status == None: + version_bucket.delete() + raise("Null version status returned") + + self.log.info("Versioning of bucket is set to: " + version_status) + + # version_bucket.delete() + # self.buckets_used.remove(test_bucket) + self.log.info("Bucket Versioning: PASSED") + def clean_method(self): - pass + for bkt in self.tc.user.test_user_resources['_bucket']: + self.user.s3.delete_bucket(bkt) if __name__ == "__main__": diff --git a/nephoria/testcontroller.py b/nephoria/testcontroller.py index b287bd6b..6269506b 100644 --- a/nephoria/testcontroller.py +++ b/nephoria/testcontroller.py @@ -18,7 +18,7 @@ class TestController(object): def __init__(self, hostname=None, username='root', password=None, region=None, proxy_hostname=None, proxy_password=None, - clouduser_account='nephotest', clouduser_name='sys_admin', clouduser_credpath=None, + clouduser_account='nephotest', clouduser_name='admin', clouduser_credpath=None, clouduser_accesskey=None, clouduser_secretkey=None, cloudadmin_credpath=None, cloudadmin_accesskey=None, cloudadmin_secretkey=None, timeout=10, log_level='DEBUG', environment_file=None, diff --git a/nephoria/usercontext.py b/nephoria/usercontext.py index d91a21eb..4f1b4572 100644 --- a/nephoria/usercontext.py +++ b/nephoria/usercontext.py @@ -112,6 +112,13 @@ def __init__(self, aws_access_key=None, aws_secret_key=None, aws_account_name=N self.log.identifier = str(self) self.log.debug('Successfully created User Context') + self.test_user_resources = \ + { + '_instances': [], + '_volumes': [], + '_bucket': [] + } + ########################################################################################## # User/Account Properties, Attributes, Methods, etc.. ########################################################################################## @@ -458,12 +465,3 @@ def cleanup_artifacts(self, for account_name in self.test_resources["iam_accounts"]: self.iam.delete_account(account_name=account_name, recursive=True) except: pass - - - - - - - - -