Source code for irsim.world.sensors.fmcw_lidar2d

"""Simplified 2D FMCW LiDAR with per-beam radial Doppler velocity."""

from math import cos, sin

import numpy as np
from matplotlib.colors import to_rgb
from mpl_toolkits.mplot3d import Axes3D
from shapely.geometry import LineString, MultiLineString, Point

from irsim.util.random import rng
from irsim.util.util import transform_point_with_state
from irsim.world.sensors.lidar2d import Lidar2D


[docs] class FMCWLidar2D(Lidar2D): """Simulate a 2D FMCW LiDAR using ray intersections plus Doppler projection. This sensor keeps the geometric scanning model of :class:`Lidar2D` but replaces the optional Cartesian target velocity output with a scalar radial velocity for each beam. The radial velocity is defined along the beam direction and can be reported either in the sensor-relative frame or motion-compensated to the world frame. In addition to scan geometry parameters inherited from ``Lidar2D``, this sensor accepts: Args: motion_compensate (bool): When ``False``, report target velocity relative to the sensor motion. When ``True``, remove the ego sensor motion and report world-frame radial target velocity. velocity_noise_std (float): Standard deviation of Gaussian noise applied to radial velocity measurements. velocity_color (bool): Whether to color beams by radial velocity when plotting. velocity_color_max (float): Magnitude at which the plotting color saturates. velocity_linewidth (float): Line width for valid Doppler beams. no_hit_linewidth (float): Line width for beams without valid returns. no_hit_alpha (float): Alpha for invalid beams in plots. show_velocity_markers (bool): Whether to draw colored endpoint markers for valid returns. velocity_marker_size (float): Scatter marker size for valid endpoints. zero_velocity_color (str): Plot color used near zero radial velocity. positive_velocity_color (str): Plot color used for positive radial velocity. negative_velocity_color (str): Plot color used for negative radial velocity. no_hit_color (str): Plot color used for invalid beams. """ def __init__( self, state: np.ndarray | None = None, obj_id: int = 0, motion_compensate: bool = False, velocity_noise_std: float = 0.0, **kwargs, ) -> None: super().__init__(state=state, obj_id=obj_id, has_velocity=False, **kwargs) self.sensor_type = "fmcw_lidar2d" self.motion_compensate = motion_compensate self.velocity_noise_std = velocity_noise_std # Visualization params: prefer the `plot:` sub-dict (set on the # base sensor as self._plot_cfg), fall back to flat top-level # keys for backward compatibility, then the default. def _pg(key, default): return self._plot_cfg.get(key, kwargs.get(key, default)) self.velocity_color = _pg("velocity_color", True) self.velocity_color_max = _pg("velocity_color_max", 2.0) self.velocity_linewidth = _pg("velocity_linewidth", 2.5) self.no_hit_linewidth = _pg("no_hit_linewidth", 0.8) self.no_hit_alpha = _pg("no_hit_alpha", 0.03) self.show_velocity_markers = _pg("show_velocity_markers", True) self.velocity_marker_size = _pg("velocity_marker_size", 36) self.velocity_marker_edge_color = _pg("velocity_marker_edge_color", "black") self.velocity_marker_edge_width = _pg("velocity_marker_edge_width", 0.6) self.zero_velocity_color = np.array(to_rgb(_pg("zero_velocity_color", "cyan"))) self.positive_velocity_color = np.array( to_rgb(_pg("positive_velocity_color", "crimson")) ) self.negative_velocity_color = np.array( to_rgb(_pg("negative_velocity_color", "royalblue")) ) self.no_hit_color = np.array(to_rgb(_pg("no_hit_color", "lightgray"))) self.radial_velocity = np.zeros(self.number) self.valid = np.zeros(self.number, dtype=bool)
[docs] def step(self, state): """Update ranges, validity flags, and radial velocities for every beam.""" self._state = state self.lidar_origin = transform_point_with_state(self.offset, self._state) origin_xy = np.array([self.lidar_origin[0, 0], self.lidar_origin[1, 0]]) sensor_theta = ( float(self.lidar_origin[2, 0]) if self.lidar_origin.shape[0] > 2 else 0.0 ) self.range_data[:] = self.range_max self.radial_velocity[:] = 0.0 self.valid[:] = False segments = [] for index, beam_angle in enumerate(self.angle_list): # Cast each beam independently so the sensor can keep the closest hit # object and project that object's motion onto the beam direction. world_angle = sensor_theta + beam_angle direction = np.array([cos(world_angle), sin(world_angle)]) ray_end = origin_xy + self.range_max * direction ray = LineString([tuple(origin_xy), tuple(ray_end)]) hit_distance, hit_object = self._find_nearest_hit(ray, origin_xy) if hit_distance is not None: if self.noise: hit_distance += rng.normal(0, self.std) # Treat noise-induced out-of-range measurements as invalid so # ``valid`` stays consistent with ``range_data`` and downstream # consumers (e.g. ``get_points``) cannot silently drop a beam # that is still flagged as a hit. if self.range_min <= hit_distance <= self.range_max: self.range_data[index] = hit_distance self.valid[index] = True radial_velocity = self._compute_radial_velocity( hit_object, direction ) if self.velocity_noise_std > 0: radial_velocity += rng.normal(0, self.velocity_noise_std) self.radial_velocity[index] = radial_velocity segment_end = origin_xy + self.range_data[index] * direction segments.append([tuple(origin_xy), tuple(segment_end)]) self._geometry = MultiLineString(segments)
def _plot(self, ax, state, **kwargs): """Plot beams, then color them from radial velocity for visualization.""" super()._plot(ax, state, **kwargs) self._apply_velocity_visuals() self._plot_velocity_markers(ax) def _step_plot(self): """Update beam geometry, then refresh velocity-based colors.""" super()._step_plot() self._apply_velocity_visuals() self._update_velocity_markers() def _find_nearest_hit(self, ray: LineString, origin_xy: np.ndarray): """Return the nearest intersected object and hit distance for one beam.""" object_tree = self._env_param.GeometryTree objects = self._env_param.objects if object_tree is None: return None, None origin_point = Point(*origin_xy) best_distance = None best_object = None for geom_index in object_tree.query(ray): obj = objects[geom_index] if obj._id == self.obj_id or not obj._geometry_valid or obj.unobstructed: continue geometry = self._get_candidate_geometry(obj, ray) if geometry is None: continue distance = self._nearest_intersection_distance(ray, geometry, origin_point) if distance is None: continue if best_distance is None or distance < best_distance: best_distance = distance best_object = obj return best_distance, best_object def _get_candidate_geometry(self, obj, ray: LineString): """Return the geometry subset that should be tested against a beam ray.""" if obj.shape == "map": intersecting_indices = obj.geometry_tree.query(ray, predicate="intersects") if len(intersecting_indices) == 0: return None return MultiLineString([obj.linestrings[i] for i in intersecting_indices]) if not ray.intersects(obj.geometry): return None return obj.geometry def _nearest_intersection_distance( self, ray: LineString, geometry, origin_point: Point, ) -> float | None: """Return the closest intersection distance along a beam ray.""" intersection = ray.intersection(geometry) best_distance = None for point in self._iter_intersection_points(intersection): distance = origin_point.distance(point) if distance <= 1e-9 or distance > self.range_max + 1e-9: continue if best_distance is None or distance < best_distance: best_distance = distance return best_distance def _iter_intersection_points(self, geometry): """Yield representative points from a Shapely intersection result.""" if geometry.is_empty: return geom_type = geometry.geom_type if geom_type == "Point": yield geometry return if geom_type == "MultiPoint": yield from geometry.geoms return if geom_type in {"LineString", "LinearRing"}: for coord in geometry.coords: yield Point(coord) return if geom_type == "MultiLineString": for part in geometry.geoms: yield from self._iter_intersection_points(part) return if geom_type == "GeometryCollection": for part in geometry.geoms: yield from self._iter_intersection_points(part) def _compute_radial_velocity(self, obj, direction: np.ndarray) -> float: """Project the target motion onto the beam direction.""" target_velocity = np.asarray(obj.velocity_xy, dtype=float).reshape(-1)[:2] if self.motion_compensate: relative_velocity = target_velocity else: relative_velocity = target_velocity - self._sensor_velocity_xy() return float(np.dot(relative_velocity, direction)) def _sensor_velocity_xy(self) -> np.ndarray: """Return the parent object's translational velocity in world XY.""" if self.parent is None or not hasattr(self.parent, "velocity_xy"): return np.zeros(2) return np.asarray(self.parent.velocity_xy, dtype=float).reshape(-1)[:2] def _apply_velocity_visuals(self): """Apply per-beam colors based on radial velocity without changing data.""" if not self.velocity_color or not hasattr(self, "laser_LineCollection"): return colors, alphas = self._get_velocity_visuals() self.laser_LineCollection.set_color(colors) self.laser_LineCollection.set_alpha(alphas) self.laser_LineCollection.set_linewidths(self._get_velocity_linewidths()) def _get_velocity_visuals(self): """Return colors and alpha values derived from current radial velocities.""" scale = max(float(self.velocity_color_max), 1e-6) colors = [] alphas = [] for is_valid, radial_velocity in zip( self.valid, self.radial_velocity, strict=True ): if not is_valid: colors.append(tuple(self.no_hit_color)) alphas.append(self.no_hit_alpha) continue ratio = min(abs(float(radial_velocity)) / scale, 1.0) if radial_velocity >= 0: color = ( 1.0 - ratio ) * self.zero_velocity_color + ratio * self.positive_velocity_color else: color = ( 1.0 - ratio ) * self.zero_velocity_color + ratio * self.negative_velocity_color colors.append(tuple(color)) alphas.append(1.0) return colors, alphas def _get_velocity_linewidths(self): """Return per-beam linewidths for clearer velocity visualization.""" return [ self.velocity_linewidth if is_valid else self.no_hit_linewidth for is_valid in self.valid ] def _plot_velocity_markers(self, ax): """Draw colored hit markers at valid beam endpoints for 2D plots.""" if not self.show_velocity_markers or isinstance(ax, Axes3D): return points, colors = self._get_velocity_marker_points() self.velocity_marker_plot = ax.scatter( points[:, 0] if len(points) > 0 else [], points[:, 1] if len(points) > 0 else [], s=self.velocity_marker_size, c=colors if len(colors) > 0 else None, edgecolors=self.velocity_marker_edge_color, linewidths=self.velocity_marker_edge_width, zorder=4, ) self.plot_patch_list.append(self.velocity_marker_plot) def _update_velocity_markers(self): """Refresh hit marker positions and colors without touching measurements.""" if not self.show_velocity_markers or not hasattr(self, "velocity_marker_plot"): return points, colors = self._get_velocity_marker_points() self.velocity_marker_plot.set_offsets(points) if len(colors) > 0: self.velocity_marker_plot.set_facecolor(colors) else: self.velocity_marker_plot.set_facecolor(np.empty((0, 4))) def _get_velocity_marker_points(self): """Return world-space hit points and colors for valid beams.""" if not np.any(self.valid): return np.empty((0, 2)), np.empty((0, 3)) lidar_theta = ( float(self.lidar_origin[2, 0]) if self.lidar_origin.shape[0] > 2 else 0.0 ) origin_xy = np.array([self.lidar_origin[0, 0], self.lidar_origin[1, 0]]) colors, _ = self._get_velocity_visuals() points = [] point_colors = [] for is_valid, beam_angle, beam_range, color in zip( self.valid, self.angle_list, self.range_data, colors, strict=True, ): if not is_valid: continue world_angle = lidar_theta + beam_angle direction = np.array([cos(world_angle), sin(world_angle)]) points.append(origin_xy + beam_range * direction) point_colors.append(color) return np.asarray(points), np.asarray(point_colors)
[docs] def get_scan(self): """Get the FMCW scan with radial velocity and validity per beam. Returns: dict: Scan dictionary containing the standard LiDAR angular metadata plus ``ranges``, ``radial_velocity``, ``valid``, and ``intensities=None``. """ scan_data = super().get_scan() scan_data.pop("velocity", None) scan_data["intensities"] = None scan_data["radial_velocity"] = self.radial_velocity scan_data["valid"] = self.valid return scan_data