# models.py
# ---------
# Licensing Information: You are free to use or extend these projects for
# educational purposes provided that (1) you do not distribute or publish
# solutions, (2) you retain this notice, and (3) you provide clear
# attribution to UC Berkeley, including a link to http://ai.berkeley.edu.
#
# Attribution Information: The Pacman AI projects were developed at UC Berkeley.
# The core projects and autograders were primarily created by John DeNero
# (denero@cs.berkeley.edu) and Dan Klein (klein@cs.berkeley.edu).
# Student side autograding was added by Brad Miller, Nick Hay, and
# Pieter Abbeel (pabbeel@cs.berkeley.edu).

from collections import OrderedDict

import numpy as np
import tensorflow as tf

import tensorflow_util as tfu
import util

_SEED = 66478  # Set to None for random seed.
_RANDOM = None


def get_fixed_random():
    global _RANDOM
    if _RANDOM is None:
        _RANDOM = util.FixedRandom()
    return _RANDOM


def truncated_normal(shape, mean=0.0, stddev=1.0, dtype=np.float32, fixed_random=None):
    """
    Outputs random values from a truncated normal distribution.

    The generated values follow a normal distribution with specified mean and
    standard deviation, except that values whose magnitude is more than 2
    standard deviations from the mean are dropped and re-picked.
    """
    if fixed_random is None:
        fixed_random = get_fixed_random()
    value = np.empty(shape, dtype=dtype)
    for v in np.nditer(value, op_flags=['readwrite']):
        new_v = None
        while new_v is None or abs(new_v - mean) > 2 * abs(stddev):
            new_v = fixed_random.random.normalvariate(mean, stddev)
        v[...] = new_v
    return value
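

# Example usage of truncated_normal (an illustrative sketch, not part of the
# assignment API): with stddev=0.1, every sampled entry lies within 2 standard
# deviations of the mean, so both of the following asserts would pass:
#     w = truncated_normal([3, 3], mean=0.0, stddev=0.1)
#     assert w.shape == (3, 3)
#     assert np.all(np.abs(w) <= 0.2)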


class Model(object):
    def __init__(self, input_ph=None, prediction_tensor=None, max_eval_batch_size=500):
        self.input_ph = input_ph
        self.prediction_tensor = prediction_tensor
        self._param_vars = OrderedDict()
        self._fixed_random = util.FixedRandom()  # deterministically initialize weights
        self._max_eval_batch_size = max_eval_batch_size

    @property
    def input_shape(self):
        input_shape = tuple(self.input_ph.get_shape().as_list()[1:])  # discard leading dimension (batch size)
        if None in input_shape:
            raise ValueError("the shape of input_ph should be defined except for the leading dimension")
        return input_shape

    def add_param_var(self, param_var, name=None, **tags):
        if not isinstance(param_var, tf.Variable):
            param_var = tf.Variable(param_var, name=name)
        # parameters are trainable and regularizable by default
        tags['trainable'] = tags.get('trainable', True)
        tags['regularizable'] = tags.get('regularizable', True)
        self._param_vars[param_var] = set(tag for tag, value in tags.items() if value)
        return param_var
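
    # Example (illustrative sketch): a bias added with
    #     b = self.add_param_var(tf.constant(0.1, shape=[10]), name='b', regularizable=False)
    # keeps only the truthy tags, i.e. {'trainable'}, so it is skipped by any
    # weight-decay term built from get_param_vars(regularizable=True).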

    def get_param_vars(self, **tags):
        """
        Modified from here: https://github.com/Lasagne/Lasagne/blob/master/lasagne/layers/base.py
        """
        result = list(self._param_vars.keys())

        only = set(tag for tag, value in tags.items() if value)
        if only:
            # retain all parameters that have all of the tags in `only`
            result = [param_var for param_var in result
                      if not (only - self._param_vars[param_var])]

        exclude = set(tag for tag, value in tags.items() if not value)
        if exclude:
            # retain all parameters that have none of the tags in `exclude`
            result = [param_var for param_var in result
                      if not (self._param_vars[param_var] & exclude)]
        return result
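
    # Example (illustrative sketch): with W tagged {'trainable', 'regularizable'}
    # and b tagged {'trainable'}:
    #     self.get_param_vars()                     # -> [W, b]
    #     self.get_param_vars(regularizable=True)   # -> [W]
    #     self.get_param_vars(regularizable=False)  # -> [b]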

    def get_param_values(self, **tags):
        param_vars = self.get_param_vars(**tags)
        return [param_var.eval(session=tfu.get_session()) for param_var in param_vars]

    def set_param_values(self, param_values, **tags):
        param_vars = self.get_param_vars(**tags)
        if len(param_values) != len(param_vars):
            raise ValueError('there are %d parameter variables with the given tags '
                             'but %d parameter values were given' % (len(param_vars), len(param_values)))
        tfu.get_session().run([tf.assign(param_var, param_value) for (param_var, param_value) in zip(param_vars, param_values)])

    def get_batch_size(self, input_):
        if input_.shape == self.input_shape:  # input data is not batched
            batch_size = 0
        elif input_.shape[1:] == self.input_shape:
            batch_size = input_.shape[0]
        else:
            raise ValueError('expecting input of shape %r or %r but got input of shape %r' %
                             (self.input_shape, (None,) + self.input_shape, input_.shape))
        return batch_size
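
    # Example (illustrative sketch): for a model whose input_shape is (784,):
    #     self.get_batch_size(np.zeros((784,)))     # -> 0 (a single datum)
    #     self.get_batch_size(np.zeros((32, 784)))  # -> 32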

    def predict(self, input_):
        batch_size = self.get_batch_size(input_)
        if batch_size == 0:
            input_ = input_[None, :]
        # evaluate at least one chunk so that a single, unbatched datum
        # (batch_size == 0) still goes through the loop below
        num_samples = max(batch_size, 1)
        # do the computation in smaller chunks because some GPUs don't have much memory
        # the following block of code is equivalent to this line
        # prediction = self.prediction_tensor.eval(session=tfu.get_session(), feed_dict=dict([(self.input_ph, input_)]))
        predictions = []
        for i in range(0, num_samples, self._max_eval_batch_size):
            excerpt = slice(i, min(i + self._max_eval_batch_size, num_samples))
            prediction = self.prediction_tensor.eval(session=tfu.get_session(),
                                                     feed_dict=dict([(self.input_ph, input_[excerpt])]))
            predictions.append(prediction)
        prediction = np.concatenate(predictions, axis=0)
        if batch_size == 0:
            prediction = np.squeeze(prediction, axis=0)
        return prediction
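
    # Example (illustrative sketch): with max_eval_batch_size=500, a batch of
    # 1200 inputs is evaluated in three chunks of 500, 500, and 200 rows, and
    # the per-chunk outputs are concatenated back into one 1200-row array.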


class LinearRegressionModel(Model):
    def __init__(self, num_features=784, num_labels=10):
        super(LinearRegressionModel, self).__init__()
        # input and target placeholder variables
        self.x = tf.placeholder(tf.float32, shape=(None, num_features))
        self.input_ph = self.x

        # parameter variables
        self.W = self.add_param_var(truncated_normal([num_features, num_labels], stddev=0.1, fixed_random=self._fixed_random), name='W')
        self.b = self.add_param_var(tf.constant(0.1, shape=[num_labels]), name='b', regularizable=False)

        # prediction tensor
        self.y = tf.matmul(self.x, self.W) + self.b
        self.prediction_tensor = self.y

        # initialize parameters
        tfu.get_session().run([param_var.initializer for param_var in self.get_param_vars()])
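
# Example usage (an illustrative sketch, assuming MNIST-sized inputs):
#     model = LinearRegressionModel(num_features=784, num_labels=10)
#     scores = model.predict(np.zeros((5, 784), dtype=np.float32))
#     assert scores.shape == (5, 10)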


class ClassifierModel(Model):
    def classify(self, input_datum_or_data):
        """
        Classifies a datum or each datum in a list of data.

        Args:
            input_datum_or_data: a 1-dimensional np.array of a single datum or
                a 2-dimensional np.array of data where each row is a datum.

        Returns:
            An integer (representing a label) if a single datum is passed in,
            or a list of integers (representing the labels) if multiple data
            are passed in.
        """
        prediction = self.predict(input_datum_or_data)
        category = np.argmax(prediction, axis=-1)
        return category
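
    # Example (illustrative sketch): a prediction row of [0.1, 0.7, 0.2]
    # classifies as label 1, since np.argmax picks the highest-scoring entry.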

    def accuracy(self, input_data, target_data):
        """
        Computes the accuracy of the model classification predictions.

        Args:
            input_data: a 2-dimensional np.array of input data where each row
                is a datum.
            target_data: a 2-dimensional np.array of correct labels where each
                row is a probability distribution over the labels (or
                alternatively, a one-hot vector representation of the label).

        Returns:
            A float, the accuracy of the model for the given data.
        """
        category_labels = np.argmax(target_data, axis=-1)
        correct_prediction = self.classify(input_data) == category_labels
        accuracy = correct_prediction.mean()
        return accuracy
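
    # Example (illustrative sketch): if the model classifies 3 out of 4 data
    # correctly, correct_prediction is [True, True, True, False] and the
    # accuracy is 0.75.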


class SoftmaxRegressionModel(ClassifierModel):
    def __init__(self, num_features=784, num_labels=10):
        super(SoftmaxRegressionModel, self).__init__()
        # input and target placeholder variables
        self.x = tf.placeholder(tf.float32, shape=(None, num_features))
        self.input_ph = self.x

        # parameter variables
        self.W = self.add_param_var(truncated_normal([num_features, num_labels], stddev=0.1, fixed_random=self._fixed_random), name='W')
        self.b = self.add_param_var(tf.constant(0.1, shape=[num_labels]), name='b', regularizable=False)

        # prediction tensor
        self.y = tf.nn.softmax(tf.matmul(self.x, self.W) + self.b)
        self.prediction_tensor = self.y

        # initialize parameters
        tfu.get_session().run([param_var.initializer for param_var in self.get_param_vars()])


class ConvNetModel(ClassifierModel):
    def __init__(self, use_batchnorm=False, use_dropout=False, x_shape=(None, 28, 28, 1), num_labels=10):
        super(ConvNetModel, self).__init__()
        _, image_size, _, num_channels = x_shape
        assert x_shape[2] == image_size
        self.x = tf.placeholder(tf.float32, shape=x_shape)
        self.input_ph = self.x
        is_train = True
        init_symmetry = False
        var_eps = 1e-20
        use_global_bn = True
        if use_global_bn:
            bn_axes = [0, 1, 2]
        else:
            bn_axes = [0]

        if init_symmetry:
            conv1_weights = tf.Variable(
                tf.zeros([5, 5, num_channels, 32]))  # 5x5 filter, depth 32.
            conv1_biases = tf.Variable(tf.zeros([32]))
            conv2_weights = tf.Variable(
                tf.zeros([5, 5, 32, 64]))
            conv2_biases = tf.Variable(tf.zeros([64]))
            fc1_weights = tf.Variable(  # fully connected, depth 512.
                tf.constant(0.1,
                            shape=[image_size // 4 * image_size // 4 * 64, 512]))
            fc1_biases = tf.Variable(tf.constant(0.1, shape=[512]))
            fc2_weights = tf.Variable(
                tf.constant(0.1, shape=[512, num_labels]))
            fc2_biases = tf.Variable(tf.constant(0.1, shape=[num_labels]))
        else:
            conv1_weights = tf.Variable(
                truncated_normal([5, 5, num_channels, 32],  # 5x5 filter, depth 32.
                                 stddev=0.1, fixed_random=self._fixed_random))
            conv1_biases = tf.Variable(tf.zeros([32]))
            conv2_weights = tf.Variable(
                truncated_normal([5, 5, 32, 64],
                                 stddev=0.1, fixed_random=self._fixed_random))
            conv2_biases = tf.Variable(tf.constant(0., shape=[64]))
            fc1_weights = tf.Variable(  # fully connected, depth 512.
                truncated_normal(
                    [image_size // 4 * image_size // 4 * 64, 512],
                    stddev=0.1, fixed_random=self._fixed_random))
            fc1_biases = tf.Variable(tf.constant(0., shape=[512]))
            fc2_weights = tf.Variable(
                truncated_normal([512, num_labels],
                                 stddev=0.1, fixed_random=self._fixed_random))
            fc2_biases = tf.Variable(tf.constant(0., shape=[num_labels]))

        # Add parameter variables for solvers
        self.conv1_weights = self.add_param_var(conv1_weights)
        self.conv1_biases = self.add_param_var(conv1_biases)
        self.conv2_weights = self.add_param_var(conv2_weights)
        self.conv2_biases = self.add_param_var(conv2_biases)
        self.fc1_weights = self.add_param_var(fc1_weights)
        self.fc1_biases = self.add_param_var(fc1_biases)
        self.fc2_weights = self.add_param_var(fc2_weights)
        self.fc2_biases = self.add_param_var(fc2_biases)

        # Run inference
        conv = tf.nn.conv2d(self.x,
                            conv1_weights,
                            strides=[1, 1, 1, 1],
                            padding='SAME')
        conv = tf.nn.bias_add(conv, conv1_biases)

        # Add batch norm
        if use_batchnorm:
            mean, variance = tf.nn.moments(conv, bn_axes)
            conv = tf.nn.batch_normalization(conv, mean, variance, None, None, var_eps)

        relu = tf.nn.relu(conv)
        pool = tf.nn.max_pool(relu,
                              ksize=[1, 2, 2, 1],
                              strides=[1, 2, 2, 1],
                              padding='SAME')

        conv = tf.nn.conv2d(pool,
                            conv2_weights,
                            strides=[1, 1, 1, 1],
                            padding='SAME')
        conv = tf.nn.bias_add(conv, conv2_biases)

        # Add batch norm
        if use_batchnorm:
            mean, variance = tf.nn.moments(conv, bn_axes)
            conv = tf.nn.batch_normalization(conv, mean, variance, None, None, var_eps)

        relu = tf.nn.relu(conv)
        pool = tf.nn.max_pool(relu,
                              ksize=[1, 2, 2, 1],
                              strides=[1, 2, 2, 1],
                              padding='SAME')

        # Reshape the feature map cuboid into a 2D matrix to feed it to the
        # fully connected layers.
        pool_shape = pool.get_shape().as_list()
        reshape = tf.reshape(
            pool,
            [-1, pool_shape[1] * pool_shape[2] * pool_shape[3]])
        # Fully connected layer. Note that the '+' operation automatically
        # broadcasts the biases.
        hidden = tf.nn.relu(tf.matmul(reshape, fc1_weights) + fc1_biases)
        # Add a 50% dropout during training only. Dropout also scales
        # activations such that no rescaling is needed at evaluation time.
        if is_train and use_dropout:
            hidden = tf.nn.dropout(hidden, 0.5, seed=_SEED)

        logits = tf.matmul(hidden, fc2_weights) + fc2_biases
        self.prediction_tensor = tf.nn.softmax(logits)

        tfu.get_session().run([param_var.initializer for param_var in self.get_param_vars()])
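

# Example usage (an illustrative sketch, not part of the assignment API). The
# shape walkthrough assumes the default 28x28x1 inputs: the two stride-2 max
# pools shrink 28 -> 14 -> 7, so the flattened feature map has 7 * 7 * 64
# entries, matching fc1_weights' first dimension of
# image_size // 4 * image_size // 4 * 64.
if __name__ == '__main__':
    model = ConvNetModel(use_batchnorm=True, use_dropout=True)
    images = np.zeros((2, 28, 28, 1), dtype=np.float32)
    probs = model.predict(images)    # shape (2, 10); each row sums to ~1
    labels = model.classify(images)  # two integer labels in range(10)
    print(probs.shape, labels)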