From 84f1ccfe153bde4c47d44a339a2e2850c0b03287 Mon Sep 17 00:00:00 2001 From: jfuller Date: Wed, 8 Mar 2023 09:20:50 +0100 Subject: [PATCH] enable --search-related-url --- griffon/output.py | 194 +++++++++++++++++++------------ griffon/services/core_queries.py | 55 ++++++++- 2 files changed, 175 insertions(+), 74 deletions(-) diff --git a/griffon/output.py b/griffon/output.py index 5ed9612..404c878 100644 --- a/griffon/output.py +++ b/griffon/output.py @@ -138,10 +138,7 @@ def text_output_products_contain_component(ctx, output, format): list(set([item["product_version"] for item in ordered_results])) ) for pv in product_versions: - names = [] - for item in ordered_results: - if pv == item["product_version"]: - names.append(item["name"]) + names = [item["name"] for item in ordered_results if pv == item["product_version"]] names = list(set(names)) for name in names: dep_name = name.replace(component_name, f"[b]{component_name}[/b]") @@ -152,76 +149,127 @@ def text_output_products_contain_component(ctx, output, format): no_wrap=False, ) if ctx.obj["VERBOSE"] == 1: # product_stream X source component - for item in ordered_results: - dep_name = item["name"].replace(component_name, f"[b]{component_name}[/b]") - dep = f"[white]({dep_name})[/white]" - root_component = "[i]Root component[/i]" - if item.get("root_component"): - root_component = item["root_component"] - console.print( - Text(item["product_stream"], style="magenta b u"), - root_component, - dep, - no_wrap=False, - ) + + product_streams = sorted( + list(set([item["product_stream"] for item in ordered_results])) + ) + for ps in product_streams: + ps_components = [item for item in ordered_results if item["product_stream"] == ps] + names = sorted(list(set([item["name"] for item in ps_components]))) + + for name in names: + sources = [] + for item in ps_components: + if item["name"] == name and "sources" in item: + sources.extend(item["sources"]) + + root_component = "root component" + if sources: + source_purl = PackageURL.from_string(sources[0]["purl"]) + root_component = source_purl.name + dep_name = name.replace(component_name, f"[b]{component_name}[/b]") + dep = f"[white]({dep_name})[/white]" + console.print( + Text(ps, style="magenta b u"), + root_component, + dep, + no_wrap=False, + ) + if ctx.obj["VERBOSE"] == 2: # product_stream X nvr - for item in ordered_results: - dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]") - dep = f"[white]({dep_name})[/white]" - root_component = "[i]Root component[/i]" - if item.get("root_component"): - root_component = item["root_component"] - console.print( - Text(item["product_stream"], style="magenta b u"), - root_component, - dep, - no_wrap=False, - ) - if ctx.obj["VERBOSE"] == 3: # related url - for item in ordered_results: - dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]") - dep = f"[white]({dep_name})[/white]" - root_component = "[i]Root component[/i]" - if item.get("root_component"): - root_component = item["root_component"] - console.print( - Text(item["product_stream"], style="magenta b u"), - root_component, - dep, - item["related_url"], - no_wrap=False, - ) - if ctx.obj["VERBOSE"] == 4: # source url - for item in ordered_results: - dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]") - dep = f"[white]({dep_name})[/white]" - root_component = "[i]Root component[/i]" - if item.get("root_component"): - root_component = item["root_component"] - console.print( - Text(item["product_stream"], style="magenta b u"), - root_component, - dep, - item["related_url"], - item["build_source_url"], - no_wrap=False, - ) - if ctx.obj["VERBOSE"] > 4: # source url, upstream - for item in ordered_results: - dep_name = item["nvr"].replace(component_name, f"[b]{component_name}[/b]") - dep = f"[white]({dep_name})[/white]" - root_component = "[i]Root component[/i]" - if item.get("root_component"): - root_component = item["root_component"] - console.print( - Text(item["product_stream"], style="magenta b u"), - root_component, - dep, - item["related_url"], - item["build_source_url"], - item["upstream_purl"], - no_wrap=False, - ) + + product_streams = sorted( + list(set([item["product_stream"] for item in ordered_results])) + ) + for ps in product_streams: + ps_components = [item for item in ordered_results if item["product_stream"] == ps] + names = sorted(list(set([item["name"] for item in ps_components]))) + + for name in names: + sources = [] + for item in ps_components: + if item["name"] == name and "sources" in item: + sources.extend(item["sources"]) + + root_component = "root component" + if sources: + source_purl = PackageURL.from_string(sources[0]["purl"]) + root_component = f"{source_purl.name}-{source_purl.version}" + dep_name = name.replace(component_name, f"[b]{component_name}[/b]") + dep = f"[white]({dep_name})[/white]" + console.print( + Text(ps, style="magenta b u"), + root_component, + dep, + no_wrap=False, + ) + + if ctx.obj["VERBOSE"] == 3: # source url, upstream + product_streams = sorted( + list(set([item["product_stream"] for item in ordered_results])) + ) + for ps in product_streams: + ps_components = [item for item in ordered_results if item["product_stream"] == ps] + names = sorted(list(set([item["name"] for item in ps_components]))) + for name in names: + sources = [] + related_url = "" + build_source_url = "" + for item in ps_components: + if item["name"] == name: + if "sources" in item: + sources.extend(item["sources"]) + if item["related_url"]: + related_url = item["related_url"] + if item["build_source_url"]: + build_source_url = item["build_source_url"] + root_component = "root component" + if sources: + source_purl = PackageURL.from_string(sources[0]["purl"]) + root_component = f"{source_purl.name}-{source_purl.version}" + dep_name = name.replace(component_name, f"[b]{component_name}[/b]") + dep = f"[white]({dep_name})[/white]" + console.print( + Text(ps, style="magenta b u"), + root_component, + dep, + related_url, + no_wrap=False, + ) + if ctx.obj["VERBOSE"] > 3: # source url, upstream + product_streams = sorted( + list(set([item["product_stream"] for item in ordered_results])) + ) + for ps in product_streams: + ps_components = [item for item in ordered_results if item["product_stream"] == ps] + names = sorted(list(set([item["name"] for item in ps_components]))) + for name in names: + sources = [] + related_url = "" + build_source_url = "" + for item in ps_components: + if item["name"] == name: + if "sources" in item: + sources.extend(item["sources"]) + if item["related_url"]: + related_url = item["related_url"] + if item["build_source_url"]: + build_source_url = item["build_source_url"] + root_component = "root component" + if sources: + source_purl = PackageURL.from_string(sources[0]["purl"]) + root_component = f"{source_purl.name}-{source_purl.version}" + dep_name = name.replace(component_name, f"[b]{component_name}[/b]") + dep = f"[white]({dep_name})[/white]" + console.print( + Text(ps, style="magenta b u"), + root_component, + dep, + related_url, + build_source_url, + no_wrap=False, + ) + ctx.exit() diff --git a/griffon/services/core_queries.py b/griffon/services/core_queries.py index d5f9412..ce4b77b 100644 --- a/griffon/services/core_queries.py +++ b/griffon/services/core_queries.py @@ -6,6 +6,8 @@ import logging from typing import Any, Dict, List +import requests + from griffon import CORGI_API_URL, OSIDB_API_URL, CorgiService, OSIDBService logger = logging.getLogger("griffon") @@ -168,6 +170,9 @@ def __init__(self, params: dict) -> None: self.strict_name_search = self.params.get("strict_name_search") self.search_deps = self.params.get("search_deps") self.ns = self.params.get("namespace") + self.search_all = self.params.get("search_all") + self.search_related_url = self.params.get("search_related_url") + self.search_community = self.params.get("search_community") def execute(self) -> List[Dict[str, Any]]: cond = {"view": "latest"} @@ -180,7 +185,55 @@ def execute(self) -> List[Dict[str, Any]]: cond["type"] = self.component_type result = self.corgi_session.components.retrieve_list(**cond, limit=1000) - return result.results + + results = result.results + if self.search_related_url: + # TODO - not in bindings yet + related_url_search = requests.get( + f"{CORGI_API_URL}/api/v1/components", + params={ + "include_fields": "name,arch,namespace,release,version,nvr,type,link,purl,software_build,product_versions,product_streams,sources", # noqa + "related_url": self.component_name, + "limit": 10000, + }, + ) + + for c in related_url_search.json()["results"]: + for pv in c["product_versions"]: + for ps in c["product_streams"]: + + is_dep = False + if c["arch"] == "src" or c["arch"] == "noarch": + is_dep = True + component = { + "is_dep": is_dep, + "product_version": pv["name"], + "product_version_ofuri": pv["ofuri"], + "product_stream": ps["name"], + "product_stream_ofuri": ps["ofuri"], + "product_active": True, + "purl": c["purl"], + "type": c["type"], + "namespace": c["namespace"], + "name": c["name"], + "arch": c["arch"], + "release": c["release"], + "version": c["version"], + "sources": c["sources"], + "nvr": c["nvr"], + "build_id": None, + "build_type": None, + "build_source_url": None, + "related_url": None, + "upstream_purl": None, + } + if c["software_build"]: + component["build_id"] = c["software_build"]["build_id"] + component["build_type"] = c["software_build"]["build_type"] + component["build_source_url"] = c["software_build"]["source"] + results.append(component) + + return results class components_containing_specific_component_query: