Skip to content

Commit

Permalink
Merge pull request #300 from splunk/raise_exception_on_malformatted_t…
Browse files Browse the repository at this point in the history
…ests

Raise an exception on malformed unit tests in YMLs
  • Loading branch information
pyth0n1c authored Oct 15, 2024
2 parents 2cc708b + 8f73477 commit 02eb5d7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 15 deletions.
20 changes: 10 additions & 10 deletions contentctl/actions/detection_testing/GitService.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def getChanges(self, target_branch:str)->List[Detection]:

#Make a filename to content map
filepath_to_content_map = { obj.file_path:obj for (_,obj) in self.director.name_to_content_map.items()}
updated_detections:List[Detection] = []
updated_macros:List[Macro] = []
updated_lookups:List[Lookup] =[]
updated_detections:set[Detection] = set()
updated_macros:set[Macro] = set()
updated_lookups:set[Lookup] = set()

for diff in all_diffs:
if type(diff) == pygit2.Patch:
Expand All @@ -80,14 +80,14 @@ def getChanges(self, target_branch:str)->List[Detection]:
if decoded_path.is_relative_to(self.config.path/"detections") and decoded_path.suffix == ".yml":
detectionObject = filepath_to_content_map.get(decoded_path, None)
if isinstance(detectionObject, Detection):
updated_detections.append(detectionObject)
updated_detections.add(detectionObject)
else:
raise Exception(f"Error getting detection object for file {str(decoded_path)}")

elif decoded_path.is_relative_to(self.config.path/"macros") and decoded_path.suffix == ".yml":
macroObject = filepath_to_content_map.get(decoded_path, None)
if isinstance(macroObject, Macro):
updated_macros.append(macroObject)
updated_macros.add(macroObject)
else:
raise Exception(f"Error getting macro object for file {str(decoded_path)}")

Expand All @@ -98,7 +98,7 @@ def getChanges(self, target_branch:str)->List[Detection]:
updatedLookup = filepath_to_content_map.get(decoded_path, None)
if not isinstance(updatedLookup,Lookup):
raise Exception(f"Expected {decoded_path} to be type {type(Lookup)}, but instead if was {(type(updatedLookup))}")
updated_lookups.append(updatedLookup)
updated_lookups.add(updatedLookup)

elif decoded_path.suffix == ".csv":
# If the CSV was updated, we want to make sure that we
Expand All @@ -125,7 +125,7 @@ def getChanges(self, target_branch:str)->List[Detection]:
if updatedLookup is not None and updatedLookup not in updated_lookups:
# It is possible that both the CSV and YML have been modified for the same lookup,
# and we do not want to add it twice.
updated_lookups.append(updatedLookup)
updated_lookups.add(updatedLookup)

else:
pass
Expand All @@ -136,7 +136,7 @@ def getChanges(self, target_branch:str)->List[Detection]:

# If a detection has at least one dependency on changed content,
# then we must test it again
changed_macros_and_lookups = updated_macros + updated_lookups
changed_macros_and_lookups:set[SecurityContentObject] = updated_macros.union(updated_lookups)

for detection in self.director.detections:
if detection in updated_detections:
Expand All @@ -146,14 +146,14 @@ def getChanges(self, target_branch:str)->List[Detection]:

for obj in changed_macros_and_lookups:
if obj in detection.get_content_dependencies():
updated_detections.append(detection)
updated_detections.add(detection)
break

#Print out the names of all modified/new content
modifiedAndNewContentString = "\n - ".join(sorted([d.name for d in updated_detections]))

print(f"[{len(updated_detections)}] Pieces of modifed and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}")
return updated_detections
return sorted(list(updated_detections))

def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]:
filepath_to_content_map: dict[FilePath, SecurityContentObject] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def adjust_tests_and_groups(self) -> None:
the model from the list of unit tests. Also, preemptively skips all manual tests, as well as
tests for experimental/deprecated detections and Correlation type detections.
"""

# Since ManualTest and UnitTest are not differentiable without looking at the manual_test
# tag, Pydantic builds all tests as UnitTest objects. If we see the manual_test flag, we
# convert these to ManualTest
Expand Down Expand Up @@ -789,6 +790,45 @@ def search_observables_exist_validate(self):
# Found everything
return self

@field_validator("tests", mode="before")
def ensure_yml_test_is_unittest(cls, v: list[dict]):
    """Ensure every raw test entry read from the YML parses as a UnitTest.

    The ``tests`` field is typed as a union of several test types, but ONLY
    UnitTest may be defined directly in a YML file. If part of a UnitTest
    defined in the YML is malformed (for example a bad attack_data entry),
    it will FAIL to validate as a UnitTest and Pydantic may instead coerce
    it into a different member of the union that requires fewer fields
    (such as IntegrationTest), which is incorrect. Validating each raw
    dict as a UnitTest up front surfaces errors right away instead of
    letting Pydantic construct the wrong test type.

    Args:
        v (list[dict]): list of dicts read from the yml. Each one SHOULD
            be a valid UnitTest.

    Raises:
        ValueError: aggregating the errors for every entry that could not
            be constructed as a UnitTest.

    Returns:
        list[dict]: the unmodified input, if every entry validated.
    """
    errors: list[ValueError] = []
    for raw_test in v:
        try:
            # model_validate raises a pydantic.ValidationError (a
            # ValueError subclass) if raw_test is not a valid UnitTest.
            UnitTest.model_validate(raw_test)
        except ValueError as e:
            errors.append(e)
    if errors:
        # Report ALL malformed tests at once rather than failing on
        # just the first one encountered.
        raise ValueError(errors)
    # All entries can be constructed as UnitTests with no exceptions,
    # so let the normal field validation flow continue.
    return v


@field_validator("tests")
def tests_validate(
cls,
Expand Down
13 changes: 8 additions & 5 deletions contentctl/objects/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.security_content_object import SecurityContentObject


#The following macros are included in commonly-installed apps.
#As such, we will ignore if they are missing from our app.
#Included in
Expand Down Expand Up @@ -55,10 +54,15 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st
#If a comment ENDS in a macro, for example ```this is a comment with a macro `macro_here````
#then there is a small edge case where the regex below does not work properly. If that is
#the case, we edit the search slightly to insert a space
text_field = re.sub(r"\`\`\`\`", r"` ```", text_field)
text_field = re.sub(r"\`\`\`.*?\`\`\`", " ", text_field)

if re.findall(r"\`\`\`\`", text_field):
raise ValueError("Search contained four or more '`' characters in a row which is invalid SPL"
"This may have occurred when a macro was commented out.\n"
"Please ammend your search to remove the substring '````'")

# replace all the macros with a space
text_field = re.sub(r"\`\`\`[\s\S]*?\`\`\`", " ", text_field)


macros_to_get = re.findall(r'`([^\s]+)`', text_field)
#If macros take arguments, stop at the first argument. We just want the name of the macro
macros_to_get = set([macro[:macro.find('(')] if macro.find('(') != -1 else macro for macro in macros_to_get])
Expand All @@ -68,4 +72,3 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st
macros_to_get -= macros_to_ignore
return Macro.mapNamesToSecurityContentObjects(list(macros_to_get), director)


0 comments on commit 02eb5d7

Please sign in to comment.