Skip to content

Commit

Permalink
feat: add create-cd entrypoint for component definition bootstrapping (
Browse files Browse the repository at this point in the history
…#67)

* feat(entrypoint): add CreatCDEntrypoint
Related issues: PSCE-256
Majority of code generated by chatGPT
Signed-off-by: Alex Flom <[email protected]>

* feat(entrypoint): add CreatCDEntrypoint
Related issues: PSCE-256
Majority of code generated by chatGPT
Signed-off-by: Alex Flom <[email protected]>

* feat: adds ModelFilter to allow inclusive and exclusive filtering during tasks

Replaces skip_model_list with ModelFilter
Add ModelFilter to existing tasks
Updates CreateCD to use the filter to only process the new compdef

BREAKING CHANGE: skip_model_list in tasks have been replace with ModelFilter

Signed-off-by: Jennifer Power <[email protected]>

* feat: adds profile filtering to create-cd entrypoint

Signed-off-by: Jennifer Power <[email protected]>

* chore: removes add filter profile and use existing

Signed-off-by: Jennifer Power <[email protected]>

---------

Signed-off-by: Jennifer Power <[email protected]>
Co-authored-by: Alex Flom <[email protected]>
  • Loading branch information
jpower432 and afflom authored Oct 26, 2023
1 parent 83a0e59 commit 7a73162
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 111 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ repository = 'https://github.com/RedHatProductSecurity/trestle-bot'
[tool.poetry.scripts]
trestlebot-autosync = "trestlebot.entrypoints.autosync:main"
trestlebot-rules-transform = "trestlebot.entrypoints.rule_transform:main"
trestlebot-create-cd = "trestlebot.entrypoints.create_cd:main"

[tool.poetry.dependencies]
python = '^3.8.1'
Expand Down
41 changes: 23 additions & 18 deletions tests/trestlebot/tasks/authored/test_compdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@
import re

import pytest
from trestle.common.model_utils import ModelUtils
from trestle.core.catalog.catalog_interface import CatalogInterface
from trestle.core.profile_resolver import ProfileResolver
from trestle.oscal import profile as prof
from trestle.common.err import TrestleError
from trestle.oscal.profile import Profile

from tests import testutils
from trestlebot.const import RULES_VIEW_DIR, YAML_EXTENSION
from trestlebot.tasks.authored.base_authored import AuthoredObjectException
from trestlebot.tasks.authored.compdef import AuthoredComponentDefinition
from trestlebot.tasks.authored.compdef import (
AuthoredComponentDefinition,
FilterByProfile,
)
from trestlebot.transformers.yaml_transformer import ToRulesYAMLTransformer


test_prof = "simplified_nist_profile"
test_filter_prof = "simplified_filter_profile"
test_comp = "test_comp"


Expand Down Expand Up @@ -89,24 +91,17 @@ def test_create_new_default(tmp_trestle_dir: str) -> None:

def test_create_new_default_with_filter(tmp_trestle_dir: str) -> None:
"""Test creating new default component definition with filter"""

# Prepare the workspace
trestle_root = pathlib.Path(tmp_trestle_dir)
_ = testutils.setup_for_profile(trestle_root, test_prof, "")
testutils.load_from_json(trestle_root, test_filter_prof, test_filter_prof, Profile)
authored_comp = AuthoredComponentDefinition(tmp_trestle_dir)

profile_path = ModelUtils.get_model_path_for_name_and_class(
trestle_root, test_prof, prof.Profile
)

catalog = ProfileResolver.get_resolved_profile_catalog(
trestle_root, profile_path=profile_path
)

catalog_interface = CatalogInterface(catalog)
catalog_interface.delete_control("ac-5")
filter_by_profile = FilterByProfile(trestle_root, test_filter_prof)

authored_comp.create_new_default(
test_prof, test_comp, "test", "My desc", "service", catalog_interface
test_prof, test_comp, "test", "My desc", "service", filter_by_profile
)

rules_view_dir = trestle_root / RULES_VIEW_DIR
Expand All @@ -119,9 +114,9 @@ def test_create_new_default_with_filter(tmp_trestle_dir: str) -> None:
assert comp_dir.exists()

# Verity that the number of rules YAML files has been reduced
# from 12 to 11.
# from 12 to 7.
yaml_files = list(comp_dir.glob(f"*{YAML_EXTENSION}"))
assert len(yaml_files) == 11
assert len(yaml_files) == 7


def test_create_new_default_no_profile(tmp_trestle_dir: str) -> None:
Expand All @@ -138,3 +133,13 @@ def test_create_new_default_no_profile(tmp_trestle_dir: str) -> None:
authored_comp.create_new_default(
"fake", test_comp, "test", "My desc", "service"
)


def test_filter_by_profile_with_no_profile(tmp_trestle_dir: str) -> None:
"""Test creating a profile filter with a non-existent profile"""
trestle_root = pathlib.Path(tmp_trestle_dir)

with pytest.raises(
TrestleError, match="Profile fake does not exist in the workspace"
):
_ = FilterByProfile(trestle_root, "fake")
5 changes: 4 additions & 1 deletion tests/trestlebot/tasks/test_assemble_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from trestlebot.tasks.assemble_task import AssembleTask
from trestlebot.tasks.authored.base_authored import AuthorObjectBase
from trestlebot.tasks.authored.types import AuthoredType
from trestlebot.tasks.base_task import ModelFilter


test_prof = "simplified_nist_profile"
Expand Down Expand Up @@ -95,11 +96,13 @@ def test_assemble_task_with_skip(tmp_trestle_dir: str, skip_list: List[str]) ->

mock = Mock(spec=AuthorObjectBase)

filter = ModelFilter(skip_list, ["."])

assemble_task = AssembleTask(
working_dir=tmp_trestle_dir,
authored_model=AuthoredType.CATALOG.value,
markdown_dir=cat_md_dir,
skip_model_list=skip_list,
filter=filter,
)

with patch(
Expand Down
47 changes: 47 additions & 0 deletions tests/trestlebot/tasks/test_base_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/python

# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Test workspace filtering logic."""

import pathlib
from typing import List

import pytest

from trestlebot.tasks.base_task import ModelFilter


@pytest.mark.parametrize(
"skip_list, include_list, model_name, expected",
[
[["simplified_nist_catalog"], [], "simplified_nist_catalog", True],
[[], ["simplified_nist_catalog"], "simplified_nist_catalog", False],
[["simplified*"], ["."], "simplified_nist_catalog", True],
[
["simplified_nist_catalog"],
["simplified*"],
"simplified_nist_profile",
False,
],
],
)
def test_is_skipped(
skip_list: List[str], include_list: List[str], model_name: str, expected: str
) -> None:
"""Test skip logic."""
model_path = pathlib.Path(model_name)
model_filter = ModelFilter(skip_list, include_list)
assert model_filter.is_skipped(model_path) == expected
18 changes: 4 additions & 14 deletions tests/trestlebot/tasks/test_regenerate_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from tests import testutils
from trestlebot.tasks.authored.base_authored import AuthorObjectBase
from trestlebot.tasks.authored.types import AuthoredType
from trestlebot.tasks.base_task import ModelFilter
from trestlebot.tasks.regenerate_task import RegenerateTask


Expand Down Expand Up @@ -86,11 +87,13 @@ def test_regenerate_task_with_skip(tmp_trestle_dir: str, skip_list: List[str]) -

mock = Mock(spec=AuthorObjectBase)

filter = ModelFilter(skip_list, ["."])

regenerate_task = RegenerateTask(
working_dir=tmp_trestle_dir,
authored_model=AuthoredType.CATALOG.value,
markdown_dir=cat_md_dir,
skip_model_list=skip_list,
filter=filter,
)

with patch(
Expand Down Expand Up @@ -119,19 +122,6 @@ def test_catalog_regenerate_task(tmp_trestle_dir: str) -> None:
assert os.path.exists(os.path.join(tmp_trestle_dir, md_path))


def test_catalog_regenerate_task_with_skip(tmp_trestle_dir: str) -> None:
"""Test catalog regenerate at the task level"""
trestle_root = pathlib.Path(tmp_trestle_dir)
md_path = os.path.join(cat_md_dir, test_cat)
_ = testutils.setup_for_catalog(trestle_root, test_cat, md_path)

regenerate_task = RegenerateTask(
tmp_trestle_dir, AuthoredType.CATALOG.value, cat_md_dir, "", [test_cat]
)
assert regenerate_task.execute() == 0
assert not os.path.exists(os.path.join(tmp_trestle_dir, md_path))


def test_profile_regenerate_task(tmp_trestle_dir: str) -> None:
"""Test profile regenerate at the task level"""
trestle_root = pathlib.Path(tmp_trestle_dir)
Expand Down
8 changes: 5 additions & 3 deletions tests/trestlebot/tasks/test_rule_transform_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.

"""Test for Trestle Bot rule transform task"""
"""Test for Trestle Bot rule transform task."""

import pathlib

Expand All @@ -26,7 +26,7 @@
from trestle.tasks.csv_to_oscal_cd import RULE_DESCRIPTION, RULE_ID

from tests.testutils import setup_rules_view
from trestlebot.tasks.base_task import TaskException
from trestlebot.tasks.base_task import ModelFilter, TaskException
from trestlebot.tasks.rule_transform_task import RuleTransformTask
from trestlebot.transformers.yaml_transformer import ToRulesYAMLTransformer

Expand Down Expand Up @@ -116,8 +116,10 @@ def test_rule_transform_task_with_skip(tmp_trestle_dir: str) -> None:
trestle_root = pathlib.Path(tmp_trestle_dir)
setup_rules_view(trestle_root, test_comp, test_rules_dir)
transformer = ToRulesYAMLTransformer()

filter = ModelFilter([test_comp], [])
rule_transform_task = RuleTransformTask(
tmp_trestle_dir, test_rules_dir, transformer, skip_model_list=[test_comp]
tmp_trestle_dir, test_rules_dir, transformer, filter=filter
)
return_code = rule_transform_task.execute()
assert return_code == 0
Expand Down
1 change: 1 addition & 0 deletions trestlebot/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@
YAML_EXTENSION = ".yaml"

RULES_VIEW_DIR = "rules"
RULE_PREFIX = "rule-"
36 changes: 22 additions & 14 deletions trestlebot/entrypoints/autosync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from trestlebot.entrypoints.log import set_log_level_from_args
from trestlebot.tasks.assemble_task import AssembleTask
from trestlebot.tasks.authored import types
from trestlebot.tasks.base_task import TaskBase
from trestlebot.tasks.base_task import ModelFilter, TaskBase
from trestlebot.tasks.regenerate_task import RegenerateTask


Expand Down Expand Up @@ -113,35 +113,43 @@ def run(self, args: argparse.Namespace) -> None:
logger.error("Must set markdown path with oscal model.")
sys.exit(const.ERROR_EXIT_CODE)

if args.oscal_model == "ssp" and args.ssp_index_path == "":
if (
args.oscal_model == types.AuthoredType.SSP.value
and args.ssp_index_path == ""
):
logger.error("Must set ssp_index_path when using SSP as oscal model.")
sys.exit(const.ERROR_EXIT_CODE)

filter: ModelFilter = ModelFilter(
skip_patterns=comma_sep_to_list(args.skip_items),
include_patterns=["."],
)

# Assuming an edit has occurred assemble would be run before regenerate.
# Adding this to the list first
if not args.skip_assemble:
assemble_task = AssembleTask(
args.working_dir,
args.oscal_model,
args.markdown_path,
args.ssp_index_path,
comma_sep_to_list(args.skip_items),
working_dir=args.working_dir,
authored_model=args.oscal_model,
markdown_dir=args.markdown_path,
ssp_index_path=args.ssp_index_path,
filter=filter,
)
pre_tasks.append(assemble_task)
else:
logger.info("Assemble task skipped")
logger.info("Assemble task skipped.")

if not args.skip_regenerate:
regenerate_task = RegenerateTask(
args.working_dir,
args.oscal_model,
args.markdown_path,
args.ssp_index_path,
comma_sep_to_list(args.skip_items),
working_dir=args.working_dir,
authored_model=args.oscal_model,
markdown_dir=args.markdown_path,
ssp_index_path=args.ssp_index_path,
filter=filter,
)
pre_tasks.append(regenerate_task)
else:
logger.info("Regeneration task skipped")
logger.info("Regeneration task skipped.")

super().run_base(args, pre_tasks)

Expand Down
Loading

0 comments on commit 7a73162

Please sign in to comment.