From dcff265114563c0e83ae0891a05c67717e76f90d Mon Sep 17 00:00:00 2001 From: mrigankanand Date: Tue, 13 Sep 2022 18:06:42 +0530 Subject: [PATCH] Adding test cases for graceful shutting of modules --- .github/workflows/ci.yml | 8 +++- tests/pytest.ini | 2 + tests/test_module_killing.py | 77 +++++++++++++++++++++++++++++++++++ tests/test_module_shutting.py | 69 +++++++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 tests/pytest.ini create mode 100644 tests/test_module_killing.py create mode 100644 tests/test_module_shutting.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7c490aaf..0ff54f94 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,9 +42,10 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.9.4, 3.8.9] + python-version: [ 3.9.4, 3.8.9 ] steps: + - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v2 @@ -71,11 +72,14 @@ jobs: run: docker logs python-honeypot_ohp_1 - name: Test with pytest - run: docker exec python-honeypot_ohp_1 python3 -m pytest -rpP --reruns 5 --reruns-delay 3 + run: docker exec python-honeypot_ohp_1 python3 -m pytest -c tests/pytest.ini -rpP --reruns 5 --reruns-delay 3 - name: Check API server logs run: docker-compose logs ohp + - name: Run modules test (graceful shutting of modules) + run: sudo python3 -m pytest tests/test_module_shutting.py tests/test_module_killing.py -s + - name: Run modules test run: sudo python3 ohp.py -m all --test --store-pcap diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 00000000..8647c223 --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = --ignore=tests/test_module_killing.py --ignore=tests/test_module_shutting.py \ No newline at end of file diff --git a/tests/test_module_killing.py b/tests/test_module_killing.py new file mode 100644 index 00000000..8d23b927 --- /dev/null +++ b/tests/test_module_killing.py @@ -0,0 +1,77 @@ +import os +import sys +import unittest +import subprocess +import signal +from os.path import dirname, abspath +from time import time + +from core.messages import load_messages + +messages = load_messages().message_contents + + +def run_container_in_sub_process(command, kill_container_command): + is_network_traffic_capture_started = False + parent_directory = str(dirname(dirname(abspath(__file__)))) + output = str() + expected_result = False + process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=False, + cwd=parent_directory) + start_time = time() + for c in iter(lambda: process.stdout.read(1), b""): + if time() - start_time > 300: + os.kill(process.pid, signal.SIGINT) + break + sys.stdout.buffer.write(c) + output += c.decode("utf-8") + if messages["network_traffic_capture_start"] in output and is_network_traffic_capture_started is False: + is_network_traffic_capture_started = True + os.system(kill_container_command) + elif is_network_traffic_capture_started is True and "finished." in output: + expected_result = True + break + assert True is expected_result + + +class TestModules(unittest.TestCase): + + def test_module_ftp_weak_password(self): + kill_container_command = "docker kill ohp_ftpserver_weak_password" + command = ["python3", "ohp.py", "-m", "ftp/weak_password"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_ftp_strong_password(self): + kill_container_command = "docker kill ohp_ftpserver_strong_password" + command = ["python3", "ohp.py", "-m", "ftp/strong_password"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_http_basic_auth_strong_password(self): + kill_container_command = "docker kill ohp_httpserver_basic_auth_strong_password" + command = ["python3", "ohp.py", "-m", "http/basic_auth_strong_password"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_http_basic_auth_weak_password(self): + kill_container_command = "docker kill ohp_httpserver_basic_auth_weak_password" + command = ["python3", "ohp.py", "-m", "http/basic_auth_weak_password"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_http_ics_veeder_root_guardian_ast(self): + kill_container_command = "docker kill ohp_icsserver_veeder_root_guardian_ast" + command = ["python3", "ohp.py", "-m", "ics/veeder_root_guardian_ast"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_smtp_strong_password(self): + kill_container_command = "docker kill ohp_smtpserver_strong_password" + command = ["python3", "ohp.py", "-m", "smtp/strong_password"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_ssh_weak_password(self): + kill_container_command = "docker kill ohp_sshserver_weak_password" + command = ["python3", "ohp.py", "-m", "ssh/weak_password"] + run_container_in_sub_process(command, kill_container_command) + + def test_module_ssh_strong_password(self): + kill_container_command = "docker kill ohp_sshserver_strong_password" + command = ["python3", "ohp.py", "-m", "ssh/strong_password"] + run_container_in_sub_process(command, kill_container_command) diff --git a/tests/test_module_shutting.py b/tests/test_module_shutting.py new file mode 100644 index 00000000..74092475 --- /dev/null +++ b/tests/test_module_shutting.py @@ -0,0 +1,69 @@ +import os +import sys +import unittest +import subprocess +import signal +from os.path import dirname, abspath +from time import time + +from core.messages import load_messages + +messages = load_messages().message_contents + + +def run_container_in_sub_process(command): + is_network_traffic_capture_started = False + parent_directory = str(dirname(dirname(abspath(__file__)))) + output = str() + expected_result = False + process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=False, + cwd=parent_directory) + start_time = time() + for c in iter(lambda: process.stdout.read(1), b""): + if time() - start_time > 300: + os.kill(process.pid, signal.SIGINT) + break + sys.stdout.buffer.write(c) + output += c.decode("utf-8") + if messages["network_traffic_capture_start"] in output and is_network_traffic_capture_started is False: + is_network_traffic_capture_started = True + os.kill(process.pid, signal.SIGINT) + elif is_network_traffic_capture_started is True and "finished." in output: + expected_result = True + break + assert True is expected_result + + +class TestModules(unittest.TestCase): + + def test_module_ftp_weak_password(self): + command = ["python3", "ohp.py", "-m", "ftp/weak_password"] + run_container_in_sub_process(command) + + def test_module_ftp_strong_password(self): + command = ["python3", "ohp.py", "-m", "ftp/strong_password"] + run_container_in_sub_process(command) + + def test_module_http_basic_auth_strong_password(self): + command = ["python3", "ohp.py", "-m", "http/basic_auth_strong_password"] + run_container_in_sub_process(command) + + def test_module_http_basic_auth_weak_password(self): + command = ["python3", "ohp.py", "-m", "http/basic_auth_weak_password"] + run_container_in_sub_process(command) + + def test_module_http_ics_veeder_root_guardian_ast(self): + command = ["python3", "ohp.py", "-m", "ics/veeder_root_guardian_ast"] + run_container_in_sub_process(command) + + def test_module_smtp_strong_password(self): + command = ["python3", "ohp.py", "-m", "smtp/strong_password"] + run_container_in_sub_process(command) + + def test_module_ssh_weak_password(self): + command = ["python3", "ohp.py", "-m", "ssh/weak_password"] + run_container_in_sub_process(command) + + def test_module_ssh_strong_password(self): + command = ["python3", "ohp.py", "-m", "ssh/strong_password"] + run_container_in_sub_process(command)