from math import cos, sin
from typing import Any
import numpy as np
from irsim.lib import reciprocal_vel_obs, register_behavior, social_force_model
from irsim.util.util import WrapToPi, omni_to_diff, relative_position
"""
Behavior Methods Module
This module contains behavior functions for different types of robots in the IR-Sim environment.
It provides implementations for:
1. Registered behavior functions (decorated with @register_behavior):
- beh_diff_rvo: Differential drive robot with RVO behavior
- beh_diff_dash: Differential drive robot with dash-to-goal behavior
- beh_diff_sfm: Differential drive robot with Social Force Model behavior
- beh_omni_dash: Omnidirectional robot with dash-to-goal behavior
- beh_omni_rvo: Omnidirectional robot with RVO behavior
- beh_omni_sfm: Omnidirectional robot with Social Force Model behavior
- beh_acker_dash: Ackermann steering robot with dash-to-goal behavior
2. Core behavior calculation functions:
- OmniRVO: Omnidirectional RVO velocity calculation
- DiffRVO: Differential drive RVO velocity calculation
- OmniDash: Omnidirectional dash-to-goal velocity calculation
- DiffDash: Differential drive dash-to-goal velocity calculation
- AckerDash: Ackermann steering dash-to-goal velocity calculation
- SFMVelocity: Social Force Model velocity calculation
These functions are designed to be used with the robot behavior system to control
robot movements in various scenarios including collision avoidance (RVO) and
goal-reaching behaviors (dash).
See :mod:`irsim.lib.algorithm.social_force_model` for the SFM algorithmic
details.
"""
[docs]
@register_behavior("diff", "rvo")
def beh_diff_rvo(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for differential drive robot using RVO (Reciprocal Velocity Obstacles).
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: Additional keyword arguments:
- vxmax (float): Maximum x velocity, default 1.5.
- vymax (float): Maximum y velocity, default 1.5.
- acce (float): Acceleration factor, default 1.0.
- factor (float): Additional scaling factor, default 1.0.
- mode (str): RVO calculation mode, default "rvo".
- neighbor_threshold (float): Neighbor threshold distance, default 10.0.
Returns:
np.array: Velocity [linear, angular] (2x1) for differential drive.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This rvo behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
rvo_neighbor = []
line_segments = []
for obj in external_objects:
segs = obj.rvo_line_segments
if segs:
line_segments.extend(segs)
else:
rvo_neighbor.append(obj.rvo_neighbor_state)
rvo_state = ego_object.rvo_state
vxmax = kwargs.get("vxmax", 1.5)
vymax = kwargs.get("vymax", 1.5)
acce = kwargs.get("acce", 1.0)
factor = kwargs.get("factor", 1.0)
mode = kwargs.get("mode", "rvo")
neighbor_threshold = kwargs.get("neighbor_threshold", 3.0)
return DiffRVO(
rvo_state,
rvo_neighbor,
vxmax,
vymax,
acce,
factor,
mode,
neighbor_threshold,
line_segments=line_segments,
)
[docs]
@register_behavior("diff", "dash")
def beh_diff_dash(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for differential drive robot using dash-to-goal behavior.
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: Additional keyword arguments:
- angle_tolerance (float): Allowable angular deviation, default 0.1.
Returns:
np.array: Velocity [linear, angular] (2x1) for differential drive.
"""
state = ego_object.state
goal = ego_object.goal
goal_threshold = ego_object.goal_threshold
_, max_vel = ego_object.get_vel_range()
angle_tolerance = kwargs.get("angle_tolerance", 0.1)
if goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This dash behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
return DiffDash(state, goal, max_vel, goal_threshold, angle_tolerance)
[docs]
@register_behavior("omni", "dash")
def beh_omni_dash(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for omnidirectional robot using dash-to-goal behavior.
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: Additional keyword arguments (currently unused).
Returns:
np.array: Velocity [vx, vy] (2x1) for omnidirectional drive.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This dash behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
state = ego_object.state
goal = ego_object.goal
goal_threshold = ego_object.goal_threshold
_, max_vel = ego_object.get_vel_range()
return OmniDash(state, goal, max_vel, goal_threshold)
[docs]
@register_behavior("omni", "rvo")
def beh_omni_rvo(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for omnidirectional robot using RVO (Reciprocal Velocity Obstacles).
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: Additional keyword arguments:
- vxmax (float): Maximum x velocity, default 1.5.
- vymax (float): Maximum y velocity, default 1.5.
- acce (float): Acceleration factor, default 1.0.
- factor (float): Additional scaling factor, default 1.0.
- mode (str): RVO calculation mode, default "rvo".
- neighbor_threshold (float): Neighbor threshold distance, default 3.0.
Returns:
np.array: Velocity [vx, vy] (2x1) for omnidirectional drive.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This rvo behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
rvo_neighbor = []
line_segments = []
for obj in external_objects:
segs = obj.rvo_line_segments
if segs:
line_segments.extend(segs)
else:
rvo_neighbor.append(obj.rvo_neighbor_state)
rvo_state = ego_object.rvo_state
vxmax = kwargs.get("vxmax", 1.5)
vymax = kwargs.get("vymax", 1.5)
acce = kwargs.get("acce", 1.0)
factor = kwargs.get("factor", 1.0)
mode = kwargs.get("mode", "rvo")
neighbor_threshold = kwargs.get("neighbor_threshold", 3.0)
return OmniRVO(
rvo_state,
rvo_neighbor,
vxmax,
vymax,
acce,
factor,
mode,
neighbor_threshold,
line_segments=line_segments,
)
[docs]
@register_behavior("diff", "sfm")
def beh_diff_sfm(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for differential drive robot using the Social Force Model.
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: SFM tuning parameters; see :func:`SFMVelocity`.
Returns:
np.array: Velocity [linear, angular] (2x1) for differential drive.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This sfm behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
neighbors = []
line_segments = []
for obj in external_objects:
segs = obj.rvo_line_segments
if segs:
line_segments.extend(segs)
else:
neighbors.append(obj.rvo_neighbor_state)
vx, vy = SFMVelocity(
ego_object.rvo_state,
neighbors,
line_segments,
step_time=ego_object._world_param.step_time,
**kwargs,
)
# SFM produces a holonomic ``(vx, vy)``; track it tightly so the small
# sideways component from anisotropic social/obstacle forces doesn't
# get clipped by ``omni_to_diff``'s default deadband.
_, vmax_pair = ego_object.get_vel_range()
w_max = float(vmax_pair[1, 0])
return omni_to_diff(
ego_object.rvo_state[-1],
[vx, vy],
w_max=w_max,
guarantee_time=ego_object._world_param.step_time,
tolerance=0.02,
)
[docs]
@register_behavior("omni", "sfm")
def beh_omni_sfm(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for omnidirectional robot using the Social Force Model.
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: SFM tuning parameters; see :func:`SFMVelocity`.
Returns:
np.array: Velocity [vx, vy] (2x1) for omnidirectional drive.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This sfm behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
neighbors = []
line_segments = []
for obj in external_objects:
segs = obj.rvo_line_segments
if segs:
line_segments.extend(segs)
else:
neighbors.append(obj.rvo_neighbor_state)
vx, vy = SFMVelocity(
ego_object.rvo_state,
neighbors,
line_segments,
step_time=ego_object._world_param.step_time,
**kwargs,
)
return np.array([[vx], [vy]])
[docs]
@register_behavior("omni_angular", "dash")
def beh_omni_angular_dash(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for omnidirectional-angular robot using dash-to-goal behavior.
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: Additional keyword arguments:
- angle_tolerance (float): Allowable angular deviation, default 0.1.
Returns:
np.array: Velocity [forward, lateral, yaw_rate] (3x1) in body frame.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This dash behavior is waiting for goal configuration"
)
return np.zeros((3, 1))
state = ego_object.state
goal = ego_object.goal
goal_threshold = ego_object.goal_threshold
_, max_vel = ego_object.get_vel_range()
angle_tolerance = kwargs.get("angle_tolerance", 0.1)
return OmniAngularDash(state, goal, max_vel, goal_threshold, angle_tolerance)
[docs]
@register_behavior("acker", "dash")
def beh_acker_dash(
ego_object: Any, external_objects: list[Any], **kwargs: Any
) -> np.ndarray:
"""
Behavior function for Ackermann steering robot using dash-to-goal behavior.
Args:
ego_object: The ego robot object.
external_objects (list): List of external objects in the environment.
**kwargs: Additional keyword arguments:
- angle_tolerance (float): Allowable angular deviation, default 0.1.
Returns:
np.array: Velocity [linear, steering angle] (2x1) for Ackermann drive.
"""
if ego_object.goal is None:
if ego_object._world_param.count % 10 == 0:
ego_object.logger.warning(
"Goal is currently None. This dash behavior is waiting for goal configuration"
)
return np.zeros((2, 1))
state = ego_object.state
goal = ego_object.goal
goal_threshold = ego_object.goal_threshold
_, max_vel = ego_object.get_vel_range()
angle_tolerance = kwargs.get("angle_tolerance", 0.1)
return AckerDash(state, goal, max_vel, goal_threshold, angle_tolerance)
[docs]
def SFMVelocity(
state_tuple: Any,
neighbor_list: list[Any] | None = None,
line_segments: list[list[float]] | None = None,
vmax: float = 1.5,
step_time: float = 0.1,
neighbor_threshold: float = 10.0,
relaxation_time: float = 0.5,
force_factor_desired: float = 1.0,
force_factor_social: float = 2.1,
force_factor_obstacle: float = 10.0,
sigma_obstacle: float = 0.8,
lambda_importance: float = 2.0,
gamma: float = 0.35,
n_angular: float = 2.0,
n_velocity: float = 3.0,
safety_radius: float = 0.0,
**_: Any,
) -> tuple[float, float]:
"""
Compute the next world-frame velocity using the Social Force Model.
Anisotropic Moussaid-Helbing (2009) variant.
Args:
state_tuple: Full RVO-style state
``[x, y, vx, vy, radius, vx_des, vy_des, theta]``.
neighbor_list: Neighbour states ``[[x, y, vx, vy, radius], ...]``.
line_segments: Line obstacles ``[[x1, y1, x2, y2], ...]``.
vmax: Speed cap.
step_time: Integration step.
neighbor_threshold: Spatial cutoff for social interactions.
relaxation_time, force_factor_*, sigma_obstacle, lambda_importance,
gamma, n_angular, n_velocity: SFM tuning parameters
(see :class:`social_force_model`).
Returns:
tuple[float, float]: Updated world-frame velocity ``(vx, vy)``.
"""
if neighbor_list is None:
neighbor_list = []
if line_segments is None:
line_segments = []
x, y = state_tuple[0], state_tuple[1]
filtered_neighbors = [
nb
for nb in neighbor_list
if (x - nb[0]) ** 2 + (y - nb[1]) ** 2 < neighbor_threshold**2
]
# ``rvo_state`` provides ``(vx_des, vy_des)`` with components scaled by
# the kinematics-specific ``vel_max`` (anisotropic for diff/acker).
# SFM expects ``desired_direction * vmax``, so renormalise here.
state_tuple = list(state_tuple)
vx_des, vy_des = state_tuple[5], state_tuple[6]
norm = (vx_des * vx_des + vy_des * vy_des) ** 0.5
if norm > 1e-9:
state_tuple[5] = vmax * vx_des / norm
state_tuple[6] = vmax * vy_des / norm
sfm = social_force_model(
state=state_tuple,
neighbor_list=filtered_neighbors,
line_obs_list=line_segments,
vmax=vmax,
step_time=step_time,
relaxation_time=relaxation_time,
force_factor_desired=force_factor_desired,
force_factor_social=force_factor_social,
force_factor_obstacle=force_factor_obstacle,
sigma_obstacle=sigma_obstacle,
lambda_importance=lambda_importance,
gamma=gamma,
n_angular=n_angular,
n_velocity=n_velocity,
neighbor_range=neighbor_threshold,
safety_radius=safety_radius,
)
vx, vy = sfm.cal_vel()
return vx, vy
[docs]
def OmniRVO(
state_tuple: Any,
neighbor_list: list[Any] | None = None,
vxmax: float = 1.5,
vymax: float = 1.5,
acce: float = 1,
factor: float = 1.0,
mode: str = "rvo",
neighbor_threshold: float = 3.0,
line_segments: list[list[float]] | None = None,
) -> np.ndarray:
"""
Calculate the omnidirectional velocity using RVO.
Args:
state_tuple (tuple): Current state and orientation.
neighbor_list (list): List of neighboring agents (default None).
vxmax (float): Maximum x velocity (default 1.5).
vymax (float): Maximum y velocity (default 1.5).
acce (float): Acceleration factor (default 1).
factor (float): Additional scaling factor (default 1.0).
mode (str): RVO calculation mode (default "rvo").
neighbor_threshold (float): Neighbor threshold (default 3.0).
line_segments (list): Line segments [[x1, y1, x2, y2], ...] (default None).
Returns:
np.array: Velocity [vx, vy] (2x1).
"""
if neighbor_list is None:
neighbor_list = []
x, y = state_tuple[0], state_tuple[1]
filtered_neighbor_list = [
neighbor
for neighbor in neighbor_list
if (x - neighbor[0]) ** 2 + (y - neighbor[1]) ** 2 < neighbor_threshold**2
]
rvo_behavior = reciprocal_vel_obs(
state_tuple,
filtered_neighbor_list,
vxmax,
vymax,
acce,
factor,
line_obs_list=line_segments,
)
rvo_vel = rvo_behavior.cal_vel(mode)
return np.array([[rvo_vel[0]], [rvo_vel[1]]])
[docs]
def DiffRVO(
state_tuple: Any,
neighbor_list: list[Any] | None = None,
vxmax: float = 1.5,
vymax: float = 1.5,
acce: float = 1,
factor: float = 1.0,
mode: str = "rvo",
neighbor_threshold: float = 3.0,
line_segments: list[list[float]] | None = None,
) -> np.ndarray:
"""
Calculate the differential drive velocity using RVO.
Args:
state_tuple (tuple): Current state and orientation.
neighbor_list (list): List of neighboring agents (default None).
vxmax (float): Maximum x velocity (default 1.5).
vymax (float): Maximum y velocity (default 1.5).
acce (float): Acceleration factor (default 1).
factor (float): Additional scaling factor (default 1.0).
mode (str): RVO calculation mode (default "rvo").
neighbor_threshold (float): Neighbor threshold (default 3.0).
line_segments (list): Line segments [[x1, y1, x2, y2], ...] (default None).
Returns:
np.array: Velocity [linear, angular] (2x1).
"""
if neighbor_list is None:
neighbor_list = []
x, y = state_tuple[0], state_tuple[1]
filtered_neighbor_list = [
neighbor
for neighbor in neighbor_list
if (x - neighbor[0]) ** 2 + (y - neighbor[1]) ** 2 < neighbor_threshold**2
]
rvo_behavior = reciprocal_vel_obs(
state_tuple,
filtered_neighbor_list,
vxmax,
vymax,
acce,
factor,
line_obs_list=line_segments,
)
rvo_vel = rvo_behavior.cal_vel(mode)
return omni_to_diff(state_tuple[-1], rvo_vel)
[docs]
def OmniDash(
state: np.ndarray,
goal: np.ndarray,
max_vel: np.ndarray,
goal_threshold: float = 0.1,
) -> np.ndarray:
"""
Calculate the body-frame velocity to reach a goal.
Args:
state (np.array): Current state [x, y, theta] (3x1 or 2x1).
goal (np.array): Goal position [x, y] (2x1).
max_vel (np.array): Maximum velocity [forward, lateral] (2x1).
goal_threshold (float): Distance threshold to consider goal reached (default 0.1).
Returns:
np.array: Body-frame velocity [forward, lateral] (2x1).
"""
distance, radian = relative_position(state, goal)
theta = state[2, 0] if state.shape[0] > 2 else 0.0
if distance > goal_threshold:
diff_angle = WrapToPi(radian - theta)
forward = max_vel[0, 0] * cos(diff_angle)
lateral = max_vel[1, 0] * sin(diff_angle)
else:
forward = 0
lateral = 0
return np.array([[forward], [lateral]])
[docs]
def OmniAngularDash(
state: np.ndarray,
goal: np.ndarray,
max_vel: np.ndarray,
goal_threshold: float = 0.3,
angle_tolerance: float = 0.1,
) -> np.ndarray:
"""
Calculate body-frame velocity to reach a goal.
Drives forward and strafes laterally toward the goal while turning
to face it. After arriving at the goal position, rotates in place
to match the goal orientation.
Args:
state (np.array): Current state [x, y, theta] (3x1).
goal (np.array): Goal state [x, y, theta] (3x1).
max_vel (np.array): Maximum velocity [forward, lateral, yaw_rate] (3x1).
goal_threshold (float): Distance threshold to consider goal reached (default 0.3).
angle_tolerance (float): Allowable angular deviation (default 0.1).
Returns:
np.array: Body-frame velocity [forward, lateral, yaw_rate] (3x1).
"""
distance, radian = relative_position(state, goal)
theta = state[2, 0]
if distance > goal_threshold:
diff_angle = WrapToPi(radian - theta)
forward = max_vel[0, 0] * cos(diff_angle)
lateral = max_vel[1, 0] * sin(diff_angle)
target_angle = radian
else:
forward = 0
lateral = 0
target_angle = goal[2, 0] if goal.shape[0] > 2 else theta
diff_angle = WrapToPi(target_angle - theta)
if abs(diff_angle) > angle_tolerance:
yaw_rate = max_vel[2, 0] * np.sign(diff_angle)
else:
yaw_rate = 0
return np.array([[forward], [lateral], [yaw_rate]])
[docs]
def DiffDash(
state: np.ndarray,
goal: np.ndarray,
max_vel: np.ndarray,
goal_threshold: float = 0.1,
angle_tolerance: float = 0.2,
) -> np.ndarray:
"""
Calculate the differential drive velocity to reach a goal.
Args:
state (np.array): Current state [x, y, theta] (3x1).
goal (np.array): Goal position [x, y] (2x1).
max_vel (np.array): Maximum velocity [linear, angular] (2x1).
goal_threshold (float): Distance threshold to consider goal reached (default 0.1).
angle_tolerance (float): Allowable angular deviation (default 0.2).
Returns:
np.array: Velocity [linear, angular] (2x1).
"""
distance, radian = relative_position(state, goal)
if distance < goal_threshold:
return np.zeros((2, 1))
diff_radian = WrapToPi(radian - state[2, 0])
linear = max_vel[0, 0] * np.cos(diff_radian)
if abs(diff_radian) < angle_tolerance:
angular = 0
else:
angular = max_vel[1, 0] * np.sign(diff_radian)
return np.array([[linear], [angular]])
[docs]
def AckerDash(
state: np.ndarray,
goal: np.ndarray,
max_vel: np.ndarray,
goal_threshold: float,
angle_tolerance: float,
) -> np.ndarray:
"""
Calculate the Ackermann steering velocity to reach a goal.
Args:
state (np.array): Current state [x, y, theta] (3x1).
goal (np.array): Goal position [x, y] (2x1).
max_vel (np.array): Maximum velocity [linear, steering angle] (2x1).
goal_threshold (float): Distance threshold to consider goal reached.
angle_tolerance (float): Allowable angular deviation.
Returns:
np.array: Velocity [linear, steering angle] (2x1).
"""
dis, radian = relative_position(state, goal)
steer_opt = 0.0
diff_radian = WrapToPi(radian - state[2, 0])
if diff_radian > -angle_tolerance and diff_radian < angle_tolerance:
diff_radian = 0
if dis < goal_threshold:
v_opt, steer_opt = 0, 0
else:
v_opt = max_vel[0, 0]
steer_opt = np.clip(diff_radian, -max_vel[1, 0], max_vel[1, 0])
return np.array([[v_opt], [steer_opt]])