Temporal difference exercises #98

Closed
8 changes: 8 additions & 0 deletions .idea/.gitignore

15 changes: 15 additions & 0 deletions .idea/deep-reinforcement-learning.iml

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

7 changes: 7 additions & 0 deletions .idea/misc.xml

8 changes: 8 additions & 0 deletions .idea/modules.xml

6 changes: 6 additions & 0 deletions .idea/vcs.xml

75 changes: 75 additions & 0 deletions temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
@@ -0,0 +1,75 @@
from collections import defaultdict

import gym
import numpy as np

import check_test
from plot_utils import plot_values

env = gym.make('CliffWalking-v0')

def q_learning(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
    # epsilon schedule: decay per episode, floored at epsilon_min
    epsilon = epsilon_start
    epsilon_min = 0.1
    epsilon_decay = 0.997

    nA = env.action_space.n  # number of actions (4 for CliffWalking)

    # initialize action-value function (empty dictionary of arrays)
    Q = defaultdict(lambda: np.zeros(nA))
# initialize performance monitor
# loop over episodes
for i_episode in range(1, num_episodes + 1):

# monitor progress
# if i_episode % 1 == 0:
# print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
#
# sys.stdout.flush()

if i_episode % 100 == 0:
print(f"Episode {i_episode}: Epsilon = {epsilon:.4f}")

# set the value of epsilon
epsilon = max(epsilon * epsilon_decay, epsilon_min)

# generate episode
state, _ = env.reset()

        while True:
            # epsilon-greedy action selection
            action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))

            next_state, reward, terminated, truncated, _ = env.step(action)  # ✅ New API
            if next_state not in Q:
                Q[next_state] = np.zeros(nA)  # ✅ Ensure Q-value initialization

            # Q-learning update: bootstrap from the greedy value of the next state (zero when terminal)
            next_Q = 0.0 if terminated else np.max(Q[next_state])
            Q[state][action] += alpha * (reward + gamma * next_Q - Q[state][action])

            if terminated or truncated:
                break

            state = next_state
    return Q

def get_probs(Q_s, epsilon, nA):
""" obtains the action probabilities corresponding to epsilon-greedy policy """
policy_states = np.ones(nA) * epsilon / nA
best_action = np.argmax(Q_s)
policy_states[best_action] = 1 - epsilon + (epsilon / nA)
return policy_states


# obtain the estimated optimal policy and corresponding action-value function
Q_q_learning = q_learning(env, 5000, .01)

# print the estimated optimal policy
policy_q_learning = np.array([np.argmax(Q_q_learning[key]) if key in Q_q_learning else -1 for key in np.arange(48)]).reshape(4,12)
check_test.run_check('td_control_check', policy_q_learning)
print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
print(policy_q_learning)

# plot the estimated optimal state-value function
V_q_learning = [np.max(Q_q_learning[key]) if key in Q_q_learning else 0 for key in np.arange(48)]
plot_values(V_q_learning)
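
For a quick sanity check of the helpers above, here is a minimal, self-contained sketch of what get_probs returns and how far the epsilon schedule decays. The Q-values are illustrative placeholders, not output from a real run.

import numpy as np

# Illustrative Q-values for one state (placeholders, not from a real run)
Q_s = np.array([0.0, -1.2, -0.5, -3.0])
epsilon, nA = 0.1, 4

# epsilon-greedy probabilities: epsilon/nA on every action, plus 1 - epsilon on the greedy one
probs = np.ones(nA) * epsilon / nA
probs[np.argmax(Q_s)] += 1 - epsilon
assert np.isclose(probs.sum(), 1.0)
print(probs)  # [0.925 0.025 0.025 0.025]

# epsilon schedule used above: 1.0 * 0.997**k, floored at 0.1;
# it reaches the floor after roughly log(0.1) / log(0.997) ≈ 766 episodes
print(round(np.log(0.1) / np.log(0.997)))  # 766
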
104 changes: 104 additions & 0 deletions temporal-difference/TD_CliffWalking_SARSA_Solution.py
@@ -0,0 +1,104 @@
import sys
import gym
import numpy as np
from collections import defaultdict, deque
import matplotlib.pyplot as plt

import check_test
from plot_utils import plot_values

env = gym.make('CliffWalking-v0')

def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=0.5):
    # epsilon schedule: decay per episode, floored at epsilon_min
    epsilon = epsilon_start
    epsilon_min = 0.1
    epsilon_decay = 0.99

    # initialize action-value function (empty dictionary of arrays)
    Q = defaultdict(lambda: np.zeros(env.action_space.n))
# initialize performance monitor
# loop over episodes
for i_episode in range(1, num_episodes + 1):

        # monitor progress
        if i_episode % 100 == 0:
            print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
            sys.stdout.flush()

# set the value of epsilon
epsilon = max(epsilon * epsilon_decay, epsilon_min)

# generate episode
episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=4)

Q = update_q(episode, Q, alpha, gamma)
return Q


def generate_episode(env, Q, epsilon, nA):
episode = []
state, _ = env.reset()

    # epsilon-greedy selection of the first action
    action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))

while True:
next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
if next_state not in Q:
Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization

next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))

episode.append((state, action, reward))

if terminated or truncated:
break

state = next_state # ✅ Track next state
action = next_action # ✅ Track next action

return episode

def get_probs(Q_s, epsilon, nA):
""" obtains the action probabilities corresponding to epsilon-greedy policy """
policy_states = np.ones(nA) * epsilon / nA
best_action = np.argmax(Q_s)
policy_states[best_action] = 1 - epsilon + (epsilon / nA)
return policy_states

def pick_action(epsilon, Q, next_state):
if np.random.rand() < epsilon:
next_action = env.action_space.sample() # Explore (random action)
else:
next_action = np.argmax(Q[next_state]) # Exploit (best action)

return next_action

def update_q(episode, Q, alpha, gamma):
    """ updates the action-value function estimate using the most recent episode """
    states, actions, rewards = zip(*episode)
    # SARSA updates along the trajectory: bootstrap from the action actually taken next
    for i in range(len(states) - 1):
        state, action = states[i], actions[i]
        next_state, next_action = states[i + 1], actions[i + 1]

        old_Q = Q[state][action]
        next_Q = Q[next_state][next_action]
        Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)

    # last transition of the episode: the successor state is terminal, so its value is zero
    state, action = states[-1], actions[-1]
    Q[state][action] += alpha * (rewards[-1] - Q[state][action])
    return Q


# obtain the estimated optimal policy and corresponding action-value function
Q_sarsa = sarsa(env, 5000, .01)

# print the estimated optimal policy
policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
check_test.run_check('td_control_check', policy_sarsa)
print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
print(policy_sarsa)

# plot the estimated optimal state-value function
V_sarsa = [np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)]
plot_values(V_sarsa)
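
The sarsa function above collects a full episode with generate_episode and then applies the updates in update_q. For comparison, here is a minimal sketch of the more common online variant, which updates Q at every step inside the episode. The function name sarsa_online_episode is hypothetical; it reuses get_probs and the 5-tuple step API from this file and takes everything else as parameters.

def sarsa_online_episode(env, Q, alpha, gamma, epsilon, nA):
    """One episode of online SARSA; Q is updated in place at every step."""
    state, _ = env.reset()
    action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
    while True:
        next_state, reward, terminated, truncated, _ = env.step(action)
        next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))

        # on-policy target: bootstrap from the action actually selected next (zero value at termination)
        next_Q = 0.0 if terminated else Q[next_state][next_action]
        Q[state][action] += alpha * (reward + gamma * next_Q - Q[state][action])

        if terminated or truncated:
            return
        state, action = next_state, next_action
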
77 changes: 77 additions & 0 deletions temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
@@ -0,0 +1,77 @@
import sys
import gym
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt

import check_test
from plot_utils import plot_values

env = gym.make('CliffWalking-v0')

def expected_sarsa(env, num_episodes, alpha, gamma=0.95, epsilon_start=1.0):
# Initialize epsilon and related parameters
epsilon = epsilon_start
epsilon_min = 0.1
epsilon_decay = 0.995

    nA = env.action_space.n  # number of actions (4 for CliffWalking)

    # Initialize the action-value function (empty dictionary of arrays)
    Q = defaultdict(lambda: np.zeros(nA))

# Loop over episodes
for i_episode in range(1, num_episodes + 1):
if i_episode % 100 == 0:
print(f"Episode {i_episode}: Epsilon = {epsilon:.4f}")

# Decay epsilon for exploration-exploitation balance
epsilon = max(epsilon * epsilon_decay, epsilon_min)

# Generate episode
state, _ = env.reset()
action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA)) # epsilon-greedy policy

while True:
next_state, reward, terminated, truncated, _ = env.step(action)

# Initialize Q-values for the next state if not already initialized
if next_state not in Q:
Q[next_state] = np.zeros(nA)

            # Compute the expected Q-value of the next state under the epsilon-greedy policy
            # (zero when the next state is terminal)
            expected_Q = 0.0 if terminated else np.dot(get_probs(Q[next_state], epsilon, nA), Q[next_state])

            # Expected SARSA update
            Q[state][action] += alpha * (reward + gamma * expected_Q - Q[state][action])

            # If the episode ends, break out of the loop
            if terminated or truncated:
                break

# Transition to the next state and choose the next action using epsilon-greedy
state = next_state
action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))

return Q

def get_probs(Q_s, epsilon, nA):
""" Obtains the action probabilities corresponding to an epsilon-greedy policy """
policy_states = np.ones(nA) * epsilon / nA
best_action = np.argmax(Q_s)
policy_states[best_action] = 1 - epsilon + (epsilon / nA)
return policy_states


# Obtain the estimated optimal policy and corresponding action-value function using Expected SARSA
Q_expected_sarsa = expected_sarsa(env, 50000, .01)

# Print the estimated optimal policy
policy_expected_sarsa = np.array([np.argmax(Q_expected_sarsa[key]) if key in Q_expected_sarsa else -1 for key in np.arange(48)]).reshape(4, 12)
check_test.run_check('td_control_check', policy_expected_sarsa)
print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
print(policy_expected_sarsa)

# Plot the estimated optimal state-value function
V_expected_sarsa = [np.max(Q_expected_sarsa[key]) if key in Q_expected_sarsa else 0 for key in np.arange(48)]
plot_values(V_expected_sarsa)
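
To summarize how the three solutions in this PR differ, the sketch below computes their one-step TD targets side by side for a single illustrative transition. The helper td_targets and all of the numbers are made up for illustration; they are not part of the exercises.

import numpy as np

def td_targets(reward, next_Q_values, next_action, epsilon=0.1, gamma=1.0):
    """Illustrative comparison of the TD targets used by SARSA, Expected SARSA and Q-learning."""
    nA = len(next_Q_values)
    probs = np.ones(nA) * epsilon / nA
    probs[np.argmax(next_Q_values)] += 1 - epsilon

    sarsa = reward + gamma * next_Q_values[next_action]             # sample of the action actually taken next
    expected_sarsa = reward + gamma * np.dot(probs, next_Q_values)  # expectation over the epsilon-greedy policy
    q_learning = reward + gamma * np.max(next_Q_values)             # greedy (off-policy) value
    return sarsa, expected_sarsa, q_learning

print(td_targets(-1.0, np.array([-0.8, -0.4, -2.0, -1.5]), next_action=1))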