
Commit

Add files via upload
pythonlessons authored Oct 7, 2020
1 parent 2402f74 commit 3576709
Showing 1 changed file with 234 additions and 0 deletions.
02_CartPole-reinforcement-learning_DDQN/Cartpole_DDQN_TF2.py
@@ -0,0 +1,234 @@
# Tutorial by www.pylessons.com
# Tutorial written for TensorFlow 2.3.1

import os
import random
import gym
import pylab
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam, RMSprop


def OurModel(input_shape, action_space):
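    # builds a simple fully connected network that maps the CartPole state
    # (4 values) to one Q-value per action: 512 -> 256 -> 64 -> action_space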
    X_input = Input(input_shape)
    X = X_input

    # 'Dense' is the basic form of a neural network layer
    # Input Layer of state size(4) and Hidden Layer with 512 nodes
    X = Dense(512, input_shape=input_shape, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)

    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    # Output Layer with # of actions: 2 nodes (left, right)
    X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs=X_input, outputs=X)
    model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model

class DQNAgent:
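    # The agent trains an online network (self.model) every step and keeps a
    # separate target network (self.target_model) that is only synced in
    # update_target_model(); the target network stabilizes the bootstrapped
    # Q-targets computed in replay().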
    def __init__(self, env_name):
        self.env_name = env_name
        self.env = gym.make(env_name)
        self.env.seed(0)
        # by default, CartPole-v1 has max episode steps = 500
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n

        self.EPISODES = 1000
        self.memory = deque(maxlen=2000)  # replay buffer of (state, action, reward, next_state, done) tuples

        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999
        self.batch_size = 32
        self.train_start = 1000

        # defining model parameters
        self.ddqn = True             # use Double DQN targets in replay()
        self.Soft_Update = False     # blend target weights with TAU instead of hard copies

        self.TAU = 0.1  # target network soft update hyperparameter

        self.Save_Path = 'Models'
        self.scores, self.episodes, self.average = [], [], []

        if self.ddqn:
            print("----------Double DQN--------")
            self.Model_name = os.path.join(self.Save_Path, "DDQN_"+self.env_name+".h5")
        else:
            print("-------------DQN------------")
            self.Model_name = os.path.join(self.Save_Path, "DQN_"+self.env_name+".h5")

        # create main model and target model
        self.model = OurModel(input_shape=(self.state_size,), action_space=self.action_size)
        self.target_model = OurModel(input_shape=(self.state_size,), action_space=self.action_size)

    # after some time interval, update the target model to match the main model
    def update_target_model(self):
        if not self.Soft_Update and self.ddqn:
            # hard update: copy the online weights into the target network
            self.target_model.set_weights(self.model.get_weights())
            return
        if self.Soft_Update and self.ddqn:
            # soft update: target = (1 - TAU) * target + TAU * online
            q_model_theta = self.model.get_weights()
            target_model_theta = self.target_model.get_weights()
            counter = 0
            for q_weight, target_weight in zip(q_model_theta, target_model_theta):
                target_weight = target_weight * (1-self.TAU) + q_weight * self.TAU
                target_model_theta[counter] = target_weight
                counter += 1
            self.target_model.set_weights(target_model_theta)

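    # remember(): store one (state, action, reward, next_state, done) transition;
    # once train_start transitions have been collected, decay epsilon toward
    # epsilon_min after every new transition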
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > self.train_start:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

    def act(self, state):
        if np.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            return np.argmax(self.model.predict(state))

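    # replay(): sample a random minibatch and fit the online model toward
    # bootstrapped targets; with Double DQN the online network selects the next
    # action and the target network evaluates it:
    #   y = r + gamma * Q_target(s', argmax_a' Q_online(s', a'))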
    def replay(self):
        if len(self.memory) < self.train_start:
            return
        # Randomly sample a minibatch from the memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.batch_size))

        state = np.zeros((self.batch_size, self.state_size))
        next_state = np.zeros((self.batch_size, self.state_size))
        action, reward, done = [], [], []

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop
        for i in range(self.batch_size):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])

        # do batch prediction to save speed
        target = self.model.predict(state)
        target_next = self.model.predict(next_state)
        target_val = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                if self.ddqn:  # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * (target_val[i][a])
                else:  # Standard - DQN
                    # DQN chooses the max Q value among the next actions; note that
                    # here both selection and evaluation use the online network's
                    # estimate of the next state (target_next), whereas a textbook
                    # DQN target would take max_a' Q_target(s', a') from target_val
                    target[i][action[i]] = reward[i] + self.gamma * (np.amax(target_next[i]))

        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)


    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)

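    # PlotModel(): record each episode's score, plot the raw scores (blue) and
    # their running average (red), and save the figure as a PNG whose name
    # encodes the DQN/DDQN and soft-update settings; the figure itself is
    # created once when the class is defined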
    pylab.figure(figsize=(18, 9))
    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores) / len(self.scores))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        pylab.ylabel('Score', fontsize=18)
        pylab.xlabel('Episode', fontsize=18)
        dqn = 'DQN_'
        softupdate = ''
        if self.ddqn:
            dqn = 'DDQN_'
        if self.Soft_Update:
            softupdate = '_soft'
        try:
            pylab.savefig(dqn+self.env_name+softupdate+".png")
        except OSError:
            pass

        return str(self.average[-1])[:5]

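    # run(): main training loop: act epsilon-greedily, store every transition,
    # call replay() after each environment step, and sync the target network
    # (and plot/print progress) whenever an episode ends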
    def run(self):
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                #self.env.render()
                action = self.act(state)
                next_state, reward, done, _ = self.env.step(action)
                next_state = np.reshape(next_state, [1, self.state_size])
                # keep the environment reward unless the pole fell before the step
                # limit; punish an early failure with -100 so the agent learns to avoid it
                if done and i != self.env._max_episode_steps - 1:
                    reward = -100
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
                if done:
                    # at the end of every episode, update the target model
                    self.update_target_model()

                    # every episode, plot the result
                    average = self.PlotModel(i, e)

                    print("episode: {}/{}, score: {}, e: {:.2}, average: {}".format(e, self.EPISODES, i, self.epsilon, average))
                    if i == self.env._max_episode_steps:
                        print("Saving trained model as cartpole-ddqn.h5")
                        #self.save("cartpole-ddqn.h5")
                        break
                self.replay()

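    # test(): load a previously saved model and run the greedy (argmax) policy
    # with rendering; note that the matching self.save() call in run() is
    # commented out, so a model has to be saved first or this load will fail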
    def test(self):
        self.load("cartpole-ddqn.h5")
        for e in range(self.EPISODES):
            state = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                self.env.render()
                action = np.argmax(self.model.predict(state))
                next_state, reward, done, _ = self.env.step(action)
                state = np.reshape(next_state, [1, self.state_size])
                i += 1
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, i))
                    break

if __name__ == "__main__":
    env_name = 'CartPole-v1'
    agent = DQNAgent(env_name)
    agent.run()
    #agent.test()
