Skip to content

Commit

Permalink
fix false positive forti detection, add fortiweb and fortianalyzer
Browse files Browse the repository at this point in the history
Signed-off-by: bretfourbe <[email protected]>
  • Loading branch information
bretfourbe committed May 13, 2024
1 parent 8ad4671 commit 1c8ddd3
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 13 deletions.
118 changes: 117 additions & 1 deletion tests/attack/test_mod_network_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ async def test_detect_ssl_vpn():
respx.get("http://perdu.com/remote/fgt_lang?lang=fr").mock(
return_value=httpx.Response(
200,
headers={"Content-Type": "application/javascript"},
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
Expand Down Expand Up @@ -224,7 +225,7 @@ async def test_detect_ssl_vpn():

@pytest.mark.asyncio
@respx.mock
async def test_detect_fortinet():
async def test_fortinet_false_positive():
respx.get("http://perdu.com/login/?next=/").mock(
return_value=httpx.Response(
200,
Expand All @@ -247,6 +248,42 @@ async def test_detect_fortinet():
request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert not persister.add_payload.call_count


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortinet():
respx.get("http://perdu.com/login/?next=/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Fortinet</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> '
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}
Expand Down Expand Up @@ -343,6 +380,85 @@ async def test_detect_fortimail():
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortimanager():
respx.get("http://perdu.com/p/login/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>FortiManager</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2></body></html>'
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "FortiManager", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}'
)
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortianalyzer():
respx.get("http://perdu.com/p/login/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Login</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<div class="sign-in-header">FortiAnalyzer</div></body></html>'
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "FortiAnalyzer", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}'
)
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"


@pytest.mark.asyncio
@respx.mock
async def test_raise_on_request_error():
Expand Down
45 changes: 33 additions & 12 deletions wapitiCore/attack/network_devices/mod_forti.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from wapitiCore.net import Request
from wapitiCore.net.response import Response
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE
from wapitiCore.main.log import log_blue, logging
from wapitiCore.main.log import log_blue, logging, log_verbose

MSG_NO_FORTI = "No Forti Product Detected"

Expand All @@ -23,8 +23,10 @@ class ModuleForti(NetworkDeviceCommon):

async def check_forti(self, url):
fortivpn_list = ['remote/fgt_lang?lang=en',
'remote/fgt_lang?lang=fr']
check_list = fortivpn_list + ['logindisclaimer',
'remote/fgt_lang?lang=fr']
fortiweb_list = ['fgt_lang.js?paths=lang/en:com_info',
'fgt_lang.js?paths=lang/fr:com_info']
check_list = fortivpn_list + fortiweb_list + ['logindisclaimer',
'remote/login?lang=en',
'remote/login?lang=fr',
'fpc/app/login',
Expand All @@ -35,20 +37,27 @@ async def check_forti(self, url):
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=False)
log_verbose(f"[¨] {request}")
except RequestError:
self.network_errors += 1
continue

if response.is_success:
if item in fortivpn_list:
self.device_name = "Fortinet SSL-VPN"
await self.detect_forti_product(full_url)
return True
if "content-type" in response.headers and \
"javascript" in response.headers["content-type"]:
if item in fortivpn_list:
self.device_name = "Fortinet SSL-VPN"
return True
if item in fortiweb_list:
self.device_name = "FortiWeb"
return True
return await self.detect_forti_product(full_url)

# Check Fortinet product from title
request = Request(url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=False)
log_verbose(f"[¨] {request}")
except RequestError:
self.network_errors += 1
raise
Expand All @@ -68,6 +77,7 @@ async def check_forti(self, url):
request = Request(url_fortimail, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=False)
log_verbose(f"[¨] {request}")
except RequestError:
self.network_errors += 1
raise
Expand All @@ -81,28 +91,37 @@ async def check_forti(self, url):
self.device_name = match.group()
return True

# Check FortiManager
# Check FortiManager and FortiAnalyzer
url_fortimanager = urljoin(url, "p/login/")
request = Request(url_fortimanager, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=False)
log_verbose(f"[¨] {request}")
except RequestError:
self.network_errors += 1
raise
if response.is_success:
soup = BeautifulSoup(response.content, 'html.parser')
title_tag = soup.title
sign_in_header_div = soup.find('div', class_='sign-in-header')
if sign_in_header_div and 'FortiManager' in sign_in_header_div.text:
self.device_name = "FortiManager"
return True


for device_name in ["FortiManager", "FortiAnalyzer"]:
if title_tag:
if device_name in title_tag.string:
self.device_name = device_name
return True
# if custom title without Forti*, we check for specific div
if sign_in_header_div and device_name in sign_in_header_div.text:
self.device_name = device_name
return True
return False

async def detect_forti_product(self, url):
request = Request(url, 'GET')
try:
# Send an HTTP GET request to the URL
response: Response = await self.crawler.async_send(request, follow_redirects=True)
log_verbose(f"[¨] {request}")
except RequestError:
self.network_errors += 1
raise
Expand All @@ -119,6 +138,8 @@ async def detect_forti_product(self, url):
if match:
# Extract the matched product name
self.device_name = match.group()
return True
return False

async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
Expand Down

0 comments on commit 1c8ddd3

Please sign in to comment.