-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: improves readability of the SSP end to end tests (#198)
* refactor: refines E2E ssp test for better readabilty Remove the redefinition of commands that would create the same output and removed missed subprocess line Signed-off-by: Jennifer Power <[email protected]> * refactor: improves readabilty of SSP E2E testing and tests error messages Signed-off-by: Jennifer Power <[email protected]> --------- Signed-off-by: Jennifer Power <[email protected]> Co-authored-by: beatrizmcouto <[email protected]>
- Loading branch information
1 parent
05b0c4c
commit 39c6b52
Showing
3 changed files
with
159 additions
and
145 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,6 @@ | |
import logging | ||
import os | ||
import pathlib | ||
import subprocess | ||
from typing import Dict, Tuple | ||
|
||
import pytest | ||
|
@@ -44,142 +43,154 @@ | |
test_ssp_name = "test_ssp" | ||
|
||
|
||
@pytest.fixture | ||
def valid_args_dict() -> Dict[str, str]: | ||
return { | ||
"branch": "test", | ||
"markdown-path": test_ssp_md, | ||
"oscal-model": "ssp", | ||
"committer-name": "test", | ||
"committer-email": "[email protected]", | ||
"ssp-index": "ssp-index.json", | ||
} | ||
|
||
|
||
def replace_line_in_file_after_tag( | ||
file_path: pathlib.Path, tag: str, new_line: str | ||
) -> bool: | ||
"""Replace the line after tag with new string.""" | ||
with file_path.open("r") as f: | ||
lines = f.readlines() | ||
|
||
for i, line in enumerate(lines): | ||
if tag in line: | ||
lines[i + 1] = new_line | ||
|
||
with file_path.open("w") as f: | ||
f.writelines(lines) | ||
return True | ||
return False | ||
|
||
|
||
@pytest.mark.slow | ||
@pytest.mark.parametrize( | ||
"test_name, command_args, response, skip_create", | ||
[ | ||
( | ||
"success/happy path", | ||
{ | ||
"branch": "test", | ||
"markdown-path": test_ssp_md, | ||
"oscal-model": "ssp", | ||
"committer-name": "test", | ||
"committer-email": "[email protected]", | ||
"ssp-index": "ssp-index.json", | ||
}, | ||
SUCCESS_EXIT_CODE, | ||
False, | ||
), | ||
( | ||
"failure/missing-ssp-index", | ||
{ | ||
"branch": "test", | ||
"markdown-path": test_ssp_md, | ||
"oscal-model": "ssp", | ||
"committer-name": "test", | ||
"committer-email": "[email protected]", | ||
}, | ||
ERROR_EXIT_CODE, | ||
True, | ||
), | ||
], | ||
) | ||
def test_ssp_editing_e2e( | ||
tmp_repo: Tuple[str, Repo], | ||
e2e_runner: E2ETestRunner, | ||
test_name: str, | ||
command_args: Dict[str, str], | ||
response: int, | ||
skip_create: bool, | ||
valid_args_dict: Dict[str, str], | ||
) -> None: | ||
"""Test the trestlebot autosync command with SSPs.""" | ||
logger.info(f"Running test: {test_name}") | ||
tmp_repo_str, _ = tmp_repo | ||
tmp_repo_path = pathlib.Path(tmp_repo_str) | ||
|
||
ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name | ||
_ = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path)) | ||
|
||
# Get command arguments for the test | ||
branch = valid_args_dict["branch"] | ||
markdown_path = valid_args_dict["markdown-path"] | ||
committer_name = valid_args_dict["committer-name"] | ||
committer_email = valid_args_dict["committer-email"] | ||
|
||
create_args: Dict[str, str] = { | ||
"markdown-path": markdown_path, | ||
"branch": branch, | ||
"committer-name": committer_name, | ||
"committer-email": committer_email, | ||
"ssp-name": test_ssp_name, | ||
"profile-name": test_prof, | ||
"compdefs": test_comp_name, | ||
} | ||
create_command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"create-ssp", | ||
create_args, | ||
) | ||
exit_code, _, _ = e2e_runner.invoke_command(create_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert (tmp_repo_path / markdown_path).exists() | ||
|
||
# Check that the correct files are present with the correct content | ||
ssp_path = ModelUtils.get_model_path_for_name_and_class( | ||
tmp_repo_path, test_ssp_name, SystemSecurityPlan, FileContentType.JSON | ||
) | ||
index_path = os.path.join(tmp_repo_str, "ssp-index.json") | ||
ssp_index = SSPIndex(index_path) | ||
assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof | ||
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] | ||
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None | ||
assert ssp_path.exists() | ||
|
||
# Make a change to the SSP | ||
ac_1_path = tmp_repo_path / ssp_md_path / "ac" / "ac-1.md" | ||
assert replace_line_in_file_after_tag( | ||
ac_1_path, "ac-1_prm_6:", " values:\n ssp-values:\n - my ssp val\n" | ||
) | ||
|
||
autosync_command = e2e_runner.build_test_command( | ||
tmp_repo_str, "autosync", valid_args_dict | ||
) | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
# Check that the ssp was pushed to the remote | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Check that if run again, the ssp is not pushed again | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert "Nothing to commit" in response_stdout | ||
|
||
# Check that if the upstream profile is updated, the ssp is updated | ||
local_upstream_path = prepare_upstream_repo() | ||
upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main" | ||
upstream_command_args = { | ||
"branch": branch, | ||
"committer-name": committer_name, | ||
"committer-email": committer_email, | ||
"sources": upstream_repos_arg, | ||
} | ||
sync_upstreams_command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"sync-upstreams", | ||
upstream_command_args, | ||
local_upstream_path, | ||
) | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(sync_upstreams_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Autosync again to check that the ssp is updated | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Clean up the upstream repo | ||
clean(local_upstream_path, None) | ||
|
||
|
||
@pytest.mark.slow | ||
def test_ssp_e2e_editing_failure( | ||
tmp_repo: Tuple[str, Repo], | ||
e2e_runner: E2ETestRunner, | ||
valid_args_dict: Dict[str, str], | ||
) -> None: | ||
""" | ||
Test the trestlebot autosync command with SSPs with failure. | ||
Notes: The test should fail because of the missing entry in the ssp-index. | ||
This simulates the use case if an SSP is created outside of the tool. | ||
""" | ||
tmp_repo_str, _ = tmp_repo | ||
tmp_repo_path = pathlib.Path(tmp_repo_str) | ||
|
||
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md) | ||
|
||
# Create or generate the SSP | ||
if not skip_create: | ||
create_args: Dict[str, str] = { | ||
"markdown-path": command_args["markdown-path"], | ||
"branch": command_args["branch"], | ||
"committer-name": command_args["committer-name"], | ||
"committer-email": command_args["committer-email"], | ||
"ssp-name": test_ssp_name, | ||
"profile-name": test_prof, | ||
"compdefs": test_comp_name, | ||
} | ||
command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"create-ssp", | ||
create_args, | ||
) | ||
exit_code, _ = e2e_runner.invoke_command(command) | ||
assert exit_code == response | ||
assert (tmp_repo_path / command_args["markdown-path"]).exists() | ||
|
||
# Make a change to the SSP | ||
ssp, ssp_path = ModelUtils.load_model_for_class( | ||
tmp_repo_path, | ||
test_ssp_name, | ||
SystemSecurityPlan, | ||
FileContentType.JSON, | ||
) | ||
ssp.metadata.title = "New Title" | ||
ssp.oscal_write(ssp_path) | ||
else: | ||
ssp_generate = SSPGenerate() | ||
assert ssp_generate._run(args) == 0 | ||
|
||
command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args) | ||
run_response = subprocess.run(command, capture_output=True) | ||
assert run_response.returncode == response | ||
ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name | ||
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path)) | ||
|
||
# Check that the ssp was pushed to the remote | ||
if response == SUCCESS_EXIT_CODE: | ||
branch = command_args["branch"] | ||
assert ( | ||
f"Changes pushed to {branch} successfully." | ||
in run_response.stdout.decode("utf-8") | ||
) | ||
|
||
# Check that the correct files are present with the correct content | ||
index_path = os.path.join(tmp_repo_str, "ssp-index.json") | ||
ssp_index = SSPIndex(index_path) | ||
assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof | ||
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] | ||
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None | ||
assert ssp_path.exists() | ||
|
||
# Check that if run again, the ssp is not pushed again | ||
command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args) | ||
exit_code, response_stdout = e2e_runner.invoke_command(command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert "Nothing to commit" in response_stdout | ||
|
||
# Check that if the upstream profile is updated, the ssp is updated | ||
local_upstream_path = prepare_upstream_repo() | ||
upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main" | ||
upstream_command_args = { | ||
"branch": command_args["branch"], | ||
"committer-name": command_args["committer-name"], | ||
"committer-email": command_args["committer-email"], | ||
"sources": upstream_repos_arg, | ||
} | ||
command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"sync-upstreams", | ||
upstream_command_args, | ||
local_upstream_path, | ||
) | ||
exit_code, response_stdout = e2e_runner.invoke_command(command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert ( | ||
f"Changes pushed to {command_args['branch']} successfully." | ||
in run_response.stdout.decode("utf-8") | ||
) | ||
|
||
# Autosync again to check that the ssp is updated | ||
command = e2e_runner.build_test_command(tmp_repo_str, "autosync", command_args) | ||
exit_code, response_stdout = e2e_runner.invoke_command(command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert ( | ||
f"Changes pushed to {command_args['branch']} successfully." | ||
in response_stdout | ||
) | ||
|
||
# Clean up the upstream repo | ||
clean(local_upstream_path, None) | ||
ssp_generate = SSPGenerate() | ||
assert ssp_generate._run(args) == 0 | ||
|
||
autosync_command = e2e_runner.build_test_command( | ||
tmp_repo_str, "autosync", valid_args_dict | ||
) | ||
exit_code, _, response_stderr = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == ERROR_EXIT_CODE | ||
assert "SSP test_ssp does not exists in the index" in response_stderr |