diff --git a/tests/attack/test_mod_network_device.py b/tests/attack/test_mod_network_device.py index fc969adca..3c24cb0b4 100644 --- a/tests/attack/test_mod_network_device.py +++ b/tests/attack/test_mod_network_device.py @@ -195,6 +195,7 @@ async def test_detect_ssl_vpn(): respx.get("http://perdu.com/remote/fgt_lang?lang=fr").mock( return_value=httpx.Response( 200, + headers={"Content-Type": "application/javascript"}, content='Vous Perdu ?

Perdu sur Internet ?

\

Pas de panique, on va vous aider

' ) @@ -224,7 +225,7 @@ async def test_detect_ssl_vpn(): @pytest.mark.asyncio @respx.mock -async def test_detect_fortinet(): +async def test_fortinet_false_positive(): respx.get("http://perdu.com/login/?next=/").mock( return_value=httpx.Response( 200, @@ -247,6 +248,42 @@ async def test_detect_fortinet(): request = Request("http://perdu.com/") request.path_id = 1 + crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/")) + async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: + options = {"timeout": 10, "level": 2, "tasks": 20} + + module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration) + + await module.attack(request) + + assert not persister.add_payload.call_count + + +@pytest.mark.asyncio +@respx.mock +async def test_detect_fortinet(): + respx.get("http://perdu.com/login/?next=/").mock( + return_value=httpx.Response( + 200, + content='Fortinet

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

' + ) + ) + respx.get("http://perdu.com/").mock( + return_value=httpx.Response( + 200, + content='Vous Perdu ?

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

' + ) + ) + + respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404)) + + persister = AsyncMock() + + request = Request("http://perdu.com/") + request.path_id = 1 + crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/")) async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: options = {"timeout": 10, "level": 2, "tasks": 20} @@ -343,6 +380,85 @@ async def test_detect_fortimail(): assert persister.add_payload.call_args_list[0][1]["module"] == "network_device" +@pytest.mark.asyncio +@respx.mock +async def test_detect_fortimanager(): + respx.get("http://perdu.com/p/login/").mock( + return_value=httpx.Response( + 200, + content='FortiManager

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

' + ) + ) + respx.get("http://perdu.com/").mock( + return_value=httpx.Response( + 200, + content='Vous Perdu ?

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

' + ) + ) + respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404)) + + persister = AsyncMock() + + request = Request("http://perdu.com/") + request.path_id = 1 + + crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/")) + async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: + options = {"timeout": 10, "level": 2, "tasks": 20} + + module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration) + + await module.attack(request) + + assert persister.add_payload.call_count == 1 + assert persister.add_payload.call_args_list[0][1]["info"] == ( + '{"name": "FortiManager", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}' + ) + assert persister.add_payload.call_args_list[0][1]["module"] == "network_device" + + +@pytest.mark.asyncio +@respx.mock +async def test_detect_fortianalyzer(): + respx.get("http://perdu.com/p/login/").mock( + return_value=httpx.Response( + 200, + content='Login

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

\ +
FortiAnalyzer
' + ) + ) + respx.get("http://perdu.com/").mock( + return_value=httpx.Response( + 200, + content='Vous Perdu ?

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

' + ) + ) + respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404)) + + persister = AsyncMock() + + request = Request("http://perdu.com/") + request.path_id = 1 + + crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/")) + async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: + options = {"timeout": 10, "level": 2, "tasks": 20} + + module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration) + + await module.attack(request) + + assert persister.add_payload.call_count == 1 + assert persister.add_payload.call_args_list[0][1]["info"] == ( + '{"name": "FortiAnalyzer", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}' + ) + assert persister.add_payload.call_args_list[0][1]["module"] == "network_device" + + @pytest.mark.asyncio @respx.mock async def test_raise_on_request_error(): diff --git a/wapitiCore/attack/network_devices/mod_forti.py b/wapitiCore/attack/network_devices/mod_forti.py index 34410deb0..27287ace4 100644 --- a/wapitiCore/attack/network_devices/mod_forti.py +++ b/wapitiCore/attack/network_devices/mod_forti.py @@ -10,7 +10,7 @@ from wapitiCore.net import Request from wapitiCore.net.response import Response from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE -from wapitiCore.main.log import log_blue, logging +from wapitiCore.main.log import log_blue, logging, log_verbose MSG_NO_FORTI = "No Forti Product Detected" @@ -23,8 +23,10 @@ class ModuleForti(NetworkDeviceCommon): async def check_forti(self, url): fortivpn_list = ['remote/fgt_lang?lang=en', - 'remote/fgt_lang?lang=fr'] - check_list = fortivpn_list + ['logindisclaimer', + 'remote/fgt_lang?lang=fr'] + fortiweb_list = ['fgt_lang.js?paths=lang/en:com_info', + 'fgt_lang.js?paths=lang/fr:com_info'] + check_list = fortivpn_list + fortiweb_list + ['logindisclaimer', 'remote/login?lang=en', 'remote/login?lang=fr', 'fpc/app/login', @@ -35,20 +37,27 @@ async def check_forti(self, url): request = Request(full_url, 'GET') try: response: Response = await self.crawler.async_send(request, follow_redirects=False) + log_verbose(f"[¨] {request}") except RequestError: self.network_errors += 1 continue if response.is_success: - if item in fortivpn_list: - self.device_name = "Fortinet SSL-VPN" - await self.detect_forti_product(full_url) - return True + if "content-type" in response.headers and \ + "javascript" in response.headers["content-type"]: + if item in fortivpn_list: + self.device_name = "Fortinet SSL-VPN" + return True + if item in fortiweb_list: + self.device_name = "FortiWeb" + return True + return await self.detect_forti_product(full_url) # Check Fortinet product from title request = Request(url, 'GET') try: response: Response = await self.crawler.async_send(request, follow_redirects=False) + log_verbose(f"[¨] {request}") except RequestError: self.network_errors += 1 raise @@ -68,6 +77,7 @@ async def check_forti(self, url): request = Request(url_fortimail, 'GET') try: response: Response = await self.crawler.async_send(request, follow_redirects=False) + log_verbose(f"[¨] {request}") except RequestError: self.network_errors += 1 raise @@ -81,21 +91,29 @@ async def check_forti(self, url): self.device_name = match.group() return True - # Check FortiManager + # Check FortiManager and FortiAnalyzer url_fortimanager = urljoin(url, "p/login/") request = Request(url_fortimanager, 'GET') try: response: Response = await self.crawler.async_send(request, follow_redirects=False) + log_verbose(f"[¨] {request}") except RequestError: self.network_errors += 1 raise if response.is_success: soup = BeautifulSoup(response.content, 'html.parser') + title_tag = soup.title sign_in_header_div = soup.find('div', class_='sign-in-header') - if sign_in_header_div and 'FortiManager' in sign_in_header_div.text: - self.device_name = "FortiManager" - return True - + + for device_name in ["FortiManager", "FortiAnalyzer"]: + if title_tag: + if device_name in title_tag.string: + self.device_name = device_name + return True + # if custom title without Forti*, we check for specific div + if sign_in_header_div and device_name in sign_in_header_div.text: + self.device_name = device_name + return True return False async def detect_forti_product(self, url): @@ -103,6 +121,7 @@ async def detect_forti_product(self, url): try: # Send an HTTP GET request to the URL response: Response = await self.crawler.async_send(request, follow_redirects=True) + log_verbose(f"[¨] {request}") except RequestError: self.network_errors += 1 raise @@ -119,6 +138,8 @@ async def detect_forti_product(self, url): if match: # Extract the matched product name self.device_name = match.group() + return True + return False async def attack(self, request: Request, response: Optional[Response] = None): self.finished = True