Skip to content

Commit

Permalink
Add test for k8s version recency (#288)
Browse files Browse the repository at this point in the history
* add initial test script
* check k8s release for releases
* check cluster with kubeconfig for version
* check version against each other
* if not matching, check for cadence time

Signed-off-by: Hannes Baum <[email protected]>
  • Loading branch information
cah-hbaum committed Jun 29, 2023
1 parent 59dc21e commit 20b5524
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 0 deletions.
Empty file removed Tests/kaas/.gitkeep
Empty file.
27 changes: 27 additions & 0 deletions Tests/kaas/k8s-version-recency/config.yaml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## Configuration file for the K8s Version Recency Test

# Github Token
token: "MY_GITHUB_TOKEN"

log:
level: INFO
version: 1
disable_existing_loggers: False
formatters:
k8s-version-recency-check:
format: "%(levelname)s: %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: k8s-version-recency-check
stream: ext://sys.stdout
file:
class: logging.handlers.WatchedFileHandler
formatter: k8s-version-recency-check
filename: MY-LOG-FILE-NAME.log
root: # Configuring the default (root) logger is highly recommended
handlers: [console]
loggers:
k8s-version-recency-check:
handlers: [console, file]
propagate: no
339 changes: 339 additions & 0 deletions Tests/kaas/k8s-version-recency/k8s-version-recency-check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
#!/usr/bin/env python3
# vim: set ts=4 sw=4 et:
#
"""
K8s Version Recency Checker
https://github.com/SovereignCloudStack/standards
Return codes:
0: Version is inside the recency window
1: Error during script execution
2: Version isn't inside the recency windows anymore
3: Version used contains a critical CVE
One or more K8s clusters are checked by providing their kubeconfigs.
It is determined, if the version on these clusters is still inside
the recency window, which is determined by the Standard to be 4 months
for minor versions and 1 week for patch versions. An exception are
versions with critical CVEs, which should be replaced on a shorter notice.
(c) Hannes Baum <[email protected]>, 6/2023
License: CC-BY-SA 4.0
"""

import asyncio
import datetime
import getopt
import kubernetes_asyncio
import logging
import logging.config
import math
import re
import requests
import sys
import yaml


MAJOR_VERSION_CADENCE = None
MINOR_VERSION_CADENCE = 4 # months
PATCH_VERSION_CADENCE = 1 # week
CVE_VERSION_CADENCE = 3 # days
CVE_SEVERITY = 8 # CRITICAL

return_code = 0


logger = logging.getLogger("k8s-version-recency-check")


class ConfigException(BaseException):
"""Exception raised in a configuration error occurs
"""


class Config:
kubeconfig = None
token = None

log = None


def print_usage():
print("""
K8s Version Recency Compliance Check
Usage: k8s-version-recency-check.py [-h] [-t|--token GITHUB_TOKEN] [-k|--kubeconfig PATH/TO/KUBECONFIG]
The K8s version recency check returns 0 if the version of the tested cluster is still acceptable, otherwise
it returns 1 for an out-of-date minor version, 2 for an out-of-date patch level version or 3 if the currently
used version should be updated due to a highly critical CVE.
-t/--token GITHUB_TOKEN - Github token used to check the Github API for the latest K8s releases
-k/--kubeconfig PATH/TO/KUBECONFIG - Path to the kubeconfig of the server we want to check
-h - Output help
""")
return


def setup_logging(config_log):

logging.config.dictConfig(config_log)
loggers = [
logging.getLogger(name)
for name in logging.root.manager.loggerDict
if not logging.getLogger(name).level
]

for log in loggers:
log.setLevel(config_log['level'])


def load_config(argv):

config = Config()
config_path = "./config.yaml"
config_file = None

try:
opts, args = getopt.gnu_getopt(argv, "t:k:h", ["token", "kubeconfig", "help"])
except getopt.GetoptError:
print_usage()
raise ConfigException

for opt in opts:
if opt[0] == "-h" or opt[0] == "--help":
print_usage()
sys.exit()
if opt[0] == "-c" or opt[0] == "--config":
config_path = opt[1]
if opt[0] == "-k" or opt[0] == "--kubeconfig":
config.kubeconfig = opt[1]
if opt[0] == "-t" or opt[0] == "--token":
config.token = opt[1]

try:
with open(config_path, "r") as fd:
config_file = yaml.safe_load(fd)
fd.close()
except OSError:
pass

if config_file:
setup_logging(config_file['log'])
if not config.token:
config.token = config_file['token']
else:
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
if not config.kubeconfig or not config.token:
print("Kubeconfig and Github Token need to be set in order to test a K8s version.")
raise ConfigException

return config


class K8sVersionInfo:
def __init__(self, major=0, minor=0, patch=0):
self.major = major
self.minor = minor
self.patch = patch

self.date = None

def __eq__(self, other):
if self.major == other.major and \
self.minor == other.minor and \
self.patch == other.patch:
return True
else:
return False

def extract_version(self, string, separator="."):
v_dict = string.strip().split(separator)
self.major = int(v_dict[0])
self.minor = int(v_dict[1])
self.patch = int(v_dict[2])

def check_for_version(self, major=None, minor=None, patch=None):
if (major is None or self.major == major) and \
(minor is None or self.minor == minor) and \
(patch is None or self.patch == patch):
return True
else:
return False

def get_version(self):
return f"{self.major}.{self.minor}.{self.patch}"


def diff_months(date1, date2):
return abs((date1.year - date2.year) * 12 + date1.month - date2.month)


def diff_weeks(date1, date2):
day1 = (date1 - datetime.timedelta(days=date1.weekday()))
day2 = (date2 - datetime.timedelta(days=date2.weekday()))
diff = day2 - day1
return abs((diff.days / 7) + math.ceil(diff.seconds / 86400))


def diff_days(date1, date2):
delta = date1 - date2
return abs(delta.days)


def check_cve_versions():

# CVE fix versions
cfvs = list()

# Request latest version
cve_list = requests.get("https://kubernetes.io/docs/reference/issues-security/official-cve-feed/index.json",
headers={"Accept": "application/json"}).json()

for cve in cve_list['items']:
cveid = cve['external_url'].split("=")[-1]
cve_data = requests.get(f"https://cveawg.mitre.org/api/cve/{cveid}",
headers={"Accept": "application/json"}).json()
try:
cve_cna = cve_data['containers']['cna']
cve_metrics = cve_cna['metrics'][0]
key = None
for k in cve_metrics.keys():
if re.search(r'[cC][vV][sS]{1,2}V\d', k):
key = k
break

if cve_metrics[key]['baseScore'] >= CVE_SEVERITY:
for v in cve_cna['affected'][0]['versions']:
vi = K8sVersionInfo()
if 'lessThanOrEqual' in v.keys():
vi.extract_version(v['lessThanOrEqual'].strip("v"))
vi.patch += 1
elif 'lessThan' in v.keys():
vi.extract_version(v['lessThan'].strip("v"))
if vi not in cfvs:
cfvs.append(vi)
except KeyError as e:
logger.debug(f"They key {e} couldn't be found in the CVE json data for CVE {cveid}.")
pass

return cfvs


async def get_k8s_cluster_version(kubeconfig):

await kubernetes_asyncio.config.load_kube_config(kubeconfig)

async with kubernetes_asyncio.client.ApiClient() as api:
version_api = kubernetes_asyncio.client.VersionApi(api)
ret = await version_api.get_code()

version = K8sVersionInfo()
version.extract_version(ret.git_version.strip("v"))
version.date = datetime.datetime.strptime(ret.build_date, '%Y-%m-%dT%H:%M:%SZ')

with open(kubeconfig, "r") as fd:
cluster_config = yaml.safe_load(fd)
fd.close()

return version, cluster_config['contexts'][0]['name']


def get_k8s_versions(token, cve_versions=None):

k8s_versions = []

github_headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28"
}

