Skip to content

Commit

Permalink
Merge pull request #200 from splunk/validate_csv
Browse files Browse the repository at this point in the history
Parse and validate CSV files
  • Loading branch information
pyth0n1c authored Jul 23, 2024
2 parents 3c7df89 + c1c2a40 commit 7b10d64
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 51 deletions.
81 changes: 39 additions & 42 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
import sys

from dataclasses import dataclass

from pydantic import ValidationError
from typing import Union

from contentctl.objects.enums import SecurityContentProduct
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import (
SecurityContentObject_Abstract,
)
import pathlib
from contentctl.input.director import Director, DirectorOutputDto

from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicTest
from contentctl.helper.utils import Utils


class Validate:
Expand Down Expand Up @@ -42,38 +33,44 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:

director = Director(director_output_dto)
director.execute(input_dto)
self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
return director_output_dto

def validate_duplicate_uuids(
self, security_content_objects: list[SecurityContentObject_Abstract]
):
all_uuids = set()
duplicate_uuids = set()
for elem in security_content_objects:
if elem.id in all_uuids:
# The uuid has been found more than once
duplicate_uuids.add(elem.id)
else:
# This is the first time the uuid has been found
all_uuids.add(elem.id)

def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_output_dto:DirectorOutputDto):
"""
This function ensures that only files which are relevant to lookups are included in the lookups folder.
This means that a file must be either:
1. A lookup YML (.yml)
2. A lookup CSV (.csv) which is referenced by a YML
3. A lookup MLMODEL (.mlmodel) which is referenced by a YML.
All other files, including CSV and MLMODEL files which are NOT
referenced by a YML, will generate an exception from this function.
Args:
repo_path (pathlib.Path): path to the root of the app
director_output_dto (DirectorOutputDto): director object with all constructed content
if len(duplicate_uuids) == 0:
return
Raises:
Exception: An Exception will be raised if there are any non .yml, .csv, or .mlmodel
files in this directory. Additionally, an exception will be raised if there
exists one or more .csv or .mlmodel files that are not referenced by at least 1
detection .yml file in this directory.
This avoids having additional, unused files in this directory that may be copied into
the app when it is built (which can cause appinspect errors or larger app size.)
"""
lookupsDirectory = repo_path/"lookups"

# Get all of the files referenced by Lookups
usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if lookup.filename is not None] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None]

# At least once duplicate uuid has been found. Enumerate all
# the pieces of content that use duplicate uuids
duplicate_messages = []
for uuid in duplicate_uuids:
duplicate_uuid_content = [
str(content.file_path)
for content in security_content_objects
if content.id in duplicate_uuids
]
duplicate_messages.append(
f"Duplicate UUID [{uuid}] in {duplicate_uuid_content}"
)

raise ValueError(
"ERROR: Duplicate ID(s) found in objects:\n"
+ "\n - ".join(duplicate_messages)
)
# Get all of the mlmodel and csv files in the lookups directory
csvAndMlmodelFiles = Utils.get_security_content_files_from_directory(lookupsDirectory, allowedFileExtensions=[".yml",".csv",".mlmodel"], fileExtensionsToReturn=[".csv",".mlmodel"])

# Generate an exception of any csv or mlmodel files exist but are not used
unusedLookupFiles:list[pathlib.Path] = [testFile for testFile in csvAndMlmodelFiles if testFile not in usedLookupFiles]
if len(unusedLookupFiles) > 0:
raise Exception(f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}")
return

43 changes: 43 additions & 0 deletions contentctl/helper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,49 @@ def get_all_yml_files_from_directory(path: str) -> list[pathlib.Path]:
listOfFiles.append(pathlib.Path(os.path.join(dirpath, file)))

return sorted(listOfFiles)

@staticmethod
def get_security_content_files_from_directory(path: pathlib.Path, allowedFileExtensions:list[str]=[".yml"], fileExtensionsToReturn:list[str]=[".yml"]) -> list[pathlib.Path]:

"""
Get all of the Security Content Object Files rooted in a given directory. These will almost
certain be YML files, but could be other file types as specified by the user
Args:
path (pathlib.Path): The root path at which to enumerate all Security Content Files. All directories will be traversed.
allowedFileExtensions (set[str], optional): File extensions which are allowed to be present in this directory. In most cases, we do not want to allow the presence of non-YML files. Defaults to [".yml"].
fileExtensionsToReturn (set[str], optional): Filenames with extensions that should be returned from this function. For example, the lookups/ directory contains YML, CSV, and MLMODEL directories, but only the YMLs are Security Content Objects for constructing Lookyps. Defaults to[".yml"].
Raises:
Exception: Will raise an exception if allowedFileExtensions is not a subset of fileExtensionsToReturn.
Exception: Will raise an exception if the path passed to the function does not exist or is not a directory
Exception: Will raise an exception if there are any files rooted in the directory which are not in allowedFileExtensions
Returns:
list[pathlib.Path]: list of files with an extension in fileExtensionsToReturn found in path
"""
if not set(fileExtensionsToReturn).issubset(set(allowedFileExtensions)):
raise Exception(f"allowedFileExtensions {allowedFileExtensions} MUST be a subset of fileExtensionsToReturn {fileExtensionsToReturn}, but it is not")

