From 5c22d101dc319ccd49d5bb0e40da8852473c8348 Mon Sep 17 00:00:00 2001 From: dru1d Date: Fri, 19 May 2023 20:11:41 -0400 Subject: [PATCH] Added LAPS Version 2.0 decryption option; this requires the https://github.com/fortra/impacket/pull/1556 fork of impacket as it contains an implementation of MS-GKDI. Shout out to @zblurx for 99.9999% of the code I used. --- GetLAPSPassword.py | 138 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 118 insertions(+), 20 deletions(-) diff --git a/GetLAPSPassword.py b/GetLAPSPassword.py index d6fdf6f..9237da0 100755 --- a/GetLAPSPassword.py +++ b/GetLAPSPassword.py @@ -8,10 +8,11 @@ # for more information. # # Description: -# This script will gather data about the domain's computers and their LAPS passwords. +# This script will gather data about the domain's computers and their LAPS/LAPSv2 passwords. # Initial formatting for this tool came from the GetADUsers.py example script. # -# Author: +# Author(s): +# Thomas Seigneuret (@zblurx) # Tyler Booth (@dru1d-foofus) # # Reference for: @@ -21,18 +22,25 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals -import argparse -import logging -import sys -import os from datetime import datetime - from impacket import version +from impacket.dcerpc.v5 import transport +from impacket.dcerpc.v5.epm import hept_map +from impacket.dcerpc.v5.gkdi import MSRPC_UUID_GKDI, GkdiGetKey, GroupKeyEnvelope +from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY from impacket.dcerpc.v5.samr import UF_ACCOUNTDISABLE +from impacket.dpapi_ng import EncryptedPasswordBlob, KeyIdentifier, compute_kek, create_sd, decrypt_plaintext, unwrap_cek from impacket.examples import logger from impacket.examples.utils import parse_credentials from impacket.ldap import ldap, ldapasn1 from impacket.smbconnection import SMBConnection, SessionError +from pyasn1.codec.der import decoder +from pyasn1_modules import rfc5652 +import argparse +import json +import logging +import os +import sys class GetLAPSPassword: @@ -51,6 +59,7 @@ def __init__(self, username, password, domain, cmdLineOptions, output_file=None) self.__kdcHost = cmdLineOptions.dc_host self.__targetComputer = cmdLineOptions.computer self.__allComputers = cmdLineOptions.all_computers + self.__lapsv2 = cmdLineOptions.lapsv2 if cmdLineOptions.hashes is not None: self.__lmhash, self.__nthash = cmdLineOptions.hashes.split(':') @@ -63,10 +72,14 @@ def __init__(self, username, password, domain, cmdLineOptions, output_file=None) # Remove last ',' self.baseDN = self.baseDN[:-1] - # Let's calculate the header and format - self.__header = ["Name", "LAPS Password", "LAPS Password Expiration"] - self.__colLen = [20, 20, 30] - self.__outputFormat = ' '.join(['{%d:%ds} ' % (num, width) for num, width in enumerate(self.__colLen)]) + if self.__lapsv2: + self.__header = ["Name", "LAPS Username", "LAPS Password", "LAPS Password Expiration"] + self.__colLen = [20, 20, 20, 30] + else: + self.__header = ["Name", "LAPS Password", "LAPS Password Expiration"] + self.__colLen = [20, 20, 30] + + self.__outputFormat = '| '.join(['{%d:%ds} ' % (num, width) for num, width in enumerate(self.__colLen)]) # Output file magic self.__output_file = output_file @@ -135,6 +148,7 @@ def processRecord(self, item): if isinstance(item, ldapasn1.SearchResultEntry) is not True: return cn = 'N/A' + lapsUsername = 'N/A' lapsPassword = 'N/A' lapsPasswordExpiration = 'N/A' laps_enabled = False @@ -150,20 +164,97 @@ def processRecord(self, item): elif str(attribute['type']) == 'ms-Mcs-AdmPwd': lapsPassword = attribute['vals'][0].asOctets().decode('utf-8') laps_enabled = True - - print((self.__outputFormat.format(*[cn, lapsPassword, lapsPasswordExpiration]))) + elif str(attribute['type']) == 'msLAPS-EncryptedPassword': # Checking for msLAPS-EncryptedPassword + rawEncryptedLAPSBlob = bytes(attribute['vals'][0]) + KDSCache = {} + try: + encryptedLAPSBlob = EncryptedPasswordBlob(rawEncryptedLAPSBlob) + parsed_cms_data, remaining = decoder.decode(encryptedLAPSBlob['Blob'], asn1Spec=rfc5652.ContentInfo()) + enveloped_data_blob = parsed_cms_data['content'] + parsed_enveloped_data, _ = decoder.decode(enveloped_data_blob, asn1Spec=rfc5652.EnvelopedData()) + + recipient_infos = parsed_enveloped_data['recipientInfos'] + kek_recipient_info = recipient_infos[0]['kekri'] + kek_identifier = kek_recipient_info['kekid'] + key_id = KeyIdentifier(bytes(kek_identifier['keyIdentifier'])) + tmp,_ = decoder.decode(kek_identifier['other']['keyAttr']) + sid = tmp['field-1'][0][0][1].asOctets().decode("utf-8") + target_sd = create_sd(sid) + laps_enabled = True + except Exception as e: + logging.error('Cannot unpack msLAPS-EncryptedPassword blob due to error %s' % str(e)) + # Check if item is in cache + if key_id['RootKeyId'] in KDSCache: + logging.debug("Got KDS from cache") + gke = KDSCache[key_id['RootKeyId']] + else: + # Connect on RPC over TCP to MS-GKDI to call opnum 0 GetKey + stringBinding = hept_map(destHost=self.__target, remoteIf=MSRPC_UUID_GKDI, protocol = 'ncacn_ip_tcp') + rpctransport = transport.DCERPCTransportFactory(stringBinding) + if hasattr(rpctransport, 'set_credentials'): + rpctransport.set_credentials(username=self.__username, password=self.__password, domain=self.__domain, lmhash=self.__lmhash, nthash=self.__nthash) + if self.__doKerberos: + rpctransport.set_kerberos(self.__doKerberos, kdcHost=self.__target) + if self.__kdcIP is not None: + rpctransport.setRemoteHost(self.__kdcIP) + rpctransport.setRemoteName(self.__target) + + dce = rpctransport.get_dce_rpc() + dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY) + dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) + logging.debug("Connecting to %s" % stringBinding) + try: + dce.connect() + except Exception as e: + logging.error("Something went wrong, check error status => %s" % str(e)) + return laps_enabled + logging.debug("Connected") + try: + dce.bind(MSRPC_UUID_GKDI) + except Exception as e: + logging.error("Something went wrong, check error status => %s" % str(e)) + return laps_enabled + logging.debug("Successfully bound") + + + logging.debug("Calling MS-GKDI GetKey") + resp = GkdiGetKey(dce, target_sd=target_sd, l0=key_id['L0Index'], l1=key_id['L1Index'], l2=key_id['L2Index'], root_key_id=key_id['RootKeyId']) + logging.debug("Decrypting password") + # Unpack GroupKeyEnvelope + gke = GroupKeyEnvelope(b''.join(resp['pbbOut'])) + KDSCache[gke['RootKeyId']] = gke + + kek = compute_kek(gke, key_id) + logging.debug("KEK:\t%s" % kek) + enc_content_parameter = bytes(parsed_enveloped_data["encryptedContentInfo"]["contentEncryptionAlgorithm"]["parameters"]) + iv, _ = decoder.decode(enc_content_parameter) + iv = bytes(iv[0]) + + cek = unwrap_cek(kek, bytes(kek_recipient_info['encryptedKey'])) + logging.debug("CEK:\t%s" % cek) + plaintext = decrypt_plaintext(cek, iv, remaining) + + if self.__lapsv2: + json_str = plaintext[:-18].decode('utf-16le') + json_obj = json.loads(json_str) # parse JSON string into Python dictionary + lapsUsername = json_obj.get('n', 'N/A') # if 'n' key does not exist, 'N/A' is returned + lapsPassword = json_obj.get('p', 'N/A') + print(self.__outputFormat.format(*[cn, lapsUsername, lapsPassword, lapsPasswordExpiration])) + else: + print((self.__outputFormat.format(*[cn, lapsPassword, lapsPasswordExpiration]))) except Exception as e: logging.error('Error processing record: %s', str(e)) logging.debug(item, exc_info=True) pass - cn = "test" - lapsPassword = "as13r34,234!dfa" - lapsPasswordExpiration = '2023-06-08ASDF' - output_line = self.__outputFormat.format(*[cn, lapsPassword, lapsPasswordExpiration]) - print(output_line) - self.__write_to_file(output_line.replace(' ', '|')) # Save the output to a PSV + if self.__lapsv2: + output_line = self.__outputFormat.format(cn, lapsUsername, lapsPassword, lapsPasswordExpiration) + self.__write_to_file(output_line.strip().replace(' ', '')) + else: + output_line = self.__outputFormat.format(cn, lapsPassword, lapsPasswordExpiration) + self.__write_to_file(output_line.strip().replace(' ', '')) return laps_enabled + def run(self): if self.__kdcHost is not None: self.__target = self.__kdcHost @@ -216,13 +307,19 @@ def run(self): elif self.__targetComputer is not None: searchFilter = "(&(objectCategory=computer)(ms-Mcs-AdmPwdExpirationtime=*)(cn=%s))" searchFilter = searchFilter % self.__targetComputer + elif self.__lapsv2: + searchFilter = "(&(objectCategory=computer)(msLAPS-EncryptedPassword=*))" + elif self.__lapsv2 and self.__targetComputer is not None: + searchFilter = "(&(objectCategory=computer)(msLAPS-EncryptedPassword=*)(cn=%s))" + searchFilter = searchFilter % self.__targetComputer try: logging.debug('Search Filter=%s' % searchFilter) sc = ldap.SimplePagedResultsControl(size=100) # Search for computer objects and include the 'ms-MCS-AdmPwdExpirationTime' attribute laps_enabled = ldapConnection.search(searchFilter=searchFilter, - attributes=['cn', 'ms-MCS-AdmPwd', 'ms-MCS-AdmPwdExpirationTime'], + attributes=['cn', 'ms-MCS-AdmPwd', 'ms-MCS-AdmPwdExpirationTime', 'msLAPS-EncryptedPassword', 'msLAPS-Password', \ + 'msLAPS-PasswordExpirationTime'], sizeLimit=0, searchControls = [sc], perRecordCallback=self.processRecord) if laps_enabled == False: print("\n[!] LAPS is not enabled for this domain.") @@ -247,6 +344,7 @@ def run(self): parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') parser.add_argument('-outputfile', '-o', action='store', help='Outputs to a file.') + parser.add_argument('-lapsv2', action='store_true', help='Toggles LAPS Version 2.0 extraction') group = parser.add_argument_group('authentication') group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')