from math import pi, cos, sin
import numpy as np
import matplotlib.transforms as mtransforms
from shapely import MultiLineString, Point, is_valid, prepare
from irsim.util.util import (
geometry_transform,
transform_point_with_state,
get_transform,
)
from irsim.global_param import env_param
from matplotlib.collections import LineCollection
from shapely.strtree import STRtree
from shapely.ops import unary_union
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Line3DCollection
from typing import Optional
[docs]
class Lidar2D:
"""
Simulates a 2D Lidar sensor for detecting obstacles in the environment.
Args:
state (np.ndarray): Initial state of the sensor.
obj_id (int): ID of the associated object.
range_min (float): Minimum detection range.
range_max (float): Maximum detection range.
angle_range (float): Total angle range of the sensor.
number (int): Number of laser beams.
scan_time (float): Time taken for one complete scan.
noise (bool): Whether noise is added to measurements.
std (float): Standard deviation for range noise.
angle_std (float): Standard deviation for angle noise.
offset (list): Offset of the sensor from the object's position.
alpha (float): Transparency for plotting.
has_velocity (bool): Whether the sensor measures velocity.
**kwargs: Additional arguments.
color (str): Color of the sensor.
Attr:
- sensor_type (str): Type of sensor ("lidar2d"). Default is "lidar2d".
- range_min (float): Minimum detection range in meters. Default is 0.
- range_max (float): Maximum detection range in meters. Default is 10.
- angle_range (float): Total angle range of the sensor in radians. Default is pi.
- angle_min (float): Starting angle of the sensor's scan relative to the forward direction in radians. Calculated as -angle_range / 2.
- angle_max (float): Ending angle of the sensor's scan relative to the forward direction in radians. Calculated as angle_range / 2.
- angle_inc (float): Angular increment between each laser beam in radians. Calculated as angle_range / number.
- number (int): Number of laser beams. Default is 100.
- scan_time (float): Time taken to complete one full scan in seconds. Default is 0.1.
- noise (bool): Whether to add noise to the measurements. Default is False.
- std (float): Standard deviation for range noise in meters. Effective only if `noise` is True. Default is 0.2.
- angle_std (float): Standard deviation for angle noise in radians. Effective only if `noise` is True. Default is 0.02.
- offset (np.ndarray): Offset of the sensor relative to the object's position, formatted as [x, y, theta]. Default is [0, 0, 0].
- lidar_origin (np.ndarray): Origin position of the Lidar sensor, considering offset and the object's state.
- alpha (float): Transparency level for plotting the laser beams. Default is 0.3.
- has_velocity (bool): Whether the sensor measures the velocity of detected points. Default is False.
- velocity (np.ndarray): Velocity data for each laser beam, formatted as (2, number) array. Effective only if `has_velocity` is True. Initialized to zeros.
- time_inc (float): Time increment for each scan, simulating the sensor's time resolution. Default is 5e-4.
- range_data (np.ndarray): Array storing range data for each laser beam. Initialized to `range_max` for all beams.
- angle_list (np.ndarray): Array of angles corresponding to each laser beam, distributed linearly from `angle_min` to `angle_max`.
- color (str): Color of the sensor's representation in visualizations. Default is "r" (red).
- obj_id (int): ID of the associated object, used to differentiate between multiple sensors or objects in the environment. Default is 0.
- plot_patch_list (list): List storing plot patches (e.g., line collections) for visualization purposes.
- plot_line_list (list): List storing plot lines for visualization purposes.
- plot_text_list (list): List storing plot text elements for visualization purposes.
"""
def __init__(
self,
state: np.ndarray = None,
obj_id: int = 0,
range_min: float = 0,
range_max: float = 10,
angle_range: float = pi,
number: int = 100,
scan_time: float = 0.1,
noise: bool = False,
std: float = 0.2,
angle_std: float = 0.02,
offset: list[float] = [0, 0, 0],
alpha: float = 0.3,
has_velocity: bool = False,
**kwargs,
) -> None:
"""
Initialize the Lidar2D sensor.
"""
self.sensor_type = "lidar2d"
self.range_min = range_min
self.range_max = range_max
self.angle_range = angle_range
self.angle_min = -angle_range / 2
self.angle_max = angle_range / 2
self.angle_inc = angle_range / number
self.number = number
self.scan_time = scan_time
self.noise = noise
self.std = std
self.angle_std = angle_std
self.offset = np.c_[offset]
self.alpha = alpha
self.has_velocity = has_velocity
self.velocity = np.zeros((2, number))
self.time_inc = (angle_range / (2 * pi)) * scan_time / number
self.range_data = range_max * np.ones(number)
self.angle_list = np.linspace(self.angle_min, self.angle_max, num=number)
self._state = state
self.init_geometry(self._state)
self.color = kwargs.get("color", "r")
self.obj_id = obj_id
self.plot_patch_list = []
self.plot_line_list = []
self.plot_text_list = []
[docs]
def init_geometry(self, state):
"""
Initialize the Lidar's scanning geometry.
Args:
state (np.ndarray): Current state of the sensor.
"""
segment_point_list = []
for i in range(self.number):
x = self.range_data[i] * cos(self.angle_list[i])
y = self.range_data[i] * sin(self.angle_list[i])
point0 = np.zeros((1, 2))
point = np.array([[x], [y]]).T
segment = np.concatenate((point0, point), axis=0)
segment_point_list.append(segment)
self.origin_state = self.offset
geometry = MultiLineString(segment_point_list)
self._original_geometry = geometry_transform(geometry, self.origin_state)
self.lidar_origin = transform_point_with_state(self.offset, state)
self._geometry = geometry_transform(self._original_geometry, state)
self._init_geometry = self._geometry
[docs]
def step(self, state):
"""
Update the Lidar's state and process intersections with environment objects.
Args:
state (np.ndarray): New state of the sensor.
"""
self._state = state
self.lidar_origin = transform_point_with_state(self.offset, self._state)
new_geometry = geometry_transform(self._original_geometry, self._state)
prepare(new_geometry)
new_geometry, intersect_indices = self.laser_geometry_process(new_geometry)
if len(intersect_indices) == 0:
self._geometry = new_geometry
self.calculate_range()
else:
origin_point = Point(self.lidar_origin[0, 0], self.lidar_origin[1, 0])
filtered_geoms = [
g for g in new_geometry.geoms if g.intersects(origin_point)
]
self._geometry = MultiLineString(filtered_geoms)
self.calculate_range_vel(intersect_indices)
[docs]
def laser_geometry_process(self, lidar_geometry):
'''
Find the intersected objects and return the intersected indices with the lidar geometry
Args:
lidar_geometry (shapely.geometry.MultiLineString): The geometry of the lidar.
Returns:
list: The indices of the intersected objects.
'''
object_tree = env_param.GeometryTree
objects = env_param.objects
geometries = [obj._geometry for obj in objects]
potential_geometries_index = object_tree.query(lidar_geometry)
geometries_to_subtract = []
intersect_indices = []
for geom_index in potential_geometries_index:
geo = geometries[geom_index]
obj = objects[geom_index]
if obj._id == self.obj_id or not is_valid(obj._geometry) or obj.unobstructed:
continue
if obj.shape == 'map':
potential_intersections = obj.geometry_tree.query(lidar_geometry)
filtered_lines = [obj.linestrings[i] for i in potential_intersections]
filtered_multi_lines = MultiLineString(filtered_lines)
# prepare(filtered_multi_lines)
if lidar_geometry.intersects(filtered_multi_lines):
geometries_to_subtract.append(filtered_multi_lines)
intersect_indices.append(geom_index)
else:
if lidar_geometry.intersects(geo):
geometries_to_subtract.append(geo)
intersect_indices.append(geom_index)
if geometries_to_subtract:
merged_geometry = unary_union(geometries_to_subtract)
lidar_geometry = lidar_geometry.difference(merged_geometry)
return lidar_geometry, intersect_indices
[docs]
def calculate_range(self):
"""
Calculate the range data from the current geometry.
"""
for index, l in enumerate(self._geometry.geoms):
# self.range_data[index] = l.length
if self.noise:
self.range_data[index] = l.length + np.random.normal(0, self.std)
else:
self.range_data[index] = l.length
[docs]
def calculate_range_vel(self, intersect_index):
"""
Calculate the range data and velocities from intersected geometries.
Args:
intersect_index (list): List of intersected object indices.
"""
for index, l in enumerate(self._geometry.geoms):
# self.range_data[index] = l.length
self.range_data[index] = (
l.length + np.random.normal(0, self.std) if self.noise else l.length
)
if self.has_velocity:
if l.length < self.range_max - 0.02:
for index_obj in intersect_index:
obj = env_param.objects[index_obj]
if obj.geometry.distance(l) < 0.1:
self.velocity[:, index : index + 1] = obj.velocity_xy
break
[docs]
def get_scan(self):
"""
Get the 2D lidar scan data. refer to the ros topic scan: http://docs.ros.org/en/melodic/api/sensor_msgs/html/msg/LaserScan.html
Returns:
dict: Scan data including angles, ranges, and velocities.
"""
scan_data = {}
scan_data["angle_min"] = self.angle_min
scan_data["angle_max"] = self.angle_max
scan_data["angle_increment"] = self.angle_inc
scan_data["time_increment"] = self.time_inc
scan_data["scan_time"] = self.scan_time
scan_data["range_min"] = self.range_min
scan_data["range_max"] = self.range_max
scan_data["ranges"] = self.range_data
scan_data["intensities"] = None
scan_data["velocity"] = self.velocity
return scan_data
[docs]
def get_points(self):
"""
Convert scan data to a point cloud.
Returns:
np.ndarray: Point cloud (2xN).
"""
return self.scan_to_pointcloud()
[docs]
def get_offset(self):
"""
Get the sensor's offset.
Returns:
list: Offset as a list.
"""
return np.squeeze(self.offset).tolist()
[docs]
def plot(self, ax, state: Optional[np.ndarray] = None, **kwargs):
"""
Plot the Lidar's detected lines on a given axis.
"""
if state is None:
state = self.state
self._plot(ax, state, **kwargs)
def _init_plot(self, ax, **kwargs):
"""
Initialize the plot for the Lidar.
"""
self._plot(ax, self.origin_state, **kwargs)
@property
def state(self) -> np.ndarray:
"""
Get the current state of the lidar sensor.
Returns:
np.ndarray: Current state of the sensor.
"""
return self._state
def _plot(self, ax, state, **kwargs):
"""
Plot the Lidar's detected lines using the specified state for positioning.
Creates line segments in local coordinates and applies transforms to position them.
Args:
ax: Matplotlib axis.
state: State vector [x, y, theta, ...] defining lidar position and orientation.
**kwargs: Plotting options.
"""
lines = []
if isinstance(ax, Axes3D):
# For 3D plotting, calculate actual world coordinates since transforms don't work the same way
if state is not None and len(state) > 0:
# Calculate lidar position based on object state and sensor offset
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
else:
lidar_x, lidar_y, lidar_theta = 0, 0, 0
# Create line segments in world coordinates for 3D
for i in range(self.number):
x_local = self.range_data[i] * cos(self.angle_list[i])
y_local = self.range_data[i] * sin(self.angle_list[i])
# Transform to world coordinates
x_world = lidar_x + x_local * cos(lidar_theta) - y_local * sin(lidar_theta)
y_world = lidar_y + x_local * sin(lidar_theta) + y_local * cos(lidar_theta)
start_point = np.array([lidar_x, lidar_y, 0])
end_point = np.array([x_world, y_world, 0])
segment = [start_point, end_point]
lines.append(segment)
self.laser_LineCollection = Line3DCollection(
lines, linewidths=1, colors=self.color, alpha=self.alpha, zorder=2
)
ax.add_collection3d(self.laser_LineCollection)
else:
# For 2D plotting, create line segments in local coordinates and use transforms
for i in range(self.number):
x = self.range_data[i] * cos(self.angle_list[i])
y = self.range_data[i] * sin(self.angle_list[i])
segment = [np.array([0, 0]), np.array([x, y])]
lines.append(segment)
self.laser_LineCollection = LineCollection(
lines, linewidths=1, colors=self.color, alpha=self.alpha, zorder=2
)
ax.add_collection(self.laser_LineCollection)
# Apply transform for 2D case - use provided state for positioning
if state is not None and len(state) > 0:
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
# Create transform: rotate by lidar orientation, then translate to lidar position
trans = (
mtransforms.Affine2D().rotate(lidar_theta).translate(lidar_x, lidar_y)
+ ax.transData
)
self.laser_LineCollection.set_transform(trans)
self.plot_patch_list.append(self.laser_LineCollection)
def _step_plot(self):
"""
Update the lidar visualization using matplotlib transforms based on current state.
Creates line segments in local coordinates and applies transform to position them.
"""
if not hasattr(self, 'laser_LineCollection'):
return
ax = self.laser_LineCollection.axes
lines = []
if isinstance(ax, Axes3D):
# For 3D plotting, calculate actual world coordinates
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
# Create line segments in world coordinates for 3D
for i in range(self.number):
x_local = self.range_data[i] * cos(self.angle_list[i])
y_local = self.range_data[i] * sin(self.angle_list[i])
# Transform to world coordinates
x_world = lidar_x + x_local * cos(lidar_theta) - y_local * sin(lidar_theta)
y_world = lidar_y + x_local * sin(lidar_theta) + y_local * cos(lidar_theta)
start_point = np.array([lidar_x, lidar_y, 0])
end_point = np.array([x_world, y_world, 0])
segment = [start_point, end_point]
lines.append(segment)
else:
# For 2D plotting, create line segments in local coordinates
for i in range(self.number):
x = self.range_data[i] * cos(self.angle_list[i])
y = self.range_data[i] * sin(self.angle_list[i])
segment = [np.array([0, 0]), np.array([x, y])]
lines.append(segment)
# Update line segments
self.laser_LineCollection.set_segments(lines)
# Apply transform to position the LineCollection based on current lidar origin (2D only)
if not isinstance(ax, Axes3D): # 2D case
lidar_x = self.lidar_origin[0, 0]
lidar_y = self.lidar_origin[1, 0]
lidar_theta = self.lidar_origin[2, 0] if self.lidar_origin.shape[0] > 2 else 0
# Create transform: rotate by lidar orientation, then translate to lidar position
trans = (
mtransforms.Affine2D().rotate(lidar_theta).translate(lidar_x, lidar_y)
+ ax.transData
)
self.laser_LineCollection.set_transform(trans)
[docs]
def step_plot(self):
"""
Public method to update the lidar visualization, calls _step_plot.
"""
self._step_plot()
[docs]
def set_laser_color(self, laser_indices, laser_color: str = 'blue', alpha: float = 0.3):
"""
Set a specific color of the selected lasers.
Args:
laser_indices (list): The indices of the lasers to set the color.
laser_color (str): The color to set the selected lasers. Default is 'blue'.
alpha (float): The transparency of the lasers. Default is 0.3.
"""
current_color = [self.color] * self.number
current_alpha = [self.alpha] * self.number
for index in laser_indices:
if index < self.number:
current_color[index] = laser_color
current_alpha[index] = alpha
self.laser_LineCollection.set_color(current_color)
self.laser_LineCollection.set_alpha(current_alpha)
[docs]
def plot_clear(self):
"""
Clear the plot elements from the axis.
"""
[patch.remove() for patch in self.plot_patch_list]
[line.pop(0).remove() for line in self.plot_line_list]
[text.remove() for text in self.plot_text_list]
self.plot_patch_list = []
self.plot_line_list = []
self.plot_text_list = []
[docs]
def scan_to_pointcloud(self):
"""
Convert the Lidar scan data to a point cloud.
Returns:
np.ndarray: Point cloud (2xN).
"""
point_cloud = []
ranges = self.range_data
angles = np.linspace(self.angle_min, self.angle_max, len(ranges))
for i in range(len(ranges)):
scan_range = ranges[i]
angle = angles[i]
if scan_range < (self.range_max - 0.02):
point = np.array([[scan_range * cos(angle)], [scan_range * sin(angle)]])
point_cloud.append(point)
if len(point_cloud) == 0:
return None
point_array = np.hstack(point_cloud)
return point_array