@ -1,3 +0,0 @@
# 默认忽略的文件
@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
@ -1,15 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyCompatibilityInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ourVersions">
<list size="2">
<item index="0" class="java.lang.String" itemvalue="3.7" />
<item index="1" class="java.lang.String" itemvalue="3.8" />
@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
@ -1,4 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<module fileurl="file://$PROJECT_DIR$/.idea/Search_2D.iml" filepath="$PROJECT_DIR$/.idea/Search_2D.iml" />
@ -1,222 +0,0 @@
ARA_star 2D (Anytime Repairing A*)
@author: huiming zhou
@description: local inconsistency: g-value decreased.
g(s) decreased introduces a local inconsistency between s and its successors.
import os
import sys
import math
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
class AraStar:
def __init__(self, s_start, s_goal, e, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.e = e # weight
self.g = dict() # Cost to come
self.OPEN = dict() # priority queue / OPEN set
self.CLOSED = set() # CLOSED set
self.PARENT = dict() # relations
self.path = [] # planning path
self.visited = [] # order of visited nodes
def init(self):
initialize each set.
self.g[self.s_start] = 0.0
self.g[self.s_goal] = math.inf
self.OPEN[self.s_start] = self.f_value(self.s_start)
self.PARENT[self.s_start] = self.s_start
def searching(self):
while self.update_e() > 1: # continue condition
self.e -= 0.4 # increase weight
self.OPEN = {s: self.f_value(s) for s in self.OPEN} # update f_value of OPEN set
self.INCONS = dict()
self.CLOSED = set()
self.ImprovePath() # improve path
return self.path, self.visited
def ImprovePath(self):
:return: a e'-suboptimal path
visited_each = []
while True:
s, f_small = self.calc_smallest_f()
if self.f_value(self.s_goal) <= f_small:
for s_n in self.get_neighbor(s):
if s_n in self.obs:
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g or new_cost < self.g[s_n]:
self.g[s_n] = new_cost
self.PARENT[s_n] = s
if s_n not in self.CLOSED:
self.OPEN[s_n] = self.f_value(s_n)
self.INCONS[s_n] = 0.0
def calc_smallest_f(self):
:return: node with smallest f_value in OPEN set.
s_small = min(self.OPEN, key=self.OPEN.get)
return s_small, self.OPEN[s_small]
def get_neighbor(self, s):
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
return {(s[0] + u[0], s[1] + u[1]) for u in self.u_set}
def update_e(self):
v = float("inf")
if self.OPEN:
v = min(self.g[s] + self.h(s) for s in self.OPEN)
if self.INCONS:
v = min(v, min(self.g[s] + self.h(s) for s in self.INCONS))
return min(self.e, self.g[self.s_goal] / v)
def f_value(self, x):
f = g + e * h
f = cost-to-come + weight * cost-to-go
:param x: current state
:return: f_value
return self.g[x] + self.e * self.h(x)
def extract_path(self):
Extract the path based on the PARENT set.
:return: The planning path
path = [self.s_goal]
s = self.s_goal
while True:
s = self.PARENT[s]
if s == self.s_start:
return list(path)
def h(self, s):
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
s_start = (5, 5)
s_goal = (45, 25)
arastar = AraStar(s_start, s_goal, 2.5, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
path, visited = arastar.searching()
plot.animation_ara_star(path, visited, "Anytime Repairing A* (ARA*)")
if __name__ == '__main__':
@ -1,317 +0,0 @@
Anytime_D_star 2D
@author: huiming zhou
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting
from Search_2D import env
class ADStar:
def __init__(self, s_start, s_goal, eps, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.Plot = plotting.Plotting(s_start, s_goal)
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.x = self.Env.x_range
self.y = self.Env.y_range
self.g, self.rhs, self.OPEN = {}, {}, {}
for i in range(1, self.Env.x_range - 1):
for j in range(1, self.Env.y_range - 1):
self.rhs[(i, j)] = float("inf")
self.g[(i, j)] = float("inf")
self.rhs[self.s_goal] = 0.0
self.eps = eps
self.OPEN[self.s_goal] = self.Key(self.s_goal)
self.CLOSED, self.INCONS = set(), dict()
self.visited = set()
self.count = 0
self.count_env_change = 0
self.obs_add = set()
self.obs_remove = set()
self.title = "Anytime D*: Small changes" # Significant changes
self.fig = plt.figure()
def run(self):
self.visited = set()
while True:
if self.eps <= 1.0:
self.eps -= 0.5
for s in self.OPEN:
self.OPEN[s] = self.Key(s)
self.CLOSED = set()
self.visited = set()
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
self.count_env_change += 1
x, y = int(x), int(y)
print("Change position: s =", x, ",", "y =", y)
# for small changes
if self.title == "Anytime D*: Small changes":
if (x, y) not in self.obs:
self.obs.add((x, y))
self.g[(x, y)] = float("inf")
self.rhs[(x, y)] = float("inf")
self.obs.remove((x, y))
self.UpdateState((x, y))
for sn in self.get_neighbor((x, y)):
while True:
if len(self.INCONS) == 0:
for s in self.OPEN:
self.OPEN[s] = self.Key(s)
self.CLOSED = set()
# plt.plot(self.title)
self.visited = set()
if self.eps <= 1.0:
if (x, y) not in self.obs:
self.obs.add((x, y))
self.obs_add.add((x, y))
plt.plot(x, y, 'sk')
if (x, y) in self.obs_remove:
self.obs_remove.remove((x, y))
self.obs.remove((x, y))
self.obs_remove.add((x, y))
plt.plot(x, y, marker='s', color='white')
if (x, y) in self.obs_add:
self.obs_add.remove((x, y))
if self.count_env_change >= 15:
self.count_env_change = 0
self.eps += 2.0
for s in self.obs_add:
self.g[(x, y)] = float("inf")
self.rhs[(x, y)] = float("inf")
for sn in self.get_neighbor(s):
for s in self.obs_remove:
for sn in self.get_neighbor(s):
while True:
if self.eps <= 1.0:
self.eps -= 0.5
for s in self.OPEN:
self.OPEN[s] = self.Key(s)
self.CLOSED = set()
self.visited = set()
def ComputeOrImprovePath(self):
while True:
s, v = self.TopKey()
if v >= self.Key(self.s_start) and \
self.rhs[self.s_start] == self.g[self.s_start]:
if self.g[s] > self.rhs[s]:
self.g[s] = self.rhs[s]
for sn in self.get_neighbor(s):
self.g[s] = float("inf")
for sn in self.get_neighbor(s):
def UpdateState(self, s):
if s != self.s_goal:
self.rhs[s] = float("inf")
for x in self.get_neighbor(s):
self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x))
if s in self.OPEN:
if self.g[s] != self.rhs[s]:
if s not in self.CLOSED:
self.OPEN[s] = self.Key(s)
self.INCONS[s] = 0
def Key(self, s):
if self.g[s] > self.rhs[s]:
return [self.rhs[s] + self.eps * self.h(self.s_start, s), self.rhs[s]]
return [self.g[s] + self.h(self.s_start, s), self.g[s]]
def TopKey(self):
:return: return the min key and its value.
s = min(self.OPEN, key=self.OPEN.get)
return s, self.OPEN[s]
def h(self, s_start, s_goal):
heuristic_type = self.heuristic_type # heuristic type
if heuristic_type == "manhattan":
return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1])
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
return nei_list
def extract_path(self):
Extract the path based on the PARENT set.
:return: The planning path
path = [self.s_start]
s = self.s_start
for k in range(100):
g_list = {}
for x in self.get_neighbor(s):
if not self.is_collision(s, x):
g_list[x] = self.g[x]
s = min(g_list, key=g_list.get)
if s == self.s_goal:
return list(path)
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self):
self.count += 1
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in self.visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = ADStar(s_start, s_goal, 2.5, "euclidean")
if __name__ == '__main__':
@ -1,225 +0,0 @@
A_star 2D
@author: huiming zhou
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
class AStar:
"""AStar set the cost + heuristics as the priority
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start = s_start
self.s_goal = s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.OPEN = [] # priority queue / OPEN set
self.CLOSED = [] # CLOSED set / VISITED order
self.PARENT = dict() # recorded parent
self.g = dict() # cost to come
def searching(self):
A_star Searching.
:return: path, visited order
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
(self.f_value(self.s_start), self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
if s == self.s_goal: # stop condition
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
heapq.heappush(self.OPEN, (self.f_value(s_n), s_n))
return self.extract_path(self.PARENT), self.CLOSED
def searching_repeated_astar(self, e):
repeated A*.
:param e: weight of A*
:return: path and visited order
path, visited = [], []
while e >= 1:
p_k, v_k = self.repeated_searching(self.s_start, self.s_goal, e)
e -= 0.5
return path, visited
def repeated_searching(self, s_start, s_goal, e):
run A* with weight e.
:param s_start: starting state
:param s_goal: goal state
:param e: weight of a*
:return: path and visited order.
g = {s_start: 0, s_goal: float("inf")}
PARENT = {s_start: s_start}
OPEN = []
(g[s_start] + e * self.heuristic(s_start), s_start))
while OPEN:
_, s = heapq.heappop(OPEN)
if s == s_goal:
for s_n in self.get_neighbor(s):
new_cost = g[s] + self.cost(s, s_n)
if s_n not in g:
g[s_n] = math.inf
if new_cost < g[s_n]: # conditions for updating Cost
g[s_n] = new_cost
PARENT[s_n] = s
heapq.heappush(OPEN, (g[s_n] + e * self.heuristic(s_n), s_n))
return self.extract_path(PARENT), CLOSED
def get_neighbor(self, s):
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def f_value(self, s):
f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
return self.g[s] + self.heuristic(s)
def extract_path(self, PARENT):
Extract the path based on the PARENT set.
:return: The planning path
path = [self.s_goal]
s = self.s_goal
while True:
s = PARENT[s]
if s == self.s_start:
return list(path)
def heuristic(self, s):
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def main():
s_start = (5, 5)
s_goal = (45, 25)
astar = AStar(s_start, s_goal, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
path, visited = astar.searching()
plot.animation(path, visited, "A*") # animation
# path, visited = astar.searching_repeated_astar(2.5) # initial weight e = 2.5
# plot.animation_ara_star(path, visited, "Repeated A*")
if __name__ == '__main__':
@ -1,68 +0,0 @@
Best-First Searching
@author: huiming zhou
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
from Search_2D.Astar import AStar
class BestFirst(AStar):
"""BestFirst set the heuristics as the priority
def searching(self):
Breadth-first Searching.
:return: path, visited order
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
(self.heuristic(self.s_start), self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
if s == self.s_goal:
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# best first set the heuristics as the priority
heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
BF = BestFirst(s_start, s_goal, 'euclidean')
plot = plotting.Plotting(s_start, s_goal)
path, visited = BF.searching()
plot.animation(path, visited, "Best-first Searching") # animation
if __name__ == '__main__':
@ -1,229 +0,0 @@
Bidirectional_a_star 2D
@author: huiming zhou
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
class BidirectionalAStar:
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start = s_start
self.s_goal = s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.OPEN_fore = [] # OPEN set for forward searching
self.OPEN_back = [] # OPEN set for backward searching
self.CLOSED_fore = [] # CLOSED set for forward
self.CLOSED_back = [] # CLOSED set for backward
self.PARENT_fore = dict() # recorded parent for forward
self.PARENT_back = dict() # recorded parent for backward
self.g_fore = dict() # cost to come for forward
self.g_back = dict() # cost to come for backward
def init(self):
initialize parameters
self.g_fore[self.s_start] = 0.0
self.g_fore[self.s_goal] = math.inf
self.g_back[self.s_goal] = 0.0
self.g_back[self.s_start] = math.inf
self.PARENT_fore[self.s_start] = self.s_start
self.PARENT_back[self.s_goal] = self.s_goal
(self.f_value_fore(self.s_start), self.s_start))
(self.f_value_back(self.s_goal), self.s_goal))
def searching(self):
Bidirectional A*
:return: connected path, visited order of forward, visited order of backward
s_meet = self.s_start
while self.OPEN_fore and self.OPEN_back:
# solve foreward-search
_, s_fore = heapq.heappop(self.OPEN_fore)
if s_fore in self.PARENT_back:
s_meet = s_fore
for s_n in self.get_neighbor(s_fore):
new_cost = self.g_fore[s_fore] + self.cost(s_fore, s_n)
if s_n not in self.g_fore:
self.g_fore[s_n] = math.inf
if new_cost < self.g_fore[s_n]:
self.g_fore[s_n] = new_cost
self.PARENT_fore[s_n] = s_fore
(self.f_value_fore(s_n), s_n))
# solve backward-search
_, s_back = heapq.heappop(self.OPEN_back)
if s_back in self.PARENT_fore:
s_meet = s_back
for s_n in self.get_neighbor(s_back):
new_cost = self.g_back[s_back] + self.cost(s_back, s_n)
if s_n not in self.g_back:
self.g_back[s_n] = math.inf
if new_cost < self.g_back[s_n]:
self.g_back[s_n] = new_cost
self.PARENT_back[s_n] = s_back
(self.f_value_back(s_n), s_n))
return self.extract_path(s_meet), self.CLOSED_fore, self.CLOSED_back
def get_neighbor(self, s):
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
def extract_path(self, s_meet):
extract path from start and goal
:param s_meet: meet point of bi-direction a*
:return: path
# extract path for foreward part
path_fore = [s_meet]
s = s_meet
while True:
s = self.PARENT_fore[s]
if s == self.s_start:
# extract path for backward part
path_back = []
s = s_meet
while True:
s = self.PARENT_back[s]
if s == self.s_goal:
return list(reversed(path_fore)) + list(path_back)
def f_value_fore(self, s):
forward searching: f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
return self.g_fore[s] + self.h(s, self.s_goal)
def f_value_back(self, s):
backward searching: f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
return self.g_back[s] + self.h(s, self.s_start)
def h(self, s, goal):
Calculate heuristic value.
:param s: current node (state)
:param goal: goal node (state)
:return: heuristic value
heuristic_type = self.heuristic_type
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
x_start = (5, 5)
x_goal = (45, 25)
bastar = BidirectionalAStar(x_start, x_goal, "euclidean")
plot = plotting.Plotting(x_start, x_goal)
path, visited_fore, visited_back = bastar.searching()
plot.animation_bi_astar(path, visited_fore, visited_back, "Bidirectional-A*") # animation
if __name__ == '__main__':
@ -1,304 +0,0 @@
D_star 2D
@author: huiming zhou
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
class DStar:
def __init__(self, s_start, s_goal):
self.s_start, self.s_goal = s_start, s_goal
self.Env = env.Env()
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
self.u_set = self.Env.motions
self.obs = self.Env.obs
self.x = self.Env.x_range
self.y = self.Env.y_range
self.fig = plt.figure()
self.OPEN = set()
self.t = dict()
self.PARENT = dict()
self.h = dict()
self.k = dict()
self.path = []
self.visited = set()
self.count = 0
def init(self):
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.t[(i, j)] = 'NEW'
self.k[(i, j)] = 0.0
self.h[(i, j)] = float("inf")
self.PARENT[(i, j)] = None
self.h[self.s_goal] = 0.0
def run(self, s_start, s_end):
self.insert(s_end, 0)
while True:
if self.t[s_start] == 'CLOSED':
self.path = self.extract_path(s_start, s_end)
self.Plot.plot_grid("Dynamic A* (D*)")
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
x, y = int(x), int(y)
if (x, y) not in self.obs:
print("Add obstacle at: s =", x, ",", "y =", y)
self.obs.add((x, y))
s = self.s_start
self.visited = set()
self.count += 1
while s != self.s_goal:
if self.is_collision(s, self.PARENT[s]):
s = self.PARENT[s]
self.path = self.extract_path(self.s_start, self.s_goal)
self.Plot.plot_grid("Dynamic A* (D*)")
def extract_path(self, s_start, s_end):
path = [s_start]
s = s_start
while True:
s = self.PARENT[s]
if s == s_end:
return path
def process_state(self):
s = self.min_state() # get node in OPEN set with min k value
if s is None:
return -1 # OPEN set is empty
k_old = self.get_k_min() # record the min k value of this iteration (min path cost)
self.delete(s) # move state s from OPEN set to CLOSED set
# k_min < h[s] --> s: RAISE state (increased cost)
if k_old < self.h[s]:
for s_n in self.get_neighbor(s):
if self.h[s_n] <= k_old and \
self.h[s] > self.h[s_n] + self.cost(s_n, s):
# update h_value and choose parent
self.PARENT[s] = s_n
self.h[s] = self.h[s_n] + self.cost(s_n, s)
# s: k_min >= h[s] -- > s: LOWER state (cost reductions)
if k_old == self.h[s]:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
(self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
# 3) s_n find a better parent
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
if self.PARENT[s_n] != s and \
self.h[s_n] > self.h[s] + self.cost(s, s_n):
# Condition: LOWER happened in OPEN set (s), s should be explored again
self.insert(s, self.h[s])
if self.PARENT[s_n] != s and \
self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
self.t[s_n] == 'CLOSED' and \
self.h[s_n] > k_old:
# Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
self.insert(s_n, self.h[s_n])
return self.get_k_min()
def min_state(self):
choose the node with the minimum k value in OPEN set.
:return: state
if not self.OPEN:
return None
return min(self.OPEN, key=lambda x: self.k[x])
def get_k_min(self):
calc the min k value for nodes in OPEN set.
:return: k value
if not self.OPEN:
return -1
return min([self.k[x] for x in self.OPEN])
def insert(self, s, h_new):
insert node into OPEN set.
:param s: node
:param h_new: new or better cost to come value
if self.t[s] == 'NEW':
self.k[s] = h_new
elif self.t[s] == 'OPEN':
self.k[s] = min(self.k[s], h_new)
elif self.t[s] == 'CLOSED':
self.k[s] = min(self.h[s], h_new)
self.h[s] = h_new
self.t[s] = 'OPEN'
def delete(self, s):
delete: move state s from OPEN set to CLOSED set.
:param s: state should be deleted
if self.t[s] == 'OPEN':
self.t[s] = 'CLOSED'
def modify(self, s):
start processing from state s.
:param s: is a node whose status is RAISE or LOWER.
while True:
k_min = self.process_state()
if k_min >= self.h[s]:
def modify_cost(self, s):
# if node in CLOSED set, put it into OPEN set.
# Since cost may be changed between s - s.parent, calc cost(s, s.p) again
if self.t[s] == 'CLOSED':
self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
return nei_list
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = DStar(s_start, s_goal)
||||||, s_goal)
if __name__ == '__main__':
@ -1,239 +0,0 @@
D_star_Lite 2D
@author: huiming zhou
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
class DStar:
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.Plot = plotting.Plotting(s_start, s_goal)
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.x = self.Env.x_range
self.y = self.Env.y_range
self.g, self.rhs, self.U = {}, {}, {}
|||||| = 0
for i in range(1, self.Env.x_range - 1):
for j in range(1, self.Env.y_range - 1):
self.rhs[(i, j)] = float("inf")
self.g[(i, j)] = float("inf")
self.rhs[self.s_goal] = 0.0
self.U[self.s_goal] = self.CalculateKey(self.s_goal)
self.visited = set()
self.count = 0
self.fig = plt.figure()
def run(self):
self.Plot.plot_grid("D* Lite")
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
x, y = int(x), int(y)
print("Change position: s =", x, ",", "y =", y)
s_curr = self.s_start
s_last = self.s_start
i = 0
path = [self.s_start]
while s_curr != self.s_goal:
s_list = {}
for s in self.get_neighbor(s_curr):
s_list[s] = self.g[s] + self.cost(s_curr, s)
s_curr = min(s_list, key=s_list.get)
if i < 1:
|||||| += self.h(s_last, s_curr)
s_last = s_curr
if (x, y) not in self.obs:
self.obs.add((x, y))
plt.plot(x, y, 'sk')
self.g[(x, y)] = float("inf")
self.rhs[(x, y)] = float("inf")
self.obs.remove((x, y))
plt.plot(x, y, marker='s', color='white')
self.UpdateVertex((x, y))
for s in self.get_neighbor((x, y)):
i += 1
self.count += 1
self.visited = set()
def ComputePath(self):
while True:
s, v = self.TopKey()
if v >= self.CalculateKey(self.s_start) and \
self.rhs[self.s_start] == self.g[self.s_start]:
k_old = v
if k_old < self.CalculateKey(s):
self.U[s] = self.CalculateKey(s)
elif self.g[s] > self.rhs[s]:
self.g[s] = self.rhs[s]
for x in self.get_neighbor(s):
self.g[s] = float("inf")
for x in self.get_neighbor(s):
def UpdateVertex(self, s):
if s != self.s_goal:
self.rhs[s] = float("inf")
for x in self.get_neighbor(s):
self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x))
if s in self.U:
if self.g[s] != self.rhs[s]:
self.U[s] = self.CalculateKey(s)
def CalculateKey(self, s):
return [min(self.g[s], self.rhs[s]) + self.h(self.s_start, s) +,
min(self.g[s], self.rhs[s])]
def TopKey(self):
:return: return the min key and its value.
s = min(self.U, key=self.U.get)
return s, self.U[s]
def h(self, s_start, s_goal):
heuristic_type = self.heuristic_type # heuristic type
if heuristic_type == "manhattan":
return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1])
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
return nei_list
def extract_path(self):
Extract the path based on the PARENT set.
:return: The planning path
path = [self.s_start]
s = self.s_start
for k in range(100):
g_list = {}
for x in self.get_neighbor(s):
if not self.is_collision(s, x):
g_list[x] = self.g[x]
s = min(g_list, key=g_list.get)
if s == self.s_goal:
return list(path)
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = DStar(s_start, s_goal, "euclidean")
if __name__ == '__main__':
@ -1,69 +0,0 @@
Dijkstra 2D
@author: huiming zhou
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
from Search_2D.Astar import AStar
class Dijkstra(AStar):
"""Dijkstra set the cost as the priority
def searching(self):
Breadth-first Searching.
:return: path, visited order
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
if s == self.s_goal:
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# best first set the heuristics as the priority
heapq.heappush(self.OPEN, (new_cost, s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
dijkstra = Dijkstra(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)
path, visited = dijkstra.searching()
plot.animation(path, visited, "Dijkstra's") # animation generate
if __name__ == '__main__':
@ -1,256 +0,0 @@
LPA_star 2D
@author: huiming zhou
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
class LPAStar:
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env()
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
self.u_set = self.Env.motions
self.obs = self.Env.obs
self.x = self.Env.x_range
self.y = self.Env.y_range
self.g, self.rhs, self.U = {}, {}, {}
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.rhs[(i, j)] = float("inf")
self.g[(i, j)] = float("inf")
self.rhs[self.s_start] = 0
self.U[self.s_start] = self.CalculateKey(self.s_start)
self.visited = set()
self.count = 0
self.fig = plt.figure()
def run(self):
self.Plot.plot_grid("Lifelong Planning A*")
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
x, y = int(x), int(y)
print("Change position: s =", x, ",", "y =", y)
self.visited = set()
self.count += 1
if (x, y) not in self.obs:
self.obs.add((x, y))
self.obs.remove((x, y))
self.UpdateVertex((x, y))
for s_n in self.get_neighbor((x, y)):
self.Plot.plot_grid("Lifelong Planning A*")
def ComputeShortestPath(self):
while True:
s, v = self.TopKey()
if v >= self.CalculateKey(self.s_goal) and \
self.rhs[self.s_goal] == self.g[self.s_goal]:
if self.g[s] > self.rhs[s]:
# Condition: over-consistent (eg: deleted obstacles)
# So, rhs[s] decreased -- > rhs[s] < g[s]
self.g[s] = self.rhs[s]
# Condition: # under-consistent (eg: added obstacles)
# So, rhs[s] increased --> rhs[s] > g[s]
self.g[s] = float("inf")
for s_n in self.get_neighbor(s):
def UpdateVertex(self, s):
update the status and the current cost to come of state s.
:param s: state s
if s != self.s_start:
# Condition: cost of parent of s changed
# Since we do not record the children of a state, we need to enumerate its neighbors
self.rhs[s] = min(self.g[s_n] + self.cost(s_n, s)
for s_n in self.get_neighbor(s))
if s in self.U:
if self.g[s] != self.rhs[s]:
# Condition: current cost to come is different to that of last time
# state s should be added into OPEN set (set U)
self.U[s] = self.CalculateKey(s)
def TopKey(self):
:return: return the min key and its value.
s = min(self.U, key=self.U.get)
return s, self.U[s]
def CalculateKey(self, s):
return [min(self.g[s], self.rhs[s]) + self.h(s),
min(self.g[s], self.rhs[s])]
def get_neighbor(self, s):
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
s_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
return s_list
def h(self, s):
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def extract_path(self):
Extract the path based on the PARENT set.
:return: The planning path
path = [self.s_goal]
s = self.s_goal
for k in range(100):
g_list = {}
for x in self.get_neighbor(s):
if not self.is_collision(s, x):
g_list[x] = self.g[x]
s = min(g_list, key=g_list.get)
if s == self.s_start:
return list(reversed(path))
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
x_start = (5, 5)
x_goal = (45, 25)
lpastar = LPAStar(x_start, x_goal, "Euclidean")
if __name__ == '__main__':
@ -1,230 +0,0 @@
LRTA_star 2D (Learning Real-time A*)
@author: huiming zhou
import os
import sys
import copy
import math
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import queue, plotting, env
class LrtAStarN:
def __init__(self, s_start, s_goal, N, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env()
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.N = N # number of expand nodes each iteration
self.visited = [] # order of visited nodes in planning
self.path = [] # path of each iteration
self.h_table = {} # h_value table
def init(self):
initialize the h_value of all nodes in the environment.
it is a global table.
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.h_table[(i, j)] = self.h((i, j))
def searching(self):
s_start = self.s_start # initialize start node
while True:
OPEN, CLOSED = self.AStar(s_start, self.N) # OPEN, CLOSED sets in each iteration
if OPEN == "FOUND": # reach the goal node
h_value = self.iteration(CLOSED) # h_value table of CLOSED nodes
for x in h_value:
self.h_table[x] = h_value[x]
s_start, path_k = self.extract_path_in_CLOSE(s_start, h_value) # x_init -> expected node in OPEN set
def extract_path_in_CLOSE(self, s_start, h_value):
path = [s_start]
s = s_start
while True:
h_list = {}
for s_n in self.get_neighbor(s):
if s_n in h_value:
h_list[s_n] = h_value[s_n]
h_list[s_n] = self.h_table[s_n]
s_key = min(h_list, key=h_list.get) # move to the smallest node with min h_value
path.append(s_key) # generate path
s = s_key # use end of this iteration as the start of next
if s_key not in h_value: # reach the expected node in OPEN set
return s_key, path
def iteration(self, CLOSED):
h_value = {}
for s in CLOSED:
h_value[s] = float("inf") # initialize h_value of CLOSED nodes
while True:
h_value_rec = copy.deepcopy(h_value)
for s in CLOSED:
h_list = []
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
h_list.append(self.cost(s, s_n) + self.h_table[s_n])
h_list.append(self.cost(s, s_n) + h_value[s_n])
h_value[s] = min(h_list) # update h_value of current node
if h_value == h_value_rec: # h_value table converged
return h_value
def AStar(self, x_start, N):
OPEN = queue.QueuePrior() # OPEN set
OPEN.put(x_start, self.h(x_start))
CLOSED = [] # CLOSED set
g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come
PARENT = {x_start: x_start} # relations
count = 0 # counter
while not OPEN.empty():
count += 1
s = OPEN.get()
if s == self.s_goal: # reach the goal node
return "FOUND", self.extract_path(x_start, PARENT)
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
new_cost = g_table[s] + self.cost(s, s_n)
if s_n not in g_table:
g_table[s_n] = float("inf")
if new_cost < g_table[s_n]: # conditions for updating Cost
g_table[s_n] = new_cost
PARENT[s_n] = s
OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
if count == N: # expand needed CLOSED nodes
self.visited.append(CLOSED) # visited nodes in each iteration
def get_neighbor(self, s):
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
s_list = []
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
return s_list
def extract_path(self, x_start, parent):
Extract the path based on the relationship of nodes.
:return: The planning path
path_back = [self.s_goal]
x_current = self.s_goal
while True:
x_current = parent[x_current]
if x_current == x_start:
return list(reversed(path_back))
def h(self, s):
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
s_start = (10, 5)
s_goal = (45, 25)
lrta = LrtAStarN(s_start, s_goal, 250, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
plot.animation_lrta(lrta.path, lrta.visited,
"Learning Real-time A* (LRTA*)")
if __name__ == '__main__':
@ -1,237 +0,0 @@
RTAAstar 2D (Real-time Adaptive A*)
@author: huiming zhou
import os
import sys
import copy
import math
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import queue, plotting, env
class RTAAStar:
def __init__(self, s_start, s_goal, N, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env()
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.N = N # number of expand nodes each iteration
self.visited = [] # order of visited nodes in planning
self.path = [] # path of each iteration
self.h_table = {} # h_value table
def init(self):
initialize the h_value of all nodes in the environment.
it is a global table.
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.h_table[(i, j)] = self.h((i, j))
def searching(self):
s_start = self.s_start # initialize start node
while True:
OPEN, CLOSED, g_table, PARENT = \
self.Astar(s_start, self.N)
if OPEN == "FOUND": # reach the goal node
s_next, h_value = self.cal_h_value(OPEN, CLOSED, g_table, PARENT)
for x in h_value:
self.h_table[x] = h_value[x]
s_start, path_k = self.extract_path_in_CLOSE(s_start, s_next, h_value)
def cal_h_value(self, OPEN, CLOSED, g_table, PARENT):
v_open = {}
h_value = {}
for (_, x) in OPEN.enumerate():
v_open[x] = g_table[PARENT[x]] + 1 + self.h_table[x]
s_open = min(v_open, key=v_open.get)
f_min = v_open[s_open]
for x in CLOSED:
h_value[x] = f_min - g_table[x]
return s_open, h_value
def iteration(self, CLOSED):
h_value = {}
for s in CLOSED:
h_value[s] = float("inf") # initialize h_value of CLOSED nodes
while True:
h_value_rec = copy.deepcopy(h_value)
for s in CLOSED:
h_list = []
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
h_list.append(self.cost(s, s_n) + self.h_table[s_n])
h_list.append(self.cost(s, s_n) + h_value[s_n])
h_value[s] = min(h_list) # update h_value of current node
if h_value == h_value_rec: # h_value table converged
return h_value
def Astar(self, x_start, N):
OPEN = queue.QueuePrior() # OPEN set
OPEN.put(x_start, self.h_table[x_start])
CLOSED = [] # CLOSED set
g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come
PARENT = {x_start: x_start} # relations
count = 0 # counter
while not OPEN.empty():
count += 1
s = OPEN.get()
if s == self.s_goal: # reach the goal node
return "FOUND", self.extract_path(x_start, PARENT), [], []
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
new_cost = g_table[s] + self.cost(s, s_n)
if s_n not in g_table:
g_table[s_n] = float("inf")
if new_cost < g_table[s_n]: # conditions for updating Cost
g_table[s_n] = new_cost
PARENT[s_n] = s
OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
if count == N: # expand needed CLOSED nodes
self.visited.append(CLOSED) # visited nodes in each iteration
return OPEN, CLOSED, g_table, PARENT
def get_neighbor(self, s):
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
s_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
return s_list
def extract_path_in_CLOSE(self, s_end, s_start, h_value):
path = [s_start]
s = s_start
while True:
h_list = {}
for s_n in self.get_neighbor(s):
if s_n in h_value:
h_list[s_n] = h_value[s_n]
s_key = max(h_list, key=h_list.get) # move to the smallest node with min h_value
path.append(s_key) # generate path
s = s_key # use end of this iteration as the start of next
if s_key == s_end: # reach the expected node in OPEN set
return s_start, list(reversed(path))
def extract_path(self, x_start, parent):
Extract the path based on the relationship of nodes.
:return: The planning path
path = [self.s_goal]
s = self.s_goal
while True:
s = parent[s]
if s == x_start:
return list(reversed(path))
def h(self, s):
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
s_start = (10, 5)
s_goal = (45, 25)
rtaa = RTAAStar(s_start, s_goal, 240, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
plot.animation_lrta(rtaa.path, rtaa.visited,
"Real-time Adaptive A* (RTAA*)")
if __name__ == '__main__':
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,69 +0,0 @@
Breadth-first Searching_2D (BFS)
@author: huiming zhou
import os
import sys
from collections import deque
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
from Search_2D.Astar import AStar
import math
import heapq
class BFS(AStar):
"""BFS add the new visited node in the end of the openset
def searching(self):
Breadth-first Searching.
:return: path, visited order
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
if s == self.s_goal:
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# bfs, add new node to the end of the openset
prior = self.OPEN[-1][0]+1 if len(self.OPEN)>0 else 0
heapq.heappush(self.OPEN, (prior, s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
bfs = BFS(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)
path, visited = bfs.searching()
plot.animation(path, visited, "Breadth-first Searching (BFS)")
if __name__ == '__main__':
@ -1,65 +0,0 @@
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import plotting, env
from Search_2D.Astar import AStar
class DFS(AStar):
"""DFS add the new visited node in the front of the openset
def searching(self):
Breadth-first Searching.
:return: path, visited order
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
if s == self.s_goal:
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# dfs, add new node to the front of the openset
prior = self.OPEN[0][0]-1 if len(self.OPEN)>0 else 0
heapq.heappush(self.OPEN, (prior, s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
dfs = DFS(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)
path, visited = dfs.searching()
visited = list(dict.fromkeys(visited))
plot.animation(path, visited, "Depth-first Searching (DFS)") # animation
if __name__ == '__main__':
@ -1,52 +0,0 @@
Env 2D
@author: huiming zhou
class Env:
def __init__(self):
self.x_range = 51 # size of background
self.y_range = 31
self.motions = [(-1, 0), (-1, 1), (0, 1), (1, 1),
(1, 0), (1, -1), (0, -1), (-1, -1)]
self.obs = self.obs_map()
def update_obs(self, obs):
self.obs = obs
def obs_map(self):
Initialize obstacles' positions
:return: map of obstacles
x = self.x_range #51
y = self.y_range #31
obs = set()
for i in range(x):
obs.add((i, 0))
for i in range(x):
obs.add((i, y - 1))
for i in range(y):
obs.add((0, i))
for i in range(y):
obs.add((x - 1, i))
for i in range(2, 21):
obs.add((i, 15))
for i in range(15):
obs.add((20, i))
for i in range(15, 30):
obs.add((30, i))
for i in range(16):
obs.add((40, i))
return obs
# if __name__ == '__main__':
# a = Env()
# print(a.obs)
@ -1,165 +0,0 @@
Plot tools 2D
@author: huiming zhou
import os
import sys
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
from Search_2D import env
class Plotting:
def __init__(self, xI, xG):
self.xI, self.xG = xI, xG
self.env = env.Env()
self.obs = self.env.obs_map()
def update_obs(self, obs):
self.obs = obs
def animation(self, path, visited, name):
def animation_lrta(self, path, visited, name):
cl = self.color_list_2()
path_combine = []
for k in range(len(path)):
self.plot_visited(visited[k], cl[k])
path_combine += path[k]
if self.xI in path_combine:
def animation_ara_star(self, path, visited, name):
cl_v, cl_p = self.color_list()
for k in range(len(path)):
self.plot_visited(visited[k], cl_v[k])
self.plot_path(path[k], cl_p[k], True)
def animation_bi_astar(self, path, v_fore, v_back, name):
self.plot_visited_bi(v_fore, v_back)
def plot_grid(self, name):
obs_x = [x[0] for x in self.obs]
obs_y = [x[1] for x in self.obs]
plt.plot(self.xI[0], self.xI[1], "bs")
plt.plot(self.xG[0], self.xG[1], "gs")
plt.plot(obs_x, obs_y, "sk")
def plot_visited(self, visited, cl='gray'):
if self.xI in visited:
if self.xG in visited:
count = 0
for x in visited:
count += 1
plt.plot(x[0], x[1], color=cl, marker='o')
lambda event: [exit(0) if event.key == 'escape' else None])
if count < len(visited) / 3:
length = 20
elif count < len(visited) * 2 / 3:
length = 30
length = 40
# length = 15
if count % length == 0:
def plot_path(self, path, cl='r', flag=False):
path_x = [path[i][0] for i in range(len(path))]
path_y = [path[i][1] for i in range(len(path))]
if not flag:
plt.plot(path_x, path_y, linewidth='3', color='r')
plt.plot(path_x, path_y, linewidth='3', color=cl)
plt.plot(self.xI[0], self.xI[1], "bs")
plt.plot(self.xG[0], self.xG[1], "gs")
def plot_visited_bi(self, v_fore, v_back):
if self.xI in v_fore:
if self.xG in v_back:
len_fore, len_back = len(v_fore), len(v_back)
for k in range(max(len_fore, len_back)):
if k < len_fore:
plt.plot(v_fore[k][0], v_fore[k][1], linewidth='3', color='gray', marker='o')
if k < len_back:
plt.plot(v_back[k][0], v_back[k][1], linewidth='3', color='cornflowerblue', marker='o')
lambda event: [exit(0) if event.key == 'escape' else None])
if k % 10 == 0:
def color_list():
cl_v = ['silver',
cl_p = ['gray',
return cl_v, cl_p
def color_list_2():
cl = ['silver',
return cl
@ -1,62 +0,0 @@
import collections
import heapq
class QueueFIFO:
Class: QueueFIFO
Description: QueueFIFO is designed for First-in-First-out rule.
def __init__(self):
self.queue = collections.deque()
def empty(self):
return len(self.queue) == 0
def put(self, node):
self.queue.append(node) # enter from back
def get(self):
return self.queue.popleft() # leave from front
class QueueLIFO:
Class: QueueLIFO
Description: QueueLIFO is designed for Last-in-First-out rule.
def __init__(self):
self.queue = collections.deque()
def empty(self):
return len(self.queue) == 0
def put(self, node):
self.queue.append(node) # enter from back
def get(self):
return self.queue.pop() # leave from back
class QueuePrior:
Class: QueuePrior
Description: QueuePrior reorders elements using value [priority]
def __init__(self):
self.queue = []
def empty(self):
return len(self.queue) == 0
def put(self, item, priority):
heapq.heappush(self.queue, (priority, item)) # reorder s using priority
def get(self):
return heapq.heappop(self.queue)[1] # pop out the smallest item
def enumerate(self):
return self.queue
Reference in new issue