Commit

add tests for module max-attack-time
bretfourbe committed Sep 26, 2023
1 parent 44b3fef commit d495f9f
Showing 5 changed files with 298 additions and 0 deletions.
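These tests exercise the max_attack_time option across several attack modules. As a rough illustration only (not Wapiti's actual code; the function and variable names below are hypothetical), such a limit is typically enforced by checking the elapsed time on each iteration of the attack loop and stopping once the budget is spent:

import asyncio
from time import monotonic

async def attack(payloads, max_attack_time=5):
    # Hypothetical sketch of a per-module time budget, not the real Wapiti implementation.
    start = monotonic()
    tried = []
    for payload in payloads:
        if max_attack_time and monotonic() - start > max_attack_time:
            break  # time budget exhausted: stop attacking this target
        await asyncio.sleep(0)  # stand-in for sending the request and analysing the response
        tried.append(payload)
    return tried

In each pair of tests below, the 5-second budget cuts the attack short (fewer or no findings) while the 10-second budget is enough to reach all expected findings.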
87 changes: 87 additions & 0 deletions tests/attack/test_mod_buster.py
@@ -54,3 +54,90 @@ async def test_whole_stuff():
assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]
assert "http://perdu.com/admin/authconfig.php" in persister.add_payload.call_args_list[2][1]["info"]


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
# Test attacking with max_attack_time limitation
respx.get("http://perdu.com/").mock(return_value=httpx.Response(200, text="Default page"))
respx.get("http://perdu.com/admin").mock(
return_value=httpx.Response(301, text="Hello there", headers={"Location": "/admin/"})
)
respx.get("http://perdu.com/admin/").mock(return_value=httpx.Response(200, text="Hello there"))
respx.get("http://perdu.com/config.inc").mock(return_value=httpx.Response(200, text="pass = 123456"))
respx.get("http://perdu.com/admin/authconfig.php").mock(return_value=httpx.Response(200, text="Hello there"))
respx.get(url__regex=r"http://perdu\.com/.*").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1
# Buster module will get requests from the persister
persister.get_links = AsyncIterator([(request, None)])

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 5}

files = {
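# The wordlist is padded with thousands of junk entries so the scan takes long
# enough for the max_attack_time limit to cut it short.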
"wordlist.txt": "nawak\nadmin\n" +
"nawak\n" * 2000 + "config.inc\n" +
"nawak\n" * 2000 + "authconfig.php",
}
with mock.patch("builtins.open", get_mock_open(files)):
module = ModuleBuster(crawler, persister, options, Event(), crawler_configuration)
module.DATA_DIR = ""
module.PATHS_FILE = "wordlist.txt"
module.do_get = True
await module.attack(request)

assert module.known_dirs == ["http://perdu.com/", "http://perdu.com/admin/"]
assert module.known_pages == ["http://perdu.com/config.inc"]
assert persister.add_payload.call_count == 2
assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
# Test attacking with max_attack_time limitation
respx.get("http://perdu.com/").mock(return_value=httpx.Response(200, text="Default page"))
respx.get("http://perdu.com/admin").mock(
return_value=httpx.Response(301, text="Hello there", headers={"Location": "/admin/"})
)
respx.get("http://perdu.com/admin/").mock(return_value=httpx.Response(200, text="Hello there"))
respx.get("http://perdu.com/config.inc").mock(return_value=httpx.Response(200, text="pass = 123456"))
respx.get("http://perdu.com/admin/authconfig.php").mock(return_value=httpx.Response(200, text="Hello there"))
respx.get(url__regex=r"http://perdu\.com/.*").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1
# Buster module will get requests from the persister
persister.get_links = AsyncIterator([(request, None)])

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 10}

files = {
"wordlist.txt": "nawak\nadmin\n" +
"nawak\n" * 2000 + "config.inc\n" +
"nawak\n" * 2000 + "authconfig.php",
}
with mock.patch("builtins.open", get_mock_open(files)):
module = ModuleBuster(crawler, persister, options, Event(), crawler_configuration)
module.DATA_DIR = ""
module.PATHS_FILE = "wordlist.txt"
module.do_get = True
await module.attack(request)

assert module.known_dirs == ["http://perdu.com/", "http://perdu.com/admin/"]
assert module.known_pages == ["http://perdu.com/config.inc", "http://perdu.com/admin/authconfig.php"]
assert persister.add_payload.call_count == 3
assert "http://perdu.com/admin" in persister.add_payload.call_args_list[0][1]["info"]
assert "http://perdu.com/config.inc" in persister.add_payload.call_args_list[1][1]["info"]
assert "http://perdu.com/admin/authconfig.php" in persister.add_payload.call_args_list[2][1]["info"]
56 changes: 56 additions & 0 deletions tests/attack/test_mod_exec.py
@@ -130,3 +130,59 @@ def timeout_callback(http_request):
# then 3 requests for the sleep payload (first then two retries to check random lags)
# then 1 request to check state of original request
assert respx.calls.call_count == payloads_until_sleep + 3 + 1


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*env.*").mock(
return_value=httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")
)

respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*").mock(
return_value=httpx.Response(200, text="Hello there")
)

persister = AsyncMock()

request = Request("http://perdu.com/?" + "test=test&" * 80 + "vuln=hello")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 1, "max_attack_time": 5}

module = ModuleExec(crawler, persister, options, Event(), crawler_configuration)
await module.attack(request)

assert persister.add_payload.call_count == 0


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*env.*").mock(
return_value=httpx.Response(200, text="PATH=/bin:/usr/bin;PWD=/")
)

respx.get(url__regex=r"http://perdu\.com/\?test=.*&vuln=.*").mock(
return_value=httpx.Response(200, text="Hello there")
)

persister = AsyncMock()

request = Request("http://perdu.com/?" + "test=test&" * 80 + "vuln=hello")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 1, "max_attack_time": 10}

module = ModuleExec(crawler, persister, options, Event(), crawler_configuration)
await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["module"] == "exec"
assert persister.add_payload.call_args_list[0][1]["category"] == "Command execution"
assert persister.add_payload.call_args_list[0][1]["request"].get_params == \
[["test", "test"]] * 80 + [["vuln", ";env;"]]
39 changes: 39 additions & 0 deletions tests/attack/test_mod_file.py
@@ -66,6 +66,45 @@ async def test_loknop_lfi_to_rce():
)


@pytest.mark.asyncio
async def test_max_attack_time_5():
    # Test attacking with max_attack_time limitation
persister = AsyncMock()
request = Request("http://127.0.0.1:65085/inclusion.php?" + "yolo=nawak&" * 30 + "f=toto")
request.path_id = 42

crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65085/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "max_attack_time": 5}

module = ModuleFile(crawler, persister, options, Event(), crawler_configuration)
module.do_post = False
await module.attack(request)

assert persister.add_payload.call_count == 0


@pytest.mark.asyncio
async def test_max_attack_time_10():
    # Test attacking with max_attack_time limitation
persister = AsyncMock()
request = Request("http://127.0.0.1:65085/inclusion.php?" + "yolo=nawak&" * 30 + "f=toto")
request.path_id = 42

crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65085/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "max_attack_time": 10}

module = ModuleFile(crawler, persister, options, Event(), crawler_configuration)
module.do_post = False
await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["module"] == "file"
assert persister.add_payload.call_args_list[0][1]["category"] == "Path Traversal"
assert ["f", "/etc/services"] in persister.add_payload.call_args_list[0][1]["request"].get_params


@pytest.mark.asyncio
async def test_warning_false_positive():
persister = AsyncMock()
76 changes: 76 additions & 0 deletions tests/attack/test_mod_nikto.py
@@ -55,6 +55,82 @@ async def test_whole_stuff():
) in persister.add_payload.call_args_list[0][1]["info"]


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_5():
# Test attacking with max_attack_time limitation
respx.route(host="raw.githubusercontent.com").pass_through()

respx.get("http://perdu.com/README.md").mock(
return_value=httpx.Response(200, text="root:0:0:")
)

respx.route(host="perdu.com").mock(
return_value=httpx.Response(404, text="Not found")
)

persister = AsyncMock()
home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
base_dir = os.path.join(home_dir, ".wapiti")
persister.CONFIG_DIR = os.path.join(base_dir, "config")

request = Request("http://perdu.com/")
request.path_id = 1
persister.get_links.return_value = chain([request])

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 5}

module = ModuleNikto(crawler, persister, options, Event(), crawler_configuration)
module.do_get = True
await module.attack(request)

assert persister.add_payload.call_count == 0


@pytest.mark.asyncio
@respx.mock
async def test_max_attack_time_10():
    # Test attacking with max_attack_time limitation
respx.route(host="raw.githubusercontent.com").pass_through()

respx.get("http://perdu.com/README.md").mock(
return_value=httpx.Response(200, text="root:0:0:")
)

respx.route(host="perdu.com").mock(
return_value=httpx.Response(404, text="Not found")
)

persister = AsyncMock()
home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
base_dir = os.path.join(home_dir, ".wapiti")
persister.CONFIG_DIR = os.path.join(base_dir, "config")

request = Request("http://perdu.com/")
request.path_id = 1
persister.get_links.return_value = chain([request])

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 1, "max_attack_time": 10}

module = ModuleNikto(crawler, persister, options, Event(), crawler_configuration)
module.do_get = True
await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["module"] == "nikto"
assert persister.add_payload.call_args_list[0][1]["category"] == "Potentially dangerous file"
assert persister.add_payload.call_args_list[0][1]["request"].url == (
"http://perdu.com/README.md"
)
assert (
"Readme Found"
) in persister.add_payload.call_args_list[0][1]["info"]


@pytest.mark.asyncio
@respx.mock
async def test_false_positives():
40 changes: 40 additions & 0 deletions tests/attack/test_mod_timesql.py
@@ -52,6 +52,46 @@ async def test_timesql_detection():
]


@pytest.mark.asyncio
async def test_max_attack_time_5():
persister = AsyncMock()
request = Request("http://127.0.0.1:65082/blind_sql.php?" + "foo=bar&" * 10 + "vuln1=hello%20there")
request.path_id = 42
crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65082/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        # The timeout is set to 0 because blind_sql.php contains a sleep(2) call
        # and the module uses ceil(attack_options.get("timeout", self.time_to_sleep)) + 1
options = {"timeout": 0, "level": 1, "max_attack_time": 5}

module = ModuleTimesql(crawler, persister, options, Event(), crawler_configuration)
module.do_post = False
await module.attack(request)

assert persister.add_payload.call_count == 0


@pytest.mark.asyncio
async def test_max_attack_time_10():
persister = AsyncMock()
request = Request("http://127.0.0.1:65082/blind_sql.php?" + "foo=bar&" * 10 + "vuln1=hello%20there")
request.path_id = 42
crawler_configuration = CrawlerConfiguration(Request("http://127.0.0.1:65082/"), timeout=1)
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
        # The timeout is set to 0 because blind_sql.php contains a sleep(2) call
        # and the module uses ceil(attack_options.get("timeout", self.time_to_sleep)) + 1
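        # With timeout set to 0 that expression evaluates to ceil(0) + 1 = 1,
        # which matches the sleep(1)#1 payload asserted below.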
options = {"timeout": 0, "level": 1, "max_attack_time": 10}

module = ModuleTimesql(crawler, persister, options, Event(), crawler_configuration)
module.do_post = False
await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["module"] == "timesql"
assert persister.add_payload.call_args_list[0][1]["category"] == "SQL Injection"
assert persister.add_payload.call_args_list[0][1]["request"].get_params == \
[['foo', 'bar']] * 10 + [['vuln1', 'sleep(1)#1']]


@pytest.mark.asyncio
async def test_timesql_false_positive():
persister = AsyncMock()
