diff --git a/contentctl/actions/detection_testing/GitService.py b/contentctl/actions/detection_testing/GitService.py index bfed85a3..fe1f4ca8 100644 --- a/contentctl/actions/detection_testing/GitService.py +++ b/contentctl/actions/detection_testing/GitService.py @@ -67,9 +67,9 @@ def getChanges(self, target_branch:str)->List[Detection]: #Make a filename to content map filepath_to_content_map = { obj.file_path:obj for (_,obj) in self.director.name_to_content_map.items()} - updated_detections:List[Detection] = [] - updated_macros:List[Macro] = [] - updated_lookups:List[Lookup] =[] + updated_detections:set[Detection] = set() + updated_macros:set[Macro] = set() + updated_lookups:set[Lookup] = set() for diff in all_diffs: if type(diff) == pygit2.Patch: @@ -80,14 +80,14 @@ def getChanges(self, target_branch:str)->List[Detection]: if decoded_path.is_relative_to(self.config.path/"detections") and decoded_path.suffix == ".yml": detectionObject = filepath_to_content_map.get(decoded_path, None) if isinstance(detectionObject, Detection): - updated_detections.append(detectionObject) + updated_detections.add(detectionObject) else: raise Exception(f"Error getting detection object for file {str(decoded_path)}") elif decoded_path.is_relative_to(self.config.path/"macros") and decoded_path.suffix == ".yml": macroObject = filepath_to_content_map.get(decoded_path, None) if isinstance(macroObject, Macro): - updated_macros.append(macroObject) + updated_macros.add(macroObject) else: raise Exception(f"Error getting macro object for file {str(decoded_path)}") @@ -98,7 +98,7 @@ def getChanges(self, target_branch:str)->List[Detection]: updatedLookup = filepath_to_content_map.get(decoded_path, None) if not isinstance(updatedLookup,Lookup): raise Exception(f"Expected {decoded_path} to be type {type(Lookup)}, but instead if was {(type(updatedLookup))}") - updated_lookups.append(updatedLookup) + updated_lookups.add(updatedLookup) elif decoded_path.suffix == ".csv": # If the CSV was updated, we want to make sure that we @@ -125,7 +125,7 @@ def getChanges(self, target_branch:str)->List[Detection]: if updatedLookup is not None and updatedLookup not in updated_lookups: # It is possible that both the CSV and YML have been modified for the same lookup, # and we do not want to add it twice. 
- updated_lookups.append(updatedLookup) + updated_lookups.add(updatedLookup) else: pass @@ -136,7 +136,7 @@ def getChanges(self, target_branch:str)->List[Detection]: # If a detection has at least one dependency on changed content, # then we must test it again - changed_macros_and_lookups = updated_macros + updated_lookups + changed_macros_and_lookups:set[SecurityContentObject] = updated_macros.union(updated_lookups) for detection in self.director.detections: if detection in updated_detections: @@ -146,14 +146,14 @@ def getChanges(self, target_branch:str)->List[Detection]: for obj in changed_macros_and_lookups: if obj in detection.get_content_dependencies(): - updated_detections.append(detection) + updated_detections.add(detection) break #Print out the names of all modified/new content modifiedAndNewContentString = "\n - ".join(sorted([d.name for d in updated_detections])) print(f"[{len(updated_detections)}] Pieces of modifed and new content (this may include experimental/deprecated/manual_test content):\n - {modifiedAndNewContentString}") - return updated_detections + return sorted(list(updated_detections)) def getSelected(self, detectionFilenames: List[FilePath]) -> List[Detection]: filepath_to_content_map: dict[FilePath, SecurityContentObject] = { diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py index 95ebc464..8e816025 100644 --- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py +++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py @@ -13,7 +13,7 @@ from shutil import copyfile from typing import Union, Optional -from pydantic import BaseModel, PrivateAttr, Field, dataclasses +from pydantic import ConfigDict, BaseModel, PrivateAttr, Field, dataclasses import requests # type: ignore import splunklib.client as client # type: ignore from splunklib.binding import HTTPError # type: ignore @@ -48,9 +48,9 @@ class SetupTestGroupResults(BaseModel): success: bool = True duration: float = 0 start_time: float - - class Config: - arbitrary_types_allowed = True + model_config = ConfigDict( + arbitrary_types_allowed=True + ) class CleanupTestGroupResults(BaseModel): @@ -68,6 +68,15 @@ class CannotRunBaselineException(Exception): # exception pass +class ReplayIndexDoesNotExistOnServer(Exception): + ''' + In order to replay data files into the Splunk Server + for testing, they must be replayed into an index that + exists. If that index does not exist, this error will + be generated and raised before we try to do anything else + with that Data File. 
+ ''' + pass @dataclasses.dataclass(frozen=False) class DetectionTestingManagerOutputDto(): @@ -75,7 +84,7 @@ class DetectionTestingManagerOutputDto(): outputQueue: list[Detection] = Field(default_factory=list) currentTestingQueue: dict[str, Union[Detection, None]] = Field(default_factory=dict) start_time: Union[datetime.datetime, None] = None - replay_index: str = "CONTENTCTL_TESTING_INDEX" + replay_index: str = "contentctl_testing_index" replay_host: str = "CONTENTCTL_HOST" timeout_seconds: int = 60 terminate: bool = False @@ -88,12 +97,13 @@ class DetectionTestingInfrastructure(BaseModel, abc.ABC): sync_obj: DetectionTestingManagerOutputDto hec_token: str = "" hec_channel: str = "" + all_indexes_on_server: list[str] = [] _conn: client.Service = PrivateAttr() pbar: tqdm.tqdm = None start_time: Optional[float] = None - - class Config: - arbitrary_types_allowed = True + model_config = ConfigDict( + arbitrary_types_allowed=True + ) def __init__(self, **data): super().__init__(**data) @@ -131,6 +141,7 @@ def setup(self): (self.get_conn, "Waiting for App Installation"), (self.configure_conf_file_datamodels, "Configuring Datamodels"), (self.create_replay_index, f"Create index '{self.sync_obj.replay_index}'"), + (self.get_all_indexes, "Getting all indexes from server"), (self.configure_imported_roles, "Configuring Roles"), (self.configure_delete_indexes, "Configuring Indexes"), (self.configure_hec, "Configuring HEC"), @@ -169,12 +180,11 @@ def configure_hec(self): pass try: - res = self.get_conn().inputs.create( name="DETECTION_TESTING_HEC", kind="http", index=self.sync_obj.replay_index, - indexes=f"{self.sync_obj.replay_index},_internal,_audit", + indexes=",".join(self.all_indexes_on_server), # This allows the HEC to write to all indexes useACK=True, ) self.hec_token = str(res.token) @@ -183,6 +193,23 @@ def configure_hec(self): except Exception as e: raise (Exception(f"Failure creating HEC Endpoint: {str(e)}")) + def get_all_indexes(self) -> None: + """ + Retrieve a list of all indexes in the Splunk instance + """ + try: + # We do not include the replay index because by + # the time we get to this function, it has already + # been created on the server. + indexes = [] + res = self.get_conn().indexes + for index in res.list(): + indexes.append(index.name) + # Retrieve all available indexes on the splunk instance + self.all_indexes_on_server = indexes + except Exception as e: + raise (Exception(f"Failure getting indexes: {str(e)}")) + def get_conn(self) -> client.Service: try: if not self._conn: @@ -265,11 +292,7 @@ def configure_imported_roles( self, imported_roles: list[str] = ["user", "power", "can_delete"], enterprise_security_roles: list[str] = ["ess_admin", "ess_analyst", "ess_user"], - indexes: list[str] = ["_*", "*"], - ): - indexes.append(self.sync_obj.replay_index) - indexes_encoded = ";".join(indexes) - + ): try: # Set which roles should be configured. For Enterprise Security/Integration Testing, # we must add some extra foles. 
@@ -281,7 +304,7 @@ def configure_imported_roles( self.get_conn().roles.post( self.infrastructure.splunk_app_username, imported_roles=roles, - srchIndexesAllowed=indexes_encoded, + srchIndexesAllowed=";".join(self.all_indexes_on_server), srchIndexesDefault=self.sync_obj.replay_index, ) return @@ -293,19 +316,17 @@ def configure_imported_roles( self.get_conn().roles.post( self.infrastructure.splunk_app_username, imported_roles=imported_roles, - srchIndexesAllowed=indexes_encoded, + srchIndexesAllowed=";".join(self.all_indexes_on_server), srchIndexesDefault=self.sync_obj.replay_index, ) - def configure_delete_indexes(self, indexes: list[str] = ["_*", "*"]): - indexes.append(self.sync_obj.replay_index) + def configure_delete_indexes(self): endpoint = "/services/properties/authorize/default/deleteIndexesAllowed" - indexes_encoded = ";".join(indexes) try: - self.get_conn().post(endpoint, value=indexes_encoded) + self.get_conn().post(endpoint, value=";".join(self.all_indexes_on_server)) except Exception as e: self.pbar.write( - f"Error configuring deleteIndexesAllowed with '{indexes_encoded}': [{str(e)}]" + f"Error configuring deleteIndexesAllowed with '{self.all_indexes_on_server}': [{str(e)}]" ) def wait_for_conf_file(self, app_name: str, conf_file_name: str): @@ -654,8 +675,6 @@ def execute_unit_test( # Set the mode and timeframe, if required kwargs = {"exec_mode": "blocking"} - - # Set earliest_time and latest_time appropriately if FORCE_ALL_TIME is False if not FORCE_ALL_TIME: if test.earliest_time is not None: @@ -1035,8 +1054,8 @@ def retry_search_until_timeout( # Get the start time and compute the timeout search_start_time = time.time() search_stop_time = time.time() + self.sync_obj.timeout_seconds - - # Make a copy of the search string since we may + + # Make a copy of the search string since we may # need to make some small changes to it below search = detection.search @@ -1088,8 +1107,6 @@ def retry_search_until_timeout( # Initialize the collection of fields that are empty that shouldn't be present_threat_objects: set[str] = set() empty_fields: set[str] = set() - - # Filter out any messages in the results for result in results: @@ -1119,7 +1136,7 @@ def retry_search_until_timeout( # not populated and we should throw an error. This can happen if there is a typo # on a field. In this case, the field will appear but will not contain any values current_empty_fields: set[str] = set() - + for field in observable_fields_set: if result.get(field, 'null') == 'null': if field in risk_object_fields_set: @@ -1139,9 +1156,7 @@ def retry_search_until_timeout( if field in threat_object_fields_set: present_threat_objects.add(field) continue - - # If everything succeeded up until now, and no empty fields are found in the # current result, then the search was a success if len(current_empty_fields) == 0: @@ -1155,8 +1170,7 @@ def retry_search_until_timeout( else: empty_fields = empty_fields.union(current_empty_fields) - - + missing_threat_objects = threat_object_fields_set - present_threat_objects # Report a failure if there were empty fields in a threat object in all results if len(missing_threat_objects) > 0: @@ -1172,7 +1186,6 @@ def retry_search_until_timeout( duration=time.time() - search_start_time, ) return - test.result.set_job_content( job.content, @@ -1233,9 +1246,19 @@ def replay_attack_data_file( test_group: TestGroup, test_group_start_time: float, ): - tempfile = mktemp(dir=tmp_dir) - + # Before attempting to replay the file, ensure that the index we want + # to replay into actuall exists. 
If not, we should throw a detailed + # exception that can easily be interpreted by the user. + if attack_data_file.custom_index is not None and \ + attack_data_file.custom_index not in self.all_indexes_on_server: + raise ReplayIndexDoesNotExistOnServer( + f"Unable to replay data file {attack_data_file.data} " + f"into index '{attack_data_file.custom_index}'. " + "The index does not exist on the Splunk Server. " + f"The only valid indexes on the server are {self.all_indexes_on_server}" + ) + tempfile = mktemp(dir=tmp_dir) if not (str(attack_data_file.data).startswith("http://") or str(attack_data_file.data).startswith("https://")) : if pathlib.Path(str(attack_data_file.data)).is_file(): @@ -1280,7 +1303,6 @@ def replay_attack_data_file( ) ) - # Upload the data self.format_pbar_string( TestReportingType.GROUP, diff --git a/contentctl/actions/detection_testing/views/DetectionTestingViewWeb.py b/contentctl/actions/detection_testing/views/DetectionTestingViewWeb.py index 5e2e46c0..cd50d978 100644 --- a/contentctl/actions/detection_testing/views/DetectionTestingViewWeb.py +++ b/contentctl/actions/detection_testing/views/DetectionTestingViewWeb.py @@ -1,12 +1,14 @@ -from bottle import template, Bottle, ServerAdapter -from contentctl.actions.detection_testing.views.DetectionTestingView import ( - DetectionTestingView, -) +from threading import Thread +from bottle import template, Bottle, ServerAdapter from wsgiref.simple_server import make_server, WSGIRequestHandler import jinja2 import webbrowser -from threading import Thread +from pydantic import ConfigDict + +from contentctl.actions.detection_testing.views.DetectionTestingView import ( + DetectionTestingView, +) DEFAULT_WEB_UI_PORT = 7999 @@ -100,9 +102,9 @@ def log_exception(*args, **kwargs): class DetectionTestingViewWeb(DetectionTestingView): bottleApp: Bottle = Bottle() server: SimpleWebServer = SimpleWebServer(host="0.0.0.0", port=DEFAULT_WEB_UI_PORT) - - class Config: - arbitrary_types_allowed = True + model_config = ConfigDict( + arbitrary_types_allowed=True + ) def setup(self): self.bottleApp.route("/", callback=self.showStatus) diff --git a/contentctl/enrichments/cve_enrichment.py b/contentctl/enrichments/cve_enrichment.py index 748a66b3..66160eda 100644 --- a/contentctl/enrichments/cve_enrichment.py +++ b/contentctl/enrichments/cve_enrichment.py @@ -5,7 +5,7 @@ import shelve import time from typing import Annotated, Any, Union, TYPE_CHECKING -from pydantic import BaseModel,Field, computed_field +from pydantic import ConfigDict, BaseModel,Field, computed_field from decimal import Decimal from requests.exceptions import ReadTimeout from contentctl.objects.annotated_types import CVE_TYPE @@ -32,13 +32,12 @@ def url(self)->str: class CveEnrichment(BaseModel): use_enrichment: bool = True cve_api_obj: Union[CVESearch,None] = None - - class Config: - # Arbitrary_types are allowed to let us use the CVESearch Object - arbitrary_types_allowed = True - frozen = True - + # Arbitrary_types are allowed to let us use the CVESearch Object + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True + ) @staticmethod def getCveEnrichment(config:validate, timeout_seconds:int=10, force_disable_enrichment:bool=True)->CveEnrichment: diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 1b716097..34374a88 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ 
b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -36,7 +36,7 @@ from contentctl.objects.integration_test import IntegrationTest from contentctl.objects.data_source import DataSource from contentctl.objects.base_test_result import TestResultStatus - +from contentctl.objects.drilldown import Drilldown, DRILLDOWN_SEARCH_PLACEHOLDER from contentctl.objects.enums import ProvidingTechnology from contentctl.enrichments.cve_enrichment import CveEnrichmentObj import datetime @@ -90,6 +90,7 @@ class Detection_Abstract(SecurityContentObject): test_groups: list[TestGroup] = [] data_source_objects: list[DataSource] = [] + drilldown_searches: list[Drilldown] = Field(default=[], description="A list of Drilldowns that should be included with this search") def get_conf_stanza_name(self, app:CustomApp)->str: stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name) @@ -167,6 +168,7 @@ def adjust_tests_and_groups(self) -> None: the model from the list of unit tests. Also, preemptively skips all manual tests, as well as tests for experimental/deprecated detections and Correlation type detections. """ + # Since ManualTest and UnitTest are not differentiable without looking at the manual_test # tag, Pydantic builds all tests as UnitTest objects. If we see the manual_test flag, we # convert these to ManualTest @@ -563,6 +565,46 @@ def model_post_init(self, __context: Any) -> None: # Derive TestGroups and IntegrationTests, adjust for ManualTests, skip as needed self.adjust_tests_and_groups() + # Ensure that if there is at least 1 drilldown, at least + # 1 of the drilldowns contains the string Drilldown.SEARCH_PLACEHOLDER. + # This is presently a requirement when 1 or more drilldowns are added to a detection. + # Note that this is only required for production searches that are not hunting. + + if self.type == AnalyticsType.Hunting.value or self.status != DetectionStatus.production.value: + #No additional checks need to happen on the potential drilldowns. + pass + else: + found_placeholder = False + if len(self.drilldown_searches) < 2: + raise ValueError(f"This detection is required to have 2 drilldown_searches, but only has [{len(self.drilldown_searches)}]") + for drilldown in self.drilldown_searches: + if DRILLDOWN_SEARCH_PLACEHOLDER in drilldown.search: + found_placeholder = True + if not found_placeholder: + raise ValueError("Detection has one or more drilldown_searches, but none of them " + f"contained '{DRILLDOWN_SEARCH_PLACEHOLDER}'. This is a requirement " + "if drilldown_searches are defined.") + + # Update the search fields with the original search, if required + for drilldown in self.drilldown_searches: + drilldown.perform_search_substitutions(self) + + #For experimental purposes, add the default drilldowns + #self.drilldown_searches.extend(Drilldown.constructDrilldownsFromDetection(self)) + + @property + def drilldowns_in_JSON(self) -> list[dict[str,str]]: + """This function is required for proper JSON + serialization of drilldowns to occur in savedsearches.conf. + It returns the list[Drilldown] as a list[dict].
+ Without this function, the jinja template is unable + to convert list[Drilldown] to JSON + + Returns: + list[dict[str,str]]: List of Drilldowns dumped to dict format + """ + return [drilldown.model_dump() for drilldown in self.drilldown_searches] + @field_validator('lookups', mode="before") @classmethod def getDetectionLookups(cls, v:list[str], info:ValidationInfo) -> list[Lookup]: @@ -789,6 +831,45 @@ def search_observables_exist_validate(self): # Found everything return self + @field_validator("tests", mode="before") + def ensure_yml_test_is_unittest(cls, v:list[dict]): + """The typing for the tests field allows it to be one of + a number of different types of tests. However, ONLY + UnitTest should be allowed to be defined in the YML + file. If part of the UnitTest defined in the YML + is incorrect, such as the attack_data file, then + it will FAIL to be instantiated as a UnitTest and + may instead be instantiated as a different type of + test, such as IntegrationTest (since that requires + less fields) which is incorrect. Ensure that any + raw data read from the YML can actually construct + a valid UnitTest and, if not, return errors right + away instead of letting Pydantic try to construct + it into a different type of test + + Args: + v (list[dict]): list of dicts read from the yml. + Each one SHOULD be a valid UnitTest. If we cannot + construct a valid unitTest from it, a ValueError should be raised + + Returns: + _type_: The input of the function, assuming no + ValueError is raised. + """ + valueErrors:list[ValueError] = [] + for unitTest in v: + #This raises a ValueError on a failed UnitTest. + try: + UnitTest.model_validate(unitTest) + except ValueError as e: + valueErrors.append(e) + if len(valueErrors): + raise ValueError(valueErrors) + # All of these can be constructred as UnitTests with no + # Exceptions, so let the normal flow continue + return v + + @field_validator("tests") def tests_validate( cls, diff --git a/contentctl/objects/base_test_result.py b/contentctl/objects/base_test_result.py index 1e1b287c..d29f93cb 100644 --- a/contentctl/objects/base_test_result.py +++ b/contentctl/objects/base_test_result.py @@ -1,8 +1,8 @@ from typing import Union, Any from enum import Enum -from pydantic import BaseModel -from splunklib.data import Record +from pydantic import ConfigDict, BaseModel +from splunklib.data import Record # type: ignore from contentctl.helper.utils import Utils @@ -53,11 +53,11 @@ class BaseTestResult(BaseModel): # The Splunk endpoint URL sid_link: Union[None, str] = None - class Config: - validate_assignment = True - - # Needed to allow for embedding of Exceptions in the model - arbitrary_types_allowed = True + # Needed to allow for embedding of Exceptions in the model + model_config = ConfigDict( + validate_assignment=True, + arbitrary_types_allowed=True + ) @property def passed(self) -> bool: diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index a0b25da9..504fdf6f 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -1,10 +1,11 @@ import logging import time import json -from typing import Union, Optional, Any +from typing import Any from enum import Enum +from functools import cached_property -from pydantic import BaseModel, validator, Field, PrivateAttr +from pydantic import ConfigDict, BaseModel, computed_field, Field, PrivateAttr from splunklib.results import JSONResultsReader, Message # type: ignore from splunklib.binding import HTTPError, 
ResponseReader # type: ignore import splunklib.client as splunklib # type: ignore @@ -15,7 +16,7 @@ from contentctl.objects.base_test_result import TestResultStatus from contentctl.objects.integration_test_result import IntegrationTestResult from contentctl.actions.detection_testing.progress_bar import ( - format_pbar_string, + format_pbar_string, # type: ignore TestReportingType, TestingStates ) @@ -178,13 +179,14 @@ class PbarData(BaseModel): :param fq_test_name: the fully qualifed (fq) test name (":") used for logging :param start_time: the start time used for logging """ - pbar: tqdm + pbar: tqdm # type: ignore fq_test_name: str start_time: float - class Config: - # needed to support the tqdm type - arbitrary_types_allowed = True + # needed to support the tqdm type + model_config = ConfigDict( + arbitrary_types_allowed=True + ) class CorrelationSearch(BaseModel): @@ -197,143 +199,110 @@ class CorrelationSearch(BaseModel): :param pbar_data: the encapsulated info needed for logging w/ pbar :param test_index: the index attack data is forwarded to for testing (optionally used in cleanup) """ - ## The following three fields are explicitly needed at instantiation # noqa: E266 - # the detection associated with the correlation search (e.g. "Windows Modify Registry EnableLinkedConnections") - detection: Detection + detection: Detection = Field(...) # a Service instance representing a connection to a Splunk instance - service: splunklib.Service + service: splunklib.Service = Field(...) # the encapsulated info needed for logging w/ pbar - pbar_data: PbarData - - ## The following field is optional for instantiation # noqa: E266 + pbar_data: PbarData = Field(...) # The index attack data is sent to; can be None if we are relying on the caller to do our # cleanup of this index - test_index: Optional[str] = Field(default=None, min_length=1) - - ## All remaining fields can be derived from other fields or have intentional defaults that # noqa: E266 - ## should not be changed (validators should prevent instantiating some of these fields directly # noqa: E266 - ## to prevent undefined behavior) # noqa: E266 + test_index: str | None = Field(default=None, min_length=1) # The logger to use (logs all go to a null pipe unless ENABLE_LOGGING is set to True, so as not # to conflict w/ tqdm) - logger: logging.Logger = Field(default_factory=get_logger) - - # The search name (e.g. 
"ESCU - Windows Modify Registry EnableLinkedConnections - Rule") - name: Optional[str] = None - - # The path to the saved search on the Splunk instance - splunk_path: Optional[str] = None - - # A model of the saved search as provided by splunklib - saved_search: Optional[splunklib.SavedSearch] = None + logger: logging.Logger = Field(default_factory=get_logger, init=False) # The set of indexes to clear on cleanup - indexes_to_purge: set[str] = set() + indexes_to_purge: set[str] = Field(default=set(), init=False) # The risk analysis adaptive response action (if defined) - risk_analysis_action: Union[RiskAnalysisAction, None] = None + _risk_analysis_action: RiskAnalysisAction | None = PrivateAttr(default=None) # The notable adaptive response action (if defined) - notable_action: Union[NotableAction, None] = None + _notable_action: NotableAction | None = PrivateAttr(default=None) # The list of risk events found - _risk_events: Optional[list[RiskEvent]] = PrivateAttr(default=None) + _risk_events: list[RiskEvent] | None = PrivateAttr(default=None) # The list of notable events found - _notable_events: Optional[list[NotableEvent]] = PrivateAttr(default=None) + _notable_events: list[NotableEvent] | None = PrivateAttr(default=None) - class Config: - # needed to allow fields w/ types like SavedSearch - arbitrary_types_allowed = True - # We want to have more ridgid typing - extra = 'forbid' + # Need arbitrary types to allow fields w/ types like SavedSearch; we also want to forbid + # unexpected fields + model_config = ConfigDict( + arbitrary_types_allowed=True, + extra='forbid' + ) - @validator("name", always=True) - @classmethod - def _convert_detection_to_search_name(cls, v, values) -> str: - """ - Validate name and derive if None - """ - if "detection" not in values: - raise ValueError("detection missing; name is dependent on detection") + def model_post_init(self, __context: Any) -> None: + super().model_post_init(__context) - expected_name = f"ESCU - {values['detection'].name} - Rule" - if v is not None and v != expected_name: - raise ValueError( - "name must be derived from detection; leave as None and it will be derived automatically" - ) - return expected_name + # Parse the initial values for the risk/notable actions + self._parse_risk_and_notable_actions() - @validator("splunk_path", always=True) - @classmethod - def _derive_splunk_path(cls, v, values) -> str: + @computed_field + @cached_property + def name(self) -> str: """ - Validate splunk_path and derive if None + The search name (e.g. 
"ESCU - Windows Modify Registry EnableLinkedConnections - Rule") + + :returns: the search name + :rtype: str """ - if "name" not in values: - raise ValueError("name missing; splunk_path is dependent on name") + return f"ESCU - {self.detection.name} - Rule" - expected_path = f"saved/searches/{values['name']}" - if v is not None and v != expected_path: - raise ValueError( - "splunk_path must be derived from name; leave as None and it will be derived automatically" - ) - return f"saved/searches/{values['name']}" + @computed_field + @cached_property + def splunk_path(self) -> str: + """ + The path to the saved search on the Splunk instance - @validator("saved_search", always=True) - @classmethod - def _instantiate_saved_search(cls, v, values) -> str: + :returns: the search path + :rtype: str """ - Ensure saved_search was initialized as None and derive + return f"/saved/searches/{self.name}" + + @computed_field + @cached_property + def saved_search(self) -> splunklib.SavedSearch: """ - if "splunk_path" not in values or "service" not in values: - raise ValueError("splunk_path or service missing; saved_search is dependent on both") + A model of the saved search as provided by splunklib - if v is not None: - raise ValueError( - "saved_search must be derived from the service and splunk_path; leave as None and it will be derived " - "automatically" - ) + :returns: the SavedSearch object + :rtype: :class:`splunklib.client.SavedSearch` + """ return splunklib.SavedSearch( - values['service'], - values['splunk_path'], + self.service, + self.splunk_path, ) - @validator("risk_analysis_action", always=True) - @classmethod - def _init_risk_analysis_action(cls, v, values) -> Optional[RiskAnalysisAction]: - """ - Initialize risk_analysis_action + # TODO (cmcginley): need to make this refreshable + @computed_field + @property + def risk_analysis_action(self) -> RiskAnalysisAction | None: """ - if "saved_search" not in values: - raise ValueError("saved_search missing; risk_analysis_action is dependent on saved_search") + The risk analysis adaptive response action (if defined) - if v is not None: - raise ValueError( - "risk_analysis_action must be derived from the saved_search; leave as None and it will be derived " - "automatically" - ) - return CorrelationSearch._get_risk_analysis_action(values['saved_search'].content) - - @validator("notable_action", always=True) - @classmethod - def _init_notable_action(cls, v, values) -> Optional[NotableAction]: + :returns: the RiskAnalysisAction object, if it exists + :rtype: :class:`contentctl.objects.risk_analysis_action.RiskAnalysisAction` | None """ - Initialize notable_action + return self._risk_analysis_action + + # TODO (cmcginley): need to make this refreshable + @computed_field + @property + def notable_action(self) -> NotableAction | None: """ - if "saved_search" not in values: - raise ValueError("saved_search missing; notable_action is dependent on saved_search") + The notable adaptive response action (if defined) - if v is not None: - raise ValueError( - "notable_action must be derived from the saved_search; leave as None and it will be derived " - "automatically" - ) - return CorrelationSearch._get_notable_action(values['saved_search'].content) + :returns: the NotableAction object, if it exists + :rtype: :class:`contentctl.objects.notable_action.NotableAction` | None + """ + return self._notable_action @property def earliest_time(self) -> str: @@ -393,7 +362,7 @@ def has_notable_action(self) -> bool: return self.notable_action is not None @staticmethod - def 
_get_risk_analysis_action(content: dict[str, Any]) -> Optional[RiskAnalysisAction]: + def _get_risk_analysis_action(content: dict[str, Any]) -> RiskAnalysisAction | None: """ Given the saved search content, parse the risk analysis action :param content: a dict of strings to values @@ -407,7 +376,7 @@ def _get_risk_analysis_action(content: dict[str, Any]) -> Optional[RiskAnalysisA return None @staticmethod - def _get_notable_action(content: dict[str, Any]) -> Optional[NotableAction]: + def _get_notable_action(content: dict[str, Any]) -> NotableAction | None: """ Given the saved search content, parse the notable action :param content: a dict of strings to values @@ -431,10 +400,6 @@ def _get_relevant_observables(observables: list[Observable]) -> list[Observable] relevant.append(observable) return relevant - # TODO (PEX-484): ideally, we could handle this and the following init w/ a call to - # model_post_init, so that all the logic is encapsulated w/in _parse_risk_and_notable_actions - # but that is a pydantic v2 feature (see the init validators for risk/notable actions): - # https://docs.pydantic.dev/latest/api/base_model/#pydantic.main.BaseModel.model_post_init def _parse_risk_and_notable_actions(self) -> None: """Parses the risk/notable metadata we care about from self.saved_search.content @@ -445,12 +410,12 @@ def _parse_risk_and_notable_actions(self) -> None: unpacked to be anything other than a singleton """ # grab risk details if present - self.risk_analysis_action = CorrelationSearch._get_risk_analysis_action( + self._risk_analysis_action = CorrelationSearch._get_risk_analysis_action( self.saved_search.content # type: ignore ) # grab notable details if present - self.notable_action = CorrelationSearch._get_notable_action(self.saved_search.content) # type: ignore + self._notable_action = CorrelationSearch._get_notable_action(self.saved_search.content) # type: ignore def refresh(self) -> None: """Refreshes the metadata in the SavedSearch entity, and re-parses the fields we care about @@ -738,7 +703,7 @@ def validate_risk_events(self) -> None: # TODO (#250): Re-enable and refactor code that validates the specific risk counts # Validate risk events in aggregate; we should have an equal amount of risk events for each # relevant observable, and the total count should match the total number of events - # individual_count: Optional[int] = None + # individual_count: int | None = None # total_count = 0 # for observable_str in observable_counts: # self.logger.debug( @@ -802,7 +767,7 @@ def test(self, max_sleep: int = TimeoutConfig.MAX_SLEEP.value, raise_on_exc: boo ) # initialize result as None - result: Optional[IntegrationTestResult] = None + result: IntegrationTestResult | None = None # keep track of time slept and number of attempts for exponential backoff (base 2) elapsed_sleep_time = 0 diff --git a/contentctl/objects/detection_tags.py b/contentctl/objects/detection_tags.py index c8dce678..b1d489f4 100644 --- a/contentctl/objects/detection_tags.py +++ b/contentctl/objects/detection_tags.py @@ -79,7 +79,7 @@ def severity(self)->RiskSeverity: security_domain: SecurityDomain = Field(...) 
cve: List[CVE_TYPE] = [] atomic_guid: List[AtomicTest] = [] - drilldown_search: Optional[str] = None + # enrichment mitre_attack_enrichments: List[MitreAttackEnrichment] = Field([], validate_default=True) @@ -114,7 +114,7 @@ def cis20(self) -> list[Cis18Value]: # TODO (#268): Validate manual_test has length > 0 if not None manual_test: Optional[str] = None - + # The following validator is temporarily disabled pending further discussions # @validator('message') # def validate_message(cls,v,values): diff --git a/contentctl/objects/drilldown.py b/contentctl/objects/drilldown.py new file mode 100644 index 00000000..3fe41e7c --- /dev/null +++ b/contentctl/objects/drilldown.py @@ -0,0 +1,70 @@ +from __future__ import annotations +from pydantic import BaseModel, Field, model_serializer +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from contentctl.objects.detection import Detection +from contentctl.objects.enums import AnalyticsType +DRILLDOWN_SEARCH_PLACEHOLDER = "%original_detection_search%" +EARLIEST_OFFSET = "$info_min_time$" +LATEST_OFFSET = "$info_max_time$" +RISK_SEARCH = "index = risk starthoursago = 168 endhoursago = 0 | stats count values(search_name) values(risk_message) values(analyticstories) values(annotations._all) values(annotations.mitre_attack.mitre_tactic) " + +class Drilldown(BaseModel): + name: str = Field(..., description="The name of the drilldown search", min_length=5) + search: str = Field(..., description="The text of a drilldown search. This must be valid SPL.", min_length=1) + earliest_offset:None | str = Field(..., + description="Earliest offset time for the drilldown search. " + f"The most common value for this field is '{EARLIEST_OFFSET}', " + "but it is NOT the default value and must be supplied explicitly.", + min_length= 1) + latest_offset:None | str = Field(..., + description="Latest offset time for the drilldown search. " + f"The most common value for this field is '{LATEST_OFFSET}', " + "but it is NOT the default value and must be supplied explicitly.", + min_length= 1) + + @classmethod + def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldown]: + victim_observables = [o for o in detection.tags.observable if o.role[0] == "Victim"] + if len(victim_observables) == 0 or detection.type == AnalyticsType.Hunting: + # No victims, so no drilldowns + return [] + print(f"Adding default drilldowns for [{detection.name}]") + variableNamesString = ' and '.join([f"${o.name}$" for o in victim_observables]) + nameField = f"View the detection results for {variableNamesString}" + appendedSearch = " | search " + ' '.join([f"{o.name} = ${o.name}$" for o in victim_observables]) + search_field = f"{detection.search}{appendedSearch}" + detection_results = cls(name=nameField, earliest_offset=EARLIEST_OFFSET, latest_offset=LATEST_OFFSET, search=search_field) + + + nameField = f"View risk events for the last 7 days for {variableNamesString}" + fieldNamesListString = ', '.join([o.name for o in victim_observables]) + search_field = f"{RISK_SEARCH}by {fieldNamesListString} {appendedSearch}" + risk_events_last_7_days = cls(name=nameField, earliest_offset=None, latest_offset=None, search=search_field) + + return [detection_results,risk_events_last_7_days] + + + def perform_search_substitutions(self, detection:Detection)->None: + """Replaces the field DRILLDOWN_SEARCH_PLACEHOLDER (%original_detection_search%) + with the search contained in the detection.
We do this so that the YML does not + need the search copy/pasted from the search field into the drilldown object. + + Args: + detection (Detection): Detection to be used to update the search field of the drilldown + """ + self.search = self.search.replace(DRILLDOWN_SEARCH_PLACEHOLDER, detection.search) + + + @model_serializer + def serialize_model(self) -> dict[str,str]: + #Call serializer for parent + model:dict[str,str] = {} + + model['name'] = self.name + model['search'] = self.search + if self.earliest_offset is not None: + model['earliest_offset'] = self.earliest_offset + if self.latest_offset is not None: + model['latest_offset'] = self.latest_offset + return model \ No newline at end of file diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py index 48daf602..ba5faa8f 100644 --- a/contentctl/objects/macro.py +++ b/contentctl/objects/macro.py @@ -10,7 +10,6 @@ from contentctl.input.director import DirectorOutputDto from contentctl.objects.security_content_object import SecurityContentObject - #The following macros are included in commonly-installed apps. #As such, we will ignore if they are missing from our app. #Included in @@ -55,10 +54,15 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st #If a comment ENDS in a macro, for example ```this is a comment with a macro `macro_here```` #then there is a small edge case where the regex below does not work properly. If that is #the case, we edit the search slightly to insert a space - text_field = re.sub(r"\`\`\`\`", r"` ```", text_field) - text_field = re.sub(r"\`\`\`.*?\`\`\`", " ", text_field) - + if re.findall(r"\`\`\`\`", text_field): + raise ValueError("Search contained four or more '`' characters in a row which is invalid SPL" + "This may have occurred when a macro was commented out.\n" + "Please ammend your search to remove the substring '````'") + # replace all the macros with a space + text_field = re.sub(r"\`\`\`[\s\S]*?\`\`\`", " ", text_field) + + macros_to_get = re.findall(r'`([^\s]+)`', text_field) #If macros take arguments, stop at the first argument. 
We just want the name of the macro macros_to_get = set([macro[:macro.find('(')] if macro.find('(') != -1 else macro for macro in macros_to_get]) @@ -68,4 +72,3 @@ def get_macros(text_field:str, director:DirectorOutputDto , ignore_macros:set[st macros_to_get -= macros_to_ignore return Macro.mapNamesToSecurityContentObjects(list(macros_to_get), director) - \ No newline at end of file diff --git a/contentctl/objects/notable_event.py b/contentctl/objects/notable_event.py index d28d4a62..51b9715d 100644 --- a/contentctl/objects/notable_event.py +++ b/contentctl/objects/notable_event.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel +from pydantic import ConfigDict, BaseModel from contentctl.objects.detection import Detection @@ -11,10 +11,11 @@ class NotableEvent(BaseModel): # The search ID that found that generated this risk event orig_sid: str - class Config: - # Allowing fields that aren't explicitly defined to be passed since some of the risk event's - # fields vary depending on the SPL which generated them - extra = 'allow' + # Allowing fields that aren't explicitly defined to be passed since some of the risk event's + # fields vary depending on the SPL which generated them + model_config = ConfigDict( + extra='allow' + ) def validate_against_detection(self, detection: Detection) -> None: raise NotImplementedError() diff --git a/contentctl/objects/risk_analysis_action.py b/contentctl/objects/risk_analysis_action.py index e29939d3..2fa295e4 100644 --- a/contentctl/objects/risk_analysis_action.py +++ b/contentctl/objects/risk_analysis_action.py @@ -1,7 +1,7 @@ from typing import Any import json -from pydantic import BaseModel, validator +from pydantic import BaseModel, field_validator from contentctl.objects.risk_object import RiskObject from contentctl.objects.threat_object import ThreatObject @@ -21,11 +21,11 @@ class RiskAnalysisAction(BaseModel): risk_objects: list[RiskObject] message: str - @validator("message", always=True, pre=True) + @field_validator("message", mode="before") @classmethod - def _validate_message(cls, v, values) -> str: + def _validate_message(cls, v: Any) -> str: """ - Validate splunk_path and derive if None + Validate message and derive if None """ if v is None: raise ValueError( diff --git a/contentctl/objects/risk_event.py b/contentctl/objects/risk_event.py index 7d30d324..de98bd0b 100644 --- a/contentctl/objects/risk_event.py +++ b/contentctl/objects/risk_event.py @@ -1,7 +1,7 @@ import re +from functools import cached_property -from pydantic import BaseModel, Field, PrivateAttr, field_validator, computed_field - +from pydantic import ConfigDict, BaseModel, Field, PrivateAttr, field_validator, computed_field from contentctl.objects.errors import ValidationFailed from contentctl.objects.detection import Detection from contentctl.objects.observable import Observable @@ -85,10 +85,11 @@ class RiskEvent(BaseModel): # Private attribute caching the observable this RiskEvent is mapped to _matched_observable: Observable | None = PrivateAttr(default=None) - class Config: - # Allowing fields that aren't explicitly defined to be passed since some of the risk event's - # fields vary depending on the SPL which generated them - extra = "allow" + # Allowing fields that aren't explicitly defined to be passed since some of the risk event's + # fields vary depending on the SPL which generated them + model_config = ConfigDict( + extra="allow" + ) @field_validator("annotations_mitre_attack", "analyticstories", mode="before") @classmethod @@ -103,7 +104,7 @@ def 
_convert_str_value_to_singleton(cls, v: str | list[str]) -> list[str]: return [v] @computed_field - @property + @cached_property def source_field_name(self) -> str: """ A cached derivation of the source field name the risk event corresponds to in the relevant diff --git a/contentctl/output/templates/savedsearches_detections.j2 b/contentctl/output/templates/savedsearches_detections.j2 index f2f345aa..396bb2c6 100644 --- a/contentctl/output/templates/savedsearches_detections.j2 +++ b/contentctl/output/templates/savedsearches_detections.j2 @@ -112,7 +112,8 @@ alert.suppress.fields = {{ detection.tags.throttling.conf_formatted_fields() }} alert.suppress.period = {{ detection.tags.throttling.period }} {% endif %} search = {{ detection.search | escapeNewlines() }} - +action.notable.param.drilldown_searches = {{ detection.drilldowns_in_JSON | tojson | escapeNewlines() }} {% endif %} + {% endfor %} ### END {{ app.label }} DETECTIONS ### diff --git a/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml b/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml index 1a4af7b1..a101fd7d 100644 --- a/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml +++ b/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml @@ -29,6 +29,15 @@ references: - https://attack.mitre.org/techniques/T1560/001/ - https://www.microsoft.com/security/blog/2021/01/20/deep-dive-into-the-solorigate-second-stage-activation-from-sunburst-to-teardrop-and-raindrop/ - https://thedfirreport.com/2021/01/31/bazar-no-ryuk/ +drilldown_searches: +- name: View the detection results for $user$ and $dest$ + search: '%original_detection_search% | search user = $user$ dest = $dest$' + earliest_offset: $info_min_time$ + latest_offset: $info_max_time$ +- name: View risk events for the last 7 days for $user$ and $dest$ + search: '| from datamodel Risk.All_Risk | search normalized_risk_object IN ($user$, $dest$) starthoursago=168 endhoursago=1 | stats count min(_time) as firstTime max(_time) as lastTime values(search_name) as "Search Name" values(risk_message) as "Risk Message" values(analyticstories) as "Analytic Stories" values(annotations._all) as "Annotations" values(annotations.mitre_attack.mitre_tactic) as "ATT&CK Tactics" by normalized_risk_object | `security_content_ctime(firstTime)` | `security_content_ctime(lastTime)`' + earliest_offset: $info_min_time$ + latest_offset: $info_max_time$ tags: analytic_story: - Cobalt Strike @@ -80,4 +89,4 @@ tests: attack_data: - data: https://media.githubusercontent.com/media/splunk/attack_data/master/datasets/attack_techniques/T1560.001/archive_utility/windows-sysmon.log source: XmlWinEventLog:Microsoft-Windows-Sysmon/Operational - sourcetype: xmlwineventlog \ No newline at end of file + sourcetype: xmlwineventlog diff --git a/pyproject.toml b/pyproject.toml index b1ed67a7..465ad842 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ pydantic = "^2.8.2" PyYAML = "^6.0.2" requests = "~2.32.3" pycvesearch = "^1.2" -xmltodict = "^0.13.0" +xmltodict = ">=0.13,<0.15" attackcti = "^0.4.0" Jinja2 = "^3.1.4" questionary = "^2.0.1"
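
The index handling above replaces the hard-coded "_*"/"*" index lists with a single list pulled from the server once, via get_all_indexes(), and reused for HEC creation, role configuration, deleteIndexesAllowed, and a new pre-replay guard. A rough standalone sketch of that flow follows; AttackDataFileStub, check_replay_index, and the sample index names are illustrative stand-ins, not contentctl code.

from dataclasses import dataclass


class ReplayIndexDoesNotExistOnServer(Exception):
    """Raised before replay when the requested custom index is absent from the server."""


@dataclass
class AttackDataFileStub:
    # Hypothetical stand-in for contentctl's attack data model; only the fields used here
    data: str
    custom_index: str | None = None


def check_replay_index(attack_data_file: AttackDataFileStub, all_indexes_on_server: list[str]) -> None:
    # Mirrors the guard added at the top of replay_attack_data_file in the diff above
    if (attack_data_file.custom_index is not None
            and attack_data_file.custom_index not in all_indexes_on_server):
        raise ReplayIndexDoesNotExistOnServer(
            f"Unable to replay data file {attack_data_file.data} "
            f"into index '{attack_data_file.custom_index}'. "
            "The index does not exist on the Splunk Server. "
            f"The only valid indexes on the server are {all_indexes_on_server}"
        )


all_indexes = ["main", "_internal", "_audit", "contentctl_testing_index"]  # as populated by get_all_indexes()
check_replay_index(AttackDataFileStub(data="windows-sysmon.log", custom_index="contentctl_testing_index"), all_indexes)
print(",".join(all_indexes))  # shape of the value passed as the HEC 'indexes' argument
print(";".join(all_indexes))  # shape of the value passed as srchIndexesAllowed / deleteIndexesAllowed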
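The drilldown changes read end to end as: a detection YML declares drilldown_searches (see the anomalous_usage_of_7zip.yml example), perform_search_substitutions() expands %original_detection_search%, and drilldowns_in_JSON hands the serialized dicts to action.notable.param.drilldown_searches in savedsearches.conf. Below is a minimal standalone sketch of the substitution and serialization behavior; DetectionStub is a hypothetical stand-in for contentctl's Detection, and this simplified Drilldown omits the Field descriptions and length constraints used in the real module.

from pydantic import BaseModel, model_serializer

DRILLDOWN_SEARCH_PLACEHOLDER = "%original_detection_search%"


class DetectionStub(BaseModel):
    # Hypothetical stand-in for contentctl's Detection; only the search text matters here
    name: str
    search: str


class Drilldown(BaseModel):
    name: str
    search: str
    earliest_offset: str | None
    latest_offset: str | None

    def perform_search_substitutions(self, detection: DetectionStub) -> None:
        # Expand the placeholder so the YML does not have to repeat the full detection search
        self.search = self.search.replace(DRILLDOWN_SEARCH_PLACEHOLDER, detection.search)

    @model_serializer
    def serialize_model(self) -> dict[str, str]:
        # Drop the offsets when they are None, as in the serializer in the new drilldown.py
        model: dict[str, str] = {"name": self.name, "search": self.search}
        if self.earliest_offset is not None:
            model["earliest_offset"] = self.earliest_offset
        if self.latest_offset is not None:
            model["latest_offset"] = self.latest_offset
        return model


detection = DetectionStub(
    name="Anomalous usage of 7zip",
    search="| tstats count from datamodel=Endpoint.Processes where Processes.process_name=7z.exe by Processes.user Processes.dest",
)
drilldown = Drilldown(
    name="View the detection results for $user$ and $dest$",
    search="%original_detection_search% | search user = $user$ dest = $dest$",
    earliest_offset="$info_min_time$",
    latest_offset="$info_max_time$",
)
drilldown.perform_search_substitutions(detection)
# model_dump() now yields the dict that drilldowns_in_JSON would pass to the savedsearches.conf template
print(drilldown.model_dump())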
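The macro.py change swaps the old backtick rewrite for a hard error on '````' plus a multi-line-aware comment strip before macro names are extracted. A small sketch of that extraction logic under those assumptions; extract_macro_names is a hypothetical free function, whereas the real code continues on to Macro.mapNamesToSecurityContentObjects.

import re


def extract_macro_names(text_field: str) -> set[str]:
    # Reject four or more backticks in a row, as the new get_macros does
    if re.findall(r"\`\`\`\`", text_field):
        raise ValueError(
            "Search contained four or more '`' characters in a row, which is invalid SPL. "
            "This may have occurred when a macro was commented out."
        )
    # Strip ```...``` comments (including multi-line ones, via [\s\S]) with a space
    text_field = re.sub(r"\`\`\`[\s\S]*?\`\`\`", " ", text_field)
    # Pull out `macro_name` tokens, trimming any (arguments)
    macros = re.findall(r"`([^\s]+)`", text_field)
    return {m[: m.find("(")] if "(" in m else m for m in macros}


search = "| tstats count ```a comment\nspanning lines``` | `drop_dm_object_name(Processes)` | `security_content_ctime(firstTime)`"
print(extract_macro_names(search))  # both macro names, without their arguments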