From e3bf661d08e47b1a125fa1cef5c5897b3ab23fca Mon Sep 17 00:00:00 2001 From: Niloth P <20315308+Niloth-p@users.noreply.github.com> Date: Wed, 4 Oct 2023 23:21:30 +0530 Subject: [PATCH] docs: Add missing docstrings fetch_json_db.py (#3381) --- cve_bin_tool/fetch_json_db.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/cve_bin_tool/fetch_json_db.py b/cve_bin_tool/fetch_json_db.py index cd505ed2c8..89c8a80fcb 100644 --- a/cve_bin_tool/fetch_json_db.py +++ b/cve_bin_tool/fetch_json_db.py @@ -41,6 +41,9 @@ def __init__( log_signature_error, error_mode: ErrorMode = ErrorMode.TruncTrace, ) -> None: + """ + Initialize the Fetch_JSON_DB instance. + """ self.root = cache_dir / "json_data" self.pubkey = pubkey self.ignore_signature = ignore_signature @@ -55,6 +58,9 @@ def __init__( self.failed_count = 0 async def handle_download(self): + """ + Manages the download process of JSON files from the mirror. + """ self.connector = aiohttp.TCPConnector(limit_per_host=10) async with aiohttp.ClientSession( connector=self.connector, headers=HTTP_HEADERS, trust_env=True @@ -90,6 +96,9 @@ async def handle_download(self): self.cleanup_directory() def cleanup_directory(self): + """ + Cleans up the directory structure and removes temporary files. + """ for directory in self.DIRECTORIES: if (self.root / directory).exists(): shutil.rmtree(self.root / directory) @@ -99,6 +108,9 @@ def cleanup_directory(self): Path.unlink(self.root / "metadata.json") def update_directory_structure(self): + """ + Updates the directory structure for storing downloaded files. + """ if self.root.is_dir(): shutil.rmtree(self.root) self.root.mkdir() @@ -108,6 +120,9 @@ def update_directory_structure(self): dir.mkdir() def get_download_urls(self, session): + """ + Retrieves the URLs for downloading JSON files from the mirror. + """ for key in self.metadata["db"]: self.tasks.extend( [ @@ -124,6 +139,9 @@ def get_download_urls(self, session): ) def get_failed_downloads(self): + """ + Identifies and logs unsuccessful download attempts. + """ db = {} for key in self.metadata["db"]: db[key] = [] @@ -133,6 +151,9 @@ def get_failed_downloads(self): self.metadata["db"] = db async def download_files(self, tasks, description): + """ + Downloads files asynchronously from the mirror. + """ # error_mode.value will only be greater than 1 if quiet mode. if self.error_mode.value > 1: total_tasks = len(tasks) @@ -157,6 +178,9 @@ async def download_files(self, tasks, description): self.download_failed = True async def get_metdata(self, session): + """ + Fetches and stores metadata information from the mirror. + """ resp = await session.get(f"{self.mirror}/metadata.json") resp.raise_for_status() if resp.status == 200: @@ -177,6 +201,9 @@ async def get_metdata(self, session): self.is_signed = False def verify_signature(self): + """ + Checks the authenticity of downloaded metadata using signatures. + """ temp_gnupg_home = Path(tempfile.mkdtemp(prefix=".gnupg-")) gpg = gnupg.GPG(gnupghome=temp_gnupg_home) if self.pubkey: