Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Element selection and Control evaluation conditions for Threats. #178

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 74 additions & 11 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ def __set__(self, instance, value):
)
super().__set__(instance, list(value))

class varThreats(var):
    """Descriptor that only accepts a list whose items are all Threat
    instances; stores a defensive copy of the list on the instance."""

    def __set__(self, instance, value):
        for idx, item in enumerate(value):
            if isinstance(item, Threat):
                continue
            # Report the first offending position and its actual type.
            raise ValueError(
                "expecting a list of Threat, item number {} is a {}".format(
                    idx, type(item)
                )
            )
        super().__set__(instance, list(value))


class varAction(var):
def __set__(self, instance, value):
Expand All @@ -163,6 +174,13 @@ def __set__(self, instance, value):
super().__set__(instance, value)


class varThreatResult(var):
    """Descriptor that only accepts a ThreatResult enum member."""

    def __set__(self, instance, value):
        if isinstance(value, ThreatResult):
            super().__set__(instance, value)
        else:
            raise ValueError("expecting a ThreatResult, got a {}".format(type(value)))


class varData(var):
def __set__(self, instance, value):
if isinstance(value, str):
Expand Down Expand Up @@ -204,6 +222,13 @@ def __ne__(self, other):
def __str__(self):
return ", ".join(sorted(set(d.name for d in self)))

class ThreatResult(Enum):
    """Outcome of evaluating a Threat against an element.

    VALID           - target matched and the condition held (a finding).
    INVALID_TARGET  - element is not an instance of the threat's target types.
    NOT_APPLICABLE  - target matched but the target_condition was false.
    MITIGATED       - target_condition held but the condition was false.
    OVERRIDE        - suppressed by an element-level override.
    """

    VALID = "VALID"
    INVALID_TARGET = "INVALID_TARGET"
    NOT_APPLICABLE = "NOT_APPLICABLE"
    MITIGATED = "MITIGATED"
    OVERRIDE = "OVERRIDE"


class Action(Enum):
"""Action taken when validating a threat model."""
Expand Down Expand Up @@ -479,6 +504,7 @@ class Threat:

id = varString("", required=True)
description = varString("")
target_condition = varString("")
condition = varString(
"",
doc="""a Python expression that should evaluate
Expand All @@ -495,6 +521,7 @@ def __init__(self, **kwargs):
self.id = kwargs["SID"]
self.description = kwargs.get("description", "")
self.condition = kwargs.get("condition", "True")
self.target_condition = kwargs.get("target_condition", "True")
target = kwargs.get("target", "Element")
if not isinstance(target, str) and isinstance(target, Iterable):
target = tuple(target)
Expand All @@ -517,14 +544,23 @@ def __str__(self):

def apply(self, target):
    """Evaluate this threat against *target* and return a ThreatResult.

    Checks, in order: the target's type, the element-selection
    expression (target_condition), and the control-evaluation
    expression (condition).
    """
    # NOTE(review): both conditions are eval()'d Python expressions taken
    # from the threat library JSON — threat files must come from a trusted
    # source, as eval() will execute arbitrary code.
    if not isinstance(target, self.target):
        return ThreatResult.INVALID_TARGET
    if not eval(self.target_condition):
        return ThreatResult.NOT_APPLICABLE
    return ThreatResult.VALID if eval(self.condition) else ThreatResult.MITIGATED



class Finding:
"""Represents a Finding - the element in question
and a description of the finding"""

status = varThreatResult(None, required=True, doc="Result from Threat evaluation")
element = varElement(None, required=True, doc="Element this finding applies to")
target = varString("", doc="Name of the element this finding applies to")
description = varString("", required=True, doc="Threat description")
Expand Down Expand Up @@ -593,7 +629,7 @@ def __init__(

for k, v in kwargs.items():
setattr(self, k, v)

def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
self.__module__, type(self).__name__, self.id, hex(id(self))
Expand Down Expand Up @@ -659,14 +695,18 @@ def _add_threats(self):

for i in threats_json:
for k, v in i.items():
if isinstance(v, str) and k != "condition":
if isinstance(v, str) and (k != "condition" and k != "target_condition"):
i[k] = html.escape(i[k])
TM._threats.append(Threat(**i))

def resolve(self):
findings = []
elements = defaultdict(list)
element_findings = defaultdict(list)
element_mitigated_threats = defaultdict(list)
element_not_applicable_threats = defaultdict(list)

for e in TM._elements:
print("----")
if not e.inScope:
continue

Expand All @@ -679,14 +719,35 @@ def resolve(self):
pass

for t in TM._threats:
if not t.apply(e) and t.id not in override_ids:
threat_result = t.apply(e)
if not threat_result == ThreatResult.VALID and t.id not in override_ids:

if t.id in override_ids:
threat_result = ThreatResult.OVERRIDE
elif threat_result == ThreatResult.NOT_APPLICABLE:
if isinstance(e, (Asset, Dataflow)):
element_not_applicable_threats[e].append(t)
elif threat_result == ThreatResult.MITIGATED:
if isinstance(e, (Asset, Dataflow)):
element_mitigated_threats[e].append(t)


continue
f = Finding(e, threat=t)

#TODO : Consider adding to findings for all results expect for INVALID_TARGET
f = Finding(e, status=threat_result, threat=t)
findings.append(f)
elements[e].append(f)
element_findings[e].append(f)

self.findings = findings
for e, findings in elements.items():
e.findings = findings
for e, f in element_findings.items():
e.findings = f

for e, threats in element_mitigated_threats.items():
e.mitigated_threats = threats

for e, threats in element_not_applicable_threats.items():
e.not_applicable_threats = threats

def check(self):
if self.description is None:
Expand Down Expand Up @@ -978,6 +1039,8 @@ class Element:
doc="Maximum data classification this element can handle.",
)
findings = varFindings([], doc="Threats that apply to this element")
not_applicable_threats = varThreats([], doc="Threats that do not apply to this element per element annotations")
mitigated_threats = varThreats([], doc="Threats which are mitiigiated per element annotations")
overrides = varFindings(
[],
doc="""Overrides to findings, allowing to set
Expand Down Expand Up @@ -1660,7 +1723,7 @@ def serialize(obj, nested=False):
and not isinstance(value, str)
and isinstance(value, Iterable)
):
value = [v.id if isinstance(v, Finding) else v.name for v in value]
value = [v.id if isinstance(v, (Finding, Threat)) else v.name for v in value]
result[i.lstrip("_")] = value
return result

Expand Down
65 changes: 62 additions & 3 deletions pytm/threatlib/threats.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,70 @@
[
{
"SID":"NOZ01",
"target": ["Server"],
"target_condition": "target.usesEnvironmentVariables",
"description": "NOZ01 threat",
"details": "Testing - Valid Threat",
"Likelihood Of Attack": "High",
"severity": "High",
"condition": "True",
"prerequisites": "foobar",
"mitigations": "foobar",
"example": "foobar",
"references": "foobar"
},
{
"SID":"NOZ02",
"target": ["Server"],
"target_condition": "not target.usesEnvironmentVariables",
"description": "NOZ02 threat",
"details": "Testing - NA threat",
"Likelihood Of Attack": "High",
"severity": "High",
"condition": "True",
"prerequisites": "foobar",
"mitigations": "foobar",
"example": "foobar",
"references": "foobar"
},
{
"SID":"NOZ03",
"target": ["Server"],
"target_condition": "target.usesEnvironmentVariables",
"description": "NOZ03 threat",
"details": "Testing - mitigated",
"Likelihood Of Attack": "High",
"severity": "High",
"condition": "False",
"prerequisites": "foobar",
"mitigations": "foobar",
"example": "foobar",
"references": "foobar"
},

{
"SID":"NOZ04",
"target": ["Server"],
"target_condition": "any(t.id == 'NOZ03' for t in target.mitigated_threats)",
"description": "NOZ03 threat",
"details": "Testing - mitigated",
"Likelihood Of Attack": "High",
"severity": "High",
"condition": "True",
"prerequisites": "foobar",
"mitigations": "foobar",
"example": "foobar",
"references": "foobar"
},
{
"SID":"INP01",
"target": ["Lambda","Process"],
"target": ["Lambda","Process","Server"],
"target_condition": "target.usesEnvironmentVariables",
"description": "Buffer Overflow via Environment Variables",
"details": "This attack pattern involves causing a buffer overflow through manipulation of environment variables. Once the attacker finds that they can modify an environment variable, they may try to overflow associated buffers. This attack leverages implicit trust often placed in environment variables.",
"Likelihood Of Attack": "High",
"severity": "High",
"condition": "target.usesEnvironmentVariables is True and target.sanitizesInput is False and target.checksInputBounds is False",
"condition": "target.sanitizesInput is False and target.checksInputBounds is False",
"prerequisites": "The application uses environment variables.An environment variable exposed to the user is vulnerable to a buffer overflow.The vulnerable environment variable uses untrusted data.Tainted data used in the environment variables is not properly validated. For instance boundary checking is not done before copying the input data to a buffer.",
"mitigations": "Do not expose environment variable to the user.Do not use untrusted data in your environment variables. Use a language or compiler that performs automatic bounds checking. There are tools such as Sharefuzz [R.10.3] which is an environment variable fuzzer for Unix that support loading a shared library. You can use Sharefuzz to determine if you are exposing an environment variable vulnerable to buffer overflow.",
"example": "Attack Example: Buffer Overflow in $HOME A buffer overflow in sccw allows local users to gain root access via the $HOME environmental variable. Attack Example: Buffer Overflow in TERM A buffer overflow in the rlogin program involves its consumption of the TERM environmental variable.",
Expand Down Expand Up @@ -79,7 +137,8 @@
},
{
"SID":"INP05",
"target": ["Server"],
"target": ["Server", "Process"],
"target_condition": "any(d.sink.isSQL for d in target.outputs)",
"description": "Command Line Execution through SQL Injection",
"details": "An attacker uses standard SQL injection methods to inject data into the command line for execution. This could be done directly through misuse of directives such as MSSQL_xp_cmdshell or indirectly through injection of data into the database that would be interpreted as shell commands. Sometime later, an unscrupulous backend application (or could be part of the functionality of the same application) fetches the injected data stored in the database and uses this data as command line arguments without performing proper validation. The malicious data escapes that data plane by spawning new commands to be executed on the host.",
"Likelihood Of Attack": "Low",
Expand Down
4 changes: 4 additions & 0 deletions tm.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
web.sanitizesInput = False
web.encodesOutput = True
web.authorizesSource = False
web.usesEnvironmentVariables = True
web.checksInputBounds = False
web.usesStrongSessionIdentifiers = True

db = Datastore("SQL Database")
db.OS = "CentOS"
Expand Down Expand Up @@ -104,3 +107,4 @@

if __name__ == "__main__":
tm.process()