-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
New rule to check socket.create_connection with no timeout (#598)
This rule checks for cases where socket.create_connection and a parameter for the timeout is not set. If set to None or unset, the blocking socket can potentially hang indefinitely. Closes: #519 Signed-off-by: Eric Brown <[email protected]>
- Loading branch information
Showing
10 changed files
with
208 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
--- | ||
id: PY039 | ||
title: socket — no timeout | ||
hide_title: true | ||
pagination_prev: null | ||
pagination_next: null | ||
slug: /rules/PY039 | ||
--- | ||
|
||
::: precli.rules.python.stdlib.socket_no_timeout |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
# Copyright 2024 Secure Sauce LLC | ||
r""" | ||
# Synchronous Access of `socket` without Timeout | ||
The function socket.create_connection() in Python establishes a TCP connection | ||
to a remote host. By default, this function operates synchronously, meaning it | ||
will block indefinitely if no timeout is specified. This behavior can lead to | ||
resource exhaustion or unresponsive applications if the remote host is slow | ||
or unresponsive, creating the risk of a Denial of Service (DoS). | ||
This rule ensures that a timeout is always specified when using | ||
`socket.create_connection()` to prevent indefinite blocking and resource | ||
exhaustion. | ||
Failing to specify a timeout in `socket.create_connection()` may cause the | ||
system or application to block indefinitely while waiting for a connection, | ||
consuming resources unnecessarily and potentially leading to system hangs or | ||
Denial of Service (DoS) vulnerabilities. | ||
# Example | ||
```python linenums="1" hl_lines="4" title="socket_create_connection.py" | ||
import socket | ||
s = socket.create_connection(("127.0.0.1", 80)) | ||
s.recv(1024) | ||
s.close() | ||
``` | ||
??? example "Example Output" | ||
``` | ||
> precli tests/unit/rules/python/stdlib/socket/examples/socket_create_connection.py | ||
⚠️ Warning on line 9 in tests/unit/rules/python/stdlib/socket/examples/socket_create_connection.py | ||
PY039: Synchronous Access of Remote Resource without Timeout | ||
The function 'socket.create_connection' is used without a timeout, which may cause the application to block indefinitely if the remote server does not respond. | ||
``` | ||
# Remediation | ||
Always provide a timeout parameter when calling `socket.create_connection()`. | ||
This ensures that if the remote host is unreachable or unresponsive, the | ||
connection attempt will fail after a certain period, releasing resources | ||
and preventing indefinite blocking. | ||
```python linenums="1" hl_lines="4" title="socket_create_connection.py" | ||
import socket | ||
s = socket.create_connection(("127.0.0.1", 80), timeout=5) | ||
s.recv(1024) | ||
s.close() | ||
``` | ||
# See also | ||
!!! info | ||
- [socket.create_connection — Low-level networking interface](https://docs.python.org/3/library/socket.html#socket.create_connection) | ||
- [CWE-1088: Synchronous Access of Remote Resource without Timeout](https://cwe.mitre.org/data/definitions/1088.html) | ||
_New in version 0.6.7_ | ||
""" # noqa: E501 | ||
from precli.core.call import Call | ||
from precli.core.location import Location | ||
from precli.core.result import Result | ||
from precli.rules import Rule | ||
|
||
|
||
class SocketNoTimeout(Rule): | ||
def __init__(self, id: str): | ||
super().__init__( | ||
id=id, | ||
name="no_timeout", | ||
description=__doc__, | ||
cwe_id=1088, | ||
message="The function '{0}' is used without a timeout, which may " | ||
"cause the application to block indefinitely if the remote server " | ||
"does not respond.", | ||
) | ||
|
||
def analyze_call(self, context: dict, call: Call) -> Result | None: | ||
if call.name_qualified not in ("socket.create_connection",): | ||
return | ||
|
||
argument = call.get_argument(position=1, name="timeout") | ||
timeout = argument.value | ||
|
||
if argument.node is None: | ||
arg_list_node = call.arg_list_node | ||
fix_node = arg_list_node | ||
args = [child.string for child in arg_list_node.named_children] | ||
args.append("timeout=5") | ||
content = f"({', '.join(args)})" | ||
result_node = call.arg_list_node | ||
elif timeout is None: | ||
fix_node = argument.node | ||
result_node = argument.node | ||
content = "5" | ||
else: | ||
return | ||
|
||
fixes = Rule.get_fixes( | ||
context=context, | ||
deleted_location=Location(fix_node), | ||
description="Set timeout parameter to a small number of seconds.", | ||
inserted_content=content, | ||
) | ||
return Result( | ||
rule_id=self.id, | ||
location=Location(node=result_node), | ||
message=self.message.format(call.name_qualified), | ||
fixes=fixes, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
tests/unit/rules/python/stdlib/socket/examples/socket_create_connection.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# level: WARNING | ||
# start_line: 9 | ||
# end_line: 9 | ||
# start_column: 28 | ||
# end_column: 47 | ||
import socket | ||
|
||
|
||
s = socket.create_connection(("127.0.0.1", 80)) | ||
s.recv(1024) | ||
s.close() |
7 changes: 7 additions & 0 deletions
7
tests/unit/rules/python/stdlib/socket/examples/socket_create_connection_timeout_5.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# level: NONE | ||
import socket | ||
|
||
|
||
s = socket.create_connection(("127.0.0.1", 80), timeout=5) | ||
s.recv(1024) | ||
s.close() |
11 changes: 11 additions & 0 deletions
11
tests/unit/rules/python/stdlib/socket/examples/socket_create_connection_timeout_none.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# level: WARNING | ||
# start_line: 9 | ||
# end_line: 9 | ||
# start_column: 56 | ||
# end_column: 60 | ||
import socket | ||
|
||
|
||
s = socket.create_connection(("127.0.0.1", 80), timeout=None) | ||
s.recv(1024) | ||
s.close() |
49 changes: 49 additions & 0 deletions
49
tests/unit/rules/python/stdlib/socket/test_socket_no_timeout.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# Copyright 2024 Secure Sauce LLC | ||
import os | ||
|
||
import pytest | ||
|
||
from precli.core.level import Level | ||
from precli.parsers import python | ||
from precli.rules import Rule | ||
from tests.unit.rules import test_case | ||
|
||
|
||
class TestSocketNoTimeout(test_case.TestCase): | ||
@classmethod | ||
def setup_class(cls): | ||
cls.rule_id = "PY039" | ||
cls.parser = python.Python() | ||
cls.base_path = os.path.join( | ||
"tests", | ||
"unit", | ||
"rules", | ||
"python", | ||
"stdlib", | ||
"socket", | ||
"examples", | ||
) | ||
|
||
def test_rule_meta(self): | ||
rule = Rule.get_by_id(self.rule_id) | ||
assert rule.id == self.rule_id | ||
assert rule.name == "no_timeout" | ||
assert ( | ||
rule.help_url | ||
== f"https://docs.securesauce.dev/rules/{self.rule_id}" | ||
) | ||
assert rule.default_config.enabled is True | ||
assert rule.default_config.level == Level.WARNING | ||
assert rule.default_config.rank == -1.0 | ||
assert rule.cwe.id == 1088 | ||
|
||
@pytest.mark.parametrize( | ||
"filename", | ||
[ | ||
"socket_create_connection.py", | ||
"socket_create_connection_timeout_5.py", | ||
"socket_create_connection_timeout_none.py", | ||
], | ||
) | ||
def test(self, filename): | ||
self.check(filename) |