From b8e8f2d56bff17fb95bb9c81e01dab8eedaf2cdf Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 22 Jul 2024 13:49:29 -0700 Subject: [PATCH 1/5] Parse and validate CSV files to ensure that each row has at least one row, so that fields/headings can be extracted, and that each row has the proper number of fields in it. Report verbose errors for each row if there is an issue. --- contentctl/objects/lookup.py | 55 ++++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/contentctl/objects/lookup.py b/contentctl/objects/lookup.py index c8ac5d81..d0b88fc8 100644 --- a/contentctl/objects/lookup.py +++ b/contentctl/objects/lookup.py @@ -2,6 +2,7 @@ from pydantic import field_validator, ValidationInfo, model_validator, FilePath, model_serializer from typing import TYPE_CHECKING, Optional, Any, Union import re +import csv if TYPE_CHECKING: from contentctl.input.director import DirectorOutputDto from contentctl.objects.config import validate @@ -61,15 +62,53 @@ def fix_lookup_path(cls, data:Any, info: ValidationInfo)->Any: raise ValueError("config required for constructing lookup filename, but it was not") return data - @field_validator('filename') - @classmethod - def lookup_file_valid(cls, v: Union[FilePath,None], info: ValidationInfo): - if not v: - return v - if not (v.name.endswith(".csv") or v.name.endswith(".mlmodel")): - raise ValueError(f"All Lookup files must be CSV files and end in .csv. The following file does not: '{v}'") - return v + def model_post_init(self, ctx:dict[str,Any]): + if not self.filename: + return + import pathlib + filenamePath = pathlib.Path(self.filename) + + if filenamePath.suffix not in [".csv", ".mlmodel"]: + raise ValueError(f"All Lookup files must be CSV files and end in .csv. The following file does not: '{filenamePath}'") + + + + if filenamePath.suffix == ".mlmodel": + # Do not need any additional checks for an mlmodel file + return + + # https://docs.python.org/3/library/csv.html#csv.DictReader + # Column Names (fieldnames) determine by the number of columns in the first row. + # If a row has MORE fields than fieldnames, they will be dumped in a list under the key 'restkey' - this should throw an Exception + # If a row has LESS fields than fieldnames, then the field should contain None by default. This should also throw an exception. + csv_errors:list[str] = [] + with open(filenamePath, "r") as csv_fp: + RESTKEY = "extra_fields_in_a_row" + csv_dict = csv.DictReader(csv_fp, restkey=RESTKEY) + if csv_dict.fieldnames is None: + raise ValueError(f"Error validating the CSV referenced by the lookup: {filenamePath}:\n\t" + "Unable to read fieldnames from CSV. Is the CSV empty?\n" + " Please try opening the file with a CSV Editor to ensure that it is correct.") + # Remember that row 1 has the headers and we do not iterate over it in the loop below + # CSVs are typically indexed starting a row 1 for the header. + for row_index, data_row in enumerate(csv_dict): + row_index+=2 + if len(data_row.get(RESTKEY,[])) > 0: + csv_errors.append(f"row [{row_index}] should have [{len(csv_dict.fieldnames)}] columns," + f" but instead had [{len(csv_dict.fieldnames) + len(data_row.get(RESTKEY,[]))}].") + + for column_index, column_name in enumerate(data_row): + if data_row[column_name] is None: + csv_errors.append(f"row [{row_index}] should have [{len(csv_dict.fieldnames)}] columns, " + f"but instead had [{column_index}].") + if len(csv_errors) > 0: + err_string = '\n\t'.join(csv_errors) + raise ValueError(f"Error validating the CSV referenced by the lookup: {filenamePath}:\n\t{err_string}\n" + f" Please try opening the file with a CSV Editor to ensure that it is correct.") + + return + @field_validator('match_type') @classmethod From a8de0b792b45f31806eb4c56a97a2e6814df6230 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Mon, 22 Jul 2024 16:33:30 -0700 Subject: [PATCH 2/5] throw exception if an unused file exists in the lookups directory --- contentctl/actions/validate.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index bd586850..208abc0d 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -1,5 +1,5 @@ import sys - +import pathlib from dataclasses import dataclass from pydantic import ValidationError @@ -42,8 +42,19 @@ def execute(self, input_dto: validate) -> DirectorOutputDto: director = Director(director_output_dto) director.execute(input_dto) + self.ensure_no_orphaned_lookup_files(input_dto.path, director_output_dto) return director_output_dto + def ensure_no_orphaned_lookup_files(self, repo_path:pathlib.Path, director_output_dto:DirectorOutputDto): + # get all files in the lookup folder + usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if lookup.filename is not None] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None] + lookupsDirectory = repo_path/"lookups" + unusedLookupFiles:list[pathlib.Path] = [testFile for testFile in lookupsDirectory.glob("**/*.*") if testFile not in usedLookupFiles] + if len(unusedLookupFiles) > 0: + raise ValueError(f"The following files exist in '{lookupsDirectory}', but either do not end in .yml, .csv, or .mlmodel or are not used by a YML file: {[str(path) for path in unusedLookupFiles]}") + return + + def validate_duplicate_uuids( self, security_content_objects: list[SecurityContentObject_Abstract] ): From 289c3ec2fe6b62939190630baad4994bb34dae0d Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 23 Jul 2024 16:22:14 -0700 Subject: [PATCH 3/5] Improve how we determine what file extensions are allowed in the lookups folder. A new function is introduced that improves checking on what file extensions are allowed in any folder which should supercede most usage of the Utils.get_all_yml_files_from_directory function in the future. --- contentctl/actions/validate.py | 90 ++++++++++++++-------------------- contentctl/helper/utils.py | 43 ++++++++++++++++ 2 files changed, 81 insertions(+), 52 deletions(-) diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index 208abc0d..eff89be1 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -1,20 +1,11 @@ -import sys -import pathlib -from dataclasses import dataclass - -from pydantic import ValidationError -from typing import Union -from contentctl.objects.enums import SecurityContentProduct -from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import ( - SecurityContentObject_Abstract, -) +import pathlib from contentctl.input.director import Director, DirectorOutputDto - from contentctl.objects.config import validate from contentctl.enrichments.attack_enrichment import AttackEnrichment from contentctl.enrichments.cve_enrichment import CveEnrichment from contentctl.objects.atomic import AtomicTest +from contentctl.helper.utils import Utils class Validate: @@ -42,49 +33,44 @@ def execute(self, input_dto: validate) -> DirectorOutputDto: director = Director(director_output_dto) director.execute(input_dto) - self.ensure_no_orphaned_lookup_files(input_dto.path, director_output_dto) + self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto) return director_output_dto - def ensure_no_orphaned_lookup_files(self, repo_path:pathlib.Path, director_output_dto:DirectorOutputDto): - # get all files in the lookup folder - usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if lookup.filename is not None] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None] + + def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_output_dto:DirectorOutputDto): + """ + This function ensures that only files which are relevant to lookups are included in the lookups folder. + This means that a file must be either: + 1. A lookup YML (.yml) + 2. A lookup CSV (.csv) which is referenced by a YML + 3. A lookup MLMODEL (.mlmodel) which is referenced by a YML. + + All other files, includes CSV and MLMODEL files which are NOT + referenced by a YML, will generate an exception from this function. + + Args: + repo_path (pathlib.Path): path to the root of the app + director_output_dto (DirectorOutputDto): director object with all constructed content + + Raises: + Exception: An Exception will be raised if there are any non .yml, .csv, or .mlmodel + files in this directory. Additionally, an exception will be raised if there + exists one or more .csv or .mlmodel files that are not referenced by at least 1 + detection .yml file in this directory. + This avoids having additional, unused files in this directory that may be copied into + the app when it is built (which can cause appinspect errors or larger app size.) + """ lookupsDirectory = repo_path/"lookups" - unusedLookupFiles:list[pathlib.Path] = [testFile for testFile in lookupsDirectory.glob("**/*.*") if testFile not in usedLookupFiles] + + # Get all of the files referneced by Lookups + usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if lookup.filename is not None] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None] + + # Get all of the mlmodel and csv files in the lookups directory + csvAndMlmodelFiles = Utils.get_security_content_files_from_directory(lookupsDirectory, allowedFileExtensions=[".yml",".csv",".mlmodel"], fileExtensionsToReturn=[".csv",".mlmodel"]) + + # Generate an exception of any csv or mlmodel files exist but are not used + unusedLookupFiles:list[pathlib.Path] = [testFile for testFile in csvAndMlmodelFiles if testFile not in usedLookupFiles] if len(unusedLookupFiles) > 0: - raise ValueError(f"The following files exist in '{lookupsDirectory}', but either do not end in .yml, .csv, or .mlmodel or are not used by a YML file: {[str(path) for path in unusedLookupFiles]}") + raise Exception(f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}") return - - - def validate_duplicate_uuids( - self, security_content_objects: list[SecurityContentObject_Abstract] - ): - all_uuids = set() - duplicate_uuids = set() - for elem in security_content_objects: - if elem.id in all_uuids: - # The uuid has been found more than once - duplicate_uuids.add(elem.id) - else: - # This is the first time the uuid has been found - all_uuids.add(elem.id) - - if len(duplicate_uuids) == 0: - return - - # At least once duplicate uuid has been found. Enumerate all - # the pieces of content that use duplicate uuids - duplicate_messages = [] - for uuid in duplicate_uuids: - duplicate_uuid_content = [ - str(content.file_path) - for content in security_content_objects - if content.id in duplicate_uuids - ] - duplicate_messages.append( - f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}" - ) - - raise ValueError( - "ERROR: Duplicate ID(s) found in objects:\n" - + "\n - ".join(duplicate_messages) - ) + \ No newline at end of file diff --git a/contentctl/helper/utils.py b/contentctl/helper/utils.py index ae668685..1edd0286 100644 --- a/contentctl/helper/utils.py +++ b/contentctl/helper/utils.py @@ -34,6 +34,49 @@ def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]: listOfFiles.append(pathlib.Path(os.path.join(dirpath, file))) return sorted(listOfFiles) + + @staticmethod + def get_security_content_files_from_directory(path: pathlib.Path, allowedFileExtensions:list[str]=[".yml"], fileExtensionsToReturn:list[str]=[".yml"]) -> list[pathlib.Path]: + + """ + Get all of the Security Content Object Files rooted in a given directory. These will almost + certain be YML files, but could be other file types as specified by the user + + Args: + path (pathlib.Path): The root path at which to enumerate all Security Content Files. All directories will be traversed. + allowedFileExtensions (set[str], optional): File extensions which are allowed to be present in this directory. In most cases, we do not want to allow the presence of non-YML files. Defaults to [".yml"]. + fileExtensionsToReturn (set[str], optional): Filenames with extensions that should be returned from this function. For example, the lookups/ directory contains YML, CSV, and MLMODEL directories, but only the YMLs are Security Content Objects for constructing Lookyps. Defaults to[".yml"]. + + Raises: + Exception: Will raise an exception if allowedFileExtensions is not a subset of fileExtensionsToReturn. + Exception: Will raise an exception if the path passed to the function does not exist or is not a directory + Exception: Will raise an exception if there are any files rooted in the directory which are not in allowedFileExtensions + + Returns: + list[pathlib.Path]: list of files with an extension in fileExtensionsToReturn found in path + """ + if not set(fileExtensionsToReturn).issubset(set(allowedFileExtensions)): + raise Exception(f"allowedFileExtensions {allowedFileExtensions} MUST be a subset of fileExtensionsToReturn {fileExtensionsToReturn}, but it is not") + + if not path.exists() or not path.is_dir(): + raise Exception(f"Unable to get security_content files, required directory '{str(path)}' does not exist or is not a directory") + + allowedFiles:list[pathlib.Path] = [] + erroneousFiles:list[pathlib.Path] = [] + #Get every single file extension + for filePath in path.glob("**/*.*"): + if filePath.suffix in allowedFileExtensions: + # Yes these are allowed + allowedFiles.append(filePath) + else: + # No these have not been allowed + erroneousFiles.append(filePath) + + if len(erroneousFiles): + raise Exception(f"The following files are not allowed in the directory '{path}'. Only files with the extensions {allowedFileExtensions} are allowed:{[str(filePath) for filePath in erroneousFiles]}") + + # There were no errorneous files, so return the requested files + return [filePath for filePath in allowedFiles if filePath.suffix in fileExtensionsToReturn] @staticmethod def get_all_yml_files_from_directory_one_layer_deep(path: str) -> list[pathlib.Path]: From 415b1e1f0bf6483808180362ff369cd59f057a6e Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 23 Jul 2024 16:39:03 -0700 Subject: [PATCH 4/5] sort returned list of files --- contentctl/helper/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/helper/utils.py b/contentctl/helper/utils.py index 1edd0286..261ecb64 100644 --- a/contentctl/helper/utils.py +++ b/contentctl/helper/utils.py @@ -76,7 +76,7 @@ def get_security_content_files_from_directory(path: pathlib.Path, allowedFileExt raise Exception(f"The following files are not allowed in the directory '{path}'. Only files with the extensions {allowedFileExtensions} are allowed:{[str(filePath) for filePath in erroneousFiles]}") # There were no errorneous files, so return the requested files - return [filePath for filePath in allowedFiles if filePath.suffix in fileExtensionsToReturn] + return sorted([filePath for filePath in allowedFiles if filePath.suffix in fileExtensionsToReturn]) @staticmethod def get_all_yml_files_from_directory_one_layer_deep(path: str) -> list[pathlib.Path]: From c1c2a4057de490a83cf4b712a224230bcc41a234 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 23 Jul 2024 16:44:46 -0700 Subject: [PATCH 5/5] bump pyproject in prep for patch release --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6b5dec71..5b0d74cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "contentctl" -version = "4.1.4" +version = "4.1.5" description = "Splunk Content Control Tool" authors = ["STRT "] license = "Apache 2.0"