#!/usr/bin/python
# Copyright (C) 2010 Michael Ligh
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# [NOTES] -----------------------------------------------------------
# 1) Tested on Linux (Ubuntu), Windows XP/7, and Mac OS X
#--------------------------------------------------------------------

"""Print a static triage report for one PE file or a directory of PE files.

For every readable, parseable file the report lists hashes, the compile
timestamp, the entry-point section, optional PEiD/YARA/ClamAV signature
hits, TLS callbacks, resources, suspicious imports, section entropy and
version information.

The code is written to run unchanged on Python 2.6+ and Python 3: only
single-argument ``print(...)`` calls are used, and byte strings coming from
pefile/yara are converted with ``_to_str`` before they are formatted.
"""

import hashlib
import time
import binascii
import string
import os
import sys
import subprocess

try:
    # Python 2 only module. check_clam() now uses subprocess instead, but the
    # name is kept so that anything relying on it being importable still works.
    import commands
except ImportError:
    commands = None

try:
    import pefile
    import peutils
except ImportError:
    print('pefile not installed, see http://code.google.com/p/pefile/')
    # Exit right away only when run as a script; merely importing this module
    # must not terminate the importing process.
    if __name__ == '__main__':
        sys.exit()
    pefile = None
    peutils = None

try:
    import magic
except ImportError:
    print('python-magic is not installed, file types will not be available')

try:
    from ssdeep import ssdeep
except ImportError:
    print('pyssdeep is not installed, see http://code.google.com/p/pyssdeep/')

try:
    import yara
except ImportError:
    print('yara-python is not installed, see http://code.google.com/p/yara-project/')

# suspicious APIs to alert on
alerts = ['OpenProcess', 'VirtualAllocEx', 'WriteProcessMemory',
          'CreateRemoteThread', 'ReadProcessMemory', 'CreateProcess',
          'WinExec', 'ShellExecute', 'HttpSendRequest', 'InternetReadFile',
          'InternetConnect', 'CreateService', 'StartService']

# legit entry point sections
good_ep_sections = ['.text', '.code', 'INIT', 'PAGE']

# path to clamscan (optional)
clamscan_path = '/usr/bin/clamscanx'


def _to_str(value):
    """Return *value* as a native ``str`` when it is a Python 3 ``bytes``.

    On Python 2 ``bytes`` is ``str``, so values pass through untouched.
    latin-1 is used because it maps every byte to the code point of the same
    value, so the conversion cannot fail and loses nothing.
    """
    if isinstance(value, bytes) and not isinstance(value, str):
        return value.decode('latin-1')
    return value


def convert_char(char):
    """Return *char* unchanged if it is printable ASCII, else as ``\\xNN``."""
    if (char in string.ascii_letters or
            char in string.digits or
            char in string.punctuation or
            char in string.whitespace):
        return char
    return r'\x%02x' % ord(char)


def convert_to_printable(s):
    """Return *s* with every non-printable character escaped as ``\\xNN``."""
    return ''.join([convert_char(c) for c in _to_str(s)])


