Source code for irsim.lib.behavior.group_behavior_methods
from typing import Any, Optional
import numpy as np
from irsim.config import world_param
from irsim.lib.behavior.behavior_registry import register_group_behavior_class
from irsim.world.object_base import ObjectBase
[docs]
@register_group_behavior_class("omni", "orca")
def beh_omni_orca(members: list[ObjectBase], **kwargs: Any):
"""
Registered initializer returning a class-based handler for ORCA.
"""
return OrcaGroupBehavior(members, **kwargs)
[docs]
class OrcaGroupBehavior:
"""
Class-based ORCA group behavior with one-time initialization.
"""
def __init__(
self,
members: list[ObjectBase],
neighborDist: float = 15.0,
maxNeighbors: int = 10,
timeHorizon: float = 20.0,
timeHorizonObst: float = 10.0,
safe_radius: float = 0.1,
maxSpeed: Optional[float] = None,
**kwargs: Any,
) -> None:
self._neighborDist = neighborDist
self._maxNeighbors = maxNeighbors
self._timeHorizon = timeHorizon
self._timeHorizonObst = timeHorizonObst
self._safe_radius = safe_radius
self._maxSpeed = maxSpeed
self._sim = self._build_sim(members, **kwargs)
def _ensure_pyrvo(self):
try:
import pyrvo # type: ignore
return pyrvo
except ImportError as e:
raise ImportError(
"pyrvo is not installed. Please install it using `pip install pyrvo`."
) from e
def _build_sim(self, members: list[ObjectBase], **kwargs: Any):
"""
Add members to the simulator.
Args:
members: the members of the group
kwargs: the keyword arguments
Returns:
pyrvo.RVOSimulator: the simulator
"""
pyrvo = self._ensure_pyrvo()
sim = pyrvo.RVOSimulator()
sim.set_time_step(world_param.step_time)
for member in members:
agent_max_speed = (
float(self._maxSpeed)
if self._maxSpeed is not None
else member.max_speed
)
sim.add_agent(
member.state[:2, 0].tolist(),
self._neighborDist,
self._maxNeighbors,
self._timeHorizon,
self._timeHorizonObst,
float(member.radius + self._safe_radius),
agent_max_speed,
)
return sim
def __call__(self, members: list[ObjectBase], **kwargs: Any) -> list[np.ndarray]:
"""
Generate the velocity for the group.
Args:
members: the members of the group
kwargs: the keyword arguments
Returns:
list[np.ndarray]: the velocities of the members
"""
# If agent count mismatches, rebuild
try:
if self._sim.get_num_agents() != len(members):
self._sim = self._build_sim(members, **kwargs)
except Exception:
self._sim = self._build_sim(members, **kwargs)
for i, member in enumerate(members):
self._sim.set_agent_pref_velocity(
i, member.get_desired_omni_vel(normalized=True).flatten().tolist()
)
self._sim.set_agent_position(i, member.state[:2, 0].tolist())
self._sim.do_step()
return [
np.c_[list(self._sim.get_agent_velocity(i).to_tuple())]
for i in range(self._sim.get_num_agents())
]