From 50704d2881329849d4d5e6ecb4acd01047200e23 Mon Sep 17 00:00:00 2001
From: pyth0n1c
Date: Tue, 15 Oct 2024 21:03:19 -0400
Subject: [PATCH] Throw a much better and more descriptive exception when
 trying to replay to a custom_index that does not exist on the target server.
 List out the attempted index and all indexes on the server for documentation
 purposes.

---
 .../DetectionTestingInfrastructure.py         | 78 ++++++++++---------
 1 file changed, 42 insertions(+), 36 deletions(-)

diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
index 71583634..8e816025 100644
--- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
+++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py
@@ -68,6 +68,15 @@ class CannotRunBaselineException(Exception):
     # exception
     pass
 
+class ReplayIndexDoesNotExistOnServer(Exception):
+    '''
+    In order to replay data files into the Splunk Server
+    for testing, they must be replayed into an index that
+    exists. If that index does not exist, this error will
+    be generated and raised before we try to do anything else
+    with that Data File.
+    '''
+    pass
 
 @dataclasses.dataclass(frozen=False)
 class DetectionTestingManagerOutputDto():
@@ -75,7 +84,7 @@ class DetectionTestingManagerOutputDto():
     outputQueue: list[Detection] = Field(default_factory=list)
     currentTestingQueue: dict[str, Union[Detection, None]] = Field(default_factory=dict)
     start_time: Union[datetime.datetime, None] = None
-    replay_index: str = "CONTENTCTL_TESTING_INDEX"
+    replay_index: str = "contentctl_testing_index"
     replay_host: str = "CONTENTCTL_HOST"
     timeout_seconds: int = 60
     terminate: bool = False
@@ -88,6 +97,7 @@ class DetectionTestingInfrastructure(BaseModel, abc.ABC):
     sync_obj: DetectionTestingManagerOutputDto
     hec_token: str = ""
     hec_channel: str = ""
+    all_indexes_on_server: list[str] = []
     _conn: client.Service = PrivateAttr()
     pbar: tqdm.tqdm = None
     start_time: Optional[float] = None
@@ -131,6 +141,7 @@ def setup(self):
                 (self.get_conn, "Waiting for App Installation"),
                 (self.configure_conf_file_datamodels, "Configuring Datamodels"),
                 (self.create_replay_index, f"Create index '{self.sync_obj.replay_index}'"),
+                (self.get_all_indexes, "Getting all indexes from server"),
                 (self.configure_imported_roles, "Configuring Roles"),
                 (self.configure_delete_indexes, "Configuring Indexes"),
                 (self.configure_hec, "Configuring HEC"),
@@ -169,14 +180,11 @@ def configure_hec(self):
             pass
 
         try:
-            # Retrieve all available indexes on the splunk instance
-            all_indexes = self.get_all_indexes()
-
             res = self.get_conn().inputs.create(
                 name="DETECTION_TESTING_HEC",
                 kind="http",
                 index=self.sync_obj.replay_index,
-                indexes=",".join(all_indexes),  # This allows the HEC to write to all indexes
+                indexes=",".join(self.all_indexes_on_server),  # This allows the HEC to write to all indexes
                 useACK=True,
             )
             self.hec_token = str(res.token)
@@ -185,17 +193,20 @@ def configure_hec(self):
         except Exception as e:
             raise (Exception(f"Failure creating HEC Endpoint: {str(e)}"))
 
-    def get_all_indexes(self) -> list[str]:
+    def get_all_indexes(self) -> None:
         """
         Retrieve a list of all indexes in the Splunk instance
         """
         try:
-            # Always include the special, default replay index here
-            indexes = [self.sync_obj.replay_index]
+            # We do not include the replay index because by
+            # the time we get to this function, it has already
+            # been created on the server.
+            indexes = []
             res = self.get_conn().indexes
             for index in res.list():
                 indexes.append(index.name)
-            return indexes
+            # Retrieve all available indexes on the splunk instance
+            self.all_indexes_on_server = indexes
         except Exception as e:
             raise (Exception(f"Failure getting indexes: {str(e)}"))
 
@@ -281,11 +292,7 @@ def configure_imported_roles(
         self,
         imported_roles: list[str] = ["user", "power", "can_delete"],
         enterprise_security_roles: list[str] = ["ess_admin", "ess_analyst", "ess_user"],
-        indexes: list[str] = ["_*", "*"],
-    ):
-        indexes.append(self.sync_obj.replay_index)
-        indexes_encoded = ";".join(indexes)
-
+    ):
         try:
             # Set which roles should be configured. For Enterprise Security/Integration Testing,
             # we must add some extra foles.
