From 297709736c8ae7c51502fed03478c1b7f732fcd5 Mon Sep 17 00:00:00 2001 From: Jinyu-W <53509467+Jinyu-W@users.noreply.github.com> Date: Fri, 27 Oct 2023 14:12:46 +0800 Subject: [PATCH] Add DDQN (#598) * Refine explore strategy, add prioritized sampling support; add DDQN example; add DQN test (#590) * Runnable. Should setup a benchmark and test performance. * Refine logic * Test DQN on GYM passed * Refine explore strategy * Minor * Minor * Add Dueling DQN in CIM scenario * Resolve PR comments * Add one more explanation * fix env_sampler eval info list issue * update version to 0.3.2a4 --------- Co-authored-by: Huoran Li Co-authored-by: Jinyu Wang --- examples/cim/rl/algorithms/dqn.py | 72 ++++++--- examples/cim/rl/config.py | 2 +- examples/vm_scheduling/rl/algorithms/dqn.py | 16 +- maro/__misc__.py | 2 +- maro/rl/exploration/__init__.py | 12 +- maro/rl/exploration/scheduling.py | 127 ---------------- maro/rl/exploration/strategies.py | 159 ++++++++++---------- maro/rl/model/fc_block.py | 13 +- maro/rl/policy/discrete_rl_policy.py | 36 +---- maro/rl/rollout/env_sampler.py | 6 +- maro/rl/training/__init__.py | 9 +- maro/rl/training/algorithms/ddpg.py | 3 - maro/rl/training/algorithms/dqn.py | 90 ++++++++--- maro/rl/training/algorithms/sac.py | 3 - maro/rl/training/replay_memory.py | 108 ++++++++++++- maro/rl/training/trainer.py | 3 +- tests/rl/gym_wrapper/common.py | 22 ++- tests/rl/gym_wrapper/env_sampler.py | 9 +- tests/rl/performance.md | 35 ++++- tests/rl/tasks/ac/__init__.py | 3 + tests/rl/tasks/ddpg/__init__.py | 3 + tests/rl/tasks/dqn/__init__.py | 104 +++++++++++++ tests/rl/tasks/dqn/config.yml | 32 ++++ tests/rl/tasks/ppo/__init__.py | 3 + tests/rl/tasks/sac/__init__.py | 3 + 25 files changed, 541 insertions(+), 334 deletions(-) delete mode 100644 maro/rl/exploration/scheduling.py create mode 100644 tests/rl/tasks/dqn/__init__.py create mode 100644 tests/rl/tasks/dqn/config.yml diff --git a/examples/cim/rl/algorithms/dqn.py b/examples/cim/rl/algorithms/dqn.py index 022275552..c2c5f1952 100644 --- a/examples/cim/rl/algorithms/dqn.py +++ b/examples/cim/rl/algorithms/dqn.py @@ -1,10 +1,11 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. +from typing import Optional, Tuple import torch from torch.optim import RMSprop -from maro.rl.exploration import MultiLinearExplorationScheduler, epsilon_greedy +from maro.rl.exploration import EpsilonGreedy from maro.rl.model import DiscreteQNet, FullyConnected from maro.rl.policy import ValueBasedPolicy from maro.rl.training.algorithms import DQNParams, DQNTrainer @@ -23,32 +24,62 @@ class MyQNet(DiscreteQNet): - def __init__(self, state_dim: int, action_num: int) -> None: + def __init__( + self, + state_dim: int, + action_num: int, + dueling_param: Optional[Tuple[dict, dict]] = None, + ) -> None: super(MyQNet, self).__init__(state_dim=state_dim, action_num=action_num) - self._fc = FullyConnected(input_dim=state_dim, output_dim=action_num, **q_net_conf) - self._optim = RMSprop(self._fc.parameters(), lr=learning_rate) + + self._use_dueling = dueling_param is not None + self._fc = FullyConnected(input_dim=state_dim, output_dim=0 if self._use_dueling else action_num, **q_net_conf) + if self._use_dueling: + q_kwargs, v_kwargs = dueling_param + self._q = FullyConnected(input_dim=self._fc.output_dim, output_dim=action_num, **q_kwargs) + self._v = FullyConnected(input_dim=self._fc.output_dim, output_dim=1, **v_kwargs) + + self._optim = RMSprop(self.parameters(), lr=learning_rate) def _get_q_values_for_all_actions(self, states: torch.Tensor) -> torch.Tensor: - return self._fc(states) + logits = self._fc(states) + if self._use_dueling: + q = self._q(logits) + v = self._v(logits) + logits = q - q.mean(dim=1, keepdim=True) + v + return logits def get_dqn_policy(state_dim: int, action_num: int, name: str) -> ValueBasedPolicy: + q_kwargs = { + "hidden_dims": [128], + "activation": torch.nn.LeakyReLU, + "output_activation": torch.nn.LeakyReLU, + "softmax": False, + "batch_norm": True, + "skip_connection": False, + "head": True, + "dropout_p": 0.0, + } + v_kwargs = { + "hidden_dims": [128], + "activation": torch.nn.LeakyReLU, + "output_activation": None, + "softmax": False, + "batch_norm": True, + "skip_connection": False, + "head": True, + "dropout_p": 0.0, + } + return ValueBasedPolicy( name=name, - q_net=MyQNet(state_dim, action_num), - exploration_strategy=(epsilon_greedy, {"epsilon": 0.4}), - exploration_scheduling_options=[ - ( - "epsilon", - MultiLinearExplorationScheduler, - { - "splits": [(2, 0.32)], - "initial_value": 0.4, - "last_ep": 5, - "final_value": 0.0, - }, - ), - ], + q_net=MyQNet( + state_dim, + action_num, + dueling_param=(q_kwargs, v_kwargs), + ), + explore_strategy=EpsilonGreedy(epsilon=0.4, num_actions=action_num), warmup=100, ) @@ -64,6 +95,7 @@ def get_dqn(name: str) -> DQNTrainer: num_epochs=10, soft_update_coef=0.1, double=False, - random_overwrite=False, + alpha=1.0, + beta=1.0, ), ) diff --git a/examples/cim/rl/config.py b/examples/cim/rl/config.py index a46194900..e91e5edfc 100644 --- a/examples/cim/rl/config.py +++ b/examples/cim/rl/config.py @@ -35,4 +35,4 @@ action_num = len(action_shaping_conf["action_space"]) -algorithm = "ppo" # ac, ppo, dqn or discrete_maddpg +algorithm = "dqn" # ac, ppo, dqn or discrete_maddpg diff --git a/examples/vm_scheduling/rl/algorithms/dqn.py b/examples/vm_scheduling/rl/algorithms/dqn.py index 643d6c6d4..78be0d7bd 100644 --- a/examples/vm_scheduling/rl/algorithms/dqn.py +++ b/examples/vm_scheduling/rl/algorithms/dqn.py @@ -6,7 +6,7 @@ from torch.optim import SGD from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts -from maro.rl.exploration import MultiLinearExplorationScheduler +from maro.rl.exploration import EpsilonGreedy from maro.rl.model import DiscreteQNet, FullyConnected from maro.rl.policy import ValueBasedPolicy from maro.rl.training.algorithms import DQNParams, DQNTrainer @@ -58,19 +58,7 @@ def get_dqn_policy(state_dim: int, action_num: int, num_features: int, name: str return ValueBasedPolicy( name=name, q_net=MyQNet(state_dim, action_num, num_features), - exploration_strategy=(MaskedEpsGreedy(state_dim, num_features), {"epsilon": 0.4}), - exploration_scheduling_options=[ - ( - "epsilon", - MultiLinearExplorationScheduler, - { - "splits": [(100, 0.32)], - "initial_value": 0.4, - "last_ep": 400, - "final_value": 0.0, - }, - ), - ], + explore_strategy=EpsilonGreedy(epsilon=0.4, num_actions=action_num), warmup=100, ) diff --git a/maro/__misc__.py b/maro/__misc__.py index 58b495716..5f615891e 100644 --- a/maro/__misc__.py +++ b/maro/__misc__.py @@ -2,6 +2,6 @@ # Licensed under the MIT license. -__version__ = "0.3.2a3" +__version__ = "0.3.2a4" __data_version__ = "0.2" diff --git a/maro/rl/exploration/__init__.py b/maro/rl/exploration/__init__.py index 383cca89a..7be8b579d 100644 --- a/maro/rl/exploration/__init__.py +++ b/maro/rl/exploration/__init__.py @@ -1,14 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -from .scheduling import AbsExplorationScheduler, LinearExplorationScheduler, MultiLinearExplorationScheduler -from .strategies import epsilon_greedy, gaussian_noise, uniform_noise +from .strategies import EpsilonGreedy, ExploreStrategy, LinearExploration __all__ = [ - "AbsExplorationScheduler", - "LinearExplorationScheduler", - "MultiLinearExplorationScheduler", - "epsilon_greedy", - "gaussian_noise", - "uniform_noise", + "ExploreStrategy", + "EpsilonGreedy", + "LinearExploration", ] diff --git a/maro/rl/exploration/scheduling.py b/maro/rl/exploration/scheduling.py deleted file mode 100644 index 3981729c9..000000000 --- a/maro/rl/exploration/scheduling.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT license. - -from abc import ABC, abstractmethod -from typing import List, Tuple - - -class AbsExplorationScheduler(ABC): - """Abstract exploration scheduler. - - Args: - exploration_params (dict): The exploration params attribute from some ``RLPolicy`` instance to which the - scheduler is applied. - param_name (str): Name of the exploration parameter to which the scheduler is applied. - initial_value (float, default=None): Initial value for the exploration parameter. If None, the value used - when instantiating the policy will be used as the initial value. - """ - - def __init__(self, exploration_params: dict, param_name: str, initial_value: float = None) -> None: - super().__init__() - self._exploration_params = exploration_params - self.param_name = param_name - if initial_value is not None: - self._exploration_params[self.param_name] = initial_value - - def get_value(self) -> float: - return self._exploration_params[self.param_name] - - @abstractmethod - def step(self) -> None: - raise NotImplementedError - - -class LinearExplorationScheduler(AbsExplorationScheduler): - """Linear exploration parameter schedule. - - Args: - exploration_params (dict): The exploration params attribute from some ``RLPolicy`` instance to which the - scheduler is applied. - param_name (str): Name of the exploration parameter to which the scheduler is applied. - last_ep (int): Last episode. - final_value (float): The value of the exploration parameter corresponding to ``last_ep``. - start_ep (int, default=1): starting episode. - initial_value (float, default=None): Initial value for the exploration parameter. If None, the value used - when instantiating the policy will be used as the initial value. - """ - - def __init__( - self, - exploration_params: dict, - param_name: str, - *, - last_ep: int, - final_value: float, - start_ep: int = 1, - initial_value: float = None, - ) -> None: - super().__init__(exploration_params, param_name, initial_value=initial_value) - self.final_value = final_value - if last_ep > 1: - self.delta = (self.final_value - self._exploration_params[self.param_name]) / (last_ep - start_ep) - else: - self.delta = 0 - - def step(self) -> None: - if self._exploration_params[self.param_name] == self.final_value: - return - - self._exploration_params[self.param_name] += self.delta - - -class MultiLinearExplorationScheduler(AbsExplorationScheduler): - """Exploration parameter schedule that consists of multiple linear phases. - - Args: - exploration_params (dict): The exploration params attribute from some ``RLPolicy`` instance to which the - scheduler is applied. - param_name (str): Name of the exploration parameter to which the scheduler is applied. - splits (List[Tuple[int, float]]): List of points that separate adjacent linear phases. Each - point is a (episode, parameter_value) tuple that indicates the end of one linear phase and - the start of another. These points do not have to be given in any particular order. There - cannot be two points with the same first element (episode), or a ``ValueError`` will be raised. - last_ep (int): Last episode. - final_value (float): The value of the exploration parameter corresponding to ``last_ep``. - start_ep (int, default=1): starting episode. - initial_value (float, default=None): Initial value for the exploration parameter. If None, the value from - the original dictionary the policy is instantiated with will be used as the initial value. - """ - - def __init__( - self, - exploration_params: dict, - param_name: str, - *, - splits: List[Tuple[int, float]], - last_ep: int, - final_value: float, - start_ep: int = 1, - initial_value: float = None, - ) -> None: - super().__init__(exploration_params, param_name, initial_value=initial_value) - - # validate splits - splits = [(start_ep, self._exploration_params[self.param_name])] + splits + [(last_ep, final_value)] - splits.sort() - for (ep, _), (ep2, _) in zip(splits, splits[1:]): - if ep == ep2: - raise ValueError("The zeroth element of split points must be unique") - - self.final_value = final_value - self._splits = splits - self._ep = start_ep - self._split_index = 1 - self._delta = (self._splits[1][1] - self._exploration_params[self.param_name]) / (self._splits[1][0] - start_ep) - - def step(self) -> None: - if self._split_index == len(self._splits): - return - - self._exploration_params[self.param_name] += self._delta - self._ep += 1 - if self._ep == self._splits[self._split_index][0]: - self._split_index += 1 - if self._split_index < len(self._splits): - self._delta = (self._splits[self._split_index][1] - self._splits[self._split_index - 1][1]) / ( - self._splits[self._split_index][0] - self._splits[self._split_index - 1][0] - ) diff --git a/maro/rl/exploration/strategies.py b/maro/rl/exploration/strategies.py index c85340c78..37b164389 100644 --- a/maro/rl/exploration/strategies.py +++ b/maro/rl/exploration/strategies.py @@ -1,93 +1,100 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. - -from typing import Union +from abc import abstractmethod +from typing import Any import numpy as np -def epsilon_greedy( - state: np.ndarray, - action: np.ndarray, - num_actions: int, - *, - epsilon: float, -) -> np.ndarray: - """Epsilon-greedy exploration. +class ExploreStrategy: + def __init__(self) -> None: + pass + + @abstractmethod + def get_action( + self, + state: np.ndarray, + action: np.ndarray, + **kwargs: Any, + ) -> np.ndarray: + """ + Args: + state (np.ndarray): State(s) based on which ``action`` is chosen. This is not used by the vanilla + eps-greedy exploration and is put here to conform to the function signature required for the exploration + strategy parameter for ``DQN``. + action (np.ndarray): Action(s) chosen greedily by the policy. + + Returns: + Exploratory actions. + """ + raise NotImplementedError + + +class EpsilonGreedy(ExploreStrategy): + """Epsilon-greedy exploration. Returns uniformly random action with probability `epsilon` or returns original + action with probability `1.0 - epsilon`. Args: - state (np.ndarray): State(s) based on which ``action`` is chosen. This is not used by the vanilla - eps-greedy exploration and is put here to conform to the function signature required for the exploration - strategy parameter for ``DQN``. - action (np.ndarray): Action(s) chosen greedily by the policy. num_actions (int): Number of possible actions. epsilon (float): The probability that a random action will be selected. - - Returns: - Exploratory actions. """ - return np.array([act if np.random.random() > epsilon else np.random.randint(num_actions) for act in action]) + def __init__(self, num_actions: int, epsilon: float) -> None: + super(EpsilonGreedy, self).__init__() -def uniform_noise( - state: np.ndarray, - action: np.ndarray, - min_action: Union[float, list, np.ndarray] = None, - max_action: Union[float, list, np.ndarray] = None, - *, - low: Union[float, list, np.ndarray], - high: Union[float, list, np.ndarray], -) -> Union[float, np.ndarray]: - """Apply a uniform noise to a continuous multidimensional action. + assert 0.0 <= epsilon <= 1.0 - Args: - state (np.ndarray): State(s) based on which ``action`` is chosen. This is not used by the gaussian noise - exploration scheme and is put here to conform to the function signature for the exploration in continuous - action spaces. - action (np.ndarray): Action(s) chosen greedily by the policy. - min_action (Union[float, list, np.ndarray], default=None): Lower bound for the multidimensional action space. - max_action (Union[float, list, np.ndarray], default=None): Upper bound for the multidimensional action space. - low (Union[float, list, np.ndarray]): Lower bound for the noise range. - high (Union[float, list, np.ndarray]): Upper bound for the noise range. - - Returns: - Exploration actions with added noise. - """ - if min_action is None and max_action is None: - return action + np.random.uniform(low, high, size=action.shape) - else: - return np.clip(action + np.random.uniform(low, high, size=action.shape), min_action, max_action) - - -def gaussian_noise( - state: np.ndarray, - action: np.ndarray, - min_action: Union[float, list, np.ndarray] = None, - max_action: Union[float, list, np.ndarray] = None, - *, - mean: Union[float, list, np.ndarray] = 0.0, - stddev: Union[float, list, np.ndarray] = 1.0, - relative: bool = False, -) -> Union[float, np.ndarray]: - """Apply a gaussian noise to a continuous multidimensional action. + self._num_actions = num_actions + self._eps = epsilon + + def get_action( + self, + state: np.ndarray, + action: np.ndarray, + **kwargs: Any, + ) -> np.ndarray: + return np.array( + [act if np.random.random() > self._eps else np.random.randint(self._num_actions) for act in action], + ) + + +class LinearExploration(ExploreStrategy): + """Epsilon greedy which the probability `epsilon` is linearly interpolated between `start_explore_prob` and + `end_explore_prob` over `explore_steps`. After this many timesteps pass, `epsilon` is fixed to `end_explore_prob`. Args: - state (np.ndarray): State(s) based on which ``action`` is chosen. This is not used by the gaussian noise - exploration scheme and is put here to conform to the function signature for the exploration in continuous - action spaces. - action (np.ndarray): Action(s) chosen greedily by the policy. - min_action (Union[float, list, np.ndarray], default=None): Lower bound for the multidimensional action space. - max_action (Union[float, list, np.ndarray], default=None): Upper bound for the multidimensional action space. - mean (Union[float, list, np.ndarray], default=0.0): Gaussian noise mean. - stddev (Union[float, list, np.ndarray], default=1.0): Standard deviation for the Gaussian noise. - relative (bool, default=False): If True, the generated noise is treated as a relative measure and will - be multiplied by the action itself before being added to the action. - - Returns: - Exploration actions with added noise (a numpy ndarray). + num_actions (int): Number of possible actions. + explore_steps (int): Maximum number of steps to interpolate probability. + start_explore_prob (float): Starting explore probability. + end_explore_prob (float): Ending explore probability. """ - noise = np.random.normal(loc=mean, scale=stddev, size=action.shape) - if min_action is None and max_action is None: - return action + ((noise * action) if relative else noise) - else: - return np.clip(action + ((noise * action) if relative else noise), min_action, max_action) + + def __init__( + self, + num_actions: int, + explore_steps: int, + start_explore_prob: float, + end_explore_prob: float, + ) -> None: + super(LinearExploration, self).__init__() + + self._call_count = 0 + + self._num_actions = num_actions + self._explore_steps = explore_steps + self._start_explore_prob = start_explore_prob + self._end_explore_prob = end_explore_prob + + def get_action( + self, + state: np.ndarray, + action: np.ndarray, + **kwargs: Any, + ) -> np.ndarray: + ratio = min(self._call_count / self._explore_steps, 1.0) + epsilon = self._start_explore_prob + (self._end_explore_prob - self._start_explore_prob) * ratio + explore_flag = np.random.random() < epsilon + action = np.array([np.random.randint(self._num_actions) if explore_flag else act for act in action]) + + self._call_count += 1 + return action diff --git a/maro/rl/model/fc_block.py b/maro/rl/model/fc_block.py index 9154d6ff0..f7f78e518 100644 --- a/maro/rl/model/fc_block.py +++ b/maro/rl/model/fc_block.py @@ -13,7 +13,7 @@ class FullyConnected(nn.Module): Args: input_dim (int): Network input dimension. - output_dim (int): Network output dimension. + output_dim (int): Network output dimension. If it is 0, will not create the top layer. hidden_dims (List[int]): Dimensions of hidden layers. Its length is the number of hidden layers. For example, `hidden_dims=[128, 256]` refers to two hidden layers with output dim of 128 and 256, respectively. activation (Optional[Type[torch.nn.Module], default=nn.ReLU): Activation class provided by ``torch.nn`` or a @@ -52,7 +52,6 @@ def __init__( super(FullyConnected, self).__init__() self._input_dim = input_dim self._hidden_dims = hidden_dims if hidden_dims is not None else [] - self._output_dim = output_dim # network features self._activation = activation if activation else None @@ -76,9 +75,13 @@ def __init__( self._build_layer(in_dim, out_dim, activation=self._activation) for in_dim, out_dim in zip(dims, dims[1:]) ] # top layer - layers.append( - self._build_layer(dims[-1], self._output_dim, head=self._head, activation=self._output_activation), - ) + if output_dim != 0: + layers.append( + self._build_layer(dims[-1], output_dim, head=self._head, activation=self._output_activation), + ) + self._output_dim = output_dim + else: + self._output_dim = hidden_dims[-1] self._net = nn.Sequential(*layers) diff --git a/maro/rl/policy/discrete_rl_policy.py b/maro/rl/policy/discrete_rl_policy.py index 289d150e7..344be00d8 100644 --- a/maro/rl/policy/discrete_rl_policy.py +++ b/maro/rl/policy/discrete_rl_policy.py @@ -2,15 +2,14 @@ # Licensed under the MIT license. from abc import ABCMeta -from typing import Callable, Dict, List, Tuple +from typing import Dict, Optional, Tuple import numpy as np import torch -from maro.rl.exploration import epsilon_greedy +from maro.rl.exploration import ExploreStrategy from maro.rl.model import DiscretePolicyNet, DiscreteQNet from maro.rl.utils import match_shape, ndarray_to_tensor -from maro.utils import clone from .abs_policy import RLPolicy @@ -69,8 +68,7 @@ class ValueBasedPolicy(DiscreteRLPolicy): name (str): Name of the policy. q_net (DiscreteQNet): Q-net used in this value-based policy. trainable (bool, default=True): Whether this policy is trainable. - exploration_strategy (Tuple[Callable, dict], default=(epsilon_greedy, {"epsilon": 0.1})): Exploration strategy. - exploration_scheduling_options (List[tuple], default=None): List of exploration scheduler options. + explore_strategy (Optional[ExploreStrategy], default=None): Explore strategy. warmup (int, default=50000): Number of steps for uniform-random action selection, before running real policy. Helps exploration. """ @@ -80,8 +78,7 @@ def __init__( name: str, q_net: DiscreteQNet, trainable: bool = True, - exploration_strategy: Tuple[Callable, dict] = (epsilon_greedy, {"epsilon": 0.1}), - exploration_scheduling_options: List[tuple] = None, + explore_strategy: Optional[ExploreStrategy] = None, warmup: int = 50000, ) -> None: assert isinstance(q_net, DiscreteQNet) @@ -94,15 +91,7 @@ def __init__( warmup=warmup, ) self._q_net = q_net - - self._exploration_func = exploration_strategy[0] - self._exploration_params = clone(exploration_strategy[1]) # deep copy is needed to avoid unwanted sharing - self._exploration_schedulers = ( - [opt[1](self._exploration_params, opt[0], **opt[2]) for opt in exploration_scheduling_options] - if exploration_scheduling_options is not None - else [] - ) - + self._explore_strategy = explore_strategy self._softmax = torch.nn.Softmax(dim=1) @property @@ -176,9 +165,6 @@ def q_values_tensor(self, states: torch.Tensor, actions: torch.Tensor, **kwargs) assert match_shape(q_values, (states.shape[0],)) # [B] return q_values - def explore(self) -> None: - pass # Overwrite the base method and turn off explore mode. - def _get_actions_impl(self, states: torch.Tensor, **kwargs) -> torch.Tensor: return self._get_actions_with_probs_impl(states, **kwargs)[0] @@ -187,17 +173,11 @@ def _get_actions_with_probs_impl(self, states: torch.Tensor, **kwargs) -> Tuple[ q_matrix_softmax = self._softmax(q_matrix) _, actions = q_matrix.max(dim=1) # [B], [B] - if self._is_exploring: - actions = self._exploration_func( - states, - actions.cpu().numpy(), - self.action_num, - **self._exploration_params, - **kwargs, - ) + if self._is_exploring and self._explore_strategy is not None: + actions = self._explore_strategy.get_action(state=states.cpu().numpy(), action=actions.cpu().numpy()) actions = ndarray_to_tensor(actions, device=self._device) - actions = actions.unsqueeze(1) + actions = actions.unsqueeze(1).long() return actions, q_matrix_softmax.gather(1, actions).squeeze(-1) # [B, 1] def _get_actions_with_logps_impl(self, states: torch.Tensor, **kwargs) -> Tuple[torch.Tensor, torch.Tensor]: diff --git a/maro/rl/rollout/env_sampler.py b/maro/rl/rollout/env_sampler.py index 2825ef4aa..0dc311e8c 100644 --- a/maro/rl/rollout/env_sampler.py +++ b/maro/rl/rollout/env_sampler.py @@ -533,7 +533,7 @@ def sample( return { "experiences": [total_experiences], - "info": [deepcopy(self._info)], # TODO: may have overhead issues. Leave to future work. + "info": [deepcopy(self._info)], } def set_policy_state(self, policy_state_dict: Dict[str, dict]) -> None: @@ -592,7 +592,7 @@ def eval(self, policy_state: Dict[str, Dict[str, Any]] = None, num_episodes: int self._step(list(env_action_dict.values())) cache_element.next_state = self._state - if self._reward_eval_delay is None: # TODO: necessary to calculate reward in eval()? + if self._reward_eval_delay is None: self._calc_reward(cache_element) self._post_eval_step(cache_element) @@ -606,7 +606,7 @@ def eval(self, policy_state: Dict[str, Dict[str, Any]] = None, num_episodes: int self._calc_reward(cache_element) self._post_eval_step(cache_element) - info_list.append(self._info) + info_list.append(deepcopy(self._info)) return {"info": info_list} diff --git a/maro/rl/training/__init__.py b/maro/rl/training/__init__.py index a77296f98..0e4488915 100644 --- a/maro/rl/training/__init__.py +++ b/maro/rl/training/__init__.py @@ -2,7 +2,13 @@ # Licensed under the MIT license. from .proxy import TrainingProxy -from .replay_memory import FIFOMultiReplayMemory, FIFOReplayMemory, RandomMultiReplayMemory, RandomReplayMemory +from .replay_memory import ( + FIFOMultiReplayMemory, + FIFOReplayMemory, + PrioritizedReplayMemory, + RandomMultiReplayMemory, + RandomReplayMemory, +) from .train_ops import AbsTrainOps, RemoteOps, remote from .trainer import AbsTrainer, BaseTrainerParams, MultiAgentTrainer, SingleAgentTrainer from .training_manager import TrainingManager @@ -12,6 +18,7 @@ "TrainingProxy", "FIFOMultiReplayMemory", "FIFOReplayMemory", + "PrioritizedReplayMemory", "RandomMultiReplayMemory", "RandomReplayMemory", "AbsTrainOps", diff --git a/maro/rl/training/algorithms/ddpg.py b/maro/rl/training/algorithms/ddpg.py index aaa0b7454..bf7b0f8d4 100644 --- a/maro/rl/training/algorithms/ddpg.py +++ b/maro/rl/training/algorithms/ddpg.py @@ -261,9 +261,6 @@ def _register_policy(self, policy: RLPolicy) -> None: assert isinstance(policy, ContinuousRLPolicy) self._policy = policy - def _preprocess_batch(self, transition_batch: TransitionBatch) -> TransitionBatch: - return transition_batch - def get_local_ops(self) -> AbsTrainOps: return DDPGOps( name=self._policy.name, diff --git a/maro/rl/training/algorithms/dqn.py b/maro/rl/training/algorithms/dqn.py index 5a4f938ab..a12ba4a11 100644 --- a/maro/rl/training/algorithms/dqn.py +++ b/maro/rl/training/algorithms/dqn.py @@ -2,12 +2,21 @@ # Licensed under the MIT license. from dataclasses import dataclass -from typing import Dict, cast +from typing import Dict, Tuple, cast +import numpy as np import torch from maro.rl.policy import RLPolicy, ValueBasedPolicy -from maro.rl.training import AbsTrainOps, BaseTrainerParams, RandomReplayMemory, RemoteOps, SingleAgentTrainer, remote +from maro.rl.training import ( + AbsTrainOps, + BaseTrainerParams, + PrioritizedReplayMemory, + RandomReplayMemory, + RemoteOps, + SingleAgentTrainer, + remote, +) from maro.rl.utils import TransitionBatch, get_torch_device, ndarray_to_tensor from maro.utils import clone @@ -15,6 +24,9 @@ @dataclass class DQNParams(BaseTrainerParams): """ + use_prioritized_replay (bool, default=False): Whether to use prioritized replay memory. + alpha (float, default=0.4): Alpha in prioritized replay memory. + beta (float, default=0.6): Beta in prioritized replay memory. num_epochs (int, default=1): Number of training epochs. update_target_every (int, default=5): Number of gradient steps between target model updates. soft_update_coef (float, default=0.1): Soft update coefficient, e.g., @@ -27,11 +39,13 @@ class DQNParams(BaseTrainerParams): sequentially with wrap-around. """ + use_prioritized_replay: bool = False + alpha: float = 0.4 + beta: float = 0.6 num_epochs: int = 1 update_target_every: int = 5 soft_update_coef: float = 0.1 double: bool = False - random_overwrite: bool = False class DQNOps(AbsTrainOps): @@ -54,20 +68,21 @@ def __init__( self._reward_discount = reward_discount self._soft_update_coef = params.soft_update_coef self._double = params.double - self._loss_func = torch.nn.MSELoss() self._target_policy: ValueBasedPolicy = clone(self._policy) self._target_policy.set_name(f"target_{self._policy.name}") self._target_policy.eval() - def _get_batch_loss(self, batch: TransitionBatch) -> torch.Tensor: + def _get_batch_loss(self, batch: TransitionBatch, weight: np.ndarray) -> Tuple[torch.Tensor, torch.Tensor]: """Compute the loss of the batch. Args: batch (TransitionBatch): Batch. + weight (np.ndarray): Weight of each data entry. Returns: loss (torch.Tensor): The loss of the batch. + td_error (torch.Tensor): TD-error of the batch. """ assert isinstance(batch, TransitionBatch) assert isinstance(self._policy, ValueBasedPolicy) @@ -79,19 +94,21 @@ def _get_batch_loss(self, batch: TransitionBatch) -> torch.Tensor: rewards = ndarray_to_tensor(batch.rewards, device=self._device) terminals = ndarray_to_tensor(batch.terminals, device=self._device).float() + weight = ndarray_to_tensor(weight, device=self._device) + with torch.no_grad(): if self._double: self._policy.exploit() actions_by_eval_policy = self._policy.get_actions_tensor(next_states) next_q_values = self._target_policy.q_values_tensor(next_states, actions_by_eval_policy) else: - self._target_policy.exploit() - actions = self._target_policy.get_actions_tensor(next_states) - next_q_values = self._target_policy.q_values_tensor(next_states, actions) + next_q_values = self._target_policy.q_values_for_all_actions_tensor(next_states).max(dim=1)[0] target_q_values = (rewards + self._reward_discount * (1 - terminals) * next_q_values).detach() q_values = self._policy.q_values_tensor(states, actions) - return self._loss_func(q_values, target_q_values) + td_error = target_q_values - q_values + + return (td_error.pow(2) * weight).mean(), td_error @remote def get_batch_grad(self, batch: TransitionBatch) -> Dict[str, torch.Tensor]: @@ -103,7 +120,8 @@ def get_batch_grad(self, batch: TransitionBatch) -> Dict[str, torch.Tensor]: Returns: grad (torch.Tensor): The gradient of the batch. """ - return self._policy.get_gradients(self._get_batch_loss(batch)) + loss, _ = self._get_batch_loss(batch) + return self._policy.get_gradients(loss) def update_with_grad(self, grad_dict: dict) -> None: """Update the network with remotely computed gradients. @@ -114,14 +132,20 @@ def update_with_grad(self, grad_dict: dict) -> None: self._policy.train() self._policy.apply_gradients(grad_dict) - def update(self, batch: TransitionBatch) -> None: + def update(self, batch: TransitionBatch, weight: np.ndarray) -> np.ndarray: """Update the network using a batch. Args: batch (TransitionBatch): Batch. + weight (np.ndarray): Weight of each data entry. + + Returns: + td_errors (np.ndarray) """ self._policy.train() - self._policy.train_step(self._get_batch_loss(batch)) + loss, td_error = self._get_batch_loss(batch, weight) + self._policy.train_step(loss) + return td_error.detach().numpy() def get_non_policy_state(self) -> dict: return { @@ -168,20 +192,27 @@ def __init__( def build(self) -> None: self._ops = cast(DQNOps, self.get_ops()) - self._replay_memory = RandomReplayMemory( - capacity=self._replay_memory_capacity, - state_dim=self._ops.policy_state_dim, - action_dim=self._ops.policy_action_dim, - random_overwrite=self._params.random_overwrite, - ) + + if self._params.use_prioritized_replay: + self._replay_memory = PrioritizedReplayMemory( + capacity=self._replay_memory_capacity, + state_dim=self._ops.policy_state_dim, + action_dim=self._ops.policy_action_dim, + alpha=self._params.alpha, + beta=self._params.beta, + ) + else: + self._replay_memory = RandomReplayMemory( + capacity=self._replay_memory_capacity, + state_dim=self._ops.policy_state_dim, + action_dim=self._ops.policy_action_dim, + random_overwrite=False, + ) def _register_policy(self, policy: RLPolicy) -> None: assert isinstance(policy, ValueBasedPolicy) self._policy = policy - def _preprocess_batch(self, transition_batch: TransitionBatch) -> TransitionBatch: - return transition_batch - def get_local_ops(self) -> AbsTrainOps: return DQNOps( name=self._policy.name, @@ -191,13 +222,24 @@ def get_local_ops(self) -> AbsTrainOps: params=self._params, ) - def _get_batch(self, batch_size: int = None) -> TransitionBatch: - return self._replay_memory.sample(batch_size if batch_size is not None else self._batch_size) + def _get_batch(self, batch_size: int = None) -> Tuple[TransitionBatch, np.ndarray, np.ndarray]: + indexes = self.replay_memory.get_sample_indexes(batch_size or self._batch_size) + batch = self.replay_memory.sample_by_indexes(indexes) + + if self._params.use_prioritized_replay: + weight = cast(PrioritizedReplayMemory, self.replay_memory).get_weight(indexes) + else: + weight = np.ones(len(indexes)) + + return batch, indexes, weight def train_step(self) -> None: assert isinstance(self._ops, DQNOps) for _ in range(self._params.num_epochs): - self._ops.update(self._get_batch()) + batch, indexes, weight = self._get_batch() + td_error = self._ops.update(batch, weight) + if self._params.use_prioritized_replay: + cast(PrioritizedReplayMemory, self.replay_memory).update_weight(indexes, td_error) self._try_soft_update_target() diff --git a/maro/rl/training/algorithms/sac.py b/maro/rl/training/algorithms/sac.py index 7daf99c7d..54a6c4cbd 100644 --- a/maro/rl/training/algorithms/sac.py +++ b/maro/rl/training/algorithms/sac.py @@ -272,9 +272,6 @@ async def train_step_as_task(self) -> None: if early_stop: break - def _preprocess_batch(self, transition_batch: TransitionBatch) -> TransitionBatch: - return transition_batch - def get_local_ops(self) -> SoftActorCriticOps: return SoftActorCriticOps( name=self._policy.name, diff --git a/maro/rl/training/replay_memory.py b/maro/rl/training/replay_memory.py index da1e7d692..164c2580c 100644 --- a/maro/rl/training/replay_memory.py +++ b/maro/rl/training/replay_memory.py @@ -88,6 +88,73 @@ def get_sample_indexes(self, batch_size: int = None) -> np.ndarray: return np.random.choice(self._size, size=batch_size, replace=True) +class PriorityReplayIndexScheduler(AbsIndexScheduler): + """ + Indexer for priority replay memory: https://arxiv.org/abs/1511.05952. + + Args: + capacity (int): Maximum capacity of the replay memory. + alpha (float): Alpha (see original paper for explanation). + beta (float): Alpha (see original paper for explanation). + """ + + def __init__( + self, + capacity: int, + alpha: float, + beta: float, + ) -> None: + super(PriorityReplayIndexScheduler, self).__init__(capacity) + self._alpha = alpha + self._beta = beta + self._max_prio = self._min_prio = 1.0 + self._weights = np.zeros(capacity, dtype=np.float32) + + self._ptr = self._size = 0 + + def init_weights(self, indexes: np.ndarray) -> None: + self._weights[indexes] = self._max_prio**self._alpha + + def get_weight(self, indexes: np.ndarray) -> np.ndarray: + # important sampling weight calculation + # original formula: ((p_j/p_sum*N)**(-beta))/((p_min/p_sum*N)**(-beta)) + # simplified formula: (p_j/p_min)**(-beta) + return (self._weights[indexes] / self._min_prio) ** (-self._beta) + + def update_weight(self, indexes: np.ndarray, weight: np.ndarray) -> None: + assert indexes.shape == weight.shape + weight = np.abs(weight) + np.finfo(np.float32).eps.item() + self._weights[indexes] = weight**self._alpha + self._max_prio = max(self._max_prio, weight.max()) + self._min_prio = min(self._min_prio, weight.min()) + + def get_put_indexes(self, batch_size: int) -> np.ndarray: + if self._ptr + batch_size <= self._capacity: + indexes = np.arange(self._ptr, self._ptr + batch_size) + self._ptr += batch_size + else: + overwrites = self._ptr + batch_size - self._capacity + indexes = np.concatenate( + [ + np.arange(self._ptr, self._capacity), + np.arange(overwrites), + ], + ) + self._ptr = overwrites + + self._size = min(self._size + batch_size, self._capacity) + self.init_weights(indexes) + return indexes + + def get_sample_indexes(self, batch_size: int = None) -> np.ndarray: + assert batch_size is not None and batch_size > 0, f"Invalid batch size: {batch_size}" + assert self._size > 0, "Cannot sample from an empty memory." + + weights = self._weights[: self._size] + weights = weights / weights.sum() + return np.random.choice(np.arange(self._size), p=weights, size=batch_size, replace=True) + + class FIFOIndexScheduler(AbsIndexScheduler): """First-in-first-out index scheduler. @@ -154,11 +221,11 @@ def capacity(self) -> int: def state_dim(self) -> int: return self._state_dim - def _get_put_indexes(self, batch_size: int) -> np.ndarray: + def get_put_indexes(self, batch_size: int) -> np.ndarray: """Please refer to the doc string in AbsIndexScheduler.""" return self._idx_scheduler.get_put_indexes(batch_size) - def _get_sample_indexes(self, batch_size: int = None) -> np.ndarray: + def get_sample_indexes(self, batch_size: int = None) -> np.ndarray: """Please refer to the doc string in AbsIndexScheduler.""" return self._idx_scheduler.get_sample_indexes(batch_size) @@ -225,10 +292,10 @@ def put(self, transition_batch: TransitionBatch) -> None: if transition_batch.old_logps is not None: match_shape(transition_batch.old_logps, (batch_size,)) - self._put_by_indexes(self._get_put_indexes(batch_size), transition_batch) + self.put_by_indexes(self.get_put_indexes(batch_size), transition_batch) self._n_sample = min(self._n_sample + transition_batch.size, self._capacity) - def _put_by_indexes(self, indexes: np.ndarray, transition_batch: TransitionBatch) -> None: + def put_by_indexes(self, indexes: np.ndarray, transition_batch: TransitionBatch) -> None: """Store a transition batch into the memory at the give indexes. Args: @@ -258,7 +325,7 @@ def sample(self, batch_size: int = None) -> TransitionBatch: Returns: batch (TransitionBatch): The sampled batch. """ - indexes = self._get_sample_indexes(batch_size) + indexes = self.get_sample_indexes(batch_size) return self.sample_by_indexes(indexes) def sample_by_indexes(self, indexes: np.ndarray) -> TransitionBatch: @@ -306,6 +373,31 @@ def random_overwrite(self) -> bool: return self._random_overwrite +class PrioritizedReplayMemory(ReplayMemory): + def __init__( + self, + capacity: int, + state_dim: int, + action_dim: int, + alpha: float, + beta: float, + ) -> None: + super(PrioritizedReplayMemory, self).__init__( + capacity, + state_dim, + action_dim, + PriorityReplayIndexScheduler(capacity, alpha, beta), + ) + + def get_weight(self, indexes: np.ndarray) -> np.ndarray: + assert isinstance(self._idx_scheduler, PriorityReplayIndexScheduler) + return self._idx_scheduler.get_weight(indexes) + + def update_weight(self, indexes: np.ndarray, weight: np.ndarray) -> None: + assert isinstance(self._idx_scheduler, PriorityReplayIndexScheduler) + self._idx_scheduler.update_weight(indexes, weight) + + class FIFOReplayMemory(ReplayMemory): def __init__( self, @@ -393,9 +485,9 @@ def put(self, transition_batch: MultiTransitionBatch) -> None: assert match_shape(transition_batch.agent_states[i], (batch_size, self._agent_states_dims[i])) assert match_shape(transition_batch.next_agent_states[i], (batch_size, self._agent_states_dims[i])) - self._put_by_indexes(self._get_put_indexes(batch_size), transition_batch=transition_batch) + self.put_by_indexes(self.get_put_indexes(batch_size), transition_batch=transition_batch) - def _put_by_indexes(self, indexes: np.ndarray, transition_batch: MultiTransitionBatch) -> None: + def put_by_indexes(self, indexes: np.ndarray, transition_batch: MultiTransitionBatch) -> None: """Store a transition batch into the memory at the give indexes. Args: @@ -424,7 +516,7 @@ def sample(self, batch_size: int = None) -> MultiTransitionBatch: Returns: batch (MultiTransitionBatch): The sampled batch. """ - indexes = self._get_sample_indexes(batch_size) + indexes = self.get_sample_indexes(batch_size) return self.sample_by_indexes(indexes) def sample_by_indexes(self, indexes: np.ndarray) -> MultiTransitionBatch: diff --git a/maro/rl/training/trainer.py b/maro/rl/training/trainer.py index 774954f6c..53bd123d8 100644 --- a/maro/rl/training/trainer.py +++ b/maro/rl/training/trainer.py @@ -271,9 +271,8 @@ def record_multiple(self, env_idx: int, exp_elements: List[ExpElement]) -> None: transition_batch = self._preprocess_batch(transition_batch) self.replay_memory.put(transition_batch) - @abstractmethod def _preprocess_batch(self, transition_batch: TransitionBatch) -> TransitionBatch: - raise NotImplementedError + return transition_batch def _assert_ops_exists(self) -> None: if not self.ops: diff --git a/tests/rl/gym_wrapper/common.py b/tests/rl/gym_wrapper/common.py index 538a5f996..4a4bd12b2 100644 --- a/tests/rl/gym_wrapper/common.py +++ b/tests/rl/gym_wrapper/common.py @@ -3,12 +3,14 @@ from typing import cast +from gym import spaces + from maro.simulator import Env from tests.rl.gym_wrapper.simulator.business_engine import GymBusinessEngine env_conf = { - "topology": "Walker2d-v4", # HalfCheetah-v4, Hopper-v4, Walker2d-v4, Swimmer-v4, Ant-v4 + "topology": "CartPole-v1", # HalfCheetah-v4, Hopper-v4, Walker2d-v4, Swimmer-v4, Ant-v4, CartPole-v1 "start_tick": 0, "durations": 100000, # Set a very large number "options": {}, @@ -19,8 +21,18 @@ num_agents = len(learn_env.agent_idx_list) gym_env = cast(GymBusinessEngine, learn_env.business_engine).gym_env -gym_action_space = gym_env.action_space gym_state_dim = gym_env.observation_space.shape[0] -gym_action_dim = gym_action_space.shape[0] -action_lower_bound, action_upper_bound = gym_action_space.low, gym_action_space.high -action_limit = gym_action_space.high[0] +gym_action_space = gym_env.action_space +is_discrete = isinstance(gym_action_space, spaces.Discrete) +if is_discrete: + gym_action_space = cast(spaces.Discrete, gym_action_space) + gym_action_dim = 1 + gym_action_num = gym_action_space.n + action_lower_bound, action_upper_bound = None, None # Should never be used + action_limit = None # Should never be used +else: + gym_action_space = cast(spaces.Box, gym_action_space) + gym_action_dim = gym_action_space.shape[0] + gym_action_num = -1 # Should never be used + action_lower_bound, action_upper_bound = gym_action_space.low, gym_action_space.high + action_limit = action_upper_bound[0] diff --git a/tests/rl/gym_wrapper/env_sampler.py b/tests/rl/gym_wrapper/env_sampler.py index f95aaa546..e740bafdb 100644 --- a/tests/rl/gym_wrapper/env_sampler.py +++ b/tests/rl/gym_wrapper/env_sampler.py @@ -1,9 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -from typing import Any, Dict, List, Tuple, Type, Union +from typing import Any, Dict, List, Tuple, Type, Union, cast import numpy as np +from gym import spaces from maro.rl.policy.abs_policy import AbsPolicy from maro.rl.rollout import AbsEnvSampler, CacheElement @@ -40,6 +41,10 @@ def __init__( self._sample_rewards = [] self._eval_rewards = [] + gym_env = cast(GymBusinessEngine, learn_env.business_engine).gym_env + gym_action_space = gym_env.action_space + self._is_discrete = isinstance(gym_action_space, spaces.Discrete) + def _get_global_and_agent_state_impl( self, event: DecisionEvent, @@ -48,7 +53,7 @@ def _get_global_and_agent_state_impl( return None, {0: event.state} def _translate_to_env_action(self, action_dict: dict, event: Any) -> dict: - return {k: Action(v) for k, v in action_dict.items()} + return {k: Action(v.item() if self._is_discrete else v) for k, v in action_dict.items()} def _get_reward(self, env_action_dict: dict, event: Any, tick: int) -> Dict[Any, float]: be = self._env.business_engine diff --git a/tests/rl/performance.md b/tests/rl/performance.md index 75442035c..c49687750 100644 --- a/tests/rl/performance.md +++ b/tests/rl/performance.md @@ -1,11 +1,15 @@ # Performance for Gym Task Suite We benchmarked the MARO RL Toolkit implementation in Gym task suite. Some are compared to the benchmarks in -[OpenAI Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#). We've tried to align the +[OpenAI Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#) and [RL Baseline Zoo](https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/benchmark.md). We've tried to align the hyper-parameters for these benchmarks , but limited by the environment version difference, there may be some gaps between the performance here and that in Spinning Up benchmarks. Generally speaking, the performance is comparable. -## Experimental Setting +## Compare with OpenAI Spinning Up + +We compare the performance of PPO, SAC, and DDPG in MARO with [OpenAI Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#). + +### Experimental Setting The hyper-parameters are set to align with those used in [Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#experiment-details): @@ -29,7 +33,7 @@ The hyper-parameters are set to align with those used in More details about the parameters can be found in *tests/rl/tasks/*. -## Performance +### Performance Five environments from the MuJoCo Gym task suite are reported in Spinning Up, they are: HalfCheetah, Hopper, Walker2d, Swimmer, and Ant. The commit id of the code used to conduct the experiments for MARO RL benchmarks is ee25ce1e97. @@ -52,3 +56,28 @@ python tests/rl/plot.py --smooth WINDOWSIZE | [**Walker2d**](https://gymnasium.farama.org/environments/mujoco/walker2d/) | ![Wab](https://spinningup.openai.com/en/latest/_images/pytorch_walker2d_performance.svg) | ![Wa1](./log/Walker2d_1.png) | ![Wa11](./log/Walker2d_11.png) | | [**Swimmer**](https://gymnasium.farama.org/environments/mujoco/swimmer/) | ![Swb](https://spinningup.openai.com/en/latest/_images/pytorch_swimmer_performance.svg) | ![Sw1](./log/Swimmer_1.png) | ![Sw11](./log/Swimmer_11.png) | | [**Ant**](https://gymnasium.farama.org/environments/mujoco/ant/) | ![Anb](https://spinningup.openai.com/en/latest/_images/pytorch_ant_performance.svg) | ![An1](./log/Ant_1.png) | ![An11](./log/Ant_11.png) | + +## Compare with RL Baseline Zoo + +[RL Baseline Zoo](https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/benchmark.md) provides a comprehensive set of benchmarks for multiple algorithms and environments. +However, unlike OpenAI Spinning Up, it does not provide the complete learning curve. Instead, we can only find the final metrics in it. +We therefore leave the comparison with RL Baseline Zoo as a minor addition. + +We compare the performance of DQN with RL Baseline Zoo. + +### Experimental Setting + +- Batch size: size 64 for each gradient descent step; +- Network: size (256) with relu units; +- Performance metric: measured as the average trajectory return across the batch collected at 10 epochs; +- Total timesteps: 150,000. + +### Performance + +More details about the parameters can be found in *tests/rl/tasks/*. +Please refer to the original link of RL Baseline Zoo for the baseline metrics. + +| algo | env_id |mean_reward| +|--------|-------------------------------|----------:| +|DQN |CartPole-v1 | 500.00 | +|DQN |MountainCar-v0 | -116.90 | diff --git a/tests/rl/tasks/ac/__init__.py b/tests/rl/tasks/ac/__init__.py index 24cc961fc..d9a73ecf3 100644 --- a/tests/rl/tasks/ac/__init__.py +++ b/tests/rl/tasks/ac/__init__.py @@ -19,6 +19,7 @@ action_upper_bound, gym_action_dim, gym_state_dim, + is_discrete, learn_env, num_agents, test_env, @@ -109,6 +110,8 @@ def get_ac_trainer(name: str, state_dim: int) -> ActorCriticTrainer: ) +assert not is_discrete + algorithm = "ac" agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in learn_env.agent_idx_list} policies = [ diff --git a/tests/rl/tasks/ddpg/__init__.py b/tests/rl/tasks/ddpg/__init__.py index 861904a43..cd097ddbc 100644 --- a/tests/rl/tasks/ddpg/__init__.py +++ b/tests/rl/tasks/ddpg/__init__.py @@ -20,6 +20,7 @@ gym_action_dim, gym_action_space, gym_state_dim, + is_discrete, learn_env, num_agents, test_env, @@ -123,6 +124,8 @@ def get_ddpg_trainer(name: str, state_dim: int, action_dim: int) -> DDPGTrainer: ) +assert not is_discrete + algorithm = "ddpg" agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in learn_env.agent_idx_list} policies = [ diff --git a/tests/rl/tasks/dqn/__init__.py b/tests/rl/tasks/dqn/__init__.py new file mode 100644 index 000000000..b4b0befa7 --- /dev/null +++ b/tests/rl/tasks/dqn/__init__.py @@ -0,0 +1,104 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. +import torch +from torch.optim import Adam + +from maro.rl.exploration import LinearExploration +from maro.rl.model import DiscreteQNet, FullyConnected +from maro.rl.policy import ValueBasedPolicy +from maro.rl.rl_component.rl_component_bundle import RLComponentBundle +from maro.rl.training.algorithms import DQNParams, DQNTrainer + +from tests.rl.gym_wrapper.common import gym_action_num, gym_state_dim, is_discrete, learn_env, num_agents, test_env +from tests.rl.gym_wrapper.env_sampler import GymEnvSampler + +net_conf = { + "hidden_dims": [256], + "activation": torch.nn.ReLU, + "output_activation": None, +} +lr = 1e-3 + + +class MyQNet(DiscreteQNet): + def __init__(self, state_dim: int, action_num: int) -> None: + super(MyQNet, self).__init__(state_dim=state_dim, action_num=action_num) + + self._mlp = FullyConnected( + input_dim=state_dim, + output_dim=action_num, + **net_conf, + ) + self._optim = Adam(self._mlp.parameters(), lr=lr) + + def _get_q_values_for_all_actions(self, states: torch.Tensor) -> torch.Tensor: + return self._mlp(states) + + +def get_dqn_policy( + name: str, + state_dim: int, + action_num: int, +) -> ValueBasedPolicy: + return ValueBasedPolicy( + name=name, + q_net=MyQNet(state_dim=state_dim, action_num=action_num), + explore_strategy=LinearExploration( + num_actions=action_num, + explore_steps=10000, + start_explore_prob=1.0, + end_explore_prob=0.02, + ), + warmup=0, # TODO: check this + ) + + +def get_dqn_trainer( + name: str, +) -> DQNTrainer: + return DQNTrainer( + name=name, + params=DQNParams( + use_prioritized_replay=False, # + # alpha=0.4, + # beta=0.6, + num_epochs=50, + update_target_every=10, + soft_update_coef=1.0, + ), + replay_memory_capacity=50000, + batch_size=64, + reward_discount=1.0, + ) + + +assert is_discrete + +algorithm = "dqn" +agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in learn_env.agent_idx_list} +policies = [ + get_dqn_policy( + f"{algorithm}_{i}.policy", + state_dim=gym_state_dim, + action_num=gym_action_num, + ) + for i in range(num_agents) +] +trainers = [get_dqn_trainer(f"{algorithm}_{i}") for i in range(num_agents)] + +device_mapping = {f"{algorithm}_{i}.policy": "cuda:0" for i in range(num_agents)} if torch.cuda.is_available() else None + +rl_component_bundle = RLComponentBundle( + env_sampler=GymEnvSampler( + learn_env=learn_env, + test_env=test_env, + policies=policies, + agent2policy=agent2policy, + ), + agent2policy=agent2policy, + policies=policies, + trainers=trainers, + device_mapping=device_mapping, +) + +__all__ = ["rl_component_bundle"] diff --git a/tests/rl/tasks/dqn/config.yml b/tests/rl/tasks/dqn/config.yml new file mode 100644 index 000000000..aa3971127 --- /dev/null +++ b/tests/rl/tasks/dqn/config.yml @@ -0,0 +1,32 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +# Example RL config file for GYM scenario. +# Please refer to `maro/rl/workflows/config/template.yml` for the complete template and detailed explanations. + +job: gym_rl_workflow +scenario_path: "tests/rl/tasks/dqn" +log_path: "tests/rl/log/dqn_cartpole" +main: + num_episodes: 3000 + num_steps: 50 + eval_schedule: 50 + num_eval_episodes: 10 + min_n_sample: 1 + logging: + stdout: INFO + file: DEBUG +rollout: + logging: + stdout: INFO + file: DEBUG +training: + mode: simple + load_path: null + load_episode: null + checkpointing: + path: null + interval: 5 + logging: + stdout: INFO + file: DEBUG diff --git a/tests/rl/tasks/ppo/__init__.py b/tests/rl/tasks/ppo/__init__.py index 15fc71069..722fce328 100644 --- a/tests/rl/tasks/ppo/__init__.py +++ b/tests/rl/tasks/ppo/__init__.py @@ -11,6 +11,7 @@ action_upper_bound, gym_action_dim, gym_state_dim, + is_discrete, learn_env, num_agents, test_env, @@ -36,6 +37,8 @@ def get_ppo_trainer(name: str, state_dim: int) -> PPOTrainer: ) +assert not is_discrete + algorithm = "ppo" agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in learn_env.agent_idx_list} policies = [ diff --git a/tests/rl/tasks/sac/__init__.py b/tests/rl/tasks/sac/__init__.py index 1e033f12b..421ea4e96 100644 --- a/tests/rl/tasks/sac/__init__.py +++ b/tests/rl/tasks/sac/__init__.py @@ -24,6 +24,7 @@ gym_action_dim, gym_action_space, gym_state_dim, + is_discrete, learn_env, num_agents, test_env, @@ -133,6 +134,8 @@ def get_sac_trainer(name: str, state_dim: int, action_dim: int) -> SoftActorCrit ) +assert not is_discrete + algorithm = "sac" agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in learn_env.agent_idx_list} policies = [