# Impacket - Collection of Python classes for working with network protocols. # # Copyright (C) 2023 Fortra. All rights reserved. # # This software is provided under a slightly modified version # of the Apache Software License. See the accompanying LICENSE file # for more information. # # Tested so far: # ROpenSCManagerW # RControlService # RDeleteService # RLockServiceDatabase # RQueryServiceObjectSecurity # RQueryServiceStatus # RUnlockServiceDatabase # RNotifyBootConfigStatus # RChangeServiceConfigW # RCreateServiceW # REnumDependentServicesW # REnumServicesStatusW # ROpenSCManager # ROpenServiceW # RQueryServiceConfigW # RQueryServiceLockStatusW # RStartServiceW # CRGetServiceDisplayNameW # RGetServiceKeyNameW # REnumServiceGroupW # RChangeServiceConfig2W # RQueryServiceConfig2W # RQueryServiceStatusEx # REnumServicesStatusExW # RNotifyServiceStatusChange # RGetNotifyResults # RCloseNotifyHandle # RControlServiceExW # RQueryServiceConfigEx # # Not yet: # hRCloseServiceHandleCall # RSetServiceObjectSecurity # RSetServiceStatus # RCreateServiceWOW64W # import time import pytest import unittest from struct import unpack from tests.dcerpc import DCERPCTests from impacket.dcerpc.v5 import scmr from impacket.dcerpc.v5.ndr import NULL from impacket.crypto import encryptSecret from impacket.uuid import string_to_bin from impacket import ntlm class SCMRTests(DCERPCTests): iface_uuid = scmr.MSRPC_UUID_SCMR authn = True def get_service_handle(self, dce): lpMachineName = 'DUMMY\x00' lpDatabaseName = 'ServicesActive\x00' desiredAccess = scmr.SERVICE_START | scmr.SERVICE_STOP | scmr.SERVICE_CHANGE_CONFIG | scmr.SERVICE_QUERY_CONFIG | scmr.SERVICE_QUERY_STATUS | scmr.SERVICE_ENUMERATE_DEPENDENTS | scmr.SC_MANAGER_ENUMERATE_SERVICE resp = scmr.hROpenSCManagerW(dce, lpMachineName, lpDatabaseName, desiredAccess) scHandle = resp['lpScHandle'] return scHandle def changeServiceAndQuery(self, dce, cbBufSize, hService, dwServiceType, dwStartType, dwErrorControl, lpBinaryPathName, lpLoadOrderGroup, lpdwTagId, lpDependencies, dwDependSize, lpServiceStartName, lpPassword, dwPwSize, lpDisplayName): try: resp = scmr.hRChangeServiceConfigW(dce, hService, dwServiceType, dwStartType, dwErrorControl, lpBinaryPathName, lpLoadOrderGroup, lpdwTagId, lpDependencies, dwDependSize, lpServiceStartName, lpPassword, dwPwSize, lpDisplayName) resp = scmr.hRQueryServiceConfigW(dce, hService) resp.dump() # Now let's compare all the results if dwServiceType != scmr.SERVICE_NO_CHANGE: self.assertEqual(resp['lpServiceConfig']['dwServiceType'], dwServiceType) if dwStartType != scmr.SERVICE_NO_CHANGE: self.assertEqual(resp['lpServiceConfig']['dwStartType'], dwStartType) if dwErrorControl != scmr.SERVICE_NO_CHANGE: self.assertEqual(resp['lpServiceConfig']['dwErrorControl'], dwErrorControl) if lpBinaryPathName != NULL: self.assertEqual(resp['lpServiceConfig']['lpBinaryPathName'], lpBinaryPathName) if lpBinaryPathName != NULL: self.assertEqual(resp['lpServiceConfig']['lpBinaryPathName'], lpBinaryPathName) if lpLoadOrderGroup != NULL: self.assertEqual(resp['lpServiceConfig']['lpLoadOrderGroup'], lpLoadOrderGroup) #if lpDependencies != '': # self.assertEqual( resp['lpServiceConfig']['lpDependencies'], lpDependencies[:-4]+'/\x00\x00\x00') if lpServiceStartName != NULL: self.assertEqual(resp['lpServiceConfig']['lpServiceStartName'], lpServiceStartName) if lpDisplayName != NULL: self.assertEqual(resp['lpServiceConfig']['lpDisplayName'], lpDisplayName) #if lpdwTagId != scmr.SERVICE_NO_CHANGE: # if resp['lpServiceConfig']['dwTagId']['Data'] != lpdwTagId: # print "ERROR %s" % 'lpdwTagId' except Exception: scmr.hRDeleteService(dce, hService) raise def changeServiceAndQuery2(self, dce, info, changeDone): serviceHandle = info['hService'] dwInfoLevel = info['Info']['Union']['tag'] cbBuffSize = 0 request = scmr.RQueryServiceConfig2W() request['hService'] = serviceHandle request['dwInfoLevel'] = dwInfoLevel request['cbBufSize'] = cbBuffSize try: resp = dce.request(request) except scmr.DCERPCSessionError as e: if str(e).find('ERROR_INSUFFICIENT_BUFFER') <= 0: raise else: resp = e.get_packet() request['cbBufSize'] = resp['pcbBytesNeeded'] resp = dce.request(request) arrayData = b''.join(resp['lpBuffer']) if dwInfoLevel == 1: self.assertEqual(arrayData[4:].decode('utf-16le'), changeDone) elif dwInfoLevel == 2: offset = unpack('