@@ -297,7 +304,7 @@ def configure_imported_roles(
             self.get_conn().roles.post(
                 self.infrastructure.splunk_app_username,
                 imported_roles=roles,
-                srchIndexesAllowed=indexes_encoded,
+                srchIndexesAllowed=";".join(self.all_indexes_on_server),
                 srchIndexesDefault=self.sync_obj.replay_index,
             )
             return
@@ -309,19 +316,17 @@ def configure_imported_roles(
         self.get_conn().roles.post(
             self.infrastructure.splunk_app_username,
             imported_roles=imported_roles,
-            srchIndexesAllowed=indexes_encoded,
+            srchIndexesAllowed=";".join(self.all_indexes_on_server),
             srchIndexesDefault=self.sync_obj.replay_index,
         )
 
-    def configure_delete_indexes(self, indexes: list[str] = ["_*", "*"]):
-        indexes.append(self.sync_obj.replay_index)
+    def configure_delete_indexes(self):
         endpoint = "/services/properties/authorize/default/deleteIndexesAllowed"
-        indexes_encoded = ";".join(indexes)
         try:
-            self.get_conn().post(endpoint, value=indexes_encoded)
+            self.get_conn().post(endpoint, value=";".join(self.all_indexes_on_server))
         except Exception as e:
             self.pbar.write(
-                f"Error configuring deleteIndexesAllowed with '{indexes_encoded}': [{str(e)}]"
+                f"Error configuring deleteIndexesAllowed with '{self.all_indexes_on_server}': [{str(e)}]"
             )
 
     def wait_for_conf_file(self, app_name: str, conf_file_name: str):
@@ -670,8 +675,6 @@ def execute_unit_test(
 
         # Set the mode and timeframe, if required
         kwargs = {"exec_mode": "blocking"}
-
-
         # Set earliest_time and latest_time appropriately if FORCE_ALL_TIME is False
         if not FORCE_ALL_TIME:
             if test.earliest_time is not None:
@@ -1051,8 +1054,8 @@ def retry_search_until_timeout(
         # Get the start time and compute the timeout
         search_start_time = time.time()
         search_stop_time = time.time() + self.sync_obj.timeout_seconds
-
-        # Make a copy of the search string since we may
+        
+        # Make a copy of the search string since we may 
         # need to make some small changes to it below
         search = detection.search
 
@@ -1104,8 +1107,6 @@ def retry_search_until_timeout(
 
                 # Initialize the collection of fields that are empty that shouldn't be
                 present_threat_objects: set[str] = set()
                 empty_fields: set[str] = set()
-
-
                 # Filter out any messages in the results
                 for result in results:
@@ -1135,7 +1136,7 @@
                     # not populated and we should throw an error. This can happen if there is a typo
                     # on a field. In this case, the field will appear but will not contain any values
                     current_empty_fields: set[str] = set()
-
+                    
                     for field in observable_fields_set:
                         if result.get(field, 'null') == 'null':
                             if field in risk_object_fields_set:
@@ -1155,9 +1156,7 @@
                         if field in threat_object_fields_set:
                             present_threat_objects.add(field)
                             continue
 
-
-
                     # If everything succeeded up until now, and no empty fields are found in the
                     # current result, then the search was a success
                     if len(current_empty_fields) == 0:
@@ -1171,8 +1170,7 @@
 
                 else:
                     empty_fields = empty_fields.union(current_empty_fields)
-
-
+        
         missing_threat_objects = threat_object_fields_set - present_threat_objects
         # Report a failure if there were empty fields in a threat object in all results
         if len(missing_threat_objects) > 0:
@@ -1188,7 +1186,6 @@
 
             duration=time.time() - search_start_time,
         )
         return
-
         test.result.set_job_content(
             job.content,
@@ -1249,9 +1246,19 @@ def replay_attack_data_file(
         test_group: TestGroup,
         test_group_start_time: float,
     ):
-        tempfile = mktemp(dir=tmp_dir)
-
+        # Before attempting to replay the file, ensure that the index we want
+        # to replay into actually exists. If not, we should throw a detailed
+        # exception that can easily be interpreted by the user.
+        if attack_data_file.custom_index is not None and \
+            attack_data_file.custom_index not in self.all_indexes_on_server:
+            raise ReplayIndexDoesNotExistOnServer(
+                f"Unable to replay data file {attack_data_file.data} "
+                f"into index '{attack_data_file.custom_index}'. "
+                "The index does not exist on the Splunk Server. "
+                f"The only valid indexes on the server are {self.all_indexes_on_server}"
+            )
+        tempfile = mktemp(dir=tmp_dir)
         if not (str(attack_data_file.data).startswith("http://") or
                 str(attack_data_file.data).startswith("https://")) :
             if pathlib.Path(str(attack_data_file.data)).is_file():
@@ -1296,7 +1303,6 @@ def replay_attack_data_file(
                 )
             )
 
-
         # Upload the data
         self.format_pbar_string(
             TestReportingType.GROUP,