Skip to content

Commit

Permalink
Rework default-security-group-rules.py (#748)
Browse files Browse the repository at this point in the history
This makes the test for the default rules of security groups downwards compatible
with versions of OpenStack that don't have network.default_security_group_rules().

Solves #746.

Signed-off-by: Katharina Trentau <[email protected]>
Signed-off-by: Matthias Büchse <[email protected]>
Co-authored-by: Matthias Büchse <[email protected]>
  • Loading branch information
fraugabel and mbuechse authored Nov 8, 2024
1 parent bf0c077 commit 4ca42f9
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 95 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
**/__pycache__/
.venv/
.idea
.sandbox
.DS_Store
node_modules
Tests/kaas/results/
Expand Down
240 changes: 145 additions & 95 deletions Tests/iaas/security-groups/default-security-group-rules.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,127 +4,177 @@
except for ingress rules from the same Security Group. Furthermore the
presence of default rules for egress traffic is checked.
"""
import argparse
from collections import Counter
import logging
import os
import sys

import openstack
import os
import argparse
from openstack.exceptions import ResourceNotFound

logger = logging.getLogger(__name__)

def connect(cloud_name: str) -> openstack.connection.Connection:
"""Create a connection to an OpenStack cloud
SG_NAME = "scs-test-default-sg"
DESCRIPTION = "scs-test-default-sg"

:param string cloud_name:
The name of the configuration to load from clouds.yaml.

:returns: openstack.connnection.Connection
def check_default_rules(rules, short=False):
"""
return openstack.connect(
cloud=cloud_name,
)
counts all verall ingress rules and egress rules, depending on the requested testing mode

def test_rules(cloud_name: str):
try:
connection = connect(cloud_name)
rules = connection.network.default_security_group_rules()
except Exception as e:
print(str(e))
raise Exception(
f"Connection to cloud '{cloud_name}' was not successfully. "
f"The default Security Group Rules could not be accessed. "
f"Please check your cloud connection and authorization."
)

# count all overall ingress rules and egress rules.
ingress_rules = 0
ingress_from_same_sg = 0
egress_rules = 0
egress_ipv4_default_sg = 0
egress_ipv4_custom_sg = 0
egress_ipv6_default_sg = 0
egress_ipv6_custom_sg = 0
:param bool short
if short is True, the testing mode is set on short for older OpenStack versions
"""
ingress_rules = egress_rules = 0
egress_vars = {'IPv4': {}, 'IPv6': {}}
for key, value in egress_vars.items():
value['default'] = 0
if not short:
value['custom'] = 0
if not rules:
print("No default security group rules defined.")
else:
for rule in rules:
direction = rule.direction
ethertype = rule.ethertype
r_custom_sg = rule.used_in_non_default_sg
r_default_sg = rule.used_in_default_sg
if direction == "ingress":
ingress_rules += 1
logger.info("No default security group rules defined.")
for rule in rules:
direction = rule["direction"]
ethertype = rule["ethertype"]
if direction == "ingress":
if not short:
# we allow ingress from the same security group
# but only for the default security group
r_group_id = rule.remote_group_id
if (r_group_id == "PARENT" and not r_custom_sg):
ingress_from_same_sg += 1
elif direction == "egress" and ethertype == "IPv4":
egress_rules += 1
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
if r_custom_sg:
egress_ipv4_custom_sg += 1
if r_default_sg:
egress_ipv4_default_sg += 1
elif direction == "egress" and ethertype == "IPv6":
egress_rules += 1
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
if rule.remote_group_id == "PARENT" and not rule["used_in_non_default_sg"]:
continue
if r_custom_sg:
egress_ipv6_custom_sg += 1
if r_default_sg:
egress_ipv6_default_sg += 1

# test whether there are no other than the allowed ingress rules
assert ingress_rules == ingress_from_same_sg, (
f"Expected only ingress rules for default security groups, "
f"that allow ingress traffic from the same group. "
f"But there are more - in total {ingress_rules} ingress rules. "
f"There should be only {ingress_from_same_sg} ingress rules.")
assert egress_rules > 0, (
f"Expected to have more than {egress_rules} egress rules present.")
var_list = [egress_ipv4_default_sg, egress_ipv4_custom_sg,
egress_ipv6_default_sg, egress_ipv6_custom_sg]
assert all([var > 0 for var in var_list]), (
"Not all expected egress rules are present. "
"Expected rules for egress for IPv4 and IPv6 "
"both for default and custom security groups.")

result_dict = {
"Ingress Rules": ingress_rules,
"Egress Rules": egress_rules
}
return result_dict
ingress_rules += 1
elif direction == "egress" and ethertype in egress_vars:
egress_rules += 1
if short:
egress_vars[ethertype]['default'] += 1
continue
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
# note: these two are not mutually exclusive
if rule["used_in_default_sg"]:
egress_vars[ethertype]['default'] += 1
if rule["used_in_non_default_sg"]:
egress_vars[ethertype]['custom'] += 1
# test whether there are no unallowed ingress rules
if ingress_rules:
logger.error(f"Expected no default ingress rules, found {ingress_rules}.")
# test whether all expected egress rules are present
missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2]
if missing:
logger.error(
"Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. "
f"Missing rule types: {', '.join(str(x) for x in missing)}"
)
logger.info(str({
"Unallowed Ingress Rules": ingress_rules,
"Egress Rules": egress_rules,
}))


