Skip to content

Commit

Permalink
Adding test cases for graceful shutting of modules
Browse files Browse the repository at this point in the history
  • Loading branch information
spiderxm committed Sep 13, 2022
1 parent 2600e1a commit dcff265
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 2 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.9.4, 3.8.9]
python-version: [ 3.9.4, 3.8.9 ]

steps:

- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
Expand All @@ -71,11 +72,14 @@ jobs:
run: docker logs python-honeypot_ohp_1

- name: Test with pytest
run: docker exec python-honeypot_ohp_1 python3 -m pytest -rpP --reruns 5 --reruns-delay 3
run: docker exec python-honeypot_ohp_1 python3 -m pytest -c tests/pytest.ini -rpP --reruns 5 --reruns-delay 3

- name: Check API server logs
run: docker-compose logs ohp

- name: Run modules test (graceful shutting of modules)
run: sudo python3 -m pytest tests/test_module_shutting.py tests/test_module_killing.py -s

- name: Run modules test
run: sudo python3 ohp.py -m all --test --store-pcap

Expand Down
2 changes: 2 additions & 0 deletions tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
addopts = --ignore=tests/test_module_killing.py --ignore=tests/test_module_shutting.py
77 changes: 77 additions & 0 deletions tests/test_module_killing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
import sys
import unittest
import subprocess
import signal
from os.path import dirname, abspath
from time import time

from core.messages import load_messages

messages = load_messages().message_contents


def run_container_in_sub_process(command, kill_container_command):
is_network_traffic_capture_started = False
parent_directory = str(dirname(dirname(abspath(__file__))))
output = str()
expected_result = False
process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=False,
cwd=parent_directory)
start_time = time()
for c in iter(lambda: process.stdout.read(1), b""):
if time() - start_time > 300:
os.kill(process.pid, signal.SIGINT)
break
sys.stdout.buffer.write(c)
output += c.decode("utf-8")
if messages["network_traffic_capture_start"] in output and is_network_traffic_capture_started is False:
is_network_traffic_capture_started = True
os.system(kill_container_command)
elif is_network_traffic_capture_started is True and "finished." in output:
expected_result = True
break
assert True is expected_result


class TestModules(unittest.TestCase):

def test_module_ftp_weak_password(self):
kill_container_command = "docker kill ohp_ftpserver_weak_password"
command = ["python3", "ohp.py", "-m", "ftp/weak_password"]
run_container_in_sub_process(command, kill_container_command)

def test_module_ftp_strong_password(self):
kill_container_command = "docker kill ohp_ftpserver_strong_password"
command = ["python3", "ohp.py", "-m", "ftp/strong_password"]
run_container_in_sub_process(command, kill_container_command)

def test_module_http_basic_auth_strong_password(self):
kill_container_command = "docker kill ohp_httpserver_basic_auth_strong_password"
command = ["python3", "ohp.py", "-m", "http/basic_auth_strong_password"]
run_container_in_sub_process(command, kill_container_command)

def test_module_http_basic_auth_weak_password(self):
kill_container_command = "docker kill ohp_httpserver_basic_auth_weak_password"
command = ["python3", "ohp.py", "-m", "http/basic_auth_weak_password"]
run_container_in_sub_process(command, kill_container_command)

def test_module_http_ics_veeder_root_guardian_ast(self):
kill_container_command = "docker kill ohp_icsserver_veeder_root_guardian_ast"
command = ["python3", "ohp.py", "-m", "ics/veeder_root_guardian_ast"]
run_container_in_sub_process(command, kill_container_command)

def test_module_smtp_strong_password(self):
kill_container_command = "docker kill ohp_smtpserver_strong_password"
command = ["python3", "ohp.py", "-m", "smtp/strong_password"]
run_container_in_sub_process(command, kill_container_command)

def test_module_ssh_weak_password(self):
kill_container_command = "docker kill ohp_sshserver_weak_password"
command = ["python3", "ohp.py", "-m", "ssh/weak_password"]
run_container_in_sub_process(command, kill_container_command)

def test_module_ssh_strong_password(self):
kill_container_command = "docker kill ohp_sshserver_strong_password"
command = ["python3", "ohp.py", "-m", "ssh/strong_password"]
run_container_in_sub_process(command, kill_container_command)
69 changes: 69 additions & 0 deletions tests/test_module_shutting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
import sys
import unittest
import subprocess
import signal
from os.path import dirname, abspath
from time import time

from core.messages import load_messages

messages = load_messages().message_contents


def run_container_in_sub_process(command):
is_network_traffic_capture_started = False
parent_directory = str(dirname(dirname(abspath(__file__))))
output = str()
expected_result = False
process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, shell=False,
cwd=parent_directory)
start_time = time()
for c in iter(lambda: process.stdout.read(1), b""):
if time() - start_time > 300:
os.kill(process.pid, signal.SIGINT)
break
sys.stdout.buffer.write(c)
output += c.decode("utf-8")
if messages["network_traffic_capture_start"] in output and is_network_traffic_capture_started is False:
is_network_traffic_capture_started = True
os.kill(process.pid, signal.SIGINT)
elif is_network_traffic_capture_started is True and "finished." in output:
expected_result = True
break
assert True is expected_result


class TestModules(unittest.TestCase):

def test_module_ftp_weak_password(self):
command = ["python3", "ohp.py", "-m", "ftp/weak_password"]
run_container_in_sub_process(command)

def test_module_ftp_strong_password(self):
command = ["python3", "ohp.py", "-m", "ftp/strong_password"]
run_container_in_sub_process(command)

def test_module_http_basic_auth_strong_password(self):
command = ["python3", "ohp.py", "-m", "http/basic_auth_strong_password"]
run_container_in_sub_process(command)

def test_module_http_basic_auth_weak_password(self):
command = ["python3", "ohp.py", "-m", "http/basic_auth_weak_password"]
run_container_in_sub_process(command)

def test_module_http_ics_veeder_root_guardian_ast(self):
command = ["python3", "ohp.py", "-m", "ics/veeder_root_guardian_ast"]
run_container_in_sub_process(command)

def test_module_smtp_strong_password(self):
command = ["python3", "ohp.py", "-m", "smtp/strong_password"]
run_container_in_sub_process(command)

def test_module_ssh_weak_password(self):
command = ["python3", "ohp.py", "-m", "ssh/weak_password"]
run_container_in_sub_process(command)

def test_module_ssh_strong_password(self):
command = ["python3", "ohp.py", "-m", "ssh/strong_password"]
run_container_in_sub_process(command)

0 comments on commit dcff265

Please sign in to comment.