Skip to content

Commit

Permalink
Merge pull request #253 from splunk/add_full_mitre_groups
Browse files Browse the repository at this point in the history
add support for the entire mitre group metadata
  • Loading branch information
pyth0n1c authored Aug 22, 2024
2 parents 8a07fcf + 4c2f9ac commit 68e4102
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 21 deletions.
35 changes: 24 additions & 11 deletions contentctl/enrichments/attack_enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from pydantic import BaseModel, Field
from dataclasses import field
from typing import Annotated
from typing import Annotated,Any
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
from contentctl.objects.config import validate
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)
Expand All @@ -33,21 +33,33 @@ def getEnrichmentByMitreID(self, mitre_id:Annotated[str, Field(pattern=r"^T\d{4}
else:
raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}")


def addMitreID(self, technique:dict, tactics:list[str], groups:list[str])->None:

def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()
groups.sort()


if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groupNames,
mitre_attack_group_objects=[])

def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None:
technique_id = technique['technique_id']
technique_obj = technique['technique']
tactics.sort()

groupNames:list[str] = sorted([group['group'] for group in groupObjects])

if technique_id in self.data:
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
mitre_attack_technique=technique_obj,
mitre_attack_tactics=tactics,
mitre_attack_groups=groups)
mitre_attack_groups=groupNames,
mitre_attack_group_objects=groupObjects)


def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict:
Expand Down Expand Up @@ -86,19 +98,20 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100
if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()):
print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
apt_groups = []
apt_groups:list[dict[str,Any]] = []
for relationship in enterprise_relationships:
if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'):
for group in enterprise_groups:
if relationship['source_object'] == group['id']:
apt_groups.append(group['group'])
apt_groups.append(group)
#apt_groups.append(group['group'])

tactics = []
if ('tactic' in technique):
for tactic in technique['tactic']:
tactics.append(tactic.replace('-',' ').title())

self.addMitreID(technique, tactics, apt_groups)
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups}

if store_csv:
Expand Down Expand Up @@ -131,7 +144,7 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] }
tactics_input = attack_lookup[key]['tactics']
groups_input = attack_lookup[key]['groups']
self.addMitreID(technique=technique_input, tactics=tactics_input, groups=groups_input)
self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input)



Expand Down
69 changes: 66 additions & 3 deletions contentctl/objects/mitre_attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations
from pydantic import BaseModel, Field, ConfigDict
from pydantic import BaseModel, Field, ConfigDict, HttpUrl, field_validator
from typing import List, Annotated
from enum import StrEnum

import datetime

class MitreTactics(StrEnum):
RECONNAISSANCE = "Reconnaissance"
Expand All @@ -21,12 +21,75 @@ class MitreTactics(StrEnum):
IMPACT = "Impact"


class AttackGroupMatrix(StrEnum):
enterprise_attack = "enterprise-attack"
ics_attack = "ics-attack"
mobile_attack = "mobile-attack"


class AttackGroupType(StrEnum):
intrusion_set = "intrusion-set"

class MitreExternalReference(BaseModel):
model_config = ConfigDict(extra='forbid')
source_name: str
external_id: None | str = None
url: None | HttpUrl = None
description: None | str = None


class MitreAttackGroup(BaseModel):
model_config = ConfigDict(extra='forbid')
contributors: list[str] = []
created: datetime.datetime
created_by_ref: str
external_references: list[MitreExternalReference]
group: str
group_aliases: list[str]
group_description: str
group_id: str
id: str
matrix: list[AttackGroupMatrix]
mitre_attack_spec_version: None | str
mitre_version: str
#assume that if the deprecated field is not present, then the group is not deprecated
mitre_deprecated: bool
modified: datetime.datetime
modified_by_ref: str
object_marking_refs: list[str]
type: AttackGroupType
url: HttpUrl


@field_validator("mitre_deprecated", mode="before")
def standardize_mitre_deprecated(cls, mitre_deprecated:bool | None) -> bool:
'''
For some reason, the API will return either a bool for mitre_deprecated OR
None. We simplify our typing by converting None to False, and assuming that
if deprecated is None, then the group is not deprecated.
'''
if mitre_deprecated is None:
return False
return mitre_deprecated

@field_validator("contributors", mode="before")
def standardize_contributors(cls, contributors:list[str] | None) -> list[str]:
'''
For some reason, the API will return either a list of strings for contributors OR
None. We simplify our typing by converting None to an empty list.
'''
if contributors is None:
return []
return contributors

class MitreAttackEnrichment(BaseModel):
ConfigDict(use_enum_values=True)
mitre_attack_id: Annotated[str, Field(pattern=r"^T\d{4}(.\d{3})?$")] = Field(...)
mitre_attack_technique: str = Field(...)
mitre_attack_tactics: List[MitreTactics] = Field(...)
mitre_attack_groups: List[str] = Field(...)

#Exclude this field from serialization - it is very large and not useful in JSON objects
mitre_attack_group_objects: list[MitreAttackGroup] = Field(..., exclude=True)
def __hash__(self) -> int:
return id(self)

14 changes: 7 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ contentctl = 'contentctl.contentctl:main'

[tool.poetry.dependencies]
python = "^3.11"
pydantic = "^2.7.1"
PyYAML = "^6.0.1"
requests = "~2.32.2"
pydantic = "^2.8.2"
PyYAML = "^6.0.2"
requests = "~2.32.3"
pycvesearch = "^1.2"
xmltodict = "^0.13.0"
attackcti = ">=0.3.7,<0.5.0"
attackcti = "^0.4.0"
Jinja2 = "^3.1.4"
questionary = "^2.0.1"
docker = "^7.1.0"
splunk-sdk = "^2.0.1"
splunk-sdk = "^2.0.2"
semantic-version = "^2.10.0"
bottle = "^0.12.25"
tqdm = "^4.66.4"
pygit2 = "^1.14.1"
tqdm = "^4.66.5"
pygit2 = "^1.15.1"
tyro = "^0.8.3"
gitpython = "^3.1.43"
setuptools = ">=69.5.1,<74.0.0"
Expand Down

0 comments on commit 68e4102

Please sign in to comment.