Merge pull request #92 from splunk/feature/risk-message-validation
Adding risk message validation++

Merging after an internal testing run gave the expected results!
There is still a transient issue to chase down: two scheduled detections failed to run as expected, but this happens occasionally.
pyth0n1c authored Aug 9, 2024
2 parents 54e0f48 + 596edba commit 9b5e02e
Showing 26 changed files with 1,392 additions and 796 deletions.
8 changes: 8 additions & 0 deletions .vscode/launch.json
@@ -1,5 +1,13 @@
{
"configurations": [
+{
+"name":"contentctl (pick args)",
+"type":"debugpy",
+"request":"launch",
+"program":"${workspaceFolder}/.venv/bin/contentctl",
+"console":"integratedTerminal",
+"cwd":"${env:SECURITY_CONTENT_PATH}",
+"args":"${command:pickArgs}"},
{
"name": "contentctl init",
"type": "debugpy",
@@ -85,7 +85,7 @@ class DetectionTestingInfrastructure(BaseModel, abc.ABC):
hec_channel: str = ""
_conn: client.Service = PrivateAttr()
pbar: tqdm.tqdm = None
-start_time: float = None
+start_time: Optional[float] = None

class Config:
arbitrary_types_allowed = True
@@ -136,7 +136,6 @@ def setup(self):
TestReportingType.SETUP,
self.get_name(),
msg,
-self.start_time,
update_sync_status=True,
)
func()
@@ -147,7 +146,7 @@ def setup(self):
self.finish()
return

-self.format_pbar_string(TestReportingType.SETUP, self.get_name(), "Finished Setup!", self.start_time)
+self.format_pbar_string(TestReportingType.SETUP, self.get_name(), "Finished Setup!")

def wait_for_ui_ready(self):
self.get_conn()
@@ -216,7 +215,6 @@ def connect_to_api(self, sleep_seconds: int = 5):
TestReportingType.SETUP,
self.get_name(),
"Waiting for reboot",
-self.start_time,
update_sync_status=True,
)
else:
@@ -236,18 +234,12 @@ def connect_to_api(self, sleep_seconds: int = 5):
self.pbar.write(
f"Error getting API connection (not quitting) '{type(e).__name__}': {str(e)}"
)
print("wow")
# self.pbar.write(
# f"Unhandled exception getting connection to splunk server: {str(e)}"
# )
# self.sync_obj.terminate = True

for _ in range(sleep_seconds):
self.format_pbar_string(
TestReportingType.SETUP,
self.get_name(),
"Getting API Connection",
-self.start_time,
update_sync_status=True,
)
time.sleep(1)
@@ -318,7 +310,6 @@ def wait_for_conf_file(self, app_name: str, conf_file_name: str):
TestReportingType.SETUP,
self.get_name(),
"Configuring Datamodels",
-self.start_time,
)

def configure_conf_file_datamodels(self, APP_NAME: str = "Splunk_SA_CIM"):
@@ -424,7 +415,7 @@ def test_detection(self, detection: Detection) -> None:
TestReportingType.GROUP,
test_group.name,
FinalTestingStates.SKIP.value,
-time.time(),
+start_time=time.time(),
set_pbar=False,
)
)
@@ -465,7 +456,7 @@ def test_detection(self, detection: Detection) -> None:
TestReportingType.GROUP,
test_group.name,
TestingStates.DONE_GROUP.value,
-setup_results.start_time,
+start_time=setup_results.start_time,
set_pbar=False,
)
)
@@ -486,7 +477,7 @@ def setup_test_group(self, test_group: TestGroup) -> SetupTestGroupResults:
TestReportingType.GROUP,
test_group.name,
TestingStates.BEGINNING_GROUP.value,
-setup_start_time
+start_time=setup_start_time
)
# https://github.com/WoLpH/python-progressbar/issues/164
# Use NullBar if there is more than 1 container or we are running
@@ -526,7 +517,7 @@ def cleanup_test_group(
TestReportingType.GROUP,
test_group.name,
TestingStates.DELETING.value,
-test_group_start_time,
+start_time=test_group_start_time,
)

# TODO: do we want to clean up even if replay failed? Could have been partial failure?
@@ -544,7 +535,7 @@ def format_pbar_string(
test_reporting_type: TestReportingType,
test_name: str,
state: str,
-start_time: Union[float, None],
+start_time: Optional[float] = None,
set_pbar: bool = True,
update_sync_status: bool = False,
) -> str:
@@ -559,8 +550,13 @@ def format_pbar_string(
:param update_sync_status: bool indicating whether a sync status update should be queued
:returns: a formatted string for use w/ pbar
"""
-# set start time id not provided
+# set start time if not provided
if start_time is None:
+# if self.start_time is still None, something went wrong
+if self.start_time is None:
+raise ValueError(
+"self.start_time is still None; a function may have been called before self.setup()"
+)
start_time = self.start_time

# invoke the helper method
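Taken together, this hunk makes `start_time` an optional keyword argument that falls back to the instance-level value, which is why the call sites above could drop their explicit `self.start_time` arguments. A minimal runnable sketch of the pattern (`PbarReporter` is a hypothetical stand-in for `DetectionTestingInfrastructure`):

```python
import time
from typing import Optional

class PbarReporter:
    start_time: Optional[float] = None  # populated by setup()

    def setup(self) -> None:
        self.start_time = time.time()

    def format_pbar_string(self, state: str, start_time: Optional[float] = None) -> str:
        # fall back to the instance start time when the caller omits one
        if start_time is None:
            if self.start_time is None:
                raise ValueError("format_pbar_string() called before setup()")
            start_time = self.start_time
        return f"{state} (+{time.time() - start_time:.1f}s)"

reporter = PbarReporter()
reporter.setup()
print(reporter.format_pbar_string("Finished Setup!"))  # falls back to setup() time
```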
@@ -575,7 +571,7 @@

# update sync status if needed
if update_sync_status:
-self.sync_obj.currentTestingQueue[self.get_name()] = {
+self.sync_obj.currentTestingQueue[self.get_name()] = {  # type: ignore
"name": state,
"search": "N/A",
}
@@ -621,7 +617,7 @@ def execute_unit_test(
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
TestingStates.BEGINNING_TEST,
-test_start_time,
+start_time=test_start_time,
)

# if the replay failed, record the test failure and return
@@ -690,14 +686,14 @@ def execute_unit_test(
res = "ERROR"
link = detection.search
else:
-res = test.result.status.value.upper()
+res = test.result.status.value.upper()  # type: ignore
link = test.result.get_summary_dict()["sid_link"]

self.format_pbar_string(
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
f"{res} - {link} (CTRL+D to continue)",
-test_start_time,
+start_time=test_start_time,
)

# Wait for user input
@@ -722,7 +718,7 @@
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
FinalTestingStates.PASS.value,
-test_start_time,
+start_time=test_start_time,
set_pbar=False,
)
)
@@ -744,7 +740,7 @@
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
FinalTestingStates.FAIL.value,
-test_start_time,
+start_time=test_start_time,
set_pbar=False,
)
)
@@ -755,7 +751,7 @@
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
FinalTestingStates.ERROR.value,
-test_start_time,
+start_time=test_start_time,
set_pbar=False,
)
)
@@ -770,7 +766,7 @@
stdout.flush()
test.result.duration = round(time.time() - test_start_time, 2)

-# TODO (cmcginley): break up the execute routines for integration/unit tests some more to remove
+# TODO (#227): break up the execute routines for integration/unit tests some more to remove
# code w/ similar structure
def execute_integration_test(
self,
@@ -837,7 +833,7 @@ def execute_integration_test(
TestReportingType.INTEGRATION,
f"{detection.name}:{test.name}",
TestingStates.BEGINNING_TEST,
-test_start_time,
+start_time=test_start_time,
)

# if the replay failed, record the test failure and return
@@ -874,15 +870,10 @@
start_time=test_start_time
)

-# TODO (cmcginley): right now, we are creating one CorrelationSearch instance for each
-# test case; typically, there is only one unit test, and thus one integration test,
-# per detection, so this is not an issue. However, if we start having many test cases
-# per detection, we will be duplicating some effort & network calls that we don't need
-# to. Consider refactoring in order to re-use CorrelationSearch objects across tests
-# in such a case
+# TODO (#228): consider reusing CorrelationSearch instances across test cases
# Instantiate the CorrelationSearch
correlation_search = CorrelationSearch(
-detection_name=detection.name,
+detection=detection,
service=self.get_conn(),
pbar_data=pbar_data,
)
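The constructor change above passes the whole detection object rather than only its name, so `CorrelationSearch` can read any detection fields it needs without extra lookups. A hypothetical, heavily reduced sketch of the shape change:

```python
from dataclasses import dataclass

@dataclass
class Detection:
    # hypothetical minimal fields, for illustration only
    name: str
    search: str

@dataclass
class CorrelationSearch:
    detection: Detection  # previously: detection_name: str

    @property
    def name(self) -> str:
        # derived from the full object instead of a duplicated string field
        return self.detection.name
```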
@@ -892,14 +883,12 @@
except Exception as e:
# Catch and report any unhandled exceptions in integration testing
test.result = IntegrationTestResult(
message="TEST FAILED: unhandled exception in CorrelationSearch",
message="TEST ERROR: unhandled exception in CorrelationSearch",
exception=e,
status=TestResultStatus.ERROR
)

-# TODO (cmcginley): when in interactive mode, consider maybe making the cleanup routine in
-# correlation_search happen after the user breaks the interactivity; currently
-# risk/notable indexes are dumped before the user can inspect
+# TODO (#229): when in interactive mode, cleanup should happen after user interaction
# Pause here if the terminate flag has NOT been set AND either of the below are true:
# 1. the behavior is always_pause
# 2. the behavior is pause_on_failure and the test failed
@@ -908,7 +897,7 @@
if test.result is None:
res = "ERROR"
else:
-res = test.result.status.value.upper()
+res = test.result.status.value.upper()  # type: ignore

# Get the link to the saved search in this specific instance
link = f"https://{self.infrastructure.instance_address}:{self.infrastructure.web_ui_port}"
@@ -917,7 +906,7 @@
TestReportingType.INTEGRATION,
f"{detection.name}:{test.name}",
f"{res} - {link} (CTRL+D to continue)",
-test_start_time,
+start_time=test_start_time,
)

# Wait for user input
@@ -1036,8 +1025,8 @@ def retry_search_until_timeout(
search = f"{detection.search} {test.pass_condition}"

# Ensure searches that do not begin with '|' must begin with 'search '
if not search.strip().startswith("|"):
if not search.strip().startswith("search "):
if not search.strip().startswith("|"): # type: ignore
if not search.strip().startswith("search "): # type: ignore
search = f"search {search}"

# exponential backoff for wait time
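Those two `startswith` guards encode a small SPL rule: a query that does not begin a generating pipeline with `|` must carry an explicit `search` command before it is submitted. A standalone sketch of the same normalization (the function name is illustrative):

```python
def ensure_search_prefix(search: str) -> str:
    # generating pipelines ("| tstats ...") pass through untouched;
    # anything else gets the explicit 'search ' prefix
    if not search.strip().startswith("|"):
        if not search.strip().startswith("search "):
            return f"search {search}"
    return search

assert ensure_search_prefix("index=main foo") == "search index=main foo"
assert ensure_search_prefix("| tstats count") == "| tstats count"
assert ensure_search_prefix("search index=main") == "search index=main"
```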
@@ -1054,7 +1043,7 @@
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
TestingStates.PROCESSING.value,
-start_time
+start_time=start_time
)

time.sleep(1)
@@ -1063,7 +1052,7 @@
TestReportingType.UNIT,
f"{detection.name}:{test.name}",
TestingStates.SEARCHING.value,
-start_time,
+start_time=start_time,
)

# Execute the search and read the results
@@ -1079,7 +1068,7 @@
test.result = UnitTestResult()

# Initialize the collection of fields that are empty that shouldn't be
-empty_fields = set()
+empty_fields: set[str] = set()

# Filter out any messages in the results
for result in results:
@@ -1194,10 +1183,15 @@ def replay_attack_data_file(
):
tempfile = mktemp(dir=tmp_dir)


if not (str(attack_data_file.data).startswith("http://") or
str(attack_data_file.data).startswith("https://")) :
if pathlib.Path(str(attack_data_file.data)).is_file():
-self.format_pbar_string(TestReportingType.GROUP, test_group.name, "Copying Data", test_group_start_time)
+self.format_pbar_string(TestReportingType.GROUP,
+test_group.name,
+"Copying Data",
+test_group_start_time)

try:
copyfile(str(attack_data_file.data), tempfile)
except Exception as e:
@@ -1221,7 +1215,7 @@
TestReportingType.GROUP,
test_group.name,
TestingStates.DOWNLOADING.value,
-test_group_start_time
+start_time=test_group_start_time
)

Utils.download_file_from_http(
@@ -1240,7 +1234,7 @@
TestReportingType.GROUP,
test_group.name,
TestingStates.REPLAYING.value,
-test_group_start_time
+start_time=test_group_start_time
)

self.hec_raw_replay(tempfile, attack_data_file)
@@ -152,10 +152,7 @@ def getSummaryObject(
total_pass, total_detections-total_skipped, 1
)

-# TODO (cmcginley): add stats around total test cases and unit/integration test
-# sucess/failure? maybe configurable reporting? add section to summary called
-# "testwise_summary" listing per test metrics (e.g. total test, total tests passed, ...);
-# also list num skipped at both detection and test level
+# TODO (#230): expand testing metrics reported
# Construct and return the larger results dict
result_dict = {
"summary": {
Expand Down