Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: validation of detections against cms_main #303

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions contentctl/actions/detection_testing/DetectionTestingManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

@dataclass(frozen=False)
class DetectionTestingManagerInputDto:
config: Union[test,test_servers]
config: Union[test, test_servers]
detections: List[Detection]
views: list[DetectionTestingView]

Expand Down Expand Up @@ -65,19 +65,27 @@ def sigint_handler(signum, frame):
print("*******************************")

signal.signal(signal.SIGINT, sigint_handler)

with concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.input_dto.config.test_instances),
) as instance_pool, concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.input_dto.views)
) as view_runner, concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.input_dto.config.test_instances),
) as view_shutdowner:
# Capture any errors for reporting at the end after all threads have been gathered
errors: dict[str, list[Exception]] = {
"INSTANCE SETUP ERRORS": [],
"TESTING ERRORS": [],
"ERRORS DURING VIEW SHUTDOWN": [],
"ERRORS DURING VIEW EXECUTION": [],
}

# Start all the views
future_views = {
view_runner.submit(view.setup): view for view in self.input_dto.views
}

# Configure all the instances
future_instances_setup = {
instance_pool.submit(instance.setup): instance
Expand All @@ -87,10 +95,10 @@ def sigint_handler(signum, frame):
# Wait for all instances to be set up
for future in concurrent.futures.as_completed(future_instances_setup):
try:
result = future.result()
_ = future.result()
except Exception as e:
self.output_dto.terminate = True
print(f"Error setting up container: {str(e)}")
errors["INSTANCE SETUP ERRORS"].append(e)

# Start and wait for all tests to run
if not self.output_dto.terminate:
Expand All @@ -102,10 +110,10 @@ def sigint_handler(signum, frame):
# Wait for execution to finish
for future in concurrent.futures.as_completed(future_instances_execute):
try:
result = future.result()
_ = future.result()
except Exception as e:
self.output_dto.terminate = True
print(f"Error running in container: {str(e)}")
errors["TESTING ERRORS"].append(e)

self.output_dto.terminate = True

Expand All @@ -115,16 +123,28 @@ def sigint_handler(signum, frame):
}
for future in concurrent.futures.as_completed(future_views_shutdowner):
try:
result = future.result()
_ = future.result()
except Exception as e:
print(f"Error stopping view: {str(e)}")
errors["ERRORS DURING VIEW SHUTDOWN"].append(e)

# Wait for original view-related threads to complete
for future in concurrent.futures.as_completed(future_views):
try:
result = future.result()
_ = future.result()
except Exception as e:
print(f"Error running container: {str(e)}")
errors["ERRORS DURING VIEW EXECUTION"].append(e)

# Log any errors
for error_type in errors:
if len(errors[error_type]) > 0:
print()
print(f"[{error_type}]:")
for error in errors[error_type]:
print(f"\t❌ {str(error)}")
if isinstance(error, ExceptionGroup):
for suberror in error.exceptions: # type: ignore
print(f"\t\t❌ {str(suberror)}") # type: ignore
print()

return self.output_dto

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
from ssl import SSLEOFError, SSLZeroReturnError
from sys import stdout
from shutil import copyfile
from typing import Union, Optional
from typing import Union, Optional, Callable

from pydantic import ConfigDict, BaseModel, PrivateAttr, Field, dataclasses
from pydantic import ConfigDict, BaseModel, PrivateAttr, Field, dataclasses, computed_field
import requests # type: ignore
import splunklib.client as client # type: ignore
from splunklib.binding import HTTPError # type: ignore
from splunklib.results import JSONResultsReader, Message # type: ignore
import splunklib.results
from urllib3 import disable_warnings
import urllib.parse
from semantic_version import Version # type: ignore

from contentctl.objects.config import test_common, Infrastructure
from contentctl.objects.config import test_common, Infrastructure, All
from contentctl.objects.enums import PostTestBehavior, AnalyticsType
from contentctl.objects.detection import Detection
from contentctl.objects.base_test import BaseTest
Expand All @@ -34,6 +34,7 @@
from contentctl.objects.test_group import TestGroup
from contentctl.objects.base_test_result import TestResultStatus
from contentctl.objects.correlation_search import CorrelationSearch, PbarData
from contentctl.objects.content_versioning_service import ContentVersioningService
from contentctl.helper.utils import Utils
from contentctl.actions.detection_testing.progress_bar import (
format_pbar_string,
Expand All @@ -42,6 +43,9 @@
TestingStates
)

