Source code for irsim.lib.path_planners.probabilistic_road_map

"""
Probabilistic Road Map (PRM) Planner.

Collision precedence:
  1. Grid lookup when ``env_map.grid`` is not ``None``; if occupied, collision.
  2. When the grid reports free or is unavailable, Shapely vs. obstacle_list.
  (Grid and obstacle_list are combined when both are present.)

author: Atsushi Sakai (@Atsushi_twi)

adapted by: Reinis Cimurs

"""

from __future__ import annotations

import math
from typing import Any

import matplotlib.pyplot as plt
import numpy as np
from scipy.spatial import KDTree

from irsim.lib.handler.geometry_handler import GeometryFactory
from irsim.util.random import rng as sim_rng
from irsim.world.map import EnvGridMap


[docs] class Node: """ Node class for dijkstra search """ def __init__(self, x: float, y: float, cost: float, parent_index: int) -> None: """ Initialize Node Args: x (float): x position of the node y (float): y position of the node cost (float): heuristic cost of the node parent_index (int): Nodes parent index """ self.x = x self.y = y self.cost = cost self.parent_index = parent_index def __str__(self) -> str: """str function for Node class""" return ( str(self.x) + "," + str(self.y) + "," + str(self.cost) + "," + str(self.parent_index) )
[docs] class PRMPlanner: def __init__( self, env_map: EnvGridMap, robot_radius: float, n_sample: int = 500, n_knn: int = 10, max_edge_len: float = 30.0, ) -> None: """ Initialize the PRM planner. Args: env_map: Environment map (any :class:`~irsim.world.map.EnvGridMap` compatible object). robot_radius: Robot radius modeled as a circle. n_sample: Number of sampled points. n_knn: Number of nearest neighbors per node. max_edge_len: Maximum allowed edge length. """ self._map = env_map self.rr = robot_radius self.obstacle_list = env_map.obstacle_list[:] off = np.asarray(env_map.world_offset, dtype=float).flatten() self.min_x = float(off[0]) self.min_y = float(off[1]) self.max_x = self.min_x + float(np.asarray(env_map.width).flat[0]) self.max_y = self.min_y + float(np.asarray(env_map.height).flat[0]) self.n_sample = n_sample self.n_knn = n_knn self.max_edge_len = max_edge_len
[docs] def planning( self, start_pose: np.ndarray, goal_pose: np.ndarray, rng: Any | None = None, show_animation: bool = True, ) -> tuple[list[float], list[float]] | None: """ Plan a path from start to goal using the PRM method. Args: start_pose (np.array): start pose [x,y] goal_pose (np.array): goal pose [x,y] rng (Optional): Random generator show_animation (bool): If true, shows the animation of planning process Returns: (np.array): xy position array of the final path """ start_pose = np.asarray(start_pose, dtype=float).flatten() goal_pose = np.asarray(goal_pose, dtype=float).flatten() start_x, start_y = float(start_pose[0]), float(start_pose[1]) goal_x, goal_y = float(goal_pose[0]), float(goal_pose[1]) sample_x, sample_y = self.sample_points(start_x, start_y, goal_x, goal_y, rng) if show_animation: plt.plot(sample_x, sample_y, ".b") road_map = self.generate_road_map(sample_x, sample_y) rx, ry = self.dijkstra_planning( start_x, start_y, goal_x, goal_y, road_map, sample_x, sample_y, show_animation, ) return np.array([rx, ry])
[docs] def check_node(self, x: float, y: float, rr: float) -> bool: """Check position for a collision. Args: x: World x coordinate. y: World y coordinate. rr: Robot radius for the check. Returns: ``True`` if a collision is detected. """ node_position = [x, y] shape = {"name": "circle", "radius": rr} gf = GeometryFactory.create_geometry(**shape) geometry = gf.step(np.c_[node_position]) return self._map.is_collision(geometry)
[docs] def is_collision(self, sx: float, sy: float, gx: float, gy: float) -> bool: """ Check if line between points is acceptable - within edge limits and free of collisions Args: sx (float): start x position sy (float): start y position gx (float): goal x position gy (float): goal y position Returns: result (bool): True if node is not acceptable. False otherwise """ x = sx y = sy dx = gx - sx dy = gy - sy yaw = math.atan2(gy - sy, gx - sx) d = math.hypot(dx, dy) if d >= self.max_edge_len: return True D = self.rr n_step = round(d / D) for _i in range(n_step): if self.check_node(x, y, self.rr): return True # collision x += D * math.cos(yaw) y += D * math.sin(yaw) # goal point check return self.check_node(gx, gy, self.rr)
[docs] def generate_road_map( self, sample_x: list[float], sample_y: list[float] ) -> list[list[int]]: """ Road map generation Args: sample_x (List): [m] x positions of sampled points sample_y (List): [m] y positions of sampled points Returns: road_map (List): list of edge ids """ road_map = [] n_sample = len(sample_x) sample_kd_tree = KDTree(np.vstack((sample_x, sample_y)).T) for _i, ix, iy in zip(range(n_sample), sample_x, sample_y, strict=True): _, indexes = sample_kd_tree.query([ix, iy], k=n_sample) edge_id = [] for ii in range(1, len(indexes)): nx = sample_x[indexes[ii]] ny = sample_y[indexes[ii]] if not self.is_collision(ix, iy, nx, ny): edge_id.append(indexes[ii]) if len(edge_id) >= self.n_knn: break road_map.append(edge_id) # self.plot_road_map(road_map, sample_x, sample_y) return road_map
[docs] @staticmethod def dijkstra_planning( sx: float, sy: float, gx: float, gy: float, road_map: list[list[int]], sample_x: list[float], sample_y: list[float], show_animation: bool, ) -> tuple[list[float], list[float]] | None: """ Args: sx (float): start x position [m] sy (float): start y position [m] gx (float): goal x position [m] gy (float): goal y position [m] road_map (list): list of edge ids sample_x (float): ??? [m] sample_y (float): ??? [m] Returns: (tuple(list, list)): Two lists of path coordinates ([x1, x2, ...], [y1, y2, ...]), empty list when no path was found """ start_node = Node(sx, sy, 0.0, -1) goal_node = Node(gx, gy, 0.0, -1) open_set, closed_set = {}, {} open_set[len(road_map) - 2] = start_node path_found = True while True: if not open_set: print("Cannot find path") path_found = False break c_id = min(open_set, key=lambda o: open_set[o].cost) current = open_set[c_id] # show graph if show_animation and len(closed_set.keys()) % 2 == 0: # for stopping simulation with the esc key. plt.gcf().canvas.mpl_connect( "key_release_event", lambda event: ( plt.close(event.canvas.figure) if event.key == "escape" else None ), ) plt.plot(current.x, current.y, "xg") plt.pause(0.001) if c_id == (len(road_map) - 1): print("goal is found!") goal_node.parent_index = current.parent_index goal_node.cost = current.cost break # Remove the item from the open set del open_set[c_id] # Add it to the closed set closed_set[c_id] = current # expand search grid based on motion model for i in range(len(road_map[c_id])): n_id = road_map[c_id][i] dx = sample_x[n_id] - current.x dy = sample_y[n_id] - current.y d = math.hypot(dx, dy) node = Node(sample_x[n_id], sample_y[n_id], current.cost + d, c_id) if n_id in closed_set: continue # Otherwise if it is already in the open set if n_id in open_set: if open_set[n_id].cost > node.cost: open_set[n_id].cost = node.cost open_set[n_id].parent_index = c_id else: open_set[n_id] = node if path_found is False: return [], [] # generate final course rx, ry = [goal_node.x], [goal_node.y] parent_index = goal_node.parent_index while parent_index != -1: n = closed_set[parent_index] rx.append(n.x) ry.append(n.y) parent_index = n.parent_index return rx, ry
[docs] @staticmethod def plot_road_map( road_map: list[list[int]], sample_x: list[float], sample_y: list[float] ) -> None: # pragma: no cover for i, _ in enumerate(road_map): for ii in range(len(road_map[i])): ind = road_map[i][ii] plt.plot( [sample_x[i], sample_x[ind]], [sample_y[i], sample_y[ind]], "-k" )
[docs] def sample_points( self, sx: float, sy: float, gx: float, gy: float, rng: Any | None ) -> tuple[list[float], list[float]]: """ Generate sample points Args: sx (float): start x position [m] sy (float): start y position [m] gx (float): goal x position [m] gy (float): goal y position [m] rng: Random generator Returns: sample (tuple (list, list)): sample positions """ sample_x, sample_y = [], [] if rng is None: rng = sim_rng while len(sample_x) <= self.n_sample: tx = (rng.random() * (self.max_x - self.min_x)) + self.min_x ty = (rng.random() * (self.max_y - self.min_y)) + self.min_y if not self.check_node(tx, ty, self.rr): sample_x.append(tx) sample_y.append(ty) sample_x.append(sx) sample_y.append(sy) sample_x.append(gx) sample_y.append(gy) return sample_x, sample_y