You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
149 lines
5.5 KiB
149 lines
5.5 KiB
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
import pkg_resources
|
|
|
|
from haystack.abc import interfaces
|
|
|
|
log = logging.getLogger('heapwalker')
|
|
|
|
SUPPORTED_ALLOCATORS = {}
|
|
|
|
|
|
def _discover_supported_allocators():
|
|
# TODO use it in memory dump discovery. Maybe add platform selectors to Finder interface
|
|
for entry_point in pkg_resources.iter_entry_points("haystack.heap_finder"):
|
|
SUPPORTED_ALLOCATORS[entry_point.name] = entry_point.resolve()
|
|
|
|
|
|
class HeapWalker(interfaces.IHeapWalker):
|
|
|
|
def __init__(self, memory_handler, target_platform, heap_module, heap_mapping, heap_module_constraints, address):
|
|
if not isinstance(memory_handler, interfaces.IMemoryHandler):
|
|
raise TypeError("Feed me a IMemoryHandler")
|
|
if not isinstance(target_platform, interfaces.ITargetPlatform):
|
|
raise TypeError("Feed me a ITargetPlatform")
|
|
self._memory_handler = memory_handler
|
|
self._target = target_platform
|
|
self._heap_module = heap_module
|
|
self._heap_mapping = heap_mapping
|
|
self._heap_module_constraints = heap_module_constraints
|
|
self._address = address
|
|
self._init_heap()
|
|
|
|
def _init_heap(self):
|
|
""" Initialize anything"""
|
|
raise NotImplementedError('Please implement all methods')
|
|
|
|
def get_target_platform(self):
|
|
"""Returns the ITargetPlatform for that process memory."""
|
|
return self._target
|
|
|
|
def get_heap_address(self):
|
|
return self._address
|
|
|
|
def get_heap(self):
|
|
""" return the ctypes heap struct mapped at address on the mapping"""
|
|
raise NotImplementedError('Please implement all methods')
|
|
|
|
def get_heap_mapping(self):
|
|
""" return the mapping containing the root HEAP record"""
|
|
return self._heap_mapping
|
|
|
|
def get_heap_validator(self):
|
|
""" return the validator """
|
|
raise NotImplementedError('Please implement all methods')
|
|
|
|
def get_user_allocations(self):
|
|
""" returns all User allocations (addr,size) """
|
|
raise NotImplementedError('Please implement all methods')
|
|
|
|
def get_free_chunks(self):
|
|
""" returns all free chunks in the heap (addr,size) """
|
|
raise NotImplementedError('Please implement all methods')
|
|
|
|
def __contains__(self, address):
|
|
""" Does the heap walker or its relevant segments contains this address"""
|
|
raise NotImplementedError('Please implement all methods')
|
|
|
|
|
|
class HeapFinder(interfaces.IHeapFinder):
|
|
|
|
def __init__(self, memory_handler):
|
|
"""
|
|
:param memory_handler: IMemoryHandler
|
|
:return: HeapFinder
|
|
"""
|
|
if not isinstance(memory_handler, interfaces.IMemoryHandler):
|
|
raise TypeError('Feed me a IMemoryHandlerobject')
|
|
self._memory_handler = memory_handler
|
|
self._target = self._memory_handler.get_target_platform()
|
|
# optimisations
|
|
self._heap_walkers = None
|
|
self._heap_walkers_dict = None
|
|
|
|
def get_heap_walker(self, mapping):
|
|
if not isinstance(mapping, interfaces.IMemoryMapping):
|
|
raise TypeError('Feed me a IMemoryMapping object')
|
|
if not self._heap_walkers_dict:
|
|
self.list_heap_walkers()
|
|
# BUG FIXME reverse
|
|
if mapping.start not in self._heap_walkers_dict:
|
|
raise ValueError('mapping not used as a heap')
|
|
walker = self._heap_walkers_dict[mapping.start]
|
|
return walker
|
|
|
|
def list_heap_walkers(self):
|
|
"""return the list of heaps that load as heaps"""
|
|
if not self._heap_walkers:
|
|
self._heap_walkers = []
|
|
for mapping in self._memory_handler:
|
|
walker = self._find_heap(mapping)
|
|
if walker:
|
|
self._heap_walkers.append(walker)
|
|
# sort the list
|
|
self._heap_walkers.sort(key=lambda walker: walker.get_heap_address())
|
|
# FIXME, so do we have heaps in the middle of a mapping or not ?
|
|
# FIXME: what about segments
|
|
self._heap_walkers_dict = dict([(w.get_heap_address(), w) for w in self._heap_walkers])
|
|
return self._heap_walkers
|
|
|
|
def search_heap_direct(self, start_address_mapping):
|
|
"""
|
|
return a ctypes heap struct mapped at address on the mapping
|
|
Will use the memory handler
|
|
"""
|
|
raise NotImplementedError(self)
|
|
|
|
def _find_heap(self, mapping):
|
|
"""
|
|
return a ctypes heap struct mapped at address on the mapping.
|
|
Funny enough, a X64 process could have 32 bits and 64 bits heaps.
|
|
"""
|
|
raise NotImplementedError(self)
|
|
|
|
|
|
def make_heap_finder(memory_handler):
|
|
"""
|
|
Build a heap_finder for this memory_handler
|
|
|
|
:param memory_handler: IMemoryHandler
|
|
:return: a heap walker for that platform
|
|
:rtype: IHeapWalker
|
|
"""
|
|
if not isinstance(memory_handler, interfaces.IMemoryHandler):
|
|
raise TypeError('memory_handler should be an IMemoryHandler')
|
|
target_platform = memory_handler.get_target_platform()
|
|
os_name = target_platform.get_os_name()
|
|
if os_name == 'linux':
|
|
from haystack.allocators.libc import libcheapwalker
|
|
return libcheapwalker.LibcHeapFinder(memory_handler)
|
|
elif os_name == 'winxp':
|
|
from haystack.allocators.win32 import winxpheapwalker
|
|
return winxpheapwalker.WinXPHeapFinder(memory_handler)
|
|
elif os_name == 'win7':
|
|
from haystack.allocators.win32 import win7heapwalker
|
|
return win7heapwalker.Win7HeapFinder(memory_handler)
|
|
else:
|
|
raise NotImplementedError('Heap Walker not found for os %s', os_name)
|