From ce06ffc85f398d47552376741161c915e2b2d217 Mon Sep 17 00:00:00 2001 From: amd-pworfolk Date: Fri, 6 Dec 2024 11:40:03 -0800 Subject: [PATCH] Integrated wmi module for accessing system information Signed-off-by: amd-pworfolk --- setup.py | 1 + src/turnkeyml/common/build.py | 206 +------------- src/turnkeyml/common/system_info.py | 409 ++++++++++++++++++++++++++++ src/turnkeyml/sequence/sequence.py | 3 +- 4 files changed, 413 insertions(+), 206 deletions(-) create mode 100644 src/turnkeyml/common/system_info.py diff --git a/setup.py b/setup.py index 1d233fae..c5afc06e 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,7 @@ "fasteners", "GitPython>=3.1.40", "psutil", + "wmi", ], extras_require={ "llm": [ diff --git a/src/turnkeyml/common/build.py b/src/turnkeyml/common/build.py index d90956f0..5637836b 100644 --- a/src/turnkeyml/common/build.py +++ b/src/turnkeyml/common/build.py @@ -2,12 +2,8 @@ import logging import sys import traceback -import platform -import subprocess from typing import Dict, Union import hashlib -import re -import importlib.metadata import psutil import yaml import torch @@ -196,7 +192,7 @@ def get_shapes_and_dtypes(inputs: dict): class Logger: """ - Redirects stdout to to file (and console if needed) + Redirects stdout to file (and console if needed) """ def __init__( @@ -265,203 +261,3 @@ def write(self, message): def flush(self): # needed for python 3 compatibility. pass - - -def get_windows_driver_version(device_name: str) -> str: - """ - Returns the driver version for a given device name. If not found, returns None. - - This information is extracted from parsing the output of the powershell Get-WmiObject command. - - All drivers and versions can be viewed with the PowerShell command: - > Get-WmiObject Win32_PnPSignedDriver | select DeviceName, Manufacturer, DriverVersion - - To find a specific driver version by name: - > Get-WmiObject Win32_PnPSignedDriver | select DeviceName, DriverVersion | - where-object {$_.DeviceName -eq 'NPU Compute Accelerator Device'} - """ - - cmd = ( - "Get-WmiObject Win32_PnPSignedDriver | select DeviceName, DriverVersion | " - + "where-object {$_.DeviceName -eq '" - + device_name - + "'}" - ) - try: - output = subprocess.run( - "powershell " + cmd, capture_output=True, text=True, check=True - ).stdout - return output.split("\n")[3].split()[-1] - except Exception: # pylint: disable=broad-except - return "" - - -def get_system_info(): - os_type = platform.system() - info_dict = {} - - # Get OS Version - try: - info_dict["OS Version"] = platform.platform() - except Exception as e: # pylint: disable=broad-except - info_dict["Error OS Version"] = str(e) - - def get_cim_info(command): - try: - output = subprocess.run( - "powershell " + command, capture_output=True, text=True, check=True - ).stdout - return output.strip() - except Exception as e: # pylint: disable=broad-except - return str(e) - - if os_type == "Windows": - - processor = get_cim_info( - "Get-CimInstance -ClassName Win32_Processor | " - "%{ '{0}###({1} cores, {2} logical processors)' " - "-f $_.Name,$_.NumberOfCores,$_.NumberOfLogicalProcessors }" - ) - info_dict["Processor"] = " ".join( - [str.strip() for str in processor.split("###")] - ) - - info_dict["OEM System"] = get_cim_info( - "Get-CimInstance -ClassName Win32_ComputerSystem | " - "Select-Object -ExpandProperty Model" - ) - - memory = get_cim_info( - "Get-CimInstance -ClassName Win32_PhysicalMemory | " - "%{ '{0} {1} GB {2} ns' -f $_.Manufacturer,($_.Capacity/1gb),$_.Speed }" - ) - info_dict["Physical Memory"] = " + ".join(memory.split("\n")) - - info_dict["BIOS Version"] = get_cim_info( - "Get-CimInstance -ClassName Win32_BIOS | " - "Select-Object -ExpandProperty SMBIOSBIOSVersion" - ) - - info_dict["CPU Max Clock"] = get_cim_info( - "Get-CimInstance -ClassName Win32_Processor | " - "Select-Object -ExpandProperty MaxClockSpeed" - ) - info_dict["CPU Max Clock"] += " MHz" - - # Get driver versions - device_names = [ - "NPU Compute Accelerator Device", - "AMD-OpenCL User Mode Driver", - ] - driver_versions = {} - for device_name in device_names: - driver_version = get_windows_driver_version(device_name) - driver_versions[device_name] = ( - driver_version if len(driver_version) else "DRIVER NOT FOUND" - ) - info_dict["Driver Versions"] = driver_versions - - # Get NPU power mode - if "AMD" in info_dict["Processor"]: - try: - out = subprocess.check_output( - [ - r"C:\Windows\System32\AMD\xrt-smi.exe", - "examine", - "-r", - "platform", - ], - stderr=subprocess.STDOUT, - ).decode() - lines = out.splitlines() - modes = [line.split()[-1] for line in lines if "Mode" in line] - if len(modes) > 0: - info_dict["NPU Power Mode"] = modes[0] - except FileNotFoundError: - # xrt-smi not present - pass - except subprocess.CalledProcessError as e: - info_dict["NPU Power Mode ERROR"] = e.output.decode() - - # Get windows power setting - try: - out = subprocess.check_output(["powercfg", "/getactivescheme"]).decode() - info_dict["Windows Power Setting"] = re.search(r"\((.*?)\)", out).group(1) - except subprocess.CalledProcessError as e: - info_dict["Windows Power Setting ERROR"] = e.output.decode() - - elif os_type == "Linux": - # WSL has to be handled differently compared to native Linux - if "microsoft" in str(platform.release()): - try: - oem_info = ( - subprocess.check_output( - 'powershell.exe -Command "wmic computersystem get model"', - shell=True, - ) - .decode() - .strip() - ) - oem_info = ( - oem_info.replace("\r", "") - .replace("\n", "") - .split("Model")[-1] - .strip() - ) - info_dict["OEM System"] = oem_info - except Exception as e: # pylint: disable=broad-except - info_dict["Error OEM System (WSL)"] = str(e) - - else: - # Get OEM System Information - try: - oem_info = ( - subprocess.check_output( - "sudo -n dmidecode -s system-product-name", - shell=True, - stderr=subprocess.DEVNULL, - ) - .decode() - .strip() - .replace("\n", " ") - ) - info_dict["OEM System"] = oem_info - except subprocess.CalledProcessError: - # This catches the case where sudo requires a password - info_dict["OEM System"] = "Unable to get oem info - password required" - except Exception as e: # pylint: disable=broad-except - info_dict["Error OEM System"] = str(e) - - # Get CPU Information - try: - cpu_info = subprocess.check_output("lscpu", shell=True).decode() - for line in cpu_info.split("\n"): - if "Model name:" in line: - info_dict["Processor"] = line.split(":")[1].strip() - break - except Exception as e: # pylint: disable=broad-except - info_dict["Error Processor"] = str(e) - - # Get Memory Information - try: - mem_info = ( - subprocess.check_output("free -m", shell=True) - .decode() - .split("\n")[1] - .split()[1] - ) - mem_info_gb = round(int(mem_info) / 1024, 2) - info_dict["Memory Info"] = f"{mem_info_gb} GB" - except Exception as e: # pylint: disable=broad-except - info_dict["Error Memory Info"] = str(e) - - else: - info_dict["Error"] = "Unsupported OS" - - # Get Python Packages - distributions = importlib.metadata.distributions() - info_dict["Python Packages"] = [ - f"{dist.metadata['name']}=={dist.metadata['version']}" for dist in distributions - ] - - return info_dict diff --git a/src/turnkeyml/common/system_info.py b/src/turnkeyml/common/system_info.py new file mode 100644 index 00000000..8439467a --- /dev/null +++ b/src/turnkeyml/common/system_info.py @@ -0,0 +1,409 @@ +import platform +import subprocess +import re +import importlib.metadata +import wmi +from abc import ABC + + +class SystemInfo(ABC): + """Abstract base class for OS-dependent system information classes""" + + def __init__(self): + pass + + def get_dict(self): + """ + Retrieves all the system information into a dictionary + + Returns: + dict: System information + """ + info_dict = { + "OS Version": self.get_os_version(), + "Python Packages": self.get_python_packages(), + } + return info_dict + + @staticmethod + def get_os_version() -> str: + """ + Retrieves the OS version. + + Returns: + str: OS Version + """ + try: + return platform.platform() + except Exception as e: # pylint: disable=broad-except + return f"ERROR - {e}" + + @staticmethod + def get_python_packages() -> list: + """ + Retrieves the Python package versions. + + Returns: + list: List of Python package versions in the form ["package-name==package-version", ...] + """ + # Get Python Packages + distributions = importlib.metadata.distributions() + return [ + f"{dist.metadata['name']}=={dist.metadata['version']}" + for dist in distributions + ] + + +class WindowsSystemInfo(SystemInfo): + """Class used to access system information in Windows""" + + def __init__(self): + super().__init__() + self.connection = wmi.WMI() + + def get_processor_name(self) -> str: + """ + Retrieves the name of the processor. + + Returns: + str: Name of the processor. + """ + processors = self.connection.Win32_Processor() + if processors: + return f"{processors[0].Name.strip()} ({processors[0].NumberOfCores} cores, {processors[0].NumberOfLogicalProcessors} logical processors)" + return "Processor information not found." + + def get_system_model(self) -> str: + """ + Retrieves the model of the computer system. + + Returns: + str: Model of the computer system. + """ + systems = self.connection.Win32_ComputerSystem() + if systems: + return systems[0].Model + return "System model information not found." + + def get_physical_memory(self) -> str: + """ + Retrieves the physical memory of the computer system. + + Returns: + str: Physical memory + """ + memory = self.connection.Win32_PhysicalMemory() + if memory: + total_capacity = sum([int(m.Capacity) for m in memory]) + total_capacity_str = f"{total_capacity/(1024**3)} GB" + details_str = " + ".join( + [ + f"{m.Manufacturer} {int(m.Capacity)/(1024**3)} GB {m.Speed} ns" + for m in memory + ] + ) + return total_capacity_str + " (" + details_str + ")" + return "Physical memory information not found." + + def get_bios_version(self) -> str: + """ + Retrieves the BIOS Version of the computer system. + + Returns: + str: BIOS Version + """ + bios = self.connection.Win32_BIOS() + if bios: + return bios[0].Name + return "BIOS Version not found." + + def get_max_clock_speed(self) -> str: + """ + Retrieves the max clock speed of the CPU of the system. + + Returns: + str: Max CPU clock speed + """ + processor = self.connection.Win32_Processor() + if processor: + return f"{processor[0].MaxClockSpeed} MHz" + return "Max CPU clock speed not found." + + def get_driver_version(self, device_name) -> str: + """ + Retrieves the driver version for the specified device name. + + Returns: + str: Driver version, or None if device driver not found + """ + drivers = self.connection.Win32_PnPSignedDriver(DeviceName=device_name) + if drivers: + return drivers[0].DriverVersion + return "" + + @staticmethod + def get_npu_power_mode() -> str: + """ + Retrieves the NPU power mode. + + Returns: + str: NPU power mode + """ + try: + out = subprocess.check_output( + [ + r"C:\Windows\System32\AMD\xrt-smi.exe", + "examine", + "-r", + "platform", + ], + stderr=subprocess.STDOUT, + ).decode() + lines = out.splitlines() + modes = [line.split()[-1] for line in lines if "Mode" in line] + if len(modes) > 0: + return modes[0] + except FileNotFoundError: + # xrt-smi not present + pass + except subprocess.CalledProcessError as e: + pass + return "NPU power mode not found." + + @staticmethod + def get_windows_power_setting() -> str: + """ + Retrieves the Windows power setting. + + Returns: + str: Windows power setting. + """ + try: + out = subprocess.check_output(["powercfg", "/getactivescheme"]).decode() + return re.search(r"\((.*?)\)", out).group(1) + except subprocess.CalledProcessError as e: + pass + return "Windows power setting not found" + + def get_dict(self) -> dict: + """ + Retrieves all the system information into a dictionary + + Returns: + dict: System information + """ + info_dict = super().get_dict() + info_dict["Processor"] = self.get_processor_name() + info_dict["OEM System"] = self.get_system_model() + info_dict["Physical Memory"] = self.get_physical_memory() + info_dict["BIOS Version"] = self.get_bios_version() + info_dict["CPU Max Clock"] = self.get_max_clock_speed() + info_dict["Windows Power Setting"] = self.get_windows_power_setting() + if "AMD" in info_dict["Processor"]: + device_names = [ + "NPU Compute Accelerator Device", + "AMD-OpenCL User Mode Driver", + ] + driver_versions = { + device_name: self.get_driver_version(device_name) + for device_name in device_names + } + info_dict["Driver Versions"] = { + k: (v if len(v) else "DEVICE NOT FOUND") + for k, v in driver_versions.items() + } + info_dict["NPU Power Mode"] = self.get_npu_power_mode() + return info_dict + + +class WSLSystemInfo(SystemInfo): + """Class used to access system information in WSL""" + + def __init__(self): + super().__init__() + + @staticmethod + def get_system_model() -> str: + """ + Retrieves the model of the computer system. + + Returns: + str: Model of the computer system. + """ + try: + oem_info = ( + subprocess.check_output( + 'powershell.exe -Command "wmic computersystem get model"', + shell=True, + ) + .decode() + .strip() + ) + oem_info = ( + oem_info.replace("\r", "").replace("\n", "").split("Model")[-1].strip() + ) + return oem_info + except Exception as e: # pylint: disable=broad-except + return f"ERROR - {e}" + + def get_dict(self) -> dict: + """ + Retrieves all the system information into a dictionary + + Returns: + dict: System information + """ + info_dict = super().get_dict() + info_dict["OEM System"] = self.get_system_model() + return info_dict + + +class LinuxSystemInfo(SystemInfo): + """Class used to access system information in Linux""" + + def __init__(self): + super().__init__() + + @staticmethod + def get_processor_name() -> str: + """ + Retrieves the name of the processor. + + Returns: + str: Name of the processor. + """ + # Get CPU Information + try: + cpu_info = subprocess.check_output("lscpu", shell=True).decode() + for line in cpu_info.split("\n"): + if "Model name:" in line: + return line.split(":")[1].strip() + except Exception as e: # pylint: disable=broad-except + return f"ERROR - {e}" + + @staticmethod + def get_system_model() -> str: + """ + Retrieves the model of the computer system. + + Returns: + str: Model of the computer system. + """ + # Get OEM System Information + try: + oem_info = ( + subprocess.check_output( + "sudo -n dmidecode -s system-product-name", + shell=True, + stderr=subprocess.DEVNULL, + ) + .decode() + .strip() + .replace("\n", " ") + ) + return oem_info + except subprocess.CalledProcessError: + # This catches the case where sudo requires a password + return "Unable to get oem info - password required" + except Exception as e: # pylint: disable=broad-except + return f"ERROR - {e}" + + @staticmethod + def get_physical_memory() -> str: + """ + Retrieves the physical memory of the computer system. + + Returns: + str: Physical memory + """ + try: + mem_info = ( + subprocess.check_output("free -m", shell=True) + .decode() + .split("\n")[1] + .split()[1] + ) + mem_info_gb = round(int(mem_info) / 1024, 2) + return f"{mem_info_gb} GB" + except Exception as e: # pylint: disable=broad-except + return f"ERROR - {e}" + + def get_dict(self) -> dict: + """ + Retrieves all the system information into a dictionary + + Returns: + dict: System information + """ + info_dict = super().get_dict() + info_dict["Processor"] = self.get_processor_name() + info_dict["OEM System"] = self.get_system_model() + info_dict["Physical Memory"] = self.get_physical_memory() + return info_dict + + +class UnsupportedOSSystemInfo(SystemInfo): + """Class used to access system information in unsupported operating systems""" + + def __init__(self): + super().__init__() + + def get_dict(self): + """ + Retrieves all the system information into a dictionary + + Returns: + dict: System information + """ + info_dict = super().get_dict() + info_dict["Error"] = "UNSUPPORTED OS" + return info_dict + + +def get_system_info() -> SystemInfo: + """ + Creates the appropriate SystemInfo object based on the operating system. + + Returns: + A subclass of SystemInfo for the current operating system. + """ + os_type = platform.system() + if os_type == "Windows": + return WindowsSystemInfo() + elif os_type == "Linux": + # WSL has to be handled differently compared to native Linux + if "microsoft" in str(platform.release()): + return WSLSystemInfo() + else: + return LinuxSystemInfo() + else: + return UnsupportedOSSystemInfo() + + +def get_system_info_dict() -> dict: + """ + Puts the system information into a dictionary. + + Returns: + dict: Dictionary containing the system information. + """ + return get_system_info().get_dict() + + +def main(): + """ + Prints the system information dictionary. + """ + info_dict = get_system_info_dict() + for k, v in info_dict.items(): + print(f"{k}: {v}") + + +# def temp(self): +# c = wmi.WMI(moniker="//./root/cimv2/power") +# pp = c.Win32_PowerPlan() +# pass + + +if __name__ == "__main__": + main() diff --git a/src/turnkeyml/sequence/sequence.py b/src/turnkeyml/sequence/sequence.py index 483ad988..dbf9a97b 100644 --- a/src/turnkeyml/sequence/sequence.py +++ b/src/turnkeyml/sequence/sequence.py @@ -7,6 +7,7 @@ import turnkeyml.common.printing as printing import turnkeyml.common.exceptions as exp import turnkeyml.common.build as build +from turnkeyml.common.system_info import get_system_info_dict import turnkeyml.common.filesystem as fs import turnkeyml.common.status as status from turnkeyml.tools.tool import Tool @@ -133,7 +134,7 @@ def launch( ) # Save the system information used for this build - system_info = build.get_system_info() + system_info = get_system_info_dict() state.save_stat( fs.Keys.SYSTEM_INFO, system_info,