Skip to content

Commit

Permalink
Add docs for SensorFusion class for combining sensor data into observ…
Browse files Browse the repository at this point in the history
…ations
  • Loading branch information
ll7 committed Feb 20, 2024
1 parent 5687917 commit 6ca726a
Showing 1 changed file with 51 additions and 2 deletions.
53 changes: 51 additions & 2 deletions robot_sf/sensor/sensor_fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ def fused_sensor_space(

@dataclass
class SensorFusion:
"""
A class that combines data from multiple sensors into a single observation.
Attributes
----------
lidar_sensor : Callable[[], np.ndarray]
A function that returns the current LiDAR sensor data.
robot_speed_sensor : Callable[[], PolarVec2D]
A function that returns the current robot speed.
target_sensor : Callable[[], Tuple[float, float, float]]
A function that returns the current target sensor data.
unnormed_obs_space : spaces.Dict
The unnormalized observation space.
use_next_goal : bool
Whether to use the next goal in the observation.
drive_state_cache : List[np.ndarray]
A cache of previous drive states.
lidar_state_cache : List[np.ndarray]
A cache of previous LiDAR states.
cache_steps : int
The number of steps to cache.
"""

lidar_sensor: Callable[[], np.ndarray]
robot_speed_sensor: Callable[[], PolarVec2D]
target_sensor: Callable[[], Tuple[float, float, float]]
Expand All @@ -85,36 +108,62 @@ class SensorFusion:
cache_steps: int = field(init=False)

def __post_init__(self):
# Initialize the number of steps to cache based on the LiDAR observation space
self.cache_steps = self.unnormed_obs_space[OBS_RAYS].shape[0]

def next_obs(self) -> Dict[str, np.ndarray]:
"""
Get the next observation by combining data from all sensors.
Returns
-------
Dict[str, np.ndarray]
The next observation, consisting of the drive state and LiDAR state.
"""
# Get the current LiDAR state
lidar_state = self.lidar_sensor()
# TODO: append beginning at the end for conv feature extractor

# Get the current robot speed
speed_x, speed_rot = self.robot_speed_sensor()

# Get the current target sensor data
target_distance, target_angle, next_target_angle = self.target_sensor()

# If not using the next goal, set the next target angle to 0.0
next_target_angle = next_target_angle if self.use_next_goal else 0.0

# Combine the robot speed and target sensor data into the drive state
drive_state = np.array([speed_x, speed_rot, target_distance, target_angle, next_target_angle])

# info: populate cache with same states -> no movement
# If the caches are empty, fill them with the current states
if len(self.drive_state_cache) == 0:
for _ in range(self.cache_steps):
self.drive_state_cache.append(drive_state)
self.lidar_state_cache.append(lidar_state)

# Add the current states to the caches and remove the oldest states
self.drive_state_cache.append(drive_state)
self.lidar_state_cache.append(lidar_state)
self.drive_state_cache.pop(0)
self.lidar_state_cache.pop(0)

# Stack the cached states into arrays
stacked_drive_state = np.array(self.drive_state_cache, dtype=np.float32)
stacked_lidar_state = np.array(self.lidar_state_cache, dtype=np.float32)

# Normalize the stacked states by the maximum values in the observation space
max_drive = self.unnormed_obs_space[OBS_DRIVE_STATE].high
max_lidar = self.unnormed_obs_space[OBS_RAYS].high
return { OBS_DRIVE_STATE: stacked_drive_state / max_drive,
OBS_RAYS: stacked_lidar_state / max_lidar }
return {
OBS_DRIVE_STATE: stacked_drive_state / max_drive,
OBS_RAYS: stacked_lidar_state / max_lidar
}

def reset_cache(self):
"""
Clear the caches of previous drive and LiDAR states.
"""
self.drive_state_cache.clear()
self.lidar_state_cache.clear()

0 comments on commit 6ca726a

Please sign in to comment.