Skip to content

Commit

Permalink
Merge pull request #180 from splunk/add_support_for_data_source_objects
Browse files Browse the repository at this point in the history
Add support for data source objects
  • Loading branch information
pyth0n1c authored Jul 10, 2024
2 parents d670f3e + 58f08fb commit bccb33e
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 97 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"python.envFile": "${workspaceFolder}/.env",
"python.testing.cwd": "${workspaceFolder}",
"python.languageServer": "Pylance",
"python.analysis.typeCheckingMode": "strict"
"python.analysis.typeCheckingMode": "strict",
"editor.defaultFormatter": "ms-python.black-formatter"


}
56 changes: 38 additions & 18 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,47 @@
from typing import Union

from contentctl.objects.enums import SecurityContentProduct
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import SecurityContentObject_Abstract
from contentctl.input.director import (
Director,
DirectorOutputDto
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import (
SecurityContentObject_Abstract,
)
from contentctl.input.director import Director, DirectorOutputDto

from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicTest


class Validate:
def execute(self, input_dto: validate) -> DirectorOutputDto:

director_output_dto = DirectorOutputDto(AtomicTest.getAtomicTestsFromArtRepo(repo_path=input_dto.getAtomicRedTeamRepoPath(),
enabled=input_dto.enrichments),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],[],[],[],[],[],[],[],[])



director_output_dto = DirectorOutputDto(
AtomicTest.getAtomicTestsFromArtRepo(
repo_path=input_dto.getAtomicRedTeamRepoPath(),
enabled=input_dto.enrichments,
),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
)

director = Director(director_output_dto)
director.execute(input_dto)

return director_output_dto

def validate_duplicate_uuids(self, security_content_objects:list[SecurityContentObject_Abstract]):
def validate_duplicate_uuids(
self, security_content_objects: list[SecurityContentObject_Abstract]
):
all_uuids = set()
duplicate_uuids = set()
for elem in security_content_objects:
Expand All @@ -45,14 +59,20 @@ def validate_duplicate_uuids(self, security_content_objects:list[SecurityContent

if len(duplicate_uuids) == 0:
return

# At least once duplicate uuid has been found. Enumerate all
# the pieces of content that use duplicate uuids
duplicate_messages = []
for uuid in duplicate_uuids:
duplicate_uuid_content = [str(content.file_path) for content in security_content_objects if content.id in duplicate_uuids]
duplicate_messages.append(f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}")

duplicate_uuid_content = [
str(content.file_path)
for content in security_content_objects
if content.id in duplicate_uuids
]
duplicate_messages.append(
f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}"
)

raise ValueError(
"ERROR: Duplicate ID(s) found in objects:\n"
+ "\n - ".join(duplicate_messages)
Expand Down
21 changes: 21 additions & 0 deletions contentctl/helper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,34 @@ class Utils:
@staticmethod
def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]:
listOfFiles:list[pathlib.Path] = []
base_path = pathlib.Path(path)
if not base_path.exists():
return listOfFiles
for (dirpath, dirnames, filenames) in os.walk(path):
for file in filenames:
if file.endswith(".yml"):
listOfFiles.append(pathlib.Path(os.path.join(dirpath, file)))

return sorted(listOfFiles)

@staticmethod
def get_all_yml_files_from_directory_one_layer_deep(path: str) -> list[pathlib.Path]:
listOfFiles: list[pathlib.Path] = []
base_path = pathlib.Path(path)
if not base_path.exists():
return listOfFiles
# Check the base directory
for item in base_path.iterdir():
if item.is_file() and item.suffix == '.yml':
listOfFiles.append(item)
# Check one subfolder level deep
for subfolder in base_path.iterdir():
if subfolder.is_dir() and subfolder.name != "cim":
for item in subfolder.iterdir():
if item.is_file() and item.suffix == '.yml':
listOfFiles.append(item)
return sorted(listOfFiles)


@staticmethod
def add_id(id_dict:dict[str, list[pathlib.Path]], obj:SecurityContentObject, path:pathlib.Path) -> None:
Expand Down
174 changes: 119 additions & 55 deletions contentctl/input/director.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import pathlib
from typing import Union
from dataclasses import dataclass, field
from pydantic import ValidationError
Expand All @@ -20,11 +21,24 @@
from contentctl.objects.ssa_detection import SSADetection
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
from contentctl.objects.event_source import EventSource

from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment

from contentctl.objects.config import validate
from contentctl.input.ssa_detection_builder import SSADetectionBuilder
from contentctl.objects.enums import SecurityContentType

from contentctl.objects.enums import DetectionStatus
from contentctl.helper.utils import Utils

from contentctl.input.ssa_detection_builder import SSADetectionBuilder
from contentctl.objects.enums import SecurityContentType

from contentctl.objects.enums import DetectionStatus
from contentctl.helper.utils import Utils


@dataclass
Expand All @@ -43,7 +57,8 @@ class DirectorOutputDto:
lookups: list[Lookup]
deployments: list[Deployment]
ssa_detections: list[SSADetection]

data_sources: list[DataSource]
event_sources: list[EventSource]
name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)

Expand Down Expand Up @@ -92,66 +107,84 @@ def addContentToDictMappings(self, content: SecurityContentObject):
self.uuid_to_content_map[content.id] = content


from contentctl.input.ssa_detection_builder import SSADetectionBuilder
from contentctl.objects.enums import SecurityContentType

from contentctl.objects.enums import DetectionStatus
from contentctl.helper.utils import Utils


class Director():
input_dto: validate
output_dto: DirectorOutputDto
ssa_detection_builder: SSADetectionBuilder



def __init__(self, output_dto: DirectorOutputDto) -> None:
self.output_dto = output_dto
self.ssa_detection_builder = SSADetectionBuilder()

def execute(self, input_dto: validate) -> None:
self.input_dto = input_dto


self.createSecurityContent(SecurityContentType.deployments)
self.createSecurityContent(SecurityContentType.lookups)
self.createSecurityContent(SecurityContentType.macros)
self.createSecurityContent(SecurityContentType.stories)
self.createSecurityContent(SecurityContentType.baselines)
self.createSecurityContent(SecurityContentType.investigations)
self.createSecurityContent(SecurityContentType.event_sources)
self.createSecurityContent(SecurityContentType.data_sources)
self.createSecurityContent(SecurityContentType.playbooks)
self.createSecurityContent(SecurityContentType.detections)


self.createSecurityContent(SecurityContentType.ssa_detections)


def createSecurityContent(self, contentType: SecurityContentType) -> None:
if contentType == SecurityContentType.ssa_detections:
files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.path, 'ssa_detections'))
security_content_files = [f for f in files if f.name.startswith('ssa___')]

elif contentType in [SecurityContentType.deployments,
SecurityContentType.lookups,
SecurityContentType.macros,
SecurityContentType.stories,
SecurityContentType.baselines,
SecurityContentType.investigations,
SecurityContentType.playbooks,
SecurityContentType.detections]:
files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.path, str(contentType.name)))
security_content_files = [f for f in files if not f.name.startswith('ssa___')]
files = Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, "ssa_detections")
)
security_content_files = [f for f in files if f.name.startswith("ssa___")]

elif contentType == SecurityContentType.data_sources:
security_content_files = (
Utils.get_all_yml_files_from_directory_one_layer_deep(
os.path.join(self.input_dto.path, "data_sources")
)
)

elif contentType == SecurityContentType.event_sources:
security_content_files = Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, "data_sources", "cloud", "event_sources")
)
security_content_files.extend(
Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, "data_sources", "endpoint", "event_sources")
)
)
security_content_files.extend(
Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, "data_sources", "network", "event_sources")
)
)

elif contentType in [
SecurityContentType.deployments,
SecurityContentType.lookups,
SecurityContentType.macros,
SecurityContentType.stories,
SecurityContentType.baselines,
SecurityContentType.investigations,
SecurityContentType.playbooks,
SecurityContentType.detections,
]:
files = Utils.get_all_yml_files_from_directory(
os.path.join(self.input_dto.path, str(contentType.name))
)
security_content_files = [
f for f in files if not f.name.startswith("ssa___")
]
else:
raise(Exception(f"Cannot createSecurityContent for unknown product."))
raise (Exception(f"Cannot createSecurityContent for unknown product."))

validation_errors = []

already_ran = False
progress_percent = 0
for index,file in enumerate(security_content_files):
progress_percent = ((index+1)/len(security_content_files)) * 100

for index, file in enumerate(security_content_files):
progress_percent = ((index + 1) / len(security_content_files)) * 100
try:
type_string = contentType.name.upper()
modelDict = YmlReader.load_file(file)
Expand All @@ -167,7 +200,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
elif contentType == SecurityContentType.deployments:
deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.addContentToDictMappings(deployment)

elif contentType == SecurityContentType.playbooks:
playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto})
self.output_dto.addContentToDictMappings(playbook)
Expand All @@ -193,36 +226,67 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
ssa_detection = self.ssa_detection_builder.getObject()
if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]:
self.output_dto.addContentToDictMappings(ssa_detection)

elif contentType == SecurityContentType.data_sources:
data_source = DataSource.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.data_sources.append(data_source)

elif contentType == SecurityContentType.event_sources:
event_source = EventSource.model_validate(
modelDict, context={"output_dto": self.output_dto}
)
self.output_dto.event_sources.append(event_source)

else:
raise Exception(f"Unsupported type: [{contentType}]")

if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()) or not already_ran:
already_ran = True
print(f"\r{f'{type_string} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)

except (ValidationError, ValueError) as e:
relative_path = file.absolute().relative_to(self.input_dto.path.absolute())
validation_errors.append((relative_path,e))

raise Exception(f"Unsupported type: [{contentType}]")

if (
sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()
) or not already_ran:
already_ran = True
print(
f"\r{f'{type_string} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...",
end="",
flush=True,
)

print(f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
except (ValidationError, ValueError) as e:
relative_path = file.absolute().relative_to(
self.input_dto.path.absolute()
)
validation_errors.append((relative_path, e))

print(
f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...",
end="",
flush=True,
)
print("Done!")

if len(validation_errors) > 0:
errors_string = '\n\n'.join([f"File: {e_tuple[0]}\nError: {str(e_tuple[1])}" for e_tuple in validation_errors])
#print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")
errors_string = "\n\n".join(
[
f"File: {e_tuple[0]}\nError: {str(e_tuple[1])}"
for e_tuple in validation_errors
]
)
# print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")
# We quit after validation a single type/group of content because it can cause significant cascading errors in subsequent
# types of content (since they may import or otherwise use it)
raise Exception(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")




raise Exception(
f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED"
)

def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:DirectorOutputDto, file_path: str) -> None:
def constructSSADetection(
self,
builder: SSADetectionBuilder,
directorOutput: DirectorOutputDto,
file_path: str,
) -> None:
builder.reset()
builder.setObject(file_path,self.output_dto)
builder.setObject(file_path)
builder.addMitreAttackEnrichmentNew(directorOutput.attack_enrichment)
builder.addKillChainPhase()
builder.addCIS()
Expand Down
Loading

0 comments on commit bccb33e

Please sign in to comment.