提交源码

pull/13/head
wangziyang 3 years ago
parent 1f8cedbdb0
commit 5710bc19e8

@ -0,0 +1,3 @@
# 默认忽略的文件
/shelf/
/workspace.xml

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

@ -0,0 +1,15 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyCompatibilityInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ourVersions">
<value>
<list size="2">
<item index="0" class="java.lang.String" itemvalue="3.7" />
<item index="1" class="java.lang.String" itemvalue="3.8" />
</list>
</value>
</option>
</inspection_tool>
</profile>
</component>

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Search_2D.iml" filepath="$PROJECT_DIR$/.idea/Search_2D.iml" />
</modules>
</component>
</project>

@ -0,0 +1,222 @@
"""
ARA_star 2D (Anytime Repairing A*)
@author: huiming zhou
@description: local inconsistency: g-value decreased.
g(s) decreased introduces a local inconsistency between s and its successors.
"""
import os
import sys
import math
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class AraStar:
def __init__(self, s_start, s_goal, e, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.e = e # weight
self.g = dict() # Cost to come
self.OPEN = dict() # priority queue / OPEN set
self.CLOSED = set() # CLOSED set
self.INCONS = {} # INCONSISTENT set
self.PARENT = dict() # relations
self.path = [] # planning path
self.visited = [] # order of visited nodes
def init(self):
"""
initialize each set.
"""
self.g[self.s_start] = 0.0
self.g[self.s_goal] = math.inf
self.OPEN[self.s_start] = self.f_value(self.s_start)
self.PARENT[self.s_start] = self.s_start
def searching(self):
self.init()
self.ImprovePath()
self.path.append(self.extract_path())
while self.update_e() > 1: # continue condition
self.e -= 0.4 # increase weight
self.OPEN.update(self.INCONS)
self.OPEN = {s: self.f_value(s) for s in self.OPEN} # update f_value of OPEN set
self.INCONS = dict()
self.CLOSED = set()
self.ImprovePath() # improve path
self.path.append(self.extract_path())
return self.path, self.visited
def ImprovePath(self):
"""
:return: a e'-suboptimal path
"""
visited_each = []
while True:
s, f_small = self.calc_smallest_f()
if self.f_value(self.s_goal) <= f_small:
break
self.OPEN.pop(s)
self.CLOSED.add(s)
for s_n in self.get_neighbor(s):
if s_n in self.obs:
continue
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g or new_cost < self.g[s_n]:
self.g[s_n] = new_cost
self.PARENT[s_n] = s
visited_each.append(s_n)
if s_n not in self.CLOSED:
self.OPEN[s_n] = self.f_value(s_n)
else:
self.INCONS[s_n] = 0.0
self.visited.append(visited_each)
def calc_smallest_f(self):
"""
:return: node with smallest f_value in OPEN set.
"""
s_small = min(self.OPEN, key=self.OPEN.get)
return s_small, self.OPEN[s_small]
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
return {(s[0] + u[0], s[1] + u[1]) for u in self.u_set}
def update_e(self):
v = float("inf")
if self.OPEN:
v = min(self.g[s] + self.h(s) for s in self.OPEN)
if self.INCONS:
v = min(v, min(self.g[s] + self.h(s) for s in self.INCONS))
return min(self.e, self.g[self.s_goal] / v)
def f_value(self, x):
"""
f = g + e * h
f = cost-to-come + weight * cost-to-go
:param x: current state
:return: f_value
"""
return self.g[x] + self.e * self.h(x)
def extract_path(self):
"""
Extract the path based on the PARENT set.
:return: The planning path
"""
path = [self.s_goal]
s = self.s_goal
while True:
s = self.PARENT[s]
path.append(s)
if s == self.s_start:
break
return list(path)
def h(self, s):
"""
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
"""
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
"""
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
"""
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
s_start = (5, 5)
s_goal = (45, 25)
arastar = AraStar(s_start, s_goal, 2.5, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
path, visited = arastar.searching()
plot.animation_ara_star(path, visited, "Anytime Repairing A* (ARA*)")
if __name__ == '__main__':
main()

@ -0,0 +1,317 @@
"""
Anytime_D_star 2D
@author: huiming zhou
"""
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting
from Search_2D import env
class ADStar:
def __init__(self, s_start, s_goal, eps, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.Plot = plotting.Plotting(s_start, s_goal)
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.x = self.Env.x_range
self.y = self.Env.y_range
self.g, self.rhs, self.OPEN = {}, {}, {}
for i in range(1, self.Env.x_range - 1):
for j in range(1, self.Env.y_range - 1):
self.rhs[(i, j)] = float("inf")
self.g[(i, j)] = float("inf")
self.rhs[self.s_goal] = 0.0
self.eps = eps
self.OPEN[self.s_goal] = self.Key(self.s_goal)
self.CLOSED, self.INCONS = set(), dict()
self.visited = set()
self.count = 0
self.count_env_change = 0
self.obs_add = set()
self.obs_remove = set()
self.title = "Anytime D*: Small changes" # Significant changes
self.fig = plt.figure()
def run(self):
self.Plot.plot_grid(self.title)
self.ComputeOrImprovePath()
self.plot_visited()
self.plot_path(self.extract_path())
self.visited = set()
while True:
if self.eps <= 1.0:
break
self.eps -= 0.5
self.OPEN.update(self.INCONS)
for s in self.OPEN:
self.OPEN[s] = self.Key(s)
self.CLOSED = set()
self.ComputeOrImprovePath()
self.plot_visited()
self.plot_path(self.extract_path())
self.visited = set()
plt.pause(0.5)
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
plt.show()
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
else:
self.count_env_change += 1
x, y = int(x), int(y)
print("Change position: s =", x, ",", "y =", y)
# for small changes
if self.title == "Anytime D*: Small changes":
if (x, y) not in self.obs:
self.obs.add((x, y))
self.g[(x, y)] = float("inf")
self.rhs[(x, y)] = float("inf")
else:
self.obs.remove((x, y))
self.UpdateState((x, y))
self.Plot.update_obs(self.obs)
for sn in self.get_neighbor((x, y)):
self.UpdateState(sn)
plt.cla()
self.Plot.plot_grid(self.title)
while True:
if len(self.INCONS) == 0:
break
self.OPEN.update(self.INCONS)
for s in self.OPEN:
self.OPEN[s] = self.Key(s)
self.CLOSED = set()
self.ComputeOrImprovePath()
self.plot_visited()
self.plot_path(self.extract_path())
# plt.plot(self.title)
self.visited = set()
if self.eps <= 1.0:
break
else:
if (x, y) not in self.obs:
self.obs.add((x, y))
self.obs_add.add((x, y))
plt.plot(x, y, 'sk')
if (x, y) in self.obs_remove:
self.obs_remove.remove((x, y))
else:
self.obs.remove((x, y))
self.obs_remove.add((x, y))
plt.plot(x, y, marker='s', color='white')
if (x, y) in self.obs_add:
self.obs_add.remove((x, y))
self.Plot.update_obs(self.obs)
if self.count_env_change >= 15:
self.count_env_change = 0
self.eps += 2.0
for s in self.obs_add:
self.g[(x, y)] = float("inf")
self.rhs[(x, y)] = float("inf")
for sn in self.get_neighbor(s):
self.UpdateState(sn)
for s in self.obs_remove:
for sn in self.get_neighbor(s):
self.UpdateState(sn)
self.UpdateState(s)
plt.cla()
self.Plot.plot_grid(self.title)
while True:
if self.eps <= 1.0:
break
self.eps -= 0.5
self.OPEN.update(self.INCONS)
for s in self.OPEN:
self.OPEN[s] = self.Key(s)
self.CLOSED = set()
self.ComputeOrImprovePath()
self.plot_visited()
self.plot_path(self.extract_path())
plt.title(self.title)
self.visited = set()
plt.pause(0.5)
self.fig.canvas.draw_idle()
def ComputeOrImprovePath(self):
while True:
s, v = self.TopKey()
if v >= self.Key(self.s_start) and \
self.rhs[self.s_start] == self.g[self.s_start]:
break
self.OPEN.pop(s)
self.visited.add(s)
if self.g[s] > self.rhs[s]:
self.g[s] = self.rhs[s]
self.CLOSED.add(s)
for sn in self.get_neighbor(s):
self.UpdateState(sn)
else:
self.g[s] = float("inf")
for sn in self.get_neighbor(s):
self.UpdateState(sn)
self.UpdateState(s)
def UpdateState(self, s):
if s != self.s_goal:
self.rhs[s] = float("inf")
for x in self.get_neighbor(s):
self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x))
if s in self.OPEN:
self.OPEN.pop(s)
if self.g[s] != self.rhs[s]:
if s not in self.CLOSED:
self.OPEN[s] = self.Key(s)
else:
self.INCONS[s] = 0
def Key(self, s):
if self.g[s] > self.rhs[s]:
return [self.rhs[s] + self.eps * self.h(self.s_start, s), self.rhs[s]]
else:
return [self.g[s] + self.h(self.s_start, s), self.g[s]]
def TopKey(self):
"""
:return: return the min key and its value.
"""
s = min(self.OPEN, key=self.OPEN.get)
return s, self.OPEN[s]
def h(self, s_start, s_goal):
heuristic_type = self.heuristic_type # heuristic type
if heuristic_type == "manhattan":
return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1])
else:
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
nei_list.add(s_next)
return nei_list
def extract_path(self):
"""
Extract the path based on the PARENT set.
:return: The planning path
"""
path = [self.s_start]
s = self.s_start
for k in range(100):
g_list = {}
for x in self.get_neighbor(s):
if not self.is_collision(s, x):
g_list[x] = self.g[x]
s = min(g_list, key=g_list.get)
path.append(s)
if s == self.s_goal:
break
return list(path)
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self):
self.count += 1
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in self.visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = ADStar(s_start, s_goal, 2.5, "euclidean")
dstar.run()
if __name__ == '__main__':
main()

