import abc
import logging

import numpy as np
from numpy.linalg import norm

from policy.policy_factory import policy_factory
from action import ActionXY, ActionRot
from state import ObservableState, FullState


class Agent(object):
    """
    Base class for robot and human. Holds the physical attributes of an agent.
    """

    def __init__(self, config, section):
        # Read the physical attributes and navigation policy from the config section
        self.visible = config.getboolean(section, 'visible')
        self.v_pref = config.getfloat(section, 'v_pref')
        self.radius = config.getfloat(section, 'radius')
        self.policy = policy_factory[config.get(section, 'policy')]()
        self.sensor = config.get(section, 'sensor')
        # Some factory entries return None, in which case the kinematics is unknown
        self.kinematics = self.policy.kinematics if self.policy is not None else None
        # Pose, goal, velocity, heading and time step stay unset until assigned via set()
        self.px = None
        self.py = None
        self.gx = None
        self.gy = None
        self.vx = None
        self.vy = None
        self.theta = None
        self.time_step = None
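
    # For reference, a config section consumed by __init__ could look like this
    # (the keys are exactly those read above; the values are illustrative):
    #
    #   [humans]
    #   visible = true
    #   policy = orca
    #   radius = 0.3
    #   v_pref = 1.0
    #   sensor = coordinates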

    def print_info(self):
        logging.info('Agent is {} and has {} kinematic constraint'.format(
            'visible' if self.visible else 'invisible', self.kinematics))

    def set_policy(self, policy):
        self.policy = policy
        self.kinematics = policy.kinematics

    def sample_random_attributes(self):
        """
        Sample agent radius and v_pref attributes from uniform distributions
        """
        self.v_pref = np.random.uniform(0.5, 1.5)
        self.radius = np.random.uniform(0.3, 0.5)

    def set(self, px, py, gx, gy, vx, vy, theta, radius=None, v_pref=None):
        # Initialize pose, goal, velocity and heading; radius and v_pref only
        # override the configured values when explicitly given
        self.px = px
        self.py = py
        self.gx = gx
        self.gy = gy
        self.vx = vx
        self.vy = vy
        self.theta = theta
        if radius is not None:
            self.radius = radius
        if v_pref is not None:
            self.v_pref = v_pref

    def get_observable_state(self):
        return ObservableState(self.px, self.py, self.vx, self.vy, self.radius)

    def get_next_observable_state(self, action):
        # Predict the observable state one time step ahead without mutating the agent
        self.check_validity(action)
        pos = self.compute_position(action, self.time_step)
        next_px, next_py = pos
        if self.kinematics == 'holonomic':
            next_vx = action.vx
            next_vy = action.vy
        else:
            next_theta = self.theta + action.r
            next_vx = action.v * np.cos(next_theta)
            next_vy = action.v * np.sin(next_theta)
        return ObservableState(next_px, next_py, next_vx, next_vy, self.radius)

    def get_full_state(self):
        return FullState(self.px, self.py, self.vx, self.vy, self.radius, self.gx, self.gy, self.v_pref, self.theta)
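
    # Note: ObservableState carries only (px, py, vx, vy, radius), i.e. what
    # other agents can perceive, while FullState adds the goal, preferred speed
    # and heading known only to the agent itself (inferred from the constructor
    # arguments above).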

    def get_position(self):
        return self.px, self.py

    def set_position(self, position):
        self.px = position[0]
        self.py = position[1]

    def get_goal_position(self):
        return self.gx, self.gy

    def get_velocity(self):
        return self.vx, self.vy

    def set_velocity(self, velocity):
        self.vx = velocity[0]
        self.vy = velocity[1]
        # Keep the heading consistent with the new velocity direction
        self.theta = np.arctan2(velocity[1], velocity[0])

    @abc.abstractmethod
    def act(self, ob):
        """
        Compute the state from the received observation and pass it to the policy
        """
        return

    def check_validity(self, action):
        # Holonomic agents take (vx, vy) commands; all others take (speed, rotation)
        if self.kinematics == 'holonomic':
            assert isinstance(action, ActionXY)
        else:
            assert isinstance(action, ActionRot)

    def compute_position(self, action, delta_t):
        self.check_validity(action)
        if self.kinematics == 'holonomic':
            # Straight-line integration of the commanded velocity
            px = self.px + action.vx * delta_t
            py = self.py + action.vy * delta_t
        else:
            # Unicycle model: rotate first, then translate along the new heading
            theta = self.theta + action.r
            px = self.px + np.cos(theta) * action.v * delta_t
            py = self.py + np.sin(theta) * action.v * delta_t

        return px, py
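
    # Worked example for the unicycle branch: with theta = 0 and
    # action = ActionRot(v=1.0, r=np.pi / 2), the new heading is pi/2, so over
    # delta_t = 0.25 the agent moves (cos(pi/2), sin(pi/2)) * 1.0 * 0.25:
    # px is unchanged and py increases by 0.25.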

    def step(self, action):
        """
        Perform an action and update the state
        """
        self.check_validity(action)
        pos = self.compute_position(action, self.time_step)
        self.px, self.py = pos
        if self.kinematics == 'holonomic':
            self.vx = action.vx
            self.vy = action.vy
        else:
            # Wrap the heading into [0, 2*pi) before recomputing the velocity
            self.theta = (self.theta + action.r) % (2 * np.pi)
            self.vx = action.v * np.cos(self.theta)
            self.vy = action.v * np.sin(self.theta)

    def reached_destination(self):
        # The goal counts as reached once it lies within one radius of the agent center
        return norm(np.array(self.get_position()) - np.array(self.get_goal_position())) < self.radius
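

# A minimal usage sketch (hypothetical, not part of the original module). It
# assumes policy_factory has a 'none' entry whose call returns None, as the
# None check in __init__ suggests; substitute any policy key your factory
# actually provides.
if __name__ == '__main__':
    import configparser

    class GoalSeeker(Agent):
        def act(self, ob):
            # Toy policy: head straight for the goal at the preferred speed
            goal = np.array(self.get_goal_position())
            pos = np.array(self.get_position())
            direction = (goal - pos) / norm(goal - pos)
            return ActionXY(vx=self.v_pref * direction[0], vy=self.v_pref * direction[1])

    config = configparser.RawConfigParser()
    config.read_string('[humans]\n'
                       'visible = true\n'
                       'v_pref = 1.0\n'
                       'radius = 0.3\n'
                       'policy = none\n'
                       'sensor = coordinates\n')
    agent = GoalSeeker(config, 'humans')
    agent.time_step = 0.25
    agent.kinematics = 'holonomic'  # no policy object attached, so set this by hand
    agent.set(px=0, py=0, gx=2, gy=0, vx=0, vy=0, theta=0)
    while not agent.reached_destination():
        agent.step(agent.act(None))
    print('reached goal at', agent.get_position())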