-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: improves readabilty of SSP E2E testing and tests error mess…
…ages Signed-off-by: Jennifer Power <[email protected]>
- Loading branch information
Showing
4 changed files
with
156 additions
and
137 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,138 +43,154 @@ | |
test_ssp_name = "test_ssp" | ||
|
||
|
||
@pytest.fixture | ||
def valid_args_dict() -> Dict[str, str]: | ||
return { | ||
"branch": "test", | ||
"markdown-path": test_ssp_md, | ||
"oscal-model": "ssp", | ||
"committer-name": "test", | ||
"committer-email": "[email protected]", | ||
"ssp-index": "ssp-index.json", | ||
} | ||
|
||
|
||
def replace_line_in_file_after_tag( | ||
file_path: pathlib.Path, tag: str, new_line: str | ||
) -> bool: | ||
"""Replace the line after tag with new string.""" | ||
with file_path.open("r") as f: | ||
lines = f.readlines() | ||
|
||
for i, line in enumerate(lines): | ||
if tag in line: | ||
lines[i + 1] = new_line | ||
|
||
with file_path.open("w") as f: | ||
f.writelines(lines) | ||
return True | ||
return False | ||
|
||
|
||
@pytest.mark.slow | ||
@pytest.mark.parametrize( | ||
"test_name, command_args, response, skip_create", | ||
[ | ||
( | ||
"success/happy path", | ||
{ | ||
"branch": "test", | ||
"markdown-path": test_ssp_md, | ||
"oscal-model": "ssp", | ||
"committer-name": "test", | ||
"committer-email": "[email protected]", | ||
"ssp-index": "ssp-index.json", | ||
}, | ||
SUCCESS_EXIT_CODE, | ||
False, | ||
), | ||
( | ||
"failure/missing-ssp-index", | ||
{ | ||
"branch": "test", | ||
"markdown-path": test_ssp_md, | ||
"oscal-model": "ssp", | ||
"committer-name": "test", | ||
"committer-email": "[email protected]", | ||
}, | ||
ERROR_EXIT_CODE, | ||
True, | ||
), | ||
], | ||
) | ||
def test_ssp_editing_e2e( | ||
tmp_repo: Tuple[str, Repo], | ||
e2e_runner: E2ETestRunner, | ||
test_name: str, | ||
command_args: Dict[str, str], | ||
response: int, | ||
skip_create: bool, | ||
valid_args_dict: Dict[str, str], | ||
) -> None: | ||
"""Test the trestlebot autosync command with SSPs.""" | ||
logger.info(f"Running test: {test_name}") | ||
|
||
tmp_repo_str, _ = tmp_repo | ||
tmp_repo_path = pathlib.Path(tmp_repo_str) | ||
|
||
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md) | ||
ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name | ||
_ = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path)) | ||
|
||
# Get command arguments for the test | ||
branch = command_args["branch"] | ||
markdown_path = command_args["markdown-path"] | ||
committer_name = command_args["committer-name"] | ||
committer_email = command_args["committer-email"] | ||
|
||
# Create or generate the SSP | ||
if not skip_create: | ||
create_args: Dict[str, str] = { | ||
"markdown-path": markdown_path, | ||
"branch": branch, | ||
"committer-name": committer_name, | ||
"committer-email": committer_email, | ||
"ssp-name": test_ssp_name, | ||
"profile-name": test_prof, | ||
"compdefs": test_comp_name, | ||
} | ||
create_command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"create-ssp", | ||
create_args, | ||
) | ||
exit_code, _ = e2e_runner.invoke_command(create_command) | ||
assert exit_code == response | ||
assert (tmp_repo_path / markdown_path).exists() | ||
|
||
# Make a change to the SSP | ||
ssp, ssp_path = ModelUtils.load_model_for_class( | ||
tmp_repo_path, | ||
test_ssp_name, | ||
SystemSecurityPlan, | ||
FileContentType.JSON, | ||
) | ||
ssp.metadata.title = "New Title" | ||
ssp.oscal_write(ssp_path) | ||
else: | ||
ssp_generate = SSPGenerate() | ||
assert ssp_generate._run(args) == 0 | ||
branch = valid_args_dict["branch"] | ||
markdown_path = valid_args_dict["markdown-path"] | ||
committer_name = valid_args_dict["committer-name"] | ||
committer_email = valid_args_dict["committer-email"] | ||
|
||
create_args: Dict[str, str] = { | ||
"markdown-path": markdown_path, | ||
"branch": branch, | ||
"committer-name": committer_name, | ||
"committer-email": committer_email, | ||
"ssp-name": test_ssp_name, | ||
"profile-name": test_prof, | ||
"compdefs": test_comp_name, | ||
} | ||
create_command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"create-ssp", | ||
create_args, | ||
) | ||
exit_code, _, _ = e2e_runner.invoke_command(create_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert (tmp_repo_path / markdown_path).exists() | ||
|
||
autosync_command = e2e_runner.build_test_command( | ||
tmp_repo_str, "autosync", command_args | ||
# Check that the correct files are present with the correct content | ||
ssp_path = ModelUtils.get_model_path_for_name_and_class( | ||
tmp_repo_path, test_ssp_name, SystemSecurityPlan, FileContentType.JSON | ||
) | ||
index_path = os.path.join(tmp_repo_str, "ssp-index.json") | ||
ssp_index = SSPIndex(index_path) | ||
assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof | ||
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] | ||
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None | ||
assert ssp_path.exists() | ||
|
||
# Make a change to the SSP | ||
ac_1_path = tmp_repo_path / ssp_md_path / "ac" / "ac-1.md" | ||
assert replace_line_in_file_after_tag( | ||
ac_1_path, "ac-1_prm_6:", " values:\n ssp-values:\n - my ssp val\n" | ||
) | ||
exit_code, response_stdout = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == response | ||
|
||
autosync_command = e2e_runner.build_test_command( | ||
tmp_repo_str, "autosync", valid_args_dict | ||
) | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
# Check that the ssp was pushed to the remote | ||
if response == SUCCESS_EXIT_CODE: | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Check that the correct files are present with the correct content | ||
index_path = os.path.join(tmp_repo_str, "ssp-index.json") | ||
ssp_index = SSPIndex(index_path) | ||
assert ssp_index.get_profile_by_ssp(test_ssp_name) == test_prof | ||
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] | ||
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None | ||
assert ssp_path.exists() | ||
|
||
# Check that if run again, the ssp is not pushed again | ||
exit_code, response_stdout = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert "Nothing to commit" in response_stdout | ||
|
||
# Check that if the upstream profile is updated, the ssp is updated | ||
local_upstream_path = prepare_upstream_repo() | ||
upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main" | ||
upstream_command_args = { | ||
"branch": branch, | ||
"committer-name": committer_name, | ||
"committer-email": committer_email, | ||
"sources": upstream_repos_arg, | ||
} | ||
sync_upstreams_command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"sync-upstreams", | ||
upstream_command_args, | ||
local_upstream_path, | ||
) | ||
exit_code, response_stdout = e2e_runner.invoke_command(sync_upstreams_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Autosync again to check that the ssp is updated | ||
exit_code, response_stdout = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Clean up the upstream repo | ||
clean(local_upstream_path, None) | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Check that if run again, the ssp is not pushed again | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert "Nothing to commit" in response_stdout | ||
|
||
# Check that if the upstream profile is updated, the ssp is updated | ||
local_upstream_path = prepare_upstream_repo() | ||
upstream_repos_arg = f"{e2e_runner.UPSTREAM_REPO}@main" | ||
upstream_command_args = { | ||
"branch": branch, | ||
"committer-name": committer_name, | ||
"committer-email": committer_email, | ||
"sources": upstream_repos_arg, | ||
} | ||
sync_upstreams_command = e2e_runner.build_test_command( | ||
tmp_repo_str, | ||
"sync-upstreams", | ||
upstream_command_args, | ||
local_upstream_path, | ||
) | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(sync_upstreams_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Autosync again to check that the ssp is updated | ||
exit_code, response_stdout, _ = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == SUCCESS_EXIT_CODE | ||
assert f"Changes pushed to {branch} successfully." in response_stdout | ||
|
||
# Clean up the upstream repo | ||
clean(local_upstream_path, None) | ||
|
||
|
||
@pytest.mark.slow | ||
def test_ssp_e2e_editing_failure( | ||
tmp_repo: Tuple[str, Repo], | ||
e2e_runner: E2ETestRunner, | ||
valid_args_dict: Dict[str, str], | ||
) -> None: | ||
""" | ||
Test the trestlebot autosync command with SSPs with failure. | ||
Notes: The test should fail because of the missing entry in the ssp-index. | ||
This simulates the use case if an SSP is created outside of the tool. | ||
""" | ||
tmp_repo_str, _ = tmp_repo | ||
tmp_repo_path = pathlib.Path(tmp_repo_str) | ||
|
||
ssp_md_path = pathlib.Path(test_ssp_md) / test_ssp_name | ||
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], str(ssp_md_path)) | ||
|
||
ssp_generate = SSPGenerate() | ||
assert ssp_generate._run(args) == 0 | ||
|
||
autosync_command = e2e_runner.build_test_command( | ||
tmp_repo_str, "autosync", valid_args_dict | ||
) | ||
exit_code, _, response_stderr = e2e_runner.invoke_command(autosync_command) | ||
assert exit_code == ERROR_EXIT_CODE | ||
assert "SSP test_ssp does not exists in the index" in response_stderr |