diff --git a/.vscode/settings.json b/.vscode/settings.json
index 973b51b9..8a62413d 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -3,7 +3,8 @@
     "python.envFile": "${workspaceFolder}/.env",
     "python.testing.cwd": "${workspaceFolder}",
     "python.languageServer": "Pylance",
-    "python.analysis.typeCheckingMode": "strict"
+    "python.analysis.typeCheckingMode": "strict",
+    "editor.defaultFormatter": "ms-python.black-formatter"
 }
\ No newline at end of file
diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py
index c7673af4..bd586850 100644
--- a/contentctl/actions/validate.py
+++ b/contentctl/actions/validate.py
@@ -6,33 +6,47 @@
 from typing import Union
 
 from contentctl.objects.enums import SecurityContentProduct
-from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import SecurityContentObject_Abstract
-from contentctl.input.director import (
-    Director,
-    DirectorOutputDto
+from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import (
+    SecurityContentObject_Abstract,
 )
+from contentctl.input.director import Director, DirectorOutputDto
 from contentctl.objects.config import validate
 from contentctl.enrichments.attack_enrichment import AttackEnrichment
 from contentctl.enrichments.cve_enrichment import CveEnrichment
 from contentctl.objects.atomic import AtomicTest
 
+
 class Validate:
     def execute(self, input_dto: validate) -> DirectorOutputDto:
-
-        director_output_dto = DirectorOutputDto(AtomicTest.getAtomicTestsFromArtRepo(repo_path=input_dto.getAtomicRedTeamRepoPath(),
-                                                                                     enabled=input_dto.enrichments),
-                                                AttackEnrichment.getAttackEnrichment(input_dto),
-                                                CveEnrichment.getCveEnrichment(input_dto),
-                                                [],[],[],[],[],[],[],[],[])
-
-
+
+        director_output_dto = DirectorOutputDto(
+            AtomicTest.getAtomicTestsFromArtRepo(
+                repo_path=input_dto.getAtomicRedTeamRepoPath(),
+                enabled=input_dto.enrichments,
+            ),
+            AttackEnrichment.getAttackEnrichment(input_dto),
+            CveEnrichment.getCveEnrichment(input_dto),
+            [],
+            [],
+            [],
+            [],
+            [],
+            [],
+            [],
+            [],
+            [],
+            [],
+            [],
+        )
+
         director = Director(director_output_dto)
         director.execute(input_dto)
-
         return director_output_dto
 
-    def validate_duplicate_uuids(self, security_content_objects:list[SecurityContentObject_Abstract]):
+    def validate_duplicate_uuids(
+        self, security_content_objects: list[SecurityContentObject_Abstract]
+    ):
         all_uuids = set()
         duplicate_uuids = set()
         for elem in security_content_objects:
@@ -45,14 +59,20 @@ def validate_duplicate_uuids(self, security_content_objects:list[SecurityContent
         if len(duplicate_uuids) == 0:
             return
-
+
         # At least once duplicate uuid has been found.  Enumerate all
         # the pieces of content that use duplicate uuids
         duplicate_messages = []
         for uuid in duplicate_uuids:
-            duplicate_uuid_content = [str(content.file_path) for content in security_content_objects if content.id in duplicate_uuids]
-            duplicate_messages.append(f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}")
-
+            duplicate_uuid_content = [
+                str(content.file_path)
+                for content in security_content_objects
+                if content.id in duplicate_uuids
+            ]
+            duplicate_messages.append(
+                f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}"
+            )
+
         raise ValueError(
             "ERROR: Duplicate ID(s) found in objects:\n"
             + "\n - ".join(duplicate_messages)
         )
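A brief usage sketch (not part of the patch), assuming a populated `validate` config object is already in hand; it only exercises the call shown above and reads the two collections this change adds to DirectorOutputDto:

    from contentctl.actions.validate import Validate

    def run_validation(config):
        # `config` is a contentctl `validate` instance built elsewhere (assumed here).
        output_dto = Validate().execute(config)
        # data_sources and event_sources are the fields introduced by this change.
        print(f"{len(output_dto.data_sources)} data source(s) loaded")
        print(f"{len(output_dto.event_sources)} event source(s) loaded")
        return output_dto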
diff --git a/contentctl/helper/utils.py b/contentctl/helper/utils.py
index a9b17d3b..ae668685 100644
--- a/contentctl/helper/utils.py
+++ b/contentctl/helper/utils.py
@@ -25,6 +25,9 @@ class Utils:
     @staticmethod
     def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]:
         listOfFiles:list[pathlib.Path] = []
+        base_path = pathlib.Path(path)
+        if not base_path.exists():
+            return listOfFiles
         for (dirpath, dirnames, filenames) in os.walk(path):
             for file in filenames:
                 if file.endswith(".yml"):
@@ -32,6 +35,24 @@ def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]:
 
         return sorted(listOfFiles)
 
+    @staticmethod
+    def get_all_yml_files_from_directory_one_layer_deep(path: str) -> list[pathlib.Path]:
+        listOfFiles: list[pathlib.Path] = []
+        base_path = pathlib.Path(path)
+        if not base_path.exists():
+            return listOfFiles
+        # Check the base directory
+        for item in base_path.iterdir():
+            if item.is_file() and item.suffix == '.yml':
+                listOfFiles.append(item)
+        # Check one subfolder level deep
+        for subfolder in base_path.iterdir():
+            if subfolder.is_dir() and subfolder.name != "cim":
+                for item in subfolder.iterdir():
+                    if item.is_file() and item.suffix == '.yml':
+                        listOfFiles.append(item)
+        return sorted(listOfFiles)
+
     @staticmethod
     def add_id(id_dict:dict[str, list[pathlib.Path]], obj:SecurityContentObject, path:pathlib.Path) -> None:
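A hedged usage sketch of the new helper: it returns YAML files in the given directory and its immediate subfolders (skipping a subfolder named "cim"), nothing deeper, and an empty list when the path does not exist. The example paths are illustrative only:

    from contentctl.helper.utils import Utils

    # e.g. data_sources/foo.yml and data_sources/cloud/bar.yml are returned,
    # data_sources/cloud/event_sources/baz.yml (two levels deep) and
    # data_sources/cim/*.yml are not.
    yml_files = Utils.get_all_yml_files_from_directory_one_layer_deep("data_sources")
    print(len(yml_files))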
diff --git a/contentctl/input/director.py b/contentctl/input/director.py
index 72d78996..b53b4770 100644
--- a/contentctl/input/director.py
+++ b/contentctl/input/director.py
@@ -1,5 +1,6 @@
 import os
 import sys
+import pathlib
 from typing import Union
 from dataclasses import dataclass, field
 from pydantic import ValidationError
@@ -20,11 +21,24 @@
 from contentctl.objects.ssa_detection import SSADetection
 from contentctl.objects.atomic import AtomicTest
 from contentctl.objects.security_content_object import SecurityContentObject
+from contentctl.objects.data_source import DataSource
+from contentctl.objects.event_source import EventSource
 from contentctl.enrichments.attack_enrichment import AttackEnrichment
 from contentctl.enrichments.cve_enrichment import CveEnrichment
 from contentctl.objects.config import validate
+from contentctl.input.ssa_detection_builder import SSADetectionBuilder
+from contentctl.objects.enums import SecurityContentType
+
+from contentctl.objects.enums import DetectionStatus
+from contentctl.helper.utils import Utils
+
+from contentctl.input.ssa_detection_builder import SSADetectionBuilder
+from contentctl.objects.enums import SecurityContentType
+
+from contentctl.objects.enums import DetectionStatus
+from contentctl.helper.utils import Utils
 
 
 @dataclass
@@ -43,7 +57,8 @@ class DirectorOutputDto:
     lookups: list[Lookup]
     deployments: list[Deployment]
     ssa_detections: list[SSADetection]
-
+    data_sources: list[DataSource]
+    event_sources: list[EventSource]
     name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
     uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)
@@ -92,66 +107,84 @@ def addContentToDictMappings(self, content: SecurityContentObject):
         self.uuid_to_content_map[content.id] = content
 
 
-from contentctl.input.ssa_detection_builder import SSADetectionBuilder
-from contentctl.objects.enums import SecurityContentType
-
-from contentctl.objects.enums import DetectionStatus
-from contentctl.helper.utils import Utils
-
-
 class Director():
     input_dto: validate
     output_dto: DirectorOutputDto
     ssa_detection_builder: SSADetectionBuilder
-
-
     def __init__(self, output_dto: DirectorOutputDto) -> None:
         self.output_dto = output_dto
         self.ssa_detection_builder = SSADetectionBuilder()
-
+
     def execute(self, input_dto: validate) -> None:
         self.input_dto = input_dto
-
-
         self.createSecurityContent(SecurityContentType.deployments)
         self.createSecurityContent(SecurityContentType.lookups)
         self.createSecurityContent(SecurityContentType.macros)
         self.createSecurityContent(SecurityContentType.stories)
         self.createSecurityContent(SecurityContentType.baselines)
         self.createSecurityContent(SecurityContentType.investigations)
+        self.createSecurityContent(SecurityContentType.event_sources)
+        self.createSecurityContent(SecurityContentType.data_sources)
         self.createSecurityContent(SecurityContentType.playbooks)
         self.createSecurityContent(SecurityContentType.detections)
-
-
         self.createSecurityContent(SecurityContentType.ssa_detections)
 
-
     def createSecurityContent(self, contentType: SecurityContentType) -> None:
         if contentType == SecurityContentType.ssa_detections:
-            files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.path, 'ssa_detections'))
-            security_content_files = [f for f in files if f.name.startswith('ssa___')]
-
-        elif contentType in [SecurityContentType.deployments,
-                             SecurityContentType.lookups,
-                             SecurityContentType.macros,
-                             SecurityContentType.stories,
-                             SecurityContentType.baselines,
-                             SecurityContentType.investigations,
-                             SecurityContentType.playbooks,
-                             SecurityContentType.detections]:
-            files = Utils.get_all_yml_files_from_directory(os.path.join(self.input_dto.path, str(contentType.name)))
-            security_content_files = [f for f in files if not f.name.startswith('ssa___')]
+            files = Utils.get_all_yml_files_from_directory(
+                os.path.join(self.input_dto.path, "ssa_detections")
+            )
+            security_content_files = [f for f in files if f.name.startswith("ssa___")]
+
+        elif contentType == SecurityContentType.data_sources:
+            security_content_files = (
+                Utils.get_all_yml_files_from_directory_one_layer_deep(
+                    os.path.join(self.input_dto.path, "data_sources")
+                )
+            )
+
+        elif contentType == SecurityContentType.event_sources:
+            security_content_files = Utils.get_all_yml_files_from_directory(
+                os.path.join(self.input_dto.path, "data_sources", "cloud", "event_sources")
+            )
+            security_content_files.extend(
+                Utils.get_all_yml_files_from_directory(
+                    os.path.join(self.input_dto.path, "data_sources", "endpoint", "event_sources")
+                )
+            )
+            security_content_files.extend(
+                Utils.get_all_yml_files_from_directory(
+                    os.path.join(self.input_dto.path, "data_sources", "network", "event_sources")
+                )
+            )
+
+        elif contentType in [
+            SecurityContentType.deployments,
+            SecurityContentType.lookups,
+            SecurityContentType.macros,
+            SecurityContentType.stories,
+            SecurityContentType.baselines,
+            SecurityContentType.investigations,
+            SecurityContentType.playbooks,
+            SecurityContentType.detections,
+        ]:
+            files = Utils.get_all_yml_files_from_directory(
+                os.path.join(self.input_dto.path, str(contentType.name))
+            )
+            security_content_files = [
+                f for f in files if not f.name.startswith("ssa___")
+            ]
         else:
-            raise(Exception(f"Cannot createSecurityContent for unknown product."))
+            raise (Exception(f"Cannot createSecurityContent for unknown product."))
 
         validation_errors = []
-
+
         already_ran = False
         progress_percent = 0
-
-        for index,file in enumerate(security_content_files):
-            progress_percent = ((index+1)/len(security_content_files)) * 100
+
+        for index, file in enumerate(security_content_files):
+            progress_percent = ((index + 1) / len(security_content_files)) * 100
             try:
                 type_string = contentType.name.upper()
                 modelDict = YmlReader.load_file(file)
@@ -167,7 +200,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
                 elif contentType == SecurityContentType.deployments:
                     deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(deployment)
-
+
                 elif contentType == SecurityContentType.playbooks:
                     playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto})
                     self.output_dto.addContentToDictMappings(playbook)
@@ -193,36 +226,67 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
                         ssa_detection = self.ssa_detection_builder.getObject()
                         if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]:
                             self.output_dto.addContentToDictMappings(ssa_detection)
+
+                elif contentType == SecurityContentType.data_sources:
+                    data_source = DataSource.model_validate(
+                        modelDict, context={"output_dto": self.output_dto}
+                    )
+                    self.output_dto.data_sources.append(data_source)
+
+                elif contentType == SecurityContentType.event_sources:
+                    event_source = EventSource.model_validate(
+                        modelDict, context={"output_dto": self.output_dto}
+                    )
+                    self.output_dto.event_sources.append(event_source)
                 else:
-                    raise Exception(f"Unsupported type: [{contentType}]")
-
-                if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()) or not already_ran:
-                    already_ran = True
-                    print(f"\r{f'{type_string} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
-
-            except (ValidationError, ValueError) as e:
-                relative_path = file.absolute().relative_to(self.input_dto.path.absolute())
-                validation_errors.append((relative_path,e))
-
+                    raise Exception(f"Unsupported type: [{contentType}]")
+
+                if (
+                    sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()
+                ) or not already_ran:
+                    already_ran = True
+                    print(
+                        f"\r{f'{type_string} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...",
+                        end="",
+                        flush=True,
+                    )
 
-        print(f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
+            except (ValidationError, ValueError) as e:
+                relative_path = file.absolute().relative_to(
+                    self.input_dto.path.absolute()
+                )
+                validation_errors.append((relative_path, e))
+
+        print(
+            f"\r{f'{contentType.name.upper()} Progress'.rjust(23)}: [{progress_percent:3.0f}%]...",
+            end="",
+            flush=True,
+        )
         print("Done!")
 
         if len(validation_errors) > 0:
-            errors_string = '\n\n'.join([f"File: {e_tuple[0]}\nError: {str(e_tuple[1])}" for e_tuple in validation_errors])
-            #print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")
+            errors_string = "\n\n".join(
+                [
+                    f"File: {e_tuple[0]}\nError: {str(e_tuple[1])}"
+                    for e_tuple in validation_errors
+                ]
+            )
+            # print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")
             # We quit after validation a single type/group of content because it can cause significant cascading errors in subsequent
             # types of content (since they may import or otherwise use it)
-            raise Exception(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED")
-
-
-
-
+            raise Exception(
+                f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED"
+            )
 
-    def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:DirectorOutputDto, file_path: str) -> None:
+    def constructSSADetection(
+        self,
+        builder: SSADetectionBuilder,
+        directorOutput: DirectorOutputDto,
+        file_path: str,
+    ) -> None:
         builder.reset()
-        builder.setObject(file_path,self.output_dto)
+        builder.setObject(file_path)
         builder.addMitreAttackEnrichmentNew(directorOutput.attack_enrichment)
         builder.addKillChainPhase()
         builder.addCIS()
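For orientation, a small sketch restating the directory convention the new data_sources and event_sources branches encode; event_source_dirs is a hypothetical helper written for this note, not part of the patch:

    import os

    def event_source_dirs(content_path: str) -> list[str]:
        # Mirrors the three hard-coded categories scanned by createSecurityContent()
        # for SecurityContentType.event_sources.
        return [
            os.path.join(content_path, "data_sources", category, "event_sources")
            for category in ("cloud", "endpoint", "network")
        ]

    # DataSource YAML, by contrast, is gathered one layer deep under data_sources/.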
print(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED") # We quit after validation a single type/group of content because it can cause significant cascading errors in subsequent # types of content (since they may import or otherwise use it) - raise Exception(f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED") - - - - + raise Exception( + f"The following {len(validation_errors)} error(s) were found during validation:\n\n{errors_string}\n\nVALIDATION FAILED" + ) - def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:DirectorOutputDto, file_path: str) -> None: + def constructSSADetection( + self, + builder: SSADetectionBuilder, + directorOutput: DirectorOutputDto, + file_path: str, + ) -> None: builder.reset() - builder.setObject(file_path,self.output_dto) + builder.setObject(file_path) builder.addMitreAttackEnrichmentNew(directorOutput.attack_enrichment) builder.addKillChainPhase() builder.addCIS() diff --git a/contentctl/input/ssa_detection_builder.py b/contentctl/input/ssa_detection_builder.py index 90c0ecde..b6c63bf6 100644 --- a/contentctl/input/ssa_detection_builder.py +++ b/contentctl/input/ssa_detection_builder.py @@ -13,17 +13,14 @@ from contentctl.enrichments.splunk_app_enrichment import SplunkAppEnrichment from contentctl.objects.ssa_detection import SSADetection from contentctl.objects.constants import * -from contentctl.input.director import DirectorOutputDto from contentctl.enrichments.attack_enrichment import AttackEnrichment class SSADetectionBuilder(): security_content_obj : SSADetection - def setObject(self, path: str, - output_dto:DirectorOutputDto ) -> None: - yml_dict = YmlReader.load_file(path) - #self.security_content_obj = SSADetection.model_validate(yml_dict, context={"output_dto":output_dto}) + def setObject(self, path: str) -> None: + yml_dict = YmlReader.load_file(path) self.security_content_obj = SSADetection.parse_obj(yml_dict) self.security_content_obj.source = os.path.split(os.path.dirname(self.security_content_obj.file_path))[-1] diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index e4ecf82f..8389ad78 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -40,7 +40,7 @@ class Detection_Abstract(SecurityContentObject): search: Union[str, dict[str,Any]] = Field(...) how_to_implement: str = Field(..., min_length=4) known_false_positives: str = Field(..., min_length=4) - #data_source: Optional[List[DataSource]] = None + data_source_objects: Optional[List[DataSource]] = None enabled_by_default: bool = False file_path: FilePath = Field(...) 
diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
index e4ecf82f..8389ad78 100644
--- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py
+++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py
@@ -40,7 +40,7 @@ class Detection_Abstract(SecurityContentObject):
     search: Union[str, dict[str,Any]] = Field(...)
     how_to_implement: str = Field(..., min_length=4)
     known_false_positives: str = Field(..., min_length=4)
-    #data_source: Optional[List[DataSource]] = None
+    data_source_objects: Optional[List[DataSource]] = None
     enabled_by_default: bool = False
     file_path: FilePath = Field(...)
@@ -369,8 +369,6 @@ def model_post_init(self, ctx:dict[str,Any]):
         # if not isinstance(director,DirectorOutputDto):
         #     raise ValueError("DirectorOutputDto was not passed in context of Detection model_post_init")
         director: Optional[DirectorOutputDto] = ctx.get("output_dto",None)
-        for story in self.tags.analytic_story:
-            story.detections.append(self)
 
         #Ensure that all baselines link to this detection
         for baseline in self.baselines:
@@ -385,10 +383,25 @@ def model_post_init(self, ctx:dict[str,Any]):
             if replaced is False:
                 raise ValueError(f"Error, failed to replace detection reference in Baseline '{baseline.name}' to detection '{self.name}'")
             baseline.tags.detections = new_detections
-
-        return self
 
+        self.data_source_objects = []
+        for data_source_obj in director.data_sources:
+            for detection_data_source in self.data_source:
+                if data_source_obj.name in detection_data_source:
+                    self.data_source_objects.append(data_source_obj)
+
+        # Remove duplicate data source objects based on their 'name' property
+        unique_data_sources = {}
+        for data_source_obj in self.data_source_objects:
+            if data_source_obj.name not in unique_data_sources:
+                unique_data_sources[data_source_obj.name] = data_source_obj
+        self.data_source_objects = list(unique_data_sources.values())
+        for story in self.tags.analytic_story:
+            story.detections.append(self)
+            story.data_sources.extend(self.data_source_objects)
+
+        return self
 
     @field_validator('lookups',mode="before")
diff --git a/contentctl/objects/data_source.py b/contentctl/objects/data_source.py
index 04feb347..2c87777e 100644
--- a/contentctl/objects/data_source.py
+++ b/contentctl/objects/data_source.py
@@ -2,20 +2,27 @@
 from pydantic import BaseModel
 
 
-
 class DataSource(BaseModel):
     name: str
     id: str
-    date: str
     author: str
-    type: str
     source: str
     sourcetype: str
-    category: str = None
-    product: str
-    service: str = None
-    supported_TA: list
-    references: list
-    raw_fields: list
-    field_mappings: list = None
-    convert_to_log_source: list = None
\ No newline at end of file
+    separator: str = None
+    configuration: str = None
+    supported_TA: dict
+    event_names: list = None
+    event_sources: list = None
+    fields: list = None
+    example_log: str = None
+
+    def model_post_init(self, ctx:dict[str,Any]):
+        context = ctx.get("output_dto")
+
+        if self.event_names:
+            self.event_sources = []
+            for event_source in context.event_sources:
+                if any(event['event_name'] == event_source.event_name for event in self.event_names):
+                    self.event_sources.append(event_source)
+
+        return self
\ No newline at end of file
diff --git a/contentctl/objects/enums.py b/contentctl/objects/enums.py
index 5cb06400..e7016033 100644
--- a/contentctl/objects/enums.py
+++ b/contentctl/objects/enums.py
@@ -55,6 +55,8 @@ class SecurityContentType(enum.Enum):
     investigations = 8
     unit_tests = 9
     ssa_detections = 10
+    data_sources = 11
+    event_sources = 12
 
 # Bringing these changes back in line will take some time after
 # the initial merge is complete
diff --git a/contentctl/objects/event_source.py b/contentctl/objects/event_source.py
new file mode 100644
index 00000000..c14dcf53
--- /dev/null
+++ b/contentctl/objects/event_source.py
@@ -0,0 +1,10 @@
+from __future__ import annotations
+from pydantic import BaseModel
+
+
+class EventSource(BaseModel):
+    event_name: str
+    fields: list[str]
+    field_mappings: list[dict] = None
+    convert_to_log_source: list[dict] = None
+    example_log: str = None
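A standalone sketch of the linking and de-duplication logic introduced above, using simplified stand-in models rather than the real contentctl classes (which carry more required fields); it shows how event sources are matched to a data source by event_name and how data sources are de-duplicated by name:

    from pydantic import BaseModel

    class EventSourceSketch(BaseModel):
        event_name: str

    class DataSourceSketch(BaseModel):
        name: str
        event_names: list[dict] = []
        event_sources: list[EventSourceSketch] = []

    def link_event_sources(ds: DataSourceSketch, all_event_sources: list[EventSourceSketch]) -> None:
        # Same shape as DataSource.model_post_init: keep each EventSource whose
        # event_name appears in one of the data source's event_names entries.
        ds.event_sources = [
            es for es in all_event_sources
            if any(entry["event_name"] == es.event_name for entry in ds.event_names)
        ]

    def dedupe_by_name(data_sources: list[DataSourceSketch]) -> list[DataSourceSketch]:
        # Same idea as the detection_abstract change: first occurrence of a name wins.
        unique: dict[str, DataSourceSketch] = {}
        for ds in data_sources:
            unique.setdefault(ds.name, ds)
        return list(unique.values())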
diff --git a/contentctl/objects/story.py b/contentctl/objects/story.py
index 05a36fb8..6a2eac1c 100644
--- a/contentctl/objects/story.py
+++ b/contentctl/objects/story.py
@@ -7,7 +7,7 @@
 from contentctl.objects.detection import Detection
 from contentctl.objects.investigation import Investigation
 from contentctl.objects.baseline import Baseline
-
+from contentctl.objects.data_source import DataSource
 from contentctl.objects.security_content_object import SecurityContentObject
 
 
@@ -33,7 +33,7 @@ class Story(SecurityContentObject):
     detections:List[Detection] = []
     investigations: List[Investigation] = []
     baselines: List[Baseline] = []
-
+    data_sources: List[DataSource] = []
 
     def storyAndInvestigationNamesWithApp(self, app_name:str)->List[str]:
         return [f"{app_name} - {name} - Rule" for name in self.detection_names] + \
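Downstream effect, as a hedged sketch: after validation, each Story's new data_sources list holds the data sources contributed by its detections (detection_abstract extends the list per detection, so repeats are possible). Assuming DirectorOutputDto also exposes the stories list implied by the positional arguments in validate.py:

    def summarize_story_data_sources(output_dto) -> dict[str, list[str]]:
        # Map each story name to the sorted, de-duplicated names of its data sources.
        return {
            story.name: sorted({ds.name for ds in story.data_sources})
            for story in output_dto.stories
        }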