diff --git a/src/Search_2D/.idea/.gitignore b/src/Search_2D/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/src/Search_2D/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/src/Search_2D/.idea/Search_2D.iml b/src/Search_2D/.idea/Search_2D.iml new file mode 100644 index 0000000..8b8c395 --- /dev/null +++ b/src/Search_2D/.idea/Search_2D.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/src/Search_2D/.idea/inspectionProfiles/Project_Default.xml b/src/Search_2D/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..6736707 --- /dev/null +++ b/src/Search_2D/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,15 @@ + + + + \ No newline at end of file diff --git a/src/Search_2D/.idea/inspectionProfiles/profiles_settings.xml b/src/Search_2D/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/src/Search_2D/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/src/Search_2D/.idea/misc.xml b/src/Search_2D/.idea/misc.xml new file mode 100644 index 0000000..d56657a --- /dev/null +++ b/src/Search_2D/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/Search_2D/.idea/modules.xml b/src/Search_2D/.idea/modules.xml new file mode 100644 index 0000000..01049a7 --- /dev/null +++ b/src/Search_2D/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/src/Search_2D/ARAstar.py b/src/Search_2D/ARAstar.py new file mode 100644 index 0000000..c014616 --- /dev/null +++ b/src/Search_2D/ARAstar.py @@ -0,0 +1,222 @@ +""" +ARA_star 2D (Anytime Repairing A*) +@author: huiming zhou + +@description: local inconsistency: g-value decreased. +g(s) decreased introduces a local inconsistency between s and its successors. + +""" + +import os +import sys +import math + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + + +class AraStar: + def __init__(self, s_start, s_goal, e, heuristic_type): + self.s_start, self.s_goal = s_start, s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() # class Env + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + self.e = e # weight + + self.g = dict() # Cost to come + self.OPEN = dict() # priority queue / OPEN set + self.CLOSED = set() # CLOSED set + self.INCONS = {} # INCONSISTENT set + self.PARENT = dict() # relations + self.path = [] # planning path + self.visited = [] # order of visited nodes + + def init(self): + """ + initialize each set. + """ + + self.g[self.s_start] = 0.0 + self.g[self.s_goal] = math.inf + self.OPEN[self.s_start] = self.f_value(self.s_start) + self.PARENT[self.s_start] = self.s_start + + def searching(self): + self.init() + self.ImprovePath() + self.path.append(self.extract_path()) + + while self.update_e() > 1: # continue condition + self.e -= 0.4 # increase weight + self.OPEN.update(self.INCONS) + self.OPEN = {s: self.f_value(s) for s in self.OPEN} # update f_value of OPEN set + + self.INCONS = dict() + self.CLOSED = set() + self.ImprovePath() # improve path + self.path.append(self.extract_path()) + + return self.path, self.visited + + def ImprovePath(self): + """ + :return: a e'-suboptimal path + """ + + visited_each = [] + + while True: + s, f_small = self.calc_smallest_f() + + if self.f_value(self.s_goal) <= f_small: + break + + self.OPEN.pop(s) + self.CLOSED.add(s) + + for s_n in self.get_neighbor(s): + if s_n in self.obs: + continue + + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g or new_cost < self.g[s_n]: + self.g[s_n] = new_cost + self.PARENT[s_n] = s + visited_each.append(s_n) + + if s_n not in self.CLOSED: + self.OPEN[s_n] = self.f_value(s_n) + else: + self.INCONS[s_n] = 0.0 + + self.visited.append(visited_each) + + def calc_smallest_f(self): + """ + :return: node with smallest f_value in OPEN set. + """ + + s_small = min(self.OPEN, key=self.OPEN.get) + + return s_small, self.OPEN[s_small] + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + return {(s[0] + u[0], s[1] + u[1]) for u in self.u_set} + + def update_e(self): + v = float("inf") + + if self.OPEN: + v = min(self.g[s] + self.h(s) for s in self.OPEN) + if self.INCONS: + v = min(v, min(self.g[s] + self.h(s) for s in self.INCONS)) + + return min(self.e, self.g[self.s_goal] / v) + + def f_value(self, x): + """ + f = g + e * h + f = cost-to-come + weight * cost-to-go + :param x: current state + :return: f_value + """ + + return self.g[x] + self.e * self.h(x) + + def extract_path(self): + """ + Extract the path based on the PARENT set. + :return: The planning path + """ + + path = [self.s_goal] + s = self.s_goal + + while True: + s = self.PARENT[s] + path.append(s) + + if s == self.s_start: + break + + return list(path) + + def h(self, s): + """ + Calculate heuristic. + :param s: current node (state) + :return: heuristic function value + """ + + heuristic_type = self.heuristic_type # heuristic type + goal = self.s_goal # goal node + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return math.inf + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + """ + check if the line segment (s_start, s_end) is collision. + :param s_start: start node + :param s_end: end node + :return: True: is collision / False: not collision + """ + + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + arastar = AraStar(s_start, s_goal, 2.5, "euclidean") + plot = plotting.Plotting(s_start, s_goal) + + path, visited = arastar.searching() + plot.animation_ara_star(path, visited, "Anytime Repairing A* (ARA*)") + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/Anytime_D_star.py b/src/Search_2D/Anytime_D_star.py new file mode 100644 index 0000000..cd1d62b --- /dev/null +++ b/src/Search_2D/Anytime_D_star.py @@ -0,0 +1,317 @@ +""" +Anytime_D_star 2D +@author: huiming zhou +""" + +import os +import sys +import math +import matplotlib.pyplot as plt + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting +from Search_2D import env + + +class ADStar: + def __init__(self, s_start, s_goal, eps, heuristic_type): + self.s_start, self.s_goal = s_start, s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() # class Env + self.Plot = plotting.Plotting(s_start, s_goal) + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + self.x = self.Env.x_range + self.y = self.Env.y_range + + self.g, self.rhs, self.OPEN = {}, {}, {} + + for i in range(1, self.Env.x_range - 1): + for j in range(1, self.Env.y_range - 1): + self.rhs[(i, j)] = float("inf") + self.g[(i, j)] = float("inf") + + self.rhs[self.s_goal] = 0.0 + self.eps = eps + self.OPEN[self.s_goal] = self.Key(self.s_goal) + self.CLOSED, self.INCONS = set(), dict() + + self.visited = set() + self.count = 0 + self.count_env_change = 0 + self.obs_add = set() + self.obs_remove = set() + self.title = "Anytime D*: Small changes" # Significant changes + self.fig = plt.figure() + + def run(self): + self.Plot.plot_grid(self.title) + self.ComputeOrImprovePath() + self.plot_visited() + self.plot_path(self.extract_path()) + self.visited = set() + + while True: + if self.eps <= 1.0: + break + self.eps -= 0.5 + self.OPEN.update(self.INCONS) + for s in self.OPEN: + self.OPEN[s] = self.Key(s) + self.CLOSED = set() + self.ComputeOrImprovePath() + self.plot_visited() + self.plot_path(self.extract_path()) + self.visited = set() + plt.pause(0.5) + + self.fig.canvas.mpl_connect('button_press_event', self.on_press) + plt.show() + + def on_press(self, event): + x, y = event.xdata, event.ydata + if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1: + print("Please choose right area!") + else: + self.count_env_change += 1 + x, y = int(x), int(y) + print("Change position: s =", x, ",", "y =", y) + + # for small changes + if self.title == "Anytime D*: Small changes": + if (x, y) not in self.obs: + self.obs.add((x, y)) + self.g[(x, y)] = float("inf") + self.rhs[(x, y)] = float("inf") + else: + self.obs.remove((x, y)) + self.UpdateState((x, y)) + + self.Plot.update_obs(self.obs) + + for sn in self.get_neighbor((x, y)): + self.UpdateState(sn) + + plt.cla() + self.Plot.plot_grid(self.title) + + while True: + if len(self.INCONS) == 0: + break + self.OPEN.update(self.INCONS) + for s in self.OPEN: + self.OPEN[s] = self.Key(s) + self.CLOSED = set() + self.ComputeOrImprovePath() + self.plot_visited() + self.plot_path(self.extract_path()) + # plt.plot(self.title) + self.visited = set() + + if self.eps <= 1.0: + break + + else: + if (x, y) not in self.obs: + self.obs.add((x, y)) + self.obs_add.add((x, y)) + plt.plot(x, y, 'sk') + if (x, y) in self.obs_remove: + self.obs_remove.remove((x, y)) + else: + self.obs.remove((x, y)) + self.obs_remove.add((x, y)) + plt.plot(x, y, marker='s', color='white') + if (x, y) in self.obs_add: + self.obs_add.remove((x, y)) + + self.Plot.update_obs(self.obs) + + if self.count_env_change >= 15: + self.count_env_change = 0 + self.eps += 2.0 + for s in self.obs_add: + self.g[(x, y)] = float("inf") + self.rhs[(x, y)] = float("inf") + + for sn in self.get_neighbor(s): + self.UpdateState(sn) + + for s in self.obs_remove: + for sn in self.get_neighbor(s): + self.UpdateState(sn) + self.UpdateState(s) + + plt.cla() + self.Plot.plot_grid(self.title) + + while True: + if self.eps <= 1.0: + break + self.eps -= 0.5 + self.OPEN.update(self.INCONS) + for s in self.OPEN: + self.OPEN[s] = self.Key(s) + self.CLOSED = set() + self.ComputeOrImprovePath() + self.plot_visited() + self.plot_path(self.extract_path()) + plt.title(self.title) + self.visited = set() + plt.pause(0.5) + + self.fig.canvas.draw_idle() + + def ComputeOrImprovePath(self): + while True: + s, v = self.TopKey() + if v >= self.Key(self.s_start) and \ + self.rhs[self.s_start] == self.g[self.s_start]: + break + + self.OPEN.pop(s) + self.visited.add(s) + + if self.g[s] > self.rhs[s]: + self.g[s] = self.rhs[s] + self.CLOSED.add(s) + for sn in self.get_neighbor(s): + self.UpdateState(sn) + else: + self.g[s] = float("inf") + for sn in self.get_neighbor(s): + self.UpdateState(sn) + self.UpdateState(s) + + def UpdateState(self, s): + if s != self.s_goal: + self.rhs[s] = float("inf") + for x in self.get_neighbor(s): + self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x)) + if s in self.OPEN: + self.OPEN.pop(s) + + if self.g[s] != self.rhs[s]: + if s not in self.CLOSED: + self.OPEN[s] = self.Key(s) + else: + self.INCONS[s] = 0 + + def Key(self, s): + if self.g[s] > self.rhs[s]: + return [self.rhs[s] + self.eps * self.h(self.s_start, s), self.rhs[s]] + else: + return [self.g[s] + self.h(self.s_start, s), self.g[s]] + + def TopKey(self): + """ + :return: return the min key and its value. + """ + + s = min(self.OPEN, key=self.OPEN.get) + return s, self.OPEN[s] + + def h(self, s_start, s_goal): + heuristic_type = self.heuristic_type # heuristic type + + if heuristic_type == "manhattan": + return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1]) + else: + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return float("inf") + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + def get_neighbor(self, s): + nei_list = set() + for u in self.u_set: + s_next = tuple([s[i] + u[i] for i in range(2)]) + if s_next not in self.obs: + nei_list.add(s_next) + + return nei_list + + def extract_path(self): + """ + Extract the path based on the PARENT set. + :return: The planning path + """ + + path = [self.s_start] + s = self.s_start + + for k in range(100): + g_list = {} + for x in self.get_neighbor(s): + if not self.is_collision(s, x): + g_list[x] = self.g[x] + s = min(g_list, key=g_list.get) + path.append(s) + if s == self.s_goal: + break + + return list(path) + + def plot_path(self, path): + px = [x[0] for x in path] + py = [x[1] for x in path] + plt.plot(px, py, linewidth=2) + plt.plot(self.s_start[0], self.s_start[1], "bs") + plt.plot(self.s_goal[0], self.s_goal[1], "gs") + + def plot_visited(self): + self.count += 1 + + color = ['gainsboro', 'lightgray', 'silver', 'darkgray', + 'bisque', 'navajowhite', 'moccasin', 'wheat', + 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue'] + + if self.count >= len(color) - 1: + self.count = 0 + + for x in self.visited: + plt.plot(x[0], x[1], marker='s', color=color[self.count]) + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + dstar = ADStar(s_start, s_goal, 2.5, "euclidean") + dstar.run() + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/Astar.py b/src/Search_2D/Astar.py new file mode 100644 index 0000000..3c8e554 --- /dev/null +++ b/src/Search_2D/Astar.py @@ -0,0 +1,224 @@ +""" +A_star 2D +@author: huiming zhou +""" + +import os +import sys +import math +import heapq + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3") + +from Search_2D import plotting, env + + +class AStar: + """AStar set the cost + heuristics as the priority + """ + def __init__(self, s_start, s_goal, heuristic_type): + self.s_start = s_start + self.s_goal = s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() # class Env + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + + self.OPEN = [] # priority queue / OPEN set + self.CLOSED = [] # CLOSED set / VISITED order + self.PARENT = dict() # recorded parent + self.g = dict() # cost to come + + def searching(self): + """ + A_star Searching. + :return: path, visited order + """ + + self.PARENT[self.s_start] = self.s_start + self.g[self.s_start] = 0 + self.g[self.s_goal] = math.inf + heapq.heappush(self.OPEN, + (self.f_value(self.s_start), self.s_start)) + + while self.OPEN: + _, s = heapq.heappop(self.OPEN) + self.CLOSED.append(s) + + if s == self.s_goal: # stop condition + break + + for s_n in self.get_neighbor(s): + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g: + self.g[s_n] = math.inf + + if new_cost < self.g[s_n]: # conditions for updating Cost + self.g[s_n] = new_cost + self.PARENT[s_n] = s + heapq.heappush(self.OPEN, (self.f_value(s_n), s_n)) + + return self.extract_path(self.PARENT), self.CLOSED + + def searching_repeated_astar(self, e): + """ + repeated A*. + :param e: weight of A* + :return: path and visited order + """ + + path, visited = [], [] + + while e >= 1: + p_k, v_k = self.repeated_searching(self.s_start, self.s_goal, e) + path.append(p_k) + visited.append(v_k) + e -= 0.5 + + return path, visited + + def repeated_searching(self, s_start, s_goal, e): + """ + run A* with weight e. + :param s_start: starting state + :param s_goal: goal state + :param e: weight of a* + :return: path and visited order. + """ + + g = {s_start: 0, s_goal: float("inf")} + PARENT = {s_start: s_start} + OPEN = [] + CLOSED = [] + heapq.heappush(OPEN, + (g[s_start] + e * self.heuristic(s_start), s_start)) + + while OPEN: + _, s = heapq.heappop(OPEN) + CLOSED.append(s) + + if s == s_goal: + break + + for s_n in self.get_neighbor(s): + new_cost = g[s] + self.cost(s, s_n) + + if s_n not in g: + g[s_n] = math.inf + + if new_cost < g[s_n]: # conditions for updating Cost + g[s_n] = new_cost + PARENT[s_n] = s + heapq.heappush(OPEN, (g[s_n] + e * self.heuristic(s_n), s_n)) + + return self.extract_path(PARENT), CLOSED + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set] + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return math.inf + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + """ + check if the line segment (s_start, s_end) is collision. + :param s_start: start node + :param s_end: end node + :return: True: is collision / False: not collision + """ + + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + def f_value(self, s): + """ + f = g + h. (g: Cost to come, h: heuristic value) + :param s: current state + :return: f + """ + + return self.g[s] + self.heuristic(s) + + def extract_path(self, PARENT): + """ + Extract the path based on the PARENT set. + :return: The planning path + """ + + path = [self.s_goal] + s = self.s_goal + + while True: + s = PARENT[s] + path.append(s) + + if s == self.s_start: + break + + return list(path) + + def heuristic(self, s): + """ + Calculate heuristic. + :param s: current node (state) + :return: heuristic function value + """ + + heuristic_type = self.heuristic_type # heuristic type + goal = self.s_goal # goal node + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + astar = AStar(s_start, s_goal, "euclidean") + plot = plotting.Plotting(s_start, s_goal) + + path, visited = astar.searching() + plot.animation(path, visited, "A*") # animation + + # path, visited = astar.searching_repeated_astar(2.5) # initial weight e = 2.5 + # plot.animation_ara_star(path, visited, "Repeated A*") + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/Best_First.py b/src/Search_2D/Best_First.py new file mode 100644 index 0000000..0c85fba --- /dev/null +++ b/src/Search_2D/Best_First.py @@ -0,0 +1,68 @@ +""" +Best-First Searching +@author: huiming zhou +""" + +import os +import sys +import math +import heapq + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env +from Search_2D.Astar import AStar + + +class BestFirst(AStar): + """BestFirst set the heuristics as the priority + """ + def searching(self): + """ + Breadth-first Searching. + :return: path, visited order + """ + + self.PARENT[self.s_start] = self.s_start + self.g[self.s_start] = 0 + self.g[self.s_goal] = math.inf + heapq.heappush(self.OPEN, + (self.heuristic(self.s_start), self.s_start)) + + while self.OPEN: + _, s = heapq.heappop(self.OPEN) + self.CLOSED.append(s) + + if s == self.s_goal: + break + + for s_n in self.get_neighbor(s): + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g: + self.g[s_n] = math.inf + + if new_cost < self.g[s_n]: # conditions for updating Cost + self.g[s_n] = new_cost + self.PARENT[s_n] = s + + # best first set the heuristics as the priority + heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n)) + + return self.extract_path(self.PARENT), self.CLOSED + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + BF = BestFirst(s_start, s_goal, 'euclidean') + plot = plotting.Plotting(s_start, s_goal) + + path, visited = BF.searching() + plot.animation(path, visited, "Best-first Searching") # animation + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/Bidirectional_a_star.py b/src/Search_2D/Bidirectional_a_star.py new file mode 100644 index 0000000..3580c1a --- /dev/null +++ b/src/Search_2D/Bidirectional_a_star.py @@ -0,0 +1,229 @@ +""" +Bidirectional_a_star 2D +@author: huiming zhou +""" + +import os +import sys +import math +import heapq + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + + +class BidirectionalAStar: + def __init__(self, s_start, s_goal, heuristic_type): + self.s_start = s_start + self.s_goal = s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() # class Env + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + + self.OPEN_fore = [] # OPEN set for forward searching + self.OPEN_back = [] # OPEN set for backward searching + self.CLOSED_fore = [] # CLOSED set for forward + self.CLOSED_back = [] # CLOSED set for backward + self.PARENT_fore = dict() # recorded parent for forward + self.PARENT_back = dict() # recorded parent for backward + self.g_fore = dict() # cost to come for forward + self.g_back = dict() # cost to come for backward + + def init(self): + """ + initialize parameters + """ + + self.g_fore[self.s_start] = 0.0 + self.g_fore[self.s_goal] = math.inf + self.g_back[self.s_goal] = 0.0 + self.g_back[self.s_start] = math.inf + self.PARENT_fore[self.s_start] = self.s_start + self.PARENT_back[self.s_goal] = self.s_goal + heapq.heappush(self.OPEN_fore, + (self.f_value_fore(self.s_start), self.s_start)) + heapq.heappush(self.OPEN_back, + (self.f_value_back(self.s_goal), self.s_goal)) + + def searching(self): + """ + Bidirectional A* + :return: connected path, visited order of forward, visited order of backward + """ + + self.init() + s_meet = self.s_start + + while self.OPEN_fore and self.OPEN_back: + # solve foreward-search + _, s_fore = heapq.heappop(self.OPEN_fore) + + if s_fore in self.PARENT_back: + s_meet = s_fore + break + + self.CLOSED_fore.append(s_fore) + + for s_n in self.get_neighbor(s_fore): + new_cost = self.g_fore[s_fore] + self.cost(s_fore, s_n) + + if s_n not in self.g_fore: + self.g_fore[s_n] = math.inf + + if new_cost < self.g_fore[s_n]: + self.g_fore[s_n] = new_cost + self.PARENT_fore[s_n] = s_fore + heapq.heappush(self.OPEN_fore, + (self.f_value_fore(s_n), s_n)) + + # solve backward-search + _, s_back = heapq.heappop(self.OPEN_back) + + if s_back in self.PARENT_fore: + s_meet = s_back + break + + self.CLOSED_back.append(s_back) + + for s_n in self.get_neighbor(s_back): + new_cost = self.g_back[s_back] + self.cost(s_back, s_n) + + if s_n not in self.g_back: + self.g_back[s_n] = math.inf + + if new_cost < self.g_back[s_n]: + self.g_back[s_n] = new_cost + self.PARENT_back[s_n] = s_back + heapq.heappush(self.OPEN_back, + (self.f_value_back(s_n), s_n)) + + return self.extract_path(s_meet), self.CLOSED_fore, self.CLOSED_back + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set] + + def extract_path(self, s_meet): + """ + extract path from start and goal + :param s_meet: meet point of bi-direction a* + :return: path + """ + + # extract path for foreward part + path_fore = [s_meet] + s = s_meet + + while True: + s = self.PARENT_fore[s] + path_fore.append(s) + if s == self.s_start: + break + + # extract path for backward part + path_back = [] + s = s_meet + + while True: + s = self.PARENT_back[s] + path_back.append(s) + if s == self.s_goal: + break + + return list(reversed(path_fore)) + list(path_back) + + def f_value_fore(self, s): + """ + forward searching: f = g + h. (g: Cost to come, h: heuristic value) + :param s: current state + :return: f + """ + + return self.g_fore[s] + self.h(s, self.s_goal) + + def f_value_back(self, s): + """ + backward searching: f = g + h. (g: Cost to come, h: heuristic value) + :param s: current state + :return: f + """ + + return self.g_back[s] + self.h(s, self.s_start) + + def h(self, s, goal): + """ + Calculate heuristic value. + :param s: current node (state) + :param goal: goal node (state) + :return: heuristic value + """ + + heuristic_type = self.heuristic_type + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return math.inf + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + """ + check if the line segment (s_start, s_end) is collision. + :param s_start: start node + :param s_end: end node + :return: True: is collision / False: not collision + """ + + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + +def main(): + x_start = (5, 5) + x_goal = (45, 25) + + bastar = BidirectionalAStar(x_start, x_goal, "euclidean") + plot = plotting.Plotting(x_start, x_goal) + + path, visited_fore, visited_back = bastar.searching() + plot.animation_bi_astar(path, visited_fore, visited_back, "Bidirectional-A*") # animation + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/D_star.py b/src/Search_2D/D_star.py new file mode 100644 index 0000000..60b6c7e --- /dev/null +++ b/src/Search_2D/D_star.py @@ -0,0 +1,304 @@ +""" +D_star 2D +@author: huiming zhou +""" + +import os +import sys +import math +import matplotlib.pyplot as plt + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + + +class DStar: + def __init__(self, s_start, s_goal): + self.s_start, self.s_goal = s_start, s_goal + + self.Env = env.Env() + self.Plot = plotting.Plotting(self.s_start, self.s_goal) + + self.u_set = self.Env.motions + self.obs = self.Env.obs + self.x = self.Env.x_range + self.y = self.Env.y_range + + self.fig = plt.figure() + + self.OPEN = set() + self.t = dict() + self.PARENT = dict() + self.h = dict() + self.k = dict() + self.path = [] + self.visited = set() + self.count = 0 + + def init(self): + for i in range(self.Env.x_range): + for j in range(self.Env.y_range): + self.t[(i, j)] = 'NEW' + self.k[(i, j)] = 0.0 + self.h[(i, j)] = float("inf") + self.PARENT[(i, j)] = None + + self.h[self.s_goal] = 0.0 + + def run(self, s_start, s_end): + self.init() + self.insert(s_end, 0) + + while True: + self.process_state() + if self.t[s_start] == 'CLOSED': + break + + self.path = self.extract_path(s_start, s_end) + self.Plot.plot_grid("Dynamic A* (D*)") + self.plot_path(self.path) + self.fig.canvas.mpl_connect('button_press_event', self.on_press) + plt.show() + + def on_press(self, event): + x, y = event.xdata, event.ydata + if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1: + print("Please choose right area!") + else: + x, y = int(x), int(y) + if (x, y) not in self.obs: + print("Add obstacle at: s =", x, ",", "y =", y) + self.obs.add((x, y)) + self.Plot.update_obs(self.obs) + + s = self.s_start + self.visited = set() + self.count += 1 + + while s != self.s_goal: + if self.is_collision(s, self.PARENT[s]): + self.modify(s) + continue + s = self.PARENT[s] + + self.path = self.extract_path(self.s_start, self.s_goal) + + plt.cla() + self.Plot.plot_grid("Dynamic A* (D*)") + self.plot_visited(self.visited) + self.plot_path(self.path) + + self.fig.canvas.draw_idle() + + def extract_path(self, s_start, s_end): + path = [s_start] + s = s_start + while True: + s = self.PARENT[s] + path.append(s) + if s == s_end: + return path + + def process_state(self): + s = self.min_state() # get node in OPEN set with min k value + self.visited.add(s) + + if s is None: + return -1 # OPEN set is empty + + k_old = self.get_k_min() # record the min k value of this iteration (min path cost) + self.delete(s) # move state s from OPEN set to CLOSED set + + # k_min < h[s] --> s: RAISE state (increased cost) + if k_old < self.h[s]: + for s_n in self.get_neighbor(s): + if self.h[s_n] <= k_old and \ + self.h[s] > self.h[s_n] + self.cost(s_n, s): + + # update h_value and choose parent + self.PARENT[s] = s_n + self.h[s] = self.h[s_n] + self.cost(s_n, s) + + # s: k_min >= h[s] -- > s: LOWER state (cost reductions) + if k_old == self.h[s]: + for s_n in self.get_neighbor(s): + if self.t[s_n] == 'NEW' or \ + (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \ + (self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)): + + # Condition: + # 1) t[s_n] == 'NEW': not visited + # 2) s_n's parent: cost reduction + # 3) s_n find a better parent + self.PARENT[s_n] = s + self.insert(s_n, self.h[s] + self.cost(s, s_n)) + else: + for s_n in self.get_neighbor(s): + if self.t[s_n] == 'NEW' or \ + (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)): + + # Condition: + # 1) t[s_n] == 'NEW': not visited + # 2) s_n's parent: cost reduction + self.PARENT[s_n] = s + self.insert(s_n, self.h[s] + self.cost(s, s_n)) + else: + if self.PARENT[s_n] != s and \ + self.h[s_n] > self.h[s] + self.cost(s, s_n): + + # Condition: LOWER happened in OPEN set (s), s should be explored again + self.insert(s, self.h[s]) + else: + if self.PARENT[s_n] != s and \ + self.h[s] > self.h[s_n] + self.cost(s_n, s) and \ + self.t[s_n] == 'CLOSED' and \ + self.h[s_n] > k_old: + + # Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again + self.insert(s_n, self.h[s_n]) + + return self.get_k_min() + + def min_state(self): + """ + choose the node with the minimum k value in OPEN set. + :return: state + """ + + if not self.OPEN: + return None + + return min(self.OPEN, key=lambda x: self.k[x]) + + def get_k_min(self): + """ + calc the min k value for nodes in OPEN set. + :return: k value + """ + + if not self.OPEN: + return -1 + + return min([self.k[x] for x in self.OPEN]) + + def insert(self, s, h_new): + """ + insert node into OPEN set. + :param s: node + :param h_new: new or better cost to come value + """ + + if self.t[s] == 'NEW': + self.k[s] = h_new + elif self.t[s] == 'OPEN': + self.k[s] = min(self.k[s], h_new) + elif self.t[s] == 'CLOSED': + self.k[s] = min(self.h[s], h_new) + + self.h[s] = h_new + self.t[s] = 'OPEN' + self.OPEN.add(s) + + def delete(self, s): + """ + delete: move state s from OPEN set to CLOSED set. + :param s: state should be deleted + """ + + if self.t[s] == 'OPEN': + self.t[s] = 'CLOSED' + + self.OPEN.remove(s) + + def modify(self, s): + """ + start processing from state s. + :param s: is a node whose status is RAISE or LOWER. + """ + + self.modify_cost(s) + + while True: + k_min = self.process_state() + + if k_min >= self.h[s]: + break + + def modify_cost(self, s): + # if node in CLOSED set, put it into OPEN set. + # Since cost may be changed between s - s.parent, calc cost(s, s.p) again + + if self.t[s] == 'CLOSED': + self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s])) + + def get_neighbor(self, s): + nei_list = set() + + for u in self.u_set: + s_next = tuple([s[i] + u[i] for i in range(2)]) + if s_next not in self.obs: + nei_list.add(s_next) + + return nei_list + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return float("inf") + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + def plot_path(self, path): + px = [x[0] for x in path] + py = [x[1] for x in path] + plt.plot(px, py, linewidth=2) + plt.plot(self.s_start[0], self.s_start[1], "bs") + plt.plot(self.s_goal[0], self.s_goal[1], "gs") + + def plot_visited(self, visited): + color = ['gainsboro', 'lightgray', 'silver', 'darkgray', + 'bisque', 'navajowhite', 'moccasin', 'wheat', + 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue'] + + if self.count >= len(color) - 1: + self.count = 0 + + for x in visited: + plt.plot(x[0], x[1], marker='s', color=color[self.count]) + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + dstar = DStar(s_start, s_goal) + dstar.run(s_start, s_goal) + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/D_star_Lite.py b/src/Search_2D/D_star_Lite.py new file mode 100644 index 0000000..4996be2 --- /dev/null +++ b/src/Search_2D/D_star_Lite.py @@ -0,0 +1,239 @@ +""" +D_star_Lite 2D +@author: huiming zhou +""" + +import os +import sys +import math +import matplotlib.pyplot as plt + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + + +class DStar: + def __init__(self, s_start, s_goal, heuristic_type): + self.s_start, self.s_goal = s_start, s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() # class Env + self.Plot = plotting.Plotting(s_start, s_goal) + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + self.x = self.Env.x_range + self.y = self.Env.y_range + + self.g, self.rhs, self.U = {}, {}, {} + self.km = 0 + + for i in range(1, self.Env.x_range - 1): + for j in range(1, self.Env.y_range - 1): + self.rhs[(i, j)] = float("inf") + self.g[(i, j)] = float("inf") + + self.rhs[self.s_goal] = 0.0 + self.U[self.s_goal] = self.CalculateKey(self.s_goal) + self.visited = set() + self.count = 0 + self.fig = plt.figure() + + def run(self): + self.Plot.plot_grid("D* Lite") + self.ComputePath() + self.plot_path(self.extract_path()) + self.fig.canvas.mpl_connect('button_press_event', self.on_press) + plt.show() + + def on_press(self, event): + x, y = event.xdata, event.ydata + if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1: + print("Please choose right area!") + else: + x, y = int(x), int(y) + print("Change position: s =", x, ",", "y =", y) + + s_curr = self.s_start + s_last = self.s_start + i = 0 + path = [self.s_start] + + while s_curr != self.s_goal: + s_list = {} + + for s in self.get_neighbor(s_curr): + s_list[s] = self.g[s] + self.cost(s_curr, s) + s_curr = min(s_list, key=s_list.get) + path.append(s_curr) + + if i < 1: + self.km += self.h(s_last, s_curr) + s_last = s_curr + if (x, y) not in self.obs: + self.obs.add((x, y)) + plt.plot(x, y, 'sk') + self.g[(x, y)] = float("inf") + self.rhs[(x, y)] = float("inf") + else: + self.obs.remove((x, y)) + plt.plot(x, y, marker='s', color='white') + self.UpdateVertex((x, y)) + for s in self.get_neighbor((x, y)): + self.UpdateVertex(s) + i += 1 + + self.count += 1 + self.visited = set() + self.ComputePath() + + self.plot_visited(self.visited) + self.plot_path(path) + self.fig.canvas.draw_idle() + + def ComputePath(self): + while True: + s, v = self.TopKey() + if v >= self.CalculateKey(self.s_start) and \ + self.rhs[self.s_start] == self.g[self.s_start]: + break + + k_old = v + self.U.pop(s) + self.visited.add(s) + + if k_old < self.CalculateKey(s): + self.U[s] = self.CalculateKey(s) + elif self.g[s] > self.rhs[s]: + self.g[s] = self.rhs[s] + for x in self.get_neighbor(s): + self.UpdateVertex(x) + else: + self.g[s] = float("inf") + self.UpdateVertex(s) + for x in self.get_neighbor(s): + self.UpdateVertex(x) + + def UpdateVertex(self, s): + if s != self.s_goal: + self.rhs[s] = float("inf") + for x in self.get_neighbor(s): + self.rhs[s] = min(self.rhs[s], self.g[x] + self.cost(s, x)) + if s in self.U: + self.U.pop(s) + + if self.g[s] != self.rhs[s]: + self.U[s] = self.CalculateKey(s) + + def CalculateKey(self, s): + return [min(self.g[s], self.rhs[s]) + self.h(self.s_start, s) + self.km, + min(self.g[s], self.rhs[s])] + + def TopKey(self): + """ + :return: return the min key and its value. + """ + + s = min(self.U, key=self.U.get) + return s, self.U[s] + + def h(self, s_start, s_goal): + heuristic_type = self.heuristic_type # heuristic type + + if heuristic_type == "manhattan": + return abs(s_goal[0] - s_start[0]) + abs(s_goal[1] - s_start[1]) + else: + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return float("inf") + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + def get_neighbor(self, s): + nei_list = set() + for u in self.u_set: + s_next = tuple([s[i] + u[i] for i in range(2)]) + if s_next not in self.obs: + nei_list.add(s_next) + + return nei_list + + def extract_path(self): + """ + Extract the path based on the PARENT set. + :return: The planning path + """ + + path = [self.s_start] + s = self.s_start + + for k in range(100): + g_list = {} + for x in self.get_neighbor(s): + if not self.is_collision(s, x): + g_list[x] = self.g[x] + s = min(g_list, key=g_list.get) + path.append(s) + if s == self.s_goal: + break + + return list(path) + + def plot_path(self, path): + px = [x[0] for x in path] + py = [x[1] for x in path] + plt.plot(px, py, linewidth=2) + plt.plot(self.s_start[0], self.s_start[1], "bs") + plt.plot(self.s_goal[0], self.s_goal[1], "gs") + + def plot_visited(self, visited): + color = ['gainsboro', 'lightgray', 'silver', 'darkgray', + 'bisque', 'navajowhite', 'moccasin', 'wheat', + 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue'] + + if self.count >= len(color) - 1: + self.count = 0 + + for x in visited: + plt.plot(x[0], x[1], marker='s', color=color[self.count]) + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + dstar = DStar(s_start, s_goal, "euclidean") + dstar.run() + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/Dijkstra.py b/src/Search_2D/Dijkstra.py new file mode 100644 index 0000000..e5e7b68 --- /dev/null +++ b/src/Search_2D/Dijkstra.py @@ -0,0 +1,69 @@ +""" +Dijkstra 2D +@author: huiming zhou +""" + +import os +import sys +import math +import heapq + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + +from Search_2D.Astar import AStar + + +class Dijkstra(AStar): + """Dijkstra set the cost as the priority + """ + def searching(self): + """ + Breadth-first Searching. + :return: path, visited order + """ + + self.PARENT[self.s_start] = self.s_start + self.g[self.s_start] = 0 + self.g[self.s_goal] = math.inf + heapq.heappush(self.OPEN, + (0, self.s_start)) + + while self.OPEN: + _, s = heapq.heappop(self.OPEN) + self.CLOSED.append(s) + + if s == self.s_goal: + break + + for s_n in self.get_neighbor(s): + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g: + self.g[s_n] = math.inf + + if new_cost < self.g[s_n]: # conditions for updating Cost + self.g[s_n] = new_cost + self.PARENT[s_n] = s + + # best first set the heuristics as the priority + heapq.heappush(self.OPEN, (new_cost, s_n)) + + return self.extract_path(self.PARENT), self.CLOSED + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + dijkstra = Dijkstra(s_start, s_goal, 'None') + plot = plotting.Plotting(s_start, s_goal) + + path, visited = dijkstra.searching() + plot.animation(path, visited, "Dijkstra's") # animation generate + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/LPAstar.py b/src/Search_2D/LPAstar.py new file mode 100644 index 0000000..4fd70ae --- /dev/null +++ b/src/Search_2D/LPAstar.py @@ -0,0 +1,256 @@ +""" +LPA_star 2D +@author: huiming zhou +""" + +import os +import sys +import math +import matplotlib.pyplot as plt + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + + +class LPAStar: + def __init__(self, s_start, s_goal, heuristic_type): + self.s_start, self.s_goal = s_start, s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() + self.Plot = plotting.Plotting(self.s_start, self.s_goal) + + self.u_set = self.Env.motions + self.obs = self.Env.obs + self.x = self.Env.x_range + self.y = self.Env.y_range + + self.g, self.rhs, self.U = {}, {}, {} + + for i in range(self.Env.x_range): + for j in range(self.Env.y_range): + self.rhs[(i, j)] = float("inf") + self.g[(i, j)] = float("inf") + + self.rhs[self.s_start] = 0 + self.U[self.s_start] = self.CalculateKey(self.s_start) + self.visited = set() + self.count = 0 + + self.fig = plt.figure() + + def run(self): + self.Plot.plot_grid("Lifelong Planning A*") + + self.ComputeShortestPath() + self.plot_path(self.extract_path()) + self.fig.canvas.mpl_connect('button_press_event', self.on_press) + + plt.show() + + def on_press(self, event): + x, y = event.xdata, event.ydata + if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1: + print("Please choose right area!") + else: + x, y = int(x), int(y) + print("Change position: s =", x, ",", "y =", y) + + self.visited = set() + self.count += 1 + + if (x, y) not in self.obs: + self.obs.add((x, y)) + else: + self.obs.remove((x, y)) + self.UpdateVertex((x, y)) + + self.Plot.update_obs(self.obs) + + for s_n in self.get_neighbor((x, y)): + self.UpdateVertex(s_n) + + self.ComputeShortestPath() + + plt.cla() + self.Plot.plot_grid("Lifelong Planning A*") + self.plot_visited(self.visited) + self.plot_path(self.extract_path()) + self.fig.canvas.draw_idle() + + def ComputeShortestPath(self): + while True: + s, v = self.TopKey() + + if v >= self.CalculateKey(self.s_goal) and \ + self.rhs[self.s_goal] == self.g[self.s_goal]: + break + + self.U.pop(s) + self.visited.add(s) + + if self.g[s] > self.rhs[s]: + + # Condition: over-consistent (eg: deleted obstacles) + # So, rhs[s] decreased -- > rhs[s] < g[s] + self.g[s] = self.rhs[s] + else: + + # Condition: # under-consistent (eg: added obstacles) + # So, rhs[s] increased --> rhs[s] > g[s] + self.g[s] = float("inf") + self.UpdateVertex(s) + + for s_n in self.get_neighbor(s): + self.UpdateVertex(s_n) + + def UpdateVertex(self, s): + """ + update the status and the current cost to come of state s. + :param s: state s + """ + + if s != self.s_start: + + # Condition: cost of parent of s changed + # Since we do not record the children of a state, we need to enumerate its neighbors + self.rhs[s] = min(self.g[s_n] + self.cost(s_n, s) + for s_n in self.get_neighbor(s)) + + if s in self.U: + self.U.pop(s) + + if self.g[s] != self.rhs[s]: + + # Condition: current cost to come is different to that of last time + # state s should be added into OPEN set (set U) + self.U[s] = self.CalculateKey(s) + + def TopKey(self): + """ + :return: return the min key and its value. + """ + + s = min(self.U, key=self.U.get) + + return s, self.U[s] + + def CalculateKey(self, s): + + return [min(self.g[s], self.rhs[s]) + self.h(s), + min(self.g[s], self.rhs[s])] + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + s_list = set() + + for u in self.u_set: + s_next = tuple([s[i] + u[i] for i in range(2)]) + if s_next not in self.obs: + s_list.add(s_next) + + return s_list + + def h(self, s): + """ + Calculate heuristic. + :param s: current node (state) + :return: heuristic function value + """ + + heuristic_type = self.heuristic_type # heuristic type + goal = self.s_goal # goal node + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return float("inf") + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + def extract_path(self): + """ + Extract the path based on the PARENT set. + :return: The planning path + """ + + path = [self.s_goal] + s = self.s_goal + + for k in range(100): + g_list = {} + for x in self.get_neighbor(s): + if not self.is_collision(s, x): + g_list[x] = self.g[x] + s = min(g_list, key=g_list.get) + path.append(s) + if s == self.s_start: + break + + return list(reversed(path)) + + def plot_path(self, path): + px = [x[0] for x in path] + py = [x[1] for x in path] + plt.plot(px, py, linewidth=2) + plt.plot(self.s_start[0], self.s_start[1], "bs") + plt.plot(self.s_goal[0], self.s_goal[1], "gs") + + def plot_visited(self, visited): + color = ['gainsboro', 'lightgray', 'silver', 'darkgray', + 'bisque', 'navajowhite', 'moccasin', 'wheat', + 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue'] + + if self.count >= len(color) - 1: + self.count = 0 + + for x in visited: + plt.plot(x[0], x[1], marker='s', color=color[self.count]) + + +def main(): + x_start = (5, 5) + x_goal = (45, 25) + + lpastar = LPAStar(x_start, x_goal, "Euclidean") + lpastar.run() + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/LRTAstar.py b/src/Search_2D/LRTAstar.py new file mode 100644 index 0000000..108903b --- /dev/null +++ b/src/Search_2D/LRTAstar.py @@ -0,0 +1,230 @@ +""" +LRTA_star 2D (Learning Real-time A*) +@author: huiming zhou +""" + +import os +import sys +import copy +import math + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import queue, plotting, env + + +class LrtAStarN: + def __init__(self, s_start, s_goal, N, heuristic_type): + self.s_start, self.s_goal = s_start, s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + + self.N = N # number of expand nodes each iteration + self.visited = [] # order of visited nodes in planning + self.path = [] # path of each iteration + self.h_table = {} # h_value table + + def init(self): + """ + initialize the h_value of all nodes in the environment. + it is a global table. + """ + + for i in range(self.Env.x_range): + for j in range(self.Env.y_range): + self.h_table[(i, j)] = self.h((i, j)) + + def searching(self): + self.init() + s_start = self.s_start # initialize start node + + while True: + OPEN, CLOSED = self.AStar(s_start, self.N) # OPEN, CLOSED sets in each iteration + + if OPEN == "FOUND": # reach the goal node + self.path.append(CLOSED) + break + + h_value = self.iteration(CLOSED) # h_value table of CLOSED nodes + + for x in h_value: + self.h_table[x] = h_value[x] + + s_start, path_k = self.extract_path_in_CLOSE(s_start, h_value) # x_init -> expected node in OPEN set + self.path.append(path_k) + + def extract_path_in_CLOSE(self, s_start, h_value): + path = [s_start] + s = s_start + + while True: + h_list = {} + + for s_n in self.get_neighbor(s): + if s_n in h_value: + h_list[s_n] = h_value[s_n] + else: + h_list[s_n] = self.h_table[s_n] + + s_key = min(h_list, key=h_list.get) # move to the smallest node with min h_value + path.append(s_key) # generate path + s = s_key # use end of this iteration as the start of next + + if s_key not in h_value: # reach the expected node in OPEN set + return s_key, path + + def iteration(self, CLOSED): + h_value = {} + + for s in CLOSED: + h_value[s] = float("inf") # initialize h_value of CLOSED nodes + + while True: + h_value_rec = copy.deepcopy(h_value) + for s in CLOSED: + h_list = [] + for s_n in self.get_neighbor(s): + if s_n not in CLOSED: + h_list.append(self.cost(s, s_n) + self.h_table[s_n]) + else: + h_list.append(self.cost(s, s_n) + h_value[s_n]) + h_value[s] = min(h_list) # update h_value of current node + + if h_value == h_value_rec: # h_value table converged + return h_value + + def AStar(self, x_start, N): + OPEN = queue.QueuePrior() # OPEN set + OPEN.put(x_start, self.h(x_start)) + CLOSED = [] # CLOSED set + g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come + PARENT = {x_start: x_start} # relations + count = 0 # counter + + while not OPEN.empty(): + count += 1 + s = OPEN.get() + CLOSED.append(s) + + if s == self.s_goal: # reach the goal node + self.visited.append(CLOSED) + return "FOUND", self.extract_path(x_start, PARENT) + + for s_n in self.get_neighbor(s): + if s_n not in CLOSED: + new_cost = g_table[s] + self.cost(s, s_n) + if s_n not in g_table: + g_table[s_n] = float("inf") + if new_cost < g_table[s_n]: # conditions for updating Cost + g_table[s_n] = new_cost + PARENT[s_n] = s + OPEN.put(s_n, g_table[s_n] + self.h_table[s_n]) + + if count == N: # expand needed CLOSED nodes + break + + self.visited.append(CLOSED) # visited nodes in each iteration + + return OPEN, CLOSED + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + s_list = [] + + for u in self.u_set: + s_next = tuple([s[i] + u[i] for i in range(2)]) + if s_next not in self.obs: + s_list.append(s_next) + + return s_list + + def extract_path(self, x_start, parent): + """ + Extract the path based on the relationship of nodes. + + :return: The planning path + """ + + path_back = [self.s_goal] + x_current = self.s_goal + + while True: + x_current = parent[x_current] + path_back.append(x_current) + + if x_current == x_start: + break + + return list(reversed(path_back)) + + def h(self, s): + """ + Calculate heuristic. + :param s: current node (state) + :return: heuristic function value + """ + + heuristic_type = self.heuristic_type # heuristic type + goal = self.s_goal # goal node + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return float("inf") + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + +def main(): + s_start = (10, 5) + s_goal = (45, 25) + + lrta = LrtAStarN(s_start, s_goal, 250, "euclidean") + plot = plotting.Plotting(s_start, s_goal) + + lrta.searching() + plot.animation_lrta(lrta.path, lrta.visited, + "Learning Real-time A* (LRTA*)") + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/RTAAStar.py b/src/Search_2D/RTAAStar.py new file mode 100644 index 0000000..de0a615 --- /dev/null +++ b/src/Search_2D/RTAAStar.py @@ -0,0 +1,237 @@ +""" +RTAAstar 2D (Real-time Adaptive A*) +@author: huiming zhou +""" + +import os +import sys +import copy +import math + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import queue, plotting, env + + +class RTAAStar: + def __init__(self, s_start, s_goal, N, heuristic_type): + self.s_start, self.s_goal = s_start, s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + + self.N = N # number of expand nodes each iteration + self.visited = [] # order of visited nodes in planning + self.path = [] # path of each iteration + self.h_table = {} # h_value table + + def init(self): + """ + initialize the h_value of all nodes in the environment. + it is a global table. + """ + + for i in range(self.Env.x_range): + for j in range(self.Env.y_range): + self.h_table[(i, j)] = self.h((i, j)) + + def searching(self): + self.init() + s_start = self.s_start # initialize start node + + while True: + OPEN, CLOSED, g_table, PARENT = \ + self.Astar(s_start, self.N) + + if OPEN == "FOUND": # reach the goal node + self.path.append(CLOSED) + break + + s_next, h_value = self.cal_h_value(OPEN, CLOSED, g_table, PARENT) + + for x in h_value: + self.h_table[x] = h_value[x] + + s_start, path_k = self.extract_path_in_CLOSE(s_start, s_next, h_value) + self.path.append(path_k) + + def cal_h_value(self, OPEN, CLOSED, g_table, PARENT): + v_open = {} + h_value = {} + for (_, x) in OPEN.enumerate(): + v_open[x] = g_table[PARENT[x]] + 1 + self.h_table[x] + s_open = min(v_open, key=v_open.get) + f_min = v_open[s_open] + for x in CLOSED: + h_value[x] = f_min - g_table[x] + + return s_open, h_value + + def iteration(self, CLOSED): + h_value = {} + + for s in CLOSED: + h_value[s] = float("inf") # initialize h_value of CLOSED nodes + + while True: + h_value_rec = copy.deepcopy(h_value) + for s in CLOSED: + h_list = [] + for s_n in self.get_neighbor(s): + if s_n not in CLOSED: + h_list.append(self.cost(s, s_n) + self.h_table[s_n]) + else: + h_list.append(self.cost(s, s_n) + h_value[s_n]) + h_value[s] = min(h_list) # update h_value of current node + + if h_value == h_value_rec: # h_value table converged + return h_value + + def Astar(self, x_start, N): + OPEN = queue.QueuePrior() # OPEN set + OPEN.put(x_start, self.h_table[x_start]) + CLOSED = [] # CLOSED set + g_table = {x_start: 0, self.s_goal: float("inf")} # Cost to come + PARENT = {x_start: x_start} # relations + count = 0 # counter + + while not OPEN.empty(): + count += 1 + s = OPEN.get() + CLOSED.append(s) + + if s == self.s_goal: # reach the goal node + self.visited.append(CLOSED) + return "FOUND", self.extract_path(x_start, PARENT), [], [] + + for s_n in self.get_neighbor(s): + if s_n not in CLOSED: + new_cost = g_table[s] + self.cost(s, s_n) + if s_n not in g_table: + g_table[s_n] = float("inf") + if new_cost < g_table[s_n]: # conditions for updating Cost + g_table[s_n] = new_cost + PARENT[s_n] = s + OPEN.put(s_n, g_table[s_n] + self.h_table[s_n]) + + if count == N: # expand needed CLOSED nodes + break + + self.visited.append(CLOSED) # visited nodes in each iteration + + return OPEN, CLOSED, g_table, PARENT + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + s_list = set() + + for u in self.u_set: + s_next = tuple([s[i] + u[i] for i in range(2)]) + if s_next not in self.obs: + s_list.add(s_next) + + return s_list + + def extract_path_in_CLOSE(self, s_end, s_start, h_value): + path = [s_start] + s = s_start + + while True: + h_list = {} + for s_n in self.get_neighbor(s): + if s_n in h_value: + h_list[s_n] = h_value[s_n] + s_key = max(h_list, key=h_list.get) # move to the smallest node with min h_value + path.append(s_key) # generate path + s = s_key # use end of this iteration as the start of next + + if s_key == s_end: # reach the expected node in OPEN set + return s_start, list(reversed(path)) + + def extract_path(self, x_start, parent): + """ + Extract the path based on the relationship of nodes. + :return: The planning path + """ + + path = [self.s_goal] + s = self.s_goal + + while True: + s = parent[s] + path.append(s) + if s == x_start: + break + + return list(reversed(path)) + + def h(self, s): + """ + Calculate heuristic. + :param s: current node (state) + :return: heuristic function value + """ + + heuristic_type = self.heuristic_type # heuristic type + goal = self.s_goal # goal node + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return float("inf") + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + +def main(): + s_start = (10, 5) + s_goal = (45, 25) + + rtaa = RTAAStar(s_start, s_goal, 240, "euclidean") + plot = plotting.Plotting(s_start, s_goal) + + rtaa.searching() + plot.animation_lrta(rtaa.path, rtaa.visited, + "Real-time Adaptive A* (RTAA*)") + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/__pycache__/Astar.cpython-38.pyc b/src/Search_2D/__pycache__/Astar.cpython-38.pyc new file mode 100644 index 0000000..7d90ba3 Binary files /dev/null and b/src/Search_2D/__pycache__/Astar.cpython-38.pyc differ diff --git a/src/Search_2D/__pycache__/env.cpython-37.pyc b/src/Search_2D/__pycache__/env.cpython-37.pyc new file mode 100644 index 0000000..945aa4d Binary files /dev/null and b/src/Search_2D/__pycache__/env.cpython-37.pyc differ diff --git a/src/Search_2D/__pycache__/env.cpython-38.pyc b/src/Search_2D/__pycache__/env.cpython-38.pyc new file mode 100644 index 0000000..e45c75b Binary files /dev/null and b/src/Search_2D/__pycache__/env.cpython-38.pyc differ diff --git a/src/Search_2D/__pycache__/env.cpython-39.pyc b/src/Search_2D/__pycache__/env.cpython-39.pyc new file mode 100644 index 0000000..4776345 Binary files /dev/null and b/src/Search_2D/__pycache__/env.cpython-39.pyc differ diff --git a/src/Search_2D/__pycache__/plotting.cpython-37.pyc b/src/Search_2D/__pycache__/plotting.cpython-37.pyc new file mode 100644 index 0000000..8a41db2 Binary files /dev/null and b/src/Search_2D/__pycache__/plotting.cpython-37.pyc differ diff --git a/src/Search_2D/__pycache__/plotting.cpython-38.pyc b/src/Search_2D/__pycache__/plotting.cpython-38.pyc new file mode 100644 index 0000000..5e8cff3 Binary files /dev/null and b/src/Search_2D/__pycache__/plotting.cpython-38.pyc differ diff --git a/src/Search_2D/__pycache__/plotting.cpython-39.pyc b/src/Search_2D/__pycache__/plotting.cpython-39.pyc new file mode 100644 index 0000000..c381ea6 Binary files /dev/null and b/src/Search_2D/__pycache__/plotting.cpython-39.pyc differ diff --git a/src/Search_2D/__pycache__/queue.cpython-37.pyc b/src/Search_2D/__pycache__/queue.cpython-37.pyc new file mode 100644 index 0000000..6c5f684 Binary files /dev/null and b/src/Search_2D/__pycache__/queue.cpython-37.pyc differ diff --git a/src/Search_2D/__pycache__/queue.cpython-38.pyc b/src/Search_2D/__pycache__/queue.cpython-38.pyc new file mode 100644 index 0000000..69c46c6 Binary files /dev/null and b/src/Search_2D/__pycache__/queue.cpython-38.pyc differ diff --git a/src/Search_2D/bfs.py b/src/Search_2D/bfs.py new file mode 100644 index 0000000..881e7ff --- /dev/null +++ b/src/Search_2D/bfs.py @@ -0,0 +1,69 @@ +""" +Breadth-first Searching_2D (BFS) +@author: huiming zhou +""" + +import os +import sys +from collections import deque + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env +from Search_2D.Astar import AStar +import math +import heapq + +class BFS(AStar): + """BFS add the new visited node in the end of the openset + """ + def searching(self): + """ + Breadth-first Searching. + :return: path, visited order + """ + + self.PARENT[self.s_start] = self.s_start + self.g[self.s_start] = 0 + self.g[self.s_goal] = math.inf + heapq.heappush(self.OPEN, + (0, self.s_start)) + + while self.OPEN: + _, s = heapq.heappop(self.OPEN) + self.CLOSED.append(s) + + if s == self.s_goal: + break + + for s_n in self.get_neighbor(s): + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g: + self.g[s_n] = math.inf + + if new_cost < self.g[s_n]: # conditions for updating Cost + self.g[s_n] = new_cost + self.PARENT[s_n] = s + + # bfs, add new node to the end of the openset + prior = self.OPEN[-1][0]+1 if len(self.OPEN)>0 else 0 + heapq.heappush(self.OPEN, (prior, s_n)) + + return self.extract_path(self.PARENT), self.CLOSED + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + bfs = BFS(s_start, s_goal, 'None') + plot = plotting.Plotting(s_start, s_goal) + + path, visited = bfs.searching() + plot.animation(path, visited, "Breadth-first Searching (BFS)") + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/dfs.py b/src/Search_2D/dfs.py new file mode 100644 index 0000000..3b30b03 --- /dev/null +++ b/src/Search_2D/dfs.py @@ -0,0 +1,65 @@ + +import os +import sys +import math +import heapq + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env +from Search_2D.Astar import AStar + +class DFS(AStar): + """DFS add the new visited node in the front of the openset + """ + def searching(self): + """ + Breadth-first Searching. + :return: path, visited order + """ + + self.PARENT[self.s_start] = self.s_start + self.g[self.s_start] = 0 + self.g[self.s_goal] = math.inf + heapq.heappush(self.OPEN, + (0, self.s_start)) + + while self.OPEN: + _, s = heapq.heappop(self.OPEN) + self.CLOSED.append(s) + + if s == self.s_goal: + break + + for s_n in self.get_neighbor(s): + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g: + self.g[s_n] = math.inf + + if new_cost < self.g[s_n]: # conditions for updating Cost + self.g[s_n] = new_cost + self.PARENT[s_n] = s + + # dfs, add new node to the front of the openset + prior = self.OPEN[0][0]-1 if len(self.OPEN)>0 else 0 + heapq.heappush(self.OPEN, (prior, s_n)) + + return self.extract_path(self.PARENT), self.CLOSED + + +def main(): + s_start = (5, 5) + s_goal = (45, 25) + + dfs = DFS(s_start, s_goal, 'None') + plot = plotting.Plotting(s_start, s_goal) + + path, visited = dfs.searching() + visited = list(dict.fromkeys(visited)) + plot.animation(path, visited, "Depth-first Searching (DFS)") # animation + + +if __name__ == '__main__': + main() diff --git a/src/Search_2D/env.py b/src/Search_2D/env.py new file mode 100644 index 0000000..9523c98 --- /dev/null +++ b/src/Search_2D/env.py @@ -0,0 +1,52 @@ +""" +Env 2D +@author: huiming zhou +""" + + +class Env: + def __init__(self): + self.x_range = 51 # size of background + self.y_range = 31 + self.motions = [(-1, 0), (-1, 1), (0, 1), (1, 1), + (1, 0), (1, -1), (0, -1), (-1, -1)] + self.obs = self.obs_map() + + def update_obs(self, obs): + self.obs = obs + + def obs_map(self): + """ + Initialize obstacles' positions + :return: map of obstacles + """ + + x = self.x_range #51 + y = self.y_range #31 + obs = set() + #画上下边框 + for i in range(x): + obs.add((i, 0)) + for i in range(x): + obs.add((i, y - 1)) + #画左右边框 + for i in range(y): + obs.add((0, i)) + for i in range(y): + obs.add((x - 1, i)) + + for i in range(2, 21): + obs.add((i, 15)) + for i in range(15): + obs.add((20, i)) + + for i in range(15, 30): + obs.add((30, i)) + for i in range(16): + obs.add((40, i)) + + return obs + +# if __name__ == '__main__': +# a = Env() +# print(a.obs) \ No newline at end of file diff --git a/src/Search_2D/plotting.py b/src/Search_2D/plotting.py new file mode 100644 index 0000000..1cf98a3 --- /dev/null +++ b/src/Search_2D/plotting.py @@ -0,0 +1,165 @@ +""" +Plot tools 2D +@author: huiming zhou +""" + +import os +import sys +import matplotlib.pyplot as plt + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import env + + +class Plotting: + def __init__(self, xI, xG): + self.xI, self.xG = xI, xG + self.env = env.Env() + self.obs = self.env.obs_map() + + def update_obs(self, obs): + self.obs = obs + + def animation(self, path, visited, name): + self.plot_grid(name) + self.plot_visited(visited) + self.plot_path(path) + plt.show() + + def animation_lrta(self, path, visited, name): + self.plot_grid(name) + cl = self.color_list_2() + path_combine = [] + + for k in range(len(path)): + self.plot_visited(visited[k], cl[k]) + plt.pause(0.2) + self.plot_path(path[k]) + path_combine += path[k] + plt.pause(0.2) + if self.xI in path_combine: + path_combine.remove(self.xI) + self.plot_path(path_combine) + plt.show() + + def animation_ara_star(self, path, visited, name): + self.plot_grid(name) + cl_v, cl_p = self.color_list() + + for k in range(len(path)): + self.plot_visited(visited[k], cl_v[k]) + self.plot_path(path[k], cl_p[k], True) + plt.pause(0.5) + + plt.show() + + def animation_bi_astar(self, path, v_fore, v_back, name): + self.plot_grid(name) + self.plot_visited_bi(v_fore, v_back) + self.plot_path(path) + plt.show() + + def plot_grid(self, name): + obs_x = [x[0] for x in self.obs] + obs_y = [x[1] for x in self.obs] + + plt.plot(self.xI[0], self.xI[1], "bs") + plt.plot(self.xG[0], self.xG[1], "gs") + plt.plot(obs_x, obs_y, "sk") + plt.title(name) + plt.axis("equal") + + def plot_visited(self, visited, cl='gray'): + if self.xI in visited: + visited.remove(self.xI) + + if self.xG in visited: + visited.remove(self.xG) + + count = 0 + + for x in visited: + count += 1 + plt.plot(x[0], x[1], color=cl, marker='o') + plt.gcf().canvas.mpl_connect('key_release_event', + lambda event: [exit(0) if event.key == 'escape' else None]) + + if count < len(visited) / 3: + length = 20 + elif count < len(visited) * 2 / 3: + length = 30 + else: + length = 40 + # + # length = 15 + + if count % length == 0: + plt.pause(0.001) + plt.pause(0.01) + + def plot_path(self, path, cl='r', flag=False): + path_x = [path[i][0] for i in range(len(path))] + path_y = [path[i][1] for i in range(len(path))] + + if not flag: + plt.plot(path_x, path_y, linewidth='3', color='r') + else: + plt.plot(path_x, path_y, linewidth='3', color=cl) + + plt.plot(self.xI[0], self.xI[1], "bs") + plt.plot(self.xG[0], self.xG[1], "gs") + + plt.pause(0.01) + + def plot_visited_bi(self, v_fore, v_back): + if self.xI in v_fore: + v_fore.remove(self.xI) + + if self.xG in v_back: + v_back.remove(self.xG) + + len_fore, len_back = len(v_fore), len(v_back) + + for k in range(max(len_fore, len_back)): + if k < len_fore: + plt.plot(v_fore[k][0], v_fore[k][1], linewidth='3', color='gray', marker='o') + if k < len_back: + plt.plot(v_back[k][0], v_back[k][1], linewidth='3', color='cornflowerblue', marker='o') + + plt.gcf().canvas.mpl_connect('key_release_event', + lambda event: [exit(0) if event.key == 'escape' else None]) + + if k % 10 == 0: + plt.pause(0.001) + plt.pause(0.01) + + @staticmethod + def color_list(): + cl_v = ['silver', + 'wheat', + 'lightskyblue', + 'royalblue', + 'slategray'] + cl_p = ['gray', + 'orange', + 'deepskyblue', + 'red', + 'm'] + return cl_v, cl_p + + @staticmethod + def color_list_2(): + cl = ['silver', + 'steelblue', + 'dimgray', + 'cornflowerblue', + 'dodgerblue', + 'royalblue', + 'plum', + 'mediumslateblue', + 'mediumpurple', + 'blueviolet', + ] + return cl diff --git a/src/Search_2D/queue.py b/src/Search_2D/queue.py new file mode 100644 index 0000000..51703ae --- /dev/null +++ b/src/Search_2D/queue.py @@ -0,0 +1,62 @@ +import collections +import heapq + + +class QueueFIFO: + """ + Class: QueueFIFO + Description: QueueFIFO is designed for First-in-First-out rule. + """ + + def __init__(self): + self.queue = collections.deque() + + def empty(self): + return len(self.queue) == 0 + + def put(self, node): + self.queue.append(node) # enter from back + + def get(self): + return self.queue.popleft() # leave from front + + +class QueueLIFO: + """ + Class: QueueLIFO + Description: QueueLIFO is designed for Last-in-First-out rule. + """ + + def __init__(self): + self.queue = collections.deque() + + def empty(self): + return len(self.queue) == 0 + + def put(self, node): + self.queue.append(node) # enter from back + + def get(self): + return self.queue.pop() # leave from back + + +class QueuePrior: + """ + Class: QueuePrior + Description: QueuePrior reorders elements using value [priority] + """ + + def __init__(self): + self.queue = [] + + def empty(self): + return len(self.queue) == 0 + + def put(self, item, priority): + heapq.heappush(self.queue, (priority, item)) # reorder s using priority + + def get(self): + return heapq.heappop(self.queue)[1] # pop out the smallest item + + def enumerate(self): + return self.queue diff --git a/src/Tello/KeyPress.py b/src/Tello/KeyPress.py new file mode 100644 index 0000000..66e9f94 --- /dev/null +++ b/src/Tello/KeyPress.py @@ -0,0 +1,19 @@ +# @Time : 2022/5/9 20:49 +# @Author : 2890199310@qq.com +# @File : KeyPress.py +# @Software: PyCharm +# @Function: + +def main(): + keyPress() + +def keyPress(key): + if(key == 1): + return "e" + +def result(): + return keyPress() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/Tello/KeyPressModule.py b/src/Tello/KeyPressModule.py new file mode 100644 index 0000000..bd7182a --- /dev/null +++ b/src/Tello/KeyPressModule.py @@ -0,0 +1,31 @@ +# @Time : 2022/4/20 12:27 +# @Author : 2890199310@qq.com +# @File : KeyPressModule.py.py +# @Software: PyCharm +# @Function: +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3") + +# import pygame +import Tello.KeyPress as k + +def init(): + return + +def getKey(keyName): + # ans = False + # for eve in pygame.event.get(): pass + # keyInput = pygame.key.get_pressed() + # myKey = getattr(pygame,'K_{}'.format(keyName)) + # if keyInput[myKey]: + if keyName == k.result(): + ans = True + # pygame.display.update() + return ans + +def key(a): + return a + +if __name__ == '__main__': + init() \ No newline at end of file diff --git a/src/Tello/KeyboardControl.py b/src/Tello/KeyboardControl.py new file mode 100644 index 0000000..6997c9f --- /dev/null +++ b/src/Tello/KeyboardControl.py @@ -0,0 +1,105 @@ +# @Time : 2022/4/20 12:27 +# @Author : 2890199310@qq.com +# @File : KeyboardControl.py.py +# @Software: PyCharm +# @Function: +import os +import sys +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3") +import logging +import time +import cv2 +from djitellopy import tello +# import Tello.KeyPressModule as kp # 用于获取键盘按键 +from time import sleep + +# Tello初始化设置 +Drone = tello.Tello() # 创建飞行器对象 +Drone.connect() # 连接到飞行器 +Drone.streamon() # 开启视频传输 +Drone.LOGGER.setLevel(logging.ERROR) # 只显示错误信息 +sleep(5) # 等待视频初始化 +# kp.init() # 初始化按键处理模块 + +def getKeyboardInput(key): + image = cv2.resize(OriginalImage, (Camera_Width, Camera_Height)) + speed = 70 + drone = Drone + lr, fb, ud, yv = 0, 0, 0, 0 + key_pressed = 0 + # if kp.getKey("e"): # + if key == "e": + cv2.imwrite('D:/snap-{}.jpg'.format(time.strftime("%H%M%S", time.localtime())), image) + # if kp.getKey("UP"):# 上升 + if key == "UP": + Drone.takeoff() + # elif kp.getKey("DOWN"):#下降 + if key == "DOWN": + Drone.land() + + # if kp.getKey("j"):# 向左飞行 + if key == "j": + key_pressed = 1 + lr = -speed + # elif kp.getKey("l"): #向右飞行 + if key == "l": + key_pressed = 1 + lr = speed + + # if kp.getKey("i"): #向前飞行 + if key == "i": + key_pressed = 1 + fb = speed + # elif kp.getKey("k"):# 向后飞行 + if key == "k": + key_pressed = 1 + fb = -speed + + # if kp.getKey("w"):# 向上飞行 + if key == "w": + key_pressed = 1 + ud = speed + # elif kp.getKey("s"): #向下飞行 + if key == "s": + key_pressed = 1 + ud = -speed + + # if kp.getKey("a"): # 向左旋转 + if key == "a": + key_pressed = 1 + yv = -speed + # elif kp.getKey("d"): #向右旋转 + if key == "d": + key_pressed = 1 + yv = speed + InfoText = "battery : {0}% height: {1}cm time: {2}".format(drone.get_battery(), drone.get_height(), time.strftime("%H:%M:%S",time.localtime())) + cv2.putText(image, InfoText, (10, 20), font, fontScale, (0, 0, 255), lineThickness) + if key_pressed == 1: + InfoText = "Command : lr:{0}% fb:{1} ud:{2} yv:{3}".format(lr, fb, ud, yv) + cv2.putText(image, InfoText, (10, 40), font, fontScale, (0, 0, 255), lineThickness) + + drone.send_rc_control(lr, fb, ud, yv) + +# 主程序 +# 摄像头设置 +Camera_Width = 720 +Camera_Height = 480 +DetectRange = [6000, 11000] # DetectRange[0] 是保持静止的检测人脸面积阈值下限,DetectRange[0] 是保持静止的检测人脸面积阈值上限 +PID_Parameter = [0.5, 0.0004, 0.4] +pErrorRotate, pErrorUp = 0, 0 + +# 字体设置 +font = cv2.FONT_HERSHEY_SIMPLEX +fontScale = 0.5 +fontColor = (255, 0, 0) +lineThickness = 1 + + + + +while True: + OriginalImage = Drone.get_frame_read().frame + Image = cv2.resize(OriginalImage, (Camera_Width, Camera_Height)) + # getKeyboardInput(drone=Drone, speed=70, image=Image) # 按键控制 + cv2.imshow("Drone Control Centre", Image) + cv2.waitKey(1) \ No newline at end of file diff --git a/src/Tello/__pycache__/KeyPress.cpython-39.pyc b/src/Tello/__pycache__/KeyPress.cpython-39.pyc new file mode 100644 index 0000000..3608c27 Binary files /dev/null and b/src/Tello/__pycache__/KeyPress.cpython-39.pyc differ diff --git a/src/Tello/__pycache__/KeyPressModule.cpython-39.pyc b/src/Tello/__pycache__/KeyPressModule.cpython-39.pyc new file mode 100644 index 0000000..1114802 Binary files /dev/null and b/src/Tello/__pycache__/KeyPressModule.cpython-39.pyc differ diff --git a/src/Tello/__pycache__/KeyboardControl.cpython-39.pyc b/src/Tello/__pycache__/KeyboardControl.cpython-39.pyc new file mode 100644 index 0000000..aad69a2 Binary files /dev/null and b/src/Tello/__pycache__/KeyboardControl.cpython-39.pyc differ diff --git a/src/prepare/calculate.py b/src/prepare/calculate.py new file mode 100644 index 0000000..7182c26 --- /dev/null +++ b/src/prepare/calculate.py @@ -0,0 +1,12 @@ +# @Time : 2022/5/9 8:45 +# @Author : 2890199310@qq.com +# @File : calculate.py +# @Software: PyCharm +# @Function: +import os + +os.chdir(r'E:\PaddleClas-release-2.3\PaddleClas-release-2.3\deploy') + +#### + +os.system('python python/predict_system.py -c configs/inference_general.yaml -o Global.infer_imgs="./Trees/test_images/90.jpeg" -o IndexProcess.index_dir="./Trees/index" -o Global.use_gpu=False -o IndexProcess.Image_root="./Trees/gallery/" -o IndexProcess.data_file="./Trees/gallery/tree_list.txt') diff --git a/src/prepare/change_file.py b/src/prepare/change_file.py new file mode 100644 index 0000000..2c06271 --- /dev/null +++ b/src/prepare/change_file.py @@ -0,0 +1,84 @@ +# @Time : 2022/4/19 9:41 +# @Author : 2890199310@qq.com +# @File : change_file.py +# @Software: PyCharm +# @Function: + +import os +import shutil + +# 读取文件名 +def readname(): + filepath = "/deploy/Trees/gallery/0" + name = os.listdir(filepath) + return name + +def main(): + gallery = os.path.abspath(r"/deploy/drink_dataset_v1.0/gallery") + all_files = os.listdir(gallery)[:-2] + all_files.sort(key=lambda x:int(x)) + # for file in all_files: + # print(file) + + for i in range(0, 102): #创建文件夹 + dire = "E:\\PaddleClas-release-2.3\\PaddleClas-release-2.3\\deploy\\flowers102\\gallery\\" + str(i) + if not os.path.exists(dire): + os.makedirs(dire) + print(f"{i}创建完毕") + f = open(r"/deploy/flowers102/train_extra_list.txt") + for line in f.readlines(): + base = "E://PaddleClas-release-2.3//PaddleClas-release-2.3/deploy//flowers102//" + s = line.split('/') + + str1 = s[0].strip() + "//" + str2 = s[1].split(' ')[0].strip() + str3 = s[1].split(' ')[1].strip() + "//" + # print(base + "gallery//" + str3 + str2) + + # 复制训练文件 + # shutil.copy(base + str1 + str2, base + "gallery/" + str3 + str2) + f.close() + + f = open(r"/deploy/flowers102/val_list.txt") + for line in f.readlines(): + base = "E://PaddleClas-release-2.3//PaddleClas-release-2.3/deploy//flowers102//" + s = line.split('/') + str1 = s[0].strip() + "//" + str2 = s[1].split(' ')[0].strip() + str3 = s[1].split(' ')[1].strip() + "//" + # 复制测试文件 + # shutil.copy(base + str1 + str2, base + "test_images/" + str2) + f.close() + + dit = dict() + + + with open(r"/deploy/flowers102/flowers102_label_list.txt", "r", encoding="utf-8") as f2: + for line in f2.readlines(): + tmp = line.split(' ', 1) + dit[tmp[0]] = tmp[1].strip() + #print(dit) + + #更改list内容 + with open(r"/deploy/flowers102/train_list.txt", "r", encoding="utf-8") as f1, open( + r"/deploy/flowers102/train_label.txt", "w", encoding="utf-8") as f3: + for line1 in f1.readlines(): + tmp1 = line1.split(' ', 1) + # f3.write(tmp1[1].strip() + '/' + tmp1[0].split('/', 1)[1].strip() + '\t' + dit[tmp1[1].strip()] + '\n') + + # print(line1) + # print(line2) + # break + f1.close() + f2.close() + f3.close() + with open(r"/deploy/Trees/gallery/tree_list.txt", "w", encoding="utf-8") as f1: + name = readname() + for i in name: + if('GIF' not in i): + f1.write('0/' + i + '\t' + 'tree\n') + f1.close() + + +if __name__ == '__main__': + main() diff --git a/src/prepare/spider.py b/src/prepare/spider.py new file mode 100644 index 0000000..0373296 --- /dev/null +++ b/src/prepare/spider.py @@ -0,0 +1,5 @@ +# @Time : 2022/4/19 9:54 +# @Author : 2890199310@qq.com +# @File : spider.py +# @Software: PyCharm +# @Function: diff --git a/src/qt/__pycache__/page1.cpython-39.pyc b/src/qt/__pycache__/page1.cpython-39.pyc new file mode 100644 index 0000000..54af08c Binary files /dev/null and b/src/qt/__pycache__/page1.cpython-39.pyc differ diff --git a/src/qt/__pycache__/page2.cpython-39.pyc b/src/qt/__pycache__/page2.cpython-39.pyc new file mode 100644 index 0000000..f7d7721 Binary files /dev/null and b/src/qt/__pycache__/page2.cpython-39.pyc differ diff --git a/src/qt/__pycache__/tello_UI.cpython-39.pyc b/src/qt/__pycache__/tello_UI.cpython-39.pyc new file mode 100644 index 0000000..e3eb413 Binary files /dev/null and b/src/qt/__pycache__/tello_UI.cpython-39.pyc differ diff --git a/src/qt/__pycache__/vedio_demo.cpython-39.pyc b/src/qt/__pycache__/vedio_demo.cpython-39.pyc new file mode 100644 index 0000000..476261d Binary files /dev/null and b/src/qt/__pycache__/vedio_demo.cpython-39.pyc differ diff --git a/src/qt/page1.py b/src/qt/page1.py new file mode 100644 index 0000000..5337880 --- /dev/null +++ b/src/qt/page1.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'page1.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. +import logging + +from PyQt5 import QtCore, QtWidgets +import sys +import os +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3") +from Screenshot.Screenshot_Gui import * +from tello_UI import * + + +from djitellopy import * +import time + + +class Ui_Form1(object): + def setupUi(self, Form): + Form.setObjectName("Form") + Form.resize(1108, 767) + self.pushButton = QtWidgets.QPushButton(Form) + self.pushButton.setGeometry(QtCore.QRect(10, 740, 75, 23)) + sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + sizePolicy.setHeightForWidth(self.pushButton.sizePolicy().hasHeightForWidth()) + self.pushButton.setSizePolicy(sizePolicy) + self.pushButton.setObjectName("pushButton") + sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + self.mdiArea = QtWidgets.QMdiArea(Form) + self.mdiArea.setGeometry(QtCore.QRect(9, 61, 1091, 671)) + sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Preferred) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + sizePolicy.setHeightForWidth(self.mdiArea.sizePolicy().hasHeightForWidth()) + self.mdiArea.setSizePolicy(sizePolicy) + self.mdiArea.setMaximumSize(QtCore.QSize(16777215, 16777215)) + self.mdiArea.setObjectName("mdiArea") + self.pushButton_4 = QtWidgets.QPushButton(Form) + self.pushButton_4.setGeometry(QtCore.QRect(1020, 740, 75, 23)) + sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + sizePolicy.setHeightForWidth(self.pushButton_4.sizePolicy().hasHeightForWidth()) + self.pushButton_4.setSizePolicy(sizePolicy) + self.pushButton_4.setObjectName("pushButton_4") + self.pushButton_3 = QtWidgets.QPushButton(Form) + self.pushButton_3.setGeometry(QtCore.QRect(940, 740, 75, 23)) + sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + sizePolicy.setHeightForWidth(self.pushButton_3.sizePolicy().hasHeightForWidth()) + self.pushButton_3.setSizePolicy(sizePolicy) + self.pushButton_3.setObjectName("pushButton_3") + self.label = QtWidgets.QLabel(Form) + self.label.setGeometry(QtCore.QRect(10, 10, 101, 31)) + self.label.setObjectName("label") + self.pushButton_5 = QtWidgets.QPushButton(Form) + self.pushButton_5.setGeometry(QtCore.QRect(480, 740, 75, 23)) + self.pushButton_5.setObjectName("pushButton_5") + + self.retranslateUi(Form) + QtCore.QMetaObject.connectSlotsByName(Form) + self.pushButton.clicked.connect(lambda: self.show_camera()) + self.pushButton_5.clicked.connect(lambda: self.record_vedio()) + def show_camera(self): + MainWindow = QMainWindow() + ui = Ui_Form() + ui.setupUi(MainWindow) + MainWindow.setFixedSize(1022, 618) + self.mdiArea.addSubWindow(MainWindow) + MainWindow.show() + try: + import Tello.KeyboardControl as kb + ui.pushButton.clicked.connect(lambda: up()) + ui.pushButton2.clicked.connect(lambda: left()) + ui.pushButton4.clicked.connect(lambda: right()) + ui.pushButton5.clicked.connect(lambda: return1()) + ui.pushButton3.clicked.connect(lambda: qifei()) + ui.pushButton6.clicked.connect(lambda: jiangluo()) + except: + print("") + def up(): + kb.getKeyboardInput("i")# 前 + def left():#左 + kb.getKeyboardInput("j") + def right(): + kb.getKeyboardInput("l")#右 + def return1(): + kb.getKeyboardInput("k")#后 + def qifei(): + kb.getKeyboardInput("UP") + def jiangluo(): + kb.getKeyboardInput("DOWN") + def record_vedio(self): + ui = Ui_MainWindow1() + self.mdiArea.addSubWindow(ui) + ui.show() + def retranslateUi(self, Form): + _translate = QtCore.QCoreApplication.translate + Form.setWindowTitle(_translate("Form", "page1")) + self.pushButton.setText(_translate("Form", "连接")) + self.pushButton_4.setText(_translate("Form", "缩小")) + self.pushButton_3.setText(_translate("Form", "放大")) + self.label.setText(_translate("Form", "实时画面显示系统")) + self.pushButton_5.setText(_translate("Form", "视频录制")) diff --git a/src/qt/page2.py b/src/qt/page2.py new file mode 100644 index 0000000..18b6d82 --- /dev/null +++ b/src/qt/page2.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'page2.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. + + +from PyQt5 import QtCore, QtGui, QtWidgets +from vedio_demo import Ui_MainWindow +from PyQt5.QtWidgets import * +from PyQt5.QtMultimedia import QMediaContent, QMediaPlayer +import os +import sys +import math +import heapq + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + + "/../../Search_based_Planning/") + +from Search_2D import plotting, env + +class AStar: + """AStar set the cost + heuristics as the priority + """ + def __init__(self, s_start, s_goal, heuristic_type): + self.s_start = s_start + self.s_goal = s_goal + self.heuristic_type = heuristic_type + + self.Env = env.Env() # class Env + + self.u_set = self.Env.motions # feasible input set + self.obs = self.Env.obs # position of obstacles + + self.OPEN = [] # priority queue / OPEN set + self.CLOSED = [] # CLOSED set / VISITED order + self.PARENT = dict() # recorded parent + self.g = dict() # cost to come + + def searching(self): + """ + A_star Searching. + :return: path, visited order + """ + + self.PARENT[self.s_start] = self.s_start + self.g[self.s_start] = 0 + self.g[self.s_goal] = math.inf + heapq.heappush(self.OPEN, + (self.f_value(self.s_start), self.s_start)) + + while self.OPEN: + _, s = heapq.heappop(self.OPEN) + self.CLOSED.append(s) + + if s == self.s_goal: # stop condition + break + + for s_n in self.get_neighbor(s): + new_cost = self.g[s] + self.cost(s, s_n) + + if s_n not in self.g: + self.g[s_n] = math.inf + + if new_cost < self.g[s_n]: # conditions for updating Cost + self.g[s_n] = new_cost + self.PARENT[s_n] = s + heapq.heappush(self.OPEN, (self.f_value(s_n), s_n)) + + return self.extract_path(self.PARENT), self.CLOSED + + def searching_repeated_astar(self, e): + """ + repeated A*. + :param e: weight of A* + :return: path and visited order + """ + + path, visited = [], [] + + while e >= 1: + p_k, v_k = self.repeated_searching(self.s_start, self.s_goal, e) + path.append(p_k) + visited.append(v_k) + e -= 0.5 + + return path, visited + + def repeated_searching(self, s_start, s_goal, e): + """ + run A* with weight e. + :param s_start: starting state + :param s_goal: goal state + :param e: weight of a* + :return: path and visited order. + """ + + g = {s_start: 0, s_goal: float("inf")} + PARENT = {s_start: s_start} + OPEN = [] + CLOSED = [] + heapq.heappush(OPEN, + (g[s_start] + e * self.heuristic(s_start), s_start)) + + while OPEN: + _, s = heapq.heappop(OPEN) + CLOSED.append(s) + + if s == s_goal: + break + + for s_n in self.get_neighbor(s): + new_cost = g[s] + self.cost(s, s_n) + + if s_n not in g: + g[s_n] = math.inf + + if new_cost < g[s_n]: # conditions for updating Cost + g[s_n] = new_cost + PARENT[s_n] = s + heapq.heappush(OPEN, (g[s_n] + e * self.heuristic(s_n), s_n)) + + return self.extract_path(PARENT), CLOSED + + def get_neighbor(self, s): + """ + find neighbors of state s that not in obstacles. + :param s: state + :return: neighbors + """ + + return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set] + + def cost(self, s_start, s_goal): + """ + Calculate Cost for this motion + :param s_start: starting node + :param s_goal: end node + :return: Cost for this motion + :note: Cost function could be more complicate! + """ + + if self.is_collision(s_start, s_goal): + return math.inf + + return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1]) + + def is_collision(self, s_start, s_end): + """ + check if the line segment (s_start, s_end) is collision. + :param s_start: start node + :param s_end: end node + :return: True: is collision / False: not collision + """ + + if s_start in self.obs or s_end in self.obs: + return True + + if s_start[0] != s_end[0] and s_start[1] != s_end[1]: + if s_end[0] - s_start[0] == s_start[1] - s_end[1]: + s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + else: + s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1])) + s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1])) + + if s1 in self.obs or s2 in self.obs: + return True + + return False + + def f_value(self, s): + """ + f = g + h. (g: Cost to come, h: heuristic value) + :param s: current state + :return: f + """ + + return self.g[s] + self.heuristic(s) + + def extract_path(self, PARENT): + """ + Extract the path based on the PARENT set. + :return: The planning path + """ + + path = [self.s_goal] + s = self.s_goal + + while True: + s = PARENT[s] + path.append(s) + + if s == self.s_start: + break + + return list(path) + + def heuristic(self, s): + """ + Calculate heuristic. + :param s: current node (state) + :return: heuristic function value + """ + + heuristic_type = self.heuristic_type # heuristic type + goal = self.s_goal # goal node + + if heuristic_type == "manhattan": + return abs(goal[0] - s[0]) + abs(goal[1] - s[1]) + else: + return math.hypot(goal[0] - s[0], goal[1] - s[1]) + + + +class Ui_MainWindow1(object): + def setupUi(self, MainWindow): + MainWindow.setObjectName("MainWindow") + MainWindow.resize(1112, 766) + self.centralwidget = QtWidgets.QWidget(MainWindow) + self.centralwidget.setObjectName("centralwidget") + self.gridLayout = QtWidgets.QGridLayout(self.centralwidget) + self.gridLayout.setObjectName("gridLayout") + self.pushButton = QtWidgets.QPushButton(self.centralwidget) + self.pushButton.setObjectName("pushButton") + self.gridLayout.addWidget(self.pushButton, 2, 0, 1, 1) + self.mdiArea = QtWidgets.QMdiArea(self.centralwidget) + self.mdiArea.setObjectName("mdiArea") + self.gridLayout.addWidget(self.mdiArea, 1, 0, 1, 1) + self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget) + self.pushButton_2.setObjectName("pushButton_2") + self.gridLayout.addWidget(self.pushButton_2, 3, 0, 1, 1) + self.label = QtWidgets.QLabel(self.centralwidget) + self.label.setObjectName("label") + self.gridLayout.addWidget(self.label, 0, 0, 1, 1) + MainWindow.setCentralWidget(self.centralwidget) + self.menubar = QtWidgets.QMenuBar(MainWindow) + self.menubar.setGeometry(QtCore.QRect(0, 0, 1112, 23)) + self.menubar.setObjectName("menubar") + MainWindow.setMenuBar(self.menubar) + self.statusbar = QtWidgets.QStatusBar(MainWindow) + self.statusbar.setObjectName("statusbar") + MainWindow.setStatusBar(self.statusbar) + self.retranslateUi(MainWindow) + QtCore.QMetaObject.connectSlotsByName(MainWindow) + self.pushButton.clicked.connect(lambda: self.msg()) + self.pushButton_2.clicked.connect(lambda: self.search()) + def search(self): + s_start = (5, 5) + s_goal = (45, 25) + + astar = AStar(s_start, s_goal, "euclidean") + plot = plotting.Plotting(s_start, s_goal) + + path, visited = astar.searching() + plot.animation(path, visited, "A*") # animation + def msg(self): + MainWindow = QMainWindow() + ui = Ui_MainWindow() + ui.setupUi(MainWindow) + self.mdiArea.addSubWindow(MainWindow) + MainWindow.showMaximized() + ui.player = QMediaPlayer() + ui.player.setVideoOutput(ui.wgt_video) + ui.pushButton.clicked.connect(lambda: openVideoFile(ui)) + ui.player.setMedia(QMediaContent(QFileDialog.getOpenFileUrl()[0])) + ui.pushButton_2.clicked.connect(lambda: pause(ui)) + def pause(a): + a.player.pause() + def openVideoFile(a): + a.player.play() + def retranslateUi(self, MainWindow): + _translate = QtCore.QCoreApplication.translate + MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow")) + self.pushButton.setText(_translate("MainWindow", "添加文件")) + self.pushButton_2.setText(_translate("MainWindow", "分析路径")) + self.label.setText(_translate("MainWindow", "路径分析界面")) diff --git a/src/qt/system_main.py b/src/qt/system_main.py new file mode 100644 index 0000000..e45f8b7 --- /dev/null +++ b/src/qt/system_main.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'system_main.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. + +from PyQt5 import QtCore, QtGui, QtWidgets +import sys +import os +from PyQt5.QtWidgets import * +from page1 import Ui_Form1 +from page2 import Ui_MainWindow1 + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../PaddleClas-release-2.3") +class Ui_MainWindow(object): + def setupUi(self, MainWindow): + MainWindow.setObjectName("MainWindow") + MainWindow.resize(1275, 896) + self.centralwidget = QtWidgets.QWidget(MainWindow) + self.centralwidget.setObjectName("centralwidget") + self.gridLayout = QtWidgets.QGridLayout(self.centralwidget) + self.gridLayout.setObjectName("gridLayout") + self.label = QtWidgets.QLabel(self.centralwidget) + self.label.setMinimumSize(QtCore.QSize(0, 20)) + self.label.setMaximumSize(QtCore.QSize(300, 20)) + self.label.setTextFormat(QtCore.Qt.AutoText) + self.label.setAlignment(QtCore.Qt.AlignCenter) + self.label.setObjectName("label") + self.gridLayout.addWidget(self.label, 1, 0, 1, 1) + self.groupBox = QtWidgets.QGroupBox(self.centralwidget) + sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Expanding) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + sizePolicy.setHeightForWidth(self.groupBox.sizePolicy().hasHeightForWidth()) + self.groupBox.setSizePolicy(sizePolicy) + self.groupBox.setMaximumSize(QtCore.QSize(600, 16777215)) + self.groupBox.setFlat(False) + self.groupBox.setCheckable(False) + self.groupBox.setObjectName("groupBox") + self.gridLayout_2 = QtWidgets.QGridLayout(self.groupBox) + self.gridLayout_2.setObjectName("gridLayout_2") + self.pushButton = QtWidgets.QPushButton(self.groupBox) + self.pushButton.setObjectName("pushButton") + self.gridLayout_2.addWidget(self.pushButton, 0, 0, 1, 1) + self.pushButton_2 = QtWidgets.QPushButton(self.groupBox) + self.pushButton_2.setObjectName("pushButton_2") + self.gridLayout_2.addWidget(self.pushButton_2, 1, 0, 1, 1) + self.gridLayout.addWidget(self.groupBox, 2, 0, 2, 1) + self.mdiArea = QtWidgets.QMdiArea(self.centralwidget) + self.mdiArea.setObjectName("mdiArea") + self.gridLayout.addWidget(self.mdiArea, 2, 1, 2, 1) + MainWindow.setCentralWidget(self.centralwidget) + self.menubar = QtWidgets.QMenuBar(MainWindow) + self.menubar.setGeometry(QtCore.QRect(0, 0, 1275, 23)) + self.menubar.setObjectName("menubar") + MainWindow.setMenuBar(self.menubar) + self.statusbar = QtWidgets.QStatusBar(MainWindow) + self.statusbar.setObjectName("statusbar") + MainWindow.setStatusBar(self.statusbar) + self.pushButton.clicked.connect(lambda: self.open1()) + self.pushButton_2.clicked.connect(lambda: self.open2()) + self.retranslateUi(MainWindow) + QtCore.QMetaObject.connectSlotsByName(MainWindow) + + def open1(self): + MainWindow = QMainWindow() + ui = Ui_Form1() + ui.setupUi(MainWindow) + self.mdiArea.addSubWindow(MainWindow) + MainWindow.showMaximized() + + def open2(self): + MainWindow = QMainWindow() + ui = Ui_MainWindow1() + ui.setupUi(MainWindow) + self.mdiArea.addSubWindow(MainWindow) + MainWindow.showMaximized() + + def retranslateUi(self, MainWindow): + _translate = QtCore.QCoreApplication.translate + MainWindow.setWindowTitle(_translate("MainWindow", "无人机自动寻路系统")) + self.label.setText(_translate("MainWindow", "无人机自动寻路系统(主界面)")) + self.groupBox.setTitle(_translate("MainWindow", "菜单栏")) + self.pushButton.setText(_translate("MainWindow", "实时画面")) + self.pushButton_2.setText(_translate("MainWindow", "路径分析")) + +if __name__ == '__main__': + app = QApplication(sys.argv) + MainWindow = QMainWindow() + ui = Ui_MainWindow() + ui.setupUi(MainWindow) + MainWindow.show() + sys.exit(app.exec_()) \ No newline at end of file diff --git a/src/qt/tello_UI.py b/src/qt/tello_UI.py new file mode 100644 index 0000000..02977ce --- /dev/null +++ b/src/qt/tello_UI.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'tello_UI.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. + + +from PyQt5 import QtCore, QtGui, QtWidgets + + +class Ui_Form(object): + def setupUi(self, Form): + Form.setObjectName("Form") + Form.resize(1022, 618) + self.groupBox_4 = QtWidgets.QGroupBox(Form) + self.groupBox_4.setGeometry(QtCore.QRect(670, 567, 350, 50)) + self.groupBox_4.setMaximumSize(QtCore.QSize(350, 50)) + self.groupBox_4.setObjectName("groupBox_4") + self.lineEdit_3 = QtWidgets.QLineEdit(self.groupBox_4) + self.lineEdit_3.setGeometry(QtCore.QRect(10, 20, 331, 20)) + self.lineEdit_3.setObjectName("lineEdit_3") + self.groupBox_2 = QtWidgets.QGroupBox(Form) + self.groupBox_2.setGeometry(QtCore.QRect(670, 156, 350, 405)) + self.groupBox_2.setObjectName("groupBox_2") + self.pushButton = QtWidgets.QPushButton(self.groupBox_2) + self.pushButton.setGeometry(QtCore.QRect(150, 120, 71, 71)) + self.pushButton.setText("") + icon = QtGui.QIcon() + icon.addPixmap(QtGui.QPixmap("tello_png/up.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off) + self.pushButton.setIcon(icon) + self.pushButton.setIconSize(QtCore.QSize(70, 100)) + self.pushButton.setObjectName("pushButton") + self.pushButton_2 = QtWidgets.QPushButton(self.groupBox_2) + self.pushButton_2.setGeometry(QtCore.QRect(70, 200, 71, 71)) + self.pushButton_2.setText("") + icon1 = QtGui.QIcon() + icon1.addPixmap(QtGui.QPixmap("tello_png/left.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off) + self.pushButton_2.setIcon(icon1) + self.pushButton_2.setIconSize(QtCore.QSize(70, 100)) + self.pushButton_2.setObjectName("pushButton_2") + self.pushButton_4 = QtWidgets.QPushButton(self.groupBox_2) + self.pushButton_4.setGeometry(QtCore.QRect(230, 200, 71, 71)) + self.pushButton_4.setText("") + icon2 = QtGui.QIcon() + icon2.addPixmap(QtGui.QPixmap("tello_png/right.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off) + self.pushButton_4.setIcon(icon2) + self.pushButton_4.setIconSize(QtCore.QSize(70, 100)) + self.pushButton_4.setObjectName("pushButton_4") + self.pushButton_5 = QtWidgets.QPushButton(self.groupBox_2) + self.pushButton_5.setGeometry(QtCore.QRect(150, 200, 71, 71)) + self.pushButton_5.setText("") + icon3 = QtGui.QIcon() + icon3.addPixmap(QtGui.QPixmap("tello_png/return.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off) + self.pushButton_5.setIcon(icon3) + self.pushButton_5.setIconSize(QtCore.QSize(70, 100)) + self.pushButton_5.setObjectName("pushButton_5") + self.pushButton_3 = QtWidgets.QPushButton(self.groupBox_2) + self.pushButton_3.setGeometry(QtCore.QRect(70, 20, 231, 91)) + icon4 = QtGui.QIcon() + icon4.addPixmap(QtGui.QPixmap("tello_png/qifei.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off) + self.pushButton_3.setIcon(icon4) + self.pushButton_3.setIconSize(QtCore.QSize(150, 150)) + self.pushButton_3.setObjectName("pushButton_3") + self.pushButton_6 = QtWidgets.QPushButton(self.groupBox_2) + self.pushButton_6.setGeometry(QtCore.QRect(70, 290, 231, 91)) + icon5 = QtGui.QIcon() + icon5.addPixmap(QtGui.QPixmap("tello_png/jiangluo.png"), QtGui.QIcon.Normal, QtGui.QIcon.Off) + self.pushButton_6.setIcon(icon5) + self.pushButton_6.setIconSize(QtCore.QSize(150, 150)) + self.pushButton_6.setObjectName("pushButton_6") + self.groupBox = QtWidgets.QGroupBox(Form) + self.groupBox.setGeometry(QtCore.QRect(670, 0, 350, 150)) + self.groupBox.setMaximumSize(QtCore.QSize(16777215, 150)) + self.groupBox.setObjectName("groupBox") + self.lineEdit = QtWidgets.QLineEdit(self.groupBox) + self.lineEdit.setGeometry(QtCore.QRect(20, 20, 311, 41)) + self.lineEdit.setObjectName("lineEdit") + self.lineEdit_2 = QtWidgets.QLineEdit(self.groupBox) + self.lineEdit_2.setGeometry(QtCore.QRect(20, 70, 311, 41)) + self.lineEdit_2.setObjectName("lineEdit_2") + + self.retranslateUi(Form) + QtCore.QMetaObject.connectSlotsByName(Form) + + def retranslateUi(self, Form): + _translate = QtCore.QCoreApplication.translate + Form.setWindowTitle(_translate("Form", "Form")) + self.groupBox_4.setTitle(_translate("Form", "连接状态")) + self.groupBox_2.setTitle(_translate("Form", "控制面板")) + self.pushButton_3.setText(_translate("Form", "起飞")) + self.pushButton_6.setText(_translate("Form", "降落")) + self.groupBox.setTitle(_translate("Form", "无人机状态")) + self.lineEdit.setText(_translate("Form", "剩余电量:")) + self.lineEdit_2.setText(_translate("Form", "WIFI强度:")) diff --git a/src/qt/tello_png/jiangluo.png b/src/qt/tello_png/jiangluo.png new file mode 100644 index 0000000..22b198d Binary files /dev/null and b/src/qt/tello_png/jiangluo.png differ diff --git a/src/qt/tello_png/left.png b/src/qt/tello_png/left.png new file mode 100644 index 0000000..c327560 Binary files /dev/null and b/src/qt/tello_png/left.png differ diff --git a/src/qt/tello_png/qifei.png b/src/qt/tello_png/qifei.png new file mode 100644 index 0000000..e198649 Binary files /dev/null and b/src/qt/tello_png/qifei.png differ diff --git a/src/qt/tello_png/return.png b/src/qt/tello_png/return.png new file mode 100644 index 0000000..55d770b Binary files /dev/null and b/src/qt/tello_png/return.png differ diff --git a/src/qt/tello_png/right.png b/src/qt/tello_png/right.png new file mode 100644 index 0000000..6d87a6a Binary files /dev/null and b/src/qt/tello_png/right.png differ diff --git a/src/qt/tello_png/up.png b/src/qt/tello_png/up.png new file mode 100644 index 0000000..9e2aebe Binary files /dev/null and b/src/qt/tello_png/up.png differ diff --git a/src/qt/vedio_demo.py b/src/qt/vedio_demo.py new file mode 100644 index 0000000..5a6b7f0 --- /dev/null +++ b/src/qt/vedio_demo.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'vedio_demo.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. + + +from PyQt5 import QtCore, QtGui, QtWidgets +from PyQt5.QtMultimediaWidgets import QVideoWidget + +class Ui_MainWindow(object): + def setupUi(self, MainWindow): + MainWindow.setObjectName("MainWindow") + MainWindow.resize(1111, 676) + self.centralwidget = QtWidgets.QWidget(MainWindow) + self.centralwidget.setObjectName("centralwidget") + self.gridLayout = QtWidgets.QGridLayout(self.centralwidget) + self.gridLayout.setObjectName("gridLayout") + self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget) + self.pushButton_2.setObjectName("pushButton_2") + self.gridLayout.addWidget(self.pushButton_2, 3, 0, 1, 1) + self.pushButton = QtWidgets.QPushButton(self.centralwidget) + self.pushButton.setObjectName("pushButton") + self.gridLayout.addWidget(self.pushButton, 2, 0, 1, 1) + self.wgt_video = QVideoWidget(self.centralwidget) + self.wgt_video.setObjectName("widget") + self.gridLayout.addWidget(self.wgt_video, 0, 0, 1, 1) + self.horizontalSlider = QtWidgets.QSlider(self.centralwidget) + self.horizontalSlider.setOrientation(QtCore.Qt.Horizontal) + self.horizontalSlider.setObjectName("horizontalSlider") + self.gridLayout.addWidget(self.horizontalSlider, 1, 0, 1, 1) + MainWindow.setCentralWidget(self.centralwidget) + self.menubar = QtWidgets.QMenuBar(MainWindow) + self.menubar.setGeometry(QtCore.QRect(0, 0, 1111, 23)) + self.menubar.setObjectName("menubar") + MainWindow.setMenuBar(self.menubar) + self.statusbar = QtWidgets.QStatusBar(MainWindow) + self.statusbar.setObjectName("statusbar") + MainWindow.setStatusBar(self.statusbar) + self.retranslateUi(MainWindow) + QtCore.QMetaObject.connectSlotsByName(MainWindow) + + def retranslateUi(self, MainWindow): + _translate = QtCore.QCoreApplication.translate + MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow")) + self.pushButton_2.setText(_translate("MainWindow", "暂停")) + self.pushButton.setText(_translate("MainWindow", "播放"))