#!/usr/bin/python
# Copyright (C) 2010 Michael Ligh
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# [NOTES] -----------------------------------------------------------
# 1) Tested on Linux (Ubuntu), Windows XP/7, and Mac OS X
#--------------------------------------------------------------------
import binascii
import commands
import hashlib
import os
import string
import subprocess
import sys
import time

try:
    import pefile
    import peutils
except ImportError:
    print 'pefile not installed, see http://code.google.com/p/pefile/'
    sys.exit()

try:
    import magic
except ImportError:
    print 'python-magic is not installed, file types will not be available'

try:
    from ssdeep import ssdeep
except ImportError:
    print 'pyssdeep is not installed, see http://code.google.com/p/pyssdeep/'

try:
    import yara
except ImportError:
    print 'yara-python is not installed, see http://code.google.com/p/yara-project/'
# Import-name prefixes that PEScanner.check_imports reports as suspicious.
# Matching is by prefix, so 'CreateProcess' also covers 'CreateProcessA'.
alerts = [
    'OpenProcess',
    'VirtualAllocEx',
    'WriteProcessMemory',
    'CreateRemoteThread',
    'ReadProcessMemory',
    'CreateProcess',
    'WinExec',
    'ShellExecute',
    'HttpSendRequest',
    'InternetReadFile',
    'InternetConnect',
    'CreateService',
    'StartService',
]

# Section names in which an entry point is considered legitimate; an entry
# point in any other section is tagged [SUSPICIOUS] in the report.
good_ep_sections = [
    '.text',
    '.code',
    'INIT',
    'PAGE',
]

# Path to the clamscan binary (optional). The ClamAV scan is skipped when
# no file exists at this path.
clamscan_path = '/usr/bin/clamscanx'
def convert_char(char):
    """Return char unchanged if it is printable, else a '\\xNN' escape.

    A character counts as printable when it is an ASCII letter, a digit,
    punctuation or whitespace. Anything else is rendered as a literal
    backslash, 'x' and the two-digit lowercase hex of its ordinal.
    """
    printable_groups = (
        string.ascii_letters,
        string.digits,
        string.punctuation,
        string.whitespace,
    )
    if any(char in group for group in printable_groups):
        return char
    return r'\x%02x' % ord(char)
def convert_to_printable(s):
    """Return s with every non-printable character replaced by its escape.

    Each character is passed through convert_char and the results are
    concatenated in order.
    """
    return ''.join(map(convert_char, s))
