Source code for irsim.world.world

import os
import warnings
from typing import TYPE_CHECKING, Any, Optional

import numpy as np

from irsim.world.map import Map, _downsample_occupancy_grid, resolve_obstacle_map

if TYPE_CHECKING:
    from irsim.config.world_param import WorldParam


[docs] class World: """ Represents the main simulation environment, managing objects and maps. Attributes: name (str): Name of the world. height (float): Height of the world. width (float): Width of the world. step_time (float): Time interval between steps. sample_time (float): Time interval between samples. offset (list): Offset for the world's position. control_mode (str): Control mode ('auto' or 'keyboard'). collision_mode (str): Collision mode ('stop', , 'unobstructed'). obstacle_map: ``None``, image path (str), grid ndarray, or generator spec dict. mdownsample (int): Downsampling factor for the obstacle map. status: Status of the world and objects. plot: Plot configuration for the world. """ def __init__( self, name: str | None = "world", height: float = 10, width: float = 10, step_time: float = 0.1, sample_time: float | None = None, offset: list[float] | None = None, control_mode: str = "auto", collision_mode: str = "stop", obstacle_map: Any | None = None, mdownsample: int = 1, plot: dict[str, Any] | None = None, status: str = "None", world_param_instance: Optional["WorldParam"] = None, **kwargs: Any, ) -> None: """ Initialize the world object. Parameters: name (str): Name of the world. height (float): Height of the world. width (float): Width of the world. step_time (float): Time interval between steps. sample_time (float): Time interval between samples. offset (list): Offset for the world's position. control_mode (str): Control mode ('auto' or 'keyboard'). collision_mode (str): Collision mode ('stop', , 'unobstructed'). obstacle_map: ``None``, image path (str), grid ndarray, or generator spec dict. mdownsample (int): Downsampling factor for the obstacle map. plot (dict): Plot configuration. status (str): Initial simulation status. world_param_instance: Optional WorldParam instance. If provided, uses this instance for param storage; otherwise falls back to global. """ # Store world_param instance or fallback to global if world_param_instance is not None: self._wp = world_param_instance else: from irsim.config import world_param self._wp = world_param # basic properties if offset is None: offset = [0, 0] if plot is None: plot = {} self.name = os.path.basename(name or "world").split(".")[0] self.height = height self.width = width self.step_time = step_time self.sample_time = sample_time if sample_time is not None else step_time self.offset = offset self.count = 0 self.sampling = True self._wp.step_time = step_time self.x_range = [self.offset[0], self.offset[0] + self.width] self.y_range = [self.offset[1], self.offset[1] + self.height] # obstacle map self.grid_map, self.obstacle_index, self.obstacle_positions = self.gen_grid_map( obstacle_map, mdownsample ) # visualization self.plot_parse = plot self.status = status # mode self._wp.control_mode = control_mode self._wp.collision_mode = collision_mode
[docs] def step(self) -> None: """ Advance the simulation by one step. """ self.count += 1 self.sampling = self.count % (int(self.sample_time / self.step_time)) == 0 self._wp.time = self.time self._wp.count = self.count
[docs] def gen_grid_map( self, obstacle_map: Any | None = None, mdownsample: int = 1, ) -> tuple: """Generate a grid map for obstacles. The *obstacle_map* value is resolved to a float64 ndarray by :pyfunc:`irsim.world.map.resolve_obstacle_map`. Accepted types: ``None``, path string (image), ndarray, or generator spec dict. Args: obstacle_map: ``None``, path string, ndarray, or generator spec dict. mdownsample (int): Downsampling factor. Returns: tuple: ``(grid_map, obstacle_index, obstacle_positions)``. """ grid_map = resolve_obstacle_map( obstacle_map, world_width=self.width, world_height=self.height, ) if grid_map is not None: grid_map = grid_map[::mdownsample, ::mdownsample] x_reso = self.width / grid_map.shape[0] y_reso = self.height / grid_map.shape[1] self.reso = np.array([[x_reso], [y_reso]]) obstacle_index = np.array(np.where(grid_map > 50)) obstacle_positions = obstacle_index * self.reso else: obstacle_index = None obstacle_positions = None self.reso = np.zeros((2, 1)) return grid_map, obstacle_index, obstacle_positions
[docs] def get_map( self, resolution: float = 0.1, obstacle_list: list[Any] | None = None ) -> "Map": """ Get the map of the world with the given resolution. When *resolution* is coarser than the obstacle grid, the grid is downsampled (conservative: any obstacle in a block marks the block) so that planning and collision use the same grid. When *resolution* is finer than the grid, no upsampling is done; planning uses the grid resolution and a warning is emitted. """ world_offset = (float(self.offset[0]), float(self.offset[1])) grid = self.grid_map res = resolution if grid is not None: if not (np.isfinite(res) and res > 0): res = float(self.width / grid.shape[0]) warnings.warn( f"resolution must be positive and finite; using grid " f"resolution {res}.", stacklevel=2, ) else: current_rx = self.width / grid.shape[0] current_ry = self.height / grid.shape[1] thresh_coarser = 1.05 thresh_finer = 0.95 if res > max(current_rx, current_ry) * thresh_coarser: grid = _downsample_occupancy_grid( grid, self.width, self.height, res ) warnings.warn( f"Planning grid downsampled from ({self.grid_map.shape[0]}, " f"{self.grid_map.shape[1]}) at {current_rx:.4f}m/cell to " f"({grid.shape[0]}, {grid.shape[1]}) at {res}m/cell.", stacklevel=2, ) elif res < min(current_rx, current_ry) * thresh_finer: warnings.warn( f"Requested resolution ({res}) is finer than the " f"obstacle grid ({current_rx:.4f} x {current_ry:.4f}). " "Planning will use the grid resolution.", stacklevel=2, ) return Map( self.width, self.height, res, obstacle_list, grid, world_offset=world_offset, )
[docs] def reset(self) -> None: """ Reset the world simulation. """ self._wp.count = 0 self.count = 0
@property def time(self) -> float: """ Get the current simulation time. Returns: float: Current time based on steps and step_time. """ return round(self.count * self.step_time, 2) @property def buffer_reso(self) -> float: """ Get the maximum resolution of the world. Returns: float: Maximum resolution. """ return np.max(self.reso)