Source code for irsim.world.map

from __future__ import annotations

import warnings
from typing import Any, Optional, Protocol, Union, runtime_checkable

import numpy as np
import shapely
from shapely.geometry import Point

from .grid_map_generator_base import GridMapGenerator
from .image_map_generator import ImageGridGenerator
from .obstacle_map import (
    CELL_CENTER_OFFSET,
    COLLISION_RADIUS_FACTOR,
    OCCUPANCY_THRESHOLD,
    ObstacleMap,
)
from .perlin_map_generator import PerlinGridGenerator

# ---------------------------------------------------------------------------
# Typed protocol - structural contract expected by all path planners
# ---------------------------------------------------------------------------


[docs] @runtime_checkable class EnvGridMap(Protocol): """Structural type accepted by all path planners. Any object that exposes the attributes below (including :class:`Map`) is a valid ``EnvGridMap``. Planners should annotate their *env_map* parameter with this protocol instead of the concrete :class:`Map` class to support duck-typed map objects. Collision precedence (adopted by every planner): 1. Grid lookup (O(1) per cell) when *grid* is not ``None``; if the grid reports occupied, the point is in collision. 2. When the grid reports free or is unavailable, Shapely geometry intersection with *obstacle_list* is used. Planners therefore combine grid and obstacle_list when both are present. """ width: float height: float resolution: float obstacle_list: list grid: np.ndarray | None world_offset: tuple[float, float] @property def grid_resolution(self) -> tuple[float, float] | None: """Actual cell size ``(x_reso, y_reso)`` derived from *grid* shape and world size.""" ...
[docs] def grid_occupied( self, x: float, y: float, margin_x: float = 0.0, margin_y: float = 0.0, threshold: float = 50.0, ) -> bool | None: """Check if any grid cell within the bounding box is occupied.""" ...
[docs] def is_collision(self, geometry) -> bool: """Check collision for a Shapely geometry against the map.""" ...
def _downsample_occupancy_grid( grid: np.ndarray, width: float, height: float, target_resolution: float, ) -> np.ndarray: """Downsample an occupancy grid to a coarser resolution (conservative: block max). Each coarse cell is the max of the fine cells it covers, so any obstacle in the block keeps the coarse cell occupied. Output dtype and 0-100 range are preserved. Args: grid: Fine-resolution occupancy grid (0-100), shape (fine_nx, fine_ny). width: World width in metres (x). height: World height in metres (y). target_resolution: Desired cell size in metres (same for x and y). Returns: Downsampled grid, shape (coarse_nx, coarse_ny), same dtype as grid. """ fine_nx, fine_ny = grid.shape[0], grid.shape[1] coarse_nx = max(1, round(width / target_resolution)) coarse_ny = max(1, round(height / target_resolution)) out = np.zeros((coarse_nx, coarse_ny), dtype=grid.dtype) for ic in range(coarse_nx): i_lo = int(ic * fine_nx / coarse_nx) i_hi = int((ic + 1) * fine_nx / coarse_nx) if i_hi <= i_lo: i_hi = i_lo + 1 i_hi = min(i_hi, fine_nx) for jc in range(coarse_ny): j_lo = int(jc * fine_ny / coarse_ny) j_hi = int((jc + 1) * fine_ny / coarse_ny) if j_hi <= j_lo: j_hi = j_lo + 1 j_hi = min(j_hi, fine_ny) out[ic, jc] = np.max(grid[i_lo:i_hi, j_lo:j_hi]) return out def _grid_collision_geometry( grid: np.ndarray, grid_reso: tuple[float, float], geometry, world_offset: tuple[float, float] = (0.0, 0.0), ) -> bool: """Check collision of a Shapely geometry against an occupancy grid. Uses the same logic as ObstacleMap.check_grid_collision. """ if grid is None: return False minx, miny, maxx, maxy = geometry.bounds x_reso, y_reso = grid_reso offset_x, offset_y = world_offset i_min = max(0, int((minx - offset_x) / x_reso)) i_max = min(grid.shape[0] - 1, int((maxx - offset_x) / x_reso)) j_min = max(0, int((miny - offset_y) / y_reso)) j_max = min(grid.shape[1] - 1, int((maxy - offset_y) / y_reso)) if i_min > i_max or j_min > j_max: return False collision_radius = max(x_reso, y_reso) * COLLISION_RADIUS_FACTOR for i in range(i_min, i_max + 1): for j in range(j_min, j_max + 1): if grid[i, j] > OCCUPANCY_THRESHOLD: cell_x = offset_x + (i + CELL_CENTER_OFFSET) * x_reso cell_y = offset_y + (j + CELL_CENTER_OFFSET) * y_reso cell_center = Point(cell_x, cell_y) if geometry.distance(cell_center) <= collision_radius: return True return False
[docs] def resolve_obstacle_map( obstacle_map: str | np.ndarray | dict[str, Any] | None = None, world_width: float | None = None, world_height: float | None = None, ) -> np.ndarray | None: """Resolve obstacle_map to None or a float64 occupancy grid ndarray. Accepted types: ``None``, ndarray, or a generator spec **dict** with ``name`` (e.g. ``"image"`` or ``"perlin"``). For backward compatibility, a **str** is treated as ``{"name": "image", "path": obstacle_map}``. - ``name == "image"``: only ``path`` is required; grid size comes from the image. - Other names (e.g. ``"perlin"``): require ``resolution`` and world size (``world_width`` / ``world_height``); grid size = world size / resolution. Returns: None, or ndarray (0-100 grid, dtype float64). """ if obstacle_map is None: return None if isinstance(obstacle_map, np.ndarray): return np.asarray(obstacle_map, dtype=np.float64) if isinstance(obstacle_map, str): obstacle_map = {"name": "image", "path": obstacle_map} if isinstance(obstacle_map, dict) and obstacle_map.get("name"): name = obstacle_map.get("name") if name == "image": path = obstacle_map.get("path") if not path: raise ValueError("obstacle_map image generator requires 'path'.") gen = ImageGridGenerator(path=path).generate() return np.asarray(gen.grid, dtype=np.float64) if world_width is None or world_height is None: raise ValueError( "obstacle_map generator spec (non-image) requires world_width and " "world_height (passed by World.gen_grid_map)." ) return build_grid_from_generator( obstacle_map, world_width=world_width, world_height=world_height, ) raise TypeError( "obstacle_map must be None, an ndarray, or a generator spec dict with 'name'." )
[docs] def build_grid_from_generator( spec: dict[str, Any], world_width: float, world_height: float, ) -> np.ndarray: """Build a grid map from a YAML grid_generator spec (name + resolution + params). Grid size is always computed from world size and ``resolution`` (meters per cell): (world_width / resolution, world_height / resolution) cells. Args: spec: Dict from YAML, e.g. ``{name: perlin, resolution: 0.1, ...}``. world_width: World width in meters. world_height: World height in meters. Returns: Occupancy grid (0-100) as float64 ndarray. """ name = spec.get("name") if not name or name not in GridMapGenerator.registry: known = ", ".join(GridMapGenerator.registry) raise ValueError( f"Unknown or missing grid_generator name: {name!r}. Known: {known}" ) resolution = spec.get("resolution") if resolution is None: raise ValueError( "obstacle_map generator spec must include 'resolution' (meters per cell)." ) grid_width = max(1, round(float(world_width) / float(resolution))) grid_height = max(1, round(float(world_height) / float(resolution))) cls = GridMapGenerator.registry[name] params = { k: v for k, v in spec.items() if k not in ("name", "resolution") and k in cls.yaml_param_names } params["width"] = grid_width params["height"] = grid_height return np.asarray(cls(**params).generate().grid, dtype=np.float64)
[docs] class Map: """Map data container for navigation / path-planning. Satisfies the :class:`EnvGridMap` protocol so that it can be passed to any planner that expects ``EnvGridMap``. Collision precedence (shared by all planners): 1. Grid lookup (O(1) per cell) when *grid* is not ``None``. 2. Shapely geometry intersection when *grid* is unavailable. """ def __init__( self, width: float = 10, height: float = 10, resolution: float = 0.1, obstacle_list: list | None = None, grid: np.ndarray | None = None, world_offset: tuple[float, float] | list[float] | None = None, ): """ Initialize the Map. Args: width: Width of the world (metres). height: Height of the world (metres). resolution: Planner discretisation cell size (metres/cell). obstacle_list: Obstacle objects for Shapely collision detection. grid: Occupancy grid (0-100) for grid-based collision detection. world_offset: World origin (x, y) for grid indexing. When non-zero, geometry and positions are interpreted in world coordinates so grid lookups align with ObstacleMap. Default (0, 0). """ if obstacle_list is None: obstacle_list = [] self.width = width self.height = height self.resolution = resolution self.obstacle_list = obstacle_list self.grid = grid if world_offset is None: world_offset = (0.0, 0.0) self.world_offset = (float(world_offset[0]), float(world_offset[1])) self._obstacles_prepared: bool = False # Warn when the user-specified resolution diverges from the actual # grid cell size by more than 5 %. if grid is not None: gr = self.grid_resolution if gr is not None: gx, _gy = gr if abs(resolution - gx) / max(resolution, gx) > 0.05: warnings.warn( f"Map.resolution ({resolution}) differs from grid " f"cell size ({gx:.4f} x {_gy:.4f}). Grid-based " f"planners will use grid_resolution for lookups.", stacklevel=2, ) @property def grid_resolution(self) -> tuple[float, float] | None: """Actual cell size ``(x_reso, y_reso)`` derived from *grid* shape and world size. Returns ``None`` when no grid is present. """ if self.grid is None: return None return ( self.width / self.grid.shape[0], self.height / self.grid.shape[1], )
[docs] def grid_occupied( self, x: float, y: float, margin_x: float = 0.0, margin_y: float = 0.0, threshold: float = 50.0, ) -> bool | None: """Check if any grid cell within the bounding box around ``(x, y)`` is occupied. The bounding box extends *margin_x* / *margin_y* (in world metres) in each direction. Grid cells whose occupancy exceeds *threshold* are considered occupied. Returns: ``None`` when no grid is present (caller should fall back to Shapely or another collision method). ``True`` / ``False`` otherwise. Points outside the world bounds are treated as occupied so planners cannot escape the map. """ if self.grid is None: return None gr = self.grid_resolution if gr is None: return None # defensive; grid is None already caught above ox, oy = self.world_offset if x < ox or x >= ox + self.width or y < oy or y >= oy + self.height: return True # out-of-bounds: treat as occupied rx, ry = gr gx = int((x - ox) / rx) gy = int((y - oy) / ry) rows, cols = self.grid.shape mx = max(1, int(np.ceil(margin_x / rx))) if margin_x > 0 else 0 my = max(1, int(np.ceil(margin_y / ry))) if margin_y > 0 else 0 return bool( np.any( self.grid[ max(0, gx - mx) : min(rows, gx + mx + 1), max(0, gy - my) : min(cols, gy + my + 1), ] > threshold ) )
[docs] def is_collision(self, geometry) -> bool: """Check collision for a Shapely geometry against grid + obstacles. Collision precedence: 1. Grid lookup when *grid* is not None; if occupied, collision. 2. When the grid reports free or is unavailable, Shapely geometry intersection with *obstacle_list*. """ minx, miny, maxx, maxy = geometry.bounds ox, oy = self.world_offset if ( minx < ox or miny < oy or maxx >= ox + self.width or maxy >= oy + self.height ): return True if self.grid is not None: gr = self.grid_resolution if gr is not None and _grid_collision_geometry( self.grid, gr, geometry, world_offset=self.world_offset ): return True if not self.obstacle_list: return False if not self._obstacles_prepared: for obj in self.obstacle_list: shapely.prepare(obj._geometry) self._obstacles_prepared = True return any( shapely.intersects(geometry, obj._geometry) for obj in self.obstacle_list )
__all__ = [ "EnvGridMap", "GridMapGenerator", "ImageGridGenerator", "Map", "ObstacleMap", "PerlinGridGenerator", "build_grid_from_generator", "resolve_obstacle_map", ]