@ -0,0 +1,224 @@
"""
A_star 2D
@author: huiming zhou
"""
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3")
from Search_2D import plotting, env
class AStar:
"""AStar set the cost + heuristics as the priority
"""
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start = s_start
self.s_goal = s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.OPEN = [] # priority queue / OPEN set
self.CLOSED = [] # CLOSED set / VISITED order
self.PARENT = dict() # recorded parent
self.g = dict() # cost to come
def searching(self):
"""
A_star Searching.
:return: path, visited order
"""
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(self.f_value(self.s_start), self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
if s == self.s_goal: # stop condition
break
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
heapq.heappush(self.OPEN, (self.f_value(s_n), s_n))
return self.extract_path(self.PARENT), self.CLOSED
def searching_repeated_astar(self, e):
"""
repeated A*.
:param e: weight of A*
:return: path and visited order
"""
path, visited = [], []
while e >= 1:
p_k, v_k = self.repeated_searching(self.s_start, self.s_goal, e)
path.append(p_k)
visited.append(v_k)
e -= 0.5
return path, visited
def repeated_searching(self, s_start, s_goal, e):
"""
run A* with weight e.
:param s_start: starting state
:param s_goal: goal state
:param e: weight of a*
:return: path and visited order.
"""
g = {s_start: 0, s_goal: float("inf")}
PARENT = {s_start: s_start}
OPEN = []
CLOSED = []
heapq.heappush(OPEN,
(g[s_start] + e * self.heuristic(s_start), s_start))
while OPEN:
_, s = heapq.heappop(OPEN)
CLOSED.append(s)
if s == s_goal:
break
for s_n in self.get_neighbor(s):
new_cost = g[s] + self.cost(s, s_n)
if s_n not in g:
g[s_n] = math.inf
if new_cost < g[s_n]: # conditions for updating Cost
g[s_n] = new_cost
PARENT[s_n] = s
heapq.heappush(OPEN, (g[s_n] + e * self.heuristic(s_n), s_n))
return self.extract_path(PARENT), CLOSED
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
"""
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
"""
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def f_value(self, s):
"""
f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
"""
return self.g[s] + self.heuristic(s)
def extract_path(self, PARENT):
"""
Extract the path based on the PARENT set.
:return: The planning path
"""
path = [self.s_goal]
s = self.s_goal
while True:
s = PARENT[s]
path.append(s)
if s == self.s_start:
break
return list(path)
def heuristic(self, s):
"""
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
"""
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def main():
s_start = (5, 5)
s_goal = (45, 25)
astar = AStar(s_start, s_goal, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
path, visited = astar.searching()
plot.animation(path, visited, "A*") # animation
# path, visited = astar.searching_repeated_astar(2.5) # initial weight e = 2.5
# plot.animation_ara_star(path, visited, "Repeated A*")
if __name__ == '__main__':
main()

@ -0,0 +1,68 @@
"""
Best-First Searching
@author: huiming zhou
"""
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
from Search_2D.Astar import AStar
class BestFirst(AStar):
"""BestFirst set the heuristics as the priority
"""
def searching(self):
"""
Breadth-first Searching.
:return: path, visited order
"""
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(self.heuristic(self.s_start), self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
if s == self.s_goal:
break
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# best first set the heuristics as the priority
heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
BF = BestFirst(s_start, s_goal, 'euclidean')
plot = plotting.Plotting(s_start, s_goal)
path, visited = BF.searching()
plot.animation(path, visited, "Best-first Searching") # animation
if __name__ == '__main__':
main()

@ -0,0 +1,229 @@
"""
Bidirectional_a_star 2D
@author: huiming zhou
"""
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class BidirectionalAStar:
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start = s_start
self.s_goal = s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.OPEN_fore = [] # OPEN set for forward searching
self.OPEN_back = [] # OPEN set for backward searching
self.CLOSED_fore = [] # CLOSED set for forward
self.CLOSED_back = [] # CLOSED set for backward
self.PARENT_fore = dict() # recorded parent for forward
self.PARENT_back = dict() # recorded parent for backward
self.g_fore = dict() # cost to come for forward
self.g_back = dict() # cost to come for backward
def init(self):
"""
initialize parameters
"""
self.g_fore[self.s_start] = 0.0
self.g_fore[self.s_goal] = math.inf
self.g_back[self.s_goal] = 0.0
self.g_back[self.s_start] = math.inf
self.PARENT_fore[self.s_start] = self.s_start
self.PARENT_back[self.s_goal] = self.s_goal
heapq.heappush(self.OPEN_fore,
(self.f_value_fore(self.s_start), self.s_start))
heapq.heappush(self.OPEN_back,
(self.f_value_back(self.s_goal), self.s_goal))
def searching(self):
"""
Bidirectional A*
:return: connected path, visited order of forward, visited order of backward
"""
self.init()
s_meet = self.s_start
while self.OPEN_fore and self.OPEN_back:
# solve foreward-search
_, s_fore = heapq.heappop(self.OPEN_fore)
if s_fore in self.PARENT_back:
s_meet = s_fore
break
self.CLOSED_fore.append(s_fore)
for s_n in self.get_neighbor(s_fore):
new_cost = self.g_fore[s_fore] + self.cost(s_fore, s_n)
if s_n not in self.g_fore:
self.g_fore[s_n] = math.inf
if new_cost < self.g_fore[s_n]:
self.g_fore[s_n] = new_cost
self.PARENT_fore[s_n] = s_fore
heapq.heappush(self.OPEN_fore,
(self.f_value_fore(s_n), s_n))
# solve backward-search
_, s_back = heapq.heappop(self.OPEN_back)
if s_back in self.PARENT_fore:
s_meet = s_back
break
self.CLOSED_back.append(s_back)
for s_n in self.get_neighbor(s_back):
new_cost = self.g_back[s_back] + self.cost(s_back, s_n)
if s_n not in self.g_back:
self.g_back[s_n] = math.inf
if new_cost < self.g_back[s_n]:
self.g_back[s_n] = new_cost
self.PARENT_back[s_n] = s_back
heapq.heappush(self.OPEN_back,
(self.f_value_back(s_n), s_n))
return self.extract_path(s_meet), self.CLOSED_fore, self.CLOSED_back
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
def extract_path(self, s_meet):
"""
extract path from start and goal
:param s_meet: meet point of bi-direction a*
:return: path
"""
# extract path for foreward part
path_fore = [s_meet]
s = s_meet
while True:
s = self.PARENT_fore[s]
path_fore.append(s)
if s == self.s_start:
break
# extract path for backward part
path_back = []
s = s_meet
while True:
s = self.PARENT_back[s]
path_back.append(s)
if s == self.s_goal:
break
return list(reversed(path_fore)) + list(path_back)
def f_value_fore(self, s):
"""
forward searching: f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
"""
return self.g_fore[s] + self.h(s, self.s_goal)
def f_value_back(self, s):
"""
backward searching: f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
"""
return self.g_back[s] + self.h(s, self.s_start)
def h(self, s, goal):
"""
Calculate heuristic value.
:param s: current node (state)
:param goal: goal node (state)
:return: heuristic value
"""
heuristic_type = self.heuristic_type
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
"""
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
"""
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
x_start = (5, 5)
x_goal = (45, 25)
bastar = BidirectionalAStar(x_start, x_goal, "euclidean")
plot = plotting.Plotting(x_start, x_goal)
path, visited_fore, visited_back = bastar.searching()
plot.animation_bi_astar(path, visited_fore, visited_back, "Bidirectional-A*") # animation
if __name__ == '__main__':
main()

@ -0,0 +1,304 @@
"""
D_star 2D
@author: huiming zhou
"""
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class DStar:
def __init__(self, s_start, s_goal):
self.s_start, self.s_goal = s_start, s_goal
self.Env = env.Env()
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
self.u_set = self.Env.motions
self.obs = self.Env.obs
self.x = self.Env.x_range
self.y = self.Env.y_range
self.fig = plt.figure()
self.OPEN = set()
self.t = dict()
self.PARENT = dict()
self.h = dict()
self.k = dict()
self.path = []
self.visited = set()
self.count = 0
def init(self):
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.t[(i, j)] = 'NEW'
self.k[(i, j)] = 0.0
self.h[(i, j)] = float("inf")
self.PARENT[(i, j)] = None
self.h[self.s_goal] = 0.0
def run(self, s_start, s_end):
self.init()
self.insert(s_end, 0)
while True:
self.process_state()
if self.t[s_start] == 'CLOSED':
break
self.path = self.extract_path(s_start, s_end)
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_path(self.path)
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
plt.show()
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
else:
x, y = int(x), int(y)
if (x, y) not in self.obs:
print("Add obstacle at: s =", x, ",", "y =", y)
self.obs.add((x, y))
self.Plot.update_obs(self.obs)
s = self.s_start
self.visited = set()
self.count += 1
while s != self.s_goal:
if self.is_collision(s, self.PARENT[s]):
self.modify(s)
continue
s = self.PARENT[s]
self.path = self.extract_path(self.s_start, self.s_goal)
plt.cla()
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_visited(self.visited)
self.plot_path(self.path)
self.fig.canvas.draw_idle()
def extract_path(self, s_start, s_end):
path = [s_start]
s = s_start
while True:
s = self.PARENT[s]
path.append(s)
if s == s_end:
return path
def process_state(self):
s = self.min_state() # get node in OPEN set with min k value
self.visited.add(s)
if s is None:
return -1 # OPEN set is empty
k_old = self.get_k_min() # record the min k value of this iteration (min path cost)
self.delete(s) # move state s from OPEN set to CLOSED set
# k_min < h[s] --> s: RAISE state (increased cost)
if k_old < self.h[s]:
for s_n in self.get_neighbor(s):
if self.h[s_n] <= k_old and \
self.h[s] > self.h[s_n] + self.cost(s_n, s):
# update h_value and choose parent
self.PARENT[s] = s_n
self.h[s] = self.h[s_n] + self.cost(s_n, s)
# s: k_min >= h[s] -- > s: LOWER state (cost reductions)
if k_old == self.h[s]:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
(self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
# 3) s_n find a better parent
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
else:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
else:
if self.PARENT[s_n] != s and \
self.h[s_n] > self.h[s] + self.cost(s, s_n):
# Condition: LOWER happened in OPEN set (s), s should be explored again
self.insert(s, self.h[s])
else:
if self.PARENT[s_n] != s and \
self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
self.t[s_n] == 'CLOSED' and \
self.h[s_n] > k_old:
# Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
self.insert(s_n, self.h[s_n])
return self.get_k_min()
def min_state(self):
"""
choose the node with the minimum k value in OPEN set.
:return: state
"""
if not self.OPEN:
return None
return min(self.OPEN, key=lambda x: self.k[x])
def get_k_min(self):
"""
calc the min k value for nodes in OPEN set.
:return: k value
"""
if not self.OPEN:
return -1
return min([self.k[x] for x in self.OPEN])
def insert(self, s, h_new):
"""
insert node into OPEN set.
:param s: node
:param h_new: new or better cost to come value
"""
if self.t[s] == 'NEW':
self.k[s] = h_new
elif self.t[s] == 'OPEN':
self.k[s] = min(self.k[s], h_new)
elif self.t[s] == 'CLOSED':
self.k[s] = min(self.h[s], h_new)
self.h[s] = h_new
self.t[s] = 'OPEN'
self.OPEN.add(s)
def delete(self, s):
"""
delete: move state s from OPEN set to CLOSED set.
:param s: state should be deleted
"""
if self.t[s] == 'OPEN':
self.t[s] = 'CLOSED'
self.OPEN.remove(s)
def modify(self, s):
"""
start processing from state s.
:param s: is a node whose status is RAISE or LOWER.
"""
self.modify_cost(s)
while True:
k_min = self.process_state()
if k_min >= self.h[s]:
break
def modify_cost(self, s):
# if node in CLOSED set, put it into OPEN set.
# Since cost may be changed between s - s.parent, calc cost(s, s.p) again
if self.t[s] == 'CLOSED':
self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
nei_list.add(s_next)
return nei_list
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = DStar(s_start, s_goal)
dstar.run(s_start, s_goal)
if __name__ == '__main__':
main()

@ -0,0 +1,239 @@
"""
D_star_Lite 2D
@author: huiming zhou
"""
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class DStar:
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.Plot = plotting.Plotting(s_start, s_goal)
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.x = self.Env.x_range
self.y = self.Env.y_range
self.g, self.rhs, self.U = {}, {}, {}
self.km = 0
for i in range(1, self.Env.x_range - 1):
for j in range(1, self.Env.y_range - 1):
self.rhs[(i, j)] = float("inf")
self.g[(i, j)] = float("inf")
self.rhs[self.s_goal] = 0.0
self.U[self.s_goal] = self.CalculateKey(self.s_goal)
self.visited = set()
self.count = 0
self.fig = plt.figure()
def run(self):
self.Plot.plot_grid("D* Lite")
self.ComputePath()
self.plot_path(self.extract_path())
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
plt.show()
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
else:
x, y = int(x), int(y)
print("Change position: s =", x, ",", "y =", y)
s_curr = self.s_start
s_last = self.s_start
i = 0
path = [self.s_start]
while s_curr != self.s_goal:
s_list = {}
for s in self.get_neighbor(s_curr):
s_list[s] = self.g[s] + self.cost(s_curr, s)
s_curr = min(s_list, key=s_list.get)
path.append(s_curr)
if i < 1:
self.km += self.h(s_last, s_curr)
s_last = s_curr
if (x, y) not in self.obs:
self.obs.add((x, y))
plt.plot(x, y, 'sk')
self.g[(x, y)] = float("inf")
self.rhs[(x, y)] = float("inf")
else:
self.obs.remove((x, y))
plt.plot(x, y, marker='s', color='white')
self.UpdateVertex((x, y))
for s in self.get_neighbor((x, y)):
self.UpdateVertex(s)
i += 1
self.count += 1
self.visited = set()
self.ComputePath()
self.plot_visited(self.visited)
self.plot_path(path)
self.fig.canvas.draw_idle()
def ComputePath(self):
while True:
s, v = self.TopKey()
if v >= self.CalculateKey(self.s_start) and \
self.rhs[self.s_start] == self.g[self.s_start]:
break
k_old = v
self.U.pop(s)
self.visited.add(s)
if k_old < self.CalculateKey(s):
self.U[s] = self.CalculateKey(s)
elif self.g[s] > self.rhs[s]:
self.g[s] = self.rhs[s]
for x in self.get_neighbor(s):
self.UpdateVertex(x)
else:
self.g[s] = float("inf")
self.UpdateVertex(s)
for x in self.get_neighbor(s):
self.UpdateVertex(x)
def UpdateVertex(self, s):
if s != self.s_goal:
self.rhs[s] = float("inf")
for x in self.get_neighbor(s):
self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x))
if s in self.U:
self.U.pop(s)
if self.g[s] != self.rhs[s]:
self.U[s] = self.CalculateKey(s)
def CalculateKey(self, s):
return [min(self.g[s], self.rhs[s]) + self.h(self.s_start, s) + self.km,
min(self.g[s], self.rhs[s])]
def TopKey(self):
"""
:return: return the min key and its value.
"""
s = min(self.U, key=self.U.get)
return s, self.U[s]
def h(self, s_start, s_goal):
heuristic_type = self.heuristic_type # heuristic type
if heuristic_type == "manhattan":
return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1])
else:
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
nei_list.add(s_next)
return nei_list
def extract_path(self):
"""
Extract the path based on the PARENT set.
:return: The planning path
"""
path = [self.s_start]
s = self.s_start
for k in range(100):
g_list = {}
for x in self.get_neighbor(s):
if not self.is_collision(s, x):
g_list[x] = self.g[x]
s = min(g_list, key=g_list.get)
path.append(s)
if s == self.s_goal:
break
return list(path)
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = DStar(s_start, s_goal, "euclidean")
dstar.run()
if __name__ == '__main__':
main()

