Skip to content

Commit

Permalink
Merge branch 'main' into ruff_config
Browse files Browse the repository at this point in the history
  • Loading branch information
ljstella authored Sep 30, 2024
2 parents 9ad0d96 + 5488ca6 commit 8f1845e
Show file tree
Hide file tree
Showing 37 changed files with 640 additions and 411 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test_against_escu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ jobs:
poetry install --no-interaction
- name: Clone the AtomicRedTeam Repo (for extended validation)
- name: Clone the AtomicRedTeam Repo and the Mitre/CTI repos for testing enrichments
run: |
cd security_content
git clone --depth 1 https://github.com/redcanaryco/atomic-red-team
git clone --single-branch https://github.com/redcanaryco/atomic-red-team external_repos/atomic-red-team
git clone --single-branch https://github.com/mitre/cti external_repos/cti
# We do not separately run validate and build
Expand Down
1 change: 1 addition & 0 deletions contentctl/actions/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def execute(self, input_dto: BuildInputDto) -> DirectorOutputDto:
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.investigations, SecurityContentType.investigations))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.lookups, SecurityContentType.lookups))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.macros, SecurityContentType.macros))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.dashboards, SecurityContentType.dashboards))
updated_conf_files.update(conf_output.writeAppConf())

#Ensure that the conf file we just generated/update is syntactically valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,25 @@ def configure_imported_roles(
):
indexes.append(self.sync_obj.replay_index)
indexes_encoded = ";".join(indexes)

try:
# Set which roles should be configured. For Enterprise Security/Integration Testing,
# we must add some extra foles.
if self.global_config.enable_integration_testing:
roles = imported_roles + enterprise_security_roles
else:
roles = imported_roles

self.get_conn().roles.post(
self.infrastructure.splunk_app_username,
imported_roles=imported_roles + enterprise_security_roles,
imported_roles=roles,
srchIndexesAllowed=indexes_encoded,
srchIndexesDefault=self.sync_obj.replay_index,
)
return
except Exception as e:
self.pbar.write(
f"Enterprise Security Roles do not exist:'{enterprise_security_roles}: {str(e)}"
f"The following role(s) do not exist:'{enterprise_security_roles}: {str(e)}"
)

self.get_conn().roles.post(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ def get_docker_client(self):
def check_for_teardown(self):

try:
self.get_docker_client().containers.get(self.get_name())
container: docker.models.containers.Container = self.get_docker_client().containers.get(self.get_name())
except Exception as e:
if self.sync_obj.terminate is not True:
self.pbar.write(
f"Error: could not get container [{self.get_name()}]: {str(e)}"
)
self.sync_obj.terminate = True
else:
if container.status != 'running':
self.sync_obj.terminate = True
self.container = None

if self.sync_obj.terminate:
self.finish()
Expand Down
12 changes: 10 additions & 2 deletions contentctl/actions/new_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ class NewContent:

def buildDetection(self)->dict[str,Any]:
questions = NewContentQuestions.get_questions_detection()
answers = questionary.prompt(questions)
answers: dict[str,str] = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...")
if not answers:
raise ValueError("User didn't answer one or more questions!")
answers.update(answers)
answers['name'] = answers['detection_name']
del answers['detection_name']
Expand Down Expand Up @@ -70,7 +74,11 @@ def buildDetection(self)->dict[str,Any]:

def buildStory(self)->dict[str,Any]:
questions = NewContentQuestions.get_questions_story()
answers = questionary.prompt(questions)
answers = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...")
if not answers:
raise ValueError("User didn't answer one or more questions!")
answers['name'] = answers['story_name']
del answers['story_name']
answers['id'] = str(uuid.uuid4())
Expand Down
9 changes: 3 additions & 6 deletions contentctl/actions/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@
from contentctl.objects.config import validate
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicTest
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.helper.utils import Utils
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp


class Validate:
def execute(self, input_dto: validate) -> DirectorOutputDto:

director_output_dto = DirectorOutputDto(
AtomicTest.getAtomicTestsFromArtRepo(
repo_path=input_dto.getAtomicRedTeamRepoPath(),
enabled=input_dto.enrichments,
),
AtomicEnrichment.getAtomicEnrichment(input_dto),
AttackEnrichment.getAttackEnrichment(input_dto),
CveEnrichment.getCveEnrichment(input_dto),
[],
Expand All @@ -30,6 +26,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
[],
[],
[],
[]
)

director = Director(director_output_dto)
Expand Down
3 changes: 3 additions & 0 deletions contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def main():
test_common_func(config)
else:
raise Exception(f"Unknown command line type '{type(config).__name__}'")
except FileNotFoundError as e:
print(e)
sys.exit(1)
except Exception as e:
if config is None:
print("There was a serious issue where the config file could not be created.\n"
Expand Down
130 changes: 49 additions & 81 deletions contentctl/enrichments/attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@

from __future__ import annotations
import csv
import os
import sys
from attackcti import attack_client
import logging
from pydantic import BaseModel, Field
from pydantic import BaseModel
from dataclasses import field
from typing import Annotated,Any
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from typing import Any
from pathlib import Path
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment, MitreTactics
from contentctl.objects.config import validate
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)
Expand All @@ -21,84 +20,82 @@ class AttackEnrichment(BaseModel):
@staticmethod
def getAttackEnrichment(config:validate)->AttackEnrichment:
enrichment = AttackEnrichment(use_enrichment=config.enrichments)
_ = enrichment.get_attack_lookup(str(config.path))
_ = enrichment.get_attack_lookup(config.mitre_cti_repo_path, config.enrichments)
return enrichment

def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment:
if not self.use_enrichment:
raise Exception(f"Error, trying to add Mitre Enrichment, but use_enrichment was set to False")
raise Exception("Error, trying to add Mitre Enrichment, but use_enrichment was set to False")

enrichment = self.data.get(mitre_id, None)
if enrichment is not None:
return enrichment
else:
raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}")

