forked from lilianweng/multi-armed-bandit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsolvers.py
159 lines (119 loc) · 4.64 KB
/
solvers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from __future__ import division
import numpy as np
import time
from scipy.stats import beta
from bandits import BernoulliBandit
class Solver(object):
def __init__(self, bandit):
"""
bandit (Bandit): the target bandit to solve.
"""
assert isinstance(bandit, BernoulliBandit)
np.random.seed(int(time.time()))
self.bandit = bandit
self.counts = [0] * self.bandit.n
self.actions = [] # A list of machine ids, 0 to bandit.n-1.
self.regret = 0. # Cumulative regret.
self.regrets = [0.] # History of cumulative regret.
def update_regret(self, i):
# i (int): index of the selected machine.
self.regret += self.bandit.best_proba - self.bandit.probas[i]
self.regrets.append(self.regret)
@property
def estimated_probas(self):
raise NotImplementedError
def run_one_step(self):
"""Return the machine index to take action on."""
raise NotImplementedError
def run(self, num_steps):
assert self.bandit is not None
for _ in range(num_steps):
i = self.run_one_step()
self.counts[i] += 1
self.actions.append(i)
self.update_regret(i)
class EpsilonGreedy(Solver):
def __init__(self, bandit, eps, init_proba=1.0):
"""
eps (float): the probability to explore at each time step.
init_proba (float): default to be 1.0; optimistic initialization
"""
super(EpsilonGreedy, self).__init__(bandit)
assert 0. <= eps <= 1.0
self.eps = eps
self.estimates = [init_proba] * self.bandit.n # Optimistic initialization
@property
def estimated_probas(self):
return self.estimates
def run_one_step(self):
if np.random.random() < self.eps:
# Let's do random exploration!
i = np.random.randint(0, self.bandit.n)
else:
# Pick the best one.
i = max(range(self.bandit.n), key=lambda x: self.estimates[x])
r = self.bandit.generate_reward(i)
self.estimates[i] += 1. / (self.counts[i] + 1) * (r - self.estimates[i])
return i
class UCB1(Solver):
def __init__(self, bandit, init_proba=1.0):
super(UCB1, self).__init__(bandit)
self.t = 0
self.estimates = [init_proba] * self.bandit.n
@property
def estimated_probas(self):
return self.estimates
def run_one_step(self):
self.t += 1
# Pick the best one with consideration of upper confidence bounds.
i = max(range(self.bandit.n), key=lambda x: self.estimates[x] + np.sqrt(
2 * np.log(self.t) / (1 + self.counts[x])))
r = self.bandit.generate_reward(i)
self.estimates[i] += 1. / (self.counts[i] + 1) * (r - self.estimates[i])
return i
class BayesianUCB(Solver):
"""Assuming Beta prior."""
def __init__(self, bandit, c=3, init_a=1, init_b=1):
"""
c (float): how many standard dev to consider as upper confidence bound.
init_a (int): initial value of a in Beta(a, b).
init_b (int): initial value of b in Beta(a, b).
"""
super(BayesianUCB, self).__init__(bandit)
self.c = c
self._as = [init_a] * self.bandit.n
self._bs = [init_b] * self.bandit.n
@property
def estimated_probas(self):
return [self._as[i] / float(self._as[i] + self._bs[i]) for i in range(self.bandit.n)]
def run_one_step(self):
# Pick the best one with consideration of upper confidence bounds.
i = max(
range(self.bandit.n),
key=lambda x: self._as[x] / float(self._as[x] + self._bs[x]) + beta.std(
self._as[x], self._bs[x]) * self.c
)
r = self.bandit.generate_reward(i)
# Update Gaussian posterior
self._as[i] += r
self._bs[i] += (1 - r)
return i
class ThompsonSampling(Solver):
def __init__(self, bandit, init_a=1, init_b=1):
"""
init_a (int): initial value of a in Beta(a, b).
init_b (int): initial value of b in Beta(a, b).
"""
super(ThompsonSampling, self).__init__(bandit)
self._as = [init_a] * self.bandit.n
self._bs = [init_b] * self.bandit.n
@property
def estimated_probas(self):
return [self._as[i] / (self._as[i] + self._bs[i]) for i in range(self.bandit.n)]
def run_one_step(self):
samples = [np.random.beta(self._as[x], self._bs[x]) for x in range(self.bandit.n)]
i = max(range(self.bandit.n), key=lambda x: samples[x])
r = self.bandit.generate_reward(i)
self._as[i] += r
self._bs[i] += (1 - r)
return i