-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathpendulum_swingup.py
135 lines (100 loc) · 4.45 KB
/
pendulum_swingup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Copyright (C) 2019 Istituto Italiano di Tecnologia (IIT). All rights reserved.
# This software may be modified and distributed under the terms of the
# GNU Lesser General Public License v2.1 or any later version.
import abc
import gym
import numpy as np
from typing import Tuple
from gym_ignition.base import task
from gym_ignition.utils import logger
from gym_ignition.utils.typing import Action, Observation, Reward
from gym_ignition.utils.typing import ActionSpace, ObservationSpace
from gym_ignition.base.robot import robot_abc, feature_detector, robot_joints
@feature_detector
class RobotFeatures(robot_abc.RobotABC,
robot_joints.RobotJoints,
abc.ABC):
pass
class PendulumSwingUp(task.Task, abc.ABC):
def __init__(self) -> None:
super().__init__()
# Store the requested robot features for this task
self.robot_features = RobotFeatures
# Limits
self._max_speed = 8.0
self._max_torque = 2.0
# Private attributes
self._last_a = None
# Create the spaces
self.action_space, self.observation_space = self.create_spaces()
# Seed the environment
self.seed()
def create_spaces(self) -> Tuple[ActionSpace, ObservationSpace]:
action_space = gym.spaces.Box(low=-self._max_torque, high=self._max_torque,
shape=(1,), dtype=np.float32)
high = np.array(
[1., # cos(theta)
1., # sin(theta)
self._max_speed])
observation_space = gym.spaces.Box(low=-high, high=high, dtype=np.float32)
return action_space, observation_space
def set_action(self, action: Action) -> bool:
# Validate the action
assert self.action_space.contains(action), \
"%r (%s) invalid" % (action, type(action))
# Store the last action. It is used to calculate the reward.
self._last_a = action
# Read the action and send the force to the cart
force = action.tolist()[0]
ok = self.robot.set_joint_force("pivot", force)
if not ok:
raise Exception("Failed to set the force to the pendulum")
return True
def get_observation(self) -> Observation:
# Get the robot object
robot = self.robot
# Get the new pendulum position and velocity
theta = robot.joint_position("pivot")
theta_dot = robot.joint_velocity("pivot")
# Create the observation object
observation = Observation(np.array([np.cos(theta), np.sin(theta), theta_dot]))
# Return the observation
return observation
def get_reward(self) -> Reward:
# This environments is done only if the observation goes outside its limits.
# Since it can happen only when velocity is too high, penalize this happening.
if self.is_done():
return Reward(-10000)
# Get the data from the robot object
theta = self.robot.joint_position("pivot")
theta_dot = self.robot.joint_velocity("pivot")
cost = \
theta * theta + \
0.1 * theta_dot * theta_dot +\
0.001 * self._last_a
return Reward(-cost)
def is_done(self) -> bool:
if not self.observation_space.contains(self.get_observation()):
logger.warn("Observation is outside its space. Marking the episode as done.")
return True
# This environment is episodic and always reach the max_episode_steps
return False
def reset_task(self) -> bool:
# Sample the angular velocity from the observation space
_, _, theta_dot = self.observation_space.sample().tolist()
# Sample the angular position from an uniform rng
theta = self.np_random.uniform(0, 2 * np.pi)
try:
desired_control_mode = robot_joints.JointControlMode.TORQUE
if self.robot.joint_control_mode("pivot") != desired_control_mode:
ok_mode = self.robot.set_joint_control_mode("pivot", desired_control_mode)
assert ok_mode, "Failed to set pendulum control mode"
except Exception:
logger.warn("Failed to set control mode. Is it supported by the runtime?")
pass
# Reset the robot state
ok_reset = self.robot.reset_joint("pivot", theta, theta_dot)
assert ok_reset, "Failed to reset the pendulum"
# Clean the last applied force
self._last_a = None
return True