Skip to content

Commit

Permalink
trying to find os sdk method to request user role
Browse files Browse the repository at this point in the history
Signed-off-by: Katharina Trentau <[email protected]>
  • Loading branch information
fraugabel committed Sep 18, 2024
1 parent 83e1bc3 commit c0af338
Showing 1 changed file with 76 additions and 4 deletions.
80 changes: 76 additions & 4 deletions Tests/iaas/key-manager/check-for-key-manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,50 @@ def connect(cloud_name: str) -> openstack.connection.Connection:
cloud=cloud_name,
)

def delete_application_credential(conn: openstack.connection.Connection,
credential_name: str) -> None:
existing_credential = conn.identity.find_application_credential(
conn.current_user_id,
credential_name
)
if existing_credential:
print(
f"INFO: deleting application credential "
f"'{credential_name}' ..."
)
conn.identity.delete_application_credential(
conn.current_user_id,
existing_credential
)

def reconnect_with_role(conn: openstack.connection.Connection,
target_role_name: str
) -> openstack.connection.Connection:
"""
Uses the existing cloud connection to create a new application credential
in the Identity API limited to the role specified via target_role_name.
Creates a new cloud connection using the application credential and
returns it, effectively scoping the returned connection to the specific
role.
"""
credential_name = APP_CREDENTIAL_NAME
delete_application_credential(conn, credential_name)
app_credential = conn.identity.create_application_credential(
conn.current_user_id,
credential_name,
roles=[{"name": target_role_name}]
)

# Open a new connection using the application credential
new_conn = openstack.connect(
region_name=conn.config.config["region"],
auth_type="v3applicationcredential",
auth={
"auth_url": conn.auth["auth_url"],
"application_credential_id": app_credential.id,
},
)
return new_conn

def check_for_member_role(conn: openstack.connection.Connection) -> None:
"""Checks whether the current user has at maximum privileges
Expand Down Expand Up @@ -57,11 +101,37 @@ def check_for_member_role(conn: openstack.connection.Connection) -> None:
}

has_member_role = False
# auth_url = auth_data['auth_url'] + '/v3/auth/tokens'
auth_url = auth_data["auth_url"] + "auth/tokens"
print("!URL" + auth_url)
request = conn.session.request(auth_url, "POST", json={"auth": auth_dict})

# check whether auth_url already has v3
if "/v3/" in auth_data['auth_url']:
auth_url = auth_data['auth_url'] + 'auth/tokens'
else:
auth_url = auth_data['auth_url'] + '/v3/auth/tokens'

print(f"!URL {auth_url}")
request = conn.session.request(auth_url, "POST", json={"auth": auth_dict})
#print(request.content)

# # test alternative role function
# roles = conn.identity.roles()
# for role in roles:
# print (role.name)
# if role.name == "member":
# member_role = role
# print(member_role)
# break

# # Check if the user has the "member" role
# role_assignments = conn.identity.role_assignments(user=user_id, project=project_id)
# has_member_role = False
# for assignment in role_assignments:
# role = conn.identity.get_role(assignment.role['id'])
# if role.name == "member":
# has_member_role = True
# break


# working original test
for role in json.loads(request.content)["token"]["roles"]:
role_name = role["name"]
if role_name == "admin" or role_name == "manager":
Expand Down Expand Up @@ -130,6 +200,7 @@ def _find_secret(secret_name_or_id: str):

try:
existing_secret = _find_secret(secret_name)
print(existing_secret)
if existing_secret:
conn.key_manager.delete_secret(existing_secret)

Expand All @@ -146,6 +217,7 @@ def _find_secret(secret_name_or_id: str):
f"the user"
)
conn.key_manager.delete_secret(new_secret)

except openstack.exceptions.ForbiddenException as e:
print("Users of the 'member' role can use Key Manager API: FAIL")
print(f"ERROR: {str(e)}")
Expand Down

0 comments on commit c0af338

Please sign in to comment.