From d595006f82d63743f78eeb4e1de63dadcfce5b59 Mon Sep 17 00:00:00 2001
From: simondrugan16 <119412400+simondrugan16@users.noreply.github.com>
Date: Sat, 15 Feb 2025 12:59:09 +0000
Subject: [PATCH 1/2] MC blackjack and TD exercises for Sarsa, Sarsamax and
Expected Sarsa
---
monte-carlo/blackjack.py | 0
.../TD_CliffWalking_Q_Learning_Solution.py | 126 ++++++++++++++++++
.../TD_CliffWalking_SARSA_Solution.py | 126 ++++++++++++++++++
...TD_CliffWalking_expected_sarsa_Solution.py | 117 ++++++++++++++++
4 files changed, 369 insertions(+)
create mode 100644 monte-carlo/blackjack.py
create mode 100644 temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
create mode 100644 temporal-difference/TD_CliffWalking_SARSA_Solution.py
create mode 100644 temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
diff --git a/monte-carlo/blackjack.py b/monte-carlo/blackjack.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py b/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
new file mode 100644
index 000000000..d2750770a
--- /dev/null
+++ b/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
@@ -0,0 +1,126 @@
+import sys
+import gym
+import numpy as np
+from collections import defaultdict, deque
+import matplotlib.pyplot as plt
+
+import check_test
+from plot_utils import plot_values
+
+env = gym.make('CliffWalking-v0')
+
+# print(env.action_space)
+# print(env.observation_space)
+#
+# # define the optimal state-value function
+# V_opt = np.zeros((4, 12))
+# V_opt[0:13][0] = -np.arange(3, 15)[::-1]
+# V_opt[0:13][1] = -np.arange(3, 15)[::-1] + 1
+# V_opt[0:13][2] = -np.arange(3, 15)[::-1] + 2
+# V_opt[3][0] = -13
+#
+# plot_values(V_opt)
+
+
+def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
+ # decide epsilon
+ epsilon = epsilon_start
+ epsilon_min = 0.1
+ epsilon_decay = 0.9999
+
+ # initialize action-value function (empty dictionary of arrays)
+ Q = defaultdict(lambda: np.zeros(env.nA))
+ # initialize performance monitor
+ # loop over episodes
+ for i_episode in range(1, num_episodes + 1):
+
+ # monitor progress
+ if i_episode % 499999 == 0:
+ print("\rEpisode {}/{}".format(i_episode, num_episodes), end="")
+ print (str(Q))
+
+
+ # set the value of epsilon
+ epsilon = max(epsilon * epsilon_decay, epsilon_min)
+
+ # generate episode
+ episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=4)
+
+ Q = update_q(episode, Q, alpha, gamma)
+ return Q
+
+
+def generate_episode(env, Q, epsilon, nA):
+ episode = []
+ state, _ = env.reset()
+ if isinstance(state, dict):
+ state = tuple(sorted(state.items()))
+
+ action = np.random.choice(np.arange(nA),
+ p=get_probs(Q[state], epsilon, nA)) if state in Q else env.action_space.sample()
+
+ while True:
+ if isinstance(state, tuple):
+ state = state[0] # Extract actual state if (state, info) is returned
+
+ # ✅ Convert state to tuple if it’s a dictionary
+ if isinstance(state, dict):
+ state = tuple(sorted(state.items()))
+
+ next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
+ if next_state not in Q:
+ Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
+
+ next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))
+
+ episode.append((state, action, reward))
+
+ if terminated or truncated:
+ break
+
+ state = next_state # ✅ Track next state
+ action = next_action # ✅ Track next action
+
+ return episode
+
+def get_probs(Q_s, epsilon, nA):
+ """ obtains the action probabilities corresponding to epsilon-greedy policy """
+ policy_states = np.ones(nA) * epsilon / nA
+ best_action = np.argmax(Q_s)
+ policy_states[best_action] = 1 - epsilon + (epsilon / nA)
+ return policy_states
+
+def pick_action(epsilon, Q, next_state):
+ if np.random.rand() < epsilon:
+ next_action = env.action_space.sample() # Explore (random action)
+ else:
+ next_action = np.argmax(Q[next_state]) # Exploit (best action)
+
+ return next_action
+
+def update_q(episode, Q, alpha, gamma):
+ """ updates the action-value function estimate using the most recent episode """
+ states, actions, rewards = zip(*episode)
+ # prepare for discounting
+ for i in range(len(states) - 1): # Ignore last step
+ state, action = states[i], actions[i]
+ next_state, next_action = states[i + 1], actions[i + 1] # ✅ Use episode step
+
+ old_Q = Q[state][action]
+ next_Q = Q[next_state][next_action] # ✅ Correct SARSA update
+ Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)
+ return Q
+
+
+# obtain the estimated optimal policy and corresponding action-value function
+Q_sarsa = sarsa(env, 500000, .01)
+
+# print the estimated optimal policy
+policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
+check_test.run_check('td_control_check', policy_sarsa)
+print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
+print(policy_sarsa)
+
+# plot the estimated optimal state-value function
+V_sarsa = ([np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)])
+plot_values(V_sarsa)
\ No newline at end of file
diff --git a/temporal-difference/TD_CliffWalking_SARSA_Solution.py b/temporal-difference/TD_CliffWalking_SARSA_Solution.py
new file mode 100644
index 000000000..d2750770a
--- /dev/null
+++ b/temporal-difference/TD_CliffWalking_SARSA_Solution.py
@@ -0,0 +1,126 @@
+import sys
+import gym
+import numpy as np
+from collections import defaultdict, deque
+import matplotlib.pyplot as plt
+
+import check_test
+from plot_utils import plot_values
+
+env = gym.make('CliffWalking-v0')
+
+# print(env.action_space)
+# print(env.observation_space)
+#
+# # define the optimal state-value function
+# V_opt = np.zeros((4, 12))
+# V_opt[0:13][0] = -np.arange(3, 15)[::-1]
+# V_opt[0:13][1] = -np.arange(3, 15)[::-1] + 1
+# V_opt[0:13][2] = -np.arange(3, 15)[::-1] + 2
+# V_opt[3][0] = -13
+#
+# plot_values(V_opt)
+
+
+def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
+ # decide epsilon
+ epsilon = epsilon_start
+ epsilon_min = 0.1
+ epsilon_decay = 0.9999
+
+ # initialize action-value function (empty dictionary of arrays)
+ Q = defaultdict(lambda: np.zeros(env.nA))
+ # initialize performance monitor
+ # loop over episodes
+ for i_episode in range(1, num_episodes + 1):
+
+ # monitor progress
+ if i_episode % 499999 == 0:
+ print("\rEpisode {}/{}".format(i_episode, num_episodes), end="")
+ print (str(Q))
+
+
+ # set the value of epsilon
+ epsilon = max(epsilon * epsilon_decay, epsilon_min)
+
+ # generate episode
+ episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=4)
+
+ Q = update_q(episode, Q, alpha, gamma)
+ return Q
+
+
+def generate_episode(env, Q, epsilon, nA):
+ episode = []
+ state, _ = env.reset()
+ if isinstance(state, dict):
+ state = tuple(sorted(state.items()))
+
+ action = np.random.choice(np.arange(nA),
+ p=get_probs(Q[state], epsilon, nA)) if state in Q else env.action_space.sample()
+
+ while True:
+ if isinstance(state, tuple):
+ state = state[0] # Extract actual state if (state, info) is returned
+
+ # ✅ Convert state to tuple if it’s a dictionary
+ if isinstance(state, dict):
+ state = tuple(sorted(state.items()))
+
+ next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
+ if next_state not in Q:
+ Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
+
+ next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))
+
+ episode.append((state, action, reward))
+
+ if terminated or truncated:
+ break
+
+ state = next_state # ✅ Track next state
+ action = next_action # ✅ Track next action
+
+ return episode
+
+def get_probs(Q_s, epsilon, nA):
+ """ obtains the action probabilities corresponding to epsilon-greedy policy """
+ policy_states = np.ones(nA) * epsilon / nA
+ best_action = np.argmax(Q_s)
+ policy_states[best_action] = 1 - epsilon + (epsilon / nA)
+ return policy_states
+
+def pick_action(epsilon, Q, next_state):
+ if np.random.rand() < epsilon:
+ next_action = env.action_space.sample() # Explore (random action)
+ else:
+ next_action = np.argmax(Q[next_state]) # Exploit (best action)
+
+ return next_action
+
+def update_q(episode, Q, alpha, gamma):
+ """ updates the action-value function estimate using the most recent episode """
+ states, actions, rewards = zip(*episode)
+    # loop over every transition, including the terminal one
+    for i in range(len(states)):
+        state, action = states[i], actions[i]
+
+        old_Q = Q[state][action]
+        # SARSA target: bootstrap from the next on-policy action (0 after the terminal step)
+        next_Q = Q[states[i + 1]][actions[i + 1]] if i + 1 < len(states) else 0
+        Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)
+ return Q
+
+
+# obtain the estimated optimal policy and corresponding action-value function
+Q_sarsa = sarsa(env, 500000, .01)
+
+# print the estimated optimal policy
+policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
+check_test.run_check('td_control_check', policy_sarsa)
+print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
+print(policy_sarsa)
+
+# plot the estimated optimal state-value function
+V_sarsa = ([np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)])
+plot_values(V_sarsa)
\ No newline at end of file
diff --git a/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py b/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
new file mode 100644
index 000000000..963c85f7a
--- /dev/null
+++ b/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
@@ -0,0 +1,117 @@
+import sys
+import gym
+import numpy as np
+from collections import defaultdict, deque
+import matplotlib.pyplot as plt
+
+import check_test
+from plot_utils import plot_values
+
+env = gym.make('CliffWalking-v0')
+
+# print(env.action_space)
+# print(env.observation_space)
+#
+# # define the optimal state-value function
+# V_opt = np.zeros((4, 12))
+# V_opt[0:13][0] = -np.arange(3, 15)[::-1]
+# V_opt[0:13][1] = -np.arange(3, 15)[::-1] + 1
+# V_opt[0:13][2] = -np.arange(3, 15)[::-1] + 2
+# V_opt[3][0] = -13
+#
+# plot_values(V_opt)
+
+
+def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=0.5):
+ # decide epsilon
+ epsilon = epsilon_start
+ epsilon_min = 0.1
+ epsilon_decay = 0.99
+
+ # initialize action-value function (empty dictionary of arrays)
+ Q = defaultdict(lambda: np.zeros(env.nA))
+ # initialize performance monitor
+ # loop over episodes
+ for i_episode in range(1, num_episodes + 1):
+
+ # monitor progress
+ if i_episode % 1 == 0:
+ print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
+ sys.stdout.flush()
+
+ # set the value of epsilon
+ epsilon = max(epsilon * epsilon_decay, epsilon_min)
+
+ # generate episode
+ episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=4)
+
+ Q = update_q(episode, Q, alpha, gamma)
+ return Q
+
+
+def generate_episode(env, Q, epsilon, nA):
+ episode = []
+ state, _ = env.reset()
+
+ action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA)) if state not in Q else np.argmax(
+ Q[state]
+ )
+
+ while True:
+ next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
+ if next_state not in Q:
+ Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
+
+ next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))
+
+ episode.append((state, action, reward))
+
+ if terminated or truncated:
+ break
+
+ state = next_state # ✅ Track next state
+ action = next_action # ✅ Track next action
+
+ return episode
+
+def get_probs(Q_s, epsilon, nA):
+ """ obtains the action probabilities corresponding to epsilon-greedy policy """
+ policy_states = np.ones(nA) * epsilon / nA
+ best_action = np.argmax(Q_s)
+ policy_states[best_action] = 1 - epsilon + (epsilon / nA)
+ return policy_states
+
+def pick_action(epsilon, Q, next_state):
+ if np.random.rand() < epsilon:
+ next_action = env.action_space.sample() # Explore (random action)
+ else:
+ next_action = np.argmax(Q[next_state]) # Exploit (best action)
+
+ return next_action
+
+def update_q(episode, Q, alpha, gamma):
+ """ updates the action-value function estimate using the most recent episode """
+ states, actions, rewards = zip(*episode)
+ # prepare for discounting
+ for i in range(len(states) - 1): # Ignore last step
+ state, action = states[i], actions[i]
+ next_state, next_action = states[i + 1], actions[i + 1] # ✅ Use episode step
+
+ old_Q = Q[state][action]
+ next_Q = Q[next_state][next_action] # ✅ Correct SARSA update
+ Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)
+ return Q
+
+
+# obtain the estimated optimal policy and corresponding action-value function
+Q_sarsa = sarsa(env, 5000, .01)
+
+# print the estimated optimal policy
+policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
+check_test.run_check('td_control_check', policy_sarsa)
+print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
+print(policy_sarsa)
+
+# plot the estimated optimal state-value function
+V_sarsa = ([np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)])
+plot_values(V_sarsa)
\ No newline at end of file
From 90884dd1c97e327f9e2de214539aacb81d8c47ba Mon Sep 17 00:00:00 2001
From: simondrugan16 <119412400+simondrugan16@users.noreply.github.com>
Date: Sat, 15 Feb 2025 13:05:36 +0000
Subject: [PATCH 2/2] Temporal Difference Exercises
---
.idea/.gitignore | 8 ++
.idea/deep-reinforcement-learning.iml | 15 +++
.../inspectionProfiles/profiles_settings.xml | 6 +
.idea/misc.xml | 7 +
.idea/modules.xml | 8 ++
.idea/vcs.xml | 6 +
monte-carlo/blackjack.py | 0
.../TD_CliffWalking_Q_Learning_Solution.py | 111 +++++-----------
.../TD_CliffWalking_SARSA_Solution.py | 40 ++----
...TD_CliffWalking_expected_sarsa_Solution.py | 122 ++++++------------
10 files changed, 130 insertions(+), 193 deletions(-)
create mode 100644 .idea/.gitignore
create mode 100644 .idea/deep-reinforcement-learning.iml
create mode 100644 .idea/inspectionProfiles/profiles_settings.xml
create mode 100644 .idea/misc.xml
create mode 100644 .idea/modules.xml
create mode 100644 .idea/vcs.xml
delete mode 100644 monte-carlo/blackjack.py
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 000000000..13566b81b
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/deep-reinforcement-learning.iml b/.idea/deep-reinforcement-learning.iml
new file mode 100644
index 000000000..5fdd65ba2
--- /dev/null
+++ b/.idea/deep-reinforcement-learning.iml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 000000000..105ce2da2
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 000000000..db8786c06
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 000000000..110fab90a
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000000000..35eb1ddfb
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/monte-carlo/blackjack.py b/monte-carlo/blackjack.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py b/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
index d2750770a..207416031 100644
--- a/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
+++ b/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
@@ -1,32 +1,20 @@
-import sys
+from collections import defaultdict
+
import gym
import numpy as np
-from collections import defaultdict, deque
-import matplotlib.pyplot as plt
import check_test
from plot_utils import plot_values
env = gym.make('CliffWalking-v0')
-# print(env.action_space)
-# print(env.observation_space)
-#
-# # define the optimal state-value function
-# V_opt = np.zeros((4, 12))
-# V_opt[0:13][0] = -np.arange(3, 15)[::-1]
-# V_opt[0:13][1] = -np.arange(3, 15)[::-1] + 1
-# V_opt[0:13][2] = -np.arange(3, 15)[::-1] + 2
-# V_opt[3][0] = -13
-#
-# plot_values(V_opt)
-
-
-def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
+def q_learning(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
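+    """
+    Q-learning (Sarsamax) control on CliffWalking: actions are chosen epsilon-greedily,
+    but each update bootstraps from the greedy action in the next state:
+        Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a)).
+    """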
# decide epsilon
epsilon = epsilon_start
epsilon_min = 0.1
- epsilon_decay = 0.9999
+ epsilon_decay = 0.997
+
+ nA = 4
# initialize action-value function (empty dictionary of arrays)
Q = defaultdict(lambda: np.zeros(env.nA))
@@ -35,53 +23,35 @@ def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
for i_episode in range(1, num_episodes + 1):
# monitor progress
- if i_episode % 499999 == 0:
- print("\rEpisode {}/{}".format(i_episode, num_episodes), end="")
- print (str(Q))
+ # if i_episode % 1 == 0:
+ # print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
+ #
+ # sys.stdout.flush()
+ if i_episode % 100 == 0:
+ print(f"Episode {i_episode}: Epsilon = {epsilon:.4f}")
# set the value of epsilon
epsilon = max(epsilon * epsilon_decay, epsilon_min)
# generate episode
- episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=4)
-
- Q = update_q(episode, Q, alpha, gamma)
- return Q
-
-
-def generate_episode(env, Q, epsilon, nA):
- episode = []
- state, _ = env.reset()
- if isinstance(state, dict):
- state = tuple(sorted(state.items()))
+ state, _ = env.reset()
- action = np.random.choice(np.arange(nA),
- p=get_probs(Q[state], epsilon, nA)) if state in Q else env.action_space.sample()
+ while True:
+ action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
- while True:
- if isinstance(state, tuple):
- state = state[0] # Extract actual state if (state, info) is returned
+ next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
+ if next_state not in Q:
+ Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
- # ✅ Convert state to tuple if it’s a dictionary
- if isinstance(state, dict):
- state = tuple(sorted(state.items()))
+            # learn from this transition before any break; bootstrap from the
+            # greedy next action (worth 0 once the episode has terminated)
+            next_Q = 0 if terminated else np.max(Q[next_state])
+            Q[state][action] += alpha * (reward + gamma * next_Q - Q[state][action])
- next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
- if next_state not in Q:
- Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
+            if terminated or truncated:
+                break
- next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))
-
- episode.append((state, action, reward))
-
- if terminated or truncated:
- break
-
- state = next_state # ✅ Track next state
- action = next_action # ✅ Track next action
-
- return episode
+ state = next_state
+ return Q
def get_probs(Q_s, epsilon, nA):
""" obtains the action probabilities corresponding to epsilon-greedy policy """
@@ -90,37 +60,16 @@ def get_probs(Q_s, epsilon, nA):
policy_states[best_action] = 1 - epsilon + (epsilon / nA)
return policy_states
-def pick_action(epsilon, Q, next_state):
- if np.random.rand() < epsilon:
- next_action = env.action_space.sample() # Explore (random action)
- else:
- next_action = np.argmax(Q[next_state]) # Exploit (best action)
-
- return next_action
-
-def update_q(episode, Q, alpha, gamma):
- """ updates the action-value function estimate using the most recent episode """
- states, actions, rewards = zip(*episode)
- # prepare for discounting
- for i in range(len(states) - 1): # Ignore last step
- state, action = states[i], actions[i]
- next_state, next_action = states[i + 1], actions[i + 1] # ✅ Use episode step
-
- old_Q = Q[state][action]
- next_Q = Q[next_state][next_action] # ✅ Correct SARSA update
- Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)
- return Q
-
# obtain the estimated optimal policy and corresponding action-value function
-Q_sarsa = sarsa(env, 500000, .01)
+Q_q_learning = q_learning(env, 5000, .01)
# print the estimated optimal policy
-policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
-check_test.run_check('td_control_check', policy_sarsa)
+policy_q_learning = np.array([np.argmax(Q_q_learning[key]) if key in Q_q_learning else -1 for key in np.arange(48)]).reshape(4,12)
+check_test.run_check('td_control_check', policy_q_learning)
print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
-print(policy_sarsa)
+print(policy_q_learning)
# plot the estimated optimal state-value function
-V_sarsa = ([np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)])
-plot_values(V_sarsa)
\ No newline at end of file
+V_q_learning = ([np.max(Q_q_learning[key]) if key in Q_q_learning else 0 for key in np.arange(48)])
+plot_values(V_q_learning)
\ No newline at end of file
diff --git a/temporal-difference/TD_CliffWalking_SARSA_Solution.py b/temporal-difference/TD_CliffWalking_SARSA_Solution.py
index d2750770a..cd44b56d9 100644
--- a/temporal-difference/TD_CliffWalking_SARSA_Solution.py
+++ b/temporal-difference/TD_CliffWalking_SARSA_Solution.py
@@ -9,24 +9,11 @@
env = gym.make('CliffWalking-v0')
-# print(env.action_space)
-# print(env.observation_space)
-#
-# # define the optimal state-value function
-# V_opt = np.zeros((4, 12))
-# V_opt[0:13][0] = -np.arange(3, 15)[::-1]
-# V_opt[0:13][1] = -np.arange(3, 15)[::-1] + 1
-# V_opt[0:13][2] = -np.arange(3, 15)[::-1] + 2
-# V_opt[3][0] = -13
-#
-# plot_values(V_opt)
-
-
-def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
+def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=0.5):
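+    """
+    SARSA control on CliffWalking: episodes are generated with an epsilon-greedy
+    policy and Q is updated with
+        Q(s, a) <- Q(s, a) + alpha * (r + gamma * Q(s', a') - Q(s, a)),
+    where a' is the action actually selected in the next state s'.
+    """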
# decide epsilon
epsilon = epsilon_start
epsilon_min = 0.1
- epsilon_decay = 0.9999
+ epsilon_decay = 0.99
# initialize action-value function (empty dictionary of arrays)
Q = defaultdict(lambda: np.zeros(env.nA))
@@ -35,10 +22,9 @@ def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
for i_episode in range(1, num_episodes + 1):
# monitor progress
- if i_episode % 499999 == 0:
- print("\rEpisode {}/{}".format(i_episode, num_episodes), end="")
- print (str(Q))
-
+ if i_episode % 1 == 0:
+ print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
+ sys.stdout.flush()
# set the value of epsilon
epsilon = max(epsilon * epsilon_decay, epsilon_min)
@@ -53,20 +39,12 @@ def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
def generate_episode(env, Q, epsilon, nA):
episode = []
state, _ = env.reset()
- if isinstance(state, dict):
- state = tuple(sorted(state.items()))
- action = np.random.choice(np.arange(nA),
- p=get_probs(Q[state], epsilon, nA)) if state in Q else env.action_space.sample()
+    # epsilon-greedy initial action (Q is a defaultdict, so Q[state] is always defined)
+    action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
while True:
- if isinstance(state, tuple):
- state = state[0] # Extract actual state if (state, info) is returned
-
- # ✅ Convert state to tuple if it’s a dictionary
- if isinstance(state, dict):
- state = tuple(sorted(state.items()))
-
next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
if next_state not in Q:
Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
@@ -113,7 +91,7 @@ def update_q(episode, Q, alpha, gamma):
# obtain the estimated optimal policy and corresponding action-value function
-Q_sarsa = sarsa(env, 500000, .01)
+Q_sarsa = sarsa(env, 5000, .01)
# print the estimated optimal policy
policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
diff --git a/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py b/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
index 963c85f7a..e3f537cea 100644
--- a/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
+++ b/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
@@ -1,7 +1,7 @@
import sys
import gym
import numpy as np
-from collections import defaultdict, deque
+from collections import defaultdict
import matplotlib.pyplot as plt
import check_test
@@ -9,109 +9,69 @@
env = gym.make('CliffWalking-v0')
-# print(env.action_space)
-# print(env.observation_space)
-#
-# # define the optimal state-value function
-# V_opt = np.zeros((4, 12))
-# V_opt[0:13][0] = -np.arange(3, 15)[::-1]
-# V_opt[0:13][1] = -np.arange(3, 15)[::-1] + 1
-# V_opt[0:13][2] = -np.arange(3, 15)[::-1] + 2
-# V_opt[3][0] = -13
-#
-# plot_values(V_opt)
-
-
-def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=0.5):
- # decide epsilon
+def expected_sarsa(env, num_episodes, alpha, gamma=0.95, epsilon_start=1.0):
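+    """
+    Expected SARSA control on CliffWalking: each update bootstraps from the expected
+    action value in the next state under the epsilon-greedy policy pi:
+        Q(s, a) <- Q(s, a) + alpha * (r + gamma * sum_a' pi(a'|s') Q(s', a') - Q(s, a)).
+    """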
+ # Initialize epsilon and related parameters
epsilon = epsilon_start
epsilon_min = 0.1
- epsilon_decay = 0.99
+ epsilon_decay = 0.995
- # initialize action-value function (empty dictionary of arrays)
+ nA = 4 # number of actions
+
+ # Initialize the action-value function (empty dictionary of arrays)
Q = defaultdict(lambda: np.zeros(env.nA))
- # initialize performance monitor
- # loop over episodes
- for i_episode in range(1, num_episodes + 1):
- # monitor progress
- if i_episode % 1 == 0:
- print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
- sys.stdout.flush()
+ # Loop over episodes
+ for i_episode in range(1, num_episodes + 1):
+ if i_episode % 100 == 0:
+ print(f"Episode {i_episode}: Epsilon = {epsilon:.4f}")
- # set the value of epsilon
+ # Decay epsilon for exploration-exploitation balance
epsilon = max(epsilon * epsilon_decay, epsilon_min)
- # generate episode
- episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=4)
-
- Q = update_q(episode, Q, alpha, gamma)
- return Q
-
-
-def generate_episode(env, Q, epsilon, nA):
- episode = []
- state, _ = env.reset()
+ # Generate episode
+ state, _ = env.reset()
+ action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA)) # epsilon-greedy policy
- action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA)) if state not in Q else np.argmax(
- Q[state]
- )
+ while True:
+ next_state, reward, terminated, truncated, _ = env.step(action)
- while True:
- next_state, reward, terminated, truncated, _ = env.step(action) # ✅ New API
- if next_state not in Q:
- Q[next_state] = np.zeros(nA) # ✅ Ensure Q-value initialization
+ # Initialize Q-values for the next state if not already initialized
+ if next_state not in Q:
+ Q[next_state] = np.zeros(nA)
- next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))
+ # Compute the expected Q-value for the next state
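+            # i.e. sum over a' of pi(a'|s') * Q(s', a') under the current epsilon-greedy policy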
+ expected_Q = np.dot(get_probs(Q[next_state], epsilon, nA), Q[next_state])
- episode.append((state, action, reward))
+            # Expected SARSA update
+ Q[state][action] += alpha * (reward + gamma * expected_Q - Q[state][action])
- if terminated or truncated:
- break
+ # If the episode ends, break out of the loop
+ if terminated or truncated:
+ break
- state = next_state # ✅ Track next state
- action = next_action # ✅ Track next action
+ # Transition to the next state and choose the next action using epsilon-greedy
+ state = next_state
+ action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
- return episode
+ return Q
def get_probs(Q_s, epsilon, nA):
- """ obtains the action probabilities corresponding to epsilon-greedy policy """
+ """ Obtains the action probabilities corresponding to an epsilon-greedy policy """
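+    # Each action gets probability epsilon / nA; the greedy action additionally
+    # receives the remaining 1 - epsilon, so the probabilities sum to 1.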
policy_states = np.ones(nA) * epsilon / nA
best_action = np.argmax(Q_s)
policy_states[best_action] = 1 - epsilon + (epsilon / nA)
return policy_states
-def pick_action(epsilon, Q, next_state):
- if np.random.rand() < epsilon:
- next_action = env.action_space.sample() # Explore (random action)
- else:
- next_action = np.argmax(Q[next_state]) # Exploit (best action)
-
- return next_action
-
-def update_q(episode, Q, alpha, gamma):
- """ updates the action-value function estimate using the most recent episode """
- states, actions, rewards = zip(*episode)
- # prepare for discounting
- for i in range(len(states) - 1): # Ignore last step
- state, action = states[i], actions[i]
- next_state, next_action = states[i + 1], actions[i + 1] # ✅ Use episode step
-
- old_Q = Q[state][action]
- next_Q = Q[next_state][next_action] # ✅ Correct SARSA update
- Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)
- return Q
-
-# obtain the estimated optimal policy and corresponding action-value function
-Q_sarsa = sarsa(env, 5000, .01)
+# Obtain the estimated optimal policy and corresponding action-value function using Expected SARSA
+Q_expected_sarsa = expected_sarsa(env, 50000, .01)
-# print the estimated optimal policy
-policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4,12)
-check_test.run_check('td_control_check', policy_sarsa)
+# Print the estimated optimal policy
+policy_expected_sarsa = np.array([np.argmax(Q_expected_sarsa[key]) if key in Q_expected_sarsa else -1 for key in np.arange(48)]).reshape(4, 12)
+check_test.run_check('td_control_check', policy_expected_sarsa)
print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
-print(policy_sarsa)
+print(policy_expected_sarsa)
-# plot the estimated optimal state-value function
-V_sarsa = ([np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)])
-plot_values(V_sarsa)
\ No newline at end of file
+# Plot the estimated optimal state-value function
+V_expected_sarsa = [np.max(Q_expected_sarsa[key]) if key in Q_expected_sarsa else 0 for key in np.arange(48)]
+plot_values(V_expected_sarsa)