diff --git a/tools/devsecops_engine_tools/engine_core/src/domain/model/exclusions.py b/tools/devsecops_engine_tools/engine_core/src/domain/model/exclusions.py index 012aa3a2..2a59578e 100644 --- a/tools/devsecops_engine_tools/engine_core/src/domain/model/exclusions.py +++ b/tools/devsecops_engine_tools/engine_core/src/domain/model/exclusions.py @@ -12,3 +12,8 @@ def __init__(self, **kwargs): self.severity = kwargs.get("severity", "") self.hu = kwargs.get("hu", "") self.reason = kwargs.get("reason", "Risk Accepted") + self.vm_id = kwargs.get("vm_id", "") + self.vm_id_url = kwargs.get("vm_id_url", "") + self.service = kwargs.get("service", "") + self.service_url = kwargs.get("service_url", "") + self.tags = kwargs.get("tags", []) diff --git a/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/defect_dojo/defect_dojo.py b/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/defect_dojo/defect_dojo.py index 1ab27304..c2a49875 100644 --- a/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/defect_dojo/defect_dojo.py +++ b/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/defect_dojo/defect_dojo.py @@ -243,7 +243,7 @@ def get_all(self, service, dict_args, secret_tool, config_tool): ) all_exclusions = self._get_report_exclusions( - all_findings, self._format_date_to_dd_format + all_findings, self._format_date_to_dd_format, host_dd=host_dd ) return all_findings, all_exclusions @@ -288,25 +288,25 @@ def _get_session_manager(self, dict_args, secret_tool, config_tool): config_tool["VULNERABILITY_MANAGER"]["DEFECT_DOJO"]["HOST_DEFECT_DOJO"], ) - def _get_report_exclusions(self, total_findings, date_fn): + def _get_report_exclusions(self, total_findings, date_fn, host_dd): exclusions = [] for finding in total_findings: if finding.risk_accepted: exclusions.append( - self._create_exclusion( - finding, date_fn, "engine_risk", "Risk Accepted" + self._create_report_exclusion( + finding, date_fn, "engine_risk", "Risk Accepted", host_dd ) ) elif finding.false_p: exclusions.append( - self._create_exclusion( - finding, date_fn, "engine_risk", "False Positive" + self._create_report_exclusion( + finding, date_fn, "engine_risk", "False Positive", host_dd ) ) elif finding.risk_status == "Transfer Accepted": exclusions.append( - self._create_exclusion( - finding, date_fn, "engine_risk", "Transferred Finding" + self._create_report_exclusion( + finding, date_fn, "engine_risk", "Transferred Finding", host_dd ) ) return exclusions @@ -343,7 +343,7 @@ def _retries_requests(self, request_func, max_retries, retry_delay): logger.error("Maximum number of retries reached, aborting.") raise e - def _create_exclusion(self, finding, date_fn, tool, reason): + def _date_reason_based(self, finding, date_fn, reason): if reason == "False Positive": create_date = date_fn(finding.last_status_update) expired_date = date_fn(None) @@ -355,6 +355,12 @@ def _create_exclusion(self, finding, date_fn, tool, reason): create_date = date_fn(last_accepted_risk["created"]) expired_date = date_fn(last_accepted_risk["expiration_date"]) + return create_date, expired_date + + + def _create_exclusion(self, finding, date_fn, tool, reason): + create_date, expired_date = self._date_reason_based(finding, date_fn, reason) + return Exclusions( id=finding.vuln_id_from_tool, where=self._get_where(finding, tool), @@ -363,6 +369,23 @@ def _create_exclusion(self, finding, date_fn, tool, reason): severity=finding.severity, reason=reason, ) + + def _create_report_exclusion(self, finding, date_fn, tool, reason, host_dd): + create_date, expired_date = self._date_reason_based(finding, date_fn, reason) + + return Exclusions( + id=finding.vuln_id_from_tool, + where=self._get_where(finding, tool), + create_date=create_date, + expired_date=expired_date, + severity=finding.severity, + reason=reason, + vm_id=str(finding.id), + vm_id_url=f"{host_dd}/finding/{finding.id}", + service=finding.service, + service_url=f"{host_dd}/finding?active=true&service={finding.service}", + tags=finding.tags, + ) def _create_report(self, finding, host_dd): return Report( diff --git a/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/printer_rich_table/printer_rich_table.py b/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/printer_rich_table/printer_rich_table.py index dcc5e208..3ed0a542 100644 --- a/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/printer_rich_table/printer_rich_table.py +++ b/tools/devsecops_engine_tools/engine_core/src/infrastructure/driven_adapters/printer_rich_table/printer_rich_table.py @@ -10,12 +10,13 @@ Report, ) from devsecops_engine_tools.engine_core.src.infrastructure.helpers.util import ( - format_date + format_date, ) from rich.console import Console from rich.table import Table from rich import box + @dataclass class PrinterRichTable(PrinterTableGateway): def print_table_findings(self, finding_list: "list[Finding]"): @@ -23,9 +24,13 @@ def print_table_findings(self, finding_list: "list[Finding]"): return def print_table_report(self, report_list: "list[Report]"): - sorted_report_list = sorted(report_list, key=lambda report: report.risk_score, reverse=True) + sorted_report_list = sorted( + report_list, key=lambda report: report.risk_score, reverse=True + ) headers = ["Risk Score", "ID", "Tags", "Services"] - table = Table(show_header=True, header_style="bold magenta", box=box.DOUBLE_EDGE) + table = Table( + show_header=True, header_style="bold magenta", box=box.DOUBLE_EDGE + ) for header in headers: table.add_column(header) for report in sorted_report_list: @@ -33,15 +38,38 @@ def print_table_report(self, report_list: "list[Report]"): str(report.risk_score), self._check_spaces(report.vm_id, report.vm_id_url), ", ".join(report.tags), - self._check_spaces(report.service, report.service_url) + self._check_spaces(report.service, report.service_url), ] table.add_row(*row_data) console = Console() console.print(table) def print_table_exclusions(self, exclusions_list): - # To implement - return + headers = [] + if exclusions_list: + headers = ["ID", "Tags", "Create Date", "Expired Date", "Reason", "Service"] + table = Table( + show_header=True, header_style="bold magenta", box=box.DOUBLE_EDGE + ) + for header in headers: + table.add_column(header) + for exclusion in exclusions_list: + row_data = [ + self._check_spaces(exclusion["vm_id"], exclusion["vm_id_url"]), + ", ".join(exclusion["tags"]), + format_date(exclusion["create_date"], "%d%m%Y", "%d/%m/%Y"), + ( + format_date(exclusion["expired_date"], "%d%m%Y", "%d/%m/%Y") + if exclusion["expired_date"] + and exclusion["expired_date"] != "undefined" + else "NA" + ), + exclusion["reason"], + self._check_spaces(exclusion["service"], exclusion["service_url"]), + ] + table.add_row(*row_data) + console = Console() + console.print(table) def _check_spaces(self, value, url): values = value.split() @@ -55,4 +83,4 @@ def _check_spaces(self, value, url): return new_value def _make_hyperlink(self, value, url): - return f"[link={url}]{value}[/link]" \ No newline at end of file + return f"[link={url}]{value}[/link]" diff --git a/tools/devsecops_engine_tools/engine_risk/src/domain/usecases/break_build.py b/tools/devsecops_engine_tools/engine_risk/src/domain/usecases/break_build.py index 16dcb32c..a49aca05 100644 --- a/tools/devsecops_engine_tools/engine_risk/src/domain/usecases/break_build.py +++ b/tools/devsecops_engine_tools/engine_risk/src/domain/usecases/break_build.py @@ -162,6 +162,11 @@ def _map_applied_exclusion(self, exclusions: "list[Exclusions]"): "create_date": exclusion.create_date, "expired_date": exclusion.expired_date, "reason": exclusion.reason, + "vm_id": exclusion.vm_id, + "vm_id_url": exclusion.vm_id_url, + "service": exclusion.service, + "service_url": exclusion.service_url, + "tags": exclusion.tags, } for exclusion in exclusions ] @@ -181,7 +186,13 @@ def _apply_exclusions(self, report_list: "list[Report]"): or (report.id and report.id == exclusion.id) ) and ((exclusion.where in report.where) or (exclusion.where == "all")): exclude = True - applied_exclusions.append(exclusion) + exclusion_copy = copy.deepcopy(exclusion) + exclusion_copy.vm_id = report.vm_id + exclusion_copy.vm_id_url = report.vm_id_url + exclusion_copy.service = report.service + exclusion_copy.service_url = report.service_url + exclusion_copy.tags = report.tags + applied_exclusions.append(exclusion_copy) break if not exclude: report.reason = "Remediation Rate"