Source code for irsim.world.sensors.lidar2d

from math import cos, pi, sin
from typing import TYPE_CHECKING

import matplotlib.transforms as mtransforms
import numpy as np
import shapely
from matplotlib.collections import LineCollection
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Line3DCollection
from shapely import MultiLineString, prepare
from shapely.geometry import GeometryCollection

from irsim.util.random import rng
from irsim.util.util import (
    WrapTo2Pi,
    geometry_transform,
    transform_point_with_state,
)

if TYPE_CHECKING:
    from irsim.world.object_base import ObjectBase


[docs] class Lidar2D: """ Simulates a 2D Lidar sensor for detecting obstacles in the environment. Args: state (np.ndarray): Initial state of the sensor. obj_id (int): ID of the associated object. range_min (float): Minimum detection range. range_max (float): Maximum detection range. angle_range (float): Total angle range of the sensor. number (int): Number of laser beams. scan_time (float): Time taken for one complete scan. noise (bool): Whether noise is added to measurements. std (float): Standard deviation for range noise. angle_std (float): Standard deviation for angle noise. offset (list): Offset of the sensor from the object's position. alpha (float): Transparency for plotting. has_velocity (bool): Whether the sensor measures velocity. **kwargs: Additional arguments. color (str): Color of the sensor. Attr: - sensor_type (str): Type of sensor ("lidar2d"). Default is "lidar2d". - range_min (float): Minimum detection range in meters. Default is 0. - range_max (float): Maximum detection range in meters. Default is 10. - angle_range (float): Total angle range of the sensor in radians. Default is pi. WrapTo2Pi is applied. - angle_min (float): Starting angle of the sensor's scan relative to the forward direction in radians. Calculated as -angle_range / 2. - angle_max (float): Ending angle of the sensor's scan relative to the forward direction in radians. Calculated as angle_range / 2. - angle_inc (float): Angular increment between each laser beam in radians. Calculated as angle_range / number. - number (int): Number of laser beams. Default is 100. - scan_time (float): Time taken to complete one full scan in seconds. Default is 0.1. - noise (bool): Whether to add noise to the measurements. Default is False. - std (float): Standard deviation for range noise in meters. Effective only if `noise` is True. Default is 0.2. - angle_std (float): Standard deviation for angle noise in radians. Effective only if `noise` is True. Default is 0.02. - offset (np.ndarray): Offset of the sensor relative to the object's position, formatted as [x, y, theta]. Default is [0, 0, 0]. - lidar_origin (np.ndarray): Origin position of the Lidar sensor, considering offset and the object's state. - alpha (float): Transparency level for plotting the laser beams. Default is 0.3. - has_velocity (bool): Whether the sensor measures the velocity of detected points. Default is False. - velocity (np.ndarray): Velocity data for each laser beam, formatted as (2, number) array. Effective only if `has_velocity` is True. Initialized to zeros. - time_inc (float): Time increment for each scan, simulating the sensor's time resolution. Default is 5e-4. - range_data (np.ndarray): Array storing range data for each laser beam. Initialized to `range_max` for all beams. - angle_list (np.ndarray): Array of angles corresponding to each laser beam, distributed linearly from `angle_min` to `angle_max`. - color (str): Color of the sensor's representation in visualizations. Default is "r" (red). - obj_id (int): ID of the associated object, used to differentiate between multiple sensors or objects in the environment. Default is 0. - plot_patch_list (list): List storing plot patches (e.g., line collections) for visualization purposes. - plot_line_list (list): List storing plot lines for visualization purposes. - plot_text_list (list): List storing plot text elements for visualization purposes. """ def __init__( self, state: np.ndarray | None = None, obj_id: int = 0, range_min: float = 0, range_max: float = 10, angle_range: float = pi, number: int = 100, scan_time: float = 0.1, noise: bool = False, std: float = 0.2, angle_std: float = 0.02, offset: list[float] | None = None, alpha: float = 0.3, has_velocity: bool = False, **kwargs, ) -> None: """ Initialize the Lidar2D sensor. """ if offset is None: offset = [0, 0, 0] self.sensor_type = "lidar2d" self.range_min = range_min self.range_max = range_max self.angle_range = WrapTo2Pi(angle_range) self.angle_min = -self.angle_range / 2 self.angle_max = self.angle_range / 2 self.angle_inc = self.angle_range / number self.number = number self.scan_time = scan_time self.noise = noise self.std = std self.angle_std = angle_std self.offset = np.c_[offset] self.alpha = alpha self.has_velocity = has_velocity self.velocity = np.zeros((2, number)) self.time_inc = (self.angle_range / (2 * pi)) * scan_time / number self.range_data = range_max * np.ones(number) self.angle_list = np.linspace(self.angle_min, self.angle_max, num=number) self._state = state self.init_geometry(self._state) self.color = kwargs.get("color", "r") self.obj_id = obj_id # Parent object reference (set by ObjectBase or SensorFactory) self.parent: ObjectBase | None = None self.plot_patch_list = [] self.plot_line_list = [] self.plot_text_list = [] @property def _env_param(self): """Access env_param via parent's env instance if available.""" if self.parent is not None and self.parent._env is not None: return self.parent._env._env_param from irsim.config import env_param return env_param def _ensure_multi_linestring(self, geometry): """ Ensure geometry is a MultiLineString, converting if necessary. Args: geometry: Shapely geometry object. Returns: MultiLineString: Geometry as MultiLineString. """ if geometry.geom_type == "LineString": return MultiLineString([geometry]) if geometry.geom_type == "MultiLineString": return geometry if geometry.is_empty: return MultiLineString() if geometry.geom_type == "GeometryCollection": # Extract LineString components (and any lines from nested MultiLineStrings) linestrings = [] for g in geometry.geoms: if g.geom_type == "LineString": linestrings.append(g) elif g.geom_type == "MultiLineString": linestrings.extend(list(g.geoms)) return MultiLineString(linestrings) if linestrings else MultiLineString() # For unsupported geometry types (e.g., Point, Polygon), return an empty MultiLineString return MultiLineString()
[docs] def init_geometry(self, state): """ Initialize the Lidar's scanning geometry. Args: state (np.ndarray): Current state of the sensor. """ segment_point_list = [] for i in range(self.number): x = self.range_data[i] * cos(self.angle_list[i]) y = self.range_data[i] * sin(self.angle_list[i]) point0 = np.zeros((1, 2)) point = np.array([[x], [y]]).T segment = np.concatenate((point0, point), axis=0) segment_point_list.append(segment) self.origin_state = self.offset geometry = MultiLineString(segment_point_list) self._original_geometry = geometry_transform(geometry, self.origin_state) self.lidar_origin = transform_point_with_state(self.offset, state) self._geometry = geometry_transform(self._original_geometry, state) self._init_geometry = self._geometry
[docs] def step(self, state): """ Update the Lidar's state and process intersections with environment objects. Args: state (np.ndarray): New state of the sensor. """ self._state = state self.lidar_origin = transform_point_with_state(self.offset, self._state) new_geometry = geometry_transform(self._original_geometry, self._state) prepare(new_geometry) new_geometry, intersect_indices = self.laser_geometry_process(new_geometry) if len(intersect_indices) == 0: self._geometry = self._ensure_multi_linestring(new_geometry) self.calculate_range() else: origin_pt = shapely.points(self.lidar_origin[0, 0], self.lidar_origin[1, 0]) parts = shapely.get_parts(new_geometry) self._geometry = MultiLineString( list(parts[shapely.intersects(parts, origin_pt)]) ) self.calculate_range_vel(intersect_indices)
[docs] def laser_geometry_process(self, lidar_geometry): """ Find the intersected objects and return the intersected indices with the lidar geometry Args: lidar_geometry (shapely.geometry.MultiLineString): The geometry of the lidar. Returns: list: The indices of the intersected objects. """ object_tree = self._env_param.GeometryTree objects = self._env_param.objects geometries = [obj._geometry for obj in objects] # Guard against missing geometry index if object_tree is None: return lidar_geometry, [] potential_geometries_index = object_tree.query(lidar_geometry) geometries_to_subtract = [] intersect_indices = [] for geom_index in potential_geometries_index: geo = geometries[geom_index] obj = objects[geom_index] if obj._id == self.obj_id or not obj._geometry_valid or obj.unobstructed: continue if obj.shape == "map": intersecting_indices = obj.geometry_tree.query( lidar_geometry, predicate="intersects" ) if len(intersecting_indices) > 0: filtered_lines = [obj.linestrings[i] for i in intersecting_indices] geometries_to_subtract.extend(filtered_lines) intersect_indices.append(geom_index) else: if lidar_geometry.intersects(geo): geometries_to_subtract.append(geo) intersect_indices.append(geom_index) if geometries_to_subtract: obstacle = ( geometries_to_subtract[0] if len(geometries_to_subtract) == 1 else GeometryCollection(geometries_to_subtract) ) lidar_geometry = lidar_geometry.difference(obstacle) lidar_geometry = self._ensure_multi_linestring(lidar_geometry) return lidar_geometry, intersect_indices
[docs] def calculate_range(self): """ Calculate the range data from the current geometry. """ # Reset all beams to the default maximum range to avoid stale values self.range_data[:] = self.range_max parts = shapely.get_parts(self._geometry) lengths = shapely.length(parts) if self.noise: self.range_data[: len(lengths)] = lengths + rng.normal( 0, self.std, len(lengths) ) else: self.range_data[: len(lengths)] = lengths
[docs] def calculate_range_vel(self, intersect_index): """ Calculate the range data and velocities from intersected geometries. Args: intersect_index (list): List of intersected object indices. """ parts = shapely.get_parts(self._geometry) lengths = shapely.length(parts) if self.noise: self.range_data[: len(lengths)] = lengths + rng.normal( 0, self.std, len(lengths) ) else: self.range_data[: len(lengths)] = lengths if self.has_velocity: # Reset all beam velocities to avoid carrying over stale values self.velocity[:] = 0.0 for index, (line, length) in enumerate(zip(parts, lengths, strict=True)): if length < self.range_max - 0.02: for index_obj in intersect_index: obj = self._env_param.objects[index_obj] if obj.geometry.distance(line) < 0.1: self.velocity[:, index : index + 1] = obj.velocity_xy break
[docs] def get_scan(self): """ Get the 2D lidar scan data. refer to the ros topic scan: http://docs.ros.org/en/melodic/api/sensor_msgs/html/msg/LaserScan.html Returns: dict: Scan data including angles, ranges, and velocities. """ scan_data = {} scan_data["angle_min"] = self.angle_min scan_data["angle_max"] = self.angle_max scan_data["angle_increment"] = self.angle_inc scan_data["time_increment"] = self.time_inc scan_data["scan_time"] = self.scan_time scan_data["range_min"] = self.range_min scan_data["range_max"] = self.range_max scan_data["ranges"] = self.range_data scan_data["intensities"] = None scan_data["velocity"] = self.velocity return scan_data
[docs] def get_points(self): """ Convert scan data to a point cloud. Returns: np.ndarray: Point cloud (2xN). """ return self.scan_to_pointcloud()
[docs] def get_offset(self): """ Get the sensor's offset. Returns: list: Offset as a list. """ return np.squeeze(self.offset).tolist()
[docs] def plot(self, ax, state: np.ndarray | None = None, **kwargs): """ Plot the Lidar's detected lines on a given axis. """ if state is None: state = self.state self._plot(ax, state, **kwargs)
def _init_plot(self, ax, **kwargs): """ Initialize the plot for the Lidar. """ self._plot(ax, self.origin_state, **kwargs) @property def state(self) -> np.ndarray: """ Get the current state of the lidar sensor. Returns: np.ndarray: Current state of the sensor. """ return self._state def _plot(self, ax, state, **kwargs): """ Plot the Lidar's detected lines using the specified state for positioning. Creates line segments in local coordinates and applies transforms to position them. Args: ax: Matplotlib axis. state: State vector [x, y, theta, ...] defining lidar position and orientation. **kwargs: Plotting options. """ lines = [] if isinstance(ax, Axes3D): # For 3D plotting, calculate actual world coordinates since transforms don't work the same way if state is not None and len(state) > 0: # Calculate lidar position based on object state and sensor offset lidar_x = self.lidar_origin[0, 0] lidar_y = self.lidar_origin[1, 0] lidar_theta = ( self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0 ) else: lidar_x, lidar_y, lidar_theta = 0, 0, 0 # Create line segments in world coordinates for 3D for i in range(self.number): x_local = self.range_data[i] * cos(self.angle_list[i]) y_local = self.range_data[i] * sin(self.angle_list[i]) # Transform to world coordinates x_world = ( lidar_x + x_local * cos(lidar_theta) - y_local * sin(lidar_theta) ) y_world = ( lidar_y + x_local * sin(lidar_theta) + y_local * cos(lidar_theta) ) start_point = np.array([lidar_x, lidar_y, 0]) end_point = np.array([x_world, y_world, 0]) segment = [start_point, end_point] lines.append(segment) self.laser_LineCollection = Line3DCollection( lines, linewidths=1, colors=self.color, alpha=self.alpha, zorder=2 ) ax.add_collection3d(self.laser_LineCollection) else: # For 2D plotting, create line segments in local coordinates and use transforms for i in range(self.number): x = self.range_data[i] * cos(self.angle_list[i]) y = self.range_data[i] * sin(self.angle_list[i]) segment = [np.array([0, 0]), np.array([x, y])] lines.append(segment) self.laser_LineCollection = LineCollection( lines, linewidths=1, colors=self.color, alpha=self.alpha, zorder=2 ) ax.add_collection(self.laser_LineCollection) # Apply transform for 2D case - use provided state for positioning if state is not None and len(state) > 0: lidar_x = self.lidar_origin[0, 0] lidar_y = self.lidar_origin[1, 0] lidar_theta = ( self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0 ) # Create transform: rotate by lidar orientation, then translate to lidar position trans = ( mtransforms.Affine2D() .rotate(lidar_theta) .translate(lidar_x, lidar_y) + ax.transData ) self.laser_LineCollection.set_transform(trans) self.plot_patch_list.append(self.laser_LineCollection) def _step_plot(self): """ Update the lidar visualization using matplotlib transforms based on current state. Creates line segments in local coordinates and applies transform to position them. """ if not hasattr(self, "laser_LineCollection"): return ax = self.laser_LineCollection.axes lines = [] if ax is None: return if isinstance(ax, Axes3D): # For 3D plotting, calculate actual world coordinates lidar_x = self.lidar_origin[0, 0] lidar_y = self.lidar_origin[1, 0] lidar_theta = ( self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0 ) # Create line segments in world coordinates for 3D for i in range(self.number): x_local = self.range_data[i] * cos(self.angle_list[i]) y_local = self.range_data[i] * sin(self.angle_list[i]) # Transform to world coordinates x_world = ( lidar_x + x_local * cos(lidar_theta) - y_local * sin(lidar_theta) ) y_world = ( lidar_y + x_local * sin(lidar_theta) + y_local * cos(lidar_theta) ) start_point = np.array([lidar_x, lidar_y, 0]) end_point = np.array([x_world, y_world, 0]) segment = [start_point, end_point] lines.append(segment) else: # For 2D plotting, create line segments in local coordinates for i in range(self.number): x = self.range_data[i] * cos(self.angle_list[i]) y = self.range_data[i] * sin(self.angle_list[i]) segment = [np.array([0, 0]), np.array([x, y])] lines.append(segment) # Update line segments self.laser_LineCollection.set_segments(lines) # Apply transform to position the LineCollection based on current lidar origin (2D only) if not isinstance(ax, Axes3D): # 2D case lidar_x = self.lidar_origin[0, 0] lidar_y = self.lidar_origin[1, 0] lidar_theta = ( self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0 ) # Create transform: rotate by lidar orientation, then translate to lidar position trans = ( mtransforms.Affine2D().rotate(lidar_theta).translate(lidar_x, lidar_y) + ax.transData ) self.laser_LineCollection.set_transform(trans)
[docs] def step_plot(self): """ Public method to update the lidar visualization, calls _step_plot. """ self._step_plot()
[docs] def set_laser_color( self, laser_indices, laser_color: str = "blue", alpha: float = 0.3 ): """ Set a specific color of the selected lasers. Args: laser_indices (list): The indices of the lasers to set the color. laser_color (str): The color to set the selected lasers. Default is 'blue'. alpha (float): The transparency of the lasers. Default is 0.3. """ current_color = [self.color] * self.number current_alpha = [self.alpha] * self.number for index in laser_indices: if index < self.number: current_color[index] = laser_color current_alpha[index] = alpha self.laser_LineCollection.set_color(current_color) self.laser_LineCollection.set_alpha(current_alpha)
[docs] def plot_clear(self): """ Clear the plot elements from the axis. """ [patch.remove() for patch in self.plot_patch_list] [line.pop(0).remove() for line in self.plot_line_list] [text.remove() for text in self.plot_text_list] self.plot_patch_list = [] self.plot_line_list = [] self.plot_text_list = []
[docs] def scan_to_pointcloud(self): """ Convert the Lidar scan data to a point cloud. Returns: np.ndarray: Point cloud (2xN). """ point_cloud = [] ranges = self.range_data angles = np.linspace(self.angle_min, self.angle_max, len(ranges)) for i in range(len(ranges)): scan_range = ranges[i] angle = angles[i] if scan_range < (self.range_max - 0.02): point = np.array([[scan_range * cos(angle)], [scan_range * sin(angle)]]) point_cloud.append(point) if len(point_cloud) == 0: return None return np.hstack(point_cloud)