diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 000000000..13566b81b
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/deep-reinforcement-learning.iml b/.idea/deep-reinforcement-learning.iml
new file mode 100644
index 000000000..5fdd65ba2
--- /dev/null
+++ b/.idea/deep-reinforcement-learning.iml
@@ -0,0 +1,15 @@
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 000000000..105ce2da2
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 000000000..db8786c06
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 000000000..110fab90a
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000000000..35eb1ddfb
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
diff --git a/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py b/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
new file mode 100644
index 000000000..207416031
--- /dev/null
+++ b/temporal-difference/TD_CliffWalking_Q_Learning_Solution.py
@@ -0,0 +1,75 @@
+from collections import defaultdict
+
+import gym
+import numpy as np
+
+import check_test
+from plot_utils import plot_values
+
+env = gym.make('CliffWalking-v0')
+
+def q_learning(env, num_episodes, alpha, gamma=1.0, epsilon_start=1.0):
+    # epsilon schedule
+    epsilon = epsilon_start
+    epsilon_min = 0.1
+    epsilon_decay = 0.997
+
+    nA = 4  # number of actions
+
+    # initialize action-value function (empty dictionary of arrays)
+    Q = defaultdict(lambda: np.zeros(nA))
+
+    # loop over episodes
+    for i_episode in range(1, num_episodes + 1):
+
+        # monitor progress
+        if i_episode % 100 == 0:
+            print(f"Episode {i_episode}: Epsilon = {epsilon:.4f}")
+
+        # decay epsilon (multiplicative decay down to a fixed floor)
+        epsilon = max(epsilon * epsilon_decay, epsilon_min)
+
+        # generate an episode
+        state, _ = env.reset()
+
+        while True:
+            # choose an action epsilon-greedily w.r.t. the current Q
+            action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
+
+            next_state, reward, terminated, truncated, _ = env.step(action)  # new Gym API (5-tuple)
+            if next_state not in Q:
+                Q[next_state] = np.zeros(nA)  # defensive init (redundant with the defaultdict, but harmless)
+
+            # Q-learning update; bootstrap with 0 from terminal states
+            next_Q = 0.0 if terminated else np.max(Q[next_state])
+            Q[state][action] += alpha * (reward + gamma * next_Q - Q[state][action])
+
+            if terminated or truncated:
+                break
+
+            state = next_state
+    return Q
+
+def get_probs(Q_s, epsilon, nA):
+    """ obtains the action probabilities corresponding to an epsilon-greedy policy """
+    policy_states = np.ones(nA) * epsilon / nA
+    best_action = np.argmax(Q_s)
+    policy_states[best_action] = 1 - epsilon + (epsilon / nA)
+    return policy_states
+
+
+# obtain the estimated optimal policy and corresponding action-value function
+Q_q_learning = q_learning(env, 5000, .01)
+
+# print the estimated optimal policy
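+# The CliffWalking grid has 4 x 12 = 48 states; any state that was never encountered
+# during training stays out of Q and is marked -1 (N/A) in the policy grid below.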
+policy_q_learning = np.array([np.argmax(Q_q_learning[key]) if key in Q_q_learning else -1 for key in np.arange(48)]).reshape(4, 12)
+check_test.run_check('td_control_check', policy_q_learning)
+print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
+print(policy_q_learning)
+
+# plot the estimated optimal state-value function
+V_q_learning = [np.max(Q_q_learning[key]) if key in Q_q_learning else 0 for key in np.arange(48)]
+plot_values(V_q_learning)
\ No newline at end of file
diff --git a/temporal-difference/TD_CliffWalking_SARSA_Solution.py b/temporal-difference/TD_CliffWalking_SARSA_Solution.py
new file mode 100644
index 000000000..cd44b56d9
--- /dev/null
+++ b/temporal-difference/TD_CliffWalking_SARSA_Solution.py
@@ -0,0 +1,104 @@
+import sys
+import gym
+import numpy as np
+from collections import defaultdict
+import matplotlib.pyplot as plt
+
+import check_test
+from plot_utils import plot_values
+
+env = gym.make('CliffWalking-v0')
+
+def sarsa(env, num_episodes, alpha, gamma=1.0, epsilon_start=0.5):
+    # epsilon schedule
+    epsilon = epsilon_start
+    epsilon_min = 0.1
+    epsilon_decay = 0.99
+
+    nA = 4  # number of actions
+
+    # initialize action-value function (empty dictionary of arrays)
+    Q = defaultdict(lambda: np.zeros(nA))
+
+    # loop over episodes
+    for i_episode in range(1, num_episodes + 1):
+
+        # monitor progress
+        print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
+        sys.stdout.flush()
+
+        # decay epsilon (multiplicative decay down to a fixed floor)
+        epsilon = max(epsilon * epsilon_decay, epsilon_min)
+
+        # generate an episode with the current epsilon-greedy policy ...
+        episode = generate_episode(env=env, Q=Q, epsilon=epsilon, nA=nA)
+
+        # ... then apply the SARSA update along the stored transitions
+        Q = update_q(episode, Q, alpha, gamma)
+    return Q
+
+
+def generate_episode(env, Q, epsilon, nA):
+    """ generates one episode by following an epsilon-greedy policy w.r.t. Q """
+    episode = []
+    state, _ = env.reset()
+
+    # epsilon-greedy selection of the first action (Q[state] defaults to zeros for unseen states)
+    action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
+
+    while True:
+        next_state, reward, terminated, truncated, _ = env.step(action)  # new Gym API (5-tuple)
+        if next_state not in Q:
+            Q[next_state] = np.zeros(nA)  # defensive init (redundant with the defaultdict, but harmless)
+
+        # epsilon-greedy selection of the next action (on-policy)
+        next_action = np.random.choice(np.arange(nA), p=get_probs(Q[next_state], epsilon, nA))
+
+        episode.append((state, action, reward))
+
+        if terminated or truncated:
+            break
+
+        state = next_state    # track next state
+        action = next_action  # track next action
+
+    return episode
+
+def get_probs(Q_s, epsilon, nA):
+    """ obtains the action probabilities corresponding to an epsilon-greedy policy """
+    policy_states = np.ones(nA) * epsilon / nA
+    best_action = np.argmax(Q_s)
+    policy_states[best_action] = 1 - epsilon + (epsilon / nA)
+    return policy_states
+
+def pick_action(epsilon, Q, next_state):
+    """ alternative epsilon-greedy action selector (not used above) """
+    if np.random.rand() < epsilon:
+        next_action = env.action_space.sample()  # explore (random action)
+    else:
+        next_action = np.argmax(Q[next_state])   # exploit (best action)
+
+    return next_action
+
+def update_q(episode, Q, alpha, gamma):
+    """ updates the action-value function estimate using the most recent episode """
+    states, actions, rewards = zip(*episode)
+    # SARSA update for every transition except the last ...
+    for i in range(len(states) - 1):
+        state, action = states[i], actions[i]
+        next_state, next_action = states[i + 1], actions[i + 1]  # the action actually taken next
+
+        old_Q = Q[state][action]
+        next_Q = Q[next_state][next_action]  # on-policy (SARSA) target
+        Q[state][action] = old_Q + alpha * (rewards[i] + gamma * next_Q - old_Q)
+
+    # ... and a final update for the last transition, whose target is just the final reward
+    Q[states[-1]][actions[-1]] += alpha * (rewards[-1] - Q[states[-1]][actions[-1]])
+    return Q
+
+
+# obtain the estimated optimal policy and corresponding action-value function
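+# 5000 episodes with a small step size (alpha = 0.01); epsilon decays from 0.5 to its
+# 0.1 floor within the first few hundred episodes, so most of training stays mildly exploratory.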
+Q_sarsa = sarsa(env, 5000, .01)
+
+# print the estimated optimal policy
+policy_sarsa = np.array([np.argmax(Q_sarsa[key]) if key in Q_sarsa else -1 for key in np.arange(48)]).reshape(4, 12)
+check_test.run_check('td_control_check', policy_sarsa)
+print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
+print(policy_sarsa)
+
+# plot the estimated optimal state-value function
+V_sarsa = [np.max(Q_sarsa[key]) if key in Q_sarsa else 0 for key in np.arange(48)]
+plot_values(V_sarsa)
\ No newline at end of file
diff --git a/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py b/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
new file mode 100644
index 000000000..e3f537cea
--- /dev/null
+++ b/temporal-difference/TD_CliffWalking_expected_sarsa_Solution.py
@@ -0,0 +1,77 @@
+import sys
+import gym
+import numpy as np
+from collections import defaultdict
+import matplotlib.pyplot as plt
+
+import check_test
+from plot_utils import plot_values
+
+env = gym.make('CliffWalking-v0')
+
+def expected_sarsa(env, num_episodes, alpha, gamma=0.95, epsilon_start=1.0):
+    # Initialize epsilon and related parameters
+    epsilon = epsilon_start
+    epsilon_min = 0.1
+    epsilon_decay = 0.995
+
+    nA = 4  # number of actions
+
+    # Initialize the action-value function (empty dictionary of arrays)
+    Q = defaultdict(lambda: np.zeros(nA))
+
+    # Loop over episodes
+    for i_episode in range(1, num_episodes + 1):
+        if i_episode % 100 == 0:
+            print(f"Episode {i_episode}: Epsilon = {epsilon:.4f}")
+
+        # Decay epsilon for the exploration-exploitation balance
+        epsilon = max(epsilon * epsilon_decay, epsilon_min)
+
+        # Generate an episode
+        state, _ = env.reset()
+        action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))  # epsilon-greedy policy
+
+        while True:
+            next_state, reward, terminated, truncated, _ = env.step(action)  # new Gym API (5-tuple)
+
+            # Initialize Q-values for the next state if not already initialized
+            if next_state not in Q:
+                Q[next_state] = np.zeros(nA)
+
+            # Expected Q-value of the next state under the epsilon-greedy policy (0 from terminal states)
+            expected_Q = 0.0 if terminated else np.dot(get_probs(Q[next_state], epsilon, nA), Q[next_state])
+
+            # Expected SARSA update
+            Q[state][action] += alpha * (reward + gamma * expected_Q - Q[state][action])
+
+            # If the episode ends, break out of the loop
+            if terminated or truncated:
+                break
+
+            # Transition to the next state and choose the next action epsilon-greedily
+            state = next_state
+            action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
+
+    return Q
+
+def get_probs(Q_s, epsilon, nA):
+    """ Obtains the action probabilities corresponding to an epsilon-greedy policy """
+    policy_states = np.ones(nA) * epsilon / nA
+    best_action = np.argmax(Q_s)
+    policy_states[best_action] = 1 - epsilon + (epsilon / nA)
+    return policy_states
+
+
+# Obtain the estimated optimal policy and corresponding action-value function using Expected SARSA
+Q_expected_sarsa = expected_sarsa(env, 50000, .01)
+
+# Print the estimated optimal policy
+policy_expected_sarsa = np.array([np.argmax(Q_expected_sarsa[key]) if key in Q_expected_sarsa else -1 for key in np.arange(48)]).reshape(4, 12)
+check_test.run_check('td_control_check', policy_expected_sarsa)
+print("\nEstimated Optimal Policy (UP = 0, RIGHT = 1, DOWN = 2, LEFT = 3, N/A = -1):")
+print(policy_expected_sarsa)
+
+# Plot the estimated optimal state-value function
+V_expected_sarsa = [np.max(Q_expected_sarsa[key]) if key in Q_expected_sarsa else 0 for key in np.arange(48)]
+plot_values(V_expected_sarsa)