-
Notifications
You must be signed in to change notification settings - Fork 0
/
agent.py
87 lines (72 loc) · 3.51 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import torch as T
from networks import ActorNetwork, CriticNetwork
import numpy as np
class Agent:
def __init__(self, actor_dims, critic_dims, n_actions, n_agents, agent_idx, chkpt_dir,
alpha=0.0001, beta=0.0002, fc1=128,
fc2=128, gamma=0.99, tau=0.01):
self.gamma = gamma
self.tau = tau
self.n_actions = n_actions
self.agent_name = 'agent_%s' % agent_idx
self.actor = ActorNetwork(alpha, actor_dims, fc1, fc2, n_actions,
chkpt_dir=chkpt_dir, name=self.agent_name+'_actor')
self.critic = CriticNetwork(beta, critic_dims,
fc1, fc2, n_agents, n_actions,
chkpt_dir=chkpt_dir, name=self.agent_name+'_critic')
self.target_actor = ActorNetwork(alpha, actor_dims, fc1, fc2, n_actions,
chkpt_dir=chkpt_dir,
name=self.agent_name+'_target_actor')
self.target_critic = CriticNetwork(beta, critic_dims,
fc1, fc2, n_agents, n_actions,
chkpt_dir=chkpt_dir,
name=self.agent_name+'_target_critic')
self.update_network_parameters(tau=1)
def choose_action(self, observation, time_step, evaluate=False):
state = T.tensor([observation], dtype=T.float).to(self.actor.device)
actions = self.actor.forward(state)
# exploration
max_noise = 0.75
min_noise = 0.01
decay_rate = 0.999995
noise_scale = max(min_noise, max_noise * (decay_rate ** time_step))
noise = 2 * T.rand(self.n_actions).to(self.actor.device) - 1 # [-1,1)
if not evaluate:
noise = noise_scale * noise
else:
noise = 0 * noise
action = actions + noise
action_np = action.detach().cpu().numpy()[0]
magnitude = np.linalg.norm(action_np)
if magnitude > 0.04:
action_np = action_np / magnitude * 0.04
return action_np
def update_network_parameters(self, tau=None):
if tau is None:
tau = self.tau
target_actor_params = self.target_actor.named_parameters()
actor_params = self.actor.named_parameters()
target_actor_state_dict = dict(target_actor_params)
actor_state_dict = dict(actor_params)
for name in actor_state_dict:
actor_state_dict[name] = tau*actor_state_dict[name].clone() + \
(1-tau)*target_actor_state_dict[name].clone()
self.target_actor.load_state_dict(actor_state_dict)
target_critic_params = self.target_critic.named_parameters()
critic_params = self.critic.named_parameters()
target_critic_state_dict = dict(target_critic_params)
critic_state_dict = dict(critic_params)
for name in critic_state_dict:
critic_state_dict[name] = tau*critic_state_dict[name].clone() + \
(1-tau)*target_critic_state_dict[name].clone()
self.target_critic.load_state_dict(critic_state_dict)
def save_models(self):
self.actor.save_checkpoint()
self.target_actor.save_checkpoint()
self.critic.save_checkpoint()
self.target_critic.save_checkpoint()
def load_models(self):
self.actor.load_checkpoint()
self.target_actor.load_checkpoint()
self.critic.load_checkpoint()
self.target_critic.load_checkpoint()