"""
A* grid planning.
Collision precedence:
1. Grid lookup when ``env_map.grid`` is not ``None``; if occupied, collision.
2. When the grid reports free or is unavailable, Shapely vs. obstacle_list.
(Grid and obstacle_list are combined when both are present.)
author: Atsushi Sakai(@Atsushi_twi)
Nikos Kanargias (nkana@tee.gr)
adapted by: Reinis Cimurs
See Wikipedia article (https://en.wikipedia.org/wiki/A*_search_algorithm)
"""
from __future__ import annotations
import contextlib
import math
import matplotlib.pyplot as plt
import numpy as np
from irsim.lib.handler.geometry_handler import GeometryFactory
from irsim.util.util import to_numpy
from irsim.world.map import EnvGridMap
[docs]
class AStarPlanner:
def __init__(self, env_map: EnvGridMap) -> None:
"""
Initialize A* planner.
Args:
env_map: Environment map (any :class:`~irsim.world.map.EnvGridMap`
compatible object).
"""
self._map = env_map
self.obstacle_list = env_map.obstacle_list[:]
off = np.asarray(env_map.world_offset, dtype=float).flatten()
self.origin_x = float(off[0])
self.origin_y = float(off[1])
self.min_x, self.min_y = 0, 0 # grid indices are 0-based
self.max_x = self.origin_x + env_map.width
self.max_y = self.origin_y + env_map.height
# When map has a grid, use its actual resolution and shape so planner grid
# matches collision lookups (avoids "Open set is empty" on resolution mismatch).
grid = getattr(env_map, "grid", None)
gr = None
if grid is not None and hasattr(env_map, "grid_resolution"):
with contextlib.suppress(Exception):
gr = env_map.grid_resolution
if grid is not None and gr is not None:
self.resolution = gr[0] # m/cell; assume square cells (gr[0]==gr[1])
self.x_width = grid.shape[0]
self.y_width = grid.shape[1]
else:
self.resolution = env_map.resolution
self.x_width = round((self.max_x - self.origin_x) / self.resolution)
self.y_width = round((self.max_y - self.origin_y) / self.resolution)
self.motion = self.get_motion_model()
[docs]
class Node:
"""Node class"""
def __init__(self, x: int, y: int, cost: float, parent_index: int) -> None:
"""
Initialize Node
Args:
x (float): x position of the node
y (float): y position of the node
cost (float): heuristic cost of the node
parent_index (int): Nodes parent index
"""
self.x = x # index of grid
self.y = y # index of grid
self.cost = cost
self.parent_index = parent_index
def __str__(self) -> str:
"""str function for Node class"""
return (
str(self.x)
+ ","
+ str(self.y)
+ ","
+ str(self.cost)
+ ","
+ str(self.parent_index)
)
[docs]
def planning(
self,
start_pose: np.ndarray,
goal_pose: np.ndarray,
show_animation: bool = True,
) -> tuple[list[float], list[float]]:
"""
A star path search
Args:
start_pose (np.array): start pose [x,y]
goal_pose (np.array): goal pose [x,y]
show_animation (bool): If true, shows the animation of planning process
Returns:
(np.array): xy position array of the final path
"""
start_node = self.Node(
self.calc_xy_index(float(to_numpy(start_pose)[0].item()), self.origin_x),
self.calc_xy_index(float(to_numpy(start_pose)[1].item()), self.origin_y),
0.0,
-1,
)
goal_node = self.Node(
self.calc_xy_index(float(to_numpy(goal_pose)[0].item()), self.origin_x),
self.calc_xy_index(float(to_numpy(goal_pose)[1].item()), self.origin_y),
0.0,
-1,
)
open_set, closed_set = {}, {}
open_set[self.calc_grid_index(start_node)] = start_node
while True:
if len(open_set) == 0:
print("Open set is empty..")
break
c_id = min(
open_set,
key=lambda o: open_set[o].cost
+ self.calc_heuristic(goal_node, open_set[o]),
)
current = open_set[c_id]
# show graph
if show_animation: # pragma: no cover
plt.plot(
self.calc_grid_position(current.x, self.origin_x),
self.calc_grid_position(current.y, self.origin_y),
"xc",
)
# for stopping simulation with the esc key.
plt.gcf().canvas.mpl_connect(
"key_release_event",
lambda event: plt.close(event.canvas.figure)
if event.key == "escape"
else None,
)
if len(closed_set.keys()) % 10 == 0:
plt.pause(0.001)
if current.x == goal_node.x and current.y == goal_node.y:
print("Find goal")
goal_node.parent_index = current.parent_index
goal_node.cost = current.cost
break
# Remove the item from the open set
del open_set[c_id]
# Add it to the closed set
closed_set[c_id] = current
# expand_grid search grid based on motion model
for i, _ in enumerate(self.motion):
node = self.Node(
current.x + self.motion[i][0],
current.y + self.motion[i][1],
current.cost + self.motion[i][2],
c_id,
)
n_id = self.calc_grid_index(node)
# If the node is not safe, do nothing
if not self.verify_node(node):
continue
if n_id in closed_set:
continue
if n_id not in open_set:
open_set[n_id] = node # discovered a new node
else:
if open_set[n_id].cost > node.cost:
# This path is the best until now. record it
open_set[n_id] = node
rx, ry = self.calc_final_path(goal_node, closed_set)
return np.array([rx, ry])
[docs]
def calc_final_path(
self, goal_node: Node, closed_set: dict
) -> tuple[list[float], list[float]]:
"""Generate the final path
Args:
goal_node (Node): final goal node
closed_set (dict): dict of closed nodes
Returns:
rx (list): list of x positions of final path
ry (list): list of y positions of final path
"""
rx, ry = (
[self.calc_grid_position(goal_node.x, self.origin_x)],
[self.calc_grid_position(goal_node.y, self.origin_y)],
)
parent_index = goal_node.parent_index
while parent_index != -1:
n = closed_set[parent_index]
rx.append(self.calc_grid_position(n.x, self.origin_x))
ry.append(self.calc_grid_position(n.y, self.origin_y))
parent_index = n.parent_index
return rx, ry
[docs]
@staticmethod
def calc_heuristic(n1: Node, n2: Node) -> float:
w = 1.0 # weight of heuristic
return w * math.hypot(n1.x - n2.x, n1.y - n2.y)
[docs]
def calc_grid_position(self, index: int, min_position: float) -> float:
"""
calc grid position
Args:
index (int): index of a node
min_position (float): min value of search space
Returns:
(float): position of coordinates along the given axis
"""
return index * self.resolution + min_position
[docs]
def calc_xy_index(self, position: float, min_pos: float) -> int:
"""
calc xy index of node
Args:
position (float): position of a node
min_pos (float): min value of search space
Returns:
(int): index of position along the given axis
"""
return round((position - min_pos) / self.resolution)
[docs]
def calc_grid_index(self, node: Node) -> int:
"""
calc grid index of node
Args:
node (Node): node to calculate the index for
Returns:
(float): grid index of the node
"""
return (node.y - self.min_y) * self.x_width + (node.x - self.min_x)
[docs]
def verify_node(self, node: Node) -> bool:
"""
Check if node is acceptable - within limits of search space and free of collisions
Args:
node (Node): node to check
Returns:
(bool): True if node is acceptable. False otherwise
"""
px = self.calc_grid_position(node.x, self.origin_x)
py = self.calc_grid_position(node.y, self.origin_y)
if (
px < self.origin_x
or py < self.origin_y
or px >= self.max_x
or py >= self.max_y
):
return False
# collision check
return not self.check_node(px, py)
[docs]
def check_node(self, x: float, y: float) -> bool:
"""Check position for a collision.
Args:
x: World x coordinate of the cell centre.
y: World y coordinate of the cell centre.
Returns:
``True`` if a collision is detected.
"""
node_position = [x, y]
shape = {
"name": "rectangle",
"length": self.resolution,
"width": self.resolution,
}
gf = GeometryFactory.create_geometry(**shape)
geometry = gf.step(np.c_[node_position])
return self._map.is_collision(geometry)
[docs]
@staticmethod
def get_motion_model() -> list[list[float]]:
# dx, dy, cost
return [
[1, 0, 1],
[0, 1, 1],
[-1, 0, 1],
[0, -1, 1],
[-1, -1, math.sqrt(2)],
[-1, 1, math.sqrt(2)],
[1, -1, math.sqrt(2)],
[1, 1, math.sqrt(2)],
]