-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
space-time A* skeleton almost finished
- Loading branch information
Showing
7 changed files
with
152 additions
and
43 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
|
||
from distutils.core import setup | ||
|
||
setup( | ||
name = 'space-time-astar', # How you named your package folder (MyLib) | ||
packages = ['space-time-astar'], # Chose the same as "name" | ||
version = '0.1', # Start with a small number and increase it with every change you make | ||
license='MIT', # Chose a license from here: https://help.github.com/articles/licensing-a-repository | ||
description = 'A* search algorithm with an added time dimension to deal with dynamic obstacles.', # Give a short description about your library | ||
author = 'Haoran Peng', # Type in your name | ||
author_email = '[email protected]', # Type in your E-Mail | ||
url = 'https://github.com/GavinPHR/Space-Time-AStar', # Provide either the link to your github or to your website | ||
download_url = 'https://github.com/user/reponame/archive/v_01.tar.gz', # I explain this later on | ||
keywords = ['astar-algorithm', 'obstacle-avoidance', 'time-dimension'], # Keywords that define your package best | ||
install_requires=[ # I get to this in a second | ||
'validators', | ||
'beautifulsoup4', | ||
], | ||
classifiers=[ | ||
'Development Status :: 3 - Alpha', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package | ||
'Intended Audience :: Developers', # Define that your audience are developers | ||
'Topic :: Software Development :: Build Tools', | ||
'License :: OSI Approved :: MIT License', # Again, pick a license | ||
'Programming Language :: Python :: 3', #Specify which pyhton versions that you want to support | ||
'Programming Language :: Python :: 3.4', | ||
'Programming Language :: Python :: 3.5', | ||
'Programming Language :: Python :: 3.6', | ||
'Programming Language :: Python :: 3.7', | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from space-time-astar.planner import Planner |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,14 +4,13 @@ | |
Email: [email protected] | ||
''' | ||
from typing import Tuple, List, Dict | ||
from collections import defaultdict | ||
import heapq | ||
from heapq import heappush, heappop | ||
import numpy as np | ||
from scipy.spatial import KDTree | ||
|
||
from neighbour_table import NeighbourTable | ||
from grid import Grid | ||
from copy import deepcopy | ||
from state import State | ||
|
||
|
||
# static obstacles must include the boundary | ||
|
@@ -24,11 +23,11 @@ def __init__(self, grid_size: int, | |
start: Tuple[int, int], | ||
goal: Tuple[int, int], | ||
static_obstacles: List[Tuple[int, int]], | ||
dynamic_obstacles: Dict[int, List[Tuple[int, int]]]): | ||
dynamic_obstacles: Dict[int, List[Tuple[int, int]]], | ||
max_iter:int = 10000, | ||
debug = False): | ||
self.grid_size = grid_size | ||
self.robot_radius = robot_radius | ||
self.start = np.array(start) | ||
self.goal = np.array(goal) | ||
np_static_obstacles = np.array(static_obstacles) | ||
self.static_obstacles = KDTree(np_static_obstacles) | ||
self.dynamic_obstacles = dict((k, np.array(v)) for k, v in dynamic_obstacles.items()) | ||
|
@@ -37,64 +36,96 @@ def __init__(self, grid_size: int, | |
self.grid = Grid(grid_size, np_static_obstacles) | ||
# Make a lookup table for looking up neighbours of a grid | ||
self.neighbour_table = NeighbourTable(self.grid.grid) | ||
# Function to hash a position | ||
self.hash = NeighbourTable.hash | ||
|
||
self.start = self.grid.snap_to_grid(np.array(start)) | ||
self.goal = self.grid.snap_to_grid(np.array(goal)) | ||
self.max_iter = max_iter | ||
self.debug = debug | ||
|
||
''' | ||
Used to calculate distance between two points | ||
Also an admissible and consistent heuristic for A* | ||
An admissible and consistent heuristic for A* | ||
''' | ||
@staticmethod | ||
def h(start: np.ndarray, goal: np.ndarray) -> int: | ||
return int(np.linalg.norm(start-goal, 1)) # L2 norm | ||
return int(np.linalg.norm(start-goal, 1)) # L1 norm | ||
|
||
@staticmethod | ||
def l2(start: np.ndarray, goal: np.ndarray) -> int: | ||
return int(np.linalg.norm(start-goal, 2)) # L2 norm | ||
|
||
''' | ||
Check whether the nearest static obstacle is within radius | ||
''' | ||
def safe_static(self, grid_pos: np.ndarray) -> bool: | ||
return self.h(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius | ||
return self.l2(grid_pos, self.static_obstacles.query(grid_pos)) > self.robot_radius | ||
|
||
''' | ||
Assume dynamic obstacles are agents with same radius, distance needs to be 2*radius | ||
''' | ||
def safe_dynamic(self, grid_pos: np.ndarray, time: int) -> bool: | ||
return all(self.h(grid_pos, obstacle) > 2*self.robot_radius | ||
return all(self.l2(grid_pos, obstacle) > 2*self.robot_radius | ||
for obstacle in self.dynamic_obstacles.setdefault(time, [])) | ||
|
||
''' | ||
Reconstruct path from A* search result | ||
''' | ||
def reconstruct_path(self, came_from: Dict[int, np.ndarray], current: np.ndarray): | ||
total_path = [current] | ||
while self.hash(current) in came_from.keys(): | ||
current = came_from[self.hash(current)] | ||
total_path.append(current) | ||
return total_path[::-1] | ||
|
||
''' | ||
Space-Time A* | ||
''' | ||
def plan(self): | ||
# Standard A* setup, no surprise here | ||
''' | ||
need to implement heap here | ||
''' | ||
open_set = [] | ||
def plan(self) -> np.ndarray: | ||
# Initialize the start state | ||
s = State(self.start, 0, 0, self.h(self.start, self.goal)) | ||
|
||
open_set = [s] | ||
closed_set = set() | ||
|
||
# Keep track of parent nodes for reconstruction | ||
came_from = dict() | ||
g_score = defaultdict(lambda: float('inf')) | ||
g_score[self.hash(self.start)] = 0 | ||
f_score = defaultdict(lambda: float('inf')) | ||
f_score[self.hash(self.start)] = self.h(self.start, self.goal) | ||
|
||
while open_set: | ||
iter_ = 0 | ||
while open_set and iter_ < self.max_iter: | ||
iter_ += 1 | ||
current_state = open_set[0] # Smallest element in min-heap | ||
if current_state.pos_equal_to(self.goal): | ||
if self.debug: | ||
print('Path found after {0} iterations'.format(iter_)) | ||
return self.reconstruct_path(came_from, current_state) | ||
|
||
closed_set.add(heappop(open_set)) | ||
epoch = current_state.time + 1 | ||
for neighbour in self.neighbour_table.lookup(current_state.pos): | ||
neighbour_state = State(neighbour, epoch, current_state.g_score + 1, self.h(neighbour, self.goal)) | ||
# Check if visited | ||
if neighbour_state in closed_set: | ||
continue | ||
|
||
# Avoid obstacles | ||
if not self.safe_static(neighbour) or not self.safe_dynamic(neighbour, epoch): | ||
continue | ||
|
||
# Add to open set | ||
if neighbour_state not in open_set: | ||
came_from[neighbour_state] = current_state | ||
heappush(open_set, neighbour_state) | ||
|
||
if self.debug: | ||
print('Open set is empty, no path found.') | ||
return None | ||
|
||
''' | ||
Reconstruct path from A* search result | ||
''' | ||
def reconstruct_path(self, came_from: Dict[State, State], current: State) -> np.ndarray: | ||
total_path = [current.pos] | ||
while current in came_from.keys(): | ||
current = came_from[current] | ||
total_path.append(current.pos) | ||
return np.array(total_path[::-1]) | ||
|
||
|
||
if __name__ == '__main__': | ||
grid_size = 1 | ||
robot_radius = 2 | ||
start = (3, 10) | ||
goal = (15, 20) | ||
static_obstacles = [(5, 15), (10, 20)] | ||
start = (2, 2) | ||
goal = (50, 50) | ||
static_obstacles = [(0, 0), (60, 70)] | ||
dynamic_obstacles = {0: [(5, 16)], 1: [(5, 17)], 2: [(5, 18), (11, 20)]} | ||
planner = Planner(grid_size, robot_radius, start, goal, static_obstacles, dynamic_obstacles) | ||
print(planner.neighbour_table.lookup(planner.grid.snap_to_grid([10,15]))) | ||
print(planner.plan()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
#!/usr/bin/env python3 | ||
''' | ||
Author: Haoran Peng | ||
Email: [email protected] | ||
''' | ||
import numpy as np | ||
|
||
class State: | ||
|
||
def __init__(self, pos: np.ndarray, time: int, g_score: int, h_score: int): | ||
self.pos = pos | ||
self.time = time | ||
self.g_score = g_score | ||
self.f_score = g_score + h_score | ||
|
||
def __hash__(self) -> int: | ||
concat = str(self.pos[0]) + str(self.pos[1]) + '0' + str(self.time) | ||
return int(concat) | ||
|
||
def pos_equal_to(self, pos: np.ndarray) -> bool: | ||
return np.array_equal(self.pos, pos) | ||
|
||
def __lt__(self, other: 'State') -> bool: | ||
return self.f_score < other.f_score | ||
|
||
def __eq__(self, other: 'State') -> bool: | ||
return self.__hash__() == other.__hash__() | ||
|
||
def __str__(self): | ||
return 'State(pos=[' + str(self.pos[0]) + ', ' + str(self.pos[1]) + '], ' \ | ||
+ 'time=' + str(self.time) + ', fscore=' + str(self.f_score) + ')' | ||
|
||
def __repr__(self): | ||
return self.__str__() |