Skip to content

Commit

Permalink
Merge branch 'feature/coverage-report' of https://github.com/splunk/c…
Browse files Browse the repository at this point in the history
…ontentctl into feature/coverage-report
  • Loading branch information
cmcginley-splunk committed Aug 27, 2024
2 parents 8f0f20d + 4211063 commit e297044
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,19 @@ class CleanupTestGroupResults(BaseModel):

class ContainerStoppedException(Exception):
pass
class CannotRunBaselineException(Exception):
# Support for testing detections with baselines
# does not currently exist in contentctl.
# As such, whenever we encounter a detection
# with baselines we should generate a descriptive
# exception
pass


@dataclasses.dataclass(frozen=False)
class DetectionTestingManagerOutputDto():
inputQueue: list[Detection] = Field(default_factory=list)
outputQueue: list[Detection] = Field(default_factory=list)
skippedQueue: list[Detection] = Field(default_factory=list)
currentTestingQueue: dict[str, Union[Detection, None]] = Field(default_factory=dict)
start_time: Union[datetime.datetime, None] = None
replay_index: str = "CONTENTCTL_TESTING_INDEX"
Expand Down Expand Up @@ -646,11 +652,7 @@ def execute_unit_test(
# Set the mode and timeframe, if required
kwargs = {"exec_mode": "blocking"}

# Iterate over baselines (if any)
for baseline in test.baselines:
# TODO: this is executing the test, not the baseline...
# TODO: should this be in a try/except if the later call is?
self.retry_search_until_timeout(detection, test, kwargs, test_start_time)


# Set earliest_time and latest_time appropriately if FORCE_ALL_TIME is False
if not FORCE_ALL_TIME:
Expand All @@ -661,7 +663,23 @@ def execute_unit_test(

# Run the detection's search query
try:
# Iterate over baselines (if any)
for baseline in detection.baselines:
raise CannotRunBaselineException("Detection requires Execution of a Baseline, "
"however Baseline execution is not "
"currently supported in contentctl. Mark "
"this as manual_test.")
self.retry_search_until_timeout(detection, test, kwargs, test_start_time)
except CannotRunBaselineException as e:
# Init the test result and record a failure if there was an issue during the search
test.result = UnitTestResult()
test.result.set_job_content(
None,
self.infrastructure,
TestResultStatus.ERROR,
exception=e,
duration=time.time() - test_start_time
)
except ContainerStoppedException as e:
raise e
except Exception as e:
Expand Down Expand Up @@ -1014,18 +1032,15 @@ def retry_search_until_timeout(
"""
# Get the start time and compute the timeout
search_start_time = time.time()
search_stop_time = time.time() + self.sync_obj.timeout_seconds

# We will default to ensuring at least one result exists
if test.pass_condition is None:
search = detection.search
else:
# Else, use the explicit pass condition
search = f"{detection.search} {test.pass_condition}"
search_stop_time = time.time() + self.sync_obj.timeout_seconds

# Make a copy of the search string since we may
# need to make some small changes to it below
search = detection.search

# Ensure searches that do not begin with '|' must begin with 'search '
if not search.strip().startswith("|"): # type: ignore
if not search.strip().startswith("search "): # type: ignore
if not search.strip().startswith("|"):
if not search.strip().startswith("search "):
search = f"search {search}"

# exponential backoff for wait time
Expand Down
13 changes: 10 additions & 3 deletions contentctl/actions/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,24 @@ class TestInputDto:


class Test:
def filter_tests(self, input_dto: TestInputDto) -> None:
"""
If integration testing has NOT been enabled, then skip
all of the integration tests. Otherwise, do nothing
Args:
input_dto (TestInputDto): A configuration of the test and all of the
tests to be run.
"""

def filter_tests(self, input_dto: TestInputDto) -> TestInputDto:
if not input_dto.config.enable_integration_testing:
# Skip all integraiton tests if integration testing is not enabled:
for detection in input_dto.detections:
for test in detection.tests:
if isinstance(test, IntegrationTest):
test.skip("TEST SKIPPED: Skipping all integration tests")

return input_dto


def execute(self, input_dto: TestInputDto) -> bool:
output_dto = DetectionTestingManagerOutputDto()

Expand Down
9 changes: 3 additions & 6 deletions contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,14 @@ def test_common_func(config:test_common):
test_input_dto = TestInputDto(detections_to_test, config)

t = Test()

# Remove detections or disable tests that we do not want to test (e.g. integration testing is
# disabled)
filted_test_input_dto = t.filter_tests(test_input_dto)
t.filter_tests(test_input_dto)

if config.plan_only:
#Emit the test plan and quit. Do not actually run the test
config.dumpCICDPlanAndQuit(gitServer.getHash(),filted_test_input_dto.detections)
config.dumpCICDPlanAndQuit(gitServer.getHash(),test_input_dto.detections)
return

success = t.execute(filted_test_input_dto)
success = t.execute(test_input_dto)

if success:
#Everything passed!
Expand Down
6 changes: 0 additions & 6 deletions contentctl/objects/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ class UnitTest(BaseTest):
# The test type (unit)
test_type: TestType = Field(default=TestType.UNIT)

# The condition to check if the search was successful
pass_condition: str | None = None

# Baselines to be run before a unit test
baselines: list[UnitTestBaseline] = []

# The attack data to be ingested for the unit test
attack_data: list[TestAttackData]

Expand Down

0 comments on commit e297044

Please sign in to comment.