def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None:
def addMitreIDViaGroupNames(self, technique:dict[str,Any], tactics:list[str], groupNames:list[str])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=[])
self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id':technique_id,
'mitre_attack_technique':technique_obj,
'mitre_attack_tactics':tactics,
'mitre_attack_groups':groupNames,
'mitre_attack_group_objects':[]})

def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None:
def addMitreIDViaGroupObjects(self, technique:dict[str,Any], tactics:list[MitreTactics], groupDicts:list[dict[str,Any]])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

groupNames:list[str] = sorted([group['group'] for group in groupObjects])
groupNames:list[str] = sorted([group['group'] for group in groupDicts])

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=groupObjects)

self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id': technique_id,
'mitre_attack_technique': technique_obj,
'mitre_attack_tactics': tactics,
'mitre_attack_groups': groupNames,
'mitre_attack_group_objects': groupDicts})


def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict:
if not self.use_enrichment:
return {}
print("Getting MITRE Attack Enrichment Data. This may take some time...")
attack_lookup = dict()
file_path = os.path.join(input_path, "app_template", "lookups", "mitre_enrichment.csv")

if skip_enrichment is True:
print("Skipping enrichment")
def get_attack_lookup(self, input_path: Path, enrichments:bool = False) -> dict[str,MitreAttackEnrichment]:
attack_lookup:dict[str,MitreAttackEnrichment] = {}
if not enrichments:
return attack_lookup

try:

if force_cached_or_offline is True:
raise(Exception("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes."))
print(f"\r{'Client'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
lift = attack_client()
print(f"\r{'Client'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
print(f"Performing MITRE Enrichment using the repository at {input_path}...",end="", flush=True)
# The existence of the input_path is validated during cli argument validation, but it is
# possible that the repo is in the wrong format. If the following directories do not
# exist, then attack_client will fall back to resolving via REST API. We do not
# want this as it is slow and error prone, so we will force an exception to
# be generated.
enterprise_path = input_path/"enterprise-attack"
mobile_path = input_path/"ics-attack"
ics_path = input_path/"mobile-attack"
if not (enterprise_path.is_dir() and mobile_path.is_dir() and ics_path.is_dir()):
raise FileNotFoundError("One or more of the following paths does not exist: "
f"{[str(enterprise_path),str(mobile_path),str(ics_path)]}. "
f"Please ensure that the {input_path} directory "
"has been git cloned correctly.")
lift = attack_client(
local_paths= {
"enterprise":str(enterprise_path),
"mobile":str(mobile_path),
"ics":str(ics_path)
}
)

print(f"\r{'Techniques'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
all_enterprise_techniques = lift.get_enterprise_techniques(stix_format=False)

print(f"\r{'Techniques'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

print(f"\r{'Relationships'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
enterprise_relationships = lift.get_enterprise_relationships(stix_format=False)
print(f"\r{'Relationships'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)

print(f"\r{'Groups'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
print(f"\r{'Groups'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)


for index, technique in enumerate(all_enterprise_techniques):
progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100
if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()):
print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
for technique in all_enterprise_techniques:
apt_groups:list[dict[str,Any]] = []
for relationship in enterprise_relationships:
if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'):
Expand All @@ -115,39 +112,10 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups}

if store_csv:
f = open(file_path, 'w')
writer = csv.writer(f)
writer.writerow(['mitre_id', 'technique', 'tactics' ,'groups'])
for key in attack_lookup.keys():
if len(attack_lookup[key]['groups']) == 0:
groups = 'no'
else:
groups = '|'.join(attack_lookup[key]['groups'])

writer.writerow([
key,
attack_lookup[key]['technique'],
'|'.join(attack_lookup[key]['tactics']),
groups
])

f.close()


except Exception as err:
print(f'\nError: {str(err)}')
print('Use local copy app_template/lookups/mitre_enrichment.csv')
with open(file_path, mode='r') as inp:
reader = csv.reader(inp)
attack_lookup = {rows[0]:{'technique': rows[1], 'tactics': rows[2].split('|'), 'groups': rows[3].split('|')} for rows in reader}
attack_lookup.pop('mitre_id')
for key in attack_lookup.keys():
technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] }
tactics_input = attack_lookup[key]['tactics']
groups_input = attack_lookup[key]['groups']
self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input)



raise Exception(f"Error getting MITRE Enrichment: {str(err)}")

print("Done!")
return attack_lookup
Loading

0 comments on commit 8f1845e

Please sign in to comment.