# Request latest version
resp = requests.get("https://api.github.com/repos/kubernetes/kubernetes/releases/latest",
headers=github_headers).json()

# Latest version
lv = K8sVersionInfo()
lv.extract_version(resp['tag_name'].strip("v"))
lv.date = datetime.datetime.strptime(resp['published_at'], '%Y-%m-%dT%H:%M:%SZ')

# Request the latest 100 version (the next are not needed, since these versions are too old)
resp = requests.get("https://api.github.com/repos/kubernetes/kubernetes/releases?per_page=100",
headers=github_headers).json()

# Iterate all versions until the first patch of the previous latest version
for i in range(0, len(resp)):
v = K8sVersionInfo()
v.extract_version(resp[i]['tag_name'].split("-")[0].strip("v"))
v.date = datetime.datetime.strptime(resp[i]['published_at'], '%Y-%m-%dT%H:%M:%SZ')

if resp[i]['draft'] is False and resp[i]['prerelease'] is False:
k8s_versions.append(v)

# Stop adding new version if the version we added is the previous latest minor versions first patch,
# since it is most of the time around 3-6 months until a new version comes out or an old one goes into EOL
if v.check_for_version(lv.major, (lv.minor-1), 0):
break

# Filter the versions depending on the usage times set by the standard
for i in range(0, len(k8s_versions)):
v = k8s_versions.__getitem__(i)
try:
if diff_months(v.date, datetime.datetime.now()) >= MINOR_VERSION_CADENCE:
v = None
else:
for kv in k8s_versions:
if kv is not None and \
v.check_for_version(major=kv.major, minor=kv.minor) and v.patch < kv.patch:
if diff_weeks(datetime.datetime.now(), kv.date) >= PATCH_VERSION_CADENCE:
v = None
break
if kv in cve_versions and \
diff_days(datetime.datetime.now(), kv.date) >= CVE_VERSION_CADENCE:
v = None
break
except (KeyError, IndexError, TypeError) as e:
logger.debug(f"An error occurred during version filtering: {e}")
v = None
finally:
k8s_versions[i] = v

return [v for v in k8s_versions if v is not None]


async def main(argv):
global return_code

try:
config = load_config(argv)
except ConfigException:
return_code = 1
return

cve_versions = check_cve_versions()
k8s_versions = get_k8s_versions(config.token, cve_versions)
cluster_version, cluster_name = await get_k8s_cluster_version(config.kubeconfig)

for k8sv in k8s_versions:
if k8sv == cluster_version:
logger.info("The K8s cluster version %s of cluster '%s' is still in the recency time window." %
(cluster_version.get_version(), cluster_name))
return
if cluster_version in cve_versions:
logger.error("The K8s cluster version %s of cluster '%s' is an outdated version with a CRITICAL CVE." %
(cluster_version.get_version(), cluster_name))
return_code = 3
else:
logger.error("The K8s cluster version %s of cluster '%s' is outdated according to the standard." %
(cluster_version.get_version(), cluster_name))
return_code = 2
return


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(sys.argv[1:]))
loop.close()

sys.exit(return_code)

0 comments on commit 20b5524

Please sign in to comment.