Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove auto-discovery of sessions from multiple scanners #457

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 110 additions & 107 deletions src/gallia/commands/scan/uds/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def configure_parser(self) -> None:
nargs="?",
const=1,
type=int,
help="Check current session via read DID [for every nth DataIdentifier] and try to recover session",
help="Check current session via read DID [for every nth DataIdentifier] and try to recover session; only takes affect if --sessions is given",
)
self.parser.add_argument(
"--skip",
Expand All @@ -94,6 +94,7 @@ def configure_parser(self) -> None:
- 0x10-0x2f
- 0x01:0xf3,0x10-0x2f
Multiple session specific skips are separated by space.
Only takes affect if --sessions is given.
""",
)
self.parser.add_argument(
Expand All @@ -104,130 +105,132 @@ def configure_parser(self) -> None:

async def main(self, args: Namespace) -> None:
if args.sessions is None:
logger.info("No sessions specified, starting with session scan")
# Only until 0x80 because the eight bit is "SuppressResponse"
sessions = [
s
for s in range(1, 0x80)
if s not in args.skip or args.skip[s] is not None
]
sessions = await self.ecu.find_sessions(sessions)
logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}")
logger.notice("Performing scan in current session")
await self.perform_scan(args)

else:
sessions = [
sessions: list[int] = [
s
for s in args.sessions
if s not in args.skip or args.skip[s] is not None
]
logger.info(f"testing sessions {g_repr(sessions)}")

logger.info(f"testing sessions {g_repr(sessions)}")
# TODO: Unified shortened output necessary here
logger.info(f"skipping identifiers {reprlib.repr(args.skip)}")

# TODO: Unified shortened output necessary here
logger.info(f"skipping identifiers {reprlib.repr(args.skip)}")
for session in sessions:
logger.notice(f"Switching to session {g_repr(session)}")
resp: UDSResponse = await self.ecu.set_session(session)
if isinstance(resp, NegativeResponse):
logger.warning(
f"Switching to session {g_repr(session)} failed: {resp}"
)
continue

for session in sessions:
logger.notice(f"Switching to session {g_repr(session)}")
resp: UDSResponse = await self.ecu.set_session(session)
if isinstance(resp, NegativeResponse):
logger.warning(f"Switching to session {g_repr(session)} failed: {resp}")
logger.result(f"Starting scan in session: {g_repr(session)}")

await self.perform_scan(args, session)

logger.result(f"Scan in session {g_repr(session)} is complete!")
logger.info(f"Leaving session {g_repr(session)} via hook")
await self.ecu.leave_session(session)

async def perform_scan(self, args: Namespace, session: None | int = None) -> None:
positive_DIDs = 0
abnormal_DIDs = 0
timeout_DIDs = 0
sub_functions = [0x00]

if args.sid == UDSIsoServices.RoutineControl:
if not args.payload:
logger.warning(
"Scanning RoutineControl with empty payload can successfully execute some "
+ "routines that might have irreversible effects without elevated privileges"
)

# Scan all three subfunctions (startRoutine, stopRoutine, requestRoutineResults)
sub_functions = list(map(int, RoutineControlSubFuncs))

if args.sid == UDSIsoServices.SecurityAccess and args.end > 0xFF:
logger.warning(
"Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); "
+ f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}"
)
args.end = 0xFF

for DID, sub_function in product(
range(args.start, args.end + 1), sub_functions
):
if session in args.skip and DID in args.skip[session]:
logger.info(f"{g_repr(DID)}: skipped")
continue

logger.result(f"Starting scan in session: {g_repr(session)}")
positive_DIDs = 0
abnormal_DIDs = 0
timeout_DIDs = 0
sub_functions = [0x00]

if args.sid == UDSIsoServices.RoutineControl:
if not args.payload:
logger.warning(
"Scanning RoutineControl with empty payload can successfully execute some "
+ "routines, such as switching from plant mode to field mode, which can only "
+ "be reversed with a valid token!"
if (
session is not None
and args.check_session
and DID % args.check_session == 0
):
# Check session and try to recover from wrong session (max 3 times), else skip session
if not await self.ecu.check_and_set_session(session):
logger.error(
f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}"
)

# Scan all three subfunctions (startRoutine, stopRoutine, requestRoutineResults)
sub_functions = list(map(int, RoutineControlSubFuncs))
break

if args.sid == UDSIsoServices.SecurityAccess:
if args.end > 0xFF:
logger.warning(
"Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); "
+ f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}"
if DID & 0b10000000:
logger.info(
"Keep in mind that you set the SuppressResponse Bit (8th bit): "
+ f"{g_repr(DID)} = 0b{DID:b}"
)
args.end = 0xFF
pdu = bytes([args.sid, DID])

elif args.sid == UDSIsoServices.RoutineControl:
pdu = bytes(
[args.sid, sub_function, DID >> 8, DID & 0xFF]
) # Needs extra byte for sub function

# DefaultBehavior, e.g. for ReadDataByIdentifier/WriteDataByIdentifier
else:
pdu = bytes([args.sid, DID >> 8, DID & 0xFF])

if args.payload:
pdu += args.payload

try:
resp = await self.ecu.send_raw(
pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3)
)
except asyncio.TimeoutError:
logger.result(f"{g_repr(DID)}: Retries exceeded")
timeout_DIDs += 1
continue
except IllegalResponse as e:
logger.warning(g_repr(e))
continue

for DID, sub_function in product(
range(args.start, args.end + 1), sub_functions
):
if session in args.skip and DID in args.skip[session]:
logger.info(f"{g_repr(DID)}: skipped")
continue
if isinstance(resp, NegativeResponse):
if suggests_service_not_supported(resp):
logger.info(
f"{g_repr(DID)}: {resp}; does session {g_repr(session)} "
f"support service {service_repr(args.sid)}?"
)

if args.check_session and DID % args.check_session == 0:
# Check session and try to recover from wrong session (max 3 times), else skip session
if not await self.ecu.check_and_set_session(session):
logger.error(
f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}"
)
if args.skip_not_supported:
break

if args.sid == UDSIsoServices.SecurityAccess:
if DID & 0b10000000:
logger.info(
"Keep in mind that you set the SuppressResponse Bit (8th bit): "
+ f"{g_repr(DID)} = 0b{DID:b}"
)
pdu = bytes([args.sid, DID])

elif args.sid == UDSIsoServices.RoutineControl:
pdu = bytes(
[args.sid, sub_function, DID >> 8, DID & 0xFF]
) # Needs extra byte for sub function
# RequestOutOfRange is a common reply for invalid/unknown DataIdentifiers
elif resp.response_code == UDSErrorCodes.requestOutOfRange:
logger.info(f"{g_repr(DID)}: {resp}")

# DefaultBehaviour, e.g. for ReadDataByIdentifier/WriteDataByIdentifier
else:
pdu = bytes([args.sid, DID >> 8, DID & 0xFF])

if args.payload:
pdu += args.payload

try:
resp = await self.ecu.send_raw(
pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3)
)
except asyncio.TimeoutError:
logger.result(f"{g_repr(DID)}: Retries exceeded")
timeout_DIDs += 1
continue
except IllegalResponse as e:
logger.warning(g_repr(e))

if isinstance(resp, NegativeResponse):
if suggests_service_not_supported(resp):
logger.info(
f"{g_repr(DID)}: {resp}; does session {g_repr(session)} "
f"support service {service_repr(args.sid)}?"
)

if args.skip_not_supported:
break

# RequestOutOfRange is a common reply for invalid DataIdentifiers
elif resp.response_code == UDSErrorCodes.requestOutOfRange:
logger.info(f"{g_repr(DID)}: {resp}")

else:
logger.result(f"{g_repr(DID)}: {resp}")
abnormal_DIDs += 1
else:
logger.result(f"{g_repr(DID)}: {resp}")
positive_DIDs += 1

logger.result(f"Scan in session {g_repr(session)} is complete!")
logger.result(f"Positive replies: {positive_DIDs}")
logger.result(f"Abnormal replies: {abnormal_DIDs}")
logger.result(f"Timeouts: {timeout_DIDs}")

logger.info(f"Leaving session {g_repr(session)} via hook")
await self.ecu.leave_session(session)
abnormal_DIDs += 1
else:
logger.result(f"{g_repr(DID)}: {resp}")
positive_DIDs += 1

logger.result(f"Positive replies: {positive_DIDs}")
logger.result(f"Abnormal replies: {abnormal_DIDs}")
logger.result(f"Timeouts: {timeout_DIDs}")
Loading
Loading