Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Adding version enforcement #280

Merged
merged 19 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0f4ea67
Adding version enforcement
cmcginley-splunk Sep 10, 2024
001a309
updating comments
cmcginley-splunk Sep 10, 2024
651978f
Merge branch 'main' into feature/version-bumping-enforcement
cmcginley-splunk Sep 10, 2024
63530a5
annotating issue number
cmcginley-splunk Sep 10, 2024
f0e71e9
suggested README updates
cmcginley-splunk Sep 10, 2024
6dd0a4b
adding bool flag to enable/suppress metadata validation
cmcginley-splunk Sep 10, 2024
87d128a
adding validator forcing enrichments for inspect action
cmcginley-splunk Sep 10, 2024
890a239
changing download functionality to use actual file's name
cmcginley-splunk Sep 11, 2024
6826188
typo
cmcginley-splunk Sep 11, 2024
249bf27
Added DetectionMetadata class; some docstring changes
cmcginley-splunk Sep 11, 2024
dac23df
uncommenting appinspect actions
cmcginley-splunk Sep 11, 2024
89e2473
adding alternate description to inspect config for enrichments
cmcginley-splunk Sep 11, 2024
1be4d87
updating description so it pulls from the base class description
cmcginley-splunk Sep 12, 2024
0a7db93
allowing for new fields in DetectionMetadata
cmcginley-splunk Sep 12, 2024
dfbfc52
adding comments about ESCU version compatibility
cmcginley-splunk Sep 12, 2024
4cf3628
another commment about versions
cmcginley-splunk Sep 12, 2024
f00e886
Merge branch 'main' into feature/version-bumping-enforcement
cmcginley-splunk Sep 16, 2024
44cbee7
removing hardcoded references to ESCU and the ESCU app UID
cmcginley-splunk Sep 16, 2024
094b4bb
Merge branch 'main' into feature/version-bumping-enforcement
pyth0n1c Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ This section is under active development. It will allow you to a [MITRE Map](ht
Choose TYPE {detection, story} to create new content for the Content Pack. The tool will interactively ask a series of questions required for generating a basic piece of content and automatically add it to the Content Pack.

### contentctl inspect
This section is under development. It will enable the user to perform an appinspect of the content pack in preparation for deployment onto a Splunk Instance or via Splunk Cloud.
This section is under development. The inspect action performs a number post-build validations. Primarily, it will enable the user to perform an appinspect of the content pack in preparation for deployment onto a Splunk Instance or via Splunk Cloud. It also compares detections in the new build against a prior build, confirming that any changed detections have had their versions incremented (this comparison happens at the savedsearch.conf level, which is why it must happen after the build).
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved

### contentctl deploy
The reason to build content is so that it can be deployed to your environment. However, deploying content to multiple servers and different types of infrastructure can be tricky and time-consuming. contentctl makes this easy by supporting a number of different deployment mechanisms. Deployment targets can be defined in [contentctl.yml](/contentctl/templates/contentctl_default.yml).
Expand Down
271 changes: 180 additions & 91 deletions contentctl/actions/inspect.py

Large diffs are not rendered by default.

98 changes: 88 additions & 10 deletions contentctl/helper/splunk_app.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import os
import time
import json
from typing import Optional, Collection
from pathlib import Path
import xml.etree.ElementTree as ET
from typing import List, Tuple, Optional
from urllib.parse import urlencode

import requests
import urllib3
import xmltodict
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from urllib3.util.retry import Retry

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

MAX_RETRY = 3


class APIEndPoint:
"""
Class which contains Static Endpoint
Expand All @@ -27,6 +27,7 @@ class APIEndPoint:
SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"
SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"


class RetryConstant:
"""
Class which contains Retry Constant
Expand All @@ -53,11 +54,11 @@ class InitializationError(Exception):

@staticmethod
def requests_retry_session(
retries=RetryConstant.RETRY_COUNT,
backoff_factor=1,
status_forcelist=(500, 502, 503, 504),
session=None,
):
retries: int = RetryConstant.RETRY_COUNT,
backoff_factor: int = 1,
status_forcelist: Collection[int] = (500, 502, 503, 504),
session: requests.Session | None = None,
) -> requests.Session:
session = session or requests.Session()
retry = Retry(
total=retries,
Expand Down Expand Up @@ -260,4 +261,81 @@ def set_latest_version_info(self) -> None:

# parse out the version number and fetch the download URL
self.latest_version = info_url.split("/")[-1]
self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)
self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)

def __get_splunk_base_session_token(self, username: str, password: str) -> str:
"""
This method will generate Splunk base session token
:return: Splunk base session token
"""
# Data payload for fetch splunk base session token
payload = urlencode(
{
"username": username,
"password": password,
}
)