@ -0,0 +1,69 @@
"""
Dijkstra 2D
@author: huiming zhou
"""
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
from Search_2D.Astar import AStar
class Dijkstra(AStar):
"""Dijkstra set the cost as the priority
"""
def searching(self):
"""
Breadth-first Searching.
:return: path, visited order
"""
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
if s == self.s_goal:
break
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# best first set the heuristics as the priority
heapq.heappush(self.OPEN, (new_cost, s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
dijkstra = Dijkstra(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)
path, visited = dijkstra.searching()
plot.animation(path, visited, "Dijkstra's") # animation generate
if __name__ == '__main__':
main()

@ -0,0 +1,256 @@
"""
LPA_star 2D
@author: huiming zhou
"""
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class LPAStar:
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env()
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
self.u_set = self.Env.motions
self.obs = self.Env.obs
self.x = self.Env.x_range
self.y = self.Env.y_range
self.g, self.rhs, self.U = {}, {}, {}
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.rhs[(i, j)] = float("inf")
self.g[(i, j)] = float("inf")
self.rhs[self.s_start] = 0
self.U[self.s_start] = self.CalculateKey(self.s_start)
self.visited = set()
self.count = 0
self.fig = plt.figure()
def run(self):
self.Plot.plot_grid("Lifelong Planning A*")
self.ComputeShortestPath()
self.plot_path(self.extract_path())
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
plt.show()
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
else:
x, y = int(x), int(y)
print("Change position: s =", x, ",", "y =", y)
self.visited = set()
self.count += 1
if (x, y) not in self.obs:
self.obs.add((x, y))
else:
self.obs.remove((x, y))
self.UpdateVertex((x, y))
self.Plot.update_obs(self.obs)
for s_n in self.get_neighbor((x, y)):
self.UpdateVertex(s_n)
self.ComputeShortestPath()
plt.cla()
self.Plot.plot_grid("Lifelong Planning A*")
self.plot_visited(self.visited)
self.plot_path(self.extract_path())
self.fig.canvas.draw_idle()
def ComputeShortestPath(self):
while True:
s, v = self.TopKey()
if v >= self.CalculateKey(self.s_goal) and \
self.rhs[self.s_goal] == self.g[self.s_goal]:
break
self.U.pop(s)
self.visited.add(s)
if self.g[s] > self.rhs[s]:
# Condition: over-consistent (eg: deleted obstacles)
# So, rhs[s] decreased -- > rhs[s] < g[s]
self.g[s] = self.rhs[s]
else:
# Condition: # under-consistent (eg: added obstacles)
# So, rhs[s] increased --> rhs[s] > g[s]
self.g[s] = float("inf")
self.UpdateVertex(s)
for s_n in self.get_neighbor(s):
self.UpdateVertex(s_n)
def UpdateVertex(self, s):
"""
update the status and the current cost to come of state s.
:param s: state s
"""
if s != self.s_start:
# Condition: cost of parent of s changed
# Since we do not record the children of a state, we need to enumerate its neighbors
self.rhs[s] = min(self.g[s_n] + self.cost(s_n, s)
for s_n in self.get_neighbor(s))
if s in self.U:
self.U.pop(s)
if self.g[s] != self.rhs[s]:
# Condition: current cost to come is different to that of last time
# state s should be added into OPEN set (set U)
self.U[s] = self.CalculateKey(s)
def TopKey(self):
"""
:return: return the min key and its value.
"""
s = min(self.U, key=self.U.get)
return s, self.U[s]
def CalculateKey(self, s):
return [min(self.g[s], self.rhs[s]) + self.h(s),
min(self.g[s], self.rhs[s])]
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
s_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
s_list.add(s_next)
return s_list
def h(self, s):
"""
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
"""
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def extract_path(self):
"""
Extract the path based on the PARENT set.
:return: The planning path
"""
path = [self.s_goal]
s = self.s_goal
for k in range(100):
g_list = {}
for x in self.get_neighbor(s):
if not self.is_collision(s, x):
g_list[x] = self.g[x]
s = min(g_list, key=g_list.get)
path.append(s)
if s == self.s_start:
break
return list(reversed(path))
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
x_start = (5, 5)
x_goal = (45, 25)
lpastar = LPAStar(x_start, x_goal, "Euclidean")
lpastar.run()
if __name__ == '__main__':
main()

@ -0,0 +1,230 @@
"""
LRTA_star 2D (Learning Real-time A*)
@author: huiming zhou
"""
import os
import sys
import copy
import math
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import queue, plotting, env
class LrtAStarN:
def __init__(self, s_start, s_goal, N, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env()
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.N = N # number of expand nodes each iteration
self.visited = [] # order of visited nodes in planning
self.path = [] # path of each iteration
self.h_table = {} # h_value table
def init(self):
"""
initialize the h_value of all nodes in the environment.
it is a global table.
"""
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.h_table[(i, j)] = self.h((i, j))
def searching(self):
self.init()
s_start = self.s_start # initialize start node
while True:
OPEN, CLOSED = self.AStar(s_start, self.N) # OPEN, CLOSED sets in each iteration
if OPEN == "FOUND": # reach the goal node
self.path.append(CLOSED)
break
h_value = self.iteration(CLOSED) # h_value table of CLOSED nodes
for x in h_value:
self.h_table[x] = h_value[x]
s_start, path_k = self.extract_path_in_CLOSE(s_start, h_value) # x_init -> expected node in OPEN set
self.path.append(path_k)
def extract_path_in_CLOSE(self, s_start, h_value):
path = [s_start]
s = s_start
while True:
h_list = {}
for s_n in self.get_neighbor(s):
if s_n in h_value:
h_list[s_n] = h_value[s_n]
else:
h_list[s_n] = self.h_table[s_n]
s_key = min(h_list, key=h_list.get) # move to the smallest node with min h_value
path.append(s_key) # generate path
s = s_key # use end of this iteration as the start of next
if s_key not in h_value: # reach the expected node in OPEN set
return s_key, path
def iteration(self, CLOSED):
h_value = {}
for s in CLOSED:
h_value[s] = float("inf") # initialize h_value of CLOSED nodes
while True:
h_value_rec = copy.deepcopy(h_value)
for s in CLOSED:
h_list = []
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
h_list.append(self.cost(s, s_n) + self.h_table[s_n])
else:
h_list.append(self.cost(s, s_n) + h_value[s_n])
h_value[s] = min(h_list) # update h_value of current node
if h_value == h_value_rec: # h_value table converged
return h_value
def AStar(self, x_start, N):
OPEN = queue.QueuePrior() # OPEN set
OPEN.put(x_start, self.h(x_start))
CLOSED = [] # CLOSED set
g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come
PARENT = {x_start: x_start} # relations
count = 0 # counter
while not OPEN.empty():
count += 1
s = OPEN.get()
CLOSED.append(s)
if s == self.s_goal: # reach the goal node
self.visited.append(CLOSED)
return "FOUND", self.extract_path(x_start, PARENT)
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
new_cost = g_table[s] + self.cost(s, s_n)
if s_n not in g_table:
g_table[s_n] = float("inf")
if new_cost < g_table[s_n]: # conditions for updating Cost
g_table[s_n] = new_cost
PARENT[s_n] = s
OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
if count == N: # expand needed CLOSED nodes
break
self.visited.append(CLOSED) # visited nodes in each iteration
return OPEN, CLOSED
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
s_list = []
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
s_list.append(s_next)
return s_list
def extract_path(self, x_start, parent):
"""
Extract the path based on the relationship of nodes.
:return: The planning path
"""
path_back = [self.s_goal]
x_current = self.s_goal
while True:
x_current = parent[x_current]
path_back.append(x_current)
if x_current == x_start:
break
return list(reversed(path_back))
def h(self, s):
"""
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
"""
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
s_start = (10, 5)
s_goal = (45, 25)
lrta = LrtAStarN(s_start, s_goal, 250, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
lrta.searching()
plot.animation_lrta(lrta.path, lrta.visited,
"Learning Real-time A* (LRTA*)")
if __name__ == '__main__':
main()

@ -0,0 +1,237 @@
"""
RTAAstar 2D (Real-time Adaptive A*)
@author: huiming zhou
"""
import os
import sys
import copy
import math
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import queue, plotting, env
class RTAAStar:
def __init__(self, s_start, s_goal, N, heuristic_type):
self.s_start, self.s_goal = s_start, s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env()
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.N = N # number of expand nodes each iteration
self.visited = [] # order of visited nodes in planning
self.path = [] # path of each iteration
self.h_table = {} # h_value table
def init(self):
"""
initialize the h_value of all nodes in the environment.
it is a global table.
"""
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.h_table[(i, j)] = self.h((i, j))
def searching(self):
self.init()
s_start = self.s_start # initialize start node
while True:
OPEN, CLOSED, g_table, PARENT = \
self.Astar(s_start, self.N)
if OPEN == "FOUND": # reach the goal node
self.path.append(CLOSED)
break
s_next, h_value = self.cal_h_value(OPEN, CLOSED, g_table, PARENT)
for x in h_value:
self.h_table[x] = h_value[x]
s_start, path_k = self.extract_path_in_CLOSE(s_start, s_next, h_value)
self.path.append(path_k)
def cal_h_value(self, OPEN, CLOSED, g_table, PARENT):
v_open = {}
h_value = {}
for (_, x) in OPEN.enumerate():
v_open[x] = g_table[PARENT[x]] + 1 + self.h_table[x]
s_open = min(v_open, key=v_open.get)
f_min = v_open[s_open]
for x in CLOSED:
h_value[x] = f_min - g_table[x]
return s_open, h_value
def iteration(self, CLOSED):
h_value = {}
for s in CLOSED:
h_value[s] = float("inf") # initialize h_value of CLOSED nodes
while True:
h_value_rec = copy.deepcopy(h_value)
for s in CLOSED:
h_list = []
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
h_list.append(self.cost(s, s_n) + self.h_table[s_n])
else:
h_list.append(self.cost(s, s_n) + h_value[s_n])
h_value[s] = min(h_list) # update h_value of current node
if h_value == h_value_rec: # h_value table converged
return h_value
def Astar(self, x_start, N):
OPEN = queue.QueuePrior() # OPEN set
OPEN.put(x_start, self.h_table[x_start])
CLOSED = [] # CLOSED set
g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come
PARENT = {x_start: x_start} # relations
count = 0 # counter
while not OPEN.empty():
count += 1
s = OPEN.get()
CLOSED.append(s)
if s == self.s_goal: # reach the goal node
self.visited.append(CLOSED)
return "FOUND", self.extract_path(x_start, PARENT), [], []
for s_n in self.get_neighbor(s):
if s_n not in CLOSED:
new_cost = g_table[s] + self.cost(s, s_n)
if s_n not in g_table:
g_table[s_n] = float("inf")
if new_cost < g_table[s_n]: # conditions for updating Cost
g_table[s_n] = new_cost
PARENT[s_n] = s
OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
if count == N: # expand needed CLOSED nodes
break
self.visited.append(CLOSED) # visited nodes in each iteration
return OPEN, CLOSED, g_table, PARENT
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
s_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
s_list.add(s_next)
return s_list
def extract_path_in_CLOSE(self, s_end, s_start, h_value):
path = [s_start]
s = s_start
while True:
h_list = {}
for s_n in self.get_neighbor(s):
if s_n in h_value:
h_list[s_n] = h_value[s_n]
s_key = max(h_list, key=h_list.get) # move to the smallest node with min h_value
path.append(s_key) # generate path
s = s_key # use end of this iteration as the start of next
if s_key == s_end: # reach the expected node in OPEN set
return s_start, list(reversed(path))
def extract_path(self, x_start, parent):
"""
Extract the path based on the relationship of nodes.
:return: The planning path
"""
path = [self.s_goal]
s = self.s_goal
while True:
s = parent[s]
path.append(s)
if s == x_start:
break
return list(reversed(path))
def h(self, s):
"""
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
"""
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def main():
s_start = (10, 5)
s_goal = (45, 25)
rtaa = RTAAStar(s_start, s_goal, 240, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
rtaa.searching()
plot.animation_lrta(rtaa.path, rtaa.visited,
"Real-time Adaptive A* (RTAA*)")
if __name__ == '__main__':
main()

@ -0,0 +1,69 @@
"""
Breadth-first Searching_2D (BFS)
@author: huiming zhou
"""
import os
import sys
from collections import deque
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
from Search_2D.Astar import AStar
import math
import heapq
class BFS(AStar):
"""BFS add the new visited node in the end of the openset
"""
def searching(self):
"""
Breadth-first Searching.
:return: path, visited order
"""
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
if s == self.s_goal:
break
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# bfs, add new node to the end of the openset
prior = self.OPEN[-1][0]+1 if len(self.OPEN)>0 else 0
heapq.heappush(self.OPEN, (prior, s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
bfs = BFS(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)
path, visited = bfs.searching()
plot.animation(path, visited, "Breadth-first Searching (BFS)")
if __name__ == '__main__':
main()

@ -0,0 +1,65 @@
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
from Search_2D.Astar import AStar
class DFS(AStar):
"""DFS add the new visited node in the front of the openset
"""
def searching(self):
"""
Breadth-first Searching.
:return: path, visited order
"""
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(0, self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
if s == self.s_goal:
break
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
# dfs, add new node to the front of the openset
prior = self.OPEN[0][0]-1 if len(self.OPEN)>0 else 0
heapq.heappush(self.OPEN, (prior, s_n))
return self.extract_path(self.PARENT), self.CLOSED
def main():
s_start = (5, 5)
s_goal = (45, 25)
dfs = DFS(s_start, s_goal, 'None')
plot = plotting.Plotting(s_start, s_goal)
path, visited = dfs.searching()
visited = list(dict.fromkeys(visited))
plot.animation(path, visited, "Depth-first Searching (DFS)") # animation
if __name__ == '__main__':
main()

@ -0,0 +1,52 @@
"""
Env 2D
@author: huiming zhou
"""
class Env:
def __init__(self):
self.x_range = 51 # size of background
self.y_range = 31
self.motions = [(-1, 0), (-1, 1), (0, 1), (1, 1),
(1, 0), (1, -1), (0, -1), (-1, -1)]
self.obs = self.obs_map()
def update_obs(self, obs):
self.obs = obs
def obs_map(self):
"""
Initialize obstacles' positions
:return: map of obstacles
"""
x = self.x_range #51
y = self.y_range #31
obs = set()
#画上下边框
for i in range(x):
obs.add((i, 0))
for i in range(x):
obs.add((i, y - 1))
#画左右边框
for i in range(y):
obs.add((0, i))
for i in range(y):
obs.add((x - 1, i))
for i in range(2, 21):
obs.add((i, 15))
for i in range(15):
obs.add((20, i))
for i in range(15, 30):
obs.add((30, i))
for i in range(16):
obs.add((40, i))
return obs
# if __name__ == '__main__':
# a = Env()
# print(a.obs)

@ -0,0 +1,165 @@
"""
Plot tools 2D
@author: huiming zhou
"""
import os
import sys
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import env
class Plotting:
def __init__(self, xI, xG):
self.xI, self.xG = xI, xG
self.env = env.Env()
self.obs = self.env.obs_map()
def update_obs(self, obs):
self.obs = obs
def animation(self, path, visited, name):
self.plot_grid(name)
self.plot_visited(visited)
self.plot_path(path)
plt.show()
def animation_lrta(self, path, visited, name):
self.plot_grid(name)
cl = self.color_list_2()
path_combine = []
for k in range(len(path)):
self.plot_visited(visited[k], cl[k])
plt.pause(0.2)
self.plot_path(path[k])
path_combine += path[k]
plt.pause(0.2)
if self.xI in path_combine:
path_combine.remove(self.xI)
self.plot_path(path_combine)
plt.show()
def animation_ara_star(self, path, visited, name):
self.plot_grid(name)
cl_v, cl_p = self.color_list()
for k in range(len(path)):
self.plot_visited(visited[k], cl_v[k])
self.plot_path(path[k], cl_p[k], True)
plt.pause(0.5)
plt.show()
def animation_bi_astar(self, path, v_fore, v_back, name):
self.plot_grid(name)
self.plot_visited_bi(v_fore, v_back)
self.plot_path(path)
plt.show()
def plot_grid(self, name):
obs_x = [x[0] for x in self.obs]
obs_y = [x[1] for x in self.obs]
plt.plot(self.xI[0], self.xI[1], "bs")
plt.plot(self.xG[0], self.xG[1], "gs")
plt.plot(obs_x, obs_y, "sk")
plt.title(name)
plt.axis("equal")
def plot_visited(self, visited, cl='gray'):
if self.xI in visited:
visited.remove(self.xI)
if self.xG in visited:
visited.remove(self.xG)
count = 0
for x in visited:
count += 1
plt.plot(x[0], x[1], color=cl, marker='o')
plt.gcf().canvas.mpl_connect('key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
if count < len(visited) / 3:
length = 20
elif count < len(visited) * 2 / 3:
length = 30
else:
length = 40
#
# length = 15
if count % length == 0:
plt.pause(0.001)
plt.pause(0.01)
def plot_path(self, path, cl='r', flag=False):
path_x = [path[i][0] for i in range(len(path))]
path_y = [path[i][1] for i in range(len(path))]
if not flag:
plt.plot(path_x, path_y, linewidth='3', color='r')
else:
plt.plot(path_x, path_y, linewidth='3', color=cl)
plt.plot(self.xI[0], self.xI[1], "bs")
plt.plot(self.xG[0], self.xG[1], "gs")
plt.pause(0.01)
def plot_visited_bi(self, v_fore, v_back):
if self.xI in v_fore:
v_fore.remove(self.xI)
if self.xG in v_back:
v_back.remove(self.xG)
len_fore, len_back = len(v_fore), len(v_back)
for k in range(max(len_fore, len_back)):
if k < len_fore:
plt.plot(v_fore[k][0], v_fore[k][1], linewidth='3', color='gray', marker='o')
if k < len_back:
plt.plot(v_back[k][0], v_back[k][1], linewidth='3', color='cornflowerblue', marker='o')
plt.gcf().canvas.mpl_connect('key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
if k % 10 == 0:
plt.pause(0.001)
plt.pause(0.01)
@staticmethod
def color_list():
cl_v = ['silver',
'wheat',
'lightskyblue',
'royalblue',
'slategray']
cl_p = ['gray',
'orange',
'deepskyblue',
'red',
'm']
return cl_v, cl_p
@staticmethod
def color_list_2():
cl = ['silver',
'steelblue',
'dimgray',
'cornflowerblue',
'dodgerblue',
'royalblue',
'plum',
'mediumslateblue',
'mediumpurple',
'blueviolet',
]
return cl

@ -0,0 +1,62 @@
import collections
import heapq
class QueueFIFO:
"""
Class: QueueFIFO
Description: QueueFIFO is designed for First-in-First-out rule.
"""
def __init__(self):
self.queue = collections.deque()
def empty(self):
return len(self.queue) == 0
def put(self, node):
self.queue.append(node) # enter from back
def get(self):
return self.queue.popleft() # leave from front
class QueueLIFO:
"""
Class: QueueLIFO
Description: QueueLIFO is designed for Last-in-First-out rule.
"""
def __init__(self):
self.queue = collections.deque()
def empty(self):
return len(self.queue) == 0
def put(self, node):
self.queue.append(node) # enter from back
def get(self):
return self.queue.pop() # leave from back
class QueuePrior:
"""
Class: QueuePrior
Description: QueuePrior reorders elements using value [priority]
"""
def __init__(self):
self.queue = []
def empty(self):
return len(self.queue) == 0
def put(self, item, priority):
heapq.heappush(self.queue, (priority, item)) # reorder s using priority
def get(self):
return heapq.heappop(self.queue)[1] # pop out the smallest item
def enumerate(self):
return self.queue

@ -0,0 +1,19 @@
# @Time : 2022/5/9 20:49
# @Author : 2890199310@qq.com
# @File : KeyPress.py
# @Software: PyCharm
# @Function:
def main():
keyPress()
def keyPress(key):
if(key == 1):
return "e"
def result():
return keyPress()
if __name__ == '__main__':
main()

@ -0,0 +1,31 @@
# @Time : 2022/4/20 12:27
# @Author : 2890199310@qq.com
# @File : KeyPressModule.py.py
# @Software: PyCharm
# @Function:
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3")
# import pygame
import Tello.KeyPress as k
def init():
return
def getKey(keyName):
# ans = False
# for eve in pygame.event.get(): pass
# keyInput = pygame.key.get_pressed()
# myKey = getattr(pygame,'K_{}'.format(keyName))
# if keyInput[myKey]:
if keyName == k.result():
ans = True
# pygame.display.update()
return ans
def key(a):
return a
if __name__ == '__main__':
init()

@ -0,0 +1,105 @@
# @Time : 2022/4/20 12:27
# @Author : 2890199310@qq.com
# @File : KeyboardControl.py.py
# @Software: PyCharm
# @Function:
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3")
import logging
import time
import cv2
from djitellopy import tello
# import Tello.KeyPressModule as kp # 用于获取键盘按键
from time import sleep
# Tello初始化设置
Drone = tello.Tello() # 创建飞行器对象
Drone.connect() # 连接到飞行器
Drone.streamon() # 开启视频传输
Drone.LOGGER.setLevel(logging.ERROR) # 只显示错误信息
sleep(5) # 等待视频初始化
# kp.init() # 初始化按键处理模块
def getKeyboardInput(key):
image = cv2.resize(OriginalImage, (Camera_Width, Camera_Height))
speed = 70
drone = Drone
lr, fb, ud, yv = 0, 0, 0, 0
key_pressed = 0
# if kp.getKey("e"): #
if key == "e":
cv2.imwrite('D:/snap-{}.jpg'.format(time.strftime("%H%M%S", time.localtime())), image)
# if kp.getKey("UP"):# 上升
if key == "UP":
Drone.takeoff()
# elif kp.getKey("DOWN"):#下降
if key == "DOWN":
Drone.land()
# if kp.getKey("j"):# 向左飞行
if key == "j":
key_pressed = 1
lr = -speed
# elif kp.getKey("l"): #向右飞行
if key == "l":
key_pressed = 1
lr = speed
# if kp.getKey("i"): #向前飞行
if key == "i":
key_pressed = 1
fb = speed
# elif kp.getKey("k"):# 向后飞行
if key == "k":
key_pressed = 1
fb = -speed
# if kp.getKey("w"):# 向上飞行
if key == "w":
key_pressed = 1
ud = speed
# elif kp.getKey("s"): #向下飞行
if key == "s":
key_pressed = 1
ud = -speed
# if kp.getKey("a"): # 向左旋转
if key == "a":
key_pressed = 1
yv = -speed
# elif kp.getKey("d"): #向右旋转
if key == "d":
key_pressed = 1
yv = speed
InfoText = "battery : {0}% height: {1}cm time: {2}".format(drone.get_battery(), drone.get_height(), time.strftime("%H:%M:%S",time.localtime()))
cv2.putText(image, InfoText, (10, 20), font, fontScale, (0, 0, 255), lineThickness)
if key_pressed == 1:
InfoText = "Command : lr:{0}% fb:{1} ud:{2} yv:{3}".format(lr, fb, ud, yv)
cv2.putText(image, InfoText, (10, 40), font, fontScale, (0, 0, 255), lineThickness)
drone.send_rc_control(lr, fb, ud, yv)
# 主程序
# 摄像头设置
Camera_Width = 720
Camera_Height = 480
DetectRange = [6000, 11000] # DetectRange[0] 是保持静止的检测人脸面积阈值下限DetectRange[0] 是保持静止的检测人脸面积阈值上限
PID_Parameter = [0.5, 0.0004, 0.4]
pErrorRotate, pErrorUp = 0, 0
# 字体设置
font = cv2.FONT_HERSHEY_SIMPLEX
fontScale = 0.5
fontColor = (255, 0, 0)
lineThickness = 1
while True:
OriginalImage = Drone.get_frame_read().frame
Image = cv2.resize(OriginalImage, (Camera_Width, Camera_Height))
# getKeyboardInput(drone=Drone, speed=70, image=Image) # 按键控制
cv2.imshow("Drone Control Centre", Image)
cv2.waitKey(1)

@ -0,0 +1,12 @@
# @Time : 2022/5/9 8:45
# @Author : 2890199310@qq.com
# @File : calculate.py
# @Software: PyCharm
# @Function:
import os
os.chdir(r'E:\PaddleClas-release-2.3\PaddleClas-release-2.3\deploy')
####
os.system('python python/predict_system.py -c configs/inference_general.yaml -o Global.infer_imgs="./Trees/test_images/90.jpeg" -o IndexProcess.index_dir="./Trees/index" -o Global.use_gpu=False -o IndexProcess.Image_root="./Trees/gallery/" -o IndexProcess.data_file="./Trees/gallery/tree_list.txt')

@ -0,0 +1,84 @@
# @Time : 2022/4/19 9:41
# @Author : 2890199310@qq.com
# @File : change_file.py
# @Software: PyCharm
# @Function:
import os
import shutil
# 读取文件名
def readname():
filepath = "/deploy/Trees/gallery/0"
name = os.listdir(filepath)
return name
def main():
gallery = os.path.abspath(r"/deploy/drink_dataset_v1.0/gallery")
all_files = os.listdir(gallery)[:-2]
all_files.sort(key=lambda x:int(x))
# for file in all_files:
# print(file)
for i in range(0, 102): #创建文件夹
dire = "E:\\PaddleClas-release-2.3\\PaddleClas-release-2.3\\deploy\\flowers102\\gallery\\" + str(i)
if not os.path.exists(dire):
os.makedirs(dire)
print(f"{i}创建完毕")
f = open(r"/deploy/flowers102/train_extra_list.txt")
for line in f.readlines():
base = "E://PaddleClas-release-2.3//PaddleClas-release-2.3/deploy//flowers102//"
s = line.split('/')
str1 = s[0].strip() + "//"
str2 = s[1].split(' ')[0].strip()
str3 = s[1].split(' ')[1].strip() + "//"
# print(base + "gallery//" + str3 + str2)
# 复制训练文件
# shutil.copy(base + str1 + str2, base + "gallery/" + str3 + str2)
f.close()
f = open(r"/deploy/flowers102/val_list.txt")
for line in f.readlines():
base = "E://PaddleClas-release-2.3//PaddleClas-release-2.3/deploy//flowers102//"
s = line.split('/')
str1 = s[0].strip() + "//"
str2 = s[1].split(' ')[0].strip()
str3 = s[1].split(' ')[1].strip() + "//"
# 复制测试文件
# shutil.copy(base + str1 + str2, base + "test_images/" + str2)
f.close()
dit = dict()
with open(r"/deploy/flowers102/flowers102_label_list.txt", "r", encoding="utf-8") as f2:
for line in f2.readlines():
tmp = line.split(' ', 1)
dit[tmp[0]] = tmp[1].strip()
#print(dit)
#更改list内容
with open(r"/deploy/flowers102/train_list.txt", "r", encoding="utf-8") as f1, open(
r"/deploy/flowers102/train_label.txt", "w", encoding="utf-8") as f3:
for line1 in f1.readlines():
tmp1 = line1.split(' ', 1)
# f3.write(tmp1[1].strip() + '/' + tmp1[0].split('/', 1)[1].strip() + '\t' + dit[tmp1[1].strip()] + '\n')
# print(line1)
# print(line2)
# break
f1.close()
f2.close()
f3.close()
with open(r"/deploy/Trees/gallery/tree_list.txt", "w", encoding="utf-8") as f1:
name = readname()
for i in name:
if('GIF' not in i):
f1.write('0/' + i + '\t' + 'tree\n')
f1.close()
if __name__ == '__main__':
main()

@ -0,0 +1,5 @@
# @Time : 2022/4/19 9:54
# @Author : 2890199310@qq.com
# @File : spider.py
# @Software: PyCharm
# @Function:

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'page1.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
import logging
from PyQt5 import QtCore, QtWidgets
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3")
from Screenshot.Screenshot_Gui import *
from tello_UI import *
from djitellopy import *
import time
class Ui_Form1(object):
def setupUi(self, Form):
Form.setObjectName("Form")
Form.resize(1108, 767)
self.pushButton = QtWidgets.QPushButton(Form)
self.pushButton.setGeometry(QtCore.QRect(10, 740, 75, 23))
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
sizePolicy.setHeightForWidth(self.pushButton.sizePolicy().hasHeightForWidth())
self.pushButton.setSizePolicy(sizePolicy)
self.pushButton.setObjectName("pushButton")
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
self.mdiArea = QtWidgets.QMdiArea(Form)
self.mdiArea.setGeometry(QtCore.QRect(9, 61, 1091, 671))
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Preferred)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
sizePolicy.setHeightForWidth(self.mdiArea.sizePolicy().hasHeightForWidth())
self.mdiArea.setSizePolicy(sizePolicy)
self.mdiArea.setMaximumSize(QtCore.QSize(16777215, 16777215))
self.mdiArea.setObjectName("mdiArea")
self.pushButton_4 = QtWidgets.QPushButton(Form)
self.pushButton_4.setGeometry(QtCore.QRect(1020, 740, 75, 23))
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
sizePolicy.setHeightForWidth(self.pushButton_4.sizePolicy().hasHeightForWidth())
self.pushButton_4.setSizePolicy(sizePolicy)
self.pushButton_4.setObjectName("pushButton_4")
self.pushButton_3 = QtWidgets.QPushButton(Form)
self.pushButton_3.setGeometry(QtCore.QRect(940, 740, 75, 23))
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
sizePolicy.setHeightForWidth(self.pushButton_3.sizePolicy().hasHeightForWidth())
self.pushButton_3.setSizePolicy(sizePolicy)
self.pushButton_3.setObjectName("pushButton_3")
self.label = QtWidgets.QLabel(Form)
self.label.setGeometry(QtCore.QRect(10, 10, 101, 31))
self.label.setObjectName("label")
self.pushButton_5 = QtWidgets.QPushButton(Form)
self.pushButton_5.setGeometry(QtCore.QRect(480, 740, 75, 23))
self.pushButton_5.setObjectName("pushButton_5")
self.retranslateUi(Form)
QtCore.QMetaObject.connectSlotsByName(Form)
self.pushButton.clicked.connect(lambda: self.show_camera())
self.pushButton_5.clicked.connect(lambda: self.record_vedio())
def show_camera(self):
MainWindow = QMainWindow()
ui = Ui_Form()
ui.setupUi(MainWindow)
MainWindow.setFixedSize(1022, 618)
self.mdiArea.addSubWindow(MainWindow)
MainWindow.show()
try:
import Tello.KeyboardControl as kb
ui.pushButton.clicked.connect(lambda: up())
ui.pushButton2.clicked.connect(lambda: left())
ui.pushButton4.clicked.connect(lambda: right())
ui.pushButton5.clicked.connect(lambda: return1())
ui.pushButton3.clicked.connect(lambda: qifei())
ui.pushButton6.clicked.connect(lambda: jiangluo())
except:
print("")
def up():
kb.getKeyboardInput("i")# 前
def left():#左
kb.getKeyboardInput("j")
def right():
kb.getKeyboardInput("l")#右
def return1():
kb.getKeyboardInput("k")#后
def qifei():
kb.getKeyboardInput("UP")
def jiangluo():
kb.getKeyboardInput("DOWN")
def record_vedio(self):
ui = Ui_MainWindow1()
self.mdiArea.addSubWindow(ui)
ui.show()
def retranslateUi(self, Form):
_translate = QtCore.QCoreApplication.translate
Form.setWindowTitle(_translate("Form", "page1"))
self.pushButton.setText(_translate("Form", "连接"))
self.pushButton_4.setText(_translate("Form", "缩小"))
self.pushButton_3.setText(_translate("Form", "放大"))
self.label.setText(_translate("Form", "实时画面显示系统"))
self.pushButton_5.setText(_translate("Form", "视频录制"))

@ -0,0 +1,280 @@
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'page2.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
from vedio_demo import Ui_MainWindow
from PyQt5.QtWidgets import *
from PyQt5.QtMultimedia import QMediaContent, QMediaPlayer
import os
import sys
import math
import heapq
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class AStar:
"""AStar set the cost + heuristics as the priority
"""
def __init__(self, s_start, s_goal, heuristic_type):
self.s_start = s_start
self.s_goal = s_goal
self.heuristic_type = heuristic_type
self.Env = env.Env() # class Env
self.u_set = self.Env.motions # feasible input set
self.obs = self.Env.obs # position of obstacles
self.OPEN = [] # priority queue / OPEN set
self.CLOSED = [] # CLOSED set / VISITED order
self.PARENT = dict() # recorded parent
self.g = dict() # cost to come
def searching(self):
"""
A_star Searching.
:return: path, visited order
"""
self.PARENT[self.s_start] = self.s_start
self.g[self.s_start] = 0
self.g[self.s_goal] = math.inf
heapq.heappush(self.OPEN,
(self.f_value(self.s_start), self.s_start))
while self.OPEN:
_, s = heapq.heappop(self.OPEN)
self.CLOSED.append(s)
if s == self.s_goal: # stop condition
break
for s_n in self.get_neighbor(s):
new_cost = self.g[s] + self.cost(s, s_n)
if s_n not in self.g:
self.g[s_n] = math.inf
if new_cost < self.g[s_n]: # conditions for updating Cost
self.g[s_n] = new_cost
self.PARENT[s_n] = s
heapq.heappush(self.OPEN, (self.f_value(s_n), s_n))
return self.extract_path(self.PARENT), self.CLOSED
def searching_repeated_astar(self, e):
"""
repeated A*.
:param e: weight of A*
:return: path and visited order
"""
path, visited = [], []
while e >= 1:
p_k, v_k = self.repeated_searching(self.s_start, self.s_goal, e)
path.append(p_k)
visited.append(v_k)
e -= 0.5
return path, visited
def repeated_searching(self, s_start, s_goal, e):
"""
run A* with weight e.
:param s_start: starting state
:param s_goal: goal state
:param e: weight of a*
:return: path and visited order.
"""
g = {s_start: 0, s_goal: float("inf")}
PARENT = {s_start: s_start}
OPEN = []
CLOSED = []
heapq.heappush(OPEN,
(g[s_start] + e * self.heuristic(s_start), s_start))
while OPEN:
_, s = heapq.heappop(OPEN)
CLOSED.append(s)
if s == s_goal:
break
for s_n in self.get_neighbor(s):
new_cost = g[s] + self.cost(s, s_n)
if s_n not in g:
g[s_n] = math.inf
if new_cost < g[s_n]: # conditions for updating Cost
g[s_n] = new_cost
PARENT[s_n] = s
heapq.heappush(OPEN, (g[s_n] + e * self.heuristic(s_n), s_n))
return self.extract_path(PARENT), CLOSED
def get_neighbor(self, s):
"""
find neighbors of state s that not in obstacles.
:param s: state
:return: neighbors
"""
return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return math.inf
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
"""
check if the line segment (s_start, s_end) is collision.
:param s_start: start node
:param s_end: end node
:return: True: is collision / False: not collision
"""
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def f_value(self, s):
"""
f = g + h. (g: Cost to come, h: heuristic value)
:param s: current state
:return: f
"""
return self.g[s] + self.heuristic(s)
def extract_path(self, PARENT):
"""
Extract the path based on the PARENT set.
:return: The planning path
"""
path = [self.s_goal]
s = self.s_goal
while True:
s = PARENT[s]
path.append(s)
if s == self.s_start:
break
return list(path)
def heuristic(self, s):
"""
Calculate heuristic.
:param s: current node (state)
:return: heuristic function value
"""
heuristic_type = self.heuristic_type # heuristic type
goal = self.s_goal # goal node
if heuristic_type == "manhattan":
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
else:
return math.hypot(goal[0] - s[0], goal[1] - s[1])
class Ui_MainWindow1(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(1112, 766)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
self.gridLayout.setObjectName("gridLayout")
self.pushButton = QtWidgets.QPushButton(self.centralwidget)
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 2, 0, 1, 1)
self.mdiArea = QtWidgets.QMdiArea(self.centralwidget)
self.mdiArea.setObjectName("mdiArea")
self.gridLayout.addWidget(self.mdiArea, 1, 0, 1, 1)
self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout.addWidget(self.pushButton_2, 3, 0, 1, 1)
self.label = QtWidgets.QLabel(self.centralwidget)
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(MainWindow)
self.menubar.setGeometry(QtCore.QRect(0, 0, 1112, 23))
self.menubar.setObjectName("menubar")
MainWindow.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(MainWindow)
self.statusbar.setObjectName("statusbar")
MainWindow.setStatusBar(self.statusbar)
self.retranslateUi(MainWindow)
QtCore.QMetaObject.connectSlotsByName(MainWindow)
self.pushButton.clicked.connect(lambda: self.msg())
self.pushButton_2.clicked.connect(lambda: self.search())
def search(self):
s_start = (5, 5)
s_goal = (45, 25)
astar = AStar(s_start, s_goal, "euclidean")
plot = plotting.Plotting(s_start, s_goal)
path, visited = astar.searching()
plot.animation(path, visited, "A*") # animation
def msg(self):
MainWindow = QMainWindow()
ui = Ui_MainWindow()
ui.setupUi(MainWindow)
self.mdiArea.addSubWindow(MainWindow)
MainWindow.showMaximized()
ui.player = QMediaPlayer()
ui.player.setVideoOutput(ui.wgt_video)
ui.pushButton.clicked.connect(lambda: openVideoFile(ui))
ui.player.setMedia(QMediaContent(QFileDialog.getOpenFileUrl()[0]))
ui.pushButton_2.clicked.connect(lambda: pause(ui))
def pause(a):
a.player.pause()
def openVideoFile(a):
a.player.play()
def retranslateUi(self, MainWindow):
_translate = QtCore.QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
self.pushButton.setText(_translate("MainWindow", "添加文件"))
self.pushButton_2.setText(_translate("MainWindow", "分析路径"))
self.label.setText(_translate("MainWindow", "路径分析界面"))

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'system_main.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
import sys
import os
from PyQt5.QtWidgets import *
from page1 import Ui_Form1
from page2 import Ui_MainWindow1
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3")
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(1275, 896)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
self.gridLayout.setObjectName("gridLayout")
self.label = QtWidgets.QLabel(self.centralwidget)
self.label.setMinimumSize(QtCore.QSize(0, 20))
self.label.setMaximumSize(QtCore.QSize(300, 20))
self.label.setTextFormat(QtCore.Qt.AutoText)
self.label.setAlignment(QtCore.Qt.AlignCenter)
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 1, 0, 1, 1)
self.groupBox = QtWidgets.QGroupBox(self.centralwidget)
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Expanding)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
sizePolicy.setHeightForWidth(self.groupBox.sizePolicy().hasHeightForWidth())
self.groupBox.setSizePolicy(sizePolicy)
self.groupBox.setMaximumSize(QtCore.QSize(600, 16777215))
self.groupBox.setFlat(False)
self.groupBox.setCheckable(False)
self.groupBox.setObjectName("groupBox")
self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox)
self.gridLayout_2.setObjectName("gridLayout_2")
self.pushButton = QtWidgets.QPushButton(self.groupBox)
self.pushButton.setObjectName("pushButton")
self.gridLayout_2.addWidget(self.pushButton, 0, 0, 1, 1)
self.pushButton_2 = QtWidgets.QPushButton(self.groupBox)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout_2.addWidget(self.pushButton_2, 1, 0, 1, 1)
self.gridLayout.addWidget(self.groupBox, 2, 0, 2, 1)
self.mdiArea = QtWidgets.QMdiArea(self.centralwidget)
self.mdiArea.setObjectName("mdiArea")
self.gridLayout.addWidget(self.mdiArea, 2, 1, 2, 1)
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(MainWindow)
self.menubar.setGeometry(QtCore.QRect(0, 0, 1275, 23))
self.menubar.setObjectName("menubar")
MainWindow.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(MainWindow)
self.statusbar.setObjectName("statusbar")
MainWindow.setStatusBar(self.statusbar)
self.pushButton.clicked.connect(lambda: self.open1())
self.pushButton_2.clicked.connect(lambda: self.open2())
self.retranslateUi(MainWindow)
QtCore.QMetaObject.connectSlotsByName(MainWindow)
def open1(self):
MainWindow = QMainWindow()
ui = Ui_Form1()
ui.setupUi(MainWindow)
self.mdiArea.addSubWindow(MainWindow)
MainWindow.showMaximized()
def open2(self):
MainWindow = QMainWindow()
ui = Ui_MainWindow1()
ui.setupUi(MainWindow)
self.mdiArea.addSubWindow(MainWindow)
MainWindow.showMaximized()
def retranslateUi(self, MainWindow):
_translate = QtCore.QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "无人机自动寻路系统"))
self.label.setText(_translate("MainWindow", "无人机自动寻路系统(主界面)"))
self.groupBox.setTitle(_translate("MainWindow", "菜单栏"))
self.pushButton.setText(_translate("MainWindow", "实时画面"))
self.pushButton_2.setText(_translate("MainWindow", "路径分析"))
if __name__ == '__main__':
app = QApplication(sys.argv)
MainWindow = QMainWindow()
ui = Ui_MainWindow()
ui.setupUi(MainWindow)
MainWindow.show()
sys.exit(app.exec_())

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'tello_UI.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_Form(object):
def setupUi(self, Form):
Form.setObjectName("Form")
Form.resize(1022, 618)
self.groupBox_4 = QtWidgets.QGroupBox(Form)
self.groupBox_4.setGeometry(QtCore.QRect(670, 567, 350, 50))
self.groupBox_4.setMaximumSize(QtCore.QSize(350, 50))
self.groupBox_4.setObjectName("groupBox_4")
self.lineEdit_3 = QtWidgets.QLineEdit(self.groupBox_4)
self.lineEdit_3.setGeometry(QtCore.QRect(10, 20, 331, 20))
self.lineEdit_3.setObjectName("lineEdit_3")
self.groupBox_2 = QtWidgets.QGroupBox(Form)
self.groupBox_2.setGeometry(QtCore.QRect(670, 156, 350, 405))
self.groupBox_2.setObjectName("groupBox_2")
self.pushButton = QtWidgets.QPushButton(self.groupBox_2)
self.pushButton.setGeometry(QtCore.QRect(150, 120, 71, 71))
self.pushButton.setText("")
icon = QtGui.QIcon()
icon.addPixmap(QtGui.QPixmap("tello_png/up.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
self.pushButton.setIcon(icon)
self.pushButton.setIconSize(QtCore.QSize(70, 100))
self.pushButton.setObjectName("pushButton")
self.pushButton_2 = QtWidgets.QPushButton(self.groupBox_2)
self.pushButton_2.setGeometry(QtCore.QRect(70, 200, 71, 71))
self.pushButton_2.setText("")
icon1 = QtGui.QIcon()
icon1.addPixmap(QtGui.QPixmap("tello_png/left.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
self.pushButton_2.setIcon(icon1)
self.pushButton_2.setIconSize(QtCore.QSize(70, 100))
self.pushButton_2.setObjectName("pushButton_2")
self.pushButton_4 = QtWidgets.QPushButton(self.groupBox_2)
self.pushButton_4.setGeometry(QtCore.QRect(230, 200, 71, 71))
self.pushButton_4.setText("")
icon2 = QtGui.QIcon()
icon2.addPixmap(QtGui.QPixmap("tello_png/right.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
self.pushButton_4.setIcon(icon2)
self.pushButton_4.setIconSize(QtCore.QSize(70, 100))
self.pushButton_4.setObjectName("pushButton_4")
self.pushButton_5 = QtWidgets.QPushButton(self.groupBox_2)
self.pushButton_5.setGeometry(QtCore.QRect(150, 200, 71, 71))
self.pushButton_5.setText("")
icon3 = QtGui.QIcon()
icon3.addPixmap(QtGui.QPixmap("tello_png/return.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
self.pushButton_5.setIcon(icon3)
self.pushButton_5.setIconSize(QtCore.QSize(70, 100))
self.pushButton_5.setObjectName("pushButton_5")
self.pushButton_3 = QtWidgets.QPushButton(self.groupBox_2)
self.pushButton_3.setGeometry(QtCore.QRect(70, 20, 231, 91))
icon4 = QtGui.QIcon()
icon4.addPixmap(QtGui.QPixmap("tello_png/qifei.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
self.pushButton_3.setIcon(icon4)
self.pushButton_3.setIconSize(QtCore.QSize(150, 150))
self.pushButton_3.setObjectName("pushButton_3")
self.pushButton_6 = QtWidgets.QPushButton(self.groupBox_2)
self.pushButton_6.setGeometry(QtCore.QRect(70, 290, 231, 91))
icon5 = QtGui.QIcon()
icon5.addPixmap(QtGui.QPixmap("tello_png/jiangluo.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off)
self.pushButton_6.setIcon(icon5)
self.pushButton_6.setIconSize(QtCore.QSize(150, 150))
self.pushButton_6.setObjectName("pushButton_6")
self.groupBox = QtWidgets.QGroupBox(Form)
self.groupBox.setGeometry(QtCore.QRect(670, 0, 350, 150))
self.groupBox.setMaximumSize(QtCore.QSize(16777215, 150))
self.groupBox.setObjectName("groupBox")
self.lineEdit = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit.setGeometry(QtCore.QRect(20, 20, 311, 41))
self.lineEdit.setObjectName("lineEdit")
self.lineEdit_2 = QtWidgets.QLineEdit(self.groupBox)
self.lineEdit_2.setGeometry(QtCore.QRect(20, 70, 311, 41))
self.lineEdit_2.setObjectName("lineEdit_2")
self.retranslateUi(Form)
QtCore.QMetaObject.connectSlotsByName(Form)
def retranslateUi(self, Form):
_translate = QtCore.QCoreApplication.translate
Form.setWindowTitle(_translate("Form", "Form"))
self.groupBox_4.setTitle(_translate("Form", "连接状态"))
self.groupBox_2.setTitle(_translate("Form", "控制面板"))
self.pushButton_3.setText(_translate("Form", "起飞"))
self.pushButton_6.setText(_translate("Form", "降落"))
self.groupBox.setTitle(_translate("Form", "无人机状态"))
self.lineEdit.setText(_translate("Form", "剩余电量:"))
self.lineEdit_2.setText(_translate("Form", "WIFI强度"))

Binary file not shown.

After

Width:  |  Height:  |  Size: 61 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'vedio_demo.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtMultimediaWidgets import QVideoWidget
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(1111, 676)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
self.gridLayout.setObjectName("gridLayout")
self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout.addWidget(self.pushButton_2, 3, 0, 1, 1)
self.pushButton = QtWidgets.QPushButton(self.centralwidget)
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 2, 0, 1, 1)
self.wgt_video = QVideoWidget(self.centralwidget)
self.wgt_video.setObjectName("widget")
self.gridLayout.addWidget(self.wgt_video, 0, 0, 1, 1)
self.horizontalSlider = QtWidgets.QSlider(self.centralwidget)
self.horizontalSlider.setOrientation(QtCore.Qt.Horizontal)
self.horizontalSlider.setObjectName("horizontalSlider")
self.gridLayout.addWidget(self.horizontalSlider, 1, 0, 1, 1)
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(MainWindow)
self.menubar.setGeometry(QtCore.QRect(0, 0, 1111, 23))
self.menubar.setObjectName("menubar")
MainWindow.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(MainWindow)
self.statusbar.setObjectName("statusbar")
MainWindow.setStatusBar(self.statusbar)
self.retranslateUi(MainWindow)
QtCore.QMetaObject.connectSlotsByName(MainWindow)
def retranslateUi(self, MainWindow):
_translate = QtCore.QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
self.pushButton_2.setText(_translate("MainWindow", "暂停"))
self.pushButton.setText(_translate("MainWindow", "播放"))
Loading…
Cancel
Save