From bb16d97718c7aecaf70c38f5a6484dc0e99ec4b5 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 20 Aug 2024 17:28:18 -0700 Subject: [PATCH 1/5] add support for the entire mitre group object that is returned. --- contentctl/enrichments/attack_enrichment.py | 16 ++++--- contentctl/objects/mitre_attack_enrichment.py | 48 +++++++++++++++++-- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index e3d09822..2d4e59e2 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -7,7 +7,7 @@ import logging from pydantic import BaseModel, Field from dataclasses import field -from typing import Annotated +from typing import Annotated,Any from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment from contentctl.objects.config import validate logging.getLogger('taxii2client').setLevel(logging.CRITICAL) @@ -34,20 +34,21 @@ def getEnrichmentByMitreID(self, mitre_id:Annotated[str, Field(pattern=r"^T\d{4} raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}") - def addMitreID(self, technique:dict, tactics:list[str], groups:list[str])->None: + def addMitreID(self, technique:dict, tactics:list[str], groups:list[dict[str,Any]])->None: technique_id = technique['technique_id'] technique_obj = technique['technique'] tactics.sort() - groups.sort() + group_names_only:list[str] = sorted([group['group'] for group in groups]) + if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") - self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, mitre_attack_technique=technique_obj, mitre_attack_tactics=tactics, - mitre_attack_groups=groups) + mitre_attack_groups=group_names_only, + mitre_attack_group_objects=groups) def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict: @@ -86,12 +87,13 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100 if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()): print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True) - apt_groups = [] + apt_groups:list[dict[str,Any]] = [] for relationship in enterprise_relationships: if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'): for group in enterprise_groups: if relationship['source_object'] == group['id']: - apt_groups.append(group['group']) + apt_groups.append(group) + #apt_groups.append(group['group']) tactics = [] if ('tactic' in technique): diff --git a/contentctl/objects/mitre_attack_enrichment.py b/contentctl/objects/mitre_attack_enrichment.py index b3d2baad..31521e94 100644 --- a/contentctl/objects/mitre_attack_enrichment.py +++ b/contentctl/objects/mitre_attack_enrichment.py @@ -1,8 +1,8 @@ from __future__ import annotations -from pydantic import BaseModel, Field, ConfigDict +from pydantic import BaseModel, Field, ConfigDict, HttpUrl from typing import List, Annotated from enum import StrEnum - +import datetime class MitreTactics(StrEnum): RECONNAISSANCE = "Reconnaissance" @@ -20,6 +20,48 @@ class MitreTactics(StrEnum): EXFILTRATION = "Exfiltration" IMPACT = "Impact" +from enum import StrEnum +class AttackGroupMatrix(StrEnum): + mitre_attack = "mitre-attack" + + +class AttackGroupType(StrEnum): + intrusion_set = "intrusion-set" + +class MitreDomain(StrEnum): + intrusion_set = "enterprise-attack" + mobile_attack = "mobile-attack" + ics_attack = "ics-attack" + +class MitreExternalReference(BaseModel): + model_config = ConfigDict(extra='forbid') + source_name: str + external_id: None | str = None + url: None | HttpUrl = None + description: None | str = None + + +class MitreAttackGroup(BaseModel): + model_config = ConfigDict(extra='forbid') + created: datetime.datetime + created_by_ref: str + external_references: list[MitreExternalReference] + group: str + group_aliases: list[str] + group_description: str + id: str + matrix: AttackGroupMatrix + modified: datetime.datetime + object_marking_refs: list[str] + type: AttackGroupType + url: HttpUrl + x_mitre_attack_spec_version: None | str = None + x_mitre_deprecated: None | bool = None + x_mitre_domains: list[MitreDomain] + x_mitre_modified_by_ref: str + x_mitre_version: str + contributors: list[str] = [] + class MitreAttackEnrichment(BaseModel): ConfigDict(use_enum_values=True) @@ -27,6 +69,6 @@ class MitreAttackEnrichment(BaseModel): mitre_attack_technique: str = Field(...) mitre_attack_tactics: List[MitreTactics] = Field(...) mitre_attack_groups: List[str] = Field(...) - + mitre_attack_group_objects: list[MitreAttackGroup] = Field(...) def __hash__(self) -> int: return id(self) From aec612a3b85bbf70d054c0e8535cce85cb00fffc Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Tue, 20 Aug 2024 17:32:53 -0700 Subject: [PATCH 2/5] clean up import --- contentctl/objects/mitre_attack_enrichment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/objects/mitre_attack_enrichment.py b/contentctl/objects/mitre_attack_enrichment.py index 31521e94..2abe5118 100644 --- a/contentctl/objects/mitre_attack_enrichment.py +++ b/contentctl/objects/mitre_attack_enrichment.py @@ -20,7 +20,7 @@ class MitreTactics(StrEnum): EXFILTRATION = "Exfiltration" IMPACT = "Impact" -from enum import StrEnum + class AttackGroupMatrix(StrEnum): mitre_attack = "mitre-attack" From e4cb761525a64cfb60e8de78f18c6dc40f788edd Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Wed, 21 Aug 2024 17:22:17 -0700 Subject: [PATCH 3/5] don't serialize the verbose MitreAttackGroup object. --- contentctl/objects/mitre_attack_enrichment.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/contentctl/objects/mitre_attack_enrichment.py b/contentctl/objects/mitre_attack_enrichment.py index 2abe5118..09aea5d7 100644 --- a/contentctl/objects/mitre_attack_enrichment.py +++ b/contentctl/objects/mitre_attack_enrichment.py @@ -69,6 +69,8 @@ class MitreAttackEnrichment(BaseModel): mitre_attack_technique: str = Field(...) mitre_attack_tactics: List[MitreTactics] = Field(...) mitre_attack_groups: List[str] = Field(...) - mitre_attack_group_objects: list[MitreAttackGroup] = Field(...) + #Exclude this field from serialization - it is very large and not useful in JSON objects + mitre_attack_group_objects: list[MitreAttackGroup] = Field(..., exclude=True) def __hash__(self) -> int: return id(self) + From 98f6b7a2a62a00fa6d71aa2fc216232b0a0d2fdf Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Thu, 22 Aug 2024 11:39:56 -0700 Subject: [PATCH 4/5] add print statements to help debug in ci --- contentctl/enrichments/attack_enrichment.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index 2d4e59e2..ab40e8c5 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -41,7 +41,12 @@ def addMitreID(self, technique:dict, tactics:list[str], groups:list[dict[str,Any tactics.sort() group_names_only:list[str] = sorted([group['group'] for group in groups]) - + import pprint + print(technique_id) + print(technique_obj) + print(tactics) + print(group_names_only) + pprint.pprint(groups) if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, From 758924db7ed741fcd8d7f32096a8185675279602 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Thu, 22 Aug 2024 15:00:24 -0700 Subject: [PATCH 5/5] Bump some versions in pyproject.toml. Restructure the Pydantic Object for building MitreAttack API Groups based on the newer version of attackcti library. --- contentctl/enrichments/attack_enrichment.py | 32 ++++++++----- contentctl/objects/mitre_attack_enrichment.py | 47 +++++++++++++------ pyproject.toml | 14 +++--- 3 files changed, 59 insertions(+), 34 deletions(-) diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index ab40e8c5..59676bcd 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -33,27 +33,33 @@ def getEnrichmentByMitreID(self, mitre_id:Annotated[str, Field(pattern=r"^T\d{4} else: raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}") - - def addMitreID(self, technique:dict, tactics:list[str], groups:list[dict[str,Any]])->None: + def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None: + technique_id = technique['technique_id'] + technique_obj = technique['technique'] + tactics.sort() + if technique_id in self.data: + raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") + self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, + mitre_attack_technique=technique_obj, + mitre_attack_tactics=tactics, + mitre_attack_groups=groupNames, + mitre_attack_group_objects=[]) + + def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None: technique_id = technique['technique_id'] technique_obj = technique['technique'] tactics.sort() - group_names_only:list[str] = sorted([group['group'] for group in groups]) - import pprint - print(technique_id) - print(technique_obj) - print(tactics) - print(group_names_only) - pprint.pprint(groups) + groupNames:list[str] = sorted([group['group'] for group in groupObjects]) + if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, mitre_attack_technique=technique_obj, mitre_attack_tactics=tactics, - mitre_attack_groups=group_names_only, - mitre_attack_group_objects=groups) + mitre_attack_groups=groupNames, + mitre_attack_group_objects=groupObjects) def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict: @@ -105,7 +111,7 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach for tactic in technique['tactic']: tactics.append(tactic.replace('-',' ').title()) - self.addMitreID(technique, tactics, apt_groups) + self.addMitreIDViaGroupObjects(technique, tactics, apt_groups) attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups} if store_csv: @@ -138,7 +144,7 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] } tactics_input = attack_lookup[key]['tactics'] groups_input = attack_lookup[key]['groups'] - self.addMitreID(technique=technique_input, tactics=tactics_input, groups=groups_input) + self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input) diff --git a/contentctl/objects/mitre_attack_enrichment.py b/contentctl/objects/mitre_attack_enrichment.py index 09aea5d7..627b261f 100644 --- a/contentctl/objects/mitre_attack_enrichment.py +++ b/contentctl/objects/mitre_attack_enrichment.py @@ -1,5 +1,5 @@ from __future__ import annotations -from pydantic import BaseModel, Field, ConfigDict, HttpUrl +from pydantic import BaseModel, Field, ConfigDict, HttpUrl, field_validator from typing import List, Annotated from enum import StrEnum import datetime @@ -22,17 +22,14 @@ class MitreTactics(StrEnum): class AttackGroupMatrix(StrEnum): - mitre_attack = "mitre-attack" + enterprise_attack = "enterprise-attack" + ics_attack = "ics-attack" + mobile_attack = "mobile-attack" class AttackGroupType(StrEnum): intrusion_set = "intrusion-set" -class MitreDomain(StrEnum): - intrusion_set = "enterprise-attack" - mobile_attack = "mobile-attack" - ics_attack = "ics-attack" - class MitreExternalReference(BaseModel): model_config = ConfigDict(extra='forbid') source_name: str @@ -43,25 +40,47 @@ class MitreExternalReference(BaseModel): class MitreAttackGroup(BaseModel): model_config = ConfigDict(extra='forbid') + contributors: list[str] = [] created: datetime.datetime created_by_ref: str external_references: list[MitreExternalReference] group: str group_aliases: list[str] group_description: str + group_id: str id: str - matrix: AttackGroupMatrix + matrix: list[AttackGroupMatrix] + mitre_attack_spec_version: None | str + mitre_version: str + #assume that if the deprecated field is not present, then the group is not deprecated + mitre_deprecated: bool modified: datetime.datetime + modified_by_ref: str object_marking_refs: list[str] type: AttackGroupType url: HttpUrl - x_mitre_attack_spec_version: None | str = None - x_mitre_deprecated: None | bool = None - x_mitre_domains: list[MitreDomain] - x_mitre_modified_by_ref: str - x_mitre_version: str - contributors: list[str] = [] + + + @field_validator("mitre_deprecated", mode="before") + def standardize_mitre_deprecated(cls, mitre_deprecated:bool | None) -> bool: + ''' + For some reason, the API will return either a bool for mitre_deprecated OR + None. We simplify our typing by converting None to False, and assuming that + if deprecated is None, then the group is not deprecated. + ''' + if mitre_deprecated is None: + return False + return mitre_deprecated + @field_validator("contributors", mode="before") + def standardize_contributors(cls, contributors:list[str] | None) -> list[str]: + ''' + For some reason, the API will return either a list of strings for contributors OR + None. We simplify our typing by converting None to an empty list. + ''' + if contributors is None: + return [] + return contributors class MitreAttackEnrichment(BaseModel): ConfigDict(use_enum_values=True) diff --git a/pyproject.toml b/pyproject.toml index e43c8bdf..a0d867b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,20 +11,20 @@ contentctl = 'contentctl.contentctl:main' [tool.poetry.dependencies] python = "^3.11" -pydantic = "^2.7.1" -PyYAML = "^6.0.1" -requests = "~2.32.2" +pydantic = "^2.8.2" +PyYAML = "^6.0.2" +requests = "~2.32.3" pycvesearch = "^1.2" xmltodict = "^0.13.0" -attackcti = ">=0.3.7,<0.5.0" +attackcti = "^0.4.0" Jinja2 = "^3.1.4" questionary = "^2.0.1" docker = "^7.1.0" -splunk-sdk = "^2.0.1" +splunk-sdk = "^2.0.2" semantic-version = "^2.10.0" bottle = "^0.12.25" -tqdm = "^4.66.4" -pygit2 = "^1.14.1" +tqdm = "^4.66.5" +pygit2 = "^1.15.1" tyro = "^0.8.3" gitpython = "^3.1.43" setuptools = ">=69.5.1,<74.0.0"