From 8bf1d09d8202253016ac6e8102ec83eea1ada4d9 Mon Sep 17 00:00:00 2001 From: "Jarisch, Ferdinand" Date: Thu, 28 Sep 2023 22:57:07 +0200 Subject: [PATCH] fix: Remove auto-discovery of sessions from multiple scanners Since we have a dedicated scanner for identifying available sessions it does not make sense to implement a low-effort variant of sessions-scanning for multiple scanners with totally different tasks. Moreover, this change allows to run these scanners without requiring the availability of DiagnosticSessionControl, as long as no --sessions flag is given. --- src/gallia/commands/scan/uds/identifiers.py | 217 ++++++++++---------- src/gallia/commands/scan/uds/reset.py | 202 +++++++++--------- src/gallia/commands/scan/uds/services.py | 173 ++++++++-------- src/gallia/services/uds/ecu.py | 15 -- 4 files changed, 293 insertions(+), 314 deletions(-) diff --git a/src/gallia/commands/scan/uds/identifiers.py b/src/gallia/commands/scan/uds/identifiers.py index 88ac4aafc..26565b6ff 100644 --- a/src/gallia/commands/scan/uds/identifiers.py +++ b/src/gallia/commands/scan/uds/identifiers.py @@ -77,7 +77,7 @@ def configure_parser(self) -> None: nargs="?", const=1, type=int, - help="Check current session via read DID [for every nth DataIdentifier] and try to recover session", + help="Check current session via read DID [for every nth DataIdentifier] and try to recover session; only takes affect if --sessions is given", ) self.parser.add_argument( "--skip", @@ -94,6 +94,7 @@ def configure_parser(self) -> None: - 0x10-0x2f - 0x01:0xf3,0x10-0x2f Multiple session specific skips are separated by space. + Only takes affect if --sessions is given. """, ) self.parser.add_argument( @@ -104,130 +105,132 @@ def configure_parser(self) -> None: async def main(self, args: Namespace) -> None: if args.sessions is None: - logger.info("No sessions specified, starting with session scan") - # Only until 0x80 because the eight bit is "SuppressResponse" - sessions = [ - s - for s in range(1, 0x80) - if s not in args.skip or args.skip[s] is not None - ] - sessions = await self.ecu.find_sessions(sessions) - logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") + logger.notice("Performing scan in current session") + await self.perform_scan(args) + else: - sessions = [ + sessions: list[int] = [ s for s in args.sessions if s not in args.skip or args.skip[s] is not None ] + logger.info(f"testing sessions {g_repr(sessions)}") - logger.info(f"testing sessions {g_repr(sessions)}") + # TODO: Unified shortened output necessary here + logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") - # TODO: Unified shortened output necessary here - logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") + for session in sessions: + logger.notice(f"Switching to session {g_repr(session)}") + resp: UDSResponse = await self.ecu.set_session(session) + if isinstance(resp, NegativeResponse): + logger.warning( + f"Switching to session {g_repr(session)} failed: {resp}" + ) + continue - for session in sessions: - logger.notice(f"Switching to session {g_repr(session)}") - resp: UDSResponse = await self.ecu.set_session(session) - if isinstance(resp, NegativeResponse): - logger.warning(f"Switching to session {g_repr(session)} failed: {resp}") + logger.result(f"Starting scan in session: {g_repr(session)}") + + await self.perform_scan(args, session) + + logger.result(f"Scan in session {g_repr(session)} is complete!") + logger.info(f"Leaving session {g_repr(session)} via hook") + await self.ecu.leave_session(session) + + async def perform_scan(self, args: Namespace, session: None | int = None) -> None: + positive_DIDs = 0 + abnormal_DIDs = 0 + timeout_DIDs = 0 + sub_functions = [0x00] + + if args.sid == UDSIsoServices.RoutineControl: + if not args.payload: + logger.warning( + "Scanning RoutineControl with empty payload can successfully execute some " + + "routines that might have irreversible effects without elevated privileges" + ) + + # Scan all three subfunctions (startRoutine, stopRoutine, requestRoutineResults) + sub_functions = list(map(int, RoutineControlSubFuncs)) + + if args.sid == UDSIsoServices.SecurityAccess and args.end > 0xFF: + logger.warning( + "Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); " + + f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}" + ) + args.end = 0xFF + + for DID, sub_function in product( + range(args.start, args.end + 1), sub_functions + ): + if session in args.skip and DID in args.skip[session]: + logger.info(f"{g_repr(DID)}: skipped") continue - logger.result(f"Starting scan in session: {g_repr(session)}") - positive_DIDs = 0 - abnormal_DIDs = 0 - timeout_DIDs = 0 - sub_functions = [0x00] - - if args.sid == UDSIsoServices.RoutineControl: - if not args.payload: - logger.warning( - "Scanning RoutineControl with empty payload can successfully execute some " - + "routines, such as switching from plant mode to field mode, which can only " - + "be reversed with a valid token!" + if ( + session is not None + and args.check_session + and DID % args.check_session == 0 + ): + # Check session and try to recover from wrong session (max 3 times), else skip session + if not await self.ecu.check_and_set_session(session): + logger.error( + f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}" ) - - # Scan all three subfunctions (startRoutine, stopRoutine, requestRoutineResults) - sub_functions = list(map(int, RoutineControlSubFuncs)) + break if args.sid == UDSIsoServices.SecurityAccess: - if args.end > 0xFF: - logger.warning( - "Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); " - + f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}" + if DID & 0b10000000: + logger.info( + "Keep in mind that you set the SuppressResponse Bit (8th bit): " + + f"{g_repr(DID)} = 0b{DID:b}" ) - args.end = 0xFF + pdu = bytes([args.sid, DID]) + + elif args.sid == UDSIsoServices.RoutineControl: + pdu = bytes( + [args.sid, sub_function, DID >> 8, DID & 0xFF] + ) # Needs extra byte for sub function + + # DefaultBehavior, e.g. for ReadDataByIdentifier/WriteDataByIdentifier + else: + pdu = bytes([args.sid, DID >> 8, DID & 0xFF]) + + if args.payload: + pdu += args.payload + + try: + resp = await self.ecu.send_raw( + pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3) + ) + except asyncio.TimeoutError: + logger.result(f"{g_repr(DID)}: Retries exceeded") + timeout_DIDs += 1 + continue + except IllegalResponse as e: + logger.warning(g_repr(e)) + continue - for DID, sub_function in product( - range(args.start, args.end + 1), sub_functions - ): - if session in args.skip and DID in args.skip[session]: - logger.info(f"{g_repr(DID)}: skipped") - continue + if isinstance(resp, NegativeResponse): + if suggests_service_not_supported(resp): + logger.info( + f"{g_repr(DID)}: {resp}; does session {g_repr(session)} " + f"support service {service_repr(args.sid)}?" + ) - if args.check_session and DID % args.check_session == 0: - # Check session and try to recover from wrong session (max 3 times), else skip session - if not await self.ecu.check_and_set_session(session): - logger.error( - f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}" - ) + if args.skip_not_supported: break - if args.sid == UDSIsoServices.SecurityAccess: - if DID & 0b10000000: - logger.info( - "Keep in mind that you set the SuppressResponse Bit (8th bit): " - + f"{g_repr(DID)} = 0b{DID:b}" - ) - pdu = bytes([args.sid, DID]) - - elif args.sid == UDSIsoServices.RoutineControl: - pdu = bytes( - [args.sid, sub_function, DID >> 8, DID & 0xFF] - ) # Needs extra byte for sub function + # RequestOutOfRange is a common reply for invalid/unknown DataIdentifiers + elif resp.response_code == UDSErrorCodes.requestOutOfRange: + logger.info(f"{g_repr(DID)}: {resp}") - # DefaultBehaviour, e.g. for ReadDataByIdentifier/WriteDataByIdentifier - else: - pdu = bytes([args.sid, DID >> 8, DID & 0xFF]) - - if args.payload: - pdu += args.payload - - try: - resp = await self.ecu.send_raw( - pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3) - ) - except asyncio.TimeoutError: - logger.result(f"{g_repr(DID)}: Retries exceeded") - timeout_DIDs += 1 - continue - except IllegalResponse as e: - logger.warning(g_repr(e)) - - if isinstance(resp, NegativeResponse): - if suggests_service_not_supported(resp): - logger.info( - f"{g_repr(DID)}: {resp}; does session {g_repr(session)} " - f"support service {service_repr(args.sid)}?" - ) - - if args.skip_not_supported: - break - - # RequestOutOfRange is a common reply for invalid DataIdentifiers - elif resp.response_code == UDSErrorCodes.requestOutOfRange: - logger.info(f"{g_repr(DID)}: {resp}") - - else: - logger.result(f"{g_repr(DID)}: {resp}") - abnormal_DIDs += 1 else: logger.result(f"{g_repr(DID)}: {resp}") - positive_DIDs += 1 - - logger.result(f"Scan in session {g_repr(session)} is complete!") - logger.result(f"Positive replies: {positive_DIDs}") - logger.result(f"Abnormal replies: {abnormal_DIDs}") - logger.result(f"Timeouts: {timeout_DIDs}") - - logger.info(f"Leaving session {g_repr(session)} via hook") - await self.ecu.leave_session(session) + abnormal_DIDs += 1 + else: + logger.result(f"{g_repr(DID)}: {resp}") + positive_DIDs += 1 + + logger.result(f"Positive replies: {positive_DIDs}") + logger.result(f"Abnormal replies: {abnormal_DIDs}") + logger.result(f"Timeouts: {timeout_DIDs}") diff --git a/src/gallia/commands/scan/uds/reset.py b/src/gallia/commands/scan/uds/reset.py index 7ac5fe334..1c2f18947 100644 --- a/src/gallia/commands/scan/uds/reset.py +++ b/src/gallia/commands/scan/uds/reset.py @@ -50,139 +50,127 @@ def configure_parser(self) -> None: - 0x10-0x2f - 0x01:0xf3,0x10-0x2f Multiple session specific skips are separated by space. + Only takes affect if --sessions is given. """, ) self.parser.add_argument( "--skip-check-session", action="store_true", - help="skip check current session", + help="skip check current session; only takes affect if --sessions is given", ) async def main(self, args: Namespace) -> None: - l_ok: dict[int, list[int]] = {} - l_timeout: dict[int, list[int]] = {} - l_error: dict[int, list[Any]] = {} - if args.sessions is None: - logger.info("No sessions specified, starting with session scan") - # Only until 0x80 because the eight bit is "SuppressResponse" - sessions = [ - s - for s in range(1, 0x80) - if s not in args.skip or args.skip[s] is not None - ] - sessions = await self.ecu.find_sessions(sessions) - logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") + await self.perform_scan(args) else: - sessions = [ - s - for s in args.sessions - if s not in args.skip or args.skip[s] is not None - ] - - logger.info(f"testing sessions {g_repr(sessions)}") - - # TODO: Unified shortened output necessary here - logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") - - for session in sessions: - logger.notice(f"Switching to session {g_repr(session)}") - resp: UDSResponse = await self.ecu.set_session(session) - if isinstance(resp, NegativeResponse): - logger.warning(f"Switching to session {g_repr(session)} failed: {resp}") - continue + sessions = args.sessions + logger.info(f"testing sessions {g_repr(sessions)}") - logger.result(f"Scanning in session: {g_repr(session)}") - l_ok[session] = [] - l_timeout[session] = [] - l_error[session] = [] + # TODO: Unified shortened output necessary here + logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") - for sub_func in range(0x01, 0x80): - if session in args.skip and sub_func in args.skip[session]: - logger.notice( - f"skipping subFunc: {g_repr(sub_func)} because of --skip" + for session in sessions: + logger.notice(f"Switching to session {g_repr(session)}") + resp: UDSResponse = await self.ecu.set_session(session) + if isinstance(resp, NegativeResponse): + logger.warning( + f"Switching to session {g_repr(session)} failed: {resp}" ) continue - if not args.skip_check_session: - # Check session and try to recover from wrong session (max 3 times), else skip session - if not await self.ecu.check_and_set_session(session): - logger.error( - f"Aborting scan on session {g_repr(session)}; current sub-func was {g_repr(sub_func)}" - ) - break + logger.result(f"Scanning in session: {g_repr(session)}") + await self.perform_scan(args, session) + + await self.ecu.leave_session(session) + + async def perform_scan(self, args: Namespace, session: None | int = None) -> None: + l_ok: list[int] = [] + l_timeout: list[int] = [] + l_error: list[Any] = [] + + for sub_func in range(0x01, 0x80): + if session in args.skip and sub_func in args.skip[session]: + logger.notice(f"skipping subFunc: {g_repr(sub_func)} because of --skip") + continue + + if session is not None and not args.skip_check_session: + # Check session and try to recover from wrong session (max 3 times), else skip session + if not await self.ecu.check_and_set_session(session): + logger.error( + f"Aborting scan on session {g_repr(session)}; current sub-func was {g_repr(sub_func)}" + ) + break + try: try: - try: - resp = await self.ecu.ecu_reset( - sub_func, config=UDSRequestConfig(tags=["ANALYZE"]) - ) - if isinstance(resp, NegativeResponse): - if suggests_sub_function_not_supported(resp): - logger.info(f"{g_repr(sub_func)}: {resp}") - else: - l_error[session].append({sub_func: resp.response_code}) - msg = f"{g_repr(sub_func)}: with error code: {resp}" - logger.result(msg) - continue - except IllegalResponse as e: - logger.warning(f"{g_repr(e)}") - - logger.result(f"{g_repr(sub_func)}: reset level found!") - l_ok[session].append(sub_func) - logger.info("Waiting for the ECU to recover…") + resp = await self.ecu.ecu_reset( + sub_func, config=UDSRequestConfig(tags=["ANALYZE"]) + ) + if isinstance(resp, NegativeResponse): + if suggests_sub_function_not_supported(resp): + logger.info(f"{g_repr(sub_func)}: {resp}") + else: + l_error.append({sub_func: resp.response_code}) + msg = f"{g_repr(sub_func)}: with error code: {resp}" + logger.result(msg) + continue + except IllegalResponse as e: + logger.warning(f"{g_repr(e)}") + + logger.result(f"{g_repr(sub_func)}: reset level found!") + l_ok.append(sub_func) + logger.info("Waiting for the ECU to recover…") + await self.ecu.wait_for_ecu() + + logger.info("Reboot ECU to restore default conditions") + resp = await self.ecu.ecu_reset(0x01) + if isinstance(resp, NegativeResponse): + logger.warning( + f"Could not reboot ECU after testing reset level {g_repr(sub_func)}" + ) + else: await self.ecu.wait_for_ecu() - logger.info("Reboot ECU to restore default conditions") - resp = await self.ecu.ecu_reset(0x01) - if isinstance(resp, NegativeResponse): - logger.warning( - f"Could not reboot ECU after testing reset level {g_repr(sub_func)}" - ) - else: - await self.ecu.wait_for_ecu() - - except asyncio.TimeoutError: - l_timeout[session].append(sub_func) - if not args.power_cycle: - logger.error( - f"ECU did not respond after reset level {g_repr(sub_func)}; exit" - ) - sys.exit(1) + except asyncio.TimeoutError: + l_timeout.append(sub_func) + if not args.power_cycle: + logger.error( + f"ECU did not respond after reset level {g_repr(sub_func)}; exit" + ) + sys.exit(1) + + logger.warning( + f"ECU did not respond after reset level {g_repr(sub_func)}; try power cycle…" + ) + try: + await self.ecu.power_cycle() + await self.ecu.wait_for_ecu() + except (ConnectionError, asyncio.TimeoutError) as e: + logger.error(f"Failed to recover ECU: {g_repr(e)}; exit") + sys.exit(1) + except ConnectionError: + msg = f"{g_repr(sub_func)}: lost connection to ECU (post), current session: {g_repr(session)}" + logger.warning(msg) + await self.ecu.reconnect() + continue + # We reach this code only for positive responses + if session is not None and not args.skip_check_session: + try: + current_session = await self.ecu.read_session() + logger.result( + f"{g_repr(sub_func)}: Currently in session {g_repr(current_session)}, " + f"should be {g_repr(session)}" + ) + except UnexpectedNegativeResponse as e: logger.warning( - f"ECU did not respond after reset level {g_repr(sub_func)}; try power cycle…" + f"Could not read current session: {e.RESPONSE_CODE.name}" ) - try: - await self.ecu.power_cycle() - await self.ecu.wait_for_ecu() - except (ConnectionError, asyncio.TimeoutError) as e: - logger.error(f"Failed to recover ECU: {g_repr(e)}; exit") - sys.exit(1) - except ConnectionError: - msg = f"{g_repr(sub_func)}: lost connection to ECU (post), current session: {g_repr(session)}" - logger.warning(msg) - await self.ecu.reconnect() - continue - - # We reach this code only for positive responses - if not args.skip_check_session: - try: - current_session = await self.ecu.read_session() - logger.result( - f"{g_repr(sub_func)}: Currently in session {g_repr(current_session)}, " - f"should be {g_repr(session)}" - ) - except UnexpectedNegativeResponse as e: - logger.warning( - f"Could not read current session: {e.RESPONSE_CODE.name}" - ) + if session is not None: logger.info(f"Setting session {g_repr(session)}") await self.ecu.set_session(session) - await self.ecu.leave_session(session) - logger.result(f"ok: {l_ok}") logger.result(f"timeout: {l_timeout}") logger.result(f"with error: {l_error}") diff --git a/src/gallia/commands/scan/uds/services.py b/src/gallia/commands/scan/uds/services.py index 6790b0643..26ed1ac92 100644 --- a/src/gallia/commands/scan/uds/services.py +++ b/src/gallia/commands/scan/uds/services.py @@ -44,7 +44,7 @@ def configure_parser(self) -> None: "--check-session", action="store_true", default=False, - help="check current session", + help="check current session; only takes affect if --sessions is given", ) self.parser.add_argument( "--scan-response-ids", @@ -73,6 +73,7 @@ def configure_parser(self) -> None: - 0x10-0x2f - 0x01:0xf3,0x10-0x2f Multiple session specific skips are separated by space. + Only takes affect if --sessions is given. """, ) @@ -82,103 +83,43 @@ async def main(self, args: Namespace) -> None: found: dict[int, dict[int, Any]] = {} if args.sessions is None: - logger.info("No sessions specified, starting with session scan") - # Only until 0x80 because the eight bit is "SuppressResponse" - sessions = [ - s - for s in range(1, 0x80) - if s not in args.skip or args.skip[s] is not None - ] - sessions = await self.ecu.find_sessions(sessions) - logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}") + found[0] = await self.perform_scan(args) else: sessions = [ s for s in args.sessions if s not in args.skip or args.skip[s] is not None ] + logger.info(f"testing sessions {g_repr(sessions)}") - logger.info(f"testing sessions {g_repr(sessions)}") - - # TODO: Unified shortened output necessary here - logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") - - for session in sessions: - logger.info(f"Changing to session {g_repr(session)}") - try: - resp: UDSResponse = await self.ecu.set_session( - session, UDSRequestConfig(tags=["preparation"]) - ) - except (UDSException, RuntimeError) as e: # FIXME why catch RuntimeError? - logger.warning( - f"Could not complete session change to {g_repr(session)}: {g_repr(e)}; skipping session" - ) - continue - if isinstance(resp, NegativeResponse): - logger.warning( - f"Could not complete session change to {g_repr(session)}: {resp}; skipping session" - ) - continue + # TODO: Unified shortened output necessary here + logger.info(f"skipping identifiers {reprlib.repr(args.skip)}") - found[session] = {} - logger.result(f"scanning in session {g_repr(session)}") - - # Starts at 0x00, see first loop iteration. - sid = -1 - while sid < 0xFF: - sid += 1 - if sid & 0x40 and not args.scan_response_ids: + for session in sessions: + logger.info(f"Changing to session {g_repr(session)}") + try: + resp: UDSResponse = await self.ecu.set_session( + session, UDSRequestConfig(tags=["preparation"]) + ) + except ( + UDSException, + RuntimeError, + ) as e: # FIXME why catch RuntimeError? + logger.warning( + f"Could not complete session change to {g_repr(session)}: {g_repr(e)}; skipping session" + ) continue - - if session in args.skip and sid in args.skip[session]: - logger.info(f"{g_repr(sid)}: skipped") + if isinstance(resp, NegativeResponse): + logger.warning( + f"Could not complete session change to {g_repr(session)}: {resp}; skipping session" + ) continue - if args.check_session: - if not await self.ecu.check_and_set_session(session): - logger.error( - f"Aborting scan on session {g_repr(session)}; current SID was {g_repr(sid)}" - ) - break - - for length_payload in [1, 2, 3, 5]: - pdu = bytes([sid]) + bytes(length_payload) - try: - resp = await self.ecu.send_raw( - pdu, config=UDSRequestConfig(tags=["ANALYZE"]) - ) - except asyncio.TimeoutError: - logger.info(f"{g_repr(sid)}: timeout") - continue - except MalformedResponse as e: - logger.warning( - f"{g_repr(sid)}: {e!r} occurred, this needs to be investigated!" - ) - continue - except Exception as e: - logger.info(f"{g_repr(sid)}: {e!r} occurred") - await self.ecu.reconnect() - continue - - if isinstance(resp, NegativeResponse) and resp.response_code in [ - UDSErrorCodes.serviceNotSupported, - UDSErrorCodes.serviceNotSupportedInActiveSession, - ]: - logger.info(f"{g_repr(sid)}: not supported [{resp}]") - break - - if isinstance(resp, NegativeResponse) and resp.response_code in [ - UDSErrorCodes.incorrectMessageLengthOrInvalidFormat, - ]: - continue + logger.result(f"scanning in session {g_repr(session)}") - logger.result( - f"{g_repr(sid)}: available in session {g_repr(session)}: {resp}" - ) - found[session][sid] = resp - break + found[session] = await self.perform_scan(args) - await self.ecu.leave_session(session) + await self.ecu.leave_session(session) for key, value in found.items(): logger.result(f"findings in session 0x{key:02X}:") @@ -190,3 +131,65 @@ async def main(self, args: Namespace) -> None: ) except Exception: logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}") + + async def perform_scan( + self, args: Namespace, session: None | int = None + ) -> dict[int, Any]: + result: dict[int, Any] = {} + + # Starts at 0x00, see first loop iteration. + sid = -1 + while sid < 0xFF: + sid += 1 + if sid & 0x40 and not args.scan_response_ids: + continue + + if session in args.skip and sid in args.skip[session]: + logger.info(f"{g_repr(sid)}: skipped") + continue + + if session is not None and args.check_session: + if not await self.ecu.check_and_set_session(session): + logger.error( + f"Aborting scan on session {g_repr(session)}; current SID was {g_repr(sid)}" + ) + break + + for length_payload in [1, 2, 3, 5]: + pdu = bytes([sid]) + bytes(length_payload) + try: + resp = await self.ecu.send_raw( + pdu, config=UDSRequestConfig(tags=["ANALYZE"]) + ) + except asyncio.TimeoutError: + logger.info(f"{g_repr(sid)}: timeout") + continue + except MalformedResponse as e: + logger.warning( + f"{g_repr(sid)}: {e!r} occurred, this needs to be investigated!" + ) + continue + except Exception as e: + logger.info(f"{g_repr(sid)}: {e!r} occurred") + await self.ecu.reconnect() + continue + + if isinstance(resp, NegativeResponse) and resp.response_code in [ + UDSErrorCodes.serviceNotSupported, + UDSErrorCodes.serviceNotSupportedInActiveSession, + ]: + logger.info(f"{g_repr(sid)}: not supported [{resp}]") + break + + if isinstance(resp, NegativeResponse) and resp.response_code in [ + UDSErrorCodes.incorrectMessageLengthOrInvalidFormat, + ]: + continue + + logger.result( + f"{g_repr(sid)}: available in session {g_repr(session)}: {resp}" + ) + result[sid] = resp + break + + return result diff --git a/src/gallia/services/uds/ecu.py b/src/gallia/services/uds/ecu.py index 191244788..d32b89ab2 100644 --- a/src/gallia/services/uds/ecu.py +++ b/src/gallia/services/uds/ecu.py @@ -246,21 +246,6 @@ async def leave_session( await self.reconnect() return True - async def find_sessions(self, search: list[int], max_retry: int = 4) -> list[int]: - sessions = [] - for sid in search: - try: - resp = await self.set_session( - sid, config=UDSRequestConfig(max_retry=max_retry) - ) - if isinstance(resp, service.NegativeResponse): - continue - except Exception: - continue - sessions.append(sid) - await self.leave_session(sid) - return sessions - async def set_session( self, level: int,