Fix error where duplicate data_sources were added to an analytic story if
multiple detections referenced the same data_source. This was done by
making data_sources a computed_field for Story rather than building it
while detection objects are built. Additionally, added __eq__, __lt__,
and __hash__ methods to SecurityContentObject_Abstract so that set
operations and sorts can happen easily for all objects.
pyth0n1c committed Jul 25, 2024
1 parent 0eebfd9 commit b3e7d09
Showing 4 changed files with 72 additions and 11 deletions.
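The duplication described in the commit message can be reproduced with a toy sketch (simplified, hypothetical names — not the real contentctl classes): under the old behavior, each detection's post-init hook extended the story's data_sources list, so a data source shared by several detections was appended once per detection.

# Toy reproduction of the bug: two detections share "Sysmon EventID 1",
# and the old extend-per-detection behavior appends it twice.
story_data_sources: list[str] = []
detection_sources = [
    ["Sysmon EventID 1"],                      # detection 1
    ["Sysmon EventID 1", "Sysmon EventID 3"],  # detection 2
]
for sources in detection_sources:
    story_data_sources.extend(sources)

print(story_data_sources)
# ['Sysmon EventID 1', 'Sysmon EventID 1', 'Sysmon EventID 3']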
@@ -37,7 +37,7 @@ class Detection_Abstract(SecurityContentObject):
     #contentType: SecurityContentType = SecurityContentType.detections
     type: AnalyticsType = Field(...)
     status: DetectionStatus = Field(...)
-    data_source: Optional[List[str]] = None
+    data_source: list[str] = []
     tags: DetectionTags = Field(...)
     search: Union[str, dict[str,Any]] = Field(...)
     how_to_implement: str = Field(..., min_length=4)
@@ -54,7 +54,7 @@ class Detection_Abstract(SecurityContentObject):
     # A list of groups of tests, relying on the same data
     test_groups: Union[list[TestGroup], None] = Field(None,validate_default=True)
 
-    data_source_objects: Optional[List[DataSource]] = None
+    data_source_objects: list[DataSource] = []
 
 
     @field_validator("search", mode="before")
@@ -420,9 +420,7 @@ def model_post_init(self, ctx:dict[str,Any]):
         self.data_source_objects = matched_data_sources
 
         for story in self.tags.analytic_story:
-            story.detections.append(self)
-            story.data_sources.extend(self.data_source_objects)
-
+            story.detections.append(self)
         return self


@@ -446,14 +446,16 @@ def mapDetectionNamesToBaselineObjects(cls, v:list[str], info:ValidationInfo)->L
             raise ValueError("Error, baselines are constructed automatically at runtime. Please do not include this field.")
 
 
-        name:Union[str,dict] = info.data.get("name",None)
+        name:Union[str,None] = info.data.get("name",None)
         if name is None:
             raise ValueError("Error, cannot get Baselines because the Detection does not have a 'name' defined.")
 
         director:DirectorOutputDto = info.context.get("output_dto",None)
         baselines:List[Baseline] = []
         for baseline in director.baselines:
-            if name in baseline.tags.detections:
+            # This matching is a bit strange, because baseline.tags.detections starts as a list of strings, but
+            # is eventually updated to a list of Detections as we construct all of the detection objects.
+            if name in [detection_name for detection_name in baseline.tags.detections if isinstance(detection_name,str)]:
                 baselines.append(baseline)
 
         return baselines
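A minimal illustration of the isinstance filter above, using a stand-in type (hypothetical; not the real Detection model): the list starts out holding detection names and is rewritten in place with constructed objects, so only entries that are still strings are meaningful to compare against a name.

from dataclasses import dataclass

@dataclass
class FakeDetection:  # stand-in for the real Detection object
    name: str

# The list begins as names and is progressively replaced with objects.
detections: list[object] = ["Detection A", FakeDetection(name="Detection B")]

name = "Detection A"
still_strings = [d for d in detections if isinstance(d, str)]
print(name in still_strings)  # True — already-constructed objects are skipped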
@@ -194,6 +194,33 @@ def __repr__(self)->str:
 
     def __str__(self)->str:
         return(self.__repr__())
+
+    def __lt__(self, other:object)->bool:
+        if not isinstance(other,SecurityContentObject_Abstract):
+            raise Exception(f"SecurityContentObject can only be compared to each other, not to {type(other)}")
+        return self.name < other.name
+
+    def __eq__(self, other:object)->bool:
+        if not isinstance(other,SecurityContentObject_Abstract):
+            raise Exception(f"SecurityContentObject can only be compared to each other, not to {type(other)}")
+
+        if id(self) == id(other) and self.name == other.name and self.id == other.id:
+            # Yes, this is the same object
+            return True
+
+        elif id(self) == id(other) or self.name == other.name or self.id == other.id:
+            raise Exception("Attempted to compare two SecurityContentObjects, but their fields indicate they were not globally unique:"
+                            f"\n\tid(obj1) : {id(self)}"
+                            f"\n\tid(obj2) : {id(other)}"
+                            f"\n\tobj1.name : {self.name}"
+                            f"\n\tobj2.name : {other.name}"
+                            f"\n\tobj1.id : {self.id}"
+                            f"\n\tobj2.id : {other.id}")
+        else:
+            return False
+
+    def __hash__(self) -> NonNegativeInt:
+        return id(self)



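These dunder methods are what let a Story deduplicate and order content objects with plain set and sorted calls. A runnable sketch with a simplified stand-in class (the uniqueness checks in the real __eq__ are stricter than this):

import uuid

class ContentObject:  # simplified stand-in, not the real abstract class
    def __init__(self, name: str):
        self.name = name
        self.id = uuid.uuid4()

    def __lt__(self, other: "ContentObject") -> bool:
        return self.name < other.name

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ContentObject) and id(self) == id(other)

    def __hash__(self) -> int:
        return id(self)

a = ContentObject("Windows AdFind Exfiltration")
b = ContentObject("Cobalt Strike Named Pipes")

# Adding the same object twice collapses to one set entry, and
# sorted() orders alphabetically by name via __lt__.
deduped = sorted({a, b, a})
print([o.name for o in deduped])
# ['Cobalt Strike Named Pipes', 'Windows AdFind Exfiltration']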
28 changes: 26 additions & 2 deletions contentctl/objects/data_source.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
-from typing import Union, Optional, List
-from pydantic import model_validator, Field, FilePath
+from typing import Optional, Any
+from pydantic import Field, FilePath, model_serializer
 from contentctl.objects.security_content_object import SecurityContentObject
 from contentctl.objects.event_source import EventSource

@@ -16,3 +16,27 @@ class DataSource(SecurityContentObject):
     example_log: Optional[str] = None
 
 
+    @model_serializer
+    def serialize_model(self):
+        # Call serializer for parent
+        super_fields = super().serialize_model()
+
+        # All fields custom to this model
+        model:dict[str,Any] = {
+            "source": self.source,
+            "sourcetype": self.sourcetype,
+            "separator": self.separator,
+            "configuration": self.configuration,
+            "supported_TA": self.supported_TA,
+            "fields": self.fields,
+            "field_mappings": self.field_mappings,
+            "convert_to_log_source": self.convert_to_log_source,
+            "example_log": self.example_log
+        }
+
+
+        # Combine fields from this model with fields from parent
+        super_fields.update(model)
+
+        # Return the combined model
+        return super_fields
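A toy sketch of the parent/child @model_serializer pattern used above (made-up models, not the real SecurityContentObject/DataSource): the child serializer starts from the parent's serialized fields and layers its own on top.

from typing import Any
from pydantic import BaseModel, model_serializer

class Parent(BaseModel):
    name: str

    @model_serializer
    def serialize_model(self) -> dict[str, Any]:
        return {"name": self.name}

class Child(Parent):
    source: str = "WinEventLog:Security"

    @model_serializer
    def serialize_model(self) -> dict[str, Any]:
        # Start from the parent's fields, then add this model's own.
        super_fields = super().serialize_model()
        super_fields.update({"source": self.source})
        return super_fields

print(Child(name="example").model_dump())
# {'name': 'example', 'source': 'WinEventLog:Security'}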
12 changes: 11 additions & 1 deletion contentctl/objects/story.py
@@ -33,7 +33,17 @@ class Story(SecurityContentObject):
     detections:List[Detection] = []
     investigations: List[Investigation] = []
     baselines: List[Baseline] = []
-    data_sources: List[DataSource] = []
 
 
+    @computed_field
+    @property
+    def data_sources(self)-> list[DataSource]:
+        # Only add a data_source if it does not already exist in the story
+        data_source_objects:set[DataSource] = set()
+        for detection in self.detections:
+            data_source_objects.update(set(detection.data_source_objects))
+
+        return sorted(list(data_source_objects))
 
 
     def storyAndInvestigationNamesWithApp(self, app_name:str)->List[str]:
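A runnable sketch of the computed_field fix (toy models with string data sources standing in for DataSource objects): because the list is derived on access from the detections, duplicates can never accumulate during construction.

from pydantic import BaseModel, computed_field

class Detection(BaseModel):  # toy stand-ins for the real models
    name: str
    data_source_objects: list[str] = []

class Story(BaseModel):
    detections: list[Detection] = []

    @computed_field
    @property
    def data_sources(self) -> list[str]:
        # Deduplicate via a set, then sort for a stable order.
        unique: set[str] = set()
        for detection in self.detections:
            unique.update(detection.data_source_objects)
        return sorted(unique)

story = Story(detections=[
    Detection(name="d1", data_source_objects=["Sysmon EventID 1"]),
    Detection(name="d2", data_source_objects=["Sysmon EventID 1", "Sysmon EventID 3"]),
])
print(story.data_sources)  # ['Sysmon EventID 1', 'Sysmon EventID 3']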
