# algos_testfunc.py
# forked from beef-broccoli/deebo
import numpy as np
import pandas as pd
from tqdm import tqdm
import utils


def test_algorithm_regret(algo, arms, num_sims, horizon):
    """
    Testing function for regret-type bandit algorithms.

    Parameters
    ----------
    algo: algos_regret.RegretAlgorithm
        a regret bandit algorithm
    arms: list of arms
        a list of arms from arms.py
    num_sims: int
        total number of simulations
    horizon: int
        maximum time horizon for each simulation

    Returns
    -------
    dataframe of testing result logs
    """
cols = ['num_sims', 'horizon', 'chosen_arm', 'reward', 'cumulative_reward']
ar = np.zeros((num_sims*horizon, len(cols)))
for sim in tqdm(range(num_sims), leave=False):
algo.reset(len(arms))
cumulative_reward = 0
for t in range(horizon):
chosen_arm = algo.select_next_arm() # algorithm selects an arm
reward = arms[chosen_arm].draw() # chosen arm returns reward
cumulative_reward = cumulative_reward + reward # calculate cumulative reward over time horizon
algo.update(chosen_arm, reward) # algorithm updates chosen arm with reward
ar[sim*horizon+t, :] = [sim, t, chosen_arm, reward, cumulative_reward] # logs info
df = pd.DataFrame(ar, columns=cols)
return df
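

# Hedged usage sketch: _DemoArm and _DemoGreedy are hypothetical, minimal stand-ins
# for an Arm from arms.py and a RegretAlgorithm from algos_regret.py; they only
# mirror the draw()/reset()/select_next_arm()/update() interface that the testing
# functions in this module rely on. All numbers used below are illustrative.

class _DemoArm:
    """Hypothetical Bernoulli-style arm: draw() returns 1 with probability p, else 0."""

    def __init__(self, p):
        self.p = p

    def draw(self):
        return float(np.random.random() < self.p)


class _DemoGreedy:
    """Hypothetical epsilon-greedy stand-in for an algos_regret.RegretAlgorithm."""

    def __init__(self, n_arms, epsilon=0.1):
        self.epsilon = epsilon
        self.reset(n_arms)

    def reset(self, n_arms):
        self.counts = np.zeros(n_arms)
        self.emp_means = np.zeros(n_arms)

    def select_next_arm(self):
        if np.random.random() < self.epsilon:
            return int(np.random.randint(len(self.emp_means)))  # explore a random arm
        return int(np.argmax(self.emp_means))  # exploit current best empirical mean

    def update(self, chosen_arm, reward):
        self.counts[chosen_arm] += 1
        self.emp_means[chosen_arm] += (reward - self.emp_means[chosen_arm]) / self.counts[chosen_arm]


def _example_regret():
    # A minimal sketch of calling test_algorithm_regret(); arm probabilities,
    # epsilon and the num_sims/horizon values are assumptions for illustration.
    demo_arms = [_DemoArm(p) for p in (0.2, 0.5, 0.8)]
    algo = _DemoGreedy(len(demo_arms), epsilon=0.1)
    return test_algorithm_regret(algo, demo_arms, num_sims=5, horizon=50)  # one log row per (sim, t)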


def test_algorithm_regret_multidraw(algo, arms, num_sims, horizon, n_exps=1):
    """
    For algorithms that use sampling (e.g. Thompson sampling), initialize one algorithm and have it propose
    multiple experiments at each round to conduct batch experiments.

    Parameters
    ----------
    algo: RegretAlgorithm object
        a regret bandit algorithm
    arms: list of Arm objects
        a list of arms from arms.py
    num_sims: int
        number of simulations
    horizon: int
        time horizon for each simulation
    n_exps: int
        number of experiments per batch

    Returns
    -------
    dataframe of testing result logs
    """
n_rounds = horizon // int(n_exps) # num of complete rounds
n_residual = horizon % int(n_exps) # residual experiments that need to be handled
cols = ['num_sims', 'round', 'horizon', 'chosen_arm', 'reward', 'cumulative_reward']
ar = np.zeros((num_sims*horizon, len(cols)))
for sim in tqdm(range(num_sims), leave=False):
algo.reset(len(arms))
cumulative_reward = 0
t = 0
for r in range(n_rounds+1):
chosen_arms = []
if r == n_rounds:
n = n_residual # last round, using residual experiments
else:
n = n_exps # first n_rounds rounds, each batch has n_exps exps
for e in range(n):
chosen_arms.append(algo.select_next_arm()) # for each round, select next arm n_exps times
for chosen_arm in chosen_arms:
reward = arms[chosen_arm].draw()
algo.update(chosen_arm, reward)
cumulative_reward = cumulative_reward + reward
ar[sim*horizon+t, :] = [sim, r, t, chosen_arm, reward, cumulative_reward] # logs info
t = t + 1
df = pd.DataFrame(ar, columns=cols)
return df
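

def _example_regret_multidraw():
    # A minimal sketch of batched proposals with test_algorithm_regret_multidraw(),
    # reusing the hypothetical _DemoArm/_DemoGreedy stand-ins defined above.
    # With horizon=50 and n_exps=4 (illustrative numbers), each simulation runs
    # 12 full batches of 4 draws plus a residual batch of 2.
    demo_arms = [_DemoArm(p) for p in (0.2, 0.5, 0.8)]
    algo = _DemoGreedy(len(demo_arms))
    return test_algorithm_regret_multidraw(algo, demo_arms, num_sims=5, horizon=50, n_exps=4)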


def test_algorithm_regret_multialgos(algo_list, arms, num_sims, horizon):
    """
    If a batch of n experiments is desired, initialize n algorithms; at each round, each algorithm proposes one
    experiment.

    Parameters
    ----------
    algo_list: Collection
        list of regret bandit algorithms
    arms: list of Arm objects
        a list of arms from arms.py
    num_sims: int
        number of simulations
    horizon: int
        time horizon for each simulation

    Returns
    -------
    dataframe of testing result logs
    """
# designed for ucb-type algorithms, with external exploration round
n_rounds = (horizon-len(arms)) // len(algo_list) # num of complete rounds
n_residual = (horizon-len(arms)) % len(algo_list) # residual experiments that need to be handled
if n_residual > 0:
n_rounds = n_rounds + 1
# for logging
cols = ['num_sims', 'round', 'horizon', 'chosen_arm', 'reward', 'cumulative_reward', 'by_algo']
ar = np.zeros((num_sims*horizon, len(cols)))
for sim in tqdm(range(num_sims), leave=False):
algos = algo_list
for algo in algos:
algo.reset(len(arms))
cumulative_reward = 0
t = 0
# initial exploration round
exploration_reward = [arm.draw() for arm in arms]
for ii in range(len(exploration_reward)):
for algo in algos:
algo.update(ii, exploration_reward[ii])
cumulative_reward = cumulative_reward + exploration_reward[ii]
ar[sim*horizon+t, :] = [sim, -1, t, ii, exploration_reward[ii], cumulative_reward, -1]
t = t + 1
# now all ucb algos updated with exploration result, ready for acquisition
for r in range(n_rounds):
if r == n_rounds-1 and n_residual > 0: # last extra round, dealing with residual experiments
maxes = [max(algo.emp_means) for algo in algos] # the highest emp mean identified for any arm for each algo
indexes = np.argsort(maxes)[-n_residual:]
algos = [algos[i] for i in indexes]
# each algorithm selects one option
chosen_arms = list(map(lambda x: x.select_next_arm(), algos))
rewards = list(map(lambda x: arms[x].draw(), chosen_arms))
for ii in range(len(chosen_arms)):
for algo in algos:
algo.update(chosen_arms[ii], rewards[ii])
cumulative_reward = cumulative_reward + rewards[ii]
if r == n_rounds-1 and n_residual > 0:
ar[sim*horizon+t, :] = [sim, r, t, chosen_arms[ii], rewards[ii], cumulative_reward, indexes[ii]]
else:
ar[sim*horizon+t, :] = [sim, r, t, chosen_arms[ii], rewards[ii], cumulative_reward, ii] # logs info
t = t+1 # advance time point
return pd.DataFrame(ar, columns=cols)
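

def _example_regret_multialgos():
    # A minimal sketch of test_algorithm_regret_multialgos(), again using the
    # hypothetical _DemoArm/_DemoGreedy stand-ins defined above. With 3 arms,
    # 3 algorithms and horizon=33 (illustrative numbers), each simulation spends
    # 3 draws on the shared exploration pass and then runs (33 - 3) // 3 = 10
    # full rounds in which every algorithm proposes one arm, with no residual round.
    demo_arms = [_DemoArm(p) for p in (0.2, 0.5, 0.8)]
    demo_algos = [_DemoGreedy(len(demo_arms)) for _ in range(3)]
    return test_algorithm_regret_multialgos(demo_algos, demo_arms, num_sims=5, horizon=33)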


def test_algorithm_arm(algo, arms, num_sims, max_horizon, n_candidates=1):
    """
    Testing function for best-arm-identification (elimination-type) bandit algorithms.

    Parameters
    ----------
    algo: algos_arm.EliminationAlgorithm
        a best-arm-identification bandit algorithm
    arms: list of Arm objects
        a list of arms from arms.py
    num_sims: int
        number of simulations
    max_horizon: int
        maximum time horizon for each simulation
    n_candidates: int
        number of best-arm candidates requested

    Returns
    -------
    acquisition history (if a sim is terminated by the algo before the specified time limit, all empty entries are removed),
    best arms identified by the algo (number specified by user, padded with -1) (shape: num_sims, n_arms),
    rankings based on the number of samples at the end of each sim (shape: num_sims, n_arms),
    the round at which each simulation terminates (shape: num_sims, 1)
    """
cols = ['num_sims', 'horizon', 'chosen_arm', 'reward', 'cumulative_reward']
ar = np.negative(np.ones((num_sims*max_horizon, len(cols))))
best_arms = np.negative(np.ones((num_sims, len(arms)))) # -1 to distinguish empty ones; could initialize smaller with n_candidates
rankings = np.negative(np.ones((num_sims, len(arms)))) # rankings at the end of each simulations
termination_round = np.negative(np.ones((num_sims,))) # the round where each simulation terminates
for sim in tqdm(range(num_sims), leave=False):
algo.reset()
cumulative_reward = 0
for t in range(max_horizon):
chosen_arm = algo.select_next_arm() # algorithm selects an arm
if chosen_arm is None: # no next experiment; optimal arm has been found
termination_round[sim] = t-1
break
reward = arms[chosen_arm].draw() # chosen arm returns reward
cumulative_reward = cumulative_reward + reward # calculate cumulative reward over time horizon
algo.update(chosen_arm, reward) # algorithm updates chosen arm with reward
ar[sim*max_horizon+t, :] = [sim, t, chosen_arm, reward, cumulative_reward] # logs info
best_arms[sim, :] = utils.fill_list(algo.best_arms, len(arms), -1)
rankings[sim, :] = algo.rankings
    # remove all -1 rows: time points left unused because the requested # of candidates were found early
    ar = ar[~np.all(ar == -1, axis=1)]
return pd.DataFrame(ar, columns=cols), pd.DataFrame(best_arms), pd.DataFrame(rankings), pd.DataFrame(termination_round)
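

def _example_arm_identification():
    # A minimal sketch of the interface test_algorithm_arm() expects: reset()
    # without arguments, select_next_arm() returning None once sampling is done,
    # and best_arms / rankings attributes. _DemoSampler is a hypothetical
    # fixed-budget round-robin sampler, not an algos_arm.EliminationAlgorithm;
    # the budget and horizon numbers are illustrative assumptions.
    demo_arms = [_DemoArm(p) for p in (0.2, 0.5, 0.8)]

    class _DemoSampler:
        def __init__(self, n_arms, budget=30):
            self.n_arms = n_arms
            self.budget = budget
            self.reset()

        def reset(self):
            self.counts = np.zeros(self.n_arms)
            self.emp_means = np.zeros(self.n_arms)
            self.n_draws = 0

        def select_next_arm(self):
            if self.n_draws >= self.budget:
                return None  # signals termination, as the harness above expects
            return int(self.n_draws % self.n_arms)  # plain round-robin sampling

        def update(self, chosen_arm, reward):
            self.counts[chosen_arm] += 1
            self.emp_means[chosen_arm] += (reward - self.emp_means[chosen_arm]) / self.counts[chosen_arm]
            self.n_draws += 1

        @property
        def best_arms(self):
            return [int(np.argmax(self.emp_means))]  # single best arm by empirical mean

        @property
        def rankings(self):
            return np.argsort(self.counts)[::-1]  # arms ranked by number of samples

    algo = _DemoSampler(len(demo_arms), budget=30)
    return test_algorithm_arm(algo, demo_arms, num_sims=5, max_horizon=60)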