def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION):
"""Create security group in openstack
:returns:
~openstack.network.v2.security_group.SecurityGroup: The new security group or None
"""
sg = conn.network.create_security_group(name=sg_name, description=description)
return sg.id


def delete_security_group(conn, sg_id):
conn.network.delete_security_group(sg_id)
# in case of a successful delete finding the sg will throw an exception
try:
conn.network.find_security_group(name_or_id=sg_id)
except ResourceNotFound:
logger.debug(f"Security group {sg_id} was deleted successfully.")
except Exception:
logger.critical(f"Security group {sg_id} was not deleted successfully")
raise


def altern_test_rules(connection: openstack.connection.Connection):
sg_id = create_security_group(connection)
try:
sg = connection.network.find_security_group(name_or_id=sg_id)
check_default_rules(sg.security_group_rules, short=True)
finally:
delete_security_group(connection, sg_id)


def test_rules(connection: openstack.connection.Connection):
try:
rules = list(connection.network.default_security_group_rules())
except ResourceNotFound:
logger.info(
"API call failed. OpenStack components might not be up to date. "
"Falling back to old-style test method. "
)
logger.debug("traceback", exc_info=True)
altern_test_rules(connection)
else:
check_default_rules(rules)


class CountingHandler(logging.Handler):
def __init__(self, level=logging.NOTSET):
super().__init__(level=level)
self.bylevel = Counter()

def handle(self, record):
self.bylevel[record.levelno] += 1


def main():
parser = argparse.ArgumentParser(
description="SCS Default Security Group Rules Checker")
description="SCS Default Security Group Rules Checker",
)
parser.add_argument(
"--os-cloud", type=str,
"--os-cloud",
type=str,
help="Name of the cloud from clouds.yaml, alternative "
"to the OS_CLOUD environment variable"
"to the OS_CLOUD environment variable",
)
parser.add_argument(
"--debug", action="store_true",
help="Enable OpenStack SDK debug logging"
"--debug", action="store_true", help="Enable debug logging",
)
args = parser.parse_args()
openstack.enable_logging(debug=args.debug)
logging.basicConfig(
format="%(levelname)s: %(message)s",
level=logging.DEBUG if args.debug else logging.INFO,
)

# count the number of log records per level (used for summary and return code)
counting_handler = CountingHandler(level=logging.INFO)
logger.addHandler(counting_handler)

# parse cloud name for lookup in clouds.yaml
cloud = os.environ.get("OS_CLOUD", None)
if args.os_cloud:
cloud = args.os_cloud
assert cloud, (
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)
cloud = args.os_cloud or os.environ.get("OS_CLOUD", None)
if not cloud:
raise ValueError(
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)

print(test_rules(cloud))
with openstack.connect(cloud) as conn:
test_rules(conn)

c = counting_handler.bylevel
logger.debug(f"Total critical / error / warning: {c[logging.CRITICAL]} / {c[logging.ERROR]} / {c[logging.WARNING]}")
if not c[logging.CRITICAL]:
print("security-groups-default-rules-check: " + ('PASS', 'FAIL')[min(1, c[logging.ERROR])])
return min(127, c[logging.CRITICAL] + c[logging.ERROR]) # cap at 127 due to OS restrictions


if __name__ == "__main__":
main()
try:
sys.exit(main())
except SystemExit:
raise
except BaseException as exc:
logging.debug("traceback", exc_info=True)
logging.critical(str(exc))
sys.exit(1)

0 comments on commit 4ca42f9

Please sign in to comment.