-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathgazebo_env_randomizer.py
140 lines (102 loc) · 4.88 KB
/
gazebo_env_randomizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Copyright (C) 2020 Istituto Italiano di Tecnologia (IIT). All rights reserved.
# This software may be modified and distributed under the terms of the
# GNU Lesser General Public License v2.1 or any later version.
import abc
from typing import Callable, Dict, Optional, Union, cast
import gym
from gym_ignition.randomizers.abc import PhysicsRandomizer, TaskRandomizer
from gym_ignition.randomizers.physics import dart
from gym_ignition.runtimes import gazebo_runtime
from gym_ignition.utils import typing
# Type of a factory function that builds the environment to be wrapped.
# NOTE(review): GazeboEnvRandomizer invokes the factory as ``make_env(**kwargs)``,
# i.e. with keyword arguments, while this alias declares a single positional
# ``Optional[Dict]`` parameter. Confirm whether ``Callable[..., gym.Env]`` was
# the intended signature.
MakeEnvCallable = Callable[[Optional[Dict]], gym.Env]
class GazeboEnvRandomizer(gym.Wrapper, TaskRandomizer, abc.ABC):
    """
    Base class to implement an environment randomizer for Ignition Gazebo.

    The randomizer is a :py:class:`gym.Wrapper` that extends the
    :py:meth:`gym.Env.reset` method. Objects that inherit from this class are used to
    setup the environment for the handled :py:class:`~gym_ignition.base.task.Task`.

    In its simplest form, a randomizer populates the world with all the models that need
    to be part of the simulation. The task could then operate on them from a
    :py:class:`~scenario.core.Model` object.

    More complex environments may require to randomize one or more simulated entities.
    Concrete classes that implement a randomizer could use
    :py:class:`~gym_ignition.randomizers.model.sdf.SDFRandomizer` to randomize the model
    and :py:class:`~gym_ignition.randomizers.abc.PhysicsRandomizer` to randomize
    the physics.

    Args:
        env: Defines the environment to handle. This argument could be either the string
            id if the environment does not need to be registered or a function that
            returns an environment object.
        physics_randomizer: Object that randomizes physics. When omitted (``None``), a
            new ``dart.DART()`` object is created for this instance, i.e. the default
            physics engine is DART with no randomizations.
        **kwargs: Extra options forwarded to the environment factory, together with
            the ``physics_engine`` reported by the physics randomizer.

    Note:
        In order to randomize physics, the handled
        :py:class:`scenario.gazebo.GazeboSimulator` is destroyed and created again.
        This operation is demanding, consider randomizing physics at a low rate.
    """

    def __init__(
        self,
        env: Union[str, MakeEnvCallable],
        physics_randomizer: Optional[PhysicsRandomizer] = None,
        **kwargs,
    ):
        # A default written in the signature is evaluated only once, when the
        # function is defined, and would then be shared by every instance that
        # relies on it. The randomizer is stateful (reset() advances its rollout
        # counter), hence each instance gets its own default object.
        if physics_randomizer is None:
            physics_randomizer = dart.DART()

        # Print the extra kwargs
        gym.logger.debug(f"GazeboEnvRandomizer: {dict(kwargs=kwargs)}")

        # Store the options. They are needed again by reset() whenever the
        # environment has to be created from scratch.
        self._env_option = env
        self._kwargs = dict(**kwargs, physics_engine=physics_randomizer.get_engine())

        # Create the environment
        env_to_wrap = self._create_environment(env=self._env_option, **self._kwargs)

        # Initialize the wrapper
        gym.Wrapper.__init__(self, env=env_to_wrap)

        # Store the physics randomizer
        self._physics_randomizer = physics_randomizer

    # ===============
    # gym.Env methods
    # ===============

    def reset(self, **kwargs) -> typing.Observation:
        """
        Randomize the task and reset the wrapped environment.

        If the physics randomizer reports that the physics expired, the wrapped
        environment is closed and created again with the stored options. The seed
        and the random number generator of the task are carried over to the new
        environment.

        Args:
            **kwargs: Extra arguments forwarded to ``randomize_task``.

        Returns:
            The observation returned by the ``reset`` of the wrapped environment.

        Raises:
            RuntimeError: If the paused Gazebo run fails.
        """

        # Reset the physics
        if self._physics_randomizer.physics_expired():

            # Get the random components of the task
            seed = self.env.task.seed
            np_random = self.env.task.np_random

            # Reset the runtime + task, creating a new Gazebo instance
            self.env.close()
            del self.env
            self.env = self._create_environment(self._env_option, **self._kwargs)

            # Restore the random components
            self.env.seed(seed=seed)
            assert self.env.task.seed == seed
            self.env.task.np_random = np_random

        # Mark the beginning of a new rollout
        self._physics_randomizer.increase_rollout_counter()

        # Reset the task through the TaskRandomizer
        self.randomize_task(task=self.env.task, gazebo=self.env.gazebo, **kwargs)

        # Execute a paused run after the randomization, before resetting the task
        ok_paused_run = self.env.gazebo.run(paused=True)

        if not ok_paused_run:
            raise RuntimeError("Failed to execute a paused Gazebo run")

        # Reset the Task
        return self.env.reset()

    # ===============
    # Private methods
    # ===============

    def _create_environment(
        self, env: Union[str, MakeEnvCallable], **kwargs
    ) -> gazebo_runtime.GazeboRuntime:
        """
        Create the environment to wrap from either its id or a factory function.

        Args:
            env: Either the string id of the environment or a callable returning it.
            **kwargs: Options forwarded to the selected creation method.

        Returns:
            The created environment.

        Raises:
            ValueError: If ``env`` is neither a string nor a callable, or if the
                unwrapped created environment is not a ``GazeboRuntime``.
        """

        if isinstance(env, str):
            env_to_wrap = self._create_from_id(env_id=env, **kwargs)

        elif callable(env):
            env_to_wrap = self._create_from_callable(make_env=env, **kwargs)

        else:
            raise ValueError("The type of env object was not recognized")

        # The check targets the innermost environment, so that the created
        # environment may itself be wrapped.
        if not isinstance(env_to_wrap.unwrapped, gazebo_runtime.GazeboRuntime):
            raise ValueError("The environment to wrap is not a GazeboRuntime")

        return cast(gazebo_runtime.GazeboRuntime, env_to_wrap)

    @staticmethod
    def _create_from_callable(make_env: MakeEnvCallable, **kwargs) -> gym.Env:
        """Create the environment by calling ``make_env`` with ``kwargs``."""

        env = make_env(**kwargs)
        return env

    @staticmethod
    def _create_from_id(env_id: str, **kwargs) -> gym.Env:
        """Create the environment through ``gym.make`` from its id and ``kwargs``."""

        env = gym.make(env_id, **kwargs)
        return env