# The app name of ES; needed to check ES version
ES_APP_NAME = "SplunkEnterpriseSecuritySuite"


class SetupTestGroupResults(BaseModel):
exception: Union[Exception, None] = None
Expand All @@ -60,6 +64,8 @@ class CleanupTestGroupResults(BaseModel):

class ContainerStoppedException(Exception):
pass


class CannotRunBaselineException(Exception):
# Support for testing detections with baselines
# does not currently exist in contentctl.
Expand Down Expand Up @@ -135,19 +141,25 @@ def setup(self):
)

self.start_time = time.time()
try:
for func, msg in [
(self.start, "Starting"),
(self.get_conn, "Waiting for App Installation"),
(self.configure_conf_file_datamodels, "Configuring Datamodels"),
(self.create_replay_index, f"Create index '{self.sync_obj.replay_index}'"),
(self.get_all_indexes, "Getting all indexes from server"),
(self.configure_imported_roles, "Configuring Roles"),
(self.configure_delete_indexes, "Configuring Indexes"),
(self.configure_hec, "Configuring HEC"),
(self.wait_for_ui_ready, "Finishing Setup")
]:

# Init the list of setup functions we always need
primary_setup_functions: list[tuple[Callable[[], None | client.Service], str]] = [
(self.start, "Starting"),
(self.get_conn, "Waiting for App Installation"),
(self.configure_conf_file_datamodels, "Configuring Datamodels"),
(self.create_replay_index, f"Create index '{self.sync_obj.replay_index}'"),
(self.get_all_indexes, "Getting all indexes from server"),
(self.check_for_es_install, "Checking for ES Install"),
(self.configure_imported_roles, "Configuring Roles"),
(self.configure_delete_indexes, "Configuring Indexes"),
(self.configure_hec, "Configuring HEC"),
(self.wait_for_ui_ready, "Finishing Primary Setup")
]

