Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #69 from RedHatProductSecurity/enable-search-relat…
Browse files Browse the repository at this point in the history
…ed-url

enable --search-related-url on products-contain-component
  • Loading branch information
JimFuller-RedHat authored Mar 8, 2023
2 parents 03b45c9 + 84f1ccf commit bf6066e
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 74 deletions.
194 changes: 121 additions & 73 deletions griffon/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ def text_output_products_contain_component(ctx, output, format):
list(set([item["product_version"] for item in ordered_results]))
)
for pv in product_versions:
names = []
for item in ordered_results:
if pv == item["product_version"]:
names.append(item["name"])
names = [item["name"] for item in ordered_results if pv == item["product_version"]]
names = list(set(names))
for name in names:
dep_name = name.replace(component_name, f"[b]{component_name}[/b]")
Expand All @@ -152,76 +149,127 @@ def text_output_products_contain_component(ctx, output, format):
no_wrap=False,
)
if ctx.obj["VERBOSE"] == 1: # product_stream X source component
for item in ordered_results:
dep_name = item["name"].replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
root_component = "[i]Root component[/i]"
if item.get("root_component"):
root_component = item["root_component"]
console.print(
Text(item["product_stream"], style="magenta b u"),
root_component,
dep,
no_wrap=False,
)

product_streams = sorted(
list(set([item["product_stream"] for item in ordered_results]))
)
for ps in product_streams:
ps_components = [item for item in ordered_results if item["product_stream"] == ps]
names = sorted(list(set([item["name"] for item in ps_components])))

for name in names:
sources = []
for item in ps_components:
if item["name"] == name and "sources" in item:
sources.extend(item["sources"])

root_component = "root component"
if sources:
source_purl = PackageURL.from_string(sources[0]["purl"])
root_component = source_purl.name
dep_name = name.replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
console.print(
Text(ps, style="magenta b u"),
root_component,
dep,
no_wrap=False,
)

if ctx.obj["VERBOSE"] == 2: # product_stream X nvr
for item in ordered_results:
dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
root_component = "[i]Root component[/i]"
if item.get("root_component"):
root_component = item["root_component"]
console.print(
Text(item["product_stream"], style="magenta b u"),
root_component,
dep,
no_wrap=False,
)
if ctx.obj["VERBOSE"] == 3: # related url
for item in ordered_results:
dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
root_component = "[i]Root component[/i]"
if item.get("root_component"):
root_component = item["root_component"]
console.print(
Text(item["product_stream"], style="magenta b u"),
root_component,
dep,
item["related_url"],
no_wrap=False,
)
if ctx.obj["VERBOSE"] == 4: # source url
for item in ordered_results:
dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
root_component = "[i]Root component[/i]"
if item.get("root_component"):
root_component = item["root_component"]
console.print(
Text(item["product_stream"], style="magenta b u"),
root_component,
dep,
item["related_url"],
item["build_source_url"],
no_wrap=False,
)
if ctx.obj["VERBOSE"] > 4: # source url, upstream
for item in ordered_results:
dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
root_component = "[i]Root component[/i]"
if item.get("root_component"):
root_component = item["root_component"]
console.print(
Text(item["product_stream"], style="magenta b u"),
root_component,
dep,
item["related_url"],
item["build_source_url"],
item["upstream_purl"],
no_wrap=False,
)

product_streams = sorted(
list(set([item["product_stream"] for item in ordered_results]))
)
for ps in product_streams:
ps_components = [item for item in ordered_results if item["product_stream"] == ps]
names = sorted(list(set([item["name"] for item in ps_components])))

for name in names:
sources = []
for item in ps_components:
if item["name"] == name and "sources" in item:
sources.extend(item["sources"])

root_component = "root component"
if sources:
source_purl = PackageURL.from_string(sources[0]["purl"])
root_component = f"{source_purl.name}-{source_purl.version}"
dep_name = name.replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
console.print(
Text(ps, style="magenta b u"),
root_component,
dep,
no_wrap=False,
)

