-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdqnAgents.py
355 lines (296 loc) · 13.4 KB
/
dqnAgents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
from collections import deque
import numpy
from keras import Sequential
from keras.layers import Dense, Convolution2D, Flatten, np
from keras.optimizers import Adam
from game import *
from graphicsDisplay import getFrame, saveFrame
from learningAgents import ReinforcementAgent
from featureExtractors import *
import random,util,math,keras
from keras.optimizers import SGD
from skimage.transform import resize
import tensorflow as tf
PACMAN_ACTIONS = ['North', 'South', 'East', 'West', 'Stop']
class PacmanDQNAgent(ReinforcementAgent):
"""
Q-Learning Agent
Functions you should fill in:
- computeValueFromQValues
- computeActionFromQValues
- getQValue
- getAction
- update
Instance variables you have access to
- self.epsilon (exploration prob)
- self.alpha (learning rate)
- self.discount (discount rate)
Functions you should use
- self.getLegalActions(state)
which returns legal actions for a state
"""
def __init__(self, **args):
"You can initialize Q-values here..."
ReinforcementAgent.__init__(self, **args)
"*** YOUR CODE HERE ***"
self.state_size = 294
#self.state_size = 1320
self.action_size = 5
self.epsilon_min = 0.1
self.epsilon_decay = 0.999
self.count = 0
self.init = 0
self.epsilon = 1.0
self.alpha = 0.001
#self.alpha = 1e-6
self.discount = 0.95
self.batch_size = 35
self.alpha_decay = 0.01
self.frame_width = 85
self.frame_height = 85
self.memory = deque(maxlen=20000)
self.model = self._build_model()
# def getQValue(self, state, action):
# """
# Returns Q(state,action)
# Should return 0.0 if we have never seen a state
# or the Q node value otherwise
# """
# "*** YOUR CODE HERE ***"
# #return self.QValueCounter[(state, action)]
#
#
# def computeValueFromQValues(self, state):
# """
# Returns max_action Q(state,action)
# where the max is over legal actions. Note that if
# there are no legal actions, which is the case at the
# terminal state, you should return a value of 0.0.
# """
# "*** YOUR CODE HERE ***"
# if not self.getLegalActions(state): return 0
#
# best_action = self.computeActionFromQValues(state)
# return self.getQValue(state, best_action)
#
# def computeActionFromQValues(self, state):
# """
# Compute the best action to take in a state. Note that if there
# are no legal actions, which is the case at the terminal state,
# you should return None.
# """
# "*** YOUR CODE HERE ***"
# if not self.getLegalActions(state): return None
#
# best_action = None
# best_value = float('-inf')
# for action in self.getLegalActions(state):
# if self.getQValue(state, action) > best_value:
# best_value = self.getQValue(state, action)
# best_action = action
# return best_action
def getAction(self, state):
"""
Compute the action to take in the current state. With
probability self.epsilon, we should take a random action and
take the best policy action otherwise. Note that if there are
no legal actions, which is the case at the terminal state, you
should choose None as the action.
HINT: You might want to use util.flipCoin(prob)
HINT: To pick randomly from a list, use random.choice(list)
"""
# Pick Action
legalActions = self.getLegalActions(state)
# if 'Stop' in legalActions:
# legalActions.remove('Stop')
action = None
"*** YOUR CODE HERE ***"
if not self.getLegalActions(state): return action # Terminal State, return None
self.image = getFrame()
self.image = np.array(self.image)
self.image = resize(self.image, (self.frame_width, self.frame_height))
self.image = np.reshape(self.image, [1,self.frame_height, self.frame_width,3])
self.image = np.uint8(self.image)
#print 'Epsilon value: ', self.epsilon
if self.epsilon > random.random():
action = random.choice(legalActions) # Explore
else:
#action = self.computeActionFromQValues(state) # Exploit
#state_matrix = self.getStateMatrices(state)
#state_matrix = np.reshape(np.array(state_matrix), [1, self.state_size])
#state_matrix = np.reshape(state_matrix, (1, self.frame_width, self.frame_height))
act_values = self.model.predict(self.image)
action = PACMAN_ACTIONS[(np.argmax(act_values[0]))] # returns action
if action not in legalActions:
action = 'Stop'
#action = random.choice(legalActions)
self.doAction(state, action)
return action
def update(self, state, action, nextState, reward):
"""
The parent class calls this to observe a
state = action => nextState and reward transition.
You should do your Q-Value update here
NOTE: You should never call this function,
it will be called on your behalf
"""
"*** YOUR CODE HERE ***"
self.nextImage = getFrame()
self.nextImage = numpy.array(self.nextImage)
self.nextImage = resize(self.nextImage, (self.frame_width, self.frame_height))
self.nextImage = np.reshape(self.nextImage, [1, self.frame_height, self.frame_width, 3])
self.nextImage = np.uint8(self.nextImage)
#new code
if not self.getLegalActions(state):
done = True
else:
done = False
# state_matrix = self.getStateMatrices(state)
# nextState_matrix = self.getStateMatrices(nextState)
# state_matrix = np.reshape(state_matrix, [1, self.state_size])
# nextState_matrix = np.reshape(nextState_matrix, [1, self.state_size])
self.remember(self.image, action, reward, self.nextImage, done)
# entrenamos la red neuronal solo mientras estamos en entrenamiento
#if self.episodesSoFar < self.numTraining:
#if self.episodesSoFar < self.numTraining:
if len(self.memory) > 2*self.batch_size:
self.replay(self.batch_size)
# def getPolicy(self, state):
# return self.computeActionFromQValues(state)
#
# def getValue(self, state):
# return self.computeValueFromQValues(state)
# def final(self, state):
# "Called at the end of each game."
# # call the super-class final method
# ReinforcementAgent.final(self, state)
#
# state_matrix = self.getStateMatrices(state)
# nextState_matrix = self.getStateMatrices(self.lastState)
# state_matrix = np.reshape(state_matrix, [1, self.state_size])
# nextState_matrix = np.reshape(nextState_matrix, [1, self.state_size])
#
# self.remember(state_matrix, action, reward, nextState_matrix, done)
#
# # did we finish training?
# if self.episodesSoFar == self.numTraining:
# # you might want to print your weights here for debugging
# "*** YOUR CODE HERE ***"
# pass
def _build_model(self):
# Neural Net for Deep-Q learning Model
# model = Sequential()
# model.add(Dense(24, input_dim=self.state_size, activation='relu'))
# model.add(Dense(48, activation='relu'))
# model.add(Dense(self.action_size, activation='linear'))
# model.compile(loss='mse', optimizer=Adam(lr=self.alpha, decay=self.alpha_decay))
model = Sequential()
#STATE_LENGTH = self.state_size
FRAME_WIDTH = self.frame_width
FRAME_HEIGHT = self.frame_height
STATE_LENGTH = 3
model.add(Convolution2D(32, 8, 8, subsample=(4, 4), activation='relu',
# input_shape=(STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT)))
input_shape=(FRAME_HEIGHT, FRAME_WIDTH, STATE_LENGTH)))
model.add(Convolution2D(64, 4, 4, subsample=(2, 2), activation='relu'))
model.add(Convolution2D(64, 3, 3, subsample=(1, 1), activation='relu'))
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dense(self.action_size))
model.compile(loss='mse', optimizer=Adam(lr=self.alpha))
return model
def remember(self, state, action, reward, next_state, done):
action_index = PACMAN_ACTIONS.index(action)
self.memory.append((state, action_index, reward, next_state, done))
def replay(self, batch_size):
x_batch, y_batch = [], []
minibatch = random.sample(self.memory, batch_size)
for state, action_index, reward, next_state, done in minibatch:
y_target = self.model.predict(state)
y_target[0][action_index] = reward if done else reward + self.discount * np.max(self.model.predict(next_state)[0])
x_batch.append(state[0])
y_batch.append(y_target[0])
self.model.fit(np.array(x_batch), np.array(y_batch), batch_size=len(x_batch), verbose=1)
if self.epsilon > self.epsilon_min:
#None
self.epsilon *= self.epsilon_decay
def getStateMatrices(self, state):
""" Return wall, ghosts, food, capsules matrices """
def getWallMatrix(state):
""" Return matrix with wall coordinates set to 1 """
width, height = state.data.layout.width, state.data.layout.height
grid = state.data.layout.walls
matrix = np.zeros((height, width), dtype=np.int8)
for i in range(grid.height):
for j in range(grid.width):
# Put cell vertically reversed in matrix
cell = 1 if grid[j][i] else 0
matrix[-1-i][j] = cell
return matrix
def getPacmanMatrix(state):
""" Return matrix with pacman coordinates set to 1 """
width, height = state.data.layout.width, state.data.layout.height
matrix = np.zeros((height, width), dtype=np.int8)
for agentState in state.data.agentStates:
if agentState.isPacman:
pos = agentState.configuration.getPosition()
cell = 1
matrix[-1-int(pos[1])][int(pos[0])] = cell
return matrix
def getGhostMatrix(state):
""" Return matrix with ghost coordinates set to 1 """
width, height = state.data.layout.width, state.data.layout.height
matrix = np.zeros((height, width), dtype=np.int8)
for agentState in state.data.agentStates:
if not agentState.isPacman:
if not agentState.scaredTimer > 0:
pos = agentState.configuration.getPosition()
cell = 1
matrix[-1-int(pos[1])][int(pos[0])] = cell
return matrix
def getScaredGhostMatrix(state):
""" Return matrix with ghost coordinates set to 1 """
width, height = state.data.layout.width, state.data.layout.height
matrix = np.zeros((height, width), dtype=np.int8)
for agentState in state.data.agentStates:
if not agentState.isPacman:
if agentState.scaredTimer > 0:
pos = agentState.configuration.getPosition()
cell = 1
matrix[-1-int(pos[1])][int(pos[0])] = cell
return matrix
def getFoodMatrix(state):
""" Return matrix with food coordinates set to 1 """
width, height = state.data.layout.width, state.data.layout.height
grid = state.data.food
matrix = np.zeros((height, width), dtype=np.int8)
for i in range(grid.height):
for j in range(grid.width):
# Put cell vertically reversed in matrix
cell = 1 if grid[j][i] else 0
matrix[-1-i][j] = cell
return matrix
def getCapsulesMatrix(state):
""" Return matrix with capsule coordinates set to 1 """
width, height = state.data.layout.width, state.data.layout.height
capsules = state.data.layout.capsules
matrix = np.zeros((height, width), dtype=np.int8)
for i in capsules:
# Insert capsule cells vertically reversed into matrix
matrix[-1-i[1], i[0]] = 1
return matrix
# Create observation matrix as a combination of
# wall, pacman, ghost, food and capsule matrices
# width, height = state.data.layout.width, state.data.layout.height
width, height = state.data.layout.width, state.data.layout.height
observation = np.zeros((6, height, width))
observation[0] = getWallMatrix(state)
observation[1] = getPacmanMatrix(state)
observation[2] = getGhostMatrix(state)
observation[3] = getScaredGhostMatrix(state)
observation[4] = getFoodMatrix(state)
observation[5] = getCapsulesMatrix(state)
#print 'observation: ', observation
#observation = np.swapaxes(observation, 0, 2)
#print 'observation: ', observation
return observation