forked from wapiti-scanner/wapiti
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add tests and module to test https redirections
- Loading branch information
1 parent
64456d1
commit a8d06d9
Showing
4 changed files
with
352 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
from asyncio import Event | ||
from unittest.mock import AsyncMock | ||
|
||
import pytest | ||
import httpx | ||
import respx | ||
|
||
from wapitiCore.net.classes import CrawlerConfiguration | ||
from wapitiCore.net import Request | ||
from wapitiCore.net.crawler import AsyncCrawler | ||
from wapitiCore.attack.mod_https_redirect import ModuleHttpsRedirect | ||
|
||
@pytest.mark.asyncio | ||
@respx.mock | ||
async def test_no_redirect(): | ||
# Test cases where there is no redirection | ||
respx.get("http://perdu.com/").mock(httpx.Response(200, text="Hello there")) | ||
respx.get("https://perdu.com").mock(httpx.Response(200, text="Hello there")) | ||
respx.post("http://perdu.com/post").mock(httpx.Response(200, text="Hello there")) | ||
respx.post("https://perdu.com/post").mock(httpx.Response(200, text="Hello there")) | ||
|
||
persister = AsyncMock() | ||
all_requests = [] | ||
|
||
request = Request("http://perdu.com") | ||
request.path_id = 1 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com") | ||
request.path_id = 2 | ||
all_requests.append(request) | ||
|
||
request = Request("http://perdu.com/post", method="POST", post_params=[["a", "b"]]) | ||
request.path_id = 3 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com/post", method="POST", post_params=[["a", "b"]]) | ||
request.path_id = 4 | ||
all_requests.append(request) | ||
|
||
crawler_configuration = CrawlerConfiguration(Request("https://perdu.com"), timeout=1) | ||
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: | ||
options = {"timeout": 10, "level": 2} | ||
|
||
module = ModuleHttpsRedirect(crawler, persister, options, Event(), crawler_configuration) | ||
module.do_post = True | ||
for request in all_requests: | ||
await module.attack(request) | ||
|
||
assert persister.add_payload.call_count == 4 | ||
for i in range(persister.add_payload.call_count): | ||
assert persister.add_payload.call_args_list[i][1]["info"] == "No HTTPS redirection" | ||
assert persister.add_payload.call_args_list[i][1]["request"].method == "GET" if i < 2 else "POST" | ||
|
||
|
||
@pytest.mark.asyncio | ||
@respx.mock | ||
async def test_redirect_http(): | ||
# Test cases where there are redirections | ||
respx.get("http://perdu.com/").mock(httpx.Response(301, headers={"Location": "/get"})) | ||
respx.get("http://perdu.com/get").mock(httpx.Response(301, headers={"Location": "https:/perdu.com/"})) | ||
respx.get("https://perdu.com").mock(httpx.Response(200, text="Hello there")) | ||
respx.post("http://perdu.com/post").mock(httpx.Response(301, headers={"Location": "/get"})) | ||
respx.post("https://perdu.com/post").mock(httpx.Response(200, text="Hello there")) | ||
|
||
persister = AsyncMock() | ||
all_requests = [] | ||
|
||
request = Request("http://perdu.com") | ||
request.path_id = 1 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com") | ||
request.path_id = 2 | ||
all_requests.append(request) | ||
|
||
request = Request("http://perdu.com/get") | ||
request.path_id = 3 | ||
all_requests.append(request) | ||
|
||
request = Request("http://perdu.com/post", method="POST", post_params=[["a", "b"]]) | ||
request.path_id = 4 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com/post", method="POST", post_params=[["a", "b"]]) | ||
request.path_id = 5 | ||
all_requests.append(request) | ||
|
||
crawler_configuration = CrawlerConfiguration(Request("https://perdu.com"), timeout=1) | ||
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: | ||
options = {"timeout": 10, "level": 2} | ||
|
||
module = ModuleHttpsRedirect(crawler, persister, options, Event(), crawler_configuration) | ||
module.do_post = True | ||
for request in all_requests: | ||
await module.attack(request) | ||
|
||
assert persister.add_payload.call_count == 4 | ||
for i in range(persister.add_payload.call_count): | ||
assert persister.add_payload.call_args_list[i][1]["info"] == "Redirected to HTTP location : /get" | ||
assert persister.add_payload.call_args_list[i][1]["request"].method == "GET" if i < 2 else "POST" | ||
|
||
|
||
@pytest.mark.asyncio | ||
@respx.mock | ||
async def test_error_response(): | ||
# Test cases where there are redirections | ||
respx.get("http://perdu.com/").mock(httpx.Response(403, text="Forbidden")) | ||
respx.get("https://perdu.com").mock(httpx.Response(200, text="Hello there")) | ||
respx.post("http://perdu.com/post").mock(httpx.Response(500, text="Internal error")) | ||
respx.post("https://perdu.com/post").mock(httpx.Response(200, text="Hello there")) | ||
|
||
persister = AsyncMock() | ||
all_requests = [] | ||
|
||
request = Request("http://perdu.com") | ||
request.path_id = 1 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com") | ||
request.path_id = 2 | ||
all_requests.append(request) | ||
|
||
request = Request("http://perdu.com/post", method="POST", post_params=[["a", "b"]]) | ||
request.path_id = 3 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com/post", method="POST", post_params=[["a", "b"]]) | ||
request.path_id = 4 | ||
all_requests.append(request) | ||
|
||
crawler_configuration = CrawlerConfiguration(Request("https://perdu.com"), timeout=1) | ||
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: | ||
options = {"timeout": 10, "level": 2} | ||
|
||
module = ModuleHttpsRedirect(crawler, persister, options, Event(), crawler_configuration) | ||
module.do_post = True | ||
for request in all_requests: | ||
await module.attack(request) | ||
|
||
assert persister.add_payload.call_count == 2 | ||
for i in range(persister.add_payload.call_count): | ||
assert persister.add_payload.call_args_list[i][1]["info"] == \ | ||
"Received a HTTP 500 error in http://perdu.com/post : 500" | ||
assert persister.add_payload.call_args_list[i][1]["request"].method == "POST" | ||
|
||
|
||
@pytest.mark.asyncio | ||
@respx.mock | ||
async def test_http_url_provided(): | ||
# Test cases where the provided url is http | ||
respx.get("http://perdu.com/").mock(httpx.Response(200, text="Hello there")) | ||
respx.get("http://perdu.com/get").mock(httpx.Response(200, text="Hello there")) | ||
|
||
persister = AsyncMock() | ||
all_requests = [] | ||
|
||
request = Request("http://perdu.com") | ||
request.path_id = 1 | ||
all_requests.append(request) | ||
|
||
request = Request("http://perdu.com/get") | ||
request.path_id = 2 | ||
all_requests.append(request) | ||
|
||
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com"), timeout=1) | ||
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: | ||
options = {"timeout": 10, "level": 2} | ||
|
||
module = ModuleHttpsRedirect(crawler, persister, options, Event(), crawler_configuration) | ||
module.do_post = True | ||
for request in all_requests: | ||
if not module.finished: | ||
await module.attack(request) | ||
|
||
assert persister.add_payload.call_count == 1 | ||
assert persister.add_payload.call_args_list[0][1]["info"] == "No HTTPS redirection" | ||
assert persister.add_payload.call_args_list[0][1]["request"].method == "GET" | ||
|
||
|
||
@pytest.mark.asyncio | ||
@respx.mock | ||
async def test_specific_port_provided(): | ||
# Test cases where the provided port is specific | ||
respx.get("https://perdu.com:8443/").mock(httpx.Response(200, text="Hello there")) | ||
respx.get("http://perdu.com:8443/").mock(httpx.Response(400, text="SSL error")) | ||
respx.get("https://perdu.com:8443/get").mock(httpx.Response(200, text="Hello there")) | ||
|
||
persister = AsyncMock() | ||
all_requests = [] | ||
|
||
request = Request("https://perdu.com:8443") | ||
request.path_id = 1 | ||
all_requests.append(request) | ||
|
||
request = Request("https://perdu.com:8443/get") | ||
request.path_id = 2 | ||
all_requests.append(request) | ||
|
||
crawler_configuration = CrawlerConfiguration(Request("https://perdu.com/8443"), timeout=1) | ||
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: | ||
options = {"timeout": 10, "level": 2} | ||
|
||
module = ModuleHttpsRedirect(crawler, persister, options, Event(), crawler_configuration) | ||
module.do_post = True | ||
for request in all_requests: | ||
if not module.finished: | ||
await module.attack(request) | ||
|
||
assert not persister.add_payload.call_count |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,7 @@ | |
"htaccess", | ||
"htp", | ||
"http_headers", | ||
"https_redirect", | ||
"log4shell", | ||
"methods", | ||
"nikto", | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
from functools import lru_cache | ||
from typing import Optional | ||
from urllib.parse import urlparse, urlunparse | ||
import socket | ||
|
||
from httpx import RequestError | ||
|
||
from wapitiCore.attack.attack import Attack | ||
from wapitiCore.definitions.https_redirect import NAME, WSTG_CODE | ||
from wapitiCore.definitions.internal_error import WSTG_CODE as INTERNAL_ERROR_WSTG_CODE | ||
from wapitiCore.language.vulnerability import Messages | ||
from wapitiCore.net import Request | ||
from wapitiCore.net.response import Response | ||
from wapitiCore.main.log import log_red, log_orange | ||
|
||
@lru_cache | ||
def test_port(address: str, dest_port: int, timeout: float = None) -> bool: | ||
"""Check if dest_port is open on address""" | ||
try: | ||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | ||
sock.settimeout(timeout) | ||
if sock.connect_ex((address, dest_port)) == 0: | ||
return True | ||
return False | ||
except (OSError, ValueError): | ||
return False | ||
|
||
# This module check whether HTTP requests are redirected to HTTPS or not | ||
class ModuleHttpsRedirect(Attack): | ||
"""Check for HTTPS redirections.""" | ||
name = "https_redirect" | ||
MSG_VULN_NO_REDIRECT = "No HTTPS redirection" | ||
MSG_VULN_REDIRECT = "Redirected to HTTP" | ||
|
||
async def must_attack(self, request: Request, response: Optional[Response] = None): | ||
if self.finished: | ||
return False | ||
|
||
url = urlparse(request.url) | ||
if not test_port(url.hostname, url.port or 80, self.crawler_configuration.timeout): | ||
log_orange(f"Port {url.port or 80} appears to be closed on {url.hostname}") | ||
self.finished = True | ||
return False | ||
return True | ||
|
||
async def attack(self, request: Request, response: Optional[Response] = None): | ||
|
||
url = urlparse(request.url) | ||
|
||
is_http = False | ||
if url.scheme == "http": | ||
log_red(f"HTTP URL provided : {request.url}") | ||
is_http = True | ||
# if http url is provided we will stop the module after the first request | ||
# this will allow to add only one vuln for http target | ||
self.finished = True | ||
|
||
if url.port: | ||
log_orange(f"Specific port provided : {url.port}") | ||
# if specific port (different from 80/443) is provided | ||
# we will stop the module after the first request | ||
# if service exposed requires SSL/TLS, we will get 400 errors | ||
self.finished = True | ||
|
||
# ensure targeting http url | ||
http_url = request.url if is_http else urlunparse(url._replace(scheme='http')) | ||
http_request = Request( | ||
http_url, request.method, request.get_params, | ||
request.post_params, request.file_params, request.encoding, | ||
request.enctype, request.referer, request.link_depth | ||
) | ||
|
||
try: | ||
http_response: Response = await self.crawler.async_send(http_request, follow_redirects=False) | ||
except RequestError: | ||
self.network_errors += 1 | ||
return | ||
|
||
if http_response.is_success: | ||
log_red(http_response.url) | ||
await self.add_vuln_low( | ||
category=NAME, | ||
request=http_request, | ||
info=self.MSG_VULN_NO_REDIRECT, | ||
wstg=WSTG_CODE, | ||
response=http_response | ||
) | ||
|
||
elif http_response.is_redirect: | ||
# add vuln if redirected to url without https | ||
# might cause false positive in case of multiple redirections | ||
if urlparse(http_response.headers["location"]).scheme != "https": | ||
log_red("Location : " + http_response.headers["location"]) | ||
await self.add_vuln_low( | ||
category=NAME, | ||
request=http_request, | ||
info=f"{self.MSG_VULN_REDIRECT} location : {http_response.headers['location']}", | ||
wstg=WSTG_CODE, | ||
response=http_response | ||
) | ||
else: | ||
log_orange(http_response.url + " responded with code " + str(http_response.status)) | ||
|
||
if http_response.status >= 500: | ||
await self.add_anom_medium( | ||
category=Messages.ERROR_500, | ||
request=http_request, | ||
info=Messages.MSG_500.format(http_response.url + " : " + str(http_response.status)), | ||
wstg=INTERNAL_ERROR_WSTG_CODE, | ||
response=http_response | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
|
||
TYPE = "vulnerability" | ||
|
||
NAME = "Unencrypted Channels" | ||
SHORT_NAME = NAME | ||
|
||
WSTG_CODE = ["WSTG-CRYP-03"] | ||
|
||
DESCRIPTION = ( | ||
"Sensitive data must be protected when it is transmitted through the network." | ||
) | ||
|
||
SOLUTION = "Use HTTPS for the whole web site and redirect any HTTP requests to HTTPS." | ||
|
||
REFERENCES = [ | ||
{ | ||
"title": "Testing for Sensitive Information Sent via Unencrypted Channels", | ||
"url": ("https://owasp.org/www-project-web-security-testing-guide/latest/4-Web_Application_Security_Testing/" | ||
"09-Testing_for_Weak_Cryptography/03-Testing_for_Sensitive_Information_Sent_via_Unencrypted_Channels" | ||
) | ||
}, | ||
{ | ||
"title": "Testing for Weak Transport Layer Security", | ||
"url": ("https://owasp.org/www-project-web-security-testing-guide/latest/4-Web_Application_Security_Testing/" | ||
"09-Testing_for_Weak_Cryptography/01-Testing_for_Weak_Transport_Layer_Security" | ||
) | ||
} | ||
] |