# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# psrGROUP/env/Lib/site-packages/haystack/listmodel.py
#
# 688 lines
# 31 KiB
#
# -*- coding: utf-8 -*-
#
"""
Extension for list grammars.
"""
# The module docstring has to be the first statement of the module to be
# picked up as __doc__; a __future__ import is allowed to follow it.
from __future__ import print_function

# Module metadata.
__author__ = "Loic Jaquemet"
__copyright__ = "Copyright (C) 2012 Loic Jaquemet"
__license__ = "GPL"
__maintainer__ = "Loic Jaquemet"
__email__ = "loic.jaquemet+python@gmail.com"
__status__ = "Beta"
"""
listmodel usage:
C code example:
/* For single pointer lists */
struct SListEntry {
struct SListEntry * next;
}
struct Items {
int hi;
struct SListEntry list_of_items;
}
struct SubItems {
int hi;
struct SListEntry siblings;
}
struct Items2 {
int hi;
struct SListEntry list_of_items;
int ho;
}
/* For double linked lists */
struct DoubleListEntry {
struct DoubleListEntry * next;
struct DoubleListEntry * previous;
}
struct Node {
int hi;
struct DoubleListEntry list_of_nodes;
}
struct Root {
int hi;
struct DoubleListEntry children;
}
struct Child {
float c;
struct DoubleListEntry siblings_child;
}
Registration usage:
In case of a single linked list of siblings:
register_single_linked_list_record_type(struct_SListEntry, 'next')
register_linked_list_field_and_type(struct_Items, 'list_of_items', struct_Items, 'list_of_items')
register_linked_list_field_and_type(struct_Items2, 'list_of_items', struct_Items2, 'list_of_items')
In case of a single linked list of child elements with siblings:
register_single_linked_list_record_type(struct_SListEntry, 'next')
register_linked_list_field_and_type(struct_Items, 'list_of_items', struct_SubItems, 'siblings')
register_linked_list_field_and_type(struct_SubItems, 'siblings', struct_SubItems, 'siblings')
In the case of a double linked list of siblings:
register_double_linked_list_record_type(struct_DoubleListEntry, 'next', 'previous')
register_linked_list_field_and_type(struct_Node, 'list_of_nodes', struct_Node, 'list_of_nodes')
In the case of a double linked list of child elements with siblings:
register_double_linked_list_record_type(struct_DoubleListEntry, 'next', 'previous')
register_linked_list_field_and_type(struct_Root, 'children', struct_Child, 'siblings')
register_linked_list_field_and_type(struct_Child, 'siblings', struct_Child, 'siblings')
In code usage:
use
iterate_list_from_field(record, fieldname, sentinels)
to iterate over the elements of the list.
"""
import ctypes
import logging
from haystack import basicmodel
from haystack import constraints
log = logging.getLogger('listmodel')
class ListModel(basicmodel.CTypesRecordConstraintValidator):
    """
    Helpers to support records with single or double linked list members.

    Three registers drive the behaviour:
     - list management record types (the "entry" structures that only hold
       the next / previous pointers), single or double linked,
     - the fields of ordinary records that are the head, or a member, of
       such a list.

    NOTE: the three registers below are class attributes and are mutated in
    place by the register_* methods, so as written they are shared by every
    instance that does not rebind them.
    """
    # single linked list management structure register
    # {record_type: (forward_fieldname, sentinels)}
    _single_link_list_types = dict()
    # double linked list management structure register
    # {record_type: (forward_fieldname, backward_fieldname, sentinels)}
    _double_link_list_types = dict()
    # register for record/field part of a linked list
    # {record_type: {field_name: (list_entry_type, list_entry_field_name, offset)}}
    _list_fields = dict()

    def is_single_linked_list_type(self, record_type):
        """Return True if record_type is a registered single linked list entry type."""
        return record_type in self._single_link_list_types

    def is_double_linked_list_type(self, record_type):
        """Return True if record_type is a registered double linked list entry type."""
        return record_type in self._double_link_list_types

    def get_single_linked_list_type(self, record_type):
        """
        return the registered single linked list information.

        :param record_type: a registered single linked list entry type
        :return: (forward, sentinels): the forward fieldname and the sentinel values
        :raises KeyError: if record_type was not registered
        """
        return self._single_link_list_types[record_type]

    def get_double_linked_list_type(self, record_type):
        """
        return the registered double linked list information.

        :param record_type: a registered double linked list entry type
        :return: (forward, backward, sentinels): both fieldnames and the sentinel values
        :raises KeyError: if record_type was not registered
        """
        return self._double_link_list_types[record_type]

    def register_single_linked_list_record_type(self, record_type, forward, sentinels=None):
        """
        Declares a structure to be a single linked list management structure.

        register_single_linked_list_record_type(struct_SListEntry, 'next')

        C code example:
            struct SListEntry {
                struct SListEntry * next;
            }

        Impacts:
        Once registered, load_members() of this class does not try to load
        list elements for instances of that type, and is_valid() accepts an
        instance as soon as its forward pointer is a sentinel value.
        Elements of the list can be iterated upon with
        ListModel._iterate_single_linked_list, which validates each address
        before loading the pointee.

        :param record_type: the ctypes.Structure/Union type that holds the list pointer field
        :param forward: the list pointer fieldname
        :param sentinels: set of pointer values that end the list. Defaults to {0} (NULL).
        :return: None
        :raises TypeError: if record_type is not a ctypes record type or the field is not a pointer
        :raises AttributeError, KeyError: if the field does not exist
        """
        if not issubclass(record_type, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record rype')
        # test field existence on the type (AttributeError if missing)
        getattr(record_type, forward)
        # fetch the declared field type (KeyError if missing)
        declared = dict(basicmodel.get_record_type_fields(record_type))
        flink_type = declared[forward]
        if not self._ctypes.is_pointer_type(flink_type):
            raise TypeError('The %s field is not a pointer.' % forward)
        if sentinels is None:
            sentinels = {0}  # Null pointer
        # ok - save that structure information
        self._single_link_list_types[record_type] = (forward, sentinels)

    def register_double_linked_list_record_type(self, record_type, forward, backward, sentinels=None):
        """
        Declares a structure to be a double linked list management structure.

        register_double_linked_list_record_type(struct_Entry, 'next', 'previous')

        C code example:
            struct Entry {
                struct Entry * next;
                struct Entry * previous;
            }

        Most of the time, these structure type are used as fields of a parent struct_A
        record as to either:
         a) maintain a list of struct_B elements/children
         b) chain to a list of struct_A siblings

        The difference between case a) and b) is handled by the following function
            register_linked_list_field_and_type(struct_type, field_name, list_entry_type, list_entry_field_name)

        Impacts:
        Once registered, load_members() of this class does not try to load
        list elements for instances of that type, and is_valid() accepts an
        instance as soon as one of its two pointers is a sentinel value.
        Elements of the list can be iterated upon with
        ListModel._iterate_double_linked_list, which validates each address
        before loading the pointee.

        :param record_type: the ctypes.Structure/Union type that holds double linked list pointers fields
        :param forward: the forward pointer fieldname
        :param backward: the backward pointer fieldname
        :param sentinels: set of pointer values that end the list. Defaults to {0} (NULL).
        :return: None
        :raises TypeError: if record_type is not a ctypes record type or a field is not a pointer
        :raises AttributeError, KeyError: if a field does not exist
        """
        if not issubclass(record_type, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record rype')
        # test field existence on the type (AttributeError if missing)
        getattr(record_type, forward)
        getattr(record_type, backward)
        # fetch the declared field types (KeyError if missing)
        declared = dict(basicmodel.get_record_type_fields(record_type))
        flink_type = declared[forward]
        blink_type = declared[backward]
        if not self._ctypes.is_pointer_type(flink_type):
            raise TypeError('The %s field is not a pointer.' % forward)
        if not self._ctypes.is_pointer_type(blink_type):
            raise TypeError('The %s field is not a pointer.' % backward)
        if sentinels is None:
            sentinels = {0}  # Null pointer
        # ok - save that structure information
        self._double_link_list_types[record_type] = (forward, backward, sentinels)

    def register_linked_list_field_and_type(self, record_type, field_name, list_entry_type, list_entry_field_name):
        """
        Register the member <field_name> of a <record_type> record type,
        to be of the starting point of a list of <list_entry_type> record types.
        The list member are referenced by the address of field <list_entry_field_name> in <list_entry_type>
        The field_name type should have been previously registered with this ListModel.

        C code example:
            struct Entry {
                struct Entry * next;
                struct Entry * previous;
            }
            struct Node {
                int hi;
                struct Entry list;
            }
            struct Child {
                float c;
                struct Entry siblings;
            }

        Python code example:
          In the case of a list of siblings:
            register_double_linked_list_record_type(struct_Entry, 'next', 'previous')
            register_linked_list_field_and_type(struct_Node, 'list', struct_Node, 'list')
          In the case of a list of child elements with siblings:
            register_double_linked_list_record_type(struct_Entry, 'next', 'previous')
            register_linked_list_field_and_type(struct_Node, 'list', struct_Child, 'siblings')
            register_linked_list_field_and_type(struct_Child, 'siblings', struct_Child, 'siblings')

        :param record_type: a ctypes record type, owner of field_name
        :param field_name: the name of the list field in record_type
        :param list_entry_type: the ctypes record type of the elements of the list
        :param list_entry_field_name: the name of the list field in list_entry_type
        :return: None
        :raises TypeError: if record_type or list_entry_type is not a ctypes record type
        """
        if not issubclass(record_type, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record type in record_type')
        if not issubclass(list_entry_type, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record type in list_entry_type')
        # care offset is now positive
        offset = self._utils.offsetof(list_entry_type, list_entry_field_name)
        per_type = self._list_fields.setdefault(record_type, dict())
        per_type[field_name] = (list_entry_type, list_entry_field_name, offset)

    def _get_list_info_for_field_for(self, record_type, fieldname):
        """
        Returns the list info for a field in a record_type.

        :param record_type: a ctypes record type
        :param fieldname: the name of a registered list field of record_type
        :return: (field_name, list_entry_type, list_entry_field_name, offset)
        :raises ValueError: if fieldname is not a registered list field of record_type
        """
        for link_info in self._get_list_fields(record_type):
            if link_info[0] == fieldname:
                return link_info
        raise ValueError('No such registered list field: %s for type: %s' % (fieldname, record_type.__name__))

    def _get_list_fields(self, record_type):
        """
        return the field in record_type that are part of a linked list.
        Fields registered on base classes of record_type are included,
        base classes first.

        :param record_type: a ctypes record type (not an instance)
        :return: [] if record_type has no registered field that is part of a linked list.
            [(field_name, list_entry_type, list_entry_field_name, offset)]
        :raises TypeError: if record_type is an instance or is not a ctypes record type
        """
        if isinstance(record_type, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a type not an instance')
        if not issubclass(record_type, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a record type')
        # FIXME, ctypes.Structure should not be modified
        # cut Structure, _CData and object, then walk from the oldest base
        # down to record_type itself
        lineage = list(record_type.__mro__[:-3])
        lineage.reverse()
        found = []
        for typ in lineage:
            if typ not in self._list_fields:
                continue
            for field_name, entries in self._list_fields[typ].items():
                found.append((field_name, entries[0], entries[1], entries[2]))
        return found

    def iterate_list_from_field(self, record, fieldname, sentinels=None, ignore_head=True):
        """
        Iterate over the items of the list designated by fieldname in record.

        :param record: a ctypes record instance
        :param fieldname: the name of a registered list field of type(record)
        :param sentinels: values that stop the iteration
        :param ignore_head: if False, record itself is yielded first
        :return: an iterator on the records of the list
        :raises ValueError: if fieldname is not a registered list field
        """
        link_info = self._get_list_info_for_field_for(type(record), fieldname)
        return self._iterate_list_from_field_with_link_info(record, link_info, sentinels, ignore_head)

    def iterate_list_from_pointer_field(self, pointer_field, target_fieldname, sentinels=None):
        """
        Iterate over the items of the list whose first element is the pointee
        of pointer_field.
        Do not ignore head, by default.

        :param pointer_field: a pointer instance to the first record of the list
        :param target_fieldname: the registered list field name in the pointee type
        :param sentinels: values that stop the iteration
        :return: [] if the pointer is NULL or a sentinel, otherwise an
            iterator on the records of the list, head included
        :raises TypeError: if pointer_field is not a pointer to a registered record type
        :raises ValueError: if the pointee address is not valid
        """
        if sentinels is None:
            sentinels = set()
        # instead of asking the pointer field to be registered,
        # we will look up directly the pointee type.
        # But then how would we know 'which' list to get ? -> target_fieldname in the pointee type.
        # the pointee type has to be a registered list record type. Not a parent record
        # therefore, the list is obvious.
        field_record_type = type(pointer_field)
        # if the field is a pointer, assume that the root of the list is the pointee
        if not self._ctypes.is_pointer_type(field_record_type):
            raise TypeError('Field should be a pointer')
        pointee_type = self._ctypes.get_pointee_type(field_record_type)
        if not self._ctypes.is_struct_type(pointee_type):
            raise TypeError('Field should be a pointer to a structure')
        is_registered = (self.is_single_linked_list_type(pointee_type)
                         or self.is_double_linked_list_type(pointee_type)
                         or pointee_type in self._list_fields)
        if not is_registered:
            raise TypeError('Pointee type %s is not a registered list record type' % pointee_type)
        head_addr = self._utils.get_pointee_address(pointer_field)
        if head_addr == 0 or head_addr in sentinels:
            return []
        memory_map = self._memory_handler.is_valid_address_value(head_addr, pointee_type)
        if memory_map is False:
            log.error("_iterate_list_pointer_field: the root of this linked list has a bad value: 0x%x", head_addr)
            raise ValueError('ValueError: the root of this linked list has a bad value: 0x%x' % head_addr)
        head = memory_map.read_struct(head_addr, pointee_type)
        # the iteration below reads head._orig_address_; set it here like
        # every other place of this class that reads a record from memory.
        head._orig_address_ = head_addr
        self._memory_handler.keepRef(head, pointee_type, head_addr)
        # you need to use target_fieldname in pointee_type, to know what is the list.
        # if type of target_fieldname is registered as a list type
        link_info = self._get_list_info_for_field_for(pointee_type, target_fieldname)
        return self._iterate_list_from_field_with_link_info(head, link_info, sentinels, ignore_head=False)

    def _iterate_list_from_field_with_link_info(self, record, link_info, sentinels=None, ignore_head=True):
        """
        iterates over all entry of a single or double linked list.
        record itself is only yielded when ignore_head is False.

        This is a generator: argument checks only run on the first next().

        :param record: the ctypes record instance holding the list field
        :param link_info: (field_name, list_entry_type, list_entry_field_name, offset)
        :param sentinels: values that stop the iteration
        :param ignore_head: if True, the address of the list field of record
            is used as a sentinel and record is not yielded
        :return: yields list_entry_type records
        :raises TypeError: if record is not a ctypes record instance
        :raises RuntimeError: if the type of the list field is not a registered list entry type
        """
        if not isinstance(record, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record instance')
        if sentinels is None:
            sentinels = set()
        fieldname, pointee_record_type, _, offset = link_info
        # get the list field
        head = getattr(record, fieldname)
        # and its record_type
        field_record_type = type(head)
        # handle pointer cases
        if self._ctypes.is_pointer_type(field_record_type):
            field_record_type = self._ctypes.get_pointee_type(field_record_type)
        # check that forward and backwards link field name were registered
        if self.is_single_linked_list_type(field_record_type):
            iterator_fn = self._iterate_single_linked_list
            _, gbl_sentinels = self.get_single_linked_list_type(field_record_type)
        elif self.is_double_linked_list_type(field_record_type):
            iterator_fn = self._iterate_double_linked_list
            _, _, gbl_sentinels = self.get_double_linked_list_type(field_record_type)
        else:
            import traceback
            log.debug('unregistered list entry type, called from:\n%s', ''.join(traceback.format_stack()))
            raise RuntimeError("Field %s was defined as linked link entry record type %s, but not registered" % (
                fieldname, field_record_type))
        # now that this is cleared, lets iterate.
        if not ignore_head:
            # in case of ENTRY x2 list, head is ENTRY.
            log.debug('Yield head because NOT Ignoring head in inner')
            yield record
        # @ of the fieldname in record. This can be different from offset.
        head_address = record._orig_address_ + self._utils.offsetof(type(record), fieldname)
        head._orig_address_ = head_address
        # stop at the first sign of a previously found list entry.
        # Always build a fresh set: it is mutated during the iteration and
        # the registered sentinels must stay untouched.
        done = set(sentinels) | set(gbl_sentinels)
        if ignore_head:
            done.add(head_address)
            log.debug('Ignore head_address self.%s at 0x%0.8x', fieldname, head_address)
        else:
            log.debug('NOT Ignoring head_address self.%s at 0x%0.8x', fieldname, head_address)
        # TODO, TU that.
        log.debug("_iterate_list_from_field_with_link_info Field:%s at offset:%d st_size:%d",
                  fieldname, offset, self._ctypes.sizeof(pointee_record_type))
        for list_member in self._iterate_list_from_field_inner(iterator_fn, head, pointee_record_type, offset, done):
            yield list_member
        # a generator ends by returning; raising StopIteration here would be
        # turned into a RuntimeError on Python 3.7+ (PEP 479).

    def _iterate_list_from_field_inner(self, iterator_fn, head, pointee_record_type, offset, sentinels):
        """
        Use iterator_fn to iterate on the list allocators. (as in struct__LIST_ENTRY)

        For each list entry address:
         + verify if the entry address is a sentinel value or already parsed.
         + calculate <list_entry_address> - offset to find the real list entry address
         + check if its in cache and yield it
         + otherwise, if its a valid address, read the record from the new address
         + save it to cache
         + yield it.

        :param iterator_fn: an iterator on a registered list entry management structure
        :param head: the first list entry
        :param pointee_record_type: the list entry record type we want to load
        :param offset: the offset between the list entry pointers and the interesting record
        :param sentinels: a set of values that indicates we should stop iterating.
            It is mutated: every yielded record address is added to it.
        :return: yields pointee_record_type() instances
        :raises ValueError: if a list member address is not valid
        """
        # we get all addresses for the instances of the linked list record_type
        # not, the list_member itself. Only the address of the linked list field.
        for entry in iterator_fn(head, sentinels):
            # @ of the list member record
            list_member_address = entry - offset
            log.debug('iterate on entry 0x%x, offset 0x%x res: 0x%x', entry, offset, list_member_address)
            # entry is not null. We also ignore record (head_address).
            if entry in sentinels:
                log.debug('entry 0x%x in sentinel, skipping', entry)
                continue
            if list_member_address in sentinels:
                log.debug('list_member_address 0x%x in sentinel, skipping', list_member_address)
                continue
            # use cache if possible, avoid loops.
            st = self._memory_handler.getRef(pointee_record_type, list_member_address)
            if st is not None:
                # we return the cached value
                log.debug('_iterate_list_from_field_inner getRef returned cached value on 0x%x', list_member_address)
            else:
                memory_map = self._memory_handler.is_valid_address_value(list_member_address, pointee_record_type)
                if memory_map is False:
                    type_size = self._ctypes.sizeof(pointee_record_type)
                    log.error('_iterate_list_from_field_inner: bad value link_addr: 0x%x size: 0x%x',
                              list_member_address, type_size)
                    log.debug('sentinels: {%s}', ','.join(['0x%0.8x' % s for s in sentinels]))
                    log.debug('entry on sentinels: %s', entry in sentinels)
                    log.debug('list_member_address in sentinels: %s', list_member_address in sentinels)
                    raise ValueError('the link of this linked list has a bad value: 0x%x' % entry)
                st = memory_map.read_struct(list_member_address, pointee_record_type)
                st._orig_address_ = list_member_address
                self._memory_handler.keepRef(st, pointee_record_type, list_member_address)
            yield st
            # save the last address as a sentinel
            sentinels.add(list_member_address)

    def _iterate_double_linked_list(self, record, sentinels=None):
        """
        iterate forward, then backward, until null or duplicate

        :param record: an instance of a registered double linked list entry type
        :param sentinels: addresses that must not be visited. The container
            is consulted at each step, so additions made by the caller
            between two yields are honoured.
        :return: yields the address of each list entry found
        :raises ValueError: if a link address is not valid
        """
        # stop when Null
        done = {0}
        if sentinels is None:
            sentinels = set()
        record_type = type(record)
        # we ignore the registered sentinels here, as this is an internal iterator
        forward, backward, _ = self.get_double_linked_list_type(record_type)
        log.debug('sentinels: {%s}', ','.join(['0x%0.8x' % s for s in sentinels]))
        # make the stack
        stack = set()
        for fieldname in [forward, backward]:
            link = getattr(record, fieldname)
            stack.add(self._utils.get_pointee_address(link))
        # NOTE: fieldname and link keep their last loop values (the backward
        # link); they are only used in the debug output below.
        # go through the tree, one generation of neighbours at a time
        while stack:
            new_nodes = set()
            for addr in stack:
                if addr in done or addr in sentinels:
                    log.debug('x2_list: addr: 0x%x in done or sentinels', addr)
                    continue
                done.add(addr)
                memory_map = self._memory_handler.is_valid_address_value(addr, record_type)
                if memory_map is False:
                    log.error("_iterate_double_linked_list: the link of this linked list has a bad value: 0x%x", addr)
                    raise ValueError('ValueError: the link of this linked list has a bad value: 0x%x' % addr)
                st = memory_map.read_struct(addr, record_type)
                st._orig_address_ = addr
                self._memory_handler.keepRef(st, record_type, addr)
                log.debug("keepRefx2 %s.%s: @%x", record_type.__name__, fieldname, addr)
                yield addr
                # next
                f_addr = self._utils.get_pointee_address(getattr(st, forward))
                b_addr = self._utils.get_pointee_address(getattr(st, backward))
                new_nodes.add(f_addr)
                new_nodes.add(b_addr)
                log.debug('_iterate_double_linked_list %s <%s>/0x%x', forward, link.__class__.__name__, f_addr)
                log.debug('_iterate_double_linked_list %s <%s>/0x%x', backward, link.__class__.__name__, b_addr)
            stack = new_nodes
        log.debug('x2_list: empty stack, stop iteration')

    def _iterate_single_linked_list(self, record, sentinels=None):
        """
        iterate forward, until null or duplicate

        :param record: an instance of a registered single linked list entry
            type, or a pointer to one
        :param sentinels: addresses that stop the iteration
        :return: yields the address of each list entry found
        :raises ValueError: if a link address is not valid
        """
        # stop when Null
        done = {0}
        if sentinels is None:
            sentinels = set()
        record_type = type(record)
        # handle pointer cases
        if self._ctypes.is_pointer_type(record_type):
            # record is the link itself
            record_type = self._ctypes.get_pointee_type(record_type)
            # we ignore the registered sentinels here as this is an internal iterator
            fieldname, _ = self.get_single_linked_list_type(record_type)
            addr = self._utils.get_pointee_address(record)
            log.debug('_iterate_single_linked_list <%s>/0x%x', record_type.__name__, addr)
        else:
            # we ignore the registered sentinels here as this is an internal iterator
            fieldname, _ = self.get_single_linked_list_type(record_type)
            link = getattr(record, fieldname)
            addr = self._utils.get_pointee_address(link)
            log.debug('_iterate_single_linked_list <%s>/0x%x', link.__class__.__name__, addr)
        while addr not in done and addr not in sentinels:
            done.add(addr)
            _map = self._memory_handler.is_valid_address_value(addr, record_type)
            if not _map:
                log.error("_iterate_single_linked_list: the link of this linked list has a bad value")
                raise ValueError('ValueError: the link of this linked list has a bad value: 0x%x' % addr)
            st = _map.read_struct(addr, record_type)
            st._orig_address_ = addr
            self._memory_handler.keepRef(st, record_type, addr)
            log.debug("keepRefx2 %s.%s: @%x", record_type.__name__, fieldname, addr)
            yield addr
            # next
            link = getattr(st, fieldname)
            addr = self._utils.get_pointee_address(link)
            log.debug('_iterate_single_linked_list <%s>/0x%x', link.__class__.__name__, addr)

    def is_valid(self, record):
        """
        Checks if each members has coherent data.
        If its a list record, check its list values for sentinels.

        :param record: a ctypes record instance
        :return: True if record is a registered list entry with one of its
            link pointers equal to a registered sentinel, otherwise the
            result of the parent class is_valid()
        :raises TypeError: if record is not a ctypes record instance
        """
        if not isinstance(record, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a record')
        record_type = type(record)
        # we check for sentinels
        if self.is_single_linked_list_type(record_type):
            forward, sentinels = self.get_single_linked_list_type(record_type)
            fieldnames = [forward]
        elif self.is_double_linked_list_type(record_type):
            forward, backward, sentinels = self.get_double_linked_list_type(record_type)
            fieldnames = [forward, backward]
        else:
            return super(ListModel, self).is_valid(record)
        for fieldname in fieldnames:
            link = getattr(record, fieldname)
            addr = self._utils.get_pointee_address(link)
            if addr in sentinels:
                # FIXME: we are here ignoring the values of any other fields in record.
                return True
        # else transfer control to super
        return super(ListModel, self).is_valid(record)

    def load_members(self, record, max_depth):
        """
        load basic types members,
        then load list elements members recursively,
        then load list head elements members recursively.

        :param record: a ctypes record instance
        :param max_depth: the remaining recursion depth, at most 100
        :return: True if the record was loaded (or max_depth was reached),
            False if the parent loader failed or a list could not be loaded
        :raises TypeError: if record is not a ctypes record instance
        :raises RuntimeError: if max_depth is greater than 100
        """
        if not isinstance(record, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record instance')
        if max_depth <= 0:
            log.debug('Maximum depth reach. Not loading any deeper members.')
            log.debug('Struct partially LOADED. %s not loaded', record.__class__.__name__)
            return True
        if max_depth > 100:
            raise RuntimeError('max_depth')
        log.debug('-+ <%s> ListModel load_members +- @%x', record.__class__.__name__, record._orig_address_)
        if not super(ListModel, self).load_members(record, max_depth):
            log.debug('basicmodel returned False')
            return False
        # now try to load the list members
        log.debug('d:%d load list elements members recursively on %s @%x ',
                  max_depth, type(record).__name__, record._orig_address_)
        record_type = type(record)
        if self.is_single_linked_list_type(record_type) or self.is_double_linked_list_type(record_type):
            # we cannot divine what the element of the list are gonna be.
            return True
        try:
            record_constraints = self._get_constraints_for(record)
            # we look at the list we know about
            for link_info in self._get_list_fields(record_type):
                attrname = link_info[0]
                ignore = False
                # shortcut ignores
                if attrname in record_constraints:
                    for constraint in record_constraints[attrname]:
                        if constraint is constraints.IgnoreMember:
                            log.debug('ignoring %s as requested', attrname)
                            ignore = True
                            break
                        elif isinstance(constraint, constraints.ListLimitDepthValidation):
                            # NOTE(review): this rebinds max_depth for every
                            # following list field of the record too, not
                            # only for attrname - confirm this is intended.
                            max_depth = constraint.max_depth
                            log.debug('setting max_depth %d as requested', max_depth)
                if ignore:
                    continue
                log.debug('d:%d checking listmember %s for %s', max_depth, attrname, record.__class__.__name__)
                # FIXME: choose if its a pointer to list or a list member
                entry_iterator = self._iterate_list_from_field_with_link_info(record, link_info)
                self._load_list_entries(record, entry_iterator, max_depth - 1)
        except ValueError as e:
            # bad link address, or a list member that failed to load
            log.debug(e)
            return False
        except RuntimeError as e:  # for DEBUG
            log.debug(e)
            return False
        log.debug('-+ <%s> load_members END +-', record.__class__.__name__)
        return True

    def _load_list_entries(self, record, link_iterator, max_depth):
        """
        For each entry produced by link_iterator, we try to load the record
        using the constraint model.
        We keep cache in the memory_handler as to only evaluate every instance once.

        :param record: the ctypes record instance owning the list, only used
            in the error message
        :param link_iterator: an iterator on the records of the list
        :param max_depth: the remaining recursion depth
        :return: True
        :raises TypeError: if record is not a ctypes record instance
        :raises ValueError: if one of the list members could not be loaded
        """
        if not isinstance(record, (ctypes.Structure, ctypes.Union)):
            raise TypeError('Feed me a ctypes record instance')
        for list_member in link_iterator:
            # load the list entry structure members
            log.debug('send %s to load_members', list_member)
            # NOTE(review): load_members() already passes max_depth - 1 to
            # this method, so depth decreases by 2 per list level - confirm.
            if not self.load_members(list_member, max_depth - 1):
                log.error(
                    'Error while loading members on %s',
                    record.__class__.__name__)
                raise ValueError('error while loading members')
        return True