Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/pip/tqdm-approx-eq-4.66.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jholtmann authored Dec 18, 2023
2 parents c41d616 + 6f232de commit e716146
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 61 deletions.
45 changes: 40 additions & 5 deletions common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Tuple, Optional, Any
Expand Down Expand Up @@ -137,7 +138,23 @@ def _echo(self, message: str, level: int = logging.DEBUG):
"""
log_echo(message, self.log, level, use_tqdm=self._tqdm_echo)

def sigma_translation(product: str, sigma_rules: list) -> dict:
def sigma_translation(product: str, sigma_rules: list, pq: bool = False) -> dict:
"""
Translates a list of sigma rules into the target product language
Parameters
----------
product : str
Name of target product
sigma_rules : list
List of files containing sigma rules or YML-formatted strings
Does not support a mixed list of files and strings
pq : bool
Only used for SentinelOne translations (default is False)
If true, translates into PowerQuery syntax
Otherwise, uses DeepVisibility
"""

supports_json_ouput = True

try:
Expand All @@ -158,16 +175,34 @@ def sigma_translation(product: str, sigma_rules: list) -> dict:

backend = CarbonBlackBackend(cb_pipeline())
elif product == 's1':
plugins.get_plugin_by_id('sentinelone').install()
from sigma.backends.sentinelone import SentinelOneBackend # type: ignore
backend = SentinelOneBackend()
if pq:
plugins.get_plugin_by_id('sentinelone-pq').install()
from sigma.backends.sentinelone_pq import SentinelOnePQBackend # type: ignore
backend = SentinelOnePQBackend()
else:
plugins.get_plugin_by_id('sentinelone').install()
from sigma.backends.sentinelone import SentinelOneBackend # type: ignore
backend = SentinelOneBackend()
elif product == 'dfe':
supports_json_ouput = False
plugins.get_plugin_by_id('microsoft365defender').install()
from sigma.backends.microsoft365defender import Microsoft365DefenderBackend # type: ignore
backend = Microsoft365DefenderBackend()
elif product == 'cortex':
plugins.get_plugin_by_id('cortexxdr').install()
from sigma.backends.cortexxdr import CortexXDRBackend # type: ignore
backend = CortexXDRBackend()

are_files = [os.path.isfile(i) for i in sigma_rules]

if all(are_files): # if all items in the list are files
rule_collection = SigmaCollection.load_ruleset(sigma_rules)
elif not any(are_files): # if none of the items in the list are files, assume YML formatted strings
rule_collection = SigmaCollection.merge([SigmaCollection.from_yaml(i) for i in sigma_rules])
else:
logging.error("There appears to be a mix of files and YML strings. Cannot process a mixed list of values. Aborting.")
return {'queries': []}

rule_collection = SigmaCollection.load_ruleset(sigma_rules)
if supports_json_ouput:
return backend.convert(rule_collection, "json")
else:
Expand Down
3 changes: 2 additions & 1 deletion definitions/remote-admin.json
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@
"domain": ["*.level.io"],
"digsig_publisher": ["Level Software, Inc."],
"process_name": ["level-windows-amd64.exe",
"level.exe"]
"level.exe",
"level-remote-control-ffmpeg.exe"]
},
"FixMe": {
"domain": ["fixme.it"],
Expand Down
9 changes: 2 additions & 7 deletions surveyor.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,6 @@ def survey(ctx, product_str: str = 'cbr') -> None:
if opt.days and opt.minutes:
ctx.fail('--days and --minutes are mutually exclusive')

if (opt.sigma_rule or opt.sigma_dir) and product_str == 'cortex':
ctx.fail('Neither --sigmarule nor --sigmadir are supported by product "cortex"')

if (opt.sigma_rule or opt.sigma_dir) and product_str == 's1' and opt.product_args['pq']:
ctx.fail('Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery')

if opt.sigma_rule and not os.path.isfile(opt.sigma_rule):
ctx.fail('Supplied --sigmarule is not a file')

Expand Down Expand Up @@ -428,7 +422,8 @@ def survey(ctx, product_str: str = 'cbr') -> None:

# if there's sigma rules to be processed
if len(sigma_rules) > 0:
translated_rules = sigma_translation(product_str, sigma_rules)
pq_check = True if 'pq' in opt.product_args and opt.product_args['pq'] else False
translated_rules = sigma_translation(product_str, sigma_rules, pq_check)
if len(translated_rules['queries']) != len(sigma_rules):
log.warning(f"Only {len(translated_rules['queries'])} out of {len(sigma_rules)} were able to be translated.")
for rule in tqdm(translated_rules['queries'], desc="Processing sigma rules", disable=opt.no_progress):
Expand Down
48 changes: 0 additions & 48 deletions tests/test_surveyor.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,51 +426,3 @@ def test_sigma_dir_with_base_query(runner, mocker):
mocker.call(Tag('Test sigma rule 2 - 15ecb82d-b7c0-4e53-9bf3-deedb4c9908c', 'Sigma Rule'), {"query":["process_name:powershell.exe"]}, {'username':'admin', 'hostname':'workstation1','days':5 })]
assert "Processing sigma rules" in result.output
mocked_nested_process_search.assert_has_calls(expected_calls, any_order=True)


def test_sigma_rule_with_cortex(runner, mocker):
mocker.patch('products.vmware_cb_response.CbResponse._authenticate')
with runner.isolated_filesystem() as temp_dir:
cred_file = os.path.join(temp_dir, "test.ini")
with open(cred_file, 'w') as cred_file_output:
cred_file_output.write("testing123")

result = runner.invoke(cli, ['--sigmarule', 'test.yml', 'cortex', '--creds', cred_file])
assert 'Neither --sigmarule nor --sigmadir are supported by product "cortex"' in result.output
assert result.exit_code != 0


def test_sigma_dir_with_cortex(runner, mocker):
mocker.patch('products.vmware_cb_response.CbResponse._authenticate')
with runner.isolated_filesystem() as temp_dir:
cred_file = os.path.join(temp_dir, "test.ini")
with open(cred_file, 'w') as cred_file_output:
cred_file_output.write("testing123")

result = runner.invoke(cli, ['--sigmadir', './sigma_dir', 'cortex', '--creds', cred_file])
assert 'Neither --sigmarule nor --sigmadir are supported by product "cortex"' in result.output
assert result.exit_code != 0


def test_sigma_rule_with_s1_pq(runner, mocker):
mocker.patch('products.vmware_cb_response.CbResponse._authenticate')
with runner.isolated_filesystem() as temp_dir:
cred_file = os.path.join(temp_dir, "test.ini")
with open(cred_file, 'w') as cred_file_output:
cred_file_output.write("testing123")

result = runner.invoke(cli, ['--sigmarule', 'test.yml', 's1', '--creds', cred_file])
assert 'Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery' in result.output
assert result.exit_code != 0


def test_sigma_dir_with_s1_pq(runner, mocker):
mocker.patch('products.vmware_cb_response.CbResponse._authenticate')
with runner.isolated_filesystem() as temp_dir:
cred_file = os.path.join(temp_dir, "test.ini")
with open(cred_file, 'w') as cred_file_output:
cred_file_output.write("testing123")

result = runner.invoke(cli, ['--sigmadir', './sigma_dir', 's1', '--creds', cred_file])
assert 'Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery' in result.output
assert result.exit_code != 0

0 comments on commit e716146

Please sign in to comment.