class PEScanner:
    """Static triage scanner that prints a text report for each PE file.

    The scanner relies on names defined at module level: the ``alerts``,
    ``good_ep_sections`` and ``clamscan_path`` settings, the
    ``convert_to_printable`` helper, and the ``pefile``/``peutils`` modules.
    ``magic``, ``ssdeep`` and ``yara`` are optional; each feature is skipped
    when the corresponding module is not present in ``sys.modules``.
    """

    def __init__(self, files, yara_rules=None, peid_sigs=None):
        """Store the file list and initialise the optional engines.

        files      -- iterable of paths to scan
        yara_rules -- path to a YARA rules file (ignored if empty or if
                      yara is not importable)
        peid_sigs  -- path to a PEiD signature database (ignored if empty)
        """
        self.files = files

        # initialize YARA rules if provided
        if yara_rules and 'yara' in sys.modules:
            self.rules = yara.compile(yara_rules)
        else:
            self.rules = None

        # initialize PEiD signatures if provided
        if peid_sigs:
            self.sigs = peutils.SignatureDatabase(peid_sigs)
        else:
            self.sigs = None

        # initialize python magic (file identification); the handle-based
        # interface is only used on interpreters older than 2.6, newer ones
        # call magic.from_buffer() directly (see _file_type).
        self.ms = None
        if 'magic' in sys.modules and sys.version_info <= (2, 6):
            self.ms = magic.open(magic.MAGIC_NONE)
            self.ms.load()

    def _file_type(self, data):
        """Return magic's description of data, or None if magic is missing."""
        if 'magic' not in sys.modules:
            return None
        if sys.version_info <= (2, 6):
            return self.ms.buffer(data)
        return magic.from_buffer(data)

    def check_ep_section(self, pe):
        """Return (entry point RVA, name of the section that contains it).

        The name is '' when no section's virtual range covers the entry
        point. If several sections cover it, the last one in the section
        table is reported.
        """
        name = ''
        ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
        for sec in pe.sections:
            start = sec.VirtualAddress
            if start <= ep < start + sec.Misc_VirtualSize:
                name = sec.Name.replace('\x00', '')
        return (ep, name)

    def check_verinfo(self, pe):
        """Return the version-info entries as 'key: value' lines.

        Returns '' when the PE carries no version information.
        """
        ret = []
        if not (hasattr(pe, 'VS_VERSIONINFO') and hasattr(pe, 'FileInfo')):
            return ''
        for entry in pe.FileInfo:
            if hasattr(entry, 'StringTable'):
                for st_entry in entry.StringTable:
                    for key, value in st_entry.entries.items():
                        ret.append(convert_to_printable(key) + ': ' +
                                   convert_to_printable(value))
            elif hasattr(entry, 'Var'):
                for var_entry in entry.Var:
                    # Only the first key/value pair of each Var entry is
                    # reported; empty entries are skipped.
                    if hasattr(var_entry, 'entry') and var_entry.entry:
                        key = list(var_entry.entry.keys())[0]
                        value = list(var_entry.entry.values())[0]
                        ret.append(convert_to_printable(key) + ': ' +
                                   convert_to_printable(value))
        return '\n'.join(ret)

    def check_tls(self, pe):
        """Return the list of TLS callback addresses declared by the PE.

        The callback table is read as consecutive 32-bit values starting at
        AddressOfCallBacks (converted to an RVA) until a zero entry.
        NOTE(review): entries are read 4 bytes at a time, so 64-bit images
        (8-byte pointers) are presumably not decoded correctly -- confirm.
        """
        callbacks = []
        tls = getattr(pe, 'DIRECTORY_ENTRY_TLS', None)
        if not (tls and tls.struct and tls.struct.AddressOfCallBacks):
            return callbacks

        callback_array_rva = tls.struct.AddressOfCallBacks - \
            pe.OPTIONAL_HEADER.ImageBase
        idx = 0
        while True:
            try:
                func = pe.get_dword_from_data(
                    pe.get_data(callback_array_rva + 4 * idx, 4), 0)
            except pefile.PEFormatError:
                # table runs off the mapped image: treat as end of table
                break
            # 0 is the terminator; None means fewer than 4 bytes were
            # available (truncated table), which must also end the walk
            # or the loop would never terminate.
            if not func:
                break
            callbacks.append(func)
            idx += 1
        return callbacks

    def check_rsrc(self, pe):
        """Return a dict describing every resource leaf in the PE.

        Keys are consecutive integers starting at 0; values are tuples of
        (type name, OffsetToData, Size, file type). The type name is the
        resource's own name, else the pefile.RESOURCE_TYPE name for its
        numeric ID, else the numeric ID itself. The file type is '' when
        magic is unavailable or returns nothing.
        """
        ret = {}
        if not hasattr(pe, 'DIRECTORY_ENTRY_RESOURCE'):
            return ret
        i = 0
        for resource_type in pe.DIRECTORY_ENTRY_RESOURCE.entries:
            if resource_type.name is not None:
                name = "%s" % resource_type.name
            else:
                # Look the ID up *before* formatting: formatting None would
                # yield the string 'None' and hide the numeric fallback.
                known = pefile.RESOURCE_TYPE.get(resource_type.struct.Id)
                if known is None:
                    name = "%d" % resource_type.struct.Id
                else:
                    name = "%s" % known
            if not hasattr(resource_type, 'directory'):
                continue
            for resource_id in resource_type.directory.entries:
                if not hasattr(resource_id, 'directory'):
                    continue
                for resource_lang in resource_id.directory.entries:
                    leaf = resource_lang.data.struct
                    data = pe.get_data(leaf.OffsetToData, leaf.Size)
                    filetype = self._file_type(data)
                    if filetype is None:
                        filetype = ''
                    ret[i] = (name, leaf.OffsetToData, leaf.Size, filetype)
                    i += 1
        return ret

    def check_imports(self, pe):
        """Return imported names that start with one of the ``alerts``."""
        ret = []
        if not hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
            return ret
        prefixes = tuple(alerts)
        for lib in pe.DIRECTORY_ENTRY_IMPORT:
            for imp in lib.imports:
                # imports by ordinal have no name
                if imp.name and imp.name.startswith(prefixes):
                    ret.append(imp.name)
        return ret

    def get_timestamp(self, pe):
        """Return the header TimeDateStamp as hex plus a UTC date string.

        ' [SUSPICIOUS]' is appended when the year is before 2000 or after
        the current year, or when the value cannot be converted to a date.
        """
        val = pe.FILE_HEADER.TimeDateStamp
        ts = '0x%-8X' % (val)
        try:
            stamp = time.gmtime(val)
            ts += ' [%s UTC]' % time.asctime(stamp)
        except (ValueError, OverflowError, OSError):
            # value is outside the range the platform can represent
            return ts + ' [SUSPICIOUS]'
        that_year = stamp[0]
        this_year = time.gmtime(time.time())[0]
        if that_year < 2000 or that_year > this_year:
            ts += " [SUSPICIOUS]"
        return ts

    def check_packers(self, pe):
        """Return the PEiD signature matches at the entry point.

        Returns [] when no signature database was loaded or nothing matched.
        """
        if not self.sigs:
            return []
        matches = self.sigs.match(pe, ep_only=True)
        if matches is None:
            return []
        return list(matches)

    def check_yara(self, data):
        """Return the YARA hits for data as newline-joined text.

        Each hit contributes a 'YARA: <rule>' line followed by one line per
        matched string (offset => value); values containing non-printable
        characters are shown hex-encoded. Returns '' when no rules are
        loaded or nothing matched.
        """
        ret = []
        if not self.rules:
            return ''
        for hit in self.rules.match(data=data) or []:
            ret.append("YARA: %s" % hit.rule)
            for (key, stringname, val) in hit.strings:
                if all(char in string.printable for char in val):
                    ret.append(" %s => %s" % (hex(key), val))
                else:
                    ret.append(" %s => %s" % (hex(key), binascii.hexlify(val)))
        return '\n'.join(ret)

    def check_clam(self, file):
        """Scan file with clamscan and return 'Clamav: <first output line>'.

        Returns '' when clamscan is not installed at ``clamscan_path``,
        cannot be started, or does not report a detection.
        """
        if not os.path.isfile(clamscan_path):
            return ''
        try:
            # Argument list instead of a shell string: sample file names are
            # untrusted and may contain spaces or shell metacharacters.
            proc = subprocess.Popen([clamscan_path, file],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            output = proc.communicate()[0]
        except OSError:
            return ''
        # clamscan exit codes: 0 = clean, 1 = virus found, 2 = error.
        # Only a detection belongs in the "Signature scans" section.
        if proc.returncode == 1:
            return "Clamav: %s" % output.split("\n")[0]
        return ''

    def header(self, msg):
        """Return msg formatted as a section heading underlined with '='."""
        return "\n" + msg + "\n" + ("=" * 60)

    def collect(self):
        """Analyse every file in self.files and print one report per file.

        Files that cannot be opened are skipped silently (best effort, e.g.
        sub-directories in a directory listing). Empty files and files that
        pefile cannot parse get a one-line notice instead of a report.
        """
        count = 0
        for path in self.files:
            out = []
            try:
                handle = open(path, "rb")
                try:
                    data = handle.read()
                finally:
                    handle.close()
            except (IOError, OSError):
                continue

            if not data:
                out.append("Cannot read %s (maybe empty?)" % path)
                out.append("")
                print('\n'.join(out))
                continue

            try:
                pe = pefile.PE(data=data, fast_load=True)
                pe.parse_data_directories(directories=[
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
            except Exception:
                # Deliberately broad: arbitrary (possibly hostile) input can
                # make the parser fail in ways other than PEFormatError.
                out.append("Cannot parse %s (maybe not PE?)" % path)
                out.append("")
                print('\n'.join(out))
                continue

            out.append(("#" * 60) + "\nRecord %d\n" % count + ("#" * 60))
            out.append(self.header("Meta-data"))
            out.append("File: %s" % path)
            out.append("Size: %d bytes" % len(data))
            if 'magic' in sys.modules:
                out.append("Type: %s" % self._file_type(data))
            out.append("MD5: %s" % hashlib.md5(data).hexdigest())
            out.append("SHA1: %s" % hashlib.sha1(data).hexdigest())
            if 'ssdeep' in sys.modules:
                out.append("ssdeep: %s" % ssdeep().hash_file(path))
            out.append("Date: %s" % self.get_timestamp(pe))

            (ep, name) = self.check_ep_section(pe)
            line = "EP: %s (%s)" % (hex(ep + pe.OPTIONAL_HEADER.ImageBase), name)
            if name not in good_ep_sections:
                line += " [SUSPICIOUS]"
            out.append(line)

            packers = self.check_packers(pe)
            if packers:
                out.append("Packers: %s" % ','.join(packers))

            # check_yara returns '' when no rules are loaded, so both hit
            # values are always strings and safe to join below.
            yarahits = self.check_yara(data)
            clamhits = self.check_clam(path)
            if yarahits or clamhits:
                out.append(self.header("Signature scans"))
                out.append(yarahits)
                out.append(clamhits)

            callbacks = self.check_tls(pe)
            if callbacks:
                out.append(self.header("TLS callbacks"))
                for cb in callbacks:
                    out.append(" 0x%x" % cb)

            resources = self.check_rsrc(pe)
            if resources:
                out.append(self.header("Resource entries"))
                out.append("%-18s %-12s %-12s Type" % ("Name", "RVA", "Size"))
                out.append("-" * 60)
                for key in sorted(resources):
                    (rname, rva, size, rtype) = resources[key]
                    out.append("%-18s %-12s %-12s %s" % (rname, hex(rva), hex(size), rtype))

            imports = self.check_imports(pe)
            if imports:
                out.append(self.header("Suspicious IAT alerts"))
                out.extend(imports)

            out.append(self.header("Sections"))
            out.append("%-10s %-12s %-12s %-12s %-12s" % ("Name", "VirtAddr", "VirtSize", "RawSize", "Entropy"))
            out.append("-" * 60)
            for sec in pe.sections:
                entropy = sec.get_entropy()
                line = "%-10s %-12s %-12s %-12s %-12f" % (
                    ''.join([c for c in sec.Name if c in string.printable]),
                    hex(sec.VirtualAddress),
                    hex(sec.Misc_VirtualSize),
                    hex(sec.SizeOfRawData),
                    entropy)
                # empty on disk, almost no entropy, or very high entropy
                if sec.SizeOfRawData == 0 or 0 < entropy < 1 or entropy > 7:
                    line += "[SUSPICIOUS]"
                out.append(line)

            verinfo = self.check_verinfo(pe)
            if verinfo:
                out.append(self.header("Version info"))
                out.append(verinfo)

            out.append("")
            print('\n'.join(out))
            count += 1
# Command-line entry point: scan a single file, or every entry directly
# inside a directory (no recursion), and print the reports to stdout.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: %s <file|directory>\n" % (sys.argv[0]))
        sys.exit(1)

    target = sys.argv[1]
    files = []
    if os.path.isdir(target):
        files = [os.path.join(target, entry) for entry in os.listdir(target)]
    elif os.path.isfile(target):
        files.append(target)
    else:
        print("You must supply a file or directory!")
        sys.exit(1)

    # You should fill these in with a path to your YARA rules and PEiD database
    pescan = PEScanner(files, '', '')
    pescan.collect()