From 49282524aff874b1bd827154d3606a388254ea81 Mon Sep 17 00:00:00 2001
From: bretfourbe
Date: Tue, 26 Sep 2023 10:36:46 +0200
Subject: [PATCH] add tests for module max-attack-time

---
 tests/attack/test_mod_buster.py  | 87 ++++++++++++++++++++++++++++++++
 tests/attack/test_mod_exec.py    | 56 ++++++++++++++++++++
 tests/attack/test_mod_file.py    | 39 ++++++++++++++
 tests/attack/test_mod_nikto.py   | 76 ++++++++++++++++++++++++++++
 tests/attack/test_mod_timesql.py | 40 +++++++++++++++
 5 files changed, 298 insertions(+)

diff --git a/tests/attack/test_mod_buster.py b/tests/attack/test_mod_buster.py
index 08d9dc19f..34af0277d 100644
--- a/tests/attack/test_mod_buster.py
+++ b/tests/attack/test_mod_buster.py
@@ -54,3 +54,90 @@ async def test_whole_stuff():
     assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
     assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]
     assert "http://perdu.com/admin/authconfig.php" in persister.add_payload.call_args_list[2][1]["info"]
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_max_attack_time_5():
+    # Test attacking with max_attack_time limitation
+    respx.get("http://perdu.com/").mock(return_value=httpx.Response(200, text="Default page"))
+    respx.get("http://perdu.com/admin").mock(
+        return_value=httpx.Response(301, text="Hello there", headers={"Location": "/admin/"})
+    )
+    respx.get("http://perdu.com/admin/").mock(return_value=httpx.Response(200, text="Hello there"))
+    respx.get("http://perdu.com/config.inc").mock(return_value=httpx.Response(200, text="pass = 123456"))
+    respx.get("http://perdu.com/admin/authconfig.php").mock(return_value=httpx.Response(200, text="Hello there"))
+    respx.get(url__regex=r"http://perdu\.com/.*").mock(return_value=httpx.Response(404))
+
+    persister = AsyncMock()
+
+    request = Request("http://perdu.com/")
+    request.path_id = 1
+    # Buster module will get requests from the persister
+    persister.get_links = AsyncIterator([(request, None)])
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 5}
+
+        files = {
+            "wordlist.txt": "nawak\nadmin\n"
+            + "nawak\n" * 2000 + "config.inc\n"
+            + "nawak\n" * 2000 + "authconfig.php",
+        }
+        with mock.patch("builtins.open", get_mock_open(files)):
+            module = ModuleBuster(crawler, persister, options, Event(), crawler_configuration)
+            module.DATA_DIR = ""
+            module.PATHS_FILE = "wordlist.txt"
+            module.do_get = True
+            await module.attack(request)
+
+            assert module.known_dirs == ["http://perdu.com/", "http://perdu.com/admin/"]
+            assert module.known_pages == ["http://perdu.com/config.inc"]
+            assert persister.add_payload.call_count == 2
+            assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
+            assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_max_attack_time_10():
+    # Test attacking with max_attack_time limitation
+    respx.get("http://perdu.com/").mock(return_value=httpx.Response(200, text="Default page"))
+    respx.get("http://perdu.com/admin").mock(
+        return_value=httpx.Response(301, text="Hello there", headers={"Location": "/admin/"})
+    )
+    respx.get("http://perdu.com/admin/").mock(return_value=httpx.Response(200, text="Hello there"))
+    respx.get("http://perdu.com/config.inc").mock(return_value=httpx.Response(200, text="pass = 123456"))
+    respx.get("http://perdu.com/admin/authconfig.php").mock(return_value=httpx.Response(200, text="Hello there"))
+    respx.get(url__regex=r"http://perdu\.com/.*").mock(return_value=httpx.Response(404))
+
+    persister = AsyncMock()
+
+    request = Request("http://perdu.com/")
+    request.path_id = 1
+    # Buster module will get requests from the persister
+    persister.get_links = AsyncIterator([(request, None)])
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 10}
+
+        files = {
+            "wordlist.txt": "nawak\nadmin\n"
+            + "nawak\n" * 2000 + "config.inc\n"
+            + "nawak\n" * 2000 + "authconfig.php",
+        }
+        with mock.patch("builtins.open", get_mock_open(files)):
+            module = ModuleBuster(crawler, persister, options, Event(), crawler_configuration)
+            module.DATA_DIR = ""
+            module.PATHS_FILE = "wordlist.txt"
+            module.do_get = True
+            await module.attack(request)
+
+            assert module.known_dirs == ["http://perdu.com/", "http://perdu.com/admin/"]
+            assert module.known_pages == ["http://perdu.com/config.inc", "http://perdu.com/admin/authconfig.php"]
+            assert persister.add_payload.call_count == 3
+            assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
+            assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]
+            assert "http://perdu.com/admin/authconfig.php" in persister.add_payload.call_args_list[2][1]["info"]
diff --git a/tests/attack/test_mod_exec.py b/tests/attack/test_mod_exec.py
index 126614bac..662d8098e 100644
--- a/tests/attack/test_mod_exec.py
+++ b/tests/attack/test_mod_exec.py
@@ -130,3 +130,59 @@ def timeout_callback(http_request):
     # then 3 requests for the sleep payload (first then two retries to check random lags)
     # then 1 request to check state of original request
     assert respx.calls.call_count == payloads_until_sleep + 3 + 1
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_max_attack_time_5():
+    respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*env.*").mock(
+        return_value=httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")
+    )
+
+    respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*").mock(
+        return_value=httpx.Response(200, text="Hello there")
+    )
+
+    persister = AsyncMock()
+
+    request = Request("http://perdu.com/?" + "test=test&" * 80 + "vuln=hello")
+    request.path_id = 1
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 1, "max_attack_time": 5}
+
+        module = ModuleExec(crawler, persister, options, Event(), crawler_configuration)
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 0
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_max_attack_time_10():
+    respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*env.*").mock(
+        return_value=httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")
+    )
+
+    respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*").mock(
+        return_value=httpx.Response(200, text="Hello there")
+    )
+
+    persister = AsyncMock()
+
+    request = Request("http://perdu.com/?" + "test=test&" * 80 + "vuln=hello")
+    request.path_id = 1
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 1, "max_attack_time": 10}
+
+        module = ModuleExec(crawler, persister, options, Event(), crawler_configuration)
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 1
+        assert persister.add_payload.call_args_list[0][1]["module"] == "exec"
+        assert persister.add_payload.call_args_list[0][1]["category"] == "Command execution"
+        assert persister.add_payload.call_args_list[0][1]["request"].get_params == \
+            [["test", "test"]] * 80 + [["vuln", ";env;"]]
diff --git a/tests/attack/test_mod_file.py b/tests/attack/test_mod_file.py
index 45bcd79ea..ac03d21f4 100644
--- a/tests/attack/test_mod_file.py
+++ b/tests/attack/test_mod_file.py
@@ -66,6 +66,45 @@ async def test_loknop_lfi_to_rce():
     )
 
 
