From fb3ceb779ca3041f865fa3056cb6795b33da663b Mon Sep 17 00:00:00 2001 From: Daniel Deutsch Date: Mon, 15 Jul 2024 11:28:31 +0000 Subject: [PATCH] Ran ruff linter and fixed issues --- cli/bin/airgap/download_scripts.py | 9 ++++++--- cli/bin/airgap/upload_compliance_scripts.py | 2 +- cli/bin/airgap/upload_scripts.py | 2 +- cyberwatch_api.py | 8 +++++--- examples/add_windows_os_to_rules_of_repository.py | 1 - examples/change_asset_group_from_hostname.py | 1 - examples/retrieve_assets_with_kb_patch_available.py | 3 ++- examples/retrieve_high_priorities_cve.py | 3 ++- 8 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cli/bin/airgap/download_scripts.py b/cli/bin/airgap/download_scripts.py index ba95486..d33985e 100644 --- a/cli/bin/airgap/download_scripts.py +++ b/cli/bin/airgap/download_scripts.py @@ -66,13 +66,16 @@ def download_scripts(destination_folder, CBW_API, verify_ssl=False, with_attachm print("\033[A\033[A\nDownload complete ! Scripts are located in '" + str(destination_folder) + "'" + " " * 20) def append_extension(target_os): - if target_os in ("Aix", "Linux", "Macos", "Vmware"): return ".sh" - if target_os == "Windows": return ".ps1" + if target_os in ("Aix", "Linux", "Macos", "Vmware"): + return ".sh" + if target_os == "Windows": + return ".ps1" return "" def download_individual_script(scriptID, base_dir, CBW_API, with_attachment=False, verify_ssl=False): script = retrieve_scripts(str(scriptID), CBW_API, verify_ssl) - if script is None or script["type"] is None: return None, None + if script is None or script["type"] is None: + return None, None target_os, script_name = script["type"].split("::")[1:] script_dir = join(base_dir + "/" + target_os) diff --git a/cli/bin/airgap/upload_compliance_scripts.py b/cli/bin/airgap/upload_compliance_scripts.py index bd0b904..e6afa65 100644 --- a/cli/bin/airgap/upload_compliance_scripts.py +++ b/cli/bin/airgap/upload_compliance_scripts.py @@ -50,7 +50,7 @@ def manager(arguments, CBW_API, verify_ssl=False): help() elif not options.files: - if not "cyberwatch-airgap-compliance" in os.listdir("."): + if "cyberwatch-airgap-compliance" not in os.listdir("."): help() else: print("No file has been specified, searching through the `cyberwatch-airgap-compliance/uploads/` directory.\n--") diff --git a/cli/bin/airgap/upload_scripts.py b/cli/bin/airgap/upload_scripts.py index ba035be..4fd9361 100644 --- a/cli/bin/airgap/upload_scripts.py +++ b/cli/bin/airgap/upload_scripts.py @@ -53,7 +53,7 @@ def manager(arguments, CBW_API, verify_ssl=False): help() elif not options.files: - if not "cyberwatch-airgap" in os.listdir("."): + if "cyberwatch-airgap" not in os.listdir("."): help() else: print("No file has been specified, searching through the `cyberwatch-airgap/uploads/` directory.\n--") diff --git a/cyberwatch_api.py b/cyberwatch_api.py index 0db3112..4ace65e 100644 --- a/cyberwatch_api.py +++ b/cyberwatch_api.py @@ -52,7 +52,7 @@ def wrapper(*args, **kwargs): parameter_value = (kwargs.get("params",{}).get(parameter) or kwargs.get("body_params",{}).get(parameter)) endpoint = endpoint.replace("{" + parameter + "}", str(parameter_value)) # Deleting the 'parameter' from the kwargs arguments if it exists - [kwargs[key].pop(parameter, "") for key in kwargs if type(kwargs[key]) == dict] + [kwargs[key].pop(parameter, "") for key in kwargs if isinstance(kwargs[key], dict)] kwargs.update({"endpoint": endpoint}) return f(*args, **kwargs) @@ -125,8 +125,10 @@ def request(self, **kwargs) -> Generator[requests.models.Response, None, None]: """ Only accessible method, handles every step of the API call """ - if not isinstance(kwargs.get("method"), str): raise Exception("The type of endpoint parameter should be str") - if kwargs.get("timeout") and not isinstance(kwargs.get("timeout"), int): raise Exception("The type of timeout parameter should be int") + if not isinstance(kwargs.get("method"), str): + raise Exception("The type of endpoint parameter should be str") + if kwargs.get("timeout") and not isinstance(kwargs.get("timeout"), int): + raise Exception("The type of timeout parameter should be int") method = str(kwargs.get("method")).upper() timeout = kwargs.get("timeout") or 10 diff --git a/examples/add_windows_os_to_rules_of_repository.py b/examples/add_windows_os_to_rules_of_repository.py index 7e7b775..261f91a 100644 --- a/examples/add_windows_os_to_rules_of_repository.py +++ b/examples/add_windows_os_to_rules_of_repository.py @@ -1,5 +1,4 @@ from cyberwatch_api import Cyberwatch_Pyhelper -from datetime import datetime """ This script will fetch all the rules associated to the repository list defined, and diff --git a/examples/change_asset_group_from_hostname.py b/examples/change_asset_group_from_hostname.py index 8296791..c97aca8 100644 --- a/examples/change_asset_group_from_hostname.py +++ b/examples/change_asset_group_from_hostname.py @@ -1,6 +1,5 @@ # This script will replace asset's groups by another one from cyberwatch_api import Cyberwatch_Pyhelper -import json import requests from urllib3.exceptions import InsecureRequestWarning diff --git a/examples/retrieve_assets_with_kb_patch_available.py b/examples/retrieve_assets_with_kb_patch_available.py index 4b35105..38288f8 100644 --- a/examples/retrieve_assets_with_kb_patch_available.py +++ b/examples/retrieve_assets_with_kb_patch_available.py @@ -30,7 +30,8 @@ def retrieve_asset_cves_patches(assetID): def launch_script(): assets = retrieve_assets() print("[+] " + str(len(assets)) + " assets were found on this cyberwatch node") - if not assets: return + if not assets: + return assets_with_KB_patch_available = {} # All assets with at least one KB patch available diff --git a/examples/retrieve_high_priorities_cve.py b/examples/retrieve_high_priorities_cve.py index 5a6fb13..5299cb9 100644 --- a/examples/retrieve_high_priorities_cve.py +++ b/examples/retrieve_high_priorities_cve.py @@ -245,7 +245,8 @@ def launch_script(): # Outputing print("\n\n================= " + str(len(high_priority_cve_set)) + " new high-priority CVEs found ================='") - for key, value in high_priority_cve_set.items(): print("{: >20} : {}".format(key, value)) + for key, value in high_priority_cve_set.items(): + print("{: >20} : {}".format(key, value)) # Send the email try: