diff --git a/common.py b/common.py index b3a5ab8..7b3e80a 100644 --- a/common.py +++ b/common.py @@ -1,4 +1,5 @@ import logging +import os from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Tuple, Optional, Any @@ -137,7 +138,23 @@ def _echo(self, message: str, level: int = logging.DEBUG): """ log_echo(message, self.log, level, use_tqdm=self._tqdm_echo) -def sigma_translation(product: str, sigma_rules: list) -> dict: +def sigma_translation(product: str, sigma_rules: list, pq: bool = False) -> dict: + """ + Translates a list of sigma rules into the target product language + + Parameters + ---------- + product : str + Name of target product + sigma_rules : list + List of files containing sigma rules or YML-formatted strings + Does not support a mixed list of files and strings + pq : bool + Only used for SentinelOne translations (default is False) + If true, translates into PowerQuery syntax + Otherwise, uses DeepVisibility + """ + supports_json_ouput = True try: @@ -158,16 +175,34 @@ def sigma_translation(product: str, sigma_rules: list) -> dict: backend = CarbonBlackBackend(cb_pipeline()) elif product == 's1': - plugins.get_plugin_by_id('sentinelone').install() - from sigma.backends.sentinelone import SentinelOneBackend # type: ignore - backend = SentinelOneBackend() + if pq: + plugins.get_plugin_by_id('sentinelone-pq').install() + from sigma.backends.sentinelone_pq import SentinelOnePQBackend # type: ignore + backend = SentinelOnePQBackend() + else: + plugins.get_plugin_by_id('sentinelone').install() + from sigma.backends.sentinelone import SentinelOneBackend # type: ignore + backend = SentinelOneBackend() elif product == 'dfe': supports_json_ouput = False plugins.get_plugin_by_id('microsoft365defender').install() from sigma.backends.microsoft365defender import Microsoft365DefenderBackend # type: ignore backend = Microsoft365DefenderBackend() + elif product == 'cortex': + plugins.get_plugin_by_id('cortexxdr').install() + from sigma.backends.cortexxdr import CortexXDRBackend # type: ignore + backend = CortexXDRBackend() + + are_files = [os.path.isfile(i) for i in sigma_rules] + + if all(are_files): # if all items in the list are files + rule_collection = SigmaCollection.load_ruleset(sigma_rules) + elif not any(are_files): # if none of the items in the list are files, assume YML formatted strings + rule_collection = SigmaCollection.merge([SigmaCollection.from_yaml(i) for i in sigma_rules]) + else: + logging.error("There appears to be a mix of files and YML strings. Cannot process a mixed list of values. Aborting.") + return {'queries': []} - rule_collection = SigmaCollection.load_ruleset(sigma_rules) if supports_json_ouput: return backend.convert(rule_collection, "json") else: diff --git a/definitions/remote-admin.json b/definitions/remote-admin.json index dda81e1..fc611a3 100644 --- a/definitions/remote-admin.json +++ b/definitions/remote-admin.json @@ -183,7 +183,8 @@ "domain": ["*.level.io"], "digsig_publisher": ["Level Software, Inc."], "process_name": ["level-windows-amd64.exe", - "level.exe"] + "level.exe", + "level-remote-control-ffmpeg.exe"] }, "FixMe": { "domain": ["fixme.it"], diff --git a/surveyor.py b/surveyor.py index 022650c..5cbcd29 100644 --- a/surveyor.py +++ b/surveyor.py @@ -236,12 +236,6 @@ def survey(ctx, product_str: str = 'cbr') -> None: if opt.days and opt.minutes: ctx.fail('--days and --minutes are mutually exclusive') - if (opt.sigma_rule or opt.sigma_dir) and product_str == 'cortex': - ctx.fail('Neither --sigmarule nor --sigmadir are supported by product "cortex"') - - if (opt.sigma_rule or opt.sigma_dir) and product_str == 's1' and opt.product_args['pq']: - ctx.fail('Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery') - if opt.sigma_rule and not os.path.isfile(opt.sigma_rule): ctx.fail('Supplied --sigmarule is not a file') @@ -428,7 +422,8 @@ def survey(ctx, product_str: str = 'cbr') -> None: # if there's sigma rules to be processed if len(sigma_rules) > 0: - translated_rules = sigma_translation(product_str, sigma_rules) + pq_check = True if 'pq' in opt.product_args and opt.product_args['pq'] else False + translated_rules = sigma_translation(product_str, sigma_rules, pq_check) if len(translated_rules['queries']) != len(sigma_rules): log.warning(f"Only {len(translated_rules['queries'])} out of {len(sigma_rules)} were able to be translated.") for rule in tqdm(translated_rules['queries'], desc="Processing sigma rules", disable=opt.no_progress): diff --git a/tests/test_surveyor.py b/tests/test_surveyor.py index e53ee3c..fe49803 100644 --- a/tests/test_surveyor.py +++ b/tests/test_surveyor.py @@ -426,51 +426,3 @@ def test_sigma_dir_with_base_query(runner, mocker): mocker.call(Tag('Test sigma rule 2 - 15ecb82d-b7c0-4e53-9bf3-deedb4c9908c', 'Sigma Rule'), {"query":["process_name:powershell.exe"]}, {'username':'admin', 'hostname':'workstation1','days':5 })] assert "Processing sigma rules" in result.output mocked_nested_process_search.assert_has_calls(expected_calls, any_order=True) - - -def test_sigma_rule_with_cortex(runner, mocker): - mocker.patch('products.vmware_cb_response.CbResponse._authenticate') - with runner.isolated_filesystem() as temp_dir: - cred_file = os.path.join(temp_dir, "test.ini") - with open(cred_file, 'w') as cred_file_output: - cred_file_output.write("testing123") - - result = runner.invoke(cli, ['--sigmarule', 'test.yml', 'cortex', '--creds', cred_file]) - assert 'Neither --sigmarule nor --sigmadir are supported by product "cortex"' in result.output - assert result.exit_code != 0 - - -def test_sigma_dir_with_cortex(runner, mocker): - mocker.patch('products.vmware_cb_response.CbResponse._authenticate') - with runner.isolated_filesystem() as temp_dir: - cred_file = os.path.join(temp_dir, "test.ini") - with open(cred_file, 'w') as cred_file_output: - cred_file_output.write("testing123") - - result = runner.invoke(cli, ['--sigmadir', './sigma_dir', 'cortex', '--creds', cred_file]) - assert 'Neither --sigmarule nor --sigmadir are supported by product "cortex"' in result.output - assert result.exit_code != 0 - - -def test_sigma_rule_with_s1_pq(runner, mocker): - mocker.patch('products.vmware_cb_response.CbResponse._authenticate') - with runner.isolated_filesystem() as temp_dir: - cred_file = os.path.join(temp_dir, "test.ini") - with open(cred_file, 'w') as cred_file_output: - cred_file_output.write("testing123") - - result = runner.invoke(cli, ['--sigmarule', 'test.yml', 's1', '--creds', cred_file]) - assert 'Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery' in result.output - assert result.exit_code != 0 - - -def test_sigma_dir_with_s1_pq(runner, mocker): - mocker.patch('products.vmware_cb_response.CbResponse._authenticate') - with runner.isolated_filesystem() as temp_dir: - cred_file = os.path.join(temp_dir, "test.ini") - with open(cred_file, 'w') as cred_file_output: - cred_file_output.write("testing123") - - result = runner.invoke(cli, ['--sigmadir', './sigma_dir', 's1', '--creds', cred_file]) - assert 'Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery' in result.output - assert result.exit_code != 0 \ No newline at end of file