diff --git a/.github/workflows/test_against_escu.yml b/.github/workflows/test_against_escu.yml new file mode 100644 index 00000000..8acc1cbd --- /dev/null +++ b/.github/workflows/test_against_escu.yml @@ -0,0 +1,69 @@ +# The default branch of security_content should always be correct. +# As such, we should use it in our test workflow, here, to ensure +# that contentctl is also correct and does not throw unexpected errors. + +# We should remember that if contentctl introduces NEW validations that have +# not yet been fixed in security_content, we may see this workflow fail. +name: test_against_escu +on: + push: + pull_request: + types: [opened, reopened] + schedule: + - cron: "44 4 * * *" + +jobs: + smoketest_escu: + strategy: + fail-fast: false + matrix: + python_version: ["3.11", "3.12"] + operating_system: ["ubuntu-20.04", "ubuntu-22.04", "macos-latest", "macos-14"] + #operating_system: ["ubuntu-20.04", "ubuntu-22.04", "macos-latest"] + + + runs-on: ${{ matrix.operating_system }} + steps: + # Checkout the current branch of contentctl repo + - name: Checkout repo + uses: actions/checkout@v4 + + # Checkout the develop (default) branch of security_content + - name: Checkout repo + uses: actions/checkout@v4 + with: + path: security_content + repository: splunk/security_content + + #Install the given version of Python we will test against + - name: Install Required Python Version + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python_version }} + architecture: "x64" + + - name: Install Poetry + run: + python -m pip install poetry + + - name: Install contentctl and activate the shell + run: | + poetry install --no-interaction + + + - name: Clone the AtomicRedTeam Repo (for extended validation) + run: | + cd security_content + git clone --depth 1 https://github.com/redcanaryco/atomic-red-team + + + # We do not separately run validate and build + # since a build ALSO performs a validate + - name: Run contentctl build + run: | + cd security_content + 
poetry run contentctl build --enrichments + + # Do not run a test - it will take far too long! + # Do not upload any artifacts + diff --git a/.gitignore b/.gitignore index bae4a468..2e4fcc96 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ apps* test_results* attack_data* security_content/ +contentctl.yml # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/contentctl/actions/apav_deploy.py b/contentctl/actions/apav_deploy.py deleted file mode 100644 index fcffff97..00000000 --- a/contentctl/actions/apav_deploy.py +++ /dev/null @@ -1,98 +0,0 @@ -import splunklib.client as client -import multiprocessing -import http.server -import time -import sys -import subprocess -import os -class Deploy: - def __init__(self, args): - - - - #First, check to ensure that the legal ack is correct. If not, quit - if args.acs_legal_ack != "Y": - raise(Exception(f"Error - must supply 'acs-legal-ack=Y', not 'acs-legal-ack={args.acs_legal_ack}'")) - - self.acs_legal_ack = args.acs_legal_ack - self.app_package = args.app_package - if not os.path.exists(self.app_package): - raise(Exception(f"Error - app_package file {self.app_package} does not exist")) - self.username = args.username - self.password = args.password - self.server = args.server - - - - self.deploy_to_splunk_cloud() - #self.http_process = self.start_http_server() - - #self.install_app() - - - def deploy_to_splunk_cloud(self): - - commandline = f"acs apps install private --acs-legal-ack={self.acs_legal_ack} "\ - f"--app-package {self.app_package} --server {self.server} --username "\ - f"{self.username} --password {self.password}" - - - try: - res = subprocess.run(args = commandline.split(' '), ) - except Exception as e: - raise(Exception(f"Error deploying to Splunk Cloud Instance: {str(e)}")) - print(res.returncode) - if res.returncode != 0: - raise(Exception("Error deploying to Splunk Cloud Instance. 
Review output to diagnose error.")) - - ''' - def install_app_local(self) -> bool: - #Connect to the service - time.sleep(1) - #self.http_process.start() - #time.sleep(2) - - - print(f"Connecting to server {self.host}") - try: - service = client.connect(host=self.host, port=self.api_port, username=self.username, password=self.password) - assert isinstance(service, client.Service) - - except Exception as e: - raise(Exception(f"Failure connecting the Splunk Search Head: {str(e)}")) - - - #Install the app - try: - params = {'name': self.server_app_path} - res = service.post('apps/appinstall', **params) - #Check the result? - - print(f"Successfully installed {self.server_app_path}!") - - - - except Exception as e: - raise(Exception(f"Failure installing the app {self.server_app_path}: {str(e)}")) - - - #Query and list all of the installed apps - try: - all_apps = service.apps - except Exception as e: - print(f"Failed listing all apps: {str(e)}") - return False - - print("Installed apps:") - for count, app in enumerate(all_apps): - print("\t{count}. 
{app.name}") - - - print(f"Installing app {self.path}") - - self.http_process.terminate() - - return True - ''' - - \ No newline at end of file diff --git a/contentctl/actions/api_deploy.py b/contentctl/actions/api_deploy.py deleted file mode 100644 index eb466385..00000000 --- a/contentctl/actions/api_deploy.py +++ /dev/null @@ -1,151 +0,0 @@ -import os -import sys -import json -import requests -from requests.auth import HTTPBasicAuth - -from dataclasses import dataclass -from configparser import RawConfigParser -import splunklib.client as client - -from contentctl.objects.config import Config -import pathlib - -@dataclass(frozen=True) -class API_DeployInputDto: - path: pathlib.Path - config: Config - - -class API_Deploy: - def fix_newlines_in_conf_files(self, conf_path: pathlib.Path) -> RawConfigParser: - parser = RawConfigParser() - with open(conf_path, "r") as conf_data_file: - conf_data = conf_data_file.read() - - # ConfigParser cannot read multipleline strings that simply escape the newline character with \ - # To include a newline, you need to include a space at the beginning of the newline. - # We will simply replace all \NEWLINE with NEWLINESPACE (removing the leading literal \). - # We will discuss whether we intend to make these changes to the underlying conf files - # or just apply the changes here - conf_data = conf_data.replace("\\\n", "\n ") - - parser.read_string(conf_data) - return parser - - def execute(self, input_dto: API_DeployInputDto) -> None: - if len(input_dto.config.deployments.rest_api_deployments) == 0: - raise Exception("No rest_api_deployments defined in 'contentctl.yml'") - app_path = pathlib.Path(input_dto.config.build.path_root)/input_dto.config.build.title - if not app_path.is_dir(): - raise Exception(f"The unpackaged app does not exist at the path {app_path}. 
Please run 'contentctl build' to generate the app.") - for target in input_dto.config.deployments.rest_api_deployments: - print(f"Deploying '{input_dto.config.build.title}' to target '{target.server}' [{target.description}]") - splunk_args = { - "host": target.server, - "port": target.port, - "username": target.username, - "password": target.password, - "owner": "nobody", - "app": "search", - } - print("Warning - we are currently deploying all content into the 'search' app. " - "At this time, this means the user does not have to install the app " - "manually, but this will change") - service = client.connect(**splunk_args) - - - macros_parser = self.fix_newlines_in_conf_files( - app_path/"default"/"macros.conf" - ) - import tqdm - - bar_format_macros = ( - f"Deploying macros " - + "{percentage:3.0f}%[{bar:20}]" - + "[{n_fmt}/{total_fmt} | ETA: {remaining}]" - ) - bar_format_detections = ( - f"Deploying saved searches" - + "{percentage:3.0f}%[{bar:20}]" - + "[{n_fmt}/{total_fmt} | ETA: {remaining}]" - ) - for section in tqdm.tqdm( - macros_parser.sections(), bar_format=bar_format_macros - ): - try: - service.post("properties/macros", __stanza=section) - service.post("properties/macros/" + section, **macros_parser[section]) - tqdm.tqdm.write(f"Deployed macro [{section}]") - except Exception as e: - tqdm.tqdm.write(f"Error deploying macro {section}: {str(e)}") - - detection_parser = RawConfigParser() - detection_parser = self.fix_newlines_in_conf_files( - app_path/"default"/"savedsearches.conf", - ) - - - for section in tqdm.tqdm( - detection_parser.sections(), bar_format=bar_format_detections - ): - try: - if section.startswith(input_dto.config.build.prefix): - params = detection_parser[section] - params["name"] = section - response_actions = [] - if ( - input_dto.config.detection_configuration.notable - and input_dto.config.detection_configuration.notable.rule_description - ): - response_actions.append("notable") - if ( - input_dto.config.detection_configuration.rba 
- and input_dto.config.detection_configuration.rba.enabled - ): - response_actions.append("risk") - params["actions"] = ",".join(response_actions) - params["request.ui_dispatch_app"] = "ES Content Updates" - params["request.ui_dispatch_view"] = "ES Content Updates" - params["alert_type"] = params.pop("counttype") - params["alert_comparator"] = params.pop("relation") - params["alert_threshold"] = params.pop("quantity") - params.pop("enablesched") - - try: - service.saved_searches.delete(section) - #tqdm.tqdm.write(f"Deleted old saved search: {section}") - except Exception as e: - #tqdm.tqdm.write(f"Error deleting savedsearch '{section}' :[{str(e)}]") - pass - - service.post("saved/searches", **params) - tqdm.tqdm.write(f"Deployed savedsearch [{section}]") - - except Exception as e: - tqdm.tqdm.write(f"Error deploying saved search {section}: {str(e)}") - - # story_parser = RawConfigParser() - # story_parser.read(os.path.join(input_dto.path, input_dto.config.build.splunk_app.path, "default", "analyticstories.conf")) - - # for section in story_parser.sections(): - # if section.startswith("analytic_story"): - # params = story_parser[section] - # params = dict(params.items()) - # params["spec_version"] = 1 - # params["version"] = 1 - # name = section[17:] - # #service.post('services/analyticstories/configs/analytic_story', name=name, content=json.dumps(params)) - - # url = "https://3.72.220.157:8089/services/analyticstories/configs/analytic_story" - # data = dict() - # data["name"] = name - # data["content"] = params - # print(json.dumps(data)) - # response = requests.post( - # url, - # auth=HTTPBasicAuth('admin', 'fgWFshd0mm7eErMj9qX'), - # data=json.dumps(data), - # verify=False - # ) - # print(response.text) diff --git a/contentctl/actions/acs_deploy.py b/contentctl/actions/deploy_acs.py similarity index 100% rename from contentctl/actions/acs_deploy.py rename to contentctl/actions/deploy_acs.py diff --git a/contentctl/actions/inspect.py 
b/contentctl/actions/inspect.py index 27210ed2..9c46abae 100644 --- a/contentctl/actions/inspect.py +++ b/contentctl/actions/inspect.py @@ -61,7 +61,7 @@ def inspectAppAPI(self, config: inspect)->str: if not package_path.is_file(): raise Exception(f"Cannot run Appinspect API on App '{config.app.title}' - " f"no package exists as expected path '{package_path}'.\nAre you " - "trying to 'contentctl acs_deploy' the package BEFORE running 'contentctl build'?") + "trying to 'contentctl deploy_acs' the package BEFORE running 'contentctl build'?") files = { "app_package": open(package_path,"rb"), diff --git a/contentctl/actions/new_content.py b/contentctl/actions/new_content.py index e8f6787c..6c155bd0 100644 --- a/contentctl/actions/new_content.py +++ b/contentctl/actions/new_content.py @@ -25,7 +25,8 @@ def buildDetection(self)->dict[str,Any]: answers['date'] = datetime.today().strftime('%Y-%m-%d') answers['author'] = answers['detection_author'] del answers['detection_author'] - answers['data_source'] = answers['data_source'] + answers['data_sources'] = answers['data_source'] + del answers['data_source'] answers['type'] = answers['detection_type'] del answers['detection_type'] answers['status'] = "production" #start everything as production since that's what we INTEND the content to become @@ -49,6 +50,7 @@ def buildDetection(self)->dict[str,Any]: answers['tags']['required_fields'] = ['UPDATE'] answers['tags']['risk_score'] = 'UPDATE (impact * confidence)/100' answers['tags']['security_domain'] = answers['security_domain'] + del answers["security_domain"] answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE'] #generate the tests section @@ -64,6 +66,7 @@ def buildDetection(self)->dict[str,Any]: ] } ] + del answers["mitre_attack_ids"] return answers def buildStory(self)->dict[str,Any]: @@ -111,12 +114,12 @@ def writeObjectNewContent(self, object: dict, subdirectory_name: str, type: NewC #make sure the output folder exists for this detection 
output_folder.mkdir(exist_ok=True) - YmlWriter.writeYmlFile(file_path, object) + YmlWriter.writeDetection(file_path, object) print("Successfully created detection " + file_path) elif type == NewContentType.story: file_path = os.path.join(self.output_path, 'stories', self.convertNameToFileName(object['name'], object['tags']['product'])) - YmlWriter.writeYmlFile(file_path, object) + YmlWriter.writeStory(file_path, object) print("Successfully created story " + file_path) else: diff --git a/contentctl/actions/validate.py b/contentctl/actions/validate.py index 90394b96..c7673af4 100644 --- a/contentctl/actions/validate.py +++ b/contentctl/actions/validate.py @@ -23,6 +23,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto: director_output_dto = DirectorOutputDto(AtomicTest.getAtomicTestsFromArtRepo(repo_path=input_dto.getAtomicRedTeamRepoPath(), enabled=input_dto.enrichments), AttackEnrichment.getAttackEnrichment(input_dto), + CveEnrichment.getCveEnrichment(input_dto), [],[],[],[],[],[],[],[],[]) diff --git a/contentctl/api.py b/contentctl/api.py new file mode 100644 index 00000000..5de988ec --- /dev/null +++ b/contentctl/api.py @@ -0,0 +1,137 @@ +from pathlib import Path +from typing import Any, Union, Type +from contentctl.input.yml_reader import YmlReader +from contentctl.objects.config import test_common, test, test_servers +from contentctl.objects.security_content_object import SecurityContentObject +from contentctl.input.director import DirectorOutputDto + +def config_from_file(path:Path=Path("contentctl.yml"), config: dict[str,Any]={}, + configType:Type[Union[test,test_servers]]=test)->test_common: + + """ + Fetch a configuration object that can be used for a number of different contentctl + operations including validate, build, inspect, test, and test_servers. A file will + be used as the basis for constructing the configuration. + + Args: + path (Path, optional): Relative or absolute path to a contentctl config file. 
+ Defaults to Path("contentctl.yml"), which is the default name and location (in the current directory) + of the configuration files which are automatically generated for contentctl. + config (dict[], optional): Dictionary of values to override values read from the YML + path passed as the first argument. Defaults to {}, an empty dict meaning that nothing + will be overwritten + configType (Type[Union[test,test_servers]], optional): The Config Class to instantiate. + This may be a test or test_servers object. Note that this is NOT an instance of the class. Defaults to test. + Returns: + test_common: Returns a complete contentctl test_common configuration. Note that this configuration + will have all applicable field for validate and build as well, but can also be used for easily + construction a test or test_servers object. + """ + + try: + yml_dict = YmlReader.load_file(path, add_fields=False) + + + except Exception as e: + raise Exception(f"Failed to load contentctl configuration from file '{path}': {str(e)}") + + # Apply settings that have been overridden from the ones in the file + try: + yml_dict.update(config) + except Exception as e: + raise Exception(f"Failed updating dictionary of values read from file '{path}'" + f" with the dictionary of arguments passed: {str(e)}") + + # The function below will throw its own descriptive exception if it fails + configObject = config_from_dict(yml_dict, configType=configType) + + return configObject + + + + +def config_from_dict(config: dict[str,Any]={}, + configType:Type[Union[test,test_servers]]=test)->test_common: + """ + Fetch a configuration object that can be used for a number of different contentctl + operations including validate, build, inspect, test, and test_servers. A dict will + be used as the basis for constructing the configuration. 
+ + Args: + config (dict[str,Any],Optional): If a dictionary is not explicitly passed, then + an empty dict will be used to create a configuration, if possible, from default + values. Note that based on default values in the contentctl/objects/config.py + file, this may raise an exception. If so, please set appropriate default values + in the file above or supply those values via this argument. + configType (Type[Union[test,test_servers]], optional): The Config Class to instantiate. + This may be a test or test_servers object. Note that this is NOT an instance of the class. Defaults to test. + Returns: + test_common: Returns a complete contentctl test_common configuration. Note that this configuration + will have all applicable field for validate and build as well, but can also be used for easily + construction a test or test_servers object. + """ + try: + test_object = configType.model_validate(config) + except Exception as e: + raise Exception(f"Failed to load contentctl configuration from dict:\n{str(e)}") + + return test_object + + +def update_config(config:Union[test,test_servers], **key_value_updates:dict[str,Any])->test_common: + + """Update any relevant keys in a config file with the specified values. + Full validation will be performed after this update and descriptive errors + will be produced + + Args: + config (test_common): A previously-constructed test_common object. This can be + build using the configFromDict or configFromFile functions. + key_value_updates (kwargs, optional): Additional keyword/argument pairs to update + arbitrary fields in the configuration. + + Returns: + test_common: A validated object which has had the relevant fields updated. 
+ Note that descriptive Exceptions will be generated if updated values are either + invalid (have the wrong type, or disallowed values) or you attempt to update + fields that do not exist + """ + # Create a copy so we don't change the underlying model + config_copy = config.model_copy(deep=True) + + # Force validation of assignment since doing so via arbitrary dict can be error prone + # Also, ensure that we do not try to add fields that are not part of the model + config_copy.model_config.update({'validate_assignment': True, 'extra': 'forbid'}) + + + + # Collect any errors that may occur + errors:list[Exception] = [] + + # We need to do this one by one because the extra:forbid argument does not appear to + # be respected at this time. + for key, value in key_value_updates.items(): + try: + setattr(config_copy,key,value) + except Exception as e: + errors.append(e) + if len(errors) > 0: + errors_string = '\n'.join([str(e) for e in errors]) + raise Exception(f"Error(s) updaitng configuration:\n{errors_string}") + + return config_copy + + + +def content_to_dict(director:DirectorOutputDto)->dict[str,list[dict[str,Any]]]: + output_dict:dict[str,list[dict[str,Any]]] = {} + for contentType in ['detections','stories','baselines','investigations', + 'playbooks','macros','lookups','deployments','ssa_detections']: + + output_dict[contentType] = [] + t:list[SecurityContentObject] = getattr(director,contentType) + + for item in t: + output_dict[contentType].append(item.model_dump()) + return output_dict + diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py index e5c3718b..eac735d1 100644 --- a/contentctl/contentctl.py +++ b/contentctl/contentctl.py @@ -1,6 +1,11 @@ -from contentctl.actions.initialize import Initialize +import traceback +import sys +import warnings +import pathlib import tyro -from contentctl.objects.config import init, validate, build, new, deploy_acs, deploy_rest, test, test_servers, inspect, report, test_common, release_notes + +from 
contentctl.actions.initialize import Initialize +from contentctl.objects.config import init, validate, build, new, deploy_acs, test, test_servers, inspect, report, test_common, release_notes from contentctl.actions.validate import Validate from contentctl.actions.new_content import NewContent from contentctl.actions.detection_testing.GitService import GitService @@ -9,14 +14,10 @@ DirectorOutputDto, Build, ) - from contentctl.actions.test import Test from contentctl.actions.test import TestInputDto from contentctl.actions.reporting import ReportingInputDto, Reporting from contentctl.actions.inspect import Inspect -import sys -import warnings -import pathlib from contentctl.input.yml_reader import YmlReader from contentctl.actions.release_notes import ReleaseNotes @@ -95,13 +96,14 @@ def new_func(config:new): def deploy_acs_func(config:deploy_acs): #This is a bit challenging to get to work with the default values. - raise Exception("deploy acs not yet implemented") - -def deploy_rest_func(config:deploy_rest): - raise Exception("deploy rest not yet implemented") - + raise Exception("deploy acs not yet implemented") def test_common_func(config:test_common): + if type(config) == test: + #construct the container Infrastructure objects + config.getContainerInfrastructureObjects() + #otherwise, they have already been passed as servers + director_output_dto = build_func(config) gitServer = GitService(director=director_output_dto,config=config) detections_to_test = gitServer.getContent() @@ -175,15 +177,14 @@ def main(): "test":test.model_validate(config_obj), "test_servers":test_servers.model_construct(**t.__dict__), "release_notes": release_notes.model_construct(**config_obj), - "deploy_acs": deploy_acs.model_construct(**t.__dict__), - #"deploy_rest":deploy_rest() + "deploy_acs": deploy_acs.model_construct(**t.__dict__) } ) - + config = None try: # Since some model(s) were constructed and not model_validated, we have to catch # warnings again when creating the cli @@ 
-209,20 +210,23 @@ def main(): elif type(config) == deploy_acs: updated_config = deploy_acs.model_validate(config) deploy_acs_func(updated_config) - elif type(config) == deploy_rest: - deploy_rest_func(config) elif type(config) == test or type(config) == test_servers: - if type(config) == test: - #construct the container Infrastructure objects - config.getContainerInfrastructureObjects() - #otherwise, they have already been passed as servers test_common_func(config) else: raise Exception(f"Unknown command line type '{type(config).__name__}'") except Exception as e: - import traceback - traceback.print_exc() - traceback.print_stack() - #print(e) + if config is None: + print("There was a serious issue where the config file could not be created.\n" + "The entire stack trace is provided below (please include it if filing a bug report).\n") + traceback.print_exc() + elif config.verbose: + print("Verbose error logging is ENABLED.\n" + "The entire stack trace has been provided below (please include it if filing a bug report):\n") + traceback.print_exc() + else: + print("Verbose error logging is DISABLED.\n" + "Please use the --verbose command line argument if you need more context for your error or file a bug report.") + print(e) + sys.exit(1) \ No newline at end of file diff --git a/contentctl/enrichments/cve_enrichment.py b/contentctl/enrichments/cve_enrichment.py index 2d4d824e..eb426623 100644 --- a/contentctl/enrichments/cve_enrichment.py +++ b/contentctl/enrichments/cve_enrichment.py @@ -4,97 +4,62 @@ import os import shelve import time -from typing import Annotated -from pydantic import BaseModel,Field,ConfigDict - +from typing import Annotated, Any, Union, TYPE_CHECKING +from pydantic import BaseModel,Field, computed_field from decimal import Decimal -CVESSEARCH_API_URL = 'https://cve.circl.lu' - -CVE_CACHE_FILENAME = "lookups/CVE_CACHE.db" - -NON_PERSISTENT_CACHE = {} - - -'''''' -@functools.cache -def cvesearch_helper(url:str, cve_id:str, 
force_cached_or_offline:bool=False, max_api_attempts:int=3, retry_sleep_seconds:int=5): - if max_api_attempts < 1: - raise(Exception(f"The minimum number of CVESearch API attempts is 1. You have passed {max_api_attempts}")) - - if force_cached_or_offline: - if not os.path.exists(CVE_CACHE_FILENAME): - print(f"Cache at {CVE_CACHE_FILENAME} not found - Creating it.") - cache = shelve.open(CVE_CACHE_FILENAME, flag='c', writeback=True) - else: - cache = NON_PERSISTENT_CACHE - if cve_id in cache: - result = cache[cve_id] - #print(f"hit cve_enrichment: {time.time() - start:.2f}") - else: - api_attempts_remaining = max_api_attempts - result = None - while api_attempts_remaining > 0: - api_attempts_remaining -= 1 - try: - cve = cvesearch_id_helper(url) - result = cve.id(cve_id) - break - except Exception as e: - if api_attempts_remaining > 0: - print(f"The option 'force_cached_or_offline' was used, but {cve_id} not found in {CVE_CACHE_FILENAME} and unable to connect to {CVESSEARCH_API_URL}: {str(e)}") - print(f"Retrying the CVESearch API up to {api_attempts_remaining} more times after a sleep of {retry_sleep_seconds} seconds...") - time.sleep(retry_sleep_seconds) - else: - raise(Exception(f"The option 'force_cached_or_offline' was used, but {cve_id} not found in {CVE_CACHE_FILENAME} and unable to connect to {CVESSEARCH_API_URL} after {max_api_attempts} attempts: {str(e)}")) - - if result is None: - raise(Exception(f'CveEnrichment for [ {cve_id} ] failed - CVE does not exist')) - cache[cve_id] = result - - if isinstance(cache, shelve.Shelf): - #close the cache if it was a shelf - cache.close() +from requests.exceptions import ReadTimeout - return result +if TYPE_CHECKING: + from contentctl.objects.config import validate -@functools.cache -def cvesearch_id_helper(url:str): - #The initial CVESearch call takes some time. 
- #We cache it to avoid making this call each time we need to do a lookup - cve = CVESearch(CVESSEARCH_API_URL) - return cve +CVESSEARCH_API_URL = 'https://cve.circl.lu' class CveEnrichmentObj(BaseModel): id:Annotated[str, "^CVE-[1|2][0-9]{3}-[0-9]+$"] cvss:Annotated[Decimal, Field(ge=.1, le=10, decimal_places=1)] summary:str + + @computed_field + @property + def url(self)->str: + BASE_NVD_URL = "https://nvd.nist.gov/vuln/detail/" + return f"{BASE_NVD_URL}{self.id}" - @staticmethod - def buildEnrichmentOnFailure(id:Annotated[str, "^CVE-[1|2][0-9]{3}-[0-9]+$"], errorMessage:str)->CveEnrichmentObj: - message = f"{errorMessage}. Default CVSS of 5.0 used" - print(message) - return CveEnrichmentObj(id=id, cvss=Decimal(5.0), summary=message) +class CveEnrichment(BaseModel): + use_enrichment: bool = True + cve_api_obj: Union[CVESearch,None] = None + -class CveEnrichment(): - @classmethod - def enrich_cve(cls, cve_id: str, force_cached_or_offline: bool = False, treat_failures_as_warnings:bool=True) -> CveEnrichmentObj: - cve_enriched = dict() - try: - - result = cvesearch_helper(CVESSEARCH_API_URL, cve_id, force_cached_or_offline) - cve_enriched['id'] = cve_id - cve_enriched['cvss'] = result['cvss'] - cve_enriched['summary'] = result['summary'] - except Exception as e: - message = f"issue enriching {cve_id}, with error: {str(e)}" - if treat_failures_as_warnings: - return CveEnrichmentObj.buildEnrichmentOnFailure(id = cve_id, errorMessage=f"WARNING, {message}") - else: - raise ValueError(f"ERROR, {message}") + class Config: + # Arbitrary_types are allowed to let us use the CVESearch Object + arbitrary_types_allowed = True + frozen = True + + + @staticmethod + def getCveEnrichment(config:validate, timeout_seconds:int=10, force_disable_enrichment:bool=True)->CveEnrichment: + if force_disable_enrichment: + return CveEnrichment(use_enrichment=False, cve_api_obj=None) - return CveEnrichmentObj.model_validate(cve_enriched) + if config.enrichments: + try: + cve_api_obj = 
CVESearch(CVESSEARCH_API_URL, timeout=timeout_seconds) + return CveEnrichment(use_enrichment=True, cve_api_obj=cve_api_obj) + except Exception as e: + raise Exception(f"Error setting CVE_SEARCH API to: {CVESSEARCH_API_URL}: {str(e)}") + return CveEnrichment(use_enrichment=False, cve_api_obj=None) + + + def enrich_cve(self, cve_id:str, raise_exception_on_failure:bool=True)->CveEnrichmentObj: + + if not self.use_enrichment: + return CveEnrichmentObj(id=cve_id,cvss=Decimal(5.0),summary="SUMMARY NOT AVAILABLE! ONLY THE LINK WILL BE USED AT THIS TIME") + else: + print("WARNING - Dynamic enrichment not supported at this time.") + return CveEnrichmentObj(id=cve_id,cvss=Decimal(5.0),summary="SUMMARY NOT AVAILABLE! ONLY THE LINK WILL BE USED AT THIS TIME") + # Depending on needs, we may add dynamic enrichment functionality back to the tool \ No newline at end of file diff --git a/contentctl/input/director.py b/contentctl/input/director.py index eef9879a..72d78996 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -5,9 +5,8 @@ from pydantic import ValidationError from uuid import UUID from contentctl.input.yml_reader import YmlReader - - - + + from contentctl.objects.detection import Detection from contentctl.objects.story import Story @@ -28,29 +27,69 @@ from contentctl.objects.config import validate - -@dataclass() +@dataclass class DirectorOutputDto: - # Atomic Tests are first because parsing them - # is far quicker than attack_enrichment - atomic_tests: Union[list[AtomicTest],None] - attack_enrichment: AttackEnrichment - detections: list[Detection] - stories: list[Story] - baselines: list[Baseline] - investigations: list[Investigation] - playbooks: list[Playbook] - macros: list[Macro] - lookups: list[Lookup] - deployments: list[Deployment] - ssa_detections: list[SSADetection] - #cve_enrichment: CveEnrichment - - name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict) - uuid_to_content_map: dict[UUID, 
SecurityContentObject] = field(default_factory=dict) - + # Atomic Tests are first because parsing them + # is far quicker than attack_enrichment + atomic_tests: Union[list[AtomicTest],None] + attack_enrichment: AttackEnrichment + cve_enrichment: CveEnrichment + detections: list[Detection] + stories: list[Story] + baselines: list[Baseline] + investigations: list[Investigation] + playbooks: list[Playbook] + macros: list[Macro] + lookups: list[Lookup] + deployments: list[Deployment] + ssa_detections: list[SSADetection] + + name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict) + uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict) + + def addContentToDictMappings(self, content: SecurityContentObject): + content_name = content.name + if isinstance(content, SSADetection): + # Since SSA detections may have the same name as ESCU detection, + # for this function we prepend 'SSA ' to the name. + content_name = f"SSA {content_name}" + if content_name in self.name_to_content_map: + raise ValueError( + f"Duplicate name '{content_name}' with paths:\n" + f" - {content.file_path}\n" + f" - {self.name_to_content_map[content_name].file_path}" + ) + elif content.id in self.uuid_to_content_map: + raise ValueError( + f"Duplicate id '{content.id}' with paths:\n" + f" - {content.file_path}\n" + f" - {self.name_to_content_map[content_name].file_path}" + ) + + if isinstance(content, Lookup): + self.lookups.append(content) + elif isinstance(content, Macro): + self.macros.append(content) + elif isinstance(content, Deployment): + self.deployments.append(content) + elif isinstance(content, Playbook): + self.playbooks.append(content) + elif isinstance(content, Baseline): + self.baselines.append(content) + elif isinstance(content, Investigation): + self.investigations.append(content) + elif isinstance(content, Story): + self.stories.append(content) + elif isinstance(content, Detection): + self.detections.append(content) + elif 
isinstance(content, SSADetection): + self.ssa_detections.append(content) + else: + raise Exception(f"Unknown security content type: {type(content)}") + self.name_to_content_map[content_name] = content + self.uuid_to_content_map[content.id] = content from contentctl.input.ssa_detection_builder import SSADetectionBuilder @@ -60,13 +99,6 @@ class DirectorOutputDto: from contentctl.helper.utils import Utils - - - - - - - class Director(): input_dto: validate output_dto: DirectorOutputDto @@ -77,27 +109,7 @@ class Director(): def __init__(self, output_dto: DirectorOutputDto) -> None: self.output_dto = output_dto self.ssa_detection_builder = SSADetectionBuilder() - - def addContentToDictMappings(self, content:SecurityContentObject): - content_name = content.name - if isinstance(content,SSADetection): - # Since SSA detections may have the same name as ESCU detection, - # for this function we prepend 'SSA ' to the name. - content_name = f"SSA {content_name}" - if content_name in self.output_dto.name_to_content_map: - raise ValueError(f"Duplicate name '{content_name}' with paths:\n" - f" - {content.file_path}\n" - f" - {self.output_dto.name_to_content_map[content_name].file_path}") - elif content.id in self.output_dto.uuid_to_content_map: - raise ValueError(f"Duplicate id '{content.id}' with paths:\n" - f" - {content.file_path}\n" - f" - {self.output_dto.name_to_content_map[content_name].file_path}") - - self.output_dto.name_to_content_map[content_name] = content - self.output_dto.uuid_to_content_map[content.id] = content - - def execute(self, input_dto: validate) -> None: self.input_dto = input_dto @@ -146,50 +158,41 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: if contentType == SecurityContentType.lookups: lookup = Lookup.model_validate(modelDict,context={"output_dto":self.output_dto, "config":self.input_dto}) - self.output_dto.lookups.append(lookup) - self.addContentToDictMappings(lookup) + self.output_dto.addContentToDictMappings(lookup) 
elif contentType == SecurityContentType.macros: macro = Macro.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.macros.append(macro) - self.addContentToDictMappings(macro) + self.output_dto.addContentToDictMappings(macro) elif contentType == SecurityContentType.deployments: deployment = Deployment.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.deployments.append(deployment) - self.addContentToDictMappings(deployment) + self.output_dto.addContentToDictMappings(deployment) elif contentType == SecurityContentType.playbooks: playbook = Playbook.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.playbooks.append(playbook) - self.addContentToDictMappings(playbook) + self.output_dto.addContentToDictMappings(playbook) elif contentType == SecurityContentType.baselines: baseline = Baseline.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.baselines.append(baseline) - self.addContentToDictMappings(baseline) + self.output_dto.addContentToDictMappings(baseline) elif contentType == SecurityContentType.investigations: investigation = Investigation.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.investigations.append(investigation) - self.addContentToDictMappings(investigation) + self.output_dto.addContentToDictMappings(investigation) elif contentType == SecurityContentType.stories: story = Story.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.stories.append(story) - self.addContentToDictMappings(story) + self.output_dto.addContentToDictMappings(story) elif contentType == SecurityContentType.detections: - detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto}) - self.output_dto.detections.append(detection) - self.addContentToDictMappings(detection) + detection = Detection.model_validate(modelDict,context={"output_dto":self.output_dto, 
"app":self.input_dto.app}) + self.output_dto.addContentToDictMappings(detection) elif contentType == SecurityContentType.ssa_detections: self.constructSSADetection(self.ssa_detection_builder, self.output_dto,str(file)) ssa_detection = self.ssa_detection_builder.getObject() if ssa_detection.status in [DetectionStatus.production.value, DetectionStatus.validation.value]: - self.output_dto.ssa_detections.append(ssa_detection) - self.addContentToDictMappings(ssa_detection) + self.output_dto.addContentToDictMappings(ssa_detection) else: raise Exception(f"Unsupported type: [{contentType}]") @@ -228,6 +231,3 @@ def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:Dir builder.addMappings() builder.addUnitTest() builder.addRBA() - - - \ No newline at end of file diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index a51eea07..e4ecf82f 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -26,7 +26,7 @@ #from contentctl.objects.playbook import Playbook from contentctl.objects.enums import DataSource,ProvidingTechnology -from contentctl.enrichments.cve_enrichment import CveEnrichment, CveEnrichmentObj +from contentctl.enrichments.cve_enrichment import CveEnrichmentObj class Detection_Abstract(SecurityContentObject): @@ -40,7 +40,6 @@ class Detection_Abstract(SecurityContentObject): search: Union[str, dict[str,Any]] = Field(...) 
how_to_implement: str = Field(..., min_length=4) known_false_positives: str = Field(..., min_length=4) - check_references: bool = False #data_source: Optional[List[DataSource]] = None enabled_by_default: bool = False @@ -54,6 +53,58 @@ class Detection_Abstract(SecurityContentObject): # A list of groups of tests, relying on the same data test_groups: Union[list[TestGroup], None] = Field(None,validate_default=True) + + @field_validator("search", mode="before") + @classmethod + def validate_presence_of_filter_macro(cls, value:Union[str, dict[str,Any]], info:ValidationInfo)->Union[str, dict[str,Any]]: + """ + Validates that, if required to be present, the filter macro is present with the proper name. + The filter macro MUST be derived from the name of the detection + + + Args: + value (Union[str, dict[str,Any]]): The search. It can either be a string (and should be SPL) + or a dict, in which case it is Sigma-formatted. + info (ValidationInfo): The validation info can contain a number of different objects. Today it only contains the director. + + Returns: + Union[str, dict[str,Any]]: The search, either in sigma or SPL format. + """ + + if isinstance(value,dict): + #If the search is a dict, then it is in Sigma format so return it + return value + + # Otherwise, the search is SPL. + + + # In the future, we will may add support that makes the inclusion of the + # filter macro optional or automatically generates it for searches that + # do not have it. For now, continue to require that all searches have a filter macro. + FORCE_FILTER_MACRO = True + if not FORCE_FILTER_MACRO: + return value + + # Get the required macro name, which is derived from the search name. 
+ # Note that a separate validation ensures that the file name matches the content name + name:Union[str,None] = info.data.get("name",None) + if name is None: + #The search was sigma formatted (or failed other validation and was None), so we will not validate macros in it + raise ValueError("Cannot validate filter macro, field 'name' (which is required to validate the macro) was missing from the detection YML.") + + #Get the file name without the extension. Note this is not a full path! + file_name = pathlib.Path(cls.contentNameToFileName(name)).stem + file_name_with_filter = f"`{file_name}_filter`" + + if file_name_with_filter not in value: + raise ValueError(f"Detection does not contain the EXACT filter macro {file_name_with_filter}. " + "This filter macro MUST be present in the search. It usually placed at the end " + "of the search and is useful for environment-specific filtering of False Positive or noisy results.") + + return value + + + @field_validator("test_groups") @classmethod def validate_test_groups(cls, value:Union[None, List[TestGroup]], info:ValidationInfo) -> Union[List[TestGroup], None]: @@ -144,17 +195,30 @@ def mappings(self)->dict[str, List[str]]: macros: list[Macro] = Field([],validate_default=True) lookups: list[Lookup] = Field([],validate_default=True) - @computed_field - @property - def cve_enrichment(self)->List[CveEnrichmentObj]: - raise Exception("CVE Enrichment Functionality not currently supported. It will be re-added at a later time.") - enriched_cves = [] - for cve_id in self.tags.cve: - print(f"\nEnriching {cve_id}\n") - enriched_cves.append(CveEnrichment.enrich_cve(cve_id)) + cve_enrichment: list[CveEnrichmentObj] = Field([], validate_default=True) + + @model_validator(mode="after") + def cve_enrichment_func(self, info:ValidationInfo): + if len(self.cve_enrichment) > 0: + raise ValueError(f"Error, field 'cve_enrichment' should be empty and " + f"dynamically populated at runtime. 
Instead, this field contained: {self.cve_enrichment}") + + output_dto:Union[DirectorOutputDto,None]= info.context.get("output_dto",None) + if output_dto is None: + raise ValueError("Context not provided to detection model post validator") + + + enriched_cves:list[CveEnrichmentObj] = [] - return enriched_cves + for cve_id in self.tags.cve: + try: + enriched_cves.append(output_dto.cve_enrichment.enrich_cve(cve_id, raise_exception_on_failure=False)) + except Exception as e: + raise ValueError(f"{e}") + self.cve_enrichment = enriched_cves + return self + splunk_app_enrichment: Optional[List[dict]] = None @computed_field @@ -382,11 +446,11 @@ def getDetectionMacros(cls, v:list[str], info:ValidationInfo)->list[Macro]: filter_macro = Macro.model_validate({"name":filter_macro_name, "definition":'search *', "description":'Update this macro to limit the output results to filter out false positives.'}) - director.macros.append(filter_macro) + director.addContentToDictMappings(filter_macro) macros_from_search = Macro.get_macros(search, director) - return macros_from_search + [filter_macro] + return macros_from_search def get_content_dependencies(self)->list[SecurityContentObject]: #Do this separately to satisfy type checker diff --git a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py index 8f160795..90c5376d 100644 --- a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py @@ -12,6 +12,7 @@ import abc import uuid import datetime +import pprint from pydantic import BaseModel, field_validator, Field, ValidationInfo, FilePath, HttpUrl, NonNegativeInt, ConfigDict, model_validator, model_serializer from typing import Tuple, Optional, List, Union import pathlib @@ -181,6 +182,22 @@ def 
create_filename_to_content_dict(all_objects:list[SecurityContentObject_Abstr for object in all_objects: name_dict[str(pathlib.Path(object.file_path))] = object return name_dict + + + def __repr__(self)->str: + # Just use the model_dump functionality that + # has already been written. This loses some of the + # richness where objects reference themselves, but + # is usable + m = self.model_dump() + return pprint.pformat(m, indent=3) + + def __str__(self)->str: + return(self.__repr__()) + + + + diff --git a/contentctl/objects/baseline.py b/contentctl/objects/baseline.py index 91cb8958..ee9e66bf 100644 --- a/contentctl/objects/baseline.py +++ b/contentctl/objects/baseline.py @@ -31,7 +31,6 @@ class Baseline(SecurityContentObject): search: str = Field(..., min_length=4) how_to_implement: str = Field(..., min_length=4) known_false_positives: str = Field(..., min_length=4) - check_references: bool = False #Validation is done in order, this field must be defined first tags: BaselineTags = Field(...) # enrichment diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index f036d132..20f77ed2 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -154,6 +154,10 @@ class Config_Base(BaseModel): path: DirectoryPath = Field(default=DirectoryPath("."), description="The root of your app.") app:CustomApp = Field(default_factory=CustomApp) + verbose:bool = Field(default=False, description="Enable verbose error logging, including a stacktrace. " + "This option makes debugging contentctl errors much easier, but produces way more " + "output than is useful under most uses cases. " + "Please use this flag if you are submitting a bug report or issue on GitHub.") @field_serializer('path',when_used='always') def serialize_path(path: DirectoryPath)->str: @@ -269,14 +273,6 @@ class Infrastructure(BaseModel): instance_name: str = Field(...) 
-class deploy_rest(build): - model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True) - - target:Infrastructure = Infrastructure(instance_name="splunk_target_host", instance_address="localhost") - #This will overwrite existing content without promprting for confirmation - overwrite_existing_content:bool = Field(default=True, description="Overwrite existing macros and savedsearches in your enviornment") - - class Container(Infrastructure): model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True) instance_address:str = Field(default="localhost", description="Address of your splunk server.") diff --git a/contentctl/objects/detection_tags.py b/contentctl/objects/detection_tags.py index bd3920cb..73849de6 100644 --- a/contentctl/objects/detection_tags.py +++ b/contentctl/objects/detection_tags.py @@ -145,7 +145,7 @@ def serialize_model(self): @model_validator(mode="after") def addAttackEnrichment(self, info:ValidationInfo): if len(self.mitre_attack_enrichments) > 0: - raise ValueError(f"Error, field 'mitre_attack_enrichment' should be empty and dynamically populated at runtime. Instead, this field contained: {str(v)}") + raise ValueError(f"Error, field 'mitre_attack_enrichment' should be empty and dynamically populated at runtime. Instead, this field contained: {self.mitre_attack_enrichments}") output_dto:Union[DirectorOutputDto,None]= info.context.get("output_dto",None) if output_dto is None: diff --git a/contentctl/objects/macro.py b/contentctl/objects/macro.py index 478e5e13..5f01f2d1 100644 --- a/contentctl/objects/macro.py +++ b/contentctl/objects/macro.py @@ -9,13 +9,14 @@ from contentctl.objects.security_content_object import SecurityContentObject - -MACROS_TO_IGNORE = set(["_filter", "drop_dm_object_name"]) -#Should all of the following be included as well? 
-MACROS_TO_IGNORE.add("get_asset" ) -MACROS_TO_IGNORE.add("get_risk_severity") -MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") -MACROS_TO_IGNORE.add("prohibited_processes") +#The following macros are included in commonly-installed apps. +#As such, we will ignore if they are missing from our app. +#Included in +MACROS_TO_IGNORE = set(["drop_dm_object_name"]) # Part of CIM/Splunk_SA_CIM +MACROS_TO_IGNORE.add("get_asset") #SA-IdentityManagement, part of Enterprise Security +MACROS_TO_IGNORE.add("get_risk_severity") #SA-ThreatIntelligence, part of Enterprise Security +MACROS_TO_IGNORE.add("cim_corporate_web_domain_search") #Part of CIM/Splunk_SA_CIM +#MACROS_TO_IGNORE.add("prohibited_processes") class Macro(SecurityContentObject): diff --git a/contentctl/output/yml_writer.py b/contentctl/output/yml_writer.py index 6ceb02a3..7d71762b 100644 --- a/contentctl/output/yml_writer.py +++ b/contentctl/output/yml_writer.py @@ -8,4 +8,42 @@ class YmlWriter: def writeYmlFile(file_path : str, obj : dict[Any,Any]) -> None: with open(file_path, 'w') as outfile: - yaml.safe_dump(obj, outfile, default_flow_style=False, sort_keys=False) \ No newline at end of file + yaml.safe_dump(obj, outfile, default_flow_style=False, sort_keys=False) + + @staticmethod + def writeDetection(file_path: str, obj: dict[Any,Any]) -> None: + output = dict() + output["name"] = obj["name"] + output["id"] = obj["id"] + output["version"] = obj["version"] + output["date"] = obj["date"] + output["author"] = obj["author"] + output["type"] = obj["type"] + output["status"] = obj["status"] + output["data_source"] = obj['data_sources'] + output["description"] = obj["description"] + output["search"] = obj["search"] + output["how_to_implement"] = obj["how_to_implement"] + output["known_false_positives"] = obj["known_false_positives"] + output["references"] = obj["references"] + output["tags"] = obj["tags"] + output["tests"] = obj["tests"] + + YmlWriter.writeYmlFile(file_path=file_path, obj=output) + + 
@staticmethod + def writeStory(file_path: str, obj: dict[Any,Any]) -> None: + output = dict() + output['name'] = obj['name'] + output['id'] = obj['id'] + output['version'] = obj['version'] + output['date'] = obj['date'] + output['author'] = obj['author'] + output['description'] = obj['description'] + output['narrative'] = obj['narrative'] + output['references'] = obj['references'] + output['tags'] = obj['tags'] + + YmlWriter.writeYmlFile(file_path=file_path, obj=output) + + diff --git a/contentctl/templates/app_template/default/distsearch.conf b/contentctl/templates/app_template/default/distsearch.conf deleted file mode 100644 index 23129734..00000000 --- a/contentctl/templates/app_template/default/distsearch.conf +++ /dev/null @@ -1,5 +0,0 @@ -[replicationSettings:refineConf] -replicate.analytic_stories = false - -[replicationBlacklist] -excludeESCU = apps[/\\]DA-ESS-ContentUpdate[/\\]lookups[/\\]... diff --git a/pyproject.toml b/pyproject.toml index ba7a0abd..4f67beb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "contentctl" -version = "4.0.5" +version = "4.1.0" description = "Splunk Content Control Tool" authors = ["STRT "] license = "Apache 2.0" @@ -11,27 +11,26 @@ contentctl = 'contentctl.contentctl:main' [tool.poetry.dependencies] python = "^3.11" -pydantic = "^2.5.1" +pydantic = "^2.7.1" PyYAML = "^6.0.1" requests = "~2.32.2" pycvesearch = "^1.2" xmltodict = "^0.13.0" attackcti = "^0.3.7" -Jinja2 = "^3.1.2" +Jinja2 = "^3.1.4" questionary = "^2.0.1" docker = "^7.1.0" splunk-sdk = "^2.0.1" -validators = "^0.22.0" semantic-version = "^2.10.0" bottle = "^0.12.25" -tqdm = "^4.66.1" +tqdm = "^4.66.4" #splunk-appinspect = "^2.36.0" -pysigma = "^0.10.8" -pysigma-backend-splunk = "^1.0.3" +pysigma = "^0.11.5" +pysigma-backend-splunk = "^1.1.0" pygit2 = "^1.14.1" tyro = "^0.8.3" gitpython = "^3.1.43" -setuptools = "^69.5.1" +setuptools = ">=69.5.1,<71.0.0" [tool.poetry.dev-dependencies] [build-system]