class PEScanner(object):
    """Collect and print static indicators for a list of PE files."""

    def __init__(self, files, yara_rules=None, peid_sigs=None):
        """Prepare the optional signature engines.

        files      -- iterable of file paths to scan
        yara_rules -- path to a YARA rules file (ignored if empty or if
                      yara-python is not installed)
        peid_sigs  -- path to a PEiD signature database (ignored if empty)

        Raises ImportError when pefile could not be imported.
        """
        if pefile is None:
            raise ImportError(
                'pefile not installed, see http://code.google.com/p/pefile/')

        self.files = files
        self.rules = None
        self.sigs = None
        self.ms = None

        # initialize YARA rules if provided
        if yara_rules and 'yara' in sys.modules:
            self.rules = yara.compile(yara_rules)

        # initialize PEiD signatures if provided
        if peid_sigs:
            self.sigs = peutils.SignatureDatabase(peid_sigs)

        # Two incompatible modules are distributed under the name "magic":
        # one exposes from_buffer(), the other open()/load()/buffer().  Pick
        # by the functions actually present instead of by interpreter
        # version, which says nothing about which binding is installed.
        if ('magic' in sys.modules and
                not hasattr(magic, 'from_buffer') and
                hasattr(magic, 'open')):
            self.ms = magic.open(magic.MAGIC_NONE)
            self.ms.load()

    def _filetype(self, data):
        """Return the libmagic description of *data*, or None if unavailable."""
        if 'magic' not in sys.modules:
            return None
        if hasattr(magic, 'from_buffer'):
            return magic.from_buffer(data)
        ms = getattr(self, 'ms', None)
        if ms is not None:
            return ms.buffer(data)
        return None

    def check_ep_section(self, pe):
        """Return ``(entry_point_rva, section_name)`` for *pe*.

        The name is '' when no section contains the entry point; when
        several sections contain it, the last one wins.
        """
        name = ''
        ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
        for sec in pe.sections:
            start = sec.VirtualAddress
            if start <= ep < start + sec.Misc_VirtualSize:
                name = _to_str(sec.Name).replace('\x00', '')
        return (ep, name)

    def check_verinfo(self, pe):
        """Return the version-info entries of *pe* as ``key: value`` lines."""
        ret = []
        if hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe, 'FileInfo'):
            for entry in pe.FileInfo:
                if hasattr(entry, 'StringTable'):
                    for st_entry in entry.StringTable:
                        for key, value in st_entry.entries.items():
                            ret.append(convert_to_printable(key) + ': ' +
                                       convert_to_printable(value))
                elif hasattr(entry, 'Var'):
                    for var_entry in entry.Var:
                        if hasattr(var_entry, 'entry'):
                            # list(...) because dict views cannot be indexed
                            # on Python 3.
                            key, value = list(var_entry.entry.items())[0]
                            ret.append(convert_to_printable(key) + ': ' +
                                       _to_str(value))
        return '\n'.join(ret)

    def check_tls(self, pe):
        """Return the list of TLS callback addresses declared by *pe*.

        The callback array is read entry by entry until a zero entry.  The
        walk also stops, keeping what was read so far, when the next entry
        cannot be read, so a malformed table cannot abort the whole scan.
        """
        callbacks = []
        tls = getattr(pe, 'DIRECTORY_ENTRY_TLS', None)
        if not (tls and tls.struct and tls.struct.AddressOfCallBacks):
            return callbacks

        callback_array_rva = (tls.struct.AddressOfCallBacks -
                              pe.OPTIONAL_HEADER.ImageBase)
        # NOTE(review): entries are read as 4-byte values; confirm whether
        # 64-bit (PE32+) images need 8-byte entries here.
        idx = 0
        while True:
            try:
                func = pe.get_dword_from_data(
                    pe.get_data(callback_array_rva + 4 * idx, 4), 0)
            except Exception:
                # Unreadable entry (corrupt or hostile header): stop here.
                break
            if not func:
                # 0 terminates the array; a falsy non-zero result (e.g. None)
                # means no value could be extracted.
                break
            callbacks.append(func)
            idx += 1
        return callbacks

    def check_rsrc(self, pe):
        """Return ``{index: (type_name, rva, size, filetype)}`` per resource.

        *type_name* is the resource's own name, else the pefile name for its
        numeric id, else the numeric id itself.  *filetype* is '' when no
        magic module is available.
        """
        ret = {}
        if not hasattr(pe, 'DIRECTORY_ENTRY_RESOURCE'):
            return ret

        i = 0
        for resource_type in pe.DIRECTORY_ENTRY_RESOURCE.entries:
            if resource_type.name is not None:
                name = "%s" % resource_type.name
            else:
                # Test the lookup result *before* formatting it: formatting
                # None yields the string "None", which hid unknown ids.
                known = pefile.RESOURCE_TYPE.get(resource_type.struct.Id)
                if known is None:
                    name = "%d" % resource_type.struct.Id
                else:
                    name = "%s" % known

            if not hasattr(resource_type, 'directory'):
                continue
            for resource_id in resource_type.directory.entries:
                if not hasattr(resource_id, 'directory'):
                    continue
                for resource_lang in resource_id.directory.entries:
                    offset = resource_lang.data.struct.OffsetToData
                    size = resource_lang.data.struct.Size
                    data = pe.get_data(offset, size)
                    filetype = self._filetype(data) or ''
                    ret[i] = (name, offset, size, filetype)
                    i += 1
        return ret

    def check_imports(self, pe):
        """Return imported names that start with one of ``alerts``.

        Each import is listed at most once, even if it matches more than one
        alert prefix.
        """
        ret = []
        if not hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
            return ret
        prefixes = tuple(alerts)
        for lib in pe.DIRECTORY_ENTRY_IMPORT:
            for imp in lib.imports:
                name = _to_str(imp.name)  # None for imports without a name
                if name and name.startswith(prefixes):
                    ret.append(name)
        return ret

    def get_timestamp(self, pe):
        """Return the header timestamp of *pe* as hex plus a UTC date.

        ``[SUSPICIOUS]`` is appended when the year is before 2000 or after
        the current year, or when the value cannot be converted to a date.
        """
        val = pe.FILE_HEADER.TimeDateStamp
        ts = '0x%-8X' % (val)
        try:
            stamp = time.gmtime(val)
            ts += ' [%s UTC]' % time.asctime(stamp)
            if stamp[0] < 2000 or stamp[0] > time.gmtime()[0]:
                ts += " [SUSPICIOUS]"
        except (ValueError, OverflowError, OSError):
            ts += ' [SUSPICIOUS]'
        return ts

    def check_packers(self, pe):
        """Return PEiD signature matches at the entry point ([] if none)."""
        packers = []
        if self.sigs:
            matches = self.sigs.match(pe, ep_only=True)
            if matches is not None:
                packers.extend(matches)
        return packers

    def check_yara(self, data):
        """Return YARA hits on *data* as text ('' if none or no rules).

        Each hit is a ``YARA: <rule>`` line followed by one line per matched
        string; strings with non-printable characters are shown hex-encoded.
        """
        ret = []
        if self.rules:
            for hit in self.rules.match(data=data) or []:
                ret.append("YARA: %s" % hit.rule)
                for (key, stringname, val) in hit.strings:
                    text = _to_str(val)
                    if all(char in string.printable for char in text):
                        ret.append(" %s => %s" % (hex(key), text))
                    else:
                        # hexlify needs a byte string on Python 3
                        if isinstance(val, bytes):
                            raw = val
                        else:
                            raw = val.encode('latin-1')
                        ret.append(" %s => %s" %
                                   (hex(key), _to_str(binascii.hexlify(raw))))
        return '\n'.join(ret)

    def check_clam(self, file):
        """Return the first line of clamscan's output for *file*, or ''.

        '' is returned when ``clamscan_path`` does not exist, clamscan
        cannot be started, or it exits with an error status.
        """
        if not os.path.isfile(clamscan_path):
            return ''
        try:
            # Argument list, no shell: paths containing spaces or shell
            # metacharacters are passed through verbatim.
            proc = subprocess.Popen([clamscan_path, file],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            output = proc.communicate()[0]
        except OSError:
            return ''
        # clamscan exits 0 when the file is clean and 1 when a virus was
        # found; both produce a result line.  Other codes are errors.
        if proc.returncode in (0, 1):
            return "Clamav: %s" % _to_str(output).split("\n")[0]
        return ''

    def header(self, msg):
        """Return *msg* formatted as an underlined section heading."""
        return "\n" + msg + "\n" + ("=" * 60)

    def _section_lines(self, pe):
        """Return one formatted line per section, flagging odd ones."""
        lines = []
        for sec in pe.sections:
            entropy = sec.get_entropy()
            line = "%-10s %-12s %-12s %-12s %-12f" % (
                ''.join([c for c in _to_str(sec.Name)
                         if c in string.printable]),
                hex(sec.VirtualAddress),
                hex(sec.Misc_VirtualSize),
                hex(sec.SizeOfRawData),
                entropy)
            if sec.SizeOfRawData == 0 or 0 < entropy < 1 or entropy > 7:
                line += "[SUSPICIOUS]"
            lines.append(line)
        return lines

    def _report(self, path, data, pe, count):
        """Return the report lines for one successfully parsed file."""
        out = []
        out.append(("#" * 60) + "\nRecord %d\n" % count + ("#" * 60))
        out.append(self.header("Meta-data"))
        out.append("File: %s" % path)
        out.append("Size: %d bytes" % len(data))

        if 'magic' in sys.modules:
            filetype = self._filetype(data)
            if filetype is not None:
                out.append("Type: %s" % filetype)

        out.append("MD5: %s" % hashlib.md5(data).hexdigest())
        out.append("SHA1: %s" % hashlib.sha1(data).hexdigest())

        if 'ssdeep' in sys.modules:
            out.append("ssdeep: %s" % ssdeep().hash_file(path))

        out.append("Date: %s" % self.get_timestamp(pe))

        (ep, name) = self.check_ep_section(pe)
        ep_line = "EP: %s (%s)" % (hex(ep + pe.OPTIONAL_HEADER.ImageBase),
                                   name)
        if name not in good_ep_sections:
            ep_line += " [SUSPICIOUS]"
        out.append(ep_line)

        packers = self.check_packers(pe)
        if packers:
            out.append("Packers: %s" % ','.join(packers))

        # Both values are strings; the "no yara" default must be '' (not a
        # list) because it is joined into the report below.
        yarahits = self.check_yara(data) if 'yara' in sys.modules else ''
        clamhits = self.check_clam(path)
        if yarahits or clamhits:
            out.append(self.header("Signature scans"))
            out.append(yarahits)
            out.append(clamhits)

        callbacks = self.check_tls(pe)
        if callbacks:
            out.append(self.header("TLS callbacks"))
            for cb in callbacks:
                out.append(" 0x%x" % cb)

        resources = self.check_rsrc(pe)
        if resources:
            out.append(self.header("Resource entries"))
            out.append("%-18s %-12s %-12s Type" % ("Name", "RVA", "Size"))
            out.append("-" * 60)
            for index in sorted(resources):
                (rname, rva, size, kind) = resources[index]
                out.append("%-18s %-12s %-12s %s" %
                           (rname, hex(rva), hex(size), kind))

        imports = self.check_imports(pe)
        if imports:
            out.append(self.header("Suspicious IAT alerts"))
            out.extend(imports)

        out.append(self.header("Sections"))
        out.append("%-10s %-12s %-12s %-12s %-12s" %
                   ("Name", "VirtAddr", "VirtSize", "RawSize", "Entropy"))
        out.append("-" * 60)
        out.extend(self._section_lines(pe))

        verinfo = self.check_verinfo(pe)
        if verinfo:
            out.append(self.header("Version info"))
            out.append(verinfo)

        out.append("")
        return out

    def collect(self):
        """Scan every file in ``self.files`` and print one report per file.

        Paths that cannot be opened are skipped silently.  Empty files and
        files pefile cannot parse get a one-line notice instead of a report
        and do not consume a record number.
        """
        count = 0
        for path in self.files:
            try:
                with open(path, "rb") as handle:
                    data = handle.read()
            except (IOError, OSError):
                # Deliberately silent: directory listings routinely contain
                # sub-directories and other unreadable entries.
                continue

            if not data:
                # Print before continuing, otherwise the notice is lost.
                print("Cannot read %s (maybe empty?)\n" % path)
                continue

            try:
                pe = pefile.PE(data=data, fast_load=True)
                pe.parse_data_directories(directories=[
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
            except Exception:
                # Exception rather than a bare except, so Ctrl-C still works.
                print("Cannot parse %s (maybe not PE?)\n" % path)
                continue

            print('\n'.join(self._report(path, data, pe, count)))
            count += 1


def main():
    """Scan the file or directory named by the single command-line argument."""
    if len(sys.argv) != 2:
        print("Usage: %s <file|directory>\n" % (sys.argv[0]))
        sys.exit()

    target = sys.argv[1]
    files = []
    if os.path.isdir(target):
        for entry in os.listdir(target):
            files.append(os.path.join(target, entry))
    elif os.path.isfile(target):
        files.append(target)
    else:
        print("You must supply a file or directory!")
        sys.exit()

    # You should fill these in with a path to your YARA rules and PEiD database
    pescan = PEScanner(files, '', '')
    pescan.collect()


if __name__ == "__main__":
    main()