You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
4.4 KiB

import numpy as np
from numpy.linalg import norm
import abc
import logging
import sys
from policy.policy_factory import policy_factory
from action import ActionXY, ActionRot
from state import ObservableState, FullState
class Agent(object):
def __init__(self, config, section):
"""
Base class for robot and human. Have the physical attributes of an agent.
"""
self.visible = config.getboolean(section, 'visible')
self.v_pref = config.getfloat(section, 'v_pref')
self.radius = config.getfloat(section, 'radius')
self.policy = policy_factory[config.get(section, 'policy')]()
self.sensor = config.get(section, 'sensor')
self.kinematics = self.policy.kinematics if self.policy is not None else None
self.px = None
self.py = None
self.gx = None
self.gy = None
self.vx = None
self.vy = None
self.theta = None
self.time_step = None
def print_info(self):
logging.info('Agent is {} and has {} kinematic constraint'.format(
'visible' if self.visible else 'invisible', self.kinematics))
def set_policy(self, policy):
self.policy = policy
self.kinematics = policy.kinematics
def sample_random_attributes(self):
"""
Sample agent radius and v_pref attribute from certain distribution
:return:
"""
self.v_pref = np.random.uniform(0.5, 1.5)
self.radius = np.random.uniform(0.3, 0.5)
def set(self, px, py, gx, gy, vx, vy, theta, radius=None, v_pref=None):
self.px = px
self.py = py
self.gx = gx
self.gy = gy
self.vx = vx
self.vy = vy
self.theta = theta
if radius is not None:
self.radius = radius
if v_pref is not None:
self.v_pref = v_pref
def get_observable_state(self):
return ObservableState(self.px, self.py, self.vx, self.vy, self.radius)
def get_next_observable_state(self, action):
self.check_validity(action)
pos = self.compute_position(action, self.time_step)
next_px, next_py = pos
if self.kinematics == 'holonomic':
next_vx = action.vx
next_vy = action.vy
else:
next_theta = self.theta + action.r
next_vx = action.v * np.cos(next_theta)
next_vy = action.v * np.sin(next_theta)
return ObservableState(next_px, next_py, next_vx, next_vy, self.radius)
def get_full_state(self):
return FullState(self.px, self.py, self.vx, self.vy, self.radius, self.gx, self.gy, self.v_pref, self.theta)
def get_position(self):
return self.px, self.py
def set_position(self, position):
self.px = position[0]
self.py = position[1]
def get_goal_position(self):
return self.gx, self.gy
def get_velocity(self):
return self.vx, self.vy
def set_velocity(self, velocity):
self.vx = velocity[0]
self.vy = velocity[1]
self.theta = np.arctan2(velocity[1], velocity[0])
@abc.abstractmethod
def act(self, ob):
"""
Compute state using received observation and pass it to policy
"""
return
def check_validity(self, action):
if self.kinematics == 'holonomic':
assert isinstance(action, ActionXY)
else:
assert isinstance(action, ActionRot)
def compute_position(self, action, delta_t):
self.check_validity(action)
if self.kinematics == 'holonomic':
px = self.px + action.vx * delta_t
py = self.py + action.vy * delta_t
else:
theta = self.theta + action.r
px = self.px + np.cos(theta) * action.v * delta_t
py = self.py + np.sin(theta) * action.v * delta_t
return px, py
def step(self, action):
"""
Perform an action and update the state
"""
self.check_validity(action)
pos = self.compute_position(action, self.time_step)
self.px, self.py = pos
if self.kinematics == 'holonomic':
self.vx = action.vx
self.vy = action.vy
else:
self.theta = (self.theta + action.r) % (2 * np.pi)
self.vx = action.v * np.cos(self.theta)
self.vy = action.v * np.sin(self.theta)
def reached_destination(self):
return norm(np.array(self.get_position()) - np.array(self.get_goal_position())) < self.radius