Skip to content

Commit

Permalink
feat(engine_risk): 🚀 New report table with hyperlinks
Browse files Browse the repository at this point in the history
  • Loading branch information
ssantaa9 committed Nov 6, 2024
1 parent afcf254 commit 621b3c4
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from devsecops_engine_tools.engine_core.src.infrastructure.driven_adapters.printer_pretty_table.printer_pretty_table import (
PrinterPrettyTable,
)
from devsecops_engine_tools.engine_core.src.infrastructure.driven_adapters.printer_rich_table.printer_rich_table import (
PrinterRichTable,
)
import sys
import argparse
from devsecops_engine_tools.engine_utilities.utils.logger_info import MyLogger
Expand Down Expand Up @@ -191,9 +194,13 @@ def application_core():
"github": GithubActions(),
"local": RuntimeLocal(),
}.get(args["platform_devops"])
printer_table_gateway = PrinterPrettyTable()
metrics_manager_gateway = S3Manager()

if args["tool"] == "engine_risk":
printer_table_gateway = PrinterRichTable()
else:
printer_table_gateway = PrinterPrettyTable()

init_engine_core(
vulnerability_management_gateway,
secrets_manager_gateway,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
@dataclass
class Report:
def __init__(self, **kwargs):
self.vm_id = kwargs.get("vm_id", "")
self.vm_id_url = kwargs.get("vm_id_url", "")
self.id = kwargs.get("id", [])
self.vuln_id_from_tool = kwargs.get("vuln_id_from_tool", "")
self.where = kwargs.get("where", "")
Expand All @@ -27,6 +29,7 @@ def __init__(self, **kwargs):
self.risk_accepted = kwargs.get("risk_accepted", "")
self.false_p = kwargs.get("false_p", "")
self.service = kwargs.get("service", "")
self.service_url = kwargs.get("service_url", "")
self.reason = kwargs.get("reason", "")
self.component_name = kwargs.get("component_name", "")
self.component_version = kwargs.get("component_version", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def get_all(self, service, dict_args, secret_tool, config_tool):
max_retries = config_tool["VULNERABILITY_MANAGER"]["DEFECT_DOJO"][
"MAX_RETRIES_QUERY"
]
host_dd = config_tool["VULNERABILITY_MANAGER"]["DEFECT_DOJO"]["HOST_DEFECT_DOJO"]

findings = self._get_findings(
self._get_session_manager(dict_args, secret_tool, config_tool),
Expand All @@ -236,7 +237,7 @@ def get_all(self, service, dict_args, secret_tool, config_tool):

all_findings = list(
map(
partial(self._create_report),
partial(self._create_report, host_dd=host_dd),
findings,
)
)
Expand Down Expand Up @@ -363,8 +364,10 @@ def _create_exclusion(self, finding, date_fn, tool, reason):
reason=reason,
)

def _create_report(self, finding):
def _create_report(self, finding, host_dd):
return Report(
vm_id=str(finding.id),
vm_id_url=f"{host_dd}/finding/{finding.id}",
id=finding.vulnerability_ids,
vuln_id_from_tool=finding.vuln_id_from_tool,
status=finding.display_status,
Expand All @@ -391,6 +394,7 @@ def _create_report(self, finding):
risk_accepted=finding.risk_accepted,
false_p=finding.false_p,
service=finding.service,
service_url=f"{host_dd}/finding?active=true&service={finding.service}",
)

def _format_date_to_dd_format(self, date_string):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from dataclasses import dataclass

from devsecops_engine_tools.engine_core.src.domain.model.gateway.printer_table_gateway import (
PrinterTableGateway,
)
from devsecops_engine_tools.engine_core.src.domain.model.finding import (
Finding,
)
from devsecops_engine_tools.engine_core.src.domain.model.report import (
Report,
)
from devsecops_engine_tools.engine_core.src.infrastructure.helpers.util import (
format_date
)
from rich.console import Console
from rich.table import Table
from rich import box

@dataclass
class PrinterRichTable(PrinterTableGateway):
def print_table_findings(self, finding_list: "list[Finding]"):
# To implement
return

def print_table_report(self, report_list: "list[Report]"):
sorted_report_list = sorted(report_list, key=lambda report: report.risk_score, reverse=True)
headers = ["Risk Score", "ID", "Tags", "Services"]
table = Table(show_header=True, header_style="bold magenta", box=box.DOUBLE_EDGE)
for header in headers:
table.add_column(header)
for report in sorted_report_list:
row_data = [
str(report.risk_score),
self._check_spaces(report.vm_id, report.vm_id_url),
", ".join(report.tags),
self._check_spaces(report.service, report.service_url)
]
table.add_row(*row_data)
console = Console()
console.print(table)

def print_table_exclusions(self, exclusions_list):
# To implement
return

def _check_spaces(self, value, url):
values = value.split()
urls = url.split()
new_value = ""
if len(values) > 1 or len(urls) > 1:
for value, url in zip(values, urls):
new_value += self._make_hyperlink(value, url) + " "
else:
new_value = self._make_hyperlink(values[0], urls[0])
return new_value

def _make_hyperlink(self, value, url):
return f"[link={url}]{value}[/link]"
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,38 @@ def filter_duplicated(self, findings):
key = (finding.where, tuple(finding.id), finding.vuln_id_from_tool)
if key in findings_map:
existing_finding = findings_map[key]
combined_services = set(
existing_finding.service.split() + finding.service.split()
)
combined_services = existing_finding.service.split() + [
s
for s in finding.service.split()
if s not in existing_finding.service.split()
]
combined_services_url = existing_finding.service_url.split() + [
s_url
for s_url in finding.service_url.split()
if s_url not in existing_finding.service_url.split()
]
combined_vm_ids = existing_finding.vm_id.split() + [
vm
for vm in finding.vm_id.split()
if vm not in existing_finding.vm_id.split()
]
combined_vm_id_urls = existing_finding.vm_id_url.split() + [
vm_url
for vm_url in finding.vm_id_url.split()
if vm_url not in existing_finding.vm_id_url.split()
]
if finding.age >= existing_finding.age:
new_finding = copy.deepcopy(finding)
new_finding.service = " ".join(combined_services)
new_finding.service_url = " ".join(combined_services_url)
new_finding.vm_id = " ".join(combined_vm_ids)
new_finding.vm_id_url = " ".join(combined_vm_id_urls)
findings_map[key] = new_finding
else:
existing_finding.service = " ".join(combined_services)
existing_finding.service_url = " ".join(combined_services_url)
new_finding.vm_id = " ".join(combined_vm_ids)
new_finding.vm_id_url = " ".join(combined_vm_id_urls)
else:
findings_map[key] = copy.deepcopy(finding)

Expand Down

0 comments on commit 621b3c4

Please sign in to comment.