# Execute and report on each setup function
try:
# Run the primary setup functions
for func, msg in primary_setup_functions:
self.format_pbar_string(
TestReportingType.SETUP,
self.get_name(),
Expand All @@ -157,16 +169,114 @@ def setup(self):
func()
self.check_for_teardown()

# Run any setup functions only applicable to content versioning validation
if self.should_test_content_versioning:
self.pbar.write(
self.format_pbar_string(
TestReportingType.SETUP,
self.get_name(),
"Beginning Content Versioning Validation...",
set_pbar=False
)
)
for func, msg in self.content_versioning_service.setup_functions:
self.format_pbar_string(
TestReportingType.SETUP,
self.get_name(),
msg,
update_sync_status=True,
)
func()
self.check_for_teardown()

except Exception as e:
self.pbar.write(str(e))
msg = f"[{self.get_name()}]: {str(e)}"
self.finish()
return
if isinstance(e, ExceptionGroup):
raise ExceptionGroup(msg, e.exceptions) from e # type: ignore
raise Exception(msg) from e

self.format_pbar_string(TestReportingType.SETUP, self.get_name(), "Finished Setup!")
self.pbar.write(
self.format_pbar_string(
TestReportingType.SETUP,
self.get_name(),
"Finished Setup!",
set_pbar=False
)
)

def wait_for_ui_ready(self):
self.get_conn()

@computed_field
@property
def content_versioning_service(self) -> ContentVersioningService:
"""
A computed field returning a handle to the content versioning service, used by ES to
version detections. We use this model to validate that all detections have been installed
compatibly with ES versioning.

:return: a handle to the content versioning service on the instance
:rtype: :class:`contentctl.objects.content_versioning_service.ContentVersioningService`
"""
return ContentVersioningService(
global_config=self.global_config,
infrastructure=self.infrastructure,
service=self.get_conn(),
detections=self.sync_obj.inputQueue
)

@property
def should_test_content_versioning(self) -> bool:
"""
Indicates whether we should test content versioning. Content versioning
should be tested when integration testing is enabled, the mode is all, and ES is at least
version 8.0.0.

:return: a bool indicating whether we should test content versioning
:rtype: bool
"""
es_version = self.es_version
return (
self.global_config.enable_integration_testing
and isinstance(self.global_config.mode, All)
and es_version is not None
and es_version >= Version("8.0.0")
)

@property
def es_version(self) -> Version | None:
"""
Returns the version of Enterprise Security installed on the instance; None if not installed.

:return: the version of ES, as a semver aware object
:rtype: :class:`semantic_version.Version`
"""
if not self.es_installed:
return None
return Version(self.get_conn().apps[ES_APP_NAME]["version"]) # type: ignore

@property
def es_installed(self) -> bool:
"""
Indicates whether ES is installed on the instance.

:return: a bool indicating whether ES is installed or not
:rtype: bool
"""
return ES_APP_NAME in self.get_conn().apps

def check_for_es_install(self) -> None:
"""
Validating function which raises an error if Enterprise Security is not installed and
integration testing is enabled.
"""
if not self.es_installed and self.global_config.enable_integration_testing:
raise Exception(
"Enterprise Security does not appear to be installed on this instance and "
"integration testing is enabled."
)

def configure_hec(self):
self.hec_channel = str(uuid.uuid4())
try:
Expand Down Expand Up @@ -292,15 +402,16 @@ def configure_imported_roles(
self,
imported_roles: list[str] = ["user", "power", "can_delete"],
enterprise_security_roles: list[str] = ["ess_admin", "ess_analyst", "ess_user"],
):
try:
# Set which roles should be configured. For Enterprise Security/Integration Testing,
# we must add some extra foles.
if self.global_config.enable_integration_testing:
roles = imported_roles + enterprise_security_roles
else:
roles = imported_roles
):

# Set which roles should be configured. For Enterprise Security/Integration Testing,
# we must add some extra foles.
if self.global_config.enable_integration_testing:
roles = imported_roles + enterprise_security_roles
else:
roles = imported_roles

try:
self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
imported_roles=roles,
Expand All @@ -309,16 +420,9 @@ def configure_imported_roles(
)
return
except Exception as e:
self.pbar.write(
f"The following role(s) do not exist:'{enterprise_security_roles}: {str(e)}"
)

self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
imported_roles=imported_roles,
srchIndexesAllowed=";".join(self.all_indexes_on_server),
srchIndexesDefault=self.sync_obj.replay_index,
)
msg = f"Error configuring roles: {str(e)}"
self.pbar.write(msg)
raise Exception(msg) from e

def configure_delete_indexes(self):
endpoint = "/services/properties/authorize/default/deleteIndexesAllowed"
Expand Down Expand Up @@ -1219,7 +1323,7 @@ def delete_attack_data(self, attack_data_files: list[TestAttackData]):
job = self.get_conn().jobs.create(splunk_search, **kwargs)
results_stream = job.results(output_mode="json")
# TODO: should we be doing something w/ this reader?
_ = splunklib.results.JSONResultsReader(results_stream)
_ = JSONResultsReader(results_stream)

except Exception as e:
raise (
Expand Down Expand Up @@ -1423,6 +1527,7 @@ def hec_raw_replay(
def status(self):
pass

# TODO (cmcginley): the finish function doesn't actually stop execution
def finish(self):
self.pbar.bar_format = f"Finished running tests on instance: [{self.get_name()}]"
self.pbar.update()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def get_docker_client(self):
raise (Exception(f"Failed to get docker client: {str(e)}"))

def check_for_teardown(self):

try:
container: docker.models.containers.Container = self.get_docker_client().containers.get(self.get_name())
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def showStatus(self, interval: int = 1):
while True:
summary = self.getSummaryObject()

# TODO (cmcginley): there's a 1-off error here I think (we show one more than we
# actually have during testing)
total = len(
summary.get("tested_detections", [])
+ summary.get("untested_detections", [])
Expand Down
Loading
Loading