You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
psrGROUP/env/Lib/site-packages/haystack/mappings/base.py

472 lines
16 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Provides basic memory _memory_handler helpers.
In short, the memory of a process is segmented into several memory
zones called memory mappings,
for example: the heap, the stack, mmap(2)-s of files, mmap(2)-ing a
dynamic library, etc.
These memory mappings represent the memory space of a process. Each
mapping has a start and an end address, which give the boundaries for the
range of valid pointer values.
There are several ways to wrap a memory mapping, depending on the precise
scenario you are in. You could need a wrapper for live process debugging, a
wrapper for a mapping that has been dumped to a file, a wrapper for a mapping
that has been remapped to memory, etc.
Classes:
- MemoryMapping : memory mapping metadata
- ProcessMemoryMapping: memory space from a live process with the possibility to mmap the memspace at any moment.
- LocalMemoryMapping .fromAddress: memorymapping that lives in local space in a ctypes buffer.
- MemoryDumpMemoryMapping .fromFile : memory space from a raw file, with lazy loading capabilities.
- FileBackedMemoryMapping .fromFile : memory space based on a file, with direct read no cache from file.
The first 150 lines of this code are mostly inspired by python-ptrace by Haypo / Victor Stinner.
It is intended to be retrofittable with ptrace's memory mappings.
"""
from past.builtins import long
import logging
# haystack
from haystack import utils
from haystack import model
from haystack.abc import interfaces
from haystack.allocators import heapwalker
# Module metadata (authorship, licensing, release status).
__author__ = "Loic Jaquemet"
__copyright__ = "Copyright (C) 2012 Loic Jaquemet"
__email__ = "loic.jaquemet+python@gmail.com"
__license__ = "GPL"
__maintainer__ = "Loic Jaquemet"
__status__ = "Production"
# NOTE(review): the python-ptrace author is presumably "Victor Stinner" -
# confirm the spelling before changing this runtime value.
__credits__ = ["Victor Skinner"]
# Module-level logger used by the classes defined in this module.
log = logging.getLogger('memorybase')
class AMemoryMapping(interfaces.IMemoryMapping):
    """
    Just the metadata of one memory mapping.

    Attributes:
     - start (int): first byte address
     - end (int): last byte address + 1
     - permissions (str)
     - offset (int): for file, offset in bytes from the file start
     - major_device / minor_device (int): major / minor device number
     - inode (int)
     - pathname (str)

    Operations:
     - "address in mapping" checks the address is in the mapping.
     - "search(somestring)" yields the addresses of "somestring" in the
       mapping
     - "read_word()", "read_bytes()", "read_struct()", "read_array()":
       raise NotImplementedError here, to be implemented by subclasses
     - "read_cstring()": read a C string, built on top of read_bytes()
     - "rebase()": move the mapping to a new start address
     - "len(mapping)": size of the mapping in bytes
     - "str(mapping)" create one string describing the mapping
     - "repr(mapping)" create a string representation of the mapping,
       useful in list contexts
    """

    def __init__(self, start, end, permissions, offset,
                 major_device, minor_device, inode, pathname):
        """Store the mapping metadata.

        :param start: first byte address
        :param end: last byte address + 1
        :param permissions: permission string
        :param offset: offset in bytes from the file start
        :param major_device: major device number
        :param minor_device: minor device number
        :param inode: inode number
        :param pathname: path name; stored as str(pathname), so None
            becomes the string 'None'
        """
        self.start = start
        self.end = end
        self.permissions = permissions
        self.offset = offset
        self.major_device = major_device
        self.minor_device = minor_device
        self.inode = inode
        self.pathname = str(pathname)  # fix None
        self._is_heap = False
        self._is_heap_addr = None
        # both are set later by set_ctypes()
        self._ctypes = None
        self._utils = None

    def set_ctypes(self, _ctypes):
        """Attach a ctypes module and build the matching utils.Utils."""
        self._ctypes = _ctypes
        self._utils = utils.Utils(_ctypes)

    def __contains__(self, address):
        """Return True if start <= address < end."""
        return self.start <= address < self.end

    def __str__(self):
        """Return a one-line description of the mapping."""
        start = '0x%0.8x' % self.start
        end = '0x%0.8x' % self.end
        size = 'size:0x%0.8x' % (self.end - self.start)
        offset = 'offset:0x%0.8x' % self.offset
        device = '%0.2x:%0.2x' % (self.major_device, self.minor_device)
        inode = 'inode:%0.7d' % self.inode
        text = ' '.join([start, end, self.permissions, size, offset, device,
                         inode, str(self.pathname)])
        return text

    __repr__ = __str__

    def __len__(self):
        """Return the size of the mapping in bytes (end - start)."""
        return int(self.end - self.start)

    def __gt__(self, o):
        """Order mappings by their start address."""
        return self.start > o.start

    def search(self, bytestr):
        """Yield the address of each occurrence of bytestr in the mapping.

        The mapping is read through read_bytes() in chunks of at most
        64 KiB (or len(bytestr) if that is larger). After a match the scan
        resumes right after the matched bytes, so overlapping occurrences
        are not reported.

        :param bytestr: the byte pattern to look for
        :return: a generator of addresses (self.start based)
        """
        bytestr_len = len(bytestr)
        if bytestr_len == 0:
            # An empty pattern matches at offset 0 with a skip of 0, which
            # would never advance: there is nothing meaningful to yield.
            return
        buf_len = max(64 * 1024, bytestr_len)
        remaining = self.end - self.start
        covered = self.start
        while remaining >= bytestr_len:
            requested = min(remaining, buf_len)
            data = self.read_bytes(covered, requested)
            # Works for both bytes and str; comparing to "" never matches
            # an empty bytes object on Python 3.
            if not data:
                break
            offset = data.find(bytestr)
            if offset == -1:
                # Keep the last len-1 bytes in the next window so a match
                # straddling two chunks is not missed.
                skip = requested - bytestr_len + 1
            else:
                yield (covered + offset)
                skip = offset + bytestr_len
            covered += skip
            remaining -= skip
        return

    def read_cstring(self, address, max_size, chunk_length=256):
        """Read characters up to max_size until a \\x00 byte is found.

        Data is fetched with read_bytes() in chunks of chunk_length. The
        result has the same type as the data returned by read_bytes()
        (bytes or str).

        :param address: address of the first character
        :param max_size: maximum number of characters to return
        :param chunk_length: number of bytes requested per read_bytes() call
        :return: (string, truncated). truncated is True whenever the read
            stopped because the chunk being read reached max_size, even if
            a terminator was found in that same chunk.
        """
        chunks = []
        size = 0
        truncated = False
        while True:
            data = self.read_bytes(address, chunk_length)
            # Use a terminator of the same type as the data: a str literal
            # cannot be searched in bytes on Python 3.
            if isinstance(data, (bytes, bytearray)):
                terminator = b'\0'
            else:
                terminator = '\0'
            # empty value of the right type, used to join the chunks
            joiner = data[:0]
            nul_index = data.find(terminator)
            done = nul_index != -1
            if done:
                data = data[:nul_index]
            if max_size <= size + chunk_length:
                data = data[:(max_size - size)]
                chunks.append(data)
                truncated = True
                break
            chunks.append(data)
            if done:
                break
            size += chunk_length
            address += chunk_length
        return joiner.join(chunks), truncated

    def _vtop(self, vaddr):
        """Translate an address into an offset from the mapping start.

        :raises ValueError: if the offset is negative or greater than the
            mapping size.
        """
        ret = vaddr - self.start
        # NOTE(review): an offset equal to len(self) (i.e. vaddr == end) is
        # accepted here although "end" is not inside the mapping - confirm
        # this is intended.
        if ret < 0 or ret > len(self):
            raise ValueError(
                '%x/%x is not a valid vaddr for me' %
                (vaddr, ret))
        return ret

    def _ptov(self, paddr):
        """Translate an offset back to an address.

        NOTE(review): _vtop(self.start) is always 0, so this currently
        returns paddr unchanged; the inverse of _vtop would presumably be
        paddr + self.start - confirm against callers before changing.
        """
        pstart = self._vtop(self.start)
        vaddr = paddr - pstart
        return vaddr

    # ---- to implement if needed
    def read_word(self, address):
        """Read a memory word. Not implemented in this base class."""
        raise NotImplementedError(self)

    def read_bytes(self, address, size):
        """Read size bytes at address. Not implemented in this base class."""
        raise NotImplementedError(self)

    def read_struct(self, address, struct):
        """Read a structure at address. Not implemented in this base class."""
        raise NotImplementedError(self)

    def read_array(self, address, basetype, count):
        """Read an array at address. Not implemented in this base class."""
        raise NotImplementedError(self)

    def rebase(self, new_start_address):
        """Move the mapping to new_start_address, keeping its size."""
        log.debug("rebasing 0x%0.8x -> 0x%0.8x", self.start, new_start_address)
        # compute the new end before start is overwritten: len() uses both
        end = new_start_address + len(self)
        self.start = new_start_address
        self.end = end
class MemoryHandler(interfaces.IMemoryHandler, interfaces.IMemoryCache):
    """
    Handler for the concept of process memory.

    Holds the sorted list of memory mappings of a process together with its
    ITargetPlatform, and keeps a book of references to objects already
    loaded from that memory (keyed by type and address).
    """

    def __init__(self, mapping_list, _target, name):
        """Set the list of IMemoryMapping and the ITargetPlatform

        :param mapping_list: list of IMemoryMapping
        :param _target: the ITargetPlatform
        :param name: the name returned by get_name()
        :raises TypeError: if mapping_list is not a list or _target is not
            an ITargetPlatform
        """
        if not isinstance(mapping_list, list):
            raise TypeError('Please feed me a list of IMemoryMapping')
        if not isinstance(_target, interfaces.ITargetPlatform):
            raise TypeError('Please feed me a ITargetPlatform')
        self._mappings = sorted(mapping_list)
        self._target = _target
        for m in mapping_list:
            m.set_ctypes(self._target.get_target_ctypes())
        self._utils = self._target.get_target_ctypes_utils()
        self.__name = name
        # book register to keep references to ctypes memory buffers
        self.__book = _book()
        self.__user_model = model.Model(self._target.get_target_ctypes())
        self.__internal_model = model.Model(self._target.get_target_ctypes())
        # FIXME reduce open files.
        self.__required_maps = []
        # finish initialization
        self._heap_finder = None
        self.__optim_get_mapping_for_address()
        self.__context = None

    def get_name(self):
        """Returns the name of the process memory dump we are analysing"""
        return self.__name

    def get_target_platform(self):
        """Returns the ITargetPlatform for that process memory."""
        return self._target

    def get_heap_finder(self):
        """Returns the IHeapFinder for that process memory.

        The finder is created on first use and then reused.
        """
        if self._heap_finder is None:
            self._heap_finder = heapwalker.make_heap_finder(self)
        return self._heap_finder

    def get_model(self):
        """Returns the Model cache."""
        return self.__user_model

    # FIXME incorrect API
    def _get_mapping(self, pathname):
        """Return the list of mappings whose pathname equals pathname.

        :raises IndexError: if no mapping matches, including when the
            handler holds no mapping at all.
        """
        mmap = [m for m in self._mappings if m.pathname == pathname]
        if len(mmap) < 1:
            raise IndexError('No mmap of pathname %s' % pathname)
        return mmap

    def get_mappings(self):
        """Return a copy of the list of mappings."""
        return list(self._mappings)

    def reset_mappings(self):
        """
        Replace the book with an empty one and call reset() on every
        mapping.

        :return:
        """
        log.debug('reset_mappings')
        # clean the book
        self.__book = _book()
        # reset the mappings
        for m in self.get_mappings():
            m.reset()

    def __optim_get_mapping_for_address(self):
        """(Re)build the lookup cache used by get_mapping_for_address.

        One entry is stored for every 0x1000 step from m.start up to m.end,
        for each mapping.
        NOTE(review): keys are only found by get_mapping_for_address when
        m.start is a multiple of 0x1000 - presumably always the case for
        real mappings; confirm for rebased ones.
        """
        self.__optim_get_mapping_for_address_cache = dict()
        for m in self.get_mappings():
            for i in range(m.start, m.end, 0x1000):
                self.__optim_get_mapping_for_address_cache[i] = m
        return

    def get_mapping_for_address(self, vaddr):
        """Return the mapping registered for vaddr, or False if none.

        :param vaddr: the address, as an int
        """
        # TODO: optimization. 127s out of 288s = 40%
        assert isinstance(vaddr, long) or isinstance(vaddr, int)
        # round down to the 0x1000 (4 KiB) granule used as cache key
        _boundary_addr = (vaddr >> 12) << 12
        if _boundary_addr in self.__optim_get_mapping_for_address_cache:
            return self.__optim_get_mapping_for_address_cache[_boundary_addr]
        return False

    # reverse helper
    def get_reverse_context(self):
        """Return the reverse ProcessContext, creating it on first use."""
        # imported here rather than at module level
        from haystack.reverse import context
        if self.__context is None:
            # try to cache load
            # otherwise, create an empty one.
            self.__context = context.ProcessContext(self)
        return self.__context

    def is_valid_address(self, obj, structType=None):  # FIXME is valid pointer
        """
        :param obj: the obj to evaluate.
        :param structType: the object's type, so the size could be taken in consideration.
        Returns False if the object address is NULL.
        Returns False if the object address is not in a mapping.
        Returns the mapping in which the object stands otherwise.
        """
        # check for null pointers
        addr = self._utils.get_pointee_address(obj)
        if addr == 0:
            return False
        return self.is_valid_address_value(addr, structType)

    def is_valid_address_value(self, addr, structType=None):
        """
        :param addr: the address to evaluate.
        :param structType: the object's type, so the size could be taken in consideration.
        Returns False if the object address is not in a mapping.
        Returns False if the object overflows the mapping.
        Returns the mapping in which the address stands otherwise.
        """
        my_ctypes = self._target.get_target_ctypes()
        m = self.get_mapping_for_address(addr)
        # lazy logging arguments: the mapping is only formatted when the
        # debug level is enabled
        log.debug('is_valid_address_value = %x %s', addr, m)
        if m:
            if structType is not None:
                s = my_ctypes.sizeof(structType)
                if (addr + s) < m.start or (addr + s) > m.end:
                    return False
            return m
        return False

    def __contains__(self, vaddr):
        """Return True if vaddr is inside one of the mappings."""
        for m in self._mappings:
            if vaddr in m:
                return True
        return False

    def __len__(self):
        """Return the number of mappings."""
        return len(self._mappings)

    def __getitem__(self, i):
        """Return the i-th mapping of the sorted mapping list."""
        return self._mappings[i]

    def __setitem__(self, i, val):
        """Item assignment is not supported."""
        raise NotImplementedError()

    def __iter__(self):
        """Iterate over the mappings."""
        return iter(self._mappings)

    def __str__(self):
        return "<MemoryHandler for %s with %d mappings>" % (self.get_name(), len(self.get_mappings()))

    def reset(self):
        """Clean the book"""
        self.__book.refs = dict()

    def getRefs(self):
        """Lists all references to already loaded structs. Useful for debug"""
        return self.__book.refs.items()

    def printRefs(self):
        """Prints all references to already loaded structs. Useful for debug

        One (type, object, address) tuple is printed per reference.
        """
        entries = [(typ, obj, addr)
                   for ((typ, addr), obj) in self.__book.refs.items()]
        for entry in entries:
            print(entry)

    def printRefsLite(self):
        """Prints all references to already loaded structs. Useful for debug

        One (type, address) tuple is printed per reference.
        """
        entries = [(typ, addr) for (typ, addr) in self.__book.refs]
        for entry in entries:
            print(entry)

    def hasRef(self, typ, origAddr):
        """Check if this type has already been loaded at this address"""
        return (typ, origAddr) in self.__book.refs

    def getRef(self, typ, origAddr):
        """Returns the reference to the type previously loaded at this address

        Returns None if there is no such reference.
        """
        if (typ, origAddr) in self.__book.refs:
            return self.__book.getRef(typ, origAddr)
        return None

    def getRefByAddr(self, addr):
        """Return a list of (type, address, object) for all references
        kept at address addr."""
        ret = []
        for (typ, origAddr), obj in self.__book.refs.items():
            if origAddr == addr:
                ret.append((typ, origAddr, obj))
        return ret

    def keepRef(self, obj, typ=None, origAddr=None):
        """Keeps a reference for an object of a specific type loaded from a specific
        address.

        Sometimes, you have to cast a c_void_p. You can keep ref in Ctypes object,
        they might be transient (if obj == somepointer.contents).

        If a reference already exists for (typ, origAddr), it is kept and
        obj is ignored.
        """
        # TODO, memory leak for different objects of same size, overlapping
        # struct.
        if (typ, origAddr) in self.__book.refs:
            # ADDRESS already in refs
            if typ is not None:
                # the readable address is only needed for this log message
                if origAddr is None:
                    addr_text = 'None'
                else:
                    addr_text = hex(origAddr)
                log.debug('ignore keepRef - references already in cache %s/%s', typ, addr_text)
            return
        # there is no pre-existing typ().from_address(origAddr)
        self.__book.addRef(obj, typ, origAddr)
        return

    def delRef(self, typ, origAddr):
        """Forget about a Ref."""
        if (typ, origAddr) in self.__book.refs:
            self.__book.delRef(typ, origAddr)
        return

    def rebase_mapping(self, user_mapping, new_start_address):
        """
        Rebase a mapping in a new address space.

        The mapping list is re-sorted and the address lookup cache is
        rebuilt afterwards.

        :param user_mapping: a mapping held by this handler
        :param new_start_address: the new start address of the mapping
        :return: the rebased mapping
        :raises ValueError: if user_mapping is not held by this handler
        """
        if user_mapping not in self._mappings:
            raise ValueError("User mapping not found")
        log.debug("rebase_mapping 0x%0.8x -> 0x%0.8x", user_mapping.start, new_start_address)
        # work on the instance stored in the handler
        user_mapping = self._mappings[self._mappings.index(user_mapping)]
        user_mapping.rebase(new_start_address)
        self._mappings.sort()
        # reset the caches too
        self.__optim_get_mapping_for_address()
        return user_mapping
class _book(object):
"""The book registers all registered ctypes modules and keeps
some pointer refs to buffers allocated in memory _memory_handler.
# see also ctypes._pointer_type_cache , _reset_cache()
"""
def __init__(self):
self.refs = dict()
pass
def addRef(self, obj, typ, addr):
self.refs[(typ, addr)] = obj
def getRef(self, typ, addr):
if len(self.refs) > 35000:
log.warning('the book is full, you should haystack.model.reset()')
return self.refs[(typ, addr)]
def delRef(self, typ, addr):
del self.refs[(typ, addr)]