+@pytest.mark.asyncio
+async def test_max_attack_time_5():
+    # https://gist.github.com/loknop/b27422d355ea1fd0d90d6dbc1e278d4d
+    persister = AsyncMock()
+    request = Request("http://127.0.0.1:65085/inclusion.php?" + "yolo=nawak&" * 30 + "f=toto")
+    request.path_id = 42
+
+    crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65085/"))
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2, "max_attack_time": 5}
+
+        module = ModuleFile(crawler, persister, options, Event(), crawler_configuration)
+        module.do_post = False
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 0
+
+
+@pytest.mark.asyncio
+async def test_max_attack_time_10():
+    # https://gist.github.com/loknop/b27422d355ea1fd0d90d6dbc1e278d4d
+    persister = AsyncMock()
+    request = Request("http://127.0.0.1:65085/inclusion.php?" + "yolo=nawak&" * 30 + "f=toto")
+    request.path_id = 42
+
+    crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65085/"))
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2, "max_attack_time": 10}
+
+        module = ModuleFile(crawler, persister, options, Event(), crawler_configuration)
+        module.do_post = False
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 1
+        assert persister.add_payload.call_args_list[0][1]["module"] == "file"
+        assert persister.add_payload.call_args_list[0][1]["category"] == "Path Traversal"
+        assert ["f", "/etc/services"] in persister.add_payload.call_args_list[0][1]["request"].get_params
+
+
 @pytest.mark.asyncio
 async def test_warning_false_positive():
     persister = AsyncMock()
diff --git a/tests/attack/test_mod_nikto.py b/tests/attack/test_mod_nikto.py
index ea83ad788..33366c5c2 100644
--- a/tests/attack/test_mod_nikto.py
+++ b/tests/attack/test_mod_nikto.py
@@ -55,6 +55,82 @@ async def test_whole_stuff():
     ) in persister.add_payload.call_args_list[0][1]["info"]
 
 
