from math import cos, pi, sin
from typing import TYPE_CHECKING
import matplotlib.transforms as mtransforms
import numpy as np
import shapely
from matplotlib.collections import LineCollection
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Line3DCollection
from shapely import MultiLineString, prepare
from shapely.geometry import GeometryCollection
from irsim.util.random import rng
from irsim.util.util import (
WrapTo2Pi,
geometry_transform,
transform_point_with_state,
)
if TYPE_CHECKING:
from irsim.world.object_base import ObjectBase
[docs]
class Lidar2D:
"""
Simulates a 2D Lidar sensor for detecting obstacles in the environment.
Args:
state (np.ndarray): Initial state of the sensor.
obj_id (int): ID of the associated object.
range_min (float): Minimum detection range.
range_max (float): Maximum detection range.
angle_range (float): Total angle range of the sensor.
number (int): Number of laser beams.
scan_time (float): Time taken for one complete scan.
noise (bool): Whether noise is added to measurements.
std (float): Standard deviation for range noise.
angle_std (float): Standard deviation for angle noise.
offset (list): Offset of the sensor from the object's position.
alpha (float): Transparency for plotting.
has_velocity (bool): Whether the sensor measures velocity.
**kwargs: Additional arguments.
color (str): Color of the sensor.
Attr:
- sensor_type (str): Type of sensor ("lidar2d"). Default is "lidar2d".
- range_min (float): Minimum detection range in meters. Default is 0.
- range_max (float): Maximum detection range in meters. Default is 10.
- angle_range (float): Total angle range of the sensor in radians. Default is pi. WrapTo2Pi is applied.
- angle_min (float): Starting angle of the sensor's scan relative to the forward direction in radians. Calculated as -angle_range / 2.
- angle_max (float): Ending angle of the sensor's scan relative to the forward direction in radians. Calculated as angle_range / 2.
- angle_inc (float): Angular increment between each laser beam in radians. Calculated as angle_range / number.
- number (int): Number of laser beams. Default is 100.
- scan_time (float): Time taken to complete one full scan in seconds. Default is 0.1.
- noise (bool): Whether to add noise to the measurements. Default is False.
- std (float): Standard deviation for range noise in meters. Effective only if `noise` is True. Default is 0.2.
- angle_std (float): Standard deviation for angle noise in radians. Effective only if `noise` is True. Default is 0.02.
- offset (np.ndarray): Offset of the sensor relative to the object's position, formatted as [x, y, theta]. Default is [0, 0, 0].
- lidar_origin (np.ndarray): Origin position of the Lidar sensor, considering offset and the object's state.
- alpha (float): Transparency level for plotting the laser beams. Default is 0.3.
- has_velocity (bool): Whether the sensor measures the velocity of detected points. Default is False.
- velocity (np.ndarray): Velocity data for each laser beam, formatted as (2, number) array. Effective only if `has_velocity` is True. Initialized to zeros.
- time_inc (float): Time increment for each scan, simulating the sensor's time resolution. Default is 5e-4.
- range_data (np.ndarray): Array storing range data for each laser beam. Initialized to `range_max` for all beams.
- angle_list (np.ndarray): Array of angles corresponding to each laser beam, distributed linearly from `angle_min` to `angle_max`.
- color (str): Color of the sensor's representation in visualizations. Default is "r" (red).
- obj_id (int): ID of the associated object, used to differentiate between multiple sensors or objects in the environment. Default is 0.
- plot_patch_list (list): List storing plot patches (e.g., line collections) for visualization purposes.
- plot_line_list (list): List storing plot lines for visualization purposes.
- plot_text_list (list): List storing plot text elements for visualization purposes.
"""
def __init__(
self,
state: np.ndarray | None = None,
obj_id: int = 0,
range_min: float = 0,
range_max: float = 10,
angle_range: float = pi,
number: int = 100,
scan_time: float = 0.1,
noise: bool = False,
std: float = 0.2,
angle_std: float = 0.02,
offset: list[float] | None = None,
alpha: float = 0.3,
has_velocity: bool = False,
**kwargs,
) -> None:
"""
Initialize the Lidar2D sensor.
"""
if offset is None:
offset = [0, 0, 0]
self.sensor_type = "lidar2d"
self.range_min = range_min
self.range_max = range_max
self.angle_range = WrapTo2Pi(angle_range)
self.angle_min = -self.angle_range / 2
self.angle_max = self.angle_range / 2
self.angle_inc = self.angle_range / number
self.number = number
self.scan_time = scan_time
self.noise = noise
self.std = std
self.angle_std = angle_std
self.offset = np.c_[offset]
self.alpha = alpha
self.has_velocity = has_velocity
self.velocity = np.zeros((2, number))
self.time_inc = (self.angle_range / (2 * pi)) * scan_time / number
self.range_data = range_max * np.ones(number)
self.angle_list = np.linspace(self.angle_min, self.angle_max, num=number)
self._state = state
self.init_geometry(self._state)
self.color = kwargs.get("color", "r")
self.obj_id = obj_id
# Parent object reference (set by ObjectBase or SensorFactory)
self.parent: ObjectBase | None = None
self.plot_patch_list = []
self.plot_line_list = []
self.plot_text_list = []
@property
def _env_param(self):
"""Access env_param via parent's env instance if available."""
if self.parent is not None and self.parent._env is not None:
return self.parent._env._env_param
from irsim.config import env_param
return env_param
def _ensure_multi_linestring(self, geometry):
"""
Ensure geometry is a MultiLineString, converting if necessary.
Args:
geometry: Shapely geometry object.
Returns:
MultiLineString: Geometry as MultiLineString.
"""
if geometry.geom_type == "LineString":
return MultiLineString([geometry])
if geometry.geom_type == "MultiLineString":
return geometry
if geometry.is_empty:
return MultiLineString()
if geometry.geom_type == "GeometryCollection":
# Extract LineString components (and any lines from nested MultiLineStrings)
linestrings = []
for g in geometry.geoms:
if g.geom_type == "LineString":
linestrings.append(g)
elif g.geom_type == "MultiLineString":
linestrings.extend(list(g.geoms))
return MultiLineString(linestrings) if linestrings else MultiLineString()
# For unsupported geometry types (e.g., Point, Polygon), return an empty MultiLineString
return MultiLineString()
[docs]
def init_geometry(self, state):
"""
Initialize the Lidar's scanning geometry.
Args:
state (np.ndarray): Current state of the sensor.
"""
segment_point_list = []
for i in range(self.number):
x = self.range_data[i] * cos(self.angle_list[i])
y = self.range_data[i] * sin(self.angle_list[i])
point0 = np.zeros((1, 2))
point = np.array([[x], [y]]).T
segment = np.concatenate((point0, point), axis=0)
segment_point_list.append(segment)
self.origin_state = self.offset
geometry = MultiLineString(segment_point_list)
self._original_geometry = geometry_transform(geometry, self.origin_state)
self.lidar_origin = transform_point_with_state(self.offset, state)
self._geometry = geometry_transform(self._original_geometry, state)
self._init_geometry = self._geometry
[docs]
def step(self, state):
"""
Update the Lidar's state and process intersections with environment objects.
Args:
state (np.ndarray): New state of the sensor.
"""
self._state = state
self.lidar_origin = transform_point_with_state(self.offset, self._state)
new_geometry = geometry_transform(self._original_geometry, self._state)
prepare(new_geometry)
new_geometry, intersect_indices = self.laser_geometry_process(new_geometry)
if len(intersect_indices) == 0:
self._geometry = self._ensure_multi_linestring(new_geometry)
self.calculate_range()
else:
origin_pt = shapely.points(self.lidar_origin[0, 0], self.lidar_origin[1, 0])
parts = shapely.get_parts(new_geometry)
self._geometry = MultiLineString(
list(parts[shapely.intersects(parts, origin_pt)])
)
self.calculate_range_vel(intersect_indices)
[docs]
def laser_geometry_process(self, lidar_geometry):
"""
Find the intersected objects and return the intersected indices with the lidar geometry
Args:
lidar_geometry (shapely.geometry.MultiLineString): The geometry of the lidar.
Returns:
list: The indices of the intersected objects.
"""
object_tree = self._env_param.GeometryTree
objects = self._env_param.objects
geometries = [obj._geometry for obj in objects]
# Guard against missing geometry index
if object_tree is None:
return lidar_geometry, []
potential_geometries_index = object_tree.query(lidar_geometry)
geometries_to_subtract = []
intersect_indices = []
for geom_index in potential_geometries_index:
geo = geometries[geom_index]
obj = objects[geom_index]
if obj._id == self.obj_id or not obj._geometry_valid or obj.unobstructed:
continue
if obj.shape == "map":
intersecting_indices = obj.geometry_tree.query(
lidar_geometry, predicate="intersects"
)
if len(intersecting_indices) > 0:
filtered_lines = [obj.linestrings[i] for i in intersecting_indices]
geometries_to_subtract.extend(filtered_lines)
intersect_indices.append(geom_index)
else:
if lidar_geometry.intersects(geo):
geometries_to_subtract.append(geo)
intersect_indices.append(geom_index)
if geometries_to_subtract:
obstacle = (
geometries_to_subtract[0]
if len(geometries_to_subtract) == 1
else GeometryCollection(geometries_to_subtract)
)
lidar_geometry = lidar_geometry.difference(obstacle)
lidar_geometry = self._ensure_multi_linestring(lidar_geometry)
return lidar_geometry, intersect_indices
[docs]
def calculate_range(self):
"""
Calculate the range data from the current geometry.
"""
# Reset all beams to the default maximum range to avoid stale values
self.range_data[:] = self.range_max
parts = shapely.get_parts(self._geometry)
lengths = shapely.length(parts)
if self.noise:
self.range_data[: len(lengths)] = lengths + rng.normal(
0, self.std, len(lengths)
)
else:
self.range_data[: len(lengths)] = lengths
[docs]
def calculate_range_vel(self, intersect_index):
"""
Calculate the range data and velocities from intersected geometries.
Args:
intersect_index (list): List of intersected object indices.
"""
parts = shapely.get_parts(self._geometry)
lengths = shapely.length(parts)
if self.noise:
self.range_data[: len(lengths)] = lengths + rng.normal(
0, self.std, len(lengths)
)
else:
self.range_data[: len(lengths)] = lengths
if self.has_velocity:
# Reset all beam velocities to avoid carrying over stale values
self.velocity[:] = 0.0
for index, (line, length) in enumerate(zip(parts, lengths, strict=True)):
if length < self.range_max - 0.02:
for index_obj in intersect_index:
obj = self._env_param.objects[index_obj]
if obj.geometry.distance(line) < 0.1:
self.velocity[:, index : index + 1] = obj.velocity_xy
break
[docs]
def get_scan(self):
"""
Get the 2D lidar scan data. refer to the ros topic scan: http://docs.ros.org/en/melodic/api/sensor_msgs/html/msg/LaserScan.html
Returns:
dict: Scan data including angles, ranges, and velocities.
"""
scan_data = {}
scan_data["angle_min"] = self.angle_min
scan_data["angle_max"] = self.angle_max
scan_data["angle_increment"] = self.angle_inc
scan_data["time_increment"] = self.time_inc
scan_data["scan_time"] = self.scan_time
scan_data["range_min"] = self.range_min
scan_data["range_max"] = self.range_max
scan_data["ranges"] = self.range_data
scan_data["intensities"] = None
scan_data["velocity"] = self.velocity
return scan_data
[docs]
def get_points(self):
"""
Convert scan data to a point cloud.
Returns:
np.ndarray: Point cloud (2xN).
"""
return self.scan_to_pointcloud()
[docs]
def get_offset(self):
"""
Get the sensor's offset.
Returns:
list: Offset as a list.
"""
return np.squeeze(self.offset).tolist()
[docs]
def plot(self, ax, state: np.ndarray | None = None, **kwargs):
"""
Plot the Lidar's detected lines on a given axis.
"""
if state is None:
state = self.state
self._plot(ax, state, **kwargs)
def _init_plot(self, ax, **kwargs):
"""
Initialize the plot for the Lidar.
"""
self._plot(ax, self.origin_state, **kwargs)
@property
def state(self) -> np.ndarray:
"""
Get the current state of the lidar sensor.
Returns:
np.ndarray: Current state of the sensor.
"""
return self._state
def _plot(self, ax, state, **kwargs):
"""
Plot the Lidar's detected lines using the specified state for positioning.
Creates line segments in local coordinates and applies transforms to position them.
Args:
ax: Matplotlib axis.
state: State vector [x, y, theta, ...] defining lidar position and orientation.
**kwargs: Plotting options.
"""
lines = []
if isinstance(ax, Axes3D):
# For 3D plotting, calculate actual world coordinates since transforms don't work the same way
if state is not None and len(state) > 0:
# Calculate lidar position based on object state and sensor offset
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = (
self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
)
else:
lidar_x, lidar_y, lidar_theta = 0, 0, 0
# Create line segments in world coordinates for 3D
for i in range(self.number):
x_local = self.range_data[i] * cos(self.angle_list[i])
y_local = self.range_data[i] * sin(self.angle_list[i])
# Transform to world coordinates
x_world = (
lidar_x + x_local * cos(lidar_theta) - y_local * sin(lidar_theta)
)
y_world = (
lidar_y + x_local * sin(lidar_theta) + y_local * cos(lidar_theta)
)
start_point = np.array([lidar_x, lidar_y, 0])
end_point = np.array([x_world, y_world, 0])
segment = [start_point, end_point]
lines.append(segment)
self.laser_LineCollection = Line3DCollection(
lines, linewidths=1, colors=self.color, alpha=self.alpha, zorder=2
)
ax.add_collection3d(self.laser_LineCollection)
else:
# For 2D plotting, create line segments in local coordinates and use transforms
for i in range(self.number):
x = self.range_data[i] * cos(self.angle_list[i])
y = self.range_data[i] * sin(self.angle_list[i])
segment = [np.array([0, 0]), np.array([x, y])]
lines.append(segment)
self.laser_LineCollection = LineCollection(
lines, linewidths=1, colors=self.color, alpha=self.alpha, zorder=2
)
ax.add_collection(self.laser_LineCollection)
# Apply transform for 2D case - use provided state for positioning
if state is not None and len(state) > 0:
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = (
self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
)
# Create transform: rotate by lidar orientation, then translate to lidar position
trans = (
mtransforms.Affine2D()
.rotate(lidar_theta)
.translate(lidar_x, lidar_y)
+ ax.transData
)
self.laser_LineCollection.set_transform(trans)
self.plot_patch_list.append(self.laser_LineCollection)
def _step_plot(self):
"""
Update the lidar visualization using matplotlib transforms based on current state.
Creates line segments in local coordinates and applies transform to position them.
"""
if not hasattr(self, "laser_LineCollection"):
return
ax = self.laser_LineCollection.axes
lines = []
if ax is None:
return
if isinstance(ax, Axes3D):
# For 3D plotting, calculate actual world coordinates
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = (
self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
)
# Create line segments in world coordinates for 3D
for i in range(self.number):
x_local = self.range_data[i] * cos(self.angle_list[i])
y_local = self.range_data[i] * sin(self.angle_list[i])
# Transform to world coordinates
x_world = (
lidar_x + x_local * cos(lidar_theta) - y_local * sin(lidar_theta)
)
y_world = (
lidar_y + x_local * sin(lidar_theta) + y_local * cos(lidar_theta)
)
start_point = np.array([lidar_x, lidar_y, 0])
end_point = np.array([x_world, y_world, 0])
segment = [start_point, end_point]
lines.append(segment)
else:
# For 2D plotting, create line segments in local coordinates
for i in range(self.number):
x = self.range_data[i] * cos(self.angle_list[i])
y = self.range_data[i] * sin(self.angle_list[i])
segment = [np.array([0, 0]), np.array([x, y])]
lines.append(segment)
# Update line segments
self.laser_LineCollection.set_segments(lines)
# Apply transform to position the LineCollection based on current lidar origin (2D only)
if not isinstance(ax, Axes3D): # 2D case
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = (
self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
)
# Create transform: rotate by lidar orientation, then translate to lidar position
trans = (
mtransforms.Affine2D().rotate(lidar_theta).translate(lidar_x, lidar_y)
+ ax.transData
)
self.laser_LineCollection.set_transform(trans)
[docs]
def step_plot(self):
"""
Public method to update the lidar visualization, calls _step_plot.
"""
self._step_plot()
[docs]
def set_laser_color(
self, laser_indices, laser_color: str = "blue", alpha: float = 0.3
):
"""
Set a specific color of the selected lasers.
Args:
laser_indices (list): The indices of the lasers to set the color.
laser_color (str): The color to set the selected lasers. Default is 'blue'.
alpha (float): The transparency of the lasers. Default is 0.3.
"""
current_color = [self.color] * self.number
current_alpha = [self.alpha] * self.number
for index in laser_indices:
if index < self.number:
current_color[index] = laser_color
current_alpha[index] = alpha
self.laser_LineCollection.set_color(current_color)
self.laser_LineCollection.set_alpha(current_alpha)
[docs]
def plot_clear(self):
"""
Clear the plot elements from the axis.
"""
[patch.remove() for patch in self.plot_patch_list]
[line.pop(0).remove() for line in self.plot_line_list]
[text.remove() for text in self.plot_text_list]
self.plot_patch_list = []
self.plot_line_list = []
self.plot_text_list = []
[docs]
def scan_to_pointcloud(self):
"""
Convert the Lidar scan data to a point cloud.
Returns:
np.ndarray: Point cloud (2xN).
"""
point_cloud = []
ranges = self.range_data
angles = np.linspace(self.angle_min, self.angle_max, len(ranges))
for i in range(len(ranges)):
scan_range = ranges[i]
angle = angles[i]
if scan_range < (self.range_max - 0.02):
point = np.array([[scan_range * cos(angle)], [scan_range * sin(angle)]])
point_cloud.append(point)
if len(point_cloud) == 0:
return None
return np.hstack(point_cloud)