Merge pull request '修改文件、代码' (#47) from develop into master
commit
f4837452a9
Binary file not shown.
After Width: | Height: | Size: 24 MiB |
Binary file not shown.
After Width: | Height: | Size: 13 KiB |
@ -1,222 +0,0 @@
|
||||
"""
|
||||
ARA_star 2D (Anytime Repairing A*)
|
||||
@author: huiming zhou
|
||||
|
||||
@description: local inconsistency: g-value decreased.
|
||||
g(s) decreased introduces a local inconsistency between s and its successors.
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
|
||||
|
||||
class AraStar:
|
||||
def __init__(self, s_start, s_goal, e, heuristic_type):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env() # class Env
|
||||
|
||||
self.u_set = self.Env.motions # feasible input set
|
||||
self.obs = self.Env.obs # position of obstacles
|
||||
self.e = e # weight
|
||||
|
||||
self.g = dict() # Cost to come
|
||||
self.OPEN = dict() # priority queue / OPEN set
|
||||
self.CLOSED = set() # CLOSED set
|
||||
self.INCONS = {} # INCONSISTENT set
|
||||
self.PARENT = dict() # relations
|
||||
self.path = [] # planning path
|
||||
self.visited = [] # order of visited nodes
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
initialize each set.
|
||||
"""
|
||||
|
||||
self.g[self.s_start] = 0.0
|
||||
self.g[self.s_goal] = math.inf
|
||||
self.OPEN[self.s_start] = self.f_value(self.s_start)
|
||||
self.PARENT[self.s_start] = self.s_start
|
||||
|
||||
def searching(self):
|
||||
self.init()
|
||||
self.ImprovePath()
|
||||
self.path.append(self.extract_path())
|
||||
|
||||
while self.update_e() > 1: # continue condition
|
||||
self.e -= 0.4 # increase weight
|
||||
self.OPEN.update(self.INCONS)
|
||||
self.OPEN = {s: self.f_value(s) for s in self.OPEN} # update f_value of OPEN set
|
||||
|
||||
self.INCONS = dict()
|
||||
self.CLOSED = set()
|
||||
self.ImprovePath() # improve path
|
||||
self.path.append(self.extract_path())
|
||||
|
||||
return self.path, self.visited
|
||||
|
||||
def ImprovePath(self):
|
||||
"""
|
||||
:return: a e'-suboptimal path
|
||||
"""
|
||||
|
||||
visited_each = []
|
||||
|
||||
while True:
|
||||
s, f_small = self.calc_smallest_f()
|
||||
|
||||
if self.f_value(self.s_goal) <= f_small:
|
||||
break
|
||||
|
||||
self.OPEN.pop(s)
|
||||
self.CLOSED.add(s)
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n in self.obs:
|
||||
continue
|
||||
|
||||
new_cost = self.g[s] + self.cost(s, s_n)
|
||||
|
||||
if s_n not in self.g or new_cost < self.g[s_n]:
|
||||
self.g[s_n] = new_cost
|
||||
self.PARENT[s_n] = s
|
||||
visited_each.append(s_n)
|
||||
|
||||
if s_n not in self.CLOSED:
|
||||
self.OPEN[s_n] = self.f_value(s_n)
|
||||
else:
|
||||
self.INCONS[s_n] = 0.0
|
||||
|
||||
self.visited.append(visited_each)
|
||||
|
||||
def calc_smallest_f(self):
|
||||
"""
|
||||
:return: node with smallest f_value in OPEN set.
|
||||
"""
|
||||
|
||||
s_small = min(self.OPEN, key=self.OPEN.get)
|
||||
|
||||
return s_small, self.OPEN[s_small]
|
||||
|
||||
def get_neighbor(self, s):
|
||||
"""
|
||||
find neighbors of state s that not in obstacles.
|
||||
:param s: state
|
||||
:return: neighbors
|
||||
"""
|
||||
|
||||
return {(s[0] + u[0], s[1] + u[1]) for u in self.u_set}
|
||||
|
||||
def update_e(self):
|
||||
v = float("inf")
|
||||
|
||||
if self.OPEN:
|
||||
v = min(self.g[s] + self.h(s) for s in self.OPEN)
|
||||
if self.INCONS:
|
||||
v = min(v, min(self.g[s] + self.h(s) for s in self.INCONS))
|
||||
|
||||
return min(self.e, self.g[self.s_goal] / v)
|
||||
|
||||
def f_value(self, x):
|
||||
"""
|
||||
f = g + e * h
|
||||
f = cost-to-come + weight * cost-to-go
|
||||
:param x: current state
|
||||
:return: f_value
|
||||
"""
|
||||
|
||||
return self.g[x] + self.e * self.h(x)
|
||||
|
||||
def extract_path(self):
|
||||
"""
|
||||
Extract the path based on the PARENT set.
|
||||
:return: The planning path
|
||||
"""
|
||||
|
||||
path = [self.s_goal]
|
||||
s = self.s_goal
|
||||
|
||||
while True:
|
||||
s = self.PARENT[s]
|
||||
path.append(s)
|
||||
|
||||
if s == self.s_start:
|
||||
break
|
||||
|
||||
return list(path)
|
||||
|
||||
def h(self, s):
|
||||
"""
|
||||
Calculate heuristic.
|
||||
:param s: current node (state)
|
||||
:return: heuristic function value
|
||||
"""
|
||||
|
||||
heuristic_type = self.heuristic_type # heuristic type
|
||||
goal = self.s_goal # goal node
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
||||
else:
|
||||
return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return math.inf
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
"""
|
||||
check if the line segment (s_start, s_end) is collision.
|
||||
:param s_start: start node
|
||||
:param s_end: end node
|
||||
:return: True: is collision / False: not collision
|
||||
"""
|
||||
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
arastar = AraStar(s_start, s_goal, 2.5, "euclidean")
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
path, visited = arastar.searching()
|
||||
plot.animation_ara_star(path, visited, "Anytime Repairing A* (ARA*)")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,317 +0,0 @@
|
||||
"""
|
||||
Anytime_D_star 2D
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting
|
||||
from Search_2D import env
|
||||
|
||||
|
||||
class ADStar:
|
||||
def __init__(self, s_start, s_goal, eps, heuristic_type):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env() # class Env
|
||||
self.Plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
self.u_set = self.Env.motions # feasible input set
|
||||
self.obs = self.Env.obs # position of obstacles
|
||||
self.x = self.Env.x_range
|
||||
self.y = self.Env.y_range
|
||||
|
||||
self.g, self.rhs, self.OPEN = {}, {}, {}
|
||||
|
||||
for i in range(1, self.Env.x_range - 1):
|
||||
for j in range(1, self.Env.y_range - 1):
|
||||
self.rhs[(i, j)] = float("inf")
|
||||
self.g[(i, j)] = float("inf")
|
||||
|
||||
self.rhs[self.s_goal] = 0.0
|
||||
self.eps = eps
|
||||
self.OPEN[self.s_goal] = self.Key(self.s_goal)
|
||||
self.CLOSED, self.INCONS = set(), dict()
|
||||
|
||||
self.visited = set()
|
||||
self.count = 0
|
||||
self.count_env_change = 0
|
||||
self.obs_add = set()
|
||||
self.obs_remove = set()
|
||||
self.title = "Anytime D*: Small changes" # Significant changes
|
||||
self.fig = plt.figure()
|
||||
|
||||
def run(self):
|
||||
self.Plot.plot_grid(self.title)
|
||||
self.ComputeOrImprovePath()
|
||||
self.plot_visited()
|
||||
self.plot_path(self.extract_path())
|
||||
self.visited = set()
|
||||
|
||||
while True:
|
||||
if self.eps <= 1.0:
|
||||
break
|
||||
self.eps -= 0.5
|
||||
self.OPEN.update(self.INCONS)
|
||||
for s in self.OPEN:
|
||||
self.OPEN[s] = self.Key(s)
|
||||
self.CLOSED = set()
|
||||
self.ComputeOrImprovePath()
|
||||
self.plot_visited()
|
||||
self.plot_path(self.extract_path())
|
||||
self.visited = set()
|
||||
plt.pause(0.5)
|
||||
|
||||
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
|
||||
plt.show()
|
||||
|
||||
def on_press(self, event):
|
||||
x, y = event.xdata, event.ydata
|
||||
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
|
||||
print("Please choose right area!")
|
||||
else:
|
||||
self.count_env_change += 1
|
||||
x, y = int(x), int(y)
|
||||
print("Change position: s =", x, ",", "y =", y)
|
||||
|
||||
# for small changes
|
||||
if self.title == "Anytime D*: Small changes":
|
||||
if (x, y) not in self.obs:
|
||||
self.obs.add((x, y))
|
||||
self.g[(x, y)] = float("inf")
|
||||
self.rhs[(x, y)] = float("inf")
|
||||
else:
|
||||
self.obs.remove((x, y))
|
||||
self.UpdateState((x, y))
|
||||
|
||||
self.Plot.update_obs(self.obs)
|
||||
|
||||
for sn in self.get_neighbor((x, y)):
|
||||
self.UpdateState(sn)
|
||||
|
||||
plt.cla()
|
||||
self.Plot.plot_grid(self.title)
|
||||
|
||||
while True:
|
||||
if len(self.INCONS) == 0:
|
||||
break
|
||||
self.OPEN.update(self.INCONS)
|
||||
for s in self.OPEN:
|
||||
self.OPEN[s] = self.Key(s)
|
||||
self.CLOSED = set()
|
||||
self.ComputeOrImprovePath()
|
||||
self.plot_visited()
|
||||
self.plot_path(self.extract_path())
|
||||
# plt.plot(self.title)
|
||||
self.visited = set()
|
||||
|
||||
if self.eps <= 1.0:
|
||||
break
|
||||
|
||||
else:
|
||||
if (x, y) not in self.obs:
|
||||
self.obs.add((x, y))
|
||||
self.obs_add.add((x, y))
|
||||
plt.plot(x, y, 'sk')
|
||||
if (x, y) in self.obs_remove:
|
||||
self.obs_remove.remove((x, y))
|
||||
else:
|
||||
self.obs.remove((x, y))
|
||||
self.obs_remove.add((x, y))
|
||||
plt.plot(x, y, marker='s', color='white')
|
||||
if (x, y) in self.obs_add:
|
||||
self.obs_add.remove((x, y))
|
||||
|
||||
self.Plot.update_obs(self.obs)
|
||||
|
||||
if self.count_env_change >= 15:
|
||||
self.count_env_change = 0
|
||||
self.eps += 2.0
|
||||
for s in self.obs_add:
|
||||
self.g[(x, y)] = float("inf")
|
||||
self.rhs[(x, y)] = float("inf")
|
||||
|
||||
for sn in self.get_neighbor(s):
|
||||
self.UpdateState(sn)
|
||||
|
||||
for s in self.obs_remove:
|
||||
for sn in self.get_neighbor(s):
|
||||
self.UpdateState(sn)
|
||||
self.UpdateState(s)
|
||||
|
||||
plt.cla()
|
||||
self.Plot.plot_grid(self.title)
|
||||
|
||||
while True:
|
||||
if self.eps <= 1.0:
|
||||
break
|
||||
self.eps -= 0.5
|
||||
self.OPEN.update(self.INCONS)
|
||||
for s in self.OPEN:
|
||||
self.OPEN[s] = self.Key(s)
|
||||
self.CLOSED = set()
|
||||
self.ComputeOrImprovePath()
|
||||
self.plot_visited()
|
||||
self.plot_path(self.extract_path())
|
||||
plt.title(self.title)
|
||||
self.visited = set()
|
||||
plt.pause(0.5)
|
||||
|
||||
self.fig.canvas.draw_idle()
|
||||
|
||||
def ComputeOrImprovePath(self):
|
||||
while True:
|
||||
s, v = self.TopKey()
|
||||
if v >= self.Key(self.s_start) and \
|
||||
self.rhs[self.s_start] == self.g[self.s_start]:
|
||||
break
|
||||
|
||||
self.OPEN.pop(s)
|
||||
self.visited.add(s)
|
||||
|
||||
if self.g[s] > self.rhs[s]:
|
||||
self.g[s] = self.rhs[s]
|
||||
self.CLOSED.add(s)
|
||||
for sn in self.get_neighbor(s):
|
||||
self.UpdateState(sn)
|
||||
else:
|
||||
self.g[s] = float("inf")
|
||||
for sn in self.get_neighbor(s):
|
||||
self.UpdateState(sn)
|
||||
self.UpdateState(s)
|
||||
|
||||
def UpdateState(self, s):
|
||||
if s != self.s_goal:
|
||||
self.rhs[s] = float("inf")
|
||||
for x in self.get_neighbor(s):
|
||||
self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x))
|
||||
if s in self.OPEN:
|
||||
self.OPEN.pop(s)
|
||||
|
||||
if self.g[s] != self.rhs[s]:
|
||||
if s not in self.CLOSED:
|
||||
self.OPEN[s] = self.Key(s)
|
||||
else:
|
||||
self.INCONS[s] = 0
|
||||
|
||||
def Key(self, s):
|
||||
if self.g[s] > self.rhs[s]:
|
||||
return [self.rhs[s] + self.eps * self.h(self.s_start, s), self.rhs[s]]
|
||||
else:
|
||||
return [self.g[s] + self.h(self.s_start, s), self.g[s]]
|
||||
|
||||
def TopKey(self):
|
||||
"""
|
||||
:return: return the min key and its value.
|
||||
"""
|
||||
|
||||
s = min(self.OPEN, key=self.OPEN.get)
|
||||
return s, self.OPEN[s]
|
||||
|
||||
def h(self, s_start, s_goal):
|
||||
heuristic_type = self.heuristic_type # heuristic type
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1])
|
||||
else:
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return float("inf")
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_neighbor(self, s):
|
||||
nei_list = set()
|
||||
for u in self.u_set:
|
||||
s_next = tuple([s[i] + u[i] for i in range(2)])
|
||||
if s_next not in self.obs:
|
||||
nei_list.add(s_next)
|
||||
|
||||
return nei_list
|
||||
|
||||
def extract_path(self):
|
||||
"""
|
||||
Extract the path based on the PARENT set.
|
||||
:return: The planning path
|
||||
"""
|
||||
|
||||
path = [self.s_start]
|
||||
s = self.s_start
|
||||
|
||||
for k in range(100):
|
||||
g_list = {}
|
||||
for x in self.get_neighbor(s):
|
||||
if not self.is_collision(s, x):
|
||||
g_list[x] = self.g[x]
|
||||
s = min(g_list, key=g_list.get)
|
||||
path.append(s)
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
return list(path)
|
||||
|
||||
def plot_path(self, path):
|
||||
px = [x[0] for x in path]
|
||||
py = [x[1] for x in path]
|
||||
plt.plot(px, py, linewidth=2)
|
||||
plt.plot(self.s_start[0], self.s_start[1], "bs")
|
||||
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
|
||||
|
||||
def plot_visited(self):
|
||||
self.count += 1
|
||||
|
||||
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
|
||||
'bisque', 'navajowhite', 'moccasin', 'wheat',
|
||||
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
|
||||
|
||||
if self.count >= len(color) - 1:
|
||||
self.count = 0
|
||||
|
||||
for x in self.visited:
|
||||
plt.plot(x[0], x[1], marker='s', color=color[self.count])
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
dstar = ADStar(s_start, s_goal, 2.5, "euclidean")
|
||||
dstar.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,68 +0,0 @@
|
||||
"""
|
||||
Best-First Searching
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import heapq
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
from Search_2D.Astar import AStar
|
||||
|
||||
|
||||
class BestFirst(AStar):
|
||||
"""BestFirst set the heuristics as the priority
|
||||
"""
|
||||
def searching(self):
|
||||
"""
|
||||
Breadth-first Searching.
|
||||
:return: path, visited order
|
||||
"""
|
||||
|
||||
self.PARENT[self.s_start] = self.s_start
|
||||
self.g[self.s_start] = 0
|
||||
self.g[self.s_goal] = math.inf
|
||||
heapq.heappush(self.OPEN,
|
||||
(self.heuristic(self.s_start), self.s_start))
|
||||
|
||||
while self.OPEN:
|
||||
_, s = heapq.heappop(self.OPEN)
|
||||
self.CLOSED.append(s)
|
||||
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
new_cost = self.g[s] + self.cost(s, s_n)
|
||||
|
||||
if s_n not in self.g:
|
||||
self.g[s_n] = math.inf
|
||||
|
||||
if new_cost < self.g[s_n]: # conditions for updating Cost
|
||||
self.g[s_n] = new_cost
|
||||
self.PARENT[s_n] = s
|
||||
|
||||
# best first set the heuristics as the priority
|
||||
heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
|
||||
|
||||
return self.extract_path(self.PARENT), self.CLOSED
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
BF = BestFirst(s_start, s_goal, 'euclidean')
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
path, visited = BF.searching()
|
||||
plot.animation(path, visited, "Best-first Searching") # animation
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,229 +0,0 @@
|
||||
"""
|
||||
Bidirectional_a_star 2D
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import heapq
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
|
||||
|
||||
class BidirectionalAStar:
|
||||
def __init__(self, s_start, s_goal, heuristic_type):
|
||||
self.s_start = s_start
|
||||
self.s_goal = s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env() # class Env
|
||||
|
||||
self.u_set = self.Env.motions # feasible input set
|
||||
self.obs = self.Env.obs # position of obstacles
|
||||
|
||||
self.OPEN_fore = [] # OPEN set for forward searching
|
||||
self.OPEN_back = [] # OPEN set for backward searching
|
||||
self.CLOSED_fore = [] # CLOSED set for forward
|
||||
self.CLOSED_back = [] # CLOSED set for backward
|
||||
self.PARENT_fore = dict() # recorded parent for forward
|
||||
self.PARENT_back = dict() # recorded parent for backward
|
||||
self.g_fore = dict() # cost to come for forward
|
||||
self.g_back = dict() # cost to come for backward
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
initialize parameters
|
||||
"""
|
||||
|
||||
self.g_fore[self.s_start] = 0.0
|
||||
self.g_fore[self.s_goal] = math.inf
|
||||
self.g_back[self.s_goal] = 0.0
|
||||
self.g_back[self.s_start] = math.inf
|
||||
self.PARENT_fore[self.s_start] = self.s_start
|
||||
self.PARENT_back[self.s_goal] = self.s_goal
|
||||
heapq.heappush(self.OPEN_fore,
|
||||
(self.f_value_fore(self.s_start), self.s_start))
|
||||
heapq.heappush(self.OPEN_back,
|
||||
(self.f_value_back(self.s_goal), self.s_goal))
|
||||
|
||||
def searching(self):
|
||||
"""
|
||||
Bidirectional A*
|
||||
:return: connected path, visited order of forward, visited order of backward
|
||||
"""
|
||||
|
||||
self.init()
|
||||
s_meet = self.s_start
|
||||
|
||||
while self.OPEN_fore and self.OPEN_back:
|
||||
# solve foreward-search
|
||||
_, s_fore = heapq.heappop(self.OPEN_fore)
|
||||
|
||||
if s_fore in self.PARENT_back:
|
||||
s_meet = s_fore
|
||||
break
|
||||
|
||||
self.CLOSED_fore.append(s_fore)
|
||||
|
||||
for s_n in self.get_neighbor(s_fore):
|
||||
new_cost = self.g_fore[s_fore] + self.cost(s_fore, s_n)
|
||||
|
||||
if s_n not in self.g_fore:
|
||||
self.g_fore[s_n] = math.inf
|
||||
|
||||
if new_cost < self.g_fore[s_n]:
|
||||
self.g_fore[s_n] = new_cost
|
||||
self.PARENT_fore[s_n] = s_fore
|
||||
heapq.heappush(self.OPEN_fore,
|
||||
(self.f_value_fore(s_n), s_n))
|
||||
|
||||
# solve backward-search
|
||||
_, s_back = heapq.heappop(self.OPEN_back)
|
||||
|
||||
if s_back in self.PARENT_fore:
|
||||
s_meet = s_back
|
||||
break
|
||||
|
||||
self.CLOSED_back.append(s_back)
|
||||
|
||||
for s_n in self.get_neighbor(s_back):
|
||||
new_cost = self.g_back[s_back] + self.cost(s_back, s_n)
|
||||
|
||||
if s_n not in self.g_back:
|
||||
self.g_back[s_n] = math.inf
|
||||
|
||||
if new_cost < self.g_back[s_n]:
|
||||
self.g_back[s_n] = new_cost
|
||||
self.PARENT_back[s_n] = s_back
|
||||
heapq.heappush(self.OPEN_back,
|
||||
(self.f_value_back(s_n), s_n))
|
||||
|
||||
return self.extract_path(s_meet), self.CLOSED_fore, self.CLOSED_back
|
||||
|
||||
def get_neighbor(self, s):
|
||||
"""
|
||||
find neighbors of state s that not in obstacles.
|
||||
:param s: state
|
||||
:return: neighbors
|
||||
"""
|
||||
|
||||
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
|
||||
|
||||
def extract_path(self, s_meet):
|
||||
"""
|
||||
extract path from start and goal
|
||||
:param s_meet: meet point of bi-direction a*
|
||||
:return: path
|
||||
"""
|
||||
|
||||
# extract path for foreward part
|
||||
path_fore = [s_meet]
|
||||
s = s_meet
|
||||
|
||||
while True:
|
||||
s = self.PARENT_fore[s]
|
||||
path_fore.append(s)
|
||||
if s == self.s_start:
|
||||
break
|
||||
|
||||
# extract path for backward part
|
||||
path_back = []
|
||||
s = s_meet
|
||||
|
||||
while True:
|
||||
s = self.PARENT_back[s]
|
||||
path_back.append(s)
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
return list(reversed(path_fore)) + list(path_back)
|
||||
|
||||
def f_value_fore(self, s):
|
||||
"""
|
||||
forward searching: f = g + h. (g: Cost to come, h: heuristic value)
|
||||
:param s: current state
|
||||
:return: f
|
||||
"""
|
||||
|
||||
return self.g_fore[s] + self.h(s, self.s_goal)
|
||||
|
||||
def f_value_back(self, s):
|
||||
"""
|
||||
backward searching: f = g + h. (g: Cost to come, h: heuristic value)
|
||||
:param s: current state
|
||||
:return: f
|
||||
"""
|
||||
|
||||
return self.g_back[s] + self.h(s, self.s_start)
|
||||
|
||||
def h(self, s, goal):
|
||||
"""
|
||||
Calculate heuristic value.
|
||||
:param s: current node (state)
|
||||
:param goal: goal node (state)
|
||||
:return: heuristic value
|
||||
"""
|
||||
|
||||
heuristic_type = self.heuristic_type
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
||||
else:
|
||||
return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return math.inf
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
"""
|
||||
check if the line segment (s_start, s_end) is collision.
|
||||
:param s_start: start node
|
||||
:param s_end: end node
|
||||
:return: True: is collision / False: not collision
|
||||
"""
|
||||
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
x_start = (5, 5)
|
||||
x_goal = (45, 25)
|
||||
|
||||
bastar = BidirectionalAStar(x_start, x_goal, "euclidean")
|
||||
plot = plotting.Plotting(x_start, x_goal)
|
||||
|
||||
path, visited_fore, visited_back = bastar.searching()
|
||||
plot.animation_bi_astar(path, visited_fore, visited_back, "Bidirectional-A*") # animation
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,304 +0,0 @@
|
||||
"""
|
||||
D_star 2D
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
|
||||
|
||||
class DStar:
|
||||
def __init__(self, s_start, s_goal):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
|
||||
self.Env = env.Env()
|
||||
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
|
||||
|
||||
self.u_set = self.Env.motions
|
||||
self.obs = self.Env.obs
|
||||
self.x = self.Env.x_range
|
||||
self.y = self.Env.y_range
|
||||
|
||||
self.fig = plt.figure()
|
||||
|
||||
self.OPEN = set()
|
||||
self.t = dict()
|
||||
self.PARENT = dict()
|
||||
self.h = dict()
|
||||
self.k = dict()
|
||||
self.path = []
|
||||
self.visited = set()
|
||||
self.count = 0
|
||||
|
||||
def init(self):
|
||||
for i in range(self.Env.x_range):
|
||||
for j in range(self.Env.y_range):
|
||||
self.t[(i, j)] = 'NEW'
|
||||
self.k[(i, j)] = 0.0
|
||||
self.h[(i, j)] = float("inf")
|
||||
self.PARENT[(i, j)] = None
|
||||
|
||||
self.h[self.s_goal] = 0.0
|
||||
|
||||
def run(self, s_start, s_end):
|
||||
self.init()
|
||||
self.insert(s_end, 0)
|
||||
|
||||
while True:
|
||||
self.process_state()
|
||||
if self.t[s_start] == 'CLOSED':
|
||||
break
|
||||
|
||||
self.path = self.extract_path(s_start, s_end)
|
||||
self.Plot.plot_grid("Dynamic A* (D*)")
|
||||
self.plot_path(self.path)
|
||||
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
|
||||
plt.show()
|
||||
|
||||
def on_press(self, event):
|
||||
x, y = event.xdata, event.ydata
|
||||
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
|
||||
print("Please choose right area!")
|
||||
else:
|
||||
x, y = int(x), int(y)
|
||||
if (x, y) not in self.obs:
|
||||
print("Add obstacle at: s =", x, ",", "y =", y)
|
||||
self.obs.add((x, y))
|
||||
self.Plot.update_obs(self.obs)
|
||||
|
||||
s = self.s_start
|
||||
self.visited = set()
|
||||
self.count += 1
|
||||
|
||||
while s != self.s_goal:
|
||||
if self.is_collision(s, self.PARENT[s]):
|
||||
self.modify(s)
|
||||
continue
|
||||
s = self.PARENT[s]
|
||||
|
||||
self.path = self.extract_path(self.s_start, self.s_goal)
|
||||
|
||||
plt.cla()
|
||||
self.Plot.plot_grid("Dynamic A* (D*)")
|
||||
self.plot_visited(self.visited)
|
||||
self.plot_path(self.path)
|
||||
|
||||
self.fig.canvas.draw_idle()
|
||||
|
||||
def extract_path(self, s_start, s_end):
|
||||
path = [s_start]
|
||||
s = s_start
|
||||
while True:
|
||||
s = self.PARENT[s]
|
||||
path.append(s)
|
||||
if s == s_end:
|
||||
return path
|
||||
|
||||
def process_state(self):
|
||||
s = self.min_state() # get node in OPEN set with min k value
|
||||
self.visited.add(s)
|
||||
|
||||
if s is None:
|
||||
return -1 # OPEN set is empty
|
||||
|
||||
k_old = self.get_k_min() # record the min k value of this iteration (min path cost)
|
||||
self.delete(s) # move state s from OPEN set to CLOSED set
|
||||
|
||||
# k_min < h[s] --> s: RAISE state (increased cost)
|
||||
if k_old < self.h[s]:
|
||||
for s_n in self.get_neighbor(s):
|
||||
if self.h[s_n] <= k_old and \
|
||||
self.h[s] > self.h[s_n] + self.cost(s_n, s):
|
||||
|
||||
# update h_value and choose parent
|
||||
self.PARENT[s] = s_n
|
||||
self.h[s] = self.h[s_n] + self.cost(s_n, s)
|
||||
|
||||
# s: k_min >= h[s] -- > s: LOWER state (cost reductions)
|
||||
if k_old == self.h[s]:
|
||||
for s_n in self.get_neighbor(s):
|
||||
if self.t[s_n] == 'NEW' or \
|
||||
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
|
||||
(self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):
|
||||
|
||||
# Condition:
|
||||
# 1) t[s_n] == 'NEW': not visited
|
||||
# 2) s_n's parent: cost reduction
|
||||
# 3) s_n find a better parent
|
||||
self.PARENT[s_n] = s
|
||||
self.insert(s_n, self.h[s] + self.cost(s, s_n))
|
||||
else:
|
||||
for s_n in self.get_neighbor(s):
|
||||
if self.t[s_n] == 'NEW' or \
|
||||
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):
|
||||
|
||||
# Condition:
|
||||
# 1) t[s_n] == 'NEW': not visited
|
||||
# 2) s_n's parent: cost reduction
|
||||
self.PARENT[s_n] = s
|
||||
self.insert(s_n, self.h[s] + self.cost(s, s_n))
|
||||
else:
|
||||
if self.PARENT[s_n] != s and \
|
||||
self.h[s_n] > self.h[s] + self.cost(s, s_n):
|
||||
|
||||
# Condition: LOWER happened in OPEN set (s), s should be explored again
|
||||
self.insert(s, self.h[s])
|
||||
else:
|
||||
if self.PARENT[s_n] != s and \
|
||||
self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
|
||||
self.t[s_n] == 'CLOSED' and \
|
||||
self.h[s_n] > k_old:
|
||||
|
||||
# Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
|
||||
self.insert(s_n, self.h[s_n])
|
||||
|
||||
return self.get_k_min()
|
||||
|
||||
def min_state(self):
|
||||
"""
|
||||
choose the node with the minimum k value in OPEN set.
|
||||
:return: state
|
||||
"""
|
||||
|
||||
if not self.OPEN:
|
||||
return None
|
||||
|
||||
return min(self.OPEN, key=lambda x: self.k[x])
|
||||
|
||||
def get_k_min(self):
|
||||
"""
|
||||
calc the min k value for nodes in OPEN set.
|
||||
:return: k value
|
||||
"""
|
||||
|
||||
if not self.OPEN:
|
||||
return -1
|
||||
|
||||
return min([self.k[x] for x in self.OPEN])
|
||||
|
||||
def insert(self, s, h_new):
|
||||
"""
|
||||
insert node into OPEN set.
|
||||
:param s: node
|
||||
:param h_new: new or better cost to come value
|
||||
"""
|
||||
|
||||
if self.t[s] == 'NEW':
|
||||
self.k[s] = h_new
|
||||
elif self.t[s] == 'OPEN':
|
||||
self.k[s] = min(self.k[s], h_new)
|
||||
elif self.t[s] == 'CLOSED':
|
||||
self.k[s] = min(self.h[s], h_new)
|
||||
|
||||
self.h[s] = h_new
|
||||
self.t[s] = 'OPEN'
|
||||
self.OPEN.add(s)
|
||||
|
||||
def delete(self, s):
|
||||
"""
|
||||
delete: move state s from OPEN set to CLOSED set.
|
||||
:param s: state should be deleted
|
||||
"""
|
||||
|
||||
if self.t[s] == 'OPEN':
|
||||
self.t[s] = 'CLOSED'
|
||||
|
||||
self.OPEN.remove(s)
|
||||
|
||||
def modify(self, s):
|
||||
"""
|
||||
start processing from state s.
|
||||
:param s: is a node whose status is RAISE or LOWER.
|
||||
"""
|
||||
|
||||
self.modify_cost(s)
|
||||
|
||||
while True:
|
||||
k_min = self.process_state()
|
||||
|
||||
if k_min >= self.h[s]:
|
||||
break
|
||||
|
||||
def modify_cost(self, s):
|
||||
# if node in CLOSED set, put it into OPEN set.
|
||||
# Since cost may be changed between s - s.parent, calc cost(s, s.p) again
|
||||
|
||||
if self.t[s] == 'CLOSED':
|
||||
self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))
|
||||
|
||||
def get_neighbor(self, s):
|
||||
nei_list = set()
|
||||
|
||||
for u in self.u_set:
|
||||
s_next = tuple([s[i] + u[i] for i in range(2)])
|
||||
if s_next not in self.obs:
|
||||
nei_list.add(s_next)
|
||||
|
||||
return nei_list
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return float("inf")
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def plot_path(self, path):
|
||||
px = [x[0] for x in path]
|
||||
py = [x[1] for x in path]
|
||||
plt.plot(px, py, linewidth=2)
|
||||
plt.plot(self.s_start[0], self.s_start[1], "bs")
|
||||
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
|
||||
|
||||
def plot_visited(self, visited):
|
||||
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
|
||||
'bisque', 'navajowhite', 'moccasin', 'wheat',
|
||||
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
|
||||
|
||||
if self.count >= len(color) - 1:
|
||||
self.count = 0
|
||||
|
||||
for x in visited:
|
||||
plt.plot(x[0], x[1], marker='s', color=color[self.count])
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
dstar = DStar(s_start, s_goal)
|
||||
dstar.run(s_start, s_goal)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,239 +0,0 @@
|
||||
"""
|
||||
D_star_Lite 2D
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
|
||||
|
||||
class DStar:
|
||||
def __init__(self, s_start, s_goal, heuristic_type):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env() # class Env
|
||||
self.Plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
self.u_set = self.Env.motions # feasible input set
|
||||
self.obs = self.Env.obs # position of obstacles
|
||||
self.x = self.Env.x_range
|
||||
self.y = self.Env.y_range
|
||||
|
||||
self.g, self.rhs, self.U = {}, {}, {}
|
||||
self.km = 0
|
||||
|
||||
for i in range(1, self.Env.x_range - 1):
|
||||
for j in range(1, self.Env.y_range - 1):
|
||||
self.rhs[(i, j)] = float("inf")
|
||||
self.g[(i, j)] = float("inf")
|
||||
|
||||
self.rhs[self.s_goal] = 0.0
|
||||
self.U[self.s_goal] = self.CalculateKey(self.s_goal)
|
||||
self.visited = set()
|
||||
self.count = 0
|
||||
self.fig = plt.figure()
|
||||
|
||||
def run(self):
|
||||
self.Plot.plot_grid("D* Lite")
|
||||
self.ComputePath()
|
||||
self.plot_path(self.extract_path())
|
||||
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
|
||||
plt.show()
|
||||
|
||||
def on_press(self, event):
|
||||
x, y = event.xdata, event.ydata
|
||||
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
|
||||
print("Please choose right area!")
|
||||
else:
|
||||
x, y = int(x), int(y)
|
||||
print("Change position: s =", x, ",", "y =", y)
|
||||
|
||||
s_curr = self.s_start
|
||||
s_last = self.s_start
|
||||
i = 0
|
||||
path = [self.s_start]
|
||||
|
||||
while s_curr != self.s_goal:
|
||||
s_list = {}
|
||||
|
||||
for s in self.get_neighbor(s_curr):
|
||||
s_list[s] = self.g[s] + self.cost(s_curr, s)
|
||||
s_curr = min(s_list, key=s_list.get)
|
||||
path.append(s_curr)
|
||||
|
||||
if i < 1:
|
||||
self.km += self.h(s_last, s_curr)
|
||||
s_last = s_curr
|
||||
if (x, y) not in self.obs:
|
||||
self.obs.add((x, y))
|
||||
plt.plot(x, y, 'sk')
|
||||
self.g[(x, y)] = float("inf")
|
||||
self.rhs[(x, y)] = float("inf")
|
||||
else:
|
||||
self.obs.remove((x, y))
|
||||
plt.plot(x, y, marker='s', color='white')
|
||||
self.UpdateVertex((x, y))
|
||||
for s in self.get_neighbor((x, y)):
|
||||
self.UpdateVertex(s)
|
||||
i += 1
|
||||
|
||||
self.count += 1
|
||||
self.visited = set()
|
||||
self.ComputePath()
|
||||
|
||||
self.plot_visited(self.visited)
|
||||
self.plot_path(path)
|
||||
self.fig.canvas.draw_idle()
|
||||
|
||||
def ComputePath(self):
|
||||
while True:
|
||||
s, v = self.TopKey()
|
||||
if v >= self.CalculateKey(self.s_start) and \
|
||||
self.rhs[self.s_start] == self.g[self.s_start]:
|
||||
break
|
||||
|
||||
k_old = v
|
||||
self.U.pop(s)
|
||||
self.visited.add(s)
|
||||
|
||||
if k_old < self.CalculateKey(s):
|
||||
self.U[s] = self.CalculateKey(s)
|
||||
elif self.g[s] > self.rhs[s]:
|
||||
self.g[s] = self.rhs[s]
|
||||
for x in self.get_neighbor(s):
|
||||
self.UpdateVertex(x)
|
||||
else:
|
||||
self.g[s] = float("inf")
|
||||
self.UpdateVertex(s)
|
||||
for x in self.get_neighbor(s):
|
||||
self.UpdateVertex(x)
|
||||
|
||||
def UpdateVertex(self, s):
|
||||
if s != self.s_goal:
|
||||
self.rhs[s] = float("inf")
|
||||
for x in self.get_neighbor(s):
|
||||
self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x))
|
||||
if s in self.U:
|
||||
self.U.pop(s)
|
||||
|
||||
if self.g[s] != self.rhs[s]:
|
||||
self.U[s] = self.CalculateKey(s)
|
||||
|
||||
def CalculateKey(self, s):
|
||||
return [min(self.g[s], self.rhs[s]) + self.h(self.s_start, s) + self.km,
|
||||
min(self.g[s], self.rhs[s])]
|
||||
|
||||
def TopKey(self):
|
||||
"""
|
||||
:return: return the min key and its value.
|
||||
"""
|
||||
|
||||
s = min(self.U, key=self.U.get)
|
||||
return s, self.U[s]
|
||||
|
||||
def h(self, s_start, s_goal):
|
||||
heuristic_type = self.heuristic_type # heuristic type
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1])
|
||||
else:
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return float("inf")
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_neighbor(self, s):
|
||||
nei_list = set()
|
||||
for u in self.u_set:
|
||||
s_next = tuple([s[i] + u[i] for i in range(2)])
|
||||
if s_next not in self.obs:
|
||||
nei_list.add(s_next)
|
||||
|
||||
return nei_list
|
||||
|
||||
def extract_path(self):
|
||||
"""
|
||||
Extract the path based on the PARENT set.
|
||||
:return: The planning path
|
||||
"""
|
||||
|
||||
path = [self.s_start]
|
||||
s = self.s_start
|
||||
|
||||
for k in range(100):
|
||||
g_list = {}
|
||||
for x in self.get_neighbor(s):
|
||||
if not self.is_collision(s, x):
|
||||
g_list[x] = self.g[x]
|
||||
s = min(g_list, key=g_list.get)
|
||||
path.append(s)
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
return list(path)
|
||||
|
||||
def plot_path(self, path):
|
||||
px = [x[0] for x in path]
|
||||
py = [x[1] for x in path]
|
||||
plt.plot(px, py, linewidth=2)
|
||||
plt.plot(self.s_start[0], self.s_start[1], "bs")
|
||||
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
|
||||
|
||||
def plot_visited(self, visited):
|
||||
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
|
||||
'bisque', 'navajowhite', 'moccasin', 'wheat',
|
||||
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
|
||||
|
||||
if self.count >= len(color) - 1:
|
||||
self.count = 0
|
||||
|
||||
for x in visited:
|
||||
plt.plot(x[0], x[1], marker='s', color=color[self.count])
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
dstar = DStar(s_start, s_goal, "euclidean")
|
||||
dstar.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,69 +0,0 @@
|
||||
"""
|
||||
Dijkstra 2D
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import heapq
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
|
||||
from Search_2D.Astar import AStar
|
||||
|
||||
|
||||
class Dijkstra(AStar):
|
||||
"""Dijkstra set the cost as the priority
|
||||
"""
|
||||
def searching(self):
|
||||
"""
|
||||
Breadth-first Searching.
|
||||
:return: path, visited order
|
||||
"""
|
||||
|
||||
self.PARENT[self.s_start] = self.s_start
|
||||
self.g[self.s_start] = 0
|
||||
self.g[self.s_goal] = math.inf
|
||||
heapq.heappush(self.OPEN,
|
||||
(0, self.s_start))
|
||||
|
||||
while self.OPEN:
|
||||
_, s = heapq.heappop(self.OPEN)
|
||||
self.CLOSED.append(s)
|
||||
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
new_cost = self.g[s] + self.cost(s, s_n)
|
||||
|
||||
if s_n not in self.g:
|
||||
self.g[s_n] = math.inf
|
||||
|
||||
if new_cost < self.g[s_n]: # conditions for updating Cost
|
||||
self.g[s_n] = new_cost
|
||||
self.PARENT[s_n] = s
|
||||
|
||||
# best first set the heuristics as the priority
|
||||
heapq.heappush(self.OPEN, (new_cost, s_n))
|
||||
|
||||
return self.extract_path(self.PARENT), self.CLOSED
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
dijkstra = Dijkstra(s_start, s_goal, 'None')
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
path, visited = dijkstra.searching()
|
||||
plot.animation(path, visited, "Dijkstra's") # animation generate
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,256 +0,0 @@
|
||||
"""
|
||||
LPA_star 2D
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
|
||||
|
||||
class LPAStar:
|
||||
def __init__(self, s_start, s_goal, heuristic_type):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env()
|
||||
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
|
||||
|
||||
self.u_set = self.Env.motions
|
||||
self.obs = self.Env.obs
|
||||
self.x = self.Env.x_range
|
||||
self.y = self.Env.y_range
|
||||
|
||||
self.g, self.rhs, self.U = {}, {}, {}
|
||||
|
||||
for i in range(self.Env.x_range):
|
||||
for j in range(self.Env.y_range):
|
||||
self.rhs[(i, j)] = float("inf")
|
||||
self.g[(i, j)] = float("inf")
|
||||
|
||||
self.rhs[self.s_start] = 0
|
||||
self.U[self.s_start] = self.CalculateKey(self.s_start)
|
||||
self.visited = set()
|
||||
self.count = 0
|
||||
|
||||
self.fig = plt.figure()
|
||||
|
||||
def run(self):
|
||||
self.Plot.plot_grid("Lifelong Planning A*")
|
||||
|
||||
self.ComputeShortestPath()
|
||||
self.plot_path(self.extract_path())
|
||||
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
|
||||
|
||||
plt.show()
|
||||
|
||||
def on_press(self, event):
|
||||
x, y = event.xdata, event.ydata
|
||||
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
|
||||
print("Please choose right area!")
|
||||
else:
|
||||
x, y = int(x), int(y)
|
||||
print("Change position: s =", x, ",", "y =", y)
|
||||
|
||||
self.visited = set()
|
||||
self.count += 1
|
||||
|
||||
if (x, y) not in self.obs:
|
||||
self.obs.add((x, y))
|
||||
else:
|
||||
self.obs.remove((x, y))
|
||||
self.UpdateVertex((x, y))
|
||||
|
||||
self.Plot.update_obs(self.obs)
|
||||
|
||||
for s_n in self.get_neighbor((x, y)):
|
||||
self.UpdateVertex(s_n)
|
||||
|
||||
self.ComputeShortestPath()
|
||||
|
||||
plt.cla()
|
||||
self.Plot.plot_grid("Lifelong Planning A*")
|
||||
self.plot_visited(self.visited)
|
||||
self.plot_path(self.extract_path())
|
||||
self.fig.canvas.draw_idle()
|
||||
|
||||
def ComputeShortestPath(self):
|
||||
while True:
|
||||
s, v = self.TopKey()
|
||||
|
||||
if v >= self.CalculateKey(self.s_goal) and \
|
||||
self.rhs[self.s_goal] == self.g[self.s_goal]:
|
||||
break
|
||||
|
||||
self.U.pop(s)
|
||||
self.visited.add(s)
|
||||
|
||||
if self.g[s] > self.rhs[s]:
|
||||
|
||||
# Condition: over-consistent (eg: deleted obstacles)
|
||||
# So, rhs[s] decreased -- > rhs[s] < g[s]
|
||||
self.g[s] = self.rhs[s]
|
||||
else:
|
||||
|
||||
# Condition: # under-consistent (eg: added obstacles)
|
||||
# So, rhs[s] increased --> rhs[s] > g[s]
|
||||
self.g[s] = float("inf")
|
||||
self.UpdateVertex(s)
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
self.UpdateVertex(s_n)
|
||||
|
||||
def UpdateVertex(self, s):
|
||||
"""
|
||||
update the status and the current cost to come of state s.
|
||||
:param s: state s
|
||||
"""
|
||||
|
||||
if s != self.s_start:
|
||||
|
||||
# Condition: cost of parent of s changed
|
||||
# Since we do not record the children of a state, we need to enumerate its neighbors
|
||||
self.rhs[s] = min(self.g[s_n] + self.cost(s_n, s)
|
||||
for s_n in self.get_neighbor(s))
|
||||
|
||||
if s in self.U:
|
||||
self.U.pop(s)
|
||||
|
||||
if self.g[s] != self.rhs[s]:
|
||||
|
||||
# Condition: current cost to come is different to that of last time
|
||||
# state s should be added into OPEN set (set U)
|
||||
self.U[s] = self.CalculateKey(s)
|
||||
|
||||
def TopKey(self):
|
||||
"""
|
||||
:return: return the min key and its value.
|
||||
"""
|
||||
|
||||
s = min(self.U, key=self.U.get)
|
||||
|
||||
return s, self.U[s]
|
||||
|
||||
def CalculateKey(self, s):
|
||||
|
||||
return [min(self.g[s], self.rhs[s]) + self.h(s),
|
||||
min(self.g[s], self.rhs[s])]
|
||||
|
||||
def get_neighbor(self, s):
|
||||
"""
|
||||
find neighbors of state s that not in obstacles.
|
||||
:param s: state
|
||||
:return: neighbors
|
||||
"""
|
||||
|
||||
s_list = set()
|
||||
|
||||
for u in self.u_set:
|
||||
s_next = tuple([s[i] + u[i] for i in range(2)])
|
||||
if s_next not in self.obs:
|
||||
s_list.add(s_next)
|
||||
|
||||
return s_list
|
||||
|
||||
def h(self, s):
|
||||
"""
|
||||
Calculate heuristic.
|
||||
:param s: current node (state)
|
||||
:return: heuristic function value
|
||||
"""
|
||||
|
||||
heuristic_type = self.heuristic_type # heuristic type
|
||||
goal = self.s_goal # goal node
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
||||
else:
|
||||
return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return float("inf")
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def extract_path(self):
|
||||
"""
|
||||
Extract the path based on the PARENT set.
|
||||
:return: The planning path
|
||||
"""
|
||||
|
||||
path = [self.s_goal]
|
||||
s = self.s_goal
|
||||
|
||||
for k in range(100):
|
||||
g_list = {}
|
||||
for x in self.get_neighbor(s):
|
||||
if not self.is_collision(s, x):
|
||||
g_list[x] = self.g[x]
|
||||
s = min(g_list, key=g_list.get)
|
||||
path.append(s)
|
||||
if s == self.s_start:
|
||||
break
|
||||
|
||||
return list(reversed(path))
|
||||
|
||||
def plot_path(self, path):
|
||||
px = [x[0] for x in path]
|
||||
py = [x[1] for x in path]
|
||||
plt.plot(px, py, linewidth=2)
|
||||
plt.plot(self.s_start[0], self.s_start[1], "bs")
|
||||
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
|
||||
|
||||
def plot_visited(self, visited):
|
||||
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
|
||||
'bisque', 'navajowhite', 'moccasin', 'wheat',
|
||||
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
|
||||
|
||||
if self.count >= len(color) - 1:
|
||||
self.count = 0
|
||||
|
||||
for x in visited:
|
||||
plt.plot(x[0], x[1], marker='s', color=color[self.count])
|
||||
|
||||
|
||||
def main():
|
||||
x_start = (5, 5)
|
||||
x_goal = (45, 25)
|
||||
|
||||
lpastar = LPAStar(x_start, x_goal, "Euclidean")
|
||||
lpastar.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,230 +0,0 @@
|
||||
"""
|
||||
LRTA_star 2D (Learning Real-time A*)
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import copy
|
||||
import math
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import queue, plotting, env
|
||||
|
||||
|
||||
class LrtAStarN:
|
||||
def __init__(self, s_start, s_goal, N, heuristic_type):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env()
|
||||
|
||||
self.u_set = self.Env.motions # feasible input set
|
||||
self.obs = self.Env.obs # position of obstacles
|
||||
|
||||
self.N = N # number of expand nodes each iteration
|
||||
self.visited = [] # order of visited nodes in planning
|
||||
self.path = [] # path of each iteration
|
||||
self.h_table = {} # h_value table
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
initialize the h_value of all nodes in the environment.
|
||||
it is a global table.
|
||||
"""
|
||||
|
||||
for i in range(self.Env.x_range):
|
||||
for j in range(self.Env.y_range):
|
||||
self.h_table[(i, j)] = self.h((i, j))
|
||||
|
||||
def searching(self):
|
||||
self.init()
|
||||
s_start = self.s_start # initialize start node
|
||||
|
||||
while True:
|
||||
OPEN, CLOSED = self.AStar(s_start, self.N) # OPEN, CLOSED sets in each iteration
|
||||
|
||||
if OPEN == "FOUND": # reach the goal node
|
||||
self.path.append(CLOSED)
|
||||
break
|
||||
|
||||
h_value = self.iteration(CLOSED) # h_value table of CLOSED nodes
|
||||
|
||||
for x in h_value:
|
||||
self.h_table[x] = h_value[x]
|
||||
|
||||
s_start, path_k = self.extract_path_in_CLOSE(s_start, h_value) # x_init -> expected node in OPEN set
|
||||
self.path.append(path_k)
|
||||
|
||||
def extract_path_in_CLOSE(self, s_start, h_value):
|
||||
path = [s_start]
|
||||
s = s_start
|
||||
|
||||
while True:
|
||||
h_list = {}
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n in h_value:
|
||||
h_list[s_n] = h_value[s_n]
|
||||
else:
|
||||
h_list[s_n] = self.h_table[s_n]
|
||||
|
||||
s_key = min(h_list, key=h_list.get) # move to the smallest node with min h_value
|
||||
path.append(s_key) # generate path
|
||||
s = s_key # use end of this iteration as the start of next
|
||||
|
||||
if s_key not in h_value: # reach the expected node in OPEN set
|
||||
return s_key, path
|
||||
|
||||
def iteration(self, CLOSED):
|
||||
h_value = {}
|
||||
|
||||
for s in CLOSED:
|
||||
h_value[s] = float("inf") # initialize h_value of CLOSED nodes
|
||||
|
||||
while True:
|
||||
h_value_rec = copy.deepcopy(h_value)
|
||||
for s in CLOSED:
|
||||
h_list = []
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n not in CLOSED:
|
||||
h_list.append(self.cost(s, s_n) + self.h_table[s_n])
|
||||
else:
|
||||
h_list.append(self.cost(s, s_n) + h_value[s_n])
|
||||
h_value[s] = min(h_list) # update h_value of current node
|
||||
|
||||
if h_value == h_value_rec: # h_value table converged
|
||||
return h_value
|
||||
|
||||
def AStar(self, x_start, N):
|
||||
OPEN = queue.QueuePrior() # OPEN set
|
||||
OPEN.put(x_start, self.h(x_start))
|
||||
CLOSED = [] # CLOSED set
|
||||
g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come
|
||||
PARENT = {x_start: x_start} # relations
|
||||
count = 0 # counter
|
||||
|
||||
while not OPEN.empty():
|
||||
count += 1
|
||||
s = OPEN.get()
|
||||
CLOSED.append(s)
|
||||
|
||||
if s == self.s_goal: # reach the goal node
|
||||
self.visited.append(CLOSED)
|
||||
return "FOUND", self.extract_path(x_start, PARENT)
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n not in CLOSED:
|
||||
new_cost = g_table[s] + self.cost(s, s_n)
|
||||
if s_n not in g_table:
|
||||
g_table[s_n] = float("inf")
|
||||
if new_cost < g_table[s_n]: # conditions for updating Cost
|
||||
g_table[s_n] = new_cost
|
||||
PARENT[s_n] = s
|
||||
OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
|
||||
|
||||
if count == N: # expand needed CLOSED nodes
|
||||
break
|
||||
|
||||
self.visited.append(CLOSED) # visited nodes in each iteration
|
||||
|
||||
return OPEN, CLOSED
|
||||
|
||||
def get_neighbor(self, s):
|
||||
"""
|
||||
find neighbors of state s that not in obstacles.
|
||||
:param s: state
|
||||
:return: neighbors
|
||||
"""
|
||||
|
||||
s_list = []
|
||||
|
||||
for u in self.u_set:
|
||||
s_next = tuple([s[i] + u[i] for i in range(2)])
|
||||
if s_next not in self.obs:
|
||||
s_list.append(s_next)
|
||||
|
||||
return s_list
|
||||
|
||||
def extract_path(self, x_start, parent):
|
||||
"""
|
||||
Extract the path based on the relationship of nodes.
|
||||
|
||||
:return: The planning path
|
||||
"""
|
||||
|
||||
path_back = [self.s_goal]
|
||||
x_current = self.s_goal
|
||||
|
||||
while True:
|
||||
x_current = parent[x_current]
|
||||
path_back.append(x_current)
|
||||
|
||||
if x_current == x_start:
|
||||
break
|
||||
|
||||
return list(reversed(path_back))
|
||||
|
||||
def h(self, s):
|
||||
"""
|
||||
Calculate heuristic.
|
||||
:param s: current node (state)
|
||||
:return: heuristic function value
|
||||
"""
|
||||
|
||||
heuristic_type = self.heuristic_type # heuristic type
|
||||
goal = self.s_goal # goal node
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
||||
else:
|
||||
return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return float("inf")
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (10, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
lrta = LrtAStarN(s_start, s_goal, 250, "euclidean")
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
lrta.searching()
|
||||
plot.animation_lrta(lrta.path, lrta.visited,
|
||||
"Learning Real-time A* (LRTA*)")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,237 +0,0 @@
|
||||
"""
|
||||
RTAAstar 2D (Real-time Adaptive A*)
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import copy
|
||||
import math
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import queue, plotting, env
|
||||
|
||||
|
||||
class RTAAStar:
|
||||
def __init__(self, s_start, s_goal, N, heuristic_type):
|
||||
self.s_start, self.s_goal = s_start, s_goal
|
||||
self.heuristic_type = heuristic_type
|
||||
|
||||
self.Env = env.Env()
|
||||
|
||||
self.u_set = self.Env.motions # feasible input set
|
||||
self.obs = self.Env.obs # position of obstacles
|
||||
|
||||
self.N = N # number of expand nodes each iteration
|
||||
self.visited = [] # order of visited nodes in planning
|
||||
self.path = [] # path of each iteration
|
||||
self.h_table = {} # h_value table
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
initialize the h_value of all nodes in the environment.
|
||||
it is a global table.
|
||||
"""
|
||||
|
||||
for i in range(self.Env.x_range):
|
||||
for j in range(self.Env.y_range):
|
||||
self.h_table[(i, j)] = self.h((i, j))
|
||||
|
||||
def searching(self):
|
||||
self.init()
|
||||
s_start = self.s_start # initialize start node
|
||||
|
||||
while True:
|
||||
OPEN, CLOSED, g_table, PARENT = \
|
||||
self.Astar(s_start, self.N)
|
||||
|
||||
if OPEN == "FOUND": # reach the goal node
|
||||
self.path.append(CLOSED)
|
||||
break
|
||||
|
||||
s_next, h_value = self.cal_h_value(OPEN, CLOSED, g_table, PARENT)
|
||||
|
||||
for x in h_value:
|
||||
self.h_table[x] = h_value[x]
|
||||
|
||||
s_start, path_k = self.extract_path_in_CLOSE(s_start, s_next, h_value)
|
||||
self.path.append(path_k)
|
||||
|
||||
def cal_h_value(self, OPEN, CLOSED, g_table, PARENT):
|
||||
v_open = {}
|
||||
h_value = {}
|
||||
for (_, x) in OPEN.enumerate():
|
||||
v_open[x] = g_table[PARENT[x]] + 1 + self.h_table[x]
|
||||
s_open = min(v_open, key=v_open.get)
|
||||
f_min = v_open[s_open]
|
||||
for x in CLOSED:
|
||||
h_value[x] = f_min - g_table[x]
|
||||
|
||||
return s_open, h_value
|
||||
|
||||
def iteration(self, CLOSED):
|
||||
h_value = {}
|
||||
|
||||
for s in CLOSED:
|
||||
h_value[s] = float("inf") # initialize h_value of CLOSED nodes
|
||||
|
||||
while True:
|
||||
h_value_rec = copy.deepcopy(h_value)
|
||||
for s in CLOSED:
|
||||
h_list = []
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n not in CLOSED:
|
||||
h_list.append(self.cost(s, s_n) + self.h_table[s_n])
|
||||
else:
|
||||
h_list.append(self.cost(s, s_n) + h_value[s_n])
|
||||
h_value[s] = min(h_list) # update h_value of current node
|
||||
|
||||
if h_value == h_value_rec: # h_value table converged
|
||||
return h_value
|
||||
|
||||
def Astar(self, x_start, N):
|
||||
OPEN = queue.QueuePrior() # OPEN set
|
||||
OPEN.put(x_start, self.h_table[x_start])
|
||||
CLOSED = [] # CLOSED set
|
||||
g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come
|
||||
PARENT = {x_start: x_start} # relations
|
||||
count = 0 # counter
|
||||
|
||||
while not OPEN.empty():
|
||||
count += 1
|
||||
s = OPEN.get()
|
||||
CLOSED.append(s)
|
||||
|
||||
if s == self.s_goal: # reach the goal node
|
||||
self.visited.append(CLOSED)
|
||||
return "FOUND", self.extract_path(x_start, PARENT), [], []
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n not in CLOSED:
|
||||
new_cost = g_table[s] + self.cost(s, s_n)
|
||||
if s_n not in g_table:
|
||||
g_table[s_n] = float("inf")
|
||||
if new_cost < g_table[s_n]: # conditions for updating Cost
|
||||
g_table[s_n] = new_cost
|
||||
PARENT[s_n] = s
|
||||
OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
|
||||
|
||||
if count == N: # expand needed CLOSED nodes
|
||||
break
|
||||
|
||||
self.visited.append(CLOSED) # visited nodes in each iteration
|
||||
|
||||
return OPEN, CLOSED, g_table, PARENT
|
||||
|
||||
def get_neighbor(self, s):
|
||||
"""
|
||||
find neighbors of state s that not in obstacles.
|
||||
:param s: state
|
||||
:return: neighbors
|
||||
"""
|
||||
|
||||
s_list = set()
|
||||
|
||||
for u in self.u_set:
|
||||
s_next = tuple([s[i] + u[i] for i in range(2)])
|
||||
if s_next not in self.obs:
|
||||
s_list.add(s_next)
|
||||
|
||||
return s_list
|
||||
|
||||
def extract_path_in_CLOSE(self, s_end, s_start, h_value):
|
||||
path = [s_start]
|
||||
s = s_start
|
||||
|
||||
while True:
|
||||
h_list = {}
|
||||
for s_n in self.get_neighbor(s):
|
||||
if s_n in h_value:
|
||||
h_list[s_n] = h_value[s_n]
|
||||
s_key = max(h_list, key=h_list.get) # move to the smallest node with min h_value
|
||||
path.append(s_key) # generate path
|
||||
s = s_key # use end of this iteration as the start of next
|
||||
|
||||
if s_key == s_end: # reach the expected node in OPEN set
|
||||
return s_start, list(reversed(path))
|
||||
|
||||
def extract_path(self, x_start, parent):
|
||||
"""
|
||||
Extract the path based on the relationship of nodes.
|
||||
:return: The planning path
|
||||
"""
|
||||
|
||||
path = [self.s_goal]
|
||||
s = self.s_goal
|
||||
|
||||
while True:
|
||||
s = parent[s]
|
||||
path.append(s)
|
||||
if s == x_start:
|
||||
break
|
||||
|
||||
return list(reversed(path))
|
||||
|
||||
def h(self, s):
|
||||
"""
|
||||
Calculate heuristic.
|
||||
:param s: current node (state)
|
||||
:return: heuristic function value
|
||||
"""
|
||||
|
||||
heuristic_type = self.heuristic_type # heuristic type
|
||||
goal = self.s_goal # goal node
|
||||
|
||||
if heuristic_type == "manhattan":
|
||||
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
||||
else:
|
||||
return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
||||
|
||||
def cost(self, s_start, s_goal):
|
||||
"""
|
||||
Calculate Cost for this motion
|
||||
:param s_start: starting node
|
||||
:param s_goal: end node
|
||||
:return: Cost for this motion
|
||||
:note: Cost function could be more complicate!
|
||||
"""
|
||||
|
||||
if self.is_collision(s_start, s_goal):
|
||||
return float("inf")
|
||||
|
||||
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
|
||||
|
||||
def is_collision(self, s_start, s_end):
|
||||
if s_start in self.obs or s_end in self.obs:
|
||||
return True
|
||||
|
||||
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
|
||||
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
|
||||
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
else:
|
||||
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
|
||||
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
|
||||
|
||||
if s1 in self.obs or s2 in self.obs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (10, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
rtaa = RTAAStar(s_start, s_goal, 240, "euclidean")
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
rtaa.searching()
|
||||
plot.animation_lrta(rtaa.path, rtaa.visited,
|
||||
"Real-time Adaptive A* (RTAA*)")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,69 +0,0 @@
|
||||
"""
|
||||
Breadth-first Searching_2D (BFS)
|
||||
@author: huiming zhou
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from collections import deque
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
from Search_2D.Astar import AStar
|
||||
import math
|
||||
import heapq
|
||||
|
||||
class BFS(AStar):
|
||||
"""BFS add the new visited node in the end of the openset
|
||||
"""
|
||||
def searching(self):
|
||||
"""
|
||||
Breadth-first Searching.
|
||||
:return: path, visited order
|
||||
"""
|
||||
|
||||
self.PARENT[self.s_start] = self.s_start
|
||||
self.g[self.s_start] = 0
|
||||
self.g[self.s_goal] = math.inf
|
||||
heapq.heappush(self.OPEN,
|
||||
(0, self.s_start))
|
||||
|
||||
while self.OPEN:
|
||||
_, s = heapq.heappop(self.OPEN)
|
||||
self.CLOSED.append(s)
|
||||
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
new_cost = self.g[s] + self.cost(s, s_n)
|
||||
|
||||
if s_n not in self.g:
|
||||
self.g[s_n] = math.inf
|
||||
|
||||
if new_cost < self.g[s_n]: # conditions for updating Cost
|
||||
self.g[s_n] = new_cost
|
||||
self.PARENT[s_n] = s
|
||||
|
||||
# bfs, add new node to the end of the openset
|
||||
prior = self.OPEN[-1][0]+1 if len(self.OPEN)>0 else 0
|
||||
heapq.heappush(self.OPEN, (prior, s_n))
|
||||
|
||||
return self.extract_path(self.PARENT), self.CLOSED
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
bfs = BFS(s_start, s_goal, 'None')
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
path, visited = bfs.searching()
|
||||
plot.animation(path, visited, "Breadth-first Searching (BFS)")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,65 +0,0 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import heapq
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
||||
"/../../Search_based_Planning/")
|
||||
|
||||
from Search_2D import plotting, env
|
||||
from Search_2D.Astar import AStar
|
||||
|
||||
class DFS(AStar):
|
||||
"""DFS add the new visited node in the front of the openset
|
||||
"""
|
||||
def searching(self):
|
||||
"""
|
||||
Breadth-first Searching.
|
||||
:return: path, visited order
|
||||
"""
|
||||
|
||||
self.PARENT[self.s_start] = self.s_start
|
||||
self.g[self.s_start] = 0
|
||||
self.g[self.s_goal] = math.inf
|
||||
heapq.heappush(self.OPEN,
|
||||
(0, self.s_start))
|
||||
|
||||
while self.OPEN:
|
||||
_, s = heapq.heappop(self.OPEN)
|
||||
self.CLOSED.append(s)
|
||||
|
||||
if s == self.s_goal:
|
||||
break
|
||||
|
||||
for s_n in self.get_neighbor(s):
|
||||
new_cost = self.g[s] + self.cost(s, s_n)
|
||||
|
||||
if s_n not in self.g:
|
||||
self.g[s_n] = math.inf
|
||||
|
||||
if new_cost < self.g[s_n]: # conditions for updating Cost
|
||||
self.g[s_n] = new_cost
|
||||
self.PARENT[s_n] = s
|
||||
|
||||
# dfs, add new node to the front of the openset
|
||||
prior = self.OPEN[0][0]-1 if len(self.OPEN)>0 else 0
|
||||
heapq.heappush(self.OPEN, (prior, s_n))
|
||||
|
||||
return self.extract_path(self.PARENT), self.CLOSED
|
||||
|
||||
|
||||
def main():
|
||||
s_start = (5, 5)
|
||||
s_goal = (45, 25)
|
||||
|
||||
dfs = DFS(s_start, s_goal, 'None')
|
||||
plot = plotting.Plotting(s_start, s_goal)
|
||||
|
||||
path, visited = dfs.searching()
|
||||
visited = list(dict.fromkeys(visited))
|
||||
plot.animation(path, visited, "Depth-first Searching (DFS)") # animation
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,62 +0,0 @@
|
||||
import collections
|
||||
import heapq
|
||||
|
||||
|
||||
class QueueFIFO:
|
||||
"""
|
||||
Class: QueueFIFO
|
||||
Description: QueueFIFO is designed for First-in-First-out rule.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.queue = collections.deque()
|
||||
|
||||
def empty(self):
|
||||
return len(self.queue) == 0
|
||||
|
||||
def put(self, node):
|
||||
self.queue.append(node) # enter from back
|
||||
|
||||
def get(self):
|
||||
return self.queue.popleft() # leave from front
|
||||
|
||||
|
||||
class QueueLIFO:
|
||||
"""
|
||||
Class: QueueLIFO
|
||||
Description: QueueLIFO is designed for Last-in-First-out rule.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.queue = collections.deque()
|
||||
|
||||
def empty(self):
|
||||
return len(self.queue) == 0
|
||||
|
||||
def put(self, node):
|
||||
self.queue.append(node) # enter from back
|
||||
|
||||
def get(self):
|
||||
return self.queue.pop() # leave from back
|
||||
|
||||
|
||||
class QueuePrior:
|
||||
"""
|
||||
Class: QueuePrior
|
||||
Description: QueuePrior reorders elements using value [priority]
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.queue = []
|
||||
|
||||
def empty(self):
|
||||
return len(self.queue) == 0
|
||||
|
||||
def put(self, item, priority):
|
||||
heapq.heappush(self.queue, (priority, item)) # reorder s using priority
|
||||
|
||||
def get(self):
|
||||
return heapq.heappop(self.queue)[1] # pop out the smallest item
|
||||
|
||||
def enumerate(self):
|
||||
return self.queue
|
Loading…
Reference in new issue