Skip to content

Commit

Permalink
Issue/718 check for key manager (#760)
Browse files Browse the repository at this point in the history
* changed synthesis of auth_url

Signed-off-by: Katharina Trentau <[email protected]>

* trying to find os sdk method to request user role

Signed-off-by: Katharina Trentau <[email protected]>

* alternating between auth methods

Signed-off-by: Katharina Trentau <[email protected]>

* alternating between auth methods when catching an auth error

Signed-off-by: Katharina Trentau <[email protected]>

* modularized

Signed-off-by: Katharina Trentau <[email protected]>

* start refracturing

Signed-off-by: Katharina Trentau <[email protected]>

* added auth url option

Signed-off-by: Katharina Trentau <[email protected]>

* debugged �[200~            auth_url = synth_auth_url(auth_data['auth_url'])

Signed-off-by: Katharina Trentau <[email protected]>

* debugged synth_auth_url()

Signed-off-by: Katharina Trentau <[email protected]>

* formatting

Signed-off-by: Katharina Trentau <[email protected]>

* refractured

Signed-off-by: Katharina Trentau <[email protected]>

* fetching token and identity role through keystone lib

Signed-off-by: Katharina Trentau <[email protected]>

* at authentification error continue with fernet token instead of reconnecting

Signed-off-by: Katharina Trentau <[email protected]>

* at authentification error continue with fernet token instead of reconnecting

Signed-off-by: Katharina Trentau <[email protected]>

* at authentification error continue with fernet token instead of reconnecting

Signed-off-by: Katharina Trentau <[email protected]>

* at authentification error continue with fernet token instead of reconnecting

Signed-off-by: Katharina Trentau <[email protected]>

* changed everything to session token requests only

Signed-off-by: Katharina Trentau <[email protected]>

* before refracturing with reconnecting still in it but commented

Signed-off-by: Katharina Trentau <[email protected]>

* stripped

Signed-off-by: Katharina Trentau <[email protected]>

* stripped and blacked again for flake8

Signed-off-by: Katharina Trentau <[email protected]>

* tested against new devstack

Signed-off-by: Katharina Trentau <[email protected]>

* tested against new devstack

Signed-off-by: Katharina Trentau <[email protected]>

* changed description

Signed-off-by: Katharina Trentau <[email protected]>

* make check script executable

Signed-off-by: Matthias Büchse <[email protected]>

* Removed a whole lot of unnecessary code

Signed-off-by: Matthias Büchse <[email protected]>

* initialized logger changed prints to logs

Signed-off-by: Katharina Trentau <[email protected]>

* changed assert to raised exception

Signed-off-by: Katharina Trentau <[email protected]>

* sadisfy flake

Signed-off-by: Katharina Trentau <[email protected]>

* Revised structure, logging, error handling, return code, documentation

Signed-off-by: Matthias Büchse <[email protected]>

* mention test script in official document

Signed-off-by: Matthias Büchse <[email protected]>

* Acquiesce flake8

Signed-off-by: Matthias Büchse <[email protected]>

---------

Signed-off-by: Katharina Trentau <[email protected]>
Signed-off-by: Matthias Büchse <[email protected]>
Co-authored-by: Katharina Trentau <[email protected]>
Co-authored-by: Matthias Büchse <[email protected]>
  • Loading branch information
3 people authored Oct 30, 2024
1 parent ced5954 commit ff829e7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 131 deletions.
5 changes: 5 additions & 0 deletions Standards/scs-0116-w1-key-manager-implementation-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ This can be done with a small change in the policy.yaml file. The `creator` has
The check for the presence of a Key Manager is done with a test script, that checks the presence of a Key Manager service in the catalog endpoint of Openstack.
This check can eventually be moved to the checks for the mandatory an supported service/API list, in case of a promotion of the Key Manager to the mandatory list.
### Implementation
The script [`check-for-key-manager.py`](https://github.com/SovereignCloudStack/standards/blob/main/Tests/iaas/key-manager/check-for-key-manager.py)
connects to OpenStack and performs the checks described in this section.

## Manual Tests

It is not possible to check a deployment for a correctly protected Master KEK automatically from the outside.
Expand Down
225 changes: 94 additions & 131 deletions Tests/iaas/key-manager/check-for-key-manager.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,190 +1,153 @@
"""Mandatory APIs checker
#!/usr/bin/env python3
"""Key Manager service checker for scs-0116-v1-key-manager-standard.md
This script retrieves the endpoint catalog from Keystone using the OpenStack
SDK and checks whether a key manager APi endpoint is present.
SDK and checks whether a key manager API endpoint is present.
It then checks whether a user with the maximum of a member role can create secrets.
This will only work after policy adjustments or with the new secure RBAC roles and policies.
The script relies on an OpenStack SDK compatible clouds.yaml file for
authentication with Keystone.
"""

import argparse
import json
import logging
import os
import sys

import openstack


logger = logging.getLogger(__name__)


def connect(cloud_name: str) -> openstack.connection.Connection:
"""Create a connection to an OpenStack cloud
:param string cloud_name:
The name of the configuration to load from clouds.yaml.
:returns: openstack.connnection.Connection
"""
return openstack.connect(
cloud=cloud_name,
)
def initialize_logging():
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)


def check_for_member_role(conn: openstack.connection.Connection
) -> None:
"""Checks whether the current user has at maximum privileges
of the member role.
:param connection:
The current connection to an OpenStack cloud.
:returns: boolean, when role with most priviledges is member
"""
def check_for_member_role(conn: openstack.connection.Connection) -> None:
"""Checks whether the current user has at maximum privileges of the member role.
auth_data = conn.auth
auth_dict = {
"identity": {
"methods": ["password"],
"password": {
"user": {
"name": auth_data['username'],
"domain": {"name": auth_data['project_domain_name']},
"password": auth_data['password']
}
},
},
"scope": {
"project": {
"domain": {"name": auth_data['project_domain_name']},
"name": auth_data['project_name']
}
}
}

has_member_role = False
request = conn.session.request(auth_data['auth_url'] + '/v3/auth/tokens',
'POST',
json={'auth': auth_dict})
for role in json.loads(request.content)["token"]["roles"]:
role_name = role["name"]
if role_name == "admin" or role_name == "manager":
return False
elif role_name == "member":
print("User has member role.")
has_member_role = True
elif role_name == "reader":
print("User has reader role.")
else:
print("User has custom role.")
return False
return has_member_role


def check_presence_of_key_manager(cloud_name: str):
:param conn: connection to an OpenStack cloud.
:returns: boolean, when role with most privileges is member
"""
role_names = set(conn.session.auth.get_access(conn.session).role_names)
if role_names & {"admin", "manager"}:
return False
if "reader" in role_names:
logger.info("User has reader role.")
custom_roles = sorted(role_names - {"reader", "member"})
if custom_roles:
logger.info(f"User has custom roles {', '.join(custom_roles)}.")
return "member" in role_names


def check_presence_of_key_manager(conn: openstack.connection.Connection) -> None:
try:
connection = connect(cloud_name)
services = connection.service_catalog
except Exception as e:
print(str(e))
raise Exception(
f"Connection to cloud '{cloud_name}' was not successfully. "
f"The Catalog endpoint could not be accessed. "
f"Please check your cloud connection and authorization."
)
services = conn.service_catalog
except Exception:
logger.critical("Could not access Catalog endpoint.")
raise

for svc in services:
svc_type = svc['type']
svc_type = svc["type"]
if svc_type == "key-manager":
# key-manager is present
# now we want to check whether a user with member role
# can create and access secrets
check_key_manager_permissions(connection)
return 0
logger.info("Key Manager is present")
return True

# we did not find the key-manager service
logger.warning("There is no key-manager endpoint in the cloud.")
# we do not fail, until a key-manager MUST be present
return 0

def _find_secret(conn: openstack.connection.Connection, secret_name_or_id: str):
"""Replacement method for finding secrets.
def check_key_manager_permissions(conn: openstack.connection.Connection
) -> None:
Mimicks the behavior of Connection.key_manager.find_secret()
but fixes an issue with the internal implementation raising an
exception due to an unexpected microversion parameter.
"""
secrets = conn.key_manager.secrets()
for s in secrets:
if s.name == secret_name_or_id or s.id == secret_name_or_id:
return s


def check_key_manager_permissions(conn: openstack.connection.Connection) -> None:
"""
After checking that the current user only has the member and maybe the
reader role, this method verifies that the user with a member role
has sufficient access to the Key Manager API functionality.
"""
secret_name = "scs-member-role-test-secret"
if not check_for_member_role(conn):
logger.warning("Cannot test key-manager permissions. "
"User has wrong roles")
return None

def _find_secret(secret_name_or_id: str):
"""Replacement method for finding secrets.
Mimicks the behavior of Connection.key_manager.find_secret()
but fixes an issue with the internal implementation raising an
exception due to an unexpected microversion parameter.
"""
secrets = conn.key_manager.secrets()
for s in secrets:
if s.name == secret_name_or_id or s.id == secret_name_or_id:
return s
return None

try:
existing_secret = _find_secret(secret_name)
existing_secret = _find_secret(conn, secret_name)
if existing_secret:
conn.key_manager.delete_secret(existing_secret)

conn.key_manager.create_secret(
name=secret_name,
payload_content_type="text/plain",
secret_type="opaque",
payload="foo"
)

new_secret = _find_secret(secret_name)
assert new_secret, (
f"Secret created with name '{secret_name}' was not discoverable by "
f"the user"
payload="foo",
)
conn.key_manager.delete_secret(new_secret)
except openstack.exceptions.ForbiddenException as e:
print(
"Users of the 'member' role can use Key Manager API: FAIL"
try:
new_secret = _find_secret(conn, secret_name)
if not new_secret:
raise ValueError(f"Secret '{secret_name}' was not discoverable by the user")
finally:
conn.key_manager.delete_secret(new_secret)
except openstack.exceptions.ForbiddenException:
logger.debug('exception details', exc_info=True)
logger.error(
"Users with the 'member' role can use Key Manager API: FAIL"
)
print(
f"ERROR: {str(e)}"
)
exit(1)
print(
"Users of the 'member' role can use Key Manager API: PASS"
return 1
logger.info(
"Users with the 'member' role can use Key Manager API: PASS"
)


def main():
parser = argparse.ArgumentParser(
description="SCS Mandatory IaaS Service Checker")
initialize_logging()
parser = argparse.ArgumentParser(description="SCS Mandatory IaaS Service Checker")
parser.add_argument(
"--os-cloud", type=str,
"--os-cloud",
type=str,
help="Name of the cloud from clouds.yaml, alternative "
"to the OS_CLOUD environment variable"
"to the OS_CLOUD environment variable",
)
parser.add_argument(
"--debug", action="store_true",
help="Enable OpenStack SDK debug logging"
"--debug", action="store_true", help="Enable OpenStack SDK debug logging"
)
args = parser.parse_args()
openstack.enable_logging(debug=args.debug)
# @mbuechse: I think this is so much as to be unusable!
# (If necessary, a developer can always uncomment)
# openstack.enable_logging(debug=args.debug)
if args.debug:
logger.setLevel(logging.DEBUG)

# parse cloud name for lookup in clouds.yaml
cloud = os.environ.get("OS_CLOUD", None)
if args.os_cloud:
cloud = args.os_cloud
assert cloud, (
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)
cloud = args.os_cloud or os.environ.get("OS_CLOUD", None)
if not cloud:
raise RuntimeError(
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)

return check_presence_of_key_manager(cloud)
with openstack.connect(cloud=cloud) as conn:
if not check_for_member_role(conn):
logger.critical("Cannot test key-manager permissions. User has wrong roles")
return 1
if check_presence_of_key_manager(conn):
return check_key_manager_permissions(conn)
else:
# not an error, because key manager is merely recommended
logger.warning("There is no key-manager endpoint in the cloud.")


if __name__ == "__main__":
main()
try:
sys.exit(main())
except SystemExit:
raise
except BaseException:
logger.critical("exception", exc_info=True)
sys.exit(1)

0 comments on commit ff829e7

Please sign in to comment.