# utils.py (forked from yandexdataschool/Practical_RL)
import numpy as np
import psutil
from scipy.signal import fftconvolve
# window functions live in scipy.signal.windows in modern SciPy
from scipy.signal.windows import gaussian

def get_cum_discounted_rewards(rewards, gamma):
    """
    Evaluates cumulative discounted rewards:
    G_t = r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...
    """
    cum_rewards = []
    cum_rewards.append(rewards[-1])
    for r in reversed(rewards[:-1]):
        cum_rewards.insert(0, r + gamma * cum_rewards[0])
    return cum_rewards
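
# Illustrative example (not part of the original file): with gamma = 0.5,
# rewards [1, 1, 1] yield cumulative discounted returns [1.75, 1.5, 1.0]:
# G_2 = 1, G_1 = 1 + 0.5 * G_2 = 1.5, G_0 = 1 + 0.5 * G_1 = 1.75.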

def play_and_log_episode(env, agent, gamma=0.99, t_max=10000):
    """
    Plays one episode with the greedy policy w.r.t. the agent's Q-values and
    logs per-step diagnostics: visited states, rewards, the agent's value
    estimates, Q-value spreads and absolute TD errors.
    """
    states = []
    v_agent = []
    q_spreads = []
    td_errors = []
    rewards = []

    s = env.reset()
    for step in range(t_max):
        states.append(s)
        qvalues = agent.get_qvalues([s])
        max_q_value, min_q_value = np.max(qvalues), np.min(qvalues)
        v_agent.append(max_q_value)
        q_spreads.append(max_q_value - min_q_value)
        if step > 0:
            # TD error of the previous transition:
            # |r_{t-1} + gamma * V(s_t) - V(s_{t-1})|
            td_errors.append(
                np.abs(rewards[-1] + gamma * v_agent[-1] - v_agent[-2]))

        action = qvalues.argmax(axis=-1)[0]
        s, r, done, _ = env.step(action)
        rewards.append(r)
        if done:
            break

    # TD error of the final transition: do not bootstrap past a terminal state;
    # if the episode was cut off at t_max, fall back to the last state's value.
    last_bootstrap = 0.0 if done else gamma * v_agent[-1]
    td_errors.append(np.abs(rewards[-1] + last_bootstrap - v_agent[-1]))

    v_mc = get_cum_discounted_rewards(rewards, gamma)

    return_pack = {
        'states': np.array(states),
        'v_mc': np.array(v_mc),
        'v_agent': np.array(v_agent),
        'q_spreads': np.array(q_spreads),
        'td_errors': np.array(td_errors),
        'rewards': np.array(rewards),
        'episode_finished': np.array(done),
    }
    return return_pack
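
# Usage sketch (assumes an old-Gym-style env whose step() returns a 4-tuple and
# an agent exposing a `get_qvalues` method, e.g. a DQN agent; `plt` is
# matplotlib.pyplot):
#   log = play_and_log_episode(env, agent, gamma=0.99)
#   plt.plot(log['v_mc'], label='Monte-Carlo value')
#   plt.plot(log['v_agent'], label='agent value estimate')
#   plt.legend()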

def img_by_obs(obs, state_dim):
    """
    Reshapes an observation of shape [c, h, w] into a single [c * h, w] image
    by stacking the channels vertically (handy for displaying frame stacks).
    """
    return obs.reshape([-1, state_dim[2]])
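
# Example (illustrative): for a stacked-frame observation of shape (4, 64, 64),
# img_by_obs(obs, state_dim=(4, 64, 64)) returns a (256, 64) array with the
# four channels stacked vertically, which can be shown with plt.imshow.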

def is_enough_ram(min_available_gb=0.1):
    """ Returns True if at least `min_available_gb` GiB of RAM is available. """
    mem = psutil.virtual_memory()
    return mem.available >= min_available_gb * (1024 ** 3)

def linear_decay(init_val, final_val, cur_step, total_steps):
    """ Linearly anneals from `init_val` to `final_val` over `total_steps` steps. """
    if cur_step >= total_steps:
        return final_val
    return (init_val * (total_steps - cur_step) +
            final_val * cur_step) / total_steps
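
# Worked example (illustrative): an epsilon schedule from 1.0 down to 0.1 over
# 10**4 steps gives linear_decay(1.0, 0.1, cur_step=5000, total_steps=10**4)
# == (1.0 * 5000 + 0.1 * 5000) / 10**4 == 0.55, and 0.1 for any later step.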

def smoothen(values):
    """ Smooths a 1-D sequence with a normalized 100-point Gaussian kernel. """
    kernel = gaussian(100, std=100)
    # kernel = np.concatenate([np.arange(100), np.arange(99, -1, -1)])
    kernel = kernel / np.sum(kernel)
    return fftconvolve(values, kernel, 'valid')
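
# Usage sketch: `smoothen` is meant for plotting noisy training curves, e.g.
# plt.plot(smoothen(rewards_history)) where `rewards_history` is a hypothetical
# list of per-episode rewards; 'valid' mode trims the output by 99 points.

# Minimal smoke test, a sketch rather than part of the original assignment code:
if __name__ == "__main__":
    smoothed = smoothen(np.ones(300))
    assert smoothed.shape == (300 - 100 + 1,)  # 'valid' mode trims the kernel
    assert np.allclose(smoothed, 1.0)          # a constant signal stays constant
    assert linear_decay(1.0, 0.1, cur_step=10**4, total_steps=10**4) == 0.1
    print("utils.py smoke test passed")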