+@pytest.mark.asyncio
+@respx.mock
+async def test_max_attack_time_5():
+    # Test attacking with max_attack_time limitation
+    respx.route(host="raw.githubusercontent.com").pass_through()
+
+    respx.get("http://perdu.com/README.md").mock(
+        return_value=httpx.Response(200, text="root:0:0:")
+    )
+
+    respx.route(host="perdu.com").mock(
+        return_value=httpx.Response(404, text="Not found")
+    )
+
+    persister = AsyncMock()
+    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
+    base_dir = os.path.join(home_dir, ".wapiti")
+    persister.CONFIG_DIR = os.path.join(base_dir, "config")
+
+    request = Request("http://perdu.com/")
+    request.path_id = 1
+    persister.get_links.return_value = chain([request])
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 5}
+
+        module = ModuleNikto(crawler, persister, options, Event(), crawler_configuration)
+        module.do_get = True
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 0
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_max_attack_time_10():
+    # Test attacking all kind of parameter without crashing
+    respx.route(host="raw.githubusercontent.com").pass_through()
+
+    respx.get("http://perdu.com/README.md").mock(
+        return_value=httpx.Response(200, text="root:0:0:")
+    )
+
+    respx.route(host="perdu.com").mock(
+        return_value=httpx.Response(404, text="Not found")
+    )
+
+    persister = AsyncMock()
+    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
+    base_dir = os.path.join(home_dir, ".wapiti")
+    persister.CONFIG_DIR = os.path.join(base_dir, "config")
+
+    request = Request("http://perdu.com/")
+    request.path_id = 1
+    persister.get_links.return_value = chain([request])
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 10}
+
+        module = ModuleNikto(crawler, persister, options, Event(), crawler_configuration)
+        module.do_get = True
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 1
+        assert persister.add_payload.call_args_list[0][1]["module"] == "nikto"
+        assert persister.add_payload.call_args_list[0][1]["category"] == "Potentially dangerous file"
+        assert persister.add_payload.call_args_list[0][1]["request"].url == (
+            "http://perdu.com/README.md"
+        )
+        assert (
+            "Readme Found"
+        ) in persister.add_payload.call_args_list[0][1]["info"]
+
+
 @pytest.mark.asyncio
 @respx.mock
 async def test_false_positives():
diff --git a/tests/attack/test_mod_timesql.py b/tests/attack/test_mod_timesql.py
index ca0d95aef..fc4835892 100644
--- a/tests/attack/test_mod_timesql.py
+++ b/tests/attack/test_mod_timesql.py
@@ -52,6 +52,46 @@ async def test_timesql_detection():
     ]
 
 
+@pytest.mark.asyncio
+async def test_max_attack_time_5():
+    persister = AsyncMock()
+    request = Request("http://127.0.0.1:65082/blind_sql.php?" + "foo=bar&" * 10 + "vuln1=hello%20there")
+    request.path_id = 42
+    crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65082/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        # Time out is set to 0 because in blind_sql.php we have a sleep(2) call
+        # and in the module we have ceil(attack_options.get("timeout", self.time_to_sleep)) + 1
+        options = {"timeout": 0, "level": 1, "max_attack_time": 5}
+
+        module = ModuleTimesql(crawler, persister, options, Event(), crawler_configuration)
+        module.do_post = False
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 0
+
+
+@pytest.mark.asyncio
+async def test_max_attack_time_10():
+    persister = AsyncMock()
+    request = Request("http://127.0.0.1:65082/blind_sql.php?" + "foo=bar&" * 10 + "vuln1=hello%20there")
+    request.path_id = 42
+    crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65082/"), timeout=1)
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        # Time out is set to 0 because in blind_sql.php we have a sleep(2) call
+        # and in the module we have ceil(attack_options.get("timeout", self.time_to_sleep)) + 1
+        options = {"timeout": 0, "level": 1, "max_attack_time": 10}
+
+        module = ModuleTimesql(crawler, persister, options, Event(), crawler_configuration)
+        module.do_post = False
+        await module.attack(request)
+
+        assert persister.add_payload.call_count == 1
+        assert persister.add_payload.call_args_list[0][1]["module"] == "timesql"
+        assert persister.add_payload.call_args_list[0][1]["category"] == "SQL Injection"
+        assert persister.add_payload.call_args_list[0][1]["request"].get_params == \
+            [['foo', 'bar']] * 10 + [['vuln1', 'sleep(1)#1']]
+
+
 @pytest.mark.asyncio
 async def test_timesql_false_positive():
     persister = AsyncMock()