From 626f7a5ca83fe134e95f22e516bdfb836dd8e475 Mon Sep 17 00:00:00 2001 From: David Emeis Date: Mon, 5 Aug 2024 18:02:06 +0200 Subject: [PATCH 1/6] feat: SecurityAccess key length scanner --- src/gallia/commands/__init__.py | 3 + src/gallia/commands/scan/uds/sa_keylen.py | 181 ++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 src/gallia/commands/scan/uds/sa_keylen.py diff --git a/src/gallia/commands/__init__.py b/src/gallia/commands/__init__.py index 9ba686f17..52dad3cb9 100644 --- a/src/gallia/commands/__init__.py +++ b/src/gallia/commands/__init__.py @@ -27,6 +27,7 @@ from gallia.commands.scan.uds.memory import MemoryFunctionsScanner from gallia.commands.scan.uds.reset import ResetScanner from gallia.commands.scan.uds.sa_dump_seeds import SASeedsDumper +from gallia.commands.scan.uds.sa_keylen import SAKeylenDetector from gallia.commands.scan.uds.services import ServicesScanner from gallia.commands.scan.uds.sessions import SessionsScanner @@ -42,6 +43,7 @@ RTCLPrimitive, ReadByIdentifierPrimitive, ResetScanner, + SAKeylenDetector, SASeedsDumper, ScanIdentifiers, SessionsScanner, @@ -77,6 +79,7 @@ "RTCLPrimitive", "ReadByIdentifierPrimitive", "ResetScanner", + "SAKeylenDetector", "SASeedsDumper", "ScanIdentifiers", "SessionsScanner", diff --git a/src/gallia/commands/scan/uds/sa_keylen.py b/src/gallia/commands/scan/uds/sa_keylen.py new file mode 100644 index 000000000..deb0d59cb --- /dev/null +++ b/src/gallia/commands/scan/uds/sa_keylen.py @@ -0,0 +1,181 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +import binascii +import sys +import time +from argparse import ArgumentParser, Namespace + +from gallia.command import UDSScanner +from gallia.config import Config +from gallia.log import get_logger +from gallia.services.uds import NegativeResponse, UDSRequestConfig +from gallia.services.uds.core.service import SecurityAccessResponse, UDSErrorCodes +from gallia.services.uds.core.utils import g_repr +from gallia.utils import auto_int + +logger = get_logger(__name__) + + +class SAKeylenDetector(UDSScanner): + """This scanner tries to determine the key length expected by SecurityAccess.""" + + COMMAND = "key-length" + SHORT_HELP = "determine key length expected by SecurityAccess" + + def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: + super().__init__(parser, config) + + self.implicit_logging = False + + def configure_parser(self) -> None: + self.parser.add_argument( + "--session", + metavar="INT", + type=auto_int, + default=0x02, + help="Set diagnostic session to perform test in", + ) + self.parser.add_argument( + "--check-session", + action="store_true", + default=False, + help="Check current session with read DID", + ) + self.parser.add_argument( + "--level", + default=0x11, + metavar="INT", + type=auto_int, + help="Set security access level for which the seed for calculating the key would be returned.", + ) + self.parser.add_argument( + "--request-seed", + action="store_true", + default=False, + help="Request a seed before sending a key. The default is to just send the key.", + ) + self.parser.add_argument( + "--reset", + nargs="?", + const=1, + default=None, + type=int, + help="Attempt to fool brute force protection by resetting the ECU after every nth sent key.", + ) + self.parser.add_argument( + "--max-length", + default=1000, + type=int, + metavar="INT", + help="Test key lengths from 1 up to N bytes. The default is N = 1000.", + ) + self.parser.add_argument( + "--data-record", + metavar="HEXSTRING", + type=binascii.unhexlify, + default=b"", + help="Append an optional data record to seed requests. Only has an effect when combined with '--request-seed'.", + ) + self.parser.add_argument( + "--sleep", + default=0, + type=float, + metavar="FLOAT", + help="Attempt to fool brute force protection by sleeping for N seconds between sending keys.", + ) + + async def request_seed(self, level: int, data: bytes) -> bytes | None: + resp = await self.ecu.security_access_request_seed(level, data) + if isinstance(resp, NegativeResponse): + logger.warning(f"Requesting seed failed with: {resp}") + return None + return resp.security_seed + + async def main(self, args: Namespace) -> None: + session = args.session + logger.info(f"scanning in session: {g_repr(session)}") + + resp = await self.ecu.set_session(session) + if isinstance(resp, NegativeResponse): + logger.critical(f"could not change to session: {resp}") + return + + key = bytes([0x00]) + reset = False + runs_since_last_reset = 0 + length_identified = False + + while len(key) <= args.max_length: + logger.info(f"Testing key length {len(key)}...") + + if args.check_session or reset: + if not await self.ecu.check_and_set_session(args.session): + logger.error(f"ECU persistently lost session {g_repr(args.session)}") + sys.exit(1) + + reset = False + + if args.request_seed: + try: + await self.request_seed(args.level, args.data_record) + except Exception as e: + logger.critical(f"Error while requesting seed: {g_repr(e)}") + sys.exit(1) + + resp = await self.ecu.security_access_send_key( + args.level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"]) + ) + if isinstance(resp, SecurityAccessResponse): + logger.result( + f"That's unexpected: Unlocked SA level {g_repr(args.level)} with all-zero key of length {len(key)}." + ) + length_identified = True + break + elif isinstance(resp, NegativeResponse): + if ( + not args.request_seed + and resp.response_code == UDSErrorCodes.requestSequenceError + ) or ( + args.request_seed and resp.response_code == UDSErrorCodes.conditionsNotCorrect + ): + logger.result(f"The ECU seems to be expecting keys of length {len(key)}.") + length_identified = True + break + + key += bytes([0x00]) + + runs_since_last_reset += 1 + + if runs_since_last_reset == args.reset: + reset = True + runs_since_last_reset = 0 + + try: + logger.info("Resetting the ECU") + await self.ecu.ecu_reset(0x01) + logger.info("Waiting for the ECU to recover…") + await self.ecu.wait_for_ecu() + except TimeoutError: + logger.error("ECU did not respond after reset; exiting…") + sys.exit(1) + except ConnectionError: + logger.warning( + "Lost connection to the ECU after performing a reset. " + "Attempting to reconnect…" + ) + await self.ecu.reconnect() + + # Re-enter session. Checking/logging will be done at the beginning of next iteration + await self.ecu.set_session(session) + + if args.sleep > 0: + logger.info(f"Sleeping for {args.sleep} seconds between sending keys...") + time.sleep(args.sleep) + + if not length_identified: + logger.result( + f"Unable to identify valid key length for SecurityAccess between 1 and {args.max_length}." + ) + await self.ecu.leave_session(session, sleep=args.power_cycle_sleep) From 3fd57e75c0999daa6c9e8e324f39d37cb77c7a1f Mon Sep 17 00:00:00 2001 From: David Emeis Date: Mon, 5 Aug 2024 18:29:41 +0200 Subject: [PATCH 2/6] fix(sa_keylen): Correctly import UDSErrorCodes, make linters happy --- src/gallia/commands/scan/uds/sa_keylen.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/gallia/commands/scan/uds/sa_keylen.py b/src/gallia/commands/scan/uds/sa_keylen.py index deb0d59cb..409a25134 100644 --- a/src/gallia/commands/scan/uds/sa_keylen.py +++ b/src/gallia/commands/scan/uds/sa_keylen.py @@ -11,7 +11,8 @@ from gallia.config import Config from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSRequestConfig -from gallia.services.uds.core.service import SecurityAccessResponse, UDSErrorCodes +from gallia.services.uds.core.constants import UDSErrorCodes +from gallia.services.uds.core.service import SecurityAccessResponse from gallia.services.uds.core.utils import g_repr from gallia.utils import auto_int @@ -97,9 +98,9 @@ async def main(self, args: Namespace) -> None: session = args.session logger.info(f"scanning in session: {g_repr(session)}") - resp = await self.ecu.set_session(session) - if isinstance(resp, NegativeResponse): - logger.critical(f"could not change to session: {resp}") + sess_resp = await self.ecu.set_session(session) + if isinstance(sess_resp, NegativeResponse): + logger.critical(f"could not change to session: {sess_resp}") return key = bytes([0x00]) @@ -124,21 +125,22 @@ async def main(self, args: Namespace) -> None: logger.critical(f"Error while requesting seed: {g_repr(e)}") sys.exit(1) - resp = await self.ecu.security_access_send_key( + key_resp = await self.ecu.security_access_send_key( args.level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"]) ) - if isinstance(resp, SecurityAccessResponse): + if isinstance(key_resp, SecurityAccessResponse): logger.result( f"That's unexpected: Unlocked SA level {g_repr(args.level)} with all-zero key of length {len(key)}." ) length_identified = True break - elif isinstance(resp, NegativeResponse): + elif isinstance(key_resp, NegativeResponse): if ( not args.request_seed - and resp.response_code == UDSErrorCodes.requestSequenceError + and key_resp.response_code == UDSErrorCodes.requestSequenceError ) or ( - args.request_seed and resp.response_code == UDSErrorCodes.conditionsNotCorrect + args.request_seed + and key_resp.response_code == UDSErrorCodes.conditionsNotCorrect ): logger.result(f"The ECU seems to be expecting keys of length {len(key)}.") length_identified = True From ce10ffd0bf05f2a8adddd98c4429c2219ab6a6f3 Mon Sep 17 00:00:00 2001 From: David Emeis Date: Tue, 6 Aug 2024 14:49:27 +0200 Subject: [PATCH 3/6] fix(sa_keylen): Non-blocking sleep and cosmetic changes in output --- src/gallia/commands/scan/uds/sa_keylen.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/gallia/commands/scan/uds/sa_keylen.py b/src/gallia/commands/scan/uds/sa_keylen.py index 409a25134..634c34acb 100644 --- a/src/gallia/commands/scan/uds/sa_keylen.py +++ b/src/gallia/commands/scan/uds/sa_keylen.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import asyncio import binascii import sys import time @@ -173,8 +174,8 @@ async def main(self, args: Namespace) -> None: await self.ecu.set_session(session) if args.sleep > 0: - logger.info(f"Sleeping for {args.sleep} seconds between sending keys...") - time.sleep(args.sleep) + logger.info(f"Sleeping for {args.sleep} seconds between sending keys…") + await asyncio.sleep(args.sleep) if not length_identified: logger.result( From c0b817d35c76f88f15f94e4fba1b2810c8e87def Mon Sep 17 00:00:00 2001 From: David Emeis Date: Tue, 6 Aug 2024 14:52:54 +0200 Subject: [PATCH 4/6] fix(sa_keylen): Drop unused import --- src/gallia/commands/scan/uds/sa_keylen.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/gallia/commands/scan/uds/sa_keylen.py b/src/gallia/commands/scan/uds/sa_keylen.py index 634c34acb..07ee7495a 100644 --- a/src/gallia/commands/scan/uds/sa_keylen.py +++ b/src/gallia/commands/scan/uds/sa_keylen.py @@ -5,7 +5,6 @@ import asyncio import binascii import sys -import time from argparse import ArgumentParser, Namespace from gallia.command import UDSScanner From 4dca0a562d83b70ac25fc3d95b790edcf290dd98 Mon Sep 17 00:00:00 2001 From: David Emeis Date: Mon, 19 Aug 2024 11:08:36 +0200 Subject: [PATCH 5/6] refactor(sa_keylen): remove default for sleep parameter --- src/gallia/commands/scan/uds/sa_keylen.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/gallia/commands/scan/uds/sa_keylen.py b/src/gallia/commands/scan/uds/sa_keylen.py index 07ee7495a..aec588ea3 100644 --- a/src/gallia/commands/scan/uds/sa_keylen.py +++ b/src/gallia/commands/scan/uds/sa_keylen.py @@ -81,7 +81,6 @@ def configure_parser(self) -> None: ) self.parser.add_argument( "--sleep", - default=0, type=float, metavar="FLOAT", help="Attempt to fool brute force protection by sleeping for N seconds between sending keys.", @@ -172,7 +171,7 @@ async def main(self, args: Namespace) -> None: # Re-enter session. Checking/logging will be done at the beginning of next iteration await self.ecu.set_session(session) - if args.sleep > 0: + if args.sleep is not None: logger.info(f"Sleeping for {args.sleep} seconds between sending keys…") await asyncio.sleep(args.sleep) From 8b540a2edbbefe5594230e1fe939c6285a61850f Mon Sep 17 00:00:00 2001 From: David Emeis Date: Fri, 7 Feb 2025 17:36:28 +0100 Subject: [PATCH 6/6] refactor: Integrate key length detection in dump-seeds --- src/gallia/commands/__init__.py | 3 - src/gallia/commands/scan/uds/sa_dump_seeds.py | 67 ++++++- src/gallia/commands/scan/uds/sa_keylen.py | 182 ------------------ 3 files changed, 60 insertions(+), 192 deletions(-) delete mode 100644 src/gallia/commands/scan/uds/sa_keylen.py diff --git a/src/gallia/commands/__init__.py b/src/gallia/commands/__init__.py index 52dad3cb9..9ba686f17 100644 --- a/src/gallia/commands/__init__.py +++ b/src/gallia/commands/__init__.py @@ -27,7 +27,6 @@ from gallia.commands.scan.uds.memory import MemoryFunctionsScanner from gallia.commands.scan.uds.reset import ResetScanner from gallia.commands.scan.uds.sa_dump_seeds import SASeedsDumper -from gallia.commands.scan.uds.sa_keylen import SAKeylenDetector from gallia.commands.scan.uds.services import ServicesScanner from gallia.commands.scan.uds.sessions import SessionsScanner @@ -43,7 +42,6 @@ RTCLPrimitive, ReadByIdentifierPrimitive, ResetScanner, - SAKeylenDetector, SASeedsDumper, ScanIdentifiers, SessionsScanner, @@ -79,7 +77,6 @@ "RTCLPrimitive", "ReadByIdentifierPrimitive", "ResetScanner", - "SAKeylenDetector", "SASeedsDumper", "ScanIdentifiers", "SessionsScanner", diff --git a/src/gallia/commands/scan/uds/sa_dump_seeds.py b/src/gallia/commands/scan/uds/sa_dump_seeds.py index 40d8204e0..4505b1665 100644 --- a/src/gallia/commands/scan/uds/sa_dump_seeds.py +++ b/src/gallia/commands/scan/uds/sa_dump_seeds.py @@ -12,6 +12,8 @@ from gallia.command.uds import UDSScannerConfig from gallia.log import get_logger from gallia.services.uds import NegativeResponse, UDSRequestConfig +from gallia.services.uds.core.constants import UDSErrorCodes +from gallia.services.uds.core.service import SecurityAccessResponse from gallia.services.uds.core.utils import g_repr logger = get_logger(__name__) @@ -25,11 +27,16 @@ class SASeedsDumperConfig(UDSScannerConfig): level: AutoInt = Field( 0x11, description="Set security access level to request seed from", metavar="INT" ) - send_zero_key: int = Field( - 0, - description="Attempt to fool brute force protection by pretending to send a key after requesting a seed (all zero bytes, length can be specified)", + send_zero_key: int | None = Field( + None, + description="Attempt to fool brute force protection by sending an all-zero key after each seed request. The length of the key can be specified or will otherwise be automatically determined.", metavar="BYTE_LENGTH", - const=96, + const=0, + ) + determine_key_size_max_length: int = Field( + 1000, + description="When trying to automatically determine the key size expected by the ECU, test key lengths from 1 up to N bytes.", + metavar="INT", ) reset: int | None = Field( None, @@ -60,6 +67,7 @@ def __init__(self, config: SASeedsDumperConfig): super().__init__(config) self.config: SASeedsDumperConfig = config self.implicit_logging = False + self.key_length: int | None = None async def request_seed(self, level: int, data: bytes) -> bytes | None: resp = await self.ecu.security_access_request_seed( @@ -70,7 +78,16 @@ async def request_seed(self, level: int, data: bytes) -> bytes | None: return None return resp.security_seed - async def send_key(self, level: int, key: bytes) -> bool: + async def send_key(self, level: int, fixed_key_size: int) -> bool: + if self.key_length is None and fixed_key_size > 0: + self.key_length = fixed_key_size + elif self.key_length is None: + self.key_length = await self.determine_key_length() + + if self.key_length <= 0: + return True + + key = bytes(self.key_length) resp = await self.ecu.security_access_send_key( level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"]) ) @@ -93,6 +110,42 @@ def log_size(self, path: Path, time_delta: float) -> None: size_unit = "MiB" logger.notice(f"Dumping seeds with {rate:.2f}{rate_unit}/h: {size:.2f}{size_unit}") + async def determine_key_length(self) -> int: + key = bytes(1) + + logger.info("No key length given, trying to determine key length expected by the ECU…") + while len(key) <= self.config.determine_key_size_max_length: + logger.debug(f"Testing key length {len(key)}…") + + if self.config.check_session: + if not await self.ecu.check_and_set_session(self.config.session): + logger.error(f"ECU persistently lost session {g_repr(self.config.session)}.") + return -1 + + resp = await self.ecu.security_access_send_key(self.config.level + 1, key) + if isinstance(resp, SecurityAccessResponse): + logger.result( + f"That's unexpected: Unlocked SA level {g_repr(self.config.level)} with all-zero key of length {len(key)}." + ) + return -1 + elif ( + isinstance(resp, NegativeResponse) + and resp.response_code != UDSErrorCodes.incorrectMessageLengthOrInvalidFormat + ): + logger.info(f"The ECU seems to be expecting keys of length {len(key)}.") + return len(key) + + key += bytes(1) + + if self.config.sleep is not None: + logger.info(f"Sleeping for {self.config.sleep} seconds between sending keys…") + await asyncio.sleep(self.config.sleep) + + logger.error( + f"Unable to identify valid key length for SecurityAccess between 1 and {self.config.determine_key_size_max_length} bytes." + ) + return -1 + async def main(self) -> None: session = self.config.session logger.info(f"scanning in session: {g_repr(session)}") @@ -152,9 +205,9 @@ async def main(self) -> None: last_seed = seed - if self.config.send_zero_key > 0: + if self.config.send_zero_key is not None: try: - if await self.send_key(self.config.level, bytes(self.config.send_zero_key)): + if await self.send_key(self.config.level, self.config.send_zero_key): break except TimeoutError: logger.warning("Timeout while sending key") diff --git a/src/gallia/commands/scan/uds/sa_keylen.py b/src/gallia/commands/scan/uds/sa_keylen.py deleted file mode 100644 index aec588ea3..000000000 --- a/src/gallia/commands/scan/uds/sa_keylen.py +++ /dev/null @@ -1,182 +0,0 @@ -# SPDX-FileCopyrightText: AISEC Pentesting Team -# -# SPDX-License-Identifier: Apache-2.0 - -import asyncio -import binascii -import sys -from argparse import ArgumentParser, Namespace - -from gallia.command import UDSScanner -from gallia.config import Config -from gallia.log import get_logger -from gallia.services.uds import NegativeResponse, UDSRequestConfig -from gallia.services.uds.core.constants import UDSErrorCodes -from gallia.services.uds.core.service import SecurityAccessResponse -from gallia.services.uds.core.utils import g_repr -from gallia.utils import auto_int - -logger = get_logger(__name__) - - -class SAKeylenDetector(UDSScanner): - """This scanner tries to determine the key length expected by SecurityAccess.""" - - COMMAND = "key-length" - SHORT_HELP = "determine key length expected by SecurityAccess" - - def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: - super().__init__(parser, config) - - self.implicit_logging = False - - def configure_parser(self) -> None: - self.parser.add_argument( - "--session", - metavar="INT", - type=auto_int, - default=0x02, - help="Set diagnostic session to perform test in", - ) - self.parser.add_argument( - "--check-session", - action="store_true", - default=False, - help="Check current session with read DID", - ) - self.parser.add_argument( - "--level", - default=0x11, - metavar="INT", - type=auto_int, - help="Set security access level for which the seed for calculating the key would be returned.", - ) - self.parser.add_argument( - "--request-seed", - action="store_true", - default=False, - help="Request a seed before sending a key. The default is to just send the key.", - ) - self.parser.add_argument( - "--reset", - nargs="?", - const=1, - default=None, - type=int, - help="Attempt to fool brute force protection by resetting the ECU after every nth sent key.", - ) - self.parser.add_argument( - "--max-length", - default=1000, - type=int, - metavar="INT", - help="Test key lengths from 1 up to N bytes. The default is N = 1000.", - ) - self.parser.add_argument( - "--data-record", - metavar="HEXSTRING", - type=binascii.unhexlify, - default=b"", - help="Append an optional data record to seed requests. Only has an effect when combined with '--request-seed'.", - ) - self.parser.add_argument( - "--sleep", - type=float, - metavar="FLOAT", - help="Attempt to fool brute force protection by sleeping for N seconds between sending keys.", - ) - - async def request_seed(self, level: int, data: bytes) -> bytes | None: - resp = await self.ecu.security_access_request_seed(level, data) - if isinstance(resp, NegativeResponse): - logger.warning(f"Requesting seed failed with: {resp}") - return None - return resp.security_seed - - async def main(self, args: Namespace) -> None: - session = args.session - logger.info(f"scanning in session: {g_repr(session)}") - - sess_resp = await self.ecu.set_session(session) - if isinstance(sess_resp, NegativeResponse): - logger.critical(f"could not change to session: {sess_resp}") - return - - key = bytes([0x00]) - reset = False - runs_since_last_reset = 0 - length_identified = False - - while len(key) <= args.max_length: - logger.info(f"Testing key length {len(key)}...") - - if args.check_session or reset: - if not await self.ecu.check_and_set_session(args.session): - logger.error(f"ECU persistently lost session {g_repr(args.session)}") - sys.exit(1) - - reset = False - - if args.request_seed: - try: - await self.request_seed(args.level, args.data_record) - except Exception as e: - logger.critical(f"Error while requesting seed: {g_repr(e)}") - sys.exit(1) - - key_resp = await self.ecu.security_access_send_key( - args.level + 1, key, config=UDSRequestConfig(tags=["ANALYZE"]) - ) - if isinstance(key_resp, SecurityAccessResponse): - logger.result( - f"That's unexpected: Unlocked SA level {g_repr(args.level)} with all-zero key of length {len(key)}." - ) - length_identified = True - break - elif isinstance(key_resp, NegativeResponse): - if ( - not args.request_seed - and key_resp.response_code == UDSErrorCodes.requestSequenceError - ) or ( - args.request_seed - and key_resp.response_code == UDSErrorCodes.conditionsNotCorrect - ): - logger.result(f"The ECU seems to be expecting keys of length {len(key)}.") - length_identified = True - break - - key += bytes([0x00]) - - runs_since_last_reset += 1 - - if runs_since_last_reset == args.reset: - reset = True - runs_since_last_reset = 0 - - try: - logger.info("Resetting the ECU") - await self.ecu.ecu_reset(0x01) - logger.info("Waiting for the ECU to recover…") - await self.ecu.wait_for_ecu() - except TimeoutError: - logger.error("ECU did not respond after reset; exiting…") - sys.exit(1) - except ConnectionError: - logger.warning( - "Lost connection to the ECU after performing a reset. " - "Attempting to reconnect…" - ) - await self.ecu.reconnect() - - # Re-enter session. Checking/logging will be done at the beginning of next iteration - await self.ecu.set_session(session) - - if args.sleep is not None: - logger.info(f"Sleeping for {args.sleep} seconds between sending keys…") - await asyncio.sleep(args.sleep) - - if not length_identified: - logger.result( - f"Unable to identify valid key length for SecurityAccess between 1 and {args.max_length}." - ) - await self.ecu.leave_session(session, sleep=args.power_cycle_sleep)