Source code for irsim.lib.algorithm.social_force_model

"""
Social Force Model (SFM) for pedestrian / mobile-agent navigation.

Independent Python implementation of the Social Force Model described in:

    Helbing, D. & Molnar, P. (1995), *Social force model for pedestrian
    dynamics.* Phys. Rev. E 51:4282.

    Moussaid, M., Helbing, D., Garnier, S., Johansson, A., Combe, M.,
    Theraulaz, G. (2009), *Experimental study of the behavioural
    mechanisms underlying self-organization in human crowds.*
    Proc. R. Soc. B 276:2755.

The neighbor interaction is the anisotropic, velocity-aware variant from
Moussaid-Helbing (2009); the obstacle term is the Helbing-Molnar (1995)
summation form -- every nearby line obstacle contributes an exponentially
decayed push, which (unlike a nearest-segment-only rule) lets symmetric
walls cancel and keeps the agent on the centreline.

Each agent integrates Newton-like dynamics ``v += dt * a`` where ``a``
is the weighted sum of three forces:

* desired force  -- relaxation of velocity toward ``v0 * e_goal``;
* social force   -- anisotropic repulsion from each neighbor
  (Moussaid-Helbing 2009 form);
* obstacle force -- exponential repulsion summed over all line
  obstacles within ``5 * sigma_obstacle`` (Helbing-Molnar 1995 form).

Acknowledgement:

    pedsim_ros (https://github.com/srl-freiburg/pedsim_ros) and its
    vendored ``libpedsim`` by Christian Gloor were consulted as
    reference implementations while writing this module.
"""

from math import atan2, exp, sqrt