headers = {
"content-type": "application/x-www-form-urlencoded",
"cache-control": "no-cache",
}

response = requests.request(
"POST",
APIEndPoint.SPLUNK_BASE_AUTH_URL,
data=payload,
headers=headers,
)

token_value = ""

if response.status_code != 200:
msg = (
f"Error occurred while executing the rest call for splunk base authentication api,"
f"{response.content}"
)
raise Exception(msg)
else:
root = ET.fromstring(response.content)
token_value = root.find("{http://www.w3.org/2005/Atom}id").text.strip()
return token_value

def download(self, out: Path, username: str, password: str, overwrite: bool = False) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmcginley-splunk , I am EXTREMELY tempted to remove the HTTP URLs for all of our apps and use this function to download them from Splunkbase on-demand. This makes it WAY easy to bump versions as we don't need to mirror a specific version of the app to S3.

The implementation for this would look something like:

  1. http_path None and local_path None? Download from splunkbase
    a. specific version set? Use that version
    b. specific version not set? Use latest version.
  2. http_path not None or local_path not None? Use the path

This is likely an enhancement and out of scope for this PR.
Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolving this as a possible enhancement: #284

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think consolidating under the SplunkApp umbrella is a great idea!

"""
Given an output path, download the app to the specified location
:param out: the Path to download the app to
:param username: Splunkbase username
:param password: Splunkbase password
"""
# Ensure the output path is not already occupied
if out.exists() and not overwrite:
msg = (
f"File already exists at {out}, cannot download the app."
)
raise Exception(msg)

# Make any parent directories as needed
out.parent.mkdir(parents=True, exist_ok=True)

# Get the Splunkbase session token
token = self.__get_splunk_base_session_token(username, password)
response = requests.request(
"GET",
self.latest_version_download_url,
cookies={
"sessionid": token
}
)

# Check for HTTP errors
if response.status_code != 200:
msg = (
f"Error occurred while executing the rest call for splunk base authentication api,"
f"{response.content}"
)
raise Exception(msg)

# Write the app to disk
with open(out, "wb") as file:
file.write(response.content)
68 changes: 55 additions & 13 deletions contentctl/objects/config.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
from __future__ import annotations

from os import environ
from datetime import datetime, UTC
from typing import Optional, Any, List, Union, Self
import random
from enum import StrEnum, auto
import pathlib
from urllib.parse import urlparse
from abc import ABC, abstractmethod
from functools import partialmethod

import tqdm
import semantic_version
from pydantic import (
BaseModel, Field, field_validator,
field_serializer, ConfigDict, DirectoryPath,
PositiveInt, FilePath, HttpUrl, AnyUrl, model_validator,
ValidationInfo
)

from contentctl.objects.constants import ESCU_APP_UID, LATEST_ESCU_DOWNLOAD_PATH
from contentctl.output.yml_writer import YmlWriter
from os import environ
from datetime import datetime, UTC
from typing import Optional,Any,Annotated,List,Union, Self
import semantic_version
import random
from enum import StrEnum, auto
import pathlib
from contentctl.helper.utils import Utils
from urllib.parse import urlparse
from abc import ABC, abstractmethod
from contentctl.objects.enums import PostTestBehavior, DetectionTestingMode
from contentctl.objects.detection import Detection
from contentctl.objects.annotated_types import APPID_TYPE
import tqdm
from functools import partialmethod
from contentctl.helper.splunk_app import SplunkApp

ENTERPRISE_SECURITY_UID = 263
COMMON_INFORMATION_MODEL_UID = 1621
Expand Down Expand Up @@ -249,11 +254,48 @@ class StackType(StrEnum):
classic = auto()
victoria = auto()


class inspect(build):
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
cmcginley-splunk marked this conversation as resolved.
Show resolved Hide resolved
splunk_api_username: str = Field(description="Splunk API username used for running appinspect.")
splunk_api_password: str = Field(exclude=True, description="Splunk API password used for running appinspect.")
splunk_api_username: str = Field(
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
description="Splunk API username used for appinspect and Splunkbase downloads."
)
splunk_api_password: str = Field(
exclude=True,
description="Splunk API password used for appinspect and Splunkbase downloads."
)
previous_build: str | None = Field(
default=None,
description=(
"Local path to the previous ESCU build for versioning enforcement (defaults to the "
"latest release published on Splunkbase)."
)
)
stack_type: StackType = Field(description="The type of your Splunk Cloud Stack")

def get_previous_package_file_path(self) -> pathlib.Path:
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
"""
Returns a Path object for the path to the prior package build. If no path was provided, the
latest version is downloaded from Splunkbase and it's filepath is returned, and saved to the
in-memory config (so download doesn't happen twice in the same run).
:returns: Path object to previous ESCU build
"""
previous_build_path = self.previous_build
# Download the previous build as the latest release on Splunkbase if no path was provided
if previous_build_path is None:
print("Downloading latest ESCU build from Splunkbase to serve as previous build during validation...")
app = SplunkApp(app_uid=ESCU_APP_UID)
app.download(
out=pathlib.Path(LATEST_ESCU_DOWNLOAD_PATH),
username=self.splunk_api_username,
password=self.splunk_api_password,
overwrite=True
)
previous_build_path = LATEST_ESCU_DOWNLOAD_PATH
print(f"Latest release downloaded from Splunkbase to: {previous_build_path}")
self.previous_build = previous_build_path
return pathlib.Path(previous_build_path)


class NewContentType(StrEnum):
detection = auto()
story = auto()
Expand Down
11 changes: 10 additions & 1 deletion contentctl/objects/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,13 @@
RBA_OBSERVABLE_ROLE_MAPPING = {
"Attacker": 0,
"Victim": 1
}
}

# The app UID in Splunkbase of ESCU
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
ESCU_APP_UID = 3449

# The relative path to the directory where any apps/packages will be downloaded
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
DOWNLOADS_DIRECTORY = "downloads"

# The default download path for ESCU
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
LATEST_ESCU_DOWNLOAD_PATH = f"{DOWNLOADS_DIRECTORY}/escu_latest.tgz"
115 changes: 115 additions & 0 deletions contentctl/objects/detection_stanza.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import uuid
from typing import Any, ClassVar
import json
import hashlib

from pydantic import BaseModel, Field, PrivateAttr, computed_field


class DetectionStanza(BaseModel):
"""
A model representing a stanza for a detection in savedsearches.conf
"""
# The lines that comprise this stanza, in the order they appear in the conf
lines: list[str] = Field(...)

# The full name of the detection (e.g. "ESCU - My Detection - Rule")
name: str = Field(...)

# The metadata extracted from the stanza
_metadata: dict[str, Any] = PrivateAttr(default={})
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved

# The key prefix indicating the metadata attribute
METADATA_LINE_PREFIX: ClassVar[str] = "action.correlationsearch.metadata = "

def model_post_init(self, __context: Any) -> None:
super().model_post_init(__context)
self._parse_metadata()

def _parse_metadata(self) -> None:
"""
Using the provided lines, parse out the metadata
"""
# Set a variable to store the metadata line in
meta_line: str | None = None

# Iterate over the lines to look for the metadata line
for line in self.lines:
if line.startswith(DetectionStanza.METADATA_LINE_PREFIX):
# If we find a matching line more than once, we've hit an error
if meta_line is not None:
raise Exception(
f"Metadata for detection '{self.name}' found twice in stanza."
)
meta_line = line

# Report if we could not find the metadata line
if meta_line is None:
raise Exception(f"No metadata for detection '{self.name}' found in stanza.")

# Try to to load the metadat JSON into a dict
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
try:
self._metadata: dict[str, Any] = json.loads(meta_line[len(DetectionStanza.METADATA_LINE_PREFIX):])
except json.decoder.JSONDecodeError as e:
raise Exception(
f"Malformed metdata for detection '{self.name}': {e}"
)

@computed_field
@property
def deprecated(self) -> int:
"""
An int indicating whether the detection is deprecated
:returns: int
"""
return int(self._metadata["deprecated"])

@computed_field
@property
def detection_id(self) -> uuid.UUID:
"""
A UUID identifying the detection
:returns: UUID
"""
return uuid.UUID(self._metadata["detection_id"])

@computed_field
@property
def detection_version(self) -> int:
"""
The version of the detection
:returns: int
"""
return int(self._metadata["detection_version"])

@computed_field
@property
def publish_time(self) -> float:
"""
The time the detection was published
:returns: float
"""
return self._metadata["publish_time"]

@computed_field
@property
def hash(self) -> str:
"""
The SHA256 hash of the lines of the stanza, excluding the metadata line
:returns: str (hexdigest)
"""
hash = hashlib.sha256()
for line in self.lines:
if not line.startswith(DetectionStanza.METADATA_LINE_PREFIX):
pyth0n1c marked this conversation as resolved.
Show resolved Hide resolved
hash.update(line.encode("utf-8"))
return hash.hexdigest()

def version_should_be_bumped(self, previous: "DetectionStanza") -> bool:
"""
A helper method that compares this stanza against the same stanza from a previous build;
returns True if the version still needs to be bumped (e.g. the detection was changed but
the version was not), False otherwise.
:param previous: the previous build's DetectionStanza for comparison
:returns: True if the version still needs to be bumped
"""
return (self.hash != previous.hash) and (self.detection_version <= previous.detection_version)
Loading
Loading