Skip to content

Commit

Permalink
alternative function successfully tested
Browse files Browse the repository at this point in the history
Signed-off-by: Katharina Trentau <[email protected]>
  • Loading branch information
fraugabel committed Sep 12, 2024
1 parent 042b927 commit e82cf34
Showing 1 changed file with 38 additions and 71 deletions.
109 changes: 38 additions & 71 deletions Tests/iaas/security-groups/default-security-group-rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@ def connect(cloud_name: str) -> openstack.connection.Connection:
def test_rules(cloud_name: str):
try:
connection = connect(cloud_name)
print("test_rules")
rules = connection.network.default_security_group_rules()
print("After test_rules")
except Exception as e:
print("except test_rules")
print(str(e))
raise Exception(
f"Connection to cloud '{cloud_name}' was not successful. "
Expand Down Expand Up @@ -150,7 +147,10 @@ def create_security_group(

def delete_security_group(conn, sg_id):
conn.network.delete_security_group(sg_id)
print(f"security group {sg_id} deleted")
try:
rest = conn.network.find_security_group(name_or_id=sg_id)
except:
print(f"Security group {sg_id} was deleted successfully.")


def altern_test_rules(cloud_name: str):
Expand All @@ -163,86 +163,54 @@ def altern_test_rules(cloud_name: str):
f"The default Security Group Rules could not be accessed. "
f"Please check your cloud connection and authorization."
)

sg_id = create_security_group(connection)
print(f"!!! created security group {sg_id}")
rules = connection.network.find_security_group(name_or_id=sg_id)
# rules = connection.network.security_group_rules()
print(f"!! worked: altern_rules {type(rules)} {rules}")
try:
sg_id = create_security_group(connection)
rules = connection.network.find_security_group(name_or_id=sg_id)
except:
print("Security group was not created successfully.")

# count all overall ingress rules and egress rules.
ingress_rules = 0
ingress_from_same_sg = 0
egress_rules = 0
egress_ipv4_default_sg = 0
egress_ipv4_custom_sg = 0
egress_ipv6_default_sg = 0
egress_ipv6_custom_sg = 0
egress_ipv4 = 0
egress_ipv6 = 0
if not rules:
print("No default security group rules defined.")
else:
for rule in rules.security_group_rules:
print (f"#############{rule}")
direction = rule['direction']
ethertype = rule['ethertype']
# r_custom_sg = rule['used_in_non_default_sg']
# r_default_sg = rule['used_in_default_sg']
# r_custom_sg = rule.used_in_non_default_sg
# r_default_sg = rule.used_in_default_sg

print(f"#############{direction}")
print(f"#############{ethertype}")

direction = rule["direction"]
ethertype = rule["ethertype"]
if direction == "ingress":
ingress_rules += 1
# we allow ingress from the same security group
# but only for the default security group
# r_group_id = rule.remote_group_id
# if (r_group_id == "PARENT" and not r_custom_sg):
# ingress_from_same_sg += 1
elif direction == "egress" and ethertype == "IPv4":
egress_rules += 1
# if rule.remote_ip_prefix:
# # this rule does not allow traffic to all external ips
# continue
# if r_custom_sg:
# egress_ipv4_custom_sg += 1
# if r_default_sg:
# egress_ipv4_default_sg += 1
egress_ipv4 += 1
elif direction == "egress" and ethertype == "IPv6":
egress_rules += 1
# if rule.remote_ip_prefix:
# # this rule does not allow traffic to all external ips
# continue
# if r_custom_sg:
# egress_ipv6_custom_sg += 1
# if r_default_sg:
# egress_ipv6_default_sg += 1
egress_ipv6 += 1

# test whether there are no other than the allowed ingress rules
# assert ingress_rules == ingress_from_same_sg, (
# f"Expected only ingress rules for default security groups, "
# f"that allow ingress traffic from the same group. "
# f"But there are more - in total {ingress_rules} ingress rules. "
# f"There should be only {ingress_from_same_sg} ingress rules."
# )
# test whether there are no ingress rules
assert ingress_rules == 0, (
f"Expected no default ingress rules for security groups, "
f"But there are {ingress_rules} ingress rules. "
f"There should be only none."
)
assert (
egress_rules > 0
), f"Expected to have more than {egress_rules} egress rules present."
var_list = [
egress_ipv4_default_sg,
egress_ipv4_custom_sg,
egress_ipv6_default_sg,
egress_ipv6_custom_sg,
egress_ipv4,
egress_ipv6,
]
assert all([var > 0 for var in var_list]), (
"Not all expected egress rules are present. "
"Expected rules for egress for IPv4 and IPv6 "
"both for default and custom security groups."
)


delete_security_group(connection, sg_id)
try:
delete_security_group(connection, sg_id)
except:
print(f"Security group {sg_id} was not deleted successfully")
result_dict = {"Ingress Rules": ingress_rules, "Egress Rules": egress_rules}
return result_dict

Expand Down Expand Up @@ -271,18 +239,17 @@ def main():
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)
altern_test_rules(cloud)
# try:
# print(test_rules(cloud))
# except ResourceNotFound as e:
# print(
# "##### Ressource could not be found."
# f"Error: {e}"
# "Openstack components are not up to date and might soon be depricated!"
# f"{altern_test_rules(cloud)}"
# )
# except Exception as e:
# print(f"Error occured: {e}")
try:
print(test_rules(cloud))
except ResourceNotFound as e:
print(
"Ressource could not be found."
"Openstack components might not up to date and soon be depricated!"
f"Error: {e}"
)
print(altern_test_rules(cloud))
except Exception as e:
print(f"Error occured: {e}")


if __name__ == "__main__":
Expand Down

0 comments on commit e82cf34

Please sign in to comment.