diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index e3d09822..59676bcd 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -7,7 +7,7 @@ import logging from pydantic import BaseModel, Field from dataclasses import field -from typing import Annotated +from typing import Annotated,Any from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment from contentctl.objects.config import validate logging.getLogger('taxii2client').setLevel(logging.CRITICAL) @@ -33,21 +33,33 @@ def getEnrichmentByMitreID(self, mitre_id:Annotated[str, Field(pattern=r"^T\d{4} else: raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}") - - def addMitreID(self, technique:dict, tactics:list[str], groups:list[str])->None: - + def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None: technique_id = technique['technique_id'] technique_obj = technique['technique'] tactics.sort() - groups.sort() - + if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") + self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, + mitre_attack_technique=technique_obj, + mitre_attack_tactics=tactics, + mitre_attack_groups=groupNames, + mitre_attack_group_objects=[]) + + def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None: + technique_id = technique['technique_id'] + technique_obj = technique['technique'] + tactics.sort() + groupNames:list[str] = sorted([group['group'] for group in groupObjects]) + + if technique_id in self.data: + raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id, mitre_attack_technique=technique_obj, mitre_attack_tactics=tactics, - mitre_attack_groups=groups) + mitre_attack_groups=groupNames, + mitre_attack_group_objects=groupObjects) def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict: @@ -86,19 +98,20 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100 if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()): print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True) - apt_groups = [] + apt_groups:list[dict[str,Any]] = [] for relationship in enterprise_relationships: if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'): for group in enterprise_groups: if relationship['source_object'] == group['id']: - apt_groups.append(group['group']) + apt_groups.append(group) + #apt_groups.append(group['group']) tactics = [] if ('tactic' in technique): for tactic in technique['tactic']: tactics.append(tactic.replace('-',' ').title()) - self.addMitreID(technique, tactics, apt_groups) + self.addMitreIDViaGroupObjects(technique, tactics, apt_groups) attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups} if store_csv: @@ -131,7 +144,7 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] } tactics_input = attack_lookup[key]['tactics'] groups_input = attack_lookup[key]['groups'] - self.addMitreID(technique=technique_input, tactics=tactics_input, groups=groups_input) + self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input) diff --git a/contentctl/objects/mitre_attack_enrichment.py b/contentctl/objects/mitre_attack_enrichment.py index b3d2baad..627b261f 100644 --- a/contentctl/objects/mitre_attack_enrichment.py +++ b/contentctl/objects/mitre_attack_enrichment.py @@ -1,8 +1,8 @@ from __future__ import annotations -from pydantic import BaseModel, Field, ConfigDict +from pydantic import BaseModel, Field, ConfigDict, HttpUrl, field_validator from typing import List, Annotated from enum import StrEnum - +import datetime class MitreTactics(StrEnum): RECONNAISSANCE = "Reconnaissance" @@ -21,12 +21,75 @@ class MitreTactics(StrEnum): IMPACT = "Impact" +class AttackGroupMatrix(StrEnum): + enterprise_attack = "enterprise-attack" + ics_attack = "ics-attack" + mobile_attack = "mobile-attack" + + +class AttackGroupType(StrEnum): + intrusion_set = "intrusion-set" + +class MitreExternalReference(BaseModel): + model_config = ConfigDict(extra='forbid') + source_name: str + external_id: None | str = None + url: None | HttpUrl = None + description: None | str = None + + +class MitreAttackGroup(BaseModel): + model_config = ConfigDict(extra='forbid') + contributors: list[str] = [] + created: datetime.datetime + created_by_ref: str + external_references: list[MitreExternalReference] + group: str + group_aliases: list[str] + group_description: str + group_id: str + id: str + matrix: list[AttackGroupMatrix] + mitre_attack_spec_version: None | str + mitre_version: str + #assume that if the deprecated field is not present, then the group is not deprecated + mitre_deprecated: bool + modified: datetime.datetime + modified_by_ref: str + object_marking_refs: list[str] + type: AttackGroupType + url: HttpUrl + + + @field_validator("mitre_deprecated", mode="before") + def standardize_mitre_deprecated(cls, mitre_deprecated:bool | None) -> bool: + ''' + For some reason, the API will return either a bool for mitre_deprecated OR + None. We simplify our typing by converting None to False, and assuming that + if deprecated is None, then the group is not deprecated. + ''' + if mitre_deprecated is None: + return False + return mitre_deprecated + + @field_validator("contributors", mode="before") + def standardize_contributors(cls, contributors:list[str] | None) -> list[str]: + ''' + For some reason, the API will return either a list of strings for contributors OR + None. We simplify our typing by converting None to an empty list. + ''' + if contributors is None: + return [] + return contributors + class MitreAttackEnrichment(BaseModel): ConfigDict(use_enum_values=True) mitre_attack_id: Annotated[str, Field(pattern=r"^T\d{4}(.\d{3})?$")] = Field(...) mitre_attack_technique: str = Field(...) mitre_attack_tactics: List[MitreTactics] = Field(...) mitre_attack_groups: List[str] = Field(...) - + #Exclude this field from serialization - it is very large and not useful in JSON objects + mitre_attack_group_objects: list[MitreAttackGroup] = Field(..., exclude=True) def __hash__(self) -> int: return id(self) + diff --git a/pyproject.toml b/pyproject.toml index a06d7f60..6d78a4ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,20 +11,20 @@ contentctl = 'contentctl.contentctl:main' [tool.poetry.dependencies] python = "^3.11" -pydantic = "^2.7.1" -PyYAML = "^6.0.1" -requests = "~2.32.2" +pydantic = "^2.8.2" +PyYAML = "^6.0.2" +requests = "~2.32.3" pycvesearch = "^1.2" xmltodict = "^0.13.0" -attackcti = ">=0.3.7,<0.5.0" +attackcti = "^0.4.0" Jinja2 = "^3.1.4" questionary = "^2.0.1" docker = "^7.1.0" -splunk-sdk = "^2.0.1" +splunk-sdk = "^2.0.2" semantic-version = "^2.10.0" bottle = "^0.12.25" -tqdm = "^4.66.4" -pygit2 = "^1.14.1" +tqdm = "^4.66.5" +pygit2 = "^1.15.1" tyro = "^0.8.3" gitpython = "^3.1.43" setuptools = ">=69.5.1,<74.0.0"