Source code for irsim.lib.algorithm.rvo

'''
This file is the implementation of the Reciprocal Velocity Obstacle (RVO) algorithm for multi-robot collision avoidance.

Author: Ruihua Han

reference: https://github.com/MengGuo/RVO_Py_MAS
'''

import numpy as np
from math import sin, cos, atan2, asin, pi
from irsim.util.util import dist_hypot

[docs] class reciprocal_vel_obs: """ A class to implement the Reciprocal Velocity Obstacle (RVO) algorithm for multi-robot collision avoidance. Args: state (list): The rvo state of the agent [x, y, vx, vy, radius, vx_des, vy_des]. obs_state_list (list) : List of states of static obstacles [[x, y, vx, vy, radius]]. vxmax (float): Maximum velocity in the x direction. vymax (float): Maximum velocity in the y direction. acce (float): Acceleration limit. """ def __init__( self, state: list, obs_state_list=[], vxmax=1.5, vymax=1.5, acce=0.5, factor=1.0, ): self.state = state self.obs_state_list = obs_state_list self.vxmax = vxmax self.vymax = vymax self.acce = acce self.factor = factor
[docs] def update(self, state, obs_state_list): self.state = state self.obs_state_list = obs_state_list
[docs] def cal_vel(self, mode="rvo"): """ Calculate the velocity of the agent based on the Reciprocal Velocity Obstacle (RVO) algorithm. Args: mode (str): The vo configure to calculate the velocity. It can be "rvo", "hrvo", or "vo". - rvo: Reciprocal Velocity Obstacle (RVO) algorithm, for multi-robot collision avoidance. - hrvo: Hybrid Reciprocal Velocity Obstacle (HRVO) algorithm, for multi-robot collision avoidance. - vo: Velocity Obstacle (VO) algorithm, for obstacle-robot collision avoidance. """ if mode == "rvo": rvo_list = self.config_rvo() elif mode == "hrvo": rvo_list = self.config_hrvo() elif mode == "vo": rvo_list = self.config_vo() else: print("wrong method mode, pleas input vo, rvo or hrvo") vo_outside, vo_inside = self.vel_candidate(rvo_list) rvo_vel = self.vel_select(vo_outside, vo_inside) return rvo_vel
[docs] def config_rvo(self): rvo_list = [] for obstacle in self.obs_state_list: rvo = self.config_rvo_mode(obstacle) rvo_list.append(rvo) return rvo_list
[docs] def config_rvo_mode(self, obstacle): x = self.state[0] y = self.state[1] vx = self.state[2] vy = self.state[3] r = self.state[4] if vx == 0 and vy == 0: mode = "sta_circular" else: mode = "moving" if mode == "moving": mx = obstacle[0] my = obstacle[1] mvx = obstacle[2] mvy = obstacle[3] mr = obstacle[4] rvo_apex = [(vx + mvx) / 2, (vy + mvy) / 2] elif mode == "sta_circular": mx = obstacle[0] my = obstacle[1] mvx = 0 mvy = 0 mr = obstacle[2] + 0.2 vo_apex = [mvx, mvy] rvo_apex = vo_apex # vo else: print("wrong rvo mode") dis_mr = np.sqrt((my - y) ** 2 + (mx - x) ** 2) angle_mr = atan2(my - y, mx - x) if dis_mr < r + mr: dis_mr = r + mr ratio = (r + mr) / dis_mr if ratio > 1: ratio = 1 if ratio < -1: ratio = -1 half_angle = asin(ratio) line_left_ori = angle_mr + half_angle line_right_ori = angle_mr - half_angle line_left_vector = [cos(line_left_ori), sin(line_left_ori)] line_right_vector = [cos(line_right_ori), sin(line_right_ori)] # return [rvo_apex, line_left_ori, line_right_ori] return [rvo_apex, line_left_vector, line_right_vector]
[docs] def config_hrvo(self): hrvo_list = [] for obstacle in self.obs_state_list: # for circular: [x, y, radius] hrvo = self.config_hrvo_mode(obstacle) hrvo_list.append(hrvo) return hrvo_list
[docs] def config_hrvo_mode(self, obstacle): x = self.state[0] y = self.state[1] vx = self.state[2] vy = self.state[3] r = self.state[4] if vx == 0 and vy == 0: mode = "sta_circular" else: mode = "moving" if mode == "moving": mx = obstacle[0] my = obstacle[1] mvx = obstacle[2] mvy = obstacle[3] mr = obstacle[4] elif mode == "sta_circular": mx = obstacle[0] my = obstacle[1] mvx = 0 mvy = 0 mr = obstacle[2] + 0.2 else: print("wrong hrvo mode") rvo_apex = [(vx + mvx) / 2, (vy + mvy) / 2] vo_apex = [mvx, mvy] dis_mr = np.sqrt((my - y) ** 2 + (mx - x) ** 2) angle_mr = atan2(my - y, mx - x) if dis_mr < r + mr: dis_mr = r + mr ratio = (r + mr) / dis_mr half_angle = asin(ratio) line_left_ori = angle_mr + half_angle line_right_ori = angle_mr - half_angle line_left_vector = [cos(line_left_ori), sin(line_left_ori)] line_right_vector = [cos(line_right_ori), sin(line_right_ori)] if mode == "moving": cl_vector = [mx - x, my - y] cur_v = [vx - rvo_apex[0], vy - rvo_apex[1]] dis_rv = dist_hypot(rvo_apex[0], rvo_apex[1], vo_apex[0], vo_apex[1]) radians_rv = atan2(rvo_apex[1] - vo_apex[1], rvo_apex[0] - vo_apex[0]) diff = line_left_ori - radians_rv temp = pi - 2 * half_angle if temp == 0: temp = temp + 0.01 dis_diff = dis_rv * sin(diff) / sin(temp) if reciprocal_vel_obs.cross_product(cl_vector, cur_v) <= 0: hrvo_apex = [ rvo_apex[0] - dis_diff * cos(line_right_ori), rvo_apex[1] - dis_diff * sin(line_right_ori), ] else: hrvo_apex = [ vo_apex[0] + dis_diff * cos(line_right_ori), vo_apex[1] + dis_diff * sin(line_right_ori), ] return [hrvo_apex, line_left_vector, line_right_vector] elif mode == "sta_circular": return [vo_apex, line_left_vector, line_right_vector]
[docs] def config_vo(self): vo_list = [] for obstacle in self.obs_state_list: vo = self.config_vo_mode(obstacle) vo_list.append(vo) return vo_list
[docs] def config_vo_mode(self, obstacle): x = self.state[0] y = self.state[1] vx = self.state[2] vy = self.state[3] r = self.state[4] if vx == 0 and vy == 0: mode = "sta_circular" else: mode = "moving" if mode == "moving": mx = obstacle[0] my = obstacle[1] mvx = obstacle[2] mvy = obstacle[3] mr = obstacle[4] elif mode == "sta_circular": mx = obstacle[0] my = obstacle[1] mvx = 0 mvy = 0 mr = obstacle[2] + 0.2 else: print("wrong obstacle mode") vo_apex = [mvx, mvy] dis_mr = np.sqrt((my - y) ** 2 + (mx - x) ** 2) angle_mr = atan2(my - y, mx - x) if dis_mr < r + mr: dis_mr = r + mr ratio = (r + mr) / dis_mr if ratio > 1: ratio = 1 if ratio < -1: ratio = -1 half_angle = asin(ratio) line_left_ori = angle_mr + half_angle line_right_ori = angle_mr - half_angle line_left_vector = [cos(line_left_ori), sin(line_left_ori)] line_right_vector = [cos(line_right_ori), sin(line_right_ori)] return [vo_apex, line_left_vector, line_right_vector]
[docs] def vel_candidate(self, rvo_list): vo_outside = [] vo_inside = [] cur_vx = self.state[2] cur_vy = self.state[3] cur_vx_min = max((cur_vx - self.acce), -self.vxmax) cur_vx_max = min((cur_vx + self.acce), self.vxmax) cur_vy_min = max((cur_vy - self.acce), -self.vymax) cur_vy_max = min((cur_vy + self.acce), self.vymax) for new_vx in np.arange(cur_vx_min, cur_vx_max, 0.05): for new_vy in np.arange(cur_vy_min, cur_vy_max, 0.05): if self.vo_out(new_vx, new_vy, rvo_list): vo_outside.append([new_vx, new_vy]) else: vo_inside.append([new_vx, new_vy]) return vo_outside, vo_inside
[docs] def vo_out(self, vx, vy, rvo_list): for rvo in rvo_list: rel_vx = vx - rvo[0][0] rel_vy = vy - rvo[0][1] # rel_radians = atan2(rel_vy, rel_vx) rel_vector = [rel_vx, rel_vy] if reciprocal_vel_obs.between_vector(rvo[1], rvo[2], rel_vector): return False return True
[docs] def vel_select(self, vo_outside, vo_inside): vel_des = [self.state[5], self.state[6]] if len(vo_outside) != 0: return min( vo_outside, key=lambda v: dist_hypot(v[0], v[1], vel_des[0], vel_des[1]) ) else: return min(vo_inside, key=lambda v: self.penalty(v, vel_des, self.factor))
[docs] def penalty(self, vel, vel_des, factor): tc_list = [] for moving in self.obs_state_list: distance = dist_hypot( moving[0], moving[1], self.state[0], self.state[1] ) diff = distance**2 - (self.state[4] + moving[4]) ** 2 if diff < 0: diff = 0 dis_vel = np.sqrt(diff) vel_trans = [ 2 * vel[0] - self.state[2] - moving[2], 2 * vel[1] - self.state[3] - moving[3], ] vel_trans_speed = np.sqrt(vel_trans[0] ** 2 + vel_trans[1] ** 2) + 1e-7 tc = dis_vel / vel_trans_speed tc_list.append(tc) tc_min = min(tc_list) if tc_min == 0: tc_min = 0.0001 penalty_vel = factor * (1 / tc_min) + dist_hypot(vel_des[0], vel_des[1], vel[0], vel[1]) return penalty_vel
# judge the direction by vector
[docs] @staticmethod def between_vector(line_left_vector, line_right_vector, line_vector): if ( reciprocal_vel_obs.cross_product(line_left_vector, line_vector) <= 0 and reciprocal_vel_obs.cross_product(line_right_vector, line_vector) >= 0 ): return True else: return False
[docs] @staticmethod def cross_product(vector1, vector2): return float(vector1[0] * vector2[1] - vector2[0] * vector1[1])