[docs] class social_force_model: """Social Force Model controller for a single agent. The interface mirrors :class:`reciprocal_vel_obs` so the two algorithms are interchangeable from a behavior method. Args: state (list): Agent state ``[x, y, vx, vy, radius, vx_des, vy_des, theta]``. neighbor_list (list): Other moving/static circular agents ``[[x, y, vx, vy, radius], ...]``. line_obs_list (list): Line obstacles ``[[x1, y1, x2, y2], ...]``. vmax (float): Speed cap applied after the velocity update. step_time (float): Integration step ``dt``. relaxation_time (float): ``tau`` in the desired-force term. force_factor_desired (float): Weight ``alpha_D`` on the desired force. force_factor_social (float): Weight ``alpha_S`` on the social force. force_factor_obstacle (float): Weight ``alpha_O`` on the obstacle force. sigma_obstacle (float): Decay length of the obstacle repulsion. lambda_importance (float): Weight of relative velocity in the interaction direction (``lambda`` in Moussaïd 2009). gamma (float): Sets the interaction range ``B = gamma * ||t||``. n_angular (float): Angular sharpness ``n`` for the sideways force. n_velocity (float): Angular sharpness ``n'`` for the slowdown force. neighbor_range (float): Max distance for an agent to count as a social-force neighbor. safety_radius (float): Personal-space buffer subtracted from the agent-to-agent distance inside the social-force exponential. ``0`` reproduces the upstream behavior (point agents). ``> 0`` shifts the decay closer-in so the repulsion saturates at ``2 * safety_radius`` of centre-to-centre clearance, effectively giving each agent a body radius for SFM. """ def __init__( self, state: list, neighbor_list: list | None = None, line_obs_list: list | None = None, vmax: float = 1.5, step_time: float = 0.1, relaxation_time: float = 0.5, force_factor_desired: float = 1.0, force_factor_social: float = 2.1, force_factor_obstacle: float = 10.0, sigma_obstacle: float = 0.8, lambda_importance: float = 2.0, gamma: float = 0.35, n_angular: float = 2.0, n_velocity: float = 3.0, neighbor_range: float = 10.0, safety_radius: float = 0.0, ) -> None: # Catch non-positive divisors early — these would otherwise raise # ``ZeroDivisionError`` deep inside the per-neighbour loop or # silently produce non-physical dynamics on the first ``cal_vel``. if relaxation_time <= 0.0: raise ValueError(f"relaxation_time must be > 0, got {relaxation_time}") if gamma <= 0.0: raise ValueError(f"gamma must be > 0, got {gamma}") if sigma_obstacle <= 0.0: raise ValueError(f"sigma_obstacle must be > 0, got {sigma_obstacle}") self.state = state self.neighbor_list = neighbor_list if neighbor_list is not None else [] self.line_obs_list = line_obs_list if line_obs_list is not None else [] self.vmax = vmax self.step_time = step_time self.relaxation_time = relaxation_time self.force_factor_desired = force_factor_desired self.force_factor_social = force_factor_social self.force_factor_obstacle = force_factor_obstacle self.sigma_obstacle = sigma_obstacle self.lambda_importance = lambda_importance self.gamma = gamma self.n_angular = n_angular self.n_velocity = n_velocity self.neighbor_range = neighbor_range self.safety_radius = safety_radius
[docs] def update( self, state: list, neighbor_list: list, line_obs_list: list | None = None, ) -> None: """Refresh the per-step inputs without re-instantiating.""" self.state = state self.neighbor_list = neighbor_list self.line_obs_list = line_obs_list if line_obs_list is not None else []
# ------------------------------------------------------------------ # Public entry point # ------------------------------------------------------------------
[docs] def cal_vel(self) -> list: """Integrate one SFM step and return the new global velocity. Returns: list[float]: Updated velocity ``[vx, vy]``, clipped to ``vmax``. """ fd = self.desired_force() fs = self.social_force() fo = self.obstacle_force() ax = ( self.force_factor_desired * fd[0] + self.force_factor_social * fs[0] + self.force_factor_obstacle * fo[0] ) ay = ( self.force_factor_desired * fd[1] + self.force_factor_social * fs[1] + self.force_factor_obstacle * fo[1] ) vx = self.state[2] + self.step_time * ax vy = self.state[3] + self.step_time * ay speed = sqrt(vx * vx + vy * vy) if speed > self.vmax and speed > 1e-9: scale = self.vmax / speed vx *= scale vy *= scale return [vx, vy]
# ------------------------------------------------------------------ # Individual force terms # ------------------------------------------------------------------
[docs] def desired_force(self) -> list: """Relaxation toward the desired velocity ``v0 * e_goal``. The desired velocity is supplied directly via ``state[5:7]``. """ vx = self.state[2] vy = self.state[3] vx_des = self.state[5] vy_des = self.state[6] return [ (vx_des - vx) / self.relaxation_time, (vy_des - vy) / self.relaxation_time, ]
[docs] def social_force(self) -> list: """Anisotropic neighbor repulsion (Moussaid-Helbing 2009). Iterates over all neighbors within ``neighbor_range`` and sums their contribution. """ x = self.state[0] y = self.state[1] vx = self.state[2] vy = self.state[3] fx = 0.0 fy = 0.0 for nb in self.neighbor_list: mx, my, mvx, mvy, _mr = nb dx = mx - x dy = my - y dist = sqrt(dx * dx + dy * dy) if dist < 1e-6 or dist > self.neighbor_range: continue d_hat_x = dx / dist d_hat_y = dy / dist # velocity diff is self - other (upstream convention) vdx = vx - mvx vdy = vy - mvy # interaction vector t = lambda * dv + d_hat tx = self.lambda_importance * vdx + d_hat_x ty = self.lambda_importance * vdy + d_hat_y t_norm = sqrt(tx * tx + ty * ty) if t_norm < 1e-9: continue t_hat_x = tx / t_norm t_hat_y = ty / t_norm B = self.gamma * t_norm # signed angle from t_hat to d_hat theta = atan2( t_hat_x * d_hat_y - t_hat_y * d_hat_x, t_hat_x * d_hat_x + t_hat_y * d_hat_y, ) # Shift the decay closer in by 2 * safety_radius so the # repulsion saturates at the safety-bubble surface instead of # at zero centre-to-centre distance. gap = max(dist - 2.0 * self.safety_radius, 0.0) f_v = -exp(-gap / B - (self.n_velocity * B * theta) ** 2) sign_theta = 1.0 if theta > 0 else (-1.0 if theta < 0 else 0.0) f_a = -sign_theta * exp(-gap / B - (self.n_angular * B * theta) ** 2) # left normal of t_hat t_perp_x = -t_hat_y t_perp_y = t_hat_x fx += f_v * t_hat_x + f_a * t_perp_x fy += f_v * t_hat_y + f_a * t_perp_y return [fx, fy]
[docs] def obstacle_force(self) -> list: """Exponential repulsion summed over all nearby line obstacles. The upstream reference uses only the single nearest obstacle, which oscillates in symmetric environments (two parallel walls flip which one is "nearest" each step). We use the Helbing-Molnar (1995) summation form instead: every segment within ``5 * sigma_obstacle`` contributes an exponentially decayed push, so symmetric walls cancel and the agent walks the centreline. The integration is also clamped against overlap (``distance < 0`` would otherwise make ``exp(-distance/sigma)`` explode). """ x = self.state[0] y = self.state[1] r = self.state[4] if not self.line_obs_list: return [0.0, 0.0] cutoff = 5.0 * self.sigma_obstacle + r fx = 0.0 fy = 0.0 for seg in self.line_obs_list: cx, cy = self._closest_point_on_segment(x, y, seg) dx = x - cx dy = y - cy dist_center = sqrt(dx * dx + dy * dy) if dist_center > cutoff: continue if dist_center < 1e-9: # overlap — push along an arbitrary axis at full strength fx += 1.0 continue # clamp at the surface so exp() can't blow up on penetration distance = max(dist_center - r, 0.0) amount = exp(-distance / self.sigma_obstacle) fx += amount * dx / dist_center fy += amount * dy / dist_center return [fx, fy]
# ------------------------------------------------------------------ # Geometry helpers # ------------------------------------------------------------------ @staticmethod def _closest_point_on_segment(x: float, y: float, seg: list) -> tuple[float, float]: """Closest point on the segment ``(x1,y1)->(x2,y2)`` to ``(x,y)``.""" x1, y1, x2, y2 = seg dx = x2 - x1 dy = y2 - y1 l2 = dx * dx + dy * dy if l2 < 1e-12: return x1, y1 t = ((x - x1) * dx + (y - y1) * dy) / l2 t = max(0.0, min(1.0, t)) return x1 + t * dx, y1 + t * dy
# ---------------------------------------------------------------------- # Standalone sanity check (run: python -m irsim.lib.algorithm.social_force_model) # ---------------------------------------------------------------------- if __name__ == "__main__": # pragma: no cover - standalone sanity demo # Two agents approaching head-on along the x axis. # Agent A at (0,0) heading +x; Agent B at (4,0.05) heading -x. state_a = [0.0, 0.0, 1.0, 0.0, 0.3, 1.0, 0.0, 0.0] state_b = [4.0, 0.05, -1.0, 0.0, 0.3] sfm = social_force_model( state=state_a, neighbor_list=[state_b], line_obs_list=[[-5.0, 1.5, 5.0, 1.5], [-5.0, -1.5, 5.0, -1.5]], vmax=1.5, step_time=0.05, ) for k in range(60): vx, vy = sfm.cal_vel() state_a[0] += sfm.step_time * vx state_a[1] += sfm.step_time * vy state_a[2] = vx state_a[3] = vy # toy update of B coming toward A state_b[0] += sfm.step_time * state_b[2] state_b[1] += sfm.step_time * state_b[3] sfm.update(state_a, [state_b], sfm.line_obs_list) if k % 10 == 0: print( f"t={k * sfm.step_time:4.2f} " f"A=({state_a[0]:5.2f},{state_a[1]:5.2f}) " f"v=({vx:5.2f},{vy:5.2f})" )