Source code for irsim.world.object_base

import itertools
import math
from collections import deque
from dataclasses import dataclass
from math import cos, pi, sin
from typing import Any, ClassVar

import matplotlib.transforms as mtransforms
import numpy as np
import shapely
from matplotlib import image
from matplotlib.patches import Arrow, Circle, Wedge
from mpl_toolkits.mplot3d import Axes3D
from shapely.geometry.base import BaseGeometry

from irsim.config.path_param import path_manager
from irsim.env.env_plot import draw_patch, linewidth_from_data_units, set_patch_property
from irsim.lib import Behavior, GeometryFactory, KinematicsFactory
from irsim.util.util import (
    WrapTo2Pi,
    WrapToPi,
    WrapToRegion,
    check_unknown_kwargs,
    file_check,
    is_2d_list,
    random_point_range,
    relative_position,
    to_numpy,
    vertices_transform,
)
from irsim.world.sensors.sensor_factory import SensorFactory


[docs] @dataclass class ObjectInfo: id: int shape: str kinematics: str role: str color: str static: bool goal: np.ndarray vel_min: np.ndarray vel_max: np.ndarray acce: np.ndarray angle_range: np.ndarray goal_threshold: float wheelbase: float G: np.ndarray h: np.ndarray cone_type: str convex_flag: bool name: str
[docs] def add_property(self, key, value): setattr(self, key, value)
[docs] @dataclass class ObstacleInfo: center: np.ndarray vertex: np.ndarray velocity: np.ndarray radius: float G: np.ndarray h: np.ndarray cone_type: str convex_flag: bool
[docs] def add_property(self, key, value): setattr(self, key, value)
[docs] class ObjectBase: """ Base class representing a generic object in the robot simulator. This class encapsulates common attributes and behaviors for all objects, including robots and obstacles, managing their state, velocity, goals, and kinematics. Args: shape (dict): Parameters defining the shape of the object for geometry creation. The dictionary should contain keys and values required by the GeometryFactory to create the object's geometry, such as 'type' (e.g., 'circle', 'rectangle') and associated parameters. Defaults to None. kinematics (dict): Parameters defining the kinematics of the object. Includes kinematic model and any necessary parameters. If None, no kinematics model is applied. Defaults to None. state (list of float): Initial state vector [x, y, theta, ...]. The state can have more dimensions depending on `state_dim`. Excess dimensions are truncated, and missing dimensions are filled with zeros. Defaults to [0, 0, 0]. velocity (list of float): Initial velocity vector [vx, vy] or according to the kinematics model. Defaults to [0, 0]. goal (list of float or list of list of float): Goal state vector [x, y, theta, ...] or [[x, y, theta], [x, y, theta], ...] for multiple goals Used by behaviors to determine the desired movement. Defaults to [10, 10, 0]. role (str): Role of the object in the simulation, e.g., "robot" or "obstacle". Defaults to "obstacle". color (str): Color of the object when plotted. Defaults to "k" (black). static (bool): Indicates if the object is static (does not move). Defaults to False. vel_min (list of float): Minimum velocity limits for each control dimension. Used to constrain the object's velocity. Defaults to [-1, -1]. vel_max (list of float): Maximum velocity limits for each control dimension. Used to constrain the object's velocity. Defaults to [1, 1]. acce (list of float): Acceleration limits, specifying the maximum change in velocity per time step. Defaults to [inf, inf]. angle_range (list of float): Allowed range of orientation angles [min, max] in radians. The object's orientation will be wrapped within this range. Defaults to [-pi, pi]. behavior (dict or str): Behavioral mode or configuration of the object. Can be a behavior name (str) or a dictionary with behavior parameters. If None, default behavior is applied. Defaults to {'name': 'dash'}, moving to the goal directly. group_behavior (dict): Shared behavior defaults for objects in the same group. When an object's own behavior configuration is empty or missing, the group behavior will be used as a fallback and exposed via `beh_config`. goal_threshold (float): Threshold distance to determine if the object has reached its goal. When the object is within this distance to the goal, it's considered to have arrived. Defaults to 0.1. sensors (list of dict): List of sensor configurations attached to the object. Each sensor configuration is a dictionary specifying sensor type and parameters. Defaults to None. arrive_mode (str): Mode for arrival detection, either "position" or "state". Determines how arrival at the goal is evaluated. Defaults to "position". description (str): Description or label for the object. Can be used for identification or attaching images in plotting. Defaults to None. group (int): Group identifier for organizational purposes, allowing objects to be grouped. Defaults to 0. state_dim (int): Dimension of the state vector. If None, it is inferred from the class attribute `state_shape`. Defaults to None. vel_dim (int): Dimension of the velocity vector. If None, it is inferred from the class attribute `vel_shape`. Defaults to None. unobstructed (bool): Indicates if the object should be considered to have an unobstructed path, ignoring obstacles in certain scenarios. Defaults to False. fov (float): Field of view angles in radians for the object's sensors. Defaults to None. If set lidar, the default value is angle range of lidar. fov_radius (float): Field of view radius for the object's sensors. Defaults to None. If set lidar, the default value is range_max of lidar. **kwargs: Additional keyword arguments for extended functionality. - plot (dict): Plotting options for the object. May include 'show_goal', 'show_text', 'show_arrow', 'show_uncertainty', 'show_trajectory', 'trail_freq', etc. Raises: ValueError: If dimension parameters do not match the provided shapes or if input parameters are invalid. Attributes: state_dim (int): Dimension of the state vector. state_shape (tuple): Shape of the state array. vel_dim (int): Dimension of the velocity vector. vel_shape (tuple): Shape of the velocity array. state (np.ndarray): Current state of the object. _init_state (np.ndarray): Initial state of the object. _velocity (np.ndarray): Current velocity of the object. _init_velocity (np.ndarray): Initial velocity of the object. _goal (np.ndarray): Goal state of the object. _init_goal (np.ndarray): Initial goal state of the object. _geometry (any): Geometry representation of the object. group (int): Group identifier for the object. stop_flag (bool): Flag indicating if the object should stop. arrive_flag (bool): Flag indicating if the object has arrived at the goal. collision_flag (bool): Flag indicating a collision has occurred. unobstructed (bool): Indicates if the object has an unobstructed path. static (bool): Indicates if the object is static. vel_min (np.ndarray): Minimum velocity limits. vel_max (np.ndarray): Maximum velocity limits. color (str): Color of the object. role (str): Role of the object (e.g., "robot", "obstacle"). info (ObjectInfo): Information container for the object. wheelbase (float): Distance between the front and rear wheels. Specified for ackermann robots. fov (float): Field of view angles in radians. fov_radius (float): Field of view radius. """ id_iter = itertools.count() vel_shape = (2, 1) state_shape = (3, 1) _VALID_PARAMS: ClassVar[set[str]] = { "shape", "kinematics", "state", "velocity", "goal", "role", "color", "static", "vel_min", "vel_max", "acce", "angle_range", "behavior", "group_behavior", "goal_threshold", "sensors", "arrive_mode", "description", "group", "group_name", "state_dim", "vel_dim", "unobstructed", "fov", "fov_radius", "name", "plot", # consumed by ObjectFactory before reaching __init__ "number", "distribution", } def __init__( self, shape: dict | None = None, kinematics: dict | None = None, state: list | None = None, velocity: list | None = None, goal: list | None = None, role: str = "obstacle", color: str = "k", static: bool = False, vel_min: list | None = None, vel_max: list | None = None, acce: list | None = None, angle_range: list | None = None, behavior: dict | None = None, group_behavior: dict | None = None, goal_threshold: float = 0.1, sensors: dict | None = None, arrive_mode: str = "position", description: str | None = None, group: int = 0, group_name: str | None = None, state_dim: int | None = None, vel_dim: int | None = None, unobstructed: bool = False, fov: float | None = None, fov_radius: float | None = None, name: str | None = None, **kwargs, ) -> None: """ Initialize an ObjectBase instance. This method sets up a new ObjectBase object with the specified parameters, initializing its geometry, kinematics, behaviors, sensors, and other properties relevant to simulation. The initialization process includes: - Setting up geometry handlers and collision detection - Configuring kinematics models for movement - Initializing state vectors and goal management - Setting up behaviors and sensor systems - Configuring visualization and plotting options Note: All parameters are documented in the class docstring above. Refer to the :py:class:`ObjectBase` class documentation for detailed parameter descriptions. Raises: ValueError: If dimension parameters do not match the provided shapes or if input parameters are invalid. """ # --- 1. Identity --- self._env = None self._id = next(ObjectBase.id_iter) self._name = name self.role = role self.group = group self._group_name = group_name self.description = description self.color = color # --- 2. Geometry & kinematics handlers --- if shape is None: self.logger.warning( f"No shape provided for object {self._id}, using default circle" ) shape = {"name": "circle", "radius": 1, "center": [0, 0]} self.gf = GeometryFactory.create_geometry(**shape) self.kf = ( KinematicsFactory.create_kinematics( wheelbase=self.wheelbase, role=role, **kinematics ) if kinematics is not None else None ) if self.gf is not None: self.G, self.h, self.cone_type, self.convex_flag = self.gf.get_init_Gh() else: self.G, self.h, self.cone_type, self.convex_flag = None, None, None, None # --- 3. Dimensions (derived from handlers) --- action_dim = self.kf.action_dim if self.kf else 2 self.state_dim = state_dim if state_dim is not None else self.state_shape[0] self.state_shape = ( (self.state_dim, 1) if state_dim is not None else self.state_shape ) self.vel_dim = vel_dim if vel_dim is not None else action_dim self.vel_shape = (self.vel_dim, 1) # --- 4. Resolve defaults from kf --- if angle_range is None: angle_range = [-pi, pi] if self.kf is not None: acce = self.kf.acce if acce is None else acce vel_max = self.kf.vel_max if vel_max is None else vel_max vel_min = self.kf.vel_min if vel_min is None else vel_min else: acce = acce or [float("inf"), float("inf")] vel_max = vel_max or [1, 1] vel_min = vel_min or [-1, -1] # --- 5. State & velocity --- if state is None: state = [0, 0, 0] if velocity is None: velocity = [0] * action_dim state = self.input_state_check(state, self.state_dim) self._state = np.c_[state] self._init_state = np.c_[state] self._velocity = np.c_[velocity] self._init_velocity = np.c_[velocity] self.vel_min = np.c_[vel_min] self.vel_max = np.c_[vel_max] self.static = static if self.kf is not None else True # --- 6. Goal --- self._goal = ( deque(goal) if goal is not None and is_2d_list(goal) else deque([goal]) if goal is not None else None ) self._goal_vertices = ( vertices_transform(self.original_vertices, self.goal) if self.goal is not None else None ) self._init_goal = self._goal.copy() if self._goal is not None else None self._init_goal_vertices = ( self._goal_vertices.copy() if self._goal_vertices is not None else None ) self.goal_threshold = goal_threshold self.arrive_mode = arrive_mode # --- 7. Geometry instance --- self._geometry = self.gf.step(self.state) if self.gf is not None else None self._geometry_valid = ( shapely.is_valid(self._geometry) if self._geometry is not None else False ) # --- 8. ObjectInfo --- self.info = ObjectInfo( self._id, self.shape, self.kinematics, role, color, static, np.c_[goal], self.vel_min, self.vel_max, np.c_[acce], np.c_[angle_range], goal_threshold, self.wheelbase, self.G, self.h, self.cone_type, self.convex_flag, self.name, ) self.obstacle_info = None self.trajectory = [] # --- 9. Sensors --- sf = SensorFactory() self.lidar = None if sensors is not None: self.sensors = [ sf.create_sensor(self._state[0:3], self._id, **sensor_kwargs) for sensor_kwargs in sensors ] for sensor in self.sensors: sensor.parent = self self.lidar = next( ( sensor for sensor in self.sensors if sensor.sensor_type in {"lidar2d", "fmcw_lidar2d"} ), None, ) else: self.sensors = [] if fov is None: self.fov = ( WrapTo2Pi(self.lidar.angle_range) if self.lidar is not None else None ) self.fov_radius = self.lidar.range_max if self.lidar is not None else None else: self.fov = WrapTo2Pi(fov) self.fov_radius = fov_radius # --- 10. Behavior --- self.obj_behavior = Behavior(self.info, behavior) self.group_behavior_dict = group_behavior if group_behavior is not None else {} self.rl = self.beh_config.get("range_low", [0, 0, -pi]) self.rh = self.beh_config.get("range_high", [10, 10, pi]) self.wander = self.beh_config.get("wander", False) self.loop = self.beh_config.get("loop", False) if self.wander and self.loop: self.logger.warning( f"Object {self.id}: Both 'wander' and 'loop' are enabled. " "'wander' takes priority, 'loop' will be disabled." ) self.loop = False if self.wander: self._goal = deque([random_point_range(self.rl, self.rh)]) # --- 11. Flags --- self.stop_flag = False self.arrive_flag = False self.collision_flag = False self.unobstructed = unobstructed # --- 12. Plot state --- self.plot_kwargs = kwargs.get("plot", {}) self.plot_patch_list = [] self.plot_line_list = [] self.plot_text_list = [] self._custom_text: str | None = None self._custom_goal_text: str | None = None self.collision_obj = [] self.plot_trail_list = [] # --- 13. Validate kwargs --- check_unknown_kwargs(kwargs, self._VALID_PARAMS, context=f" in '{role}' config") def __eq__(self, o: "ObjectBase") -> bool: if isinstance(o, ObjectBase): return self._id == o._id return False def __hash__(self) -> int: return self._id def __str__(self) -> str: return f"ObjectBase: {self._id}"
[docs] @classmethod def reset_id_iter(cls, start: int = 0, step: int = 1): """reset the id iterator""" cls.id_iter = itertools.count(start, step)
[docs] def step( self, velocity: np.ndarray | None = None, sensor_step: bool = True, **kwargs: Any, ): """ Perform a single simulation step, updating the object's state and sensors. This method advances the object by one time step, integrating the given velocity or behavior-generated velocity to update the object's position, orientation, and other state variables. It also updates sensors and checks for collisions. Args: velocity (np.ndarray, optional): Desired velocity for this step. If None, the object will use its behavior system to generate velocity. The shape and meaning depend on the kinematics model: - Differential: [linear_velocity, angular_velocity] - Omnidirectional: [velocity_x, velocity_y] - Ackermann: [linear_velocity, steering_angle] **kwargs: Additional parameters passed to behavior generation and processing. Returns: np.ndarray: The updated state vector of the object after the step. Returns the current state unchanged if the object is static or stopped. Note: - Static objects (static=True) will not move and return their current state - Objects with stop_flag=True will halt and return their current state - The method automatically handles sensor updates and trajectory recording """ if self.static or self.stop_flag: self._velocity = np.zeros(self.vel_shape) # velocity just got zeroed — invalidate per-tick caches so # other agents don't perceive a stopped object as still moving. self._invalidate_reactive_cache() return self.state self.pre_process() behavior_vel = self.gen_behavior_vel(velocity) new_state = self.kf.step(self.state, behavior_vel, self._world_param.step_time) next_state = self.mid_process(new_state) self._state = next_state self._velocity = behavior_vel self._geometry = self.gf.step(self.state) self._geometry_valid = shapely.is_valid(self._geometry) # state/velocity/geometry changed — invalidate per-tick caches # used by reactive behaviors (SFM/RVO read these once per # neighbour). Non-static linestring objects have their vertices # re-transformed by ``gf.step(state)`` above, so the segment # cache must drop too; static objects skip this whole method # (early-return at top), so their segment cache remains valid # for the lifetime of the env. self._invalidate_reactive_cache() if sensor_step: self.sensor_step() self.post_process() self.trajectory.append(self.state.copy()) return next_state
[docs] def sensor_step(self): """ Update all sensors for the current state. """ [sensor.step(self.state[0:3]) for sensor in self.sensors]
[docs] def check_status(self): """ Check the current status of the object, including arrival and collision detection. This method evaluates collision detection and sets stop flags based on the collision mode. It also handles different collision modes like 'stop', 'reactive', 'unobstructed', and 'unobstructed_obstacles'. """ self.check_arrive_status() self.check_collision_status() if self._world_param.collision_mode == "stop": self.stop_flag = any(not obj.unobstructed for obj in self.collision_obj) elif self._world_param.collision_mode == "reactive": "currently same as unobstructed: to be further implemented" pass elif self._world_param.collision_mode == "unobstructed": pass elif self._world_param.collision_mode == "unobstructed_obstacles": if self.role == "robot": self.stop_flag = any(not obj.unobstructed for obj in self.collision_obj) elif self.role == "obstacle": self.stop_flag = False else: if self._world_param.count % 50 == 0 and self.role == "robot": self.logger.warning( f"collision mode {self._world_param.collision_mode} is not defined within [stop, reactive, unobstructed, unobstructed_obstacles], the unobstructed mode is used" )
[docs] def check_arrive_status(self): """ Check if the object has arrived at its goal position. The arrival detection depends on the arrive_mode setting: - "state": Compares full state (x, y, theta) - "position": Compares only position (x, y) Updates the arrive_flag and handles multiple goals by removing completed ones. """ if self.check_arrive(self.goal): if len(self._goal) == 1: self.arrive_flag = True else: self._goal.popleft() self.arrive_flag = False else: self.arrive_flag = False
[docs] def check_arrive(self, goal, threshold=None): """ Check if the object has arrived at a given goal. Args: goal (np.ndarray): Goal state to check arrival against. threshold (float, optional): Distance threshold for arrival. Defaults to self.goal_threshold if not provided. Returns: bool: True if the object is within the threshold, False otherwise. """ if goal is None: return False if threshold is None: threshold = self.goal_threshold if self.arrive_mode == "state": diff = np.linalg.norm(self.state[:3] - goal[:3]) elif self.arrive_mode == "position": diff = np.linalg.norm(self.state[:2] - goal[:2]) else: raise ValueError( f"Unsupported arrive_mode '{self.arrive_mode}'. " "Supported modes are 'state' and 'position'." ) return diff < threshold
[docs] def check_collision_status(self): """ Check if the object is in collision with other objects in the environment. This method queries possible collision objects from the geometry tree and checks for intersections. It logs collision warnings for robots and updates the collision_flag and collision_obj list. """ collision_flags = [] self.collision_obj = [] for obj in self.possible_collision_objects: if self.check_collision(obj): collision_flags.append(True) self.collision_obj.append(obj) if self.role == "robot" and not self.collision_flag: self.logger.warning( f"{self.name} collided with {obj.name} at state {np.round(self.state[:3, 0], 2).tolist()}" ) else: collision_flags.append(False) self.collision_flag = any(collision_flags)
[docs] def check_collision(self, obj): """ Check collision with another object. Args: obj (ObjectBase): Another object to check collision with. Returns: bool: True if collision occurs, False otherwise. """ if obj.shape == "map": return obj.is_collision(self.geometry) return shapely.intersects(self.geometry, obj._geometry)
[docs] def gen_behavior_vel(self, velocity: np.ndarray | None = None) -> np.ndarray: """ Generate behavior-influenced velocity for the object. This method adjusts the desired velocity based on the object's behavior configurations. If no desired velocity is provided (`velocity` is None), the method may generate a default velocity or issue warnings based on the object's role and behavior settings. Note: Wander goal renewal (sampling a new random goal upon arrival) is handled in pre_process(). Args: velocity (Optional[np.ndarray]): Desired velocity vector. If None, the method determines the velocity based on behavior configurations. Defaults to None. Returns: np.ndarray: Velocity vector adjusted based on behavior configurations and constraints. Logging: Emits a warning if `velocity` is None and no behavior configuration is set for a robot. """ min_vel, max_vel = self.get_vel_range() if velocity is None: if self.beh_config is None: if self.role == "robot": self.logger.warning( "behavior and input velocity is not defined, robot will stay static" ) return np.zeros_like(self._velocity) behavior_vel = self.obj_behavior.gen_vel( self.ego_object, self.external_objects ) else: velocity = to_numpy(velocity, expected_shape=self.vel_shape) behavior_vel = velocity # clip the behavior_vel by maximum and minimum limits if (behavior_vel < (min_vel - 0.01)).any(): self.logger.warning( f"Input velocity {np.round(behavior_vel.flatten(), 2)} below min {np.round(min_vel.flatten(), 2)}. Clipped due to acceleration limit." ) elif (behavior_vel > (max_vel + 0.01)).any(): self.logger.warning( f"Input velocity {np.round(behavior_vel.flatten(), 2)} exceeds max {np.round(max_vel.flatten(), 2)}. Clipped due to acceleration limit." ) return np.clip(behavior_vel, min_vel, max_vel)
[docs] def pre_process(self): """ Perform pre-processing before stepping the object. This method is called before velocity generation and state updates. Can be overridden by subclasses to implement custom pre-processing logic. Default behavior: - If `wander` is enabled and the object has just arrived (`arrive_flag`), sample a new random goal within [`rl`, `rh`] and clear `arrive_flag`. - If `loop` is enabled and the object has just arrived (`arrive_flag`), reset goals to initial waypoints and clear `arrive_flag`. """ if self.wander and self.arrive_flag: self._goal = deque([random_point_range(self.rl, self.rh)]) self.arrive_flag = False if self.loop and self.arrive_flag and self._init_goal: if len(self._init_goal) > 1: self._goal = self._init_goal.copy() else: # Single goal: cycle between start position and goal start_pos = self._init_state[0:2, 0].tolist() goal_pos = self._init_goal[0][0:2] self._goal = deque([goal_pos, start_pos]) self.arrive_flag = False
[docs] def post_process(self): """ Perform post-processing after stepping the object. This method is called after state updates and sensor updates. Can be overridden by subclasses to implement custom post-processing logic. """ pass
[docs] def mid_process(self, state: np.ndarray): """ Process state in the middle of a step. Make sure the state is within the desired dimension. Args: state (np.ndarray): State vector. Returns: np.ndarray: Processed state. """ if state.shape[0] > 2: state[2, 0] = WrapToRegion( float(state[2, 0]), self.info.angle_range.flatten().tolist() ) if state.shape[0] < self.state_dim: pad_value = ( self.state[self.state_dim - 1, 0] if self.state.shape[0] >= self.state_dim else 0 ) pad_rows = self.state_dim - state.shape[0] padding = pad_value * np.ones((pad_rows, state.shape[1])) state = np.concatenate((state, padding), axis=0) elif state.shape[0] > self.state_dim: state = state[: self.state_dim] return state
[docs] def get_lidar_scan(self): """ Get the lidar scan of the object. Returns: dict: Lidar scan data containing range and angle information. """ return self.lidar.get_scan()
[docs] def get_lidar_points(self): """ Get the lidar scan points of the object. Returns: np.ndarray: Array of lidar scan points. """ return self.lidar.get_points()
[docs] def get_lidar_offset(self): """ Get the lidar offset relative to the object. Returns: list: Lidar offset [x, y, theta] relative to the object center. """ return self.lidar.get_offset()
[docs] def get_fov_detected_objects(self): """ Detect the env objects that in the field of view. Returns: list: The objects that in the field of view of the object. """ return [obj for obj in self.external_objects if self.fov_detect_object(obj)]
[docs] def fov_detect_object(self, detected_object: "ObjectBase"): """ Detect whether the input object is in the field of view. Args: object: The object that to be detected. Returns: bool: Whether the object is in the field of view. """ rx, ry = self.state[:2, 0] px, py = detected_object.state[:2, 0] radius_do = detected_object.radius dx = px - rx dy = py - ry rad_to_do = WrapToPi(math.atan2(dy, dx)) object_orientation = self.state[2, 0] rad_diff = WrapToPi(rad_to_do - object_orientation, True) distance_do = math.hypot(dx, dy) if distance_do == 0: rad_offset = pi / 2 else: rad_offset = WrapToPi(math.asin(min(radius_do / distance_do, 1))) fov_diff = abs(rad_diff - rad_offset) return bool( fov_diff <= self.fov / 2 and distance_do - radius_do <= self.fov_radius )
[docs] def set_state(self, state: list | np.ndarray | None = None, init: bool = False): """ Set the current state of the object. This method updates the object's position, orientation, and other state variables. It also updates the object's geometry representation to match the new state. Args: state (Union[list, np.ndarray]): The new state vector for the object. The format depends on the object's state dimension: - 2D objects: [x, y, theta] where theta is orientation in radians - 3D objects: [x, y, z, roll, pitch, yaw] or similar based on configuration Must match the object's state_dim dimension. init (bool): Whether to also set this as the initial state for reset purposes. If True, the object will return to this state when reset() is called. Default is False. Raises: AssertionError: If the state dimension doesn't match the expected state_dim. Example: >>> # Set robot position and orientation >>> robot.set_state([5.0, 3.0, 1.57]) # x=5, y=3, facing pi/2 radians >>> >>> # Set as initial state for resets >>> robot.set_state([0, 0, 0], init=True) """ if state is None: state = [0, 0, 0] temp_state = to_numpy(state, expected_shape=self.state_shape) if init: self._init_state = temp_state.copy() self._state = temp_state.copy() self._geometry = self.gf.step(self.state) self._geometry_valid = shapely.is_valid(self._geometry) self._invalidate_reactive_cache()
[docs] def set_velocity( self, velocity: list | np.ndarray | None = None, init: bool = False ) -> None: """ Set the velocity of the object. Args: velocity: The velocity of the object. Depending on the kinematics model. init (bool): Whether to set the initial velocity (default False). """ temp_velocity = to_numpy( velocity, default=np.zeros(self.vel_shape), expected_shape=self.vel_shape ) if init: self._init_velocity = temp_velocity.copy() self._velocity = temp_velocity.copy() self._invalidate_reactive_cache()
[docs] def set_original_geometry(self, geometry: BaseGeometry): """ Set the original geometry of the object. Args: geometry (BaseGeometry): Shapely geometry to use as the new original geometry. Subsequent geometry updates will be transformed from this base. """ self.gf._original_geometry = geometry
[docs] def set_random_goal( self, obstacle_list, init: bool = False, free: bool = True, goal_check_radius: float = 0.2, range_limits: list | None = None, max_attempts: int = 100, ): """ Set random goal(s) in the environment. If free set to True, the goal will be placed only in the free from obstacles part of the environment. Args: obstacle_list: List of objects in the environment init (bool): Whether to set the initial goal (default False). free (bool): Whether to check that goal is placed in a position free of obstacles. goal_check_radius (float): Radius in which to check if the goal is free of obstacles. range_limits (list): List of lower and upper bound range limits in which to set the random goal position. max_attempts (int): Max number of attempts to place the goal in a position free of obstacles. """ if range_limits is None: range_limits = [self.rl, self.rh] deque_goals = deque() for _ in range(len(self._goal)): if free: covered_goal = True counter = 0 while covered_goal and counter < max_attempts: goal = random_point_range(range_limits[0], range_limits[1]) shape = {"name": "circle", "radius": goal_check_radius} gf = GeometryFactory.create_geometry(**shape) geometry = gf.step(np.c_[goal]) covered_goal = any( shapely.intersects(geometry, obj._geometry) for obj in obstacle_list ) counter += 1 if counter == max_attempts: self.logger.warning( f"Could not place the goal in a position free of obstacles in {max_attempts} tries" ) else: goal = random_point_range(range_limits[0], range_limits[1]) deque_goals.append(goal) self.set_goal(deque_goals, init=init)
[docs] def set_goal(self, goal: list | np.ndarray | None = None, init: bool = False): """ Set the goal(s) for the object to navigate towards. This method configures the target location(s) that the object's behavior system will attempt to reach. Multiple goals can be provided for sequential navigation. Args: goal (Union[list, np.ndarray]): The goal specification. Can be: - Single goal: [x, y, theta] for one target location - Multiple goals: [[x1, y1, theta1], [x2, y2, theta2], ...] for sequential targets - None: Clear all goals The theta component specifies the desired final orientation in radians. init (bool): Whether to also set this as the initial goal for reset purposes. If True, these goals will be restored when reset() is called. Default is False. Example: >>> # Set single goal >>> robot.set_goal([10.0, 5.0, 0.0]) # Move to (10,5) facing East >>> >>> # Set multiple sequential goals >>> waypoints = [[5, 0, 0], [10, 5, 1.57], [0, 10, 3.14]] >>> robot.set_goal(waypoints) >>> >>> # Clear goals >>> robot.set_goal(None) """ if goal is None: self._goal = None if init: self._init_goal = None return if isinstance(goal, list) and is_2d_list(goal): self._goal = deque(goal) if init: self._init_goal = self._goal.copy() return if isinstance(goal, deque): self._goal = goal if init: self._init_goal = self._goal.copy() return if isinstance(goal, list): assert len(goal) >= 2, ( f"The goal dimension is not correct. Your input goal length is {len(goal)} and the goal dimension should be at least 2" ) temp_goal = np.c_[goal] elif isinstance(goal, np.ndarray): assert goal.shape[0] >= 2, ( f"The goal dimension is not correct. Your input goal dimension is {goal.shape[0]} and the goal dimension should be at least 2" ) temp_goal = goal goal_deque = deque([temp_goal.copy().flatten().tolist()]) if init: self._init_goal = goal_deque self._goal = goal_deque
[docs] def append_goal(self, goal: list | np.ndarray): """ Append a goal to the goal list. """ if isinstance(goal, list): self._goal.append(goal) elif isinstance(goal, np.ndarray): self._goal.append(goal.flatten().tolist())
[docs] def set_laser_color( self, laser_indices, laser_color: str = "cyan", alpha: float = 0.3 ): """ Set the color of the lasers. Args: laser_indices (list): The indices of the lasers to set the color. laser_color (str): The color to set the lasers. Default is 'cyan'. alpha (float): The transparency of the lasers. Default is 0.3. """ if self.lidar is not None: self.lidar.set_laser_color(laser_indices, laser_color, alpha) else: self.logger.warning("No lidar sensor found for this object.")
[docs] def input_state_check(self, state: list, dim: int = 3): """ Check and adjust the state to match the desired dimension. Args: state (list): State of the object. dim (int): Desired dimension. Defaults to 3. Returns: list: Adjusted state. """ if len(state) > dim: if self.role == "robot": self.logger.warning( f"The state dimension {len(state)} of {self.abbr} is larger than the desired dimension {dim}, the state dimension is truncated" ) return state[:dim] if len(state) < dim: if self.role == "robot": self.logger.warning( f"The state dimension {len(state)} of {self.abbr} is smaller than the desired dimension {dim}, zero padding is added" ) return state + [0] * (dim - len(state)) return state
[docs] def plot( self, ax, state: np.ndarray | None = None, vertices: np.ndarray | None = None, **kwargs, ): """ Plot the object on the given axis. Args: ax: Matplotlib axis object for plotting. state: State vector [x, y, theta, ...] defining object position and orientation. vertices: Vertices array defining object shape for polygon/rectangle objects. **kwargs: Plotting configuration options. """ if state is None: state = self.state if vertices is None: vertices = self.vertices self._plot(ax, state, vertices, **kwargs)
def _init_plot(self, ax, **kwargs): """ Initialize plotting elements using zero state and initial vertices. Returns: list: Names of plot attributes created (e.g., 'object_patch', 'goal_patch'). """ # Apply handler-derived show_arrow default when not explicitly set if ( self.kf is not None and "show_arrow" not in self.plot_kwargs and "show_arrow" not in kwargs ): kwargs.setdefault("show_arrow", self.kf.show_arrow) return self._plot( ax, self.original_state, self.original_vertices, initial=True, **kwargs ) def _plot(self, ax, state, vertices, initial: bool = False, **kwargs): """ Plot the object with the specified state and vertices. Args: ax: Matplotlib axis object for plotting. state: State vector [x, y, theta, ...] defining object position and orientation. vertices: Vertices array defining object shape for polygon/rectangle objects. initial: Whether the plot is for the initial state. Defaults to False. **kwargs: Plotting configuration options: Object visualization properties: - obj_linestyle (str): Line style for object outline (e.g., '-', '--', ':', '-.'). - obj_zorder (int): Z-order (drawing layer) for object elements; defaults to 3 for robots, 1 for obstacles. - obj_color (str): Color of the object. - obj_alpha (float): Transparency of the object (0.0 to 1.0). - show_goal (bool): Whether to show the goal position. Defaults to False. - goal_color (str): Color of the goal marker. Defaults to object color. - goal_zorder (int): Z-order of the goal marker. Defaults to 1. - goal_alpha (float): Transparency of the goal marker. Defaults to 0.5. - show_text (bool): Whether to show text information. Defaults to False. - text_color (str): Color of the text. Defaults to 'k'. - text_size (int): Font size of the text. Defaults to 10. - text_zorder (int): Z-order of the text. Defaults to 2. - show_arrow (bool): Whether to show the velocity arrow. Defaults to False. - arrow_color (str): Color of the arrow. Defaults to "gold". - arrow_length (float): Length of the arrow. Defaults to 0.4. - arrow_width (float): Width of the arrow. Defaults to 0.6. - arrow_zorder (int): Z-order of the arrow. Defaults to 4. - show_trajectory (bool): Whether to show the trajectory line. Defaults to False. - traj_color (str): Color of the trajectory. Defaults to object color. - traj_style (str): Line style of the trajectory. Defaults to "-". - traj_width (float): Width of the trajectory line. Defaults to object width. - traj_alpha (float): Transparency of the trajectory. Defaults to 0.5. - show_trail (bool): Whether to show object trails. Defaults to False. - trail_freq (int): Frequency of trail display (every N steps). Defaults to 2. - trail_edgecolor (str): Edge color of the trail. Defaults to object color. - trail_linewidth (float): Width of the trail outline. Defaults to 0.8. - trail_alpha (float): Transparency of the trail. Defaults to 0.7. - trail_fill (bool): Whether to fill the trail shape. Defaults to False. - trail_color (str): Fill color of the trail. Defaults to object color. - show_sensor (bool): Whether to show sensor visualizations. Defaults to True. - show_fov (bool): Whether to show field of view visualization. Defaults to False. - fov_color (str): Fill color of the field of view. Defaults to "lightblue". - fov_edge_color (str): Edge color of the field of view. Defaults to "blue". - fov_zorder (int): Z-order of the field of view. Defaults to 1. - fov_alpha (float): Transparency of the field of view. Defaults to 0.5. Creates and stores the following plot elements: - object_patch: Main object visualization (patch) - object_line: Object outline for linestring shapes (line) - object_img: Object image for description-based visualization - goal_patch: Goal position marker (patch) - _text: Object text label - _goal_text: Goal text label - arrow_patch: Velocity direction arrow (patch) - trajectory_line: Trajectory path visualization (line) - fov_patch: Field of view visualization (patch) Returns: list: List of plot attribute names that were created. """ self.plot_attr_list = [ "object_patch", "object_line", "object_img", "goal_patch", "_text", "_goal_text", "arrow_patch", "trajectory_line", "fov_patch", ] self.plot_kwargs.update(kwargs) self.ax = ax self.show_goal = self.plot_kwargs.get("show_goal", False) self.show_goal_text = self.plot_kwargs.get("show_goal_text", False) self.show_goals = self.plot_kwargs.get("show_goals", False) show_text = self.plot_kwargs.get("show_text", False) show_arrow = self.plot_kwargs.get("show_arrow", False) show_trajectory = self.plot_kwargs.get("show_trajectory", False) self.show_trail = self.plot_kwargs.get("show_trail", False) self.show_sensor = self.plot_kwargs.get("show_sensor", True) show_fov = self.plot_kwargs.get("show_fov", False) self.trail_freq = self.plot_kwargs.get("trail_freq", 2) if self.shape != "map": self.plot_object(ax, state, vertices, **self.plot_kwargs) if self.show_goal: goal_state = state if initial else self.goal goal_vertices = vertices if initial else self.goal_vertices self.plot_goal(ax, goal_state, goal_vertices, **self.plot_kwargs) if show_text: self.plot_text(ax, state, **self.plot_kwargs) if show_arrow: current_velocity = self.velocity_xy if np.any(state) else np.zeros((2, 1)) arrow_theta = 0.0 if initial else self.heading self.plot_arrow( ax, state, current_velocity, arrow_theta, **self.plot_kwargs ) if show_trajectory: trajectory_data = self.trajectory if np.any(state) else [] self.plot_trajectory(ax, trajectory_data, **self.plot_kwargs) if ( self.show_trail and self._world_param.count % self.trail_freq == 0 and self._world_param.count > 0 ): self.plot_trail(ax, state, self.vertices, **self.plot_kwargs) if self.show_sensor: [sensor.plot(ax, state, **self.plot_kwargs) for sensor in self.sensors] if show_fov: self.plot_fov(ax, **self.plot_kwargs) return self.plot_attr_list def _step_plot(self, **kwargs): """ Update the positions and properties of all plot elements created in _init_plot. Elements are updated using transforms for patches and set_data/set_data_3d for lines, based on the object's current state, in both 2D and 3D. Update methods by element type: - Patches (object_patch, goal_patch, arrow_patch, fov_patch): Updated using matplotlib transforms - Lines (object_line, trajectory_line): Updated using set_data method - Text (_text): Updated using set_position method - Images (object_img): Updated using extent and transform methods Args: **kwargs: Dynamic plotting properties to update during rendering: Object visualization properties: - obj_linestyle (str): Line style for object outline (e.g., '-', '--', ':', '-.'). - obj_zorder (int): Z-order (drawing layer) for object elements. - obj_color (str): Color of the object. - obj_alpha (float): Transparency of the object (0.0 to 1.0). Goal visualization properties: - goal_color (str): Color of the goal marker. - goal_alpha (float): Transparency of the goal marker. - goal_zorder (int): Z-order for goal elements. Arrow visualization properties: - arrow_color (str): Color of the velocity arrow. - arrow_alpha (float): Transparency of the arrow. - arrow_zorder (int): Z-order for arrow elements. Trajectory visualization properties: - traj_color (str): Color of the trajectory line. - traj_style (str): Line style of the trajectory (e.g., '-', '--', ':', '-.'). - traj_width (float): Width of the trajectory line. - traj_alpha (float): Transparency of the trajectory line. - traj_zorder (int): Z-order for trajectory elements. Text visualization properties: - text_color (str): Color of the text label. - text_size (int): Font size of the text. - text_alpha (float): Transparency of the text. - text_zorder (int): Z-order for text elements. Field of view properties: - fov_color (str): Fill color of the field of view. - fov_alpha (float): Transparency of the field of view. - fov_edge_color (str): Edge color of the field of view. - fov_zorder (int): Z-order of the field of view. Trail visualization properties (for new trail elements): - trail_edgecolor (str): Edge color of the trail. - trail_linewidth (float): Width of the trail outline. - trail_alpha (float): Transparency of the trail. - trail_fill (bool): Whether to fill the trail shape. - trail_color (str): Fill color of the trail. - trail_zorder (int): Z-order for trail elements. Note: This method updates existing plot elements in place. Trail elements are created new each time based on trail_freq setting and are not updated but recreated. """ x = self.state[0, 0] y = self.state[1, 0] r_phi = self.state[2, 0] r_phi_ang = 180 * r_phi / pi for attr in self.plot_attr_list: if hasattr(self, attr): element = getattr(self, attr) if attr == "object_patch": # For patches created at origin in _init_plot, use transform to position them obj_kwargs = { "color": kwargs.get("obj_color", self.color), "alpha": kwargs.get("obj_alpha"), "zorder": kwargs.get("obj_zorder"), "linestyle": kwargs.get("obj_linestyle"), } set_patch_property(element, self.ax, state=self.state, **obj_kwargs) elif attr == "object_line": # For lines, use set_data to update coordinates (works for both 2D and 3D) vertices = self.vertices cos_phi = np.cos(r_phi) sin_phi = np.sin(r_phi) rotation_matrix = np.array( [[cos_phi, -sin_phi], [sin_phi, cos_phi]] ) rotated_vertices = np.dot(vertices.T, rotation_matrix.T).T translated_vertices = rotated_vertices + np.array([[x], [y]]) element.set_data( translated_vertices[0, :], translated_vertices[1, :] ) # Update object line properties if "obj_linestyle" in kwargs: element.set_linestyle(kwargs["obj_linestyle"]) if "obj_color" in kwargs: element.set_color(kwargs["obj_color"]) if "obj_alpha" in kwargs: element.set_alpha(kwargs["obj_alpha"]) if "obj_zorder" in kwargs: element.set_zorder(kwargs["obj_zorder"]) elif attr == "object_img": # Update image position and rotation using transform start_x = self.vertices[0, 0] start_y = self.vertices[1, 0] if not isinstance(self.ax, Axes3D): # Update image extent based on current vertices element.set_extent( [ start_x, start_x + self.length, start_y, start_y + self.width, ] ) # Create new transform for image trans_data = ( mtransforms.Affine2D().rotate_deg_around( start_x, start_y, r_phi_ang ) + self.ax.transData ) element.set_transform(trans_data) elif attr == "goal_patch": goal_kwargs = { "color": kwargs.get("goal_color"), "alpha": kwargs.get("goal_alpha"), "zorder": kwargs.get("goal_zorder"), } if self.goal is None: element.set_visible(False) continue if not element.get_visible(): element.set_visible(True) if self.goal.shape[0] > 2: goal_state = self.goal else: goal_state = np.pad( self.goal, (0, 1), "constant", constant_values=0 ) set_patch_property( element, self.ax, state=goal_state, **goal_kwargs ) elif attr == "arrow_patch": # Update arrow patch using set_element_property if isinstance(element, Arrow): # Calculate orientation for arrow direction theta = self.heading arrow_state = np.array([[x], [y], [theta]]) arrow_kwargs = { "color": kwargs.get("arrow_color"), "alpha": kwargs.get("arrow_alpha"), "zorder": kwargs.get("arrow_zorder"), } set_patch_property( element, self.ax, state=arrow_state, **arrow_kwargs ) elif attr == "trajectory_line": # Update trajectory line using set_data (works for both 2D and 3D) if isinstance(element, list) and len(element) > 0: line = element[0] x_list = [ t[0, 0] for t in self.trajectory[-self.keep_traj_length :] ] y_list = [ t[1, 0] for t in self.trajectory[-self.keep_traj_length :] ] if isinstance(self.ax, Axes3D): # For 3D, add z-coordinate (set to 0) z_list = [0] * len(x_list) line.set_data_3d(x_list, y_list, z_list) else: line.set_data(x_list, y_list) ax = line.axes if ax is not None: linewidth = kwargs.get("traj_width", self.width) linewidth_data = linewidth_from_data_units( linewidth, ax, "y" ) line.set_linewidth(linewidth_data) if "traj_color" in kwargs: line.set_color(kwargs["traj_color"]) if "traj_style" in kwargs: line.set_linestyle(kwargs["traj_style"]) if "traj_alpha" in kwargs: line.set_alpha(kwargs["traj_alpha"]) if "traj_zorder" in kwargs: line.set_zorder(kwargs["traj_zorder"]) elif attr == "fov_patch": # Update FOV patch using set_element_property if isinstance(element, Wedge | Circle): direction = r_phi if self.state_dim >= 3 else 0 fov_state = np.array([[x], [y], [direction]]) fov_kwargs = { "alpha": kwargs.get("fov_alpha"), "zorder": kwargs.get("fov_zorder"), } set_patch_property( element, self.ax, state=fov_state, **fov_kwargs ) # Update text position using set_position (works for both 2D and 3D) if hasattr(self, "_text"): text = self._text # Prefer runtime kwargs, then initial plot kwargs, fallback to default default_text_pos = [-self.radius - 0.1, self.radius + 0.1] text_position = kwargs.get( "text_position", self.plot_kwargs.get("text_position", default_text_pos), ) text.set_position((x + text_position[0], y + text_position[1])) # Sync display text (may have been changed via set_text) text.set_text(self._get_text()) # Update text properties if "text_color" in kwargs: text.set_color(kwargs["text_color"]) if "text_size" in kwargs: text.set_fontsize(kwargs["text_size"]) if "text_alpha" in kwargs: text.set_alpha(kwargs["text_alpha"]) if "text_zorder" in kwargs: text.set_zorder(kwargs["text_zorder"]) # Update goal text position using set_position (works for both 2D and 3D) if self.goal is not None: goal_x = self.goal[0, 0] goal_y = self.goal[1, 0] if hasattr(self, "_goal_text"): goal_text = self._goal_text # Prefer runtime kwargs, then initial plot kwargs, fallback to default default_text_pos = [-self.radius - 0.1, self.radius + 0.1] text_position = kwargs.get( "text_position", self.plot_kwargs.get("text_position", default_text_pos), ) goal_text.set_position( (goal_x + text_position[0], goal_y + text_position[1]) ) # Sync goal display text (may have been changed via set_goal_text) goal_text.set_text(self._get_goal_text()) # Update text properties if "text_color" in kwargs: goal_text.set_color(kwargs["text_color"]) if "text_size" in kwargs: goal_text.set_fontsize(kwargs["text_size"]) if "text_alpha" in kwargs: goal_text.set_alpha(kwargs["text_alpha"]) if "text_zorder" in kwargs: goal_text.set_zorder(kwargs["text_zorder"]) # Handle trail plotting (creates new elements each time) if self.show_trail and self._world_param.count % self.trail_freq == 0: self.plot_trail( self.ax, self.state, self.original_vertices, **self.plot_kwargs ) # Update sensors if self.show_sensor: [sensor.step_plot() for sensor in self.sensors]
[docs] def plot_object( self, ax, state: np.ndarray | None = None, vertices: np.ndarray | None = None, **kwargs, ): """ Plot the object itself in the specified coordinate system. Args: ax: Matplotlib axis object state: State of the object (x, y, r_phi) defining position and orientation. If None, uses the object's current state. Defaults to None. vertices: Vertices of the object [[x1, y1], [x2, y2], ...] for polygon and rectangle shapes. If None, uses the object's current vertices. Defaults to None. **kwargs: Additional plotting options - obj_linestyle (str): Line style for object outline, defaults to '-' - obj_zorder (int): Drawing layer order, defaults to 3 if object is robot, 1 if object is the obstacle. - obj_color (str): Color of the object, defaults to 'k' (black). - obj_alpha (float): Transparency of the object, defaults to 1.0. Returns: None Raises: Exception: If the underlying patch creation fails (e.g., unsupported shape or backend issues). """ obj_linestyle = kwargs.get("obj_linestyle", "-") obj_zorder = kwargs.get("obj_zorder", 3) if self.role == "robot" else 1 state = self.state if state is None else state vertices = self.vertices if vertices is None else vertices # Handle 3D plot or no description case if self.description is None or isinstance(ax, Axes3D): try: if self.shape != "map": self.object_patch = draw_patch( ax, shape=self.shape, state=state, radius=self.radius, vertices=vertices, color=self.color, linestyle=obj_linestyle, zorder=obj_zorder, ) self.plot_patch_list.append(self.object_patch) except Exception as e: self.logger.error(f"Error occurred while plotting object: {e!s}") raise else: self.plot_object_image(ax, state, vertices, self.description, **kwargs)
[docs] def plot_object_image( self, ax, state: np.ndarray | None = None, vertices: np.ndarray | None = None, description: str | None = None, **kwargs, ): """ Plot the object using an image file based on the description. Args: ax: Matplotlib axis object for plotting. state (Optional[np.ndarray]): State of the object (x, y, r_phi) defining position and orientation. If None, uses the object's current state. Defaults to None. vertices (Optional[np.ndarray]): Vertices of the object for positioning the image. If None, uses the object's current vertices. Defaults to None. description (str): Path or name of the image file to display. Defaults to None. **kwargs: Additional plotting options (currently unused). Note: The image file is searched in the world/description/ directory relative to the project root. The image is rotated and positioned according to the object's state and vertices. """ if vertices is None or state is None: return robot_image_path = file_check( description, root_path=path_manager.root_path + "/world/description/" ) if robot_image_path is None: return start_x = float(vertices[0, 0]) start_y = float(vertices[1, 0]) r_phi = float(state[2, 0]) r_phi_ang = 180 * r_phi / pi obj_zorder = kwargs.get("obj_zorder", 2) robot_img_read = image.imread(robot_image_path) robot_img = ax.imshow( robot_img_read, extent=[ float(start_x), float(start_x + self.length), float(start_y), float(start_y + self.width), ], zorder=obj_zorder, ) trans_data = ( mtransforms.Affine2D().rotate_deg_around(start_x, start_y, r_phi_ang) + ax.transData ) robot_img.set_transform(trans_data) self.plot_patch_list.append(robot_img) self.object_img = robot_img
[docs] def plot_trajectory( self, ax, trajectory: list | None = None, keep_traj_length: int = 0, **kwargs ): """ Plot the trajectory path of the object using the specified trajectory data. Args: ax: Matplotlib axis. trajectory: List of trajectory points to plot, where each point is a numpy array [x, y, theta, ...]. If None, uses self.trajectory. Defaults to None. keep_traj_length (int): Number of steps to keep from the end of trajectory. If 0, plots entire trajectory. Defaults to 0. **kwargs: Additional plotting options: traj_color (str): Color of the trajectory line. traj_style (str): Line style of the trajectory. traj_width (float): Width of the trajectory line. traj_alpha (float): Transparency of the trajectory line. traj_zorder (int): Zorder of the trajectory. """ if trajectory is None: trajectory = self.trajectory self.keep_traj_length = keep_traj_length traj_color = kwargs.get("traj_color", self.color) traj_style = kwargs.get("traj_style", "-") traj_width = kwargs.get("traj_width", self.width) traj_alpha = kwargs.get("traj_alpha", 0.5) traj_zorder = kwargs.get("traj_zorder", 0) x_list = [t[0, 0] for t in trajectory[-self.keep_traj_length :]] y_list = [t[1, 0] for t in trajectory[-self.keep_traj_length :]] linewidth = linewidth_from_data_units(traj_width, ax, "y") if isinstance(ax, Axes3D): linewidth = traj_width * 10 solid_capstyle = "round" if self.shape == "circle" else "butt" self.trajectory_line = ax.plot( x_list, y_list, color=traj_color, linestyle=traj_style, linewidth=linewidth, solid_joinstyle="round", solid_capstyle=solid_capstyle, alpha=traj_alpha, zorder=traj_zorder, ) self.plot_line_list.append(self.trajectory_line)
[docs] def plot_goal( self, ax, goal_state: np.ndarray | None = None, vertices: np.ndarray | None = None, goal_color: str | None = None, goal_zorder: int | None = 1, goal_alpha: float | None = 0.5, **kwargs, ): """ Plot the goal position of the object in the specified coordinate system. Args: ax: Matplotlib axis. goal_state: State of the goal (x, y, r_phi) defining goal position and orientation. If None, nothing is plotted. Defaults to None. vertices: Vertices for polygon/rectangle goal shapes. If None, uses original_vertices. Defaults to None. goal_color (str): Color of the goal marker. Defaults to be the color of the object. goal_zorder (int): Zorder of the goal marker. Defaults to 1. goal_alpha (float): Transparency of the goal marker. Defaults to 0.5. """ goal_color = self.color if goal_color is None else goal_color if goal_state is None: return self.goal_patch = draw_patch( ax, shape=self.shape, state=goal_state, radius=self.radius, vertices=vertices, color=goal_color, alpha=goal_alpha, zorder=goal_zorder, ) self.plot_patch_list.append(self.goal_patch)
[docs] def plot_text(self, ax, state: np.ndarray | None = None, **kwargs): """ Plot the text label of the object at the specified position. Args: ax: Matplotlib axis. state: State of the object (x, y, r_phi) to determine text position. If None, uses the object's current state. Defaults to None. **kwargs: Additional plotting options. - text_color (str): Color of the text, default is 'k'. - text_size (int): Font size of the text, default is 10. - text_position (list): Position offset from object center [dx, dy], default is [-radius-0.1, radius+0.1]. - text_zorder (int): Zorder of the text. Defaults to 2. - text_alpha (float): Transparency of the text. Defaults to 1. Note: Subsequent updates via _step_plot honor the configured text_position if provided. """ if state is None: state = self.state text_color = kwargs.get("text_color", "k") text_size = kwargs.get("text_size", 10) text_position = kwargs.get( "text_position", [-self.radius - 0.1, self.radius + 0.1] ) text_zorder = kwargs.get("text_zorder", 2) text_alpha = kwargs.get("text_alpha", 1) x, y = state[0, 0], state[1, 0] if isinstance(ax, Axes3D): self._text = ax.text( x + text_position[0], y + text_position[1], self.z, self._get_text(), fontsize=text_size, color=text_color, zorder=text_zorder, alpha=text_alpha, ) else: self._text = ax.text( x + text_position[0], y + text_position[1], self._get_text(), fontsize=text_size, color=text_color, zorder=text_zorder, alpha=text_alpha, ) self.plot_text_list.append(self._text) if self.show_goal and self.show_goal_text: goal_x, goal_y = self.goal[0, 0], self.goal[1, 0] if isinstance(ax, Axes3D): self._goal_text = ax.text( goal_x + text_position[0], goal_y + text_position[1], self.z, self._get_goal_text(), fontsize=text_size, color=text_color, zorder=text_zorder, alpha=text_alpha, ) else: self._goal_text = ax.text( goal_x + text_position[0], goal_y + text_position[1], self._get_goal_text(), fontsize=text_size, color=text_color, zorder=text_zorder, alpha=text_alpha, ) self.plot_text_list.append(self._goal_text)
[docs] def plot_arrow( self, ax, state: np.ndarray | None = None, velocity: np.ndarray | None = None, arrow_theta: float | None = 0.0, arrow_length: float = 0.4, arrow_width: float = 0.6, arrow_color: str | None = None, arrow_zorder: int = 3, **kwargs, ): """ Plot an arrow indicating the velocity orientation of the object at the specified position. Args: ax: Matplotlib axis. state: State of the object (x, y, r_phi) to determine arrow position. If None, uses the object's current state. Defaults to None. velocity: Velocity of the object to determine arrow direction. If None, uses the object's current velocity_xy. Defaults to None. arrow_length (float): Length of the arrow. Defaults to 0.4. arrow_width (float): Width of the arrow. Defaults to 0.6. arrow_color (str): Color of the arrow. Defaults to "gold". arrow_zorder (int): Z-order for drawing layer. Defaults to 4. """ if state is None: state = self.state if velocity is None: velocity = self.velocity_xy if arrow_color is None: arrow_color = "gold" self.arrow_patch = draw_patch( ax, shape="arrow", state=state, color=arrow_color, zorder=arrow_zorder, arrow_length=arrow_length, arrow_width=arrow_width, theta=arrow_theta, ) self.plot_patch_list.append(self.arrow_patch)
[docs] def plot_trail( self, ax, state: np.ndarray | None = None, vertices: np.ndarray | None = None, keep_trail_length: int = 0, **kwargs, ): """ Plot the trail/outline of the object at the specified position for visualization purposes. Args: ax: Matplotlib axis. state: State of the object (x, y, r_phi) to determine trail position and orientation. If None, uses the object's current state. Defaults to None. vertices: Original vertices of the object for polygon and rectangle trail shapes. If None, uses the object's current vertices. Defaults to None. keep_trail_length (int): Number of steps to keep from the recent trajectory of trail. **kwargs: Additional plotting options: trail_type (str): Type of trail shape, defaults to object's shape. trail_edgecolor (str): Edge color of the trail. trail_linewidth (float): Line width of the trail edge. trail_alpha (float): Transparency of the trail. trail_fill (bool): Whether to fill the trail shape. trail_color (str): Fill color of the trail. trail_zorder (int): Z-order of the trail. """ if vertices is None: vertices = self.original_vertices trail_type = kwargs.get("trail_type", self.shape) trail_edgecolor = kwargs.get("trail_edgecolor", self.color) trail_linewidth = kwargs.get("trail_linewidth", 0.8) trail_alpha = kwargs.get("trail_alpha", 0.7) trail_fill = kwargs.get("trail_fill", False) trail_color = kwargs.get("trail_color", self.color) trail_zorder = kwargs.get("trail_zorder", 0) # angle in degrees, no longer needed due to generic draw_patch usage trail = draw_patch( ax, shape=trail_type, state=state, vertices=vertices, radius=self.radius, width=self.length, height=self.width, edgecolor=trail_edgecolor, facecolor=trail_color, fill=trail_fill, alpha=trail_alpha, linewidth=trail_linewidth, zorder=trail_zorder, ) self.plot_trail_list.append(trail) if len(self.plot_trail_list) > keep_trail_length and keep_trail_length > 0: self.plot_trail_list.pop(0).remove()
[docs] def plot_fov(self, ax, **kwargs): """ Plot the field of view of the object. Creates FOV wedge at origin, will be positioned using transforms in step_plot. if fov is 2*pi, plot a circle, otherwise plot a wedge. Args: ax: Matplotlib axis. **kwargs: Additional plotting options. fov_color (str): Color of the field of view. Default is 'lightblue'. fov_edge_color (str): Edge color of the field of view. Default is 'blue'. fov_zorder (int): Z-order of the field of view. Default is 1. fov_alpha (float): Transparency of the field of view. Default is 0.5. Note: No-op when FOV is not configured (fov or fov_radius is None). """ if self.fov is None or self.fov_radius is None: return fov_color = kwargs.get("fov_color", "lightblue") fov_edge_color = kwargs.get("fov_edge_color", "blue") fov_zorder = kwargs.get("fov_zorder", 1) fov_alpha = kwargs.get("fov_alpha", 0.5) # Create FOV wedge at origin with no rotation (-fov/2 to +fov/2 around 0 degrees) start_degree = -180 * self.fov / (2 * pi) end_degree = 180 * self.fov / (2 * pi) state = np.array([[0.0], [0.0], [0.0]]) shape = "circle" if abs(self.fov - 2 * pi) < 0.01 else "wedge" self.fov_patch = draw_patch( ax, shape=shape, state=state, radius=self.fov_radius, theta1=start_degree, theta2=end_degree, facecolor=fov_color, edgecolor=fov_edge_color, alpha=fov_alpha, zorder=fov_zorder, ) self.plot_patch_list.append(self.fov_patch)
[docs] def plot_uncertainty(self, ax, **kwargs): """ To be completed. """ pass
[docs] def plot_clear(self, all: bool = False): """ Clear all plotted elements from the axis. Args: all (bool): If True, also clears trail elements. If False, keeps trail elements. Defaults to False. """ [patch.remove() for patch in self.plot_patch_list] [line.pop(0).remove() for line in self.plot_line_list] [text.remove() for text in self.plot_text_list] if all: [trail.remove() for trail in self.plot_trail_list] self.plot_trail_list = [] self.plot_patch_list = [] self.plot_line_list = [] self.plot_text_list = [] [sensor.plot_clear() for sensor in self.sensors]
[docs] def done(self): """ Check if the object has completed its task. Returns: bool: True if the task is done, False otherwise. """ return bool(self.stop_flag or self.arrive_flag)
[docs] def reset(self): """ Reset the object to its initial state. """ self._state = self._init_state.copy() self._goal = self._init_goal.copy() if self._init_goal is not None else None self._velocity = self._init_velocity.copy() self.collision_flag = False self.arrive_flag = False self.stop_flag = False self.trajectory = [] self._invalidate_reactive_cache()
[docs] def refresh(self): """ Refresh state-derived attributes (geometry and sensors) without advancing the simulation. Used after ``reset`` so geometry/sensor readings reflect the current state without running a kinematic step (which would clobber ``_velocity`` and add noise drift). """ self._geometry = self.gf.step(self.state) self._geometry_valid = shapely.is_valid(self._geometry) self.sensor_step() self._invalidate_reactive_cache()
def _invalidate_reactive_cache(self) -> None: """Drop per-tick caches read by reactive behaviors (SFM/RVO). Must be called from every path that mutates ``_state``, ``_velocity``, or the transformed geometry — ``step()``, ``set_state``, ``set_velocity``, ``reset``, ``refresh`` — otherwise the next reader gets stale cached data. """ self._velocity_xy_cache = None self._rvo_neighbor_state_cache = None self._rvo_line_segments_cache = None
[docs] def remove(self): """ Remove the object from the environment. """ del self
[docs] def get_vel_range(self) -> tuple[np.ndarray, np.ndarray]: """ Get the velocity range considering acceleration limits. Returns: tuple: Minimum and maximum velocities. """ min_vel = np.maximum( self.vel_min, self.velocity - self.info.acce * self._world_param.step_time ) max_vel = np.minimum( self.vel_max, self.velocity + self.info.acce * self._world_param.step_time ) return min_vel, max_vel
[docs] def get_info(self) -> ObjectInfo: """ Get object information. Returns: ObjectInfo: Information about the object. """ return self.info
[docs] def get_obstacle_info(self) -> ObstacleInfo: """ Get information about the object as an obstacle. Returns: ObstacleInfo: Obstacle-related information, including state, vertices, velocity, and radius. """ return ObstacleInfo( self.state[:2, :], self.vertices, self.velocity, self.radius, self.G, self.h, self.cone_type, self.convex_flag, )
[docs] def get_init_Gh(self) -> tuple[np.ndarray, np.ndarray]: """ Get the initial generalized inequality matrices G and h for the convex object. Returns: tuple[np.ndarray, np.ndarray]: Tuple containing initial G matrix and h vector. """ return self.gf.get_init_Gh()
[docs] def get_Gh(self) -> tuple[np.ndarray, np.ndarray]: """ Get the generalized inequality matrices G and h for the convex object. Returns: tuple[np.ndarray, np.ndarray]: Tuple containing G matrix and h vector. """ return self.gf.get_Gh( center=self.position, radius=self.radius, vertices=self.vertices )
[docs] def get_desired_omni_vel(self, goal_threshold=0.1, normalized=False) -> np.ndarray: """ Get the desired omnidirectional velocity of the object. Args: goal_threshold (float): Threshold for goal proximity. normalized (bool): Whether to normalize the velocity. """ if self.goal is None: return np.zeros((2, 1)) dis, radian = relative_position(self.state, self.goal) if dis > goal_threshold: vx = self.vel_max[0, 0] * cos(radian) vy = self.vel_max[1, 0] * sin(radian) else: vx = 0 vy = 0 if normalized: length = np.linalg.norm([vx, vy]) if length > 1: vx = vx / length vy = vy / length return np.array([[vx], [vy]])
@property def name(self) -> str: """ Get the name of the object. Returns: str: The name of the object. """ return self._name if self._name is not None else self.role + "_" + str(self.id) @property def group_name(self) -> str: """ Get the group name of the object. Returns: str: The group name of the object. """ return ( self._group_name if self._group_name is not None else self.role + "_" + str(self.group) ) @property def abbr(self) -> str: """ Get the abbreviation of the object. Returns: str: The abbreviation of the object. """ return self.role[0] + str(self.id) @property def goal_abbr(self) -> str: """ Get the goal abbreviation of the object. Returns: str: The goal abbreviation of the object. """ return "G" + "-" + self.role[0] + str(self.id) def _get_text(self) -> str: """Return custom text if set, otherwise the default abbreviation.""" return self._custom_text if self._custom_text is not None else self.abbr
[docs] def set_text(self, text: str | None) -> None: """ Set custom display text for this object. The text will be shown on the next render when ``show_text`` is enabled. Pass ``None`` to reset back to the default abbreviation. Args: text: The text string to display, or ``None`` to reset. """ self._custom_text = text if hasattr(self, "_text"): self._text.set_text(self._get_text())
def _get_goal_text(self) -> str: """Return custom goal text if set, otherwise the default goal abbreviation.""" return ( self._custom_goal_text if self._custom_goal_text is not None else self.goal_abbr )
[docs] def set_goal_text(self, text: str | None) -> None: """ Set custom display text for this object's goal. The text will be shown on the next render when ``show_goal_text`` is enabled. Pass ``None`` to reset back to the default goal abbreviation. Args: text: The text string to display, or ``None`` to reset. """ self._custom_goal_text = text if hasattr(self, "_goal_text"): self._goal_text.set_text(self._get_goal_text())
@property def shape(self) -> str: """ Get the shape name of the object. Returns: str: The shape name of the object. """ return self.gf.name @property def z(self) -> float: """ Get the z coordinate of the object. For 3D object, the z coordinate is the height of the object, for 2D object, the z coordinate is 0. Returns: float: The z coordinate of the object. """ return self.state[2, 0] if self.state_dim >= 6 else 0 @property def kinematics(self) -> str | None: """ Get the kinematics name of the object. Returns: str: The kinematics name of the object. """ return self.kf.name if self.kf is not None else None @property def geometry(self) -> BaseGeometry: """ Get the geometry Instance of the object. Returns: shapely.geometry.base.BaseGeometry: The geometry of the object. """ return self._geometry @property def centroid(self) -> np.ndarray: """ Get the centroid of the object. Returns: np.ndarray: The centroid of the object. """ return self._geometry.centroid.coords._coords.T @property def id(self) -> int: """ Get the id of the object. Returns: int: The id of the object. """ return self._id @property def state(self) -> np.ndarray: """ Get the state of the object. Returns: np.ndarray: The state of the object. """ return self._state @property def init_state(self) -> np.ndarray: """ Get the initial state of the object. Returns: np.ndarray: The initial state of the object. """ return self._init_state @property def velocity(self) -> np.ndarray: """ Get the velocity of the object. Returns: np.ndarray: The velocity of the object. """ return self._velocity @property def goal(self) -> np.ndarray | None: """ Get the goal of the object. Returns: np.ndarray: The goal of the object. """ if self._goal is None: return None return np.c_[self._goal[0]] @property def goal_vertices(self) -> np.ndarray: """ Get the goal vertices of the object. Returns: np.ndarray: The goal vertices of the object. """ return self._goal_vertices @property def position(self) -> np.ndarray: """ Get the position of the object. Returns: np.ndarray: The position of the object . """ return self._state[:2] @property def radius(self) -> float: """ Get the radius of the object. Returns: float: The radius of the object. """ return self.gf.radius @property def length(self) -> float: """ Get the length of the object. Returns: float: The length of the object. """ return self.gf.length @property def width(self) -> float: """ Get the width of the object. Returns: float: The width of the object. """ return self.gf.width @property def wheelbase(self) -> float: """ Get the wheelbase of the object. Returns: float: The wheelbase of the object. """ return self.gf.wheelbase @property def radius_extend(self) -> float: """ Get the radius of the object with a buffer. Returns: float: The radius of the object with a buffer. """ return self.radius + 0.1 @property def arrive(self) -> bool: """ Get the arrive flag of the object. Returns: bool: The arrive flag of the object. """ return self.arrive_flag @property def collision(self) -> bool: """ Get the collision flag of the object. Returns: bool: The collision flag of the object. """ return self.collision_flag @property def vertices(self) -> np.ndarray: """ Get the vertices of the object. Returns: np.ndarray: The vertices of the object. """ return self.gf.vertices @property def original_vertices(self) -> np.ndarray: """ Get the original vertices of the object. Returns: np.ndarray: The original vertices of the object before any transformations. """ return self.gf.original_vertices @property def original_geometry(self) -> BaseGeometry: """ Get the original geometry of the object. Returns: shapely.geometry.base.BaseGeometry: The original geometry of the object. """ return self.gf._original_geometry @property def original_centroid(self) -> np.ndarray: """ Get the center of the object. Returns: np.ndarray: The center of the object. """ return self.gf.original_centroid @property def original_state(self) -> np.ndarray: """ Get the original state of the object from the original centroid. Returns: np.ndarray (3,1): The original state of the object. """ return np.vstack((self.original_centroid, 0)) @property def external_objects(self): """ Get the environment objects that are not the self object. Returns: list: The environment objects that are not the self object. """ return [obj for obj in self._env_param.objects if self.id != obj.id] @property def ego_object(self): """ Get the ego object (this object itself). Returns: ObjectBase: The ego object (this object). """ return self @property def possible_collision_objects(self): """ Get the possible collision objects of the object from the geometry tree. Returns: list: The possible collision objects that could collide with this object. """ tree = self._env_param.GeometryTree possible = [] if tree is None: return possible candidates_index = tree.query(self.geometry) for index in candidates_index: obj = self._env_param.objects[index] if obj.unobstructed or obj.id == self.id: continue possible.append(obj) return possible @property def desired_omni_vel(self, goal_threshold=0.1): """ Calculate the desired omnidirectional velocity. Args: goal_threshold (float): Threshold for goal proximity. Returns: np.ndarray: Desired velocity [vx, vy]. """ if self.goal is None: return np.zeros((2, 1)) dis, radian = relative_position(self.state, self.goal) if dis > goal_threshold: vx = self.vel_max[0, 0] * cos(radian) vy = self.vel_max[1, 0] * sin(radian) else: vx = 0 vy = 0 return np.array([[vx], [vy]]) @property def rvo_neighbors(self): """ Get the list of RVO neighbors. Returns: list: List of RVO neighbor states [x, y, vx, vy, radius]. """ return [ obj.rvo_neighbor_state for obj in self.external_objects if not obj.unobstructed ] @property def rvo_neighbor_state(self): """ Get the RVO state for this object. Returns: list: State [x, y, vx, vy, radius]. """ cached = getattr(self, "_rvo_neighbor_state_cache", None) if cached is not None: return cached vxy = self.velocity_xy out = [ self.state[0, 0], self.state[1, 0], vxy[0, 0], vxy[1, 0], self.radius_extend, ] self._rvo_neighbor_state_cache = out return out @property def rvo_line_segments(self) -> list[list[float]]: """ Get line segments for RVO line obstacle avoidance. Returns: list: List of line segments [[x1, y1, x2, y2], ...] for linestring objects, empty list for other shapes. """ # Per-tick cache: invalidated in ``step()`` so a moving linestring # rebuilds its segments from the freshly transformed vertices on # the next read. Static objects never enter ``step()``, so their # cached segments persist for the lifetime of the env. cached = getattr(self, "_rvo_line_segments_cache", None) if cached is not None: return cached if self.shape != "linestring": self._rvo_line_segments_cache = [] return self._rvo_line_segments_cache verts = self.vertices # 2xN array segments = [ [verts[0, i], verts[1, i], verts[0, i + 1], verts[1, i + 1]] for i in range(verts.shape[1] - 1) ] self._rvo_line_segments_cache = segments return segments @property def rvo_state(self): """ Get the full RVO state including desired velocity. Returns: list: State [x, y, vx, vy, radius, vx_des, vy_des, theta]. """ vx_des, vy_des = self.desired_omni_vel[:, 0] return [ self.state[0, 0], self.state[1, 0], self.velocity_xy[0, 0], self.velocity_xy[1, 0], self.radius_extend, vx_des, vy_des, self.state[2, 0], ] @property def velocity_xy(self): """ Get the velocity in x and y directions. Returns: (2*1) np.ndarray: Velocity [vx, vy]. """ cached = getattr(self, "_velocity_xy_cache", None) if cached is not None: return cached if self.kf is not None: out = self.kf.velocity_to_xy(self.state, self.velocity) else: out = np.zeros((2, 1)) self._velocity_xy_cache = out return out @property def max_speed(self): """ Get the maximum speed of the object. Returns: float: The maximum speed of the object. """ if self.kf is not None: return self.kf.compute_max_speed(self.vel_max) return 0 @property def beh_config(self) -> dict[str, Any]: """ Get the behavior configuration for this object with group fallback. Returns: dict: The per-object behavior configuration if defined and non-empty; otherwise, the group's shared behavior configuration. """ return self.obj_behavior.behavior_dict or self.group_behavior_dict @property def _world_param(self): """ Access world_param via env instance if available, otherwise fallback to global. Returns: WorldParam: The world param instance. """ if self._env is not None: return self._env._world_param from irsim.config import world_param return world_param @property def _env_param(self): """ Access env_param via env instance if available, otherwise fallback to global. Returns: EnvParam: The env param instance. """ if self._env is not None: return self._env._env_param from irsim.config import env_param return env_param @property def world_param(self): """ Get the world parameters. Returns: WorldParam: World parameters including time, control_mode, collision_mode, step_time, and count. """ return self._world_param @property def env_param(self): """ Get the environment parameters. Returns: EnvParam: Environment parameters including logger and objects. """ return self._env_param @property def logger(self): """ Get the logger of the env_param. Returns: Logger: The logger associated in the env_param. """ return self._env_param.logger @property def heading(self): """ Get the heading of the object. Returns: float: The heading of the object. """ if self.kf is not None: return self.kf.compute_heading(self.state, self.velocity) if self.state.shape[0] > 2: return self.state[2, 0] return 0.0 @property def orientation(self): """ Get the orientation of the object. Returns: float: The orientation angle of the object in radians. """ return self.state[2, 0]