diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9c26e755..9eb30e1b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -73,7 +73,9 @@ For workflow diagrams, see the [diagrams](./docs/workflows/) under the `docs` fo #### Code structure - `actions` - Provides specific logic for `trestle-bot` tasks that are packaged as Actions. See [README.md](./actions/README.md) for more information. -- `entrypoints` - Provides top level logic for specific user-facing tasks. These tasks are not necessarily related in any way so they are not organized into a hierarchical command structure, but they do inherit logic and flags from a base class. +- `cli` - Provides top level logic for specific user-facing tasks. These tasks are not necessarily related so they are not organized into a hierarchical command structure, but they do share some common modules. +- `cli/commands` - Provides top level logic for commands and their associated subcommands. The commands are accessed by the single entrypoint `root.py`. +- `cli/options` - Provides command line options and arguments that are frequently used within `cli/commands`. - `provider.py, github.py, and gitlab.py` - Git provider abstract class and concrete implementations for interacting with the API. - `tasks` - Pre-tasks can be configured before the main git logic is run. Any task that does workspace management should go here. - `tasks/authored` - The `authored` package contains logic for managing authoring tasks for single instances of a top-level OSCAL model. These encapsulate logic from the `compliance-trestle` library and allows loose coupling between `tasks` and `authored` types. diff --git a/README.md b/README.md index ee98868d..f5ee35a5 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ -trestle-bot assists users in leveraging [Compliance-Trestle](https://github.com/oscal-compass/compliance-trestle) in CI/CD workflows for [OSCAL](https://github.com/usnistgov/OSCAL) formatted compliance content management. +trestle-bot is a CLI tool that assists users in leveraging [Compliance-Trestle](https://github.com/oscal-compass/compliance-trestle) in CI/CD workflows for [OSCAL](https://github.com/usnistgov/OSCAL) formatted compliance content management. > WARNING: This project is currently under initial development. APIs may be changed incompatibly from one commit to another. @@ -15,25 +15,25 @@ trestle-bot assists users in leveraging [Compliance-Trestle](https://github.com/ ### Available Commands -The `autosync` command will sync trestle-generated Markdown files to OSCAL JSON files in a trestle workspace. All content under the provided markdown directory when the action is run will be transformed. This action supports all top-level models [supported by compliance-trestle for authoring](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring/). +The `autosync` command will sync trestle-generated Markdown files to OSCAL JSON files in a trestle workspace. All content under the provided markdown directory will be transformed when the action is run. This action supports all top-level models [supported by compliance-trestle for authoring](https://oscal-compass.github.io/compliance-trestle/tutorials/ssp_profile_catalog_authoring/ssp_profile_catalog_authoring/). The `rules-transform` command can be used when managing [OSCAL Component Definitions](https://pages.nist.gov/OSCAL-Reference/models/v1.1.1/component-definition/json-outline/) in a trestle workspace. The action will transform rules defined in the rules YAML view to an OSCAL Component Definition JSON file. -The `create-cd` command can be used to create a new [OSCAL Component Definition](https://pages.nist.gov/OSCAL-Reference/models/v1.1.1/component-definition/json-outline/) in a trestle workspace. The action will create a new Component Definition JSON file and corresponding directories that contain rules YAML files and trestle-generated Markdown files. This action prepares the workspace for use with the `rules-transform` and `autosync` actions. +The `create compdef` command can be used to create a new [OSCAL Component Definition](https://pages.nist.gov/OSCAL-Reference/models/v1.1.1/component-definition/json-outline/) in a trestle workspace. The action will create a new Component Definition JSON file and corresponding directories that contain rules YAML files and trestle-generated Markdown files. This action prepares the workspace for use with the `rules-transform` and `autosync` actions. -The `sync-upstreams` command can be used to sync and validate upstream OSCAL content stored in a git repository to a local trestle workspace. Which content is synced is determined by the `include_model_names` and `exclude_model_names` inputs. +The `sync-upstreams` command can be used to sync and validate upstream OSCAL content stored in a git repository to a local trestle workspace. The inputs `include_models` and `exclude_models` determine which content is synced to the trestle workspace. -The `create-ssp` command can be used to create a new [OSCAL System Security Plans](https://pages.nist.gov/OSCAL-Reference/models/v1.1.1/system-security-plan/json-outline/) (SSP) in a trestle workspace. The action will create a new SSP JSON file and corresponding directories that contain trestle-generated Markdown files. This action prepares the workspace for use with the `autosync` action by creating or updating the `ssp-index.json` file. The `ssp-index.json` file is used to track the relationships between the SSP and the other OSCAL content in the workspace for the `autosync` action. +The `create ssp` command can be used to create a new [OSCAL System Security Plans](https://pages.nist.gov/OSCAL-Reference/models/v1.1.1/system-security-plan/json-outline/) (SSP) in a trestle workspace. The action will create a new SSP JSON file and corresponding directories that contain trestle-generated Markdown files. This action prepares the workspace for use with the `autosync` action by creating or updating the `ssp-index.json` file. The `ssp-index.json` file is used to track the relationships between the SSP and the other OSCAL content in the workspace for the `autosync` action. Below is a table of the available commands and their current availability as a GitHub Action: -| Command | Available as a GitHub Action | -|--------------------|------------------------------| -| `autosync` | ✓ | -| `rules-transform` | ✓ | -| `create-cd` | ✓ | -| `sync-upstreams` | ✓ | -| `create-ssp` | | +| Command | Available as a GitHub Action | +|-------------------|------------------------------| +| `autosync` | ✓ | +| `rules-transform` | ✓ | +| `create compdef` | ✓ | +| `sync-upstreams` | ✓ | +| `create ssp` | | For detailed documentation on how to use each action, see the README.md in each folder under [actions](./actions/). @@ -47,7 +47,7 @@ provider information is supported for GitHub Actions (GitHub) and GitLab CI (Git ### Run as a Container -> Note: When running the commands in a container, all are prefixed with `trestlebot` (e.g. `trestlebot-autosync`). The default entrypoint for the container is the autosync command. +> Note: When running the commands in a container, all are prefixed with `trestlebot` (e.g. `trestlebot autosync`). The default entrypoint for the container is the autosync command. Build and run the container locally: @@ -72,4 +72,4 @@ This project is licensed under the Apache 2.0 License - see the [LICENSE.md](LIC ## Troubleshooting -See [TROUBLESHOOTING.md](./TROUBLESHOOTING.md) for troubleshooting tips. \ No newline at end of file +See [TROUBLESHOOTING.md](./TROUBLESHOOTING.md) for troubleshooting tips. diff --git a/poetry.lock b/poetry.lock index 76114638..3b3465d9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2816,6 +2816,17 @@ rich = ">=10.11.0" shellingham = ">=1.3.0" typing-extensions = ">=3.7.4.3" +[[package]] +name = "types-pyyaml" +version = "6.0.12.20240917" +description = "Typing stubs for PyYAML" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-PyYAML-6.0.12.20240917.tar.gz", hash = "sha256:d1405a86f9576682234ef83bcb4e6fff7c9305c8b1fbad5e0bcd4f7dbdc9c587"}, + {file = "types_PyYAML-6.0.12.20240917-py3-none-any.whl", hash = "sha256:392b267f1c0fe6022952462bf5d6523f31e37f6cea49b14cee7ad634b6301570"}, +] + [[package]] name = "typing-extensions" version = "4.12.2" @@ -2944,4 +2955,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "1f08efc70a8602cf883b114f94118473daf61cff47ce10d922d00c7470bc1b49" +content-hash = "bd770486b5c4a8645c0b2d357a8e316aa47f7a2c7f38b997b659ab0de6334bfa" diff --git a/pyproject.toml b/pyproject.toml index bd9dd5fc..f22f9666 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ flake8-print = "^5.0.0" pre-commit = "^3.4.0" mkdocs-material = "^9.5.43" markdown-include = "^0.8.1" +types-pyyaml = "^6.0.12.20240917" [tool.poetry.group.tests] optional = true diff --git a/tests/trestlebot/cli/test_autosync_cmd.py b/tests/trestlebot/cli/test_autosync_cmd.py new file mode 100644 index 00000000..e0d370ec --- /dev/null +++ b/tests/trestlebot/cli/test_autosync_cmd.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + + +"""Testing module for trestlebot autosync command""" +import pathlib +from typing import Tuple + +from click.testing import CliRunner +from git import Repo + +from trestlebot.cli.commands.autosync import autosync_cmd +from trestlebot.cli.config import TrestleBotConfig, write_to_file + + +def test_invalid_oscal_model(tmp_repo: Tuple[str, Repo]) -> None: + """Test invalid OSCAl model option.""" + + repo_path, _ = tmp_repo + runner = CliRunner() + result = runner.invoke( + autosync_cmd, + [ + "--oscal-model", + "invalid", + "--repo-path", + repo_path, + "--markdown-dir", + "markdown", + "--branch", + "main", + "--committer-name", + "Test User", + "--committer-email", + "test@example.com", + ], + ) + assert "Invalid value for '--oscal-model'" in result.output + assert result.exit_code == 2 + + +def test_missing_ssp_index_file_option(tmp_repo: Tuple[str, Repo]) -> None: + """Test missing ssp_index_file option for autosync ssp.""" + repo_path, _ = tmp_repo + runner = CliRunner() + cmd_options = [ + "--oscal-model", + "ssp", + "--repo-path", + repo_path, + "--markdown-dir", + "markdown", + "--branch", + "main", + "--committer-name", + "Test User", + "--committer-email", + "test@example.com", + ] + result = runner.invoke(autosync_cmd, cmd_options) + assert result.exit_code == 1 + assert "Missing option '--ssp-index-file'" in result.output + + +def test_missing_markdown_dir_option(tmp_repo: Tuple[str, Repo]) -> None: + # When no markdown_dir setting in trestlebot config file. + repo_path, _ = tmp_repo + runner = CliRunner() + filepath = pathlib.Path(repo_path).joinpath("config.yml") + config_obj = TrestleBotConfig(repo_path=repo_path) + write_to_file(config_obj, filepath) + cmd_options = [ + "--oscal-model", + "compdef", + "--repo-path", + repo_path, + "--branch", + "main", + "--committer-name", + "Test User", + "--committer-email", + "test@example.com", + "--config", + str(filepath), + ] + result = runner.invoke(autosync_cmd, cmd_options) + assert result.exit_code == 2 + assert "Error: Missing option '--markdown-dir'" in result.output + + # With 'markdown_dir' setting in config.yml + config_obj = TrestleBotConfig(markdown_dir="markdown") + write_to_file(config_obj, filepath) + result = runner.invoke(autosync_cmd, cmd_options) + assert result.exit_code == 0 diff --git a/tests/trestlebot/cli/test_config.py b/tests/trestlebot/cli/test_config.py new file mode 100644 index 00000000..60a40210 --- /dev/null +++ b/tests/trestlebot/cli/test_config.py @@ -0,0 +1,80 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + + +"""Unit tests for CLI config module""" +import pathlib + +import pytest +import yaml + +from trestlebot.cli.config import ( + TrestleBotConfig, + TrestleBotConfigError, + UpstreamsConfig, + load_from_file, + make_config, + write_to_file, +) + + +@pytest.fixture +def config_obj() -> TrestleBotConfig: + return TrestleBotConfig( + repo_path="/tmp", + markdown_dir="markdown", + upstreams=UpstreamsConfig(sources=["repo@main"]), + ) + + +def test_invalid_config_raises_errors() -> None: + """Test create config with invalid directory to raise error.""" + + with pytest.raises(TrestleBotConfigError) as ex: + _ = make_config(dict(repo_path="0")) + + assert ( + str(ex.value) + == "Invalid config value for repo_path. Path does not point to a directory." + ) + + +def test_make_config_raises_no_errors(tmp_init_dir: str) -> None: + """Test create a valid config object.""" + values = { + "repo_path": tmp_init_dir, + "markdown_dir": "markdown", + "committer_name": "committer-name", + "committer_email": "committer-email", + "upstreams": {"sources": ["https://test@main"], "skip_validation": True}, + } + config = make_config(values) + assert isinstance(config, TrestleBotConfig) + assert config.upstreams is not None + assert config.upstreams.sources == ["https://test@main"] + assert config.upstreams.skip_validation is True + assert config.repo_path == pathlib.Path(tmp_init_dir) + assert config.markdown_dir == values["markdown_dir"] + assert config.committer_name == values["committer_name"] + assert config.committer_email == values["committer_email"] + + +def test_config_write_to_file(config_obj: TrestleBotConfig, tmp_init_dir: str) -> None: + """Test config is written to yaml file.""" + filepath = pathlib.Path(tmp_init_dir).joinpath("config.yml") + write_to_file(config_obj, filepath) + with open(filepath, "r") as f: + yaml_data = yaml.safe_load(f) + + assert yaml_data == config_obj.to_yaml_dict() + + +def test_config_load_from_file(config_obj: TrestleBotConfig, tmp_init_dir: str) -> None: + """Test config is read from yaml file into config object.""" + filepath = pathlib.Path(tmp_init_dir).joinpath("config.yml") + with filepath.open("w") as config_file: + yaml.dump(config_obj.to_yaml_dict(), config_file) + + config = load_from_file(filepath) + assert isinstance(config, TrestleBotConfig) + assert config == config_obj diff --git a/tests/trestlebot/cli/test_create_cmd.py b/tests/trestlebot/cli/test_create_cmd.py new file mode 100644 index 00000000..c081645f --- /dev/null +++ b/tests/trestlebot/cli/test_create_cmd.py @@ -0,0 +1,167 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" Unit test for create commands ssp and cd""" +import pathlib +from typing import Tuple + +from click.testing import CliRunner +from git import Repo + +from tests.testutils import setup_for_compdef, setup_for_ssp +from trestlebot.cli.commands.create import create_cmd + + +test_prof = "simplified_nist_profile" +test_comp_name = "test_comp" +test_ssp_md = "md_ssp" +test_ssp_cd = "md_cd" + + +def test_invalid_create_cmd() -> None: + """Tests that create command fails if given invalid oscal model subcommand.""" + runner = CliRunner() + result = runner.invoke(create_cmd, ["invalid"]) + + assert "Error: No such command 'invalid'" in result.output + assert result.exit_code == 2 + + +def test_create_ssp_cmd(tmp_repo: Tuple[str, Repo]) -> None: + """Tests successful create ssp command.""" + + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + ssp_index_file = repo_path.joinpath("ssp-index.json") + + _ = setup_for_ssp(repo_path, test_prof, [test_comp_name], test_ssp_md) + + runner = CliRunner() + result = runner.invoke( + create_cmd, + [ + "ssp", + "--profile-name", + test_prof, + "--markdown-dir", + "markdown", + "--compdefs", + test_comp_name, + "--ssp-name", + "test-name", + "--repo-path", + str(repo_path.resolve()), + "--ssp-index-file", + ssp_index_file, + "--committer-email", + "test@email.com", + "--committer-name", + "test name", + "--branch", + "test", + ], + ) + assert result.exit_code == 0 + + +def test_create_compdef_cmd(tmp_repo: Tuple[str, Repo]) -> None: + """Tests successful create compdef command.""" + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + _ = setup_for_compdef(repo_path, test_comp_name, test_ssp_cd) + + runner = CliRunner() + result = runner.invoke( + create_cmd, + [ + "compdef", + "--profile-name", + "oscal-profile-name", + "--compdef-name", + "test-name", + "--component-title", + "title-test", + "--component-description", + "description-test", + "--component-definition-type", + "type-test", + "--repo-path", + str(repo_path.resolve()), + "--committer-email", + "test@email.com", + "--committer-name", + "test name", + "--branch", + "test", + ], + ) + assert result.exit_code == 0 + + +def test_default_ssp_index_file_cmd(tmp_repo: Tuple[str, Repo]) -> None: + """Tests successful default ssp_index.json file creation.""" + + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + runner = CliRunner() + result = runner.invoke( + create_cmd, + [ + "ssp", + "--profile-name", + "oscal-profile-name", + "--markdown-dir", + "markdown", + "--compdefs", + "test-compdef", + "--ssp-name", + "test-name", + "--repo-path", + str(repo_path.resolve()), + "--committer-email", + "test@email.com", + "--committer-name", + "test name", + "--branch", + "test", + ], + ) + assert result.exit_code == 0 + + +def test_markdown_files_created(tmp_repo: Tuple[str, Repo]) -> None: + """Tests successful creation of markdown files.""" + + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + ssp_index_file = repo_path.joinpath("ssp-index.json") + + runner = CliRunner() + result = runner.invoke( + create_cmd, + [ + "ssp", + "--profile-name", + "oscal-profile-name", + "--markdown-dir", + "markdown", + "--compdefs", + "test-compdef", + "--ssp-name", + "test-name", + "--repo-path", + str(repo_path.resolve()), + "--ssp-index-file", + ssp_index_file, + "--committer-email", + "test@email.com", + "--committer-name", + "test name", + "--branch", + "test", + ], + ) + assert result.exit_code == 0 diff --git a/tests/trestlebot/cli/test_init_cmd.py b/tests/trestlebot/cli/test_init_cmd.py new file mode 100644 index 00000000..0bf05dc9 --- /dev/null +++ b/tests/trestlebot/cli/test_init_cmd.py @@ -0,0 +1,114 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + + +"""Testing module for trestlebot init command""" +import pathlib + +import yaml +from click.testing import CliRunner +from trestle.common.const import MODEL_DIR_LIST, TRESTLE_CONFIG_DIR, TRESTLE_KEEP_FILE +from trestle.common.file_utils import is_hidden + +from tests.testutils import setup_for_init +from trestlebot.cli.commands.init import call_trestle_init, init_cmd +from trestlebot.const import TRESTLEBOT_CONFIG_DIR + + +def test_init_repo_dir_does_not_exist() -> None: + """Init should fail if repo dir does not exit""" + runner = CliRunner() + result = runner.invoke(init_cmd, ["--repo-path", "0"]) + assert result.exit_code == 2 + assert "does not exist." in result.output + + +def test_init_not_git_repo(tmp_init_dir: str) -> None: + """Init should fail if repo dir is not a Git repo.""" + runner = CliRunner() + result = runner.invoke( + init_cmd, ["--repo-path", tmp_init_dir, "--markdown-dir", "markdown"] + ) + assert result.exit_code == 1 + assert "not a Git repository" in result.output + + +def test_init_existing_trestlebot_dir(tmp_init_dir: str) -> None: + """Init should fail if repo already contains .trestlebot/ dir.""" + + # setup_for_init(pathlib.Path(tmp_init_dir)) + # Manulaly create .trestlebot dir so it already exists + trestlebot_dir = pathlib.Path(tmp_init_dir) / pathlib.Path(TRESTLEBOT_CONFIG_DIR) + trestlebot_dir.mkdir() + + setup_for_init(pathlib.Path(tmp_init_dir)) + + runner = CliRunner() + result = runner.invoke( + init_cmd, ["--repo-path", tmp_init_dir, "--markdown-dir", "markdown"] + ) + + assert result.exit_code == 1 + assert "existing .trestlebot directory" in result.output + + +def test_init_creates_config_file(tmp_init_dir: str) -> None: + """Test init command creates yaml config file.""" + + setup_for_init(pathlib.Path(tmp_init_dir)) + + runner = CliRunner() + result = runner.invoke( + init_cmd, ["--repo-path", tmp_init_dir, "--markdown-dir", "markdown"] + ) + assert result.exit_code == 0 + assert "Successfully initialized trestlebot" in result.output + + config_path = ( + pathlib.Path(tmp_init_dir) + .joinpath(TRESTLEBOT_CONFIG_DIR) + .joinpath("config.yml") + ) + with open(config_path, "r") as f: + yaml_data = yaml.safe_load(f) + + assert yaml_data["repo_path"] == tmp_init_dir + assert yaml_data["markdown_dir"] == "markdown" + + +def test_init_creates_model_dirs(tmp_init_dir: str) -> None: + """Init should create model directories in repo""" + + tmp_dir = pathlib.Path(tmp_init_dir) + setup_for_init(tmp_dir) + + runner = CliRunner() + runner.invoke(init_cmd, ["--repo-path", tmp_init_dir, "--markdown-dir", "markdown"]) + + model_dirs = [d.name for d in tmp_dir.iterdir() if not is_hidden(d)] + model_dirs.remove("markdown") # pop markdown dir + assert sorted(model_dirs) == sorted(MODEL_DIR_LIST) + + +def test_init_creates_markdown_dirs(tmp_init_dir: str) -> None: + """Init should create model directories in repo""" + + tmp_dir = pathlib.Path(tmp_init_dir) + markdown_dir = tmp_dir.joinpath("markdown") + setup_for_init(tmp_dir) + + runner = CliRunner() + runner.invoke(init_cmd, ["--repo-path", tmp_init_dir, "--markdown-dir", "markdown"]) + + markdown_dirs = [d.name for d in markdown_dir.iterdir() if not is_hidden(d)] + assert sorted(markdown_dirs) == sorted(MODEL_DIR_LIST) + + +def test_init_creates_trestle_dirs(tmp_init_dir: str) -> None: + """Init should create markdown dirs in repo""" + + tmp_dir = pathlib.Path(tmp_init_dir) + call_trestle_init(tmp_dir, False) + trestle_dir = tmp_dir.joinpath(TRESTLE_CONFIG_DIR) + keep_file = trestle_dir.joinpath(TRESTLE_KEEP_FILE) + assert keep_file.exists() is True diff --git a/tests/trestlebot/cli/test_rule_transform_cmd.py b/tests/trestlebot/cli/test_rule_transform_cmd.py new file mode 100644 index 00000000..e2d4f741 --- /dev/null +++ b/tests/trestlebot/cli/test_rule_transform_cmd.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + + +"""Testing module for trestlebot rule-transform command""" + +import pathlib +from typing import Tuple + +from click.testing import CliRunner +from git import Repo + +from tests.testutils import setup_for_compdef, setup_rules_view +from trestlebot.cli.commands.rule_transform import rule_transform_cmd + + +test_comp_name = "test_comp" +test_md = "md_cd" + + +def test_rule_transform(tmp_repo: Tuple[str, Repo]) -> None: + """Test rule transform.""" + repo_path_str, repo = tmp_repo + + repo_path = pathlib.Path(repo_path_str) + + setup_for_compdef(repo_path, test_comp_name, test_md) + setup_rules_view(repo_path, test_comp_name) + + assert not repo_path.joinpath(test_md).exists() + + runner = CliRunner() + result = runner.invoke( + rule_transform_cmd, + [ + "--dry-run", + "--repo-path", + repo_path, + "--markdown-dir", + test_md, + "--branch", + "main", + "--committer-name", + "Test User", + "--committer-email", + "test@example.com", + ], + ) + + assert result.exit_code == 0 + assert repo_path.joinpath(test_md).exists() + commit = next(repo.iter_commits()) + assert len(commit.stats.files) == 9 diff --git a/tests/trestlebot/cli/test_sync_upstreams_cmd.py b/tests/trestlebot/cli/test_sync_upstreams_cmd.py new file mode 100644 index 00000000..b3e9e1ba --- /dev/null +++ b/tests/trestlebot/cli/test_sync_upstreams_cmd.py @@ -0,0 +1,157 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + + +"""Unit tests for upstreams commands.""" + +import pathlib +from typing import Tuple + +from click.testing import CliRunner +from git import Repo + +from tests.testutils import clean, prepare_upstream_repo, setup_for_init +from trestlebot.cli.commands.sync_upstreams import sync_upstreams_cmd +from trestlebot.cli.config import make_config, write_to_file +from trestlebot.const import TRESTLEBOT_CONFIG_DIR + + +TEST_CATALOG = "simplified_nist_catalog" +TEST_CATALOG_PATH = "catalogs/simplified_nist_catalog/catalog.json" +TEST_PROFILE = "simplified_nist_profile" +TEST_PROFILE_PATH = "profiles/simplified_nist_profile/profile.json" + + +def test_sync_upstreams(tmp_repo: Tuple[str, Repo]) -> None: + """Test sync upstreams""" + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + source: str = prepare_upstream_repo() + + runner = CliRunner() + result = runner.invoke( + sync_upstreams_cmd, + [ + "--repo-path", + repo_path, + "--sources", + f"{source}@main", + "--branch", + "main", + "--committer-email", + "test@email", + "--committer-name", + "test name", + ], + ) + + assert repo_path.joinpath(TEST_CATALOG_PATH).exists() + assert repo_path.joinpath(TEST_PROFILE_PATH).exists() + assert result.exit_code == 0 + clean(source) + + +def test_sync_upstreams_with_config( + tmp_repo: Tuple[str, Repo], tmp_init_dir: str +) -> None: + """Test sync upstreams using a config file.""" + + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + source: str = prepare_upstream_repo() + + trestlebot_dir = pathlib.Path(tmp_init_dir) / pathlib.Path(TRESTLEBOT_CONFIG_DIR) + trestlebot_dir.mkdir() + + setup_for_init(pathlib.Path(tmp_init_dir)) + + config_path = ( + pathlib.Path(tmp_init_dir) + .joinpath(TRESTLEBOT_CONFIG_DIR) + .joinpath("config.yml") + ) + + config = make_config( + { + "branch": "main", + "committer_name": "test name", + "committer_email": "test@email", + "upstreams": { + "sources": [f"{source}@main"], + "include_models": ["*"], + "skip_validation": False, + }, + } + ) + + write_to_file(config, config_path) + + runner = CliRunner() + result = runner.invoke( + sync_upstreams_cmd, + ["--repo-path", repo_path, "--config", config_path.resolve()], + ) + assert repo_path.joinpath(TEST_CATALOG_PATH).exists() + assert repo_path.joinpath(TEST_PROFILE_PATH).exists() + assert result.exit_code == 0 + clean(source) + + +def test_sync_upstreams_exclude_models(tmp_repo: Tuple[str, Repo]) -> None: + """Test sync upstreams with exclude models""" + + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + source: str = prepare_upstream_repo() + + runner = CliRunner() + result = runner.invoke( + sync_upstreams_cmd, + [ + "--repo-path", + repo_path, + "--sources", + f"{source}@main", + "--branch", + "main", + "--exclude-models", + TEST_PROFILE, + "--committer-email", + "test@email", + "--committer-name", + "test", + ], + ) + + assert repo_path.joinpath(TEST_CATALOG_PATH).exists() + assert not repo_path.joinpath(TEST_PROFILE_PATH).exists() + assert result.exit_code == 0 + clean(source) + + +def test_sync_upstreams_no_sources(tmp_repo: Tuple[str, Repo]) -> None: + """Test sync upstreams with sources option missing""" + + repo_dir, _ = tmp_repo + repo_path = pathlib.Path(repo_dir) + + runner = CliRunner() + result = runner.invoke( + sync_upstreams_cmd, + [ + "--repo-path", + repo_path, + "--branch", + "main", + "--committer-name", + "test name", + "--committer-email", + "test@email", + ], + ) + + assert "Missing option '--sources'" in result.output + assert result.exit_code == 1 diff --git a/trestlebot/cli/commands/autosync.py b/trestlebot/cli/commands/autosync.py new file mode 100644 index 00000000..87d607a5 --- /dev/null +++ b/trestlebot/cli/commands/autosync.py @@ -0,0 +1,123 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" Autosync command""" + +import logging +import sys +from typing import Any, List + +import click + +from trestlebot.cli.options.common import common_options, git_options, handle_exceptions +from trestlebot.cli.utils import comma_sep_to_list, run_bot +from trestlebot.const import ERROR_EXIT_CODE +from trestlebot.tasks.assemble_task import AssembleTask +from trestlebot.tasks.authored import types +from trestlebot.tasks.authored.base_authored import AuthoredObjectBase +from trestlebot.tasks.base_task import ModelFilter, TaskBase +from trestlebot.tasks.regenerate_task import RegenerateTask + + +logger = logging.getLogger(__name__) + + +@click.command("autosync", help="Autosync catalog, profile, compdef and ssp.") +@click.pass_context +@common_options +@git_options +@click.option( + "--oscal-model", + type=click.Choice(choices=[model.value for model in types.AuthoredType]), + help="OSCAL model type for autosync.", + required=True, +) +@click.option( + "--markdown-dir", + type=str, + help="Directory containing markdown files.", + required=True, +) +@click.option( + "--skip-items", + type=str, + help="Comma-separated list of glob patterns of the chosen model type \ + to skip when running tasks.", +) +@click.option( + "--skip-assemble", + help="Skip assembly task.", + is_flag=True, + default=False, + show_default=True, +) +@click.option( + "--skip-regenerate", + help="Skip regenerate task.", + is_flag=True, + default=False, + show_default=True, +) +@click.option( + "--version", + help="Version of the OSCAL model to set during assembly into JSON.", + type=str, +) +@click.option( + "--ssp-index-file", + help="Path to ssp index file. Required if --oscal-model is 'ssp'.", + type=str, + required=False, +) +@handle_exceptions +def autosync_cmd(ctx: click.Context, **kwargs: Any) -> None: + """Command to autosync catalog, profile, compdef and ssp.""" + + oscal_model = kwargs["oscal_model"] + markdown_dir = kwargs["markdown_dir"] + working_dir = str(kwargs["repo_path"].resolve()) + kwargs["working_dir"] = working_dir + + if oscal_model == "ssp" and not kwargs.get("ssp_index_file"): + logger.error("Trestlebot Error: Missing option '--ssp-index-file'.") + sys.exit(ERROR_EXIT_CODE) + + pre_tasks: List[TaskBase] = [] + + if kwargs.get("file_pattern"): + kwargs.update({"patterns": comma_sep_to_list(kwargs["file_patterns"])}) + + model_filter: ModelFilter = ModelFilter( + skip_patterns=comma_sep_to_list(kwargs.get("skip_items", "")), + include_patterns=["*"], + ) + authored_object: AuthoredObjectBase = types.get_authored_object( + oscal_model, + working_dir, + kwargs.get("ssp_index_path", ""), + ) + + # Assuming an edit has occurred assemble would be run before regenerate. + if not kwargs.get("skip_assemble"): + assemble_task: AssembleTask = AssembleTask( + authored_object=authored_object, + markdown_dir=markdown_dir, + version=kwargs.get("version", ""), + model_filter=model_filter, + ) + pre_tasks.append(assemble_task) + else: + logger.info("Assemble task skipped.") + + if not kwargs.get("skip_regenerate"): + regenerate_task: RegenerateTask = RegenerateTask( + authored_object=authored_object, + markdown_dir=markdown_dir, + model_filter=model_filter, + ) + pre_tasks.append(regenerate_task) + else: + logger.info("Regeneration task skipped.") + + results = run_bot(pre_tasks, kwargs) + logger.debug(f"Trestlebot results: {results}") diff --git a/trestlebot/cli/commands/create.py b/trestlebot/cli/commands/create.py new file mode 100644 index 00000000..f37d5104 --- /dev/null +++ b/trestlebot/cli/commands/create.py @@ -0,0 +1,233 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" +Module for create-cd create-ssp command for CLI +""" + +import logging +from typing import Any, List + +import click + +from trestlebot import const +from trestlebot.cli.options.common import common_options, git_options, handle_exceptions +from trestlebot.cli.options.create import common_create_options +from trestlebot.cli.utils import run_bot +from trestlebot.entrypoints.entrypoint_base import comma_sep_to_list +from trestlebot.tasks.assemble_task import AssembleTask +from trestlebot.tasks.authored.compdef import ( + AuthoredComponentDefinition, + FilterByProfile, +) +from trestlebot.tasks.authored.ssp import AuthoredSSP, SSPIndex +from trestlebot.tasks.base_task import ModelFilter, TaskBase +from trestlebot.tasks.regenerate_task import RegenerateTask +from trestlebot.tasks.rule_transform_task import RuleTransformTask +from trestlebot.transformers.yaml_transformer import ToRulesYAMLTransformer + + +logger = logging.getLogger(__name__) + + +@click.group(name="create", help="Component definition and ssp authoring.") +@click.pass_context +@handle_exceptions +def create_cmd(ctx: click.Context) -> None: + """ + Command leveraged for component definition and ssp authoring in trestlebot. + """ + pass + + +@create_cmd.command(name="compdef", help="Component definition authoring subcommand.") +@click.pass_context +@common_create_options +@common_options +@git_options +@click.option( + "--compdef-name", + required=True, + help="Name of component definition.", +) +@click.option( + "--component-title", + required=True, + help="Title of initial component.", +) +@click.option( + "--component-description", + required=True, + help="Description of initial component.", +) +@click.option( + "--filter-by-profile", + required=False, + type=str, + help="Optionally filter the controls in the component definition by a profile", +) +@click.option( + "--component-definition-type", + required=False, + type=str, + default="service", + help="Type of component definition", +) +@handle_exceptions +def compdef_cmd( + ctx: click.Context, + **kwargs: Any, +) -> None: + """ + Component definition authoring command. + """ + pre_tasks: List[TaskBase] = [] + + profile_name = kwargs["profile_name"] + compdef_name = kwargs["compdef_name"] + component_title = kwargs["component_title"] + component_description = kwargs["component_description"] + filter_by_profile = kwargs.get("filter_by_profile") + component_definition_type = kwargs.get("component_definition_type", "service") + repo_path = kwargs["repo_path"] + markdown_dir = kwargs["markdown_dir"] + if filter_by_profile: + filter_by_profile = FilterByProfile(repo_path, filter_by_profile) + + authored_comp: AuthoredComponentDefinition = AuthoredComponentDefinition( + trestle_root=repo_path, + ) + authored_comp.create_new_default( + profile_name=profile_name, + compdef_name=compdef_name, + comp_title=component_title, + comp_description=component_description, + comp_type=component_definition_type, + filter_by_profile=filter_by_profile, + ) + logger.info(f"Component definition name is: {component_title}.") + + transformer: ToRulesYAMLTransformer = ToRulesYAMLTransformer() + + model_filter: ModelFilter = ModelFilter( + [], [profile_name, component_title, f"{const.RULE_PREFIX}*"] + ) + + rule_transform_task: RuleTransformTask = RuleTransformTask( + working_dir=repo_path, + rules_view_dir=const.RULES_VIEW_DIR, + rule_transformer=transformer, + model_filter=model_filter, + ) + logger.info( + f"Profile to filter controls in the component files is: {filter_by_profile}." + ) + logger.debug( + f"Oscal profile in use with the component definition is: {profile_name}." + ) + logger.debug(f"Component definition type is {component_definition_type}.") + + pre_tasks.append(rule_transform_task) + + regenerate_task: RegenerateTask = RegenerateTask( + authored_object=authored_comp, + markdown_dir=markdown_dir, + model_filter=model_filter, + ) + pre_tasks.append(regenerate_task) + + run_bot(pre_tasks, kwargs) + + logger.debug(f"You have successfully authored the {compdef_name}.") + + +@create_cmd.command(name="ssp", help="Authoring ssp subcommand.") +@click.pass_context +@common_create_options +@common_options +@git_options +@click.option( + "--ssp-name", + required=True, + type=str, + help="Name of SSP to create.", +) +@click.option( + "--leveraged-ssp", + required=False, + type=str, + help="Provider SSP to leverage for the new SSP.", +) +@click.option( + "--ssp-index-file", + required=False, + type=str, + default="ssp-index.json", + help="Optionally set the path to the SSP index file.", +) +@click.option( + "--yaml-header-path", + required=False, + type=str, + help="Optionally set a path to a YAML file for custom SSP Markdown YAML headers.", +) +@click.option( + "--version", + required=False, + type=str, + help="Optionally set the version of the SSP.", +) +@click.option( + "--compdefs", + required=True, + type=str, + help="Comma separated list of component definitions.", +) +@handle_exceptions +def ssp_cmd( + ctx: click.Context, + **kwargs: Any, +) -> None: + """ + SSP Authoring command + """ + profile_name = kwargs["profile_name"] + ssp_name = kwargs["ssp_name"] + ssp_index_file = kwargs.get("ssp_index_file", "ssp-index.json") + repo_path = kwargs["repo_path"] + markdown_dir = kwargs["markdown_dir"] + compdefs = kwargs["compdefs"] + version = kwargs["version"] + + ssp_index: SSPIndex = SSPIndex(index_path=ssp_index_file) + authored_ssp: AuthoredSSP = AuthoredSSP(trestle_root=repo_path, ssp_index=ssp_index) + + logger.info(f"SSP index file is: {ssp_index_file}.") + + comps: List[str] = comma_sep_to_list(compdefs) + authored_ssp.create_new_default( + ssp_name=ssp_name, + profile_name=profile_name, + compdefs=comps, + markdown_path=markdown_dir, + leveraged_ssp=kwargs["leveraged_ssp"], + yaml_header=kwargs["yaml_header_path"], + ) + + logger.debug(f"The name of the SSP to create is {ssp_name}.") + logger.debug(f"Oscal profile in use with the SSP is: {profile_name}.") + + # The starting point for SSPs is the markdown, so assemble into JSON. + model_filter: ModelFilter = ModelFilter([], [ssp_name]) + assemble_task: AssembleTask = AssembleTask( + authored_object=authored_ssp, + markdown_dir=markdown_dir, + version=version, + model_filter=model_filter, + ) + + pre_tasks: List[TaskBase] = [assemble_task] + + run_bot(pre_tasks, kwargs) + + logger.debug(f"You have successfully authored the {ssp_name}.") diff --git a/trestlebot/cli/commands/init.py b/trestlebot/cli/commands/init.py new file mode 100644 index 00000000..28cb4336 --- /dev/null +++ b/trestlebot/cli/commands/init.py @@ -0,0 +1,181 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +"""" +Module for Trestle-bot init command +""" +import argparse +import logging +import os +import pathlib +import sys + +import click +from trestle.common.const import MODEL_DIR_LIST +from trestle.common.file_utils import make_hidden_file +from trestle.core.commands.common.return_codes import CmdReturnCodes +from trestle.core.commands.init import InitCmd + +from trestlebot.cli.config import make_config, write_to_file +from trestlebot.cli.options.common import common_options +from trestlebot.const import ( + ERROR_EXIT_CODE, + TRESTLEBOT_CONFIG_DIR, + TRESTLEBOT_KEEP_FILE, +) + + +logger = logging.getLogger(__name__) +logging.getLogger("trestle.core.commands.init").setLevel("CRITICAL") + + +def call_trestle_init(repo_path: pathlib.Path, debug: bool) -> None: + """Call compliance-trestle to initialize workspace""" + + verbose = 1 if debug else 0 + trestle_args = argparse.Namespace( + verbose=verbose, + trestle_root=repo_path, + full=False, + govdocs=True, + local=False, + ) + return_code = InitCmd()._run(trestle_args) + if return_code == CmdReturnCodes.SUCCESS.value: + logger.debug("Initialized trestle project successfully") + else: + logger.error( + f"Initialization failed. Unexpected trestle error: {CmdReturnCodes(return_code).name}" + ) + sys.exit(ERROR_EXIT_CODE) + + +def mkdir_with_hidden_file(file_path: pathlib.Path) -> None: + """Creates empty directory with .keep file""" + file_path.parent.mkdir(exist_ok=True, parents=True) + make_hidden_file(file_path) + + +@click.command(name="init", help="Initialize a new trestle-bot repo.") +@click.pass_context +@common_options +@click.option( + "--repo-path", + type=click.Path(path_type=pathlib.Path, exists=True), + help="Path to Git repository to initialize.", + default=os.getcwd(), + prompt="Enter path to Git repository", +) # override repo-path in common options to force prompt +@click.option( + "--markdown-dir", + type=str, + help="Directory name to store markdown files.", + default="markdown/", + prompt="Enter path to store markdown files", +) +@click.option( + "--default-committer-name", + type=str, + help="Default user name for Git commits.", + default="", + show_default=False, + prompt="Enter default user name for Git commits (press to skip)", +) +@click.option( + "--default-committer-email", + type=str, + help="Default user email for Git commits.", + default="", + show_default=False, + prompt="Enter default user email for Git commits (press to skip)", +) +@click.option( + "--default-commit-message", + type=str, + help="Default message for Git commits.", + default="", + show_default=False, + prompt="Enter default message for Git commits (press to skip)", +) +@click.option( + "--default-branch", + type=str, + help="Default repo branch to push automated changes.", + default="", + show_default=False, + prompt="Enter default repo branch to push automated changes (press to skip)", +) +def init_cmd( + ctx: click.Context, + debug: bool, + config_path: pathlib.Path, + dry_run: bool, + repo_path: pathlib.Path, + markdown_dir: str, + default_committer_name: str, + default_committer_email: str, + default_commit_message: str, + default_branch: str, +) -> None: + """Command to initialize a new trestlebot repo""" + + repo_path = repo_path.resolve() + git_path: pathlib.Path = repo_path.joinpath(pathlib.Path(".git")) + if not git_path.exists(): + logger.error( + f"Initialization failed. Given directory {repo_path} is not a Git repository." + ) + sys.exit(ERROR_EXIT_CODE) + + trestlebot_dir = repo_path.joinpath(pathlib.Path(TRESTLEBOT_CONFIG_DIR)) + if trestlebot_dir.exists(): + logger.error( + f"Initialization failed. Found existing {TRESTLEBOT_CONFIG_DIR} directory in {repo_path}" + ) + sys.exit(ERROR_EXIT_CODE) + + # Create model directories in workspace root + list( + map( + lambda d: mkdir_with_hidden_file( + repo_path.joinpath(d).joinpath(TRESTLEBOT_KEEP_FILE) + ), + MODEL_DIR_LIST, + ) + ) + logger.debug("Created model directories successfully") + + # Create markdown directories in workspace root + list( + map( + lambda d: mkdir_with_hidden_file( + repo_path.joinpath(markdown_dir) + .joinpath(d) + .joinpath(TRESTLEBOT_KEEP_FILE) + ), + MODEL_DIR_LIST, + ) + ) + logger.debug("Created markdown directories successfully") + + # invoke the init command in compliance trestle + call_trestle_init(repo_path, debug) + + # generate and write trestle-bot config + config_values = dict(repo_path=repo_path, markdown_dir=markdown_dir) + if default_committer_name: + config_values.update(committer_name=default_committer_name) + + if default_committer_email: + config_values.update(committer_email=default_committer_email) + + if default_commit_message: + config_values.update(commit_message=default_commit_message) + + if default_branch: + config_values.update(branch=default_branch) + + config = make_config(config_values) + write_to_file(config, trestlebot_dir.joinpath("config.yml")) + logger.debug(f"trestle-bot config file created at {str(config_path)}") + logger.info(f"Successfully initialized trestlebot project in {repo_path}") diff --git a/trestlebot/cli/commands/rule_transform.py b/trestlebot/cli/commands/rule_transform.py new file mode 100644 index 00000000..b67edd2c --- /dev/null +++ b/trestlebot/cli/commands/rule_transform.py @@ -0,0 +1,73 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +"""Module for rule-transform command""" + +import logging +from typing import Any, List + +import click + +from trestlebot.cli.options.common import common_options, git_options, handle_exceptions +from trestlebot.cli.utils import comma_sep_to_list, run_bot +from trestlebot.const import RULES_VIEW_DIR +from trestlebot.tasks.authored.compdef import AuthoredComponentDefinition +from trestlebot.tasks.base_task import ModelFilter, TaskBase +from trestlebot.tasks.regenerate_task import RegenerateTask +from trestlebot.tasks.rule_transform_task import RuleTransformTask +from trestlebot.transformers.yaml_transformer import ToRulesYAMLTransformer + + +logger = logging.getLogger(__name__) + + +@click.command( + name="rule-transform", + help="Transform rules to an OSCAL Component Definition JSON file.", +) +@click.pass_context +@common_options +@git_options +@click.option( + "--markdown-dir", + type=str, + help="Directory name to store markdown files.", +) +@click.option( + "--rules-view-dir", + type=str, + help="Top-level rules-view directory.", + default=RULES_VIEW_DIR, +) +@click.option( + "--skip-items", + type=str, + help="Comma-separated list of glob patterns for directories to skip when running tasks.", +) +@handle_exceptions +def rule_transform_cmd(ctx: click.Context, **kwargs: Any) -> None: + """Run the rule transform operation.""" + # Allow any model to be skipped by setting skip_item, by default include all + model_filter: ModelFilter = ModelFilter( + skip_patterns=comma_sep_to_list(kwargs.get("skip_items", "")), + include_patterns=["*"], + ) + + transformer = ToRulesYAMLTransformer() + rule_transform_task: RuleTransformTask = RuleTransformTask( + working_dir=kwargs["repo_path"], + rules_view_dir=kwargs["rules_view_dir"], + rule_transformer=transformer, + model_filter=model_filter, + ) + regenerate_task: RegenerateTask = RegenerateTask( + markdown_dir=kwargs["markdown_dir"], + authored_object=AuthoredComponentDefinition(kwargs["repo_path"]), + model_filter=model_filter, + ) + + pre_tasks: List[TaskBase] = [rule_transform_task, regenerate_task] + kwargs["working_dir"] = str(kwargs["repo_path"].resolve()) + result = run_bot(pre_tasks, kwargs) + logger.debug(f"Bot results: {result}") + logger.info("Rule transform complete!") diff --git a/trestlebot/cli/commands/sync_upstreams.py b/trestlebot/cli/commands/sync_upstreams.py new file mode 100644 index 00000000..688e360a --- /dev/null +++ b/trestlebot/cli/commands/sync_upstreams.py @@ -0,0 +1,130 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +"""Module for upstream command""" +import logging +import sys +from typing import Any, List + +import click + +from trestlebot.bot import TrestleBot +from trestlebot.cli.options.common import common_options, git_options, handle_exceptions +from trestlebot.cli.utils import comma_sep_to_list +from trestlebot.const import ERROR_EXIT_CODE +from trestlebot.tasks.base_task import ModelFilter, TaskBase +from trestlebot.tasks.sync_upstreams_task import SyncUpstreamsTask + + +logger = logging.getLogger(__name__) + + +def load_value_from_ctx( + ctx: click.Context, param: click.Parameter, value: Any = None +) -> Any: + """Load config value for option from context.""" + if value: + return value + + if not ctx.default_map: + return None + + upstreams = ctx.default_map.get("upstreams") + if not upstreams: + return None + + config = upstreams.model_dump() + value = config.get(param.name) + if isinstance(value, List): + return ",".join(value) + return value + + +@click.command( + name="sync-upstreams", + help="Sync OSCAL content from upstream repositories.", +) +@click.pass_context +@click.option( + "--sources", + type=str, + help="Comma-separated list of upstream git sources to sync. Each source is a string \ + in the form @ where ref is a git ref such as a tag or branch.", + envvar="TRESTLEBOT_UPSTREAMS_SOURCES", + callback=load_value_from_ctx, + required=False, +) +@click.option( + "--exclude-models", + type=str, + help="Comma-separated list of glob patterns for model names to exclude when running \ + tasks (e.g. --include-models='component_x,profile_y*')", + required=False, + envvar="TRESTLEBOT_UPSTREAMS_EXCLUDE_MODELS", + callback=load_value_from_ctx, +) +@click.option( + "--include-models", + type=str, + default="*", + help="Comma-separated list of glob patterns for model names to include when running \ + tasks (e.g. --include-models='component_x,profile_y*')", + required=False, + envvar="TRESTLEBOT_UPSTREAMS_INCLUDE_MODELS", + callback=load_value_from_ctx, +) +@click.option( + "--skip-validation", + type=bool, + help="Skip validation of the models when they are copied.", + is_flag=True, + envvar="TRESTLEBOT_UPSTREAMS_SKIP_VALIDATION", + callback=load_value_from_ctx, +) +@common_options +@git_options +@handle_exceptions +def sync_upstreams_cmd(ctx: click.Context, **kwargs: Any) -> None: + """Add new upstream sources to workspace.""" + if not kwargs.get("sources"): + logger.error("Trestlebot Error: Missing option '--sources'.") + sys.exit(ERROR_EXIT_CODE) + + working_dir = str(kwargs["repo_path"].resolve()) + include_model_list = comma_sep_to_list(kwargs["include_models"]) + + model_filter: ModelFilter = ModelFilter( + skip_patterns=comma_sep_to_list(kwargs.get("exclude_models", "")), + include_patterns=include_model_list, + ) + + validate: bool = not kwargs.get("skip_validation", False) + + sync_upstreams_task = SyncUpstreamsTask( + working_dir=working_dir, + git_sources=comma_sep_to_list(kwargs["sources"]), + model_filter=model_filter, + validate=validate, + ) + + pre_tasks: List[TaskBase] = [sync_upstreams_task] + + bot = TrestleBot( + working_dir=working_dir, + branch=kwargs["branch"], + commit_name=kwargs["committer_name"], + commit_email=kwargs["committer_email"], + author_name=kwargs.get("author_name", ""), + author_email=kwargs.get("author_email", ""), + target_branch=kwargs.get("target_branch", ""), + ) + + results = bot.run( + patterns=["*.json"], + pre_tasks=pre_tasks, + commit_message=kwargs.get("commit_message", ""), + pull_request_title=kwargs.get("pull_request_title", ""), + dry_run=kwargs.get("dry_run", False), + ) + + logger.debug(f"Trestlebot results: {results}") diff --git a/trestlebot/cli/commands/version.py b/trestlebot/cli/commands/version.py new file mode 100644 index 00000000..bec415cc --- /dev/null +++ b/trestlebot/cli/commands/version.py @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" Version command """ diff --git a/trestlebot/cli/config.py b/trestlebot/cli/config.py new file mode 100644 index 00000000..f7cc7039 --- /dev/null +++ b/trestlebot/cli/config.py @@ -0,0 +1,151 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" +Trestle-bot CLI configuration module. +""" + +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional + +import yaml +from pydantic import BaseModel, DirectoryPath, ValidationError + + +logger = logging.getLogger(__name__) + + +class TrestleBotConfigError(Exception): + """Custom error to better format pydantic exceptions. + + Example pydantic error dict: {'type': str, 'loc': tuple[str], 'msg': str, 'input': str} + + """ + + def __init__(self, errors: List[Dict[str, Any]]): + self.errors = list(map(self._format, errors)) + super().__init__( + f"Trestle-bot config file contains {len(self.errors)} error(s)." + ) + + def _format(self, err: Dict[str, Any]) -> str: + """Returns a formatted string with the error details.""" + msg = "Unable to load config." # default message if we can't parse error + + if err.get("loc"): + msg = f"Invalid config value for {err['loc'][0]}." + if err.get("msg"): + msg += f" {err['msg']}." # Add error message details if present + return msg + + def __str__(self) -> str: + return "".join(self.errors) + + +class UpstreamsConfig(BaseModel): + """Data model for upstream sources.""" + + sources: List[str] + include_models: List[str] = ["*"] + exclude_models: List[str] = [] + skip_validation: bool = False + + +class TrestleBotConfig(BaseModel): + """Data model for trestle-bot configuration.""" + + repo_path: Optional[DirectoryPath] = None + markdown_dir: Optional[str] = None + committer_name: Optional[str] = None + committer_email: Optional[str] = None + commit_message: Optional[str] = None + branch: Optional[str] = None + ssp_index_file: Optional[str] = None + upstreams: Optional[UpstreamsConfig] = None + + def to_yaml_dict(self) -> Dict[str, Any]: + """Returns a dict that can be cleanly written to a yaml file. + + This custom model serializer provides a cleaner dict that can + be stored as a YAML file. For example, we want to omit empty values + from being written to the YAML config file, or we want paths to be + written as strings, not posix path objects. + + Ex: instead of `ssp_index_file: None` appearing in the YAML, we + just want to exclude it from the config file all together. This + produces a YAML config file that only includes values that have + been set (or have a default we want to include). + + Values listed in IGNORED_VALUES will be skipped. + """ + + IGNORED_VALUES: List[Any] = [None, "None", []] + + config_dict: Dict[str, Any] = { + "repo_path": str(self.repo_path), + "markdown_dir": self.markdown_dir, + "ssp_index_file": self.ssp_index_file, + "committer_name": self.committer_name, + "committer_email": self.committer_email, + "commit_message": self.commit_message, + "branch": self.branch, + } + + if self.upstreams: + upstreams = { + "sources": self.upstreams.sources, + "skip_validation": self.upstreams.skip_validation, + "include_models": self.upstreams.include_models, + } + if self.upstreams.exclude_models: + upstreams["exclude_models"] = self.upstreams.exclude_models + + config_dict.update({"upstreams": upstreams}) + + # Filter out emtpy values to prevent them from appearing in the config + return dict( + filter(lambda item: item[1] not in IGNORED_VALUES, config_dict.items()) + ) + + +def load_from_file(file_path: Path) -> Optional[TrestleBotConfig]: + """Load yaml file to trestlebot config object""" + try: + with open(file_path, "r") as config_file: + config_yaml = yaml.safe_load(config_file) + return TrestleBotConfig(**config_yaml) + except ValidationError as ex: + raise TrestleBotConfigError(ex.errors()) + except (FileNotFoundError, TypeError): + logger.debug(f"No config file found at {file_path}") + return None + + +def write_to_file(config: TrestleBotConfig, file_path: Path) -> None: + """Write config object to yaml file""" + try: + file_path.parent.mkdir(parents=True, exist_ok=True) + with file_path.open("w") as config_file: + yaml.dump(config.to_yaml_dict(), config_file) + except ValidationError as ex: + raise TrestleBotConfigError(ex.errors()) + + +def make_config(values: Optional[Dict[str, Any]] = None) -> TrestleBotConfig: + """Generates a new trestle-bot config object""" + try: + if values: + return TrestleBotConfig.model_validate(values) + else: + return TrestleBotConfig() + except ValidationError as ex: + raise TrestleBotConfigError(ex.errors()) + + +def update_config(config: TrestleBotConfig, update: Dict[str, Any]) -> TrestleBotConfig: + """Returns a new config object with specified updates.""" + try: + return config.model_copy(update=update) + except ValidationError as ex: + raise TrestleBotConfigError(ex.errors()) diff --git a/trestlebot/cli/log.py b/trestlebot/cli/log.py new file mode 100644 index 00000000..bfc5a523 --- /dev/null +++ b/trestlebot/cli/log.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2023 Red Hat, Inc. + +"""Configure logger for trestlebot and trestle.""" + +import argparse +import logging +import sys +from typing import List + +import trestle.common.log as trestle_log + + +_logger = logging.getLogger("trestlebot") + + +def set_log_level(level: int = logging.INFO) -> None: + """Set the log level from the args for trestle and trestlebot.""" + + configure_logger(level) + + # Setup the trestle logger, it expects an argparse Namespace with a verbose int + verbose = 1 if level == logging.DEBUG else 0 + args = argparse.Namespace(verbose=verbose) + trestle_log.set_log_level_from_args(args=args) + + +def configure_logger(level: int = logging.INFO, propagate: bool = False) -> None: + """Configure the logger.""" + # Prevent extra message + _logger.propagate = propagate + _logger.setLevel(level=level) + for handler in configure_handlers(): + _logger.addHandler(handler) + + +def configure_handlers() -> List[logging.Handler]: + """Configure the handlers.""" + # Create a StreamHandler to send non-error logs to stdout + stdout_info_handler = logging.StreamHandler(sys.stdout) + stdout_info_handler.setLevel(logging.INFO) + stdout_info_handler.addFilter(trestle_log.SpecificLevelFilter(logging.INFO)) + + stdout_debug_handler = logging.StreamHandler(sys.stdout) + stdout_debug_handler.setLevel(logging.DEBUG) + stdout_debug_handler.addFilter(trestle_log.SpecificLevelFilter(logging.DEBUG)) + + # Create a StreamHandler to send error logs to stderr + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.setLevel(logging.WARNING) + + # Create a formatter and set it on both handlers + detailed_formatter = logging.Formatter( + "%(name)s:%(lineno)d %(levelname)s: %(message)s" + ) + stdout_debug_handler.setFormatter(detailed_formatter) + stderr_handler.setFormatter(detailed_formatter) + return [stdout_debug_handler, stdout_info_handler, stderr_handler] diff --git a/trestlebot/cli/options/common.py b/trestlebot/cli/options/common.py new file mode 100644 index 00000000..f9b633ac --- /dev/null +++ b/trestlebot/cli/options/common.py @@ -0,0 +1,187 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" +Common command options for trestle-bot commands. +""" + +import logging +import os +import pathlib +import sys +import traceback +from typing import Any, Callable, Dict, Optional, Sequence, TypeVar + +import click + +from trestlebot.cli.config import TrestleBotConfigError, load_from_file +from trestlebot.cli.log import set_log_level +from trestlebot.const import ERROR_EXIT_CODE, TRESTLEBOT_CONFIG_DIR + + +F = TypeVar("F", bound=Callable[..., Any]) + +logger = logging.getLogger(__name__) + + +def handle_exceptions(func: F) -> Any: + def wrapper(*args: Sequence[Any], **kwargs: Dict[Any, Any]) -> Any: + try: + return func(*args, **kwargs) + except Exception as ex: + traceback_str = traceback.format_exc() + logger.error(f"Trestle-bot Error: {str(ex)}") + logger.debug(traceback_str) + return ERROR_EXIT_CODE + + return wrapper + + +def debug_to_log_level(ctx: click.Context, param: str, value: str) -> None: + """Sets logging level based on debug flag.""" + + # TODO: get log level from config file + log_level = logging.DEBUG if value else logging.INFO + set_log_level(log_level) + + +def load_config_to_ctx( + ctx: click.Context, param: str, value: pathlib.Path +) -> Optional[pathlib.Path]: + """Load yaml config file into Click context to set default values. This + allows values from the yaml config to be used as the default value for a + given command option. + + If the user specifies a value for the option directly (e.g. uses --option value) + then that value is used in favor of the value loaded from the config. + + Similarly, if an option has an associated ENVVAR, and that ENVVAR is set, then the + ENVVAR value is used in favor of the value loaded from the config. + + Since the config contains values that should not be mapped to command option values + the dictionary is explicitly built by directly plucking values from the config. + + This will always run before other options because the --config is_eager is True. + """ + try: + config = load_from_file(value) + if not config: + logger.debug(f"No trestlebot config file found at {value}.") + return value + except TrestleBotConfigError as ex: + logger.error(str(ex)) + for err in ex.errors: + logger.error({err}) + sys.exit(ERROR_EXIT_CODE) + + if not ctx.default_map: + ctx.default_map = { + "repo_path": config.repo_path, + "markdown_dir": config.markdown_dir, + "ssp_index_file": config.ssp_index_file, + "committer_name": config.committer_name, + "committer_email": config.committer_email, + "commit_message": config.commit_message, + "branch": config.branch, + "upstreams": config.upstreams, + } + else: + ctx.default_map.update(config) + logger.debug(f"Successfully loaded config file {value} into context.") + return value + + +def common_options(f: F) -> F: + """ + Configures common options used across commands. + """ + + f = click.option( + "--debug", + default=False, + is_flag=True, + is_eager=True, + envvar="TRESTLEBOT_DEBUG", + help="Enable debug logging messages.", + callback=debug_to_log_level, + )(f) + f = click.option( + "--config", + "config_path", + type=click.Path(path_type=pathlib.Path), + envvar="TRESTLEBOT_CONFIG", + help="Path to trestlebot configuration file.", + default=f"{TRESTLEBOT_CONFIG_DIR}/config.yml", + is_eager=True, + callback=load_config_to_ctx, + )(f) + click.option( + "--repo-path", + type=click.Path(path_type=pathlib.Path, exists=True), + envvar="TRESTLEBOT_REPO_PATH", + help="Path to git respository root.", + required=True, + default=os.getcwd(), + )(f) + f = click.option( + "--dry-run", + help="Run tasks, but do not push changes to the repository.", + is_flag=True, + )(f) + + return f + + +def git_options(f: F) -> F: + """ + Configure git options used for git operations. + """ + f = click.option( + "--branch", + help="Git repo branch to push automated changes.", + required=True, + type=str, + )(f) + f = click.option( + "--target-branch", + help="Target branch (base branch) to create a pull request against. \ + No pull request is created if unset.", + required=False, + type=str, + )(f) + f = click.option( + "--committer-name", + help="User name for git committer.", + required=True, + type=str, + )(f) + f = click.option( + "--committer-email", + help="User email for git committer", + required=True, + type=str, + )(f) + f = click.option( + "--file-patterns", + help="Comma-separated list of file patterns to be used with `git add` in repository updates.", + type=str, + default=".", + )(f) + f = click.option( + "--commit-message", + help="Commit message for automated updates.", + default="Automatic updates from trestle-bot", + type=str, + )(f) + f = click.option( + "--author-name", + help="Name for commit author if differs from committer.", + type=str, + )(f) + f = click.option( + "--author-email", + help="Email for commit author if differs from committer.", + type=str, + )(f) + + return f diff --git a/trestlebot/cli/options/create.py b/trestlebot/cli/options/create.py new file mode 100644 index 00000000..1323a9ee --- /dev/null +++ b/trestlebot/cli/options/create.py @@ -0,0 +1,40 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +""" +Module for common create commands +""" + +import functools +from typing import Any, Callable, Dict, Sequence, TypeVar + +import click + + +F = TypeVar("F", bound=Callable[..., Any]) + + +def common_create_options(f: F) -> F: + """ + Configuring common create options decorator for SSP and CD command + """ + + @click.option( + "--profile-name", + prompt="Enter name of profile in trestle workspace to include", + help="Name of profile in trestle workspace to include.", + ) + @click.option( + "--markdown-dir", + type=str, + prompt="Enter path to store markdown files", + default="markdown/", + help="Directory name to store markdown files.", + ) + @functools.wraps(f) + def wrapper_common_create_options( + *args: Sequence[Any], **kwargs: Dict[Any, Any] + ) -> Any: + return f(*args, **kwargs) + + return wrapper_common_create_options diff --git a/trestlebot/cli/root.py b/trestlebot/cli/root.py new file mode 100644 index 00000000..53837b7e --- /dev/null +++ b/trestlebot/cli/root.py @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + + +"""Main entrypoint for trestlebot""" + +import click + +from trestlebot.cli.commands.autosync import autosync_cmd +from trestlebot.cli.commands.create import create_cmd +from trestlebot.cli.commands.init import init_cmd +from trestlebot.cli.commands.rule_transform import rule_transform_cmd +from trestlebot.cli.commands.sync_upstreams import sync_upstreams_cmd + + +EPILOG = "See our docs at https://redhatproductsecurity.github.io/trestle-bot" + +CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) + + +@click.group( + name="trestlebot", + help="Trestle-bot CLI", + context_settings=CONTEXT_SETTINGS, + epilog=EPILOG, +) +@click.pass_context +def root_cmd(ctx: click.Context) -> None: + """Root command""" + + +root_cmd.add_command(init_cmd) +root_cmd.add_command(autosync_cmd) +root_cmd.add_command(create_cmd) +root_cmd.add_command(rule_transform_cmd) +root_cmd.add_command(sync_upstreams_cmd) diff --git a/trestlebot/cli/utils.py b/trestlebot/cli/utils.py new file mode 100644 index 00000000..f2ef4690 --- /dev/null +++ b/trestlebot/cli/utils.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2024 Red Hat, Inc. + +from typing import Any, Dict, List + +from trestlebot.bot import TrestleBot +from trestlebot.reporter import BotResults +from trestlebot.tasks.base_task import TaskBase + + +def comma_sep_to_list(string: str) -> List[str]: + """Convert comma-sep string to list of strings and strip.""" + string = string.strip() if string else "" + return list(map(str.strip, string.split(","))) if string else [] + + +def run_bot(pre_tasks: List[TaskBase], kwargs: Dict[Any, Any]) -> BotResults: + """Reusable logic for all commands.""" + + # Configure and run the bot + bot = TrestleBot( + working_dir=kwargs["repo_path"], + branch=kwargs["branch"], + commit_name=kwargs["committer_name"], + commit_email=kwargs["committer_email"], + author_name=kwargs.get("author_name", ""), + author_email=kwargs.get("author_email", ""), + ) + + return bot.run( + pre_tasks=pre_tasks, + patterns=kwargs.get("patterns", ["."]), + commit_message=kwargs.get( + "commit_message", "Automatic updates from trestle-bot" + ), + dry_run=kwargs.get("dry_run", False), + )