Remove max-attack-time tests in modules
bretfourbe committed Nov 27, 2023
1 parent 7b228af commit 6d9c2af
Showing 8 changed files with 2 additions and 437 deletions.
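
Every deleted test follows the same pattern: the module under test receives a `max_attack_time` entry in its options and is expected to stop sending payloads once that budget is exhausted, so a 5-second run asserts fewer `persister.add_payload` calls than a 10-second one. As a rough sketch of the mechanism being exercised (the loop below is illustrative only, not Wapiti's actual implementation):

import asyncio
from time import monotonic

async def attack(payloads, max_attack_time: float) -> int:
    """Send payloads until the time budget is spent; return how many went out."""
    sent = 0
    start = monotonic()
    for _payload in payloads:
        if monotonic() - start > max_attack_time:
            break  # budget exhausted: the remaining payloads are skipped
        await asyncio.sleep(0.1)  # stand-in for one mocked HTTP exchange
        sent += 1
    return sent

# A 0.5s budget lets only ~5 of 100 payloads through, mirroring why the
# 5-second tests below expect fewer findings than their 10-second twins.
print(asyncio.run(attack(range(100), max_attack_time=0.5)))
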
43 changes: 0 additions & 43 deletions tests/attack/test_mod_buster.py
@@ -59,46 +59,3 @@ async def test_whole_stuff():
async def delayed_response():
    await sleep(15)
    return httpx.Response(200, text="Hello there")


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
    # Test attacking with max_attack_time limitation
    respx.get("http://perdu.com/").mock(return_value=httpx.Response(200, text="Default page"))
    respx.get("http://perdu.com/admin").mock(
        return_value=httpx.Response(301, text="Hello there", headers={"Location": "/admin/"})
    )
    respx.get("http://perdu.com/admin/").mock(return_value=httpx.Response(200, text="Hello there"))
    respx.get("http://perdu.com/config.inc").mock(return_value=httpx.Response(200, text="pass = 123456"))
    respx.get("http://perdu.com/admin/authconfig.php").mock(side_effect=delayed_response())
    respx.get(url__regex=r"http://perdu\.com/.*").mock(return_value=httpx.Response(404))

    persister = AsyncMock()

    request = Request("http://perdu.com/")
    request.path_id = 1
    # Buster module will get requests from the persister
    persister.get_links = AsyncIterator([(request, None)])

    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 2, "tasks": 1, "max_attack_time": 10}

        files = {
            "wordlist.txt": "nawak\nadmin\n" +
                            "nawak\n" * 1000 + "config.inc\n" +
                            "nawak\n" * 1000 + "authconfig.php",
        }
        with mock.patch("builtins.open", get_mock_open(files)):
            module = ModuleBuster(crawler, persister, options, Event(), crawler_configuration)
            module.DATA_DIR = ""
            module.PATHS_FILE = "wordlist.txt"
            module.do_get = True
            await module.attack(request)

        assert module.known_dirs == ["http://perdu.com/", "http://perdu.com/admin/"]
        assert module.known_pages == ["http://perdu.com/config.inc"]
        assert persister.add_payload.call_count == 2
        assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
        assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]
71 changes: 0 additions & 71 deletions tests/attack/test_mod_exec.py
@@ -139,74 +139,3 @@ def timeout_callback(http_request):
async def delayed_response():
    await sleep(6)
    return httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
    respx.get(url__regex=r"http://perdu\.com/\?vuln1=.*env.*&vuln2=there").mock(
        side_effect=delayed_response()
    )
    respx.get(url__regex=r"http://perdu\.com/\?vuln1=hello&vuln2=.*env.*").mock(
        return_value=httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")
    )

    respx.get(url__regex=r"http://perdu\.com/\?vuln1=.*&vuln2=.*").mock(
        return_value=httpx.Response(200, text="Hello there")
    )

    persister = AsyncMock()

    request = Request("http://perdu.com/?vuln1=hello&vuln2=there")
    request.path_id = 1

    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 1, "max_attack_time": 5}

        module = ModuleExec(crawler, persister, options, Event(), crawler_configuration)
        await module.attack(request)

        assert persister.add_payload.call_count == 1
        assert persister.add_payload.call_args_list[0][1]["module"] == "exec"
        assert persister.add_payload.call_args_list[0][1]["category"] == "Command execution"
        assert persister.add_payload.call_args_list[0][1]["request_id"] == 1
        assert persister.add_payload.call_args_list[0][1]["request"].get_params == \
            [["vuln1", ";env;"]] + [["vuln2", "there"]]


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
    respx.get(url__regex=r"http://perdu\.com/\?vuln1=.*env.*&vuln2=there").mock(
        side_effect=delayed_response()
    )
    respx.get(url__regex=r"http://perdu\.com/\?vuln1=hello&vuln2=.*env.*").mock(
        return_value=httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")
    )

    respx.get(url__regex=r"http://perdu\.com/\?vuln1=.*&vuln2=.*").mock(
        return_value=httpx.Response(200, text="Hello there")
    )

    persister = AsyncMock()

    request = Request("http://perdu.com/?vuln1=hello&vuln2=there")
    request.path_id = 1

    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 1, "max_attack_time": 10}

        module = ModuleExec(crawler, persister, options, Event(), crawler_configuration)
        await module.attack(request)

        assert persister.add_payload.call_count == 2
        assert persister.add_payload.call_args_list[0][1]["module"] == "exec"
        assert persister.add_payload.call_args_list[0][1]["category"] == "Command execution"
        assert persister.add_payload.call_args_list[0][1]["request"].get_params == \
            [["vuln1", ";env;"]] + [["vuln2", "there"]]
        assert persister.add_payload.call_args_list[1][1]["module"] == "exec"
        assert persister.add_payload.call_args_list[1][1]["category"] == "Command execution"
        assert persister.add_payload.call_args_list[1][1]["request"].get_params == \
            [["vuln1", "hello"]] + [["vuln2", ";env;"]]
94 changes: 0 additions & 94 deletions tests/attack/test_mod_file.py
@@ -92,100 +92,6 @@ async def delayed_response():
    return mock_response


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
    # https://gist.github.com/loknop/b27422d355ea1fd0d90d6dbc1e278d4d
    respx.get("http://127.0.0.1:65085/inclusion.php?yolo=nawak&f=toto").mock(
        return_value=httpx.Response(200, text="Hello")
    )
    respx.get("http://127.0.0.1:65085/inclusion.php?yolo=%2Fetc%2Fpasswd&f=toto").mock(
        side_effect=delayed_response()
    )

    respx.get("http://127.0.0.1:65085/inclusion.php?yolo=nawak&f=%2Fetc%2Fpasswd").mock(
        return_value=httpx.Response(200, text="Warning: AnotherFunction() Description of the warning \
root:x:0:0:root:/root:/bin/bash")
    )

    respx.get(url__regex=r"http://127\.0\.0\.1:65085/inclusion\.php\?yolo=.*&f=.*").mock(
        return_value=httpx.Response(200, text="Hello")
    )

    respx.get(url__regex=r"http://127\.0\.0\.1:65085/inclusion2\.php\?yolo=nawak&f=.*").mock(
        return_value=httpx.Response(200, text="Hello")
    )

    respx.get(url__regex=r"http://127\.0\.0\.1:65085/.*").mock(
        return_value=httpx.Response(404, text="not found")
    )

    persister = AsyncMock()

    request = Request("http://127.0.0.1:65085/inclusion.php?yolo=nawak&f=toto")
    request.path_id = 1

    crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65085/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 1, "max_attack_time": 5}

        module = ModuleFile(crawler, persister, options, Event(), crawler_configuration)
        module.do_post = False
        await module.attack(request)

        assert persister.add_payload.call_count == 1
        assert persister.add_payload.call_args_list[0][1]["module"] == "file"
        assert persister.add_payload.call_args_list[0][1]["category"] == "Path Traversal"
        assert ["yolo", "/etc/passwd"] in persister.add_payload.call_args_list[0][1]["request"].get_params


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
    # https://gist.github.com/loknop/b27422d355ea1fd0d90d6dbc1e278d4d
    respx.get("http://127.0.0.1:65085/inclusion.php?yolo=nawak&f=toto").mock(
        return_value=httpx.Response(200, text="Hello")
    )
    respx.get("http://127.0.0.1:65085/inclusion.php?yolo=%2Fetc%2Fpasswd&f=toto").mock(
        side_effect=delayed_response()
    )

    respx.get("http://127.0.0.1:65085/inclusion.php?yolo=nawak&f=%2Fetc%2Fpasswd").mock(
        return_value=httpx.Response(200, text="Warning: AnotherFunction() Description of the warning \
root:x:0:0:root:/root:/bin/bash")
    )

    respx.get(url__regex=r"http://127\.0\.0\.1:65085/inclusion\.php\?yolo=.*&f=.*").mock(
        return_value=httpx.Response(200, text="Hello")
    )

    respx.get(url__regex=r"http://127\.0\.0\.1:65085/inclusion2\.php\?yolo=nawak&f=.*").mock(
        return_value=httpx.Response(200, text="Hello")
    )

    respx.get(url__regex=r"http://127\.0\.0\.1:65085/.*").mock(
        return_value=httpx.Response(404, text="not found")
    )

    persister = AsyncMock()

    request = Request("http://127.0.0.1:65085/inclusion.php?yolo=nawak&f=toto")
    request.path_id = 1

    crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65085/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 1, "max_attack_time": 10}

        module = ModuleFile(crawler, persister, options, Event(), crawler_configuration)
        module.do_post = False
        await module.attack(request)

        assert persister.add_payload.call_count == 2
        assert persister.add_payload.call_args_list[0][1]["module"] == "file"
        assert persister.add_payload.call_args_list[0][1]["category"] == "Path Traversal"
        assert ["yolo", "/etc/passwd"] in persister.add_payload.call_args_list[0][1]["request"].get_params
        assert persister.add_payload.call_args_list[1][1]["module"] == "file"
        assert persister.add_payload.call_args_list[1][1]["category"] == "Path Traversal"
        assert ["f", "/etc/passwd"] in persister.add_payload.call_args_list[1][1]["request"].get_params

@pytest.mark.asyncio
async def test_warning_false_positive():
    persister = AsyncMock()
51 changes: 1 addition & 50 deletions tests/attack/test_mod_log4shell.py
@@ -2,9 +2,8 @@
 import os
 import random
 from asyncio import Event, sleep
-from typing import Dict
 from unittest import mock
-from unittest.mock import MagicMock, mock_open, patch, AsyncMock
+from unittest.mock import patch, AsyncMock
 from httpx import Response as HttpxResponse
 import uuid

@@ -396,51 +395,3 @@ async def test_attack_unifi():

    crawler.async_send.assert_called_once()
    mock_verify_url.assert_called_once()


async def delayed_response():
    await sleep(5)
    return httpx.Response(200, text="Hi there")


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
    # When a vuln has been found
    persister = AsyncMock()
    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
    base_dir = os.path.join(home_dir, ".wapiti")
    persister.CONFIG_DIR = os.path.join(base_dir, "config")

    def mock_generate_payload(_self, _unique_id: uuid.UUID) -> str:
        return "${jndi:dns://dns.wapiti3.ovh/" + "903fca88-6489-4403-8b50-1c330f9989e4.l}"

    respx.get(
        "http://perdu.com/",
        headers__contains={
            "authorization": "${jndi:dns://dns.wapiti3.ovh/" + "903fca88-6489-4403-8b50-1c330f9989e4.l}"
        }
    ).mock(side_effect=delayed_response())

    respx.get("http://perdu.com/?SAMLRequest=").mock(
        return_value=httpx.Response(200, text="Hi there")
    )

    respx.get("http://perdu.com/").mock(
        return_value=httpx.Response(200, text="Hi there")
    )

    request = Request("http://perdu.com/")
    request.path_id = 1

    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 10, "level": 2, "max_attack_time": 5}

        module = ModuleLog4Shell(crawler, persister, options, Event(), crawler_configuration)

        with patch.object(module, "_verify_dns", return_value=True):
            with patch.object(module, '_generate_payload', return_value=mock_generate_payload(module, "1")):
                await module.attack(request)

        # 1 for X-Forwarded-For + 10 headers on first request
        assert persister.add_payload.call_count == 11
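
The patched `_generate_payload` pins the DNS-callback payload to one fixed UUID so the `headers__contains` route above matches deterministically. A hypothetical stand-in that reproduces the exact string the mock returns (the real generator lives in ModuleLog4Shell):

import uuid

def generate_payload(unique_id: uuid.UUID) -> str:
    # Hypothetical reconstruction; only the output format is taken from the test
    return "${jndi:dns://dns.wapiti3.ovh/" + f"{unique_id}.l" + "}"

assert generate_payload(uuid.UUID("903fca88-6489-4403-8b50-1c330f9989e4")) == \
    "${jndi:dns://dns.wapiti3.ovh/903fca88-6489-4403-8b50-1c330f9989e4.l}"
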
103 changes: 0 additions & 103 deletions tests/attack/test_mod_nikto.py
@@ -55,109 +55,6 @@ async def test_whole_stuff():
    ) in persister.add_payload.call_args_list[0][1]["info"]


async def delayed_response():
    await sleep(4)
    return httpx.Response(200, text="uid=0")

@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
    # Test attacking with max_attack_time limitation
    respx.route(host="raw.githubusercontent.com").pass_through()

    respx.get("http://perdu.com/guestbook/pwd").mock(
        side_effect=delayed_response()
    )
    respx.get("http://perdu.com/cgi-bin/a1disp3.cgi?../../../../../../../../../../etc/passwd").mock(
        return_value=httpx.Response(200, text="root:0:0:")
    )

    respx.route(host="perdu.com").mock(
        return_value=httpx.Response(404, text="Not found")
    )

    persister = AsyncMock()
    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
    base_dir = os.path.join(home_dir, ".wapiti")
    persister.CONFIG_DIR = os.path.join(base_dir, "config")

    request = Request("http://perdu.com/")
    request.path_id = 1
    persister.get_links.return_value = chain([request])

    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 2, "tasks": 1, "max_attack_time": 5}

        module = ModuleNikto(crawler, persister, options, Event(), crawler_configuration)
        module.do_get = True
        await module.attack(request)

        assert persister.add_payload.call_count == 1
        assert persister.add_payload.call_args_list[0][1]["module"] == "nikto"
        assert persister.add_payload.call_args_list[0][1]["category"] == "Potentially dangerous file"
        assert persister.add_payload.call_args_list[0][1]["request"].url == (
            "http://perdu.com/guestbook/pwd"
        )
        assert (
            "PHP-Gastebuch 1.60 Beta reveals the md5 hash of the admin password"
        ) in persister.add_payload.call_args_list[0][1]["info"]


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
    # Test attacking with max_attack_time limitation
    respx.route(host="raw.githubusercontent.com").pass_through()

    respx.get("http://perdu.com/guestbook/pwd").mock(
        side_effect=delayed_response()
    )
    respx.get("http://perdu.com/cgi-bin/a1disp3.cgi?../../../../../../../../../../etc/passwd").mock(
        return_value=httpx.Response(200, text="root:0:0:")
    )

    respx.route(host="perdu.com").mock(
        return_value=httpx.Response(404, text="Not found")
    )

    persister = AsyncMock()
    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
    base_dir = os.path.join(home_dir, ".wapiti")
    persister.CONFIG_DIR = os.path.join(base_dir, "config")

    request = Request("http://perdu.com/")
    request.path_id = 1
    persister.get_links.return_value = chain([request])

    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        options = {"timeout": 20, "level": 2, "tasks": 1, "max_attack_time": 10}

        module = ModuleNikto(crawler, persister, options, Event(), crawler_configuration)
        module.do_get = True
        await module.attack(request)

        assert persister.add_payload.call_count == 2
        assert persister.add_payload.call_args_list[0][1]["module"] == "nikto"
        assert persister.add_payload.call_args_list[0][1]["category"] == "Potentially dangerous file"
        assert persister.add_payload.call_args_list[0][1]["request"].url == (
            "http://perdu.com/guestbook/pwd"
        )
        assert (
            "PHP-Gastebuch 1.60 Beta reveals the md5 hash of the admin password"
        ) in persister.add_payload.call_args_list[0][1]["info"]

        assert persister.add_payload.call_args_list[1][1]["module"] == "nikto"
        assert persister.add_payload.call_args_list[1][1]["category"] == "Potentially dangerous file"
        assert persister.add_payload.call_args_list[1][1]["request"].url == (
            "http://perdu.com/cgi-bin/a1disp3.cgi?..%2F..%2F..%2F..%2F..%2F..%2F..%2F..%2F..%2F..%2Fetc%2Fpasswd"
        )
        assert (
            "This CGI allows attackers read arbitrary files on the host"
        ) in persister.add_payload.call_args_list[1][1]["info"]
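
Both nikto tests layer their routes from most to least specific: a pass-through for the scan-database download, two URLs that yield findings, then a host-wide 404 catch-all. This relies on respx matching routes in the order they were registered. A minimal sketch of that layering (the two perdu.com routes are lifted from the tests above; the probe function itself is illustrative):

import asyncio
import httpx
import respx

@respx.mock
async def probe():
    # Specific route first; respx resolves routes in registration order
    respx.get("http://perdu.com/guestbook/pwd").mock(
        return_value=httpx.Response(200, text="uid=0")
    )
    # Host-wide catch-all registered last, so it only sees leftover requests
    respx.route(host="perdu.com").mock(
        return_value=httpx.Response(404, text="Not found")
    )
    async with httpx.AsyncClient() as client:
        hit = await client.get("http://perdu.com/guestbook/pwd")
        miss = await client.get("http://perdu.com/anything-else")
    return hit.status_code, miss.status_code

assert asyncio.run(probe()) == (200, 404)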


@pytest.mark.asyncio
@respx.mock
async def test_false_positives():