You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

571 lines
20 KiB

# inference.py
# ------------
# Licensing Information: You are free to use or extend these projects for
# educational purposes provided that (1) you do not distribute or publish
# solutions, (2) you retain this notice, and (3) you provide clear
# attribution to UC Berkeley, including a link to http://ai.berkeley.edu.
#
# Attribution Information: The Pacman AI projects were developed at UC Berkeley.
# The core projects and autograders were primarily created by John DeNero
# (denero@cs.berkeley.edu) and Dan Klein (klein@cs.berkeley.edu).
# Student side autograding was added by Brad Miller, Nick Hay, and
# Pieter Abbeel (pabbeel@cs.berkeley.edu).
import itertools
import random
import busters
import game
from util import manhattanDistance
class DiscreteDistribution(dict):
"""
A DiscreteDistribution models belief distributions and weight distributions
over a finite set of discrete keys.
"""
def __getitem__(self, key):
self.setdefault(key, 0)
return dict.__getitem__(self, key)
def copy(self):
"""
Return a copy of the distribution.
"""
return DiscreteDistribution(dict.copy(self))
def argMax(self):
"""
Return the key with the highest value.
"""
if len(self.keys()) == 0:
return None
all = self.items()
values = [x[1] for x in all]
maxIndex = values.index(max(values))
return all[maxIndex][0]
def total(self):
"""
Return the sum of values for all keys.
"""
return float(sum(self.values()))
def normalize(self):
"""
Normalize the distribution such that the total value of all keys sums
to 1. The ratio of values for all keys will remain the same. In the case
where the total value of the distribution is 0, do nothing.
>>> dist = DiscreteDistribution()
>>> dist['a'] = 1
>>> dist['b'] = 2
>>> dist['c'] = 2
>>> dist['d'] = 0
>>> dist.normalize()
>>> list(sorted(dist.items()))
[('a', 0.2), ('b', 0.4), ('c', 0.4), ('d', 0.0)]
>>> dist['e'] = 4
>>> list(sorted(dist.items()))
[('a', 0.2), ('b', 0.4), ('c', 0.4), ('d', 0.0), ('e', 4)]
>>> empty = DiscreteDistribution()
>>> empty.normalize()
>>> empty
{}
"""
"*** YOUR CODE HERE ***"
total = float(self.total())
if total == 0: return
for key in self.keys():
self[key] = self[key] / total
def sample(self):
"""
Draw a random sample from the distribution and return the key, weighted
by the values associated with each key.
>>> dist = DiscreteDistribution()
>>> dist['a'] = 1
>>> dist['b'] = 2
>>> dist['c'] = 2
>>> dist['d'] = 0
>>> N = 100000.0
>>> samples = [dist.sample() for _ in range(int(N))]
>>> round(samples.count('a') * 1.0/N, 1) # proportion of 'a'
0.2
>>> round(samples.count('b') * 1.0/N, 1)
0.4
>>> round(samples.count('c') * 1.0/N, 1)
0.4
>>> round(samples.count('d') * 1.0/N, 1)
0.0
"""
"*** YOUR CODE HERE ***"
if self.total() != 1:
self.normalize()
items = sorted(self.items())
distribution = [i[1] for i in items]
values = [i[0] for i in items]
choice = random.random()
i, total = 0, distribution[0]
while choice > total:
i += 1
total += distribution[i]
return values[i]
class InferenceModule:
"""
An inference module tracks a belief distribution over a ghost's location.
"""
############################################
# Useful methods for all inference modules #
############################################
def __init__(self, ghostAgent):
"""
Set the ghost agent for later access.
"""
self.ghostAgent = ghostAgent
self.index = ghostAgent.index
self.obs = [] # most recent observation position
def getJailPosition(self):
return (2 * self.ghostAgent.index - 1, 1)
def getPositionDistributionHelper(self, gameState, pos, index, agent):
try:
jail = self.getJailPosition()
gameState = self.setGhostPosition(gameState, pos, index + 1)
except TypeError:
jail = self.getJailPosition(index)
gameState = self.setGhostPositions(gameState, pos)
pacmanPosition = gameState.getPacmanPosition()
ghostPosition = gameState.getGhostPosition(index + 1) # The position you set
dist = DiscreteDistribution()
if pacmanPosition == ghostPosition: # The ghost has been caught!
dist[jail] = 1.0
return dist
pacmanSuccessorStates = game.Actions.getLegalNeighbors(pacmanPosition, \
gameState.getWalls()) # Positions Pacman can move to
if ghostPosition in pacmanSuccessorStates: # Ghost could get caught
mult = 1.0 / float(len(pacmanSuccessorStates))
dist[jail] = mult
else:
mult = 0.0
actionDist = agent.getDistribution(gameState)
for action, prob in actionDist.items():
successorPosition = game.Actions.getSuccessor(ghostPosition, action)
if successorPosition in pacmanSuccessorStates: # Ghost could get caught
denom = float(len(actionDist))
dist[jail] += prob * (1.0 / denom) * (1.0 - mult)
dist[successorPosition] = prob * ((denom - 1.0) / denom) * (1.0 - mult)
else:
dist[successorPosition] = prob * (1.0 - mult)
return dist
def getPositionDistribution(self, gameState, pos, index=None, agent=None):
"""
Return a distribution over successor positions of the ghost from the
given gameState. You must first place the ghost in the gameState, using
setGhostPosition below.
"""
if index == None:
index = self.index - 1
if agent == None:
agent = self.ghostAgent
return self.getPositionDistributionHelper(gameState, pos, index, agent)
def getObservationProb(self, noisyDistance, pacmanPosition, ghostPosition, jailPosition):
"""
Return the probability P(noisyDistance | pacmanPosition, ghostPosition).
"""
"*** YOUR CODE HERE ***"
if ghostPosition == jailPosition:
if noisyDistance == None:
return 1.0
else:
return 0.0
if noisyDistance == None:
return 0.0
trueDistance = manhattanDistance(pacmanPosition, ghostPosition)
return busters.getObservationProbability(noisyDistance, trueDistance)
def setGhostPosition(self, gameState, ghostPosition, index):
"""
Set the position of the ghost for this inference module to the specified
position in the supplied gameState.
Note that calling setGhostPosition does not change the position of the
ghost in the GameState object used for tracking the true progression of
the game. The code in inference.py only ever receives a deep copy of
the GameState object which is responsible for maintaining game state,
not a reference to the original object. Note also that the ghost
distance observations are stored at the time the GameState object is
created, so changing the position of the ghost will not affect the
functioning of observe.
"""
conf = game.Configuration(ghostPosition, game.Directions.STOP)
gameState.data.agentStates[index] = game.AgentState(conf, False)
return gameState
def setGhostPositions(self, gameState, ghostPositions):
"""
Sets the position of all ghosts to the values in ghostPositions.
"""
for index, pos in enumerate(ghostPositions):
conf = game.Configuration(pos, game.Directions.STOP)
gameState.data.agentStates[index + 1] = game.AgentState(conf, False)
return gameState
def observe(self, gameState):
"""
Collect the relevant noisy distance observation and pass it along.
"""
distances = gameState.getNoisyGhostDistances()
if len(distances) >= self.index: # Check for missing observations
obs = distances[self.index - 1]
self.obs = obs
self.observeUpdate(obs, gameState)
def initialize(self, gameState):
"""
Initialize beliefs to a uniform distribution over all legal positions.
"""
self.legalPositions = [p for p in gameState.getWalls().asList(False) if p[1] > 1]
self.allPositions = self.legalPositions + [self.getJailPosition()]
self.initializeUniformly(gameState)
######################################
# Methods that need to be overridden #
######################################
def initializeUniformly(self, gameState):
"""
Set the belief state to a uniform prior belief over all positions.
"""
raise NotImplementedError
def observeUpdate(self, observation, gameState):
"""
Update beliefs based on the given distance observation and gameState.
"""
raise NotImplementedError
def elapseTime(self, gameState):
"""
Predict beliefs for the next time step from a gameState.
"""
raise NotImplementedError
def getBeliefDistribution(self):
"""
Return the agent's current belief state, a distribution over ghost
locations conditioned on all evidence so far.
"""
raise NotImplementedError
class ExactInference(InferenceModule):
"""
The exact dynamic inference module should use forward algorithm updates to
compute the exact belief function at each time step.
"""
def initializeUniformly(self, gameState):
"""
Begin with a uniform distribution over legal ghost positions (i.e., not
including the jail position).
"""
self.beliefs = DiscreteDistribution()
for p in self.legalPositions:
self.beliefs[p] = 1.0
self.beliefs.normalize()
def observeUpdate(self, observation, gameState):
"""
Update beliefs based on the distance observation and Pacman's position.
The observation is the noisy Manhattan distance to the ghost you are
tracking.
self.allPositions is a list of the possible ghost positions, including
the jail position. You should only consider positions that are in
self.allPositions.
The update model is not entirely stationary: it may depend on Pacman's
current position. However, this is not a problem, as Pacman's current
position is known.
"""
"*** YOUR CODE HERE ***"
dist = DiscreteDistribution()
pacmanPosition = gameState.getPacmanPosition()
jailPosition = self.getJailPosition()
for p in self.allPositions:
prob = self.getObservationProb(observation, pacmanPosition, p, jailPosition)
dist[p] = prob * self.beliefs[p]
dist.normalize()
self.beliefs = dist
def elapseTime(self, gameState):
"""
Predict beliefs in response to a time step passing from the current
state.
The transition model is not entirely stationary: it may depend on
Pacman's current position. However, this is not a problem, as Pacman's
current position is known.
"""
"*** YOUR CODE HERE ***"
dist = DiscreteDistribution()
for oldPos in self.allPositions:
newPosDist = self.getPositionDistribution(gameState, oldPos)
old_prob = self.beliefs[oldPos]
for newPos in newPosDist.keys():
dist[newPos] += old_prob * newPosDist[newPos]
self.beliefs = dist
def getBeliefDistribution(self):
return self.beliefs
class ParticleFilter(InferenceModule):
"""
A particle filter for approximately tracking a single ghost.
"""
def __init__(self, ghostAgent, numParticles=300):
InferenceModule.__init__(self, ghostAgent);
self.setNumParticles(numParticles)
def setNumParticles(self, numParticles):
self.numParticles = numParticles
def initializeUniformly(self, gameState):
"""
Initialize a list of particles. Use self.numParticles for the number of
particles. Use self.legalPositions for the legal board positions where
a particle could be located. Particles should be evenly (not randomly)
distributed across positions in order to ensure a uniform prior. Use
self.particles for the list of particles.
"""
"*** YOUR CODE HERE ***"
temp = []
n = self.numParticles
size = len(self.legalPositions)
while n > 0:
if n > size:
temp += self.legalPositions
n -= size
else:
temp += self.legalPositions[0:n]
n = 0
self.particles = temp
def observeUpdate(self, observation, gameState):
"""
Update beliefs based on the distance observation and Pacman's position.
The observation is the noisy Manhattan distance to the ghost you are
tracking.
There is one special case that a correct implementation must handle.
When all particles receive zero weight, the list of particles should
be reinitialized by calling initializeUniformly. The total method of
the DiscreteDistribution may be useful.
"""
"*** YOUR CODE HERE ***"
pacmanPosition = gameState.getPacmanPosition()
jailPosition = self.getJailPosition()
dist = DiscreteDistribution()
for p in self.particles:
prob = self.getObservationProb(observation, pacmanPosition, p, jailPosition)
dist[p] += prob
if dist.total() == 0:
self.initializeUniformly(gameState)
else:
dist.normalize()
self.beliefs = dist
for i in range(self.numParticles):
new_sample = dist.sample()
self.particles[i] = new_sample
def elapseTime(self, gameState):
"""
Sample each particle's next state based on its current state and the
gameState.
"""
"*** YOUR CODE HERE ***"
cache = {}
for i in range(self.numParticles):
particle = self.particles[i]
if particle in cache:
self.particles[i] = cache[particle].sample()
else:
dist = self.getPositionDistribution(gameState, particle)
cache[particle] = dist
self.particles[i] = dist.sample()
def getBeliefDistribution(self):
"""
Return the agent's current belief state, a distribution over ghost
locations conditioned on all evidence and time passage. This method
essentially converts a list of particles into a belief distribution.
"""
"*** YOUR CODE HERE ***"
dist = DiscreteDistribution()
for p in self.particles:
dist[p] += 1
dist.normalize()
return dist
class JointParticleFilter(ParticleFilter):
"""
JointParticleFilter tracks a joint distribution over tuples of all ghost
positions.
"""
def __init__(self, numParticles=600):
self.setNumParticles(numParticles)
def initialize(self, gameState, legalPositions):
"""
Store information about the game, then initialize particles.
"""
self.numGhosts = gameState.getNumAgents() - 1
self.ghostAgents = []
self.legalPositions = legalPositions
self.initializeUniformly(gameState)
def initializeUniformly(self, gameState):
"""
Initialize particles to be consistent with a uniform prior. Particles
should be evenly distributed across positions in order to ensure a
uniform prior.
"""
self.particles = []
"*** YOUR CODE HERE ***"
permutations = list(itertools.product(self.legalPositions, repeat = self.numGhosts))
random.shuffle(permutations)
n = self.numParticles
size = len(permutations)
while n > size:
self.particles += permutations
n -= size
self.particles += permutations[:n]
def addGhostAgent(self, agent):
"""
Each ghost agent is registered separately and stored (in case they are
different).
"""
self.ghostAgents.append(agent)
def getJailPosition(self, i):
return (2 * i + 1, 1);
def observe(self, gameState):
"""
Resample the set of particles using the likelihood of the noisy
observations.
"""
observation = gameState.getNoisyGhostDistances()
self.observeUpdate(observation, gameState)
def observeUpdate(self, observation, gameState):
"""
Update beliefs based on the distance observation and Pacman's position.
The observation is the noisy Manhattan distances to all ghosts you
are tracking.
There is one special case that a correct implementation must handle.
When all particles receive zero weight, the list of particles should
be reinitialized by calling initializeUniformly. The total method of
the DiscreteDistribution may be useful.
"""
"*** YOUR CODE HERE ***"
pacmanPosition = gameState.getPacmanPosition()
dist = DiscreteDistribution()
for p in self.particles:
prob = 1
for i in range(self.numGhosts):
noisy_dist = observation[i]
prob *= self.getObservationProb(noisy_dist, pacmanPosition, p[i], self.getJailPosition(i))
dist[p] += prob
self.beliefs = dist
if self.beliefs.total() == 0:
self.initializeUniformly(gameState)
else:
self.beliefs.normalize()
for i in range(self.numParticles):
newPos = self.beliefs.sample()
self.particles[i] = newPos
def elapseTime(self, gameState):
"""
Sample each particle's next state based on its current state and the
gameState.
"""
newParticles = []
cache = {}
for oldParticle in self.particles:
newParticle = list(oldParticle) # A list of ghost positions
# now loop through and update each entry in newParticle...
"*** YOUR CODE HERE ***"
prevGhostPositions = list(oldParticle)
for i in range(self.numGhosts):
if (oldParticle, i) in cache:
newParticle[i] = cache[(oldParticle, i)].sample()
else:
newPosDist = self.getPositionDistribution(gameState, prevGhostPositions, i, self.ghostAgents[i])
cache[(oldParticle, i)] = newPosDist
newParticle[i] = newPosDist.sample()
"""*** END YOUR CODE HERE ***"""
newParticles.append(tuple(newParticle))
self.particles = newParticles
# One JointInference module is shared globally across instances of MarginalInference
jointInference = JointParticleFilter()
class MarginalInference(InferenceModule):
"""
A wrapper around the JointInference module that returns marginal beliefs
about ghosts.
"""
def initializeUniformly(self, gameState):
"""
Set the belief state to an initial, prior value.
"""
if self.index == 1:
jointInference.initialize(gameState, self.legalPositions)
jointInference.addGhostAgent(self.ghostAgent)
def observe(self, gameState):
"""
Update beliefs based on the given distance observation and gameState.
"""
if self.index == 1:
jointInference.observe(gameState)
def elapseTime(self, gameState):
"""
Predict beliefs for a time step elapsing from a gameState.
"""
if self.index == 1:
jointInference.elapseTime(gameState)
def getBeliefDistribution(self):
"""
Return the marginal belief over a particular ghost by summing out the
others.
"""
jointDistribution = jointInference.getBeliefDistribution()
dist = DiscreteDistribution()
for t, prob in jointDistribution.items():
dist[t[self.index - 1]] += prob
return dist