Skip to content

Commit

Permalink
Merge pull request #51 from jjshoots/pole_env
Browse files Browse the repository at this point in the history
Pole Waypoints Env
  • Loading branch information
jjshoots authored Jul 5, 2024
2 parents be0a846 + 68fcd83 commit bed365c
Show file tree
Hide file tree
Showing 33 changed files with 182 additions and 176 deletions.
2 changes: 1 addition & 1 deletion PyFlyt/core/abstractions/boosters.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def get_states(self) -> np.ndarray:
- (b0, b1, ..., bn) represent the remaining fuel ratio
- (c0, c1, ..., cn) represent the current throttle state
Returns
Returns:
-------
np.ndarray: A (3 * num_boosters, ) array
Expand Down
4 changes: 2 additions & 2 deletions PyFlyt/core/abstractions/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __init__(
def view_mat(self) -> np.ndarray:
"""Generates the view matrix for the camera depending on the current orientation and implicit parameters.
Returns
Returns:
-------
np.ndarray: view matrix.
Expand Down Expand Up @@ -161,7 +161,7 @@ def physics_update(self):
def capture_image(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Captures the 3 relevant images from the camera.
Returns
Returns:
-------
tuple[np.ndarray, np.ndarray, np.ndarray]: rgbaImg, depthImg, segImg
Expand Down
2 changes: 1 addition & 1 deletion PyFlyt/core/abstractions/gimbals.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def reset(self):
def get_states(self) -> np.ndarray:
"""Gets the current state of the components.
Returns
Returns:
-------
np.ndarray: a (2 * num_gimbals, ) array where every pair of values represents the current state of the gimbal
Expand Down
4 changes: 2 additions & 2 deletions PyFlyt/core/abstractions/lifting_surfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def reset(self):
def get_states(self) -> np.ndarray:
"""Gets the current state of the components.
Returns
Returns:
-------
np.ndarray: a (num_surfaces, ) array representing the actuation state for each surface
Expand Down Expand Up @@ -254,7 +254,7 @@ def reset(self):
def get_states(self) -> float:
"""Gets the current state of the components.
Returns
Returns:
-------
float: the level of deflection of the surface.
Expand Down
2 changes: 1 addition & 1 deletion PyFlyt/core/abstractions/motors.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def reset(self) -> None:
def get_states(self) -> np.ndarray:
"""Gets the current state of the components.
Returns
Returns:
-------
np.ndarray: an (num_motors, ) array for the current throttle level of each motor
Expand Down
6 changes: 3 additions & 3 deletions PyFlyt/core/aviary.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, message: str) -> None:
def __str__(self) -> str:
"""__str__.
Returns
Returns:
-------
str:
Expand Down Expand Up @@ -380,7 +380,7 @@ def all_states(self) -> list[np.ndarray]:
This function is not very optimized, if you want the state of a single drone, do `state(i)`.
Returns
Returns:
-------
np.ndarray: list of states
Expand All @@ -399,7 +399,7 @@ def all_aux_states(self) -> list[np.ndarray]:
This function is not very optimized, if you want the aux state of a single drone, do `aux_state(i)`.
Returns
Returns:
-------
np.ndarray: list of auxiliary states
Expand Down
3 changes: 3 additions & 0 deletions PyFlyt/core/drones/quadx.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
use_gimbal: bool = False,
camera_angle_degrees: int = 20,
camera_FOV_degrees: int = 90,
camera_position_offset: np.ndarray = np.array([0.0, 0.0, 0.0]),
camera_resolution: tuple[int, int] = (128, 128),
camera_fps: None | int = None,
):
Expand All @@ -51,6 +52,7 @@ def __init__(
use_gimbal (bool): use_gimbal
camera_angle_degrees (int): camera_angle_degrees
camera_FOV_degrees (int): camera_FOV_degrees
camera_position_offset (np.ndarray): offset position of the camera
camera_resolution (tuple[int, int]): camera_resolution
camera_fps (None | int): camera_fps
Expand Down Expand Up @@ -205,6 +207,7 @@ def __init__(
camera_FOV_degrees=camera_FOV_degrees,
camera_angle_degrees=camera_angle_degrees,
camera_resolution=camera_resolution,
camera_position_offset=camera_position_offset,
)

# compute camera fps parameters
Expand Down
18 changes: 7 additions & 11 deletions PyFlyt/gym_envs/fixedwing_envs/fixedwing_waypoints_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def reset(
super().begin_reset(seed, options)
self.waypoints.reset(self.env, self.np_random)
self.info["num_targets_reached"] = 0
self.distance_to_immediate = np.inf
super().end_reset()

return self.state, self.info
Expand Down Expand Up @@ -165,12 +164,9 @@ def compute_state(self) -> None:
axis=-1,
)

new_state["target_deltas"] = self.waypoints.distance_to_target(
new_state["target_deltas"] = self.waypoints.distance_to_targets(
ang_pos, lin_pos, quaternion
)
self.distance_to_immediate = float(
np.linalg.norm(new_state["target_deltas"][0])
)

self.state: dict[Literal["attitude", "target_deltas"], np.ndarray] = new_state

Expand All @@ -180,17 +176,17 @@ def compute_term_trunc_reward(self) -> None:

# bonus reward if we are not sparse
if not self.sparse_reward:
self.reward += max(3.0 * self.waypoints.progress_to_target(), 0.0)
self.reward += 1.0 / self.distance_to_immediate
self.reward += max(3.0 * self.waypoints.progress_to_next_target, 0.0)
self.reward += 1.0 / self.waypoints.distance_to_next_target

# target reached
if self.waypoints.target_reached():
if self.waypoints.target_reached:
self.reward = 100.0

# advance the targets
self.waypoints.advance_targets()

# update infos and dones
self.truncation |= self.waypoints.all_targets_reached()
self.info["env_complete"] = self.waypoints.all_targets_reached()
self.info["num_targets_reached"] = self.waypoints.num_targets_reached()
self.truncation |= self.waypoints.all_targets_reached
self.info["env_complete"] = self.waypoints.all_targets_reached
self.info["num_targets_reached"] = self.waypoints.num_targets_reached
40 changes: 23 additions & 17 deletions PyFlyt/gym_envs/quadx_envs/quadx_base_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,30 @@ def __init__(
self.auxiliary_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(4,), dtype=np.float64
)
angular_rate_limit = np.pi

# define the action space
xyz_limit = np.pi
thrust_limit = 0.8
high = np.array(
[
angular_rate_limit,
angular_rate_limit,
angular_rate_limit,
thrust_limit,
]
)
low = np.array(
[
-angular_rate_limit,
-angular_rate_limit,
-angular_rate_limit,
0.0,
]
)
if flight_mode == -1:
high = np.ones((4,)) * thrust_limit
low = np.zeros((4,))
else:
high = np.array(
[
xyz_limit,
xyz_limit,
xyz_limit,
thrust_limit,
]
)
low = np.array(
[
-xyz_limit,
-xyz_limit,
-xyz_limit,
0.0,
]
)
self.action_space = spaces.Box(low=low, high=high, dtype=np.float64)

# the whole implicit state space = attitude + previous action + auxiliary information
Expand Down
13 changes: 8 additions & 5 deletions PyFlyt/gym_envs/quadx_envs/quadx_pole_balance_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
class QuadXPoleBalanceEnv(QuadXBaseEnv):
"""Simple Hover Environment with the additional goal of keeping a pole upright.
Actions are vp, vq, vr, T, ie: angular rates and thrust.
Actions are direct motor PWM commands because any underlying controller introduces too much control latency.
The target is to not crash and not let the pole hit the ground for the longest time possible.
Args:
Expand All @@ -32,7 +32,7 @@ class QuadXPoleBalanceEnv(QuadXBaseEnv):
def __init__(
self,
sparse_reward: bool = False,
flight_mode: int = 0,
flight_mode: int = -1,
flight_dome_size: float = 3.0,
max_duration_seconds: float = 20.0,
angle_representation: Literal["euler", "quaternion"] = "quaternion",
Expand All @@ -45,12 +45,12 @@ def __init__(
Args:
----
sparse_reward (bool): whether to use sparse rewards or not.
flight_mode (int): the flight mode of the UAV
flight_mode (int): the flight mode of the UAV.
flight_dome_size (float): size of the allowable flying area.
max_duration_seconds (float): maximum simulation time of the environment.
angle_representation (Literal["euler", "quaternion"]): can be "euler" or "quaternion".
agent_hz (int): looprate of the agent to environment interaction.
render_mode (None | Literal["human", "rgb_array"]): render_mode
render_mode (None | Literal["human", "rgb_array"]): render_mode.
render_resolution (tuple[int, int]): render_resolution.
"""
Expand Down Expand Up @@ -94,7 +94,10 @@ def reset(
super().begin_reset(
seed,
options,
drone_options={"drone_model": "primitive_drone"},
drone_options={
"drone_model": "primitive_drone",
"camera_position_offset": np.array([-3.0, 0.0, 1.0]),
},
)
self.pole.reset(p=self.env, start_location=np.array([0.0, 0.0, 1.55]))
super().end_reset(seed, options)
Expand Down
47 changes: 24 additions & 23 deletions PyFlyt/gym_envs/quadx_envs/quadx_pole_waypoints_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
class QuadXPoleWaypointsEnv(QuadXBaseEnv):
"""QuadX Pole Waypoints Environment.
Actions are vp, vq, vr, T, ie: angular rates and thrust.
Actions are direct motor PWM commands because any underlying controller introduces too much control latency.
The target is to get to a set of `[x, y, z]` waypoints in space without dropping the pole.
Args:
Expand All @@ -37,11 +37,11 @@ def __init__(
sparse_reward: bool = False,
num_targets: int = 4,
goal_reach_distance: float = 0.2,
flight_mode: int = 0,
flight_mode: int = -1,
flight_dome_size: float = 10.0,
max_duration_seconds: float = 60.0,
max_duration_seconds: float = 20.0,
angle_representation: Literal["euler", "quaternion"] = "quaternion",
agent_hz: int = 30,
agent_hz: int = 40,
render_mode: None | Literal["human", "rgb_array"] = None,
render_resolution: tuple[int, int] = (480, 480),
):
Expand All @@ -57,12 +57,11 @@ def __init__(
max_duration_seconds (float): maximum simulation time of the environment.
angle_representation (Literal["euler", "quaternion"]): can be "euler" or "quaternion".
agent_hz (int): looprate of the agent to environment interaction.
render_mode (None | Literal["human", "rgb_array"]): render_mode
render_mode (None | Literal["human", "rgb_array"]): render_mode.
render_resolution (tuple[int, int]): render_resolution.
"""
super().__init__(
start_pos=np.array([[0.0, 0.0, 1.0]]),
flight_mode=flight_mode,
flight_dome_size=flight_dome_size,
max_duration_seconds=max_duration_seconds,
Expand Down Expand Up @@ -126,7 +125,12 @@ def reset(
"""
super().begin_reset(
seed, options, drone_options={"drone_model": "primitive_drone"}
seed,
options,
drone_options={
"drone_model": "primitive_drone",
"camera_position_offset": np.array([-3.0, 0.0, 1.0]),
},
)

# spawn in a pole
Expand All @@ -135,7 +139,6 @@ def reset(
# init some other metadata
self.waypoints.reset(self.env, self.np_random)
self.info["num_targets_reached"] = 0
self.distance_to_immediate = np.inf

super().end_reset()

Expand All @@ -162,10 +165,10 @@ def compute_state(self) -> None:
"""
# compute attitude of self
ang_vel, ang_pos, lin_vel, lin_pos, quaternion = super().compute_attitude()
aux_state = super().compute_auxiliary()
rotation = (
np.array(self.env.getMatrixFromQuaternion(quaternion)).reshape(3, 3).T
)
aux_state = super().compute_auxiliary()

# compute the pole's states
(
Expand Down Expand Up @@ -210,36 +213,34 @@ def compute_state(self) -> None:
pole_bot_pos,
pole_top_vel,
pole_bot_vel,
]
],
axis=-1,
)

new_state["target_deltas"] = self.waypoints.distance_to_target(
new_state["target_deltas"] = self.waypoints.distance_to_targets(
ang_pos, lin_pos, quaternion
)
self.distance_to_immediate = float(
np.linalg.norm(new_state["target_deltas"][0])
)

self.state: dict[Literal["attitude", "target_deltas"], np.ndarray] = new_state

def compute_term_trunc_reward(self) -> None:
"""Computes the termination, trunction, and reward of the current timestep."""
"""Computes the termination, truncation, and reward of the current timestep."""
super().compute_base_term_trunc_reward()

# bonus reward if we are not sparse
if not self.sparse_reward:
self.reward += max(3.0 * self.waypoints.progress_to_target(), 0.0)
self.reward += 0.1 / self.distance_to_immediate
self.reward -= self.pole.leaningness
self.reward += max(15.0 * self.waypoints.progress_to_next_target, 0.0)
self.reward += 0.5 / self.waypoints.distance_to_next_target
self.reward += (0.5 - self.pole.leaningness)

# target reached
if self.waypoints.target_reached():
self.reward = 100.0
if self.waypoints.target_reached:
self.reward = 300.0

# advance the targets
self.waypoints.advance_targets()

# update infos and dones
self.truncation |= self.waypoints.all_targets_reached()
self.info["env_complete"] = self.waypoints.all_targets_reached()
self.info["num_targets_reached"] = self.waypoints.num_targets_reached()
self.truncation |= self.waypoints.all_targets_reached
self.info["env_complete"] = self.waypoints.all_targets_reached
self.info["num_targets_reached"] = self.waypoints.num_targets_reached
Loading

0 comments on commit bed365c

Please sign in to comment.