diff --git a/docs/logging.md b/docs/logging.md index 97e0a4ad1..58c5e236f 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -29,7 +29,7 @@ from gallia.log import get_logger, setup_logging, Loglevel # The logfile's loglevel is Loglevel.DEBUG. # It can be set with the keyword argument file_level. -setup_logging(level=Loglevel.INFO, path="log.json.zst") +setup_logging(level=Loglevel.INFO) logger = get_logger("test") logger.info("hello world") logger.debug("hello debug") diff --git a/src/gallia/cli.py b/src/gallia/cli.py index 5e5e2d2a2..172bdab74 100644 --- a/src/gallia/cli.py +++ b/src/gallia/cli.py @@ -32,6 +32,7 @@ load_ecu_plugin_eps, load_transport_plugin_eps, ) +from gallia.utils import get_log_level def load_parsers() -> Parsers: @@ -328,6 +329,7 @@ def cmd_show_plugins() -> None: def cmd_template(args: argparse.Namespace) -> None: template = """# [gallia] # verbosity = +# no-volatile-info = # trace_log = # lock_file = @@ -391,9 +393,6 @@ def main() -> None: argcomplete.autocomplete(parser) args = parser.parse_args() - # Will be set to the correct verbosity later. - setup_logging() - if args.show_config: cmd_show_config(args, config, config_path) sys.exit(exitcode.OK) @@ -418,6 +417,13 @@ def main() -> None: args.help_func() parser.exit(exitcode.USAGE) + setup_logging( + level=get_log_level(args), + no_volatile_info=args.no_volatile_info + if hasattr(args, "no_volatile_info") + else True, + ) + sys.exit(args.cls_object.entry_point(args)) diff --git a/src/gallia/command/base.py b/src/gallia/command/base.py index bde2ddcd8..a2c2d85ef 100644 --- a/src/gallia/command/base.py +++ b/src/gallia/command/base.py @@ -25,12 +25,12 @@ from gallia.config import Config from gallia.db.handler import DBHandler from gallia.dumpcap import Dumpcap -from gallia.log import Loglevel, get_logger, setup_logging, tz +from gallia.log import add_zst_log_handler, get_logger, tz from gallia.plugins import load_transport from gallia.powersupply import PowerSupply, PowerSupplyURI from gallia.services.uds.core.exception import UDSException from gallia.transports import BaseTransport, TargetURI -from gallia.utils import camel_to_snake, dump_args +from gallia.utils import camel_to_snake, dump_args, get_file_log_level @unique @@ -68,6 +68,9 @@ def json(self) -> str: return msgspec.json.encode(self).decode() +logger = get_logger("gallia.base") + + class BaseCommand(ABC): """BaseCommand is the baseclass for all gallia commands. This class can be used in standalone scripts via the @@ -93,8 +96,6 @@ class BaseCommand(ABC): #: The string which is shown at the bottom of --help. EPILOG: str | None = None - #: The name of the logger when this command is run. - LOGGER_NAME = "gallia" #: Enable a artifacts_dir. Setting this property to #: True enables the creation of a logfile. HAS_ARTIFACTS_DIR: bool = False @@ -105,7 +106,6 @@ class BaseCommand(ABC): def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: self.id = camel_to_snake(self.__class__.__name__) - self.logger = get_logger(self.LOGGER_NAME) self.parser = parser self.config = config self.artifacts_dir = Path() @@ -128,19 +128,6 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: def run(self, args: Namespace) -> int: ... - def get_log_level(self, args: Namespace) -> Loglevel: - level = Loglevel.INFO - if args.verbose == 1: - level = Loglevel.DEBUG - elif args.verbose >= 2: - level = Loglevel.TRACE - return level - - def get_file_log_level(self, args: Namespace) -> Loglevel: - if args.trace_log: - return Loglevel.TRACE - return Loglevel.TRACE if args.verbose >= 2 else Loglevel.DEBUG - def run_hook( self, variant: HookVariant, @@ -185,16 +172,14 @@ def run_hook( stdout = p.stdout stderr = p.stderr except CalledProcessError as e: - self.logger.warning( - f"{variant.value}-hook failed (exit code: {p.returncode})" - ) + logger.warning(f"{variant.value}-hook failed (exit code: {p.returncode})") stdout = e.stdout stderr = e.stderr if stdout: - self.logger.info(p.stdout.strip(), extra={"tags": [hook_id, "stdout"]}) + logger.info(p.stdout.strip(), extra={"tags": [hook_id, "stdout"]}) if stderr: - self.logger.info(p.stderr.strip(), extra={"tags": [hook_id, "stderr"]}) + logger.info(p.stderr.strip(), extra={"tags": [hook_id, "stderr"]}) def configure_class_parser(self) -> None: group = self.parser.add_argument_group("generic arguments") @@ -205,6 +190,12 @@ def configure_class_parser(self) -> None: default=self.config.get_value("gallia.verbosity", 0), help="increase verbosity on the console", ) + group.add_argument( + "--no-volatile-info", + action="store_true", + default=self.config.get_value("gallia.no-volatile-info", False), + help="do not overwrite log lines with level info or lower in terminal output", + ) group.add_argument( "--trace-log", action=argparse.BooleanOptionalAction, @@ -328,9 +319,9 @@ def _aquire_flock(self, path: Path) -> None: # log a message and do a blocking wait afterwards. fcntl.flock(self._lock_file_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: - self.logger.notice(f"Waiting for flock: {path}") + logger.notice(f"Waiting for flock: {path}") fcntl.flock(self._lock_file_fd, fcntl.LOCK_EX) - self.logger.info("Acquired lock. Continuing.") + logger.info("Acquired lock. Continuing.") def _release_flock(self) -> None: assert self._lock_file_fd @@ -342,7 +333,7 @@ def entry_point(self, args: Namespace) -> int: try: self._aquire_flock(p) except OSError as e: - self.logger.critical(f"Unable to lock {p}: {e}") + logger.critical(f"Unable to lock {p}: {e}") return exitcode.OSFILE if self.HAS_ARTIFACTS_DIR: @@ -350,13 +341,11 @@ def entry_point(self, args: Namespace) -> int: args.artifacts_base, args.artifacts_dir, ) - setup_logging( - self.get_log_level(args), - self.get_file_log_level(args), - self.artifacts_dir.joinpath(FileNames.LOGFILE.value), + add_zst_log_handler( + logger_name="gallia", + filepath=self.artifacts_dir.joinpath(FileNames.LOGFILE.value), + file_log_level=get_file_log_level(args), ) - else: - setup_logging(self.get_log_level(args)) if args.hooks: self.run_hook(HookVariant.PRE, args) @@ -379,14 +368,14 @@ def entry_point(self, args: Namespace) -> int: if isinstance(e, t): # TODO: Map the exitcode to superclass of builtin exceptions. exit_code = exitcode.IOERR - self.logger.critical( + logger.critical( f"Caught expected exception, stack trace on debug level: {e!r}" ) - self.logger.debug(e, exc_info=True) + logger.debug(e, exc_info=True) break else: exit_code = exitcode.SOFTWARE - self.logger.critical(e, exc_info=True) + logger.critical(e, exc_info=True) finally: self.run_meta.exit_code = exit_code self.run_meta.end_time = datetime.now(tz).isoformat() @@ -395,7 +384,7 @@ def entry_point(self, args: Namespace) -> int: self.artifacts_dir.joinpath(FileNames.META.value).write_text( self.run_meta.json() + "\n" ) - self.logger.notice(f"Stored artifacts at {self.artifacts_dir}") + logger.notice(f"Stored artifacts at {self.artifacts_dir}") if args.hooks: self.run_hook(HookVariant.POST, args, exit_code) @@ -521,16 +510,14 @@ async def _db_finish_run_meta(self, args: Namespace, exit_code: int) -> None: self.artifacts_dir, ) except Exception as e: - self.logger.warning( + logger.warning( f"Could not write the run meta to the database: {e!r}" ) try: await self.db_handler.disconnect() except Exception as e: - self.logger.error( - f"Could not close the database connection properly: {e!r}" - ) + logger.error(f"Could not close the database connection properly: {e!r}") async def setup(self, args: Namespace) -> None: if args.target is None: diff --git a/src/gallia/command/uds.py b/src/gallia/command/uds.py index 99c282ccb..fbbeedd1b 100644 --- a/src/gallia/command/uds.py +++ b/src/gallia/command/uds.py @@ -9,11 +9,14 @@ from gallia.command.base import FileNames, Scanner from gallia.config import Config +from gallia.log import get_logger from gallia.plugins import load_ecu, load_ecu_plugins from gallia.services.uds.core.service import NegativeResponse, UDSResponse from gallia.services.uds.ecu import ECU from gallia.services.uds.helpers import raise_for_error +logger = get_logger("gallia.base.udsscan") + class UDSScanner(Scanner): """UDSScanner is a baseclass, particularly for scanning tasks @@ -134,19 +137,17 @@ async def setup(self, args: Namespace) -> None: await self.db_handler.insert_scan_run(args.target.raw) self._apply_implicit_logging_setting() except Exception as e: - self.logger.warning( - f"Could not write the scan run to the database: {e:!r}" - ) + logger.warning(f"Could not write the scan run to the database: {e:!r}") if args.ecu_reset is not None: resp: UDSResponse = await self.ecu.ecu_reset(args.ecu_reset) if isinstance(resp, NegativeResponse): - self.logger.warning(f"ECUReset failed: {resp}") - self.logger.warning("Switching to default session") + logger.warning(f"ECUReset failed: {resp}") + logger.warning("Switching to default session") raise_for_error(await self.ecu.set_session(0x01)) resp = await self.ecu.ecu_reset(args.ecu_reset) if isinstance(resp, NegativeResponse): - self.logger.warning(f"ECUReset in session 0x01 failed: {resp}") + logger.warning(f"ECUReset in session 0x01 failed: {resp}") # Handles connecting to the target and waits # until it is ready. @@ -171,7 +172,7 @@ async def setup(self, args: Namespace) -> None: ) self._apply_implicit_logging_setting() except Exception as e: - self.logger.warning( + logger.warning( f"Could not write the properties_pre to the database: {e!r}" ) @@ -187,7 +188,7 @@ async def teardown(self, args: Namespace) -> None: prop_pre = json.loads(await file.read()) if args.compare_properties and await self.ecu.properties(False) != prop_pre: - self.logger.warning("ecu properties differ, please investigate!") + logger.warning("ecu properties differ, please investigate!") if self.db_handler is not None: try: @@ -195,9 +196,7 @@ async def teardown(self, args: Namespace) -> None: await self.ecu.properties(False) ) except Exception as e: - self.logger.warning( - f"Could not write the scan run to the database: {e!r}" - ) + logger.warning(f"Could not write the scan run to the database: {e!r}") if args.tester_present: await self.ecu.stop_cyclic_tester_present() @@ -226,6 +225,6 @@ async def setup(self, args: Namespace) -> None: try: await self.db_handler.insert_discovery_run(args.target.url.scheme) except Exception as e: - self.logger.warning( + logger.warning( f"Could not write the discovery run to the database: {e!r}" ) diff --git a/src/gallia/commands/discover/doip.py b/src/gallia/commands/discover/doip.py index 748c9f3a4..704acc512 100644 --- a/src/gallia/commands/discover/doip.py +++ b/src/gallia/commands/discover/doip.py @@ -13,6 +13,7 @@ import psutil from gallia.command import AsyncScript +from gallia.log import get_logger from gallia.services.uds.core.service import ( TesterPresentRequest, TesterPresentResponse, @@ -28,6 +29,8 @@ TimingAndCommunicationParameters, ) +logger = get_logger("gallia.discover.doip") + class DoIPDiscoverer(AsyncScript): """This script scans for active DoIP endpoints and automatically enumerates allowed @@ -78,11 +81,11 @@ async def main(self, args: Namespace) -> None: pass async def main2(self, args: Namespace) -> int: - self.logger.notice("[👋] Welcome to @realDoIP-Discovery powered by MoarMemes…") + logger.notice("[👋] Welcome to @realDoIP-Discovery powered by MoarMemes…") target = urlparse(args.target) if args.target is not None else None if target is not None and target.scheme != "doip": - self.logger.error("[🫣] --target must be doip://…") + logger.error("[🫣] --target must be doip://…") return 2 # Discover Hostname and Port @@ -93,18 +96,16 @@ async def main2(self, args: Namespace) -> int: and target.hostname is not None and target.port is not None ): - self.logger.notice( - "[📋] Skipping host/port discovery because given by --target" - ) + logger.notice("[📋] Skipping host/port discovery because given by --target") tgt_hostname = target.hostname tgt_port = target.port else: - self.logger.notice("[🔍] Discovering Host and Port via UDP Broadcast") + logger.notice("[🔍] Discovering Host and Port via UDP Broadcast") hosts = await self.run_udp_discovery() if len(hosts) != 1: - self.logger.error( + logger.error( "[🍃] Can only continue with a single DoIP host! Give me a --target!" ) return 11 @@ -115,12 +116,12 @@ async def main2(self, args: Namespace) -> int: rat_success: list[int] = [] rat_wrong_source: list[int] = [] if target is not None and "activation_type" in parse_qs(target.query): - self.logger.notice( + logger.notice( "[📋] Skipping RoutingActivationType discovery because given by --target" ) rat_success = [int(parse_qs(target.query)["activation_type"][0], 0)] else: - self.logger.notice("[🔍] Enumerating all RoutingActivationTypes") + logger.notice("[🔍] Enumerating all RoutingActivationTypes") ( rat_success, @@ -134,14 +135,14 @@ async def main2(self, args: Namespace) -> int: ) if len(rat_success) == 0 and len(rat_wrong_source) == 0: - self.logger.error( + logger.error( "[🥾] Damn son, didn't find a single routing activation type with unknown source?! OUTTA HERE!" ) return 10 # Discovering correct source address for suitable RoutingActivationRequests if target is not None and "src_addr" in parse_qs(target.query): - self.logger.notice( + logger.notice( "[📋] Skipping SourceAddress discovery because given by --target" ) targets = [ @@ -150,7 +151,7 @@ async def main2(self, args: Namespace) -> int: ] else: - self.logger.notice("[🔍] Enumerating all SourceAddresses") + logger.notice("[🔍] Enumerating all SourceAddresses") targets = await self.enumerate_source_addresses( tgt_hostname, tgt_port, @@ -158,19 +159,19 @@ async def main2(self, args: Namespace) -> int: ) if len(targets) != 1: - self.logger.error( + logger.error( f"[💣] I found {len(targets)} valid RoutingActivationType/SourceAddress combos, but can only continue with exactly one; choose your weapon with --target!" ) return 20 # Enumerate valid TargetAddresses if target is not None and "target_addr" in parse_qs(target.query): - self.logger.error( + logger.error( "[😵] Why do you give me a target_addr in --target? Am I useless to you??? GOODBYE!" ) return 3 - self.logger.notice( + logger.notice( f"[🔍] Enumerating all TargetAddresses from {args.start:#x} to {args.stop:#x}" ) @@ -188,7 +189,7 @@ async def main2(self, args: Namespace) -> int: args.timeout, ) - self.logger.notice("[🛩️] All done, thanks for flying with us!") + logger.notice("[🛩️] All done, thanks for flying with us!") return 0 async def enumerate_routing_activation_types( @@ -209,17 +210,17 @@ async def enumerate_routing_activation_types( 0xAFFE, ) except OSError as e: - self.logger.error(f"[🚨] Mr. Stark I don't feel so good: {e}") + logger.error(f"[🚨] Mr. Stark I don't feel so good: {e}") return rat_success, rat_wrong_source try: await conn.write_routing_activation_request(routing_activation_type) rat_success.append(routing_activation_type) - self.logger.info( + logger.info( f"[🤯] Holy moly, it actually worked for activation_type {routing_activation_type:#x} and src_addr {src_addr:#x}!!!" ) except DoIPRoutingActivationDeniedError as e: - self.logger.info( + logger.info( f"[🌟] splendid, {routing_activation_type:#x} yields {e.rac_code.name}" ) @@ -235,7 +236,7 @@ async def enumerate_routing_activation_types( finally: await conn.close() - self.logger.notice( + logger.notice( f"[💎] Look what RoutingActivationTypes I've found that are not 'unsupported': {', '.join([f'{x:#x}' for x in rat_not_unsupported])}" ) return rat_success, rat_wrong_source @@ -260,7 +261,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) for target_addr in search_space: - self.logger.debug(f"[🚧] Attempting connection to {target_addr:#x}") + logger.debug(f"[🚧] Attempting connection to {target_addr:#x}") conn.target_addr = target_addr @@ -272,7 +273,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 known_targets.append( f"doip://{tgt_hostname}:{tgt_port}?activation_type={correct_rat:#x}&src_addr={correct_src:#x}&target_addr={target_addr:#x}" ) - self.logger.notice( + logger.notice( f"[🥇] HEUREKA: target address {target_addr:#x} is valid! " ) async with aiofiles.open( @@ -280,7 +281,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) as f: await f.write(f"{known_targets[-1]}\n") - self.logger.info(f"[⏳] Waiting for reply of target {target_addr:#x}") + logger.info(f"[⏳] Waiting for reply of target {target_addr:#x}") # Hardcoded loop to detect potential broadcasts while True: pot_broadcast, data = await asyncio.wait_for( @@ -293,7 +294,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 if pot_broadcast is None: break - self.logger.notice( + logger.notice( f"[🤑] B-B-B-B-B-B-BROADCAST at TargetAddress {target_addr:#x}! Got reply from {pot_broadcast:#x}" ) async with aiofiles.open( @@ -304,7 +305,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) resp = TesterPresentResponse.parse_static(data) - self.logger.notice( + logger.notice( f"[🥳] It cannot get nicer: {target_addr:#x} responded: {resp}" ) responsive_targets.append(known_targets[-1]) @@ -318,14 +319,10 @@ async def enumerate_target_addresses( # noqa: PLR0913 e.nack_code == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress ): - self.logger.info( - f"[🫥] {target_addr:#x} is an unknown target address" - ) + logger.info(f"[🫥] {target_addr:#x} is an unknown target address") continue elif e.nack_code == DiagnosticMessageNegativeAckCodes.TargetUnreachable: - self.logger.info( - f"[💤] {target_addr:#x} is (currently?) unreachable" - ) + logger.info(f"[💤] {target_addr:#x} is (currently?) unreachable") unreachable_targets.append(known_targets[-1]) async with aiofiles.open( self.artifacts_dir.joinpath("5_unreachable_targets.txt"), "a" @@ -333,7 +330,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 await f.write(f"{known_targets[-1]}\n") continue else: - self.logger.warning( + logger.warning( f"[🤷] {target_addr:#x} is behaving strangely: {e.nack_code.name}" ) async with aiofiles.open( @@ -343,7 +340,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 continue except asyncio.TimeoutError: # This triggers when DoIP ACK but no UDS reply - self.logger.info( + logger.info( f"[🙊] Presumably no active ECU on target address {target_addr:#x}" ) async with aiofiles.open( @@ -354,9 +351,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 except (ConnectionError, ConnectionResetError) as e: # Whenever this triggers, but sometimes connections are closed not by us - self.logger.warn( - f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e}" - ) + logger.warn(f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e}") async with aiofiles.open( self.artifacts_dir.joinpath("7_targets_with_errors.txt"), "a" ) as f: @@ -370,25 +365,25 @@ async def enumerate_target_addresses( # noqa: PLR0913 await conn.close() - self.logger.notice( + logger.notice( f"[⚔️] It's dangerous to test alone, take one of these {len(known_targets)} known targets:" ) for item in known_targets: - self.logger.notice(item) + logger.notice(item) - self.logger.notice( + logger.notice( f"[❓] Those {len(unreachable_targets)} targets were unreachable by the gateway (could be just temporary):" ) for item in unreachable_targets: - self.logger.notice(item) + logger.notice(item) - self.logger.notice( + logger.notice( f"[💰] For even more profit, try one of the {len(responsive_targets)} targets that actually responded:" ) for item in responsive_targets: - self.logger.notice(item) + logger.notice(item) - self.logger.notice( + logger.notice( f"[🧭] Check out the content of the log files at {self.artifacts_dir} as well!" ) @@ -408,12 +403,12 @@ async def create_DoIP_conn( # noqa: PLR0913 src_addr, target_addr, ) - self.logger.info("[📫] Sending RoutingActivationRequest") + logger.info("[📫] Sending RoutingActivationRequest") await conn.write_routing_activation_request( RoutingActivationRequestTypes(routing_activation_type) ) except Exception as e: # TODO this probably is too broad - self.logger.warning( + logger.warning( f"[🫨] Got me some good errors when it should be working (dis an infinite loop): {e}" ) continue @@ -425,12 +420,12 @@ async def read_diag_request_custom( while True: hdr, payload = await conn.read_frame() if not isinstance(payload, DiagnosticMessage): - self.logger.warning(f"[🧨] Unexpected DoIP message: {hdr} {payload}") + logger.warning(f"[🧨] Unexpected DoIP message: {hdr} {payload}") return None, b"" if payload.SourceAddress != conn.target_addr: return payload.SourceAddress, payload.UserData if payload.TargetAddress != conn.src_addr: - self.logger.warning( + logger.warning( f"[🤌] You talking to me?! Unexpected DoIP target address: {payload.TargetAddress:#04x}" ) continue @@ -456,13 +451,13 @@ async def enumerate_source_addresses( 0xAFFE, ) except OSError as e: - self.logger.error(f"[🚨] Mr. Stark I don't feel so good: {e}") + logger.error(f"[🚨] Mr. Stark I don't feel so good: {e}") return [] try: await conn.write_routing_activation_request(routing_activation_type) except DoIPRoutingActivationDeniedError as e: - self.logger.info( + logger.info( f"[🌟] splendid, {source_address:#x} yields {e.rac_code.name}" ) @@ -480,7 +475,7 @@ async def enumerate_source_addresses( finally: await conn.close() - self.logger.info( + logger.info( f"[🤯] Holy moly, it actually worked for activation_type {routing_activation_type:#x} and src_addr {source_address:#x}!!!" ) known_sourceAddresses.append(source_address) @@ -493,15 +488,15 @@ async def enumerate_source_addresses( await f.write(f"{targets[-1]}\n") # Print valid SourceAddresses and suitable target string for config - self.logger.notice( + logger.notice( f"[💀] Look what SourceAddresses got denied: {', '.join([f'{x:#x}' for x in denied_sourceAddresses])}" ) - self.logger.notice( + logger.notice( f"[💎] Look what valid SourceAddresses I've found: {', '.join([f'{x:#x}' for x in known_sourceAddresses])}" ) - self.logger.notice("[⚔️] It's dangerous to test alone, take one of these:") + logger.notice("[⚔️] It's dangerous to test alone, take one of these:") for item in targets: - self.logger.notice(item) + logger.notice(item) return targets async def run_udp_discovery(self) -> list[tuple[str, int]]: @@ -516,7 +511,7 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]: all_ips.append(ip) for ip in all_ips: - self.logger.info( + logger.info( f"[💌] Sending DoIP VehicleIdentificationRequest to {ip.broadcast}" ) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) @@ -528,7 +523,7 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]: try: data, addr = sock.recvfrom(1024) except TimeoutError: - self.logger.info("[💔] no response") + logger.info("[💔] no response") continue finally: sock.close() @@ -536,15 +531,15 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]: # Hardcoded slices vin = data[8 : 8 + 17] target_addr = int.from_bytes(data[25:27], "big") - self.logger.notice( + logger.notice( f"[💝]: {addr} responded with VIN {vin.decode('ascii')} and target_addr {target_addr:#x}" ) found.append(addr) - self.logger.notice("[💎] Look what valid hosts I've found:") + logger.notice("[💎] Look what valid hosts I've found:") for item in found: url = f"doip://{item[0]}:{item[1]}" - self.logger.notice(url) + logger.notice(url) async with aiofiles.open( self.artifacts_dir.joinpath("0_valid_hosts.txt"), "a" ) as f: diff --git a/src/gallia/commands/discover/find_xcp.py b/src/gallia/commands/discover/find_xcp.py index bb903a2cf..e1d4e61ed 100644 --- a/src/gallia/commands/discover/find_xcp.py +++ b/src/gallia/commands/discover/find_xcp.py @@ -9,10 +9,13 @@ from gallia.command import AsyncScript from gallia.config import Config +from gallia.log import get_logger from gallia.services.uds.core.utils import bytes_repr, g_repr from gallia.transports import RawCANTransport, TargetURI from gallia.utils import auto_int, can_id_repr +logger = get_logger("gallia.discover.xcp") + class FindXCP(AsyncScript): """Find XCP Slave""" @@ -105,12 +108,12 @@ def configure_parser(self) -> None: def pack_xcp_eth(self, data: bytes, ctr: int = 0) -> bytes: length = len(data) data = struct.pack(" tuple[int, int, bytes]: length, ctr = struct.unpack_from(" None: @@ -131,34 +134,34 @@ async def test_tcp(self, args: Namespace) -> None: endpoints = [] for port in args.tcp_ports.split(","): port = int(port, 0) # noqa - self.logger.info(f"Testing TCP port: {port}") + logger.info(f"Testing TCP port: {port}") server = (args.xcp_ip, port) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(0.2) try: self.socket.connect(server) except Exception as e: - self.logger.info(f"Connect: {g_repr(e)} on TCP port {port}") + logger.info(f"Connect: {g_repr(e)} on TCP port {port}") continue try: self.socket.send(self.pack_xcp_eth(data)) _, _, data_ret = self.unpack_xcp_eth(self.socket.recv(1024)) ret = bytes_repr(data_ret) - self.logger.info(f"Receive data on TCP port {port}: {ret}") + logger.info(f"Receive data on TCP port {port}: {ret}") if len(data_ret) > 0 and data_ret[0] == 0xFF: - self.logger.result(f"XCP Slave on TCP port {port}, data: {ret}") + logger.result(f"XCP Slave on TCP port {port}, data: {ret}") endpoints.append(port) else: - self.logger.info(f"TCP port {port} is no XCP slave, data: {ret}") + logger.info(f"TCP port {port} is no XCP slave, data: {ret}") except Exception as e: - self.logger.info(f"send/recv: {g_repr(e)} on TCP port {port:d}") + logger.info(f"send/recv: {g_repr(e)} on TCP port {port:d}") continue self.xcp_disconnect(server) self.socket.close() - self.logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via TCP") + logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via TCP") def xcp_disconnect(self, server: tuple[str, int]) -> None: try: @@ -174,7 +177,7 @@ async def test_udp(self, args: Namespace) -> None: endpoints = [] for port in args.udp_ports.split(","): port = int(port, 0) # noqa - self.logger.info(f"Testing UDP port: {port}") + logger.info(f"Testing UDP port: {port}") self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.settimeout(0.5) server = (args.xcp_ip, port) @@ -182,20 +185,20 @@ async def test_udp(self, args: Namespace) -> None: try: _, _, data_ret = self.unpack_xcp_eth(self.socket.recv(1024)) ret = bytes_repr(data_ret) - self.logger.info(f"Receive data on TCP port {port}: {ret}") + logger.info(f"Receive data on TCP port {port}: {ret}") if len(data_ret) > 0 and data_ret[0] == 0xFF: - self.logger.result(f"XCP Slave on UDP port {port}, data: {ret}") + logger.result(f"XCP Slave on UDP port {port}, data: {ret}") endpoints.append(port) else: - self.logger.info(f"UDP port {port} is no XCP slave, data: {ret}") + logger.info(f"UDP port {port} is no XCP slave, data: {ret}") except socket.timeout: - self.logger.info(f"Timeout on UDP port {port}") + logger.info(f"Timeout on UDP port {port}") self.xcp_disconnect(server) self.socket.close() - self.logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via UDP") + logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via UDP") async def test_can(self, args: Namespace) -> None: target = TargetURI( @@ -206,15 +209,15 @@ async def test_can(self, args: Namespace) -> None: endpoints = [] sniff_time: int = args.sniff_time - self.logger.result(f"Listening to idle bus communication for {sniff_time}s...") + logger.result(f"Listening to idle bus communication for {sniff_time}s...") addr_idle = await transport.get_idle_traffic(sniff_time) - self.logger.result(f"Found {len(addr_idle)} CAN Addresses on idle Bus") + logger.result(f"Found {len(addr_idle)} CAN Addresses on idle Bus") transport.set_filter(addr_idle, inv_filter=True) # flush receive queue await transport.get_idle_traffic(2) for can_id in range(args.can_id_start, args.can_id_end + 1): - self.logger.info(f"Testing CAN ID: {can_id_repr(can_id)}") + logger.info(f"Testing CAN ID: {can_id_repr(can_id)}") pdu = bytes([0xFF, 0x00]) await transport.sendto(pdu, can_id, timeout=0.1) @@ -226,23 +229,23 @@ async def test_can(self, args: Namespace) -> None: f"Found XCP endpoint [master:slave]: CAN: {can_id_repr(master)}:{can_id_repr(can_id)} " f"data: {bytes_repr(data)}" ) - self.logger.result(msg) + logger.result(msg) endpoints.append((can_id, master)) else: - self.logger.info( + logger.info( f"Received non XCP answer for CAN-ID {can_id_repr(can_id)}: {can_id_repr(master)}:" f"{bytes_repr(data)}" ) except asyncio.TimeoutError: pass - self.logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via CAN") + logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via CAN") def test_eth_broadcast(self, args: Namespace) -> None: # TODO: rewrite as async multicast_group = ("239.255.0.0", 5556) - self.logger.result( + logger.result( f"Discover XCP via multicast group: {multicast_group[0]}:{multicast_group[1]}" ) @@ -250,7 +253,7 @@ def test_eth_broadcast(self, args: Namespace) -> None: self.socket.connect((args.xcp_ip, 5555)) addr = self.socket.getsockname()[0] self.socket.close() - self.logger.info(f"xcp interface ip for multicast group: {addr}") + logger.info(f"xcp interface ip for multicast group: {addr}") self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.setsockopt( @@ -268,11 +271,11 @@ def test_eth_broadcast(self, args: Namespace) -> None: if not data: break - self.logger.result(f"Found XCP slave: {slave} {bytes_repr(data)}") + logger.result(f"Found XCP slave: {slave} {bytes_repr(data)}") endpoints.append(slave) except socket.timeout: - self.logger.info("Timeout") + logger.info("Timeout") - self.logger.result( + logger.result( f"Finished; Found {len(endpoints)} XCP endpoints via multicast group" ) diff --git a/src/gallia/commands/discover/uds/isotp.py b/src/gallia/commands/discover/uds/isotp.py index e9d7656a4..44a37d8b9 100644 --- a/src/gallia/commands/discover/uds/isotp.py +++ b/src/gallia/commands/discover/uds/isotp.py @@ -7,11 +7,14 @@ from binascii import unhexlify from gallia.command import UDSDiscoveryScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSClient, UDSRequest from gallia.services.uds.core.utils import g_repr from gallia.transports import ISOTPTransport, RawCANTransport, TargetURI from gallia.utils import auto_int, can_id_repr, write_target_list +logger = get_logger("gallia.discover.isotp") + class IsotpDiscoverer(UDSDiscoveryScanner): """Discovers all UDS endpoints on an ECU using ISO-TP normal addressing. @@ -98,22 +101,22 @@ async def setup(self, args: Namespace) -> None: await super().setup(args) async def query_description(self, target_list: list[TargetURI], did: int) -> None: - self.logger.info("reading info DID from all discovered endpoints") + logger.info("reading info DID from all discovered endpoints") for target in target_list: - self.logger.result("----------------------------") - self.logger.result(f"Probing ECU: {target}") + logger.result("----------------------------") + logger.result(f"Probing ECU: {target}") transport = await ISOTPTransport.connect(target) uds_client = UDSClient(transport, timeout=2) - self.logger.result(f"reading device description at {g_repr(did)}") + logger.result(f"reading device description at {g_repr(did)}") try: resp = await uds_client.read_data_by_identifier(did) if isinstance(resp, NegativeResponse): - self.logger.result(f"could not read did: {resp}") + logger.result(f"could not read did: {resp}") else: - self.logger.result(f"response was: {resp}") + logger.result(f"response was: {resp}") except Exception as e: - self.logger.result(f"reading description failed: {e!r}") + logger.result(f"reading description failed: {e!r}") def _build_isotp_frame_extended( self, @@ -154,10 +157,10 @@ async def main(self, args: Namespace) -> None: found = [] sniff_time: int = args.sniff_time - self.logger.result(f"Recording idle bus communication for {sniff_time}s") + logger.result(f"Recording idle bus communication for {sniff_time}s") addr_idle = await transport.get_idle_traffic(sniff_time) - self.logger.result(f"Found {len(addr_idle)} CAN Addresses on idle Bus") + logger.result(f"Found {len(addr_idle)} CAN Addresses on idle Bus") transport.set_filter(addr_idle, inv_filter=True) req = UDSRequest.parse_dynamic(args.pdu) @@ -170,14 +173,14 @@ async def main(self, args: Namespace) -> None: if args.extended_addr: pdu = self.build_isotp_frame(req, ID, padding=args.padding) - self.logger.info(f"Testing ID {can_id_repr(ID)}") + logger.info(f"Testing ID {can_id_repr(ID)}") is_broadcast = False await transport.sendto(pdu, timeout=0.1, dst=dst_addr) try: addr, _ = await transport.recvfrom(timeout=0.1) if addr == ID: - self.logger.info( + logger.info( f"The same CAN ID {can_id_repr(ID)} answered. Skipping…" ) continue @@ -191,22 +194,22 @@ async def main(self, args: Namespace) -> None: new_addr, _ = await transport.recvfrom(timeout=0.1) if new_addr != addr: is_broadcast = True - self.logger.result( + logger.result( f"seems that broadcast was triggered on CAN ID {can_id_repr(ID)}, " f"got answer from {can_id_repr(new_addr)}" ) else: - self.logger.info( + logger.info( f"seems like a large ISO-TP packet was received on CAN ID {can_id_repr(ID)}" ) except asyncio.TimeoutError: if is_broadcast: - self.logger.result( + logger.result( f"seems that broadcast was triggered on CAN ID {can_id_repr(ID)}, " f"got answer from {can_id_repr(addr)}" ) else: - self.logger.result( + logger.result( f"found endpoint on CAN ID [src:dst]: {can_id_repr(ID)}:{can_id_repr(addr)}" ) target_args = {} @@ -238,9 +241,9 @@ async def main(self, args: Namespace) -> None: found.append(target) break - self.logger.result(f"finished; found {len(found)} UDS endpoints") + logger.result(f"finished; found {len(found)} UDS endpoints") ecus_file = self.artifacts_dir.joinpath("ECUs.txt") - self.logger.result(f"Writing urls to file: {ecus_file}") + logger.result(f"Writing urls to file: {ecus_file}") await write_target_list(ecus_file, found, self.db_handler) if args.query: diff --git a/src/gallia/commands/fuzz/uds/pdu.py b/src/gallia/commands/fuzz/uds/pdu.py index 36504eebe..2585cdf36 100644 --- a/src/gallia/commands/fuzz/uds/pdu.py +++ b/src/gallia/commands/fuzz/uds/pdu.py @@ -8,6 +8,7 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds.core.client import UDSRequestConfig from gallia.services.uds.core.constants import UDSErrorCodes, UDSIsoServices from gallia.services.uds.core.exception import IllegalResponse @@ -16,6 +17,8 @@ from gallia.transports import RawCANTransport, TargetURI from gallia.utils import auto_int +logger = get_logger("gallia.fuzz.uds") + class PDUFuzzer(UDSScanner): """Payload fuzzer""" @@ -94,26 +97,26 @@ async def observe_can_messages(self, can_ids: list[int], args: Namespace) -> Non try: can_msgs: dict[int, bytes] = {} - self.logger.debug("Started observe messages task") + logger.debug("Started observe messages task") while True: try: can_id, msg = await transport.recvfrom(timeout=1) if can_id in can_msgs: if msg != can_msgs[can_id]: - self.logger.result( + logger.result( f"Message for {can_id:03x} changed to {msg.hex()}" ) can_msgs[can_id] = msg else: can_msgs[can_id] = msg - self.logger.result( + logger.result( f"Observed new message from {can_id:03x}: {msg.hex()}" ) except asyncio.TimeoutError: continue except asyncio.CancelledError: - self.logger.debug("Can message observer task cancelled") + logger.debug("Can message observer task cancelled") async def main(self, args: Namespace) -> None: if args.observe_can_ids: @@ -121,7 +124,7 @@ async def main(self, args: Namespace) -> None: self.observe_can_messages(args.observe_can_ids, args) ) - self.logger.info(f"testing sessions {args.sessions}") + logger.info(f"testing sessions {args.sessions}") for did in args.dids: if args.serviceid == UDSIsoServices.RoutineControl: @@ -129,15 +132,15 @@ async def main(self, args: Namespace) -> None: elif args.serviceid == UDSIsoServices.WriteDataByIdentifier: pdu = bytes([args.serviceid, did >> 8, did & 0xFF]) for session in args.sessions: - self.logger.notice(f"Switching to session 0x{session:02x}") + logger.notice(f"Switching to session 0x{session:02x}") resp: UDSResponse = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - self.logger.warning( + logger.warning( f"Switching to session 0x{session:02x} failed: {resp}" ) continue - self.logger.result(f"Starting scan in session: 0x{session:02x}") + logger.result(f"Starting scan in session: 0x{session:02x}") positive_DIDs = 0 negative_responses: dict[UDSErrorCodes, int] = {} timeout_DIDs = 0 @@ -154,44 +157,44 @@ async def main(self, args: Namespace) -> None: if isinstance(resp, NegativeResponse): if not suggests_identifier_not_supported(resp): - self.logger.result(f"0x{did:0x}: {resp}") + logger.result(f"0x{did:0x}: {resp}") else: - self.logger.info(f"0x{did:0x}: {resp}") + logger.info(f"0x{did:0x}: {resp}") if resp.response_code in negative_responses: negative_responses[resp.response_code] += 1 else: negative_responses[resp.response_code] = 1 else: - self.logger.result(f"0x{did:0x}: {resp}") + logger.result(f"0x{did:0x}: {resp}") positive_DIDs += 1 except asyncio.TimeoutError: - self.logger.warning(f"0x{did :0x}: Retries exceeded") + logger.warning(f"0x{did :0x}: Retries exceeded") timeout_DIDs += 1 except IllegalResponse as e: - self.logger.warning(f"{repr(e)}") + logger.warning(f"{repr(e)}") illegal_resp += 1 # Temporary patch: Exception handler is deleted when it goes productive except ConnectionError: - self.logger.warning( + logger.warning( "isotp flow control frame missing. Reconnecting…" ) flow_control_miss += 1 await self.ecu.reconnect() - self.logger.result(f"Scan in session 0x{session:0x} is complete!") + logger.result(f"Scan in session 0x{session:0x} is complete!") for k, v in negative_responses.items(): - self.logger.result(f"{k.name}: {v}") + logger.result(f"{k.name}: {v}") - self.logger.result(f"Positive replies: {positive_DIDs}") + logger.result(f"Positive replies: {positive_DIDs}") - self.logger.result(f"Timeouts: {timeout_DIDs}") - self.logger.result(f"Illegal replies: {illegal_resp}") - self.logger.result(f"Flow control frames missing: {flow_control_miss}") + logger.result(f"Timeouts: {timeout_DIDs}") + logger.result(f"Illegal replies: {illegal_resp}") + logger.result(f"Flow control frames missing: {flow_control_miss}") - self.logger.info(f"Leaving session 0x{session:02x} via hook") + logger.info(f"Leaving session 0x{session:02x} via hook") await self.ecu.leave_session(session) if args.observe_can_ids: diff --git a/src/gallia/commands/primitive/uds/dtc.py b/src/gallia/commands/primitive/uds/dtc.py index ba49378b4..7d755d7ba 100644 --- a/src/gallia/commands/primitive/uds/dtc.py +++ b/src/gallia/commands/primitive/uds/dtc.py @@ -9,6 +9,7 @@ from tabulate import tabulate from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds.core.constants import ( CDTCSSubFuncs, DiagnosticSessionControlSubFuncs, @@ -18,6 +19,8 @@ from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.dtc") + class DTCPrimitive(UDSScanner): """Read out the Diagnostic Troube Codes (DTC)""" @@ -96,23 +99,21 @@ async def fetch_error_codes(self, mask: int, split: bool = True) -> dict[int, in if isinstance(ecu_response, NegativeResponse): if ecu_response.response_code == UDSErrorCodes.responseTooLong: - self.logger.error( + logger.error( f"There are too many codes for (sub)mask {mask}. Consider setting --mask " f"with a parameter that excludes one or more of the corresponding bits." ) if split: - self.logger.warning("Trying to fetch the error codes iteratively.") + logger.warning("Trying to fetch the error codes iteratively.") for i in range(8): sub_mask = mask & 2**i if sub_mask > 0: - self.logger.info( - f"Trying to fetch with mask {g_repr(sub_mask)}" - ) + logger.info(f"Trying to fetch with mask {g_repr(sub_mask)}") dtcs.update(await self.fetch_error_codes(sub_mask, False)) else: - self.logger.critical( + logger.critical( f"Could not fetch error codes: {ecu_response}; exiting…" ) sys.exit(1) @@ -138,25 +139,25 @@ async def read(self, args: Namespace) -> None: # if any kind of test failure if error_state & 0xAF: - self.logger.warning(raw_output) + logger.warning(raw_output) failed_dtcs.append(table_output) # if not failed but also not completed yet (i.e. not yet in this cycle or since last clear) elif error_state & 0x50: - self.logger.result(raw_output) + logger.result(raw_output) uncompleted_dtcs.append(table_output) if args.show_legend: - self.logger.result("") + logger.result("") self.show_bit_legend() if args.show_failed: - self.logger.result("") - self.logger.result("Failed codes:") + logger.result("") + logger.result("Failed codes:") self.show_summary(failed_dtcs) if args.show_uncompleted: - self.logger.result("") - self.logger.result("Uncompleted codes:") + logger.result("") + logger.result("Uncompleted codes:") self.show_summary(uncompleted_dtcs) def show_bit_legend(self) -> None: @@ -174,7 +175,7 @@ def show_bit_legend(self) -> None: for line in ( tabulate([[d] for d in bit_descriptions], headers=["bit descriptions"]) ).splitlines(): - self.logger.result(line) + logger.result(line) def show_summary(self, dtcs: list[list[str]]) -> None: dtcs.sort() @@ -193,7 +194,7 @@ def show_summary(self, dtcs: list[list[str]]) -> None: ] for line in tabulate(dtcs, headers=header, tablefmt="fancy_grid").splitlines(): - self.logger.result(line) + logger.result(line) async def clear(self, args: Namespace) -> None: group_of_dtc: int = args.group_of_dtc @@ -202,16 +203,16 @@ async def clear(self, args: Namespace) -> None: max_group_of_dtc = 0xFFFFFF if not min_group_of_dtc <= group_of_dtc <= max_group_of_dtc: - self.logger.error( + logger.error( f"The parameter group_of_dtc must be in the range {g_repr(min_group_of_dtc)}-{g_repr(max_group_of_dtc)}" ) resp = await self.ecu.clear_diagnostic_information(group_of_dtc) if isinstance(resp, NegativeResponse): - self.logger.error(resp) + logger.error(resp) else: - self.logger.result("Success") + logger.result("Success") async def control(self, args: Namespace) -> None: if args.stop: @@ -229,5 +230,5 @@ async def main(self, args: Namespace) -> None: elif args.cmd == "read": await self.read(args) else: - self.logger.critical("Unhandled command") + logger.critical("Unhandled command") sys.exit(1) diff --git a/src/gallia/commands/primitive/uds/ecu_reset.py b/src/gallia/commands/primitive/uds/ecu_reset.py index a4672eb18..94fed60e0 100644 --- a/src/gallia/commands/primitive/uds/ecu_reset.py +++ b/src/gallia/commands/primitive/uds/ecu_reset.py @@ -6,10 +6,13 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.reset") + class ECUResetPrimitive(UDSScanner): """Use the ECUReset UDS service to reset the ECU""" @@ -38,21 +41,21 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: resp: UDSResponse = await self.ecu.set_session(args.session) if isinstance(resp, NegativeResponse): - self.logger.error(f"could not change to session: {g_repr(args.session)}") + logger.error(f"could not change to session: {g_repr(args.session)}") return try: - self.logger.info(f"try sub-func: {g_repr(args.subfunc)}") + logger.info(f"try sub-func: {g_repr(args.subfunc)}") resp = await self.ecu.ecu_reset(args.subfunc) if isinstance(resp, NegativeResponse): msg = f"ECU Reset {g_repr(args.subfunc)} failed in session: {g_repr(args.session)}: {resp}" - self.logger.error(msg) + logger.error(msg) else: - self.logger.result(f"ECU Reset {g_repr(args.subfunc)} succeeded") + logger.result(f"ECU Reset {g_repr(args.subfunc)} succeeded") except asyncio.TimeoutError: - self.logger.error("Timeout") + logger.error("Timeout") await asyncio.sleep(10) except ConnectionError: msg = f"Lost connection to ECU, session: {g_repr(args.session)} subFunc: {g_repr(args.subfunc)}" - self.logger.error(msg) + logger.error(msg) return diff --git a/src/gallia/commands/primitive/uds/iocbi.py b/src/gallia/commands/primitive/uds/iocbi.py index 924fd6eb3..a277741a5 100644 --- a/src/gallia/commands/primitive/uds/iocbi.py +++ b/src/gallia/commands/primitive/uds/iocbi.py @@ -7,10 +7,13 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.iocbi") + class IOCBIPrimitive(UDSScanner): """Input output control""" @@ -68,7 +71,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - self.logger.critical( + logger.critical( f"Could not change to session: {g_repr(args.session)}: {e!r}" ) sys.exit(1) @@ -105,17 +108,17 @@ async def main(self, args: Namespace) -> None: ) uses_control_parameter = False else: - self.logger.critical("Unhandled control parameter") + logger.critical("Unhandled control parameter") sys.exit(1) if isinstance(resp, NegativeResponse): - self.logger.error(resp) + logger.error(resp) else: - self.logger.result("Positive response:") + logger.result("Positive response:") data = ( resp.control_status_record[1:] if uses_control_parameter else resp.control_status_record ) - self.logger.result(f"hex: {data.hex()}") - self.logger.result(f"raw: {repr(data)}") + logger.result(f"hex: {data.hex()}") + logger.result(f"raw: {repr(data)}") diff --git a/src/gallia/commands/primitive/uds/ping.py b/src/gallia/commands/primitive/uds/ping.py index fc5f6e2d1..22de03b8c 100644 --- a/src/gallia/commands/primitive/uds/ping.py +++ b/src/gallia/commands/primitive/uds/ping.py @@ -7,9 +7,12 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds.core.service import NegativeResponse from gallia.utils import auto_int +logger = get_logger("gallia.primitive.ping") + class PingPrimitive(UDSScanner): """Ping ECU via TesterPresent""" @@ -41,7 +44,7 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: resp = await self.ecu.set_session(args.session) if isinstance(resp, NegativeResponse): - self.logger.error(f"Could not change to requested session: {resp}") + logger.error(f"Could not change to requested session: {resp}") sys.exit(1) i = 1 @@ -50,7 +53,7 @@ async def main(self, args: Namespace) -> None: break ret = await self.ecu.ping() if isinstance(ret, NegativeResponse): - self.logger.warning(ret) - self.logger.result("ECU is alive!") + logger.warning(ret) + logger.result("ECU is alive!") await asyncio.sleep(args.interval) i += 1 diff --git a/src/gallia/commands/primitive/uds/read_by_identifier.py b/src/gallia/commands/primitive/uds/read_by_identifier.py index 8fa4379c4..323a8aa71 100644 --- a/src/gallia/commands/primitive/uds/read_by_identifier.py +++ b/src/gallia/commands/primitive/uds/read_by_identifier.py @@ -6,9 +6,12 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds.core.service import NegativeResponse from gallia.utils import auto_int +logger = get_logger("gallia.primitive.rdbi") + class ReadByIdentifierPrimitive(UDSScanner): """Read data via the ReadDataByIdentifier service""" @@ -38,18 +41,18 @@ async def main(self, args: Namespace) -> None: if args.session != 0x01: await self.ecu.set_session(args.session) except Exception as e: - self.logger.critical(f"fatal error: {e!r}") + logger.critical(f"fatal error: {e!r}") sys.exit(1) resp = await self.ecu.read_data_by_identifier(args.data_id) if isinstance(resp, NegativeResponse): - self.logger.error(resp) + logger.error(resp) else: - self.logger.info("Positive response:") + logger.info("Positive response:") data = resp.data_record - self.logger.info(f"hex: {data.hex()}") - self.logger.info(f"raw: {repr(data)}") - self.logger.result( + logger.info(f"hex: {data.hex()}") + logger.info(f"raw: {repr(data)}") + logger.result( f"{self.ecu.transport.target} responds to {args.data_id:#06x} with {data.hex()}" ) self.result = data diff --git a/src/gallia/commands/primitive/uds/read_error_log.py b/src/gallia/commands/primitive/uds/read_error_log.py index 5c077e937..ba599b1c9 100644 --- a/src/gallia/commands/primitive/uds/read_error_log.py +++ b/src/gallia/commands/primitive/uds/read_error_log.py @@ -6,10 +6,13 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.read-dtc") + class ReadErrorLogPrimitive(UDSScanner): """Read the error log via the DTC service""" @@ -39,21 +42,21 @@ async def main(self, args: Namespace) -> None: sessions = list(range(1, 0x80)) sessions = await self.ecu.find_sessions(sessions) msg = f"Found {len(sessions)} sessions: {g_repr(sessions)}" - self.logger.result(msg) + logger.result(msg) for sess in sessions: await self.ecu.set_session(sess) resp = await self.ecu.read_dtc() if isinstance(resp, NegativeResponse): - self.logger.warning(resp) + logger.warning(resp) else: - self.logger.result(resp.dtc_and_status_record) + logger.result(resp.dtc_and_status_record) await self.ecu.leave_session(sess) if args.clear_dtc: await self.ecu.clear_dtc() await self.ecu.read_dtc() - self.logger.info("Rebooting ECU...") + logger.info("Rebooting ECU...") await self.ecu.ecu_reset(1) await asyncio.sleep(2) await self.ecu.read_dtc() diff --git a/src/gallia/commands/primitive/uds/rmba.py b/src/gallia/commands/primitive/uds/rmba.py index 12b954005..880b996c1 100644 --- a/src/gallia/commands/primitive/uds/rmba.py +++ b/src/gallia/commands/primitive/uds/rmba.py @@ -6,10 +6,13 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.rmba") + class RMBAPrimitive(UDSScanner): """Read memory by address""" @@ -42,7 +45,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - self.logger.critical( + logger.critical( f"Could not change to session: {g_repr(args.session)}: {e!r}" ) sys.exit(1) @@ -50,9 +53,9 @@ async def main(self, args: Namespace) -> None: resp = await self.ecu.read_memory_by_address(args.address, args.length) if isinstance(resp, NegativeResponse): - self.logger.error(resp) + logger.error(resp) else: - self.logger.result("Positive response:") + logger.result("Positive response:") - self.logger.result(f"hex: {resp.data_record.hex()}") - self.logger.result(f"raw: {repr(resp.data_record)}") + logger.result(f"hex: {resp.data_record.hex()}") + logger.result(f"raw: {repr(resp.data_record)}") diff --git a/src/gallia/commands/primitive/uds/rtcl.py b/src/gallia/commands/primitive/uds/rtcl.py index 520edc816..42aa0593b 100644 --- a/src/gallia/commands/primitive/uds/rtcl.py +++ b/src/gallia/commands/primitive/uds/rtcl.py @@ -8,11 +8,14 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse from gallia.services.uds.core.service import RoutineControlResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.rtcl") + class RTCLPrimitive(UDSScanner): """Start or stop a provided routine or request its results""" @@ -91,13 +94,13 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - self.logger.critical( + logger.critical( f"Could not change to session: {g_repr(args.session)}: {e!r}" ) sys.exit(1) if args.start is False and args.stop is False and args.results is False: - self.logger.warning("No instructions were given (start/stop/results)") + logger.warning("No instructions were given (start/stop/results)") if args.start: resp: ( @@ -107,17 +110,17 @@ async def main(self, args: Namespace) -> None: ) if isinstance(resp, NegativeResponse): - self.logger.error(f"start_routine: {resp}") + logger.error(f"start_routine: {resp}") else: - self.logger.result("[start] Positive response:") - self.logger.result(f"hex: {resp.routine_status_record.hex()}") - self.logger.result(f"raw: {resp.routine_status_record!r}") + logger.result("[start] Positive response:") + logger.result(f"hex: {resp.routine_status_record.hex()}") + logger.result(f"raw: {resp.routine_status_record!r}") if args.stop: delay = args.stop_delay if delay > 0: - self.logger.info( + logger.info( f"Delaying the request for stopping the routine by {delay} seconds" ) await asyncio.sleep(delay) @@ -127,17 +130,17 @@ async def main(self, args: Namespace) -> None: ) if isinstance(resp, NegativeResponse): - self.logger.error(f"stop routine: {resp}") + logger.error(f"stop routine: {resp}") else: - self.logger.result("[stop] Positive response:") - self.logger.result(f"hex: {resp.routine_status_record.hex()}") - self.logger.result(f"raw: {resp.routine_status_record!r}") + logger.result("[stop] Positive response:") + logger.result(f"hex: {resp.routine_status_record.hex()}") + logger.result(f"raw: {resp.routine_status_record!r}") if args.results: delay = args.results_delay if delay > 0: - self.logger.info( + logger.info( f"Delaying the request for the routine results by {delay} seconds" ) await asyncio.sleep(delay) @@ -147,8 +150,8 @@ async def main(self, args: Namespace) -> None: ) if isinstance(resp, NegativeResponse): - self.logger.error(f"request_routine_results: {resp}") + logger.error(f"request_routine_results: {resp}") else: - self.logger.result("[get result] Positive response:") - self.logger.result(f"hex: {resp.routine_status_record.hex()}") - self.logger.result(f"raw: {resp.routine_status_record!r}") + logger.result("[get result] Positive response:") + logger.result(f"hex: {resp.routine_status_record.hex()}") + logger.result(f"raw: {resp.routine_status_record!r}") diff --git a/src/gallia/commands/primitive/uds/send_pdu.py b/src/gallia/commands/primitive/uds/send_pdu.py index fdc82d24a..605d71843 100644 --- a/src/gallia/commands/primitive/uds/send_pdu.py +++ b/src/gallia/commands/primitive/uds/send_pdu.py @@ -7,6 +7,7 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import ( NegativeResponse, UDSRequest, @@ -18,6 +19,8 @@ from gallia.services.uds.helpers import raise_for_error from gallia.utils import auto_int +logger = get_logger("gallia.primitive.pdu") + class SendPDUPrimitive(UDSScanner): """A raw scanner to send a plain pdu""" @@ -57,9 +60,9 @@ async def main(self, args: Namespace) -> None: parsed_request = UDSRequest.parse_dynamic(pdu) if isinstance(parsed_request, RawRequest): - self.logger.warning("Could not parse the request pdu") + logger.warning("Could not parse the request pdu") - self.logger.info(f"Sending {parsed_request}") + logger.info(f"Sending {parsed_request}") try: response = await self.ecu.send_raw( @@ -67,13 +70,13 @@ async def main(self, args: Namespace) -> None: config=UDSRequestConfig(max_retry=args.max_retry), ) except UDSException as e: - self.logger.error(repr(e)) + logger.error(repr(e)) sys.exit(1) if isinstance(response, NegativeResponse): - self.logger.warning(f"Received {response}") + logger.warning(f"Received {response}") else: if isinstance(response, RawResponse): - self.logger.warning("Could not parse the response pdu") + logger.warning("Could not parse the response pdu") - self.logger.notice(f"Received {response}") + logger.notice(f"Received {response}") diff --git a/src/gallia/commands/primitive/uds/vin.py b/src/gallia/commands/primitive/uds/vin.py index 2b888e4cb..131f10480 100644 --- a/src/gallia/commands/primitive/uds/vin.py +++ b/src/gallia/commands/primitive/uds/vin.py @@ -5,8 +5,11 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds.core.service import NegativeResponse +logger = get_logger("gallia.primitive.vin") + class VINPrimitive(UDSScanner): """Request VIN""" @@ -21,6 +24,6 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: resp = await self.ecu.read_vin() if isinstance(resp, NegativeResponse): - self.logger.warning(f"ECU said: {resp}") + logger.warning(f"ECU said: {resp}") return - self.logger.result(resp.data_record.hex()) + logger.result(resp.data_record.hex()) diff --git a/src/gallia/commands/primitive/uds/wmba.py b/src/gallia/commands/primitive/uds/wmba.py index a9fc1f2ac..b0c9c4487 100644 --- a/src/gallia/commands/primitive/uds/wmba.py +++ b/src/gallia/commands/primitive/uds/wmba.py @@ -8,10 +8,13 @@ from pathlib import Path from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.primitive.wmba") + class WMBAPrimitive(UDSScanner): """Write memory by address""" @@ -48,7 +51,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - self.logger.critical( + logger.critical( f"Could not change to session: {g_repr(args.session)}: {e!r}" ) sys.exit(1) @@ -62,7 +65,7 @@ async def main(self, args: Namespace) -> None: resp = await self.ecu.write_memory_by_address(args.address, data) if isinstance(resp, NegativeResponse): - self.logger.error(resp) + logger.error(resp) else: # There is not real data returned, only echos - self.logger.result("Success") + logger.result("Success") diff --git a/src/gallia/commands/primitive/uds/write_by_identifier.py b/src/gallia/commands/primitive/uds/write_by_identifier.py index d91055b6e..b4be9717e 100644 --- a/src/gallia/commands/primitive/uds/write_by_identifier.py +++ b/src/gallia/commands/primitive/uds/write_by_identifier.py @@ -7,9 +7,12 @@ from argparse import Namespace from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSResponse from gallia.utils import auto_int +logger = get_logger("gallia.primitive.wdbi") + class WriteByIdentifierPrimitive(UDSScanner): """A simple scanner to talk to the write by identifier service""" @@ -45,14 +48,14 @@ async def main(self, args: Namespace) -> None: if args.session != 0x01: resp: UDSResponse = await self.ecu.set_session(args.session) if isinstance(resp, NegativeResponse): - self.logger.critical(f"could not change to session: {resp}") + logger.critical(f"could not change to session: {resp}") sys.exit(1) except Exception as e: - self.logger.critical(f"fatal error: {e!r}") + logger.critical(f"fatal error: {e!r}") sys.exit(1) resp = await self.ecu.write_data_by_identifier(args.data_id, args.data) if isinstance(resp, NegativeResponse): - self.logger.error(resp) + logger.error(resp) else: - self.logger.info("Positive response") + logger.info("Positive response") diff --git a/src/gallia/commands/scan/uds/identifiers.py b/src/gallia/commands/scan/uds/identifiers.py index 476951ccd..88ac4aafc 100644 --- a/src/gallia/commands/scan/uds/identifiers.py +++ b/src/gallia/commands/scan/uds/identifiers.py @@ -9,6 +9,7 @@ from itertools import product from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds.core.client import UDSRequestConfig from gallia.services.uds.core.constants import ( RoutineControlSubFuncs, @@ -21,6 +22,8 @@ from gallia.services.uds.helpers import suggests_service_not_supported from gallia.utils import ParseSkips, auto_int +logger = get_logger("gallia.scan.identifiers") + class ScanIdentifiers(UDSScanner): """This scanner scans DataIdentifiers of various @@ -101,7 +104,7 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: if args.sessions is None: - self.logger.info("No sessions specified, starting with session scan") + logger.info("No sessions specified, starting with session scan") # Only until 0x80 because the eight bit is "SuppressResponse" sessions = [ s @@ -109,7 +112,7 @@ async def main(self, args: Namespace) -> None: if s not in args.skip or args.skip[s] is not None ] sessions = await self.ecu.find_sessions(sessions) - self.logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") + logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") else: sessions = [ s @@ -117,21 +120,19 @@ async def main(self, args: Namespace) -> None: if s not in args.skip or args.skip[s] is not None ] - self.logger.info(f"testing sessions {g_repr(sessions)}") + logger.info(f"testing sessions {g_repr(sessions)}") # TODO: Unified shortened output necessary here - self.logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") + logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") for session in sessions: - self.logger.notice(f"Switching to session {g_repr(session)}") + logger.notice(f"Switching to session {g_repr(session)}") resp: UDSResponse = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - self.logger.warning( - f"Switching to session {g_repr(session)} failed: {resp}" - ) + logger.warning(f"Switching to session {g_repr(session)} failed: {resp}") continue - self.logger.result(f"Starting scan in session: {g_repr(session)}") + logger.result(f"Starting scan in session: {g_repr(session)}") positive_DIDs = 0 abnormal_DIDs = 0 timeout_DIDs = 0 @@ -139,7 +140,7 @@ async def main(self, args: Namespace) -> None: if args.sid == UDSIsoServices.RoutineControl: if not args.payload: - self.logger.warning( + logger.warning( "Scanning RoutineControl with empty payload can successfully execute some " + "routines, such as switching from plant mode to field mode, which can only " + "be reversed with a valid token!" @@ -150,7 +151,7 @@ async def main(self, args: Namespace) -> None: if args.sid == UDSIsoServices.SecurityAccess: if args.end > 0xFF: - self.logger.warning( + logger.warning( "Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); " + f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}" ) @@ -160,20 +161,20 @@ async def main(self, args: Namespace) -> None: range(args.start, args.end + 1), sub_functions ): if session in args.skip and DID in args.skip[session]: - self.logger.info(f"{g_repr(DID)}: skipped") + logger.info(f"{g_repr(DID)}: skipped") continue if args.check_session and DID % args.check_session == 0: # Check session and try to recover from wrong session (max 3 times), else skip session if not await self.ecu.check_and_set_session(session): - self.logger.error( + logger.error( f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}" ) break if args.sid == UDSIsoServices.SecurityAccess: if DID & 0b10000000: - self.logger.info( + logger.info( "Keep in mind that you set the SuppressResponse Bit (8th bit): " + f"{g_repr(DID)} = 0b{DID:b}" ) @@ -196,15 +197,15 @@ async def main(self, args: Namespace) -> None: pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3) ) except asyncio.TimeoutError: - self.logger.result(f"{g_repr(DID)}: Retries exceeded") + logger.result(f"{g_repr(DID)}: Retries exceeded") timeout_DIDs += 1 continue except IllegalResponse as e: - self.logger.warning(g_repr(e)) + logger.warning(g_repr(e)) if isinstance(resp, NegativeResponse): if suggests_service_not_supported(resp): - self.logger.info( + logger.info( f"{g_repr(DID)}: {resp}; does session {g_repr(session)} " f"support service {service_repr(args.sid)}?" ) @@ -214,19 +215,19 @@ async def main(self, args: Namespace) -> None: # RequestOutOfRange is a common reply for invalid DataIdentifiers elif resp.response_code == UDSErrorCodes.requestOutOfRange: - self.logger.debug(f"{g_repr(DID)}: {resp}") + logger.info(f"{g_repr(DID)}: {resp}") else: - self.logger.result(f"{g_repr(DID)}: {resp}") + logger.result(f"{g_repr(DID)}: {resp}") abnormal_DIDs += 1 else: - self.logger.result(f"{g_repr(DID)}: {resp}") + logger.result(f"{g_repr(DID)}: {resp}") positive_DIDs += 1 - self.logger.result(f"Scan in session {g_repr(session)} is complete!") - self.logger.result(f"Positive replies: {positive_DIDs}") - self.logger.result(f"Abnormal replies: {abnormal_DIDs}") - self.logger.result(f"Timeouts: {timeout_DIDs}") + logger.result(f"Scan in session {g_repr(session)} is complete!") + logger.result(f"Positive replies: {positive_DIDs}") + logger.result(f"Abnormal replies: {abnormal_DIDs}") + logger.result(f"Timeouts: {timeout_DIDs}") - self.logger.info(f"Leaving session {g_repr(session)} via hook") + logger.info(f"Leaving session {g_repr(session)} via hook") await self.ecu.leave_session(session) diff --git a/src/gallia/commands/scan/uds/memory.py b/src/gallia/commands/scan/uds/memory.py index 168b592fa..24c1e4aa8 100644 --- a/src/gallia/commands/scan/uds/memory.py +++ b/src/gallia/commands/scan/uds/memory.py @@ -8,10 +8,13 @@ from binascii import unhexlify from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSErrorCodes, UDSRequestConfig from gallia.services.uds.core.utils import g_repr, uds_memory_parameters from gallia.utils import auto_int +logger = get_logger("gallia.scan.memory") + class MemoryFunctionsScanner(UDSScanner): """This scanner scans functions with direct access to memory. @@ -55,7 +58,7 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: resp = await self.ecu.set_session(args.session) if isinstance(resp, NegativeResponse): - self.logger.critical(f"could not change to session: {resp}") + logger.critical(f"could not change to session: {resp}") sys.exit(1) for i in range(5): @@ -87,7 +90,7 @@ async def scan_memory_address(self, args: Namespace, addr_offset: int = 0) -> No if args.check_session and i % args.check_session == 0: # Check session and try to recover from wrong session (max 3 times), else skip session if not await self.ecu.check_and_set_session(args.session): - self.logger.error( + logger.error( f"Aborting scan on session {g_repr(args.session)}; " + f"current memory address was {g_repr(addr)}" ) @@ -98,13 +101,13 @@ async def scan_memory_address(self, args: Namespace, addr_offset: int = 0) -> No pdu, config=UDSRequestConfig(tags=["ANALYZE"]) ) except asyncio.TimeoutError: - self.logger.result(f"Address {g_repr(addr)}: timeout") + logger.result(f"Address {g_repr(addr)}: timeout") continue if isinstance(resp, NegativeResponse): if resp.response_code is UDSErrorCodes.requestOutOfRange: - self.logger.info(f"Address {g_repr(addr)}: {resp}") + logger.info(f"Address {g_repr(addr)}: {resp}") else: - self.logger.result(f"Address {g_repr(addr)}: {resp}") + logger.result(f"Address {g_repr(addr)}: {resp}") else: - self.logger.result(f"Address {g_repr(addr)}: {resp}") + logger.result(f"Address {g_repr(addr)}: {resp}") diff --git a/src/gallia/commands/scan/uds/reset.py b/src/gallia/commands/scan/uds/reset.py index 5870afab3..7ac5fe334 100644 --- a/src/gallia/commands/scan/uds/reset.py +++ b/src/gallia/commands/scan/uds/reset.py @@ -9,6 +9,7 @@ from typing import Any from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSRequestConfig, UDSResponse from gallia.services.uds.core.exception import ( IllegalResponse, @@ -18,6 +19,8 @@ from gallia.services.uds.helpers import suggests_sub_function_not_supported from gallia.utils import ParseSkips, auto_int +logger = get_logger("gallia.scan.reset") + class ResetScanner(UDSScanner): """Scan ecu_reset""" @@ -61,7 +64,7 @@ async def main(self, args: Namespace) -> None: l_error: dict[int, list[Any]] = {} if args.sessions is None: - self.logger.info("No sessions specified, starting with session scan") + logger.info("No sessions specified, starting with session scan") # Only until 0x80 because the eight bit is "SuppressResponse" sessions = [ s @@ -69,7 +72,7 @@ async def main(self, args: Namespace) -> None: if s not in args.skip or args.skip[s] is not None ] sessions = await self.ecu.find_sessions(sessions) - self.logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") + logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") else: sessions = [ s @@ -77,28 +80,26 @@ async def main(self, args: Namespace) -> None: if s not in args.skip or args.skip[s] is not None ] - self.logger.info(f"testing sessions {g_repr(sessions)}") + logger.info(f"testing sessions {g_repr(sessions)}") # TODO: Unified shortened output necessary here - self.logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") + logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") for session in sessions: - self.logger.notice(f"Switching to session {g_repr(session)}") + logger.notice(f"Switching to session {g_repr(session)}") resp: UDSResponse = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - self.logger.warning( - f"Switching to session {g_repr(session)} failed: {resp}" - ) + logger.warning(f"Switching to session {g_repr(session)} failed: {resp}") continue - self.logger.result(f"Scanning in session: {g_repr(session)}") + logger.result(f"Scanning in session: {g_repr(session)}") l_ok[session] = [] l_timeout[session] = [] l_error[session] = [] for sub_func in range(0x01, 0x80): if session in args.skip and sub_func in args.skip[session]: - self.logger.notice( + logger.notice( f"skipping subFunc: {g_repr(sub_func)} because of --skip" ) continue @@ -106,7 +107,7 @@ async def main(self, args: Namespace) -> None: if not args.skip_check_session: # Check session and try to recover from wrong session (max 3 times), else skip session if not await self.ecu.check_and_set_session(session): - self.logger.error( + logger.error( f"Aborting scan on session {g_repr(session)}; current sub-func was {g_repr(sub_func)}" ) break @@ -118,24 +119,24 @@ async def main(self, args: Namespace) -> None: ) if isinstance(resp, NegativeResponse): if suggests_sub_function_not_supported(resp): - self.logger.info(f"{g_repr(sub_func)}: {resp}") + logger.info(f"{g_repr(sub_func)}: {resp}") else: l_error[session].append({sub_func: resp.response_code}) msg = f"{g_repr(sub_func)}: with error code: {resp}" - self.logger.result(msg) + logger.result(msg) continue except IllegalResponse as e: - self.logger.warning(f"{g_repr(e)}") + logger.warning(f"{g_repr(e)}") - self.logger.result(f"{g_repr(sub_func)}: reset level found!") + logger.result(f"{g_repr(sub_func)}: reset level found!") l_ok[session].append(sub_func) - self.logger.info("Waiting for the ECU to recover…") + logger.info("Waiting for the ECU to recover…") await self.ecu.wait_for_ecu() - self.logger.info("Reboot ECU to restore default conditions") + logger.info("Reboot ECU to restore default conditions") resp = await self.ecu.ecu_reset(0x01) if isinstance(resp, NegativeResponse): - self.logger.warning( + logger.warning( f"Could not reboot ECU after testing reset level {g_repr(sub_func)}" ) else: @@ -144,23 +145,23 @@ async def main(self, args: Namespace) -> None: except asyncio.TimeoutError: l_timeout[session].append(sub_func) if not args.power_cycle: - self.logger.error( + logger.error( f"ECU did not respond after reset level {g_repr(sub_func)}; exit" ) sys.exit(1) - self.logger.warning( + logger.warning( f"ECU did not respond after reset level {g_repr(sub_func)}; try power cycle…" ) try: await self.ecu.power_cycle() await self.ecu.wait_for_ecu() except (ConnectionError, asyncio.TimeoutError) as e: - self.logger.error(f"Failed to recover ECU: {g_repr(e)}; exit") + logger.error(f"Failed to recover ECU: {g_repr(e)}; exit") sys.exit(1) except ConnectionError: msg = f"{g_repr(sub_func)}: lost connection to ECU (post), current session: {g_repr(session)}" - self.logger.warning(msg) + logger.warning(msg) await self.ecu.reconnect() continue @@ -168,20 +169,20 @@ async def main(self, args: Namespace) -> None: if not args.skip_check_session: try: current_session = await self.ecu.read_session() - self.logger.result( + logger.result( f"{g_repr(sub_func)}: Currently in session {g_repr(current_session)}, " f"should be {g_repr(session)}" ) except UnexpectedNegativeResponse as e: - self.logger.warning( + logger.warning( f"Could not read current session: {e.RESPONSE_CODE.name}" ) - self.logger.info(f"Setting session {g_repr(session)}") + logger.info(f"Setting session {g_repr(session)}") await self.ecu.set_session(session) await self.ecu.leave_session(session) - self.logger.result(f"ok: {l_ok}") - self.logger.result(f"timeout: {l_timeout}") - self.logger.result(f"with error: {l_error}") + logger.result(f"ok: {l_ok}") + logger.result(f"timeout: {l_timeout}") + logger.result(f"with error: {l_error}") diff --git a/src/gallia/commands/scan/uds/sa_dump_seeds.py b/src/gallia/commands/scan/uds/sa_dump_seeds.py index c608a32a2..ae0b1e1ef 100644 --- a/src/gallia/commands/scan/uds/sa_dump_seeds.py +++ b/src/gallia/commands/scan/uds/sa_dump_seeds.py @@ -13,10 +13,13 @@ from gallia.command import UDSScanner from gallia.config import Config +from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSRequestConfig from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.scan.dump-seeds") + class SASeedsDumper(UDSScanner): """This scanner tries to enable ProgrammingSession and dump seeds for 12h.""" @@ -88,7 +91,7 @@ async def request_seed(self, level: int, data: bytes) -> bytes | None: level, data, config=UDSRequestConfig(tags=["ANALYZE"]) ) if isinstance(resp, NegativeResponse): - self.logger.warning(f"ECU replied with an error: {resp}") + logger.warning(f"ECU replied with an error: {resp}") return None return resp.security_seed @@ -97,9 +100,9 @@ async def send_key(self, level: int, key: bytes) -> bool: level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"]) ) if isinstance(resp, NegativeResponse): - self.logger.debug(f"Key was rejected: {resp}") + logger.debug(f"Key was rejected: {resp}") return False - self.logger.result( + logger.result( f'Unlocked SA level {g_repr(level)} with key "{key.hex()}"! resp: {resp}' ) return True @@ -115,17 +118,17 @@ def log_size(self, path: Path, time_delta: float) -> None: if size > 1024: size = size / 1024 size_unit = "MiB" - self.logger.notice( + logger.notice( f"Dumping seeds with {rate:.2f}{rate_unit}/h: {size:.2f}{size_unit}" ) async def main(self, args: Namespace) -> None: session = args.session - self.logger.info(f"scanning in session: {g_repr(session)}") + logger.info(f"scanning in session: {g_repr(session)}") resp = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - self.logger.critical(f"could not change to session: {resp}") + logger.critical(f"could not change to session: {resp}") return i = -1 @@ -152,7 +155,7 @@ async def main(self, args: Namespace) -> None: if args.check_session or reset: if not await self.ecu.check_and_set_session(args.session): - self.logger.error( + logger.error( f"ECU persistently lost session {g_repr(args.session)}" ) sys.exit(1) @@ -162,10 +165,10 @@ async def main(self, args: Namespace) -> None: try: seed = await self.request_seed(args.level, args.data_record) except asyncio.TimeoutError: - self.logger.error("Timeout while requesting seed") + logger.error("Timeout while requesting seed") continue except Exception as e: - self.logger.critical(f"Error while requesting seed: {g_repr(e)}") + logger.critical(f"Error while requesting seed: {g_repr(e)}") sys.exit(1) if seed is None: @@ -174,7 +177,7 @@ async def main(self, args: Namespace) -> None: await file.write(seed) if last_seed == seed: - self.logger.warning("Received the same seed as before") + logger.warning("Received the same seed as before") last_seed = seed @@ -183,10 +186,10 @@ async def main(self, args: Namespace) -> None: if await self.send_key(args.level, bytes(args.send_zero_key)): break except asyncio.TimeoutError: - self.logger.warning("Timeout while sending key") + logger.warning("Timeout while sending key") continue except Exception as e: - self.logger.critical(f"Error while sending key: {g_repr(e)}") + logger.critical(f"Error while sending key: {g_repr(e)}") sys.exit(1) runs_since_last_reset += 1 @@ -196,15 +199,15 @@ async def main(self, args: Namespace) -> None: runs_since_last_reset = 0 try: - self.logger.info("Resetting the ECU") + logger.info("Resetting the ECU") await self.ecu.ecu_reset(0x01) - self.logger.info("Waiting for the ECU to recover…") + logger.info("Waiting for the ECU to recover…") await self.ecu.wait_for_ecu() except asyncio.TimeoutError: - self.logger.error("ECU did not respond after reset; exiting…") + logger.error("ECU did not respond after reset; exiting…") sys.exit(1) except ConnectionError: - self.logger.warning( + logger.warning( "Lost connection to the ECU after performing a reset. " "Attempting to reconnect…" ) diff --git a/src/gallia/commands/scan/uds/services.py b/src/gallia/commands/scan/uds/services.py index e4cb7cd0f..6790b0643 100644 --- a/src/gallia/commands/scan/uds/services.py +++ b/src/gallia/commands/scan/uds/services.py @@ -8,6 +8,7 @@ from typing import Any from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import ( NegativeResponse, UDSErrorCodes, @@ -19,6 +20,8 @@ from gallia.services.uds.core.utils import g_repr from gallia.utils import ParseSkips, auto_int +logger = get_logger("gallia.scan.services") + class ServicesScanner(UDSScanner): """Iterate sessions and services and find endpoints""" @@ -79,7 +82,7 @@ async def main(self, args: Namespace) -> None: found: dict[int, dict[int, Any]] = {} if args.sessions is None: - self.logger.info("No sessions specified, starting with session scan") + logger.info("No sessions specified, starting with session scan") # Only until 0x80 because the eight bit is "SuppressResponse" sessions = [ s @@ -87,7 +90,7 @@ async def main(self, args: Namespace) -> None: if s not in args.skip or args.skip[s] is not None ] sessions = await self.ecu.find_sessions(sessions) - self.logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") + logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") else: sessions = [ s @@ -95,30 +98,30 @@ async def main(self, args: Namespace) -> None: if s not in args.skip or args.skip[s] is not None ] - self.logger.info(f"testing sessions {g_repr(sessions)}") + logger.info(f"testing sessions {g_repr(sessions)}") # TODO: Unified shortened output necessary here - self.logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") + logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") for session in sessions: - self.logger.info(f"Changing to session {g_repr(session)}") + logger.info(f"Changing to session {g_repr(session)}") try: resp: UDSResponse = await self.ecu.set_session( session, UDSRequestConfig(tags=["preparation"]) ) except (UDSException, RuntimeError) as e: # FIXME why catch RuntimeError? - self.logger.warning( + logger.warning( f"Could not complete session change to {g_repr(session)}: {g_repr(e)}; skipping session" ) continue if isinstance(resp, NegativeResponse): - self.logger.warning( + logger.warning( f"Could not complete session change to {g_repr(session)}: {resp}; skipping session" ) continue found[session] = {} - self.logger.result(f"scanning in session {g_repr(session)}") + logger.result(f"scanning in session {g_repr(session)}") # Starts at 0x00, see first loop iteration. sid = -1 @@ -128,12 +131,12 @@ async def main(self, args: Namespace) -> None: continue if session in args.skip and sid in args.skip[session]: - self.logger.info(f"{g_repr(sid)}: skipped") + logger.info(f"{g_repr(sid)}: skipped") continue if args.check_session: if not await self.ecu.check_and_set_session(session): - self.logger.error( + logger.error( f"Aborting scan on session {g_repr(session)}; current SID was {g_repr(sid)}" ) break @@ -145,15 +148,15 @@ async def main(self, args: Namespace) -> None: pdu, config=UDSRequestConfig(tags=["ANALYZE"]) ) except asyncio.TimeoutError: - self.logger.info(f"{g_repr(sid)}: timeout") + logger.info(f"{g_repr(sid)}: timeout") continue except MalformedResponse as e: - self.logger.warning( + logger.warning( f"{g_repr(sid)}: {e!r} occurred, this needs to be investigated!" ) continue except Exception as e: - self.logger.info(f"{g_repr(sid)}: {e!r} occurred") + logger.info(f"{g_repr(sid)}: {e!r} occurred") await self.ecu.reconnect() continue @@ -161,7 +164,7 @@ async def main(self, args: Namespace) -> None: UDSErrorCodes.serviceNotSupported, UDSErrorCodes.serviceNotSupportedInActiveSession, ]: - self.logger.info(f"{g_repr(sid)}: not supported [{resp}]") + logger.info(f"{g_repr(sid)}: not supported [{resp}]") break if isinstance(resp, NegativeResponse) and resp.response_code in [ @@ -169,7 +172,7 @@ async def main(self, args: Namespace) -> None: ]: continue - self.logger.result( + logger.result( f"{g_repr(sid)}: available in session {g_repr(session)}: {resp}" ) found[session][sid] = resp @@ -178,12 +181,12 @@ async def main(self, args: Namespace) -> None: await self.ecu.leave_session(session) for key, value in found.items(): - self.logger.result(f"findings in session 0x{key:02X}:") + logger.result(f"findings in session 0x{key:02X}:") for sid, data in value.items(): self.result.append((key, sid)) try: - self.logger.result( + logger.result( f" [{g_repr(sid)}] {UDSIsoServices(sid).name}: {data}" ) except Exception: - self.logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}") + logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}") diff --git a/src/gallia/commands/scan/uds/sessions.py b/src/gallia/commands/scan/uds/sessions.py index 6b39ca298..9afde89ae 100644 --- a/src/gallia/commands/scan/uds/sessions.py +++ b/src/gallia/commands/scan/uds/sessions.py @@ -8,6 +8,7 @@ from typing import Any from gallia.command import UDSScanner +from gallia.log import get_logger from gallia.services.uds import ( NegativeResponse, UDSErrorCodes, @@ -19,6 +20,8 @@ from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int +logger = get_logger("gallia.scan.sessions") + class SessionsScanner(UDSScanner): """Iterate Sessions""" @@ -75,11 +78,11 @@ async def set_session_with_hooks_handling( isinstance(resp, NegativeResponse) and resp.response_code == UDSErrorCodes.conditionsNotCorrect ): - self.logger.notice( + logger.notice( f"Received conditionsNotCorrect for session {g_repr(session)}" ) if not use_hooks: - self.logger.warning( + logger.warning( f"Session {g_repr(session)} is potentially available but could not be entered. " f"Use --with-hooks to try to enter the session using hooks to scan for " f"transitions available from that session." @@ -91,12 +94,12 @@ async def set_session_with_hooks_handling( ) if not isinstance(resp_, NegativeResponse): - self.logger.notice( + logger.notice( f"Successfully changed to session {g_repr(session)} with hooks" ) resp = resp_ else: - self.logger.notice( + logger.notice( f"Could not successfully change to session {g_repr(session)} even with hooks" ) @@ -108,13 +111,13 @@ async def recover_stack(self, stack: list[int], use_hooks: bool) -> bool: resp = await self.set_session_with_hooks_handling(session, use_hooks) if isinstance(resp, NegativeResponse): - self.logger.error( + logger.error( f"Could not change to session {g_repr(session)} as part of stack: {resp}. " f"Try with --reset to reset between each iteration." ) return False except Exception as e: - self.logger.error( + logger.error( f"Could not change to session {g_repr(session)} as part of stack: {g_repr(e)}. " f"Try with --reset to reset between each iteration." ) @@ -136,7 +139,7 @@ async def main(self, args: Namespace) -> None: depth += 1 found[depth] = [] - self.logger.info(f"Depth: {depth}") + logger.info(f"Depth: {depth}") for stack in found[depth - 1]: if args.fast and stack[-1] in search_sessions: @@ -145,60 +148,54 @@ async def main(self, args: Namespace) -> None: search_sessions.append(stack[-1]) if stack: - self.logger.info(f"Starting from session: {g_repr(stack[-1])}") + logger.info(f"Starting from session: {g_repr(stack[-1])}") for session in sessions: if session in args.skip: - self.logger.info( - f"Skipping session {g_repr(session)} as requested" - ) + logger.info(f"Skipping session {g_repr(session)} as requested") continue if args.reset: try: - self.logger.info("Resetting the ECU") + logger.info("Resetting the ECU") resp: UDSResponse = await self.ecu.ecu_reset(args.reset) if isinstance(resp, NegativeResponse): - self.logger.warning( + logger.warning( f"Could not reset ECU with {EcuResetSubFuncs(args.reset).name if args.reset in iter(EcuResetSubFuncs) else args.reset}: {resp}" f"; continuing without reset" ) else: - self.logger.info("Waiting for the ECU to recover…") + logger.info("Waiting for the ECU to recover…") await self.ecu.wait_for_ecu(timeout=5) except (asyncio.TimeoutError, ConnectionError): - self.logger.warning( + logger.warning( "Lost connection to the ECU after performing a reset. " "Attempting to reconnect…" ) await self.ecu.reconnect() try: - self.logger.debug("Changing session to DefaultSession") + logger.debug("Changing session to DefaultSession") resp = await self.ecu.set_session(0x01, use_db=False) if isinstance(resp, NegativeResponse): - self.logger.error( - f"Could not change to default session: {resp}" - ) + logger.error(f"Could not change to default session: {resp}") sys.exit(1) except Exception as e: - self.logger.error(f"Could not change to default session: {e!r}") + logger.error(f"Could not change to default session: {e!r}") sys.exit(1) - self.logger.debug( + logger.debug( f"Sleeping for {args.sleep}s after changing to DefaultSession" ) await asyncio.sleep(args.sleep) - self.logger.debug("Recovering the current session stack") + logger.debug("Recovering the current session stack") if not await self.recover_stack(stack, args.with_hooks): sys.exit(1) try: - self.logger.info( - f"Attempting to change to session {session:#04x}" - ) + logger.info(f"Attempting to change to session {session:#04x}") resp = await self.set_session_with_hooks_handling( session, args.with_hooks ) @@ -209,12 +206,12 @@ async def main(self, args: Namespace) -> None: and resp.response_code == UDSErrorCodes.subFunctionNotSupported ): - self.logger.info( + logger.info( f"Could not change to session {g_repr(session)}: {resp}" ) continue - self.logger.notice( + logger.notice( f"Found session: {g_repr(session)} via stack: {g_repr(stack)}; {resp}" ) @@ -237,12 +234,14 @@ async def main(self, args: Namespace) -> None: ) except asyncio.TimeoutError: - self.logger.warning( + logger.warning( f"Could not change to session {g_repr(session)}: Timeout" ) continue + except Exception as e: + logger.warning(f"Mamma mia: {repr(e)}") - self.logger.result("Scan finished; Found the following sessions:") + logger.result("Scan finished; Found the following sessions:") previous_session = 0 for res in sorted(positive_results, key=lambda x: x["session"]): @@ -251,18 +250,18 @@ async def main(self, args: Namespace) -> None: if session != previous_session: previous_session = session self.result.append(int(session)) - self.logger.result(f"* Session {g_repr(session)} ") + logger.result(f"* Session {g_repr(session)} ") if self.db_handler is not None: await self.db_handler.insert_session_transition( session, res["stack"] ) - self.logger.result( + logger.result( f"\tvia stack: {'->'.join([f'{g_repr(i)}' for i in res['stack']])}" ) - self.logger.result( + logger.result( "The following sessions were identified but could not be activated:" ) previous_session = 0 @@ -276,14 +275,14 @@ async def main(self, args: Namespace) -> None: ): if session != previous_session: previous_session = session - self.logger.result(f"* Session {g_repr(session)} ") + logger.result(f"* Session {g_repr(session)} ") if self.db_handler is not None: await self.db_handler.insert_session_transition( session, res["stack"] ) - self.logger.result( + logger.result( f"\tvia stack: {'->'.join([f'{g_repr(i)}' for i in res['stack']])} " f"(NRC: {res['error']})" ) diff --git a/src/gallia/commands/script/vecu.py b/src/gallia/commands/script/vecu.py index e7d4abb94..c714d04e0 100644 --- a/src/gallia/commands/script/vecu.py +++ b/src/gallia/commands/script/vecu.py @@ -9,6 +9,7 @@ from pathlib import Path from gallia.command import AsyncScript +from gallia.log import get_logger from gallia.services.uds.core.constants import UDSIsoServices from gallia.services.uds.server import ( DBUDSServer, @@ -29,6 +30,9 @@ dynamic_attr_prefix = "dynamic_attr_" +logger = get_logger("gallia.vecu.main") + + class VirtualECU(AsyncScript): """Spawn a virtual ECU for testing purposes""" diff --git a/src/gallia/db/handler.py b/src/gallia/db/handler.py index 147a82cfb..747de9cee 100644 --- a/src/gallia/db/handler.py +++ b/src/gallia/db/handler.py @@ -131,6 +131,8 @@ def bytes_repr(data: bytes) -> str: INSERT OR IGNORE INTO version VALUES('main', '{schema_version}'); """ +logger = get_logger("gallia.db") + class DBHandler: def __init__(self, database: Path): @@ -141,7 +143,6 @@ def __init__(self, database: Path): self.target: str | None = None self.discovery_run: int | None = None self.meta: int | None = None - self.logger = get_logger("db") async def connect(self) -> None: assert self.connection is None, "Already connected to the database" @@ -167,7 +168,7 @@ async def disconnect(self) -> None: try: await task except Exception as e: - self.logger.error(f"Inside task: {g_repr(e)}") + logger.error(f"Inside task: {g_repr(e)}") try: await self.connection.commit() @@ -395,7 +396,7 @@ async def execute() -> None: await self.connection.execute(query, query_parameter) done = True except aiosqlite.OperationalError: - self.logger.warning( + logger.warning( f"Could not log message for {query_parameter[5]} to database. Retrying ..." ) diff --git a/src/gallia/dumpcap.py b/src/gallia/dumpcap.py index 0b21d0ee0..75d7233b7 100644 --- a/src/gallia/dumpcap.py +++ b/src/gallia/dumpcap.py @@ -17,10 +17,12 @@ from typing import cast from urllib.parse import urlparse -from gallia.log import Logger, get_logger +from gallia.log import get_logger from gallia.transports import ISOTPTransport, RawCANTransport, TargetURI from gallia.utils import auto_int, split_host_port +logger = get_logger("gallia.dumpcap") + class Dumpcap: BUFSIZE = io.DEFAULT_BUFFER_SIZE @@ -28,13 +30,11 @@ class Dumpcap: def __init__( # noqa: PLR0913 self, proc: subprocess.Process, - logger: Logger, artifacts_dir: Path, outfile: Path, cleanup: int = 2, ) -> None: self.proc = proc - self.logger = logger self.artifacts_dir = artifacts_dir self.outfile = outfile self.cleanup = cleanup @@ -47,8 +47,6 @@ async def start( target: TargetURI, artifacts_dir: Path, ) -> Dumpcap: - logger = get_logger("dumpcap") - ts = int(datetime.now().timestamp()) if target.scheme in [ISOTPTransport.SCHEME, RawCANTransport.SCHEME]: outfile = artifacts_dir.joinpath(f"candump-{ts}.pcap.gz") @@ -86,7 +84,7 @@ async def start( logger.info(f'Started "dumpcap": {cmd_str}') - return cls(proc, logger, artifacts_dir, outfile) + return cls(proc, artifacts_dir, outfile) async def sync(self, timeout: float = 1) -> None: await asyncio.wait_for(self.ready_event.wait(), timeout) @@ -96,7 +94,7 @@ async def stop(self) -> None: try: self.proc.terminate() except ProcessLookupError: - self.logger.warning("dumpcap terminated before gallia") + logger.warning("dumpcap terminated before gallia") await self.proc.wait() await self.compressor diff --git a/src/gallia/log.py b/src/gallia/log.py index d082015e5..8aa0521fb 100644 --- a/src/gallia/log.py +++ b/src/gallia/log.py @@ -49,26 +49,22 @@ class ColorMode(Enum): NEVER = "never" -_COLORS_ENABLED = False - - -def set_color_mode(mode: ColorMode, stream: TextIO = sys.stderr) -> None: +def resolve_color_mode(mode: ColorMode, stream: TextIO = sys.stderr) -> bool: """Sets the color mode of the console log handler. :param mode: The available options are described in :class:`ColorMode`. :param stream: Used as a reference for :attr:`ColorMode.AUTO`. """ - global _COLORS_ENABLED # noqa: PLW0603 match mode: case ColorMode.ALWAYS: - _COLORS_ENABLED = True # noqa: PLW0603 + return True case ColorMode.AUTO: if os.getenv("NO_COLOR") is not None: - _COLORS_ENABLED = False # noqa: PLW0603 + return False else: - _COLORS_ENABLED = stream.isatty() # noqa: PLW0603 + return stream.isatty() case ColorMode.NEVER: - _COLORS_ENABLED = False + return False # https://stackoverflow.com/a/35804945 @@ -227,33 +223,11 @@ def to_level(self) -> Loglevel: raise ValueError("invalid value") -def _setup_queue() -> QueueListener: - queue: Queue[Any] = Queue() - root = logging.getLogger() - - handlers: list[logging.Handler] = [] - - handler = QueueHandler(queue) - root.addHandler(handler) - for h in root.handlers[:]: - if h is not handler: - root.removeHandler(h) - handlers.append(h) - - listener = QueueListener( - queue, - *handlers, - respect_handler_level=True, - ) - listener.start() - return listener - - def setup_logging( level: Loglevel | None = None, - file_level: Loglevel = Loglevel.DEBUG, - path: Path | None = None, color_mode: ColorMode = ColorMode.AUTO, + no_volatile_info: bool = False, + logger_name: str = "gallia", ) -> None: """Enable and configure gallia's logging system. If this fuction is not called as early as possible, @@ -270,9 +244,10 @@ def setup_logging( :param path: The path to the logfile containing json records. :param color_mode: The color mode to use for the console. """ - set_color_mode(color_mode) + colored = resolve_color_mode(color_mode) if level is None: + # FIXME why is this here and not in config? if (raw := os.getenv("GALLIA_LOGLEVEL")) is not None: level = PenlogPriority.from_str(raw).to_level() else: @@ -283,35 +258,66 @@ def setup_logging( logging.logThreads = False logging.logProcesses = False - # TODO: Do we want to have this configurable? - logging.getLogger("asyncio").setLevel(logging.CRITICAL) - logging.getLogger("aiosqlite").setLevel(logging.CRITICAL) + logger = logging.getLogger(logger_name) + # LogLevel cannot be 0 (NOTSET), because only the root logger sends it to its handlers then + logger.setLevel(1) + + # Clean up potentially existing handlers and create a new async QueueHandler for stderr output + while len(logger.handlers) > 0: + logger.handlers[0].close() + logger.removeHandler(logger.handlers[0]) + colored = resolve_color_mode(color_mode) + add_stderr_log_handler(logger_name, level, no_volatile_info, colored) + + +def add_stderr_log_handler( + logger_name: str, level: Loglevel, no_volatile_info: bool, colored: bool +) -> None: + queue: Queue[Any] = Queue() + logger = logging.getLogger(logger_name) + logger.addHandler(QueueHandler(queue)) stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.setLevel(level) - stderr_handler.setFormatter(_ConsoleFormatter()) - - handlers: list[logging.Handler] = [stderr_handler] - - if path is not None: - zstd_handler = _ZstdFileHandler(path, level=file_level) - zstd_handler.setFormatter(_JSONFormatter()) - zstd_handler.setLevel(file_level) - handlers.append(zstd_handler) - - logging.basicConfig( - handlers=handlers, - # Enable all log messages at the root logger. - # The stderr_handler and the zstd_handler have - # individual loglevels for appropriate filtering. - level=logging.NOTSET, - force=True, + console_formatter = _ConsoleFormatter() + + console_formatter.colored = colored + stderr_handler.terminator = "" # We manually handle the terminator while formatting + if no_volatile_info is False: + console_formatter.volatile_info = True + + stderr_handler.setFormatter(console_formatter) + + queue_listener = QueueListener( + queue, + *[stderr_handler], + respect_handler_level=True, ) + queue_listener.start() + atexit.register(queue_listener.stop) - # Replace handlers on the root logger with a LocalQueueHandler, and start a - # logging.QueueListener holding the original handlers. - queue = _setup_queue() - atexit.register(queue.stop) + +def add_zst_log_handler( + logger_name: str, filepath: Path, file_log_level: Loglevel +) -> None: + queue: Queue[Any] = Queue() + logger = get_logger(logger_name) + logger.addHandler(QueueHandler(queue)) + + zstd_handler = _ZstdFileHandler( + filepath, + level=file_log_level, + ) + zstd_handler.setLevel(file_log_level) + zstd_handler.setFormatter(_JSONFormatter()) + + queue_listener = QueueListener( + queue, + *[zstd_handler], + respect_handler_level=True, + ) + queue_listener.start() + atexit.register(queue_listener.stop) class _PenlogRecordV1(msgspec.Struct, omit_defaults=True): @@ -343,9 +349,9 @@ class _PenlogRecordV2(msgspec.Struct, omit_defaults=True, tag=2, tag_field="vers _PenlogRecord: TypeAlias = _PenlogRecordV1 | _PenlogRecordV2 -def _colorize_msg(data: str, levelno: int) -> str: +def _colorize_msg(data: str, levelno: int) -> tuple[str, int]: if not sys.stderr.isatty(): - return data + return data, 0 out = "" match levelno: @@ -370,7 +376,7 @@ def _colorize_msg(data: str, levelno: int) -> str: out += data out += _Color.RESET.value - return out + return out, len(style) def _format_record( # noqa: PLR0913 @@ -381,8 +387,10 @@ def _format_record( # noqa: PLR0913 tags: list[str] | None, stacktrace: str | None, colored: bool = False, + volatile_info: bool = False, ) -> str: - msg = "" + msg = "\33[2K" + extra_len = 4 msg += dt.strftime("%b %d %H:%M:%S.%f")[:-3] msg += " " msg += name @@ -391,10 +399,22 @@ def _format_record( # noqa: PLR0913 msg += ": " if colored: - msg += _colorize_msg(data, levelno) + tmp_msg, extra_len_tmp = _colorize_msg(data, levelno) + msg += tmp_msg + extra_len += extra_len_tmp else: msg += data + if volatile_info and levelno <= Loglevel.INFO: + terminal_width, _ = shutil.get_terminal_size() + msg = msg[ + : terminal_width + extra_len - 1 + ] # Adapt length to invisible ANSI colors + msg += _Color.RESET.value + msg += "\r" + else: + msg += "\n" + if stacktrace is not None: msg += "\n" msg += stacktrace @@ -411,6 +431,7 @@ class PenlogRecord: # FIXME: Enums are slow. priority: PenlogPriority tags: list[str] | None = None + colored: bool = False line: str | None = None stacktrace: str | None = None _python_level_no: int | None = None @@ -427,7 +448,7 @@ def __str__(self) -> str: else self.priority.to_level(), tags=self.tags, stacktrace=self.stacktrace, - colored=_COLORS_ENABLED, + colored=self.colored, ) @classmethod @@ -718,6 +739,9 @@ def format(self, record: logging.LogRecord) -> str: class _ConsoleFormatter(logging.Formatter): + colored: bool = False + volatile_info: bool = False + def format( self, record: logging.LogRecord, @@ -742,7 +766,8 @@ def format( levelno=record.levelno, tags=record.__dict__["tags"] if "tags" in record.__dict__ else None, stacktrace=stacktrace, - colored=_COLORS_ENABLED, + colored=self.colored, + volatile_info=self.volatile_info, ) diff --git a/src/gallia/powersupply.py b/src/gallia/powersupply.py index 2e348620c..ed2d6e051 100644 --- a/src/gallia/powersupply.py +++ b/src/gallia/powersupply.py @@ -14,6 +14,8 @@ from gallia.log import get_logger from gallia.transports import TargetURI +logger = get_logger("gallia.power-supply") + class PowerSupplyURI(TargetURI): @property @@ -39,7 +41,6 @@ def product_id(self) -> str: class PowerSupply: def __init__(self, client: BaseNetzteil, channel_id: int | list[int]) -> None: - self.logger = get_logger("power_supply") self.channel_id = channel_id self.netzteil = client self.mutex = asyncio.Lock() @@ -70,11 +71,11 @@ async def _power(self, op: bool) -> None: await self.netzteil.set_output(self.channel_id, op) async def power_up(self) -> None: - self.logger.info("power up experiment") + logger.info("power up experiment") await self._power(True) async def power_down(self) -> None: - self.logger.info("power down experiment") + logger.info("power down experiment") await self._power(False) async def power_cycle( diff --git a/src/gallia/services/uds/core/client.py b/src/gallia/services/uds/core/client.py index 90ab2ec1f..4f16b960b 100644 --- a/src/gallia/services/uds/core/client.py +++ b/src/gallia/services/uds/core/client.py @@ -30,6 +30,9 @@ class UDSRequestConfig: tags: list[str] | None = None +logger = get_logger("gallia.uds.client") + + class UDSClient: def __init__( self, @@ -43,7 +46,6 @@ def __init__( self.retry_wait = 0.2 self.pending_timeout = 5 self.mutex = asyncio.Lock() - self.logger = get_logger("uds") async def reconnect(self, timeout: int | None = None) -> None: """Calls the underlying transport to trigger a reconnect""" @@ -75,23 +77,21 @@ async def request_unsafe( # Avoid pasting this very line in every error branch. if i > 0: - self.logger.debug(f"retrying {i} from {max_retry}…") + logger.debug(f"retrying {i} from {max_retry}…") try: - self.logger.debug( - request.pdu.hex(), extra={"tags": ["write", "uds"] + tags} - ) + logger.debug(request.pdu.hex(), extra={"tags": ["write", "uds"] + tags}) raw_resp = await self.transport.request_unsafe( request.pdu, timeout, config.tags ) if raw_resp == b"": raise BrokenPipeError("connection to target lost") except asyncio.TimeoutError as e: - self.logger.debug(f"{request} failed with: {repr(e)}") + logger.debug(f"{request} failed with: {repr(e)}") last_exception = MissingResponse(request, str(e)) await asyncio.sleep(wait_time) continue - self.logger.debug(raw_resp.hex(), extra={"tags": ["read", "uds"] + tags}) + logger.debug(raw_resp.hex(), extra={"tags": ["read", "uds"] + tags}) resp = parse_pdu(raw_resp, request) if isinstance(resp, service.NegativeResponse): @@ -113,21 +113,17 @@ async def request_unsafe( and resp.response_code == UDSErrorCodes.requestCorrectlyReceivedResponsePending ): - self.logger.info( - f"Received ResponsePending: {n_pending}/{MAX_N_PENDING}" - ) + logger.info(f"Received ResponsePending: {n_pending}/{MAX_N_PENDING}") try: raw_resp = await self._read(timeout=waiting_time, tags=config.tags) if raw_resp == b"": raise BrokenPipeError("connection to target lost") - self.logger.debug( - raw_resp.hex(), extra={"tags": ["read", "uds"] + tags} - ) + logger.debug(raw_resp.hex(), extra={"tags": ["read", "uds"] + tags}) except asyncio.TimeoutError as e: # Send a tester present to indicate that # we are still there. await self._tester_present(suppress_resp=True) - self.logger.debug( + logger.debug( "Waiting for next message after ResponsePending: " f"{n_timeout}/{max_n_timeout}" ) @@ -148,7 +144,7 @@ async def request_unsafe( # and similar busy stuff is resolved. return resp - self.logger.debug(f"{request} failed after retry loop") + logger.debug(f"{request} failed after retry loop") raise last_exception async def _tester_present( @@ -159,7 +155,7 @@ async def _tester_present( if suppress_resp: pdu = service.TesterPresentRequest(suppress_response=True).pdu tags = config.tags if config.tags is not None else [] - self.logger.debug(pdu.hex(), extra={"tags": ["write", "uds"] + tags}) + logger.debug(pdu.hex(), extra={"tags": ["write", "uds"] + tags}) await self.transport.write(pdu, timeout, config.tags) return None return await self.tester_present(False, config) diff --git a/src/gallia/services/uds/core/service.py b/src/gallia/services/uds/core/service.py index 9e4261179..3be655c85 100644 --- a/src/gallia/services/uds/core/service.py +++ b/src/gallia/services/uds/core/service.py @@ -37,7 +37,7 @@ uds_memory_parameters, ) -logger = get_logger("uds") +logger = get_logger("gallia.uds.service") # **************** # * Base classes * diff --git a/src/gallia/services/uds/ecu.py b/src/gallia/services/uds/ecu.py index d664c3390..191244788 100644 --- a/src/gallia/services/uds/ecu.py +++ b/src/gallia/services/uds/ecu.py @@ -45,6 +45,9 @@ def __repr__(self) -> str: return f'{type(self).__name__}({", ".join(f"{key}={g_repr(value)}" for key, value in self.__dict__.items())})' +logger = get_logger("gallia.uds.ecu") + + class ECU(UDSClient): """ECU is a high level interface wrapping a UDSClient class. It provides semantically correct higher level interfaces such as read_session() @@ -62,7 +65,6 @@ def __init__( power_supply: PowerSupply | None = None, ) -> None: super().__init__(transport, timeout, max_retry) - self.logger = get_logger("ecu") self.tester_present_task: Task[None] | None = None self.tester_present_interval: float | None = None self.power_supply = power_supply @@ -145,9 +147,7 @@ async def check_and_set_session( Returns True if the current session matches the expected session, or if read_session is not supported by the ECU or in the current session.""" - self.logger.debug( - f"Checking current session, expecting {g_repr(expected_session)}" - ) + logger.debug(f"Checking current session, expecting {g_repr(expected_session)}") try: current_session = await self.read_session( @@ -155,33 +155,31 @@ async def check_and_set_session( ) except UnexpectedNegativeResponse as e: if suggests_identifier_not_supported(e.RESPONSE_CODE): - self.logger.info( + logger.info( f"Read current session not supported: {e.RESPONSE_CODE.name}, skipping check_session" ) return True raise e except asyncio.TimeoutError: - self.logger.warning( - "Reading current session timed out, skipping check_session" - ) + logger.warning("Reading current session timed out, skipping check_session") return True - self.logger.debug(f"Current session is {g_repr(current_session)}") + logger.debug(f"Current session is {g_repr(current_session)}") if current_session == expected_session: return True for i in range(retries): - self.logger.warning( + logger.warning( f"Not in session {g_repr(expected_session)}, ECU replied with {g_repr(current_session)}" ) - self.logger.info( + logger.info( f"Switching to session {g_repr(expected_session)}; attempt {i + 1} of {retries}" ) resp = await self.set_session(expected_session) if isinstance(resp, service.NegativeResponse): - self.logger.warning( + logger.warning( f"Switching to session {g_repr(expected_session)} failed: {resp}" ) @@ -189,30 +187,30 @@ async def check_and_set_session( current_session = await self.read_session( config=UDSRequestConfig(max_retry=retries) ) - self.logger.debug(f"Current session is {g_repr(current_session)}") + logger.debug(f"Current session is {g_repr(current_session)}") if current_session == expected_session: return True except UnexpectedNegativeResponse as e: if suggests_identifier_not_supported(e.RESPONSE_CODE): - self.logger.info( + logger.info( f"Read current session not supported: {e.RESPONSE_CODE.name}, skipping check_session" ) return True raise e except asyncio.TimeoutError: - self.logger.warning( + logger.warning( "Reading current session timed out, skipping check_session" ) return True - self.logger.warning( + logger.warning( f"Failed to switch to session {g_repr(expected_session)} after {retries} attempts" ) return False async def power_cycle(self, sleep: int = 5) -> bool: if self.power_supply is None: - self.logger.debug("no power_supply available") + logger.debug("no power_supply available") return False async def callback() -> None: @@ -281,14 +279,14 @@ async def set_session( and self.db_handler is not None and use_db ): - self.logger.debug( + logger.debug( "Could not switch to session. Trying with database transitions ..." ) if self.db_handler is not None: steps = await self.db_handler.get_session_transition(level) - self.logger.debug(f"Found the following steps in database: {steps}") + logger.debug(f"Found the following steps in database: {steps}") if steps is not None: for step in steps: @@ -331,7 +329,7 @@ async def transmit_data( """transmit_data splits the data to be sent in several blocks of size block_length, transfers all of them and concludes the transmission with RequestTransferExit""" if block_length > max_block_length: - self.logger.warning(f"Limiting block size to {g_repr(max_block_length)}") + logger.warning(f"Limiting block size to {g_repr(max_block_length)}") block_length = max_block_length # block_length includes the service identifier and block counter; payload must be smaller payload_size = block_length - 2 @@ -339,7 +337,7 @@ async def transmit_data( for i in range(0, len(data), payload_size): counter += 1 payload = data[i : i + payload_size] - self.logger.debug( + logger.debug( f"Transferring block {g_repr(counter)} " f"with payload size {g_repr(len(payload))}" ) @@ -353,16 +351,16 @@ async def transmit_data( async def _wait_for_ecu(self, sleep_time: float) -> None: """Internal method with endless loop in case of no answer from ECU""" config = UDSRequestConfig(timeout=0.5, max_retry=1, skip_hooks=True) - self.logger.info("waiting for ECU…") + logger.info("waiting for ECU…") while True: try: await asyncio.sleep(sleep_time) await self.ping(config=config) break except (ConnectionError, UDSException) as e: - self.logger.debug(f"ECU not ready: {e!r}") + logger.debug(f"ECU not ready: {e!r}") await self.reconnect() - self.logger.info("ECU ready") + logger.info("ECU ready") async def wait_for_ecu( self, @@ -378,7 +376,7 @@ async def wait_for_ecu( await asyncio.wait_for(self._wait_for_ecu(0.5), timeout=t) return True except asyncio.TimeoutError: - self.logger.critical("Timeout while waiting for ECU!") + logger.critical("Timeout while waiting for ECU!") return False finally: if self.tester_present_task and self.tester_present_interval: @@ -386,13 +384,13 @@ async def wait_for_ecu( async def _tester_present_worker(self, interval: float) -> None: assert self.transport - self.logger.debug("tester present worker started") + logger.debug("tester present worker started") while True: try: async with self.mutex: payload = bytes([0x3E, 0x80]) await self.transport.write(payload) - self.logger.debug(payload.hex(), extra={"tags": ["write", "uds"]}) + logger.debug(payload.hex(), extra={"tags": ["write", "uds"]}) # Hold the mutex for 10 ms to synchronize this background # worker with the main sender task. @@ -409,13 +407,13 @@ async def _tester_present_worker(self, interval: float) -> None: pass await asyncio.sleep(interval) except asyncio.CancelledError: - self.logger.debug("tester present worker terminated") + logger.debug("tester present worker terminated") break except ConnectionError: - self.logger.info("connection lost; reconnecting") + logger.info("connection lost; reconnecting") await self.reconnect() except Exception as e: - self.logger.debug(f"tester present got {e!r}") + logger.debug(f"tester present got {e!r}") async def start_cyclic_tester_present(self, interval: float) -> None: self.tester_present_interval = interval @@ -429,7 +427,7 @@ async def start_cyclic_tester_present(self, interval: float) -> None: async def stop_cyclic_tester_present(self) -> None: if self.tester_present_task is None: - self.logger.warning( + logger.warning( "BUG: stop_cyclic_tester_present() called but no task running" ) return @@ -527,7 +525,7 @@ async def _request( mode, ) except Exception as e: - self.logger.warning(f"Could not log messages to database: {g_repr(e)}") + logger.warning(f"Could not log messages to database: {g_repr(e)}") if response is not None: await self.update_state(request, response) diff --git a/src/gallia/services/uds/server.py b/src/gallia/services/uds/server.py index 22a8db6c5..a1b408625 100644 --- a/src/gallia/services/uds/server.py +++ b/src/gallia/services/uds/server.py @@ -29,11 +29,12 @@ from gallia.services.uds.ecu import ECUState from gallia.transports import ISOTPTransport, TargetURI +logger = get_logger("gallia.vecu.server") + class UDSServer(ABC): def __init__(self) -> None: self.state = ECUState() - self.logger = get_logger("v-ecu") self.use_default_response_if_service_not_supported = True self.use_default_response_if_missing_sub_function = True @@ -301,7 +302,7 @@ async def respond(self, request: service.UDSRequest) -> service.UDSResponse | No await self.update_state(request, response) if self.state.__dict__ != old_state.__dict__: - self.logger.debug(f"Changed state to {self.state}") + logger.debug(f"Changed state to {self.state}") if self.use_default_response_if_suppress: return self.default_response_if_suppress(request, response) @@ -385,8 +386,8 @@ def __init__(self, seed: int): async def setup(self) -> None: self.randomize() - self.logger.notice(f"Initialized random UDS server with seed {self.seed}") - self.logger.info( + logger.notice(f"Initialized random UDS server with seed {self.seed}") + logger.info( json.dumps( { int_repr(session): { @@ -804,7 +805,7 @@ async def respond_after_default( response = service.UDSResponse.parse_dynamic(unhexlify(response_pdu)) return response - self.logger.info("Reset ECU due to missing response") + logger.info("Reset ECU due to missing response") self.state.reset() return None @@ -814,7 +815,6 @@ class UDSServerTransport: def __init__(self, server: UDSServer, target: TargetURI): self.server = server self.target = target - self.logger = get_logger("v-ecu") self.last_time_active = time() async def run(self) -> None: @@ -824,20 +824,20 @@ async def handle_request(self, request_pdu: bytes) -> tuple[bytes | None, float] start = time() if start - self.last_time_active > 10: - self.logger.info("Server state reset due to inactivity") + logger.info("Server state reset due to inactivity") self.server.state.reset() request = service.UDSRequest.parse_dynamic(request_pdu) - self.logger.debug(f"---> {request}") + logger.debug(f"---> {request}") response = await self.server.respond(request) end = time() self.last_time_active = end if response is not None: - self.logger.debug(f" <--- {response} after {(end - start) * 1000:.2f} ms") + logger.debug(f" <--- {response} after {(end - start) * 1000:.2f} ms") return response.pdu, end - start - self.logger.debug(f" x--- NO RESPONSE after {(end - start) * 1000:.2f} ms") + logger.debug(f" x--- NO RESPONSE after {(end - start) * 1000:.2f} ms") return None, end - start @@ -845,7 +845,7 @@ class TCPUDSServerTransport(UDSServerTransport): async def handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: - self.logger.info("New connection") + logger.info("New connection") response_times = [] while True: @@ -868,8 +868,8 @@ async def handle_client( except Exception: traceback.print_exc() - self.logger.info("Connection closed") - self.logger.info( + logger.info("Connection closed") + logger.info( f"Average response time: {sum(response_times) / len(response_times) * 1000:.2f}ms" ) diff --git a/src/gallia/services/xcp/__init__.py b/src/gallia/services/xcp/__init__.py index 3fdd0e114..717f51584 100644 --- a/src/gallia/services/xcp/__init__.py +++ b/src/gallia/services/xcp/__init__.py @@ -9,10 +9,11 @@ from gallia.transports import BaseTransport from gallia.transports.can import RawCANTransport +logger = get_logger("gallia.xcp") + class XCPService: def __init__(self, transport: BaseTransport, timeout: float = 1.0) -> None: - self.logger = get_logger("xcp") self.timeout = timeout self.transport = transport # This uses construct types which would result in a new @@ -23,7 +24,7 @@ async def request(self, data: bytes, timeout: float | None = None) -> bytes: t = timeout if timeout else self.timeout resp = await self.transport.request(data, t) header = types.Response.parse(resp) - self.logger.info(header) + logger.info(header) if int(header.type) != 255: raise ValueError( f"Unknown response type: {header.type}, maybe no XCP packet?" @@ -32,49 +33,49 @@ async def request(self, data: bytes, timeout: float | None = None) -> bytes: return resp[1:] async def connect(self) -> None: - self.logger.info("XCP CONNECT") + logger.info("XCP CONNECT") resp = await self.request(bytes([0xFF, 0x00])) tmp = types.ConnectResponsePartial.parse(resp) self.byte_order = tmp.commModeBasic.byteOrder tmp = types.ConnectResponse.parse(resp, byteOrder=self.byte_order) - self.logger.info(tmp) - self.logger.result("XCP CONNECT -> OK") + logger.info(tmp) + logger.result("XCP CONNECT -> OK") async def disconnect(self) -> None: - self.logger.info("XCP DISCONNECT") + logger.info("XCP DISCONNECT") resp = await self.request(bytes([0xFE, 0x00])) - self.logger.info(resp) - self.logger.result("XCP DISCONNECT -> OK") + logger.info(resp) + logger.result("XCP DISCONNECT -> OK") async def get_status(self) -> None: - self.logger.info("XCP GET_STATUS") + logger.info("XCP GET_STATUS") resp = await self.request(bytes([0xFD])) tmp = types.GetStatusResponse.parse(resp, byteOrder=self.byte_order) - self.logger.info(tmp) - self.logger.result("XCP GET_STATUS -> OK") + logger.info(tmp) + logger.result("XCP GET_STATUS -> OK") async def get_comm_mode_info(self) -> None: - self.logger.info("XCP GET_COMM_MODE_INFO") + logger.info("XCP GET_COMM_MODE_INFO") resp = await self.request(bytes([0xFB])) tmp = types.GetCommModeInfoResponse.parse( resp, byteOrder=self.byte_order, ) - self.logger.info(tmp) - self.logger.result("XCP GET_COMM_MODE_INFO -> OK") + logger.info(tmp) + logger.result("XCP GET_COMM_MODE_INFO -> OK") async def get_id(self, id_: int) -> None: - self.logger.info(f"XCP GET_ID({id_})") + logger.info(f"XCP GET_ID({id_})") resp = await self.request(bytes([0xFA, id_])) tmp = types.GetIDResponse.parse(resp, byteOrder=self.byte_order) - self.logger.info(tmp) - self.logger.result(f"XCP GET_ID({id_}) -> OK") + logger.info(tmp) + logger.result(f"XCP GET_ID({id_}) -> OK") async def upload(self, length: int) -> None: - self.logger.info(f"XCP GET_UPLOAD({length}") + logger.info(f"XCP GET_UPLOAD({length}") resp = await self.request(bytes([0xF5, length])) - self.logger.info(resp) - self.logger.result(f"XCP GET_UPLOAD({length} -> OK") + logger.info(resp) + logger.result(f"XCP GET_UPLOAD({length} -> OK") class CANXCPSerivce(XCPService): @@ -103,7 +104,7 @@ async def request(self, data: bytes, timeout: float | None = None) -> bytes: break header = types.Response.parse(resp) - self.logger.info(header) + logger.info(header) if int(header.type) != 255: raise ValueError( f"Unknown response type: {header.type}, maybe no XCP packet?" diff --git a/src/gallia/transports/base.py b/src/gallia/transports/base.py index 906267a16..6a72379b4 100644 --- a/src/gallia/transports/base.py +++ b/src/gallia/transports/base.py @@ -13,9 +13,11 @@ from typing_extensions import Protocol -from gallia.log import Logger, get_logger +from gallia.log import get_logger from gallia.utils import join_host_port +logger = get_logger("gallia.transport.base") + class TargetURI: """TargetURI represents a target to which gallia can connect. @@ -94,7 +96,6 @@ def qs_flat(self) -> dict[str, str]: class TransportProtocol(Protocol): mutex: asyncio.Lock - logger: Logger target: TargetURI is_closed: bool @@ -134,7 +135,6 @@ class BaseTransport(ABC): def __init__(self, target: TargetURI) -> None: self.mutex = asyncio.Lock() - self.logger = get_logger(self.SCHEME) self.target = target self.is_closed = False @@ -180,7 +180,7 @@ async def reconnect(self: TransportT, timeout: float | None = None) -> Transport try: await self.close() except ConnectionError as e: - self.logger.warning(f"close() failed during reconnect ({e}); ignoring") + logger.warning(f"close() failed during reconnect ({e}); ignoring") return await self.connect(self.target) @abstractmethod @@ -239,7 +239,7 @@ async def write( ) -> int: t = tags + ["write"] if tags is not None else ["write"] - self.logger.trace(data.hex() + "0a", extra={"tags": t}) + logger.trace(data.hex() + "0a", extra={"tags": t}) writer = self.get_writer() writer.write(binascii.hexlify(data) + b"\n") @@ -255,6 +255,6 @@ async def read( d = data.decode().strip() t = tags + ["read"] if tags is not None else ["read"] - self.logger.trace(d + "0a", extra={"tags": t}) + logger.trace(d + "0a", extra={"tags": t}) return binascii.unhexlify(d) diff --git a/src/gallia/transports/can.py b/src/gallia/transports/can.py index d53c8de0c..b36ab1952 100644 --- a/src/gallia/transports/can.py +++ b/src/gallia/transports/can.py @@ -12,9 +12,12 @@ from can import Message from pydantic import BaseModel, field_validator +from gallia.log import get_logger from gallia.transports.base import BaseTransport, TargetURI from gallia.utils import auto_int +logger = get_logger("gallia.transport.can-raw") + CANFD_MTU = 72 CAN_MTU = 16 @@ -178,9 +181,9 @@ async def sendto( ) t = tags + ["write"] if tags is not None else ["write"] if self.config.is_extended: - self.logger.trace(f"{dst:08x}#{data.hex()}", extra={"tags": t}) + logger.trace(f"{dst:08x}#{data.hex()}", extra={"tags": t}) else: - self.logger.trace(f"{dst:03x}#{data.hex()}", extra={"tags": t}) + logger.trace(f"{dst:03x}#{data.hex()}", extra={"tags": t}) loop = asyncio.get_running_loop() await asyncio.wait_for(loop.sock_sendall(self._sock, msg.pack()), timeout) @@ -197,11 +200,11 @@ async def recvfrom( t = tags + ["read"] if tags is not None else ["read"] if msg.is_extended_id: - self.logger.trace( + logger.trace( f"{msg.arbitration_id:08x}#{msg.data.hex()}", extra={"tags": t} ) else: - self.logger.trace( + logger.trace( f"{msg.arbitration_id:03x}#{msg.data.hex()}", extra={"tags": t} ) return msg.arbitration_id, msg.data @@ -220,7 +223,7 @@ async def get_idle_traffic(self, sniff_time: float) -> list[int]: try: addr, _ = await self.recvfrom(timeout=1) if addr not in addr_idle: - self.logger.info(f"Received a message from {addr:03x}") + logger.info(f"Received a message from {addr:03x}") addr_idle.append(addr) except asyncio.TimeoutError: continue diff --git a/src/gallia/transports/doip.py b/src/gallia/transports/doip.py index c08a9591a..ed4e0d4f5 100644 --- a/src/gallia/transports/doip.py +++ b/src/gallia/transports/doip.py @@ -16,6 +16,8 @@ from gallia.transports.base import BaseTransport, TargetURI from gallia.utils import auto_int +logger = get_logger("gallia.transport.doip") + @unique class ProtocolVersions(IntEnum): @@ -320,7 +322,6 @@ def __init__( src_addr: int, target_addr: int, ): - self.logger = get_logger("doip") self.reader = reader self.writer = writer self.src_addr = src_addr @@ -376,11 +377,11 @@ async def _read_worker(self) -> None: continue await self._read_queue.put((hdr, data)) except asyncio.CancelledError: - self.logger.debug("read worker cancelled") + logger.debug("read worker cancelled") except asyncio.IncompleteReadError as e: - self.logger.debug(f"read worker received EOF: {e}") + logger.debug(f"read worker received EOF: {e}") except Exception as e: - self.logger.critical(f"read worker died with {type(e)}: {e}") + logger.critical(f"read worker died with {type(e)}: {e}") async def read_frame_unsafe(self) -> DoIPFrame: # Avoid waiting on the queue forever when @@ -398,7 +399,7 @@ async def read_diag_request_raw(self) -> DoIPDiagFrame: while True: hdr, payload = await self.read_frame() if not isinstance(payload, DiagnosticMessage): - self.logger.warning( + logger.warning( f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}" ) unexpected_packets.append((hdr, payload)) @@ -407,7 +408,7 @@ async def read_diag_request_raw(self) -> DoIPDiagFrame: payload.SourceAddress != self.target_addr or payload.TargetAddress != self.src_addr ): - self.logger.warning( + logger.warning( f"DoIP-DiagnosticMessage: unexpected addresses (src:dst); expected {self.src_addr}:{self.target_addr} but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}" ) unexpected_packets.append((hdr, payload)) @@ -430,7 +431,7 @@ async def _read_ack(self, prev_data: bytes) -> None: if not isinstance( payload, DiagnosticMessagePositiveAcknowledgement ) and not isinstance(payload, DiagnosticMessageNegativeAcknowledgement): - self.logger.warning( + logger.warning( f"expected DoIP positive/negative ACK, instead got: {hdr} {payload}" ) unexpected_packets.append((hdr, payload)) @@ -440,7 +441,7 @@ async def _read_ack(self, prev_data: bytes) -> None: payload.SourceAddress != self.target_addr or payload.TargetAddress != self.src_addr ): - self.logger.warning( + logger.warning( f"DoIP-ACK: unexpected addresses (src:dst); expected {self.src_addr}:{self.target_addr} but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}" ) unexpected_packets.append((hdr, payload)) @@ -449,8 +450,8 @@ async def _read_ack(self, prev_data: bytes) -> None: len(payload.PreviousDiagnosticMessageData) > 0 and prev_data != payload.PreviousDiagnosticMessageData ): - self.logger.warning("ack: previous data differs from request") - self.logger.warning( + logger.warning("ack: previous data differs from request") + logger.warning( f"DoIP-ACK: got: {payload.PreviousDiagnosticMessageData.hex()} expected {prev_data.hex()}" ) unexpected_packets.append((hdr, payload)) @@ -469,7 +470,7 @@ async def _read_routing_activation_response(self) -> None: while True: hdr, payload = await self.read_frame_unsafe() if not isinstance(payload, RoutingActivationResponse): - self.logger.warning( + logger.warning( f"expected DoIP RoutingActivationResponse, instead got: {hdr} {payload}" ) unexpected_packets.append((hdr, payload)) @@ -496,7 +497,7 @@ async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> N self.writer.write(buf) await self.writer.drain() - self.logger.trace(f"hdr: {hdr}, payload: {payload}") + logger.trace(f"hdr: {hdr}, payload: {payload}") try: match payload: @@ -656,7 +657,7 @@ async def read( data = await asyncio.wait_for(self._conn.read_diag_request(), timeout) t = tags + ["read"] if tags is not None else ["read"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) return data async def write( @@ -666,7 +667,7 @@ async def write( tags: list[str] | None = None, ) -> int: t = tags + ["write"] if tags is not None else ["write"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) try: await asyncio.wait_for(self._conn.write_diag_request(data), timeout) @@ -676,7 +677,7 @@ async def write( # TargetUnreachable can be just a temporary issue. Thus, we do not raise # BrokenPipeError but instead ignore it here and let upper layers handle # missing responses (i.e. raise a TimeoutError instead) - self.logger.debug("DoIP message was ACKed with TargetUnreachable") + logger.debug("DoIP message was ACKed with TargetUnreachable") raise asyncio.TimeoutError from e return len(data) diff --git a/src/gallia/transports/isotp.py b/src/gallia/transports/isotp.py index e2bef8c90..df402b99a 100644 --- a/src/gallia/transports/isotp.py +++ b/src/gallia/transports/isotp.py @@ -11,9 +11,12 @@ from pydantic import BaseModel, field_validator +from gallia.log import get_logger from gallia.transports.base import BaseTransport, TargetURI from gallia.utils import auto_int +logger = get_logger("gallia.transport.isotp") + # Socket Constants not available in the socket module, # see linux/can/isotp.h # TODO: Can be removed in the future… @@ -178,7 +181,7 @@ async def write( tags: list[str] | None = None, ) -> int: t = tags + ["write"] if tags is not None else ["write"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) loop = asyncio.get_running_loop() await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout) @@ -198,7 +201,7 @@ async def read( if e.errno == errno.EILSEQ: raise BrokenPipeError(f"invalid consecutive frame numbers: {e}") from e raise e - self.logger.trace(data.hex(), extra={"tags": tags}) + logger.trace(data.hex(), extra={"tags": tags}) return data async def close(self) -> None: diff --git a/src/gallia/transports/tcp.py b/src/gallia/transports/tcp.py index 9f8476fee..b934bb429 100644 --- a/src/gallia/transports/tcp.py +++ b/src/gallia/transports/tcp.py @@ -6,8 +6,11 @@ import asyncio +from gallia.log import get_logger from gallia.transports.base import BaseTransport, LinesTransportMixin, TargetURI +logger = get_logger("gallia.transport.tcp") + class TCPTransport(BaseTransport, scheme="tcp"): def __init__( @@ -46,7 +49,7 @@ async def write( tags: list[str] | None = None, ) -> int: t = tags + ["write"] if tags is not None else ["write"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) self.writer.write(data) await asyncio.wait_for(self.writer.drain(), timeout) @@ -60,7 +63,7 @@ async def read( data = await asyncio.wait_for(self.reader.read(self.BUFSIZE), timeout) t = tags + ["read"] if tags is not None else ["read"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) return data diff --git a/src/gallia/transports/unix.py b/src/gallia/transports/unix.py index 5071cbb45..ed0c81849 100644 --- a/src/gallia/transports/unix.py +++ b/src/gallia/transports/unix.py @@ -6,8 +6,11 @@ import asyncio +from gallia.log import get_logger from gallia.transports.base import BaseTransport, LinesTransportMixin, TargetURI +logger = get_logger("gallia.transport.unix") + class UnixTransport(BaseTransport, scheme="unix"): def __init__( @@ -46,7 +49,7 @@ async def write( tags: list[str] | None = None, ) -> int: t = tags + ["write"] if tags is not None else ["write"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) self.writer.write(data) await asyncio.wait_for(self.writer.drain(), timeout) @@ -59,7 +62,7 @@ async def read( ) -> bytes: data = await self.reader.read() t = tags + ["read"] if tags is not None else ["read"] - self.logger.trace(data.hex(), extra={"tags": t}) + logger.trace(data.hex(), extra={"tags": t}) return data diff --git a/src/gallia/utils.py b/src/gallia/utils.py index 793f0e800..c687a05ad 100644 --- a/src/gallia/utils.py +++ b/src/gallia/utils.py @@ -18,6 +18,8 @@ import aiofiles +from gallia.log import Loglevel + if TYPE_CHECKING: from gallia.db.handler import DBHandler from gallia.transports import TargetURI @@ -223,3 +225,24 @@ def dump_args(args: Namespace) -> dict[str, str | int | float]: settings[key] = value return settings + + +def get_log_level(args: Namespace) -> Loglevel: + level = Loglevel.INFO + if hasattr(args, "verbose"): + if args.verbose == 1: + level = Loglevel.DEBUG + elif args.verbose >= 2: + level = Loglevel.TRACE + return level + + +def get_file_log_level(args: Namespace) -> Loglevel: + level = Loglevel.DEBUG + if hasattr(args, "trace_log"): + if args.trace_log: + level = Loglevel.TRACE + elif hasattr(args, "verbose"): + if args.verbose >= 2: + level = Loglevel.TRACE + return level diff --git a/src/hr/__init__.py b/src/hr/__init__.py index cd4d8084c..02e9c5b87 100644 --- a/src/hr/__init__.py +++ b/src/hr/__init__.py @@ -9,7 +9,7 @@ from typing import cast import msgspec -from gallia.log import ColorMode, PenlogPriority, PenlogReader, set_color_mode +from gallia.log import ColorMode, PenlogPriority, PenlogReader, resolve_color_mode def parse_args() -> argparse.Namespace: @@ -66,7 +66,7 @@ def _main() -> int: print(f"not a regular file: {path}", file=sys.stderr) return 1 - set_color_mode(ColorMode(args.color), stream=sys.stdout) + colored = resolve_color_mode(ColorMode(args.color), stream=sys.stdout) with PenlogReader(path) as reader: record_generator = reader.records(args.priority, reverse=args.reverse) @@ -76,7 +76,8 @@ def _main() -> int: record_generator = reader.records(args.priority, offset=-args.lines) for record in record_generator: - print(record) + record.colored = colored + print(record, end="") return 0