Skip to content

Commit

Permalink
Update test for k8s version recency (#288)
Browse files Browse the repository at this point in the history
* updates to test after reviews from @chess-knight and @fdobrovolny
* changed math for weeks and months, since they were faulty in the previous implementation
* changed way how the k8s versions are compared to a simpler loop which also causes less problems

Signed-off-by: Hannes Baum <[email protected]>
  • Loading branch information
cah-hbaum committed Aug 1, 2023
1 parent a761513 commit 6effbcc
Showing 1 changed file with 102 additions and 103 deletions.
205 changes: 102 additions & 103 deletions Tests/kaas/k8s-version-recency/k8s-version-recency-check.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@
import aiohttp
import asyncio
import datetime
from dateutil import relativedelta
import getopt
import kubernetes_asyncio
import logging
import logging.config
import math
import re
import requests
import sys
import yaml


MAJOR_VERSION_CADENCE = None
MINOR_VERSION_CADENCE = 4 # months
PATCH_VERSION_CADENCE = 1 # week
CVE_VERSION_CADENCE = 3 # days
MINOR_VERSION_CADENCE_MONTHS = 4
PATCH_VERSION_CADENCE_WEEKS = 1
CVE_VERSION_CADENCE_DAYS = 3
CVE_SEVERITY = 8 # CRITICAL

logging_config = {
Expand All @@ -62,7 +62,7 @@
}
}

logger = logging.getLogger("k8s-version-recency-check")
logger = logging.getLogger(__name__)


class ConfigException(BaseException):
Expand All @@ -83,13 +83,12 @@ def print_usage():
print("""
K8s Version Recency Compliance Check
Usage: k8s-version-recency-check.py [-h] [-k|--kubeconfig PATH/TO/KUBECONFIG]
Usage: k8s-version-recency-check.py [-h] [-c|--config PATH/TO/CONFIG] -k|--kubeconfig PATH/TO/KUBECONFIG
The K8s version recency check returns 0 if the version of the tested cluster is still acceptable, otherwise
it returns 1 for an out-of-date minor version, 2 for an out-of-date patch level version or 3 if the currently
used version should be updated due to a highly critical CVE.
it returns 2 for an out-of date version or 3 if the used version should be updated due to a highly critical CVE.
-c/--config PATH/TOCONFIG - Path to the config file of the test script
-c/--config PATH/TO/CONFIG - Path to the config file of the test script
-k/--kubeconfig PATH/TO/KUBECONFIG - Path to the kubeconfig of the server we want to check
-h - Output help
""")
Expand Down Expand Up @@ -142,7 +141,7 @@ def initialize_config(config):
# Otherwise, we initialize logging with the included literal
setup_logging(config.logging or logging_config)

if not config.kubeconfig:
if config.kubeconfig is None:
print("A kubeconfig needs to be set in order to test a k8s cluster version.")
raise ConfigException

Expand All @@ -167,39 +166,44 @@ def __init__(self, major=0, minor=0, patch=0):

def __eq__(self, other):
if not isinstance(other, K8sVersionInfo):
return False
raise TypeError
return self.major == other.major and self.minor == other.minor and self.patch == other.patch

def __gt__(self, other):
if not isinstance(other, K8sVersionInfo):
return False
raise TypeError
patchcomp = self.minor == other.minor and self.patch > other.patch
return self.major > other.major or (self.major == other.major and (self.minor > other.minor or patchcomp))

def __ge__(self, other):
if not isinstance(other, K8sVersionInfo):
return False
raise TypeError
patchcomp = self.minor == other.minor and self.patch >= other.patch
return self.major > other.major or (self.major == other.major and (self.minor > other.minor or patchcomp))

def __lt__(self, other):
if not isinstance(other, K8sVersionInfo):
return False
raise TypeError
patchcomp = self.minor == other.minor and self.patch < other.patch
return self.major < other.major or (self.major == other.major and (self.minor < other.minor or patchcomp))

def __le__(self, other):
if not isinstance(other, K8sVersionInfo):
return False
raise TypeError
patchcomp = self.minor == other.minor and self.patch <= other.patch
return self.major < other.major or (self.major == other.major and (self.minor < other.minor or patchcomp))

@classmethod
def extract_version(cls, string, separator="."):
def extract_version(cls, string, separator=".", strip=None):
if strip is None:
strip = ["v"]
for s in strip:
string = string.strip(s)
components = string.strip().split(separator)
return cls(int(components[0]), int(components[1]), int(components[2]))

def check_for_version(self, major=None, minor=None, patch=None):
"""Check if a version or part of the version is equal to the given version numbers"""
return (major is None or self.major == major) and \
(minor is None or self.minor == minor) and \
(patch is None or self.patch == patch)
Expand All @@ -209,42 +213,57 @@ def __str__(self):


class CVEVersionInfo:
def __init__(self, lower_version, upper_version, less_than=False, equal=False):
"""Class that contains a CVE version info.<
Attributes:
upper_version (K8sVersionInfo): Last version with the CVE
lower_version (K8sVersionInfo): First version with the CVE; this value will be set if either an affected version
is directly set in a CVE dataset or if the CVE dataset is in a non-standard format.
If the variable is set, `lower_version` and `upper_version` create a range of affected versions.
equal (bool): check if the version is equal to the `upper_version`, (less than is always checked, since the
format is build like this)
"""
def __init__(self, lower_version, upper_version, equal=False):
self.lower_version = lower_version
self.upper_version = upper_version

self.less_than = less_than
self.equal = equal

def __eq__(self, other):
if not isinstance(other, CVEVersionInfo):
return False
return self.lower_version == other.lower_version and self.upper_version == other.upper_version and \
self.less_than == other.less_than and self.equal == self.equal
raise TypeError
return self.lower_version == other.lower_version and \
self.upper_version == other.upper_version and \
self.equal == self.equal

def is_version_affected(self, version_info):
# See the following link for more information about the format
# https://www.cve.org/AllResources/CveServices#cve-json-5

# Check if an `upper version` exists
if self.upper_version:
# Check if a `lower version` exists and compare the version against it
if self.lower_version:
gt = self.lower_version <= version_info
else:
gt = True
if self.less_than:
if self.equal:
return gt and self.upper_version >= version_info
return gt and self.upper_version > version_info
# Compare the version either with `less than` or `less than or equal` against the `upper version`
if self.equal:
return gt and self.upper_version >= version_info
return gt and self.upper_version > version_info
else:
# If no upper version exists, we only need to check if the version is equal to the `lower version`
return self.lower_version == version_info


def diff_months(date1, date2):
return abs((date1.year - date2.year) * 12 + date1.month - date2.month)
r = relativedelta.relativedelta(date2, date1)
return r.months + (12 * r.years)


def diff_weeks(date1, date2):
day1 = (date1 - datetime.timedelta(days=date1.weekday()))
day2 = (date2 - datetime.timedelta(days=date2.weekday()))
diff = day2 - day1
return abs((diff.days / 7) + math.ceil(diff.seconds / 86400))
delta = date1 - date2
return abs(delta.days / 7)


def diff_days(date1, date2):
Expand All @@ -254,48 +273,46 @@ def diff_days(date1, date2):

async def request_cve_data(session: aiohttp.ClientSession, cveid: str) -> dict:
"""Request for a single CVE data item."""
resp = await session.request('GET', f"https://cveawg.mitre.org/api/cve/{cveid}",
headers={"Accept": "application/json"})
return await resp.json()
async with session.get(
f"https://cveawg.mitre.org/api/cve/{cveid}",
headers={"Accept": "application/json"}
) as resp:
return await resp.json()


def parse_cve_version_information(cve_version_info):
"""Parse the CVE version information according to their CVE JSON 5.0 schema"""
vi_lower_version = None
vi_upper_version = None
less_than = False
equal = False

# Extract the version if it is viable, but it's not a requirement
try:
vi_lower_version = K8sVersionInfo().extract_version(cve_version_info['version'].strip("v"))
vi_lower_version = K8sVersionInfo.extract_version(cve_version_info['version'])
except ValueError:
pass

if 'lessThanOrEqual' in cve_version_info:
vi_upper_version = K8sVersionInfo().extract_version(cve_version_info['lessThanOrEqual'].strip("v"))
vi_upper_version.patch += 1
less_than = True
vi_upper_version = K8sVersionInfo.extract_version(cve_version_info['lessThanOrEqual'])
equal = True
elif 'lessThan' in cve_version_info:
vi_upper_version = K8sVersionInfo().extract_version(cve_version_info['lessThan'].strip("v"))
less_than = True
vi_upper_version = K8sVersionInfo.extract_version(cve_version_info['lessThan'])

# This shouldn't happen, but if it happens, we look for non-standard descriptions
# According to this(https://www.cve.org/AllResources/CveServices#cve-json-5),
# this isn't how the data should be described
if not vi_lower_version and not vi_upper_version:
if vi_lower_version is None and vi_upper_version is None:
if re.search(r'v?\d+.\d+.x', cve_version_info['version']):
vdata = cve_version_info['version'].strip("v").split(".")
vi_lower_version = K8sVersionInfo(vdata[0], vdata[1], 0)
vi_upper_version = K8sVersionInfo(vdata[0], int(vdata[1])+1, 0)
vi_upper_version = K8sVersionInfo(vdata[0], vdata[1], 0)

if re.search(r'v?\d+.\d+.\d+\s+-\s+v?\d+.\d+.\d+', cve_version_info['version']):
vdata = cve_version_info['version'].split("-")
vi_lower_version = K8sVersionInfo().extract_version(vdata[0].strip("v"))
vi_upper_version = K8sVersionInfo().extract_version(vdata[1].strip("v"))
vi_lower_version = K8sVersionInfo.extract_version(vdata[0])
vi_upper_version = K8sVersionInfo.extract_version(vdata[1])

return CVEVersionInfo(vi_lower_version, vi_upper_version, less_than, equal)
return CVEVersionInfo(vi_lower_version, vi_upper_version, equal)


async def collect_cve_versions():
Expand Down Expand Up @@ -349,8 +366,11 @@ async def collect_cve_versions():
if version_info['status'] == "affected"
]
for cvev in affected_kubernetes_versions:
if cvev not in cfvs:
cfvs.append(cvev)
try:
if cvev not in cfvs:
cfvs.append(cvev)
except TypeError:
pass

return cfvs

Expand All @@ -363,73 +383,51 @@ async def get_k8s_cluster_version(kubeconfig):
version_api = kubernetes_asyncio.client.VersionApi(api)
ret = await version_api.get_code()

version = K8sVersionInfo.extract_version(ret.git_version.strip("v"))
version = K8sVersionInfo.extract_version(ret.git_version)
version.date = datetime.datetime.strptime(ret.build_date, '%Y-%m-%dT%H:%M:%SZ')

return version, cluster_config.current_context['name']


def is_current_version(version, k8s_version_list, cve_version_list=[]):
"""Filter the versions depending on the usage times set by the Standard and
the times set for CVE versions.
"""
try:
if diff_months(version.date, datetime.datetime.now()) >= MINOR_VERSION_CADENCE:
return False

for kv in k8s_version_list:
if version.check_for_version(major=kv.major, minor=kv.minor) and \
version.patch < kv.patch:

if diff_weeks(datetime.datetime.now(), kv.date) >= PATCH_VERSION_CADENCE:
return False

if kv in cve_version_list and \
diff_days(datetime.datetime.now(), kv.date) >= CVE_VERSION_CADENCE:
return False
except (KeyError, IndexError, TypeError) as e:
logger.debug(f"An error occurred during version filtering: {e}")
return False
else:
return True


def collect_accepted_k8s_versions(cve_version_list=[]):
"""Collect a list of k8s versions that comply to the cadence time set by the standard"""

k8s_versions = []
def check_k8s_version_recency(version, cve_version_list=None):
"""Check a given K8s cluster version against the list of released versions in order to find out, if the version
is an accepted recent version according to the standard."""
if cve_version_list is None:
cve_version_list = list()

github_headers = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28"
}

# Request latest version
resp = requests.get("https://api.github.com/repos/kubernetes/kubernetes/releases/latest",
headers=github_headers).json()

# Latest version
lv = K8sVersionInfo.extract_version(resp['tag_name'].strip("v"))
lv.date = datetime.datetime.strptime(resp['published_at'], '%Y-%m-%dT%H:%M:%SZ')

# Request the latest 100 version (the next are not needed, since these versions are too old)
response = requests.get("https://api.github.com/repos/kubernetes/kubernetes/releases?per_page=100",
headers=github_headers).json()

# Iterate all versions until the first patch of the previous latest version
for r in response:
v = K8sVersionInfo.extract_version(r['tag_name'].split("-")[0].strip("v"))
v = K8sVersionInfo.extract_version(r['tag_name'].split("-")[0])
v.date = datetime.datetime.strptime(r['published_at'], '%Y-%m-%dT%H:%M:%SZ')

if not r['draft'] and not r['prerelease']:
k8s_versions.append(v)
if r['draft'] or r['prerelease']:
continue

# Check if the version is recent
if v.minor >= version.minor:
if diff_months(v.date, datetime.datetime.now()) >= MINOR_VERSION_CADENCE_MONTHS:
return False

if version.check_for_version(major=v.major, minor=v.minor) and version.patch < v.patch:
if diff_weeks(datetime.datetime.now(), v.date) >= PATCH_VERSION_CADENCE_WEEKS:
return False

# Stop adding new version if the version we added is the previous latest minor versions first patch,
# since it is most of the time around 3-6 months until a new version comes out or an old one goes into EOL
if v.check_for_version(lv.major, (lv.minor-1), 0):
if v in cve_version_list and \
diff_days(datetime.datetime.now(), v.date) >= CVE_VERSION_CADENCE_DAYS:
return False

if v.minor == (version.minor + 1) and v.patch == 0:
break

return [v for v in k8s_versions if is_current_version(v, k8s_versions, cve_version_list)]
return True


async def main(argv):
Expand All @@ -440,20 +438,21 @@ async def main(argv):
return 1

cve_versions = await collect_cve_versions()
k8s_versions = collect_accepted_k8s_versions(cve_versions)
cluster_version, cluster_name = await get_k8s_cluster_version(config.kubeconfig)

for k8sv in k8s_versions:
if k8sv == cluster_version:
logger.info("The K8s cluster version %s of cluster '%s' is still in the recency time window." %
(str(cluster_version), cluster_name))
return 0
if check_k8s_version_recency(cluster_version, cve_versions):
logger.info("The K8s cluster version %s of cluster '%s' is still in the recency time window." %
(str(cluster_version), cluster_name))
return 0

for cvev in cve_versions:
if cvev.is_version_affected(cluster_version):
logger.error("The K8s cluster version %s of cluster '%s' is an outdated version "
"with a possible CRITICAL CVE." % (str(cluster_version), cluster_name))
return 3
try:
if cvev.is_version_affected(cluster_version):
logger.error("The K8s cluster version %s of cluster '%s' is an outdated version "
"with a possible CRITICAL CVE." % (str(cluster_version), cluster_name))
return 3
except TypeError as e:
logger.error(f"An error occurred during CVE check: {e}")

logger.error("The K8s cluster version %s of cluster '%s' is outdated according to the Standard." %
(str(cluster_version), cluster_name))
Expand Down

0 comments on commit 6effbcc

Please sign in to comment.