if ctx.obj["VERBOSE"] == 3: # source url, upstream
product_streams = sorted(
list(set([item["product_stream"] for item in ordered_results]))
)
for ps in product_streams:
ps_components = [item for item in ordered_results if item["product_stream"] == ps]
names = sorted(list(set([item["name"] for item in ps_components])))
for name in names:
sources = []
related_url = ""
build_source_url = ""
for item in ps_components:
if item["name"] == name:
if "sources" in item:
sources.extend(item["sources"])
if item["related_url"]:
related_url = item["related_url"]
if item["build_source_url"]:
build_source_url = item["build_source_url"]
root_component = "root component"
if sources:
source_purl = PackageURL.from_string(sources[0]["purl"])
root_component = f"{source_purl.name}-{source_purl.version}"
dep_name = name.replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
console.print(
Text(ps, style="magenta b u"),
root_component,
dep,
related_url,
no_wrap=False,
)
if ctx.obj["VERBOSE"] > 3: # source url, upstream
product_streams = sorted(
list(set([item["product_stream"] for item in ordered_results]))
)
for ps in product_streams:
ps_components = [item for item in ordered_results if item["product_stream"] == ps]
names = sorted(list(set([item["name"] for item in ps_components])))
for name in names:
sources = []
related_url = ""
build_source_url = ""
for item in ps_components:
if item["name"] == name:
if "sources" in item:
sources.extend(item["sources"])
if item["related_url"]:
related_url = item["related_url"]
if item["build_source_url"]:
build_source_url = item["build_source_url"]
root_component = "root component"
if sources:
source_purl = PackageURL.from_string(sources[0]["purl"])
root_component = f"{source_purl.name}-{source_purl.version}"
dep_name = name.replace(component_name, f"[b]{component_name}[/b]")
dep = f"[white]({dep_name})[/white]"
console.print(
Text(ps, style="magenta b u"),
root_component,
dep,
related_url,
build_source_url,
no_wrap=False,
)

ctx.exit()


Expand Down
55 changes: 54 additions & 1 deletion griffon/services/core_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import logging
from typing import Any, Dict, List

import requests

from griffon import CORGI_API_URL, OSIDB_API_URL, CorgiService, OSIDBService

logger = logging.getLogger("griffon")
Expand Down Expand Up @@ -168,6 +170,9 @@ def __init__(self, params: dict) -> None:
self.strict_name_search = self.params.get("strict_name_search")
self.search_deps = self.params.get("search_deps")
self.ns = self.params.get("namespace")
self.search_all = self.params.get("search_all")
self.search_related_url = self.params.get("search_related_url")
self.search_community = self.params.get("search_community")

def execute(self) -> List[Dict[str, Any]]:
cond = {"view": "latest"}
Expand All @@ -180,7 +185,55 @@ def execute(self) -> List[Dict[str, Any]]:
cond["type"] = self.component_type

result = self.corgi_session.components.retrieve_list(**cond, limit=1000)
return result.results

results = result.results
if self.search_related_url:
# TODO - not in bindings yet
related_url_search = requests.get(
f"{CORGI_API_URL}/api/v1/components",
params={
"include_fields": "name,arch,namespace,release,version,nvr,type,link,purl,software_build,product_versions,product_streams,sources", # noqa
"related_url": self.component_name,
"limit": 10000,
},
)

for c in related_url_search.json()["results"]:
for pv in c["product_versions"]:
for ps in c["product_streams"]:

is_dep = False
if c["arch"] == "src" or c["arch"] == "noarch":
is_dep = True
component = {
"is_dep": is_dep,
"product_version": pv["name"],
"product_version_ofuri": pv["ofuri"],
"product_stream": ps["name"],
"product_stream_ofuri": ps["ofuri"],
"product_active": True,
"purl": c["purl"],
"type": c["type"],
"namespace": c["namespace"],
"name": c["name"],
"arch": c["arch"],
"release": c["release"],
"version": c["version"],
"sources": c["sources"],
"nvr": c["nvr"],
"build_id": None,
"build_type": None,
"build_source_url": None,
"related_url": None,
"upstream_purl": None,
}
if c["software_build"]:
component["build_id"] = c["software_build"]["build_id"]
component["build_type"] = c["software_build"]["build_type"]
component["build_source_url"] = c["software_build"]["source"]
results.append(component)

return results


class components_containing_specific_component_query:
Expand Down

0 comments on commit bf6066e

Please sign in to comment.