if not path.exists() or not path.is_dir():
raise Exception(f"Unable to get security_content files, required directory '{str(path)}' does not exist or is not a directory")

allowedFiles:list[pathlib.Path] = []
erroneousFiles:list[pathlib.Path] = []
#Get every single file extension
for filePath in path.glob("**/*.*"):
if filePath.suffix in allowedFileExtensions:
# Yes these are allowed
allowedFiles.append(filePath)
else:
# No these have not been allowed
erroneousFiles.append(filePath)

if len(erroneousFiles):
raise Exception(f"The following files are not allowed in the directory '{path}'. Only files with the extensions {allowedFileExtensions} are allowed:{[str(filePath) for filePath in erroneousFiles]}")

# There were no errorneous files, so return the requested files
return sorted([filePath for filePath in allowedFiles if filePath.suffix in fileExtensionsToReturn])

@staticmethod
def get_all_yml_files_from_directory_one_layer_deep(path: str) -> list[pathlib.Path]:
Expand Down
55 changes: 47 additions & 8 deletions contentctl/objects/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pydantic import field_validator, ValidationInfo, model_validator, FilePath, model_serializer
from typing import TYPE_CHECKING, Optional, Any, Union
import re
import csv
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.config import validate
Expand Down Expand Up @@ -61,15 +62,53 @@ def fix_lookup_path(cls, data:Any, info: ValidationInfo)->Any:
raise ValueError("config required for constructing lookup filename, but it was not")
return data

@field_validator('filename')
@classmethod
def lookup_file_valid(cls, v: Union[FilePath,None], info: ValidationInfo):
if not v:
return v
if not (v.name.endswith(".csv") or v.name.endswith(".mlmodel")):
raise ValueError(f"All Lookup files must be CSV files and end in .csv. The following file does not: '{v}'")

return v
def model_post_init(self, ctx:dict[str,Any]):
    """
    After the Lookup model is constructed, validate its backing file.

    .mlmodel files are accepted without further inspection. .csv files are
    parsed and every data row must have exactly as many columns as the
    header row.

    Raises:
        ValueError: If the file extension is neither .csv nor .mlmodel, if the
            CSV has no readable header row, or if any data row has more or
            fewer columns than the header.
    """
    if not self.filename:
        # No backing file associated with this lookup; nothing to validate.
        return
    import pathlib
    filenamePath = pathlib.Path(self.filename)

    if filenamePath.suffix not in [".csv", ".mlmodel"]:
        # Fixed message: .mlmodel files are also permitted, not only .csv files.
        raise ValueError(f"All Lookup files must be .csv or .mlmodel files. The following file is neither: '{filenamePath}'")

    if filenamePath.suffix == ".mlmodel":
        # Do not need any additional checks for an mlmodel file
        return

    # https://docs.python.org/3/library/csv.html#csv.DictReader
    # Column names (fieldnames) are determined by the first row of the file.
    # If a row has MORE fields than fieldnames, the extras are collected in a list under 'restkey' - this should throw an Exception
    # If a row has LESS fields than fieldnames, the missing fields contain None by default. This should also throw an exception.
    csv_errors:list[str] = []
    # newline='' per the csv module docs, so quoted fields containing embedded
    # newlines are parsed correctly.
    with open(filenamePath, "r", newline='') as csv_fp:
        RESTKEY = "extra_fields_in_a_row"
        csv_dict = csv.DictReader(csv_fp, restkey=RESTKEY)
        if csv_dict.fieldnames is None:
            raise ValueError(f"Error validating the CSV referenced by the lookup: {filenamePath}:\n\t"
                             "Unable to read fieldnames from CSV. Is the CSV empty?\n"
                             "  Please try opening the file with a CSV Editor to ensure that it is correct.")
        # Remember that row 1 has the headers and we do not iterate over it in the loop below.
        # CSVs are typically indexed starting at row 1 for the header, so data
        # rows are reported starting at row 2 to match what a CSV editor shows.
        for row_index, data_row in enumerate(csv_dict):
            row_index+=2
            if len(data_row.get(RESTKEY,[])) > 0:
                # Extra columns beyond the header were dumped under RESTKEY.
                csv_errors.append(f"row [{row_index}] should have [{len(csv_dict.fieldnames)}] columns,"
                                  f" but instead had [{len(csv_dict.fieldnames) + len(data_row.get(RESTKEY,[]))}].")

            for column_index, column_name in enumerate(data_row):
                if data_row[column_name] is None:
                    # A missing trailing field is filled with None by DictReader;
                    # the first None's index equals the number of columns the row had.
                    csv_errors.append(f"row [{row_index}] should have [{len(csv_dict.fieldnames)}] columns, "
                                      f"but instead had [{column_index}].")
    if len(csv_errors) > 0:
        err_string = '\n\t'.join(csv_errors)
        raise ValueError(f"Error validating the CSV referenced by the lookup: {filenamePath}:\n\t{err_string}\n"
                         f"  Please try opening the file with a CSV Editor to ensure that it is correct.")

    return


@field_validator('match_type')
@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "contentctl"
version = "4.1.4"
version = "4.1.5"
description = "Splunk Content Control Tool"
authors = ["STRT <[email protected]>"]
license = "Apache 2.0"
Expand Down

0 comments on commit 7b10d64

Please sign in to comment.